convert fs publish to MQ
[oweals/gnunet.git] / src / core / gnunet-service-core_clients.c
index 719e42d0019fa662dfb7730539f163c679470016..c2198848fea452ac4d67f65e7895cd8902bf0453 100644 (file)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2009, 2010, 2011 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -14,8 +14,8 @@
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
@@ -29,6 +29,7 @@
 #include "gnunet_transport_service.h"
 #include "gnunet-service-core.h"
 #include "gnunet-service-core_clients.h"
+#include "gnunet-service-core_neighbours.h"
 #include "gnunet-service-core_sessions.h"
 #include "gnunet-service-core_typemap.h"
 #include "core.h"
@@ -149,7 +150,8 @@ find_client (struct GNUNET_SERVER_Client *client)
  */
 static void
 send_to_client (struct GSC_Client *client,
-                const struct GNUNET_MessageHeader *msg, int can_drop)
+                const struct GNUNET_MessageHeader *msg,
+                int can_drop)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Preparing to send %u bytes of message of type %u to client.\n",
@@ -197,8 +199,9 @@ type_match (uint16_t type, struct GSC_Client *c)
 {
   unsigned int i;
 
-  if (c->tcnt == 0)
-    return GNUNET_YES;          /* peer without handlers matches ALL */
+  if (c->tcnt == 0 && c->options != 0)
+    return GNUNET_YES;          /* peer without handlers and inbound/outbond
+                                  callbacks matches ALL */
   for (i = 0; i < c->tcnt; i++)
     if (type == c->types[i])
       return GNUNET_YES;
@@ -219,8 +222,10 @@ type_match (uint16_t type, struct GSC_Client *c)
  */
 static void
 send_to_all_clients (const struct GNUNET_PeerIdentity *partner,
-                     const struct GNUNET_MessageHeader *msg, int can_drop,
-                     uint32_t options, uint16_t type)
+                     const struct GNUNET_MessageHeader *msg,
+                     int can_drop,
+                     uint32_t options,
+                     uint16_t type)
 {
   struct GSC_Client *c;
   int tm;
@@ -396,9 +401,17 @@ handle_client_send_request (void *cls,
   {
     /* dequeue and recycle memory from pending request, there can only
        be at most one per client and peer */
+    GNUNET_STATISTICS_update (GSC_stats,
+                              gettext_noop
+                              ("# dequeuing CAR (duplicate request)"), 1,
+                              GNUNET_NO);
     GSC_SESSIONS_dequeue_request (car);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Transmission request to `%s' was a duplicate!\n",
+                GNUNET_i2s (&req->peer));
   }
   car->target = req->peer;
+  car->received_time = GNUNET_TIME_absolute_get ();
   car->deadline = GNUNET_TIME_absolute_ntoh (req->deadline);
   car->priority = (enum GNUNET_CORE_Priority) ntohl (req->priority);
   car->msize = ntohs (req->size);
@@ -441,7 +454,7 @@ struct TokenizerContext
 
 
 /**
- * Handle CORE_SEND request.
+ * Handle #GNUNET_MESSAGE_TYPE_CORE_SEND request.
  *
  * @param cls unused
  * @param client the client issuing the request
@@ -456,10 +469,11 @@ handle_client_send (void *cls,
   struct GSC_Client *c;
   struct TokenizerContext tc;
   uint16_t msize;
+  struct GNUNET_TIME_Relative delay;
+  struct GNUNET_TIME_Relative overdue;
 
   msize = ntohs (message->size);
-  if (msize <
-      sizeof (struct SendMessage) + sizeof (struct GNUNET_MessageHeader))
+  if (msize < sizeof (struct SendMessage))
   {
     GNUNET_break (0);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -476,8 +490,9 @@ handle_client_send (void *cls,
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
-  tc.car =
-      GNUNET_CONTAINER_multipeermap_get (c->requests, &sm->peer);
+  tc.car
+    = GNUNET_CONTAINER_multipeermap_get (c->requests,
+                                         &sm->peer);
   if (NULL == tc.car)
   {
     /* Must have been that we first approved the request, then got disconnected
@@ -489,28 +504,44 @@ handle_client_send (void *cls,
                               gettext_noop
                               ("# messages discarded (session disconnected)"),
                               1, GNUNET_NO);
-    GNUNET_SERVER_receive_done (client, GNUNET_OK);
+    GNUNET_SERVER_receive_done (client,
+                                GNUNET_OK);
     return;
   }
+  delay = GNUNET_TIME_absolute_get_duration (tc.car->received_time);
+  overdue = GNUNET_TIME_absolute_get_duration (tc.car->deadline);
+  tc.cork = ntohl (sm->cork);
+  tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority);
+  if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Client waited %s for transmission of %u bytes to `%s'%s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES),
+                msize,
+                GNUNET_i2s (&sm->peer),
+                tc.cork ? "" : " (corked)");
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client waited %s for transmission of %u bytes to `%s'%s\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES),
+                msize,
+                GNUNET_i2s (&sm->peer),
+                tc.cork ? "" : " (corked)");
+
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (c->requests,
                                                        &sm->peer,
                                                        tc.car));
-  tc.cork = ntohl (sm->cork);
-  tc.priority = (enum GNUNET_CORE_Priority) ntohl (sm->priority);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client asked for transmission of %u bytes to `%s' %s\n",
-              msize,
-              GNUNET_i2s (&sm->peer), tc.cork ? "now" : "");
   GNUNET_SERVER_mst_receive (client_mst, &tc,
-                             (const char *) &sm[1], msize,
-                             GNUNET_YES, GNUNET_NO);
-  if (0 !=
-      memcmp (&tc.car->target, &GSC_my_identity,
-              sizeof (struct GNUNET_PeerIdentity)))
-    GSC_SESSIONS_dequeue_request (tc.car);
+                             (const char *) &sm[1],
+                             msize,
+                             GNUNET_YES,
+                             GNUNET_NO);
+  GSC_SESSIONS_dequeue_request (tc.car);
   GNUNET_free (tc.car);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  GNUNET_SERVER_receive_done (client,
+                              GNUNET_OK);
 }
 
 
@@ -525,7 +556,8 @@ handle_client_send (void *cls,
  * @param message the actual message
  */
 static int
-client_tokenizer_callback (void *cls, void *client,
+client_tokenizer_callback (void *cls,
+                           void *client,
                            const struct GNUNET_MessageHeader *message)
 {
   struct TokenizerContext *tc = client;
@@ -535,9 +567,13 @@ client_tokenizer_callback (void *cls, void *client,
   GNUNET_snprintf (buf, sizeof (buf),
                   gettext_noop ("# bytes of messages of type %u received"),
                   (unsigned int) ntohs (message->type));
-  GNUNET_STATISTICS_update (GSC_stats, buf, ntohs (message->size), GNUNET_NO);
+  GNUNET_STATISTICS_update (GSC_stats,
+                            buf,
+                            ntohs (message->size),
+                            GNUNET_NO);
   if (0 ==
-      memcmp (&car->target, &GSC_my_identity,
+      memcmp (&car->target,
+              &GSC_my_identity,
               sizeof (struct GNUNET_PeerIdentity)))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -618,7 +654,8 @@ handle_client_disconnect (void *cls,
   if (NULL == client)
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Client %p has disconnected from core service.\n", client);
+              "Client %p has disconnected from core service.\n",
+              client);
   c = find_client (client);
   if (c == NULL)
     return;                     /* client never sent INIT */
@@ -654,6 +691,8 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
 {
   struct GSC_Client *c;
   struct SendMessageReady smr;
+  struct GNUNET_TIME_Relative delay;
+  struct GNUNET_TIME_Relative left;
 
   c = car->client_handle;
   if (GNUNET_YES !=
@@ -662,12 +701,26 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
   {
     /* connection has gone down since, drop request */
     GNUNET_assert (0 !=
-                   memcmp (&car->target, &GSC_my_identity,
+                   memcmp (&car->target,
+                           &GSC_my_identity,
                            sizeof (struct GNUNET_PeerIdentity)));
     GSC_SESSIONS_dequeue_request (car);
-    GSC_CLIENTS_reject_request (car);
+    GSC_CLIENTS_reject_request (car,
+                                GNUNET_NO);
     return;
   }
+  delay = GNUNET_TIME_absolute_get_duration (car->received_time);
+  left = GNUNET_TIME_absolute_get_duration (car->deadline);
+  if (left.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Client waited %s for permission to transmit to `%s'%s (priority %u)\n",
+                GNUNET_STRINGS_relative_time_to_string (delay,
+                                                        GNUNET_YES),
+                GNUNET_i2s (&car->target),
+                (0 == left.rel_value_us)
+                ? " (past deadline)"
+                : "",
+                car->priority);
   smr.header.size = htons (sizeof (struct SendMessageReady));
   smr.header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_READY);
   smr.size = htons (car->msize);
@@ -678,21 +731,28 @@ GSC_CLIENTS_solicit_request (struct GSC_ClientActiveRequest *car)
 
 
 /**
- * Tell a client that we will never be ready to receive the
- * given message in time (disconnect or timeout).
+ * We will never be ready to transmit the given message in (disconnect
+ * or invalid request).  Frees resources associated with @a car.  We
+ * don't explicitly tell the client, he'll learn with the disconnect
+ * (or violated the protocol).
  *
  * @param car request that now permanently failed; the
  *        responsibility for the handle is now returned
  *        to CLIENTS (SESSIONS is done with it).
+ * @param drop_client #GNUNET_YES if the client violated the protocol
+ *        and we should thus drop the connection
  */
 void
-GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car)
+GSC_CLIENTS_reject_request (struct GSC_ClientActiveRequest *car,
+                            int drop_client)
 {
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multipeermap_remove (car->
                                                        client_handle->requests,
                                                        &car->target,
                                                        car));
+  if (GNUNET_YES == drop_client)
+    GNUNET_SERVER_client_disconnect (car->client_handle->client_handle);
   GNUNET_free (car);
 }
 
@@ -823,7 +883,7 @@ GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender,
          (0 != (options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) ))
     return; /* no client cares about this message notification */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Core service passes message from `%4s' of type %u to client.\n",
+              "Core service passes message from `%s' of type %u to client.\n",
               GNUNET_i2s (sender),
               (unsigned int) ntohs (msg->type));
   GSC_SESSIONS_add_to_typemap (sender, ntohs (msg->type));