From: Matthias Wachs Date: Mon, 30 Jan 2012 15:33:17 +0000 (+0000) Subject: - sending and getting sessions X-Git-Tag: initial-import-from-subversion-38251~14990 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=8fd5ba93eabe126d751662bdedc507f3270cd5c0;p=oweals%2Fgnunet.git - sending and getting sessions --- diff --git a/src/transport/plugin_transport_udp_new.c b/src/transport/plugin_transport_udp_new.c index 1a706a680..02a2fac0e 100644 --- a/src/transport/plugin_transport_udp_new.c +++ b/src/transport/plugin_transport_udp_new.c @@ -77,6 +77,11 @@ struct Session const struct sockaddr *sock_addr; size_t addrlen; + + /** + * Desired delay for next sending we received from other peer + */ + struct GNUNET_TIME_Absolute flow_delay_from_other_peer; }; @@ -87,6 +92,29 @@ struct SessionCompareContext }; +/** + * Closure for 'process_inbound_tokenized_messages' + */ +struct SourceInformation +{ + /** + * Sender identity. + */ + struct GNUNET_PeerIdentity sender; + + /** + * Source address. + */ + const void *arg; + + /** + * Number of bytes in source address. + */ + size_t args; + + struct Session *session; +}; + /** * Function called for a quick conversion of the binary address to @@ -442,43 +470,44 @@ static int session_cmp_it (void *cls, struct SessionCompareContext * cctx = cls; const struct GNUNET_HELLO_Address *address = cctx->addr; struct Session *s = value; - struct Session *r = cctx->res; - struct IPv4UdpAddress * u4 = NULL; - struct IPv6UdpAddress * u6 = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "AAAAAAAAAAAAAAAAAAa\n"); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Looking for existing session for address %s\n", udp_address_to_string (NULL, (void *) address->address, address->address_length)); - if (s->addrlen == address->address_length) + socklen_t s_addrlen = s->addrlen; + +#if VERBOSE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing address %s <-> %s\n", + udp_address_to_string (NULL, (void *) address->address, address->address_length), + GNUNET_a2s (s->sock_addr, s->addrlen)); +#endif + + if ((address->address_length == sizeof (struct IPv4UdpAddress)) && + (s_addrlen == sizeof (struct sockaddr_in))) { - if (address->address_length == sizeof (struct IPv4UdpAddress)) + struct IPv4UdpAddress * u4 = NULL; + u4 = (struct IPv4UdpAddress *) address->address; + const struct sockaddr_in *s4 = (const struct sockaddr_in *) s->sock_addr; + if ((0 == memcmp ((const void *) &u4->ipv4_addr,(const void *) &s4->sin_addr, sizeof (struct in_addr))) && + (u4->u4_port == s4->sin_port)) { - u4 = (struct IPv4UdpAddress * ) address->address; - struct sockaddr_in *sai = (struct sockaddr_in *) s->sock_addr; - if ((u4->ipv4_addr == sai->sin_addr.s_addr) && - (u4->u4_port == sai->sin_port)) - { - r = s; - return GNUNET_NO; - } + cctx->res = s; + return GNUNET_NO; } - else if (address->address_length == sizeof (struct IPv6UdpAddress)) - { - u6 = (struct IPv6UdpAddress * ) address->address; - struct sockaddr_in6 *sai = (struct sockaddr_in6 *) s->sock_addr; - if ((0 == memcmp (&u6->ipv6_addr, &sai->sin6_addr, sizeof (struct in6_addr))) && - (u6->u6_port == sai->sin6_port)) - { - r = s; - return GNUNET_NO; - } - } - else + } + if ((address->address_length == sizeof (struct IPv6UdpAddress)) && + (s_addrlen == sizeof (struct sockaddr_in6))) + { + struct IPv6UdpAddress * u6 = NULL; + u6 = (struct IPv6UdpAddress *) address->address; + const struct sockaddr_in6 *s6 = (const struct sockaddr_in6 *) s->sock_addr; + if ((0 == memcmp (&u6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr))) && + (u6->u6_port == s6->sin6_port)) { - GNUNET_break (0); - return GNUNET_YES; + cctx->res = s; + return GNUNET_NO; } } + + return GNUNET_YES; } @@ -510,33 +539,39 @@ udp_plugin_get_session (void *cls, } /* check if session already exists */ - if (NULL != NULL) - { struct SessionCompareContext cctx; cctx.addr = address; cctx.res = NULL; #if DEBUG_UDP - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session for peer `%s' `%s' \n", GNUNET_i2s (&address->peer), udp_address_to_string(NULL, address->address, address->address_length)); #endif GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessions, &address->peer.hashPubKey, session_cmp_it, &cctx); if (cctx.res != NULL) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Found existing session\n"); +#if DEBUG_UDP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session %p\n", cctx.res); +#endif return cctx.res; } - } + /* otherwise create new */ s = create_session (plugin, &address->peer, address->address, address->address_length, NULL, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Creating new session %p\n", s); +#if DEBUG_UDP + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating new session %p for peer `%s' address `%s'\n", + s, + GNUNET_i2s(&address->peer), + udp_address_to_string(NULL,address->address,address->address_length)); +#endif GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (plugin->sessions, &s->target.hashPubKey, s, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); return s; } @@ -571,15 +606,58 @@ udp_plugin_get_session (void *cls, */ static ssize_t udp_plugin_send (void *cls, - struct Session *session, + struct Session *s, const char *msgbuf, size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative to, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { + struct Plugin *plugin = cls; + size_t mlen = msgbuf_size + sizeof (struct UDPMessage);; + + struct GNUNET_TIME_Relative delta; + struct UDPMessageWrapper * udpw; + struct UDPMessage *udp; + + GNUNET_assert (plugin != NULL); + GNUNET_assert (s != NULL); + + if (mlen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP transmits %u-byte message to `%s' using address `%s'\n", + msgbuf_size, + GNUNET_i2s (&s->target), + GNUNET_a2s(s->sock_addr, s->addrlen)); + + if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, &s->target.hashPubKey, s)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + sizeof (struct UDPMessage) + msgbuf_size); + udpw->session = s; + udp = (struct UDPMessage *) &udpw[1]; + udpw->udp = udp; + udpw->msg_size = mlen; + udpw->cont = cont; + udpw->cont_cls = cont_cls; - return 0; + udp->header.size = htons (mlen); + udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE); + udp->reserved = htonl (0); + udp->sender = *plugin->env->my_identity; + memcpy (&udp[1], msgbuf, msgbuf_size); + GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw); + + delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer); + return mlen; } static ssize_t udp_plugin_send_wrapper (void *cls, @@ -660,6 +738,114 @@ udp_nat_port_map_callback (void *cls, int add_remove, } + +/** + * Message tokenizer has broken up an incomming message. Pass it on + * to the service. + * + * @param cls the 'struct Plugin' + * @param client the 'struct SourceInformation' + * @param hdr the actual message + */ +static void +process_inbound_tokenized_messages (void *cls, void *client, + const struct GNUNET_MessageHeader *hdr) +{ + struct Plugin *plugin = cls; + struct SourceInformation *si = client; + struct GNUNET_ATS_Information ats[2]; + struct GNUNET_TIME_Relative delay; + + /* setup ATS */ + ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE); + ats[0].value = htonl (1); + ats[1].type = htonl (GNUNET_ATS_NETWORK_TYPE); + ats[1].value = htonl (GNUNET_ATS_COST_WAN); + //GNUNET_break (ntohl(si->session->ats_address_network_type) != GNUNET_ATS_NET_UNSPECIFIED); + delay = plugin->env->receive (plugin->env->cls, + &si->sender, + hdr, + (const struct GNUNET_ATS_Information *) &ats, 2, + NULL, + si->arg, + si->args); + //si->session->flow_delay_for_other_peer = delay; +} + + +/** + * We've received a UDP Message. Process it (pass contents to main service). + * + * @param plugin plugin context + * @param msg the message + * @param sender_addr sender address + * @param sender_addr_len number of bytes in sender_addr + */ +static void +process_udp_message (struct Plugin *plugin, const struct UDPMessage *msg, + const struct sockaddr *sender_addr, + socklen_t sender_addr_len) +{ + struct SourceInformation si; + struct IPv4UdpAddress u4; + struct IPv6UdpAddress u6; + struct GNUNET_ATS_Information ats; + const void *arg; + size_t args; + + if (0 != ntohl (msg->reserved)) + { + GNUNET_break_op (0); + return; + } + if (ntohs (msg->header.size) < + sizeof (struct GNUNET_MessageHeader) + sizeof (struct UDPMessage)) + { + GNUNET_break_op (0); + return; + } + + ats.type = htonl (GNUNET_ATS_NETWORK_TYPE); + ats.value = htonl (GNUNET_ATS_NET_UNSPECIFIED); + /* convert address */ + switch (sender_addr->sa_family) + { + case AF_INET: + GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in)); + u4.ipv4_addr = ((struct sockaddr_in *) sender_addr)->sin_addr.s_addr; + u4.u4_port = ((struct sockaddr_in *) sender_addr)->sin_port; + arg = &u4; + args = sizeof (u4); + break; + case AF_INET6: + GNUNET_assert (sender_addr_len == sizeof (struct sockaddr_in6)); + u6.ipv6_addr = ((struct sockaddr_in6 *) sender_addr)->sin6_addr; + u6.u6_port = ((struct sockaddr_in6 *) sender_addr)->sin6_port; + arg = &u6; + args = sizeof (u6); + break; + default: + GNUNET_break (0); + return; + } +#if DEBUG_UDP + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received message with %u bytes from peer `%s' at `%s'\n", + (unsigned int) ntohs (msg->header.size), GNUNET_i2s (&msg->sender), + GNUNET_a2s (sender_addr, sender_addr_len)); +#endif + + /* iterate over all embedded messages */ + si.sender = msg->sender; + si.arg = arg; + si.args = args; + + GNUNET_SERVER_mst_receive (plugin->mst, &si, (const char *) &msg[1], + ntohs (msg->header.size) - + sizeof (struct UDPMessage), GNUNET_YES, GNUNET_NO); +} + + /** * Read and process a message from the given socket. * @@ -667,7 +853,7 @@ udp_nat_port_map_callback (void *cls, int add_remove, * @param rsock socket to read from */ static void -udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) +udp_select_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) { socklen_t fromlen; char addr[32]; @@ -717,10 +903,10 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) GNUNET_break_op (0); return; } - /* + process_udp_message (plugin, (const struct UDPMessage *) msg, (const struct sockaddr *) addr, fromlen); - */ + return; default: GNUNET_break_op (0); @@ -728,6 +914,59 @@ udp_read (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *rsock) } } +size_t +udp_select_send (struct Plugin *plugin) +{ + ssize_t sent; + size_t slen; + + struct UDPMessageWrapper *udpw = plugin->msg_head; + const struct sockaddr * sa = udpw->session->sock_addr; + + switch (sa->sa_family) + { + case AF_INET: + if (NULL == plugin->sockv4) + return 0; + sent = + GNUNET_NETWORK_socket_sendto (plugin->sockv4, udpw->udp, udpw->msg_size, + sa, slen = sizeof (struct sockaddr_in)); + break; + case AF_INET6: + if (NULL == plugin->sockv6) + return 0; + sent = + GNUNET_NETWORK_socket_sendto (plugin->sockv6, udpw->udp, udpw->msg_size, + sa, slen = sizeof (struct sockaddr_in6)); + break; + default: + GNUNET_break (0); + return 0; + } + if (GNUNET_SYSERR == sent) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "sendto"); + LOG (GNUNET_ERROR_TYPE_ERROR, + "UDP transmitted %u-byte message to %s (%d: %s)\n", + (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, + (sent < 0) ? STRERROR (errno) : "ok"); + if (udpw->cont != NULL) + udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR); + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "UDP transmitted %u-byte message to %s (%d: %s)\n", + (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) sent, + (sent < 0) ? STRERROR (errno) : "ok"); + + if (udpw->cont != NULL) + udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK); + + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw); + GNUNET_free (udpw); + + return sent; +} + /** * We have been notified that our readset has something to read. We don't * know which socket needs to be read, so we have to check each one @@ -749,15 +988,16 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { if ((NULL != plugin->sockv4) && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4))) - udp_read (plugin, plugin->sockv4); + udp_select_read (plugin, plugin->sockv4); if ((NULL != plugin->sockv6) && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6))) - udp_read (plugin, plugin->sockv6); + udp_select_read (plugin, plugin->sockv6); } if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0) { - /* TODO */ + if (plugin->msg_head != NULL) + udp_select_send (plugin); } plugin->select_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, @@ -1033,6 +1273,7 @@ libgnunet_plugin_transport_udp_init (void *cls) plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10); + plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, plugin); plugin->port = port; plugin->aport = aport; plugin->last_expected_delay = GNUNET_TIME_UNIT_SECONDS; @@ -1050,7 +1291,7 @@ libgnunet_plugin_transport_udp_init (void *cls) api->send = &udp_plugin_send_wrapper; api->send_with_session = &udp_plugin_send; - LOG (GNUNET_ERROR_TYPE_ERROR, "Setting up sockets\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Setting up sockets\n"); res = setup_sockets (plugin, &serverAddrv6, &serverAddrv4); if ((res == 0) || ((plugin->sockv4 == NULL) && (plugin->sockv6 == NULL))) { @@ -1060,8 +1301,9 @@ libgnunet_plugin_transport_udp_init (void *cls) return NULL; } - LOG (GNUNET_ERROR_TYPE_ERROR, "Starting broadcasting\n"); - setup_broadcast (plugin, &serverAddrv6, &serverAddrv4); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting broadcasting\n"); + if (broadcast == GNUNET_YES) + setup_broadcast (plugin, &serverAddrv6, &serverAddrv4); GNUNET_free_non_null (bind4_address); @@ -1105,6 +1347,24 @@ libgnunet_plugin_transport_udp_done (void *cls) GNUNET_NETWORK_fdset_destroy (plugin->ws); GNUNET_NAT_unregister (plugin->nat); + if (plugin->mst != NULL) + { + GNUNET_SERVER_mst_destroy(plugin->mst); + plugin->mst = NULL; + } + + /* Clean up leftover messages */ + struct UDPMessageWrapper *udpw = plugin->msg_head; + while (udpw != NULL) + { + struct UDPMessageWrapper *tmp = udpw->next; + GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw); + if (udpw->cont != NULL) + udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_SYSERR); + GNUNET_free (udpw); + udpw = tmp; + } + /* Clean up sessions */ #if DEBUG_UDP LOG (GNUNET_ERROR_TYPE_DEBUG, diff --git a/src/transport/plugin_transport_udp_new.h b/src/transport/plugin_transport_udp_new.h index 062849e9d..b565f9cb6 100644 --- a/src/transport/plugin_transport_udp_new.h +++ b/src/transport/plugin_transport_udp_new.h @@ -41,7 +41,7 @@ #define LOG(kind,...) GNUNET_log_from (kind, "transport-udp", __VA_ARGS__) -#define DEBUG_UDP GNUNET_YES +#define DEBUG_UDP GNUNET_NO #define DEBUG_UDP_BROADCASTING GNUNET_NO /** @@ -110,6 +110,23 @@ struct UDPMessage }; +struct UDPMessageWrapper +{ + struct Session *session; + struct UDPMessageWrapper *prev; + struct UDPMessageWrapper *next; + struct UDPMessage *udp; + size_t msg_size; + /** + * Function to call upon completion of the transmission. + */ + GNUNET_TRANSPORT_TransmitContinuation cont; + + /** + * Closure for 'cont'. + */ + void *cont_cls; +}; /** * Encapsulation of all of the state of the plugin. @@ -257,6 +274,9 @@ struct Plugin */ uint16_t aport; + struct UDPMessageWrapper *msg_head; + struct UDPMessageWrapper *msg_tail; + };