From a900b29ddaa9ea46c731b054b5e3ef3e725b95a8 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 19 Jun 2013 10:48:54 +0000 Subject: [PATCH] - opaque mq structs - mq for mesh - faster hashing for IBFs - mesh replaces stream in set - new set profiler (work in progress) --- src/consensus/consensus_api.c | 319 +++--------- src/consensus/gnunet-service-consensus.c | 28 +- src/dv/gnunet-service-dv.c | 16 +- src/include/gnunet_container_lib.h | 2 +- src/include/gnunet_mesh2_service.h | 17 +- src/include/gnunet_mq_lib.h | 309 ++++++------ src/include/gnunet_set_service.h | 30 +- src/include/gnunet_stream_lib.h | 4 +- src/mesh/mesh2_api.c | 131 ++++- src/set/Makefile.am | 24 +- src/set/gnunet-service-set.c | 278 +++++++---- src/set/gnunet-service-set.h | 73 ++- src/set/gnunet-service-set_union.c | 212 ++++---- ...et-set-ibf.c => gnunet-set-ibf-profiler.c} | 19 +- src/set/gnunet-set-profiler.c | 320 ++++++++++++ src/set/gnunet-set.c | 203 -------- src/set/ibf.c | 40 +- src/set/ibf.h | 2 +- src/set/set_api.c | 57 +-- src/set/strata_estimator.c | 2 +- src/set/strata_estimator.h | 2 +- src/set/test_set_api.c | 10 +- src/stream/stream_api.c | 103 ++-- src/util/mq.c | 464 +++++++++++++----- src/util/test_mq.c | 4 +- src/util/test_mq_client.c | 9 +- 26 files changed, 1550 insertions(+), 1128 deletions(-) rename src/set/{gnunet-set-ibf.c => gnunet-set-ibf-profiler.c} (92%) create mode 100644 src/set/gnunet-set-profiler.c delete mode 100644 src/set/gnunet-set.c diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index e3ddb4913..684580755 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -33,37 +33,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) -/** - * Actions that can be queued. - */ -struct QueuedMessage -{ - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - /** - * The actual queued message. - */ - struct GNUNET_MessageHeader *msg; - - /** - * Will be called after transmit, if not NULL - */ - GNUNET_CONSENSUS_InsertDoneCallback idc; - - /** - * Closure for idc - */ - void *idc_cls; -}; - /** * Handle for the service. @@ -105,21 +74,11 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_PeerIdentity **peers; - /** - * Currently active transmit request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - /** * GNUNES_YES iff the join message has been sent to the service. */ int joined; - /** - * Closure for the insert done callback. - */ - void *idc_cls; - /** * Called when the conclude operation finishes or fails. */ @@ -135,109 +94,36 @@ struct GNUNET_CONSENSUS_Handle */ struct GNUNET_TIME_Absolute conclude_deadline; - unsigned int conclude_min_size; - - struct QueuedMessage *messages_head; - - struct QueuedMessage *messages_tail; - + /** + * Message queue for the client. + */ + struct GNUNET_MQ_Handle *mq; }; - - /** - * Schedule transmitting the next message. - * - * @param consensus consensus handle + * FIXME: this should not bee necessary when the API + * issue has been fixed */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus); - - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_queued (void *cls, size_t size, - void *buf) +struct InsertDoneInfo { - struct GNUNET_CONSENSUS_Handle *consensus; - struct QueuedMessage *qmsg; - size_t msg_size; - - consensus = (struct GNUNET_CONSENSUS_Handle *) cls; - consensus->th = NULL; - - qmsg = consensus->messages_head; - GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); - - if (NULL == buf) - { - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - return 0; - } - - msg_size = ntohs (qmsg->msg->size); - - GNUNET_assert (size >= msg_size); - - memcpy (buf, qmsg->msg, msg_size); - if (NULL != qmsg->idc) - { - qmsg->idc (qmsg->idc_cls, GNUNET_YES); - } - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); - /* FIXME: free the messages */ - - send_next (consensus); - - return msg_size; -} - - -/** - * Schedule transmitting the next message. - * - * @param consensus consensus handle - */ -static void -send_next (struct GNUNET_CONSENSUS_Handle *consensus) -{ - if (NULL != consensus->th) - return; - - if (NULL != consensus->messages_head) - { - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_queued, consensus); - } -} + GNUNET_CONSENSUS_InsertDoneCallback idc; + void *cls; +}; /** * Called when the server has sent is a new element * - * @param consensus consensus handle - * @param msg element message + * @param cls consensus handle + * @param mh element message */ static void -handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) +handle_new_element (void *cls, + const struct GNUNET_MessageHeader *mh) { + struct GNUNET_CONSENSUS_Handle *consensus = cls; + const struct GNUNET_CONSENSUS_ElementMessage *msg + = (const struct GNUNET_CONSENSUS_ElementMessage *) mh; struct GNUNET_SET_Element element; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -247,8 +133,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, element.data = &msg[1]; consensus->new_element_cb (consensus->new_element_cls, &element); - - send_next (consensus); } @@ -256,13 +140,15 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, * Called when the server has announced * that the conclusion is over. * - * @param consensus consensus handle - * @param msg conclude done message + * @param cls consensus handle + * @param mh conclude done message */ static void -handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, +handle_conclude_done (void *cls, const struct GNUNET_MessageHeader *msg) { + struct GNUNET_CONSENSUS_Handle *consensus = cls; + GNUNET_CONSENSUS_ConcludeCallback cc; GNUNET_assert (NULL != (cc = consensus->conclude_cb)); @@ -271,89 +157,6 @@ handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, } -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_CONSENSUS_Handle *consensus = cls; - - LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n"); - - if (NULL == msg) - { - /* Error, timeout, death */ - LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); - GNUNET_CLIENT_disconnect (consensus->client); - consensus->client = NULL; - consensus->new_element_cb (consensus->new_element_cls, NULL); - return; - } - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: - handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); - break; - case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: - handle_conclude_done (consensus, msg); - break; - default: - GNUNET_break (0); - } -} - -/** - * Function called to notify a client about the connection - * begin ready to queue more data. "buf" will be - * NULL and "size" zero if the connection was closed for - * writing in the meantime. - * - * @param cls closure - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf - */ -static size_t -transmit_join (void *cls, size_t size, void *buf) -{ - struct GNUNET_CONSENSUS_JoinMessage *msg; - struct GNUNET_CONSENSUS_Handle *consensus; - int msize; - - GNUNET_assert (NULL != buf); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); - - consensus = cls; - consensus->th = NULL; - consensus->joined = 1; - - msg = buf; - - msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); - - msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); - msg->header.size = htons (msize); - msg->session_id = consensus->session_id; - msg->num_peers = htonl (consensus->num_peers); - memcpy(&msg[1], - consensus->peers, - consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - send_next (consensus); - GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, - GNUNET_TIME_UNIT_FOREVER_REL); - - return msize; -} - /** * Create a consensus session. * @@ -377,7 +180,15 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, void *new_element_cls) { struct GNUNET_CONSENSUS_Handle *consensus; - size_t join_message_size; + struct GNUNET_CONSENSUS_JoinMessage *join_msg; + struct GNUNET_MQ_Envelope *ev; + const static struct GNUNET_MQ_MessageHandler mq_handlers[] = { + {handle_new_element, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT, 0}, + {handle_conclude_done, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE, 0}, + GNUNET_MQ_HANDLERS_END + }; consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); consensus->cfg = cfg; @@ -393,24 +204,33 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client, + mq_handlers, consensus); GNUNET_assert (consensus->client != NULL); - join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + - (num_peers * sizeof (struct GNUNET_PeerIdentity)); - - consensus->th = - GNUNET_CLIENT_notify_transmit_ready (consensus->client, - join_message_size, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_join, consensus); + ev = GNUNET_MQ_msg_extra (join_msg, + (num_peers * sizeof (struct GNUNET_PeerIdentity)), + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); + join_msg->session_id = consensus->session_id; + join_msg->num_peers = htonl (consensus->num_peers); + memcpy(&join_msg[1], + consensus->peers, + consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); - GNUNET_assert (consensus->th != NULL); + GNUNET_MQ_send (consensus->mq, ev); return consensus; } +static void +idc_adapter (void *cls) +{ + struct InsertDoneInfo *i = cls; + i->idc (i->cls, GNUNET_OK); + GNUNET_free (i); +} /** * Insert an element in the set being reconsiled. Must not be called after @@ -428,28 +248,24 @@ GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { - struct QueuedMessage *qmsg; struct GNUNET_CONSENSUS_ElementMessage *element_msg; - size_t element_msg_size; + struct GNUNET_MQ_Envelope *ev; + struct InsertDoneInfo *i; LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); - element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + - element->size); + ev = GNUNET_MQ_msg_extra (element_msg, element->size, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg = GNUNET_malloc (element_msg_size); - element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); - element_msg->header.size = htons (element_msg_size); memcpy (&element_msg[1], element->data, element->size); - - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; - qmsg->idc = idc; - qmsg->idc_cls = idc_cls; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + + if (NULL != idc) + { + i = GNUNET_new (struct InsertDoneInfo); + i->idc = idc; + i->cls = idc_cls; + GNUNET_MQ_notify_sent (ev, idc_adapter, i); + } } @@ -471,7 +287,7 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, GNUNET_CONSENSUS_ConcludeCallback conclude, void *conclude_cls) { - struct QueuedMessage *qmsg; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; GNUNET_assert (NULL != conclude); @@ -480,17 +296,10 @@ GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, consensus->conclude_cls = conclude_cls; consensus->conclude_cb = conclude; - conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); - conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); - conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); + ev = GNUNET_MQ_msg (conclude_msg, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); - qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); - qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; - - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); - - send_next (consensus); + GNUNET_MQ_send (consensus->mq, ev); } diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 5ebff524c..1c2c78422 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -116,7 +116,7 @@ struct ConsensusSession /** * Queued messages to the client. */ - struct GNUNET_MQ_MessageQueue *client_mq; + struct GNUNET_MQ_Handle *client_mq; /** * Timeout for all rounds together, single rounds will schedule a timeout task @@ -217,9 +217,9 @@ struct ConsensusPeerInformation struct GNUNET_SET_OperationHandle *set_op; /** - * Has conclude been called on the set_op? + * Has commit been called on the set_op? */ - int set_op_concluded; + int set_op_commited; }; @@ -548,14 +548,14 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); } session->partner_outgoing->set_op = - GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, - &session->global_id, - (struct GNUNET_MessageHeader *) msg, - 0, /* FIXME */ - GNUNET_SET_RESULT_ADDED, - set_result_cb, session->partner_outgoing); - GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); - session->partner_outgoing->set_op_concluded = GNUNET_YES; + GNUNET_SET_prepare (&session->partner_outgoing->peer_id, + &session->global_id, + (struct GNUNET_MessageHeader *) msg, + 0, /* FIXME */ + GNUNET_SET_RESULT_ADDED, + set_result_cb, session->partner_outgoing); + GNUNET_SET_commit (session->partner_outgoing->set_op, session->element_set); + session->partner_outgoing->set_op_commited = GNUNET_YES; } #ifdef GNUNET_EXTRA_LOGGING @@ -767,12 +767,12 @@ set_listen_cb (void *cls, set_result_cb, &session->info[index]); if (ntohl (msg->exp_subround) == session->exp_subround) { - cpi->set_op_concluded = GNUNET_YES; - GNUNET_SET_conclude (cpi->set_op, session->element_set); + cpi->set_op_commited = GNUNET_YES; + GNUNET_SET_commit (cpi->set_op, session->element_set); } else { - cpi->set_op_concluded = GNUNET_NO; + cpi->set_op_commited = GNUNET_NO; } break; default: diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index b76fd9ab0..92ca5d9da 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -769,7 +769,7 @@ build_set (void *cls) if (DEFAULT_FISHEYE_DEPTH - 1 == neighbor->consensus_insertion_distance) { /* we have added all elements to the set, run the operation */ - GNUNET_SET_conclude (neighbor->set_op, + GNUNET_SET_commit (neighbor->set_op, neighbor->my_set); GNUNET_SET_destroy (neighbor->my_set); neighbor->my_set = NULL; @@ -1425,13 +1425,13 @@ initiate_set_union (void *cls, neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; neighbor->my_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); - neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer, - &neighbor->real_session_id, - NULL, - 0 /* FIXME: salt */, - GNUNET_SET_RESULT_ADDED, - &handle_set_union_result, - neighbor); + neighbor->set_op = GNUNET_SET_prepare (&neighbor->peer, + &neighbor->real_session_id, + NULL, + 0 /* FIXME: salt */, + GNUNET_SET_RESULT_ADDED, + &handle_set_union_result, + neighbor); build_set (neighbor); } diff --git a/src/include/gnunet_container_lib.h b/src/include/gnunet_container_lib.h index d52591148..1eb55a4c5 100644 --- a/src/include/gnunet_container_lib.h +++ b/src/include/gnunet_container_lib.h @@ -534,7 +534,7 @@ enum GNUNET_CONTAINER_MultiHashMapOption * GNUNET_NO if not. */ typedef int (*GNUNET_CONTAINER_HashMapIterator) (void *cls, - const struct GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, void *value); diff --git a/src/include/gnunet_mesh2_service.h b/src/include/gnunet_mesh2_service.h index c69ac2bfd..b6593cf99 100644 --- a/src/include/gnunet_mesh2_service.h +++ b/src/include/gnunet_mesh2_service.h @@ -162,7 +162,8 @@ typedef void (GNUNET_MESH_TunnelEndHandler) (void *cls, * the tunnel. * @param handlers Callbacks for messages we care about, NULL-terminated. Each * one must call GNUNET_MESH_receive_done on the tunnel to - * receive the next message. + * receive the next message. Messages of a type that is not + * in the handlers array are ignored if received. * @param ports NULL or 0-terminated array of port numbers for incoming tunnels. * * @return handle to the mesh service NULL on error @@ -325,7 +326,7 @@ typedef void (*GNUNET_MESH_TunnelCB) (void *cls, /** * Request information about the running mesh peer. * The callback will be called for every tunnel known to the service, - * listing all active peers that blong to the tunnel. + * listing all active peers that belong to the tunnel. * * If called again on the same handle, it will overwrite the previous * callback and cls. To retrieve the cls, monitor_cancel must be @@ -375,6 +376,18 @@ void * GNUNET_MESH_get_tunnels_cancel (struct GNUNET_MESH_Handle *h); +/** + * Create a message queue for a mesh tunnel. + * The message queue can only be used to transmit messages, + * not to receive them. + * + * @param tunnel the tunnel to create the message qeue for + * @return a message queue to messages over the tunnel + */ +struct GNUNET_MQ_Handle * +GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel); + + #if 0 /* keep Emacsens' auto-indent happy */ { #endif diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 54ea806a5..b73cab8d8 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h @@ -21,7 +21,7 @@ /** * @author Florian Dold * @file set/mq.h - * @brief general purpose request queue + * @brief general purpose message queue */ #ifndef GNUNET_MQ_H #define GNUNET_MQ_H @@ -30,7 +30,7 @@ /** - * Allocate a GNUNET_MQ_Message, with extra space allocated after the space needed + * Allocate an envelope, with extra space allocated after the space needed * by the message struct. * The allocated message will already have the type and size field set. * @@ -43,19 +43,19 @@ #define GNUNET_MQ_msg_extra(mvar, esize, type) GNUNET_MQ_msg_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), (esize) + sizeof *(mvar), (type)) /** - * Allocate a GNUNET_MQ_Message. - * The allocated message will already have the type and size field set. + * Allocate a GNUNET_MQ_Envelope. + * The contained message will already have the type and size field set. * * @param mvar variable to store the allocated message in; * must have a header field * @param type type of the message - * @return the MQ message + * @return the allocated envelope */ #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) /** - * Allocate a GNUNET_MQ_Message, where the message only consists of a header. + * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header. * The allocated message will already have the type and size field set. * * @param type type of the message @@ -64,7 +64,7 @@ /** - * Allocate a GNUNET_MQ_Message, where the message only consists of a header and extra space. + * Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space. * The allocated message will already have the type and size field set. * * @param mh pointer that will changed to point at to the allocated message header @@ -75,14 +75,14 @@ /** - * Allocate a GNUNET_MQ_Message, and append a payload message after the given + * Allocate a GNUNET_MQ_Envelope, and append a payload message after the given * message struct. * * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, * whose size is 'sizeof(*mvar) + ntohs (mh->size)' * @param type message type of the allocated message, has no effect on the nested message * @param mh message to nest - * @return a newly allocated 'struct GNUNET_MQ_Message *' + * @return a newly allocated 'struct GNUNET_MQ_Envelope *' */ #define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) @@ -98,11 +98,24 @@ #define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) +/** + * Implementation of the GNUNET_MQ_extract_nexted_mh macro. + * + * @param mh message header to extract nested message header from + * @param base_size size of the message before the nested message's header appears + * @return pointer to the nested message, does not copy the message + */ struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); -struct GNUNET_MQ_Message * +/** + * Implementation of the GNUNET_MQ_msg_nested_mh macro. + * + * @param mhp pointer to the message header pointer that will be changed to allocate at + * the newly allocated space for the message. + */ +struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh); @@ -114,9 +127,15 @@ GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} -struct GNUNET_MQ_MessageQueue; +/** + * Opaque handle to a message queue. + */ +struct GNUNET_MQ_Handle; -struct GNUNET_MQ_Message; +/** + * Opaque handle to an envelope. + */ +struct GNUNET_MQ_Envelope; enum GNUNET_MQ_Error { @@ -133,22 +152,45 @@ enum GNUNET_MQ_Error * @param msg the received message */ typedef void -(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); +(*GNUNET_MQ_MessageCallback) (void *cls, + const struct GNUNET_MessageHeader *msg); /** * Signature of functions implementing the - * sending part of a message queue + * sending functionality of a message queue. * - * @param q the message queue - * @param m the message + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +typedef void +(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, + void *impl_state); + + +/** + * Signature of functions implementing the + * destruction of a message queue. + * Implementations must not free 'mq', but should + * take care of 'impl_state'. + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation */ typedef void -(*GNUNET_MQ_SendImpl) (struct GNUNET_MQ_MessageQueue *q, struct GNUNET_MQ_Message *m); +(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state); +/** + * Implementation function that cancels the currently sent message. + * + * @param mq message queue + * @param impl_state state specific to the implementation + */ typedef void -(*GNUNET_MQ_DestroyImpl) (struct GNUNET_MQ_MessageQueue *q); +(*GNUNET_MQ_CancelImpl) (struct GNUNET_MQ_Handle *mq, void *impl_state); /** @@ -160,117 +202,23 @@ typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); -typedef void -(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); - - -struct GNUNET_MQ_Message -{ - /** - * Messages are stored in a linked list - */ - struct GNUNET_MQ_Message *next; - - /** - * Messages are stored in a linked list - */ - struct GNUNET_MQ_Message *prev; - - /** - * Actual allocated message header, - * usually points to the end of the containing GNUNET_MQ_Message - */ - struct GNUNET_MessageHeader *mh; - - /** - * Queue the message is queued in, NULL if message is not queued. - */ - struct GNUNET_MQ_MessageQueue *parent_queue; - - /** - * Called after the message was sent irrevokably - */ - GNUNET_MQ_NotifyCallback sent_cb; - - /** - * Closure for send_cb - */ - void *sent_cls; -}; - - /** - * Handle to a message queue. + * Generic error handler, called with the appropriate + * error code and the same closure specified at the creation of + * the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure, same closure as for the message handlers + * @param error error code */ -struct GNUNET_MQ_MessageQueue -{ - /** - * Handlers array, or NULL if the queue should not receive messages - */ - const struct GNUNET_MQ_Handler *handlers; - - /** - * Closure for the handler callbacks, - * as well as for the error handler. - */ - void *handlers_cls; - - /** - * Actual implementation of message sending, - * called when a message is added - */ - GNUNET_MQ_SendImpl send_impl; - - /** - * Implementation-dependent queue destruction function - */ - GNUNET_MQ_DestroyImpl destroy_impl; - - /** - * Implementation-specific state - */ - void *impl_state; - - /** - * Callback will be called when an error occurs. - */ - GNUNET_MQ_ErrorHandler error_handler; - - /** - * Linked list of messages pending to be sent - */ - struct GNUNET_MQ_Message *msg_head; - - /** - * Linked list of messages pending to be sent - */ - struct GNUNET_MQ_Message *msg_tail; - - /** - * Message that is currently scheduled to be - * sent. Not the head of the message queue, as the implementation - * needs to know if sending has been already scheduled or not. - */ - struct GNUNET_MQ_Message *current_msg; - - /** - * Map of associations, lazily allocated - */ - struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; - - /** - * Next id that should be used for the assoc_map, - * initialized lazily to a random value together with - * assoc_map - */ - uint32_t assoc_id; -}; +typedef void +(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); /** * Message handler for a specific message type. */ -struct GNUNET_MQ_Handler +struct GNUNET_MQ_MessageHandler { /** * Callback, called every time a new message of @@ -296,14 +244,14 @@ struct GNUNET_MQ_Handler /** - * Create a new message for MQ. + * Create a new envelope. * * @param mhp message header to store the allocated message header in, can be NULL * @param size size of the message to allocate * @param type type of the message, will be set in the allocated message * @return the allocated MQ message */ -struct GNUNET_MQ_Message * +struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type); @@ -315,7 +263,7 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) * @param mqm the message to discard */ void -GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); +GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm); /** @@ -326,7 +274,7 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm); * @param mqm the message to send. */ void -GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm); +GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev); /** @@ -336,7 +284,7 @@ GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm * @param mqm queued message to cancel */ void -GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm); +GNUNET_MQ_send_cancel (struct GNUNET_MQ_Envelope *ev); /** @@ -347,9 +295,7 @@ GNUNET_MQ_send_cancel (struct GNUNET_MQ_Message *mqm); * @param assoc_data to associate */ uint32_t -GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, - struct GNUNET_MQ_Message *mqm, - void *assoc_data); +GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data); /** * Get the data associated with a request id in a queue @@ -359,7 +305,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, * @return the associated data */ void * -GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); +GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id); /** @@ -370,7 +316,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); * @return the associated data */ void * -GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); +GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id); @@ -383,9 +329,9 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id); * @param cls closure for the handlers * @return the message queue */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, - const struct GNUNET_MQ_Handler *handlers, + const struct GNUNET_MQ_MessageHandler *handlers, void *cls); @@ -395,7 +341,7 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti * @param client the client * @return the message queue */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); @@ -404,16 +350,19 @@ GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client); * * @param send function the implements sending messages * @param destroy function that implements destroying the queue + * @param destroy function that implements canceling a message * @param state for the queue, passed to 'send' and 'destroy' * @param handlers array of message handlers * @param error_handler handler for read and write errors + * @param cls closure for handlers * @return a new message queue */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, + GNUNET_MQ_CancelImpl cancel, void *impl_state, - struct GNUNET_MQ_Handler *handlers, + const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *cls); @@ -424,27 +373,30 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, * Takes effect immediately, even for messages that already have been received, but for * with the handler has not been called. * + * If the message queue does not support receiving messages, + * this function has no effect. + * * @param mq message queue * @param new_handlers new handlers * @param cls new closure for the handlers */ void -GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, - const struct GNUNET_MQ_Handler *new_handlers, +GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MQ_MessageHandler *new_handlers, void *cls); /** - * Call a callback once the message has been sent, that is, the message - * can not be canceled anymore. - * There can be only one notify sent callback per message. + * Call a callback once the envelope has been sent, that is, + * sending it can not be canceled anymore. + * There can be only one notify sent callback per envelope. * - * @param mqm message to call the notify callback for + * @param ev message to call the notify callback for * @param cb the notify callback * @param cls closure for the callback */ void -GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, +GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *ev, GNUNET_MQ_NotifyCallback cb, void *cls); @@ -455,7 +407,7 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, * @param mq message queue to destroy */ void -GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); +GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq); /** @@ -465,7 +417,70 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq); * @param mh message to dispatch */ void -GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, - const struct GNUNET_MessageHeader *mh); +GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *mh); + + +/** + * Call the right callback for an error condition. + * + * @param mq message queue + */ +void +GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, + enum GNUNET_MQ_Error error); + + +/** + * Call the send implementation for the next queued message, + * if any. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue to send the next message with + */ +void +GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq); + + +/** + * Get the message that should currently be sent. + * Fails if there is no current message. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue with the current message + * @return message to send, never NULL + */ +const struct GNUNET_MessageHeader * +GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq); + + +/** + * Get the implementation state associated with the + * message queue. + * + * While the GNUNET_MQ_Impl* callbacks receive the + * implementation state, continuations that are scheduled + * by the implementation function often only have one closure + * argument, with this function it is possible to get at the + * implementation state when only passing the GNUNET_MQ_Handle + * as closure. + * + * @param mq message queue with the current message + * @return message to send, never NULL + */ +void * +GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq); + +/** + * Mark the current message as irrevocably sent, but do not + * proceed with sending the next message. + * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. + * + * @param mq message queue + */ +void +GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq); #endif diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index 34c9312d1..e08ed5d69 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h @@ -257,9 +257,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); /** - * Create a set operation for evaluation with another peer. + * Prepare a set operation to be evaluated with another peer. * The evaluation will not start until the client provides - * a local set with GNUNET_SET_conclude. + * a local set with GNUNET_SET_commit. * * @param other_peer peer with the other set * @param app_id hash for the application using the set @@ -273,14 +273,14 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); * @param result_cls closure for result_cb * @return a handle to cancel the operation */ -struct GNUNET_SET_OperationHandle * // FIXME: rename to _connect? -GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, - const struct GNUNET_HashCode *app_id, - const struct GNUNET_MessageHeader *context_msg, - uint16_t salt, - enum GNUNET_SET_ResultMode result_mode, - GNUNET_SET_ResultIterator result_cb, - void *result_cls); +struct GNUNET_SET_OperationHandle * +GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + uint16_t salt, + enum GNUNET_SET_ResultMode result_mode, + GNUNET_SET_ResultIterator result_cb, + void *result_cls); /** @@ -316,7 +316,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); * Accept a request we got via GNUNET_SET_listen. Must be called during * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid * afterwards. - * Call GNUNET_SET_conclude to provide the local set to use for the operation, + * Call GNUNET_SET_commit to provide the local set to use for the operation, * and to begin the exchange with the remote peer. * * @param request request to accept @@ -334,7 +334,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, /** - * Conclude the given set operation using the given set. + * Commit a set to be used with a set operation. * This function is called once we have fully constructed * the set that we want to use for the operation. At this * time, the P2P protocol can then begin to exchange the @@ -344,9 +344,9 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, * @param oh handle to the set operation * @param set the set to use for the operation */ -void // FIXME: rename to _commit -GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, - struct GNUNET_SET_Handle *set); +void +GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, + struct GNUNET_SET_Handle *set); /** diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index ece60c033..65e247ece 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -403,9 +403,9 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh); * @param error_handler callback for errors * @return the message queue for the socket */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_MQ_Handler *msg_handlers, + const struct GNUNET_MQ_MessageHandler *msg_handlers, GNUNET_MQ_ErrorHandler error_handler, void *cls); diff --git a/src/mesh/mesh2_api.c b/src/mesh/mesh2_api.c index 0e4c3b8e6..11ff743ca 100644 --- a/src/mesh/mesh2_api.c +++ b/src/mesh/mesh2_api.c @@ -320,6 +320,24 @@ struct GNUNET_MESH_Tunnel }; +/** + * Implementation state for mesh's message queue. + */ +struct MeshMQState +{ + /** + * The current transmit handle, or NULL + * if no transmit is active. + */ + struct GNUNET_MESH_TransmitHandle *th; + + /** + * Tunnel to send the data over. + */ + struct GNUNET_MESH_Tunnel *tunnel; +}; + + /******************************************************************************/ /*********************** DECLARATIONS *************************/ /******************************************************************************/ @@ -1685,4 +1703,115 @@ GNUNET_MESH_show_tunnel (struct GNUNET_MESH_Handle *h, h->tunnel_cls = callback_cls; return; -} \ No newline at end of file +} + + +/** + * Function called to notify a client about the connection + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the connection was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +mesh_mq_ntr (void *cls, size_t size, + void *buf) +{ + struct GNUNET_MQ_Handle *mq = cls; + struct MeshMQState *state = GNUNET_MQ_impl_state (mq); + const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); + uint16_t msize; + + state->th = NULL; + if (NULL == buf) + { + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); + return 0; + } + msize = ntohs (msg->size); + GNUNET_assert (msize <= size); + memcpy (buf, msg, msize); + GNUNET_MQ_impl_send_continue (mq); + return msize; +} + + +/** + * Signature of functions implementing the + * sending functionality of a message queue. + * + * @param mq the message queue + * @param msg the message to send + * @param impl_state state of the implementation + */ +static void +mesh_mq_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, void *impl_state) +{ + struct MeshMQState *state = impl_state; + + GNUNET_assert (NULL == state->th); + GNUNET_MQ_impl_send_commit (mq); + state->th = + GNUNET_MESH_notify_transmit_ready (state->tunnel, + /* FIXME: add option for corking */ + GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + ntohs (msg->size), + mesh_mq_ntr, mq); + +} + + +/** + * Signature of functions implementing the + * destruction of a message queue. + * Implementations must not free 'mq', but should + * take care of 'impl_state'. + * + * @param mq the message queue to destroy + * @param impl_state state of the implementation + */ +static void +mesh_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) +{ + struct MeshMQState *state = impl_state; + + if (NULL != state->th) + GNUNET_MESH_notify_transmit_ready_cancel (state->th); + + GNUNET_free (state); +} + + +/** + * Create a message queue for a mesh tunnel. + * The message queue can only be used to transmit messages, + * not to receive them. + * + * @param tunnel the tunnel to create the message qeue for + * @return a message queue to messages over the tunnel + */ +struct GNUNET_MQ_Handle * +GNUNET_MESH_mq_create (struct GNUNET_MESH_Tunnel *tunnel) +{ + struct GNUNET_MQ_Handle *mq; + struct MeshMQState *state; + + state = GNUNET_new (struct MeshMQState); + state->tunnel = tunnel; + + mq = GNUNET_MQ_queue_for_callbacks (mesh_mq_send_impl, + mesh_mq_destroy_impl, + NULL, /* FIXME: cancel impl. */ + state, + NULL, /* no msg handlers */ + NULL, /* no err handlers */ + NULL); /* no handler cls */ + return mq; +} + diff --git a/src/set/Makefile.am b/src/set/Makefile.am index 71e71c867..c2449e0ea 100644 --- a/src/set/Makefile.am +++ b/src/set/Makefile.am @@ -16,7 +16,7 @@ if USE_COVERAGE endif bin_PROGRAMS = \ - gnunet-set + gnunet-set-profiler gnunet-set-ibf-profiler libexec_PROGRAMS = \ gnunet-service-set @@ -24,17 +24,24 @@ libexec_PROGRAMS = \ lib_LTLIBRARIES = \ libgnunetset.la -gnunet_set_SOURCES = \ - gnunet-set.c -gnunet_set_LDADD = \ +gnunet_set_profiler_SOURCES = \ + gnunet-set-profiler.c +gnunet_set_profiler_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/set/libgnunetset.la \ - $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/testbed/libgnunettestbed.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ $(GN_LIBINTL) -gnunet_set_DEPENDENCIES = \ +gnunet_set_profiler_DEPENDENCIES = \ libgnunetset.la + +gnunet_set_ibf_profiler_SOURCES = \ + gnunet-set-ibf-profiler.c \ + ibf.c +gnunet_set_ibf_profiler_LDADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(GN_LIBINTL) + gnunet_service_set_SOURCES = \ gnunet-service-set.c \ gnunet-service-set_union.c \ @@ -43,8 +50,7 @@ gnunet_service_set_SOURCES = \ gnunet_service_set_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ - $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/mesh/libgnunetmesh.la \ + $(top_builddir)/src/mesh/libgnunetmesh2.la \ $(GN_LIBINTL) libgnunetset_la_SOURCES = \ diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index d2f0b48d5..bd934de84 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -29,13 +29,16 @@ /** * Configuration of our local peer. + * (Not declared 'static' as also needed in gnunet-service-set_union.c) */ const struct GNUNET_CONFIGURATION_Handle *configuration; /** - * Socket listening for other peers via stream. + * Handle to the mesh service, used + * to listen for and connect to remote peers. + * (Not declared 'static' as also needed in gnunet-service-set_union.c) */ -static struct GNUNET_STREAM_ListenSocket *stream_listen_socket; +struct GNUNET_MESH_Handle *mesh; /** * Sets are held in a doubly linked list. @@ -78,14 +81,14 @@ static uint32_t accept_id = 1; /** - * Get set that is owned by the client, if any. + * Get set that is owned by the given client, if any. * * @param client client to look for * @return set that the client owns, NULL if the client * does not own a set */ static struct Set * -get_set (struct GNUNET_SERVER_Client *client) +set_get (struct GNUNET_SERVER_Client *client) { struct Set *set; for (set = sets_head; NULL != set; set = set->next) @@ -137,7 +140,7 @@ get_incoming (uint32_t id) * @param listener listener to destroy */ static void -destroy_listener (struct Listener *listener) +listener_destroy (struct Listener *listener) { if (NULL != listener->client_mq) { @@ -155,7 +158,7 @@ destroy_listener (struct Listener *listener) * @param set the set to destroy */ static void -destroy_set (struct Set *set) +set_destroy (struct Set *set) { switch (set->operation) { @@ -187,12 +190,12 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) struct Set *set; struct Listener *listener; - set = get_set (client); + set = set_get (client); if (NULL != set) - destroy_set (set); + set_destroy (set); listener = get_listener (client); if (NULL != listener) - destroy_listener (listener); + listener_destroy (listener); } @@ -202,17 +205,14 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) * @param incoming remote request to destroy */ static void -destroy_incoming (struct Incoming *incoming) +incoming_destroy (struct Incoming *incoming) { - if (NULL != incoming->mq) + if (NULL != incoming->tc) { - GNUNET_MQ_destroy (incoming->mq); - incoming->mq = NULL; - } - if (NULL != incoming->socket) - { - GNUNET_STREAM_close (incoming->socket); - incoming->socket = NULL; + GNUNET_free (incoming->tc); + GNUNET_assert (NULL != incoming->tc->tunnel); + GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel); + incoming->tc = NULL; } GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); GNUNET_free (incoming); @@ -237,6 +237,15 @@ get_listener_by_target (enum GNUNET_SET_OperationType op, } + +static void +tunnel_context_destroy (struct TunnelContext *tc) +{ + GNUNET_free (tc); + /* FIXME destroy the rest */ +} + + /** * Handle a request for a set operation from * another peer. @@ -244,16 +253,31 @@ get_listener_by_target (enum GNUNET_SET_OperationType op, * @param cls the incoming socket * @param mh the message */ -static void -handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) +static int +handle_p2p_operation_request (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *mh) { - struct Incoming *incoming = cls; + struct TunnelContext *tc = *tunnel_ctx; + struct Incoming *incoming; const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_RequestMessage *cmsg; struct Listener *listener; const struct GNUNET_MessageHeader *context_msg; + if (CONTEXT_INCOMING != tc->type) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected operation request\n"); + tunnel_context_destroy (tc); + /* don't kill the whole mesh connection */ + return GNUNET_OK; + } + + incoming = tc->data; + context_msg = GNUNET_MQ_extract_nested_mh (msg); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); @@ -263,20 +287,26 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "set operation request from peer failed: " "no set with matching application ID and operation type\n"); - return; + tunnel_context_destroy (tc); + /* don't kill the whole mesh connection */ + return GNUNET_OK; } mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); if (NULL == mqm) { /* FIXME: disconnect the peer */ GNUNET_break_op (0); - return; + tunnel_context_destroy (tc); + /* don't kill the whole mesh connection */ + return GNUNET_OK; } incoming->accept_id = accept_id++; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); cmsg->accept_id = htonl (incoming->accept_id); - cmsg->peer_id = incoming->peer; + cmsg->peer_id = incoming->tc->peer; GNUNET_MQ_send (listener->client_mq, mqm); + + return GNUNET_OK; } @@ -298,7 +328,7 @@ handle_client_create (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", ntohs (msg->operation)); - if (NULL != get_set (client)) + if (NULL != set_get (client)) { GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); @@ -379,7 +409,7 @@ handle_client_remove (void *cls, { struct Set *set; - set = get_set (client); + set = set_get (client); if (NULL == set) { GNUNET_break (0); @@ -428,7 +458,7 @@ handle_client_reject (void *cls, return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); - destroy_incoming (incoming); + incoming_destroy (incoming); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -449,7 +479,7 @@ handle_client_add (void *cls, { struct Set *set; - set = get_set (client); + set = set_get (client); if (NULL == set) { GNUNET_break (0); @@ -486,7 +516,7 @@ handle_client_evaluate (void *cls, { struct Set *set; - set = get_set (client); + set = set_get (client); if (NULL == set) { GNUNET_break (0); @@ -558,8 +588,7 @@ handle_client_accept (void *cls, return; } - - set = get_set (client); + set = set_get (client); if (NULL == set) { @@ -584,50 +613,11 @@ handle_client_accept (void *cls, /* note: _GSS_*_accept has to make sure the socket and mq are set to NULL, * otherwise they will be destroyed and disconnected */ - destroy_incoming (incoming); + incoming_destroy (incoming); GNUNET_SERVER_receive_done (client, GNUNET_OK); } -/** - * Functions of this type are called upon new stream connection from other peers - * or upon binding error which happen when the app_port given in - * GNUNET_STREAM_listen() is already taken. - * - * @param cls the closure from GNUNET_STREAM_listen - * @param socket the socket representing the stream; NULL on binding error - * @param initiator the identity of the peer who wants to establish a stream - * with us; NULL on binding error - * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the - * stream (the socket will be invalid after the call) - */ -static int -stream_listen_cb (void *cls, - struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_PeerIdentity *initiator) -{ - struct Incoming *incoming; - static const struct GNUNET_MQ_Handler handlers[] = { - {handle_p2p_operation_request, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST}, - GNUNET_MQ_HANDLERS_END - }; - - if (NULL == socket) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - - incoming = GNUNET_new (struct Incoming); - incoming->peer = *initiator; - incoming->socket = socket; - incoming->mq = GNUNET_STREAM_mq_create (incoming->socket, handlers, NULL, incoming); - /* FIXME: timeout for peers that only connect but don't send anything */ - GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); - return GNUNET_OK; -} - - /** * Called to clean up, after a shutdown has been requested. * @@ -638,31 +628,126 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - if (NULL != stream_listen_socket) + if (NULL != mesh) { - GNUNET_STREAM_listen_close (stream_listen_socket); - stream_listen_socket = NULL; + GNUNET_MESH_disconnect (mesh); + mesh = NULL; } while (NULL != incoming_head) { - destroy_incoming (incoming_head); + incoming_destroy (incoming_head); } while (NULL != listeners_head) { - destroy_listener (listeners_head); + listener_destroy (listeners_head); } while (NULL != sets_head) { - destroy_set (sets_head); + set_destroy (sets_head); } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n"); } + +/** + * Signature of the main function of a task. + * + * @param cls closure + * @param tc context information (why was this task triggered now) + */ +static void +incoming_timeout_cb (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct Incoming *incoming = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out"); + incoming_destroy (incoming); +} + + +/** + * Method called whenever another peer has added us to a tunnel + * the other peer initiated. + * Only called (once) upon reception of data with a message type which was + * subscribed to in GNUNET_MESH_connect. A call to GNUNET_MESH_tunnel_destroy + * causes te tunnel to be ignored and no further notifications are sent about + * the same tunnel. + * + * @param cls closure + * @param tunnel new handle to the tunnel + * @param initiator peer that started the tunnel + * @param port Port this tunnel is for. + * @return initial tunnel context for the tunnel + * (can be NULL -- that's not an error) + */ +static void * +tunnel_new_cb (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *initiator, + uint32_t port) +{ + struct Incoming *incoming; + struct TunnelContext *tc; + + GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); + tc = GNUNET_new (struct TunnelContext); + incoming = GNUNET_new (struct Incoming); + incoming->tc = tc; + tc->peer = *initiator; + tc->tunnel = tunnel; + tc->mq = GNUNET_MESH_mq_create (tunnel); + tc->data = incoming; + tc->type = CONTEXT_INCOMING; + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); + GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming); + + return tc; +} + + +/** + * Function called whenever a tunnel is destroyed. Should clean up + * any associated state. This function is NOT called if the client has + * explicitly asked for the tunnel to be destroyed using + * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on + * the tunnel. + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end (henceforth invalid) + * @param tunnel_ctx place where local state associated + * with the tunnel is stored + */ +static void +tunnel_end_cb (void *cls, + const struct GNUNET_MESH_Tunnel *tunnel, void *tunnel_ctx) +{ + struct TunnelContext *ctx = tunnel_ctx; + + switch (ctx->type) + { + case CONTEXT_INCOMING: + incoming_destroy ((struct Incoming *) ctx->data); + break; + case CONTEXT_OPERATION_UNION: + _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data); + break; + case CONTEXT_OPERATION_INTERSECTION: + GNUNET_assert (0); + /* FIXME: cfuchs */ + break; + default: + GNUNET_assert (0); + } + +} + + /** * Function called by the service's run * method to run service-specific setup code. @@ -686,16 +771,40 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, {NULL, NULL, 0, 0} }; + static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = { + {handle_p2p_operation_request, + GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0}, + /* messages for the union operation */ + {_GSS_union_handle_p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0}, + {_GSS_union_handle_p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0}, + {_GSS_union_handle_p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0}, + {_GSS_union_handle_p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0}, + {_GSS_union_handle_p2p_message, + GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0}, + /* FIXME: messages for intersection operation */ + {NULL, 0, 0} + }; + static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0}; configuration = cfg; - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, NULL); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); GNUNET_SERVER_add_handlers (server, server_handlers); - stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, - &stream_listen_cb, NULL, - GNUNET_STREAM_OPTION_END); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); + mesh = GNUNET_MESH_connect (cfg, NULL, tunnel_new_cb, tunnel_end_cb, + mesh_handlers, mesh_ports); + if (NULL == mesh) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not connect to mesh\n"); + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n"); } @@ -710,7 +819,8 @@ int main (int argc, char *const *argv) { int ret; - ret = GNUNET_SERVICE_run (argc, argv, "set", GNUNET_SERVICE_OPTION_NONE, &run, NULL); + ret = GNUNET_SERVICE_run (argc, argv, "set", + GNUNET_SERVICE_OPTION_NONE, &run, NULL); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n"); return (GNUNET_OK == ret) ? 0 : 1; } diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 15199eba4..66bff4ff1 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -33,7 +33,7 @@ #include "gnunet_applications.h" #include "gnunet_util_lib.h" #include "gnunet_core_service.h" -#include "gnunet_stream_lib.h" +#include "gnunet_mesh2_service.h" #include "gnunet_set_service.h" #include "set.h" @@ -47,6 +47,8 @@ struct IntersectionState; */ struct UnionState; +struct UnionEvaluateOperation; + /** * A set that supports a specific operation @@ -63,7 +65,7 @@ struct Set /** * Message queue for the client */ - struct GNUNET_MQ_MessageQueue *client_mq; + struct GNUNET_MQ_Handle *client_mq; /** * Type of operation supported for this set @@ -116,7 +118,7 @@ struct Listener /** * Message queue for the client */ - struct GNUNET_MQ_MessageQueue *client_mq; + struct GNUNET_MQ_Handle *client_mq; /** * Type of operation supported for this set @@ -148,19 +150,17 @@ struct Incoming struct Incoming *prev; /** - * Identity of the peer that connected to us + * Tunnel context, stores information about + * the tunnel and its peer. */ - struct GNUNET_PeerIdentity peer; + struct TunnelContext *tc; /** - * Socket connected to the peer + * GNUNET_YES if the incoming peer has sent + * an operation request (and we are waiting + * for the client to ack/nack), GNUNET_NO otherwise. */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Message queue for the peer - */ - struct GNUNET_MQ_MessageQueue *mq; + int received_request; /** * App code, set once the peer has @@ -187,18 +187,37 @@ struct Incoming /** * Unique request id for the request from - * a remote peer, sent to the client with will + * a remote peer, sent to the client, which will * accept or reject the request. */ uint32_t accept_id; }; +enum TunnelContextType { + CONTEXT_INCOMING, + CONTEXT_OPERATION_UNION, + CONTEXT_OPERATION_INTERSECTION, +}; + +struct TunnelContext +{ + struct GNUNET_MESH_Tunnel *tunnel; + struct GNUNET_PeerIdentity peer; + struct GNUNET_MQ_Handle *mq; + enum TunnelContextType type; + void *data; +}; + + + /** * Configuration of the local peer */ extern const struct GNUNET_CONFIGURATION_Handle *configuration; +extern struct GNUNET_MESH_Handle *mesh; + /** * Create a new set supporting the union operation @@ -262,4 +281,32 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, struct Incoming *incoming); +/** + * Destroy a union operation, and free all resources + * associated with it. + * + * @param eo the union operation to destroy + */ +void +_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo); + + +/** + * Dispatch messages for a union operation. + * + * @param cls closure + * @param tunnel mesh tunnel + * @param tunnel_ctx tunnel context + * @param sender ??? + * @param mh message to process + * @return ??? + */ +int +_GSS_union_handle_p2p_message (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *mh); + + #endif diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 6d9658ee5..2b7a0ccba 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -124,15 +124,10 @@ struct UnionEvaluateOperation struct GNUNET_MessageHeader *context_msg; /** - * Stream socket connected to the other peer + * Tunnel context for the peer we + * evaluate the union operation with. */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Message queue for the peer on the other - * end - */ - struct GNUNET_MQ_MessageQueue *mq; + struct TunnelContext *tc; /** * Request ID to multiplex set operations to @@ -397,22 +392,19 @@ destroy_key_to_element_iter (void *cls, * * @param eo the union operation to destroy */ -static void -destroy_union_operation (struct UnionEvaluateOperation *eo) +void +_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); - if (NULL != eo->mq) + if (NULL != eo->tc) { - GNUNET_MQ_destroy (eo->mq); - eo->mq = NULL; + GNUNET_MQ_destroy (eo->tc->mq); + GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); + GNUNET_free (eo->tc); + eo->tc = NULL; } - if (NULL != eo->socket) - { - GNUNET_STREAM_close (eo->socket); - eo->socket = NULL; - } if (NULL != eo->remote_ibf) { ibf_destroy (eo->remote_ibf); @@ -457,14 +449,14 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) static void fail_union_operation (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ResultMessage *msg; mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); msg->request_id = htonl (eo->request_id); GNUNET_MQ_send (eo->set->client_mq, mqm); - destroy_union_operation (eo); + _GSS_union_operation_destroy (eo); } @@ -498,7 +490,7 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) static void send_operation_request (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct OperationRequestMessage *msg; mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); @@ -512,7 +504,7 @@ send_operation_request (struct UnionEvaluateOperation *eo) } msg->operation = htons (GNUNET_SET_OPERATION_UNION); msg->app_id = eo->app_id; - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); if (NULL != eo->context_msg) { @@ -562,7 +554,7 @@ insert_element_iterator (void *cls, * Insert an element into the union operation's * key-to-element mapping * - * @param the union operation + * @param eo the union operation * @param ee the element entry */ static void @@ -685,7 +677,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) while (buckets_sent < (1 << ibf_order)) { unsigned int buckets_in_message; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct IBFMessage *msg; buckets_in_message = (1 << ibf_order) - buckets_sent; @@ -700,7 +692,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); } eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; @@ -715,14 +707,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) static void send_strata_estimator (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_MessageHeader *strata_msg; mqm = GNUNET_MQ_msg_header_extra (strata_msg, SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, GNUNET_MESSAGE_TYPE_SET_P2P_SE); strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); eo->phase = PHASE_EXPECT_IBF; } @@ -751,7 +743,7 @@ get_order_from_difference (unsigned int diff) /** * Handle a strata estimator from a remote peer * - * @param the union operation + * @param cls the union operation * @param mh the message */ static void @@ -804,7 +796,7 @@ send_element_iterator (void *cls, while (NULL != ke) { const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_MessageHeader *mh; GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); @@ -817,7 +809,7 @@ send_element_iterator (void *cls, } memcpy (&mh[1], element->data, element->size); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); ke = ke->next_colliding; } return GNUNET_NO; @@ -889,11 +881,11 @@ decode_and_send (struct UnionEvaluateOperation *eo) } if (GNUNET_NO == res) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); break; } if (1 == side) @@ -902,7 +894,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) } else { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_MessageHeader *msg; /* FIXME: before sending the request, check if we may just have the element */ @@ -910,7 +902,7 @@ decode_and_send (struct UnionEvaluateOperation *eo) mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); *(struct IBF_Key *) &msg[1] = key; - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); } } ibf_destroy (diff_ibf); @@ -987,7 +979,7 @@ static void send_client_element (struct UnionEvaluateOperation *eo, struct GNUNET_SET_Element *element) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ResultMessage *rm; GNUNET_assert (0 != eo->request_id); @@ -1005,27 +997,6 @@ send_client_element (struct UnionEvaluateOperation *eo, } -/** - * Completion callback for shutdown - * - * @param cls the closure from GNUNET_STREAM_shutdown call - * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, - * SHUT_RDWR) - */ -/* -static void -stream_shutdown_cb (void *cls, - int operation) -{ - //struct UnionEvaluateOperation *eo = cls; - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); - - // destroy_union_operation (eo); -} -*/ - - /** * Send a result message to the client indicating * that the operation is over. @@ -1033,12 +1004,11 @@ stream_shutdown_cb (void *cls, * destroy the evaluate operation. * * @param eo union operation - * @param element element to send */ static void send_client_done_and_destroy (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ResultMessage *rm; GNUNET_assert (0 != eo->request_id); @@ -1047,7 +1017,6 @@ send_client_done_and_destroy (struct UnionEvaluateOperation *eo) rm->result_status = htons (GNUNET_SET_STATUS_DONE); GNUNET_MQ_send (eo->set->client_mq, mqm); - // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); } @@ -1153,13 +1122,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) { /* we got all requests, but still have to send our elements as response */ - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); eo->phase = PHASE_FINISHED; mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); - GNUNET_MQ_send (eo->mq, mqm); + GNUNET_MQ_send (eo->tc->mq, mqm); return; } if (eo->phase == PHASE_EXPECT_ELEMENTS) @@ -1174,49 +1143,12 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) } -/** - * The handlers array, used for both evaluate and accept - */ -static const struct GNUNET_MQ_Handler union_handlers[] = { - {handle_p2p_elements, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS}, - {handle_p2p_strata_estimator, GNUNET_MESSAGE_TYPE_SET_P2P_SE}, - {handle_p2p_ibf, GNUNET_MESSAGE_TYPE_SET_P2P_IBF}, - {handle_p2p_element_requests, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS}, - {handle_p2p_done, GNUNET_MESSAGE_TYPE_SET_P2P_DONE}, - GNUNET_MQ_HANDLERS_END -}; - - -/** - * Functions of this type will be called when a stream is established - * - * @param cls the closure from GNUNET_STREAM_open - * @param socket socket to use to communicate with the - * other side (read/write) - */ -static void -stream_open_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct UnionEvaluateOperation *eo = cls; - - GNUNET_assert (NULL == eo->mq); - GNUNET_assert (socket == eo->socket); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "open cb successful\n"); - eo->mq = GNUNET_STREAM_mq_create (eo->socket, union_handlers, NULL, eo); - /* we started the operation, thus we have to send the operation request */ - send_operation_request (eo); - eo->phase = PHASE_EXPECT_SE; -} - - /** * Evaluate a union operation with * a remote peer. * * @param m the evaluate request message from the client - * @parem set the set to evaluate the operation with + * @param set the set to evaluate the operation with */ void _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) @@ -1243,14 +1175,20 @@ _GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) "evaluating union operation, (app %s)\n", GNUNET_h2s (&eo->app_id)); - eo->socket = - GNUNET_STREAM_open (configuration, &eo->peer, GNUNET_APPLICATION_TYPE_SET, - &stream_open_cb, eo, - GNUNET_STREAM_OPTION_END); + eo->tc = GNUNET_new (struct TunnelContext); + eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer, + GNUNET_APPLICATION_TYPE_SET); + GNUNET_assert (NULL != eo->tc->tunnel); + eo->tc->peer = eo->peer; + eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel); + /* we started the operation, thus we have to send the operation request */ + eo->phase = PHASE_EXPECT_SE; + GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, eo->set->state.u->ops_tail, eo); - /* the stream open callback will kick off the operation */ + + send_operation_request (eo); } @@ -1270,25 +1208,17 @@ _GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); eo = GNUNET_new (struct UnionEvaluateOperation); + eo->tc = incoming->tc; eo->generation_created = set->state.u->current_generation++; eo->set = set; - eo->peer = incoming->peer; eo->salt = ntohs (incoming->salt); GNUNET_assert (0 != ntohl (m->request_id)); eo->request_id = ntohl (m->request_id); eo->se = strata_estimator_dup (set->state.u->se); - eo->mq = incoming->mq; /* transfer ownership of mq and socket from incoming to eo */ - incoming->mq = NULL; - eo->socket = incoming->socket; - incoming->socket = NULL; - /* the peer's socket is now ours, we'll receive all messages */ - GNUNET_MQ_replace_handlers (eo->mq, union_handlers, eo); - GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, eo->set->state.u->ops_tail, eo); - /* kick off the operation */ send_strata_estimator (eo); } @@ -1384,7 +1314,7 @@ _GSS_union_set_destroy (struct Set *set) while (NULL != set->state.u->ops_head) { - destroy_union_operation (set->state.u->ops_head); + _GSS_union_operation_destroy (set->state.u->ops_head); } } @@ -1418,3 +1348,57 @@ _GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) ee->generation_removed = set->state.u->current_generation; } + +/** + * Dispatch messages for a union operation. + * + * @param cls closure + * @param tunnel mesh tunnel + * @param tunnel_ctx tunnel context + * @param sender ??? + * @param mh message to process + * @return ??? + */ +int +_GSS_union_handle_p2p_message (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *mh) +{ + struct TunnelContext *tc = *tunnel_ctx; + struct UnionEvaluateOperation *eo; + + if (CONTEXT_OPERATION_UNION != tc->type) + { + /* FIXME: kill the tunnel */ + /* never kill mesh */ + return GNUNET_OK; + } + + eo = tc->data; + + switch (ntohs (mh->type)) + { + case GNUNET_MESSAGE_TYPE_SET_P2P_IBF: + handle_p2p_ibf (eo, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_P2P_SE: + handle_p2p_strata_estimator (eo, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS: + handle_p2p_elements (eo, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS: + handle_p2p_element_requests (eo, mh); + break; + case GNUNET_MESSAGE_TYPE_SET_P2P_DONE: + handle_p2p_done (eo, mh); + break; + default: + /* something wrong with mesh's message handlers? */ + GNUNET_assert (0); + } + /* never kill mesh! */ + return GNUNET_OK; +} diff --git a/src/set/gnunet-set-ibf.c b/src/set/gnunet-set-ibf-profiler.c similarity index 92% rename from src/set/gnunet-set-ibf.c rename to src/set/gnunet-set-ibf-profiler.c index d431795f1..92feb3db4 100644 --- a/src/set/gnunet-set-ibf.c +++ b/src/set/gnunet-set-ibf-profiler.c @@ -19,8 +19,8 @@ */ /** - * @file consensus/gnunet-consensus-ibf.c - * @brief tool for reconciling data with invertible bloom filters + * @file set/gnunet-set-ibf-profiler.c + * @brief tool for profiling the invertible bloom filter implementation * @author Florian Dold */ @@ -35,7 +35,7 @@ static unsigned int asize = 10; static unsigned int bsize = 10; static unsigned int csize = 10; -static unsigned int hash_num = 3; +static unsigned int hash_num = 4; static unsigned int ibf_size = 80; /* FIXME: add parameter for this */ @@ -181,12 +181,14 @@ run (void *cls, char *const *args, const char *cfgfile, start_time = GNUNET_TIME_absolute_get (); - for (;;) + for (i = 0; i <= asize + bsize; i++) { res = ibf_decode (ibf_a, &side, &ibf_key); if (GNUNET_SYSERR == res) { - printf ("decode failed\n"); + printf ("decode failed, %u/%u elements left\n", + GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b), + asize + bsize); return; } if (GNUNET_NO == res) @@ -198,7 +200,9 @@ run (void *cls, char *const *args, const char *cfgfile, printf ("decoded successfully in: %s\n", GNUNET_STRINGS_relative_time_to_string (delta_time, GNUNET_NO)); } else - printf ("decode missed elements\n"); + { + printf ("decode missed elements (should never happen)\n"); + } return; } @@ -207,6 +211,9 @@ run (void *cls, char *const *args, const char *cfgfile, if (side == -1) iter_hashcodes (ibf_key, remove_iterator, set_b); } + printf("cyclic IBF, %u/%u elements left\n", + GNUNET_CONTAINER_multihashmap_size (set_a) + GNUNET_CONTAINER_multihashmap_size (set_b), + asize + bsize); } int diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c new file mode 100644 index 000000000..bbaef7c43 --- /dev/null +++ b/src/set/gnunet-set-profiler.c @@ -0,0 +1,320 @@ +/* + This file is part of GNUnet + (C) 2013 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. + */ + +/** + * @file set/gnunet-set-profiler.c + * @brief profiling tool for set + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_common.h" +#include "gnunet_util_lib.h" +#include "gnunet_set_service.h" +#include "gnunet_testbed_service.h" + + +static int ret; + +static unsigned int num_a = 5; +static unsigned int num_b = 5; +static unsigned int num_c = 20; + +static unsigned int salt = 42; + +static char* op_str = "union"; + +const static struct GNUNET_CONFIGURATION_Handle *config; + +struct GNUNET_CONTAINER_MultiHashMap *map_a; +struct GNUNET_CONTAINER_MultiHashMap *map_b; +struct GNUNET_CONTAINER_MultiHashMap *map_c; + + +/** + * Elements that set a received, should match map_c + * in the end. + */ +struct GNUNET_CONTAINER_MultiHashMap *map_a_received; + +/** + * Elements that set b received, should match map_c + * in the end. + */ +struct GNUNET_CONTAINER_MultiHashMap *map_b_received; + +struct GNUNET_SET_Handle *set_a; +struct GNUNET_SET_Handle *set_b; + +struct GNUNET_HashCode app_id; + +struct GNUNET_PeerIdentity local_peer; + +struct GNUNET_SET_ListenHandle *set_listener; + +struct GNUNET_SET_OperationHandle *set_oh1; +struct GNUNET_SET_OperationHandle *set_oh2; + + +int a_done; +int b_done; + + + +static int +map_remove_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_CONTAINER_MultiHashMap *m = cls; + int ret; + + ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL); + GNUNET_assert (GNUNET_OK == ret); + return GNUNET_YES; + +} + + +static void +set_result_cb_1 (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + GNUNET_assert (GNUNET_NO == a_done); + GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); + switch (status) + { + case GNUNET_SET_STATUS_DONE: + case GNUNET_SET_STATUS_HALF_DONE: + a_done = GNUNET_YES; + GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_a_received); + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received)); + return; + case GNUNET_SET_STATUS_FAILURE: + GNUNET_assert (0); + return; + case GNUNET_SET_STATUS_OK: + break; + default: + GNUNET_assert (0); + } + GNUNET_CONTAINER_multihashmap_put (map_a_received, + element->data, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); +} + + +static void +set_result_cb_2 (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) +{ + GNUNET_assert (GNUNET_NO == b_done); + GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode)); + switch (status) + { + case GNUNET_SET_STATUS_DONE: + case GNUNET_SET_STATUS_HALF_DONE: + b_done = GNUNET_YES; + GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, map_b_received); + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received)); + return; + case GNUNET_SET_STATUS_FAILURE: + GNUNET_assert (0); + return; + case GNUNET_SET_STATUS_OK: + break; + default: + GNUNET_assert (0); + } + GNUNET_CONTAINER_multihashmap_put (map_b_received, + element->data, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); +} + + +static void +set_listen_cb (void *cls, + const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_MessageHeader *context_msg, + struct GNUNET_SET_Request *request) +{ + GNUNET_assert (NULL == set_oh2); + set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, + set_result_cb_2, NULL); + GNUNET_SET_commit (set_oh2, set_b); +} + + + +static int +set_insert_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_SET_Handle *set = cls; + struct GNUNET_SET_Element *el; + + el = GNUNET_malloc (sizeof *el + sizeof *key); + el->type = 0; + memcpy (&el[1], key, sizeof *key); + el->data = &el[1]; + el->size = sizeof *key; + GNUNET_SET_add_element (set, el, NULL, NULL); + GNUNET_free (el); + return GNUNET_YES; +} + + +/** + * Signature of the 'main' function for a (single-peer) testcase that + * is run using 'GNUNET_TESTING_peer_run'. + * + * @param cls closure + * @param cfg configuration of the peer that was started + * @param peer identity of the peer that was created + */ +static void +test_main (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +{ + unsigned int i; + struct GNUNET_HashCode hash; + + config = cfg; + + if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer)) + { + GNUNET_assert (0); + return; + } + + map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO); + map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO); + map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO); + + for (i = 0; i < num_a; i++) + { + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) + { + i--; + continue; + } + GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + } + + for (i = 0; i < num_b; i++) + { + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) + { + i--; + continue; + } + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) + { + i--; + continue; + } + GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + for (i = 0; i < num_c; i++) + { + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash)) + { + i--; + continue; + } + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash)) + { + i--; + continue; + } + if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash)) + { + i--; + continue; + } + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash); + GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + + /* use last hash for app id */ + app_id = hash; + + /* FIXME: also implement intersection etc. */ + set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); + set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION); + + GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a); + GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b); + GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a); + GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b); + + set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, + &app_id, set_listen_cb, NULL); + + set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, GNUNET_SET_RESULT_ADDED, + set_result_cb_1, NULL); + GNUNET_SET_commit (set_oh1, set_a); +} + +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + + ret = GNUNET_TESTING_peer_run ("test_set_api", + "test_set.conf", + &test_main, NULL); +} + + +int +main (int argc, char **argv) +{ + static const struct GNUNET_GETOPT_CommandLineOption options[] = { + { 'A', "num-first", NULL, + gettext_noop ("number of values"), + GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_a }, + { 'B', "num-second", NULL, + gettext_noop ("number of values"), + GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_b }, + { 'B', "num-common", NULL, + gettext_noop ("number of values"), + GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c }, + { 'x', "operation", NULL, + gettext_noop ("oeration to execute"), + GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str }, + GNUNET_GETOPT_OPTION_END + }; + GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus", + "help", + options, &run, NULL, GNUNET_YES); + return ret; +} + diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c deleted file mode 100644 index ae84610fc..000000000 --- a/src/set/gnunet-set.c +++ /dev/null @@ -1,203 +0,0 @@ -/* - This file is part of GNUnet - (C) 2012 Christian Grothoff (and other contributing authors) - - GNUnet is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 2, or (at your - option) any later version. - - GNUnet is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. - */ - -/** - * @file set/gnunet-set.c - * @brief profiling tool for the set service - * @author Florian Dold - */ -#include "platform.h" -#include "gnunet_common.h" -#include "gnunet_util_lib.h" -#include "gnunet_testbed_service.h" -#include "gnunet_set_service.h" - - -static struct GNUNET_PeerIdentity local_id; - -static struct GNUNET_HashCode app_id; -static struct GNUNET_SET_Handle *set1; -static struct GNUNET_SET_Handle *set2; -static struct GNUNET_SET_ListenHandle *listen_handle; -const static struct GNUNET_CONFIGURATION_Handle *config; - -int num_done; - - -static void -result_cb_set1 (void *cls, const struct GNUNET_SET_Element *element, - enum GNUNET_SET_Status status) -{ - switch (status) - { - case GNUNET_SET_STATUS_OK: - printf ("set 1: got element\n"); - break; - case GNUNET_SET_STATUS_FAILURE: - printf ("set 1: failure\n"); - break; - case GNUNET_SET_STATUS_DONE: - printf ("set 1: done\n"); - GNUNET_SET_destroy (set1); - break; - default: - GNUNET_assert (0); - } -} - - -static void -result_cb_set2 (void *cls, const struct GNUNET_SET_Element *element, - enum GNUNET_SET_Status status) -{ - switch (status) - { - case GNUNET_SET_STATUS_OK: - printf ("set 2: got element\n"); - break; - case GNUNET_SET_STATUS_FAILURE: - printf ("set 2: failure\n"); - break; - case GNUNET_SET_STATUS_DONE: - printf ("set 2: done\n"); - GNUNET_SET_destroy (set2); - break; - default: - GNUNET_assert (0); - } -} - - -static void -listen_cb (void *cls, - const struct GNUNET_PeerIdentity *other_peer, - const struct GNUNET_MessageHeader *context_msg, - struct GNUNET_SET_Request *request) -{ - struct GNUNET_SET_OperationHandle *oh; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); - GNUNET_SET_listen_cancel (listen_handle); - - oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); - GNUNET_SET_conclude (oh, set2); -} - - -/** - * Start the set operation. - * - * @param cls closure, unused - */ -static void -start (void *cls) -{ - struct GNUNET_SET_OperationHandle *oh; - - listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, - &app_id, listen_cb, NULL); - oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, - GNUNET_SET_RESULT_ADDED, - result_cb_set1, NULL); - GNUNET_SET_conclude (oh, set1); -} - - -/** - * Initialize the second set, continue - * - * @param cls closure, unused - */ -static void -init_set2 (void *cls) -{ - struct GNUNET_SET_Element element; - - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initializing set 2\n"); - - element.data = "hello"; - element.size = strlen(element.data); - GNUNET_SET_add_element (set2, &element, NULL, NULL); - element.data = "quux"; - element.size = strlen(element.data); - GNUNET_SET_add_element (set2, &element, start, NULL); -} - - -/** - * Initialize the first set, continue. - */ -static void -init_set1 (void) -{ - struct GNUNET_SET_Element element; - - element.data = "hello"; - element.size = strlen(element.data); - GNUNET_SET_add_element (set1, &element, NULL, NULL); - element.data = "bar"; - element.size = strlen(element.data); - GNUNET_SET_add_element (set1, &element, init_set2, NULL); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "initialized set 1\n"); -} - - -/** - * Main function that will be run. - * - * @param cls closure - * @param args remaining command-line arguments - * @param cfgfile name of the configuration file used (for saving, can be NULL!) - * @param cfg configuration - */ -static void -run (void *cls, char *const *args, - const char *cfgfile, - const struct GNUNET_CONFIGURATION_Handle *cfg) -{ - static const char* app_str = "gnunet-set"; - - config = cfg; - - GNUNET_CRYPTO_hash (app_str, strlen (app_str), &app_id); - - GNUNET_CRYPTO_get_host_identity (cfg, &local_id); - - set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); - set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); - - init_set1 (); -} - - - -int -main (int argc, char **argv) -{ - static const struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_OPTION_END - }; - GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set", - "help", - options, &run, NULL, GNUNET_NO); - return 0; -} - diff --git a/src/set/ibf.c b/src/set/ibf.c index 383ce3daf..e3c5be59a 100644 --- a/src/set/ibf.c +++ b/src/set/ibf.c @@ -19,13 +19,19 @@ */ /** - * @file consensus/ibf.c + * @file set/ibf.c * @brief implementation of the invertible bloom filter * @author Florian Dold */ #include "ibf.h" +/** + * Compute the key's hash from the key. + * Redefine to use a different hash function. + */ +#define IBF_KEY_HASH_VAL(k) (GNUNET_CRYPTO_crc32_n (&(k), sizeof (struct IBF_KeyHash))) + /** * Create a key from a hashcode. * @@ -89,23 +95,21 @@ static inline void ibf_get_indices (const struct InvertibleBloomFilter *ibf, struct IBF_Key key, int *dst) { - struct GNUNET_HashCode bucket_indices; - unsigned int filled; - int i; - GNUNET_CRYPTO_hash (&key, sizeof key, &bucket_indices); - filled = 0; - for (i = 0; filled < ibf->hash_num; i++) + uint32_t filled; + uint32_t i; + uint32_t bucket = key.key_val & 0xFFFFFFFF; + + for (i = 0, filled=0; filled < ibf->hash_num; i++) { - unsigned int bucket; unsigned int j; - if ( (0 != i) && (0 == (i % 16)) ) - GNUNET_CRYPTO_hash (&bucket_indices, sizeof (struct GNUNET_HashCode), &bucket_indices); - bucket = bucket_indices.bits[i % 16] % ibf->size; + uint64_t x; for (j = 0; j < filled; j++) if (dst[j] == bucket) goto try_next; - dst[filled++] = bucket; + dst[filled++] = bucket % ibf->size; try_next: ; + x = ((uint64_t) bucket << 32) | i; + bucket = GNUNET_CRYPTO_crc32_n (&x, sizeof x); } } @@ -116,16 +120,14 @@ ibf_insert_into (struct InvertibleBloomFilter *ibf, const int *buckets, int side) { int i; - struct GNUNET_HashCode key_hash_sha; - struct IBF_KeyHash key_hash; - GNUNET_CRYPTO_hash (&key, sizeof key, &key_hash_sha); - key_hash.key_hash_val = key_hash_sha.bits[0]; + for (i = 0; i < ibf->hash_num; i++) { const int bucket = buckets[i]; ibf->count[bucket].count_val += side; ibf->key_sum[bucket].key_val ^= key.key_val; - ibf->key_hash_sum[bucket].key_hash_val ^= key_hash.key_hash_val; + ibf->key_hash_sum[bucket].key_hash_val + ^= IBF_KEY_HASH_VAL (key); } } @@ -183,7 +185,6 @@ ibf_decode (struct InvertibleBloomFilter *ibf, { struct IBF_KeyHash hash; int i; - struct GNUNET_HashCode key_hash_sha; int buckets[ibf->hash_num]; GNUNET_assert (NULL != ibf); @@ -197,8 +198,7 @@ ibf_decode (struct InvertibleBloomFilter *ibf, if ((1 != ibf->count[i].count_val) && (-1 != ibf->count[i].count_val)) continue; - GNUNET_CRYPTO_hash (&ibf->key_sum[i], sizeof (struct IBF_Key), &key_hash_sha); - hash.key_hash_val = key_hash_sha.bits[0]; + hash.key_hash_val = IBF_KEY_HASH_VAL (ibf->key_sum[i]); /* test if the hash matches the key */ if (hash.key_hash_val != ibf->key_hash_sum[i].key_hash_val) diff --git a/src/set/ibf.h b/src/set/ibf.h index 2bf3ef7c7..90ea231c0 100644 --- a/src/set/ibf.h +++ b/src/set/ibf.h @@ -19,7 +19,7 @@ */ /** - * @file consensus/ibf.h + * @file set/ibf.h * @brief invertible bloom filter * @author Florian Dold */ diff --git a/src/set/set_api.c b/src/set/set_api.c index c74933aa0..e1b6132cb 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c @@ -40,7 +40,7 @@ struct GNUNET_SET_Handle { struct GNUNET_CLIENT_Connection *client; - struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Handle *mq; unsigned int messages_since_ack; }; @@ -73,7 +73,7 @@ struct GNUNET_SET_OperationHandle * Message sent to the server on calling conclude, * NULL if conclude has been called. */ - struct GNUNET_MQ_Message *conclude_mqm; + struct GNUNET_MQ_Envelope *conclude_mqm; /** * Address of the request if in the conclude message, @@ -89,7 +89,7 @@ struct GNUNET_SET_OperationHandle struct GNUNET_SET_ListenHandle { struct GNUNET_CLIENT_Connection *client; - struct GNUNET_MQ_MessageQueue* mq; + struct GNUNET_MQ_Handle* mq; GNUNET_SET_ListenCallback listen_cb; void *listen_cls; }; @@ -115,7 +115,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); GNUNET_MQ_send (set->mq, mqm); } @@ -162,7 +162,7 @@ handle_request (void *cls, const struct GNUNET_MessageHeader *mh) if (GNUNET_NO == req->accepted) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_AcceptRejectMessage *amsg; mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); @@ -197,9 +197,9 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op) { struct GNUNET_SET_Handle *set; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_CreateMessage *msg; - static const struct GNUNET_MQ_Handler mq_handlers[] = { + static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, GNUNET_MQ_HANDLERS_END }; @@ -234,7 +234,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set, GNUNET_SET_Continuation cont, void *cont_cls) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ElementMessage *msg; mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); @@ -262,7 +262,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, GNUNET_SET_Continuation cont, void *cont_cls) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ElementMessage *msg; mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); @@ -287,9 +287,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) /** - * Create a set operation for evaluation with another peer. + * Prepare a set operation to be evaluated with another peer. * The evaluation will not start until the client provides - * a local set with GNUNET_SET_conclude. + * a local set with GNUNET_SET_commit. * * @param other_peer peer with the other set * @param app_id hash for the application using the set @@ -304,15 +304,15 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) * @return a handle to cancel the operation */ struct GNUNET_SET_OperationHandle * -GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, - const struct GNUNET_HashCode *app_id, - const struct GNUNET_MessageHeader *context_msg, - uint16_t salt, - enum GNUNET_SET_ResultMode result_mode, - GNUNET_SET_ResultIterator result_cb, - void *result_cls) +GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer, + const struct GNUNET_HashCode *app_id, + const struct GNUNET_MessageHeader *context_msg, + uint16_t salt, + enum GNUNET_SET_ResultMode result_mode, + GNUNET_SET_ResultIterator result_cb, + void *result_cls) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_OperationHandle *oh; struct GNUNET_SET_EvaluateMessage *msg; @@ -322,9 +322,6 @@ GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); - if (NULL != context_msg) - LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n"); - msg->app_id = *app_id; msg->target_peer = *other_peer; msg->salt = salt; @@ -356,9 +353,9 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, void *listen_cls) { struct GNUNET_SET_ListenHandle *lh; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_ListenMessage *msg; - static const struct GNUNET_MQ_Handler mq_handlers[] = { + static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, GNUNET_MQ_HANDLERS_END }; @@ -403,7 +400,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) * @param result_mode specified how results will be returned, * see 'GNUNET_SET_ResultMode'. * @param result_cb callback for the results - * @param result_cls closure for result_cb + * @param cls closure for result_cb * @return a handle to cancel the operation */ struct GNUNET_SET_OperationHandle * @@ -412,7 +409,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, GNUNET_SET_ResultIterator result_cb, void *cls) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_OperationHandle *oh; struct GNUNET_SET_AcceptRejectMessage *msg; @@ -441,7 +438,7 @@ GNUNET_SET_accept (struct GNUNET_SET_Request *request, void GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_OperationHandle *h_assoc; if (NULL != oh->set) @@ -460,7 +457,7 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) /** - * Conclude the given set operation using the given set. + * Commit a set to be used with a set operation. * This function is called once we have fully constructed * the set that we want to use for the operation. At this * time, the P2P protocol can then begin to exchange the @@ -471,13 +468,13 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) * @param set the set to use for the operation */ void -GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, +GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh, struct GNUNET_SET_Handle *set) { GNUNET_assert (NULL == oh->set); GNUNET_assert (NULL != oh->conclude_mqm); oh->set = set; - oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh); + oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh); *oh->request_id_addr = htonl (oh->request_id); GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); oh->conclude_mqm = NULL; diff --git a/src/set/strata_estimator.c b/src/set/strata_estimator.c index 024bb99c6..18c127cd6 100644 --- a/src/set/strata_estimator.c +++ b/src/set/strata_estimator.c @@ -19,7 +19,7 @@ */ /** - * @file consensus/ibf.h + * @file set/ibf.h * @brief invertible bloom filter * @author Florian Dold */ diff --git a/src/set/strata_estimator.h b/src/set/strata_estimator.h index b3f050743..718c996d0 100644 --- a/src/set/strata_estimator.h +++ b/src/set/strata_estimator.h @@ -19,7 +19,7 @@ */ /** - * @file consensus/strata_estimator.h + * @file set/strata_estimator.h * @brief estimator of set difference * @author Florian Dold */ diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index f773cebdf..db82b83b4 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c @@ -95,7 +95,7 @@ listen_cb (void *cls, GNUNET_SET_listen_cancel (listen_handle); oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); - GNUNET_SET_conclude (oh, set2); + GNUNET_SET_commit (oh, set2); } @@ -111,10 +111,10 @@ start (void *cls) listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, &app_id, listen_cb, NULL); - oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, - GNUNET_SET_RESULT_ADDED, - result_cb_set1, NULL); - GNUNET_SET_conclude (oh, set1); + oh = GNUNET_SET_prepare (&local_id, &app_id, NULL, 42, + GNUNET_SET_RESULT_ADDED, + result_cb_set1, NULL); + GNUNET_SET_commit (oh, set1); } diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 34f1ea0fa..47ed04117 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -3779,11 +3779,11 @@ GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh) * @param size the number of bytes written */ static void -mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) +mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, + size_t size) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Handle *mq = cls; + struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); switch (status) { @@ -3793,56 +3793,32 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size /* FIXME: call shutdown handler */ return; case GNUNET_STREAM_TIMEOUT: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); return; case GNUNET_STREAM_SYSERR: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_WRITE); return; default: GNUNET_assert (0); return; } - - /* call cb for message we finished sending */ - mqm = mq->current_msg; - GNUNET_assert (NULL != mq->current_msg); - if (NULL != mqm->sent_cb) - mqm->sent_cb (mqm->sent_cls); - GNUNET_free (mqm); mss->wh = NULL; - mqm = mq->msg_head; - mq->current_msg = mqm; - if (NULL == mqm) - return; - GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mqm); - mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), - GNUNET_TIME_UNIT_FOREVER_REL, - mq_stream_write_queued, mq); - GNUNET_assert (NULL != mss->wh); + GNUNET_MQ_impl_send_continue (mq); } static void -mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, - struct GNUNET_MQ_Message *mqm) +mq_stream_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, void *impl_state) { - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + struct MQStreamState *mss = impl_state; - if (NULL != mq->current_msg) - { - GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); - return; - } - mq->current_msg = mqm; - mss->wh = GNUNET_STREAM_write (mss->socket, mqm->mh, ntohs (mqm->mh->size), + /* no way to cancel sending now */ + GNUNET_MQ_impl_send_commit (mq); + + mss->wh = GNUNET_STREAM_write (mss->socket, msg, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, mq_stream_write_queued, mq); } @@ -3862,12 +3838,12 @@ mq_stream_send_impl (struct GNUNET_MQ_MessageQueue *mq, */ static int mq_stream_mst_callback (void *cls, void *client, - const struct GNUNET_MessageHeader *message) + const struct GNUNET_MessageHeader *message) { - struct GNUNET_MQ_MessageQueue *mq = cls; + struct GNUNET_MQ_Handle *mq = cls; GNUNET_assert (NULL != message); - GNUNET_MQ_dispatch (mq, message); + GNUNET_MQ_inject_message (mq, message); return GNUNET_OK; } @@ -3889,8 +3865,8 @@ mq_stream_data_processor (void *cls, const void *data, size_t size) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct MQStreamState *mss; + struct GNUNET_MQ_Handle *mq = cls; + struct MQStreamState *mss = GNUNET_MQ_impl_state (mq); int ret; switch (status) @@ -3901,45 +3877,33 @@ mq_stream_data_processor (void *cls, /* FIXME: call shutdown handler */ return 0; case GNUNET_STREAM_TIMEOUT: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_TIMEOUT); return 0; case GNUNET_STREAM_SYSERR: - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return 0; default: GNUNET_assert (0); return 0; } - mss = (struct MQStreamState *) mq->impl_state; - GNUNET_assert (GNUNET_STREAM_OK == status); ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); if (GNUNET_OK != ret) { - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, - "read error (message stream malformed), but no error handler installed for message queue\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return 0; } - /* we always read all data */ mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, mq_stream_data_processor, mq); + /* we always read all data */ return size; } static void -mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +mq_stream_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { - struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; + struct MQStreamState *mss = impl_state; if (NULL != mss->rh) { @@ -3972,24 +3936,21 @@ mq_stream_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) * @param error_handler callback for errors * @return the message queue for the socket */ -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, - const struct GNUNET_MQ_Handler *msg_handlers, + const struct GNUNET_MQ_MessageHandler *msg_handlers, GNUNET_MQ_ErrorHandler error_handler, void *cls) { - struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Handle *mq; struct MQStreamState *mss; - mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); mss = GNUNET_new (struct MQStreamState); mss->socket = socket; - mq->impl_state = mss; - mq->send_impl = mq_stream_send_impl; - mq->destroy_impl = mq_stream_destroy_impl; - mq->handlers = msg_handlers; - mq->handlers_cls = cls; - mq->error_handler = error_handler; + mq = GNUNET_MQ_queue_for_callbacks (mq_stream_send_impl, + mq_stream_destroy_impl, + NULL, + mss, msg_handlers, error_handler, cls); if (NULL != msg_handlers) { mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); diff --git a/src/util/mq.c b/src/util/mq.c index dc87b9711..d0253c40f 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -31,6 +31,118 @@ #define LOG(kind,...) GNUNET_log_from (kind, "mq",__VA_ARGS__) +struct GNUNET_MQ_Envelope +{ + /** + * Messages are stored in a linked list. + * Each queue has its own list of envelopes. + */ + struct GNUNET_MQ_Envelope *next; + + /** + * Messages are stored in a linked list + * Each queue has its own list of envelopes. + */ + struct GNUNET_MQ_Envelope *prev; + + /** + * Actual allocated message header, + * usually points to the end of the containing GNUNET_MQ_Envelope + */ + struct GNUNET_MessageHeader *mh; + + /** + * Queue the message is queued in, NULL if message is not queued. + */ + struct GNUNET_MQ_Handle *parent_queue; + + /** + * Called after the message was sent irrevocably. + */ + GNUNET_MQ_NotifyCallback sent_cb; + + /** + * Closure for send_cb + */ + void *sent_cls; +}; + + +/** + * Handle to a message queue. + */ +struct GNUNET_MQ_Handle +{ + /** + * Handlers array, or NULL if the queue should not receive messages + */ + const struct GNUNET_MQ_MessageHandler *handlers; + + /** + * Closure for the handler callbacks, + * as well as for the error handler. + */ + void *handlers_cls; + + /** + * Actual implementation of message sending, + * called when a message is added + */ + GNUNET_MQ_SendImpl send_impl; + + /** + * Implementation-dependent queue destruction function + */ + GNUNET_MQ_DestroyImpl destroy_impl; + + /** + * Implementation-specific state + */ + void *impl_state; + + /** + * Callback will be called when an error occurs. + */ + GNUNET_MQ_ErrorHandler error_handler; + + /** + * Linked list of messages pending to be sent + */ + struct GNUNET_MQ_Envelope *envelope_head; + + /** + * Linked list of messages pending to be sent + */ + struct GNUNET_MQ_Envelope *envelope_tail; + + /** + * Message that is currently scheduled to be + * sent. Not the head of the message queue, as the implementation + * needs to know if sending has been already scheduled or not. + */ + struct GNUNET_MQ_Envelope *current_envelope; + + /** + * Has the current envelope been commited? + * Either GNUNET_YES or GNUNET_NO. + */ + int commited; + + /** + * Map of associations, lazily allocated + */ + struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map; + + /** + * Next id that should be used for the assoc_map, + * initialized lazily to a random value together with + * assoc_map + */ + uint32_t assoc_id; +}; + + + struct ServerClientSocketState { @@ -42,9 +154,14 @@ struct ServerClientSocketState struct ClientConnectionState { /** - * Did we call receive? + * Did we call receive alread alreadyy? */ int receive_active; + + /** + * Do we also want to receive? + */ + int receive_requested; struct GNUNET_CLIENT_Connection *connection; struct GNUNET_CLIENT_TransmitHandle *th; }; @@ -59,9 +176,9 @@ struct ClientConnectionState * @param mh message to dispatch */ void -GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_MessageHeader *mh) +GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh) { - const struct GNUNET_MQ_Handler *handler; + const struct GNUNET_MQ_MessageHandler *handler; int handled = GNUNET_NO; handler = mq->handlers; @@ -81,8 +198,27 @@ GNUNET_MQ_dispatch (struct GNUNET_MQ_MessageQueue *mq, const struct GNUNET_Messa } +/** + * Call the right callback for an error condition. + * + * @param mq message queue + */ void -GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) +GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq, + enum GNUNET_MQ_Error error) +{ + if (NULL == mq->error_handler) + { + /* FIXME: log what kind of error occured */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "mq: got error, but no handler installed\n"); + return; + } + mq->error_handler (mq->handlers_cls, error); +} + + +void +GNUNET_MQ_discard (struct GNUNET_MQ_Envelope *mqm) { GNUNET_assert (NULL == mqm->parent_queue); GNUNET_free (mqm); @@ -94,20 +230,156 @@ GNUNET_MQ_discard (struct GNUNET_MQ_Message *mqm) * May only be called once per message. * * @param mq message queue - * @param mqm the message to send. + * @param ev the message to send. + */ +void +GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev) +{ + GNUNET_assert (NULL != mq); + GNUNET_assert (NULL == ev->parent_queue); + + /* is the implementation busy? queue it! */ + if (NULL != mq->current_envelope) + { + GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head, mq->envelope_tail, ev); + return; + } + mq->current_envelope = ev; + mq->send_impl (mq, ev->mh, mq->impl_state); +} + + +/** + * Call the send implementation for the next queued message, + * if any. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue to send the next message with */ void -GNUNET_MQ_send (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) +GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq) { + /* call is only valid if we're actually currently sending + * a message */ GNUNET_assert (NULL != mq); - mq->send_impl (mq, mqm); + GNUNET_assert (NULL != mq->current_envelope); + GNUNET_assert (GNUNET_YES == mq->commited); + mq->commited = GNUNET_NO; + GNUNET_free (mq->current_envelope); + if (NULL == mq->envelope_head) + { + mq->current_envelope = NULL; + return; + } + + + GNUNET_assert (NULL != mq->envelope_tail); + GNUNET_assert (NULL != mq->envelope_head); + mq->current_envelope = mq->envelope_head; + GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, + mq->current_envelope); + mq->send_impl (mq, mq->current_envelope->mh, mq->impl_state); } -struct GNUNET_MQ_Message * +/** + * Create a message queue for the specified handlers. + * + * @param send function the implements sending messages + * @param destroy function that implements destroying the queue + * @param destroy function that implements canceling a message + * @param state for the queue, passed to 'send' and 'destroy' + * @param handlers array of message handlers + * @param error_handler handler for read and write errors + * @return a new message queue + */ +struct GNUNET_MQ_Handle * +GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send, + GNUNET_MQ_DestroyImpl destroy, + GNUNET_MQ_CancelImpl cancel, + void *impl_state, + const struct GNUNET_MQ_MessageHandler *handlers, + GNUNET_MQ_ErrorHandler error_handler, + void *cls) +{ + struct GNUNET_MQ_Handle *mq; + + mq = GNUNET_new (struct GNUNET_MQ_Handle); + mq->send_impl = send; + mq->destroy_impl = destroy; + mq->handlers = handlers; + mq->handlers_cls = cls; + mq->impl_state = impl_state; + + return mq; +} + + +/** + * Get the message that should currently be sent. + * Fails if there is no current message. + * Only useful for implementing message queues, + * results in undefined behavior if not used carefully. + * + * @param mq message queue with the current message + * @return message to send, never NULL + */ +const struct GNUNET_MessageHeader * +GNUNET_MQ_impl_current (struct GNUNET_MQ_Handle *mq) +{ + if (NULL == mq->current_envelope) + GNUNET_abort (); + if (NULL == mq->current_envelope->mh) + GNUNET_abort (); + return mq->current_envelope->mh; +} + + +/** + * Get the implementation state associated with the + * message queue. + * + * While the GNUNET_MQ_Impl* callbacks receive the + * implementation state, continuations that are scheduled + * by the implementation function often only have one closure + * argument, with this function it is possible to get at the + * implementation state when only passing the GNUNET_MQ_Handle + * as closure. + * + * @param mq message queue with the current message + * @return message to send, never NULL + */ +void * +GNUNET_MQ_impl_state (struct GNUNET_MQ_Handle *mq) +{ + return mq->impl_state; +} + + + +/** + * Mark the current message as irrevocably sent, but do not + * proceed with sending the next message. + * Will call the appropriate GNUNET_MQ_NotifyCallback, if any. + * + * @param mq message queue + */ +void +GNUNET_MQ_impl_send_commit (struct GNUNET_MQ_Handle *mq) +{ + GNUNET_assert (NULL != mq->current_envelope); + GNUNET_assert (GNUNET_NO == mq->commited); + mq->commited = GNUNET_YES; + if (NULL != mq->current_envelope->sent_cb) + mq->current_envelope->sent_cb (mq->current_envelope->sent_cls); +} + + +struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; mqm = GNUNET_malloc (sizeof *mqm + size); mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1]; @@ -119,11 +391,11 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) } -struct GNUNET_MQ_Message * +struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; uint16_t size; if (NULL == nested_mh) @@ -154,85 +426,62 @@ static size_t transmit_queued (void *cls, size_t size, void *buf) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct GNUNET_MQ_Message *mqm = mq->current_msg; - struct ServerClientSocketState *state = mq->impl_state; + struct GNUNET_MQ_Handle *mq = cls; + struct ServerClientSocketState *state = GNUNET_MQ_impl_state (mq); + const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (mq); size_t msg_size; GNUNET_assert (NULL != buf); - if (NULL != mqm->sent_cb) - { - mqm->sent_cb (mqm->sent_cls); - } - - mq->current_msg = NULL; - GNUNET_assert (NULL != mqm); - msg_size = ntohs (mqm->mh->size); + msg_size = ntohs (msg->size); GNUNET_assert (size >= msg_size); - memcpy (buf, mqm->mh, msg_size); - GNUNET_free (mqm); + memcpy (buf, msg, msg_size); state->th = NULL; - if (NULL != mq->msg_head) - { - mq->current_msg = mq->msg_head; - GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); - state->th = - GNUNET_SERVER_notify_transmit_ready (state->client, msg_size, - GNUNET_TIME_UNIT_FOREVER_REL, - &transmit_queued, mq); - } + GNUNET_MQ_impl_send_continue (mq); + return msg_size; } static void -server_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +server_client_destroy_impl (struct GNUNET_MQ_Handle *mq, + void *impl_state) { - struct ServerClientSocketState *state; + struct ServerClientSocketState *state = impl_state; GNUNET_assert (NULL != mq); - state = mq->impl_state; GNUNET_assert (NULL != state); GNUNET_SERVER_client_drop (state->client); GNUNET_free (state); } static void -server_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, struct GNUNET_MQ_Message *mqm) +server_client_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, void *impl_state) { - struct ServerClientSocketState *state; - int msize; + struct ServerClientSocketState *state = impl_state; GNUNET_assert (NULL != mq); - state = mq->impl_state; GNUNET_assert (NULL != state); - if (NULL != state->th) - { - GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); - return; - } - GNUNET_assert (NULL == mq->msg_head); - GNUNET_assert (NULL == mq->current_msg); - msize = ntohs (mqm->mh->size); - mq->current_msg = mqm; + GNUNET_MQ_impl_send_commit (mq); + state->th = - GNUNET_SERVER_notify_transmit_ready (state->client, msize, + GNUNET_SERVER_notify_transmit_ready (state->client, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, &transmit_queued, mq); } -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_server_client (struct GNUNET_SERVER_Client *client) { - struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Handle *mq; struct ServerClientSocketState *scss; - mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); + mq = GNUNET_new (struct GNUNET_MQ_Handle); scss = GNUNET_new (struct ServerClientSocketState); mq->impl_state = scss; scss->client = client; @@ -254,24 +503,21 @@ static void handle_client_message (void *cls, const struct GNUNET_MessageHeader *msg) { - struct GNUNET_MQ_MessageQueue *mq = cls; + struct GNUNET_MQ_Handle *mq = cls; struct ClientConnectionState *state; state = mq->impl_state; if (NULL == msg) { - if (NULL == mq->error_handler) - LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); - else - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return; } GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, GNUNET_TIME_UNIT_FOREVER_REL); - GNUNET_MQ_dispatch (mq, msg); + GNUNET_MQ_inject_message (mq, msg); } @@ -287,23 +533,22 @@ static size_t connection_client_transmit_queued (void *cls, size_t size, void *buf) { - struct GNUNET_MQ_MessageQueue *mq = cls; - struct GNUNET_MQ_Message *mqm = mq->current_msg; + struct GNUNET_MQ_Handle *mq = cls; + const struct GNUNET_MessageHeader *msg; struct ClientConnectionState *state = mq->impl_state; size_t msg_size; + GNUNET_assert (NULL != mq); + msg = GNUNET_MQ_impl_current (mq); + if (NULL == buf) { - if (NULL == mq->error_handler) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed\n"); - return 0; - } - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ); return 0; } - if ((NULL != mq->handlers) && (GNUNET_NO == state->receive_active)) + if ( (GNUNET_YES == state->receive_requested) && + (GNUNET_NO == state->receive_active) ) { state->receive_active = GNUNET_YES; GNUNET_CLIENT_receive (state->connection, handle_client_message, mq, @@ -311,78 +556,53 @@ connection_client_transmit_queued (void *cls, size_t size, } - GNUNET_assert (NULL != mqm); - - if (NULL != mqm->sent_cb) - { - mqm->sent_cb (mqm->sent_cls); - } - - mq->current_msg = NULL; - GNUNET_assert (NULL != buf); - msg_size = ntohs (mqm->mh->size); + msg_size = ntohs (msg->size); GNUNET_assert (size >= msg_size); - memcpy (buf, mqm->mh, msg_size); - GNUNET_free (mqm); + memcpy (buf, msg, msg_size); state->th = NULL; - if (NULL != mq->msg_head) - { - mq->current_msg = mq->msg_head; - GNUNET_CONTAINER_DLL_remove (mq->msg_head, mq->msg_tail, mq->current_msg); - state->th = - GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (mq->current_msg->mh->size), - GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, - &connection_client_transmit_queued, mq); - } + + GNUNET_MQ_impl_send_continue (mq); + return msg_size; } static void -connection_client_destroy_impl (struct GNUNET_MQ_MessageQueue *mq) +connection_client_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state) { - GNUNET_free (mq->impl_state); + GNUNET_free (impl_state); } static void -connection_client_send_impl (struct GNUNET_MQ_MessageQueue *mq, - struct GNUNET_MQ_Message *mqm) +connection_client_send_impl (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MessageHeader *msg, void *impl_state) { - struct ClientConnectionState *state = mq->impl_state; - int msize; + struct ClientConnectionState *state = impl_state; GNUNET_assert (NULL != state); + GNUNET_assert (NULL == state->th); + + GNUNET_MQ_impl_send_commit (mq); - if (NULL != state->th) - { - GNUNET_CONTAINER_DLL_insert_tail (mq->msg_head, mq->msg_tail, mqm); - return; - } - GNUNET_assert (NULL == mq->current_msg); - mq->current_msg = mqm; - msize = ntohs (mqm->mh->size); state->th = - GNUNET_CLIENT_notify_transmit_ready (state->connection, msize, + GNUNET_CLIENT_notify_transmit_ready (state->connection, ntohs (msg->size), GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, &connection_client_transmit_queued, mq); } - - - -struct GNUNET_MQ_MessageQueue * +struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection, - const struct GNUNET_MQ_Handler *handlers, + const struct GNUNET_MQ_MessageHandler *handlers, void *cls) { - struct GNUNET_MQ_MessageQueue *mq; + struct GNUNET_MQ_Handle *mq; struct ClientConnectionState *state; GNUNET_assert (NULL != connection); - mq = GNUNET_new (struct GNUNET_MQ_MessageQueue); + mq = GNUNET_new (struct GNUNET_MQ_Handle); mq->handlers = handlers; mq->handlers_cls = cls; state = GNUNET_new (struct ClientConnectionState); @@ -390,16 +610,20 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti mq->impl_state = state; mq->send_impl = connection_client_send_impl; mq->destroy_impl = connection_client_destroy_impl; + if (NULL != handlers) + state->receive_requested = GNUNET_YES; return mq; } void -GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, - const struct GNUNET_MQ_Handler *new_handlers, +GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq, + const struct GNUNET_MQ_MessageHandler *new_handlers, void *cls) { + /* FIXME: notify implementation? */ + /* FIXME: what about NULL handlers? abort receive? */ mq->handlers = new_handlers; mq->handlers_cls = cls; } @@ -413,8 +637,7 @@ GNUNET_MQ_replace_handlers (struct GNUNET_MQ_MessageQueue *mq, * @param assoc_data to associate */ uint32_t -GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, - struct GNUNET_MQ_Message *mqm, +GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data) { uint32_t id; @@ -433,7 +656,7 @@ GNUNET_MQ_assoc_add (struct GNUNET_MQ_MessageQueue *mq, void * -GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) +GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { if (NULL == mq->assoc_map) return NULL; @@ -442,7 +665,7 @@ GNUNET_MQ_assoc_get (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) void * -GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) +GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id) { void *val; @@ -456,7 +679,7 @@ GNUNET_MQ_assoc_remove (struct GNUNET_MQ_MessageQueue *mq, uint32_t request_id) void -GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, +GNUNET_MQ_notify_sent (struct GNUNET_MQ_Envelope *mqm, GNUNET_MQ_NotifyCallback cb, void *cls) { @@ -466,13 +689,13 @@ GNUNET_MQ_notify_sent (struct GNUNET_MQ_Message *mqm, void -GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) +GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq) { /* FIXME: destroy all pending messages in the queue */ if (NULL != mq->destroy_impl) { - mq->destroy_impl (mq); + mq->destroy_impl (mq, mq->impl_state); } GNUNET_free (mq); @@ -480,7 +703,6 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) - struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) { diff --git a/src/util/test_mq.c b/src/util/test_mq.c index 55cd80ef1..45bba0a6b 100644 --- a/src/util/test_mq.c +++ b/src/util/test_mq.c @@ -40,7 +40,7 @@ GNUNET_NETWORK_STRUCT_END void test1 (void) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct MyMessage *mm; mm = NULL; @@ -57,7 +57,7 @@ test1 (void) void test2 (void) { - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Envelope *mqm; struct GNUNET_MessageHeader *mh; mqm = GNUNET_MQ_msg_header (42); diff --git a/src/util/test_mq_client.c b/src/util/test_mq_client.c index b7eb1516a..30e498fcc 100644 --- a/src/util/test_mq_client.c +++ b/src/util/test_mq_client.c @@ -60,6 +60,9 @@ recv_cb (void *cls, struct GNUNET_SERVER_Client *argclient, return; } + /* can happen if notify does not work */ + GNUNET_assert (received < 2); + GNUNET_SERVER_receive_done (argclient, GNUNET_YES); } @@ -98,14 +101,16 @@ static struct GNUNET_SERVER_MessageHandler handlers[] = { void send_cb (void *cls) { + /* the notify should only be called once */ + GNUNET_assert (GNUNET_NO == notify); printf ("notify sent\n"); notify = GNUNET_YES; } void test_mq (struct GNUNET_CLIENT_Connection *client) { - struct GNUNET_MQ_MessageQueue *mq; - struct GNUNET_MQ_Message *mqm; + struct GNUNET_MQ_Handle *mq; + struct GNUNET_MQ_Envelope *mqm; /* FIXME: test handling responses */ mq = GNUNET_MQ_queue_for_connection_client (client, NULL, NULL); -- 2.25.1