From: Christian Grothoff Date: Thu, 16 Feb 2017 22:24:48 +0000 (+0100) Subject: first, very rough conversion of SET service to new cadet client API (fails tests... X-Git-Tag: taler-0.2.1~163 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=376138d90fe4e97d8611a8b7dfb798e79aa427cb;p=oweals%2Fgnunet.git first, very rough conversion of SET service to new cadet client API (fails tests, as before) --- diff --git a/src/include/gnunet_cadet_service.h b/src/include/gnunet_cadet_service.h index 1b3aac7c9..6f71424fb 100644 --- a/src/include/gnunet_cadet_service.h +++ b/src/include/gnunet_cadet_service.h @@ -713,6 +713,7 @@ typedef void * struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source); + /** * Function called whenever an MQ-channel is destroyed, even if the destruction * was requested by #GNUNET_CADET_channel_destroy. @@ -728,6 +729,7 @@ typedef void (*GNUNET_CADET_DisconnectEventHandler) (void *cls, const struct GNUNET_CADET_Channel *channel); + /** * Function called whenever an MQ-channel's transmission window size changes. * @@ -747,6 +749,7 @@ typedef void const struct GNUNET_CADET_Channel *channel, int window_size); + /** * Connect to the MQ-based cadet service. * @@ -756,6 +759,7 @@ typedef void struct GNUNET_CADET_Handle * GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg); + /** * Open a port to receive incomming MQ-based channels. * @@ -810,6 +814,7 @@ GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers); + /** * Obtain the message queue for a connected channel. * diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 4e2eb6a28..3f1086891 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -1174,18 +1174,14 @@ handle_incoming_disconnect (struct Operation *op) * * @param cls closure * @param channel new handle to the channel - * @param initiator peer that started the channel - * @param port Port this channel is for. - * @param options Unused. + * @param source peer that started the channel * @return initial channel context for the channel * returns NULL on error */ static void * channel_new_cb (void *cls, struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) + const struct GNUNET_PeerIdentity *source) { static const struct SetVT incoming_vt = { .msg_handler = &handle_incoming_msg, @@ -1199,7 +1195,7 @@ channel_new_cb (void *cls, incoming = GNUNET_new (struct Operation); incoming->listener = listener; incoming->is_incoming = GNUNET_YES; - incoming->peer = *initiator; + incoming->peer = *source; incoming->channel = channel; incoming->mq = GNUNET_CADET_mq_create (incoming->channel); incoming->vt = &incoming_vt; @@ -1216,6 +1212,127 @@ channel_new_cb (void *cls, } +/** + * Function called whenever a channel is destroyed. Should clean up + * any associated state. It must NOT call + * GNUNET_CADET_channel_destroy() on the channel. + * + * The peer_disconnect function is part of a a virtual table set initially either + * when a peer creates a new channel with us, or once we create + * a new channel ourselves (evaluate). + * + * Once we know the exact type of operation (union/intersection), the vt is + * replaced with an operation specific instance (_GSS_[op]_vt). + * + * @param channel_ctx place where local state associated + * with the channel is stored + * @param channel connection to the other end (henceforth invalid) + */ +static void +channel_end_cb (void *channel_ctx, + const struct GNUNET_CADET_Channel *channel) +{ + struct Operation *op = channel_ctx; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_end_cb called\n"); + op->channel = NULL; + op->keep++; + /* the vt can be null if a client already requested canceling op. */ + if (NULL != op->vt) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "calling peer disconnect due to channel end\n"); + op->vt->peer_disconnect (op); + } + op->keep--; + if (0 == op->keep) + { + /* cadet will never call us with the context again! */ + GNUNET_free (op); + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "channel_end_cb finished\n"); +} + + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +channel_window_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: not implemented, we could do flow control here... */ +} + +/** + * FIXME: hack-job. Migrate to proper handler array use! + * + * @param cls local state associated with the channel. + * @param message The actual message. + */ +static int +check_p2p_message (void *cls, + const struct GNUNET_MessageHeader *message) +{ + return GNUNET_OK; +} + + +/** + * FIXME: hack-job. Migrate to proper handler array use! + * + * Functions with this signature are called whenever a message is + * received via a cadet channel. + * + * The msg_handler is a virtual table set in initially either when a peer + * creates a new channel with us, or once we create a new channel + * ourselves (evaluate). + * + * Once we know the exact type of operation (union/intersection), the vt is + * replaced with an operation specific instance (_GSS_[op]_vt). + * + * @param cls local state associated with the channel. + * @param message The actual message. + */ +static void +handle_p2p_message (void *cls, + const struct GNUNET_MessageHeader *message) +{ + struct Operation *op = cls; + int ret; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Dispatching cadet message (type: %u)\n", + ntohs (message->type)); + /* do this before the handler, as the handler might kill the channel */ + GNUNET_CADET_receive_done (op->channel); + if (NULL != op->vt) + ret = op->vt->msg_handler (op, + message); + else + ret = GNUNET_SYSERR; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Handled cadet message (type: %u)\n", + ntohs (message->type)); + if (GNUNET_OK != ret) + GNUNET_CADET_channel_destroy (op->channel); +} + + /** * Called when a client wants to create a new listener. * @@ -1227,6 +1344,57 @@ handle_client_listen (void *cls, const struct GNUNET_SET_ListenMessage *msg) { struct GNUNET_SERVICE_Client *client = cls; + struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_handler_end () + }; struct Listener *listener; struct Operation *op; @@ -1245,15 +1413,17 @@ handle_client_listen (void *cls, GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener); - listener->open_port = GNUNET_CADET_open_port (cadet, - &msg->app_id, - &channel_new_cb, - listener); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New listener created (op %u, port %s)\n", listener->operation, GNUNET_h2s (&listener->app_id)); - + listener->open_port = GNUNET_CADET_open_porT (cadet, + &msg->app_id, + &channel_new_cb, + listener, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); /* check for existing incoming requests the listener might be interested in */ for (op = incoming_head; NULL != op; op = op->next) { @@ -1428,15 +1598,67 @@ handle_client_evaluate (void *cls, const struct GNUNET_SET_EvaluateMessage *msg) { struct GNUNET_SERVICE_Client *client = cls; + struct Operation *op = GNUNET_new (struct Operation); + const struct GNUNET_MQ_MessageHandler cadet_handlers[] = { + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_hd_var_size (p2p_message, + GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, + struct GNUNET_MessageHeader, + op), + GNUNET_MQ_handler_end () + }; struct Set *set; struct OperationSpecification *spec; - struct Operation *op; const struct GNUNET_MessageHeader *context; set = set_get (client); if (NULL == set) { GNUNET_break (0); + GNUNET_free (op); GNUNET_SERVICE_client_drop (client); return; } @@ -1450,7 +1672,6 @@ handle_client_evaluate (void *cls, spec->result_mode = ntohl (msg->result_mode); spec->client_request_id = ntohl (msg->request_id); context = GNUNET_MQ_extract_nested_mh (msg); - op = GNUNET_new (struct Operation); op->spec = spec; // Advance generation values, so that @@ -1465,11 +1686,14 @@ handle_client_evaluate (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new CADET channel to port %s\n", GNUNET_h2s (&msg->app_id)); - op->channel = GNUNET_CADET_channel_create (cadet, + op->channel = GNUNET_CADET_channel_creatE (cadet, op, &msg->target_peer, &msg->app_id, - GNUNET_CADET_OPTION_RELIABLE); + GNUNET_CADET_OPTION_RELIABLE, + &channel_window_cb, + &channel_end_cb, + cadet_handlers); op->mq = GNUNET_CADET_mq_create (op->channel); set->vt->evaluate (op, context); @@ -1823,96 +2047,6 @@ shutdown_task (void *cls) } -/** - * Function called whenever a channel is destroyed. Should clean up - * any associated state. It must NOT call - * GNUNET_CADET_channel_destroy() on the channel. - * - * The peer_disconnect function is part of a a virtual table set initially either - * when a peer creates a new channel with us, or once we create - * a new channel ourselves (evaluate). - * - * Once we know the exact type of operation (union/intersection), the vt is - * replaced with an operation specific instance (_GSS_[op]_vt). - * - * @param cls closure (set from GNUNET_CADET_connect()) - * @param channel connection to the other end (henceforth invalid) - * @param channel_ctx place where local state associated - * with the channel is stored - */ -static void -channel_end_cb (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) -{ - struct Operation *op = channel_ctx; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "channel_end_cb called\n"); - op->channel = NULL; - op->keep++; - /* the vt can be null if a client already requested canceling op. */ - if (NULL != op->vt) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "calling peer disconnect due to channel end\n"); - op->vt->peer_disconnect (op); - } - op->keep--; - if (0 == op->keep) - { - /* cadet will never call us with the context again! */ - GNUNET_free (op); - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "channel_end_cb finished\n"); -} - - -/** - * Functions with this signature are called whenever a message is - * received via a cadet channel. - * - * The msg_handler is a virtual table set in initially either when a peer - * creates a new channel with us, or once we create a new channel - * ourselves (evaluate). - * - * Once we know the exact type of operation (union/intersection), the vt is - * replaced with an operation specific instance (_GSS_[op]_vt). - * - * @param cls Closure (set from GNUNET_CADET_connect()). - * @param channel Connection to the other end. - * @param channel_ctx Place to store local state associated with the channel. - * @param message The actual message. - * @return #GNUNET_OK to keep the channel open, - * #GNUNET_SYSERR to close it (signal serious error). - */ -static int -dispatch_p2p_message (void *cls, - struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) -{ - struct Operation *op = *channel_ctx; - int ret; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Dispatching cadet message (type: %u)\n", - ntohs (message->type)); - /* do this before the handler, as the handler might kill the channel */ - GNUNET_CADET_receive_done (channel); - if (NULL != op->vt) - ret = op->vt->msg_handler (op, - message); - else - ret = GNUNET_SYSERR; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Handled cadet message (type: %u)\n", - ntohs (message->type)); - return ret; -} - - /** * Function called by the service's run * method to run service-specific setup code. @@ -1926,31 +2060,11 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service) { - static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0}, - { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE, 0}, - {NULL, 0, 0} - }; - configuration = cfg; GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); _GSS_statistics = GNUNET_STATISTICS_create ("set", cfg); - cadet = GNUNET_CADET_connect (cfg, - NULL, - &channel_end_cb, - cadet_handlers); + cadet = GNUNET_CADET_connecT (cfg); if (NULL == cadet) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR,