X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fcadet%2Fcadet_api.c;h=decf473a9c11679bf0811eabcfb907e042ac5613;hb=8d71f909cb22fbf6774e4042309a8eb133af3bfc;hp=1ca8bad9d78c650962c24ad1122e5581b2b43eaf;hpb=caa4196867da8684c5b1fd2e747ecb9d21cbba8b;p=oweals%2Fgnunet.git diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c index 1ca8bad9d..decf473a9 100644 --- a/src/cadet/cadet_api.c +++ b/src/cadet/cadet_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2011 GNUnet e.V. + Copyright (C) 2011, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -19,78 +19,25 @@ */ /** * @file cadet/cadet_api.c - * @brief cadet api: client implementation of new cadet service + * @brief cadet api: client implementation of cadet service * @author Bartlomiej Polot + * @author Christian Grothoff */ - #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_constants.h" #include "gnunet_cadet_service.h" #include "cadet.h" #include "cadet_protocol.h" #define LOG(kind,...) GNUNET_log_from (kind, "cadet-api",__VA_ARGS__) -#define DATA_OVERHEAD sizeof(struct GNUNET_CADET_LocalData) - -/******************************************************************************/ -/************************ DATA STRUCTURES ****************************/ -/******************************************************************************/ /** - * Transmission queue to the service + * Ugly legacy hack. */ -struct GNUNET_CADET_TransmitHandle +union CadetInfoCB { - /** - * Double Linked list - */ - struct GNUNET_CADET_TransmitHandle *next; - - /** - * Double Linked list - */ - struct GNUNET_CADET_TransmitHandle *prev; - - /** - * Channel this message is sent on / for (may be NULL for control messages). - */ - struct GNUNET_CADET_Channel *channel; - - /** - * Callback to obtain the message to transmit, or NULL if we - * got the message in 'data'. Notice that messages built - * by 'notify' need to be encapsulated with information about - * the 'target'. - */ - GNUNET_CONNECTION_TransmitReadyNotify notify; - - /** - * Closure for 'notify' - */ - void *notify_cls; - - /** - * How long is this message valid. Once the timeout has been - * reached, the message must no longer be sent. If this - * is a message with a 'notify' callback set, the 'notify' - * function should be called with 'buf' NULL and size 0. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * Task triggering a timeout, can be NO_TASK if the timeout is FOREVER. - */ - struct GNUNET_SCHEDULER_Task * timeout_task; - - /** - * Size of 'data' -- or the desired size of 'notify' if 'data' is NULL. - */ - size_t size; -}; - -union CadetInfoCB { - /** * Channel callback. */ @@ -123,87 +70,35 @@ union CadetInfoCB { */ struct GNUNET_CADET_Handle { - - /** - * Handle to the server connection, to send messages later - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Set of handlers used for processing incoming messages in the channels - */ - const struct GNUNET_CADET_MessageHandler *message_handlers; - /** - * Number of handlers in the handlers array. + * Message queue. */ - unsigned int n_handlers; + struct GNUNET_MQ_Handle *mq; /** * Ports open. */ struct GNUNET_CONTAINER_MultiHashMap *ports; - /** - * Double linked list of the channels this client is connected to, head. - */ - struct GNUNET_CADET_Channel *channels_head; - - /** - * Double linked list of the channels this client is connected to, tail. - */ - struct GNUNET_CADET_Channel *channels_tail; - - /** - * Callback for inbound channel disconnection - */ - GNUNET_CADET_ChannelEndHandler *cleaner; - - /** - * Handle to cancel pending transmissions in case of disconnection - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * Closure for all the handlers given by the client - */ - void *cls; - - /** - * Messages to send to the service, head. - */ - struct GNUNET_CADET_TransmitHandle *th_head; - - /** - * Messages to send to the service, tail. - */ - struct GNUNET_CADET_TransmitHandle *th_tail; - - /** - * chid of the next channel to create (to avoid reusing IDs often) - */ - CADET_ChannelNumber next_chid; - - /** - * Have we started the task to receive messages from the service - * yet? We do this after we send the 'CADET_LOCAL_CONNECT' message. - */ - int in_receive; + /** + * Channels open. + */ + struct GNUNET_CONTAINER_MultiHashMap32 *channels; /** - * Configuration given by the client, in case of reconnection + * child of the next channel to create (to avoid reusing IDs often) */ - const struct GNUNET_CONFIGURATION_Handle *cfg; + struct GNUNET_CADET_ClientChannelNumber next_ccn; /** - * Time to the next reconnect in case one reconnect fails + * Configuration given by the client, in case of reconnection */ - struct GNUNET_TIME_Relative reconnect_time; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Task for trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * Callback for an info task (only one active at a time). @@ -214,23 +109,12 @@ struct GNUNET_CADET_Handle * Info callback closure for @c info_cb. */ void *info_cls; -}; - - -/** - * Description of a peer - */ -struct GNUNET_CADET_Peer -{ - /** - * ID of the peer in short form - */ - GNUNET_PEER_Id id; /** - * Channel this peer belongs to + * Time to the next reconnect in case one reconnect fails */ - struct GNUNET_CADET_Channel *t; + struct GNUNET_TIME_Relative reconnect_time; + }; @@ -239,138 +123,119 @@ struct GNUNET_CADET_Peer */ struct GNUNET_CADET_Channel { - /** - * DLL next - */ - struct GNUNET_CADET_Channel *next; - /** - * DLL prev - */ - struct GNUNET_CADET_Channel *prev; + /** + * Other end of the channel. + */ + struct GNUNET_PeerIdentity peer; - /** - * Handle to the cadet this channel belongs to - */ + /** + * Handle to the cadet this channel belongs to + */ struct GNUNET_CADET_Handle *cadet; - /** - * Local ID of the channel - */ - CADET_ChannelNumber chid; + /** + * Channel's port, if incoming. + */ + struct GNUNET_CADET_Port *incoming_port; - /** - * Channel's port, if any. - */ - struct GNUNET_CADET_Port *port; + /** + * Any data the caller wants to put in here, used for the + * various callbacks (@e disconnects, @e window_changes, handlers). + */ + void *ctx; - /** - * Other end of the channel. - */ - GNUNET_PEER_Id peer; + /** + * Message Queue for the channel (which we are implementing). + */ + struct GNUNET_MQ_Handle *mq; /** - * Any data the caller wants to put in here + * Task to allow mq to send more traffic. */ - void *ctx; + struct GNUNET_SCHEDULER_Task *mq_cont; - /** - * Size of packet queued in this channel - */ - unsigned int packet_size; + /** + * Pending envelope with a message to be transmitted to the + * service as soon as we are allowed to. Should only be + * non-NULL if @e allow_send is 0. + */ + struct GNUNET_MQ_Envelope *pending_env; - /** - * Channel options: reliability, etc. - */ + /** + * Window change handler. + */ + GNUNET_CADET_WindowSizeEventHandler window_changes; + + /** + * Disconnect handler. + */ + GNUNET_CADET_DisconnectEventHandler disconnects; + + /** + * Local ID of the channel, #GNUNET_CADET_LOCAL_CHANNEL_ID_CLI bit is set if outbound. + */ + struct GNUNET_CADET_ClientChannelNumber ccn; + + /** + * Channel options: reliability, etc. + */ enum GNUNET_CADET_ChannelOption options; - /** - * Are we allowed to send to the service? - */ - int allow_send; + /** + * How many messages are we allowed to send to the service right now? + */ + unsigned int allow_send; }; + /** * Opaque handle to a port. */ struct GNUNET_CADET_Port { - /** - * Handle to the CADET session this port belongs to. - */ - struct GNUNET_CADET_Handle *cadet; - /** - * Port ID. - */ - struct GNUNET_HashCode *hash; + /** + * Port "number" + */ + struct GNUNET_HashCode id; - /** - * Callback handler for incoming channels on this port. - */ - GNUNET_CADET_InboundChannelNotificationHandler *handler; + /** + * Handle to the CADET session this port belongs to. + */ + struct GNUNET_CADET_Handle *cadet; - /** - * Closure for @a handler. - */ + /** + * Closure for @a handler. + */ void *cls; -}; - -/** - * Implementation state for cadet's message queue. - */ -struct CadetMQState -{ /** - * The current transmit handle, or NULL - * if no transmit is active. + * Handler for incoming channels on this port */ - struct GNUNET_CADET_TransmitHandle *th; + GNUNET_CADET_ConnectEventHandler connects; /** - * Channel to send the data over. + * Closure for @ref connects */ - struct GNUNET_CADET_Channel *channel; -}; - - -/******************************************************************************/ -/*********************** DECLARATIONS *************************/ -/******************************************************************************/ - -/** - * Function called to send a message to the service. - * "buf" will be NULL and "size" zero if the socket was closed for writing in - * the meantime. - * - * @param cls closure, the cadet handle - * @param size number of bytes available in buf - * @param buf where the callee should write the connect message - * @return number of bytes written to buf - */ -static size_t -send_callback (void *cls, size_t size, void *buf); + void *connects_cls; + /** + * Window size change handler. + */ + GNUNET_CADET_WindowSizeEventHandler window_changes; -/******************************************************************************/ -/*********************** AUXILIARY FUNCTIONS *************************/ -/******************************************************************************/ + /** + * Handler called when an incoming channel is destroyed. + */ + GNUNET_CADET_DisconnectEventHandler disconnects; -/** - * Check if transmission is a payload packet. - * - * @param th Transmission handle. - * - * @return #GNUNET_YES if it is a payload packet, - * #GNUNET_NO if it is a cadet management packet. - */ -static int -th_is_payload (struct GNUNET_CADET_TransmitHandle *th) -{ - return (th->notify != NULL) ? GNUNET_YES : GNUNET_NO; -} + /** + * Payload handlers for incoming channels. + */ + struct GNUNET_MQ_MessageHandler *handlers; +}; /** @@ -378,63 +243,30 @@ th_is_payload (struct GNUNET_CADET_TransmitHandle *th) * * @param h CADET handle. * @param hash HashCode for the port number. - * * @return The port handle if known, NULL otherwise. */ static struct GNUNET_CADET_Port * find_port (const struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *hash) { - struct GNUNET_CADET_Port *p; - - p = GNUNET_CONTAINER_multihashmap_get (h->ports, hash); - - return p; -} - -/** - * Check whether there is any message ready in the queue and find the size. - * - * @param h Cadet handle. - * - * @return The size of the first ready message in the queue, including overhead. - * 0 if there is none. - */ -static size_t -message_ready_size (struct GNUNET_CADET_Handle *h) -{ - struct GNUNET_CADET_TransmitHandle *th; - struct GNUNET_CADET_Channel *ch; - - for (th = h->th_head; NULL != th; th = th->next) - { - ch = th->channel; - if (GNUNET_NO == th_is_payload (th) || GNUNET_YES == ch->allow_send) - return th->size; - } - return 0; + return GNUNET_CONTAINER_multihashmap_get (h->ports, + hash); } /** * Get the channel handler for the channel specified by id from the given handle + * * @param h Cadet handle - * @param chid ID of the wanted channel + * @param ccn ID of the wanted channel * @return handle to the required channel or NULL if not found */ static struct GNUNET_CADET_Channel * -retrieve_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid) +find_channel (struct GNUNET_CADET_Handle *h, + struct GNUNET_CADET_ClientChannelNumber ccn) { - struct GNUNET_CADET_Channel *ch; - - ch = h->channels_head; - while (ch != NULL) - { - if (ch->chid == chid) - return ch; - ch = ch->next; - } - return NULL; + return GNUNET_CONTAINER_multihashmap32_get (h->channels, + ntohl (ccn.channel_of_client)); } @@ -442,33 +274,37 @@ retrieve_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid) * Create a new channel and insert it in the channel list of the cadet handle * * @param h Cadet handle - * @param chid Desired chid of the channel, 0 to assign one automatically. - * + * @param ccnp pointer to desired ccn of the channel, NULL to assign one automatically. * @return Handle to the created channel. */ static struct GNUNET_CADET_Channel * -create_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid) +create_channel (struct GNUNET_CADET_Handle *h, + const struct GNUNET_CADET_ClientChannelNumber *ccnp) { struct GNUNET_CADET_Channel *ch; + struct GNUNET_CADET_ClientChannelNumber ccn; ch = GNUNET_new (struct GNUNET_CADET_Channel); - GNUNET_CONTAINER_DLL_insert (h->channels_head, h->channels_tail, ch); ch->cadet = h; - if (0 == chid) + if (NULL == ccnp) { - ch->chid = h->next_chid; - while (NULL != retrieve_channel (h, h->next_chid)) - { - h->next_chid++; - h->next_chid &= ~GNUNET_CADET_LOCAL_CHANNEL_ID_SERV; - h->next_chid |= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI; - } + while (NULL != + find_channel (h, + h->next_ccn)) + h->next_ccn.channel_of_client + = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI | (1 + ntohl (h->next_ccn.channel_of_client))); + ccn = h->next_ccn; } else { - ch->chid = chid; + ccn = *ccnp; } - ch->allow_send = GNUNET_NO; + ch->ccn = ccn; + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap32_put (h->channels, + ntohl (ch->ccn.channel_of_client), + ch, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); return ch; } @@ -482,255 +318,275 @@ create_channel (struct GNUNET_CADET_Handle *h, CADET_ChannelNumber chid) * * @param ch Pointer to the channel. * @param call_cleaner Whether to call the cleaner handler. - * - * @return Handle to the required channel or NULL if not found. */ static void -destroy_channel (struct GNUNET_CADET_Channel *ch, int call_cleaner) +destroy_channel (struct GNUNET_CADET_Channel *ch) { - struct GNUNET_CADET_Handle *h; - struct GNUNET_CADET_TransmitHandle *th; - struct GNUNET_CADET_TransmitHandle *next; - - LOG (GNUNET_ERROR_TYPE_DEBUG, " destroy_channel %X\n", ch->chid); + struct GNUNET_CADET_Handle *h = ch->cadet; - if (NULL == ch) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Destroying channel %X of %p\n", + ch->ccn, + h); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap32_remove (h->channels, + ntohl (ch->ccn.channel_of_client), + ch)); + if (NULL != ch->mq_cont) { - GNUNET_break (0); - return; + GNUNET_SCHEDULER_cancel (ch->mq_cont); + ch->mq_cont = NULL; } - h = ch->cadet; - - GNUNET_CONTAINER_DLL_remove (h->channels_head, h->channels_tail, ch); - /* signal channel destruction */ - if ( (NULL != h->cleaner) && (0 != ch->peer) && (GNUNET_YES == call_cleaner) ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, " calling cleaner\n"); - h->cleaner (h->cls, ch, ch->ctx); - } - - /* check that clients did not leave messages behind in the queue */ - for (th = h->th_head; NULL != th; th = next) - { - next = th->next; - if (th->channel != ch) - continue; - /* Clients should have aborted their requests already. - * Management traffic should be ok, as clients can't cancel that. - * If the service crashed and we are reconnecting, it's ok. - */ - GNUNET_break (GNUNET_NO == th_is_payload (th) - || GNUNET_NO == h->in_receive); - GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); - - /* clean up request */ - if (NULL != th->timeout_task) - GNUNET_SCHEDULER_cancel (th->timeout_task); - GNUNET_free (th); - } - - /* if there are no more pending requests with cadet service, cancel active request */ - /* Note: this should be unnecessary... */ - if ((0 == message_ready_size (h)) && (NULL != h->th)) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - - if (0 != ch->peer) - GNUNET_PEER_change_rc (ch->peer, -1); + if (NULL != ch->disconnects) + ch->disconnects (ch->ctx, + ch); + if (NULL != ch->pending_env) + GNUNET_MQ_discard (ch->pending_env); + GNUNET_MQ_destroy (ch->mq); GNUNET_free (ch); - return; } /** - * Notify client that the transmission has timed out + * Reconnect to the service, retransmit all infomation to try to restore the + * original state. * - * @param cls closure + * @param h handle to the cadet */ static void -timeout_transmission (void *cls) -{ - struct GNUNET_CADET_TransmitHandle *th = cls; - struct GNUNET_CADET_Handle *cadet = th->channel->cadet; - - th->timeout_task = NULL; - th->channel->packet_size = 0; - GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th); - if (GNUNET_YES == th_is_payload (th)) - GNUNET_break (0 == th->notify (th->notify_cls, 0, NULL)); - GNUNET_free (th); - if ((0 == message_ready_size (cadet)) && (NULL != cadet->th)) - { - /* nothing ready to transmit, no point in asking for transmission */ - GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th); - cadet->th = NULL; - } -} +reconnect (struct GNUNET_CADET_Handle *h); /** - * Add a transmit handle to the transmission queue and set the - * timeout if needed. + * Reconnect callback: tries to reconnect again after a failer previous + * reconnecttion * - * @param h cadet handle with the queue head and tail - * @param th handle to the packet to be transmitted + * @param cls closure (cadet handle) */ static void -add_to_queue (struct GNUNET_CADET_Handle *h, - struct GNUNET_CADET_TransmitHandle *th) +reconnect_cbk (void *cls) { - GNUNET_CONTAINER_DLL_insert_tail (h->th_head, h->th_tail, th); - if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us == th->timeout.abs_value_us) - return; - th->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (th->timeout), &timeout_transmission, th); + struct GNUNET_CADET_Handle *h = cls; + + h->reconnect_task = NULL; + reconnect (h); } /** - * Auxiliary function to send an already constructed packet to the service. - * Takes care of creating a new queue element, copying the message and - * calling the tmt_rdy function if necessary. + * Function called during #reconnect() to destroy + * all channels that are still open. * - * @param h cadet handle - * @param msg message to transmit - * @param channel channel this send is related to (NULL if N/A) + * @param cls the `struct GNUNET_CADET_Handle` + * @param cid chanenl ID + * @param value a `struct GNUNET_CADET_Channel` to destroy + * @return #GNUNET_OK (continue to iterate) */ -static void -send_packet (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *msg, - struct GNUNET_CADET_Channel *channel); +static int +destroy_channel_on_reconnect_cb (void *cls, + uint32_t cid, + void *value) +{ + /* struct GNUNET_CADET_Handle *handle = cls; */ + struct GNUNET_CADET_Channel *ch = value; + + destroy_channel (ch); + return GNUNET_OK; +} /** - * Send an ack on the channel to confirm the processing of a message. + * Reconnect to the service, retransmit all infomation to try to restore the + * original state. * - * @param ch Channel on which to send the ACK. + * @param h handle to the cadet + * + * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) */ static void -send_ack (struct GNUNET_CADET_Channel *ch) +schedule_reconnect (struct GNUNET_CADET_Handle *h) { - struct GNUNET_CADET_LocalAck msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK on channel %X\n", ch->chid); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); - msg.header.size = htons (sizeof (msg)); - msg.channel_id = htonl (ch->chid); - - send_packet (ch->cadet, &msg.header, ch); - return; + if (NULL != h->reconnect_task) + return; + GNUNET_CONTAINER_multihashmap32_iterate (h->channels, + &destroy_channel_on_reconnect_cb, + h); + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, + &reconnect_cbk, + h); + h->reconnect_time + = GNUNET_TIME_STD_BACKOFF (h->reconnect_time); } - /** - * Reconnect callback: tries to reconnect again after a failer previous - * reconnection. + * Notify the application about a change in the window size (if needed). * - * @param cls closure (cadet handle) + * @param ch Channel to notify about. */ static void -reconnect_cbk (void *cls); +notify_window_size (struct GNUNET_CADET_Channel *ch) +{ + if (NULL != ch->window_changes) + ch->window_changes (ch->ctx, + ch, /* FIXME: remove 'ch'? */ + ch->allow_send); +} /** - * Reconnect to the service, retransmit all infomation to try to restore the - * original state. - * - * @param h handle to the cadet + * Transmit the next message from our queue. * - * @return GNUNET_YES in case of sucess, GNUNET_NO otherwise (service down...) + * @param cls Closure (channel whose mq to activate). */ -static int -do_reconnect (struct GNUNET_CADET_Handle *h) +static void +cadet_mq_send_now (void *cls) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "******* RECONNECT *******\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "******** on %p *******\n", h); - LOG (GNUNET_ERROR_TYPE_DEBUG, "*****************************\n"); - - /* disconnect */ - if (NULL != h->th) + struct GNUNET_CADET_Channel *ch = cls; + struct GNUNET_MQ_Envelope *env = ch->pending_env; + + ch->mq_cont = NULL; + if (0 == ch->allow_send) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; + /* how did we get here? */ + GNUNET_break (0); + return; } - if (NULL != h->client) + if (NULL == env) { - GNUNET_CLIENT_disconnect (h->client); + /* how did we get here? */ + GNUNET_break (0); + return; } + ch->allow_send--; + ch->pending_env = NULL; + GNUNET_MQ_send (ch->cadet->mq, + env); + GNUNET_MQ_impl_send_continue (ch->mq); +} + - /* connect again */ - h->client = GNUNET_CLIENT_connect ("cadet", h->cfg); - if (h->client == NULL) +/** + * Implement sending functionality of a message queue for + * us sending messages to a peer. + * + * Encapsulates the payload message in a #GNUNET_CADET_LocalData message + * in order to label the message with the channel ID and send the + * encapsulated message to the service. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state) +{ + struct GNUNET_CADET_Channel *ch = impl_state; + struct GNUNET_CADET_Handle *h = ch->cadet; + uint16_t msize; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_LocalData *cadet_msg; + + if (NULL == h->mq) { - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, - &reconnect_cbk, h); - h->reconnect_time = - GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, - GNUNET_TIME_relative_multiply - (h->reconnect_time, 2)); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Next retry in %s\n", - GNUNET_STRINGS_relative_time_to_string (h->reconnect_time, - GNUNET_NO)); - GNUNET_break (0); - return GNUNET_NO; + /* We're currently reconnecting, pretend this worked */ + GNUNET_MQ_impl_send_continue (mq); + return; } - else + + /* check message size for sanity */ + msize = ntohs (msg->size); + if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE) { - h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; + GNUNET_break (0); + GNUNET_MQ_impl_send_continue (mq); + return; } - return GNUNET_YES; + env = GNUNET_MQ_msg_nested_mh (cadet_msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, + msg); + cadet_msg->ccn = ch->ccn; + GNUNET_assert (NULL == ch->pending_env); + ch->pending_env = env; + if (0 < ch->allow_send) + ch->mq_cont + = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, + ch); } + /** - * Reconnect callback: tries to reconnect again after a failer previous - * reconnecttion + * Handle destruction of a message queue. Implementations must not + * free @a mq, but should take care of @a impl_state. * - * @param cls closure (cadet handle) + * @param mq the message queue to destroy + * @param impl_state state of the implementation */ static void -reconnect_cbk (void *cls) +cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) { - struct GNUNET_CADET_Handle *h = cls; + struct GNUNET_CADET_Channel *ch = impl_state; - h->reconnect_task = NULL; - do_reconnect (h); + GNUNET_assert (mq == ch->mq); + ch->mq = NULL; } /** - * Reconnect to the service, retransmit all infomation to try to restore the - * original state. - * - * @param h handle to the cadet + * We had an error processing a message we forwarded from a peer to + * the CADET service. We should just complain about it but otherwise + * continue processing. * - * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service down...) + * @param cls closure with our `struct GNUNET_CADET_Channel` + * @param error error code */ static void -reconnect (struct GNUNET_CADET_Handle *h) +cadet_mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - struct GNUNET_CADET_Channel *ch; + struct GNUNET_CADET_Channel *ch = cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Requested RECONNECT, destroying all channels\n"); - h->in_receive = GNUNET_NO; - for (ch = h->channels_head; NULL != ch; ch = h->channels_head) - destroy_channel (ch, GNUNET_YES); - if (NULL == h->reconnect_task) - h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time, - &reconnect_cbk, h); + GNUNET_break (0); + if (GNUNET_MQ_ERROR_NO_MATCH == error) + { + /* Got a message we did not understand, still try to continue! */ + GNUNET_CADET_receive_done (ch); + } + else + { + schedule_reconnect (ch->cadet); + } } -/******************************************************************************/ -/*********************** RECEIVE HANDLERS ****************************/ -/******************************************************************************/ +/** + * Implementation function that cancels the currently sent message. + * Should basically undo whatever #mq_send_impl() did. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ +static void +cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) +{ + struct GNUNET_CADET_Channel *ch = impl_state; + + GNUNET_assert (NULL != ch->pending_env); + GNUNET_MQ_discard (ch->pending_env); + ch->pending_env = NULL; + if (NULL != ch->mq_cont) + { + GNUNET_SCHEDULER_cancel (ch->mq_cont); + ch->mq_cont = NULL; + } +} + /** * Process the new channel notification and add it to the channels in the handle @@ -739,55 +595,71 @@ reconnect (struct GNUNET_CADET_Handle *h) * @param msg A message with the details of the new incoming channel */ static void -process_channel_created (struct GNUNET_CADET_Handle *h, - const struct GNUNET_CADET_ChannelCreateMessage *msg) +handle_channel_created (void *cls, + const struct GNUNET_CADET_LocalChannelCreateMessage *msg) { + struct GNUNET_CADET_Handle *h = cls; struct GNUNET_CADET_Channel *ch; struct GNUNET_CADET_Port *port; const struct GNUNET_HashCode *port_number; - CADET_ChannelNumber chid; + struct GNUNET_CADET_ClientChannelNumber ccn; - chid = ntohl (msg->channel_id); + ccn = msg->ccn; port_number = &msg->port; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating incoming channel %X [%s]\n", - chid, GNUNET_h2s (port_number)); - if (chid < GNUNET_CADET_LOCAL_CHANNEL_ID_SERV) + if (ntohl (ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) { GNUNET_break (0); return; } - port = find_port (h, port_number); - if (NULL != port) - { - void *ctx; - - ch = create_channel (h, chid); - ch->allow_send = GNUNET_NO; - ch->peer = GNUNET_PEER_intern (&msg->peer); - ch->cadet = h; - ch->chid = chid; - ch->port = port; - ch->options = ntohl (msg->opt); - - LOG (GNUNET_ERROR_TYPE_DEBUG, " created channel %p\n", ch); - ctx = port->handler (port->cls, ch, &msg->peer, port->hash, ch->options); - if (NULL != ctx) - ch->ctx = ctx; - LOG (GNUNET_ERROR_TYPE_DEBUG, "User notified\n"); - } - else + port = find_port (h, + port_number); + if (NULL == port) { - struct GNUNET_CADET_ChannelDestroyMessage d_msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "No handler for incoming channels\n"); - - d_msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); - d_msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage)); - d_msg.channel_id = msg->channel_id; + /* We could have closed the port but the service didn't know about it yet + * This is not an error. + */ + struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg; + struct GNUNET_MQ_Envelope *env; - send_packet (h, &d_msg.header, NULL); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "No handler for incoming channel %X (on port %s, recently closed?)\n", + ntohl (ccn.channel_of_client), + GNUNET_h2s (port_number)); + env = GNUNET_MQ_msg (d_msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); + d_msg->ccn = msg->ccn; + GNUNET_MQ_send (h->mq, + env); + return; } - return; + + ch = create_channel (h, + &ccn); + ch->peer = msg->peer; + ch->cadet = h; + ch->incoming_port = port; + ch->options = ntohl (msg->opt); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating incoming channel %X [%s] %p\n", + ntohl (ccn.channel_of_client), + GNUNET_h2s (port_number), + ch); + + GNUNET_assert (NULL != port->connects); + ch->window_changes = port->window_changes; + ch->disconnects = port->disconnects; + ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, + &cadet_mq_destroy_impl, + &cadet_mq_cancel_impl, + ch, + port->handlers, + &cadet_mq_error_handler, + ch); + ch->ctx = port->connects (port->cls, + ch, + &msg->peer); + GNUNET_MQ_set_handlers_closure (ch->mq, + ch->ctx); } @@ -798,85 +670,89 @@ process_channel_created (struct GNUNET_CADET_Handle *h, * @param msg A message with the details of the channel being destroyed */ static void -process_channel_destroy (struct GNUNET_CADET_Handle *h, - const struct GNUNET_CADET_ChannelDestroyMessage *msg) +handle_channel_destroy (void *cls, + const struct GNUNET_CADET_LocalChannelDestroyMessage *msg) { + struct GNUNET_CADET_Handle *h = cls; struct GNUNET_CADET_Channel *ch; - CADET_ChannelNumber chid; - - chid = ntohl (msg->channel_id); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Channel %X Destroy from service\n", chid); - ch = retrieve_channel (h, chid); + ch = find_channel (h, + msg->ccn); if (NULL == ch) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X unknown\n", chid); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received channel destroy for unknown channel %X from CADET service (recently close?)\n", + ntohl (msg->ccn.channel_of_client)); return; } - destroy_channel (ch, GNUNET_YES); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received channel destroy for channel %X from CADET service\n", + ntohl (msg->ccn.channel_of_client)); + destroy_channel (ch); +} + + +/** + * Check that message received from CADET service is well-formed. + * + * @param cls the `struct GNUNET_CADET_Handle` + * @param message the message we got + * @return #GNUNET_OK if the message is well-formed, + * #GNUNET_SYSERR otherwise + */ +static int +check_local_data (void *cls, + const struct GNUNET_CADET_LocalData *message) +{ + uint16_t size; + + size = ntohs (message->header.size); + if (sizeof (*message) + sizeof (struct GNUNET_MessageHeader) > size) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** * Process the incoming data packets, call appropriate handlers. * - * @param h The cadet handle - * @param message A message encapsulating the data + * @param h The cadet handle + * @param message A message encapsulating the data */ static void -process_incoming_data (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +handle_local_data (void *cls, + const struct GNUNET_CADET_LocalData *message) { + struct GNUNET_CADET_Handle *h = cls; const struct GNUNET_MessageHeader *payload; - const struct GNUNET_CADET_MessageHandler *handler; - struct GNUNET_CADET_LocalData *dmsg; struct GNUNET_CADET_Channel *ch; - size_t size; - unsigned int i; uint16_t type; + int fwd; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Got a data message!\n"); - dmsg = (struct GNUNET_CADET_LocalData *) message; - ch = retrieve_channel (h, ntohl (dmsg->id)); + ch = find_channel (h, + message->ccn); if (NULL == ch) { - GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Unknown channel %X for incoming data (recently closed?)\n", + ntohl (message->ccn.channel_of_client)); return; } - payload = (struct GNUNET_MessageHeader *) &dmsg[1]; - LOG (GNUNET_ERROR_TYPE_DEBUG, " %s data on channel %s [%X]\n", - GC_f2s (ch->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV), - GNUNET_i2s (GNUNET_PEER_resolve2 (ch->peer)), ntohl (dmsg->id)); - - size = ntohs (message->size); - LOG (GNUNET_ERROR_TYPE_DEBUG, " %u bytes\n", size); - + payload = (const struct GNUNET_MessageHeader *) &message[1]; type = ntohs (payload->type); - size = ntohs (payload->size); - LOG (GNUNET_ERROR_TYPE_DEBUG, " payload type %s\n", GC_m2s (type)); - for (i = 0; i < h->n_handlers; i++) - { - handler = &h->message_handlers[i]; - LOG (GNUNET_ERROR_TYPE_DEBUG, " checking handler for type %u\n", - handler->type); - if (handler->type == type) - { - if (GNUNET_OK != - handler->callback (h->cls, ch, &ch->ctx, payload)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "callback caused disconnection\n"); - GNUNET_CADET_channel_destroy (ch); - return; - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "callback completed successfully\n"); - return; - } - } - } + fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got a %s data on channel %s [%X] of type %u\n", + fwd ? "FWD" : "BWD", + GNUNET_i2s (&ch->peer), + ntohl (message->ccn.channel_of_client), + type); + GNUNET_MQ_inject_message (ch->mq, + payload); } @@ -888,324 +764,240 @@ process_incoming_data (struct GNUNET_CADET_Handle *h, * @param message Message itself. */ static void -process_ack (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +handle_local_ack (void *cls, + const struct GNUNET_CADET_LocalAck *message) { - struct GNUNET_CADET_LocalAck *msg; + struct GNUNET_CADET_Handle *h = cls; struct GNUNET_CADET_Channel *ch; - CADET_ChannelNumber chid; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Got an ACK!\n"); - msg = (struct GNUNET_CADET_LocalAck *) message; - chid = ntohl (msg->channel_id); - ch = retrieve_channel (h, chid); + ch = find_channel (h, + message->ccn); if (NULL == ch) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "ACK on unknown channel %X\n", chid); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "ACK on unknown channel %X\n", + ntohl (message->ccn.channel_of_client)); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X!\n", ch->chid); - ch->allow_send = GNUNET_YES; - if (NULL == h->th && 0 < ch->packet_size) + ch->allow_send++; + if (NULL == ch->pending_env) { - LOG (GNUNET_ERROR_TYPE_DEBUG, " tmt rdy was NULL, requesting!\n"); - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, ch->packet_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, &send_callback, h); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got an ACK on mq channel %X, allow send now %u!\n", + ntohl (ch->ccn.channel_of_client), + ch->allow_send); + notify_window_size (ch); + return; } + if (NULL != ch->mq_cont) + return; /* already working on it! */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got an ACK on mq channel %X, sending pending message!\n", + ntohl (ch->ccn.channel_of_client)); + ch->mq_cont + = GNUNET_SCHEDULER_add_now (&cadet_mq_send_now, + ch); } -/* - * Process a local reply about info on all channels, pass info to the user. +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param h Cadet handle. - * @param message Message itself. + * @param cls closure, a `struct GNUNET_CORE_Handle *` + * @param error error code */ -// static void -// process_get_channels (struct GNUNET_CADET_Handle *h, -// const struct GNUNET_MessageHeader *message) -// { -// struct GNUNET_CADET_LocalInfo *msg; -// -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Get Channels messasge received\n"); -// -// if (NULL == h->channels_cb) -// { -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " ignored\n"); -// return; -// } -// -// msg = (struct GNUNET_CADET_LocalInfo *) message; -// if (ntohs (message->size) != -// (sizeof (struct GNUNET_CADET_LocalInfo) + -// sizeof (struct GNUNET_PeerIdentity))) -// { -// GNUNET_break_op (0); -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -// "Get channels message: size %hu - expected %u\n", -// ntohs (message->size), -// sizeof (struct GNUNET_CADET_LocalInfo)); -// return; -// } -// h->channels_cb (h->channels_cls, -// ntohl (msg->channel_id), -// &msg->owner, -// &msg->destination); -// } +static void +handle_mq_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_CADET_Handle *h = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "MQ ERROR: %u\n", + error); + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + reconnect (h); +} -/* - * Process a local monitor_channel reply, pass info to the user. +/** + * Process a local reply about info on all tunnels, pass info to the user. * - * @param h Cadet handle. - * @param message Message itself. + * @param cls Closure (Cadet handle). + * @param msg Message itself. */ -// static void -// process_show_channel (struct GNUNET_CADET_Handle *h, -// const struct GNUNET_MessageHeader *message) -// { -// struct GNUNET_CADET_LocalInfo *msg; -// size_t esize; -// -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Show Channel messasge received\n"); -// -// if (NULL == h->channel_cb) -// { -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " ignored\n"); -// return; -// } -// -// /* Verify message sanity */ -// msg = (struct GNUNET_CADET_LocalInfo *) message; -// esize = sizeof (struct GNUNET_CADET_LocalInfo); -// if (ntohs (message->size) != esize) -// { -// GNUNET_break_op (0); -// GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -// "Show channel message: size %hu - expected %u\n", -// ntohs (message->size), -// esize); -// -// h->channel_cb (h->channel_cls, NULL, NULL); -// h->channel_cb = NULL; -// h->channel_cls = NULL; -// -// return; -// } -// -// h->channel_cb (h->channel_cls, -// &msg->destination, -// &msg->owner); -// } +static void +handle_get_peers (void *cls, + const struct GNUNET_CADET_LocalInfoPeer *msg) +{ + struct GNUNET_CADET_Handle *h = cls; + if (NULL == h->info_cb.peers_cb) + return; + h->info_cb.peers_cb (h->info_cls, + &msg->destination, + (int) ntohs (msg->tunnel), + (unsigned int) ntohs (msg->paths), + 0); +} /** - * Process a local reply about info on all tunnels, pass info to the user. + * Check that message received from CADET service is well-formed. * - * @param h Cadet handle. - * @param message Message itself. + * @param cls the `struct GNUNET_CADET_Handle` + * @param message the message we got + * @return #GNUNET_OK if the message is well-formed, + * #GNUNET_SYSERR otherwise */ -static void -process_get_peers (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +static int +check_get_peer (void *cls, + const struct GNUNET_CADET_LocalInfoPeer *message) { - struct GNUNET_CADET_LocalInfoPeer *msg; - uint16_t size; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Peer messasge received\n"); + size_t msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); + const struct GNUNET_PeerIdentity *paths_array; + size_t esize; + unsigned int epaths; + unsigned int paths; + unsigned int peers; - if (NULL == h->info_cb.peers_cb) + esize = ntohs (message->header.size); + if (esize < msize) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ignored\n"); - return; + GNUNET_break (0); + return GNUNET_SYSERR; } - - size = ntohs (message->size); - if (sizeof (struct GNUNET_CADET_LocalInfoPeer) > size) + if (0 != ((esize - msize) % sizeof (struct GNUNET_PeerIdentity))) { - h->info_cb.peers_cb (h->info_cls, NULL, -1, 0, 0); - h->info_cb.peers_cb = NULL; - h->info_cls = NULL; - return; + GNUNET_break (0); + return GNUNET_SYSERR; } - - msg = (struct GNUNET_CADET_LocalInfoPeer *) message; - h->info_cb.peers_cb (h->info_cls, &msg->destination, - (int) ntohs (msg->tunnel), - (unsigned int ) ntohs (msg->paths), - 0); + peers = (esize - msize) / sizeof (struct GNUNET_PeerIdentity); + epaths = ntohs (message->paths); + paths_array = (const struct GNUNET_PeerIdentity *) &message[1]; + paths = 0; + for (unsigned int i = 0; i < peers; i++) + if (0 == memcmp (&paths_array[i], + &message->destination, + sizeof (struct GNUNET_PeerIdentity))) + paths++; + if (paths != epaths) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** * Process a local peer info reply, pass info to the user. * - * @param h Cadet handle. + * @param cls Closure (Cadet handle). * @param message Message itself. */ static void -process_get_peer (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +handle_get_peer (void *cls, + const struct GNUNET_CADET_LocalInfoPeer *message) { - struct GNUNET_CADET_LocalInfoPeer *msg; - struct GNUNET_PeerIdentity *id; - unsigned int epaths; + struct GNUNET_CADET_Handle *h = cls; + const struct GNUNET_PeerIdentity *paths_array; unsigned int paths; unsigned int path_length; - unsigned int i; int neighbor; - size_t esize; - size_t msize; + unsigned int peers; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Info Peer messasge received\n"); if (NULL == h->info_cb.peer_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ignored\n"); return; - } - - /* Verify message sanity */ - msg = (struct GNUNET_CADET_LocalInfoPeer *) message; - esize = ntohs (message->size); - msize = sizeof (struct GNUNET_CADET_LocalInfoPeer); - if (esize < msize) - { - GNUNET_break_op (0); - h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL); - goto clean_cls; - } - epaths = (unsigned int) ntohs (msg->paths); - paths = 0; + paths = ntohs (message->paths); + paths_array = (const struct GNUNET_PeerIdentity *) &message[1]; + peers = (ntohs (message->header.size) - sizeof (*message)) + / sizeof (struct GNUNET_PeerIdentity); path_length = 0; neighbor = GNUNET_NO; - id = (struct GNUNET_PeerIdentity *) &msg[1]; - for (i = 0; msize < esize; i++) + + for (unsigned int i = 0; i < peers; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " %s\n", GNUNET_i2s (&id[i])); - msize += sizeof (struct GNUNET_PeerIdentity); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " %s\n", + GNUNET_i2s (&paths_array[i])); path_length++; - if (0 == memcmp (&id[i], &msg->destination, + if (0 == memcmp (&paths_array[i], &message->destination, sizeof (struct GNUNET_PeerIdentity))) { if (1 == path_length) neighbor = GNUNET_YES; path_length = 0; - paths++; } } - if (msize != esize) - { - GNUNET_break_op (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "m:%u, e: %u\n", - (unsigned int) msize, - (unsigned int) esize); - h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL); - goto clean_cls; - } - if (paths != epaths) - { - GNUNET_break_op (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "p:%u, e: %u\n", paths, epaths); - h->info_cb.peer_cb (h->info_cls, NULL, 0, 0, 0, NULL); - goto clean_cls; - } /* Call Callback with tunnel info. */ - id = (struct GNUNET_PeerIdentity *) &msg[1]; + paths_array = (const struct GNUNET_PeerIdentity *) &message[1]; h->info_cb.peer_cb (h->info_cls, - &msg->destination, - (int) ntohs (msg->tunnel), + &message->destination, + (int) ntohs (message->tunnel), neighbor, paths, - id); - - clean_cls: - h->info_cb.peer_cb = NULL; - h->info_cls = NULL; + paths_array); } /** * Process a local reply about info on all tunnels, pass info to the user. * - * @param h Cadet handle. + * @param cls Closure (Cadet handle). * @param message Message itself. */ static void -process_get_tunnels (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +handle_get_tunnels (void *cls, + const struct GNUNET_CADET_LocalInfoTunnel *msg) { - struct GNUNET_CADET_LocalInfoTunnel *msg; - uint16_t size; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Tunnels messasge received\n"); + struct GNUNET_CADET_Handle *h = cls; if (NULL == h->info_cb.tunnels_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ignored\n"); return; - } - - size = ntohs (message->size); - if (sizeof (struct GNUNET_CADET_LocalInfoTunnel) > size) - { - h->info_cb.tunnels_cb (h->info_cls, NULL, 0, 0, 0, 0); - h->info_cb.tunnels_cb = NULL; - h->info_cls = NULL; - return; - } - - msg = (struct GNUNET_CADET_LocalInfoTunnel *) message; - h->info_cb.tunnels_cb (h->info_cls, &msg->destination, - ntohl (msg->channels), ntohl (msg->connections), - ntohs (msg->estate), ntohs (msg->cstate)); + h->info_cb.tunnels_cb (h->info_cls, + &msg->destination, + ntohl (msg->channels), + ntohl (msg->connections), + ntohs (msg->estate), + ntohs (msg->cstate)); } /** - * Process a local tunnel info reply, pass info to the user. + * Check that message received from CADET service is well-formed. * - * @param h Cadet handle. - * @param message Message itself. + * @param cls the `struct GNUNET_CADET_Handle` + * @param msg the message we got + * @return #GNUNET_OK if the message is well-formed, + * #GNUNET_SYSERR otherwise */ -static void -process_get_tunnel (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *message) +static int +check_get_tunnel (void *cls, + const struct GNUNET_CADET_LocalInfoTunnel *msg) { - struct GNUNET_CADET_LocalInfoTunnel *msg; - size_t esize; - size_t msize; unsigned int ch_n; unsigned int c_n; - struct GNUNET_CADET_Hash *conns; - CADET_ChannelNumber *chns; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Get Tunnel messasge received\n"); - if (NULL == h->info_cb.tunnel_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ignored\n"); - return; - } + size_t esize; + size_t msize; /* Verify message sanity */ - msg = (struct GNUNET_CADET_LocalInfoTunnel *) message; - msize = ntohs (message->size); + msize = ntohs (msg->header.size); esize = sizeof (struct GNUNET_CADET_LocalInfoTunnel); if (esize > msize) { - GNUNET_break_op (0); - h->info_cb.tunnel_cb (h->info_cls, NULL, 0, 0, NULL, NULL, 0, 0); - goto clean_cls; + GNUNET_break (0); + return GNUNET_SYSERR; } ch_n = ntohl (msg->channels); c_n = ntohl (msg->connections); - esize += ch_n * sizeof (CADET_ChannelNumber); - esize += c_n * sizeof (struct GNUNET_CADET_Hash); + esize += ch_n * sizeof (struct GNUNET_CADET_ChannelTunnelNumber); + esize += c_n * sizeof (struct GNUNET_CADET_ConnectionTunnelIdentifier); if (msize != esize) { GNUNET_break_op (0); @@ -1215,434 +1007,197 @@ process_get_tunnel (struct GNUNET_CADET_Handle *h, (unsigned int) esize, ch_n, c_n); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u (%u ch, %u conn)\n", - (unsigned int) sizeof (struct GNUNET_CADET_LocalInfoTunnel), - (unsigned int) sizeof (CADET_ChannelNumber), - (unsigned int) sizeof (struct GNUNET_HashCode)); - h->info_cb.tunnel_cb (h->info_cls, NULL, 0, 0, NULL, NULL, 0, 0); - goto clean_cls; - } - - /* Call Callback with tunnel info. */ - conns = (struct GNUNET_CADET_Hash *) &msg[1]; - chns = (CADET_ChannelNumber *) &conns[c_n]; - h->info_cb.tunnel_cb (h->info_cls, &msg->destination, - ch_n, c_n, chns, conns, - ntohs (msg->estate), ntohs (msg->cstate)); - -clean_cls: - h->info_cb.tunnel_cb = NULL; - h->info_cls = NULL; -} - - -/** - * Function to process all messages received from the service - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -msg_received (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CADET_Handle *h = cls; - uint16_t type; - - if (msg == NULL) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Cadet service disconnected, reconnecting\n", h); - reconnect (h); - return; - } - type = ntohs (msg->type); - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received a message: %s\n", - GC_m2s (type)); - switch (type) - { - /* Notify of a new incoming channel */ - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: - process_channel_created (h, - (struct GNUNET_CADET_ChannelCreateMessage *) msg); - break; - /* Notify of a channel disconnection */ - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: /* TODO separate(gid problem)*/ - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_NACK: - process_channel_destroy (h, - (struct GNUNET_CADET_ChannelDestroyMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA: - process_incoming_data (h, msg); - break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK: - process_ack (h, msg); - break; -// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS: -// process_get_channels (h, msg); -// break; -// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: -// process_show_channel (h, msg); -// break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS: - process_get_peers (h, msg); - break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER: - process_get_peer (h, msg); - break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS: - process_get_tunnels (h, msg); - break; - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL: - process_get_tunnel (h, msg); - break; -// case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: -// process_show_channel (h, msg); -// break; - default: - /* We shouldn't get any other packages, log and ignore */ - LOG (GNUNET_ERROR_TYPE_WARNING, - "unsolicited message form service (type %s)\n", - GC_m2s (ntohs (msg->type))); - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "message processed\n"); - if (GNUNET_YES == h->in_receive) - { - GNUNET_CLIENT_receive (h->client, &msg_received, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "in receive off, not calling CLIENT_receive\n"); - } -} - - -/******************************************************************************/ -/************************ SEND FUNCTIONS ****************************/ -/******************************************************************************/ - -/** - * Function called to send a message to the service. - * "buf" will be NULL and "size" zero if the socket was closed for writing in - * the meantime. - * - * @param cls closure, the cadet handle - * @param size number of bytes available in buf - * @param buf where the callee should write the connect message - * @return number of bytes written to buf - */ -static size_t -send_callback (void *cls, size_t size, void *buf) -{ - struct GNUNET_CADET_Handle *h = cls; - struct GNUNET_CADET_TransmitHandle *th; - struct GNUNET_CADET_TransmitHandle *next; - struct GNUNET_CADET_Channel *ch; - char *cbuf = buf; - size_t tsize; - size_t psize; - size_t nsize; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback, buffer %u\n", size); - if ((0 == size) || (NULL == buf)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "# Received NULL send callback on %p\n", h); - reconnect (h); - h->th = NULL; - return 0; - } - tsize = 0; - next = h->th_head; - nsize = message_ready_size (h); - while ((NULL != (th = next)) && (0 < nsize) && (size >= nsize)) - { - ch = th->channel; - if (GNUNET_YES == th_is_payload (th)) - { - struct GNUNET_CADET_LocalData *dmsg; - struct GNUNET_MessageHeader *mh; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "# payload, %u bytes on %X (%p)\n", - th->size, ch->chid, ch); - if (GNUNET_NO == ch->allow_send) - { - /* This channel is not ready to transmit yet, Try the next message */ - next = th->next; - continue; - } - ch->packet_size = 0; - GNUNET_assert (size >= th->size); - dmsg = (struct GNUNET_CADET_LocalData *) cbuf; - mh = (struct GNUNET_MessageHeader *) &dmsg[1]; - psize = th->notify (th->notify_cls, size - DATA_OVERHEAD, mh); - - if (psize > 0) - { - GNUNET_assert (sizeof (struct GNUNET_MessageHeader) <= psize); - psize += DATA_OVERHEAD; - GNUNET_assert (size >= psize); - dmsg->header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA); - dmsg->header.size = htons (psize); - dmsg->id = htonl (ch->chid); - LOG (GNUNET_ERROR_TYPE_DEBUG, "# sending, type %s\n", - GC_m2s (ntohs (mh->type))); - ch->allow_send = GNUNET_NO; - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "# callback returned size 0, " - "application canceled transmission\n"); - } - } - else - { - const struct GNUNET_MessageHeader *mh; - - mh = (const struct GNUNET_MessageHeader *) &th[1]; - LOG (GNUNET_ERROR_TYPE_DEBUG, "# cadet internal traffic, type %s\n", - GC_m2s (ntohs (mh->type))); - GNUNET_memcpy (cbuf, &th[1], th->size); - psize = th->size; - } - GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= psize); - if (th->timeout_task != NULL) - GNUNET_SCHEDULER_cancel (th->timeout_task); - next = th->next; - GNUNET_CONTAINER_DLL_remove (h->th_head, h->th_tail, th); - GNUNET_free (th); - nsize = message_ready_size (h); - cbuf += psize; - size -= psize; - tsize += psize; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "# total size: %u\n", tsize); - h->th = NULL; - size = message_ready_size (h); - if (0 != size) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "# next size: %u\n", size); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, &send_callback, h); - } - else - { - if (NULL != h->th_head) - LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing ready to transmit\n"); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, "# nothing left to transmit\n"); - } - if (GNUNET_NO == h->in_receive) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, "# start receiving from service\n"); - h->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &msg_received, h, - GNUNET_TIME_UNIT_FOREVER_REL); + return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "# Send callback() END\n"); - return tsize; + return GNUNET_OK; } /** - * Auxiliary function to send an already constructed packet to the service. - * Takes care of creating a new queue element, copying the message and - * calling the tmt_rdy function if necessary. + * Process a local tunnel info reply, pass info to the user. * - * @param h cadet handle - * @param msg message to transmit - * @param channel channel this send is related to (NULL if N/A) + * @param cls Closure (Cadet handle). + * @param msg Message itself. */ static void -send_packet (struct GNUNET_CADET_Handle *h, - const struct GNUNET_MessageHeader *msg, - struct GNUNET_CADET_Channel *channel) +handle_get_tunnel (void *cls, + const struct GNUNET_CADET_LocalInfoTunnel *msg) { - struct GNUNET_CADET_TransmitHandle *th; - size_t msize; + struct GNUNET_CADET_Handle *h = cls; + unsigned int ch_n; + unsigned int c_n; + const struct GNUNET_CADET_ConnectionTunnelIdentifier *conns; + const struct GNUNET_CADET_ChannelTunnelNumber *chns; - LOG (GNUNET_ERROR_TYPE_DEBUG, " Sending message to service: %s\n", - GC_m2s(ntohs(msg->type))); - msize = ntohs (msg->size); - th = GNUNET_malloc (sizeof (struct GNUNET_CADET_TransmitHandle) + msize); - th->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; - th->size = msize; - th->channel = channel; - GNUNET_memcpy (&th[1], msg, msize); - add_to_queue (h, th); - if (NULL != h->th) + if (NULL == h->info_cb.tunnel_cb) return; - LOG (GNUNET_ERROR_TYPE_DEBUG, " calling ntfy tmt rdy for %u bytes\n", msize); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, &send_callback, h); -} + ch_n = ntohl (msg->channels); + c_n = ntohl (msg->connections); -/******************************************************************************/ -/********************** API CALL DEFINITIONS *************************/ -/******************************************************************************/ + /* Call Callback with tunnel info. */ + conns = (const struct GNUNET_CADET_ConnectionTunnelIdentifier *) &msg[1]; + chns = (const struct GNUNET_CADET_ChannelTunnelNumber *) &conns[c_n]; + h->info_cb.tunnel_cb (h->info_cls, + &msg->destination, + ch_n, + c_n, + chns, + conns, + ntohs (msg->estate), + ntohs (msg->cstate)); +} -struct GNUNET_CADET_Handle * -GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls, - GNUNET_CADET_ChannelEndHandler cleaner, - const struct GNUNET_CADET_MessageHandler *handlers) -{ - struct GNUNET_CADET_Handle *h; - LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect()\n"); - h = GNUNET_new (struct GNUNET_CADET_Handle); - LOG (GNUNET_ERROR_TYPE_DEBUG, " addr %p\n", h); - h->cfg = cfg; - h->cleaner = cleaner; - h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES); - h->client = GNUNET_CLIENT_connect ("cadet", cfg); - if (h->client == NULL) +/** + * Reconnect to the service, retransmit all infomation to try to restore the + * original state. + * + * @param h handle to the cadet + */ +static void +reconnect (struct GNUNET_CADET_Handle *h) +{ + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (channel_created, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE, + struct GNUNET_CADET_LocalChannelCreateMessage, + h), + GNUNET_MQ_hd_fixed_size (channel_destroy, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY, + struct GNUNET_CADET_LocalChannelDestroyMessage, + h), + GNUNET_MQ_hd_var_size (local_data, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA, + struct GNUNET_CADET_LocalData, + h), + GNUNET_MQ_hd_fixed_size (local_ack, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK, + struct GNUNET_CADET_LocalAck, + h), + GNUNET_MQ_hd_fixed_size (get_peers, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS, + struct GNUNET_CADET_LocalInfoPeer, + h), + GNUNET_MQ_hd_var_size (get_peer, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER, + struct GNUNET_CADET_LocalInfoPeer, + h), + GNUNET_MQ_hd_fixed_size (get_tunnels, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS, + struct GNUNET_CADET_LocalInfoTunnel, + h), + GNUNET_MQ_hd_var_size (get_tunnel, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL, + struct GNUNET_CADET_LocalInfoTunnel, + h), + GNUNET_MQ_handler_end () + }; + + h->mq = GNUNET_CLIENT_connect (h->cfg, + "cadet", + handlers, + &handle_mq_error, + h); + if (NULL == h->mq) { - GNUNET_break (0); - GNUNET_free (h); - return NULL; + schedule_reconnect (h); + return; } - h->cls = cls; - h->message_handlers = handlers; - h->next_chid = GNUNET_CADET_LOCAL_CHANNEL_ID_CLI; h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; - h->reconnect_task = NULL; - - /* count handlers */ - for (h->n_handlers = 0; - handlers && handlers[h->n_handlers].type; - h->n_handlers++) ; - LOG (GNUNET_ERROR_TYPE_DEBUG, "GNUNET_CADET_connect() END\n"); - return h; } -void -GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle) +/** + * Function called during #GNUNET_CADET_disconnect() to destroy + * all channels that are still open. + * + * @param cls the `struct GNUNET_CADET_Handle` + * @param cid chanenl ID + * @param value a `struct GNUNET_CADET_Channel` to destroy + * @return #GNUNET_OK (continue to iterate) + */ +static int +destroy_channel_cb (void *cls, + uint32_t cid, + void *value) { - struct GNUNET_CADET_Channel *ch; - struct GNUNET_CADET_Channel *aux; - struct GNUNET_CADET_TransmitHandle *th; + /* struct GNUNET_CADET_Handle *handle = cls; */ + struct GNUNET_CADET_Channel *ch = value; - LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET DISCONNECT\n"); - - ch = handle->channels_head; - while (NULL != ch) + if (ntohl (ch->ccn.channel_of_client) >= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI) { - aux = ch->next; - if (ch->chid < GNUNET_CADET_LOCAL_CHANNEL_ID_SERV) - { - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_DEBUG, "channel %X not destroyed\n", ch->chid); - } - destroy_channel (ch, GNUNET_YES); - ch = aux; + GNUNET_break (0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "channel %X not destroyed\n", + ntohl (ch->ccn.channel_of_client)); } - while ( (th = handle->th_head) != NULL) - { - struct GNUNET_MessageHeader *msg; + destroy_channel (ch); + return GNUNET_OK; +} - /* Make sure it is an allowed packet (everything else should have been - * already canceled). - */ - GNUNET_break (GNUNET_NO == th_is_payload (th)); - msg = (struct GNUNET_MessageHeader *) &th[1]; - switch (ntohs(msg->type)) - { - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE: - case GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNELS: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL: - case GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS: - break; - default: - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_ERROR, "unexpected unsent msg %s\n", - GC_m2s (ntohs(msg->type))); - } - GNUNET_CONTAINER_DLL_remove (handle->th_head, handle->th_tail, th); - GNUNET_free (th); - } +/** + * Function called during #GNUNET_CADET_disconnect() to destroy + * all ports that are still open. + * + * @param cls the `struct GNUNET_CADET_Handle` + * @param id port ID + * @param value a `struct GNUNET_CADET_Channel` to destroy + * @return #GNUNET_OK (continue to iterate) + */ +static int +destroy_port_cb (void *cls, + const struct GNUNET_HashCode *id, + void *value) +{ + /* struct GNUNET_CADET_Handle *handle = cls; */ + struct GNUNET_CADET_Port *port = value; - if (NULL != handle->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); - handle->th = NULL; - } - if (NULL != handle->client) + /* This is a warning, the app should have cleanly closed all open ports */ + GNUNET_break (0); + GNUNET_CADET_close_port (port); + return GNUNET_OK; +} + + +/** + * Disconnect from the cadet service. All channels will be destroyed. All channel + * disconnect callbacks will be called on any still connected peers, notifying + * about their disconnection. The registered inbound channel cleaner will be + * called should any inbound channels still exist. + * + * @param handle connection to cadet to disconnect + */ +void +GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle *handle) +{ + GNUNET_CONTAINER_multihashmap_iterate (handle->ports, + &destroy_port_cb, + handle); + GNUNET_CONTAINER_multihashmap_destroy (handle->ports); + handle->ports = NULL; + GNUNET_CONTAINER_multihashmap32_iterate (handle->channels, + &destroy_channel_cb, + handle); + GNUNET_CONTAINER_multihashmap32_destroy (handle->channels); + handle->channels = NULL; + if (NULL != handle->mq) { - GNUNET_CLIENT_disconnect (handle->client); - handle->client = NULL; + GNUNET_MQ_destroy (handle->mq); + handle->mq = NULL; } if (NULL != handle->reconnect_task) { - GNUNET_SCHEDULER_cancel(handle->reconnect_task); + GNUNET_SCHEDULER_cancel (handle->reconnect_task); handle->reconnect_task = NULL; } - - GNUNET_CONTAINER_multihashmap_destroy (handle->ports); - handle->ports = NULL; GNUNET_free (handle); } /** - * Open a port to receive incomming channels. - * - * @param h CADET handle. - * @param port Hash representing the port number. - * @param new_channel Function called when an channel is received. - * @param new_channel_cls Closure for @a new_channel. - * - * @return Port handle. - */ -struct GNUNET_CADET_Port * -GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, - const struct GNUNET_HashCode *port, - GNUNET_CADET_InboundChannelNotificationHandler - new_channel, - void *new_channel_cls) -{ - struct GNUNET_CADET_Port *p; - struct GNUNET_CADET_PortMessage msg; - - GNUNET_assert (NULL != new_channel); - p = GNUNET_new (struct GNUNET_CADET_Port); - p->cadet = h; - p->hash = GNUNET_new (struct GNUNET_HashCode); - *p->hash = *port; - p->handler = new_channel; - p->cls = new_channel_cls; - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (h->ports, - p->hash, - p, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); - msg.port = *p->hash; - send_packet (p->cadet, &msg.header, NULL); - - return p; -} - -/** - * Close a port opened with @a GNUNET_CADET_open_port. + * Close a port opened with @a GNUNET_CADET_open_port(). * The @a new_channel callback will no longer be called. * * @param p Port handle. @@ -1650,102 +1205,48 @@ GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, void GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p) { - struct GNUNET_CADET_PortMessage msg; - - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE); - msg.port = *p->hash; - send_packet (p->cadet, &msg.header, NULL); - GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p); - GNUNET_free (p->hash); + struct GNUNET_CADET_PortMessage *msg; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE); + msg->port = p->id; + GNUNET_MQ_send (p->cadet->mq, + env); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, + &p->id, + p)); + GNUNET_free_non_null (p->handlers); GNUNET_free (p); } /** - * Create a new channel towards a remote peer. - * - * If the destination port is not open by any peer or the destination peer - * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called - * for this channel. + * Destroy an existing channel. * - * @param h cadet handle - * @param channel_ctx client's channel context to associate with the channel - * @param peer peer identity the channel should go to - * @param port Port hash (port number). - * @param options CadetOption flag field, with all desired option bits set to 1. + * The existing end callback for the channel will be called immediately. + * Any pending outgoing messages will be sent but no incoming messages will be + * accepted and no data callbacks will be called. * - * @return handle to the channel + * @param channel Channel handle, becomes invalid after this call. */ -struct GNUNET_CADET_Channel * -GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, - void *channel_ctx, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) -{ - struct GNUNET_CADET_Channel *ch; - struct GNUNET_CADET_ChannelCreateMessage msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Creating new channel to %s:%u\n", - GNUNET_i2s (peer), port); - ch = create_channel (h, 0); - LOG (GNUNET_ERROR_TYPE_DEBUG, " at %p\n", ch); - LOG (GNUNET_ERROR_TYPE_DEBUG, " number %X\n", ch->chid); - ch->ctx = channel_ctx; - ch->peer = GNUNET_PEER_intern (peer); - - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_CREATE); - msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelCreateMessage)); - msg.channel_id = htonl (ch->chid); - msg.port = *port; - msg.peer = *peer; - msg.opt = htonl (options); - ch->allow_send = GNUNET_NO; - send_packet (h, &msg.header, ch); - - return ch; -} - - void GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel) { - struct GNUNET_CADET_Handle *h; - struct GNUNET_CADET_ChannelDestroyMessage msg; - struct GNUNET_CADET_TransmitHandle *th; + struct GNUNET_CADET_Handle *h = channel->cadet; + struct GNUNET_CADET_LocalChannelDestroyMessage *msg; + struct GNUNET_MQ_Envelope *env; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Destroying channel\n"); - h = channel->cadet; - - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_CHANNEL_DESTROY); - msg.header.size = htons (sizeof (struct GNUNET_CADET_ChannelDestroyMessage)); - msg.channel_id = htonl (channel->chid); - th = h->th_head; - while (th != NULL) + if (NULL != h->mq) { - struct GNUNET_CADET_TransmitHandle *aux; - if (th->channel == channel) - { - aux = th->next; - if (GNUNET_YES == th_is_payload (th)) - { - /* applications should cancel before destroying channel */ - GNUNET_break (0); - LOG (GNUNET_ERROR_TYPE_WARNING, - "Channel destroyed without cancelling transmission requests\n"); - th->notify (th->notify_cls, 0, NULL); - } - GNUNET_CADET_notify_transmit_ready_cancel (th); - th = aux; - } - else - th = th->next; + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY); + msg->ccn = channel->ccn; + GNUNET_MQ_send (h->mq, + env); } - - destroy_channel (channel, GNUNET_YES); - send_packet (h, &msg.header, NULL); + destroy_channel (channel); } @@ -1760,118 +1261,71 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel *channel) */ const union GNUNET_CADET_ChannelInfo * GNUNET_CADET_channel_get_info (struct GNUNET_CADET_Channel *channel, - enum GNUNET_CADET_ChannelOption option, ...) + enum GNUNET_CADET_ChannelOption option, + ...) { static int bool_flag; - const union GNUNET_CADET_ChannelInfo *ret; switch (option) { case GNUNET_CADET_OPTION_NOBUFFER: case GNUNET_CADET_OPTION_RELIABLE: - case GNUNET_CADET_OPTION_OOORDER: + case GNUNET_CADET_OPTION_OUT_OF_ORDER: if (0 != (option & channel->options)) bool_flag = GNUNET_YES; else bool_flag = GNUNET_NO; - ret = (const union GNUNET_CADET_ChannelInfo *) &bool_flag; + return (const union GNUNET_CADET_ChannelInfo *) &bool_flag; break; case GNUNET_CADET_OPTION_PEER: - ret = (const union GNUNET_CADET_ChannelInfo *) GNUNET_PEER_resolve2 (channel->peer); + return (const union GNUNET_CADET_ChannelInfo *) &channel->peer; break; default: GNUNET_break (0); return NULL; } - - return ret; -} - -struct GNUNET_CADET_TransmitHandle * -GNUNET_CADET_notify_transmit_ready (struct GNUNET_CADET_Channel *channel, int cork, - struct GNUNET_TIME_Relative maxdelay, - size_t notify_size, - GNUNET_CONNECTION_TransmitReadyNotify notify, - void *notify_cls) -{ - struct GNUNET_CADET_TransmitHandle *th; - - GNUNET_assert (NULL != channel); - GNUNET_assert (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE >= notify_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X\n", channel->chid); - LOG (GNUNET_ERROR_TYPE_DEBUG, " allow_send %d\n", channel->allow_send); - if (channel->chid >= GNUNET_CADET_LOCAL_CHANNEL_ID_SERV) - LOG (GNUNET_ERROR_TYPE_DEBUG, " to origin\n"); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, " to destination\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, " payload size %u\n", notify_size); - GNUNET_assert (NULL != notify); - GNUNET_assert (0 == channel->packet_size); // Only one data packet allowed - th = GNUNET_new (struct GNUNET_CADET_TransmitHandle); - th->channel = channel; - th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); - th->size = notify_size + DATA_OVERHEAD; - channel->packet_size = th->size; - LOG (GNUNET_ERROR_TYPE_DEBUG, " total size %u\n", th->size); - th->notify = notify; - th->notify_cls = notify_cls; - add_to_queue (channel->cadet, th); - if (NULL != channel->cadet->th) - return th; - if (GNUNET_NO == channel->allow_send) - return th; - LOG (GNUNET_ERROR_TYPE_DEBUG, " call client notify tmt rdy\n"); - channel->cadet->th = - GNUNET_CLIENT_notify_transmit_ready (channel->cadet->client, th->size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, &send_callback, - channel->cadet); - LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY END\n"); - return th; -} - - -void -GNUNET_CADET_notify_transmit_ready_cancel (struct GNUNET_CADET_TransmitHandle *th) -{ - struct GNUNET_CADET_Handle *cadet; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL\n"); - LOG (GNUNET_ERROR_TYPE_DEBUG, " on channel %X (%p)\n", - th->channel->chid, th->channel); - LOG (GNUNET_ERROR_TYPE_DEBUG, " size %u bytes\n", th->size); - th->channel->packet_size = 0; - cadet = th->channel->cadet; - if (th->timeout_task != NULL) - GNUNET_SCHEDULER_cancel (th->timeout_task); - GNUNET_CONTAINER_DLL_remove (cadet->th_head, cadet->th_tail, th); - GNUNET_free (th); - if ((0 == message_ready_size (cadet)) && (NULL != cadet->th)) - { - /* queue empty, no point in asking for transmission */ - GNUNET_CLIENT_notify_transmit_ready_cancel (cadet->th); - cadet->th = NULL; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, "CADET NOTIFY TRANSMIT READY CANCEL END\n"); } +/** + * Send an ack on the channel to confirm the processing of a message. + * + * @param ch Channel on which to send the ACK. + */ void GNUNET_CADET_receive_done (struct GNUNET_CADET_Channel *channel) { - send_ack (channel); + struct GNUNET_CADET_LocalAck *msg; + struct GNUNET_MQ_Envelope *env; + + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_ACK); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Sending ACK on channel %X\n", + ntohl (channel->ccn.channel_of_client)); + msg->ccn = channel->ccn; + GNUNET_MQ_send (channel->cadet->mq, + env); } +/** + * Send message of @a type to CADET service of @a h + * + * @param h handle to CADET service + * @param type message type of trivial information request to send + */ static void -send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type) +send_info_request (struct GNUNET_CADET_Handle *h, + uint16_t type) { - struct GNUNET_MessageHeader msg; + struct GNUNET_MessageHeader *msg; + struct GNUNET_MQ_Envelope *env; - msg.size = htons (sizeof (msg)); - msg.type = htons (type); - send_packet (h, &msg, NULL); + env = GNUNET_MQ_msg (msg, + type); + GNUNET_MQ_send (h->mq, + env); } @@ -1885,8 +1339,8 @@ send_info_request (struct GNUNET_CADET_Handle *h, uint16_t type) void GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "requesting dump\n"); - send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP); + send_info_request (h, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_DUMP); } @@ -1895,13 +1349,11 @@ GNUNET_CADET_request_dump (struct GNUNET_CADET_Handle *h) * The callback will be called for every peer known to the service. * Only one info request (of any kind) can be active at once. * - * * WARNING: unstable API, likely to change in the future! * * @param h Handle to the cadet peer. * @param callback Function to call with the requested data. * @param callback_cls Closure for @c callback. - * * @return #GNUNET_OK / #GNUNET_SYSERR */ int @@ -1914,7 +1366,8 @@ GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h, GNUNET_break (0); return GNUNET_SYSERR; } - send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS); + send_info_request (h, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEERS); h->info_cb.peers_cb = callback; h->info_cls = callback_cls; return GNUNET_OK; @@ -1927,15 +1380,13 @@ GNUNET_CADET_get_peers (struct GNUNET_CADET_Handle *h, * WARNING: unstable API, likely to change in the future! * * @param h Cadet handle. - * - * @return Closure given to GNUNET_CADET_get_peers. + * @return Closure given to GNUNET_CADET_get_peers(). */ void * GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h) { - void *cls; + void *cls = h->info_cls; - cls = h->info_cls; h->info_cb.peers_cb = NULL; h->info_cls = NULL; return cls; @@ -1953,28 +1404,27 @@ GNUNET_CADET_get_peers_cancel (struct GNUNET_CADET_Handle *h) * @param id Peer whose tunnel to examine. * @param callback Function to call with the requested data. * @param callback_cls Closure for @c callback. - * * @return #GNUNET_OK / #GNUNET_SYSERR */ int GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, - const struct GNUNET_PeerIdentity *id, - GNUNET_CADET_PeerCB callback, - void *callback_cls) + const struct GNUNET_PeerIdentity *id, + GNUNET_CADET_PeerCB callback, + void *callback_cls) { - struct GNUNET_CADET_LocalInfo msg; + struct GNUNET_CADET_LocalInfo *msg; + struct GNUNET_MQ_Envelope *env; if (NULL != h->info_cb.peer_cb) { GNUNET_break (0); return GNUNET_SYSERR; } - - memset (&msg, 0, sizeof (msg)); - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER); - msg.peer = *id; - send_packet (h, &msg.header, NULL); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_PEER); + msg->peer = *id; + GNUNET_MQ_send (h->mq, + env); h->info_cb.peer_cb = callback; h->info_cls = callback_cls; return GNUNET_OK; @@ -1991,7 +1441,6 @@ GNUNET_CADET_get_peer (struct GNUNET_CADET_Handle *h, * @param h Handle to the cadet peer. * @param callback Function to call with the requested data. * @param callback_cls Closure for @c callback. - * * @return #GNUNET_OK / #GNUNET_SYSERR */ int @@ -2004,7 +1453,8 @@ GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h, GNUNET_break (0); return GNUNET_SYSERR; } - send_info_request (h, GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS); + send_info_request (h, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNELS); h->info_cb.tunnels_cb = callback; h->info_cls = callback_cls; return GNUNET_OK; @@ -2015,23 +1465,19 @@ GNUNET_CADET_get_tunnels (struct GNUNET_CADET_Handle *h, * Cancel a monitor request. The monitor callback will not be called. * * @param h Cadet handle. - * - * @return Closure given to GNUNET_CADET_get_tunnels. + * @return Closure given to GNUNET_CADET_get_tunnels(). */ void * GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h) { - void *cls; + void *cls = h->info_cls; h->info_cb.tunnels_cb = NULL; - cls = h->info_cls; h->info_cls = NULL; - return cls; } - /** * Request information about a tunnel of the running cadet peer. * The callback will be called for the tunnel once. @@ -2043,7 +1489,6 @@ GNUNET_CADET_get_tunnels_cancel (struct GNUNET_CADET_Handle *h) * @param id Peer whose tunnel to examine. * @param callback Function to call with the requested data. * @param callback_cls Closure for @c callback. - * * @return #GNUNET_OK / #GNUNET_SYSERR */ int @@ -2052,19 +1497,19 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h, GNUNET_CADET_TunnelCB callback, void *callback_cls) { - struct GNUNET_CADET_LocalInfo msg; + struct GNUNET_CADET_LocalInfo *msg; + struct GNUNET_MQ_Envelope *env; if (NULL != h->info_cb.tunnel_cb) { GNUNET_break (0); return GNUNET_SYSERR; } - - memset (&msg, 0, sizeof (msg)); - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL); - msg.peer = *id; - send_packet (h, &msg.header, NULL); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL); + msg->peer = *id; + GNUNET_MQ_send (h->mq, + env); h->info_cb.tunnel_cb = callback; h->info_cls = callback_cls; return GNUNET_OK; @@ -2072,175 +1517,190 @@ GNUNET_CADET_get_tunnel (struct GNUNET_CADET_Handle *h, /** - * Request information about a specific channel of the running cadet peer. - * - * WARNING: unstable API, likely to change in the future! - * FIXME Add destination option. + * Transitional function to convert an unsigned int port to a hash value. + * WARNING: local static value returned, NOT reentrant! + * WARNING: do not use this function for new code! * - * @param h Handle to the cadet peer. - * @param initiator ID of the owner of the channel. - * @param channel_number Channel number. - * @param callback Function to call with the requested data. - * @param callback_cls Closure for @c callback. + * @param port Numerical port (unsigned int format). * - * @return #GNUNET_OK / #GNUNET_SYSERR + * @return A GNUNET_HashCode usable for the new CADET API. */ -int -GNUNET_CADET_show_channel (struct GNUNET_CADET_Handle *h, - struct GNUNET_PeerIdentity *initiator, - unsigned int channel_number, - GNUNET_CADET_ChannelCB callback, - void *callback_cls) +const struct GNUNET_HashCode * +GC_u2h (uint32_t port) { - struct GNUNET_CADET_LocalInfo msg; - - if (NULL != h->info_cb.channel_cb) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } + static struct GNUNET_HashCode hash; - msg.header.size = htons (sizeof (msg)); - msg.header.type = htons (GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_CHANNEL); - msg.peer = *initiator; - msg.channel_id = htonl (channel_number); -// msg.reserved = 0; - send_packet (h, &msg.header, NULL); - h->info_cb.channel_cb = callback; - h->info_cls = callback_cls; - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "This is a transitional function, use proper crypto hashes as CADET ports\n"); + GNUNET_CRYPTO_hash (&port, + sizeof (port), + &hash); + return &hash; } /** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. + * Connect to the MQ-based cadet service. * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cfg Configuration to use. + * + * @return Handle to the cadet service NULL on error. */ -static size_t -cadet_mq_ntr (void *cls, size_t size, - void *buf) +struct GNUNET_CADET_Handle * +GNUNET_CADET_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct GNUNET_MQ_Handle *mq = cls; - struct CadetMQState *state = GNUNET_MQ_impl_state (mq); - const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); - uint16_t msize; + struct GNUNET_CADET_Handle *h; - state->th = NULL; - if (NULL == buf) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "GNUNET_CADET_connect()\n"); + h = GNUNET_new (struct GNUNET_CADET_Handle); + h->cfg = cfg; + h->ports = GNUNET_CONTAINER_multihashmap_create (4, + GNUNET_YES); + h->channels = GNUNET_CONTAINER_multihashmap32_create (4); + reconnect (h); + if (NULL == h->mq) { - GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); - return 0; + GNUNET_break (0); + GNUNET_CADET_disconnect (h); + return NULL; } - msize = ntohs (msg->size); - GNUNET_assert (msize <= size); - GNUNET_memcpy (buf, msg, msize); - GNUNET_MQ_impl_send_continue (mq); - return msize; + h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI); + h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS; + h->reconnect_task = NULL; + + return h; } /** - * Signature of functions implementing the - * sending functionality of a message queue. + * Open a port to receive incomming MQ-based channels. * - * @param mq the message queue - * @param msg the message to send - * @param impl_state state of the implementation + * @param h CADET handle. + * @param port Hash identifying the port. + * @param connects Function called when an incoming channel is connected. + * @param connects_cls Closure for the @a connects handler. + * @param window_changes Function called when the transmit window size changes. + * @param disconnects Function called when a channel is disconnected. + * @param handlers Callbacks for messages we care about, NULL-terminated. + * @return Port handle. */ -static void -cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq, - const struct GNUNET_MessageHeader *msg, - void *impl_state) +struct GNUNET_CADET_Port * +GNUNET_CADET_open_port (struct GNUNET_CADET_Handle *h, + const struct GNUNET_HashCode *port, + GNUNET_CADET_ConnectEventHandler connects, + void * connects_cls, + GNUNET_CADET_WindowSizeEventHandler window_changes, + GNUNET_CADET_DisconnectEventHandler disconnects, + const struct GNUNET_MQ_MessageHandler *handlers) { - struct CadetMQState *state = impl_state; - - GNUNET_assert (NULL == state->th); - state->th = - GNUNET_CADET_notify_transmit_ready (state->channel, - /* FIXME: add option for corking */ - GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - ntohs (msg->size), - &cadet_mq_ntr, mq); - -} + struct GNUNET_CADET_PortMessage *msg; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_CADET_Port *p; + GNUNET_assert (NULL != connects); + GNUNET_assert (NULL != disconnects); -/** - * Signature of functions implementing the - * destruction of a message queue. - * Implementations must not free 'mq', but should - * take care of 'impl_state'. - * - * @param mq the message queue to destroy - * @param impl_state state of the implementation - */ -static void -cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, - void *impl_state) -{ - struct CadetMQState *state = impl_state; + p = GNUNET_new (struct GNUNET_CADET_Port); + p->cadet = h; + p->id = *port; + p->connects = connects; + p->cls = connects_cls; + p->window_changes = window_changes; + p->disconnects = disconnects; + p->handlers = GNUNET_MQ_copy_handlers (handlers); - if (NULL != state->th) - GNUNET_CADET_notify_transmit_ready_cancel (state->th); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (h->ports, + &p->id, + p, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - GNUNET_free (state); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN); + msg->port = p->id; + GNUNET_MQ_send (h->mq, + env); + return p; } /** - * Create a message queue for a cadet channel. - * The message queue can only be used to transmit messages, - * not to receive them. + * Create a new channel towards a remote peer. + * + * If the destination port is not open by any peer or the destination peer + * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called + * for this channel. * - * @param channel the channel to create the message qeue for - * @return a message queue to messages over the channel + * @param h CADET handle. + * @param channel_cls Closure for the channel. It's given to: + * - The disconnect handler @a disconnects + * - Each message type callback in @a handlers + * @param destination Peer identity the channel should go to. + * @param port Identification of the destination port. + * @param options CadetOption flag field, with all desired option bits set to 1. + * @param window_changes Function called when the transmit window size changes. + * @param disconnects Function called when the channel is disconnected. + * @param handlers Callbacks for messages we care about, NULL-terminated. + * @return Handle to the channel. */ -struct GNUNET_MQ_Handle * -GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel) +struct GNUNET_CADET_Channel * +GNUNET_CADET_channel_create (struct GNUNET_CADET_Handle *h, + void *channel_cls, + const struct GNUNET_PeerIdentity *destination, + const struct GNUNET_HashCode *port, + enum GNUNET_CADET_ChannelOption options, + GNUNET_CADET_WindowSizeEventHandler window_changes, + GNUNET_CADET_DisconnectEventHandler disconnects, + const struct GNUNET_MQ_MessageHandler *handlers) { - struct GNUNET_MQ_Handle *mq; - struct CadetMQState *state; - - state = GNUNET_new (struct CadetMQState); - state->channel = channel; - - mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, - &cadet_mq_destroy_impl, - NULL, /* FIXME: cancel impl. */ - state, - NULL, /* no msg handlers */ - NULL, /* no err handlers */ - NULL); /* no handler cls */ - return mq; + struct GNUNET_CADET_Channel *ch; + struct GNUNET_CADET_LocalChannelCreateMessage *msg; + struct GNUNET_MQ_Envelope *env; + + GNUNET_assert (NULL != disconnects); + ch = create_channel (h, + NULL); + ch->ctx = channel_cls; + ch->peer = *destination; + ch->options = options; + ch->window_changes = window_changes; + ch->disconnects = disconnects; + + /* Create MQ for channel */ + ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl, + &cadet_mq_destroy_impl, + &cadet_mq_cancel_impl, + ch, + handlers, + &cadet_mq_error_handler, + ch); + GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls); + + /* Request channel creation to service */ + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE); + msg->ccn = ch->ccn; + msg->port = *port; + msg->peer = *destination; + msg->opt = htonl (options); + GNUNET_MQ_send (h->mq, + env); + return ch; } /** - * Transitional function to convert an unsigned int port to a hash value. - * WARNING: local static value returned, NOT reentrant! - * WARNING: do not use this function for new code! + * Obtain the message queue for a connected peer. * - * @param port Numerical port (unsigned int format). + * @param channel The channel handle from which to get the MQ. * - * @return A GNUNET_HashCode usable for the new CADET API. + * @return NULL if @a channel is not yet connected. */ -const struct GNUNET_HashCode * -GC_u2h (uint32_t port) +struct GNUNET_MQ_Handle * +GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel) { - static struct GNUNET_HashCode hash; - - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "This is a transitional function, " - "use proper crypto hashes as CADET ports\n"); - GNUNET_CRYPTO_hash (&port, sizeof (port), &hash); - - return &hash; + return channel->mq; } + +/* end of cadet_api.c */