+ return;
+ }
+ process_udp_message (plugin, (const struct UDPMessage *) msg,
+ (const struct sockaddr *) addr, fromlen);
+}
+
+
+static void
+read_process_ack (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ const struct GNUNET_MessageHeader *ack;
+ const struct UDP_ACK_Message *udp_ack;
+ struct LookupContext l_ctx;
+ struct Session *s;
+ struct GNUNET_TIME_Relative flow_delay;
+
+ if (ntohs (msg->size) <
+ sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ udp_ack = (const struct UDP_ACK_Message *) msg;
+ l_ctx.addr = (const struct sockaddr *) addr;
+ l_ctx.addrlen = fromlen;
+ l_ctx.res = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (plugin->sessions,
+ &lookup_session_by_addr_it,
+ &l_ctx);
+ s = l_ctx.res;
+
+ if ((s == NULL) || (s->frag_ctx == NULL))
+ return;
+
+ flow_delay.rel_value = (uint64_t) ntohl (udp_ack->delay);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "We received a sending delay of %llu\n",
+ flow_delay.rel_value);
+ s->flow_delay_from_other_peer =
+ GNUNET_TIME_relative_to_absolute (flow_delay);
+
+ ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+ if (ntohs (ack->size) !=
+ ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ if (GNUNET_OK != GNUNET_FRAGMENT_process_ack (s->frag_ctx->frag, ack))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "UDP processes %u-byte acknowledgement from `%s' at `%s'\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ return;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "FULL MESSAGE ACKed\n",
+ (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ s->last_expected_delay = GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag);
+
+ struct UDPMessageWrapper * udpw;
+ struct UDPMessageWrapper * tmp;
+ if (s->addrlen == sizeof (struct sockaddr_in6))
+ {
+ udpw = plugin->ipv6_queue_head;
+ while (NULL != udpw)
+ {
+ tmp = udpw->next;
+ if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ GNUNET_free (udpw);
+ }
+ udpw = tmp;
+ }
+ }
+ if (s->addrlen == sizeof (struct sockaddr_in))
+ {
+ udpw = plugin->ipv4_queue_head;
+ while (udpw!= NULL)
+ {
+ tmp = udpw->next;
+ if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
+ {
+ GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ GNUNET_free (udpw);
+ }
+ udpw = tmp;
+ }
+ }
+
+ if (s->frag_ctx->cont != NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling continuation for fragmented message to `%s' with result %s\n",
+ GNUNET_i2s (&s->target), "OK");
+ s->frag_ctx->cont (s->frag_ctx->cont_cls, &udp_ack->sender, GNUNET_OK);
+ }
+
+ GNUNET_free (s->frag_ctx);
+ s->frag_ctx = NULL;
+}
+
+
+static void
+read_process_fragment (struct Plugin *plugin,
+ const struct GNUNET_MessageHeader *msg,
+ char *addr,
+ socklen_t fromlen)
+{
+ struct DefragContext *d_ctx;
+ struct GNUNET_TIME_Absolute now;
+ struct FindReceiveContext frc;
+
+ frc.rc = NULL;
+ frc.addr = (const struct sockaddr *) addr;
+ frc.addr_len = fromlen;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ /* Lookup existing receive context for this address */
+ GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs,
+ &find_receive_context,
+ &frc);
+ now = GNUNET_TIME_absolute_get ();
+ d_ctx = frc.rc;
+
+ if (d_ctx == NULL)
+ {
+ /* Create a new defragmentation context */
+ d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen);
+ memcpy (&d_ctx[1], addr, fromlen);
+ d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1];
+ d_ctx->addr_len = fromlen;
+ d_ctx->plugin = plugin;
+ d_ctx->defrag =
+ GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
+ UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx,
+ &fragment_msg_proc, &ack_proc);
+ d_ctx->hnode =
+ GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Created new defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Found existing defragmentation context for %u-byte fragment from `%s'\n",
+ (unsigned int) ntohs (msg->size),
+ GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+ }
+
+ if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg))
+ {
+ /* keep this 'rc' from expiring */
+ GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode,
+ (GNUNET_CONTAINER_HeapCostType)
+ now.abs_value);
+ }
+ if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) >
+ UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
+ {
+ /* remove 'rc' that was inactive the longest */
+ d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs);
+ GNUNET_assert (NULL != d_ctx);
+ GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag);
+ GNUNET_free (d_ctx);