REST/NAMESTORE: rework API
[oweals/gnunet.git] / src / transport / gnunet-communicator-unix.c
index cd3ae5dcedb412485153e68e137112538500500c..642703f29426184025f8aa6657c19794c9ab001d 100644 (file)
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.
-    
+
      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+     SPDX-License-Identifier: AGPL3.0-or-later
 */
 
 /**
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
+#include "gnunet_constants.h"
+#include "gnunet_nt_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_transport_communication_service.h"
 
 /**
- * How many messages do we keep at most in the queue to the 
- * transport service before we start to drop (default, 
+ * How many messages do we keep at most in the queue to the
+ * transport service before we start to drop (default,
  * can be changed via the configuration file).
  * Should be _below_ the level of the communicator API, as
  * otherwise we may read messages just to have them dropped
  */
 #define COMMUNICATOR_CONFIG_SECTION "communicator-unix"
 
+/**
+ * Our MTU.
+ */
+#define UNIX_MTU UINT16_MAX
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
@@ -109,17 +117,17 @@ struct Queue
    * if this queue is in the #queue_head DLL.
    */
   const struct GNUNET_MessageHeader *msg;
-  
+
   /**
    * Message queue we are providing for the #ch.
    */
   struct GNUNET_MQ_Handle *mq;
-  
+
   /**
    * handle for this queue with the #ch.
-   */ 
+   */
   struct GNUNET_TRANSPORT_QueueHandle *qh;
-  
+
   /**
    * Number of bytes we currently have in our write queue.
    */
@@ -207,14 +215,14 @@ queue_destroy (struct Queue *queue)
   struct GNUNET_MQ_Handle *mq;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Disconnecting queue for peer `%s'\n",       
+             "Disconnecting queue for peer `%s'\n",
              GNUNET_i2s (&queue->target));
   if (0 != queue->bytes_in_queue)
   {
     GNUNET_CONTAINER_DLL_remove (queue_head,
                                 queue_tail,
                                 queue);
-    queue->bytes_in_queue = 0; 
+    queue->bytes_in_queue = 0;
   }
   if (NULL != (mq = queue->mq))
   {
@@ -226,7 +234,7 @@ queue_destroy (struct Queue *queue)
                                                       &queue->target,
                                                       queue));
   GNUNET_STATISTICS_set (stats,
-                        "# UNIX queues active",
+                        "# queues active",
                         GNUNET_CONTAINER_multipeermap_size (queue_map),
                         GNUNET_NO);
   if (NULL != queue->timeout_task)
@@ -435,7 +443,7 @@ select_write_cb (void *cls)
                                        queue->address_len);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "UNIX transmitted message to %s (%d/%u: %s)\n",
-             GNUNET_i2s (&queue->target),      
+             GNUNET_i2s (&queue->target),
              (int) sent,
              (unsigned int) msg_size,
              (sent < 0) ? STRERROR (errno) : "ok");
@@ -459,7 +467,7 @@ select_write_cb (void *cls)
     /* We should retry later... */
     GNUNET_log_strerror (GNUNET_ERROR_TYPE_DEBUG,
                         "send");
-    return; 
+    return;
   case EMSGSIZE:
     {
       socklen_t size = 0;
@@ -529,10 +537,11 @@ mq_send (struct GNUNET_MQ_Handle *mq,
 
   GNUNET_assert (mq == queue->mq);
   GNUNET_assert (NULL == queue->msg);
-  queue->msg = msg; 
+  queue->msg = msg;
   GNUNET_CONTAINER_DLL_insert (queue_head,
                               queue_tail,
                               queue);
+  GNUNET_assert (NULL != unix_sock);
   if (NULL == write_task)
     write_task =
       GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
@@ -618,26 +627,28 @@ mq_error (void *cls,
  * data to another peer.
  *
  * @param peer the target peer
+ * @param cs inbound or outbound queue
  * @param un the address
  * @param un_len number of bytes in @a un
  * @return the queue or NULL of max connections exceeded
  */
 static struct Queue *
 setup_queue (const struct GNUNET_PeerIdentity *target,
-            const struct sockaddr_un *un,
-            socklen_t un_len)
+             enum GNUNET_TRANSPORT_ConnectionStatus cs,
+             const struct sockaddr_un *un,
+             socklen_t un_len)
 {
   struct Queue *queue;
 
   queue = GNUNET_new (struct Queue);
   queue->target = *target;
   queue->address = GNUNET_memdup (un,
-                                 un_len);
+                                  un_len);
   queue->address_len = un_len;
   (void) GNUNET_CONTAINER_multipeermap_put (queue_map,
-                                           &queue->target,
-                                           queue,
-                                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+                                            &queue->target,
+                                            queue,
+                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_STATISTICS_set (stats,
                         "# queues active",
                         GNUNET_CONTAINER_multipeermap_size (queue_map),
@@ -645,35 +656,37 @@ setup_queue (const struct GNUNET_PeerIdentity *target,
   queue->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
   queue->timeout_task
     = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                   &queue_timeout,
-                                   queue);
+                                    &queue_timeout,
+                                    queue);
   queue->mq
     = GNUNET_MQ_queue_for_callbacks (&mq_send,
-                                    &mq_destroy,
-                                    &mq_cancel,
-                                    queue,
-                                    NULL,
-                                    &mq_error,
-                                    queue);
+                                     &mq_destroy,
+                                     &mq_cancel,
+                                     queue,
+                                     NULL,
+                                     &mq_error,
+                                     queue);
   {
     char *foreign_addr;
-    
+
     if ('\0' == un->sun_path[0])
       GNUNET_asprintf (&foreign_addr,
-                      "%s-@%s",
-                      COMMUNICATOR_ADDRESS_PREFIX,
-                      &un->sun_path[1]);
+                       "%s-@%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       &un->sun_path[1]);
     else
       GNUNET_asprintf (&foreign_addr,
-                      "%s-%s",
-                      COMMUNICATOR_ADDRESS_PREFIX,
-                      un->sun_path);
+                       "%s-%s",
+                       COMMUNICATOR_ADDRESS_PREFIX,
+                       un->sun_path);
     queue->qh
       = GNUNET_TRANSPORT_communicator_mq_add (ch,
-                                             &queue->target,
-                                             foreign_addr,
-                                             GNUNET_ATS_NET_LOOPBACK,
-                                             queue->mq);
+                                              &queue->target,
+                                              foreign_addr,
+                                              UNIX_MTU,
+                                              GNUNET_NT_LOOPBACK,
+                                              cs,
+                                              queue->mq);
     GNUNET_free (foreign_addr);
   }
   return queue;
@@ -705,15 +718,16 @@ receive_complete_cb (void *cls,
   delivering_messages--;
   if (GNUNET_OK != success)
     GNUNET_STATISTICS_update (stats,
-                             "# transport transmission failures",
-                             1,
-                             GNUNET_NO);
+                              "# transport transmission failures",
+                              1,
+                              GNUNET_NO);
+  GNUNET_assert (NULL != unix_sock);
   if ( (NULL == read_task) &&
        (delivering_messages < max_queue_length) )
     read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                              unix_sock,
-                                              &select_read_cb,
-                                              NULL);
+                                               unix_sock,
+                                               &select_read_cb,
+                                               NULL);
 }
 
 
@@ -735,17 +749,18 @@ select_read_cb (void *cls)
   ssize_t ret;
   uint16_t msize;
 
+  GNUNET_assert (NULL != unix_sock);
   read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                            unix_sock,
-                                            &select_read_cb,
-                                            NULL);
+                                             unix_sock,
+                                             &select_read_cb,
+                                             NULL);
   addrlen = sizeof (un);
   memset (&un,
          0,
          sizeof (un));
   ret = GNUNET_NETWORK_socket_recvfrom (unix_sock,
                                         buf,
-                                       sizeof (buf),
+                                        sizeof (buf),
                                         (struct sockaddr *) &un,
                                         &addrlen);
   if ( (-1 == ret) &&
@@ -759,9 +774,9 @@ select_read_cb (void *cls)
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Read %d bytes from socket %s\n",
-             (int) ret,
-             un.sun_path);
+              "Read %d bytes from socket %s\n",
+              (int) ret,
+              un.sun_path);
   GNUNET_assert (AF_UNIX == (un.sun_family));
   msg = (struct UNIXMessage *) buf;
   msize = ntohs (msg->header.size);
@@ -772,12 +787,13 @@ select_read_cb (void *cls)
     return;
   }
   queue = lookup_queue (&msg->sender,
-                       &un,
-                       addrlen);
+                        &un,
+                        addrlen);
   if (NULL == queue)
     queue = setup_queue (&msg->sender,
-                        &un,
-                        addrlen);
+                         GNUNET_TRANSPORT_CS_INBOUND,
+                         &un,
+                         addrlen);
   else
     reschedule_queue_timeout (queue);
   if (NULL == queue)
@@ -786,12 +802,12 @@ select_read_cb (void *cls)
                _("Maximum number of UNIX connections exceeded, dropping incoming message\n"));
     return;
   }
+
   {
     uint16_t offset = 0;
     uint16_t tsize = msize - sizeof (struct UNIXMessage);
     const char *msgbuf = (const char *) &msg[1];
-    
+
     while (offset + sizeof (struct GNUNET_MessageHeader) <= tsize)
     {
       const struct GNUNET_MessageHeader *currhdr;
@@ -805,16 +821,17 @@ select_read_cb (void *cls)
              sizeof (al_hdr));
       csize = ntohs (al_hdr.size);
       if ( (csize < sizeof (struct GNUNET_MessageHeader)) ||
-          (csize > tsize - offset))
+           (csize > tsize - offset))
       {
-       GNUNET_break_op (0);
-       break;
+        GNUNET_break_op (0);
+        break;
       }
       ret = GNUNET_TRANSPORT_communicator_receive (ch,
-                                                  &msg->sender,
-                                                  currhdr,
-                                                  &receive_complete_cb,
-                                                  NULL);
+                                                   &msg->sender,
+                                                   currhdr,
+                                                   GNUNET_TIME_UNIT_FOREVER_REL,
+                                                   &receive_complete_cb,
+                                                   NULL);
       if (GNUNET_SYSERR == ret)
        return; /* transport not up */
       if (GNUNET_NO == ret)
@@ -858,7 +875,7 @@ mq_init (void *cls,
   const char *path;
   struct sockaddr_un *un;
   socklen_t un_len;
-  
+
   if (0 != strncmp (address,
                    COMMUNICATOR_ADDRESS_PREFIX "-",
                    strlen (COMMUNICATOR_ADDRESS_PREFIX "-")))
@@ -882,15 +899,16 @@ mq_init (void *cls,
     return GNUNET_OK;
   }
   queue = setup_queue (peer,
-                      un,
-                      un_len);
+                       GNUNET_TRANSPORT_CS_OUTBOUND,
+                       un,
+                       un_len);
   GNUNET_free (un);
   if (NULL == queue)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-               "Failed to setup queue to %s at `%s'\n",
-               GNUNET_i2s (peer),              
-               path);
+                "Failed to setup queue to %s at `%s'\n",
+                GNUNET_i2s (peer),
+                path);
     return GNUNET_NO;
   }
   return GNUNET_OK;
@@ -966,9 +984,32 @@ do_shutdown (void *cls)
 }
 
 
+/**
+ * Function called when the transport service has received an
+ * acknowledgement for this communicator (!) via a different return
+ * path.
+ *
+ * Not applicable for UNIX.
+ *
+ * @param cls closure
+ * @param sender which peer sent the notification
+ * @param msg payload
+ */
+static void
+enc_notify_cb (void *cls,
+               const struct GNUNET_PeerIdentity *sender,
+               const struct GNUNET_MessageHeader *msg)
+{
+  (void) cls;
+  (void) sender;
+  (void) msg;
+  GNUNET_break_op (0);
+}
+
+
 /**
  * Setup communicator and launch network interactions.
- * 
+ *
  * @param cls NULL (always)
  * @param args remaining command-line arguments
  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
@@ -985,7 +1026,7 @@ run (void *cls,
   socklen_t un_len;
   char *my_addr;
   (void) cls;
-  
+
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_filename (cfg,
                                               COMMUNICATOR_CONFIG_SECTION,
@@ -1003,7 +1044,7 @@ run (void *cls,
                                             "MAX_QUEUE_LENGTH",
                                             &max_queue_length))
     max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
-    
+
   un = unix_address_to_sockaddr (unix_socket_path,
                                  &un_len);
   if (NULL == un)
@@ -1069,9 +1110,11 @@ run (void *cls,
   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
                                              COMMUNICATOR_CONFIG_SECTION,
                                              COMMUNICATOR_ADDRESS_PREFIX,
-                                             65535,
+                                              GNUNET_TRANSPORT_CC_RELIABLE,
                                              &mq_init,
-                                             NULL);
+                                             NULL,
+                                              &enc_notify_cb,
+                                              NULL);
   if (NULL == ch)
   {
     GNUNET_break (0);
@@ -1083,16 +1126,12 @@ run (void *cls,
                   "%s-%s",
                   COMMUNICATOR_ADDRESS_PREFIX,
                   unix_socket_path);
+  GNUNET_free (unix_socket_path);
   ai = GNUNET_TRANSPORT_communicator_address_add (ch,
                                                  my_addr,
-                                                 GNUNET_ATS_NET_LOOPBACK,
+                                                 GNUNET_NT_LOOPBACK,
                                                  GNUNET_TIME_UNIT_FOREVER_REL);
   GNUNET_free (my_addr);
-  GNUNET_free (unix_socket_path);
-  read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
-                                            unix_sock,
-                                            &select_read_cb,
-                                            NULL);
 }