indentation, comment and style fixes, no semantic changes
[oweals/gnunet.git] / src / fs / gnunet-service-fs_cadet_server.c
index 32fe4191a1877a6799d7c764341f94f77844cc84..adbce1154568a4cabc2c351f8f15d8494b3db7d9 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013 GNUnet e.V.
+     Copyright (C) 2012, 2013, 2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -85,11 +85,6 @@ struct CadetClient
    */
   struct GNUNET_CADET_Channel *channel;
 
-  /**
-   * Handle for active write operation, or NULL.
-   */
-  struct GNUNET_CADET_TransmitHandle *wh;
-
   /**
    * Head of write queue.
    */
@@ -124,9 +119,9 @@ struct CadetClient
 
 
 /**
- * Listen channel for incoming requests.
+ * Listen port for incoming requests.
  */
-static struct GNUNET_CADET_Handle *listen_channel;
+static struct GNUNET_CADET_Port *cadet_port;
 
 /**
  * Head of DLL of cadet clients.
@@ -188,121 +183,29 @@ refresh_timeout_task (struct CadetClient *sc)
 
 
 /**
- * We're done handling a request from a client, read the next one.
+ * Check if we are done with the write queue, and if so tell CADET
+ * that we are ready to read more.
  *
- * @param sc client to continue reading requests from
+ * @param cls where to process the write queue
  */
 static void
-continue_reading (struct CadetClient *sc)
-{
-  refresh_timeout_task (sc);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Finished processing cadet request from client %p, ready to receive the next one\n",
-             sc);
-  GNUNET_CADET_receive_done (sc->channel);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc);
-
-
-/**
- * Send a reply now, cadet is ready.
- *
- * @param cls closure with the `struct CadetClient` which sent the query
- * @param size number of bytes available in @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-write_continuation (void *cls,
-                   size_t size,
-                   void *buf)
+continue_writing (void *cls)
 {
   struct CadetClient *sc = cls;
-  struct GNUNET_CADET_Channel *tun;
-  struct WriteQueueItem *wqi;
-  size_t ret;
-
-  sc->wh = NULL;
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    return 0;
-  }
-  if ( (0 == size) ||
-       (size < wqi->msize) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Transmission of reply failed, terminating cadet\n");
-    tun = sc->channel;
-    sc->channel = NULL;
-    GNUNET_CADET_channel_destroy (tun);
-    return 0;
-  }
-  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitted %u byte reply via cadet to %p\n",
-             (unsigned int) size,
-             sc);
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# Blocks transferred via cadet"), 1,
-                           GNUNET_NO);
-  ret = wqi->msize;
-  GNUNET_memcpy (buf, &wqi[1], ret);
-  GNUNET_free (wqi);
-  continue_writing (sc);
-  return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc)
-{
-  struct WriteQueueItem *wqi;
-  struct GNUNET_CADET_Channel *tun;
+  struct GNUNET_MQ_Handle *mq;
 
-  if (NULL != sc->wh)
+  mq = GNUNET_CADET_get_mq (sc->channel);
+  if (0 != GNUNET_MQ_get_length (mq))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Write pending, waiting for it to complete\n");
-    return; /* write already pending */
-  }
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    continue_reading (sc);
-    return;
-  }
-  sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             wqi->msize,
-                                             &write_continuation,
-                                             sc);
-  if (NULL == sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write failed; terminating cadet\n");
-    tun = sc->channel;
-    sc->channel = NULL;
-    GNUNET_CADET_channel_destroy (tun);
     return;
   }
+  refresh_timeout_task (sc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Finished processing cadet request from client %p, ready to receive the next one\n",
+             sc);
+  GNUNET_CADET_receive_done (sc->channel);
 }
 
 
@@ -333,7 +236,7 @@ handle_datastore_reply (void *cls,
 {
   struct CadetClient *sc = cls;
   size_t msize = size + sizeof (struct CadetReplyMessage);
-  struct WriteQueueItem *wqi;
+  struct GNUNET_MQ_Envelope *env;
   struct CadetReplyMessage *srm;
 
   sc->qe = NULL;
@@ -357,7 +260,8 @@ handle_datastore_reply (void *cls,
                  GNUNET_h2s (key));
     }
     GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop ("# queries received via CADET not answered"), 1,
+                              gettext_noop ("# queries received via CADET not answered"),
+                              1,
                               GNUNET_NO);
     continue_writing (sc);
     return;
@@ -369,9 +273,13 @@ handle_datastore_reply (void *cls,
                GNUNET_h2s (key));
     if (GNUNET_OK !=
        GNUNET_FS_handle_on_demand_block (key,
-                                         size, data, type,
-                                         priority, anonymity,
-                                         expiration, uid,
+                                         size,
+                                          data,
+                                          type,
+                                         priority,
+                                          anonymity,
+                                         expiration,
+                                          uid,
                                          &handle_datastore_reply,
                                          sc))
     {
@@ -394,19 +302,23 @@ handle_datastore_reply (void *cls,
               (unsigned int) type,
              GNUNET_h2s (key),
              sc);
-  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
-  wqi->msize = msize;
-  srm = (struct CadetReplyMessage *) &wqi[1];
-  srm->header.size = htons ((uint16_t) msize);
-  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
+  env = GNUNET_MQ_msg_extra (srm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
   srm->type = htonl (type);
   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  GNUNET_memcpy (&srm[1], data, size);
-  sc->reply_size = msize;
-  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  continue_writing (sc);
+  GNUNET_memcpy (&srm[1],
+                 data,
+                 size);
+  GNUNET_MQ_notify_sent (env,
+                         &continue_writing,
+                         sc);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# Blocks transferred via cadet"),
+                            1,
+                           GNUNET_NO);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
+                  env);
 }
 
 
@@ -414,30 +326,22 @@ handle_datastore_reply (void *cls,
  * Functions with this signature are called whenever a
  * complete query message is received.
  *
- * Do not call #GNUNET_SERVER_mst_destroy() in callback
- *
  * @param cls closure with the `struct CadetClient`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param sqm the actual message
  */
-static int
-request_cb (void *cls,
-           struct GNUNET_CADET_Channel *channel,
-           void **channel_ctx,
-           const struct GNUNET_MessageHeader *message)
+static void
+handle_request (void *cls,
+                const struct CadetQueryMessage *sqm)
 {
-  struct CadetClient *sc = *channel_ctx;
-  const struct CadetQueryMessage *sqm;
+  struct CadetClient *sc = cls;
 
-  sqm = (const struct CadetQueryMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received query for `%s' via cadet from client %p\n",
              GNUNET_h2s (&sqm->query),
              sc);
   GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# queries received via cadet"), 1,
+                           gettext_noop ("# queries received via cadet"),
+                            1,
                            GNUNET_NO);
   refresh_timeout_task (sc);
   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
@@ -446,14 +350,14 @@ request_cb (void *cls,
                                     ntohl (sqm->type),
                                     0 /* priority */,
                                     GSF_datastore_queue_size,
-                                    &handle_datastore_reply, sc);
+                                    &handle_datastore_reply,
+                                     sc);
   if (NULL == sc->qe)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Queueing request with datastore failed (queue full?)\n");
     continue_writing (sc);
   }
-  return GNUNET_OK;
 }
 
 
@@ -464,16 +368,12 @@ request_cb (void *cls,
  * @param channel the channel representing the cadet
  * @param initiator the identity of the peer who wants to establish a cadet
  *            with us; NULL on binding error
- * @param port cadet port used for the incoming connection
- * @param options channel option flags
- * @return initial channel context (our 'struct CadetClient')
+ * @return initial channel context (our `struct CadetClient`)
  */
 static void *
-accept_cb (void *cls,
-          struct GNUNET_CADET_Channel *channel,
-          const struct GNUNET_PeerIdentity *initiator,
-          const struct GNUNET_HashCode *port,
-           enum GNUNET_CADET_ChannelOption options)
+connect_cb (void *cls,
+            struct GNUNET_CADET_Channel *channel,
+            const struct GNUNET_PeerIdentity *initiator)
 {
   struct CadetClient *sc;
 
@@ -481,13 +381,15 @@ accept_cb (void *cls,
   if (sc_count >= sc_count_max)
   {
     GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# cadet client connections rejected"), 1,
+                             gettext_noop ("# cadet client connections rejected"),
+                              1,
                              GNUNET_NO);
     GNUNET_CADET_channel_destroy (channel);
     return NULL;
   }
   GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# cadet connections active"), 1,
+                           gettext_noop ("# cadet connections active"),
+                            1,
                            GNUNET_NO);
   sc = GNUNET_new (struct CadetClient);
   sc->channel = channel;
@@ -506,18 +408,17 @@ accept_cb (void *cls,
 
 /**
  * Function called by cadet when a client disconnects.
- * Cleans up our 'struct CadetClient' of that channel.
+ * Cleans up our `struct CadetClient` of that channel.
  *
- * @param cls NULL
+ * @param cls  our `struct CadetClient`
  * @param channel channel of the disconnecting client
- * @param channel_ctx our 'struct CadetClient'
+ * @param channel_ctx
  */
 static void
-cleaner_cb (void *cls,
-           const struct GNUNET_CADET_Channel *channel,
-           void *channel_ctx)
+disconnect_cb (void *cls,
+               const struct GNUNET_CADET_Channel *channel)
 {
-  struct CadetClient *sc = channel_ctx;
+  struct CadetClient *sc = cls;
   struct WriteQueueItem *wqi;
 
   if (NULL == sc)
@@ -533,8 +434,6 @@ cleaner_cb (void *cls,
     GNUNET_SCHEDULER_cancel (sc->terminate_task);
   if (NULL != sc->timeout_task)
     GNUNET_SCHEDULER_cancel (sc->timeout_task);
-  if (NULL != sc->wh)
-    GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
   if (NULL != sc->qe)
     GNUNET_DATASTORE_cancel (sc->qe);
   while (NULL != (wqi = sc->wqi_head))
@@ -552,16 +451,44 @@ cleaner_cb (void *cls,
 }
 
 
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ *                    this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+                  const struct GNUNET_CADET_Channel *channel,
+                  int window_size)
+{
+  /* FIXME: could do flow control here... */
+}
+
+
 /**
  * Initialize subsystem for non-anonymous file-sharing.
  */
 void
 GSF_cadet_start_server ()
 {
-  static const struct GNUNET_CADET_MessageHandler handlers[] = {
-    { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
-    { NULL, 0, 0 }
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (request,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
+                             struct CadetQueryMessage,
+                             NULL),
+    GNUNET_MQ_handler_end ()
   };
+  struct GNUNET_HashCode port;
+
   if (GNUNET_YES !=
       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
                                             "fs",
@@ -571,14 +498,19 @@ GSF_cadet_start_server ()
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Initializing cadet FS server with a limit of %llu connections\n",
              sc_count_max);
-  listen_channel = GNUNET_CADET_connect (GSF_cfg,
-                                         NULL,
-                                         &cleaner_cb,
-                                         handlers);
-  GNUNET_assert (NULL != listen_channel);
-  GNUNET_CADET_open_port (listen_channel,
-                          GC_u2h (GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER),
-                          &accept_cb, NULL);
+  cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
+  GNUNET_assert (NULL != cadet_handle);
+  GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+                      strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+                      &port);
+  cadet_port = GNUNET_CADET_open_porT (cadet_handle,
+                                       &port,
+                                       &connect_cb,
+                                       NULL,
+                                       &window_change_cb,
+                                       &disconnect_cb,
+                                       handlers);
 }
 
 
@@ -588,10 +520,20 @@ GSF_cadet_start_server ()
 void
 GSF_cadet_stop_server ()
 {
-  if (NULL != listen_channel)
+  GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
+                                        &GSF_cadet_release_clients,
+                                        NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
+  cadet_map = NULL;
+  if (NULL != cadet_port)
+  {
+    GNUNET_CADET_close_port (cadet_port);
+    cadet_port = NULL;
+  }
+  if (NULL != cadet_handle)
   {
-    GNUNET_CADET_disconnect (listen_channel);
-    listen_channel = NULL;
+    GNUNET_CADET_disconnect (cadet_handle);
+    cadet_handle = NULL;
   }
   GNUNET_assert (NULL == sc_head);
   GNUNET_assert (0 == sc_count);