X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmulticast%2Fmulticast_api.c;h=7cfe708359db25b447ffc246f3ffcb21ed38c69f;hb=2f45a7c9691aa2670c8902618be5e8011428f0af;hp=da81de486250631e17d6c950811c06e71112d562;hpb=4e1baae59f18ee5d7cd47afe28ced3daaaa5a5ad;p=oweals%2Fgnunet.git diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index da81de486..7cfe70835 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2012, 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2012, 2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -14,43 +14,30 @@ You should have received a copy of the GNU General Public License along with GNUnet; see the file COPYING. If not, write to the - Free Software Foundation, Inc., 59 Temple Place - Suite 330, - Boston, MA 02111-1307, USA. + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. */ /** * @file multicast/multicast_api.c - * @brief multicast service; establish tunnels to distant peers + * @brief Multicast service; implements multicast groups using CADET connections. * @author Christian Grothoff * @author Gabor X Toth */ + #include "platform.h" #include "gnunet_util_lib.h" -#include "gnunet_signatures.h" #include "gnunet_multicast_service.h" #include "multicast.h" #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__) -/** - * Started origins. - * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin - */ -static struct GNUNET_CONTAINER_MultiHashMap *origins; - -/** - * Joined members. - * group_key_hash -> struct GNUNET_MULTICAST_Member - */ -static struct GNUNET_CONTAINER_MultiHashMap *members; - - /** * Handle for a request to send a message to all multicast group members * (from the origin). */ -struct GNUNET_MULTICAST_OriginMessageHandle +struct GNUNET_MULTICAST_OriginTransmitHandle { GNUNET_MULTICAST_OriginTransmitNotify notify; void *notify_cls; @@ -62,47 +49,95 @@ struct GNUNET_MULTICAST_OriginMessageHandle }; -struct GNUNET_MULTICAST_Group +/** + * Handle for a message to be delivered from a member to the origin. + */ +struct GNUNET_MULTICAST_MemberTransmitHandle { - uint8_t is_origin; + GNUNET_MULTICAST_MemberTransmitNotify notify; + void *notify_cls; + struct GNUNET_MULTICAST_Member *member; + + uint64_t request_id; + uint64_t fragment_offset; }; -/** - * Handle for the origin of a multicast group. - */ -struct GNUNET_MULTICAST_Origin + +struct GNUNET_MULTICAST_Group { - struct GNUNET_MULTICAST_Group grp; + /** + * Configuration to use. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; - struct GNUNET_MULTICAST_OriginMessageHandle msg_handle; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; + /** + * Client connection to the service. + */ + struct GNUNET_MQ_Handle *mq; - GNUNET_MULTICAST_JoinCallback join_cb; - GNUNET_MULTICAST_MembershipTestCallback mem_test_cb; + /** + * Message to send on connect. + */ + struct GNUNET_MQ_Envelope *connect_env; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_delay; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + + GNUNET_MULTICAST_JoinRequestCallback join_req_cb; GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; - GNUNET_MULTICAST_RequestCallback request_cb; GNUNET_MULTICAST_MessageCallback message_cb; - void *cls; + void *cb_cls; - uint64_t next_fragment_id; + /** + * Function called after disconnected from the service. + */ + GNUNET_ContinuationCallback disconnect_cb; + + /** + * Closure for @a disconnect_cb. + */ + void *disconnect_cls; + + /** + * Are we currently transmitting a message? + */ + uint8_t in_transmit; + + /** + * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for. + */ + uint8_t acks_pending; - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - struct GNUNET_HashCode pub_key_hash; + /** + * Is this the origin or a member? + */ + uint8_t is_origin; + + /** + * Is this channel in the process of disconnecting from the service? + * #GNUNET_YES or #GNUNET_NO + */ + uint8_t is_disconnecting; }; /** - * Handle for a message to be delivered from a member to the origin. + * Handle for the origin of a multicast group. */ -struct GNUNET_MULTICAST_MemberRequestHandle +struct GNUNET_MULTICAST_Origin { - GNUNET_MULTICAST_MemberTransmitNotify notify; - void *notify_cls; - struct GNUNET_MULTICAST_Member *member; + struct GNUNET_MULTICAST_Group grp; + struct GNUNET_MULTICAST_OriginTransmitHandle tmit; - uint64_t request_id; - uint64_t fragment_offset; + GNUNET_MULTICAST_RequestCallback request_cb; }; @@ -112,43 +147,38 @@ struct GNUNET_MULTICAST_MemberRequestHandle struct GNUNET_MULTICAST_Member { struct GNUNET_MULTICAST_Group grp; + struct GNUNET_MULTICAST_MemberTransmitHandle tmit; - struct GNUNET_MULTICAST_MemberRequestHandle req_handle; + GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; - struct GNUNET_CRYPTO_EddsaPublicKey group_key; - struct GNUNET_CRYPTO_EddsaPrivateKey member_key; - struct GNUNET_PeerIdentity origin; - struct GNUNET_PeerIdentity relays; - uint32_t relay_count; - struct GNUNET_MessageHeader *join_request; - GNUNET_MULTICAST_JoinCallback join_cb; - GNUNET_MULTICAST_MembershipTestCallback member_test_cb; - GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; - GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; - GNUNET_MULTICAST_MessageCallback message_cb; - void *cls; + /** + * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *replay_reqs; uint64_t next_fragment_id; - struct GNUNET_HashCode group_key_hash; }; /** * Handle that identifies a join request. * - * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the + * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the * corresponding calls to #GNUNET_MULTICAST_join_decision(). */ struct GNUNET_MULTICAST_JoinHandle { -}; + struct GNUNET_MULTICAST_Group *group; + /** + * Public key of the member requesting join. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; -/** - * Handle to pass back for the answer of a membership test. - */ -struct GNUNET_MULTICAST_MembershipTestHandle -{ + /** + * Peer identity of the member requesting join. + */ + struct GNUNET_PeerIdentity peer; }; @@ -157,6 +187,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle */ struct GNUNET_MULTICAST_ReplayHandle { + struct GNUNET_MULTICAST_Group *grp; + struct MulticastReplayRequestMessage req; }; @@ -168,108 +200,373 @@ struct GNUNET_MULTICAST_MemberReplayHandle }; +static void +origin_to_all (struct GNUNET_MULTICAST_Origin *orig); + +static void +member_to_origin (struct GNUNET_MULTICAST_Member *mem); + + /** - * Iterator callback for calling message callbacks for all groups. + * Check join request message. */ static int -message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, - void *group) +check_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) { - const struct GNUNET_MessageHeader *msg = cls; - struct GNUNET_MULTICAST_Group *grp = group; + uint16_t size = ntohs (jreq->header.size); - if (GNUNET_YES == grp->is_origin) + if (sizeof (*jreq) == size) + return GNUNET_OK; + + if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; +} + + +/** + * Receive join request from service. + */ +static void +handle_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + struct GNUNET_MULTICAST_JoinHandle *jh; + const struct GNUNET_MessageHeader *jmsg = NULL; + + if (NULL == grp) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling origin's message callback " - "with a message of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); - struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; - orig->message_cb (orig->cls, msg); + GNUNET_break (0); + return; } - else + if (NULL == grp->join_req_cb) + return; + + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) + jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; + + jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_pub_key = jreq->member_pub_key; + jh->peer = jreq->peer; + grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check multicast message. + */ +static int +check_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + return GNUNET_OK; +} + + +/** + * Receive multicast message from service. + */ +static void +handle_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling message callback with a message of size %u.\n", + ntohs (mmsg->header.size)); + + if (NULL != grp->message_cb) + grp->message_cb (grp->cb_cls, mmsg); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Receive message/request fragment acknowledgement from service. + */ +static void +handle_group_fragment_ack (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", + grp, grp->in_transmit, grp->acks_pending); + + if (0 == grp->acks_pending) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling member's message callback " - "with a message of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); - struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; - mem->message_cb (mem->cls, msg); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Ignoring extraneous fragment ACK.\n", grp); + return; } + grp->acks_pending--; + + if (GNUNET_YES != grp->in_transmit) + return; + + if (GNUNET_YES == grp->is_origin) + origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); + else + member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); - return GNUNET_YES; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Handle a multicast message from the service. - * - * Call message callbacks of all origins and members of the destination group. - * - * @param grp Destination group of the message. - * @param msg The message. + * Check unicast request. + */ +static int +check_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + return GNUNET_OK; +} + + +/** + * Origin receives unicast request from a member. */ static void -handle_multicast_message (struct GNUNET_MULTICAST_Group *grp, - const struct GNUNET_MULTICAST_MessageHeader *msg) +handle_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { - struct GNUNET_HashCode *hash; + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Origin *orig = cls; + grp = &orig->grp; - if (GNUNET_YES == grp->is_origin) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Calling request callback with a request of size %u.\n", + ntohs (req->header.size)); + + if (NULL != orig->request_cb) + orig->request_cb (grp->cb_cls, req); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Receive multicast replay request from service. + */ +static void +handle_group_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) + +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n"); + + if (0 != rep->fragment_id) { - struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; - hash = &orig->pub_key_hash; + if (NULL != grp->replay_frag_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->fragment_id), + GNUNET_ntohll (rep->flags), rh); + } } - else + else if (0 != rep->message_id) { - struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; - hash = &mem->group_key_hash; + if (NULL != grp->replay_msg_cb) + { + struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); + rh->grp = grp; + rh->req = *rep; + grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key, + GNUNET_ntohll (rep->message_id), + GNUNET_ntohll (rep->fragment_offset), + GNUNET_ntohll (rep->flags), rh); + } } - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, message_callback, - (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, message_callback, - (void *) msg); + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Iterator callback for calling request callbacks of origins. + * Check replay response. */ static int -request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, - void *origin) +check_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { - const struct GNUNET_MULTICAST_RequestHeader *req = cls; - struct GNUNET_MULTICAST_Origin *orig = origin; + uint16_t size = ntohs (res->header.size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling request callback for a request of type %u and size %u.\n", - ntohs (req->header.type), ntohs (req->header.size)); + if (sizeof (*res) == size) + return GNUNET_OK; + + if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size) + return GNUNET_OK; - orig->request_cb (orig->cls, &req->member_key, - (const struct GNUNET_MessageHeader *) req, 0); - return GNUNET_YES; + return GNUNET_SYSERR; } /** - * Handle a multicast request from the service. - * - * Call request callbacks of all origins of the destination group. - * - * @param grp Destination group of the message. - * @param msg The message. + * Receive replay response from service. */ static void -handle_multicast_request (const struct GNUNET_HashCode *group_key_hash, - const struct GNUNET_MULTICAST_RequestHeader *req) +handle_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash, - request_callback, (void *) req); + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; + + if (GNUNET_YES == grp->is_disconnecting) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); + + // FIXME: return result +} + + +/** + * Check join decision. + */ +static int +check_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + return GNUNET_OK; // checked in handle below +} + + +/** + * Member receives join decision. + */ +static void +handle_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member *mem = cls; + grp = &mem->grp; + + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; + + uint16_t dcsn_size = ntohs (dcsn->header.size); + int is_admitted = ntohl (dcsn->is_admitted); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Member got join decision from multicast: %d\n", + mem, is_admitted); + + const struct GNUNET_MessageHeader *join_resp = NULL; + uint16_t join_resp_size = 0; + + uint16_t relay_count = ntohl (dcsn->relay_count); + const struct GNUNET_PeerIdentity *relays = NULL; + uint16_t relay_size = relay_count * sizeof (*relays); + if (0 < relay_count) + { + if (dcsn_size < sizeof (*dcsn) + relay_size) + { + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + else + { + relays = (struct GNUNET_PeerIdentity *) &dcsn[1]; + } + } + + if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size) + { + join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size); + join_resp_size = ntohs (join_resp->size); + } + if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received invalid join decision message from multicast: %u < %u + %u + %u\n", + dcsn_size , sizeof (*dcsn), relay_size, join_resp_size); + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + + if (NULL != mem->join_dcsn_cb) + mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer, + relay_count, relays, join_resp); + + // FIXME: + //if (GNUNET_YES != is_admitted) + // GNUNET_MULTICAST_member_part (mem); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static void +group_cleanup (struct GNUNET_MULTICAST_Group *grp) +{ + if (NULL != grp->connect_env) + { + GNUNET_MQ_discard (grp->connect_env); + grp->connect_env = NULL; + } + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + if (NULL != grp->disconnect_cb) + { + grp->disconnect_cb (grp->disconnect_cls); + grp->disconnect_cb = NULL; + } + GNUNET_free (grp); +} + + +static void +group_disconnect (struct GNUNET_MULTICAST_Group *grp, + GNUNET_ContinuationCallback cb, + void *cls) +{ + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = cb; + grp->disconnect_cls = cls; + + if (NULL != grp->mq) + { + struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq); + if (NULL != last) + { + GNUNET_MQ_notify_sent (last, + (GNUNET_MQ_NotifyCallback) group_cleanup, grp); + } + else + { + group_cleanup (grp); + } + } + else + { + group_cleanup (grp); + } } @@ -277,61 +574,102 @@ handle_multicast_request (const struct GNUNET_HashCode *group_key_hash, * Function to call with the decision made for a join request. * * Must be called once and only once in response to an invocation of the - * #GNUNET_MULTICAST_JoinCallback. + * #GNUNET_MULTICAST_JoinRequestCallback. * - * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved - * @param relay_count Number of relays given. - * @param relays Array of suggested peers that might be useful relays to use + * @param join + * Join request handle. + * @param is_admitted + * #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. + * @param relay_count + * Number of relays given. + * @param relays + * Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that * are already part of the multicast group and might thus be willing * to help with routing). If empty, only this local peer (which must * be the multicast origin) is a good candidate for building the * multicast tree. Note that it is unnecessary to specify our own * peer identity in this array. - * @param join_response Message to send in response to the joining peer; + * @param join_resp + * Message to send in response to the joining peer; * can also be used to redirect the peer to a different group at the * application layer; this response is to be transmitted to the * peer that issued the request even if admission is denied. */ struct GNUNET_MULTICAST_ReplayHandle * -GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, int is_admitted, - unsigned int relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, - const struct GNUNET_MessageHeader *join_response) + const struct GNUNET_MessageHeader *join_resp) { + struct GNUNET_MULTICAST_Group *grp = join->group; + uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; + uint16_t relay_size = relay_count * sizeof (*relays); + + struct MulticastJoinDecisionMessageHeader *hdcsn; + struct MulticastJoinDecisionMessage *dcsn; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_pub_key = join->member_pub_key; + hdcsn->peer = join->peer; + + dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; + dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); + dcsn->is_admitted = htonl (is_admitted); + dcsn->relay_count = htonl (relay_count); + if (0 < relay_size) + GNUNET_memcpy (&dcsn[1], relays, relay_size); + if (0 < join_resp_size) + GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); + + GNUNET_MQ_send (grp->mq, env); + GNUNET_free (join); return NULL; } -/** - * Call informing multicast about the decision taken for a membership test. - * - * @param mth Handle that was given for the query. - * @param result #GNUNET_YES if peer was a member, #GNUNET_NO if peer was not a member, - * #GNUNET_SYSERR if we cannot answer the membership test. - */ -void -GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestHandle *mth, - int result) -{ -} - - /** * Replay a message fragment for the multicast group. * - * @param rh Replay handle identifying which replay operation was requested. - * @param msg Replayed message fragment, NULL if unknown/error. - * @param ec Error code. + * @param rh + * Replay handle identifying which replay operation was requested. + * @param msg + * Replayed message fragment, NULL if not found / an error occurred. + * @param ec + * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode + * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated. */ void GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, const struct GNUNET_MessageHeader *msg, enum GNUNET_MULTICAST_ReplayErrorCode ec) { + uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; + struct MulticastReplayResponseMessage *res; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE); + res->fragment_id = rh->req.fragment_id; + res->message_id = rh->req.message_id; + res->fragment_offset = rh->req.fragment_offset; + res->flags = rh->req.flags; + res->error_code = htonl (ec); + + if (GNUNET_MULTICAST_REC_OK == ec) + { + GNUNET_assert (NULL != msg); + GNUNET_memcpy (&res[1], msg, msg_size); + } + + GNUNET_MQ_send (rh->grp->mq, env); + + if (GNUNET_MULTICAST_REC_OK != ec) + GNUNET_free (rh); } @@ -340,20 +678,35 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, * * Invalidates the replay handle. * - * @param rh Replay session to end. + * @param rh + * Replay session to end. */ void GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) { + struct MulticastReplayResponseMessage *end; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END); + + end->fragment_id = rh->req.fragment_id; + end->message_id = rh->req.message_id; + end->fragment_offset = rh->req.fragment_offset; + end->flags = rh->req.flags; + + GNUNET_MQ_send (rh->grp->mq, env); + GNUNET_free (rh); } /** * Replay a message for the multicast group. * - * @param rh Replay handle identifying which replay operation was requested. - * @param notify Function to call to get the message. - * @param notify_cls Closure for @a notify. + * @param rh + * Replay handle identifying which replay operation was requested. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. */ void GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, @@ -363,6 +716,83 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, } +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig); + + +static void +origin_reconnect (void *cls) +{ + origin_connect (cls); +} + + +/** + * Origin client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +origin_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Origin *orig = cls; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + origin_reconnect, + orig); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as origin. + */ +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (origin_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + orig), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, origin_disconnected, orig); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); +} + + /** * Start a multicast group. * @@ -375,28 +805,36 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, * candidate will be given a response. Members in the group can send messages * to the origin (one at a time). * - * @param cfg Configuration to use. - * @param priv_key ECC key that will be used to sign messages for this + * @param cfg + * Configuration to use. + * @param priv_key + * ECC key that will be used to sign messages for this * multicast session; public key is used to identify the multicast group; - * @param next_fragment_id Next fragment ID to continue counting fragments from - * when restarting the origin. 0 for a new group. - * @param join_cb Function called to approve / disapprove joining of a peer. - * @param mem_test_cb Function multicast can use to test group membership. - * @param replay_frag_cb Function that can be called to replay a message fragment. - * @param replay_msg_cb Function that can be called to replay a message. - * @param request_cb Function called with message fragments from group members. - * @param message_cb Function called with the message fragments sent to the + * @param max_fragment_id + * Maximum fragment ID already sent to the group. + * 0 for a new group. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param replay_frag_cb + * Function that can be called to replay a message fragment. + * @param replay_msg_cb + * Function that can be called to replay a message. + * @param request_cb + * Function called with message fragments from group members. + * @param message_cb + * Function called with the message fragments sent to the * network by GNUNET_MULTICAST_origin_to_all(). These message fragments * should be stored for answering replay requests later. - * @param cls Closure for the various callbacks that follow. + * @param cls + * Closure for the various callbacks that follow. + * * @return Handle for the origin, NULL on error. */ struct GNUNET_MULTICAST_Origin * GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, - uint64_t next_fragment_id, - GNUNET_MULTICAST_JoinCallback join_cb, - GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, + uint64_t max_fragment_id, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, GNUNET_MULTICAST_RequestCallback request_cb, @@ -404,29 +842,27 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls) { struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); - orig->grp.is_origin = GNUNET_YES; - orig->priv_key = *priv_key; - orig->next_fragment_id = next_fragment_id; - orig->join_cb = join_cb; - orig->mem_test_cb = mem_test_cb; - orig->replay_frag_cb = replay_frag_cb; - orig->replay_msg_cb = replay_msg_cb; - orig->request_cb = request_cb; - orig->message_cb = message_cb; - orig->cls = cls; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; - GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key); - GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key), - &orig->pub_key_hash); + struct MulticastOriginStartMessage *start; + grp->connect_env = GNUNET_MQ_msg (start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); + start->max_fragment_id = max_fragment_id; + start->group_key = *priv_key; - if (NULL == origins) - origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + grp->cfg = cfg; + grp->is_origin = GNUNET_YES; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; - GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + grp->cb_cls = cls; + grp->join_req_cb = join_request_cb; + grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; + grp->message_cb = message_cb; - /* FIXME: send ORIGIN_START to service */ + orig->request_cb = request_cb; + origin_connect (orig); return orig; } @@ -434,130 +870,216 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, /** * Stop a multicast group. * - * @param origin Multicast group to stop. + * @param origin + * Multicast group to stop. */ void -GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) +GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, + GNUNET_ContinuationCallback stop_cb, + void *stop_cls) { - GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig); - GNUNET_free (orig); + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + group_disconnect (grp, stop_cb, stop_cls); } -/* FIXME: for now just call clients' callbacks - * without sending anything to multicast. */ static void -schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +origin_to_all (struct GNUNET_MULTICAST_Origin *orig) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n"); - struct GNUNET_MULTICAST_Origin *orig = cls; - struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; + LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig); + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = ""; - struct GNUNET_MULTICAST_MessageHeader *msg - = (struct GNUNET_MULTICAST_MessageHeader *) buf; - int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); + struct GNUNET_MULTICAST_MessageHeader *msg; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg), + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + + int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "OriginTransmitNotify() returned error or invalid message size.\n"); + "%p OriginTransmitNotify() returned error or invalid message size.\n", + orig); /* FIXME: handle error */ + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p OriginTransmitNotify() - transmission paused.\n", orig); + GNUNET_MQ_discard (env); return; /* Transmission paused. */ + } - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (sizeof (*msg) + buf_size); - msg->message_id = GNUNET_htonll (mh->message_id); - msg->group_generation = mh->group_generation; - - /* FIXME: add fragment ID and signature in the service instead of here */ - msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++); - msg->fragment_offset = GNUNET_htonll (mh->fragment_offset); - mh->fragment_offset += sizeof (*msg) + buf_size; - msg->purpose.size = htonl (sizeof (*msg) + buf_size - - sizeof (msg->header) - - sizeof (msg->hop_counter) - - sizeof (msg->signature)); - msg->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); - - if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&orig->priv_key, &msg->purpose, - &msg->signature)) - { - /* FIXME: handle error */ - return; - } + msg->message_id = GNUNET_htonll (tmit->message_id); + msg->group_generation = tmit->group_generation; + msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); + tmit->fragment_offset += sizeof (*msg) + buf_size; - /* FIXME: send msg to the service and only then call handle_multicast_message - * with the returned signed message. - */ - handle_multicast_message (&orig->grp, msg); + grp->acks_pending++; + GNUNET_MQ_send (grp->mq, env); - if (GNUNET_NO == ret) - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_origin_to_all, orig); + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } /** * Send a message to the multicast group. * - * @param origin Handle to the multicast group. - * @param message_id Application layer ID for the message. Opaque to multicast. - * @param group_generation Group generation of the message. Documented in - * `struct GNUNET_MULTICAST_MessageHeader`. - * @param notify Function to call to get the message. - * @param notify_cls Closure for @a notify. - * @return NULL on error (i.e. request already pending). + * @param orig + * Handle to the multicast group. + * @param message_id + * Application layer ID for the message. Opaque to multicast. + * @param group_generation + * Group generation of the message. + * Documented in struct GNUNET_MULTICAST_MessageHeader. + * @param notify + * Function to call to get the message. + * @param notify_cls + * Closure for @a notify. + * + * @return Message handle on success, + * NULL on error (i.e. another request is already pending). */ -struct GNUNET_MULTICAST_OriginMessageHandle * -GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, +struct GNUNET_MULTICAST_OriginTransmitHandle * +GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, uint64_t message_id, uint64_t group_generation, GNUNET_MULTICAST_OriginTransmitNotify notify, void *notify_cls) { - struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle; - mh->origin = origin; - mh->message_id = message_id; - mh->group_generation = group_generation; - mh->notify = notify; - mh->notify_cls = notify_cls; - - /* add some delay for testing */ - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_origin_to_all, origin); - return &origin->msg_handle; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + if (GNUNET_YES == grp->in_transmit) + return NULL; + grp->in_transmit = GNUNET_YES; + + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + tmit->origin = orig; + tmit->message_id = message_id; + tmit->fragment_offset = 0; + tmit->group_generation = group_generation; + tmit->notify = notify; + tmit->notify_cls = notify_cls; + + origin_to_all (orig); + return tmit; } /** * Resume message transmission to multicast group. * - * @param mh Request to cancel. + * @param th + * Transmission to cancel. */ void -GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh) +GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) { - GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin); + struct GNUNET_MULTICAST_Group *grp = &th->origin->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; + origin_to_all (th->origin); } /** * Cancel request for message transmission to multicast group. * - * @param mh Request to cancel. + * @param th + * Transmission to cancel. */ void -GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHandle *mh) +GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) +{ + th->origin->grp.in_transmit = GNUNET_NO; +} + + +static void +member_connect (struct GNUNET_MULTICAST_Member *mem); + + +static void +member_reconnect (void *cls) { + member_connect (cls); +} + + +/** + * Member client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +member_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Member *mem = cls; + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Member client disconnected (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + member_reconnect, + mem); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as member. + */ +static void +member_connect (struct GNUNET_MULTICAST_Member *mem) +{ + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + mem), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + mem), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, member_disconnected, mem); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); } @@ -569,83 +1091,93 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginMessageHand * @a message_cb is invoked with a (failure) response and then with NULL. If * the join succeeds, outstanding (state) messages and ongoing multicast * messages will be given to the @a message_cb until the member decides to part - * the group. The @a test_cb and @a replay_cb functions may be called at - * anytime by the multicast service to support relaying messages to other - * members of the group. - * - * @param cfg Configuration to use. - * @param group_key ECC public key that identifies the group to join. - * @param member_key ECC key that identifies the member and used to sign - * requests sent to the origin. - * @param origin Peer ID of the origin to send unicast requsets to. If NULL, + * the group. The @a replay_cb function may be called at any time by the + * multicast service to support relaying messages to other members of the group. + * + * @param cfg + * Configuration to use. + * @param group_key + * ECC public key that identifies the group to join. + * @param member_key + * ECC key that identifies the member + * and used to sign requests sent to the origin. + * @param origin + * Peer ID of the origin to send unicast requsets to. If NULL, * unicast requests are sent back via multiple hops on the reverse path * of multicast messages. - * @param relay_count Number of peers in the @a relays array. - * @param relays Peer identities of members of the group, which serve as relays + * @param relay_count + * Number of peers in the @a relays array. + * @param relays + * Peer identities of members of the group, which serve as relays * and can be used to join the group at. and send the @a join_request to. * If empty, the @a join_request is sent directly to the @a origin. - * @param join_request Application-dependent join request to be passed to the peer - * @a relay (might, for example, contain a user, bind user - * identity/pseudonym to peer identity, application-level message to - * origin, etc.). - * @param join_cb Function called to approve / disapprove joining of a peer. - * @param mem_test_cb Function multicast can use to test group membership. - * @param replay_frag_cb Function that can be called to replay message fragments + * @param join_msg + * Application-dependent join message to be passed to the peer @a origin. + * @param join_request_cb + * Function called to approve / disapprove joining of a peer. + * @param join_decision_cb + * Function called to inform about the join decision. + * @param replay_frag_cb + * Function that can be called to replay message fragments * this peer already knows from this group. NULL if this * client is unable to support replay. - * @param replay_msg_cb Function that can be called to replay message fragments + * @param replay_msg_cb + * Function that can be called to replay message fragments * this peer already knows from this group. NULL if this * client is unable to support replay. - * @param message_cb Function to be called for all message fragments we + * @param message_cb + * Function to be called for all message fragments we * receive from the group, excluding those our @a replay_cb * already has. - * @param cls Closure for callbacks. + * @param cls + * Closure for callbacks. + * * @return Handle for the member, NULL on error. */ struct GNUNET_MULTICAST_Member * GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, - const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key, + const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key, + const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key, const struct GNUNET_PeerIdentity *origin, - uint32_t relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, - const struct GNUNET_MessageHeader *join_request, - GNUNET_MULTICAST_JoinCallback join_cb, - GNUNET_MULTICAST_MembershipTestCallback member_test_cb, + const struct GNUNET_MessageHeader *join_msg, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, GNUNET_MULTICAST_MessageCallback message_cb, void *cls) { struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem)); - mem->group_key = *group_key; - mem->member_key = *member_key; - mem->origin = *origin; - mem->relay_count = relay_count; - mem->relays = *relays; - mem->join_cb = join_cb; - mem->member_test_cb = member_test_cb; - mem->replay_frag_cb = replay_frag_cb; - mem->message_cb = message_cb; - mem->cls = cls; - - if (NULL != join_request) - { - uint16_t size = ntohs (join_request->size); - mem->join_request = GNUNET_malloc (size); - memcpy (mem->join_request, join_request, size); - } - - GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), &mem->group_key_hash); - - if (NULL == members) - members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - /* FIXME: send MEMBER_JOIN to service */ - + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + uint16_t relay_size = relay_count * sizeof (*relays); + uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; + struct MulticastMemberJoinMessage *join; + grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); + join->group_pub_key = *group_pub_key; + join->member_key = *member_key; + join->origin = *origin; + join->relay_count = ntohl (relay_count); + if (0 < relay_size) + GNUNET_memcpy (&join[1], relays, relay_size); + if (0 < join_msg_size) + GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); + + grp->cfg = cfg; + grp->is_origin = GNUNET_NO; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + + mem->join_dcsn_cb = join_decision_cb; + grp->join_req_cb = join_request_cb; + grp->replay_frag_cb = replay_frag_cb; + grp->replay_msg_cb = replay_msg_cb; + grp->message_cb = message_cb; + grp->cb_cls = cls; + + member_connect (mem); return mem; } @@ -658,13 +1190,44 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * An application-dependent part message can be transmitted beforehand using * #GNUNET_MULTICAST_member_to_origin()) * - * @param member Membership handle. + * @param member + * Membership handle. */ void -GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) +GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, + GNUNET_ContinuationCallback part_cb, + void *part_cls) { - GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem); - GNUNET_free (mem); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + mem->join_dcsn_cb = NULL; + grp->join_req_cb = NULL; + grp->message_cb = NULL; + grp->replay_msg_cb = NULL; + grp->replay_frag_cb = NULL; + + group_disconnect (grp, part_cb, part_cls); +} + + +void +member_replay_request (struct GNUNET_MULTICAST_Member *mem, + uint64_t fragment_id, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags) +{ + struct MulticastReplayRequestMessage *rep; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST); + + rep->fragment_id = GNUNET_htonll (fragment_id); + rep->message_id = GNUNET_htonll (message_id); + rep->fragment_offset = GNUNET_htonll (fragment_offset); + rep->flags = GNUNET_htonll (flags); + + GNUNET_MQ_send (mem->grp.mq, env); } @@ -674,19 +1237,23 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param fragment_id ID of a message fragment that this client would like to - see replayed. - * @param flags Additional flags for the replay request. It is used and defined - * by the replay callback. FIXME: which replay callback? FIXME: use enum? - * FIXME: why not pass reply cb here? - * @return Replay request handle, NULL on error. + * @param member + * Membership handle. + * @param fragment_id + * ID of a message fragment that this client would like to see replayed. + * @param flags + * Additional flags for the replay request. + * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback + * + * @return Replay request handle. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, uint64_t fragment_id, uint64_t flags) { + member_replay_request (mem, fragment_id, 0, 0, flags); + // FIXME: return something useful return NULL; } @@ -697,146 +1264,138 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, * Useful if messages below the @e max_known_fragment_id given when joining are * needed and not known to the client. * - * @param member Membership handle. - * @param message_id ID of the message this client would like to see replayed. - * @param fragment_offset Offset of the fragment within the message to replay. - * @param flags Additional flags for the replay request. It is used & defined - * by the replay callback. - * @param result_cb Function to be called for the replayed message. - * @param result_cb_cls Closure for @a result_cb. + * @param member + * Membership handle. + * @param message_id + * ID of the message this client would like to see replayed. + * @param fragment_offset + * Offset of the fragment within the message to replay. + * @param flags + * Additional flags for the replay request. + * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback + * * @return Replay request handle, NULL on error. */ struct GNUNET_MULTICAST_MemberReplayHandle * -GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member, +GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem, uint64_t message_id, uint64_t fragment_offset, - uint64_t flags, - GNUNET_MULTICAST_ResultCallback result_cb, - void *result_cb_cls) + uint64_t flags) { + member_replay_request (mem, 0, message_id, fragment_offset, flags); + // FIXME: return something useful return NULL; } -/** - * Cancel a replay request. - * - * @param rh Request to cancel. - */ -void -GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh) -{ -} - - -/* FIXME: for now just send back to the client what it sent. */ static void -schedule_member_to_origin (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +member_to_origin (struct GNUNET_MULTICAST_Member *mem) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n"); - struct GNUNET_MULTICAST_Member *mem = cls; - struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle; + LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); - size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; - char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; - struct GNUNET_MULTICAST_RequestHeader *req - = (struct GNUNET_MULTICAST_RequestHeader *) buf; - int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]); + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_RequestHeader *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req), + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); + + int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "MemberTransmitNotify() returned error or invalid message size.\n"); + "MemberTransmitNotify() returned error or invalid message size. " + "ret=%d, buf_size=%u\n", ret, buf_size); /* FIXME: handle error */ + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) - return; /* Transmission paused. */ - - req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); - req->header.size = htons (sizeof (*req) + buf_size); - req->request_id = GNUNET_htonll (rh->request_id); - - /* FIXME: add fragment ID and signature in the service instead of here */ - req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++); - req->fragment_offset = GNUNET_ntohll (rh->fragment_offset); - rh->fragment_offset += sizeof (*req) + buf_size; - req->purpose.size = htonl (sizeof (*req) + buf_size - - sizeof (req->header) - - sizeof (req->member_key) - - sizeof (req->signature)); - req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE); - - if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose, - &req->signature)) { - /* FIXME: handle error */ + /* Transmission paused. */ + GNUNET_MQ_discard (env); return; } - /* FIXME: send req to the service and only then call handle_multicast_request - * with the returned request. - */ - handle_multicast_request (&mem->group_key_hash, req); + req->header.size = htons (sizeof (*req) + buf_size); + req->request_id = GNUNET_htonll (tmit->request_id); + req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); + tmit->fragment_offset += sizeof (*req) + buf_size; - if (GNUNET_NO == ret) - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_member_to_origin, mem); + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } /** * Send a message to the origin of the multicast group. * - * @param member Membership handle. - * @param request_id Application layer ID for the request. Opaque to multicast. - * @param notify Callback to call to get the message. - * @param notify_cls Closure for @a notify. + * @param mem + * Membership handle. + * @param request_id + * Application layer ID for the request. Opaque to multicast. + * @param notify + * Callback to call to get the message. + * @param notify_cls + * Closure for @a notify. + * * @return Handle to cancel request, NULL on error (i.e. request already pending). */ -struct GNUNET_MULTICAST_MemberRequestHandle * -GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member, +struct GNUNET_MULTICAST_MemberTransmitHandle * +GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, uint64_t request_id, GNUNET_MULTICAST_MemberTransmitNotify notify, void *notify_cls) { - struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle; - rh->member = member; - rh->request_id = request_id; - rh->notify = notify; - rh->notify_cls = notify_cls; - - /* FIXME: remove delay, it's there only for testing */ - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_member_to_origin, member); - return &member->req_handle; + if (GNUNET_YES == mem->grp.in_transmit) + return NULL; + mem->grp.in_transmit = GNUNET_YES; + + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + tmit->member = mem; + tmit->request_id = request_id; + tmit->fragment_offset = 0; + tmit->notify = notify; + tmit->notify_cls = notify_cls; + + member_to_origin (mem); + return tmit; } /** * Resume message transmission to origin. * - * @param rh Request to cancel. + * @param th + * Transmission to cancel. */ void -GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberRequestHandle *rh) +GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { - + struct GNUNET_MULTICAST_Group *grp = &th->member->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; + member_to_origin (th->member); } /** * Cancel request for message transmission to origin. * - * @param rh Request to cancel. + * @param th + * Transmission to cancel. */ void -GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberRequestHandle *rh) +GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { + th->member->grp.in_transmit = GNUNET_NO; }