X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmulticast%2Fmulticast_api.c;h=7cfe708359db25b447ffc246f3ffcb21ed38c69f;hb=2f45a7c9691aa2670c8902618be5e8011428f0af;hp=501ff4b701c5ebba821301bae2e552f9fbb66afe;hpb=c0d549b6ab07a951380b807f1a1c1a767bfd5be0;p=oweals%2Fgnunet.git diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 501ff4b70..7cfe70835 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012, 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2012, 2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,16 +14,17 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file multicast/multicast_api.c - * @brief multicast service; establish tunnels to distant peers + * @brief Multicast service; implements multicast groups using CADET connections. * @author Christian Grothoff * @author Gabor X Toth */ + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_multicast_service.h" @@ -32,26 +33,6 @@ #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__) -/** - * Started origins. - * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin - */ -static struct GNUNET_CONTAINER_MultiHashMap *origins; - -/** - * Joined members. - * group_key_hash -> struct GNUNET_MULTICAST_Member - */ -static struct GNUNET_CONTAINER_MultiHashMap *members; - - -struct MessageQueue -{ - struct MessageQueue *prev; - struct MessageQueue *next; -}; - - /** * Handle for a request to send a message to all multicast group members * (from the origin). @@ -90,69 +71,61 @@ struct GNUNET_MULTICAST_Group const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connection to the service. */ - struct GNUNET_CLIENT_Connection *client; + struct GNUNET_MQ_Handle *mq; /** - * Currently pending transmission request, or NULL for none. + * Message to send on connect. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Envelope *connect_env; /** - * Head of operations to transmit. + * Time to wait until we try to reconnect on failure. */ - struct MessageQueue *tmit_head; + struct GNUNET_TIME_Relative reconnect_delay; /** - * Tail of operations to transmit. + * Task for reconnecting when the listener fails. */ - struct MessageQueue *tmit_tail; + struct GNUNET_SCHEDULER_Task *reconnect_task; - /** - * Message being transmitted to the Multicast service. - */ - struct MessageQueue *tmit_msg; + GNUNET_MULTICAST_JoinRequestCallback join_req_cb; + GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; + GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; + GNUNET_MULTICAST_MessageCallback message_cb; + void *cb_cls; /** - * Message to send on reconnect. + * Function called after disconnected from the service. */ - struct GNUNET_MessageHeader *reconnect_msg; + GNUNET_ContinuationCallback disconnect_cb; /** - * Task doing exponential back-off trying to reconnect. + * Closure for @a disconnect_cb. */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; + void *disconnect_cls; /** - * Time for next connect retry. + * Are we currently transmitting a message? */ - struct GNUNET_TIME_Relative reconnect_delay; - - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - struct GNUNET_HashCode pub_key_hash; - - GNUNET_MULTICAST_JoinCallback join_cb; - GNUNET_MULTICAST_MembershipTestCallback member_test_cb; - GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; - GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; - GNUNET_MULTICAST_MessageCallback message_cb; - void *cb_cls; + uint8_t in_transmit; /** - * Are we polling for incoming messages right now? + * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for. */ - uint8_t in_receive; + uint8_t acks_pending; /** - * Are we currently transmitting a message? + * Is this the origin or a member? */ - uint8_t in_transmit; + uint8_t is_origin; /** - * Is this the origin or a member? + * Is this channel in the process of disconnecting from the service? + * #GNUNET_YES or #GNUNET_NO */ - uint8_t is_origin; + uint8_t is_disconnecting; }; @@ -163,7 +136,6 @@ struct GNUNET_MULTICAST_Origin { struct GNUNET_MULTICAST_Group grp; struct GNUNET_MULTICAST_OriginTransmitHandle tmit; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; GNUNET_MULTICAST_RequestCallback request_cb; }; @@ -177,10 +149,12 @@ struct GNUNET_MULTICAST_Member struct GNUNET_MULTICAST_Group grp; struct GNUNET_MULTICAST_MemberTransmitHandle tmit; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; - struct GNUNET_PeerIdentity origin; - struct GNUNET_PeerIdentity relays; - uint32_t relay_count; + GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; + + /** + * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *replay_reqs; uint64_t next_fragment_id; }; @@ -189,7 +163,7 @@ struct GNUNET_MULTICAST_Member /** * Handle that identifies a join request. * - * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the + * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the * corresponding calls to #GNUNET_MULTICAST_join_decision(). */ struct GNUNET_MULTICAST_JoinHandle @@ -197,22 +171,14 @@ struct GNUNET_MULTICAST_JoinHandle struct GNUNET_MULTICAST_Group *group; /** - * Public key of the joining member. + * Public key of the member requesting join. */ - struct GNUNET_CRYPTO_EddsaPublicKey member_key; + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; /** - * Peer identity of the joining member. + * Peer identity of the member requesting join. */ - struct GNUNET_PeerIdentity member_peer; -}; - - -/** - * Handle to pass back for the answer of a membership test. - */ -struct GNUNET_MULTICAST_MembershipTestHandle -{ + struct GNUNET_PeerIdentity peer; }; @@ -221,6 +187,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle */ struct GNUNET_MULTICAST_ReplayHandle { + struct GNUNET_MULTICAST_Group *grp; + struct MulticastReplayRequestMessage req; }; @@ -233,401 +201,371 @@ struct GNUNET_MULTICAST_MemberReplayHandle static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - +origin_to_all (struct GNUNET_MULTICAST_Origin *orig); static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp); +member_to_origin (struct GNUNET_MULTICAST_Member *mem); /** - * Schedule transmission of the next message from our queue. - * - * @param grp PSYC channel handle + * Check join request message. */ -static void -transmit_next (struct GNUNET_MULTICAST_Group *grp); +static int +check_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) +{ + uint16_t size = ntohs (jreq->header.size); + if (sizeof (*jreq) == size) + return GNUNET_OK; -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg); + if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; +} /** - * Reschedule a connect attempt to the service. - * - * @param c channel to reconnect + * Receive join request from service. */ static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp) +handle_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) { - GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + struct GNUNET_MULTICAST_Group *grp = cls; + struct GNUNET_MULTICAST_JoinHandle *jh; + const struct GNUNET_MessageHeader *jmsg = NULL; - if (NULL != grp->th) + if (NULL == grp) { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) - { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; + GNUNET_break (0); + return; } - grp->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to Multicast service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES)); - grp->reconnect_task = - GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp); - grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); + if (NULL == grp->join_req_cb) + return; + + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) + jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; + + jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_pub_key = jreq->member_pub_key; + jh->peer = jreq->peer; + grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Reset stored data related to the last received message. + * Check multicast message. */ -static void -recv_reset (struct GNUNET_MULTICAST_Group *grp) +static int +check_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) { + return GNUNET_OK; } +/** + * Receive multicast message from service. + */ static void -recv_error (struct GNUNET_MULTICAST_Group *grp) +handle_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) { + struct GNUNET_MULTICAST_Group *grp = cls; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling message callback with a message of size %u.\n", + ntohs (mmsg->header.size)); + if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, NULL); + grp->message_cb (grp->cb_cls, mmsg); - recv_reset (grp); + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Transmit next message to service. - * - * @param cls The struct GNUNET_MULTICAST_Group. - * @param size Number of bytes available in @a buf. - * @param buf Where to copy the message. - * - * @return Number of bytes copied to @a buf. + * Receive message/request fragment acknowledgement from service. */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +static void +handle_group_fragment_ack (void *cls, + const struct GNUNET_MessageHeader *msg) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); struct GNUNET_MULTICAST_Group *grp = cls; - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return 0; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - size_t ret = ntohs (qmsg->size); - grp->th = NULL; - if (ret > size) + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", + grp, grp->in_transmit, grp->acks_pending); + + if (0 == grp->acks_pending) { - reschedule_connect (grp); - return 0; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Ignoring extraneous fragment ACK.\n", grp); + return; } - memcpy (buf, qmsg, ret); + grp->acks_pending--; - GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq); - GNUNET_free (mq); + if (GNUNET_YES != grp->in_transmit) + return; - if (NULL != grp->tmit_head) - transmit_next (grp); + if (GNUNET_YES == grp->is_origin) + origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); + else + member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); - if (GNUNET_NO == grp->in_receive) - { - grp->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Schedule transmission of the next message from our queue. - * - * @param grp Multicast group handle. + * Check unicast request. */ -static void -transmit_next (struct GNUNET_MULTICAST_Group *grp) +static int +check_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); - if (NULL != grp->th || NULL == grp->client) - return; - - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - - grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client, - ntohs (qmsg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - grp); + return GNUNET_OK; } /** - * Try again to connect to the Multicast service. - * - * @param cls Channel handle. - * @param tc Scheduler context. + * Origin receives unicast request from a member. */ static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +handle_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { - struct GNUNET_MULTICAST_Group *grp = cls; + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Origin *orig = cls; + grp = &orig->grp; - recv_reset (grp); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to Multicast service.\n"); - GNUNET_assert (NULL == grp->client); - grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg); - GNUNET_assert (NULL != grp->client); - uint16_t reconn_size = ntohs (grp->reconnect_msg->size); - - if (NULL == grp->tmit_head || - 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size)) - { - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); - memcpy (&mq[1], grp->reconnect_msg, reconn_size); - GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq); - } - transmit_next (grp); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling request callback with a request of size %u.\n", + ntohs (req->header.size)); + + if (NULL != orig->request_cb) + orig->request_cb (grp->cb_cls, req); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Disconnect from the Multicast service. - * - * @param g Group handle to disconnect. + * Receive multicast replay request from service. */ static void -disconnect (void *g) +handle_group_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) + { - struct GNUNET_MULTICAST_Group *grp = g; + struct GNUNET_MULTICAST_Group *grp = cls; - GNUNET_assert (NULL != grp); - if (grp->tmit_head != grp->tmit_tail) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Disconnecting while there are still outstanding messages!\n"); - GNUNET_break (0); - } - if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (grp->reconnect_task); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != grp->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n"); + + if (0 != rep->fragment_id) { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; + if (NULL != grp->replay_frag_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->fragment_id), + GNUNET_ntohll (rep->flags), rh); + } } - if (NULL != grp->reconnect_msg) + else if (0 != rep->message_id) { - GNUNET_free (grp->reconnect_msg); - grp->reconnect_msg = NULL; + if (NULL != grp->replay_msg_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->message_id), + GNUNET_ntohll (rep->fragment_offset), + GNUNET_ntohll (rep->flags), rh); + } } + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Iterator callback for calling message callbacks for all groups. + * Check replay response. */ static int -message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) +check_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { - const struct GNUNET_MessageHeader *msg = cls; - struct GNUNET_MULTICAST_Group *grp = group; + uint16_t size = ntohs (res->header.size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling message callback with a message " - "of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); + if (sizeof (*res) == size) + return GNUNET_OK; - if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, msg); + if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size) + return GNUNET_OK; - return GNUNET_YES; + return GNUNET_SYSERR; } /** - * Iterator callback for calling request callbacks of origins. + * Receive replay response from service. */ -static int -request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) +static void +handle_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { - const struct GNUNET_MULTICAST_RequestHeader *req = cls; - struct GNUNET_MULTICAST_Origin *orig = origin; + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling request callback for a request of type %u and size %u.\n", - ntohs (req->header.type), ntohs (req->header.size)); + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); - orig->request_cb (orig->grp.cb_cls, &req->member_key, - (const struct GNUNET_MessageHeader *) req, 0); - return GNUNET_YES; + // FIXME: return result } /** - * Iterator callback for calling join request callbacks of origins. + * Check join decision. */ static int -join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *group) +check_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) { - const struct MulticastJoinRequestMessage *req = cls; - struct GNUNET_MULTICAST_Group *grp = group; - - struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - jh->group = grp; - jh->member_key = req->member_key; - jh->member_peer = req->member_peer; - - const struct GNUNET_MessageHeader *msg = NULL; - if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) - msg = (const struct GNUNET_MessageHeader *) &req[1]; - - grp->join_cb (grp->cb_cls, &req->member_key, msg, jh); - return GNUNET_YES; + return GNUNET_OK; // checked in handle below } /** - * Iterator callback for calling join decision callbacks of members. + * Member receives join decision. */ -static int -join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *member) +static void +handle_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) { - const struct MulticastJoinDecisionMessage *dcsn = cls; - struct GNUNET_MULTICAST_Member *mem = member; - struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; - const struct GNUNET_MessageHeader *msg = NULL; - if (sizeof (*dcsn) + sizeof (*msg) <= ntohs (dcsn->header.size)) - msg = (const struct GNUNET_MessageHeader *) &dcsn[1]; + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; - // FIXME: grp->join_decision_cb (grp->cb_cls, msg); - return GNUNET_YES; -} + uint16_t dcsn_size = ntohs (dcsn->header.size); + int is_admitted = ntohl (dcsn->is_admitted); -/** - * Function called when we receive a message from the service. - * - * @param cls struct GNUNET_MULTICAST_Group - * @param msg Message received, NULL on timeout or fatal error. - */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_MULTICAST_Group *grp = cls; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Member got join decision from multicast: %d\n", + mem, is_admitted); + + const struct GNUNET_MessageHeader *join_resp = NULL; + uint16_t join_resp_size = 0; - if (NULL == msg) + uint16_t relay_count = ntohl (dcsn->relay_count); + const struct GNUNET_PeerIdentity *relays = NULL; + uint16_t relay_size = relay_count * sizeof (*relays); + if (0 < relay_count) { - // timeout / disconnected from service, reconnect - reschedule_connect (grp); - return; + if (dcsn_size < sizeof (*dcsn) + relay_size) + { + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + else + { + relays = (struct GNUNET_PeerIdentity *) &dcsn[1]; + } } - uint16_t size_eq = 0; - uint16_t size_min = 0; - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d and size %u from Multicast service\n", - type, size); - - switch (type) + if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size) + { + join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size); + join_resp_size = ntohs (join_resp->size); + } + if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size) { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader); - break; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received invalid join decision message from multicast: %u < %u + %u + %u\n", + dcsn_size , sizeof (*dcsn), relay_size, join_resp_size); + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); - break; + if (NULL != mem->join_dcsn_cb) + mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer, + relay_count, relays, join_resp); - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - size_min = sizeof (struct MulticastJoinRequestMessage); - break; + // FIXME: + //if (GNUNET_YES != is_admitted) + // GNUNET_MULTICAST_member_part (mem); - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - size_min = sizeof (struct MulticastJoinDecisionMessage); - break; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} - default: - GNUNET_break_op (0); - type = 0; - } - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) +static void +group_cleanup (struct GNUNET_MULTICAST_Group *grp) +{ + if (NULL != grp->connect_env) { - GNUNET_break_op (0); - type = 0; + GNUNET_MQ_discard (grp->connect_env); + grp->connect_env = NULL; + } + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; } + if (NULL != grp->disconnect_cb) + { + grp->disconnect_cb (grp->disconnect_cls); + grp->disconnect_cb = NULL; + } + GNUNET_free (grp); +} - switch (type) + +static void +group_disconnect (struct GNUNET_MULTICAST_Group *grp, + GNUNET_ContinuationCallback cb, + void *cls) +{ + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = cb; + grp->disconnect_cls = cls; + + if (NULL != grp->mq) { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_cb, (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - message_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - if (GNUNET_YES != grp->is_origin) + struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq); + if (NULL != last) { - GNUNET_break (0); - break; + GNUNET_MQ_notify_sent (last, + (GNUNET_MQ_NotifyCallback) group_cleanup, grp); + } + else + { + group_cleanup (grp); } - - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - join_request_cb, (void *) msg); - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - join_decision_cb, (void *) msg); - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_decision_cb, (void *) msg); - break; } - - if (NULL != grp->client) + else { - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); + group_cleanup (grp); } } @@ -636,85 +574,102 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * Function to call with the decision made for a join request. * * Must be called once and only once in response to an invocation of the - * #GNUNET_MULTICAST_JoinCallback. + * #GNUNET_MULTICAST_JoinRequestCallback. * - * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved - * @param relay_count Number of relays given. - * @param relays Array of suggested peers that might be useful relays to use + * @param join + * Join request handle. + * @param is_admitted + * #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. + * @param relay_count + * Number of relays given. + * @param relays + * Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that * are already part of the multicast group and might thus be willing * to help with routing). If empty, only this local peer (which must * be the multicast origin) is a good candidate for building the * multicast tree. Note that it is unnecessary to specify our own * peer identity in this array. - * @param join_resp Message to send in response to the joining peer; + * @param join_resp + * Message to send in response to the joining peer; * can also be used to redirect the peer to a different group at the * application layer; this response is to be transmitted to the * peer that issued the request even if admission is denied. */ struct GNUNET_MULTICAST_ReplayHandle * -GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, int is_admitted, - unsigned int relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { - struct GNUNET_MULTICAST_Group *grp = jh->group; + struct GNUNET_MULTICAST_Group *grp = join->group; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); - struct MulticastClientJoinDecisionMessage * dcsn; - struct MessageQueue * - mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) - + relay_size + join_resp_size); - dcsn = (struct MulticastClientJoinDecisionMessage *) &mq[1]; + struct MulticastJoinDecisionMessageHeader *hdcsn; + struct MulticastJoinDecisionMessage *dcsn; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_pub_key = join->member_pub_key; + hdcsn->peer = join->peer; + + dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); - dcsn->member_key = jh->member_key; - dcsn->member_peer = jh->member_peer; - dcsn->is_admitted = is_admitted; - dcsn->relay_count = relay_count; + dcsn->is_admitted = htonl (is_admitted); + dcsn->relay_count = htonl (relay_count); if (0 < relay_size) - memcpy (&dcsn[1], relays, relay_size); + GNUNET_memcpy (&dcsn[1], relays, relay_size); if (0 < join_resp_size) - memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); + GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - transmit_next (grp); - - GNUNET_free (jh); + GNUNET_MQ_send (grp->mq, env); + GNUNET_free (join); return NULL; } -/** - * Call informing multicast about the decision taken for a membership test. - * - * @param mth Handle that was given for the query. - * @param result #GNUNET_YES if peer was a member, #GNUNET_NO if peer was not a member, - * #GNUNET_SYSERR if we cannot answer the membership test. - */ -void -GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestHandle *mth, - int result) -{ -} - - /** * Replay a message fragment for the multicast group. * - * @param rh Replay handle identifying which replay operation was requested. - * @param msg Replayed message fragment, NULL if unknown/error. - * @param ec Error code. + * @param rh + * Replay handle identifying which replay operation was requested. + * @param msg + * Replayed message fragment, NULL if not found / an error occurred. + * @param ec + * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode + * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated. */ void GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, const struct GNUNET_MessageHeader *msg, enum GNUNET_MULTICAST_ReplayErrorCode ec) { + uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; + struct MulticastReplayResponseMessage *res; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE); + res->fragment_id = rh->req.fragment_id; + res->message_id = rh->req.message_id; + res->fragment_offset = rh->req.fragment_offset; + res->flags = rh->req.flags; + res->error_code = htonl (ec); + + if (GNUNET_MULTICAST_REC_OK == ec) + { + GNUNET_assert (NULL != msg); + GNUNET_memcpy (&res[1], msg, msg_size); + } + + GNUNET_MQ_send (rh->grp->mq, env); + + if (GNUNET_MULTICAST_REC_OK != ec) + GNUNET_free (rh); } @@ -723,20 +678,35 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, * * Invalidates the replay handle. * - * @param rh Replay session to end. + * @param rh + * Replay session to end. */ void GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) { + struct MulticastReplayResponseMessage *end; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END); + + end->fragment_id = rh->req.fragment_id; + end->message_id = rh->req.message_id; + end->fragment_offset = rh->req.fragment_offset; + end->flags = rh->req.flags; + + GNUNET_MQ_send (rh->grp->mq, env); + GNUNET_free (rh); } /** * Replay a message for the multicast group. * - * @param rh Replay handle identifying which replay operation was requested. - * @param notify Function to call to get the message. - * @param notify_cls Closure for @a notify. + * @param rh + * Replay handle identifying which replay operation was requested. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. */ void GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, @@ -746,6 +716,83 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, } +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig); + + +static void +origin_reconnect (void *cls) +{ + origin_connect (cls); +} + + +/** + * Origin client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +origin_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Origin *orig = cls; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + origin_reconnect, + orig); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as origin. + */ +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (origin_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + orig), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, origin_disconnected, orig); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); +} + + /** * Start a multicast group. * @@ -758,20 +805,28 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, * candidate will be given a response. Members in the group can send messages * to the origin (one at a time). * - * @param cfg Configuration to use. - * @param priv_key ECC key that will be used to sign messages for this + * @param cfg + * Configuration to use. + * @param priv_key + * ECC key that will be used to sign messages for this * multicast session; public key is used to identify the multicast group; - * @param max_fragment_id Maximum fragment ID already sent to the group. + * @param max_fragment_id + * Maximum fragment ID already sent to the group. * 0 for a new group. - * @param join_cb Function called to approve / disapprove joining of a peer. - * @param member_test_cb Function multicast can use to test group membership. - * @param replay_frag_cb Function that can be called to replay a message fragment. - * @param replay_msg_cb Function that can be called to replay a message. - * @param request_cb Function called with message fragments from group members. - * @param message_cb Function called with the message fragments sent to the + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param replay_frag_cb + * Function that can be called to replay a message fragment. + * @param replay_msg_cb + * Function that can be called to replay a message. + * @param request_cb + * Function called with message fragments from group members. + * @param message_cb + * Function called with the message fragments sent to the * network by GNUNET_MULTICAST_origin_to_all(). These message fragments * should be stored for answering replay requests later. - * @param cls Closure for the various callbacks that follow. + * @param cls + * Closure for the various callbacks that follow. * * @return Handle for the origin, NULL on error. */ @@ -779,8 +834,7 @@ struct GNUNET_MULTICAST_Origin * GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, uint64_t max_fragment_id, - GNUNET_MULTICAST_JoinCallback join_cb, - GNUNET_MULTICAST_MembershipTestCallback member_test_cb, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, GNUNET_MULTICAST_RequestCallback request_cb, @@ -789,40 +843,26 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); struct GNUNET_MULTICAST_Group *grp = &orig->grp; - struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start)); - start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); - start->header.size = htons (sizeof (*start)); + struct MulticastOriginStartMessage *start; + grp->connect_env = GNUNET_MQ_msg (start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); start->max_fragment_id = max_fragment_id; - memcpy (&start->group_key, priv_key, sizeof (*priv_key)); + start->group_key = *priv_key; - grp->reconnect_msg = (struct GNUNET_MessageHeader *) start; - grp->is_origin = GNUNET_YES; grp->cfg = cfg; + grp->is_origin = GNUNET_YES; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; grp->cb_cls = cls; - grp->join_cb = join_cb; - grp->member_test_cb = member_test_cb; + grp->join_req_cb = join_request_cb; grp->replay_frag_cb = replay_frag_cb; grp->replay_msg_cb = replay_msg_cb; grp->message_cb = message_cb; orig->request_cb = request_cb; - orig->priv_key = *priv_key; - - GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), - &grp->pub_key_hash); - - if (NULL == origins) - origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); + origin_connect (orig); return orig; } @@ -830,68 +870,83 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Stop a multicast group. * - * @param origin Multicast group to stop. + * @param origin + * Multicast group to stop. */ void -GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) +GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, + GNUNET_ContinuationCallback stop_cb, + void *stop_cls) { - disconnect (&orig->grp); - GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig); - GNUNET_free (orig); + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + group_disconnect (grp, stop_cb, stop_cls); } static void origin_to_all (struct GNUNET_MULTICAST_Origin *orig) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig); struct GNUNET_MULTICAST_Group *grp = &orig->grp; struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); + struct GNUNET_MULTICAST_MessageHeader *msg; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg), + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); - struct GNUNET_MULTICAST_MessageHeader * - msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1]; int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "OriginTransmitNotify() returned error or invalid message size.\n"); + "%p OriginTransmitNotify() returned error or invalid message size.\n", + orig); /* FIXME: handle error */ - GNUNET_free (mq); + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) { - GNUNET_free (mq); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p OriginTransmitNotify() - transmission paused.\n", orig); + GNUNET_MQ_discard (env); return; /* Transmission paused. */ } - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (sizeof (*msg) + buf_size); msg->message_id = GNUNET_htonll (tmit->message_id); msg->group_generation = tmit->group_generation; msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*msg) + buf_size; - transmit_next (grp); + grp->acks_pending++; + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } /** * Send a message to the multicast group. * - * @param orig Handle to the multicast group. - * @param message_id Application layer ID for the message. Opaque to multicast. - * @param group_generation Group generation of the message. - * Documented in struct GNUNET_MULTICAST_MessageHeader. - * @param notify Function to call to get the message. - * @param notify_cls Closure for @a notify. + * @param orig + * Handle to the multicast group. + * @param message_id + * Application layer ID for the message. Opaque to multicast. + * @param group_generation + * Group generation of the message. + * Documented in struct GNUNET_MULTICAST_MessageHeader. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. * * @return Message handle on success, * NULL on error (i.e. another request is already pending). @@ -903,9 +958,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, GNUNET_MULTICAST_OriginTransmitNotify notify, void *notify_cls) { + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + if (GNUNET_YES == grp->in_transmit) + return NULL; + grp->in_transmit = GNUNET_YES; + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; tmit->origin = orig; tmit->message_id = message_id; + tmit->fragment_offset = 0; tmit->group_generation = group_generation; tmit->notify = notify; tmit->notify_cls = notify_cls; @@ -918,11 +979,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, /** * Resume message transmission to multicast group. * - * @param th Transmission to cancel. + * @param th + * Transmission to cancel. */ void GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) { + struct GNUNET_MULTICAST_Group *grp = &th->origin->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; origin_to_all (th->origin); } @@ -930,11 +995,91 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan /** * Cancel request for message transmission to multicast group. * - * @param th Transmission to cancel. + * @param th + * Transmission to cancel. */ void GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) { + th->origin->grp.in_transmit = GNUNET_NO; +} + + +static void +member_connect (struct GNUNET_MULTICAST_Member *mem); + + +static void +member_reconnect (void *cls) +{ + member_connect (cls); +} + + +/** + * Member client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +member_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Member *mem = cls; + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Member client disconnected (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + member_reconnect, + mem); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as member. + */ +static void +member_connect (struct GNUNET_MULTICAST_Member *mem) +{ + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + mem), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + mem), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, member_disconnected, mem); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); } @@ -946,47 +1091,59 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan * @a message_cb is invoked with a (failure) response and then with NULL. If * the join succeeds, outstanding (state) messages and ongoing multicast * messages will be given to the @a message_cb until the member decides to part - * the group. The @a test_cb and @a replay_cb functions may be called at - * anytime by the multicast service to support relaying messages to other - * members of the group. + * the group. The @a replay_cb function may be called at any time by the + * multicast service to support relaying messages to other members of the group. * - * @param cfg Configuration to use. - * @param group_key ECC public key that identifies the group to join. - * @param member_key ECC key that identifies the member and used to sign - * requests sent to the origin. - * @param origin Peer ID of the origin to send unicast requsets to. If NULL, + * @param cfg + * Configuration to use. + * @param group_key + * ECC public key that identifies the group to join. + * @param member_key + * ECC key that identifies the member + * and used to sign requests sent to the origin. + * @param origin + * Peer ID of the origin to send unicast requsets to. If NULL, * unicast requests are sent back via multiple hops on the reverse path * of multicast messages. - * @param relay_count Number of peers in the @a relays array. - * @param relays Peer identities of members of the group, which serve as relays + * @param relay_count + * Number of peers in the @a relays array. + * @param relays + * Peer identities of members of the group, which serve as relays * and can be used to join the group at. and send the @a join_request to. * If empty, the @a join_request is sent directly to the @a origin. - * @param join_msg Application-dependent join message to be passed to the peer - * @a origin. - * @param join_cb Function called to approve / disapprove joining of a peer. - * @param member_test_cb Function multicast can use to test group membership. - * @param replay_frag_cb Function that can be called to replay message fragments + * @param join_msg + * Application-dependent join message to be passed to the peer @a origin. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param join_decision_cb + * Function called to inform about the join decision. + * @param replay_frag_cb + * Function that can be called to replay message fragments * this peer already knows from this group. NULL if this * client is unable to support replay. - * @param replay_msg_cb Function that can be called to replay message fragments + * @param replay_msg_cb + * Function that can be called to replay message fragments * this peer already knows from this group. NULL if this * client is unable to support replay. - * @param message_cb Function to be called for all message fragments we + * @param message_cb + * Function to be called for all message fragments we * receive from the group, excluding those our @a replay_cb * already has. - * @param cls Closure for callbacks. + * @param cls + * Closure for callbacks. + * * @return Handle for the member, NULL on error. */ struct GNUNET_MULTICAST_Member * GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, - const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key, + const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key, const struct GNUNET_PeerIdentity *origin, - uint32_t relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_msg, - GNUNET_MULTICAST_JoinCallback join_cb, - GNUNET_MULTICAST_MembershipTestCallback member_test_cb, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, GNUNET_MULTICAST_MessageCallback message_cb, @@ -997,46 +1154,30 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint16_t relay_size = relay_count * sizeof (*relays); uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; - struct MulticastMemberJoinMessage * - join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size); - join->header.size = htons (sizeof (*join) + relay_size + join_msg_size); - join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); - join->group_key = *group_key; + struct MulticastMemberJoinMessage *join; + grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); + join->group_pub_key = *group_pub_key; join->member_key = *member_key; join->origin = *origin; + join->relay_count = ntohl (relay_count); if (0 < relay_size) - memcpy (&join[1], relays, relay_size); + GNUNET_memcpy (&join[1], relays, relay_size); if (0 < join_msg_size) - memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); + GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); - grp->reconnect_msg = (struct GNUNET_MessageHeader *) join; - grp->is_origin = GNUNET_NO; grp->cfg = cfg; - grp->pub_key = *group_key; + grp->is_origin = GNUNET_NO; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; - grp->join_cb = join_cb; - grp->member_test_cb = member_test_cb; + mem->join_dcsn_cb = join_decision_cb; + grp->join_req_cb = join_request_cb; grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; grp->message_cb = message_cb; grp->cb_cls = cls; - mem->origin = *origin; - mem->relay_count = relay_count; - mem->relays = *relays; - mem->priv_key = *member_key; - - GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); - - if (NULL == members) - members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); - + member_connect (mem); return mem; } @@ -1049,14 +1190,44 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * An application-dependent part message can be transmitted beforehand using * #GNUNET_MULTICAST_member_to_origin()) * - * @param member Membership handle. + * @param member + * Membership handle. */ void -GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) +GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, + GNUNET_ContinuationCallback part_cb, + void *part_cls) { - disconnect (&mem->grp); - GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem); - GNUNET_free (mem); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + mem->join_dcsn_cb = NULL; + grp->join_req_cb = NULL; + grp->message_cb = NULL; + grp->replay_msg_cb = NULL; + grp->replay_frag_cb = NULL; + + group_disconnect (grp, part_cb, part_cls); +} + + +void +member_replay_request (struct GNUNET_MULTICAST_Member *mem, + uint64_t fragment_id, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags) +{ + struct MulticastReplayRequestMessage *rep; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST); + + rep->fragment_id = GNUNET_htonll (fragment_id); + rep->message_id = GNUNET_htonll (message_id); + rep->fragment_offset = GNUNET_htonll (fragment_offset); + rep->flags = GNUNET_htonll (flags); + + GNUNET_MQ_send (mem->grp.mq, env); } @@ -1066,19 +1237,23 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param fragment_id ID of a message fragment that this client would like to - see replayed. - * @param flags Additional flags for the replay request. It is used and defined - * by the replay callback. FIXME: which replay callback? FIXME: use enum? - * FIXME: why not pass reply cb here? - * @return Replay request handle, NULL on error. + * @param member + * Membership handle. + * @param fragment_id + * ID of a message fragment that this client would like to see replayed. + * @param flags + * Additional flags for the replay request. + * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback + * + * @return Replay request handle. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, uint64_t fragment_id, uint64_t flags) { + member_replay_request (mem, fragment_id, 0, 0, flags); + // FIXME: return something useful return NULL; } @@ -1089,82 +1264,88 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param message_id ID of the message this client would like to see replayed. - * @param fragment_offset Offset of the fragment within the message to replay. - * @param flags Additional flags for the replay request. It is used & defined - * by the replay callback. - * @param result_cb Function to be called for the replayed message. - * @param result_cb_cls Closure for @a result_cb. + * @param member + * Membership handle. + * @param message_id + * ID of the message this client would like to see replayed. + * @param fragment_offset + * Offset of the fragment within the message to replay. + * @param flags + * Additional flags for the replay request. + * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback + * * @return Replay request handle, NULL on error. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem, uint64_t message_id, uint64_t fragment_offset, - uint64_t flags, - GNUNET_MULTICAST_ResultCallback result_cb, - void *result_cb_cls) + uint64_t flags) { + member_replay_request (mem, 0, message_id, fragment_offset, flags); + // FIXME: return something useful return NULL; } -/** - * Cancel a replay request. - * - * @param rh Request to cancel. - */ -void -GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh) -{ -} - - static void member_to_origin (struct GNUNET_MULTICAST_Member *mem) { LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); struct GNUNET_MULTICAST_Group *grp = &mem->grp; struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); - size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_RequestHeader *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req), + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); - struct GNUNET_MULTICAST_RequestHeader * - req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1]; int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "MemberTransmitNotify() returned error or invalid message size.\n"); + "MemberTransmitNotify() returned error or invalid message size. " + "ret=%d, buf_size=%u\n", ret, buf_size); /* FIXME: handle error */ + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) - return; /* Transmission paused. */ + { + /* Transmission paused. */ + GNUNET_MQ_discard (env); + return; + } - req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); req->header.size = htons (sizeof (*req) + buf_size); req->request_id = GNUNET_htonll (tmit->request_id); req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*req) + buf_size; - transmit_next (grp); + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } /** * Send a message to the origin of the multicast group. * - * @param mem Membership handle. - * @param request_id Application layer ID for the request. Opaque to multicast. - * @param notify Callback to call to get the message. - * @param notify_cls Closure for @a notify. + * @param mem + * Membership handle. + * @param request_id + * Application layer ID for the request. Opaque to multicast. + * @param notify + * Callback to call to get the message. + * @param notify_cls + * Closure for @a notify. + * * @return Handle to cancel request, NULL on error (i.e. request already pending). */ struct GNUNET_MULTICAST_MemberTransmitHandle * @@ -1173,9 +1354,14 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, GNUNET_MULTICAST_MemberTransmitNotify notify, void *notify_cls) { + if (GNUNET_YES == mem->grp.in_transmit) + return NULL; + mem->grp.in_transmit = GNUNET_YES; + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; tmit->member = mem; tmit->request_id = request_id; + tmit->fragment_offset = 0; tmit->notify = notify; tmit->notify_cls = notify_cls; @@ -1187,23 +1373,29 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, /** * Resume message transmission to origin. * - * @param th Transmission to cancel. + * @param th + * Transmission to cancel. */ void GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { - + struct GNUNET_MULTICAST_Group *grp = &th->member->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; + member_to_origin (th->member); } /** * Cancel request for message transmission to origin. * - * @param th Transmission to cancel. + * @param th + * Transmission to cancel. */ void GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { + th->member->grp.in_transmit = GNUNET_NO; }