From: Matthias Wachs Date: Tue, 31 Jan 2012 17:53:30 +0000 (+0000) Subject: - changes X-Git-Tag: initial-import-from-subversion-38251~14963 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e869d0348315744895c438a09ff4055b5de29dca;p=oweals%2Fgnunet.git - changes --- diff --git a/src/transport/plugin_transport_udp_new.c b/src/transport/plugin_transport_udp_new.c index 2015a3118..efb131dd4 100644 --- a/src/transport/plugin_transport_udp_new.c +++ b/src/transport/plugin_transport_udp_new.c @@ -104,6 +104,10 @@ struct Session */ struct GNUNET_TIME_Absolute flow_delay_from_other_peer; + struct GNUNET_TIME_Relative flow_delay_for_other_peer; + + struct GNUNET_ATS_Information ats; + struct FragmentationContext * head; struct FragmentationContext * tail; }; @@ -238,7 +242,7 @@ struct UDPMessageWrapper struct Session *session; struct UDPMessageWrapper *prev; struct UDPMessageWrapper *next; - struct UDPMessage *udp; + char *udp; size_t msg_size; struct GNUNET_TIME_Absolute timeout; @@ -585,7 +589,6 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, struct sockaddr_in *v4; struct sockaddr_in6 *v6; size_t len; - struct GNUNET_ATS_Information ats; switch (addrlen) { @@ -604,7 +607,7 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, #endif v4->sin_port = t4->u4_port; v4->sin_addr.s_addr = t4->ipv4_addr; - ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in)); + s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v4, sizeof (struct sockaddr_in)); break; case sizeof (struct IPv6UdpAddress): if (NULL == plugin->sockv6) @@ -622,7 +625,7 @@ create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, #endif v6->sin6_port = t6->u6_port; v6->sin6_addr = t6->ipv6_addr; - ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6)); + s->ats = plugin->env->get_address_type (plugin->env->cls, (const struct sockaddr *) v6, sizeof (struct sockaddr_in6)); break; default: /* Must have a valid address to send to */ @@ -775,7 +778,7 @@ enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg) udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len); udpw->session = frag_ctx->session; - udpw->udp = (struct UDPMessage *) &udpw[1]; + udpw->udp = (char *) &udpw[1]; udpw->msg_size = msg_len; udpw->cont = frag_ctx->cont; @@ -862,7 +865,7 @@ udp_plugin_send (void *cls, { udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen); udpw->session = s; - udpw->udp = (struct UDPMessage *) &udpw[1]; + udpw->udp = (char *) &udpw[1]; udpw->msg_size = mlen; udpw->timeout = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), to); udpw->cont = cont; @@ -870,7 +873,7 @@ udp_plugin_send (void *cls, udpw->frag = NULL; memcpy (udpw->udp, udp, sizeof (struct UDPMessage)); - memcpy (&udpw->udp[1], msgbuf, msgbuf_size); + memcpy (&udpw->udp[sizeof (struct UDPMessage)], msgbuf, msgbuf_size); GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); } @@ -999,20 +1002,21 @@ process_inbound_tokenized_messages (void *cls, void *client, struct GNUNET_ATS_Information ats[2]; struct GNUNET_TIME_Relative delay; + GNUNET_assert (si->session != NULL); /* setup ATS */ ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); ats[0].value = htonl (1); - ats[1].type = htonl (GNUNET_ATS_NETWORK_TYPE); - ats[1].value = htonl (GNUNET_ATS_COST_WAN); - //GNUNET_break (ntohl(si->session->ats_address_network_type) != GNUNET_ATS_NET_UNSPECIFIED); + ats[1] = si->session->ats; + GNUNET_break (ntohl(ats[1].value) != GNUNET_ATS_NET_UNSPECIFIED); + delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, (const struct GNUNET_ATS_Information *) &ats, 2, - NULL, + si->session, si->arg, si->args); - //si->session->flow_delay_for_other_peer = delay; + si->session->flow_delay_for_other_peer = delay; } @@ -1030,6 +1034,7 @@ process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg, socklen_t sender_addr_len) { struct SourceInformation si; + struct Session * s = NULL; struct IPv4UdpAddress u4; struct IPv6UdpAddress u6; struct GNUNET_ATS_Information ats; @@ -1078,7 +1083,12 @@ process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg, GNUNET_a2s (sender_addr, sender_addr_len)); #endif + struct GNUNET_HELLO_Address * address = GNUNET_HELLO_address_allocate(&msg->sender, "udp", arg, args); + s = udp_plugin_get_session(plugin, address); + GNUNET_free (address); + /* iterate over all embedded messages */ + si.session = s; si.sender = msg->sender; si.arg = arg; si.args = args; @@ -1141,6 +1151,29 @@ fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) rc->src_addr, rc->addr_len); } +struct LookupContext +{ + struct DefragContext *rc; + struct Session *res; +}; + +static int +lookup_session_by_addr_it (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct LookupContext *l_ctx = cls; + struct Session * s = value; + + if ((s->addrlen == l_ctx->rc->addr_len) && + (0 == memcmp (s->sock_addr, l_ctx->rc->src_addr, s->addrlen))) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY \n"); + l_ctx->res = s; + return GNUNET_NO; + } + return GNUNET_YES; +} + /** * Transmit an acknowledgement. * @@ -1151,17 +1184,22 @@ fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg) static void ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) { -#if 0 struct DefragContext *rc = cls; size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size); - char buf[msize]; struct UDP_ACK_Message *udp_ack; uint32_t delay = 0; - + struct UDPMessageWrapper *udpw; struct Session *s; - s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len); + struct LookupContext l_ctx; + l_ctx.rc = rc; + l_ctx.res = NULL; + GNUNET_CONTAINER_multihashmap_get_multiple(rc->plugin->sessions, + &rc->id.hashPubKey, + &lookup_session_by_addr_it, + &l_ctx); + s = l_ctx.res; if (s != NULL) { if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX) @@ -1180,14 +1218,23 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) sockaddr_in6)), delay); #endif - udp_ack = (struct UDP_ACK_Message *) buf; + udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msize); + udpw->cont = NULL; + udpw->cont_cls = NULL; + udpw->frag = NULL; + udpw->msg_size = msize; + udpw->session = s; + udpw->timeout = GNUNET_TIME_absolute_get_forever(); + udpw->udp = (char *)&udpw[1]; + + udp_ack = (struct UDP_ACK_Message *) udpw->udp; udp_ack->header.size = htons ((uint16_t) msize); udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK); udp_ack->delay = htonl (delay); udp_ack->sender = *rc->plugin->env->my_identity; memcpy (&udp_ack[1], msg, ntohs (msg->size)); - (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header); -#endif + + GNUNET_CONTAINER_DLL_insert(rc->plugin->msg_head, rc->plugin->msg_tail, udpw); } @@ -1790,6 +1837,22 @@ libgnunet_plugin_transport_udp_init (void *cls) return api; } +int heap_cleanup_iterator (void *cls, + struct GNUNET_CONTAINER_HeapNode * + node, void *element, + GNUNET_CONTAINER_HeapCostType + cost) +{ + struct DefragContext * d_ctx = element; + + GNUNET_CONTAINER_heap_remove_node (node); + GNUNET_DEFRAGMENT_context_destroy(d_ctx->defrag); + GNUNET_free (d_ctx); + + return GNUNET_YES; +} + + /** * The exported method. Makes the core api available via a global and * returns the udp transport API. @@ -1828,6 +1891,8 @@ libgnunet_plugin_transport_udp_done (void *cls) if (plugin->defrag_ctxs != NULL) { + GNUNET_CONTAINER_heap_iterate(plugin->defrag_ctxs, + heap_cleanup_iterator, NULL); GNUNET_CONTAINER_heap_destroy(plugin->defrag_ctxs); plugin->defrag_ctxs = NULL; }