- cancel transmit requests before destroying channels, warn API users if not
authorBart Polot <bart@net.in.tum.de>
Fri, 2 Oct 2015 03:37:41 +0000 (03:37 +0000)
committerBart Polot <bart@net.in.tum.de>
Fri, 2 Oct 2015 03:37:41 +0000 (03:37 +0000)
src/cadet/cadet_api.c
src/cadet/test_cadet.c

index c1a4807b57d81da62ed3bf2faebde155fee53082..48de33041489192768487fe23ac7c6de5a6ee571 100644 (file)
@@ -1683,6 +1683,8 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel)
       {
         /* applications should cancel before destroying channel */
         GNUNET_break (0);
+        LOG (GNUNET_ERROR_TYPE_WARNING,
+             "Channel destroyed without cancelling transmission requests\n");
         th->notify (th->notify_cls, 0, NULL);
       }
       GNUNET_CADET_notify_transmit_ready_cancel (th);
index 31c2094de2ef06b39e531bc7bea44155e22c4dd6..2b69579a5afcaa13a08df7e520c65b2ab46b230d 100644 (file)
@@ -176,6 +176,17 @@ static struct GNUNET_CADET_Channel *ch;
  */
 static struct GNUNET_CADET_Channel *incoming_ch;
 
+/**
+ * Transmit handle for root data calls
+ */
+static struct GNUNET_CADET_TransmitHandle *th;
+
+/**
+ * Transmit handle for root data calls
+ */
+static struct GNUNET_CADET_TransmitHandle *incoming_th;
+
+
 /**
  * Time we started the data transmission (after channel has been established
  * and initilized).
@@ -286,11 +297,21 @@ disconnect_cadet_peers (void *cls,
   }
   if (NULL != ch)
   {
+    if (NULL != th)
+    {
+      GNUNET_CADET_notify_transmit_ready_cancel (th);
+      th = NULL;
+    }
     GNUNET_CADET_channel_destroy (ch);
     ch = NULL;
   }
   if (NULL != incoming_ch)
   {
+    if (NULL != incoming_th)
+    {
+      GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
+      incoming_th = NULL;
+    }
     GNUNET_CADET_channel_destroy (incoming_ch);
     incoming_ch = NULL;
   }
@@ -372,8 +393,9 @@ static void
 gather_stats_and_exit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   disconnect_task = NULL;
+  long l = (long) cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "gathering statistics\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "gathering statistics from line %d\n", l);
 
   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
   {
@@ -385,6 +407,11 @@ gather_stats_and_exit (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 
   if (NULL != ch)
   {
+    if (NULL != th)
+    {
+      GNUNET_CADET_notify_transmit_ready_cancel (th);
+      th = NULL;
+    }
     GNUNET_CADET_channel_destroy (ch);
     ch = NULL;
   }
@@ -426,7 +453,7 @@ tmt_rdy (void *cls, size_t size, void *buf);
 
 
 /**
- * Task to schedule a new data transmission.
+ * Task to request a new data transmission.
  *
  * @param cls Closure (peer #).
  * @param tc Task Context.
@@ -434,8 +461,9 @@ tmt_rdy (void *cls, size_t size, void *buf);
 static void
 data_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_CADET_TransmitHandle *th;
   struct GNUNET_CADET_Channel *channel;
+  static struct GNUNET_CADET_TransmitHandle **pth;
+
   long src;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Data task\n");
@@ -446,24 +474,24 @@ data_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if (GNUNET_YES == test_backwards)
   {
     channel = incoming_ch;
+    pth = &incoming_th;
     src = peers_requested - 1;
   }
   else
   {
     channel = ch;
+    pth = &th;
     src = 0;
   }
 
-  if (NULL == channel)
-  {
-    GNUNET_break (0);
-    return;
-  }
-  th = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           size_payload + data_sent,
-                                           &tmt_rdy, (void *) src);
-  if (NULL == th)
+  GNUNET_assert (NULL != channel);
+  GNUNET_assert (NULL == *pth);
+
+  *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             size_payload + data_sent,
+                                             &tmt_rdy, (void *) src);
+  if (NULL == *pth)
   {
     unsigned long i = (unsigned long) cls;
 
@@ -478,9 +506,9 @@ data_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
     {
       i++;
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "in %u ms\n", i);
-      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
-                                      GNUNET_TIME_UNIT_MILLISECONDS,
-                                      i),
+      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
+                                        GNUNET_TIME_UNIT_MILLISECONDS,
+                                        i),
                                     &data_task, (void *) i);
     }
   }
@@ -504,6 +532,12 @@ tmt_rdy (void *cls, size_t size, void *buf)
   unsigned int counter;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmt_rdy on %ld, filling buffer\n", id);
+  if (0 == id)
+    th = NULL;
+  else if ((peers_requested - 1) == id)
+    incoming_th = NULL;
+  else
+    GNUNET_assert (0);
   counter = get_expected_target () == id ? ack_sent : data_sent;
   msg_size = size_payload + counter;
   if (size < msg_size || NULL == buf)
@@ -551,7 +585,7 @@ tmt_rdy (void *cls, size_t size, void *buf)
 /**
  * Function is called whenever a message is received.
  *
- * @param cls closure (set from GNUNET_CADET_connect)
+ * @param cls closure (set from GNUNET_CADET_connect, peer number)
  * @param channel connection to the other end
  * @param channel_ctx place to store local state associated with the channel
  * @param message the actual message
@@ -563,6 +597,7 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
                void **channel_ctx,
                const struct GNUNET_MessageHeader *message)
 {
+  struct GNUNET_CADET_TransmitHandle **pth;
   long client = (long) cls;
   long expected_target_client;
   uint32_t *data;
@@ -590,15 +625,19 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
   {
   case 0L:
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
+    GNUNET_assert (channel == ch);
+    pth = &th;
     break;
   case 1L:
   case 4L:
     GNUNET_assert (client == peers_requested - 1);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %li got a message.\n",
+    GNUNET_assert (channel == incoming_ch);
+    pth = &incoming_th;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %ld got a message.\n",
                 client);
     break;
   default:
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %li not valid.\n", client);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %ld not valid.\n", client);
     GNUNET_assert (0);
   }
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal);
@@ -635,10 +674,11 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
     if (SPEED != test || (ok_goal - 2) == ok)
     {
       /* Send ACK */
-      GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          size_payload + ack_sent, &tmt_rdy,
-                                          (void *) client);
+      GNUNET_assert (NULL == *pth);
+      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 size_payload + ack_sent,
+                                                 &tmt_rdy, (void *) client);
       return GNUNET_OK;
     }
     else
@@ -653,10 +693,12 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
     {
       ack_received++;
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
-      GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                          size_payload + data_sent, &tmt_rdy,
-                                          (void *) client);
+      /* send more data */
+      GNUNET_assert (NULL == *pth);
+      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 size_payload + data_sent,
+                                                 &tmt_rdy, (void *) client);
       if (ack_received < TOTAL_PACKETS && SPEED != test)
         return GNUNET_OK;
       if (ok == 2 && SPEED == test)
@@ -665,11 +707,21 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
     }
     if (test == P2P_SIGNAL)
     {
+      if (NULL != incoming_th)
+      {
+        GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
+        incoming_th = NULL;
+      }
       GNUNET_CADET_channel_destroy (incoming_ch);
       incoming_ch = NULL;
     }
     else
     {
+      if (NULL != th)
+      {
+        GNUNET_CADET_notify_transmit_ready_cancel (th);
+        th = NULL;
+      }
       GNUNET_CADET_channel_destroy (ch);
       ch = NULL;
     }
@@ -680,7 +732,8 @@ data_callback (void *cls, struct GNUNET_CADET_Channel *channel,
 
 
 /**
- * Handlers, for diverse services
+ * Data handlers for every message type of CADET's payload.
+ * {callback_function, message_type, size_expected}
  */
 static struct GNUNET_CADET_MessageHandler handlers[] = {
   {&data_callback, 1, sizeof (struct GNUNET_MessageHeader)},
@@ -733,7 +786,7 @@ incoming_channel (void *cls, struct GNUNET_CADET_Channel *channel,
  * Function called whenever an inbound channel is destroyed.  Should clean up
  * any associated state.
  *
- * @param cls closure (set from GNUNET_CADET_connect)
+ * @param cls closure (set from GNUNET_CADET_connect, peer number)
  * @param channel connection to the other end (henceforth invalid)
  * @param channel_ctx place where local state associated
  *                   with the channel is stored
@@ -781,7 +834,7 @@ channel_cleaner (void *cls, const struct GNUNET_CADET_Channel *channel,
  * START THE TESTCASE ITSELF, AS WE ARE CONNECTED TO THE CADET SERVICES.
  *
  * Testcase continues when the root receives confirmation of connected peers,
- * on callback funtion ch.
+ * on callback function ch.
  *
  * @param cls Closure (unused).
  * @param tc Task Context.
@@ -794,7 +847,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0)
     return;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "do_test\n");
 
   if (NULL != disconnect_task)
   {
@@ -820,10 +873,10 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
   data_sent = 0;
   ack_received = 0;
   ack_sent = 0;
-  GNUNET_CADET_notify_transmit_ready (ch, GNUNET_NO,
-                                      GNUNET_TIME_UNIT_FOREVER_REL,
-                                      size_payload + 1000,
-                                      &tmt_rdy, (void *) 0L);
+  th = GNUNET_CADET_notify_transmit_ready (ch, GNUNET_NO,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           size_payload + 1000,
+                                           &tmt_rdy, (void *) 0L);
 }
 
 /**