From 58a402ee6c05a0a6f97a3026ab0c41f319bc1421 Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Tue, 23 Oct 2012 14:38:21 +0000 Subject: [PATCH] - removal for fragmented messages --- src/transport/plugin_transport_udp.c | 139 +++++++++++++++------------ 1 file changed, 79 insertions(+), 60 deletions(-) diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index 630af4bfa..f67bbd915 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -1003,6 +1003,66 @@ dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw) plugin->ipv6_queue_tail, udpw); } +static void +fragmented_message_done (struct UDP_FragmentationContext *fc, int result) +{ + struct UDP_MessageWrapper *udpw; + struct UDP_MessageWrapper *tmp; + struct UDP_MessageWrapper dummy; + struct Session *s = fc->session; + LOG (GNUNET_ERROR_TYPE_DEBUG, "%p : Fragmented message removed with result %s\n", fc, (result == GNUNET_SYSERR) ? "FAIL" : "SUCCESS"); + + /* Call continuation for fragmented message */ + dummy.msg_type = MSG_FRAGMENTED_COMPLETE; + dummy.msg_buf = NULL; + dummy.msg_size = s->frag_ctx->on_wire_size; + dummy.payload_size = s->frag_ctx->payload_size; + dummy.frag_ctx = s->frag_ctx; + dummy.session = s; + + call_continuation (&dummy, result); + + /* Remove left-over fragments from queue */ + /* Remove leftover fragments from queue */ + if (s->addrlen == sizeof (struct sockaddr_in6)) + { + udpw = plugin->ipv6_queue_head; + while (NULL != udpw) + { + tmp = udpw->next; + if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) + { + dequeue (plugin, udpw); + call_continuation (udpw, GNUNET_SYSERR); + GNUNET_free (udpw); + } + udpw = tmp; + } + } + if (s->addrlen == sizeof (struct sockaddr_in)) + { + udpw = plugin->ipv4_queue_head; + while (udpw!= NULL) + { + tmp = udpw->next; + if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx)) + { + dequeue (plugin, udpw); + call_continuation (udpw, GNUNET_SYSERR); + GNUNET_free (udpw); + } + udpw = tmp; + } + } + + /* Destroy fragmentation context */ + GNUNET_FRAGMENT_context_destroy (fc->frag, + &s->last_expected_msg_delay, + &s->last_expected_ack_delay); + s->frag_ctx = NULL; + GNUNET_free (fc); +} + /** * Functions with this signature are called whenever we need * to close a session due to a disconnect or failure to @@ -1023,6 +1083,13 @@ disconnect_session (struct Session *s) GNUNET_i2s (&s->target), GNUNET_a2s (s->sock_addr, s->addrlen)); stop_session_timeout (s); + + if (NULL != s->frag_ctx) + { + /* Remove fragmented message due to disconnect */ + fragmented_message_done (s->frag_ctx, GNUNET_SYSERR); + } + next = plugin->ipv4_queue_head; while (NULL != (udpw = next)) { @@ -1970,49 +2037,9 @@ read_process_ack (struct Plugin *plugin, "Message full ACK'ed\n", (unsigned int) ntohs (msg->size), GNUNET_i2s (&udp_ack->sender), GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); - GNUNET_FRAGMENT_context_destroy (s->frag_ctx->frag, - &s->last_expected_msg_delay, - &s->last_expected_ack_delay); - if (s->addrlen == sizeof (struct sockaddr_in6)) - { - udpw = plugin->ipv6_queue_head; - while (NULL != udpw) - { - tmp = udpw->next; - if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) - { - dequeue (plugin, udpw); - GNUNET_free (udpw); - } - udpw = tmp; - } - } - if (s->addrlen == sizeof (struct sockaddr_in)) - { - udpw = plugin->ipv4_queue_head; - while (udpw!= NULL) - { - tmp = udpw->next; - if ((NULL != udpw->frag_ctx) && (udpw->frag_ctx == s->frag_ctx)) - { - dequeue (plugin, udpw); - GNUNET_free (udpw); - } - udpw = tmp; - } - } - dummy.msg_type = MSG_FRAGMENTED_COMPLETE; - dummy.msg_buf = NULL; - dummy.msg_size = s->frag_ctx->on_wire_size; - dummy.payload_size = s->frag_ctx->payload_size; - dummy.frag_ctx = s->frag_ctx; - dummy.session = s; - - call_continuation (&dummy, GNUNET_OK); - - GNUNET_free (s->frag_ctx); - s->frag_ctx = NULL; + /* Remove fragmented message after successful sending */ + fragmented_message_done (s->frag_ctx, GNUNET_OK); } @@ -2182,6 +2209,9 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, LOG (GNUNET_ERROR_TYPE_DEBUG, "Message for peer `%s' with size %u timed out\n", GNUNET_i2s(&udpw->session->target), udpw->payload_size); + /* Remove message */ + dequeue (plugin, udpw); + GNUNET_free (udpw); break; case MSG_FRAGMENTED: /* Fragmented message */ @@ -2189,38 +2219,27 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment for message for peer `%s' with size %u timed out\n", GNUNET_i2s(&udpw->session->target), udpw->frag_ctx->payload_size); - GNUNET_FRAGMENT_context_destroy (udpw->frag_ctx->frag, - &udpw->session->last_expected_msg_delay, - &udpw->session->last_expected_ack_delay); - GNUNET_free (udpw->frag_ctx); - udpw->session->frag_ctx = NULL; + /* Remove fragmented message due to timeout */ + fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR); break; case MSG_ACK: LOG (GNUNET_ERROR_TYPE_DEBUG, "ACK Message for peer `%s' with size %u timed out\n", GNUNET_i2s(&udpw->session->target), udpw->payload_size); + dequeue (plugin, udpw); + GNUNET_free (udpw); break; default: break; } - - GNUNET_STATISTICS_update (plugin->env->stats, - "# messages dismissed due to timeout", - 1, GNUNET_NO); - /* Remove message */ if (sock == plugin->sockv4) - { - dequeue (plugin, udpw); - GNUNET_free (udpw); udpw = plugin->ipv4_queue_head; - } if (sock == plugin->sockv6) - { - dequeue (plugin, udpw); - GNUNET_free (udpw); udpw = plugin->ipv6_queue_head; - } + GNUNET_STATISTICS_update (plugin->env->stats, + "# messages dismissed due to timeout", + 1, GNUNET_NO); } else { -- 2.25.1