-makefile for new test_stream_local (commented)
[oweals/gnunet.git] / src / transport / plugin_transport_unix.c
index 0c8722eab7afb2e7c39d7a825acc5346bcdeaeb5..b7239ee9a24eccab9021bb5b47d47272ea3ab500 100644 (file)
@@ -42,7 +42,7 @@
 #include "gnunet_transport_plugin.h"
 #include "transport.h"
 
-#define DEBUG_UNIX GNUNET_EXTRA_LOGGING
+#define DEBUG_UNIX GNUNET_EXTRALOGGING
 #define DETAILS GNUNET_NO
 
 #define MAX_PROBES 20
@@ -64,8 +64,6 @@
  */
 #define UNIX_NAT_DEFAULT_PORT 22086
 
-#define MAX_RETRIES 5
-
 GNUNET_NETWORK_STRUCT_BEGIN
 
 /**
@@ -100,15 +98,9 @@ struct UNIXMessageWrapper
   struct UNIXMessage * msg;
   size_t msgsize;
 
-  int retry_counter;
-
-  struct GNUNET_PeerIdentity target;
-
   struct GNUNET_TIME_Relative timeout;
   unsigned int priority;
 
-  void *addr;
-  size_t addrlen;
   struct Session *session;
   GNUNET_TRANSPORT_TransmitContinuation cont;
   void *cont_cls;
@@ -231,6 +223,8 @@ struct Plugin
    */
   struct GNUNET_NETWORK_FDSet *ws;
 
+  int with_ws;
+
   /**
    * socket that we transmit all data with
    */
@@ -250,6 +244,28 @@ struct Plugin
   struct GNUNET_ATS_Information ats_network;
 };
 
+
+static int
+get_session_delete_it (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct Session *s = value;
+  struct Plugin *plugin = cls;
+  GNUNET_assert (plugin != NULL);
+
+#if DEBUG_UNIX
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting session for peer `%s' `%s' \n", GNUNET_i2s (&s->target), s->addr);
+#endif
+
+  plugin->env->session_end (plugin->env->cls, &s->target, s);
+
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove(plugin->session_map, &s->target.hashPubKey, s));
+
+  GNUNET_free (s);
+
+  return GNUNET_YES;
+}
+
 /**
  * Disconnect from a remote node.  Clean up session if we have one for this peer
  *
@@ -260,7 +276,10 @@ struct Plugin
 void
 unix_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
 {
-  /** TODO: Implement! */
+  struct Plugin *plugin = cls;
+  GNUNET_assert (plugin != NULL);
+
+  GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &target->hashPubKey, &get_session_delete_it, plugin);
   return;
 }
 
@@ -284,7 +303,7 @@ unix_transport_server_stop (void *cls)
   {
     GNUNET_CONTAINER_DLL_remove (plugin->msg_head, plugin->msg_tail, msgw);
     if (msgw->cont != NULL)
-      msgw->cont (msgw->cont_cls,  &msgw->target, GNUNET_SYSERR);
+      msgw->cont (msgw->cont_cls,  &msgw->session->target, GNUNET_SYSERR);
     GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
   }
@@ -298,7 +317,7 @@ unix_transport_server_stop (void *cls)
   GNUNET_break (GNUNET_OK ==
                 GNUNET_NETWORK_socket_close (plugin->unix_sock.desc));
   plugin->unix_sock.desc = NULL;
-
+  plugin->with_ws = GNUNET_NO;
   return GNUNET_OK;
 }
 
@@ -325,7 +344,6 @@ find_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *peer)
  * send_handle squared away!
  *
  * @param cls closure
- * @param incoming_retry_context the retry context to use
  * @param send_handle which handle to send message on
  * @param target who should receive this message (ignored by UNIX)
  * @param msgbuf one or more GNUNET_MessageHeader(s) strung together
@@ -416,11 +434,9 @@ unix_real_send (void *cls,
 
     if (size < msgbuf_size)
     {
-#if DEBUG_UNIX
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Trying to increase socket buffer size from %i to %i for message size %i\n",
-                  size, ((ssize / 1000) + 2) * 1000, ssize);
-#endif
+                  size, ((msgbuf_size / 1000) + 2) * 1000, msgbuf_size);
       size = ((msgbuf_size / 1000) + 2) * 1000;
       if (GNUNET_NETWORK_socket_setsockopt
           ((struct GNUNET_NETWORK_Handle *) send_handle, SOL_SOCKET, SO_SNDBUF,
@@ -432,7 +448,7 @@ unix_real_send (void *cls,
   }
 
 #if DEBUG_UNIX
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "UNIX transmit %u-byte message to %s (%d: %s)\n",
               (unsigned int) msgbuf_size, GNUNET_a2s (sb, sbs), (int) sent,
               (sent < 0) ? STRERROR (errno) : "ok");
@@ -461,7 +477,8 @@ unix_real_send (void *cls,
 
 struct gsi_ctx
 {
-  const struct GNUNET_HELLO_Address *address;
+  char *address;
+  size_t addrlen;
   struct Session *res;
 };
 
@@ -471,8 +488,11 @@ get_session_it (void *cls, const GNUNET_HashCode * key, void *value)
   struct gsi_ctx *gsi = cls;
   struct Session *s = value;
 
-  if ((gsi->address->address_length == s->addrlen) &&
-      (0 == memcmp (gsi->address->address, s->addr, s->addrlen)))
+#if DEBUG_UNIX
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Comparing session %s %s\n", gsi->address, s->addr);
+#endif
+  if ((gsi->addrlen == s->addrlen) &&
+      (0 == memcmp (gsi->address, s->addr, s->addrlen)))
   {
     gsi->res = s;
     return GNUNET_NO;
@@ -501,17 +521,48 @@ unix_plugin_get_session (void *cls,
   GNUNET_assert (address != NULL);
 
   /* Check if already existing */
-  gsi.address = address;
+  gsi.address = (char *) address->address;
+  gsi.addrlen = address->address_length;
   gsi.res = NULL;
   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->session_map, &address->peer.hashPubKey, &get_session_it, &gsi);
+  if (gsi.res != NULL)
+  {
+#if DEBUG_UNIX
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found existing session\n");
+#endif
+    return gsi.res;
+  }
 
   /* Create a new session */
 
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
-  GNUNET_break (0);
+  s = GNUNET_malloc (sizeof (struct Session) + address->address_length);
+  s->addr = &s[1];
+  s->addrlen = address->address_length;
+  memcpy(s->addr, address->address, s->addrlen);
+  memcpy(&s->target, &address->peer, sizeof (struct GNUNET_PeerIdentity));
+
+  GNUNET_CONTAINER_multihashmap_put (plugin->session_map,
+      &address->peer.hashPubKey, s,
+      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+#if DEBUG_UNIX
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new session\n");
+#endif
+
   return s;
 }
 
+/*
+ * @param cls the plugin handle
+ * @param tc the scheduling context (for rescheduling this function again)
+ *
+ * We have been notified that our writeset has something to read.  We don't
+ * know which socket needs to be read, so we have to check each one
+ * Then reschedule this function to be called again once more is available.
+ *
+ */
+static void
+unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
 /**
  * Function that can be used by the transport service to transmit
  * a message using the plugin.   Note that in the case of a
@@ -546,86 +597,60 @@ unix_plugin_send (void *cls,
                   unsigned int priority,
                   struct GNUNET_TIME_Relative to,
                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
-{
-  ssize_t sent = -1;
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "To be implemented\n");
-  GNUNET_break (0);
-  return sent;
-}
-
-
-/**
- * Function that can be used by the transport service to transmit
- * a message using the plugin.
- *
- * @param cls closure
- * @param target who should receive this message (ignored by UNIX)
- * @param msgbuf one or more GNUNET_MessageHeader(s) strung together
- * @param msgbuf_size the size of the msgbuf to send
- * @param priority how important is the message (ignored by UNIX)
- * @param timeout when should we time out (give up) if we can not transmit?
- * @param session identifier used for this session (can be NULL)
- * @param addr the addr to send the message to, needs to be a sockaddr for us
- * @param addrlen the len of addr
- * @param force_address not used, we had better have an address to send to
- *        because we are stateless!!
- * @param cont continuation to call once the message has
- *        been transmitted (or if the transport is ready
- *        for the next transmission call; or if the
- *        peer disconnected...)
- * @param cont_cls closure for cont
- *
- * @return the number of bytes written (may return 0 and the message can
- *         still be transmitted later!)
- */
-static ssize_t
-unix_plugin_send_old (void *cls, const struct GNUNET_PeerIdentity *target,
-                  const char *msgbuf, size_t msgbuf_size, unsigned int priority,
-                  struct GNUNET_TIME_Relative timeout, struct Session *session,
-                  const void *addr, size_t addrlen, int force_address,
-                  GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  struct UNIXMessage *message;
   struct UNIXMessageWrapper *wrapper;
+  struct UNIXMessage *message;
   int ssize;
 
-  GNUNET_assert (NULL == session);
-
-  /* Build the message to be sent */
-  wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper) + addrlen);
-  message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
-  ssize = sizeof (struct UNIXMessage) + msgbuf_size;
+  GNUNET_assert (plugin != NULL);
+  GNUNET_assert (session != NULL);
 
-#if DEBUG_UNIX
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to send message to `%s'\n",
-              (char *) addr);
-#endif
+  if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_contains_value(plugin->session_map,
+      &session->target.hashPubKey, session))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
 
+  ssize = sizeof (struct UNIXMessage) + msgbuf_size;
+  message = GNUNET_malloc (sizeof (struct UNIXMessage) + msgbuf_size);
   message->header.size = htons (ssize);
   message->header.type = htons (0);
   memcpy (&message->sender, plugin->env->my_identity,
           sizeof (struct GNUNET_PeerIdentity));
   memcpy (&message[1], msgbuf, msgbuf_size);
 
+  wrapper = GNUNET_malloc (sizeof (struct UNIXMessageWrapper));
   wrapper->msg = message;
   wrapper->msgsize = ssize;
   wrapper->priority = priority;
-  wrapper->timeout = timeout;
+  wrapper->timeout = to;
   wrapper->cont = cont;
   wrapper->cont_cls = cont_cls;
-  wrapper->addr = &wrapper[1];
-  wrapper->addrlen = addrlen;
-  wrapper->retry_counter = 0;
-  memcpy (&wrapper->target, target, sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&wrapper[1], addr, addrlen);
+  wrapper->session = session;
 
   GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, wrapper);
 
 #if DEBUG_UNIX
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", sent,
-              (char *) addr);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent %d bytes to `%s'\n", ssize,
+              (char *) session->addr);
 #endif
+
+  if (plugin->with_ws == GNUNET_NO)
+  {
+    if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
+      GNUNET_SCHEDULER_cancel(plugin->select_task);
+
+    plugin->select_task =
+        GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
+                                     GNUNET_SCHEDULER_NO_TASK,
+                                     GNUNET_TIME_UNIT_FOREVER_REL,
+                                     plugin->rs,
+                                     plugin->ws,
+                                     &unix_plugin_select, plugin);
+    plugin->with_ws = GNUNET_YES;
+  }
   return ssize;
 }
 
@@ -685,6 +710,9 @@ unix_plugin_select_read (struct Plugin * plugin)
       GNUNET_NETWORK_socket_recvfrom (plugin->unix_sock.desc, buf, sizeof (buf),
                                       (struct sockaddr *) &un, &addrlen);
 
+  if ((GNUNET_SYSERR == ret) && ((errno == EAGAIN) || (errno == ENOBUFS)))
+    return;
+
   if (ret == GNUNET_SYSERR)
   {
     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "recvfrom");
@@ -737,29 +765,20 @@ unix_plugin_select_write (struct Plugin * plugin)
 
   sent = unix_real_send (plugin,
                          plugin->unix_sock.desc,
-                         &msgw->target,
+                         &msgw->session->target,
                          (const char *) msgw->msg,
                          msgw->msgsize,
                          msgw->priority,
                          msgw->timeout,
-                         msgw->addr,
-                         msgw->addrlen,
+                         msgw->session->addr,
+                         msgw->session->addrlen,
                          msgw->cont, msgw->cont_cls);
 
   /* successfully sent bytes */
   if (sent > 0)
   {
     GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
-    GNUNET_free (msgw);
-    return;
-  }
-
-  /* max retries */
-  if (msgw->retry_counter > MAX_RETRIES)
-  {
-    msgw->cont (msgw->cont_cls, &msgw->target, GNUNET_SYSERR);
-    GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
-    GNUNET_break (0);
+    GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
     return;
   }
@@ -768,17 +787,14 @@ unix_plugin_select_write (struct Plugin * plugin)
   if (sent == -1)
   {
     GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, msgw);
+    GNUNET_free (msgw->msg);
     GNUNET_free (msgw);
     return;
   }
 
   /* failed and retry */
   if (sent == 0)
-  {
-    msgw->retry_counter++;
     return;
-  }
-
 }
 
 /*
@@ -799,7 +815,7 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
     return;
 
-
+  plugin->with_ws = GNUNET_NO;
   if ((tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY) != 0)
   {
     GNUNET_assert (GNUNET_NETWORK_fdset_isset
@@ -815,11 +831,17 @@ unix_plugin_select (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     unix_plugin_select_read (plugin);
   }
 
+  if (plugin->select_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (plugin->select_task);
   plugin->select_task =
       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
                                    GNUNET_SCHEDULER_NO_TASK,
-                                   GNUNET_TIME_UNIT_FOREVER_REL, plugin->rs,
-                                   plugin->ws, &unix_plugin_select, plugin);
+                                   GNUNET_TIME_UNIT_FOREVER_REL,
+                                   plugin->rs,
+                                   (plugin->msg_head != NULL) ? plugin->ws : NULL,
+                                   &unix_plugin_select, plugin);
+  if (plugin->msg_head != NULL)
+    plugin->with_ws = GNUNET_YES;
 }
 
 /**
@@ -885,8 +907,12 @@ unix_transport_server_start (void *cls)
   plugin->select_task =
       GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
                                    GNUNET_SCHEDULER_NO_TASK,
-                                   GNUNET_TIME_UNIT_FOREVER_REL, plugin->rs,
-                                   plugin->ws, &unix_plugin_select, plugin);
+                                   GNUNET_TIME_UNIT_FOREVER_REL,
+                                   plugin->rs,
+                                   NULL,
+                                   &unix_plugin_select, plugin);
+  plugin->with_ws = GNUNET_NO;
+
   return 1;
 }
 
@@ -952,6 +978,42 @@ unix_plugin_address_pretty_printer (void *cls, const char *type,
 
 }
 
+/**
+ * Function called to convert a string address to
+ * a binary address.
+ *
+ * @param cls closure ('struct Plugin*')
+ * @param addr string address
+ * @param addrlen length of the address
+ * @param buf location to store the buffer
+ *        If the function returns GNUNET_SYSERR, its contents are undefined.
+ * @param added length of created address
+ * @return GNUNET_OK on success, GNUNET_SYSERR on failure
+ */
+int
+unix_string_to_address (void *cls, const char *addr, uint16_t addrlen,
+    void **buf, size_t *added)
+{
+  if ((NULL == addr) || (addrlen == 0))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+
+  if ((strlen (addr) + 1) != addrlen)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+
+  (*buf) = strdup (addr);
+  (*added) = strlen (addr) + 1;
+  return GNUNET_OK;
+}
+
+
+
+
 /**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
@@ -1001,6 +1063,18 @@ libgnunet_plugin_transport_unix_init (void *cls)
   struct Plugin *plugin;
   int sockets_created;
 
+  if (NULL == env->receive)
+  {
+    /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
+       initialze the plugin or the API */
+    api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
+    api->cls = NULL;
+    api->address_pretty_printer = &unix_plugin_address_pretty_printer;
+    api->address_to_string = &unix_address_to_string;
+    api->string_to_address = NULL; // FIXME!
+    return api;
+  }
+
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-unix", "PORT",
                                              &port))
@@ -1015,12 +1089,12 @@ libgnunet_plugin_transport_unix_init (void *cls)
   api->cls = plugin;
 
   api->get_session = &unix_plugin_get_session;
-  api->send_with_session = &unix_plugin_send;
-  api->send = &unix_plugin_send_old;
+  api->send = &unix_plugin_send;
   api->disconnect = &unix_disconnect;
   api->address_pretty_printer = &unix_plugin_address_pretty_printer;
   api->address_to_string = &unix_address_to_string;
   api->check_address = &unix_check_address;
+  api->string_to_address = &unix_string_to_address;
   sockets_created = unix_transport_server_start (plugin);
   if (sockets_created == 0)
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Failed to open UNIX sockets\n"));
@@ -1037,8 +1111,14 @@ libgnunet_plugin_transport_unix_done (void *cls)
   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
 
+  if (NULL == plugin)
+  {
+    GNUNET_free (api);
+    return NULL;
+  }
   unix_transport_server_stop (plugin);
 
+  GNUNET_CONTAINER_multihashmap_iterate (plugin->session_map, &get_session_delete_it, plugin);
   GNUNET_CONTAINER_multihashmap_destroy (plugin->session_map);
 
   GNUNET_NETWORK_fdset_destroy (plugin->rs);