optimize mqm_head scans by avoiding constantly scanning over definitively non-ready...
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new.c
index 3bcf35db3ae00a9323c69a094fa031975666c501..55c7d1bdb128b9cc6641be840b3bf3c636c0cd6c 100644 (file)
@@ -131,7 +131,7 @@ struct GNUNET_PeerIdentity my_full_id;
 struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key;
 
 /**
- * Signal that shutdown is happening: prevent recover measures.
+ * Signal that shutdown is happening: prevent recovery measures.
  */
 int shutting_down;
 
@@ -183,6 +183,16 @@ unsigned long long ratchet_messages;
  */
 struct GNUNET_TIME_Relative ratchet_time;
 
+/**
+ * How frequently do we send KEEPALIVE messages on idle connections?
+ */
+struct GNUNET_TIME_Relative keepalive_period;
+
+/**
+ * Set to non-zero values to create random drops to test retransmissions.
+ */
+unsigned long long drop_percent;
+
 
 /**
  * Send a message to a client.
@@ -282,7 +292,7 @@ GSC_bind (struct CadetClient *c,
           uint32_t options)
 {
   struct GNUNET_MQ_Envelope *env;
-  struct GNUNET_CADET_LocalChannelCreateMessage *msg;
+  struct GNUNET_CADET_LocalChannelCreateMessage *cm;
   struct GNUNET_CADET_ClientChannelNumber ccn;
 
   ccn = client_get_next_ccn (c);
@@ -292,18 +302,19 @@ GSC_bind (struct CadetClient *c,
                                                       ch,
                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Accepting incoming channel %s from %s on open port %s (%u)\n",
+       "Accepting incoming %s from %s on open port %s (%u), assigning ccn %X\n",
        GCCH_2s (ch),
        GCP_2s (dest),
        GNUNET_h2s (port),
-       ntohl (options));
+       ntohl (options),
+       ntohl (ccn.channel_of_client));
   /* notify local client about incoming connection! */
-  env = GNUNET_MQ_msg (msg,
+  env = GNUNET_MQ_msg (cm,
                        GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
-  msg->ccn = ccn;
-  msg->port = *port;
-  msg->opt = htonl (options);
-  msg->peer = *GCP_get_id (dest);
+  cm->ccn = ccn;
+  cm->port = *port;
+  cm->opt = htonl (options);
+  cm->peer = *GCP_get_id (dest);
   GSC_send_to_client (c,
                       env);
   return ccn;
@@ -356,17 +367,11 @@ destroy_paths_now (void *cls,
 
 
 /**
- * Task run during shutdown.
- *
- * @param cls unused
+ * Shutdown everything once the clients have disconnected.
  */
 static void
-shutdown_task (void *cls)
+shutdown_rest ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Shutting down\n");
-  shutting_down = GNUNET_YES;
-  GCO_shutdown ();
   if (NULL != stats)
   {
     GNUNET_STATISTICS_destroy (stats,
@@ -413,6 +418,23 @@ shutdown_task (void *cls)
 }
 
 
+/**
+ * Task run during shutdown.
+ *
+ * @param cls unused
+ */
+static void
+shutdown_task (void *cls)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Shutting down\n");
+  shutting_down = GNUNET_YES;
+  GCO_shutdown ();
+  if (NULL == clients_head)
+    shutdown_rest ();
+}
+
+
 /**
  * We had a remote connection @a value to port @a port before
  * client @a cls opened port @a port.  Bind them now.
@@ -456,7 +478,7 @@ handle_port_open (void *cls,
   struct CadetClient *c = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Open port %s requested by client %s\n",
+       "Open port %s requested by %s\n",
        GNUNET_h2s (&pmsg->port),
        GSC_2s (c));
   if (NULL == c->ports)
@@ -500,7 +522,7 @@ handle_port_close (void *cls,
   struct CadetClient *c = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Closing port %s as requested by client %s\n",
+       "Closing port %s as requested by %s\n",
        GNUNET_h2s (&pmsg->port),
        GSC_2s (c));
   if (GNUNET_YES !=
@@ -550,7 +572,7 @@ handle_channel_create (void *cls,
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "New channel to %s at port %s requested by client %s\n",
+       "New channel to %s at port %s requested by %s\n",
        GNUNET_i2s (&tcm->peer),
        GNUNET_h2s (&tcm->port),
        GSC_2s (c));
@@ -601,14 +623,16 @@ handle_channel_destroy (void *cls,
     return;
   }
   LOG (GNUNET_ERROR_TYPE_INFO,
-       "Client %s is destroying channel %s\n",
+       "%s is destroying %s\n",
        GSC_2s(c),
        GCCH_2s (ch));
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap32_remove (c->channels,
                                                          ntohl (msg->ccn.channel_of_client),
                                                          ch));
-  GCCH_channel_local_destroy (ch);
+  GCCH_channel_local_destroy (ch,
+                              c,
+                              msg->ccn);
   GNUNET_SERVICE_client_continue (c->client);
 }
 
@@ -621,26 +645,48 @@ handle_channel_destroy (void *cls,
  * @return #GNUNET_OK if @a msg is OK, #GNUNET_SYSERR if not
  */
 static int
-check_data (void *cls,
-            const struct GNUNET_CADET_LocalData *msg)
+check_local_data (void *cls,
+                  const struct GNUNET_CADET_LocalData *msg)
 {
-  const struct GNUNET_MessageHeader *payload;
   size_t payload_size;
   size_t payload_claimed_size;
-
+  const char *buf;
+  struct GNUNET_MessageHeader pa;
+
+  /* FIXME: what is the format we shall allow for @a msg?
+     ONE payload item or multiple? Seems current cadet_api
+     at least in theory allows more than one. Next-gen
+     cadet_api will likely no more, so we could then
+     simplify this mess again. */
   /* Sanity check for message size */
   payload_size = ntohs (msg->header.size) - sizeof (*msg);
-  if ( (payload_size < sizeof (struct GNUNET_MessageHeader)) ||
-       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < payload_size) )
+  buf = (const char *) &msg[1];
+  while (payload_size >= sizeof (struct GNUNET_MessageHeader))
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    /* need to memcpy() for alignment */
+    GNUNET_memcpy (&pa,
+                   buf,
+                   sizeof (pa));
+    payload_claimed_size = ntohs (pa.size);
+    if ( (payload_size < payload_claimed_size) ||
+         (payload_claimed_size < sizeof (struct GNUNET_MessageHeader)) ||
+         (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < payload_claimed_size) )
+    {
+      GNUNET_break (0);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Local data of %u total size had sub-message %u at %u with %u bytes\n",
+           ntohs (msg->header.size),
+           ntohs (pa.type),
+           (unsigned int) (buf - (const char *) &msg[1]),
+           (unsigned int) payload_claimed_size);
+      return GNUNET_SYSERR;
+    }
+    payload_size -= payload_claimed_size;
+    buf += payload_claimed_size;
   }
-  payload = (struct GNUNET_MessageHeader *) &msg[1];
-  payload_claimed_size = ntohs (payload->size);
-  if (payload_size != payload_claimed_size)
+  if (0 != payload_size)
   {
-    GNUNET_break (0);
+    GNUNET_break_op (0);
     return GNUNET_SYSERR;
   }
   return GNUNET_OK;
@@ -655,12 +701,13 @@ check_data (void *cls,
  * @param msg the actual message
  */
 static void
-handle_data (void *cls,
-             const struct GNUNET_CADET_LocalData *msg)
+handle_local_data (void *cls,
+                   const struct GNUNET_CADET_LocalData *msg)
 {
   struct CadetClient *c = cls;
   struct CadetChannel *ch;
-  const struct GNUNET_MessageHeader *payload;
+  size_t payload_size;
+  const char *buf;
 
   ch = lookup_channel (c,
                        msg->ccn);
@@ -671,16 +718,18 @@ handle_data (void *cls,
     GNUNET_SERVICE_client_drop (c->client);
     return;
   }
-
-  payload = (const struct GNUNET_MessageHeader *) &msg[1];
+  payload_size = ntohs (msg->header.size) - sizeof (*msg);
+  buf = (const char *) &msg[1];
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received %u bytes payload from client %s for channel %s\n",
-       ntohs (payload->size),
+       "Received %u bytes payload from %s for %s\n",
+       (unsigned int) payload_size,
        GSC_2s (c),
        GCCH_2s (ch));
   if (GNUNET_OK !=
       GCCH_handle_local_data (ch,
-                              payload))
+                              msg->ccn,
+                              buf,
+                              payload_size))
   {
     GNUNET_SERVICE_client_drop (c->client);
     return;
@@ -696,8 +745,8 @@ handle_data (void *cls,
  * @param msg The actual message.
  */
 static void
-handle_ack (void *cls,
-            const struct GNUNET_CADET_LocalAck *msg)
+handle_local_ack (void *cls,
+                  const struct GNUNET_CADET_LocalAck *msg)
 {
   struct CadetClient *c = cls;
   struct CadetChannel *ch;
@@ -712,10 +761,11 @@ handle_ack (void *cls,
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Got a local ACK from client %s for channel %s\n",
+       "Got a local ACK from %s for %s\n",
        GSC_2s(c),
        GCCH_2s (ch));
-  GCCH_handle_local_ack (ch);
+  GCCH_handle_local_ack (ch,
+                         msg->ccn);
   GNUNET_SERVICE_client_continue (c->client);
 }
 
@@ -917,17 +967,18 @@ handle_info_tunnels (void *cls,
  * Update the message with information about the connection.
  *
  * @param cls a `struct GNUNET_CADET_LocalInfoTunnel` message to update
- * @param c a connection about which we should store information in @a cls
+ * @param ct a connection about which we should store information in @a cls
  */
 static void
 iter_connection (void *cls,
-                 struct CadetConnection *c)
+                 struct CadetTConnection *ct)
 {
   struct GNUNET_CADET_LocalInfoTunnel *msg = cls;
+  struct CadetConnection *cc = ct->cc;
   struct GNUNET_CADET_ConnectionTunnelIdentifier *h;
 
   h = (struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1];
-  h[msg->connections++] = *(GCC_get_id (c));
+  h[msg->connections++] = *(GCC_get_id (cc));
 }
 
 
@@ -1115,7 +1166,7 @@ client_connect_cb (void *cls,
                             +1,
                             GNUNET_NO);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Client %s connected\n",
+       "%s connected\n",
        GSC_2s (c));
   return c;
 }
@@ -1162,20 +1213,21 @@ channel_destroy_iterator (void *cls,
                           void *value)
 {
   struct CadetClient *c = cls;
+  struct GNUNET_CADET_ClientChannelNumber ccn;
   struct CadetChannel *ch = value;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Destroying channel %s, due to client %s disconnecting.\n",
+       "Destroying %s, due to %s disconnecting.\n",
        GCCH_2s (ch),
        GSC_2s (c));
   GNUNET_assert (GNUNET_YES ==
                  GNUNET_CONTAINER_multihashmap32_remove (c->channels,
                                                          key,
                                                          ch));
-  if (key < GNUNET_CADET_LOCAL_CHANNEL_ID_CLI)
-    GCCH_channel_local_destroy (ch);
-  else
-    GCCH_channel_incoming_destroy (ch);
+  ccn.channel_of_client = htonl (key);
+  GCCH_channel_local_destroy (ch,
+                              c,
+                              ccn);
   return GNUNET_OK;
 }
 
@@ -1196,7 +1248,7 @@ client_release_ports (void *cls,
   struct CadetClient *c = value;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Closing port %s due to client %s disconnect.\n",
+       "Closing port %s due to %s disconnect.\n",
        GNUNET_h2s (key),
        GSC_2s (c));
   GNUNET_assert (GNUNET_YES ==
@@ -1227,7 +1279,7 @@ client_disconnect_cb (void *cls,
 
   GNUNET_assert (c->client == client);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Client %s is disconnecting.\n",
+       "%s is disconnecting.\n",
        GSC_2s (c));
   if (NULL != c->channels)
   {
@@ -1251,6 +1303,9 @@ client_disconnect_cb (void *cls,
                             -1,
                             GNUNET_NO);
   GNUNET_free (c);
+  if ( (NULL == clients_head) &&
+       (GNUNET_YES == shutting_down) )
+    shutdown_rest ();
 }
 
 
@@ -1291,7 +1346,34 @@ run (void *cls,
                                "need delay value");
     ratchet_time = GNUNET_TIME_UNIT_HOURS;
   }
-
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_time (c,
+                                           "CADET",
+                                           "REFRESH_CONNECTION_TIME",
+                                           &keepalive_period))
+  {
+    GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
+                               "CADET",
+                               "REFRESH_CONNECTION_TIME",
+                               "need delay value");
+    keepalive_period = GNUNET_TIME_UNIT_MINUTES;
+  }
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "CADET",
+                                             "DROP_PERCENT",
+                                             &drop_percent))
+  {
+    drop_percent = 0;
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Cadet is running with DROP enabled.\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING, "This is NOT a good idea!\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Remove DROP_PERCENT from config file.\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING, "**************************************\n");
+  }
   my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (c);
   if (NULL == my_private_key)
   {
@@ -1351,11 +1433,11 @@ GNUNET_SERVICE_MAIN
                           GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY,
                           struct GNUNET_CADET_LocalChannelDestroyMessage,
                           NULL),
- GNUNET_MQ_hd_var_size (data,
+ GNUNET_MQ_hd_var_size (local_data,
                         GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
                         struct GNUNET_CADET_LocalData,
                         NULL),
- GNUNET_MQ_hd_fixed_size (ack,
+ GNUNET_MQ_hd_fixed_size (local_ack,
                           GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK,
                           struct GNUNET_CADET_LocalAck,
                           NULL),