clean up MQ error handling in cadet_api
authorChristian Grothoff <christian@grothoff.org>
Thu, 28 Jun 2018 08:24:09 +0000 (10:24 +0200)
committerChristian Grothoff <christian@grothoff.org>
Thu, 28 Jun 2018 08:24:21 +0000 (10:24 +0200)
src/cadet/cadet_api.c

index b019424f9a94062c708c162080626a8c183650e4..85a8be522d7fdb81219e467ceab72aca4aab8b40 100644 (file)
@@ -357,67 +357,52 @@ reconnect (struct GNUNET_CADET_Handle *h);
 
 
 /**
- * Reconnect callback: tries to reconnect again after a failer previous
- * reconnecttion
- *
- * @param cls closure (cadet handle)
- */
-static void
-reconnect_cbk (void *cls)
-{
-  struct GNUNET_CADET_Handle *h = cls;
-
-  h->reconnect_task = NULL;
-  reconnect (h);
-}
-
-
-/**
- * Function called during #reconnect() to destroy
- * all channels that are still open.
+ * Function called during #reconnect_cbk() to (re)open
+ * all ports that are still open.
  *
  * @param cls the `struct GNUNET_CADET_Handle`
- * @param cid chanenl ID
- * @param value a `struct GNUNET_CADET_Channel` to destroy
+ * @param id port ID
+ * @param value a `struct GNUNET_CADET_Channel` to open
  * @return #GNUNET_OK (continue to iterate)
  */
 static int
-destroy_channel_on_reconnect_cb (void *cls,
-                                 uint32_t cid,
-                                 void *value)
+open_port_cb (void *cls,
+             const struct GNUNET_HashCode *id,
+             void *value)
 {
-  /* struct GNUNET_CADET_Handle *handle = cls; */
-  struct GNUNET_CADET_Channel *ch = value;
+  struct GNUNET_CADET_Handle *h = cls;
+  struct GNUNET_CADET_Port *port = value;
+  struct GNUNET_CADET_PortMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "Destroying channel due to reconnect\n");
-  destroy_channel (ch);
+  (void) id;
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
+  msg->port = port->id;
+  GNUNET_MQ_send (h->mq,
+                  env);
   return GNUNET_OK;
 }
 
 
 /**
- * Reconnect to the service, retransmit all infomation to try to restore the
- * original state.
- *
- * @param h handle to the cadet
+ * Reconnect callback: tries to reconnect again after a failer previous
+ * reconnecttion
  *
- * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...)
+ * @param cls closure (cadet handle)
  */
 static void
-schedule_reconnect (struct GNUNET_CADET_Handle *h)
+reconnect_cbk (void *cls)
 {
-  if (NULL != h->reconnect_task)
-    return;
-  GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
-                                           &destroy_channel_on_reconnect_cb,
-                                           h);
-  h->reconnect_task
-    = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
-                                    &reconnect_cbk,
-                                    h);
+  struct GNUNET_CADET_Handle *h = cls;
+
+  h->reconnect_task = NULL;
   h->reconnect_time
     = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
+  reconnect (h);
+  GNUNET_CONTAINER_multihashmap_iterate (h->ports,
+                                         &open_port_cb,
+                                         h);
 }
 
 
@@ -555,15 +540,16 @@ cadet_mq_error_handler (void *cls,
 {
   struct GNUNET_CADET_Channel *ch = cls;
 
-  GNUNET_break (0);
   if (GNUNET_MQ_ERROR_NO_MATCH == error)
   {
     /* Got a message we did not understand, still try to continue! */
+    GNUNET_break_op (0);
     GNUNET_CADET_receive_done (ch);
   }
   else
   {
-    schedule_reconnect (ch->cadet);
+    GNUNET_break (0);
+    GNUNET_CADET_channel_destroy (ch);
   }
 }
 
@@ -581,6 +567,7 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
 {
   struct GNUNET_CADET_Channel *ch = impl_state;
 
+  (void) mq;
   GNUNET_assert (NULL != ch->pending_env);
   GNUNET_MQ_discard (ch->pending_env);
   ch->pending_env = NULL;
@@ -709,6 +696,7 @@ check_local_data (void *cls,
 {
   uint16_t size;
 
+  (void) cls;
   size = ntohs (message->header.size);
   if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size)
   {
@@ -805,6 +793,32 @@ handle_local_ack (void *cls,
 }
 
 
+/**
+ * Function called during #GNUNET_CADET_disconnect() to destroy
+ * all channels that are still open.
+ *
+ * @param cls the `struct GNUNET_CADET_Handle`
+ * @param cid chanenl ID
+ * @param value a `struct GNUNET_CADET_Channel` to destroy
+ * @return #GNUNET_OK (continue to iterate)
+ */
+static int
+destroy_channel_cb (void *cls,
+                    uint32_t cid,
+                    void *value)
+{
+  /* struct GNUNET_CADET_Handle *handle = cls; */
+  struct GNUNET_CADET_Channel *ch = value;
+
+  (void) cls;
+  (void) cid;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "Destroying channel due to GNUNET_CADET_disconnect()\n");
+  destroy_channel (ch);
+  return GNUNET_OK;
+}
+
+
 /**
  * Generic error handler, called with the appropriate error code and
  * the same closure specified at the creation of the message queue.
@@ -822,9 +836,14 @@ handle_mq_error (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "MQ ERROR: %u\n",
               error);
+  GNUNET_CONTAINER_multihashmap32_iterate (h->channels,
+                                           &destroy_channel_cb,
+                                           h);
   GNUNET_MQ_destroy (h->mq);
   h->mq = NULL;
-  reconnect (h);
+  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
+                                                   &reconnect_cbk,
+                                                   h);
 }
 
 
@@ -842,6 +861,7 @@ check_get_peers (void *cls,
 {
   size_t esize;
 
+  (void) cls;
   esize = ntohs (message->size);
   if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize)
     return GNUNET_OK;
@@ -895,11 +915,9 @@ check_get_peer (void *cls,
                 const struct GNUNET_CADET_LocalInfoPeer *message)
 {
   size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer);
-  const struct GNUNET_PeerIdentity *paths_array;
   size_t esize;
-  unsigned int epaths;
-  unsigned int peers;
 
+  (void) cls;
   esize = ntohs (message->header.size);
   if (esize < msize)
   {
@@ -911,10 +929,6 @@ check_get_peer (void *cls,
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity);
-  epaths = ntohs (message->paths);
-  paths_array = (const struct GNUNET_PeerIdentity *) &message[1];
-  
   return GNUNET_OK;
 }
 
@@ -1166,38 +1180,6 @@ reconnect (struct GNUNET_CADET_Handle *h)
                                  handlers,
                                  &handle_mq_error,
                                  h);
-  if (NULL == h->mq)
-  {
-    schedule_reconnect (h);
-    return;
-  }
-  h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
-}
-
-
-/**
- * Function called during #GNUNET_CADET_disconnect() to destroy
- * all channels that are still open.
- *
- * @param cls the `struct GNUNET_CADET_Handle`
- * @param cid chanenl ID
- * @param value a `struct GNUNET_CADET_Channel` to destroy
- * @return #GNUNET_OK (continue to iterate)
- */
-static int
-destroy_channel_cb (void *cls,
-                    uint32_t cid,
-                    void *value)
-{
-  /* struct GNUNET_CADET_Handle *handle = cls; */
-  struct GNUNET_CADET_Channel *ch = value;
-
-  (void) cls;
-  (void) cid;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "Destroying channel due to GNUNET_CADET_disconnect()\n");
-  destroy_channel (ch);
-  return GNUNET_OK;
 }
 
 
@@ -1219,6 +1201,7 @@ destroy_port_cb (void *cls,
   struct GNUNET_CADET_Port *port = value;
 
   (void) cls;
+  (void) id;
   /* This is a warning, the app should have cleanly closed all open ports */
   GNUNET_break (0);
   GNUNET_CADET_close_port (port);
@@ -1633,9 +1616,6 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
     return NULL;
   }
   h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
-  h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
-  h->reconnect_task = NULL;
-
   return h;
 }
 
@@ -1661,8 +1641,6 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
                         GNUNET_CADET_DisconnectEventHandler disconnects,
                         const struct GNUNET_MQ_MessageHandler *handlers)
 {
-  struct GNUNET_CADET_PortMessage *msg;
-  struct GNUNET_MQ_Envelope *env;
   struct GNUNET_CADET_Port *p;
 
   GNUNET_assert (NULL != connects);
@@ -1688,13 +1666,11 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h,
   p->window_changes = window_changes;
   p->disconnects = disconnects;
   p->handlers = GNUNET_MQ_copy_handlers (handlers);
-
-
-  env = GNUNET_MQ_msg (msg,
-                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
-  msg->port = p->id;
-  GNUNET_MQ_send (h->mq,
-                  env);
+  
+  GNUNET_assert (GNUNET_OK ==
+                open_port_cb (h,
+                              &p->id,
+                              p));
   return p;
 }
 
@@ -1753,7 +1729,8 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h,
                                           handlers,
                                           &cadet_mq_error_handler,
                                           ch);
-  GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
+  GNUNET_MQ_set_handlers_closure (ch->mq,
+                                 channel_cls);
 
   /* Request channel creation to service */
   env = GNUNET_MQ_msg (msg,