From 18197ec851b3a4f23b96e8ea9a6ba58ae3982bac Mon Sep 17 00:00:00 2001 From: Bart Polot Date: Tue, 31 Jan 2017 04:17:58 +0100 Subject: [PATCH] Implementation of port opening and handling in MQ --- src/cadet/cadet_api.c | 301 +++++++++++++++++++++++++++--------------- 1 file changed, 191 insertions(+), 110 deletions(-) diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 3491bd75f..3eaa78af8 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -280,14 +280,11 @@ struct GNUNET_CADET_Channel /** * Are we allowed to send to the service? * - * @deprecated + * @deprecated? */ unsigned int allow_send; - /****************************************************************************/ /***************************** MQ ************************************/ - /****************************************************************************/ - /** * Message Queue for the channel. */ @@ -330,6 +327,38 @@ struct GNUNET_CADET_Port * Closure for @a handler. */ void *cls; + + /***************************** MQ ************************************/ + + /** + * Port "number" + */ + struct GNUNET_HashCode id; + + /** + * Handler for incoming channels on this port + */ + GNUNET_CADET_ConnectEventHandler connects; + + /** + * Closure for @ref connects + */ + void * connects_cls; + + /** + * Window size change handler. + */ + GNUNET_CADET_WindowSizeEventHandler window_changes; + + /** + * Handler called when an incoming channel is destroyed.. + */ + GNUNET_CADET_DisconnectEventHandler disconnects; + + /** + * Payload handlers for incoming channels. + */ + const struct GNUNET_MQ_MessageHandler *handlers; }; @@ -553,6 +582,114 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th) } +/******************************************************************************/ +/*********************** MQ API CALLBACKS ****************************/ +/******************************************************************************/ + + +/** + * Implement sending functionality of a message queue for + * us sending messages to a peer. + * + * Encapsulates the payload message in a #GNUNET_CADET_LocalData message + * in order to label the message with the channel ID and send the + * encapsulated message to the service. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct GNUNET_CADET_Channel *ch = impl_state; + struct GNUNET_CADET_Handle *h = ch->cadet; + uint16_t msize; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_LocalData *cadet_msg; + + + if (NULL == h->mq) + { + /* We're currently reconnecting, pretend this worked */ + GNUNET_MQ_impl_send_continue (mq); + return; + } + + /* check message size for sanity */ + msize = ntohs (msg->size); + if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) + { + GNUNET_break (0); + GNUNET_MQ_impl_send_continue (mq); + return; + } + + env = GNUNET_MQ_msg_nested_mh (cadet_msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, + msg); + cadet_msg->ccn = ch->ccn; + GNUNET_MQ_send (h->mq, env); + GNUNET_MQ_impl_send_continue (mq); +} + + +/** + * Handle destruction of a message queue. Implementations must not + * free @a mq, but should take care of @a impl_state. + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation + */ +static void +cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct GNUNET_CADET_Channel *ch = impl_state; + + GNUNET_assert (mq == ch->mq); + ch->mq = NULL; +} + + +/** + * We had an error processing a message we forwarded from a peer to + * the CADET service. We should just complain about it but otherwise + * continue processing. + * + * @param cls closure + * @param error error code + */ +static void +cadet_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + GNUNET_break_op (0); +} + + +/** + * Implementation function that cancels the currently sent message. + * Should basically undo whatever #mq_send_impl() did. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ +static void +cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct GNUNET_CADET_Channel *ch = impl_state; + + LOG (GNUNET_ERROR_TYPE_WARNING, + "Cannot cancel mq message on channel %X of %p\n", + ch->ccn.channel_of_client, ch->cadet); + + GNUNET_break (0); +} + /******************************************************************************/ /*********************** RECEIVE HANDLERS ****************************/ @@ -627,15 +764,17 @@ handle_channel_created (void *cls, port = find_port (h, port_number); if (NULL == port) { + /* We could have closed the port but the service didn't know about it yet + * This is not an error. + */ struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; struct GNUNET_MQ_Envelope *env; GNUNET_break (0); LOG (GNUNET_ERROR_TYPE_DEBUG, - "No handler for incoming channel %X [%s]\n", + "No handler for incoming channel %X (on port %s, recently closed?)\n", ntohl (ccn.channel_of_client), GNUNET_h2s (port_number)); - /* FIXME: should disconnect instead, this is a serious error! */ env = GNUNET_MQ_msg (d_msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); d_msg->ccn = msg->ccn; @@ -651,17 +790,38 @@ handle_channel_created (void *cls, ch->ccn = ccn; ch->incoming_port = port; ch->options = ntohl (msg->opt); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming channel %X [%s] %p\n", ntohl (ccn.channel_of_client), GNUNET_h2s (port_number), ch); - ch->ctx = port->handler (port->cls, - ch, - &msg->peer, - port->hash, - ch->options); + + if (NULL != port->handler) + { + /** @deprecated */ + /* Old style API */ + ch->ctx = port->handler (port->cls, + ch, + &msg->peer, + port->hash, + ch->options); + } else { + /* MQ API */ + GNUNET_assert (NULL != port->connects); + ch->window_changes = port->window_changes; + ch->disconnects = port->disconnects; + ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, + &cadet_mq_destroy_impl, + &cadet_mq_cancel_impl, + ch, + port->handlers, + &cadet_mq_error_handler, + ch); + ch->ctx = port->connects (port->cadet->cls, + ch, + &msg->peer); + GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx); + } } @@ -2236,111 +2396,32 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers) { - return NULL; -} - - -/** - * Implement sending functionality of a message queue for - * us sending messages to a peer. - * - * Encapsulates the payload message in a #GNUNET_CADET_LocalData message - * in order to label the message with the channel ID and send the - * encapsulated message to the service. - * - * @param mq the message queue - * @param msg the message to send - * @param impl_state state of the implementation - */ -static void -cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) -{ - struct GNUNET_CADET_Channel *ch = impl_state; - struct GNUNET_CADET_Handle *h = ch->cadet; - uint16_t msize; + struct GNUNET_CADET_PortMessage *msg; struct GNUNET_MQ_Envelope *env; - struct GNUNET_CADET_LocalData *cadet_msg; + struct GNUNET_CADET_Port *p; + GNUNET_assert (NULL != connects); - if (NULL == h->mq) - { - /* We're currently reconnecting, pretend this worked */ - GNUNET_MQ_impl_send_continue (mq); - return; - } + p = GNUNET_new (struct GNUNET_CADET_Port); + p->cadet = h; + p->id = *port; + p->connects = connects; + p->cls = connects_cls; + p->window_changes = window_changes; + p->disconnects = disconnects; + p->handlers = handlers; - /* check message size for sanity */ - msize = ntohs (msg->size); - if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) - { - GNUNET_break (0); - GNUNET_MQ_impl_send_continue (mq); - return; - } + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (h->ports, + p->hash, + p, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - env = GNUNET_MQ_msg_nested_mh (cadet_msg, - GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, - msg); - cadet_msg->ccn = ch->ccn; + env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); + msg->port = p->id; GNUNET_MQ_send (h->mq, env); - GNUNET_MQ_impl_send_continue (mq); -} - -/** - * Handle destruction of a message queue. Implementations must not - * free @a mq, but should take care of @a impl_state. - * - * @param mq the message queue to destroy - * @param impl_state state of the implementation - */ -static void -cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct GNUNET_CADET_Channel *ch = impl_state; - - GNUNET_assert (mq == ch->mq); - ch->mq = NULL; -} - - -/** - * We had an error processing a message we forwarded from a peer to - * the CADET service. We should just complain about it but otherwise - * continue processing. - * - * @param cls closure - * @param error error code - */ -static void -cadet_mq_error_handler (void *cls, - enum GNUNET_MQ_Error error) -{ - GNUNET_break_op (0); -} - - -/** - * Implementation function that cancels the currently sent message. - * Should basically undo whatever #mq_send_impl() did. - * - * @param mq message queue - * @param impl_state state specific to the implementation - */ -static void -cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct GNUNET_CADET_Channel *ch = impl_state; - - LOG (GNUNET_ERROR_TYPE_WARNING, - "Cannot cancel mq message on channel %X of %p\n", - ch->ccn.channel_of_client, ch->cadet); - - GNUNET_break (0); + return p; } -- 2.25.1