From: Christian Grothoff Date: Sun, 19 Jun 2016 21:29:20 +0000 (+0000) Subject: refactoring core API to use new MQ lib X-Git-Tag: initial-import-from-subversion-38251~776 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=8b1a351c049deec3226aa40a883b97c76916bea7;p=oweals%2Fgnunet.git refactoring core API to use new MQ lib --- diff --git a/src/core/core_api.c b/src/core/core_api.c index 7b423b6a0..af78ab4f9 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009-2014 GNUnet e.V. + Copyright (C) 2009-2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -43,12 +43,6 @@ struct GNUNET_CORE_TransmitHandle */ struct PeerRecord *peer; - /** - * Corresponding SEND_REQUEST message. Only non-NULL - * while SEND_REQUEST message is pending. - */ - struct ControlMessage *cm; - /** * Function that will be called to get the actual request * (once we are ready to transmit this request to the core). @@ -103,26 +97,12 @@ struct PeerRecord { /** - * We generally do NOT keep peer records in a DLL; this - * DLL is only used IF this peer's 'pending_head' message - * is ready for transmission. - */ - struct PeerRecord *prev; - - /** - * We generally do NOT keep peer records in a DLL; this - * DLL is only used IF this peer's 'pending_head' message - * is ready for transmission. - */ - struct PeerRecord *next; - - /** - * Corresponding core handle. + * Corresponding CORE handle. */ struct GNUNET_CORE_Handle *ch; /** - * Pending request, if any. 'th->peer' is set to NULL if the + * Pending request, if any. 'th->peer' is set to NULL if the * request is not active. */ struct GNUNET_CORE_TransmitHandle th; @@ -132,11 +112,6 @@ struct PeerRecord */ struct GNUNET_PeerIdentity peer; - /** - * ID of task to run #run_request_next_transmission(). - */ - struct GNUNET_SCHEDULER_Task *ntr_task; - /** * SendMessageRequest ID generator for this peer. */ @@ -145,58 +120,6 @@ struct PeerRecord }; -/** - * Type of function called upon completion. - * - * @param cls closure - * @param success #GNUNET_OK on success (which for request_connect - * ONLY means that we transmitted the connect request to CORE, - * it does not mean that we are actually now connected!); - * #GNUNET_NO on timeout, - * #GNUNET_SYSERR if core was shut down - */ -typedef void -(*GNUNET_CORE_ControlContinuation) (void *cls, - int success); - - -/** - * Entry in a doubly-linked list of control messages to be transmitted - * to the core service. Control messages include traffic allocation, - * connection requests and of course our initial 'init' request. - * - * The actual message is allocated at the end of this struct. - */ -struct ControlMessage -{ - /** - * This is a doubly-linked list. - */ - struct ControlMessage *next; - - /** - * This is a doubly-linked list. - */ - struct ControlMessage *prev; - - /** - * Function to run after transmission failed/succeeded. - */ - GNUNET_CORE_ControlContinuation cont; - - /** - * Closure for @e cont. - */ - void *cont_cls; - - /** - * Transmit handle (if one is associated with this ControlMessage), or NULL. - */ - struct GNUNET_CORE_TransmitHandle *th; -}; - - - /** * Context for the core service connection. */ @@ -241,39 +164,12 @@ struct GNUNET_CORE_Handle /** * Function handlers for messages of particular type. */ - const struct GNUNET_CORE_MessageHandler *handlers; - - /** - * Our connection to the service. - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Handle for our current transmission request. - */ - struct GNUNET_CLIENT_TransmitHandle *cth; - - /** - * Head of doubly-linked list of pending requests. - */ - struct ControlMessage *control_pending_head; - - /** - * Tail of doubly-linked list of pending requests. - */ - struct ControlMessage *control_pending_tail; - - /** - * Head of doubly-linked list of peers that are core-approved - * to send their next message. - */ - struct PeerRecord *ready_peer_head; + struct GNUNET_CORE_MessageHandler *handlers; /** - * Tail of doubly-linked list of peers that are core-approved - * to send their next message. + * Our message queue for transmissions to the service. */ - struct PeerRecord *ready_peer_tail; + struct GNUNET_MQ_Handle *mq; /** * Hash map listing all of the peers that we are currently @@ -289,7 +185,7 @@ struct GNUNET_CORE_Handle /** * ID of reconnect task (if any). */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * Current delay we use for re-trying to connect to core. @@ -351,8 +247,8 @@ reconnect_task (void *cls) /** - * Notify clients about disconnect and free - * the entry for connected peer. + * Notify clients about disconnect and free the entry for connected + * peer. * * @param cls the `struct GNUNET_CORE_Handle *` * @param key the peer identity (not used) @@ -368,17 +264,6 @@ disconnect_and_free_peer_entry (void *cls, struct GNUNET_CORE_TransmitHandle *th; struct PeerRecord *pr = value; - if (NULL != pr->ntr_task) - { - GNUNET_SCHEDULER_cancel (pr->ntr_task); - pr->ntr_task = NULL; - } - if ( (NULL != pr->prev) || - (NULL != pr->next) || - (h->ready_peer_head == pr) ) - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, - h->ready_peer_tail, - pr); if (NULL != h->disconnects) h->disconnects (h->cls, &pr->peer); @@ -388,14 +273,13 @@ disconnect_and_free_peer_entry (void *cls, { GNUNET_break (0); th->peer = NULL; - if (NULL != th->cm) - th->cm->th = NULL; } /* done with 'voluntary' cleanups, now on to normal freeing */ GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_remove (h->peers, key, pr)); + GNUNET_CONTAINER_multipeermap_remove (h->peers, + key, + pr)); GNUNET_assert (pr->ch == h); - GNUNET_assert (NULL == pr->ntr_task); GNUNET_free (pr); return GNUNET_YES; } @@ -410,659 +294,490 @@ disconnect_and_free_peer_entry (void *cls, static void reconnect_later (struct GNUNET_CORE_Handle *h) { - struct ControlMessage *cm; - struct PeerRecord *pr; - GNUNET_assert (NULL == h->reconnect_task); - if (NULL != h->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } h->currently_down = GNUNET_YES; GNUNET_assert (h->reconnect_task == NULL); h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff, - &reconnect_task, h); - while (NULL != (cm = h->control_pending_head)) - { - GNUNET_CONTAINER_DLL_remove (h->control_pending_head, - h->control_pending_tail, - cm); - if (NULL != cm->th) - cm->th->cm = NULL; - if (NULL != cm->cont) - cm->cont (cm->cont_cls, GNUNET_NO); - GNUNET_free (cm); - } + &reconnect_task, + h); GNUNET_CONTAINER_multipeermap_iterate (h->peers, - &disconnect_and_free_peer_entry, h); - while (NULL != (pr = h->ready_peer_head)) - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, - h->ready_peer_tail, - pr); - GNUNET_assert (NULL == h->control_pending_head); + &disconnect_and_free_peer_entry, + h); h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); } /** - * Check the list of pending requests, send the next - * one to the core. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param h core handle - * @param ignore_currently_down transmit message even if not initialized? + * @param cls closure, a `struct GNUNET_CORE_Handle *` + * @param error error code */ static void -trigger_next_request (struct GNUNET_CORE_Handle *h, - int ignore_currently_down); +handle_mq_error (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_CORE_Handle *h = cls; + + reconnect_later (h); +} /** - * Send a control message to the peer asking for transmission - * of the message in the given peer record. + * Handle init reply message received from CORE service. Notify + * application that we are now connected to the CORE. Also fake + * loopback connection. * - * @param pr peer to request transmission to + * @param cls the `struct GNUNET_CORE_Handle` + * @param m the init reply */ static void -request_next_transmission (struct PeerRecord *pr) +handle_init_reply (void *cls, + const struct InitReplyMessage *m) { - struct GNUNET_CORE_Handle *h = pr->ch; - struct ControlMessage *cm; - struct SendMessageRequest *smr; - struct GNUNET_CORE_TransmitHandle *th; + struct GNUNET_CORE_Handle *h = cls; + GNUNET_CORE_StartupCallback init; + struct PeerRecord *pr; - th = &pr->th; - if (NULL == th->peer) + GNUNET_break (0 == ntohl (m->reserved)); + GNUNET_break (GNUNET_YES == h->currently_down); + h->currently_down = GNUNET_NO; + h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + if (NULL != (init = h->init)) { - trigger_next_request (h, GNUNET_NO); - return; + /* mark so we don't call init on reconnect */ + h->init = NULL; + h->me = m->my_identity; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connected to core service of peer `%s'.\n", + GNUNET_i2s (&h->me)); + init (h->cls, + &h->me); } - if (NULL != th->cm) - return; /* already done */ - GNUNET_assert (NULL == pr->prev); - GNUNET_assert (NULL == pr->next); - cm = GNUNET_malloc (sizeof (struct ControlMessage) + - sizeof (struct SendMessageRequest)); - th->cm = cm; - cm->th = th; - smr = (struct SendMessageRequest *) &cm[1]; - smr->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); - smr->header.size = htons (sizeof (struct SendMessageRequest)); - smr->priority = htonl ((uint32_t) th->priority); - smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); - smr->peer = pr->peer; - smr->reserved = htonl (0); - smr->size = htons (th->msize); - smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); - GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, - h->control_pending_tail, cm); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding SEND REQUEST for peer `%s' to message queue\n", - GNUNET_i2s (&pr->peer)); - trigger_next_request (h, GNUNET_NO); + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Successfully reconnected to core service.\n"); + GNUNET_break (0 == memcmp (&h->me, + &m->my_identity, + sizeof (struct GNUNET_PeerIdentity))); + } + /* fake 'connect to self' */ + pr = GNUNET_new (struct PeerRecord); + pr->peer = h->me; + pr->ch = h; + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put (h->peers, + &h->me, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (NULL != h->connects) + h->connects (h->cls, + &pr->peer); } /** - * Transmit the next message to the core service. + * Handle connect message received from CORE service. + * Notify the application about the new connection. * - * @param cls closure with the `struct GNUNET_CORE_Handle` - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf + * @param cls the `struct GNUNET_CORE_Handle` + * @param cnm the connect message */ -static size_t -transmit_message (void *cls, - size_t size, - void *buf) +static void +handle_connect_notify (void *cls, + const struct ConnectNotifyMessage * cnm) { struct GNUNET_CORE_Handle *h = cls; - struct ControlMessage *cm; - struct GNUNET_CORE_TransmitHandle *th; - struct GNUNET_TIME_Relative delay; - struct GNUNET_TIME_Relative overdue; struct PeerRecord *pr; - struct SendMessage *sm; - const struct GNUNET_MessageHeader *hdr; - uint16_t msize; - size_t ret; - GNUNET_assert (h->reconnect_task == NULL); - h->cth = NULL; - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission failed, initiating reconnect\n"); - reconnect_later (h); - return 0; - } - /* first check for control messages */ - if (NULL != (cm = h->control_pending_head)) - { - hdr = (const struct GNUNET_MessageHeader *) &cm[1]; - msize = ntohs (hdr->size); - if (size < msize) - { - trigger_next_request (h, GNUNET_NO); - return 0; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting control message with %u bytes of type %u to core.\n", - (unsigned int) msize, - (unsigned int) ntohs (hdr->type)); - memcpy (buf, hdr, msize); - GNUNET_CONTAINER_DLL_remove (h->control_pending_head, - h->control_pending_tail, cm); - if (NULL != cm->th) - cm->th->cm = NULL; - if (NULL != cm->cont) - cm->cont (cm->cont_cls, GNUNET_OK); - GNUNET_free (cm); - trigger_next_request (h, GNUNET_NO); - return msize; - } - /* now check for 'ready' P2P messages */ - if (NULL == (pr = h->ready_peer_head)) - return 0; - GNUNET_assert (NULL != pr->th.peer); - th = &pr->th; - if (size < th->msize + sizeof (struct SendMessage)) - { - trigger_next_request (h, GNUNET_NO); - return 0; - } - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, - h->ready_peer_tail, - pr); - th->peer = NULL; + GNUNET_break (GNUNET_NO == h->currently_down); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting SEND request to `%s' with %u bytes.\n", - GNUNET_i2s (&pr->peer), - (unsigned int) th->msize); - sm = (struct SendMessage *) buf; - sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); - sm->priority = htonl ((uint32_t) th->priority); - sm->deadline = GNUNET_TIME_absolute_hton (th->deadline); - sm->peer = pr->peer; - sm->cork = htonl ((uint32_t) th->cork); - sm->reserved = htonl (0); - ret = - th->get_message (th->get_message_cls, - size - sizeof (struct SendMessage), - &sm[1]); - delay = GNUNET_TIME_absolute_get_duration (th->request_time); - overdue = GNUNET_TIME_absolute_get_duration (th->deadline); - if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) - LOG (GNUNET_ERROR_TYPE_WARNING, - "Transmitting overdue %u bytes to `%s' at priority %u with %s delay%s\n", - ret, - GNUNET_i2s (&pr->peer), - (unsigned int) th->priority, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES), - (th->cork) ? " (corked)" : ""); - else - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting %u bytes to `%s' at priority %u with %s delay%s\n", - ret, - GNUNET_i2s (&pr->peer), - (unsigned int) th->priority, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES), - (th->cork) ? " (corked)" : ""); - if ( (0 == ret) && - (GNUNET_CORE_PRIO_BACKGROUND == th->priority) ) + "Received notification about connection from `%s'.\n", + GNUNET_i2s (&cnm->peer)); + if (0 == memcmp (&h->me, + &cnm->peer, + sizeof (struct GNUNET_PeerIdentity))) { - /* client decided to send nothing; as the priority was - BACKGROUND, we can just not send anything to core. - For higher-priority messages, we must give an - empty message to CORE so that it knows that this - message is no longer pending. */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Size of clients message to peer %s is 0!\n", - GNUNET_i2s (&pr->peer)); - request_next_transmission (pr); - return 0; + /* connect to self!? */ + GNUNET_break (0); + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Produced SEND message to core with %u bytes payload\n", - (unsigned int) ret); - if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + pr = GNUNET_CONTAINER_multipeermap_get (h->peers, + &cnm->peer); + if (NULL != pr) { GNUNET_break (0); - request_next_transmission (pr); - return 0; + reconnect_later (h); + return; } - ret += sizeof (struct SendMessage); - sm->header.size = htons (ret); - GNUNET_assert (ret <= size); - request_next_transmission (pr); - return ret; + pr = GNUNET_new (struct PeerRecord); + pr->peer = cnm->peer; + pr->ch = h; + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multipeermap_put (h->peers, + &cnm->peer, + pr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (NULL != h->connects) + h->connects (h->cls, + &pr->peer); } /** - * Check the list of pending requests, send the next one to the core. + * Handle disconnect message received from CORE service. + * Notify the application about the lost connection. * - * @param h core handle - * @param ignore_currently_down transmit message even if not initialized? + * @param cls the `struct GNUNET_CORE_Handle` + * @param dnm message about the disconnect event */ static void -trigger_next_request (struct GNUNET_CORE_Handle *h, - int ignore_currently_down) +handle_disconnect_notify (void *cls, + const struct DisconnectNotifyMessage * dnm) { - uint16_t msize; + struct GNUNET_CORE_Handle *h = cls; + struct PeerRecord *pr; - if ( (GNUNET_YES == h->currently_down) && - (GNUNET_NO == ignore_currently_down) ) + GNUNET_break (GNUNET_NO == h->currently_down); + if (0 == memcmp (&h->me, + &dnm->peer, + sizeof (struct GNUNET_PeerIdentity))) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Core connection down, not processing queue\n"); + /* connection to self!? */ + GNUNET_break (0); return; } - if (NULL != h->cth) + GNUNET_break (0 == ntohl (dnm->reserved)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received notification about disconnect from `%s'.\n", + GNUNET_i2s (&dnm->peer)); + pr = GNUNET_CONTAINER_multipeermap_get (h->peers, + &dnm->peer); + if (NULL == pr) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request pending, not processing queue\n"); + GNUNET_break (0); + reconnect_later (h); return; } - if (NULL != h->control_pending_head) - msize = - ntohs (((struct GNUNET_MessageHeader *) &h-> - control_pending_head[1])->size); - else if (h->ready_peer_head != NULL) - msize = - h->ready_peer_head->th.msize + sizeof (struct SendMessage); - else + disconnect_and_free_peer_entry (h, + &dnm->peer, + pr); +} + + +/** + * Check that message received from CORE service is well-formed. + * + * @param cls the `struct GNUNET_CORE_Handle` + * @param ntm the message we got + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_notify_inbound (void *cls, + const struct NotifyTrafficMessage *ntm) +{ + struct GNUNET_CORE_Handle *h = cls; + uint16_t msize; + const struct GNUNET_MessageHeader *em; + + GNUNET_break (GNUNET_NO == h->currently_down); + msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage); + if (msize < sizeof (struct GNUNET_MessageHeader)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request queue empty, not processing queue\n"); - return; /* no pending message */ + GNUNET_break (0); + return GNUNET_SYSERR; } - h->cth = - GNUNET_CLIENT_notify_transmit_ready (h->client, msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &transmit_message, h); + em = (const struct GNUNET_MessageHeader *) &ntm[1]; + if ( (GNUNET_NO == h->inbound_hdr_only) && + (msize != ntohs (em->size)) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; } /** - * Handler for notification messages received from the core. + * Handle inbound message received from CORE service. If applicable, + * notify the application. * - * @param cls our `struct GNUNET_CORE_Handle` - * @param msg the message received from the core service + * @param cls the `struct GNUNET_CORE_Handle` + * @param ntm the message we got from CORE. */ static void -main_notify_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_notify_inbound (void *cls, + const struct NotifyTrafficMessage *ntm) { struct GNUNET_CORE_Handle *h = cls; - const struct InitReplyMessage *m; - const struct ConnectNotifyMessage *cnm; - const struct DisconnectNotifyMessage *dnm; - const struct NotifyTrafficMessage *ntm; const struct GNUNET_MessageHeader *em; - const struct SendMessageReady *smr; - const struct GNUNET_CORE_MessageHandler *mh; - GNUNET_CORE_StartupCallback init; struct PeerRecord *pr; - struct GNUNET_CORE_TransmitHandle *th; - unsigned int hpos; - int trigger; - uint16_t msize; uint16_t et; - if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_INFO, - _("Client was disconnected from core service, trying to reconnect.\n")); - reconnect_later (h); - return; - } - msize = ntohs (msg->size); + GNUNET_break (GNUNET_NO == h->currently_down); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Processing message of type %u and size %u from core service\n", - ntohs (msg->type), msize); - switch (ntohs (msg->type)) + "Received inbound message from `%s'.\n", + GNUNET_i2s (&ntm->peer)); + em = (const struct GNUNET_MessageHeader *) &ntm[1]; + et = ntohs (em->type); + for (unsigned int hpos = 0; NULL != h->handlers[hpos].callback; hpos++) { - case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: - if (ntohs (msg->size) != sizeof (struct InitReplyMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - m = (const struct InitReplyMessage *) msg; - GNUNET_break (0 == ntohl (m->reserved)); - /* start our message processing loop */ - if (GNUNET_YES == h->currently_down) - { - h->currently_down = GNUNET_NO; - trigger_next_request (h, GNUNET_NO); - } - h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; - h->me = m->my_identity; - if (NULL != (init = h->init)) - { - /* mark so we don't call init on reconnect */ - h->init = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connected to core service of peer `%s'.\n", - GNUNET_i2s (&h->me)); - init (h->cls, &h->me); - } - else - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Successfully reconnected to core service.\n"); - } - /* fake 'connect to self' */ - pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &h->me); - GNUNET_assert (NULL == pr); - pr = GNUNET_new (struct PeerRecord); - pr->peer = h->me; - pr->ch = h; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put (h->peers, - &h->me, pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - if (NULL != h->connects) - h->connects (h->cls, &pr->peer); - break; - case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT: - if (msize < sizeof (struct ConnectNotifyMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - cnm = (const struct ConnectNotifyMessage *) msg; - if (msize != - sizeof (struct ConnectNotifyMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received notification about connection from `%s'.\n", - GNUNET_i2s (&cnm->peer)); - if (0 == memcmp (&h->me, - &cnm->peer, - sizeof (struct GNUNET_PeerIdentity))) - { - /* connect to self!? */ - GNUNET_break (0); - return; - } - pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &cnm->peer); - if (NULL != pr) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - pr = GNUNET_new (struct PeerRecord); - pr->peer = cnm->peer; - pr->ch = h; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multipeermap_put (h->peers, - &cnm->peer, pr, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); - if (NULL != h->connects) - h->connects (h->cls, &pr->peer); - break; - case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT: - if (msize != sizeof (struct DisconnectNotifyMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - dnm = (const struct DisconnectNotifyMessage *) msg; - if (0 == memcmp (&h->me, - &dnm->peer, - sizeof (struct GNUNET_PeerIdentity))) - { - /* connection to self!? */ - GNUNET_break (0); - return; - } - GNUNET_break (0 == ntohl (dnm->reserved)); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received notification about disconnect from `%s'.\n", - GNUNET_i2s (&dnm->peer)); - pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &dnm->peer); - if (NULL == pr) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - trigger = ((pr->prev != NULL) || (pr->next != NULL) || - (h->ready_peer_head == pr)); - disconnect_and_free_peer_entry (h, &dnm->peer, pr); - if (trigger) - trigger_next_request (h, GNUNET_NO); - break; - case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND: - if (msize < sizeof (struct NotifyTrafficMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - ntm = (const struct NotifyTrafficMessage *) msg; - if ((msize < - sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)) ) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - em = (const struct GNUNET_MessageHeader *) &ntm[1]; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %u and size %u from peer `%s'\n", - ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); - if ((GNUNET_NO == h->inbound_hdr_only) && - (msize != - ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - et = ntohs (em->type); - for (hpos = 0; hpos < h->hcnt; hpos++) - { - mh = &h->handlers[hpos]; - if (mh->type != et) - continue; - if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Unexpected message size %u for message of type %u from peer `%s'\n", - htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); - GNUNET_break_op (0); - continue; - } - pr = GNUNET_CONTAINER_multipeermap_get (h->peers, &ntm->peer); - if (NULL == pr) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - if (GNUNET_OK != - h->handlers[hpos].callback (h->cls, &ntm->peer, em)) - { - /* error in processing, do not process other messages! */ - break; - } - } - if (NULL != h->inbound_notify) - h->inbound_notify (h->cls, &ntm->peer, em); - break; - case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: - if (msize < sizeof (struct NotifyTrafficMessage)) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - ntm = (const struct NotifyTrafficMessage *) msg; - if ((msize < - sizeof (struct NotifyTrafficMessage) + - sizeof (struct GNUNET_MessageHeader)) ) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - em = (const struct GNUNET_MessageHeader *) &ntm[1]; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received notification about transmission to `%s'.\n", - GNUNET_i2s (&ntm->peer)); - if ((GNUNET_NO == h->outbound_hdr_only) && - (msize != - ntohs (em->size) + sizeof (struct NotifyTrafficMessage))) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - if (NULL == h->outbound_notify) - { - GNUNET_break (0); - break; - } - h->outbound_notify (h->cls, &ntm->peer, em); - break; - case GNUNET_MESSAGE_TYPE_CORE_SEND_READY: - if (msize != sizeof (struct SendMessageReady)) + const struct GNUNET_CORE_MessageHandler *mh; + + mh = &h->handlers[hpos]; + if (mh->type != et) + continue; + if ( (mh->expected_size != ntohs (em->size)) && + (0 != mh->expected_size) ) { - GNUNET_break (0); - reconnect_later (h); - return; + LOG (GNUNET_ERROR_TYPE_ERROR, + "Unexpected message size %u for message of type %u from peer `%s'\n", + htons (em->size), + mh->type, + GNUNET_i2s (&ntm->peer)); + GNUNET_break_op (0); + continue; } - smr = (const struct SendMessageReady *) msg; pr = GNUNET_CONTAINER_multipeermap_get (h->peers, - &smr->peer); + &ntm->peer); if (NULL == pr) { GNUNET_break (0); reconnect_later (h); return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received notification about transmission readiness to `%s'.\n", - GNUNET_i2s (&smr->peer)); - if (NULL == pr->th.peer) + if (GNUNET_OK != + h->handlers[hpos].callback (h->cls, + &ntm->peer, + em)) { - /* request must have been cancelled between the original request - * and the response from core, ignore core's readiness */ + /* error in processing, do not process other messages! */ break; } + } + if (NULL != h->inbound_notify) + h->inbound_notify (h->cls, + &ntm->peer, + em); +} - th = &pr->th; - if (ntohs (smr->smr_id) != th->smr_id) - { - /* READY message is for expired or cancelled message, - * ignore! (we should have already sent another request) */ - break; - } - if ( (NULL != pr->prev) || - (NULL != pr->next) || - (h->ready_peer_head == pr) ) - { - /* we should not already be on the ready list... */ - GNUNET_break (0); - reconnect_later (h); - return; - } - GNUNET_CONTAINER_DLL_insert (h->ready_peer_head, - h->ready_peer_tail, - pr); - trigger_next_request (h, GNUNET_NO); - break; - default: - reconnect_later (h); + +/** + * Check that message received from CORE service is well-formed. + * + * @param cls the `struct GNUNET_CORE_Handle` + * @param ntm the message we got + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_notify_outbound (void *cls, + const struct NotifyTrafficMessage *ntm) +{ + struct GNUNET_CORE_Handle *h = cls; + uint16_t msize; + const struct GNUNET_MessageHeader *em; + + GNUNET_break (GNUNET_NO == h->currently_down); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received outbound message from `%s'.\n", + GNUNET_i2s (&ntm->peer)); + msize = ntohs (ntm->header.size) - sizeof (struct NotifyTrafficMessage); + if (msize < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + em = (const struct GNUNET_MessageHeader *) &ntm[1]; + if ( (GNUNET_NO == h->outbound_hdr_only) && + (msize != ntohs (em->size)) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Handle outbound message received from CORE service. If applicable, + * notify the application. + * + * @param cls the `struct GNUNET_CORE_Handle` + * @param ntm the message we got + */ +static void +handle_notify_outbound (void *cls, + const struct NotifyTrafficMessage *ntm) +{ + struct GNUNET_CORE_Handle *h = cls; + const struct GNUNET_MessageHeader *em; + + GNUNET_break (GNUNET_NO == h->currently_down); + em = (const struct GNUNET_MessageHeader *) &ntm[1]; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received notification about transmission to `%s'.\n", + GNUNET_i2s (&ntm->peer)); + if (NULL == h->outbound_notify) + { + GNUNET_break (0); return; } - GNUNET_CLIENT_receive (h->client, - &main_notify_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); + h->outbound_notify (h->cls, + &ntm->peer, + em); } /** - * Task executed once we are done transmitting the INIT message. - * Starts our 'receive' loop. + * Handle message received from CORE service notifying us that we are + * now allowed to send a message to a peer. If that message is still + * pending, put it into the queue to be transmitted. * - * @param cls the 'struct GNUNET_CORE_Handle' - * @param success were we successful + * @param cls the `struct GNUNET_CORE_Handle` + * @param ntm the message we got */ static void -init_done_task (void *cls, int success) +handle_send_ready (void *cls, + const struct SendMessageReady *smr) { struct GNUNET_CORE_Handle *h = cls; + struct PeerRecord *pr; + struct GNUNET_CORE_TransmitHandle *th; + struct SendMessage *sm; + struct GNUNET_MQ_Envelope *env; + struct GNUNET_TIME_Relative delay; + struct GNUNET_TIME_Relative overdue; + unsigned int ret; - if (GNUNET_SYSERR == success) - return; /* shutdown */ - if (GNUNET_NO == success) + GNUNET_break (GNUNET_NO == h->currently_down); + pr = GNUNET_CONTAINER_multipeermap_get (h->peers, + &smr->peer); + if (NULL == pr) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to exchange INIT with core, retrying\n"); - if (h->reconnect_task == NULL) - reconnect_later (h); + GNUNET_break (0); + reconnect_later (h); + return; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received notification about transmission readiness to `%s'.\n", + GNUNET_i2s (&smr->peer)); + if (NULL == pr->th.peer) + { + /* request must have been cancelled between the original request + * and the response from CORE, ignore CORE's readiness */ return; } - GNUNET_CLIENT_receive (h->client, - &main_notify_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); + th = &pr->th; + if (ntohs (smr->smr_id) != th->smr_id) + { + /* READY message is for expired or cancelled message, + * ignore! (we should have already sent another request) */ + return; + } + /* ok, all good, send message out! */ + th->peer = NULL; + env = GNUNET_MQ_msg_extra (sm, + th->msize, + GNUNET_MESSAGE_TYPE_CORE_SEND); + sm->priority = htonl ((uint32_t) th->priority); + sm->deadline = GNUNET_TIME_absolute_hton (th->deadline); + sm->peer = pr->peer; + sm->cork = htonl ((uint32_t) th->cork); + sm->reserved = htonl (0); + ret = th->get_message (th->get_message_cls, + th->msize, + &sm[1]); + GNUNET_assert (ret == th->msize); /* NOTE: API change! */ + delay = GNUNET_TIME_absolute_get_duration (th->request_time); + overdue = GNUNET_TIME_absolute_get_duration (th->deadline); + if (overdue.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Transmitting overdue %u bytes to `%s' at priority %u with %s delay %s\n", + ret, + GNUNET_i2s (&pr->peer), + (unsigned int) th->priority, + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES), + (th->cork) ? " (corked)" : ""); + else + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting %u bytes to `%s' at priority %u with %s delay %s\n", + ret, + GNUNET_i2s (&pr->peer), + (unsigned int) th->priority, + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES), + (th->cork) ? " (corked)" : ""); + GNUNET_MQ_send (h->mq, + env); } /** - * Our current client connection went down. Clean it up - * and try to reconnect! + * Our current client connection went down. Clean it up and try to + * reconnect! * * @param h our handle to the core service */ static void reconnect (struct GNUNET_CORE_Handle *h) { - struct ControlMessage *cm; + GNUNET_MQ_hd_fixed_size (init_reply, + GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY, + struct InitReplyMessage); + GNUNET_MQ_hd_fixed_size (connect_notify, + GNUNET_MESSAGE_TYPE_CORE_NOTIFY_CONNECT, + struct ConnectNotifyMessage); + GNUNET_MQ_hd_fixed_size (disconnect_notify, + GNUNET_MESSAGE_TYPE_CORE_NOTIFY_DISCONNECT, + struct DisconnectNotifyMessage); + GNUNET_MQ_hd_var_size (notify_inbound, + GNUNET_MESSAGE_TYPE_CORE_NOTIFY_INBOUND, + struct NotifyTrafficMessage); + GNUNET_MQ_hd_var_size (notify_outbound, + GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND, + struct NotifyTrafficMessage); + GNUNET_MQ_hd_fixed_size (send_ready, + GNUNET_MESSAGE_TYPE_CORE_SEND_READY, + struct SendMessageReady); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_init_reply_handler (h), + make_connect_notify_handler (h), + make_disconnect_notify_handler (h), + make_notify_inbound_handler (h), + make_notify_outbound_handler (h), + make_send_ready_handler (h), + GNUNET_MQ_handler_end () + }; struct InitMessage *init; + struct GNUNET_MQ_Envelope *env; uint32_t opt; - uint16_t msize; uint16_t *ts; - unsigned int hpos; - GNUNET_assert (NULL == h->client); + GNUNET_assert (NULL == h->mq); GNUNET_assert (GNUNET_YES == h->currently_down); - GNUNET_assert (NULL != h->cfg); - h->client = GNUNET_CLIENT_connect ("core", h->cfg); - if (NULL == h->client) + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "core", + handlers, + &handle_mq_error, + h); + if (NULL == h->mq) { reconnect_later (h); return; } - msize = h->hcnt * sizeof (uint16_t) + sizeof (struct InitMessage); - cm = GNUNET_malloc (sizeof (struct ControlMessage) + msize); - cm->cont = &init_done_task; - cm->cont_cls = h; - init = (struct InitMessage *) &cm[1]; - init->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_INIT); - init->header.size = htons (msize); + env = GNUNET_MQ_msg_extra (init, + sizeof (uint16_t) * h->hcnt, + GNUNET_MESSAGE_TYPE_CORE_INIT); opt = 0; if (NULL != h->inbound_notify) { @@ -1081,22 +796,18 @@ reconnect (struct GNUNET_CORE_Handle *h) LOG (GNUNET_ERROR_TYPE_INFO, "(Re)connecting to CORE service, monitoring messages of type %u\n", opt); - init->options = htonl (opt); - ts = (uint16_t *) & init[1]; - for (hpos = 0; hpos < h->hcnt; hpos++) + ts = (uint16_t *) &init[1]; + for (unsigned int hpos = 0; hpos < h->hcnt; hpos++) ts[hpos] = htons (h->handlers[hpos].type); - GNUNET_CONTAINER_DLL_insert (h->control_pending_head, - h->control_pending_tail, - cm); - trigger_next_request (h, GNUNET_YES); + GNUNET_MQ_send (h->mq, + env); } - /** - * Connect to the core service. Note that the connection may - * complete (or fail) asynchronously. + * Connect to the core service. Note that the connection may complete + * (or fail) asynchronously. * * @param cfg configuration to use * @param cls closure for the various callbacks that follow (including handlers in the handlers array) @@ -1129,8 +840,8 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CORE_MessageHandler *handlers) { struct GNUNET_CORE_Handle *h; + unsigned int hcnt; - GNUNET_assert (NULL != cfg); h = GNUNET_new (struct GNUNET_CORE_Handle); h->cfg = cfg; h->cls = cls; @@ -1141,14 +852,20 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->outbound_notify = outbound_notify; h->inbound_hdr_only = inbound_hdr_only; h->outbound_hdr_only = outbound_hdr_only; - h->handlers = handlers; - h->hcnt = 0; h->currently_down = GNUNET_YES; h->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_NO); + hcnt = 0; + if (NULL != handlers) + while (NULL != handlers[hcnt].callback) + hcnt++; + h->handlers = GNUNET_new_array (hcnt + 1, + struct GNUNET_CORE_MessageHandler); if (NULL != handlers) - while (NULL != handlers[h->hcnt].callback) - h->hcnt++; - GNUNET_assert (h->hcnt < + memcpy (h->handlers, + handlers, + hcnt * sizeof (struct GNUNET_CORE_MessageHandler)); + h->hcnt = hcnt; + GNUNET_assert (hcnt < (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct InitMessage)) / sizeof (uint16_t)); LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1168,62 +885,28 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) { - struct ControlMessage *cm; - - GNUNET_assert (NULL != handle); LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n"); - if (NULL != handle->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); - handle->cth = NULL; - } - while (NULL != (cm = handle->control_pending_head)) - { - GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, - handle->control_pending_tail, - cm); - if (NULL != cm->th) - cm->th->cm = NULL; - if (NULL != cm->cont) - cm->cont (cm->cont_cls, GNUNET_SYSERR); - GNUNET_free (cm); - } - if (NULL != handle->client) - { - GNUNET_CLIENT_disconnect (handle->client); - handle->client = NULL; - } GNUNET_CONTAINER_multipeermap_iterate (handle->peers, &disconnect_and_free_peer_entry, handle); + GNUNET_CONTAINER_multipeermap_destroy (handle->peers); + handle->peers = NULL; if (NULL != handle->reconnect_task) { GNUNET_SCHEDULER_cancel (handle->reconnect_task); handle->reconnect_task = NULL; } - GNUNET_CONTAINER_multipeermap_destroy (handle->peers); - handle->peers = NULL; - GNUNET_break (NULL == handle->ready_peer_head); + if (NULL != handle->mq) + { + GNUNET_MQ_destroy (handle->mq); + handle->mq = NULL; + } + GNUNET_free (handle->handlers); GNUNET_free (handle); } -/** - * Task that calls #request_next_transmission(). - * - * @param cls the `struct PeerRecord *` - */ -static void -run_request_next_transmission (void *cls) -{ - struct PeerRecord *pr = cls; - - pr->ntr_task = NULL; - request_next_transmission (pr); -} - - /** * Ask the core to call @a notify once it is ready to transmit the * given number of bytes to the specified @a target. Must only be @@ -1261,13 +944,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, { struct PeerRecord *pr; struct GNUNET_CORE_TransmitHandle *th; + struct SendMessageRequest *smr; + struct GNUNET_MQ_Envelope *env; - if (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + GNUNET_assert (NULL != notify); + if ( (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) || + (notify_size + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ) { - GNUNET_break (0); - return NULL; + GNUNET_break (0); + return NULL; } - GNUNET_assert (NULL != notify); LOG (GNUNET_ERROR_TYPE_DEBUG, "Asking core for transmission of %u bytes to `%s'\n", (unsigned int) notify_size, @@ -1286,10 +972,10 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, GNUNET_break (0); return NULL; } - GNUNET_assert (notify_size + sizeof (struct SendMessage) < - GNUNET_SERVER_MAX_MESSAGE_SIZE); th = &pr->th; - memset (th, 0, sizeof (struct GNUNET_CORE_TransmitHandle)); + memset (th, + 0, + sizeof (struct GNUNET_CORE_TransmitHandle)); th->peer = pr; th->get_message = notify; th->get_message_cls = notify_cls; @@ -1301,9 +987,16 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, th->priority = priority; th->msize = notify_size; th->cork = cork; - GNUNET_assert (NULL == pr->ntr_task); - pr->ntr_task = - GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); + env = GNUNET_MQ_msg (smr, + GNUNET_MESSAGE_TYPE_CORE_SEND_REQUEST); + smr->priority = htonl ((uint32_t) th->priority); + smr->deadline = GNUNET_TIME_absolute_hton (th->deadline); + smr->peer = pr->peer; + smr->reserved = htonl (0); + smr->size = htons (th->msize); + smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); + GNUNET_MQ_send (handle->mq, + env); LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n"); return th; @@ -1319,41 +1012,12 @@ void GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) { struct PeerRecord *pr = th->peer; - struct GNUNET_CORE_Handle *h; - GNUNET_assert (NULL != th); - GNUNET_assert (NULL != pr); LOG (GNUNET_ERROR_TYPE_DEBUG, "Aborting transmission request to core for %u bytes to `%s'\n", (unsigned int) th->msize, GNUNET_i2s (&pr->peer)); th->peer = NULL; - h = pr->ch; - if (NULL != th->cm) - { - /* we're currently in the control queue, remove */ - GNUNET_CONTAINER_DLL_remove (h->control_pending_head, - h->control_pending_tail, - th->cm); - GNUNET_free (th->cm); - th->cm = NULL; - } - if ( (NULL != pr->prev) || - (NULL != pr->next) || - (pr == h->ready_peer_head) ) - { - /* the request that was 'approved' by core was - * canceled before it could be transmitted; remove - * us from the 'ready' list */ - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, - h->ready_peer_tail, - pr); - } - if (NULL != pr->ntr_task) - { - GNUNET_SCHEDULER_cancel (pr->ntr_task); - pr->ntr_task = NULL; - } } @@ -1376,9 +1040,8 @@ int GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, const struct GNUNET_PeerIdentity *pid) { - GNUNET_assert (NULL != h); - GNUNET_assert (NULL != pid); - return GNUNET_CONTAINER_multipeermap_contains (h->peers, pid); + return GNUNET_CONTAINER_multipeermap_contains (h->peers, + pid); } diff --git a/src/core/gnunet-service-core_clients.c b/src/core/gnunet-service-core_clients.c index 3f7824066..c90b674e0 100644 --- a/src/core/gnunet-service-core_clients.c +++ b/src/core/gnunet-service-core_clients.c @@ -876,7 +876,7 @@ GSC_CLIENTS_deliver_message (const struct GNUNET_PeerIdentity *sender, (0 != (options & GNUNET_CORE_OPTION_SEND_FULL_INBOUND)) )) return; /* no client cares about this message notification */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core service passes message from `%4s' of type %u to client.\n", + "Core service passes message from `%s' of type %u to client.\n", GNUNET_i2s (sender), (unsigned int) ntohs (msg->type)); GSC_SESSIONS_add_to_typemap (sender, ntohs (msg->type)); diff --git a/src/core/test_core_api_reliability.c b/src/core/test_core_api_reliability.c index 49affdff3..c7672afdb 100644 --- a/src/core/test_core_api_reliability.c +++ b/src/core/test_core_api_reliability.c @@ -138,7 +138,9 @@ terminate_task (void *cls) FPRINTF (stderr, "\nThroughput was %llu kb/s\n", total_bytes * 1000000LL / 1024 / delta); - GAUGER ("CORE", "Core throughput/s", total_bytes * 1000000LL / 1024 / delta, + GAUGER ("CORE", + "Core throughput/s", + total_bytes * 1000000LL / 1024 / delta, "kb/s"); ok = 0; } @@ -155,7 +157,9 @@ terminate_task_error (void *cls) static size_t -transmit_ready (void *cls, size_t size, void *buf) +transmit_ready (void *cls, + size_t size, + void *buf) { char *cbuf = buf; struct TestMessage hdr; @@ -183,7 +187,10 @@ transmit_ready (void *cls, size_t size, void *buf) do { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending message %u of size %u at offset %u\n", tr_n, s, ret); + "Sending message %u of size %u at offset %u\n", + tr_n, + s, + ret); hdr.header.size = htons (s); hdr.header.type = htons (MTYPE); hdr.num = htonl (tr_n); @@ -202,14 +209,16 @@ transmit_ready (void *cls, size_t size, void *buf) GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Returning total message block of size %u\n", ret); + "Returning total message block of size %u\n", + ret); total_bytes += ret; return ret; } static void -connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer) +connect_notify (void *cls, + const struct GNUNET_PeerIdentity *peer) { struct PeerContext *pc = cls; @@ -240,30 +249,35 @@ connect_notify (void *cls, const struct GNUNET_PeerIdentity *peer) static void -disconnect_notify (void *cls, const struct GNUNET_PeerIdentity *peer) +disconnect_notify (void *cls, + const struct GNUNET_PeerIdentity *peer) { struct PeerContext *pc = cls; if (0 == memcmp (&pc->id, peer, sizeof (struct GNUNET_PeerIdentity))) return; pc->connect_status = 0; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Encrypted connection to `%s' cut\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Encrypted connection to `%s' cut\n", GNUNET_i2s (peer)); } static int -inbound_notify (void *cls, const struct GNUNET_PeerIdentity *other, +inbound_notify (void *cls, + const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Core provides inbound data from `%s'.\n", GNUNET_i2s (other)); + "Core provides inbound data from `%s'.\n", + GNUNET_i2s (other)); return GNUNET_OK; } static int -outbound_notify (void *cls, const struct GNUNET_PeerIdentity *other, +outbound_notify (void *cls, + const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -274,7 +288,9 @@ outbound_notify (void *cls, const struct GNUNET_PeerIdentity *other, static size_t -transmit_ready (void *cls, size_t size, void *buf); +transmit_ready (void *cls, + size_t size, + void *buf); static int @@ -294,22 +310,29 @@ process_mtype (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Expected message %u of size %u, got %u bytes of message %u\n", - n, s, ntohs (message->size), ntohl (hdr->num)); + n, s, + ntohs (message->size), + ntohl (hdr->num)); GNUNET_SCHEDULER_cancel (err_task); - err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, NULL); + err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, + NULL); return GNUNET_SYSERR; } if (ntohl (hdr->num) != n) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Expected message %u of size %u, got %u bytes of message %u\n", - n, s, ntohs (message->size), ntohl (hdr->num)); + n, s, + ntohs (message->size), + ntohl (hdr->num)); GNUNET_SCHEDULER_cancel (err_task); err_task = GNUNET_SCHEDULER_add_now (&terminate_task_error, NULL); return GNUNET_SYSERR; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got message %u of size %u\n", - ntohl (hdr->num), ntohs (message->size)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Got message %u of size %u\n", + ntohl (hdr->num), + ntohs (message->size)); n++; if (0 == (n % (TOTAL_MSGS / 100))) FPRINTF (stderr, "%s", "."); @@ -379,7 +402,8 @@ init_notify (void *cls, static void -process_hello (void *cls, const struct GNUNET_MessageHeader *message) +process_hello (void *cls, + const struct GNUNET_MessageHeader *message) { struct PeerContext *p = cls; @@ -400,7 +424,8 @@ process_hello (void *cls, const struct GNUNET_MessageHeader *message) static void -setup_peer (struct PeerContext *p, const char *cfgname) +setup_peer (struct PeerContext *p, + const char *cfgname) { char *binary; @@ -423,7 +448,9 @@ setup_peer (struct PeerContext *p, const char *cfgname) static void -run (void *cls, char *const *args, const char *cfgfile, +run (void *cls, + char *const *args, + const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { GNUNET_assert (ok == 1); @@ -431,7 +458,9 @@ run (void *cls, char *const *args, const char *cfgfile, setup_peer (&p1, "test_core_api_peer1.conf"); setup_peer (&p2, "test_core_api_peer2.conf"); err_task = - GNUNET_SCHEDULER_add_delayed (TIMEOUT, &terminate_task_error, NULL); + GNUNET_SCHEDULER_add_delayed (TIMEOUT, + &terminate_task_error, + NULL); GNUNET_assert (NULL != (p1.ch = GNUNET_CORE_connect (p1.cfg, &p1, &init_notify, @@ -447,10 +476,13 @@ static void stop_arm (struct PeerContext *p) { if (0 != GNUNET_OS_process_kill (p->arm_proc, GNUNET_TERM_SIG)) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "kill"); if (GNUNET_OS_process_wait (p->arm_proc) != GNUNET_OK) - GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "waitpid"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ARM process %u stopped\n", + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "waitpid"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ARM process %u stopped\n", GNUNET_OS_process_get_pid (p->arm_proc)); GNUNET_OS_process_destroy (p->arm_proc); p->arm_proc = NULL; @@ -461,7 +493,8 @@ stop_arm (struct PeerContext *p) int main (int argc, char *argv1[]) { - char *const argv[] = { "test-core-api-reliability", + char *const argv[] = { + "test-core-api-reliability", "-c", "test_core_api_data.conf", NULL diff --git a/src/core/test_core_api_send_to_self.c b/src/core/test_core_api_send_to_self.c index 54dd29ca8..4889a038f 100644 --- a/src/core/test_core_api_send_to_self.c +++ b/src/core/test_core_api_send_to_self.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2010 Christian Grothoff + Copyright (C) 2010, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -20,7 +20,7 @@ /** * @file core/test_core_api_send_to_self.c - * @brief + * @brief test that sending a message to ourselves via CORE works * @author Philipp Toelke */ #include "platform.h" @@ -38,7 +38,7 @@ static int ret; /** * Handle to the cleanup task. */ -struct GNUNET_SCHEDULER_Task * die_task; +static struct GNUNET_SCHEDULER_Task *die_task; /** * Identity of this peer. @@ -48,7 +48,7 @@ static struct GNUNET_PeerIdentity myself; /** * The handle to core */ -struct GNUNET_CORE_Handle *core; +static struct GNUNET_CORE_Handle *core; /** @@ -57,45 +57,64 @@ struct GNUNET_CORE_Handle *core; static void cleanup (void *cls) { - die_task = NULL; - - if (core != NULL) + if (NULL != die_task) + { + GNUNET_SCHEDULER_cancel (die_task); + die_task = NULL; + } + if (NULL != core) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting core.\n"); GNUNET_CORE_disconnect (core); core = NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ending test.\n"); +} + + +/** + * Function scheduled as very last function, cleans up after us + */ +static void +do_timeout (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Test timeout.\n"); + die_task = NULL; + GNUNET_SCHEDULER_shutdown (); } static int -receive (void *cls, const struct GNUNET_PeerIdentity *other, +receive (void *cls, + const struct GNUNET_PeerIdentity *other, const struct GNUNET_MessageHeader *message) { - if (die_task != NULL) - GNUNET_SCHEDULER_cancel (die_task); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received message from peer %s\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message from peer %s\n", GNUNET_i2s (other)); GNUNET_assert (GNUNET_MESSAGE_TYPE_DUMMY == ntohs (message->type)); GNUNET_assert (0 == memcmp (other, &myself, sizeof (myself))); - GNUNET_SCHEDULER_add_now (&cleanup, NULL); + GNUNET_SCHEDULER_shutdown (); ret = 0; return GNUNET_OK; } static size_t -send_message (void *cls, size_t size, void *buf) +send_message (void *cls, + size_t size, + void *buf) { - if (size == 0 || buf == NULL) + struct GNUNET_MessageHeader *hdr = buf; + if ( (size == 0) || (buf == NULL) ) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not send; got 0 buffer\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Could not send; got 0 buffer\n"); return 0; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending!\n"); - struct GNUNET_MessageHeader *hdr = buf; - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending!\n"); hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); hdr->type = htons (GNUNET_MESSAGE_TYPE_DUMMY); return ntohs (hdr->size); @@ -114,23 +133,32 @@ init (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Correctly connected to CORE; we are the peer %s.\n", GNUNET_i2s (my_identity)); - memcpy (&myself, my_identity, sizeof (struct GNUNET_PeerIdentity)); + memcpy (&myself, + my_identity, + sizeof (struct GNUNET_PeerIdentity)); } static void -connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) +connect_cb (void *cls, + const struct GNUNET_PeerIdentity *peer) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to peer %s.\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connected to peer %s.\n", GNUNET_i2s (peer)); - if (0 == memcmp (peer, &myself, sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (peer, + &myself, + sizeof (struct GNUNET_PeerIdentity))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected to myself; sending message!\n"); - GNUNET_CORE_notify_transmit_ready (core, GNUNET_YES, 0, - GNUNET_TIME_UNIT_FOREVER_REL, peer, + GNUNET_CORE_notify_transmit_ready (core, + GNUNET_YES, + 0, + GNUNET_TIME_UNIT_FOREVER_REL, + peer, sizeof (struct GNUNET_MessageHeader), - send_message, NULL); + &send_message, NULL); } } @@ -147,16 +175,20 @@ run (void *cls, struct GNUNET_TESTING_Peer *peer) { const static struct GNUNET_CORE_MessageHandler handlers[] = { - {&receive, GNUNET_MESSAGE_TYPE_DUMMY, 0}, + { &receive, + GNUNET_MESSAGE_TYPE_DUMMY, + sizeof (struct GNUNET_MessageHeader) }, {NULL, 0, 0} }; core = - GNUNET_CORE_connect (cfg, NULL, &init, &connect_cb, NULL, NULL, + GNUNET_CORE_connect (cfg, NULL, &init, + &connect_cb, NULL, NULL, 0, NULL, 0, handlers); - die_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 300), &cleanup, - NULL); + GNUNET_SCHEDULER_add_shutdown (&cleanup, + NULL); + die_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &do_timeout, + NULL); } @@ -170,6 +202,7 @@ run (void *cls, int main (int argc, char *argv[]) { + ret = 1; if (0 != GNUNET_TESTING_peer_run ("test-core-api-send-to-self", "test_core_api_peer1.conf", &run, NULL))