fair, global message buffer implemented
[oweals/gnunet.git] / src / cadet / gnunet-service-cadet-new_tunnels.c
index e41164220c3ba718a84f311dfa740f5ec4ba90d2..ff07ac6653866e7378284a22b4a234e95a5d7bc5 100644 (file)
  *
  * FIXME:
  * - proper connection evaluation during connection management:
- *   + when managing connections, distinguish those that
- *     have (recently) had traffic from those that were
- *     never ready (or not recently)
- *   + consider quality of current connection set when deciding
- *     how often to do maintenance
+ *   + consider quality (or quality spread?) of current connection set
+ *     when deciding how often to do maintenance
  *   + interact with PEER to drive DHT GET/PUT operations based
  *     on how much we like our connections
  */
@@ -277,7 +274,7 @@ struct CadetTunnelQueueEntry
   /**
    * Continuation to call once sent (on the channel layer).
    */
-  GNUNET_SCHEDULER_TaskCallback cont;
+  GCT_SendContinuation cont;
 
   /**
    * Closure for @c cont.
@@ -408,6 +405,13 @@ struct CadetTunnel
    */
   struct CadetTunnelQueueEntry *tq_tail;
 
+  /**
+   * Identification of the connection from which we are currently processing
+   * a message. Only valid (non-NULL) during #handle_decrypted() and the
+   * handle-*()-functions called from our @e mq during that function.
+   */
+  struct CadetTConnection *current_ct;
+
   /**
    * How long do we wait until we retry the KX?
    */
@@ -1809,8 +1813,11 @@ GCT_handle_kx_auth (struct CadetTConnection *ct,
                    sizeof (kx_auth)))
   {
     /* This KX_AUTH is not using the latest KX/KX_AUTH data
-       we transmitted to the sender, refuse it! */
+       we transmitted to the sender, refuse it, try KX again. */
     GNUNET_break_op (0);
+    send_kx (t,
+             NULL,
+             &t->ax);
     return;
   }
   /* Yep, we're good. */
@@ -2006,7 +2013,8 @@ destroy_tunnel (void *cls)
   while (NULL != (tq = t->tq_head))
   {
     if (NULL != tq->cont)
-      tq->cont (tq->cont_cls);
+      tq->cont (tq->cont_cls,
+                NULL);
     GCT_send_cancel (tq);
   }
   GCP_drop_tunnel (t->destination,
@@ -2037,6 +2045,7 @@ destroy_tunnel (void *cls)
     GNUNET_free (t->unverified_ax);
   }
   cleanup_ax (&t->ax);
+  GNUNET_assert (NULL == t->destroy_task);
   GNUNET_free (t);
 }
 
@@ -2061,12 +2070,14 @@ GCT_remove_channel (struct CadetTunnel *t,
                  GNUNET_CONTAINER_multihashmap32_remove (t->channels,
                                                          ntohl (ctn.cn),
                                                          ch));
-  if (0 ==
-      GCT_count_channels (t))
+  if ( (0 ==
+        GCT_count_channels (t)) &&
+       (NULL == t->destroy_task) )
   {
-    t->destroy_task = GNUNET_SCHEDULER_add_delayed (IDLE_DESTROY_DELAY,
-                                                    &destroy_tunnel,
-                                                    t);
+    t->destroy_task
+      = GNUNET_SCHEDULER_add_delayed (IDLE_DESTROY_DELAY,
+                                      &destroy_tunnel,
+                                      t);
   }
 }
 
@@ -2086,7 +2097,8 @@ destroy_remaining_channels (void *cls,
 {
   struct CadetChannel *ch = value;
 
-  GCCH_handle_remote_destroy (ch);
+  GCCH_handle_remote_destroy (ch,
+                              NULL);
   return GNUNET_OK;
 }
 
@@ -2153,7 +2165,8 @@ try_send_normal_payload (struct CadetTunnel *t,
   GCC_transmit (ct->cc,
                 tq->env);
   if (NULL != tq->cont)
-    tq->cont (tq->cont_cls);
+    tq->cont (tq->cont_cls,
+              GCC_get_id (ct->cc));
   GNUNET_free (tq);
 }
 
@@ -2338,9 +2351,13 @@ evaluate_connection (void *cls,
   struct EvaluationSummary *es = cls;
   struct CadetConnection *cc = ct->cc;
   struct CadetPeerPath *ps = GCC_get_path (cc);
+  const struct CadetConnectionMetrics *metrics;
   GNUNET_CONTAINER_HeapCostType ct_desirability;
+  struct GNUNET_TIME_Relative uptime;
+  struct GNUNET_TIME_Relative last_use;
   uint32_t ct_length;
   double score;
+  double success_rate;
 
   if (ps == es->path)
   {
@@ -2352,11 +2369,17 @@ evaluate_connection (void *cls,
   }
   ct_desirability = GCPP_get_desirability (ps);
   ct_length = GCPP_get_length (ps);
-
-  /* FIXME: calculate score on more than path,
-     include connection performance metrics like
-     last successful transmission, uptime, etc. */
-  score = ct_desirability + ct_length; /* FIXME: weigh these as well! */
+  metrics = GCC_get_metrics (cc);
+  uptime = GNUNET_TIME_absolute_get_duration (metrics->age);
+  last_use = GNUNET_TIME_absolute_get_duration (metrics->last_use);
+  /* We add 1.0 here to avoid division by zero. */
+  success_rate = (metrics->num_acked_transmissions + 1.0) / (metrics->num_successes + 1.0);
+  score
+    = ct_desirability
+    + 100.0 / (1.0 + ct_length) /* longer paths = better */
+    + sqrt (uptime.rel_value_us / 60000000LL) /* larger uptime = better */
+    - last_use.rel_value_us / 1000L;          /* longer idle = worse */
+  score *= success_rate;        /* weigh overall by success rate */
 
   if ( (NULL == es->worst) ||
        (score < es->worst_score) )
@@ -2615,6 +2638,7 @@ handle_plaintext_data (void *cls,
     return;
   }
   GCCH_handle_channel_plaintext_data (ch,
+                                      GCC_get_id (t->current_ct->cc),
                                       msg);
 }
 
@@ -2648,6 +2672,7 @@ handle_plaintext_data_ack (void *cls,
     return;
   }
   GCCH_handle_channel_plaintext_data_ack (ch,
+                                          GCC_get_id (t->current_ct->cc),
                                           ack);
 }
 
@@ -2675,7 +2700,8 @@ handle_plaintext_channel_open (void *cls,
          GNUNET_h2s (&copen->port),
          GCT_2s (t),
          GCCH_2s (ch));
-    GCCH_handle_duplicate_open (ch);
+    GCCH_handle_duplicate_open (ch,
+                                GCC_get_id (t->current_ct->cc));
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2752,7 +2778,8 @@ handle_plaintext_channel_open_ack (void *cls,
        "Received channel OPEN_ACK on channel %s from %s\n",
        GCCH_2s (ch),
        GCT_2s (t));
-  GCCH_handle_channel_open_ack (ch);
+  GCCH_handle_channel_open_ack (ch,
+                                GCC_get_id (t->current_ct->cc));
 }
 
 
@@ -2785,7 +2812,8 @@ handle_plaintext_channel_destroy (void *cls,
        "Receicved channel DESTROY on %s from %s\n",
        GCCH_2s (ch),
        GCT_2s (t));
-  GCCH_handle_remote_destroy (ch);
+  GCCH_handle_remote_destroy (ch,
+                              GCC_get_id (t->current_ct->cc));
 }
 
 
@@ -2803,6 +2831,7 @@ handle_decrypted (void *cls,
 {
   struct CadetTunnel *t = cls;
 
+  GNUNET_assert (NULL != t->current_ct);
   GNUNET_MQ_inject_message (t->mq,
                             msg);
   return GNUNET_OK;
@@ -3089,12 +3118,14 @@ GCT_handle_encrypted (struct CadetTConnection *ct,
   }
 
   /* The MST will ultimately call #handle_decrypted() on each message. */
+  t->current_ct = ct;
   GNUNET_break_op (GNUNET_OK ==
                    GNUNET_MST_from_buffer (t->mst,
                                            cbuf,
                                            decrypted_size,
                                            GNUNET_YES,
                                            GNUNET_NO));
+  t->current_ct = NULL;
 }
 
 
@@ -3111,7 +3142,7 @@ GCT_handle_encrypted (struct CadetTConnection *ct,
 struct CadetTunnelQueueEntry *
 GCT_send (struct CadetTunnel *t,
           const struct GNUNET_MessageHeader *message,
-          GNUNET_SCHEDULER_TaskCallback cont,
+          GCT_SendContinuation cont,
           void *cont_cls)
 {
   struct CadetTunnelQueueEntry *tq;