transport API changes in preparation for the storm
authorChristian Grothoff <christian@grothoff.org>
Tue, 13 Apr 2010 14:12:00 +0000 (14:12 +0000)
committerChristian Grothoff <christian@grothoff.org>
Tue, 13 Apr 2010 14:12:00 +0000 (14:12 +0000)
src/transport/gnunet-service-transport.c
src/transport/plugin_transport.h
src/transport/plugin_transport_tcp.c
src/transport/plugin_transport_template.c
src/transport/plugin_transport_udp.c
src/transport/plugin_transport_udp_nat.c

index d05e8882443291a9c3c0ebca959d7bbad479bb2a..fce1ba7d3d8a22a1847c1a5a96e35811d0711278 100644 (file)
@@ -174,6 +174,12 @@ struct ForeignAddressList
    */
   const void *addr;
 
+  /**
+   * Session (or NULL if no valid session currently exists or if the
+   * plugin does not use sessions).
+   */
+  struct Session *session;
+
   /**
    * What was the last latency observed for this address, plugin and peer?
    */
@@ -1347,6 +1353,7 @@ try_transmission_to_peer (struct NeighbourList *neighbour)
                               mq->message_buf_size,
                               mq->priority,
                               GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                              mq->specific_address->session,
                               mq->specific_address->addr,
                               mq->specific_address->addrlen,
                               force_address,
@@ -1623,6 +1630,61 @@ expire_address_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 }
 
 
+/**
+ * Function that will be called whenever the plugin internally
+ * cleans up a session pointer and hence the service needs to
+ * discard all of those sessions as well.  Plugins that do not
+ * use sessions can simply omit calling this function and always
+ * use NULL wherever a session pointer is needed.
+ * 
+ * @param cls closure
+ * @param peer which peer was the session for 
+ * @param session which session is being destoyed
+ */
+static void
+plugin_env_session_end  (void *cls,
+                        const struct GNUNET_PeerIdentity *peer,
+                        struct Session *session)
+{
+  struct TransportPlugin *p = cls;
+  struct NeighbourList *nl;
+  struct ReadyList *rl;
+  struct ForeignAddressList *pos;
+  struct ForeignAddressList *prev;
+
+  nl = find_neighbour (peer);
+  if (nl == NULL)
+    return;
+  rl = nl->plugins;
+  while (rl != NULL)
+    {
+      if (rl->plugin == p)
+       break;
+      rl = rl->next;
+    }
+  if (rl == NULL)
+    return;
+  prev = NULL;
+  pos = rl->addresses;
+  while ( (pos != NULL) &&
+         (pos->session != session) )
+    {
+      prev = pos;
+      pos = pos->next;
+    }
+  if (pos == NULL)
+    return;
+  pos->session = NULL;
+  if (pos->addrlen != 0)
+    return;
+  if (prev == NULL)
+    rl->addresses = pos->next;
+  else
+    prev->next = pos->next;
+  GNUNET_free (pos);
+}
+
+
 /**
  * Function that must be called by each plugin to notify the
  * transport service about the addresses under which the transport
@@ -1743,6 +1805,8 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
  *
  * @param neighbour which peer we care about
  * @param tname name of the transport plugin
+ * @param session session to look for, NULL for 'any'; otherwise
+ *        can be used for the service to "learn" this session ID
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if no such entry exists
@@ -1750,6 +1814,7 @@ notify_clients_disconnect (const struct GNUNET_PeerIdentity *peer)
 static struct ForeignAddressList *
 find_peer_address(struct NeighbourList *neighbour,
                  const char *tname,
+                 struct Session *session,
                  const char *addr,
                  size_t addrlen)
 {
@@ -1771,6 +1836,8 @@ find_peer_address(struct NeighbourList *neighbour,
          ( (address_head->addrlen != addrlen) ||
            (memcmp(address_head->addr, addr, addrlen) != 0) ) )
     address_head = address_head->next;
+  if (session != NULL)
+    address_head->session = session; /* learn it! */
   return address_head;
 }
 
@@ -1781,6 +1848,7 @@ find_peer_address(struct NeighbourList *neighbour,
  *
  * @param neighbour which peer we care about
  * @param tname name of the transport plugin
+ * @param session session of the plugin, or NULL for none
  * @param addr binary address
  * @param addrlen length of addr
  * @return NULL if we do not have a transport plugin for 'tname'
@@ -1788,13 +1856,14 @@ find_peer_address(struct NeighbourList *neighbour,
 static struct ForeignAddressList *
 add_peer_address (struct NeighbourList *neighbour,
                  const char *tname,
+                 struct Session *session,
                  const char *addr, 
                  size_t addrlen)
 {
   struct ReadyList *head;
   struct ForeignAddressList *ret;
 
-  ret = find_peer_address (neighbour, tname, addr, addrlen);
+  ret = find_peer_address (neighbour, tname, session, addr, addrlen);
   if (ret != NULL)
     return ret;
   head = neighbour->plugins;
@@ -1807,6 +1876,7 @@ add_peer_address (struct NeighbourList *neighbour,
   if (head == NULL)
     return NULL;
   ret = GNUNET_malloc(sizeof(struct ForeignAddressList) + addrlen);
+  ret->session = session;
   ret->addr = (const char*) &ret[1];
   memcpy (&ret[1], addr, addrlen);
   ret->addrlen = addrlen;
@@ -2014,7 +2084,7 @@ add_to_foreign_address_list (void *cls,
                            1,
                            GNUNET_NO);      
   try = GNUNET_NO;
-  fal = find_peer_address (n, tname, addr, addrlen);
+  fal = find_peer_address (n, tname, NULL, addr, addrlen);
   if (fal == NULL)
     {
 #if DEBUG_TRANSPORT
@@ -2025,7 +2095,7 @@ add_to_foreign_address_list (void *cls,
                  GNUNET_i2s (&n->id),
                  expiration.value);
 #endif
-      fal = add_peer_address (n, tname, addr, addrlen);
+      fal = add_peer_address (n, tname, NULL, addr, addrlen);
       if (fal == NULL)
        {
          GNUNET_STATISTICS_update (stats,
@@ -2392,6 +2462,7 @@ check_pending_validation (void *cls,
       n->public_key_valid = GNUNET_YES;
       fal = add_peer_address (n,
                              ve->transport_name,
+                             NULL,
                              ve->addr,
                              ve->addrlen);
       GNUNET_assert (fal != NULL);
@@ -2604,7 +2675,7 @@ run_validation (void *cls,
     neighbour = setup_new_neighbour(&id);
   neighbour->publicKey = va->publicKey;
   neighbour->public_key_valid = GNUNET_YES;
-  peer_address = add_peer_address(neighbour, tname, addr, addrlen);
+  peer_address = add_peer_address(neighbour, tname, NULL, addr, addrlen);
   GNUNET_assert(peer_address != NULL);
   hello_size = GNUNET_HELLO_size(our_hello);
   tsize = sizeof(struct TransportPingMessage) + hello_size;
@@ -3018,8 +3089,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
                  GNUNET_CRYPTO_rsa_sign (my_private_key,
                                          &pong->purpose, &pong->signature));
   n = find_neighbour(peer);
-  if (n == NULL)
-    n = setup_new_neighbour(peer);
+  GNUNET_assert (n != NULL);
   /* first try reliable response transmission */
   rl = n->plugins;
   while (rl != NULL)
@@ -3033,6 +3103,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
                                           ntohs (pong->header.size),
                                           TRANSPORT_PONG_PRIORITY, 
                                           HELLO_VERIFICATION_TIMEOUT,
+                                          fal->session,
                                           fal->addr,
                                           fal->addrlen,
                                           GNUNET_SYSERR,
@@ -3088,6 +3159,7 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
  * @param message the message, NULL if we only care about
  *                learning about the delay until we should receive again
  * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param session identifier used for this session (can be NULL)
  * @param sender_address binary address of the sender (if observed)
  * @param sender_address_len number of bytes in sender_address
  * @return how long the plugin should wait until receiving more data
@@ -3096,11 +3168,13 @@ handle_ping(void *cls, const struct GNUNET_MessageHeader *message,
 static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
-                    unsigned int distance, const char *sender_address,
+                    unsigned int distance,
+                   struct Session *session,
+                   const char *sender_address,
                     size_t sender_address_len)
 {
-  struct ReadyList *service_context;
   struct TransportPlugin *plugin = cls;
+  struct ReadyList *service_context;
   struct TransportClient *cpos;
   struct InboundMessage *im;
   struct ForeignAddressList *peer_address;
@@ -3119,6 +3193,7 @@ plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
     {
       peer_address = add_peer_address(n, 
                                      plugin->short_name,
+                                     session,
                                      sender_address, 
                                      sender_address_len);  
       if (peer_address != NULL)
@@ -3577,6 +3652,7 @@ create_environment (struct TransportPlugin *plug)
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
+  plug->env.session_end = &plugin_env_session_end;
   plug->env.max_connections = max_connect_per_transport;
   plug->env.stats = stats;
 }
index 0291f9bb43f60eb6905b7b125f86e673ee25395a..2b7017036798773ada9e9062e380305cdb33092e 100644 (file)
 #include "gnunet_statistics_service.h"
 #include "gnunet_transport_service.h"
 
+/**
+ * Opaque pointer that plugins can use to distinguish specific
+ * connections to a given peer.  Typically used by stateful plugins to
+ * allow the service to refer to specific streams instead of a more
+ * general notion of "some connection" to the given peer.  This is
+ * useful since sometimes (i.e. for inbound TCP connections) a
+ * connection may not have an address that can be used for meaningful
+ * distinction between sessions to the same peer.
+ */
+struct Session;
+
+
+/**
+ * Function that will be called whenever the plugin internally
+ * cleans up a session pointer and hence the service needs to
+ * discard all of those sessions as well.  Plugins that do not
+ * use sessions can simply omit calling this function and always
+ * use NULL wherever a session pointer is needed.
+ * 
+ * @param cls closure
+ * @param peer which peer was the session for 
+ * @param session which session is being destoyed
+ */
+typedef void (*GNUNET_TRANSPORT_SessionEnd) (void *cls,
+                                            const struct GNUNET_PeerIdentity *peer,
+                                            struct Session *session);
+
 
 /**
  * Function called by the transport for each received message.
@@ -47,6 +74,7 @@
  * @param message the message, NULL if we only care about
  *                learning about the delay until we should receive again -- FIXME!
  * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param session identifier used for this session (can be NULL)
  * @param sender_address binary address of the sender (if observed)
  * @param sender_address_len number of bytes in sender_address
  * @return how long the plugin should wait until receiving more data
@@ -60,6 +88,7 @@ typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) (v
                                                                               GNUNET_MessageHeader *
                                                                               message,
                                                                               uint32_t distance,
+                                                                              struct Session *session,
                                                                               const char *sender_address,
                                                                               size_t sender_address_len);
 
@@ -156,6 +185,12 @@ struct GNUNET_TRANSPORT_PluginEnvironment
    */
   GNUNET_TRANSPORT_TrafficReport traffic_report;
 
+  /**
+   * Function that must be called by the plugin when a non-NULL
+   * session handle stops being valid (is destroyed).
+   */
+  GNUNET_TRANSPORT_SessionEnd session_end;
+
   /**
    * What is the maximum number of connections that this transport
    * should allow?  Transports that do not have sessions (such as
@@ -201,6 +236,7 @@ typedef void
  *                require plugins to discard the message after the timeout,
  *                just advisory for the desired delay; most plugins will ignore
  *                this as well)
+ * @param session which session must be used (or NULL for "any")
  * @param addr the address to use (can be NULL if the plugin
  *                is "on its own" (i.e. re-use existing TCP connection))
  * @param addrlen length of the address in bytes
@@ -226,6 +262,7 @@ typedef ssize_t
                                         size_t msgbuf_size,
                                         uint32_t priority,
                                         struct GNUNET_TIME_Relative timeout,
+                                       struct Session *session,
                                         const void *addr,
                                        size_t addrlen,
                                        int force_address,
@@ -323,7 +360,7 @@ struct GNUNET_TRANSPORT_PluginFunctions
 
   /**
    * Function that the transport service will use to transmit data to
-   * another peer.  May be null for plugins that only support
+   * another peer.  May be NULL for plugins that only support
    * receiving data.  After this call, the plugin call the specified
    * continuation with success or error before notifying us about the
    * target having disconnected.
index 43cdceb43e1091888022a370b05d34cdd5c72b44..2a3d52193d92d5b5a66998e6f4232b689397a52f 100644 (file)
@@ -640,6 +640,7 @@ select_better_session (struct Session *s1,
  *                require plugins to discard the message after the timeout,
  *                just advisory for the desired delay; most plugins will ignore
  *                this as well)
+ * @param session which session must be used (or NULL for "any")
  * @param addr the address to use (can be NULL if the plugin
  *                is "on its own" (i.e. re-use existing TCP connection))
  * @param addrlen length of the address in bytes
@@ -664,13 +665,13 @@ tcp_plugin_send (void *cls,
                  size_t msgbuf_size,
                  uint32_t priority,
                  struct GNUNET_TIME_Relative timeout,
+                struct Session *session,
                 const void *addr,
                 size_t addrlen,
                 int force_address,
                  GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  struct Session *session;
   struct Session *cand_session;
   struct Session *next;
   struct PendingMessage *pm;
@@ -684,44 +685,47 @@ tcp_plugin_send (void *cls,
   /* FIXME: we could do this a cheaper with a hash table
      where we could restrict the iteration to entries that match
      the target peer... */
-  cand_session = NULL;
-  next = plugin->sessions;
-  while (NULL != (session = next)) 
+  if (session == NULL)
     {
-      next = session->next;
-      GNUNET_assert (session->client != NULL);
-      if (0 != memcmp (target,
-                      &session->target, 
-                      sizeof (struct GNUNET_PeerIdentity)))
-       continue;
-      if ( ( (GNUNET_SYSERR == force_address) &&
-            (session->expecting_welcome == GNUNET_NO) ) ||
-          (GNUNET_NO == force_address) )   
+      cand_session = NULL;
+      next = plugin->sessions;
+      while (NULL != (session = next)) 
        {
+         next = session->next;
+         GNUNET_assert (session->client != NULL);
+         if (0 != memcmp (target,
+                          &session->target, 
+                          sizeof (struct GNUNET_PeerIdentity)))
+           continue;
+         if ( ( (GNUNET_SYSERR == force_address) &&
+                (session->expecting_welcome == GNUNET_NO) ) ||
+              (GNUNET_NO == force_address) )   
+           {
+             cand_session = select_better_session (cand_session,
+                                                   session);
+             continue;
+           }
+         if (GNUNET_SYSERR == force_address)
+           continue;
+         GNUNET_break (GNUNET_YES == force_address);
+         if (addr == NULL)
+           {
+             GNUNET_break (0);
+             break;
+           }
+         if (session->inbound == GNUNET_YES) 
+           continue;
+         if (addrlen != session->connect_alen)
+           continue;
+         if (0 != memcmp (session->connect_addr,
+                          addr,
+                          addrlen))
+           continue;
          cand_session = select_better_session (cand_session,
-                                               session);
-         continue;
+                                               session);             
        }
-      if (GNUNET_SYSERR == force_address)
-       continue;
-      GNUNET_break (GNUNET_YES == force_address);
-      if (addr == NULL)
-       {
-         GNUNET_break (0);
-         break;
-       }
-      if (session->inbound == GNUNET_YES) 
-       continue;
-      if (addrlen != session->connect_alen)
-       continue;
-      if (0 != memcmp (session->connect_addr,
-                      addr,
-                      addrlen))
-       continue;
-      cand_session = select_better_session (cand_session,
-                                           session);         
+      session = cand_session;
     }
-  session = cand_session;
   if ( (session == NULL) &&
        (addr == NULL) )
     {
@@ -1134,7 +1138,9 @@ delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
   delay = session->plugin->env->receive (session->plugin->env->cls,
                                         &session->target,
-                                        NULL, 0, NULL, 0);
+                                        NULL, 0, 
+                                        session,
+                                        NULL, 0);
   if (delay.value == 0)
     GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
   else
@@ -1187,9 +1193,9 @@ handle_tcp_data (void *cls,
                            ntohs (message->size),
                            GNUNET_NO); 
   delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1,
-                               session->connect_addr,
-                               session->connect_alen);
-
+                               session
+                               (GNUNET_YES == session->inbound) ? NULL : session->connect_addr,
+                               (GNUNET_YES == session->inbound) ? 0 : session->connect_alen);
   if (delay.value == 0)
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
   else
index 6ad555a51378bc0ee007030b9aec87748e400396..f09503b9b8017c6d5931d0de3c03b1795b0b4b56 100644 (file)
@@ -139,6 +139,7 @@ struct Plugin
  * @param msgbuf the message to transmit
  * @param msgbuf_size number of bytes in 'msgbuf'
  * @param timeout when should we time out 
+ * @param session which session must be used (or NULL for "any")
  * @param addr the address to use (can be NULL if the plugin
  *                is "on its own" (i.e. re-use existing TCP connection))
  * @param addrlen length of the address in bytes
@@ -162,6 +163,7 @@ template_plugin_send (void *cls,
                       size_t msgbuf_size,
                       unsigned int priority,
                       struct GNUNET_TIME_Relative timeout,
+                     struct Session *session,
                       const void *addr,
                       size_t addrlen,
                       int force_address,
index bdec324906b6dae5faaaa2b3c6dd0656bb3cbb78..3e59f89dcf815b65ce3c64e77c42b7baac435658 100644 (file)
@@ -184,6 +184,7 @@ udp_transport_server_stop (void *cls)
  * @param msgbuf_size the size of the msgbuf to send
  * @param priority how important is the message (ignored by UDP)
  * @param timeout when should we time out (give up) if we can not transmit?
+ * @param session which session must be used (always NULL for UDP)
  * @param addr the addr to send the message to, needs to be a sockaddr for us
  * @param addrlen the len of addr
  * @param force_address GNUNET_YES if the plugin MUST use the given address,
@@ -206,6 +207,7 @@ udp_plugin_send (void *cls,
                  size_t msgbuf_size,
                  unsigned int priority,
                  struct GNUNET_TIME_Relative timeout,
+                struct Session *session,
                  const void *addr,
                  size_t addrlen,
                  int force_address,
@@ -216,6 +218,7 @@ udp_plugin_send (void *cls,
   int ssize;
   ssize_t sent;
 
+  GNUNET_assert (NULL == session);
   GNUNET_assert(udp_sock != NULL);
   if ( (addr == NULL) || (addrlen == 0) )
     {
@@ -417,7 +420,8 @@ udp_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
                      count, ntohs(currhdr->type), ntohs(currhdr->size), offset);
 #endif
         plugin->env->receive (plugin->env->cls,
-            sender, currhdr, UDP_DIRECT_DISTANCE, (char *)&addr, fromlen);
+                             sender, currhdr, UDP_DIRECT_DISTANCE, 
+                             NULL, (const char *)&addr, fromlen);
         offset += ntohs(currhdr->size);
 #if DEBUG_UDP
     GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "udp", _
index 3733130e20b59b2e648ab6e575c22aef2863f875..531f2ae100fe12ac72a9bee0bf999f8210587253 100644 (file)
@@ -627,6 +627,7 @@ run_gnunet_nat_client (struct Plugin *plugin, const char *addr, size_t addrlen)
  * @param msgbuf_size the size of the msgbuf to send
  * @param priority how important is the message (ignored by UDP)
  * @param timeout when should we time out (give up) if we can not transmit?
+ * @param session identifier used for this session (can be NULL)
  * @param addr the addr to send the message to, needs to be a sockaddr for us
  * @param addrlen the len of addr
  * @param force_address not used, we had better have an address to send to
@@ -642,15 +643,16 @@ run_gnunet_nat_client (struct Plugin *plugin, const char *addr, size_t addrlen)
  */
 static ssize_t
 udp_nat_plugin_send (void *cls,
-                 const struct GNUNET_PeerIdentity *target,
-                 const char *msgbuf,
-                 size_t msgbuf_size,
-                 unsigned int priority,
-                 struct GNUNET_TIME_Relative timeout,
-                 const void *addr,
-                 size_t addrlen,
-                 int force_address,
-                 GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
+                    const struct GNUNET_PeerIdentity *target,
+                    const char *msgbuf,
+                    size_t msgbuf_size,
+                    unsigned int priority,
+                    struct GNUNET_TIME_Relative timeout,
+                    struct Session *session,
+                    const void *addr,
+                    size_t addrlen,
+                    int force_address,
+                    GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
   ssize_t sent;
@@ -659,6 +661,7 @@ udp_nat_plugin_send (void *cls,
   struct sockaddr_in *sockaddr = (struct sockaddr_in *)addr;
   int other_peer_natd;
 
+  GNUNET_assert (NULL == session);
   other_peer_natd = GNUNET_NO;
   if ((sockaddr->sin_family == AF_INET) && (ntohs(sockaddr->sin_port) == 0))
     {
@@ -1177,7 +1180,8 @@ udp_nat_demultiplexer(struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
       /* If we receive these just ignore! */
       break;
     default:
-      plugin->env->receive (plugin->env->cls, sender, currhdr, UDP_DIRECT_DISTANCE, (char *)sender_addr, fromlen);
+      plugin->env->receive (plugin->env->cls, sender, currhdr, UDP_DIRECT_DISTANCE, 
+                           NULL, (char *)sender_addr, fromlen);
   }
 
 }