+ struct SourceInformation *si = client;
+ struct GNUNET_ATS_Information ats[2];
+ struct GNUNET_TIME_Relative delay;
+
+ GNUNET_assert (si->session != NULL);
+ if (GNUNET_YES == si->session->in_destroy)
+ return GNUNET_OK;
+ /* setup ATS */
+ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
+ ats[0].value = htonl (1);
+ ats[1] = si->session->ats;
+ GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED);
+ delay = plugin->env->receive (plugin->env->cls,
+ &si->sender,
+ hdr,
+ (const struct GNUNET_ATS_Information *) &ats, 2,
+ si->session,
+ si->arg,
+ si->args);
+ si->session->flow_delay_for_other_peer = delay;
+ reschedule_session_timeout(si->session);
+ return GNUNET_OK;
+}
+
+
+/**
+ * We've received a UDP Message. Process it (pass contents to main service).
+ *
+ * @param plugin plugin context
+ * @param msg the message
+ * @param sender_addr sender address
+ * @param sender_addr_len number of bytes in sender_addr
+ */
+static void
+process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg,
+ const struct sockaddr *sender_addr,
+ socklen_t sender_addr_len)
+{
+ struct SourceInformation si;
+ struct Session * s;
+ struct IPv4UdpAddress u4;
+ struct IPv6UdpAddress u6;
+ const void *arg;
+ size_t args;
+
+ if (0 != ntohl (msg->reserved))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohs (msg->header.size) <
+ sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ /* convert address */
+ switch (sender_addr->sa_family)
+ {
+ case AF_INET:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in));
+ u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr;
+ u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port;
+ arg = &u4;
+ args = sizeof (u4);
+ break;
+ case AF_INET6:
+ GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6));
+ u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr;
+ u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port;
+ arg = &u6;
+ args = sizeof (u6);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message with %u bytes from peer `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender),
+ GNUNET_a2s (sender_addr, sender_addr_len));
+
+ struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args);
+ s = udp_plugin_get_session(plugin, address);
+ GNUNET_free (address);
+
+ /* iterate over all embedded messages */
+ si.session = s;
+ si.sender = msg->sender;
+ si.arg = arg;
+ si.args = args;
+ s->rc++;
+ GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1],
+ ntohs (msg->header.size) -
+ sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO);
+ s->rc--;
+ if ( (0 == s->rc) && (GNUNET_YES == s->in_destroy))
+ free_session (s);
+}
+
+
+/**
+ * Scan the heap for a receive context with the given address.
+ *
+ * @param cls the 'struct FindReceiveContext'
+ * @param node internal node of the heap
+ * @param element value stored at the node (a 'struct ReceiveContext')
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ * GNUNET_NO if not.
+ */
+static int
+find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
+ void *element, GNUNET_CONTAINER_HeapCostType cost)
+{
+ struct FindReceiveContext *frc = cls;
+ struct DefragContext *e = element;
+
+ if ((frc->addr_len == e->addr_len) &&
+ (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
+ {
+ frc->rc = e;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Process a defragmented message.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param msg the message
+ */
+static void
+fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct DefragContext *rc = cls;
+
+ if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
+ rc->src_addr, rc->addr_len);
+}
+
+
+struct LookupContext
+{
+ const struct sockaddr * addr;
+
+ struct Session *res;
+
+ size_t addrlen;
+};
+
+
+static int
+lookup_session_by_addr_it (void *cls, const struct GNUNET_HashCode * key, void *value)
+{
+ struct LookupContext *l_ctx = cls;
+ struct Session * s = value;
+
+ if ((s->addrlen == l_ctx->addrlen) &&
+ (0 == memcmp (s->sock_addr, l_ctx->addr, s->addrlen)))
+ {
+ l_ctx->res = s;
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Transmit an acknowledgement.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param id message ID (unused)
+ * @param msg ack to transmit
+ */
+static void
+ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
+{
+ struct DefragContext *rc = cls;
+ size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
+ struct UDP_ACK_Message *udp_ack;
+ uint32_t delay = 0;
+ struct UDPMessageWrapper *udpw;
+ struct Session *s;
+
+ struct LookupContext l_ctx;
+ l_ctx.addr = rc->src_addr;
+ l_ctx.addrlen = rc->addr_len;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (rc->plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+
+ if (NULL == s)
+ return;
+
+ if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+ delay = s->flow_delay_for_other_peer.rel_value;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending ACK to `%s' including delay of %u ms\n",
+ GNUNET_a2s (rc->src_addr,
+ (rc->src_addr->sa_family ==
+ AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
+ sockaddr_in6)),
+ delay);
+ udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize);
+ udpw->msg_size = msize;
+ udpw->session = s;
+ udpw->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
+ udpw->udp = (char *)&udpw[1];
+ udp_ack = (struct UDP_ACK_Message *) udpw->udp;
+ udp_ack->header.size = htons ((uint16_t) msize);
+ udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+ udp_ack->delay = htonl (delay);
+ udp_ack->sender = *rc->plugin->env->my_identity;
+ memcpy (&udp_ack[1], msg, ntohs (msg->size));
+
+ enqueue (rc->plugin, udpw);
+}
+
+
+static void
+read_process_msg (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ const char *addr,
+ socklen_t fromlen)
+{
+ if (ntohs (msg->size) < sizeof (struct UDPMessage))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ process_udp_message (plugin, (const struct UDPMessage *) msg,
+ (const struct sockaddr *) addr, fromlen);
+}
+
+
+static void
+read_process_ack (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ const struct GNUNET_MessageHeader *ack;
+ const struct UDP_ACK_Message *udp_ack;
+ struct LookupContext l_ctx;
+ struct Session *s;
+ struct GNUNET_TIME_Relative flow_delay;
+
+ if (ntohs (msg->size) <
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ l_ctx.addr = (const struct sockaddr *) addr;
+ l_ctx.addrlen = fromlen;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+
+ if ((s == NULL) || (s->frag_ctx == NULL))
+ return;