From d0f14ce4076688b90da1a88db984043eceda0566 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 13 Apr 2010 14:12:00 +0000 Subject: [PATCH] transport API changes in preparation for the storm --- src/transport/gnunet-service-transport.c | 92 +++++++++++++++++++++-- src/transport/plugin_transport.h | 39 +++++++++- src/transport/plugin_transport_tcp.c | 82 ++++++++++---------- src/transport/plugin_transport_template.c | 2 + src/transport/plugin_transport_udp.c | 6 +- src/transport/plugin_transport_udp_nat.c | 24 +++--- 6 files changed, 187 insertions(+), 58 deletions(-) diff --git a/src/transport/gnunet-service-transport.c b/src/transport/gnunet-service-transport.c index d05e88824..fce1ba7d3 100644 --- a/src/transport/gnunet-service-transport.c +++ b/src/transport/gnunet-service-transport.c @@ -174,6 +174,12 @@ struct ForeignAddressList */ const void *addr; + /** + * Session (or NULL if no valid session currently exists or if the + * plugin does not use sessions). + */ + struct Session *session; + /** * What was the last latency observed for this address, plugin and peer? */ @@ -1347,6 +1353,7 @@ try_transmission_to_peer (struct NeighbourList *neighbour) mq->message_buf_size, mq->priority, GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, + mq->specific_address->session, mq->specific_address->addr, mq->specific_address->addrlen, force_address, @@ -1623,6 +1630,61 @@ expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Function that will be called whenever the plugin internally + * cleans up a session pointer and hence the service needs to + * discard all of those sessions as well. Plugins that do not + * use sessions can simply omit calling this function and always + * use NULL wherever a session pointer is needed. + * + * @param cls closure + * @param peer which peer was the session for + * @param session which session is being destoyed + */ +static void +plugin_env_session_end (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Session *session) +{ + struct TransportPlugin *p = cls; + struct NeighbourList *nl; + struct ReadyList *rl; + struct ForeignAddressList *pos; + struct ForeignAddressList *prev; + + nl = find_neighbour (peer); + if (nl == NULL) + return; + rl = nl->plugins; + while (rl != NULL) + { + if (rl->plugin == p) + break; + rl = rl->next; + } + if (rl == NULL) + return; + prev = NULL; + pos = rl->addresses; + while ( (pos != NULL) && + (pos->session != session) ) + { + prev = pos; + pos = pos->next; + } + if (pos == NULL) + return; + pos->session = NULL; + if (pos->addrlen != 0) + return; + if (prev == NULL) + rl->addresses = pos->next; + else + prev->next = pos->next; + GNUNET_free (pos); +} + + /** * Function that must be called by each plugin to notify the * transport service about the addresses under which the transport @@ -1743,6 +1805,8 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) * * @param neighbour which peer we care about * @param tname name of the transport plugin + * @param session session to look for, NULL for 'any'; otherwise + * can be used for the service to "learn" this session ID * @param addr binary address * @param addrlen length of addr * @return NULL if no such entry exists @@ -1750,6 +1814,7 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer) static struct ForeignAddressList * find_peer_address(struct NeighbourList *neighbour, const char *tname, + struct Session *session, const char *addr, size_t addrlen) { @@ -1771,6 +1836,8 @@ find_peer_address(struct NeighbourList *neighbour, ( (address_head->addrlen != addrlen) || (memcmp(address_head->addr, addr, addrlen) != 0) ) ) address_head = address_head->next; + if (session != NULL) + address_head->session = session; /* learn it! */ return address_head; } @@ -1781,6 +1848,7 @@ find_peer_address(struct NeighbourList *neighbour, * * @param neighbour which peer we care about * @param tname name of the transport plugin + * @param session session of the plugin, or NULL for none * @param addr binary address * @param addrlen length of addr * @return NULL if we do not have a transport plugin for 'tname' @@ -1788,13 +1856,14 @@ find_peer_address(struct NeighbourList *neighbour, static struct ForeignAddressList * add_peer_address (struct NeighbourList *neighbour, const char *tname, + struct Session *session, const char *addr, size_t addrlen) { struct ReadyList *head; struct ForeignAddressList *ret; - ret = find_peer_address (neighbour, tname, addr, addrlen); + ret = find_peer_address (neighbour, tname, session, addr, addrlen); if (ret != NULL) return ret; head = neighbour->plugins; @@ -1807,6 +1876,7 @@ add_peer_address (struct NeighbourList *neighbour, if (head == NULL) return NULL; ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen); + ret->session = session; ret->addr = (const char*) &ret[1]; memcpy (&ret[1], addr, addrlen); ret->addrlen = addrlen; @@ -2014,7 +2084,7 @@ add_to_foreign_address_list (void *cls, 1, GNUNET_NO); try = GNUNET_NO; - fal = find_peer_address (n, tname, addr, addrlen); + fal = find_peer_address (n, tname, NULL, addr, addrlen); if (fal == NULL) { #if DEBUG_TRANSPORT @@ -2025,7 +2095,7 @@ add_to_foreign_address_list (void *cls, GNUNET_i2s (&n->id), expiration.value); #endif - fal = add_peer_address (n, tname, addr, addrlen); + fal = add_peer_address (n, tname, NULL, addr, addrlen); if (fal == NULL) { GNUNET_STATISTICS_update (stats, @@ -2392,6 +2462,7 @@ check_pending_validation (void *cls, n->public_key_valid = GNUNET_YES; fal = add_peer_address (n, ve->transport_name, + NULL, ve->addr, ve->addrlen); GNUNET_assert (fal != NULL); @@ -2604,7 +2675,7 @@ run_validation (void *cls, neighbour = setup_new_neighbour(&id); neighbour->publicKey = va->publicKey; neighbour->public_key_valid = GNUNET_YES; - peer_address = add_peer_address(neighbour, tname, addr, addrlen); + peer_address = add_peer_address(neighbour, tname, NULL, addr, addrlen); GNUNET_assert(peer_address != NULL); hello_size = GNUNET_HELLO_size(our_hello); tsize = sizeof(struct TransportPingMessage) + hello_size; @@ -3018,8 +3089,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, GNUNET_CRYPTO_rsa_sign (my_private_key, &pong->purpose, &pong->signature)); n = find_neighbour(peer); - if (n == NULL) - n = setup_new_neighbour(peer); + GNUNET_assert (n != NULL); /* first try reliable response transmission */ rl = n->plugins; while (rl != NULL) @@ -3033,6 +3103,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, ntohs (pong->header.size), TRANSPORT_PONG_PRIORITY, HELLO_VERIFICATION_TIMEOUT, + fal->session, fal->addr, fal->addrlen, GNUNET_SYSERR, @@ -3088,6 +3159,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, * @param message the message, NULL if we only care about * learning about the delay until we should receive again * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param session identifier used for this session (can be NULL) * @param sender_address binary address of the sender (if observed) * @param sender_address_len number of bytes in sender_address * @return how long the plugin should wait until receiving more data @@ -3096,11 +3168,13 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message, static struct GNUNET_TIME_Relative plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, const struct GNUNET_MessageHeader *message, - unsigned int distance, const char *sender_address, + unsigned int distance, + struct Session *session, + const char *sender_address, size_t sender_address_len) { - struct ReadyList *service_context; struct TransportPlugin *plugin = cls; + struct ReadyList *service_context; struct TransportClient *cpos; struct InboundMessage *im; struct ForeignAddressList *peer_address; @@ -3119,6 +3193,7 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer, { peer_address = add_peer_address(n, plugin->short_name, + session, sender_address, sender_address_len); if (peer_address != NULL) @@ -3577,6 +3652,7 @@ create_environment (struct TransportPlugin *plug) plug->env.cls = plug; plug->env.receive = &plugin_env_receive; plug->env.notify_address = &plugin_env_notify_address; + plug->env.session_end = &plugin_env_session_end; plug->env.max_connections = max_connect_per_transport; plug->env.stats = stats; } diff --git a/src/transport/plugin_transport.h b/src/transport/plugin_transport.h index 0291f9bb4..2b7017036 100644 --- a/src/transport/plugin_transport.h +++ b/src/transport/plugin_transport.h @@ -36,6 +36,33 @@ #include "gnunet_statistics_service.h" #include "gnunet_transport_service.h" +/** + * Opaque pointer that plugins can use to distinguish specific + * connections to a given peer. Typically used by stateful plugins to + * allow the service to refer to specific streams instead of a more + * general notion of "some connection" to the given peer. This is + * useful since sometimes (i.e. for inbound TCP connections) a + * connection may not have an address that can be used for meaningful + * distinction between sessions to the same peer. + */ +struct Session; + + +/** + * Function that will be called whenever the plugin internally + * cleans up a session pointer and hence the service needs to + * discard all of those sessions as well. Plugins that do not + * use sessions can simply omit calling this function and always + * use NULL wherever a session pointer is needed. + * + * @param cls closure + * @param peer which peer was the session for + * @param session which session is being destoyed + */ +typedef void (*GNUNET_TRANSPORT_SessionEnd) (void *cls, + const struct GNUNET_PeerIdentity *peer, + struct Session *session); + /** * Function called by the transport for each received message. @@ -47,6 +74,7 @@ * @param message the message, NULL if we only care about * learning about the delay until we should receive again -- FIXME! * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL) + * @param session identifier used for this session (can be NULL) * @param sender_address binary address of the sender (if observed) * @param sender_address_len number of bytes in sender_address * @return how long the plugin should wait until receiving more data @@ -60,6 +88,7 @@ typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) (v GNUNET_MessageHeader * message, uint32_t distance, + struct Session *session, const char *sender_address, size_t sender_address_len); @@ -156,6 +185,12 @@ struct GNUNET_TRANSPORT_PluginEnvironment */ GNUNET_TRANSPORT_TrafficReport traffic_report; + /** + * Function that must be called by the plugin when a non-NULL + * session handle stops being valid (is destroyed). + */ + GNUNET_TRANSPORT_SessionEnd session_end; + /** * What is the maximum number of connections that this transport * should allow? Transports that do not have sessions (such as @@ -201,6 +236,7 @@ typedef void * require plugins to discard the message after the timeout, * just advisory for the desired delay; most plugins will ignore * this as well) + * @param session which session must be used (or NULL for "any") * @param addr the address to use (can be NULL if the plugin * is "on its own" (i.e. re-use existing TCP connection)) * @param addrlen length of the address in bytes @@ -226,6 +262,7 @@ typedef ssize_t size_t msgbuf_size, uint32_t priority, struct GNUNET_TIME_Relative timeout, + struct Session *session, const void *addr, size_t addrlen, int force_address, @@ -323,7 +360,7 @@ struct GNUNET_TRANSPORT_PluginFunctions /** * Function that the transport service will use to transmit data to - * another peer. May be null for plugins that only support + * another peer. May be NULL for plugins that only support * receiving data. After this call, the plugin call the specified * continuation with success or error before notifying us about the * target having disconnected. diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c index 43cdceb43..2a3d52193 100644 --- a/src/transport/plugin_transport_tcp.c +++ b/src/transport/plugin_transport_tcp.c @@ -640,6 +640,7 @@ select_better_session (struct Session *s1, * require plugins to discard the message after the timeout, * just advisory for the desired delay; most plugins will ignore * this as well) + * @param session which session must be used (or NULL for "any") * @param addr the address to use (can be NULL if the plugin * is "on its own" (i.e. re-use existing TCP connection)) * @param addrlen length of the address in bytes @@ -664,13 +665,13 @@ tcp_plugin_send (void *cls, size_t msgbuf_size, uint32_t priority, struct GNUNET_TIME_Relative timeout, + struct Session *session, const void *addr, size_t addrlen, int force_address, GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { struct Plugin *plugin = cls; - struct Session *session; struct Session *cand_session; struct Session *next; struct PendingMessage *pm; @@ -684,44 +685,47 @@ tcp_plugin_send (void *cls, /* FIXME: we could do this a cheaper with a hash table where we could restrict the iteration to entries that match the target peer... */ - cand_session = NULL; - next = plugin->sessions; - while (NULL != (session = next)) + if (session == NULL) { - next = session->next; - GNUNET_assert (session->client != NULL); - if (0 != memcmp (target, - &session->target, - sizeof (struct GNUNET_PeerIdentity))) - continue; - if ( ( (GNUNET_SYSERR == force_address) && - (session->expecting_welcome == GNUNET_NO) ) || - (GNUNET_NO == force_address) ) + cand_session = NULL; + next = plugin->sessions; + while (NULL != (session = next)) { + next = session->next; + GNUNET_assert (session->client != NULL); + if (0 != memcmp (target, + &session->target, + sizeof (struct GNUNET_PeerIdentity))) + continue; + if ( ( (GNUNET_SYSERR == force_address) && + (session->expecting_welcome == GNUNET_NO) ) || + (GNUNET_NO == force_address) ) + { + cand_session = select_better_session (cand_session, + session); + continue; + } + if (GNUNET_SYSERR == force_address) + continue; + GNUNET_break (GNUNET_YES == force_address); + if (addr == NULL) + { + GNUNET_break (0); + break; + } + if (session->inbound == GNUNET_YES) + continue; + if (addrlen != session->connect_alen) + continue; + if (0 != memcmp (session->connect_addr, + addr, + addrlen)) + continue; cand_session = select_better_session (cand_session, - session); - continue; + session); } - if (GNUNET_SYSERR == force_address) - continue; - GNUNET_break (GNUNET_YES == force_address); - if (addr == NULL) - { - GNUNET_break (0); - break; - } - if (session->inbound == GNUNET_YES) - continue; - if (addrlen != session->connect_alen) - continue; - if (0 != memcmp (session->connect_addr, - addr, - addrlen)) - continue; - cand_session = select_better_session (cand_session, - session); + session = cand_session; } - session = cand_session; if ( (session == NULL) && (addr == NULL) ) { @@ -1134,7 +1138,9 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK; delay = session->plugin->env->receive (session->plugin->env->cls, &session->target, - NULL, 0, NULL, 0); + NULL, 0, + session, + NULL, 0); if (delay.value == 0) GNUNET_SERVER_receive_done (session->client, GNUNET_OK); else @@ -1187,9 +1193,9 @@ handle_tcp_data (void *cls, ntohs (message->size), GNUNET_NO); delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1, - session->connect_addr, - session->connect_alen); - + session, + (GNUNET_YES == session->inbound) ? NULL : session->connect_addr, + (GNUNET_YES == session->inbound) ? 0 : session->connect_alen); if (delay.value == 0) GNUNET_SERVER_receive_done (client, GNUNET_OK); else diff --git a/src/transport/plugin_transport_template.c b/src/transport/plugin_transport_template.c index 6ad555a51..f09503b9b 100644 --- a/src/transport/plugin_transport_template.c +++ b/src/transport/plugin_transport_template.c @@ -139,6 +139,7 @@ struct Plugin * @param msgbuf the message to transmit * @param msgbuf_size number of bytes in 'msgbuf' * @param timeout when should we time out + * @param session which session must be used (or NULL for "any") * @param addr the address to use (can be NULL if the plugin * is "on its own" (i.e. re-use existing TCP connection)) * @param addrlen length of the address in bytes @@ -162,6 +163,7 @@ template_plugin_send (void *cls, size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative timeout, + struct Session *session, const void *addr, size_t addrlen, int force_address, diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index bdec32490..3e59f89dc 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -184,6 +184,7 @@ udp_transport_server_stop (void *cls) * @param msgbuf_size the size of the msgbuf to send * @param priority how important is the message (ignored by UDP) * @param timeout when should we time out (give up) if we can not transmit? + * @param session which session must be used (always NULL for UDP) * @param addr the addr to send the message to, needs to be a sockaddr for us * @param addrlen the len of addr * @param force_address GNUNET_YES if the plugin MUST use the given address, @@ -206,6 +207,7 @@ udp_plugin_send (void *cls, size_t msgbuf_size, unsigned int priority, struct GNUNET_TIME_Relative timeout, + struct Session *session, const void *addr, size_t addrlen, int force_address, @@ -216,6 +218,7 @@ udp_plugin_send (void *cls, int ssize; ssize_t sent; + GNUNET_assert (NULL == session); GNUNET_assert(udp_sock != NULL); if ( (addr == NULL) || (addrlen == 0) ) { @@ -417,7 +420,8 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) count, ntohs(currhdr->type), ntohs(currhdr->size), offset); #endif plugin->env->receive (plugin->env->cls, - sender, currhdr, UDP_DIRECT_DISTANCE, (char *)&addr, fromlen); + sender, currhdr, UDP_DIRECT_DISTANCE, + NULL, (const char *)&addr, fromlen); offset += ntohs(currhdr->size); #if DEBUG_UDP GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "udp", _ diff --git a/src/transport/plugin_transport_udp_nat.c b/src/transport/plugin_transport_udp_nat.c index 3733130e2..531f2ae10 100644 --- a/src/transport/plugin_transport_udp_nat.c +++ b/src/transport/plugin_transport_udp_nat.c @@ -627,6 +627,7 @@ run_gnunet_nat_client (struct Plugin *plugin, const char *addr, size_t addrlen) * @param msgbuf_size the size of the msgbuf to send * @param priority how important is the message (ignored by UDP) * @param timeout when should we time out (give up) if we can not transmit? + * @param session identifier used for this session (can be NULL) * @param addr the addr to send the message to, needs to be a sockaddr for us * @param addrlen the len of addr * @param force_address not used, we had better have an address to send to @@ -642,15 +643,16 @@ run_gnunet_nat_client (struct Plugin *plugin, const char *addr, size_t addrlen) */ static ssize_t udp_nat_plugin_send (void *cls, - const struct GNUNET_PeerIdentity *target, - const char *msgbuf, - size_t msgbuf_size, - unsigned int priority, - struct GNUNET_TIME_Relative timeout, - const void *addr, - size_t addrlen, - int force_address, - GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) + const struct GNUNET_PeerIdentity *target, + const char *msgbuf, + size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Relative timeout, + struct Session *session, + const void *addr, + size_t addrlen, + int force_address, + GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) { struct Plugin *plugin = cls; ssize_t sent; @@ -659,6 +661,7 @@ udp_nat_plugin_send (void *cls, struct sockaddr_in *sockaddr = (struct sockaddr_in *)addr; int other_peer_natd; + GNUNET_assert (NULL == session); other_peer_natd = GNUNET_NO; if ((sockaddr->sin_family == AF_INET) && (ntohs(sockaddr->sin_port) == 0)) { @@ -1177,7 +1180,8 @@ udp_nat_demultiplexer(struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, /* If we receive these just ignore! */ break; default: - plugin->env->receive (plugin->env->cls, sender, currhdr, UDP_DIRECT_DISTANCE, (char *)sender_addr, fromlen); + plugin->env->receive (plugin->env->cls, sender, currhdr, UDP_DIRECT_DISTANCE, + NULL, (char *)sender_addr, fromlen); } } -- 2.25.1