From: Christian Grothoff Date: Thu, 28 Jun 2018 08:24:09 +0000 (+0200) Subject: clean up MQ error handling in cadet_api X-Git-Tag: v0.11.0~328 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=aab6c1174f7868000b21738142a8b16e222d1835;p=oweals%2Fgnunet.git clean up MQ error handling in cadet_api --- diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index b019424f9..85a8be522 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -357,67 +357,52 @@ reconnect (struct GNUNET_CADET_Handle *h); /** - * Reconnect callback: tries to reconnect again after a failer previous - * reconnecttion - * - * @param cls closure (cadet handle) - */ -static void -reconnect_cbk (void *cls) -{ - struct GNUNET_CADET_Handle *h = cls; - - h->reconnect_task = NULL; - reconnect (h); -} - - -/** - * Function called during #reconnect() to destroy - * all channels that are still open. + * Function called during #reconnect_cbk() to (re)open + * all ports that are still open. * * @param cls the `struct GNUNET_CADET_Handle` - * @param cid chanenl ID - * @param value a `struct GNUNET_CADET_Channel` to destroy + * @param id port ID + * @param value a `struct GNUNET_CADET_Channel` to open * @return #GNUNET_OK (continue to iterate) */ static int -destroy_channel_on_reconnect_cb (void *cls, - uint32_t cid, - void *value) +open_port_cb (void *cls, + const struct GNUNET_HashCode *id, + void *value) { - /* struct GNUNET_CADET_Handle *handle = cls; */ - struct GNUNET_CADET_Channel *ch = value; + struct GNUNET_CADET_Handle *h = cls; + struct GNUNET_CADET_Port *port = value; + struct GNUNET_CADET_PortMessage *msg; + struct GNUNET_MQ_Envelope *env; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Destroying channel due to reconnect\n"); - destroy_channel (ch); + (void) id; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); + msg->port = port->id; + GNUNET_MQ_send (h->mq, + env); return GNUNET_OK; } /** - * Reconnect to the service, retransmit all infomation to try to restore the - * original state. - * - * @param h handle to the cadet + * Reconnect callback: tries to reconnect again after a failer previous + * reconnecttion * - * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) + * @param cls closure (cadet handle) */ static void -schedule_reconnect (struct GNUNET_CADET_Handle *h) +reconnect_cbk (void *cls) { - if (NULL != h->reconnect_task) - return; - GNUNET_CONTAINER_multihashmap32_iterate (h->channels, - &destroy_channel_on_reconnect_cb, - h); - h->reconnect_task - = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, - &reconnect_cbk, - h); + struct GNUNET_CADET_Handle *h = cls; + + h->reconnect_task = NULL; h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time); + reconnect (h); + GNUNET_CONTAINER_multihashmap_iterate (h->ports, + &open_port_cb, + h); } @@ -555,15 +540,16 @@ cadet_mq_error_handler (void *cls, { struct GNUNET_CADET_Channel *ch = cls; - GNUNET_break (0); if (GNUNET_MQ_ERROR_NO_MATCH == error) { /* Got a message we did not understand, still try to continue! */ + GNUNET_break_op (0); GNUNET_CADET_receive_done (ch); } else { - schedule_reconnect (ch->cadet); + GNUNET_break (0); + GNUNET_CADET_channel_destroy (ch); } } @@ -581,6 +567,7 @@ cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, { struct GNUNET_CADET_Channel *ch = impl_state; + (void) mq; GNUNET_assert (NULL != ch->pending_env); GNUNET_MQ_discard (ch->pending_env); ch->pending_env = NULL; @@ -709,6 +696,7 @@ check_local_data (void *cls, { uint16_t size; + (void) cls; size = ntohs (message->header.size); if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) { @@ -805,6 +793,32 @@ handle_local_ack (void *cls, } +/** + * Function called during #GNUNET_CADET_disconnect() to destroy + * all channels that are still open. + * + * @param cls the `struct GNUNET_CADET_Handle` + * @param cid chanenl ID + * @param value a `struct GNUNET_CADET_Channel` to destroy + * @return #GNUNET_OK (continue to iterate) + */ +static int +destroy_channel_cb (void *cls, + uint32_t cid, + void *value) +{ + /* struct GNUNET_CADET_Handle *handle = cls; */ + struct GNUNET_CADET_Channel *ch = value; + + (void) cls; + (void) cid; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Destroying channel due to GNUNET_CADET_disconnect()\n"); + destroy_channel (ch); + return GNUNET_OK; +} + + /** * Generic error handler, called with the appropriate error code and * the same closure specified at the creation of the message queue. @@ -822,9 +836,14 @@ handle_mq_error (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "MQ ERROR: %u\n", error); + GNUNET_CONTAINER_multihashmap32_iterate (h->channels, + &destroy_channel_cb, + h); GNUNET_MQ_destroy (h->mq); h->mq = NULL; - reconnect (h); + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, + &reconnect_cbk, + h); } @@ -842,6 +861,7 @@ check_get_peers (void *cls, { size_t esize; + (void) cls; esize = ntohs (message->size); if (sizeof (struct GNUNET_CADET_LocalInfoPeer) == esize) return GNUNET_OK; @@ -895,11 +915,9 @@ check_get_peer (void *cls, const struct GNUNET_CADET_LocalInfoPeer *message) { size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); - const struct GNUNET_PeerIdentity *paths_array; size_t esize; - unsigned int epaths; - unsigned int peers; + (void) cls; esize = ntohs (message->header.size); if (esize < msize) { @@ -911,10 +929,6 @@ check_get_peer (void *cls, GNUNET_break (0); return GNUNET_SYSERR; } - peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity); - epaths = ntohs (message->paths); - paths_array = (const struct GNUNET_PeerIdentity *) &message[1]; - return GNUNET_OK; } @@ -1166,38 +1180,6 @@ reconnect (struct GNUNET_CADET_Handle *h) handlers, &handle_mq_error, h); - if (NULL == h->mq) - { - schedule_reconnect (h); - return; - } - h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; -} - - -/** - * Function called during #GNUNET_CADET_disconnect() to destroy - * all channels that are still open. - * - * @param cls the `struct GNUNET_CADET_Handle` - * @param cid chanenl ID - * @param value a `struct GNUNET_CADET_Channel` to destroy - * @return #GNUNET_OK (continue to iterate) - */ -static int -destroy_channel_cb (void *cls, - uint32_t cid, - void *value) -{ - /* struct GNUNET_CADET_Handle *handle = cls; */ - struct GNUNET_CADET_Channel *ch = value; - - (void) cls; - (void) cid; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Destroying channel due to GNUNET_CADET_disconnect()\n"); - destroy_channel (ch); - return GNUNET_OK; } @@ -1219,6 +1201,7 @@ destroy_port_cb (void *cls, struct GNUNET_CADET_Port *port = value; (void) cls; + (void) id; /* This is a warning, the app should have cleanly closed all open ports */ GNUNET_break (0); GNUNET_CADET_close_port (port); @@ -1633,9 +1616,6 @@ GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) return NULL; } h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); - h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; - h->reconnect_task = NULL; - return h; } @@ -1661,8 +1641,6 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers) { - struct GNUNET_CADET_PortMessage *msg; - struct GNUNET_MQ_Envelope *env; struct GNUNET_CADET_Port *p; GNUNET_assert (NULL != connects); @@ -1688,13 +1666,11 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, p->window_changes = window_changes; p->disconnects = disconnects; p->handlers = GNUNET_MQ_copy_handlers (handlers); - - - env = GNUNET_MQ_msg (msg, - GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); - msg->port = p->id; - GNUNET_MQ_send (h->mq, - env); + + GNUNET_assert (GNUNET_OK == + open_port_cb (h, + &p->id, + p)); return p; } @@ -1753,7 +1729,8 @@ GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, handlers, &cadet_mq_error_handler, ch); - GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls); + GNUNET_MQ_set_handlers_closure (ch->mq, + channel_cls); /* Request channel creation to service */ env = GNUNET_MQ_msg (msg,