-eliminating duplicate code
[oweals/gnunet.git] / src / util / server.c
index 63993ee37d4b75ac0c60c0e9e15f3643d3661b59..419bde13d4b59c61fe6e475b34337875d793ff21 100644 (file)
 #include "gnunet_disk_lib.h"
 #include "gnunet_protocols.h"
 
+#define LOG(kind,...) GNUNET_log_from (kind, "util", __VA_ARGS__)
+
+#define LOG_STRERROR(kind,syscall) GNUNET_log_from_strerror (kind, "util", syscall)
+
+#define LOG_STRERROR_FILE(kind,syscall,filename) GNUNET_log_from_strerror_file (kind, "util", syscall, filename)
+
 #define DEBUG_SERVER GNUNET_EXTRA_LOGGING
 
 /**
@@ -134,6 +140,10 @@ struct GNUNET_SERVER_Handle
    */
   int clients_ignore_shutdown;
 
+  GNUNET_SERVER_MstCreateCallback mst_create;
+  GNUNET_SERVER_MstDestroyCallback mst_destroy;
+  GNUNET_SERVER_MstReceiveCallback mst_receive;
+  void *mst_cls;
 };
 
 
@@ -151,7 +161,7 @@ struct GNUNET_SERVER_Client
   /**
    * Processing of incoming data.
    */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  void *mst;
 
   /**
    * Server that this client belongs to.
@@ -236,6 +246,11 @@ struct GNUNET_SERVER_Client
    */
   int receive_pending;
 
+  /**
+   * Finish pending write when disconnecting?
+   */
+  int finish_pending_write;
+
   /**
    * Persist the file handle for this client no matter what happens,
    * force the OS to close once the process actually dies.  Should only
@@ -294,8 +309,7 @@ process_listen_socket (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       if (sock != NULL)
       {
 #if DEBUG_SERVER
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Server accepted incoming connection.\n");
+        LOG (GNUNET_ERROR_TYPE_DEBUG, "Server accepted incoming connection.\n");
 #endif
         client = GNUNET_SERVER_connect_socket (server, sock);
         GNUNET_CONNECTION_ignore_shutdown (sock,
@@ -350,7 +364,7 @@ open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
   sock = GNUNET_NETWORK_socket_create (serverAddr->sa_family, SOCK_STREAM, 0);
   if (NULL == sock)
   {
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "socket");
+    LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "socket");
     errno = 0;
     return NULL;
   }
@@ -358,14 +372,14 @@ open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
   {
     if (GNUNET_NETWORK_socket_setsockopt
         (sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on)) != GNUNET_OK)
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                           "setsockopt");
+      LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                    "setsockopt");
 #ifdef IPV6_V6ONLY
     if ((serverAddr->sa_family == AF_INET6) &&
         (GNUNET_NETWORK_socket_setsockopt
          (sock, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof (on)) != GNUNET_OK))
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                           "setsockopt");
+      LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                    "setsockopt");
 #endif
   }
   /* bind the socket */
@@ -378,24 +392,24 @@ open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
        * fail if we already took the port on IPv6; if both IPv4 and
        * IPv6 binds fail, then our caller will log using the
        * errno preserved in 'eno' */
-      GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
+      LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "bind");
       if (port != 0)
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    _("`%s' failed for port %d (%s).\n"), "bind", port,
-                    (serverAddr->sa_family == AF_INET) ? "IPv4" : "IPv6");
+        LOG (GNUNET_ERROR_TYPE_ERROR, _("`%s' failed for port %d (%s).\n"),
+             "bind", port,
+             (serverAddr->sa_family == AF_INET) ? "IPv4" : "IPv6");
       eno = 0;
     }
     else
     {
       if (port != 0)
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    _("`%s' failed for port %d (%s): address already in use\n"),
-                    "bind", port,
-                    (serverAddr->sa_family == AF_INET) ? "IPv4" : "IPv6");
+        LOG (GNUNET_ERROR_TYPE_WARNING,
+             _("`%s' failed for port %d (%s): address already in use\n"),
+             "bind", port,
+             (serverAddr->sa_family == AF_INET) ? "IPv4" : "IPv6");
       else if (serverAddr->sa_family == AF_UNIX)
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    _("`%s' failed for `%s': address already in use\n"), "bind",
-                    ((const struct sockaddr_un *) serverAddr)->sun_path);
+        LOG (GNUNET_ERROR_TYPE_WARNING,
+             _("`%s' failed for `%s': address already in use\n"), "bind",
+             ((const struct sockaddr_un *) serverAddr)->sun_path);
 
     }
     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
@@ -404,15 +418,15 @@ open_listen_socket (const struct sockaddr *serverAddr, socklen_t socklen)
   }
   if (GNUNET_OK != GNUNET_NETWORK_socket_listen (sock, 5))
   {
-    GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "listen");
+    LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "listen");
     GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (sock));
     errno = 0;
     return NULL;
   }
 #if DEBUG_SERVER
   if (port != 0)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Server starts to listen on port %u.\n", port);
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Server starts to listen on port %u.\n",
+         port);
 #endif
   return sock;
 }
@@ -506,7 +520,7 @@ GNUNET_SERVER_create (GNUNET_CONNECTION_AccessCheck access, void *access_cls,
     if (j == 0)
     {
       if (errno != 0)
-        GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "bind");
+        LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "bind");
       GNUNET_free (lsocks);
       lsocks = NULL;
     }
@@ -533,7 +547,7 @@ GNUNET_SERVER_destroy (struct GNUNET_SERVER_Handle *s)
   unsigned int i;
 
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server shutting down.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Server shutting down.\n");
 #endif
   if (GNUNET_SCHEDULER_NO_TASK != s->listen_task)
   {
@@ -592,6 +606,20 @@ GNUNET_SERVER_add_handlers (struct GNUNET_SERVER_Handle *server,
 }
 
 
+void
+GNUNET_SERVER_set_callbacks (struct GNUNET_SERVER_Handle *server,
+                             GNUNET_SERVER_MstCreateCallback create,
+                             GNUNET_SERVER_MstDestroyCallback destroy,
+                             GNUNET_SERVER_MstReceiveCallback receive,
+                             void *cls)
+{
+  server->mst_create = create;
+  server->mst_destroy = destroy;
+  server->mst_receive = receive;
+  server->mst_cls = cls;
+}
+
+
 /**
  * Task run to warn about missing calls to 'GNUNET_SERVER_receive_done'.
  *
@@ -607,13 +635,12 @@ warn_no_receive_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
                                     &warn_no_receive_done, client);
   if (0 == (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                _
-                ("Processing code for message of type %u did not call GNUNET_SERVER_receive_done after %llums\n"),
-                (unsigned int) client->warn_type,
-                (unsigned long long)
-                GNUNET_TIME_absolute_get_duration
-                (client->warn_start).rel_value);
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+         _
+         ("Processing code for message of type %u did not call GNUNET_SERVER_receive_done after %llums\n"),
+         (unsigned int) client->warn_type,
+         (unsigned long long)
+         GNUNET_TIME_absolute_get_duration (client->warn_start).rel_value);
 }
 
 
@@ -666,9 +693,9 @@ GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
   size = ntohs (message->size);
 #if DEBUG_SERVER
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Server schedules transmission of %u-byte message of type %u to client.\n",
-              size, type);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Server schedules transmission of %u-byte message of type %u to client.\n",
+       size, type);
 #endif
   pos = server->handlers;
   found = GNUNET_NO;
@@ -683,9 +710,9 @@ GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
         if ((mh->expected_size != 0) && (mh->expected_size != size))
         {
 #if GNUNET8_NETWORK_IS_DEAD
-          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                      "Expected %u bytes for message of type %u, got %u\n",
-                      mh->expected_size, mh->type, size);
+          LOG (GNUNET_ERROR_TYPE_WARNING,
+               "Expected %u bytes for message of type %u, got %u\n",
+               mh->expected_size, mh->type, size);
           GNUNET_break_op (0);
 #endif
           return GNUNET_SYSERR;
@@ -711,8 +738,8 @@ GNUNET_SERVER_inject (struct GNUNET_SERVER_Handle *server,
   }
   if (found == GNUNET_NO)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                "Received message of unknown type %d\n", type);
+    LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
+         "Received message of unknown type %d\n", type);
     if (server->require_found == GNUNET_YES)
       return GNUNET_SYSERR;
   }
@@ -758,9 +785,9 @@ process_mst (struct GNUNET_SERVER_Client *client, int ret)
     {
       client->receive_pending = GNUNET_YES;
 #if DEBUG_SERVER
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Server re-enters receive loop, timeout: %llu.\n",
-                  client->idle_timeout.rel_value);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Server re-enters receive loop, timeout: %llu.\n",
+           client->idle_timeout.rel_value);
 #endif
       GNUNET_CONNECTION_receive (client->connection,
                                  GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
@@ -769,24 +796,29 @@ process_mst (struct GNUNET_SERVER_Client *client, int ret)
       break;
     }
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Server processes additional messages instantly.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Server processes additional messages instantly.\n");
 #endif
-    ret =
-        GNUNET_SERVER_mst_receive (client->mst, client, NULL, 0, GNUNET_NO,
-                                   GNUNET_YES);
+    if (client->server->mst_receive != NULL)
+      ret =
+          client->server->mst_receive (client->server->mst_cls, client->mst,
+                                       client, NULL, 0, GNUNET_NO, GNUNET_YES);
+    else
+      ret =
+          GNUNET_SERVER_mst_receive (client->mst, client, NULL, 0, GNUNET_NO,
+                                     GNUNET_YES);
   }
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Server leaves instant processing loop: ret = %d, server = %p, shutdown = %d, suspended = %u\n",
-              ret, client->server, client->shutdown_now, client->suspended);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Server leaves instant processing loop: ret = %d, server = %p, shutdown = %d, suspended = %u\n",
+       ret, client->server, client->shutdown_now, client->suspended);
 #endif
 
   if (ret == GNUNET_NO)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Server has more data pending but is suspended.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Server has more data pending but is suspended.\n");
 #endif
     client->receive_pending = GNUNET_SYSERR;    /* data pending */
   }
@@ -828,9 +860,9 @@ process_incoming (void *cls, const void *buf, size_t available,
   {
     /* wait longer, timeout changed (i.e. due to us sending) */
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Receive time out, but no disconnect due to sending (%p)\n",
-                GNUNET_a2s (addr, addrlen));
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Receive time out, but no disconnect due to sending (%p)\n",
+         GNUNET_a2s (addr, addrlen));
 #endif
     client->receive_pending = GNUNET_YES;
     GNUNET_CONNECTION_receive (client->connection,
@@ -848,14 +880,21 @@ process_incoming (void *cls, const void *buf, size_t available,
     return;
   }
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Server receives %u bytes from `%s'.\n",
-              (unsigned int) available, GNUNET_a2s (addr, addrlen));
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Server receives %u bytes from `%s'.\n",
+       (unsigned int) available, GNUNET_a2s (addr, addrlen));
 #endif
   GNUNET_SERVER_client_keep (client);
   client->last_activity = now;
-  ret =
-      GNUNET_SERVER_mst_receive (client->mst, client, buf, available, GNUNET_NO,
-                                 GNUNET_YES);
+
+  if (server->mst_receive != NULL)
+    ret =
+        client->server->mst_receive (client->server->mst_cls, client->mst,
+                                     client, buf, available, GNUNET_NO, GNUNET_YES);
+  else
+    ret =
+        GNUNET_SERVER_mst_receive (client->mst, client, buf, available, GNUNET_NO,
+                                   GNUNET_YES);
+
   process_mst (client, ret);
 }
 
@@ -883,8 +922,7 @@ restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if (client->receive_pending == GNUNET_NO)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Server begins to read again from client.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "Server begins to read again from client.\n");
 #endif
     client->receive_pending = GNUNET_YES;
     GNUNET_CONNECTION_receive (client->connection,
@@ -893,8 +931,8 @@ restart_processing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     return;
   }
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Server continues processing messages still in the buffer.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Server continues processing messages still in the buffer.\n");
 #endif
   GNUNET_SERVER_client_keep (client);
   client->receive_pending = GNUNET_NO;
@@ -920,9 +958,9 @@ client_message_tokenizer_callback (void *cls, void *client,
 
 #if DEBUG_SERVER
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Tokenizer gives server message of type %u from client\n",
-              ntohs (message->type));
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Tokenizer gives server message of type %u from client\n",
+       ntohs (message->type));
 #endif
   sender->in_process_client_buffer = GNUNET_YES;
   ret = GNUNET_SERVER_inject (server, sender, message);
@@ -952,8 +990,6 @@ GNUNET_SERVER_connect_socket (struct GNUNET_SERVER_Handle *server,
 
   client = GNUNET_malloc (sizeof (struct GNUNET_SERVER_Client));
   client->connection = connection;
-  client->mst =
-      GNUNET_SERVER_mst_create (&client_message_tokenizer_callback, server);
   client->reference_count = 1;
   client->server = server;
   client->last_activity = GNUNET_TIME_absolute_get ();
@@ -963,6 +999,14 @@ GNUNET_SERVER_connect_socket (struct GNUNET_SERVER_Handle *server,
   client->receive_pending = GNUNET_YES;
   client->callback = NULL;
   client->callback_cls = NULL;
+
+  if (server->mst_create != NULL)
+    client->mst =
+        server->mst_create (server->mst_cls, client);
+  else
+    client->mst =
+        GNUNET_SERVER_mst_create (&client_message_tokenizer_callback, server);
+
   GNUNET_CONNECTION_receive (client->connection,
                              GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
                              client->idle_timeout, &process_incoming, client);
@@ -986,6 +1030,14 @@ GNUNET_SERVER_client_set_timeout (struct GNUNET_SERVER_Client *client,
 }
 
 
+void
+GNUNET_SERVER_client_set_finish_pending_write (struct GNUNET_SERVER_Client *client,
+                                               int finish)
+{
+  client->finish_pending_write = finish;
+}
+
+
 /**
  * Notify the server that the given client handle should
  * be kept (keeps the connection up if possible, increments
@@ -1114,8 +1166,8 @@ GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
   unsigned int rc;
 
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client is being disconnected from the server.\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client is being disconnected from the server.\n");
 #endif
   if (client->restart_task != GNUNET_SCHEDULER_NO_TASK)
   {
@@ -1134,10 +1186,9 @@ GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
   }
 
   rc = client->reference_count;
-  if (client->server != NULL)
+  if (client->shutdown_now != GNUNET_YES)
   {
     server = client->server;
-    client->server = NULL;
     client->shutdown_now = GNUNET_YES;
     prev = NULL;
     pos = server->clients;
@@ -1171,24 +1222,29 @@ GNUNET_SERVER_client_disconnect (struct GNUNET_SERVER_Client *client)
   if (rc > 0)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "RC still positive, not destroying everything.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "RC still positive, not destroying everything.\n");
 #endif
     return;
   }
   if (client->in_process_client_buffer == GNUNET_YES)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Still processing inputs, not destroying everything.\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Still processing inputs, not destroying everything.\n");
 #endif
     return;
   }
 
   if (client->persist == GNUNET_YES)
     GNUNET_CONNECTION_persist_ (client->connection);
-  GNUNET_CONNECTION_destroy (client->connection, GNUNET_NO);
-  GNUNET_SERVER_mst_destroy (client->mst);
+  GNUNET_CONNECTION_destroy (client->connection, client->finish_pending_write);
+
+  if (client->server->mst_destroy != NULL)
+    client->server->mst_destroy (client->server->mst_cls, client->mst);
+  else
+    GNUNET_SERVER_mst_destroy (client->mst);
+
   GNUNET_free (client);
 }
 
@@ -1296,8 +1352,8 @@ GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
   if (success != GNUNET_OK)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "GNUNET_SERVER_receive_done called with failure indication\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "GNUNET_SERVER_receive_done called with failure indication\n");
 #endif
     GNUNET_SERVER_client_disconnect (client);
     return;
@@ -1305,8 +1361,8 @@ GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
   if (client->suspended > 0)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "GNUNET_SERVER_receive_done called, but more clients pending\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "GNUNET_SERVER_receive_done called, but more clients pending\n");
 #endif
     return;
   }
@@ -1318,19 +1374,19 @@ GNUNET_SERVER_receive_done (struct GNUNET_SERVER_Client *client, int success)
   if (client->in_process_client_buffer == GNUNET_YES)
   {
 #if DEBUG_SERVER
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "GNUNET_SERVER_receive_done called while still in processing loop\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "GNUNET_SERVER_receive_done called while still in processing loop\n");
 #endif
     return;
   }
-  if (client->server == NULL)
+  if ((client->server == NULL) || (GNUNET_YES == client->shutdown_now))
   {
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
 #if DEBUG_SERVER
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "GNUNET_SERVER_receive_done causes restart in reading from the socket\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "GNUNET_SERVER_receive_done causes restart in reading from the socket\n");
 #endif
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == client->restart_task);
   client->restart_task = GNUNET_SCHEDULER_add_now (&restart_processing, client);