Link libgnunetblockgroup to libgnunetblock
[oweals/gnunet.git] / src / util / service_new.c
index 6d17720fd2859060afde7efee622174f5bb05d2e..8371f7703cac7af81ddd0b25a5d78091cc560df4 100644 (file)
 #endif
 
 
-#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
+#define LOG(kind,...) GNUNET_log_from (kind, "util-service", __VA_ARGS__)
 
-#define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util", syscall)
+#define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util-service", syscall)
 
-#define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
+#define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util-service", syscall, filename)
 
 
 /**
@@ -136,7 +136,7 @@ struct GNUNET_SERVICE_Handle
   /**
    * Message handlers to use for all clients.
    */
-  const struct GNUNET_MQ_MessageHandler *handlers;
+  struct GNUNET_MQ_MessageHandler *handlers;
 
   /**
    * Closure for @e task.
@@ -205,9 +205,8 @@ struct GNUNET_SERVICE_Handle
   int ret;
 
   /**
-   * If GNUNET_YES, consider unknown message types an error where the
+   * If #GNUNET_YES, consider unknown message types an error where the
    * client is disconnected.
-   * FIXME: remove?
    */
   int require_found;
 };
@@ -247,7 +246,7 @@ struct GNUNET_SERVICE_Client
   /**
    * Tokenizer we use for processing incoming data.
    */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  struct GNUNET_MessageStreamTokenizer *mst;
 
   /**
    * Task that warns about missing calls to
@@ -255,6 +254,12 @@ struct GNUNET_SERVICE_Client
    */
   struct GNUNET_SCHEDULER_Task *warn_task;
 
+  /**
+   * Task run to finish dropping the client after the stack has
+   * properly unwound.
+   */
+  struct GNUNET_SCHEDULER_Task *drop_task;
+
   /**
    * Task that receives data from the client to
    * pass it to the handlers.
@@ -266,12 +271,28 @@ struct GNUNET_SERVICE_Client
    */
   struct GNUNET_SCHEDULER_Task *send_task;
 
+  /**
+   * Pointer to the message to be transmitted by @e send_task.
+   */
+  const struct GNUNET_MessageHeader *msg;
+
   /**
    * User context value, value returned from
    * the connect callback.
    */
   void *user_context;
 
+  /**
+   * Time when we last gave a message from this client
+   * to the application.
+   */
+  struct GNUNET_TIME_Absolute warn_start;
+
+  /**
+   * Current position in @e msg at which we are transmitting.
+   */
+  size_t msg_pos;
+
   /**
    * Persist the file handle for this client no matter what happens,
    * force the OS to close once the process actually dies.  Should only
@@ -286,6 +307,11 @@ struct GNUNET_SERVICE_Client
    */
   int is_monitor;
 
+  /**
+   * Are we waiting for the application to call #GNUNET_SERVICE_client_continue()?
+   */
+  int needs_continue;
+
   /**
    * Type of last message processed (for warn_no_receive_done).
    */
@@ -364,9 +390,18 @@ service_main (void *cls)
     GNUNET_SCHEDULER_add_shutdown (&service_shutdown,
                                    sh);
   GNUNET_SERVICE_resume (sh);
-  sh->service_init_cb (sh->cb_cls,
-                       sh->cfg,
-                       sh);
+
+  if (-1 != sh->ready_confirm_fd)
+  {
+    GNUNET_break (1 == WRITE (sh->ready_confirm_fd, ".", 1));
+    GNUNET_break (0 == CLOSE (sh->ready_confirm_fd));
+    sh->ready_confirm_fd = -1;
+  }
+
+  if (NULL != sh->service_init_cb)
+    sh->service_init_cb (sh->cb_cls,
+                        sh->cfg,
+                        sh);
 }
 
 
@@ -386,7 +421,9 @@ process_acl4 (struct GNUNET_STRINGS_IPv4NetworkPolicy **ret,
 {
   char *opt;
 
-  if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+  if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+                                        sh->service_name,
+                                        option))
   {
     *ret = NULL;
     return GNUNET_OK;
@@ -394,7 +431,8 @@ process_acl4 (struct GNUNET_STRINGS_IPv4NetworkPolicy **ret,
   GNUNET_break (GNUNET_OK ==
                 GNUNET_CONFIGURATION_get_value_string (sh->cfg,
                                                        sh->service_name,
-                                                       option, &opt));
+                                                       option,
+                                                      &opt));
   if (NULL == (*ret = GNUNET_STRINGS_parse_ipv4_policy (opt)))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -426,7 +464,9 @@ process_acl6 (struct GNUNET_STRINGS_IPv6NetworkPolicy **ret,
 {
   char *opt;
 
-  if (!GNUNET_CONFIGURATION_have_value (sh->cfg, sh->service_name, option))
+  if (! GNUNET_CONFIGURATION_have_value (sh->cfg,
+                                        sh->service_name,
+                                        option))
   {
     *ret = NULL;
     return GNUNET_OK;
@@ -434,12 +474,15 @@ process_acl6 (struct GNUNET_STRINGS_IPv6NetworkPolicy **ret,
   GNUNET_break (GNUNET_OK ==
                 GNUNET_CONFIGURATION_get_value_string (sh->cfg,
                                                        sh->service_name,
-                                                       option, &opt));
+                                                       option,
+                                                      &opt));
   if (NULL == (*ret = GNUNET_STRINGS_parse_ipv6_policy (opt)))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          _("Could not parse IPv6 network specification `%s' for `%s:%s'\n"),
-         opt, sh->service_name, option);
+         opt,
+        sh->service_name,
+        option);
     GNUNET_free (opt);
     return GNUNET_SYSERR;
   }
@@ -469,12 +512,14 @@ add_unixpath (struct sockaddr **saddrs,
 
   un = GNUNET_new (struct sockaddr_un);
   un->sun_family = AF_UNIX;
-  strncpy (un->sun_path, unixpath, sizeof (un->sun_path) - 1);
+  strncpy (un->sun_path,
+          unixpath,
+          sizeof (un->sun_path) - 1);
 #ifdef LINUX
   if (GNUNET_YES == abstract)
     un->sun_path[0] = '\0';
 #endif
-#if HAVE_SOCKADDR_IN_SIN_LEN
+#if HAVE_SOCKADDR_UN_SUN_LEN
   un->sun_len = (u_char) sizeof (struct sockaddr_un);
 #endif
   *saddrs = (struct sockaddr *) un;
@@ -554,10 +599,10 @@ get_server_addresses (const char *service_name,
                                         0);
     if (NULL == desc)
     {
-      if ((ENOBUFS == errno) ||
-         (ENOMEM == errno) ||
-         (ENFILE == errno) ||
-          (EACCES == errno))
+      if ( (ENOBUFS == errno) ||
+          (ENOMEM == errno) ||
+          (ENFILE == errno) ||
+          (EACCES == errno) )
       {
         LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
                      "socket");
@@ -571,7 +616,8 @@ get_server_addresses (const char *service_name,
     }
     else
     {
-      GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (desc));
       desc = NULL;
     }
   }
@@ -648,9 +694,9 @@ get_server_addresses (const char *service_name,
     if (GNUNET_SYSERR == abstract)
       abstract = GNUNET_NO;
 #endif
-    if ((GNUNET_YES != abstract)
-        && (GNUNET_OK !=
-            GNUNET_DISK_directory_create_for_file (unixpath)))
+    if ( (GNUNET_YES != abstract) &&
+        (GNUNET_OK !=
+         GNUNET_DISK_directory_create_for_file (unixpath)) )
       GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
                                "mkdir",
                                unixpath);
@@ -682,7 +728,8 @@ get_server_addresses (const char *service_name,
     }
     else
     {
-      GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (desc));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (desc));
       desc = NULL;
     }
   }
@@ -994,7 +1041,8 @@ receive_sockets_from_parent (struct GNUNET_SERVICE_Handle *sh)
     LOG (GNUNET_ERROR_TYPE_ERROR,
          _("Could not access a pre-bound socket, will try to bind myself\n"));
     for (i = 0; (i < count) && (NULL != lsocks[i]); i++)
-      GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[i]));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (lsocks[i]));
     GNUNET_free (lsocks);
     return NULL;
   }
@@ -1081,7 +1129,8 @@ open_listen_socket (const struct sockaddr *server_addr,
              GNUNET_a2s (server_addr, socklen));
       }
     }
-    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (sock));
     errno = eno;
     return NULL;
   }
@@ -1090,7 +1139,8 @@ open_listen_socket (const struct sockaddr *server_addr,
   {
     LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR,
                   "listen");
-    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (sock));
     errno = 0;
     return NULL;
   }
@@ -1177,7 +1227,8 @@ setup_service (struct GNUNET_SERVICE_Handle *sh)
              (unsigned int) 3 + cnt);
         cnt++;
         while (NULL != lsocks[cnt])
-          GNUNET_break (0 == GNUNET_NETWORK_socket_close (lsocks[cnt++]));
+          GNUNET_break (GNUNET_OK ==
+                       GNUNET_NETWORK_socket_close (lsocks[cnt++]));
         GNUNET_free (lsocks);
         lsocks = NULL;
         break;
@@ -1197,12 +1248,13 @@ setup_service (struct GNUNET_SERVICE_Handle *sh)
   {
     /* listen only on inherited sockets if we have any */
     struct GNUNET_NETWORK_Handle **ls;
-    
+
     for (ls = lsocks; NULL != *ls; ls++)
     {
       struct ServiceListenContext *slc;
 
       slc = GNUNET_new (struct ServiceListenContext);
+      slc->sh = sh;
       slc->listen_socket = *ls;
       GNUNET_CONTAINER_DLL_insert (sh->slc_head,
                                   sh->slc_tail,
@@ -1228,22 +1280,41 @@ setup_service (struct GNUNET_SERVICE_Handle *sh)
       struct ServiceListenContext *slc;
 
       slc = GNUNET_new (struct ServiceListenContext);
+      slc->sh = sh;
       slc->listen_socket = open_listen_socket (addrs[i],
                                               addrlens[i]);
-      GNUNET_break (NULL != slc->listen_socket);
+      if (NULL == slc->listen_socket)
+      {
+        GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+                             "bind");
+        GNUNET_free (addrs[i++]);
+        GNUNET_free (slc);
+        continue;
+      }
+      GNUNET_free (addrs[i++]);
       GNUNET_CONTAINER_DLL_insert (sh->slc_head,
                                   sh->slc_tail,
                                   slc);
     }
+    GNUNET_free_non_null (addrlens);
+    GNUNET_free_non_null (addrs);
+    if ( (0 != num) &&
+         (NULL == sh->slc_head) )
+    {
+      /* All attempts to bind failed, hard failure */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  _("Could not bind to any of the ports I was supposed to, refusing to run!\n"));
+      return GNUNET_SYSERR;
+    }
   }
 
   sh->require_found = tolerant ? GNUNET_NO : GNUNET_YES;
-  sh->match_uid =
-      GNUNET_CONFIGURATION_get_value_yesno (sh->cfg,
+  sh->match_uid
+    = GNUNET_CONFIGURATION_get_value_yesno (sh->cfg,
                                            sh->service_name,
                                             "UNIX_MATCH_UID");
-  sh->match_gid =
-      GNUNET_CONFIGURATION_get_value_yesno (sh->cfg,
+  sh->match_gid
+    = GNUNET_CONFIGURATION_get_value_yesno (sh->cfg,
                                            sh->service_name,
                                             "UNIX_MATCH_GID");
   process_acl4 (&sh->v4_denied,
@@ -1472,6 +1543,118 @@ detach_terminal (struct GNUNET_SERVICE_Handle *sh)
 }
 
 
+/**
+ * Tear down the service, closing the listen sockets and
+ * freeing the ACLs.
+ *
+ * @param sh handle to the service to tear down.
+ */
+static void
+teardown_service (struct GNUNET_SERVICE_Handle *sh)
+{
+  struct ServiceListenContext *slc;
+
+  GNUNET_free_non_null (sh->v4_denied);
+  GNUNET_free_non_null (sh->v6_denied);
+  GNUNET_free_non_null (sh->v4_allowed);
+  GNUNET_free_non_null (sh->v6_allowed);
+  while (NULL != (slc = sh->slc_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sh->slc_head,
+                                 sh->slc_tail,
+                                 slc);
+    if (NULL != slc->listen_task)
+      GNUNET_SCHEDULER_cancel (slc->listen_task);
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (slc->listen_socket));
+    GNUNET_free (slc);
+  }
+}
+
+
+/**
+ * Low-level function to start a service if the scheduler
+ * is already running.  Should only be used directly in
+ * special cases.
+ *
+ * The function will launch the service with the name @a service_name
+ * using the @a service_options to configure its shutdown
+ * behavior. When clients connect or disconnect, the respective
+ * @a connect_cb or @a disconnect_cb functions will be called. For
+ * messages received from the clients, the respective @a handlers will
+ * be invoked; for the closure of the handlers we use the return value
+ * from the @a connect_cb invocation of the respective client.
+ *
+ * Each handler MUST call #GNUNET_SERVICE_client_continue() after each
+ * message to receive further messages from this client.  If
+ * #GNUNET_SERVICE_client_continue() is not called within a short
+ * time, a warning will be logged. If delays are expected, services
+ * should call #GNUNET_SERVICE_client_disable_continue_warning() to
+ * disable the warning.
+ *
+ * Clients sending invalid messages (based on @a handlers) will be
+ * dropped. Additionally, clients can be dropped at any time using
+ * #GNUNET_SERVICE_client_drop().
+ *
+ * The service must be stopped using #GNUNET_SERVICE_stoP().
+ *
+ * @param service_name name of the service to run
+ * @param cfg configuration to use
+ * @param connect_cb function to call whenever a client connects
+ * @param disconnect_cb function to call whenever a client disconnects
+ * @param cls closure argument for @a connect_cb and @a disconnect_cb
+ * @param handlers NULL-terminated array of message handlers for the service,
+ *                 the closure will be set to the value returned by
+ *                 the @a connect_cb for the respective connection
+ * @return NULL on error
+ */
+struct GNUNET_SERVICE_Handle *
+GNUNET_SERVICE_starT (const char *service_name,
+                      const struct GNUNET_CONFIGURATION_Handle *cfg,
+                      GNUNET_SERVICE_ConnectHandler connect_cb,
+                      GNUNET_SERVICE_DisconnectHandler disconnect_cb,
+                      void *cls,
+                      const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  struct GNUNET_SERVICE_Handle *sh;
+
+  sh = GNUNET_new (struct GNUNET_SERVICE_Handle);
+  sh->service_name = service_name;
+  sh->cfg = cfg;
+  sh->connect_cb = connect_cb;
+  sh->disconnect_cb = disconnect_cb;
+  sh->cb_cls = cls;
+  sh->handlers = GNUNET_MQ_copy_handlers (handlers);
+  if (GNUNET_OK != setup_service (sh))
+  {
+    GNUNET_free_non_null (sh->handlers);
+    GNUNET_free (sh);
+    return NULL;
+  }
+  GNUNET_SERVICE_resume (sh);
+  return sh;
+}
+
+
+/**
+ * Stops a service that was started with #GNUNET_SERVICE_starT().
+ *
+ * @param srv service to stop
+ */
+void
+GNUNET_SERVICE_stoP (struct GNUNET_SERVICE_Handle *srv)
+{
+  struct GNUNET_SERVICE_Client *client;
+
+  GNUNET_SERVICE_suspend (srv);
+  while (NULL != (client = srv->clients_head))
+    GNUNET_SERVICE_client_drop (client);
+  teardown_service (srv);
+  GNUNET_free_non_null (srv->handlers);
+  GNUNET_free (srv);
+}
+
+
 /**
  * Creates the "main" function for a GNUnet service.  You
  * should almost always use the #GNUNET_SERVICE_MAIN macro
@@ -1550,6 +1733,9 @@ GNUNET_SERVICE_ruN_ (int argc,
     GNUNET_GETOPT_OPTION_END
   };
 
+  memset (&sh,
+         0,
+         sizeof (sh));
   xdg = getenv ("XDG_CONFIG_HOME");
   if (NULL != xdg)
     GNUNET_asprintf (&cfg_filename,
@@ -1559,16 +1745,21 @@ GNUNET_SERVICE_ruN_ (int argc,
                      GNUNET_OS_project_data_get ()->config_file);
   else
     cfg_filename = GNUNET_strdup (GNUNET_OS_project_data_get ()->user_config_file);
-
+  sh.ready_confirm_fd = -1;
   sh.options = options;
   sh.cfg = cfg = GNUNET_CONFIGURATION_create ();
   sh.service_init_cb = service_init_cb;
   sh.connect_cb = connect_cb;
   sh.disconnect_cb = disconnect_cb;
   sh.cb_cls = cls;
-  sh.handlers = handlers;
+  sh.handlers = GNUNET_MQ_copy_handlers (handlers);
+  sh.service_name = service_name;
 
   /* setup subsystems */
+  loglev = NULL;
+  logfile = NULL;
+  opt_cfg_filename = NULL;
+  do_daemonize = 0;
   ret = GNUNET_GETOPT_run (service_name,
                           service_options,
                           argc,
@@ -1646,6 +1837,8 @@ GNUNET_SERVICE_ruN_ (int argc,
         "Skewing clock by %dll ms\n",
         clock_offset);
   }
+  GNUNET_RESOLVER_connect (sh.cfg);
+
   /* actually run service */
   err = 0;
   GNUNET_SCHEDULER_run (&service_main,
@@ -1657,8 +1850,11 @@ GNUNET_SERVICE_ruN_ (int argc,
 shutdown:
   if (-1 != sh.ready_confirm_fd)
   {
-    if (1 != WRITE (sh.ready_confirm_fd, err ? "I" : "S", 1))
-      LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, "write");
+    if (1 != WRITE (sh.ready_confirm_fd,
+                   err ? "I" : "S",
+                   1))
+      LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING,
+                   "write");
     GNUNET_break (0 == CLOSE (sh.ready_confirm_fd));
   }
 #if HAVE_MALLINFO
@@ -1686,26 +1882,14 @@ shutdown:
     }
   }
 #endif
+  teardown_service (&sh);
+  GNUNET_free_non_null (sh.handlers);
   GNUNET_SPEEDUP_stop_ ();
   GNUNET_CONFIGURATION_destroy (cfg);
-
-  while (NULL != sh.slc_head)
-  {
-    struct ServiceListenContext *slc = sh.slc_head;
-
-    sh.slc_head = slc->next;
-    // FIXME: destroy slc
-    GNUNET_free (slc);
-  }
-
   GNUNET_free_non_null (logfile);
   GNUNET_free_non_null (loglev);
   GNUNET_free (cfg_filename);
   GNUNET_free_non_null (opt_cfg_filename);
-  GNUNET_free_non_null (sh.v4_denied);
-  GNUNET_free_non_null (sh.v6_denied);
-  GNUNET_free_non_null (sh.v4_allowed);
-  GNUNET_free_non_null (sh.v6_allowed);
 
   return err ? GNUNET_SYSERR : sh.ret;
 }
@@ -1733,25 +1917,100 @@ GNUNET_SERVICE_suspend (struct GNUNET_SERVICE_Handle *sh)
 }
 
 
+/**
+ * Task run when we are ready to transmit data to the
+ * client.
+ *
+ * @param cls the `struct GNUNET_SERVICE_Client *` to send to
+ */
+static void
+do_send (void *cls)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+  ssize_t ret;
+  size_t left;
+  const char *buf;
+
+  client->send_task = NULL;
+  buf = (const char *) client->msg;
+  left = ntohs (client->msg->size) - client->msg_pos;
+  ret = GNUNET_NETWORK_socket_send (client->sock,
+                                   &buf[client->msg_pos],
+                                   left);
+  GNUNET_assert (ret <= (ssize_t) left);
+  if (0 == ret)
+  {
+    GNUNET_MQ_inject_error (client->mq,
+                           GNUNET_MQ_ERROR_WRITE);
+    return;
+  }
+  if (-1 == ret)
+  {
+    if ( (EAGAIN == errno) ||
+        (EINTR == errno) )
+    {
+      /* ignore */
+      ret = 0;
+    }
+    else
+    {
+      if (EPIPE != errno)
+        GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                             "send");
+      GNUNET_MQ_inject_error (client->mq,
+                             GNUNET_MQ_ERROR_WRITE);
+      return;
+    }
+  }
+  if (0 == client->msg_pos)
+  {
+    GNUNET_MQ_impl_send_in_flight (client->mq);
+  }
+  client->msg_pos += ret;
+  if (left > ret)
+  {
+    GNUNET_assert (NULL == client->drop_task);
+    client->send_task
+      = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                       client->sock,
+                                       &do_send,
+                                       client);
+    return;
+  }
+  GNUNET_MQ_impl_send_continue (client->mq);
+}
+
+
 /**
  * Signature of functions implementing the sending functionality of a
  * message queue.
  *
  * @param mq the message queue
  * @param msg the message to send
- * @param impl_state state of the implementation
+ * @param impl_state our `struct GNUNET_SERVICE_Client *`
  */
 static void
 service_mq_send (struct GNUNET_MQ_Handle *mq,
                  const struct GNUNET_MessageHeader *msg,
                  void *impl_state)
 {
-  // struct GNUNET_SERVICE_Client *client = cls;
-
-  // FIXME 1: setup "client->send_task" for transmission.
-  // FIXME 2: I seriously hope we do not need to make a copy of `msg`!
-  // OPTIMIZATION: ideally, we'd like the ability to peak at the rest of
-  //               the queue and transmit more than one message if possible.
+  struct GNUNET_SERVICE_Client *client = impl_state;
+
+  if (NULL != client->drop_task)
+    return; /* we're going down right now, do not try to send */
+  GNUNET_assert (NULL == client->send_task);
+
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Sending message of type %u and size %u to client\n",
+       ntohs (msg->type), ntohs (msg->size));
+
+  client->msg = msg;
+  client->msg_pos = 0;
+  client->send_task
+    = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                     client->sock,
+                                     &do_send,
+                                     client);
 }
 
 
@@ -1765,10 +2024,12 @@ static void
 service_mq_cancel (struct GNUNET_MQ_Handle *mq,
                    void *impl_state)
 {
-  // struct GNUNET_SERVICE_Client *client = cls;
+  struct GNUNET_SERVICE_Client *client = impl_state;
 
-  // FIXME: stop transmission! (must be possible, otherwise
-  // we must have told MQ that the message was sent!)
+  GNUNET_assert (0 == client->msg_pos);
+  client->msg = NULL;
+  GNUNET_SCHEDULER_cancel (client->send_task);
+  client->send_task = NULL;
 }
 
 
@@ -1778,7 +2039,7 @@ service_mq_cancel (struct GNUNET_MQ_Handle *mq,
  * the message queue.
  * Not every message queue implementation supports an error handler.
  *
- * @param cls closure
+ * @param cls closure with our `struct GNUNET_SERVICE_Client`
  * @param error error code
  */
 static void
@@ -1786,8 +2047,41 @@ service_mq_error_handler (void *cls,
                           enum GNUNET_MQ_Error error)
 {
   struct GNUNET_SERVICE_Client *client = cls;
+  struct GNUNET_SERVICE_Handle *sh = client->sh;
 
-  // FIXME!
+  if ( (GNUNET_MQ_ERROR_NO_MATCH == error) &&
+       (GNUNET_NO == sh->require_found) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No handler for message of type %u found\n",
+                (unsigned int) client->warn_type);
+    GNUNET_SERVICE_client_continue (client);
+    return; /* ignore error */
+  }
+  GNUNET_SERVICE_client_drop (client);
+}
+
+
+/**
+ * Task run to warn about missing calls to #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client *` to process more requests from
+ */
+static void
+warn_no_client_continue (void *cls)
+{
+  struct GNUNET_SERVICE_Client *client = cls;
+
+  GNUNET_break (0 != client->warn_type); /* type should never be 0 here, as we don't use 0 */
+  client->warn_task
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                    &warn_no_client_continue,
+                                   client);
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       _("Processing code for message of type %u did not call `GNUNET_SERVICE_client_continue' after %s\n"),
+       (unsigned int) client->warn_type,
+       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (client->warn_start),
+                                              GNUNET_YES));
 }
 
 
@@ -1795,23 +2089,36 @@ service_mq_error_handler (void *cls,
  * Functions with this signature are called whenever a
  * complete message is received by the tokenizer for a client.
  *
- * Do not call #GNUNET_SERVER_mst_destroy() from within
+ * Do not call #GNUNET_MST_destroy() from within
  * the scope of this callback.
  *
  * @param cls closure with the `struct GNUNET_SERVICE_Client *`
- * @param client_cls closure with the `struct GNUNET_SERVICE_Client *`
  * @param message the actual message
- * @return #GNUNET_OK on success (always)
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR if the client was dropped
  */
 static int
 service_client_mst_cb (void *cls,
-                       void *client_cls,
                        const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_SERVICE_Client *client = cls;
 
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Received message of type %u and size %u from client\n",
+       ntohs (message->type), ntohs (message->size));
+
+  GNUNET_assert (GNUNET_NO == client->needs_continue);
+  client->needs_continue = GNUNET_YES;
+  client->warn_type = ntohs (message->type);
+  client->warn_start = GNUNET_TIME_absolute_get ();
+  GNUNET_assert (NULL == client->warn_task);
+  client->warn_task
+    = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
+                                   &warn_no_client_continue,
+                                   client);
   GNUNET_MQ_inject_message (client->mq,
                             message);
+  if (NULL != client->drop_task)
+    return GNUNET_SYSERR;
   return GNUNET_OK;
 }
 
@@ -1826,10 +2133,37 @@ static void
 service_client_recv (void *cls)
 {
   struct GNUNET_SERVICE_Client *client = cls;
+  int ret;
 
-  // FIXME: read into buffer, pass to MST, then client->mq inject!
-  // FIXME: revise MST API to avoid the memcpy!
-  // i.e.: GNUNET_MST_read (client->sock);
+  client->recv_task = NULL;
+  ret = GNUNET_MST_read (client->mst,
+                        client->sock,
+                        GNUNET_NO,
+                        GNUNET_YES);
+  if (GNUNET_SYSERR == ret)
+  {
+    /* client closed connection (or IO error) */
+    if (NULL == client->drop_task)
+    {
+      GNUNET_assert (GNUNET_NO == client->needs_continue);
+      GNUNET_SERVICE_client_drop (client);
+    }
+    return;
+  }
+  if (GNUNET_NO == ret)
+    return; /* more messages in buffer, wait for application
+              to be done processing */
+  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_YES == client->needs_continue)
+    return;
+  if (NULL != client->recv_task)
+    return;
+  /* MST needs more data, re-schedule read job */
+  client->recv_task
+    = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    client->sock,
+                                    &service_client_recv,
+                                    client);
 }
 
 
@@ -1859,11 +2193,12 @@ start_client (struct GNUNET_SERVICE_Handle *sh,
                                               sh->handlers,
                                               &service_mq_error_handler,
                                               client);
-  client->mst = GNUNET_SERVER_mst_create (&service_client_mst_cb,
-                                          client);
-  client->user_context = sh->connect_cb (sh->cb_cls,
-                                         client,
-                                         client->mq);
+  client->mst = GNUNET_MST_create (&service_client_mst_cb,
+                                  client);
+  if (NULL != sh->connect_cb)
+    client->user_context = sh->connect_cb (sh->cb_cls,
+                                           client,
+                                           client->mq);
   GNUNET_MQ_set_handlers_closure (client->mq,
                                   client->user_context);
   client->recv_task
@@ -1955,69 +2290,70 @@ accept_client (void *cls)
 
   slc->listen_task = NULL;
   while (1)
+  {
+    struct GNUNET_NETWORK_Handle *sock;
+    const struct sockaddr_in *v4;
+    const struct sockaddr_in6 *v6;
+    struct sockaddr_storage sa;
+    socklen_t addrlen;
+    int ok;
+
+    addrlen = sizeof (sa);
+    sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
+                                        (struct sockaddr *) &sa,
+                                        &addrlen);
+    if (NULL == sock)
+      break;
+    switch (sa.ss_family)
     {
-      struct GNUNET_NETWORK_Handle *sock;
-      const struct sockaddr_in *v4;
-      const struct sockaddr_in6 *v6;
-      struct sockaddr_storage sa;
-      socklen_t addrlen;
-      int ok;
-
-      addrlen = sizeof (sa);
-      sock = GNUNET_NETWORK_socket_accept (slc->listen_socket,
-                                           (struct sockaddr *) &sa,
-                                           &addrlen);
-      if (NULL == sock)
-        break;
-      switch (sa.ss_family)
-      {
-      case AF_INET:
-        GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
-        v4 = (const struct sockaddr_in *) &sa;
-        ok = ( ( (NULL == sh->v4_allowed) ||
-                 (check_ipv4_listed (sh->v4_allowed,
-                                     &v4->sin_addr))) &&
-               ( (NULL == sh->v4_denied) ||
-                 (! check_ipv4_listed (sh->v4_denied,
-                                       &v4->sin_addr)) ) );
-        break;
-      case AF_INET6:
-        GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
-        v6 = (const struct sockaddr_in6 *) &sa;
-        ok = ( ( (NULL == sh->v6_allowed) ||
-                 (check_ipv6_listed (sh->v6_allowed,
-                                     &v6->sin6_addr))) &&
-               ( (NULL == sh->v6_denied) ||
-                 (! check_ipv6_listed (sh->v6_denied,
-                                       &v6->sin6_addr)) ) );
-        break;
+    case AF_INET:
+      GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
+      v4 = (const struct sockaddr_in *) &sa;
+      ok = ( ( (NULL == sh->v4_allowed) ||
+              (check_ipv4_listed (sh->v4_allowed,
+                                  &v4->sin_addr))) &&
+            ( (NULL == sh->v4_denied) ||
+              (! check_ipv4_listed (sh->v4_denied,
+                                    &v4->sin_addr)) ) );
+      break;
+    case AF_INET6:
+      GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
+      v6 = (const struct sockaddr_in6 *) &sa;
+      ok = ( ( (NULL == sh->v6_allowed) ||
+              (check_ipv6_listed (sh->v6_allowed,
+                                  &v6->sin6_addr))) &&
+            ( (NULL == sh->v6_denied) ||
+              (! check_ipv6_listed (sh->v6_denied,
+                                    &v6->sin6_addr)) ) );
+      break;
 #ifndef WINDOWS
-      case AF_UNIX:
-        ok = GNUNET_OK;            /* controlled using file-system ACL now */
-        break;
+    case AF_UNIX:
+      ok = GNUNET_OK;            /* controlled using file-system ACL now */
+      break;
 #endif
-      default:
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             _("Unknown address family %d\n"),
-             sa.ss_family);
-        return;
-      }
-      if (! ok)
-        {
-          LOG (GNUNET_ERROR_TYPE_DEBUG,
-               "Service rejected incoming connection from %s due to policy.\n",
-               GNUNET_a2s ((const struct sockaddr *) &sa,
-                           addrlen));
-          GNUNET_NETWORK_socket_close (sock);
-          continue;
-        }
+    default:
+      LOG (GNUNET_ERROR_TYPE_WARNING,
+          _("Unknown address family %d\n"),
+          sa.ss_family);
+      return;
+    }
+    if (! ok)
+    {
       LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Service accepted incoming connection from %s.\n",
-           GNUNET_a2s ((const struct sockaddr *) &sa,
-                       addrlen));
-      start_client (slc->sh,
-                    sock);
+          "Service rejected incoming connection from %s due to policy.\n",
+          GNUNET_a2s ((const struct sockaddr *) &sa,
+                      addrlen));
+      GNUNET_break (GNUNET_OK ==
+                   GNUNET_NETWORK_socket_close (sock));
+      continue;
     }
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Service accepted incoming connection from %s.\n",
+        GNUNET_a2s ((const struct sockaddr *) &sa,
+                    addrlen));
+    start_client (slc->sh,
+                 sock);
+  }
   slc->listen_task
     = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                     slc->listen_socket,
@@ -2048,6 +2384,44 @@ GNUNET_SERVICE_resume (struct GNUNET_SERVICE_Handle *sh)
 }
 
 
+/**
+ * Task run to resume receiving data from the client after
+ * the client called #GNUNET_SERVICE_client_continue().
+ *
+ * @param cls our `struct GNUNET_SERVICE_Client`
+ */
+static void
+resume_client_receive (void *cls)
+{
+  struct GNUNET_SERVICE_Client *c = cls;
+  int ret;
+
+  c->recv_task = NULL;
+  /* first, check if there is still something in the buffer */
+  ret = GNUNET_MST_next (c->mst,
+                        GNUNET_YES);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (c);
+    return;
+  }
+  if (GNUNET_NO == ret)
+    return; /* done processing, wait for more later */
+  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_YES == c->needs_continue)
+    return; /* #GNUNET_MST_next() did give a message to the client */
+  /* need to receive more data from the network first */
+  if (NULL != c->recv_task)
+    return;
+  c->recv_task
+    = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                    c->sock,
+                                    &service_client_recv,
+                                    c);
+}
+
+
 /**
  * Continue receiving further messages from the given client.
  * Must be called after each message received.
@@ -2057,7 +2431,17 @@ GNUNET_SERVICE_resume (struct GNUNET_SERVICE_Handle *sh)
 void
 GNUNET_SERVICE_client_continue (struct GNUNET_SERVICE_Client *c)
 {
-  GNUNET_break (0); // not implemented
+  GNUNET_assert (GNUNET_YES == c->needs_continue);
+  GNUNET_assert (NULL == c->recv_task);
+  c->needs_continue = GNUNET_NO;
+  if (NULL != c->warn_task)
+  {
+    GNUNET_SCHEDULER_cancel (c->warn_task);
+    c->warn_task = NULL;
+  }
+  c->recv_task
+    = GNUNET_SCHEDULER_add_now (&resume_client_receive,
+                               c);
 }
 
 
@@ -2081,6 +2465,39 @@ GNUNET_SERVICE_client_disable_continue_warning (struct GNUNET_SERVICE_Client *c)
 }
 
 
+/**
+ * Asynchronously finish dropping the client.
+ *
+ * @param cls the `struct GNUNET_SERVICE_Client`.
+ */
+static void
+finish_client_drop (void *cls)
+{
+  struct GNUNET_SERVICE_Client *c = cls;
+  struct GNUNET_SERVICE_Handle *sh = c->sh;
+
+  c->drop_task = NULL;
+  GNUNET_assert (NULL == c->send_task);
+  GNUNET_assert (NULL == c->recv_task);
+  GNUNET_assert (NULL == c->warn_task);
+  GNUNET_MST_destroy (c->mst);
+  GNUNET_MQ_destroy (c->mq);
+  if (GNUNET_NO == c->persist)
+  {
+    GNUNET_break (GNUNET_OK ==
+                 GNUNET_NETWORK_socket_close (c->sock));
+  }
+  else
+  {
+    GNUNET_NETWORK_socket_free_memory_only_ (c->sock);
+  }
+  GNUNET_free (c);
+  if ( (GNUNET_YES == sh->got_shutdown) &&
+       (GNUNET_NO == have_non_monitor_clients (sh)) )
+    GNUNET_SERVICE_shutdown (sh);
+}
+
+
 /**
  * Ask the server to disconnect from the given client.  This is the
  * same as returning #GNUNET_SYSERR within the check procedure when
@@ -2096,12 +2513,19 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c)
 {
   struct GNUNET_SERVICE_Handle *sh = c->sh;
 
+  if (NULL != c->drop_task)
+  {
+    /* asked to drop twice! */
+    GNUNET_assert (0);
+    return;
+  }
   GNUNET_CONTAINER_DLL_remove (sh->clients_head,
                                sh->clients_tail,
                                c);
-  sh->disconnect_cb (sh->cb_cls,
-                     c,
-                     c->user_context);
+  if (NULL != sh->disconnect_cb)
+    sh->disconnect_cb (sh->cb_cls,
+                       c,
+                       c->user_context);
   if (NULL != c->warn_task)
   {
     GNUNET_SCHEDULER_cancel (c->warn_task);
@@ -2117,20 +2541,8 @@ GNUNET_SERVICE_client_drop (struct GNUNET_SERVICE_Client *c)
     GNUNET_SCHEDULER_cancel (c->send_task);
     c->send_task = NULL;
   }
-  GNUNET_SERVER_mst_destroy (c->mst);
-  GNUNET_MQ_destroy (c->mq);
-  if (GNUNET_NO == c->persist)
-  {
-    GNUNET_NETWORK_socket_close (c->sock);
-  }
-  else
-  {
-    GNUNET_NETWORK_socket_free_memory_only_ (c->sock);
-  }
-  GNUNET_free (c);
-  if ( (GNUNET_YES == sh->got_shutdown) &&
-       (GNUNET_NO == have_non_monitor_clients (sh)) )
-    GNUNET_SERVICE_shutdown (sh);
+  c->drop_task = GNUNET_SCHEDULER_add_now (&finish_client_drop,
+                                           c);
 }
 
 
@@ -2187,4 +2599,17 @@ GNUNET_SERVICE_client_persist (struct GNUNET_SERVICE_Client *c)
 }
 
 
+/**
+ * Obtain the message queue of @a c.  Convenience function.
+ *
+ * @param c the client to continue receiving from
+ * @return the message queue of @a c
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_SERVICE_client_get_mq (struct GNUNET_SERVICE_Client *c)
+{
+  return c->mq;
+}
+
+
 /* end of service_new.c */