-fix time assertion introduce in last patch
[oweals/gnunet.git] / src / transport / plugin_transport_unix.c
index 6b90cf1f4d9114d7bcd137da3818d40574819dac..6c0c12065523ada9f9dc960f64d7c0915050d155 100644 (file)
@@ -337,6 +337,39 @@ struct Plugin
 };
 
 
+/**
+ * If a session monitor is attached, notify it about the new
+ * session state.
+ *
+ * @param plugin our plugin
+ * @param session session that changed state
+ * @param state new state of the session
+ */
+static void
+notify_session_monitor (struct Plugin *plugin,
+                        struct Session *session,
+                        enum GNUNET_TRANSPORT_SessionState state)
+{
+  struct GNUNET_TRANSPORT_SessionInfo info;
+
+  if (NULL == plugin->sic)
+    return;
+  memset (&info, 0, sizeof (info));
+  info.state = state;
+  info.is_inbound = GNUNET_SYSERR; /* hard to say */
+  info.num_msg_pending = session->msgs_in_queue;
+  info.num_bytes_pending = session->bytes_in_queue;
+  /* info.receive_delay remains zero as this is not supported by UNIX
+     (cannot selectively not receive from 'some' peer while continuing
+     to receive from others) */
+  info.session_timeout = session->timeout;
+  info.address = session->address;
+  plugin->sic (plugin->sic_cls,
+               session,
+               &info);
+}
+
+
 /**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
@@ -349,9 +382,9 @@ struct Plugin
  * @return string representing the same address
  */
 static const char *
-unix_address_to_string (void *cls,
-                        const void *addr,
-                        size_t addrlen)
+unix_plugin_address_to_string (void *cls,
+                               const void *addr,
+                               size_t addrlen)
 {
   static char rbuf[1024];
   struct UnixAddress *ua = (struct UnixAddress *) addr;
@@ -404,13 +437,13 @@ unix_address_to_string (void *cls,
  * to close a session due to a disconnect or failure to
  * establish a connection.
  *
- * @param cls closure with the `struct Plugin`
+ * @param cls closure with the `struct Plugin *`
  * @param session session to close down
  * @return #GNUNET_OK on success
  */
 static int
-unix_session_disconnect (void *cls,
-                         struct Session *session)
+unix_plugin_session_disconnect (void *cls,
+                                struct Session *session)
 {
   struct Plugin *plugin = cls;
   struct UNIXMessageWrapper *msgw;
@@ -419,9 +452,9 @@ unix_session_disconnect (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Disconnecting session for peer `%s' `%s'\n",
        GNUNET_i2s (&session->target),
-       unix_address_to_string (NULL,
-                               session->address->address,
-                               session->address->address_length));
+       unix_plugin_address_to_string (NULL,
+                                      session->address->address,
+                                      session->address->address_length));
   plugin->env->session_end (plugin->env->cls,
                             session->address,
                             session);
@@ -460,7 +493,11 @@ unix_session_disconnect (void *cls,
   {
     GNUNET_SCHEDULER_cancel (session->timeout_task);
     session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    session->timeout = GNUNET_TIME_UNIT_ZERO_ABS;
   }
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_DOWN);
   GNUNET_HELLO_address_free (session->address);
   GNUNET_break (0 == session->bytes_in_queue);
   GNUNET_break (0 == session->msgs_in_queue);
@@ -472,50 +509,56 @@ unix_session_disconnect (void *cls,
 /**
  * Session was idle for too long, so disconnect it
  *
- * @param cls the 'struct Session' to disconnect
+ * @param cls the `struct Session *` to disconnect
  * @param tc scheduler context
  */
 static void
 session_timeout (void *cls,
                 const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct Session *s = cls;
+  struct Session *session = cls;
   struct GNUNET_TIME_Relative left;
 
-  s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  left = GNUNET_TIME_absolute_get_remaining (s->timeout);
+  session->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  left = GNUNET_TIME_absolute_get_remaining (session->timeout);
   if (0 != left.rel_value_us)
     {
-      /* not actually our turn yet */
-      s->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
-                                                      &session_timeout,
-                                                      s);
+      /* not actually our turn yet, but let's at least update
+         the monitor, it may think we're about to die ... */
+      notify_session_monitor (session->plugin,
+                              session,
+                              GNUNET_TRANSPORT_SS_UP);
+      session->timeout_task = GNUNET_SCHEDULER_add_delayed (left,
+                                                            &session_timeout,
+                                                            session);
       return;
     }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Session %p was idle for %s, disconnecting\n",
-       s,
+       session,
        GNUNET_STRINGS_relative_time_to_string (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
                                               GNUNET_YES));
-  unix_session_disconnect (s->plugin, s);
+  unix_plugin_session_disconnect (session->plugin, session);
 }
 
 
 /**
- * Increment session timeout due to activity
+ * Increment session timeout due to activity.  We do not immediately
+ * notify the monitor here as that might generate excessive
+ * signalling.
  *
- * @param s session for which the timeout should be rescheduled
+ * @param session session for which the timeout should be rescheduled
  */
 static void
-reschedule_session_timeout (struct Session *s)
+reschedule_session_timeout (struct Session *session)
 {
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
-  s->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != session->timeout_task);
+  session->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
 }
 
 
 /**
- * Convert unix path to a `struct sockaddr_un`
+ * Convert unix path to a `struct sockaddr_un *`
  *
  * @param unixpath path to convert
  * @param sock_len[out] set to the length of the address
@@ -576,11 +619,12 @@ lookup_session_it (void *cls,
                   void *value)
 {
   struct LookupCtx *lctx = cls;
-  struct Session *s = value;
+  struct Session *session = value;
 
-  if (0 == GNUNET_HELLO_address_cmp (lctx->address, s->address))
+  if (0 == GNUNET_HELLO_address_cmp (lctx->address,
+                                     session->address))
   {
-    lctx->res = s;
+    lctx->res = session;
     return GNUNET_NO;
   }
   return GNUNET_YES;
@@ -618,7 +662,7 @@ lookup_session (struct Plugin *plugin,
  * @return keepalive factor
  */
 static unsigned int
-unix_query_keepalive_factor (void *cls)
+unix_plugin_query_keepalive_factor (void *cls)
 {
   return 3;
 }
@@ -764,8 +808,8 @@ resend:
  * @return the network type in HBO or #GNUNET_SYSERR
  */
 static enum GNUNET_ATS_Network_Type
-unix_get_network (void *cls,
-                 struct Session *session)
+unix_plugin_get_network (void *cls,
+                         struct Session *session)
 {
   GNUNET_assert (NULL != session);
   return GNUNET_ATS_NET_LOOPBACK;
@@ -785,7 +829,7 @@ unix_plugin_get_session (void *cls,
                         const struct GNUNET_HELLO_Address *address)
 {
   struct Plugin *plugin = cls;
-  struct Session *s;
+  struct Session *session;
   struct UnixAddress *ua;
   char * addrstr;
   uint32_t addr_str_len;
@@ -825,41 +869,43 @@ unix_plugin_get_session (void *cls,
   }
 
   /* Check if a session for this address already exists */
-  if (NULL != (s = lookup_session (plugin,
-                                   address)))
-  {
+  if (NULL != (session = lookup_session (plugin,
+                                         address)))
+    {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Found existing session %p for address `%s'\n",
-        s,
-        unix_address_to_string (NULL,
-                                 address->address,
-                                 address->address_length));
-    return s;
+        session,
+        unix_plugin_address_to_string (NULL,
+                                        address->address,
+                                        address->address_length));
+    return session;
   }
 
   /* create a new session */
-  s = GNUNET_new (struct Session);
-  s->target = address->peer;
-  s->address = GNUNET_HELLO_address_copy (address);
-  s->plugin = plugin;
-  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
-  s->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                                 &session_timeout,
-                                                 s);
+  session = GNUNET_new (struct Session);
+  session->target = address->peer;
+  session->address = GNUNET_HELLO_address_copy (address);
+  session->plugin = plugin;
+  session->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                        &session_timeout,
+                                                        session);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Creating a new session %p for address `%s'\n",
-       s,
-       unix_address_to_string (NULL,
-                               address->address,
-                               address->address_length));
+       session,
+       unix_plugin_address_to_string (NULL,
+                                      address->address,
+                                      address->address_length));
   (void) GNUNET_CONTAINER_multipeermap_put (plugin->session_map,
-                                           &address->peer, s,
+                                           &address->peer, session,
                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_STATISTICS_set (plugin->env->stats,
                         "# UNIX sessions active",
                         GNUNET_CONTAINER_multipeermap_size (plugin->session_map),
                         GNUNET_NO);
-  return s;
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
+  return session;
 }
 
 
@@ -901,18 +947,19 @@ unix_plugin_update_session_timeout (void *cls,
  * @param ua_len length of the address @a ua
  */
 static void
-unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
+unix_demultiplexer (struct Plugin *plugin,
+                    struct GNUNET_PeerIdentity *sender,
                     const struct GNUNET_MessageHeader *currhdr,
                     const struct UnixAddress *ua, size_t ua_len)
 {
-  struct Session *s = NULL;
+  struct Session *session;
   struct GNUNET_HELLO_Address *address;
 
   GNUNET_break (ntohl(plugin->ats_network.value) != GNUNET_ATS_NET_UNSPECIFIED);
   GNUNET_assert (ua_len >= sizeof (struct UnixAddress));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received message from %s\n",
-       unix_address_to_string (NULL, ua, ua_len));
+       unix_plugin_address_to_string (NULL, ua, ua_len));
   GNUNET_STATISTICS_update (plugin->env->stats,
                            "# bytes received via UNIX",
                            ntohs (currhdr->size),
@@ -923,26 +970,31 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
                                            PLUGIN_NAME,
                                            ua, ua_len,
                                            GNUNET_HELLO_ADDRESS_INFO_NONE); /* UNIX does not have "inbound" sessions */
-  s = lookup_session (plugin, address);
-  if (NULL == s)
+  session = lookup_session (plugin, address);
+  if (NULL == session)
   {
-    s = unix_plugin_get_session (plugin, address);
+    session = unix_plugin_get_session (plugin, address);
     /* Notify transport and ATS about new inbound session */
     plugin->env->session_start (NULL,
-                                s->address,
-                                s,
+                                session->address,
+                                session,
                                 &plugin->ats_network, 1);
+    notify_session_monitor (plugin,
+                            session,
+                            GNUNET_TRANSPORT_SS_UP);
+  }
+  else
+  {
+    reschedule_session_timeout (session);
   }
   GNUNET_HELLO_address_free (address);
-  reschedule_session_timeout (s);
-
   plugin->env->receive (plugin->env->cls,
-                        s->address,
-                        s,
+                        session->address,
+                        session,
                         currhdr);
   plugin->env->update_address_metrics (plugin->env->cls,
-                                       s->address,
-                                       s,
+                                       session->address,
+                                       session,
                                       &plugin->ats_network, 1);
 }
 
@@ -1018,7 +1070,9 @@ unix_plugin_do_read (struct Plugin *plugin)
     return;
   }
   msgbuf = (char *) &msg[1];
-  memcpy (&sender, &msg->sender, sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&sender,
+          &msg->sender,
+          sizeof (struct GNUNET_PeerIdentity));
   offset = 0;
   tsize = csize - sizeof (struct UNIXMessage);
   while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
@@ -1049,12 +1103,15 @@ unix_plugin_do_write (struct Plugin *plugin)
   ssize_t sent = 0;
   struct UNIXMessageWrapper *msgw;
   struct Session *session;
+  int did_delete;
 
+  did_delete = GNUNET_NO;
   while (NULL != (msgw = plugin->msg_head))
   {
     if (GNUNET_TIME_absolute_get_remaining (msgw->timeout).rel_value_us > 0)
       break; /* Message is ready for sending */
     /* Message has a timeout */
+    did_delete = GNUNET_YES;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Timeout for message with %u bytes \n",
         (unsigned int) msgw->msgsize);
@@ -1085,7 +1142,13 @@ unix_plugin_do_write (struct Plugin *plugin)
     GNUNET_free (msgw);
   }
   if (NULL == msgw)
+  {
+    if (GNUNET_YES == did_delete)
+      notify_session_monitor (plugin,
+                              session,
+                              GNUNET_TRANSPORT_SS_UP);
     return; /* Nothing to send at the moment */
+  }
 
   sent = unix_real_send (plugin,
                          plugin->unix_sock.desc,
@@ -1098,12 +1161,14 @@ unix_plugin_do_write (struct Plugin *plugin)
                          msgw->session->address->address_length,
                          msgw->payload,
                          msgw->cont, msgw->cont_cls);
-
   if (RETRY == sent)
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
                              "# UNIX retry attempts",
                              1, GNUNET_NO);
+    notify_session_monitor (plugin,
+                            session,
+                            GNUNET_TRANSPORT_SS_UP);
     return;
   }
   GNUNET_CONTAINER_DLL_remove (plugin->msg_head,
@@ -1118,6 +1183,9 @@ unix_plugin_do_write (struct Plugin *plugin)
   GNUNET_STATISTICS_set (plugin->env->stats,
                          "# bytes currently in UNIX buffers",
                          plugin->bytes_in_queue, GNUNET_NO);
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
   if (GNUNET_SYSERR == sent)
   {
     /* failed and no retry */
@@ -1253,9 +1321,9 @@ unix_plugin_send (void *cls,
     LOG (GNUNET_ERROR_TYPE_ERROR,
         "Invalid session for peer `%s' `%s'\n",
         GNUNET_i2s (&session->target),
-        unix_address_to_string(NULL,
-                                session->address->address,
-                                session->address->address_length));
+        unix_plugin_address_to_string (NULL,
+                                        session->address->address,
+                                        session->address->address_length));
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
@@ -1263,9 +1331,9 @@ unix_plugin_send (void *cls,
        "Sending %u bytes with session for peer `%s' `%s'\n",
        msgbuf_size,
        GNUNET_i2s (&session->target),
-       unix_address_to_string (NULL,
-                               session->address->address,
-                               session->address->address_length));
+       unix_plugin_address_to_string (NULL,
+                                      session->address->address,
+                                      session->address->address_length));
   ssize = sizeof (struct UNIXMessage) + msgbuf_size;
   message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
   message->header.size = htons (ssize);
@@ -1293,6 +1361,9 @@ unix_plugin_send (void *cls,
                         "# bytes currently in UNIX buffers",
                         plugin->bytes_in_queue,
                         GNUNET_NO);
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
   if (GNUNET_SCHEDULER_NO_TASK == plugin->write_task)
     plugin->write_task =
       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1381,9 +1452,9 @@ unix_transport_server_start (void *cls)
  *
  */
 static int
-unix_check_address (void *cls,
-                    const void *addr,
-                    size_t addrlen)
+unix_plugin_check_address (void *cls,
+                           const void *addr,
+                           size_t addrlen)
 {
   struct Plugin* plugin = cls;
   const struct UnixAddress *ua = addr;
@@ -1442,9 +1513,9 @@ unix_plugin_address_pretty_printer (void *cls, const char *type,
   const char *ret;
 
   if ( (NULL != addr) && (addrlen > 0))
-    ret = unix_address_to_string (NULL,
-                                  addr,
-                                  addrlen);
+    ret = unix_plugin_address_to_string (NULL,
+                                         addr,
+                                         addrlen);
   else
     ret = NULL;
   asc (asc_cls,
@@ -1467,10 +1538,10 @@ unix_plugin_address_pretty_printer (void *cls, const char *type,
  * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
  */
 static int
-unix_string_to_address (void *cls,
-                        const char *addr,
-                        uint16_t addrlen,
-                        void **buf, size_t *added)
+unix_plugin_string_to_address (void *cls,
+                               const char *addr,
+                               uint16_t addrlen,
+                               void **buf, size_t *added)
 {
   struct UnixAddress *ua;
   char *address;
@@ -1590,9 +1661,9 @@ get_session_delete_it (void *cls,
                       void *value)
 {
   struct Plugin *plugin = cls;
-  struct Session *s = value;
+  struct Session *session = value;
 
-  unix_session_disconnect (plugin, s);
+  unix_plugin_session_disconnect (plugin, session);
   return GNUNET_YES;
 }
 
@@ -1605,12 +1676,11 @@ get_session_delete_it (void *cls,
  * @return #GNUNET_OK on success, #GNUNET_SYSERR if the operation failed
  */
 static void
-unix_peer_disconnect (void *cls,
-                      const struct GNUNET_PeerIdentity *target)
+unix_plugin_peer_disconnect (void *cls,
+                             const struct GNUNET_PeerIdentity *target)
 {
   struct Plugin *plugin = cls;
 
-  GNUNET_assert (NULL != plugin);
   GNUNET_CONTAINER_multipeermap_get_multiple (plugin->session_map,
                                              target,
                                              &get_session_delete_it, plugin);
@@ -1633,21 +1703,10 @@ send_session_info_iter (void *cls,
 {
   struct Plugin *plugin = cls;
   struct Session *session = value;
-  struct GNUNET_TRANSPORT_SessionInfo info;
 
-  memset (&info, 0, sizeof (info));
-  info.state = GNUNET_TRANSPORT_SS_UP; /* all are up if we have them */
-  info.is_inbound = GNUNET_SYSERR; /* hard to say */
-  info.num_msg_pending = session->msgs_in_queue;
-  info.num_bytes_pending = session->bytes_in_queue;
-  /* info.receive_delay remains zero as this is not supported by UNIX
-     (cannot selectively not receive from 'some' peer while continuing
-     to receive from others) */
-  info.session_timeout = session->timeout;
-  info.address = session->address;
-  plugin->sic (plugin->sic_cls,
-               session,
-               &info);
+  notify_session_monitor (plugin,
+                          session,
+                          GNUNET_TRANSPORT_SS_UP);
   return GNUNET_OK;
 }
 
@@ -1665,18 +1724,22 @@ send_session_info_iter (void *cls,
  * @param sic_cls closure for @a sic
  */
 static void
-unix_setup_monitor (void *cls,
-                    GNUNET_TRANSPORT_SessionInfoCallback sic,
-                    void *sic_cls)
+unix_plugin_setup_monitor (void *cls,
+                           GNUNET_TRANSPORT_SessionInfoCallback sic,
+                           void *sic_cls)
 {
   struct Plugin *plugin = cls;
 
   plugin->sic = sic;
   plugin->sic_cls = sic_cls;
   if (NULL != sic)
+  {
     GNUNET_CONTAINER_multipeermap_iterate (plugin->session_map,
                                            &send_session_info_iter,
                                            plugin);
+    /* signal end of first iteration */
+    sic (sic_cls, NULL, NULL);
+  }
 }
 
 
@@ -1702,8 +1765,8 @@ libgnunet_plugin_transport_unix_init (void *cls)
     api = GNUNET_new (struct GNUNET_TRANSPORT_PluginFunctions);
     api->cls = NULL;
     api->address_pretty_printer = &unix_plugin_address_pretty_printer;
-    api->address_to_string = &unix_address_to_string;
-    api->string_to_address = &unix_string_to_address;
+    api->address_to_string = &unix_plugin_address_to_string;
+    api->string_to_address = &unix_plugin_string_to_address;
     return api;
   }
 
@@ -1737,16 +1800,16 @@ libgnunet_plugin_transport_unix_init (void *cls)
   api->cls = plugin;
   api->get_session = &unix_plugin_get_session;
   api->send = &unix_plugin_send;
-  api->disconnect_peer = &unix_peer_disconnect;
-  api->disconnect_session = &unix_session_disconnect;
-  api->query_keepalive_factor = &unix_query_keepalive_factor;
+  api->disconnect_peer = &unix_plugin_peer_disconnect;
+  api->disconnect_session = &unix_plugin_session_disconnect;
+  api->query_keepalive_factor = &unix_plugin_query_keepalive_factor;
   api->address_pretty_printer = &unix_plugin_address_pretty_printer;
-  api->address_to_string = &unix_address_to_string;
-  api->check_address = &unix_check_address;
-  api->string_to_address = &unix_string_to_address;
-  api->get_network = &unix_get_network;
+  api->address_to_string = &unix_plugin_address_to_string;
+  api->check_address = &unix_plugin_check_address;
+  api->string_to_address = &unix_plugin_string_to_address;
+  api->get_network = &unix_plugin_get_network;
   api->update_session_timeout = &unix_plugin_update_session_timeout;
-  api->setup_monitor = &unix_setup_monitor;
+  api->setup_monitor = &unix_plugin_setup_monitor;
   sockets_created = unix_transport_server_start (plugin);
   if ((0 == sockets_created) || (GNUNET_SYSERR == sockets_created))
   {