more code
[oweals/gnunet.git] / src / transport / plugin_transport_unix.c
index a858b9a2dcbb1f4c8da955628974aa0043d7e227..55787a7f93e8fb9ea4d15bfcd7c8b11f448a3dd1 100644 (file)
@@ -82,9 +82,17 @@ struct UNIXMessage
 
 struct Session
 {
+  struct GNUNET_PeerIdentity target;
+
   void *addr;
   size_t addrlen;
-  struct GNUNET_PeerIdentity target;
+
+  /**
+   * Session timeout task
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  struct Plugin * plugin;
 };
 
 struct UNIXMessageWrapper
@@ -236,8 +244,29 @@ struct Plugin
   struct GNUNET_ATS_Information ats_network;
 
   unsigned int bytes_in_queue;
+  unsigned int bytes_in_sent;
+  unsigned int bytes_in_recv;
+  unsigned int bytes_discarded;
 };
 
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s);
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s);
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s);
+
 
 static void
 unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
@@ -275,38 +304,87 @@ reschedule_select (struct Plugin * plugin)
   }
 }
 
+struct LookupCtx
+{
+  struct Session *s;
+  const struct sockaddr_un *addr;
+};
 
-static int
-get_session_delete_it (void *cls, const GNUNET_HashCode * key, void *value)
+int lookup_session_it (void *cls,
+                       const struct GNUNET_HashCode * key,
+                       void *value)
 {
-  struct Session *s = value;
-  struct UNIXMessageWrapper * msgw;
-  struct Plugin *plugin = cls;
+  struct LookupCtx *lctx = cls;
+  struct Session *t = value;
+
+  if (0 == strcmp (t->addr, lctx->addr->sun_path))
+  {
+    lctx->s = t;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+static struct Session *
+lookup_session (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender, const struct sockaddr_un *addr)
+{
+  struct LookupCtx lctx;
+
+  GNUNET_assert (NULL != plugin);
+  GNUNET_assert (NULL != sender);
+  GNUNET_assert (NULL != addr);
+
+  lctx.s = NULL;
+  lctx.addr = addr;
+
+  GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &sender->hashPubKey, &lookup_session_it, &lctx);
+
+  return lctx.s;
+}
+
+/**
+ * Functions with this signature are called whenever we need
+ * to close a session due to a disconnect or failure to
+ * establish a connection.
+ *
+ * @param s session to close down
+ */
+static void
+disconnect_session (struct Session *s)
+{
+  struct UNIXMessageWrapper *msgw;
+  struct UNIXMessageWrapper *next;
+  struct Plugin * plugin = s->plugin;
   int removed;
   GNUNET_assert (plugin != NULL);
+  GNUNET_assert (s != NULL);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr);
+  stop_session_timeout (s);
   plugin->env->session_end (plugin->env->cls, &s->target, s);
 
   msgw = plugin->msg_head;
   removed = GNUNET_NO;
-  while (NULL != (msgw = plugin->msg_head))
+  next = plugin->msg_head;
+  while (NULL != next)
   {
-    if (msgw->session == s)
-    {
-      GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
-      if (msgw->cont != NULL)
-        msgw->cont (msgw->cont_cls,  &msgw->session->target, GNUNET_SYSERR);
-      GNUNET_free (msgw->msg);
-      GNUNET_free (msgw);
-      removed = GNUNET_YES;
-    }
+    msgw = next;
+    next = msgw->next;
+    if (msgw->session != s)
+      continue;
+    GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
+    if (NULL != msgw->cont)
+      msgw->cont (msgw->cont_cls,  &msgw->session->target, GNUNET_SYSERR);
+    GNUNET_free (msgw->msg);
+    GNUNET_free (msgw);
+    removed = GNUNET_YES;    
   }
   if ((GNUNET_YES == removed) && (NULL == plugin->msg_head))
     reschedule_select (plugin);
 
   GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s));
+                 GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s));
 
   GNUNET_STATISTICS_set(plugin->env->stats,
                         "# UNIX sessions active",
@@ -314,6 +392,13 @@ get_session_delete_it (void *cls, const GNUNET_HashCode * key, void *value)
                         GNUNET_NO);
 
   GNUNET_free (s);
+}
+
+static int
+get_session_delete_it (void *cls, const struct GNUNET_HashCode * key, void *value)
+{
+  struct Session *s = value;
+  disconnect_session (s);
   return GNUNET_YES;
 }
 
@@ -413,7 +498,6 @@ unix_real_send (void *cls,
   size_t sbs;
   struct sockaddr_un un;
   size_t slen;
-  int retry;
 
   GNUNET_assert (NULL != plugin);
 
@@ -455,7 +539,6 @@ unix_real_send (void *cls,
 
   /* Send the data */
   sent = 0;
-  retry = GNUNET_NO;
   sent = GNUNET_NETWORK_socket_sendto (send_handle, msgbuf, msgbuf_size, sb, sbs);
 
   if ((GNUNET_SYSERR == sent) && ((errno == EAGAIN) || (errno == ENOBUFS)))
@@ -512,7 +595,7 @@ unix_real_send (void *cls,
   /* Calling continuation */
   if (cont != NULL)
   {
-    if ((sent == GNUNET_SYSERR) && (retry == GNUNET_NO))
+    if (sent == GNUNET_SYSERR)
       cont (cont_cls, target, GNUNET_SYSERR);
     if (sent > 0)
       cont (cont_cls, target, GNUNET_OK);
@@ -528,11 +611,8 @@ unix_real_send (void *cls,
     return -1;
   }
   /* failed and retry: return 0 */
-  if ((GNUNET_SYSERR == sent) && (retry == GNUNET_YES))
+  if (GNUNET_SYSERR == sent)
     return 0;
-  /* failed and no retry: return -1 */
-  if ((GNUNET_SYSERR == sent) && (retry == GNUNET_NO))
-    return -1;
   /* default */
   return -1;
 }
@@ -546,7 +626,7 @@ struct gsi_ctx
 
 
 static int
-get_session_it (void *cls, const GNUNET_HashCode * key, void *value)
+get_session_it (void *cls, const struct GNUNET_HashCode * key, void *value)
 {
   struct gsi_ctx *gsi = cls;
   struct Session *s = value;
@@ -593,13 +673,15 @@ unix_plugin_get_session (void *cls,
   }
 
   /* Create a new session */
-
   s = GNUNET_malloc (sizeof (struct Session) + address->address_length);
   s->addr = &s[1];
   s->addrlen = address->address_length;
+  s->plugin = plugin;
   memcpy(s->addr, address->address, s->addrlen);
   memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity));
 
+  start_session_timeout (s);
+
   GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
       &address->peer.hashPubKey, s,
       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
@@ -670,7 +752,11 @@ unix_plugin_send (void *cls,
   if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map,
       &session->target.hashPubKey, session))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Invalid session for peer `%s' `%s'\n",
+                GNUNET_i2s (&session->target),
+                (char *) session->addr);
     GNUNET_break (0);
+
     return GNUNET_SYSERR;
   }
 
@@ -682,6 +768,7 @@ unix_plugin_send (void *cls,
           sizeof (struct GNUNET_PeerIdentity));
   memcpy (&message[1], msgbuf, msgbuf_size);
 
+  reschedule_session_timeout (session);
 
   wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
   wrapper->msg = message;
@@ -723,6 +810,8 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
                     const struct sockaddr_un *un, size_t fromlen)
 {
   struct GNUNET_ATS_Information ats[2];
+  struct Session *s = NULL;
+  struct GNUNET_HELLO_Address * addr;
 
   ats[0].type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
   ats[0].value = htonl (UNIX_DIRECT_DISTANCE);
@@ -733,9 +822,21 @@ unix_demultiplexer (struct Plugin *plugin, struct GNUNET_PeerIdentity *sender,
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message from %s\n",
               un->sun_path);
+
+  plugin->bytes_in_recv += ntohs(currhdr->size);
+  GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes received",
+      plugin->bytes_in_recv, GNUNET_NO);
+
+  addr = GNUNET_HELLO_address_allocate(sender, "unix", un->sun_path, strlen (un->sun_path) + 1);
+  s = lookup_session (plugin, sender, un);
+  if (NULL == s)
+    s = unix_plugin_get_session (plugin, addr);
+  reschedule_session_timeout (s);
+
   plugin->env->receive (plugin->env->cls, sender, currhdr,
                         (const struct GNUNET_ATS_Information *) &ats, 2,
-                        NULL, un->sun_path, strlen (un->sun_path) + 1);
+                        s, un->sun_path, strlen (un->sun_path) + 1);
+  GNUNET_free (addr);
 }
 
 
@@ -801,6 +902,7 @@ unix_plugin_select_read (struct Plugin * plugin)
       GNUNET_break_op (0);
       break;
     }
+
     unix_demultiplexer (plugin, &sender, currhdr, &un, sizeof (un));
     offset += csize;
   }
@@ -851,6 +953,9 @@ unix_plugin_select_write (struct Plugin * plugin)
     plugin->bytes_in_queue -= msgw->msgsize;
     GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes in send queue",
         plugin->bytes_in_queue, GNUNET_NO);
+    plugin->bytes_discarded += msgw->msgsize;
+    GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes discarded",
+        plugin->bytes_discarded, GNUNET_NO);
 
     GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
@@ -866,6 +971,9 @@ unix_plugin_select_write (struct Plugin * plugin)
     plugin->bytes_in_queue -= msgw->msgsize;
     GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes in send queue",
         plugin->bytes_in_queue, GNUNET_NO);
+    plugin->bytes_in_sent += msgw->msgsize;
+    GNUNET_STATISTICS_set (plugin->env->stats,"# UNIX bytes sent",
+        plugin->bytes_in_sent, GNUNET_NO);
 
     GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
@@ -1111,7 +1219,81 @@ address_notification (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   plugin->env->notify_address (plugin->env->cls, GNUNET_YES,
                                plugin->unix_socket_path,
-                               strlen (plugin->unix_socket_path) + 1);
+                               strlen (plugin->unix_socket_path) + 1,
+                               "unix");
+}
+
+
+/**
+ * Session was idle, so disconnect it
+ */
+static void
+session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_assert (NULL != cls);
+  struct Session *s = cls;
+
+  s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Session %p was idle for %llu ms, disconnecting\n",
+              s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+  /* call session destroy function */
+  disconnect_session(s);
+}
+
+
+/**
+ * Start session timeout
+ */
+static void
+start_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Timeout for session %p set to %llu ms\n",
+              s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+
+/**
+ * Increment session timeout due to activity
+ */
+static void
+reschedule_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
+
+  GNUNET_SCHEDULER_cancel (s->timeout_task);
+  s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                                   &session_timeout,
+                                                   s);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Timeout rescheduled for session %p set to %llu ms\n",
+              s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+}
+
+
+/**
+ * Cancel timeout
+ */
+static void
+stop_session_timeout (struct Session *s)
+{
+  GNUNET_assert (NULL != s);
+
+  if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (s->timeout_task);
+    s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Timeout stopped for session %p canceled\n",
+                s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
+  }
 }
 
 /**