From 0c6918c8a13e9a0bf3dcf58ce86e9ceee2110b8c Mon Sep 17 00:00:00 2001 From: Matthias Wachs Date: Tue, 31 Jan 2012 16:02:38 +0000 Subject: [PATCH] - clean up the receive switch case --- src/transport/plugin_transport_udp_new.c | 192 +++++++++++++---------- 1 file changed, 108 insertions(+), 84 deletions(-) diff --git a/src/transport/plugin_transport_udp_new.c b/src/transport/plugin_transport_udp_new.c index c7fc680fd..2015a3118 100644 --- a/src/transport/plugin_transport_udp_new.c +++ b/src/transport/plugin_transport_udp_new.c @@ -148,7 +148,7 @@ struct FindReceiveContext /** * Where to store the result. */ - struct ReceiveContext *rc; + struct DefragContext *rc; /** * Address to find. @@ -169,7 +169,7 @@ struct FindReceiveContext * Data structure to track defragmentation contexts based * on the source of the UDP traffic. */ -struct ReceiveContext +struct DefragContext { /** @@ -1104,7 +1104,7 @@ find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node, void *element, GNUNET_CONTAINER_HeapCostType cost) { struct FindReceiveContext *frc = cls; - struct ReceiveContext *e = element; + struct DefragContext *e = element; if ((frc->addr_len == e->addr_len) && (0 == memcmp (frc->addr, e->src_addr, frc->addr_len))) @@ -1125,7 +1125,7 @@ find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node, static void fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) { - struct ReceiveContext *rc = cls; + struct DefragContext *rc = cls; if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE) { @@ -1152,7 +1152,7 @@ static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { #if 0 - struct ReceiveContext *rc = cls; + struct DefragContext *rc = cls; size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); char buf[msize]; @@ -1191,6 +1191,95 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) } +static void read_process_msg (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) +{ + if (ntohs (msg->size) < sizeof (struct UDPMessage)) + { + GNUNET_break_op (0); + return; + } + process_udp_message (plugin, (const struct UDPMessage *) msg, + (const struct sockaddr *) addr, fromlen); + return; +} + +static void read_process_ack () +{ + //const struct GNUNET_MessageHeader *ack; + //struct Session *peer_session; + //const struct UDP_ACK_Message *udp_ack; + //struct Session *s = NULL; + //struct GNUNET_TIME_Relative flow_delay; + //struct GNUNET_ATS_Information ats; + GNUNET_break_op (0); +} + +static void read_process_fragment (struct Plugin *plugin, + const struct GNUNET_MessageHeader *msg, + char *addr, + socklen_t fromlen) +{ + struct DefragContext *d_ctx; + struct GNUNET_TIME_Absolute now; + struct FindReceiveContext frc; + + + frc.rc = NULL; + frc.addr = (const struct sockaddr *) addr; + frc.addr_len = fromlen; + +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n", + (unsigned int) ntohs (msg->size), + GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); +#endif + + /* Lookup existing receive context for this address */ + GNUNET_CONTAINER_heap_iterate (plugin->defrag_ctxs, + &find_receive_context, + &frc); + now = GNUNET_TIME_absolute_get (); + d_ctx = frc.rc; + + if (d_ctx == NULL) + { + /* Create a new defragmentation context */ + d_ctx = GNUNET_malloc (sizeof (struct DefragContext) + fromlen); + memcpy (&d_ctx[1], addr, fromlen); + d_ctx->src_addr = (const struct sockaddr *) &d_ctx[1]; + d_ctx->addr_len = fromlen; + d_ctx->plugin = plugin; + d_ctx->defrag = + GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU, + UDP_MAX_MESSAGES_IN_DEFRAG, d_ctx, + &fragment_msg_proc, &ack_proc); + d_ctx->hnode = + GNUNET_CONTAINER_heap_insert (plugin->defrag_ctxs, d_ctx, + (GNUNET_CONTAINER_HeapCostType) + now.abs_value); + } + + if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (d_ctx->defrag, msg)) + { + /* keep this 'rc' from expiring */ + GNUNET_CONTAINER_heap_update_cost (plugin->defrag_ctxs, d_ctx->hnode, + (GNUNET_CONTAINER_HeapCostType) + now.abs_value); + } + if (GNUNET_CONTAINER_heap_get_size (plugin->defrag_ctxs) > + UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG) + { + /* remove 'rc' that was inactive the longest */ + d_ctx = GNUNET_CONTAINER_heap_remove_root (plugin->defrag_ctxs); + GNUNET_assert (NULL != d_ctx); + GNUNET_DEFRAGMENT_context_destroy (d_ctx->defrag); + GNUNET_free (d_ctx); + } +} + /** * Read and process a message from the given socket. * @@ -1205,20 +1294,12 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) char buf[65536]; ssize_t size; const struct GNUNET_MessageHeader *msg; - //const struct GNUNET_MessageHeader *ack; - //struct Session *peer_session; - //const struct UDP_ACK_Message *udp_ack; - struct ReceiveContext *rc; - struct GNUNET_TIME_Absolute now; - struct FindReceiveContext frc; - //struct Session *s = NULL; - //struct GNUNET_TIME_Relative flow_delay; - //struct GNUNET_ATS_Information ats; fromlen = sizeof (addr); memset (&addr, 0, sizeof (addr)); size = GNUNET_NETWORK_socket_recvfrom (rsock, buf, sizeof (buf), (struct sockaddr *) &addr, &fromlen); + if (size < sizeof (struct GNUNET_MessageHeader)) { GNUNET_break_op (0); @@ -1235,82 +1316,25 @@ udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) GNUNET_break_op (0); return; } + switch (ntohs (msg->type)) { case GNUNET_MESSAGE_TYPE_TRANSPORT_BROADCAST_BEACON: - { - udp_broadcast_receive(plugin, &buf, size, addr, fromlen); + udp_broadcast_receive (plugin, &buf, size, addr, fromlen); return; - } + case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE: - if (ntohs (msg->size) < sizeof (struct UDPMessage)) - { - GNUNET_break_op (0); - return; - } - process_udp_message (plugin, (const struct UDPMessage *) msg, - (const struct sockaddr *) addr, fromlen); + read_process_msg (plugin, msg, addr, fromlen); return; + case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK: - if (ntohs (msg->size) < - sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return; - } - /* TODO */ - GNUNET_break_op (0); + read_process_ack (); return; - case GNUNET_MESSAGE_TYPE_FRAGMENT: - frc.rc = NULL; - frc.addr = (const struct sockaddr *) addr; - frc.addr_len = fromlen; - GNUNET_CONTAINER_heap_iterate (plugin->defrags, - &find_receive_context, - &frc); - now = GNUNET_TIME_absolute_get (); - rc = frc.rc; - if (rc == NULL) - { - /* need to create a new RC */ - rc = GNUNET_malloc (sizeof (struct ReceiveContext) + fromlen); - memcpy (&rc[1], addr, fromlen); - rc->src_addr = (const struct sockaddr *) &rc[1]; - rc->addr_len = fromlen; - rc->plugin = plugin; - rc->defrag = - GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU, - UDP_MAX_MESSAGES_IN_DEFRAG, rc, - &fragment_msg_proc, &ack_proc); - rc->hnode = - GNUNET_CONTAINER_heap_insert (plugin->defrags, rc, - (GNUNET_CONTAINER_HeapCostType) - now.abs_value); - } -#if DEBUG_UDP - LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n", - (unsigned int) ntohs (msg->size), - GNUNET_a2s ((const struct sockaddr *) addr, fromlen)); -#endif - - if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (rc->defrag, msg)) - { - /* keep this 'rc' from expiring */ - GNUNET_CONTAINER_heap_update_cost (plugin->defrags, rc->hnode, - (GNUNET_CONTAINER_HeapCostType) - now.abs_value); - } - if (GNUNET_CONTAINER_heap_get_size (plugin->defrags) > - UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG) - { - /* remove 'rc' that was inactive the longest */ - rc = GNUNET_CONTAINER_heap_remove_root (plugin->defrags); - GNUNET_assert (NULL != rc); - GNUNET_DEFRAGMENT_context_destroy (rc->defrag); - GNUNET_free (rc); - } + case GNUNET_MESSAGE_TYPE_FRAGMENT: + read_process_fragment (plugin, msg, addr, fromlen); return; + default: GNUNET_break_op (0); return; @@ -1727,7 +1751,7 @@ libgnunet_plugin_transport_udp_init (void *cls) plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); - plugin->defrags = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + plugin->defrag_ctxs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); plugin->port = port; plugin->aport = aport; @@ -1802,10 +1826,10 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_NETWORK_fdset_destroy (plugin->ws); GNUNET_NAT_unregister (plugin->nat); - if (plugin->defrags != NULL) + if (plugin->defrag_ctxs != NULL) { - GNUNET_CONTAINER_heap_destroy(plugin->defrags); - plugin->defrags = NULL; + GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs); + plugin->defrag_ctxs = NULL; } if (plugin->mst != NULL) { -- 2.25.1