implemented sessions
authorMatthias Wachs <wachs@net.in.tum.de>
Fri, 27 Jan 2012 15:51:30 +0000 (15:51 +0000)
committerMatthias Wachs <wachs@net.in.tum.de>
Fri, 27 Jan 2012 15:51:30 +0000 (15:51 +0000)
src/transport/plugin_transport_unix.c

index 0c8722eab7afb2e7c39d7a825acc5346bcdeaeb5..e4d7564089ae871a7f5b06b99a59c5b4f2acd638 100644 (file)
@@ -102,13 +102,9 @@ struct UNIXMessageWrapper
 
   int retry_counter;
 
-  struct GNUNET_PeerIdentity target;
-
   struct GNUNET_TIME_Relative timeout;
   unsigned int priority;
 
-  void *addr;
-  size_t addrlen;
   struct Session *session;
   GNUNET_TRANSPORT_TransmitContinuation cont;
   void *cont_cls;
@@ -250,6 +246,27 @@ struct Plugin
   struct GNUNET_ATS_Information ats_network;
 };
 
+
+static int
+get_session_delete_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct Session *s = value;
+  struct Plugin *plugin = cls;
+  GNUNET_assert (plugin != NULL);
+
+#if DEBUG_UNIX
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr);
+#endif
+
+  plugin->env->session_end (plugin->env->cls, &s->target, s);
+
+  GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s);
+
+  GNUNET_free (s);
+
+  return GNUNET_YES;
+}
+
 /**
  * Disconnect from a remote node.  Clean up session if we have one for this peer
  *
@@ -260,7 +277,10 @@ struct Plugin
 void
 unix_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
 {
-  /** TODO: Implement! */
+  struct Plugin *plugin = cls;
+  GNUNET_assert (plugin != NULL);
+
+  GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_delete_it, plugin);
   return;
 }
 
@@ -284,7 +304,7 @@ unix_transport_server_stop (void *cls)
   {
     GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
     if (msgw->cont != NULL)
-      msgw->cont (msgw->cont_cls,  &msgw->target, GNUNET_SYSERR);
+      msgw->cont (msgw->cont_cls,  &msgw->session->target, GNUNET_SYSERR);
     GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
   }
@@ -432,7 +452,7 @@ unix_real_send (void *cls,
   }
 
 #if DEBUG_UNIX
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "UNIX transmit %u-byte message to %s (%d: %s)\n",
               (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent,
               (sent < 0) ? STRERROR (errno) : "ok");
@@ -461,7 +481,8 @@ unix_real_send (void *cls,
 
 struct gsi_ctx
 {
-  const struct GNUNET_HELLO_Address *address;
+  char *address;
+  size_t addrlen;
   struct Session *res;
 };
 
@@ -471,8 +492,11 @@ get_session_it (void *cls, const GNUNET_HashCode * key, void *value)
   struct gsi_ctx *gsi = cls;
   struct Session *s = value;
 
-  if ((gsi->address->address_length == s->addrlen) &&
-      (0 == memcmp (gsi->address->address, s->addr, s->addrlen)))
+#if DEBUG_UNIX
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing session %s %s\n", gsi->address, s->addr);
+#endif
+  if ((gsi->addrlen == s->addrlen) &&
+      (0 == memcmp (gsi->address, s->addr, s->addrlen)))
   {
     gsi->res = s;
     return GNUNET_NO;
@@ -501,14 +525,33 @@ unix_plugin_get_session (void *cls,
   GNUNET_assert (address != NULL);
 
   /* Check if already existing */
-  gsi.address = address;
+  gsi.address = (char *) address->address;
+  gsi.addrlen = address->address_length;
   gsi.res = NULL;
   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &address->peer.hashPubKey, &get_session_it, &gsi);
+  if (gsi.res != NULL)
+  {
+#if DEBUG_UNIX
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n");
+#endif
+    return gsi.res;
+  }
 
   /* Create a new session */
 
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
-  GNUNET_break (0);
+  s = GNUNET_malloc (sizeof (struct Session) + address->address_length);
+  s->addr = &s[1];
+  s->addrlen = address->address_length;
+  memcpy(s->addr, address->address, s->addrlen);
+  memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity));
+
+  GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
+      &address->peer.hashPubKey, s,
+      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+#if DEBUG_UNIX
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new session\n");
+#endif
+
   return s;
 }
 
@@ -547,10 +590,47 @@ unix_plugin_send (void *cls,
                   struct GNUNET_TIME_Relative to,
                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
 {
-  ssize_t sent = -1;
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
-  GNUNET_break (0);
-  return sent;
+  struct Plugin *plugin = cls;
+  struct UNIXMessageWrapper *wrapper;
+  struct UNIXMessage *message;
+  int ssize;
+
+  GNUNET_assert (plugin != NULL);
+  GNUNET_assert (session != NULL);
+
+  if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map,
+      &session->target.hashPubKey, session))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+
+  ssize = sizeof (struct UNIXMessage) + msgbuf_size;
+  message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
+  message->header.size = htons (ssize);
+  message->header.type = htons (0);
+  memcpy (&message->sender, plugin->env->my_identity,
+          sizeof (struct GNUNET_PeerIdentity));
+  memcpy (&message[1], msgbuf, msgbuf_size);
+
+  wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
+  wrapper->msg = message;
+  wrapper->msgsize = ssize;
+  wrapper->priority = priority;
+  wrapper->timeout = to;
+  wrapper->cont = cont;
+  wrapper->cont_cls = cont_cls;
+  wrapper->retry_counter = 0;
+  wrapper->session = session;
+
+  GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
+
+#if DEBUG_UNIX
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", sent,
+              (char *) session->addr);
+#endif
+
+  return ssize;
 }
 
 
@@ -589,8 +669,7 @@ unix_plugin_send_old (void *cls, const struct GNUNET_PeerIdentity *target,
   struct UNIXMessage *message;
   struct UNIXMessageWrapper *wrapper;
   int ssize;
-
-  GNUNET_assert (NULL == session);
+  struct gsi_ctx gsi;
 
   /* Build the message to be sent */
   wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper) + addrlen);
@@ -608,18 +687,41 @@ unix_plugin_send_old (void *cls, const struct GNUNET_PeerIdentity *target,
           sizeof (struct GNUNET_PeerIdentity));
   memcpy (&message[1], msgbuf, msgbuf_size);
 
+  if (session == NULL)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking for existing session\n");
+    gsi.address = (char *) addr;
+    gsi.addrlen = addrlen;
+    gsi.res = NULL;
+    GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_it, &gsi);
+    wrapper->session = gsi.res;
+    if (gsi.res == NULL)
+    {
+      wrapper->session = GNUNET_malloc (sizeof (struct Session) + addrlen);
+      wrapper->session->addr = &wrapper->session[1];
+      wrapper->session->addrlen = addrlen;
+      memcpy(wrapper->session->addr, addr, wrapper->session->addrlen);
+      memcpy(&wrapper->session->target, target, sizeof (struct GNUNET_PeerIdentity));
+      GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
+          &target->hashPubKey, wrapper->session,
+          GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Created new session for `%s'\n", addr);
+    }
+    else
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n");
+
+  }
+  else
+    wrapper->session = session;
+
   wrapper->msg = message;
   wrapper->msgsize = ssize;
   wrapper->priority = priority;
   wrapper->timeout = timeout;
   wrapper->cont = cont;
   wrapper->cont_cls = cont_cls;
-  wrapper->addr = &wrapper[1];
-  wrapper->addrlen = addrlen;
   wrapper->retry_counter = 0;
-  memcpy (&wrapper->target, target, sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&wrapper[1], addr, addrlen);
-
   GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
 
 #if DEBUG_UNIX
@@ -737,19 +839,20 @@ unix_plugin_select_write (struct Plugin * plugin)
 
   sent = unix_real_send (plugin,
                          plugin->unix_sock.desc,
-                         &msgw->target,
+                         &msgw->session->target,
                          (const char *) msgw->msg,
                          msgw->msgsize,
                          msgw->priority,
                          msgw->timeout,
-                         msgw->addr,
-                         msgw->addrlen,
+                         msgw->session->addr,
+                         msgw->session->addrlen,
                          msgw->cont, msgw->cont_cls);
 
   /* successfully sent bytes */
   if (sent > 0)
   {
     GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+    GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
     return;
   }
@@ -757,9 +860,10 @@ unix_plugin_select_write (struct Plugin * plugin)
   /* max retries */
   if (msgw->retry_counter > MAX_RETRIES)
   {
-    msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
+    msgw->cont (msgw->cont_cls, &msgw->session->target, GNUNET_SYSERR);
     GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
     GNUNET_break (0);
+    GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
     return;
   }
@@ -768,6 +872,7 @@ unix_plugin_select_write (struct Plugin * plugin)
   if (sent == -1)
   {
     GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+    GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
     return;
   }
@@ -1039,6 +1144,7 @@ libgnunet_plugin_transport_unix_done (void *cls)
 
   unix_transport_server_stop (plugin);
 
+  GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, &get_session_delete_it, plugin);
   GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map);
 
   GNUNET_NETWORK_fdset_destroy (plugin->rs);