X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fmulticast%2Fmulticast_api.c;h=7cfe708359db25b447ffc246f3ffcb21ed38c69f;hb=2f45a7c9691aa2670c8902618be5e8011428f0af;hp=76c7fb004f4fa2994a6b5b0d8a3f0b29082ba407;hpb=0219647545ddc7c66dcab13414541cf57d792618;p=oweals%2Fgnunet.git diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 76c7fb004..7cfe70835 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2012, 2013 Christian Grothoff (and other contributing authors) + Copyright (C) 2012, 2013 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -73,12 +73,22 @@ struct GNUNET_MULTICAST_Group /** * Client connection to the service. */ - struct GNUNET_CLIENT_MANAGER_Connection *client; + struct GNUNET_MQ_Handle *mq; /** - * Message to send on reconnect. + * Message to send on connect. */ - struct GNUNET_MessageHeader *connect_msg; + struct GNUNET_MQ_Envelope *connect_env; + + /** + * Time to wait until we try to reconnect on failure. + */ + struct GNUNET_TIME_Relative reconnect_delay; + + /** + * Task for reconnecting when the listener fails. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; GNUNET_MULTICAST_JoinRequestCallback join_req_cb; GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; @@ -101,6 +111,11 @@ struct GNUNET_MULTICAST_Group */ uint8_t in_transmit; + /** + * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for. + */ + uint8_t acks_pending; + /** * Is this the origin or a member? */ @@ -158,7 +173,7 @@ struct GNUNET_MULTICAST_JoinHandle /** * Public key of the member requesting join. */ - struct GNUNET_CRYPTO_EcdsaPublicKey member_key; + struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key; /** * Peer identity of the member requesting join. @@ -182,37 +197,32 @@ struct GNUNET_MULTICAST_ReplayHandle */ struct GNUNET_MULTICAST_MemberReplayHandle { - - GNUNET_MULTICAST_ResultCallback result_cb; - void *result_cls; }; -/** - * Send first message to the service after connecting. - */ static void -group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) -{ - uint16_t cmsg_size = ntohs (grp->connect_msg->size); - struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); - memcpy (cmsg, grp->connect_msg, cmsg_size); - GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg); -} +origin_to_all (struct GNUNET_MULTICAST_Origin *orig); + +static void +member_to_origin (struct GNUNET_MULTICAST_Member *mem); /** - * Got disconnected from service. Reconnect. + * Check join request message. */ -static void -group_recv_disconnect (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +static int +check_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) { - struct GNUNET_MULTICAST_Group * - grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); - GNUNET_CLIENT_MANAGER_reconnect (client); - group_send_connect_msg (grp); + uint16_t size = ntohs (jreq->header.size); + + if (sizeof (*jreq) == size) + return GNUNET_OK; + + if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; } @@ -220,16 +230,13 @@ group_recv_disconnect (void *cls, * Receive join request from service. */ static void -group_recv_join_request (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_group_join_request (void *cls, + const struct MulticastJoinRequestMessage *jreq) { - struct GNUNET_MULTICAST_Group *grp; - const struct MulticastJoinRequestMessage *jreq; + struct GNUNET_MULTICAST_Group *grp = cls; struct GNUNET_MULTICAST_JoinHandle *jh; - const struct GNUNET_MessageHeader *jmsg; + const struct GNUNET_MessageHeader *jmsg = NULL; - grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); if (NULL == grp) { GNUNET_break (0); @@ -237,17 +244,28 @@ group_recv_join_request (void *cls, } if (NULL == grp->join_req_cb) return; - /* FIXME: this fails to check that 'msg' is well-formed! */ - jreq = (const struct MulticastJoinRequestMessage *) msg; + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; - else - jmsg = NULL; + jh = GNUNET_malloc (sizeof (*jh)); jh->group = grp; - jh->member_key = jreq->member_key; + jh->member_pub_key = jreq->member_pub_key; jh->peer = jreq->peer; - grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh); + grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check multicast message. + */ +static int +check_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + return GNUNET_OK; } @@ -255,14 +273,10 @@ group_recv_join_request (void *cls, * Receive multicast message from service. */ static void -group_recv_message (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_group_message (void *cls, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) { - struct GNUNET_MULTICAST_Group * - grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); - struct GNUNET_MULTICAST_MessageHeader * - mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; + struct GNUNET_MULTICAST_Group *grp = cls; if (GNUNET_YES == grp->is_disconnecting) return; @@ -273,23 +287,65 @@ group_recv_message (void *cls, if (NULL != grp->message_cb) grp->message_cb (grp->cb_cls, mmsg); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Origin receives uniquest request from a member. + * Receive message/request fragment acknowledgement from service. */ static void -origin_recv_request (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_group_fragment_ack (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n", + grp, grp->in_transmit, grp->acks_pending); + + if (0 == grp->acks_pending) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p Ignoring extraneous fragment ACK.\n", grp); + return; + } + grp->acks_pending--; + + if (GNUNET_YES != grp->in_transmit) + return; + + if (GNUNET_YES == grp->is_origin) + origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp); + else + member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +/** + * Check unicast request. + */ +static int +check_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + return GNUNET_OK; +} + + +/** + * Origin receives unicast request from a member. + */ +static void +handle_origin_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { struct GNUNET_MULTICAST_Group *grp; - struct GNUNET_MULTICAST_Origin * - orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct GNUNET_MULTICAST_Origin *orig = cls; grp = &orig->grp; - struct GNUNET_MULTICAST_RequestHeader * - req = (struct GNUNET_MULTICAST_RequestHeader *) msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Calling request callback with a request of size %u.\n", @@ -297,6 +353,8 @@ origin_recv_request (void *cls, if (NULL != orig->request_cb) orig->request_cb (grp->cb_cls, req); + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -304,14 +362,11 @@ origin_recv_request (void *cls, * Receive multicast replay request from service. */ static void -group_recv_replay_request (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_group_replay_request (void *cls, + const struct MulticastReplayRequestMessage *rep) + { - struct GNUNET_MULTICAST_Group * - grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); - struct MulticastReplayRequestMessage * - rep = (struct MulticastReplayRequestMessage *) msg; + struct GNUNET_MULTICAST_Group *grp = cls; if (GNUNET_YES == grp->is_disconnecting) return; @@ -325,7 +380,7 @@ group_recv_replay_request (void *cls, struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); rh->grp = grp; rh->req = *rep; - grp->replay_frag_cb (grp->cb_cls, &rep->member_key, + grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key, GNUNET_ntohll (rep->fragment_id), GNUNET_ntohll (rep->flags), rh); } @@ -337,51 +392,78 @@ group_recv_replay_request (void *cls, struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh)); rh->grp = grp; rh->req = *rep; - grp->replay_msg_cb (grp->cb_cls, &rep->member_key, + grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key, GNUNET_ntohll (rep->message_id), GNUNET_ntohll (rep->fragment_offset), GNUNET_ntohll (rep->flags), rh); } } + + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } /** - * Receive multicast replay request from service. + * Check replay response. + */ +static int +check_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) +{ + uint16_t size = ntohs (res->header.size); + + if (sizeof (*res) == size) + return GNUNET_OK; + + if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size) + return GNUNET_OK; + + return GNUNET_SYSERR; +} + + +/** + * Receive replay response from service. */ static void -member_recv_replay_response (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_member_replay_response (void *cls, + const struct MulticastReplayResponseMessage *res) { struct GNUNET_MULTICAST_Group *grp; - struct GNUNET_MULTICAST_Member * - mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct GNUNET_MULTICAST_Member *mem = cls; grp = &mem->grp; - struct MulticastReplayResponseMessage * - res = (struct MulticastReplayResponseMessage *) msg; if (GNUNET_YES == grp->is_disconnecting) return; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n"); + + // FIXME: return result +} + + +/** + * Check join decision. + */ +static int +check_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) +{ + return GNUNET_OK; // checked in handle below } + /** * Member receives join decision. */ static void -member_recv_join_decision (void *cls, - struct GNUNET_CLIENT_MANAGER_Connection *client, - const struct GNUNET_MessageHeader *msg) +handle_member_join_decision (void *cls, + const struct MulticastJoinDecisionMessageHeader *hdcsn) { struct GNUNET_MULTICAST_Group *grp; - struct GNUNET_MULTICAST_Member * - mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct GNUNET_MULTICAST_Member *mem = cls; grp = &mem->grp; - const struct MulticastJoinDecisionMessageHeader * - hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg; const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; @@ -432,91 +514,59 @@ member_recv_join_decision (void *cls, // FIXME: //if (GNUNET_YES != is_admitted) // GNUNET_MULTICAST_member_part (mem); -} - - -/** - * Message handlers for an origin. - */ -static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = -{ - { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - - { group_recv_message, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, - sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - - { origin_recv_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, - sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, - - { group_recv_join_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, - sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - - { group_recv_replay_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, - - { NULL, NULL, 0, 0, GNUNET_NO } -}; - - -/** - * Message handlers for a member. - */ -static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = -{ - { group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - - { group_recv_message, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, - sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - - { group_recv_join_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, - sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - { member_recv_join_decision, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, - sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, - - { group_recv_replay_request, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, - - { member_recv_replay_response, NULL, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, - sizeof (struct MulticastReplayRequestMessage), GNUNET_NO }, - - { NULL, NULL, 0, 0, GNUNET_NO } -}; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} static void group_cleanup (struct GNUNET_MULTICAST_Group *grp) { - GNUNET_free (grp->connect_msg); + if (NULL != grp->connect_env) + { + GNUNET_MQ_discard (grp->connect_env); + grp->connect_env = NULL; + } + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } if (NULL != grp->disconnect_cb) + { grp->disconnect_cb (grp->disconnect_cls); + grp->disconnect_cb = NULL; + } + GNUNET_free (grp); } static void -origin_cleanup (void *cls) +group_disconnect (struct GNUNET_MULTICAST_Group *grp, + GNUNET_ContinuationCallback cb, + void *cls) { - struct GNUNET_MULTICAST_Origin *orig = cls; - group_cleanup (&orig->grp); - GNUNET_free (orig); -} - + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = cb; + grp->disconnect_cls = cls; -static void -member_cleanup (void *cls) -{ - struct GNUNET_MULTICAST_Member *mem = cls; - group_cleanup (&mem->grp); - GNUNET_free (mem); + if (NULL != grp->mq) + { + struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq); + if (NULL != last) + { + GNUNET_MQ_notify_sent (last, + (GNUNET_MQ_NotifyCallback) group_cleanup, grp); + } + else + { + group_cleanup (grp); + } + } + else + { + group_cleanup (grp); + } } @@ -559,14 +609,12 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); - struct MulticastJoinDecisionMessageHeader * hdcsn; + struct MulticastJoinDecisionMessageHeader *hdcsn; struct MulticastJoinDecisionMessage *dcsn; - hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); - hdcsn->member_key = join->member_key; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_pub_key = join->member_pub_key; hdcsn->peer = join->peer; dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; @@ -575,11 +623,11 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, dcsn->is_admitted = htonl (is_admitted); dcsn->relay_count = htonl (relay_count); if (0 < relay_size) - memcpy (&dcsn[1], relays, relay_size); + GNUNET_memcpy (&dcsn[1], relays, relay_size); if (0 < join_resp_size) - memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); + GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); - GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); + GNUNET_MQ_send (grp->mq, env); GNUNET_free (join); return NULL; } @@ -602,28 +650,23 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, enum GNUNET_MULTICAST_ReplayErrorCode ec) { uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0; - struct MulticastReplayResponseMessage * - res = GNUNET_malloc (sizeof (*res) + msg_size); - *res = (struct MulticastReplayResponseMessage) { - .header = { - .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE), - .size = htons (sizeof (*res) + msg_size), - }, - .fragment_id = rh->req.fragment_id, - .message_id = rh->req.message_id, - .fragment_offset = rh->req.fragment_offset, - .flags = rh->req.flags, - .error_code = htonl (ec), - }; + struct MulticastReplayResponseMessage *res; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE); + res->fragment_id = rh->req.fragment_id; + res->message_id = rh->req.message_id; + res->fragment_offset = rh->req.fragment_offset; + res->flags = rh->req.flags; + res->error_code = htonl (ec); if (GNUNET_MULTICAST_REC_OK == ec) { GNUNET_assert (NULL != msg); - memcpy (&res[1], msg, msg_size); + GNUNET_memcpy (&res[1], msg, msg_size); } - GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header); - GNUNET_free (res); + GNUNET_MQ_send (rh->grp->mq, env); if (GNUNET_MULTICAST_REC_OK != ec) GNUNET_free (rh); @@ -641,18 +684,16 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh, void GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh) { - struct MulticastReplayResponseMessage end = { - .header = { - .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END), - .size = htons (sizeof (end)), - }, - .fragment_id = rh->req.fragment_id, - .message_id = rh->req.message_id, - .fragment_offset = rh->req.fragment_offset, - .flags = rh->req.flags, - }; + struct MulticastReplayResponseMessage *end; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END); + + end->fragment_id = rh->req.fragment_id; + end->message_id = rh->req.message_id; + end->fragment_offset = rh->req.fragment_offset; + end->flags = rh->req.flags; - GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header); + GNUNET_MQ_send (rh->grp->mq, env); GNUNET_free (rh); } @@ -675,6 +716,83 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, } +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig); + + +static void +origin_reconnect (void *cls) +{ + origin_connect (cls); +} + + +/** + * Origin client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +origin_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Origin *orig = cls; + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != grp->mq) + { + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + } + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + origin_reconnect, + orig); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as origin. + */ +static void +origin_connect (struct GNUNET_MULTICAST_Origin *orig) +{ + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (origin_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + struct GNUNET_MULTICAST_RequestHeader, + orig), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, origin_disconnected, orig); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); +} + + /** * Start a multicast group. * @@ -725,16 +843,16 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig)); struct GNUNET_MULTICAST_Group *grp = &orig->grp; - struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start)); - start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); - start->header.size = htons (sizeof (*start)); + struct MulticastOriginStartMessage *start; + grp->connect_env = GNUNET_MQ_msg (start, + GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START); start->max_fragment_id = max_fragment_id; - memcpy (&start->group_key, priv_key, sizeof (*priv_key)); + start->group_key = *priv_key; - grp->connect_msg = (struct GNUNET_MessageHeader *) start; - grp->is_origin = GNUNET_YES; grp->cfg = cfg; + grp->is_origin = GNUNET_YES; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; grp->cb_cls = cls; grp->join_req_cb = join_request_cb; @@ -744,10 +862,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, orig->request_cb = request_cb; - grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); - GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp)); - group_send_connect_msg (grp); - + origin_connect (orig); return orig; } @@ -765,50 +880,56 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, { struct GNUNET_MULTICAST_Group *grp = &orig->grp; - grp->is_disconnecting = GNUNET_YES; - grp->disconnect_cb = stop_cb; - grp->disconnect_cls = stop_cls; - - GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES, - &origin_cleanup, orig); + group_disconnect (grp, stop_cb, stop_cls); } static void origin_to_all (struct GNUNET_MULTICAST_Origin *orig) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig); struct GNUNET_MULTICAST_Group *grp = &orig->grp; struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); + struct GNUNET_MULTICAST_MessageHeader *msg; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg), + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "OriginTransmitNotify() returned error or invalid message size.\n"); + "%p OriginTransmitNotify() returned error or invalid message size.\n", + orig); /* FIXME: handle error */ - GNUNET_free (msg); + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) { - GNUNET_free (msg); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%p OriginTransmitNotify() - transmission paused.\n", orig); + GNUNET_MQ_discard (env); return; /* Transmission paused. */ } - msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (sizeof (*msg) + buf_size); msg->message_id = GNUNET_htonll (tmit->message_id); msg->group_generation = tmit->group_generation; msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*msg) + buf_size; - GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); + grp->acks_pending++; + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } @@ -837,15 +958,15 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, GNUNET_MULTICAST_OriginTransmitNotify notify, void *notify_cls) { -/* FIXME - if (GNUNET_YES == orig->grp.in_transmit) + struct GNUNET_MULTICAST_Group *grp = &orig->grp; + if (GNUNET_YES == grp->in_transmit) return NULL; - orig->grp.in_transmit = GNUNET_YES; -*/ + grp->in_transmit = GNUNET_YES; struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; tmit->origin = orig; tmit->message_id = message_id; + tmit->fragment_offset = 0; tmit->group_generation = group_generation; tmit->notify = notify; tmit->notify_cls = notify_cls; @@ -864,6 +985,9 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, void GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th) { + struct GNUNET_MULTICAST_Group *grp = &th->origin->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; origin_to_all (th->origin); } @@ -877,6 +1001,85 @@ GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHan void GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th) { + th->origin->grp.in_transmit = GNUNET_NO; +} + + +static void +member_connect (struct GNUNET_MULTICAST_Member *mem); + + +static void +member_reconnect (void *cls) +{ + member_connect (cls); +} + + +/** + * Member client disconnected from service. + * + * Reconnect after backoff period. + */ +static void +member_disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_MULTICAST_Member *mem = cls; + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Member client disconnected (%d), re-connecting\n", + (int) error); + GNUNET_MQ_destroy (grp->mq); + grp->mq = NULL; + + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, + member_reconnect, + mem); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); +} + + +/** + * Connect to service as member. + */ +static void +member_connect (struct GNUNET_MULTICAST_Member *mem) +{ + struct GNUNET_MULTICAST_Group *grp = &mem->grp; + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (group_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + struct GNUNET_MULTICAST_MessageHeader, + grp), + GNUNET_MQ_hd_fixed_size (group_fragment_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK, + struct GNUNET_MessageHeader, + grp), + GNUNET_MQ_hd_var_size (group_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + struct MulticastJoinRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_join_decision, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + struct MulticastJoinDecisionMessageHeader, + mem), + GNUNET_MQ_hd_fixed_size (group_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), + GNUNET_MQ_hd_var_size (member_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, + struct MulticastReplayResponseMessage, + mem), + GNUNET_MQ_handler_end () + }; + + grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", + handlers, member_disconnected, mem); + GNUNET_assert (NULL != grp->mq); + GNUNET_MQ_send_copy (grp->mq, grp->connect_env); } @@ -933,7 +1136,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan */ struct GNUNET_MULTICAST_Member * GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, + const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key, const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key, const struct GNUNET_PeerIdentity *origin, uint16_t relay_count, @@ -951,22 +1154,21 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint16_t relay_size = relay_count * sizeof (*relays); uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0; - struct MulticastMemberJoinMessage * - join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size); - join->header.size = htons (sizeof (*join) + relay_size + join_msg_size); - join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); - join->group_key = *group_key; + struct MulticastMemberJoinMessage *join; + grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN); + join->group_pub_key = *group_pub_key; join->member_key = *member_key; join->origin = *origin; join->relay_count = ntohl (relay_count); if (0 < relay_size) - memcpy (&join[1], relays, relay_size); + GNUNET_memcpy (&join[1], relays, relay_size); if (0 < join_msg_size) - memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); + GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); - grp->connect_msg = (struct GNUNET_MessageHeader *) join; - grp->is_origin = GNUNET_NO; grp->cfg = cfg; + grp->is_origin = GNUNET_NO; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; mem->join_dcsn_cb = join_decision_cb; grp->join_req_cb = join_request_cb; @@ -975,10 +1177,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->message_cb = message_cb; grp->cb_cls = cls; - grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); - GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp)); - group_send_connect_msg (grp); - + member_connect (mem); return mem; } @@ -1002,18 +1201,13 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); struct GNUNET_MULTICAST_Group *grp = &mem->grp; - grp->is_disconnecting = GNUNET_YES; - grp->disconnect_cb = part_cb; - grp->disconnect_cls = part_cls; - mem->join_dcsn_cb = NULL; grp->join_req_cb = NULL; grp->message_cb = NULL; grp->replay_msg_cb = NULL; grp->replay_frag_cb = NULL; - GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, - member_cleanup, mem); + group_disconnect (grp, part_cb, part_cls); } @@ -1024,17 +1218,16 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem, uint64_t fragment_offset, uint64_t flags) { - struct MulticastReplayRequestMessage rep = { - .header = { - .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST), - .size = htons (sizeof (rep)), - }, - .fragment_id = GNUNET_htonll (fragment_id), - .message_id = GNUNET_htonll (message_id), - .fragment_offset = GNUNET_htonll (fragment_offset), - .flags = GNUNET_htonll (flags), - }; - GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header); + struct MulticastReplayRequestMessage *rep; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST); + + rep->fragment_id = GNUNET_htonll (fragment_id); + rep->message_id = GNUNET_htonll (message_id); + rep->fragment_offset = GNUNET_htonll (fragment_offset); + rep->flags = GNUNET_htonll (flags); + + GNUNET_MQ_send (mem->grp.mq, env); } @@ -1051,21 +1244,17 @@ member_replay_request (struct GNUNET_MULTICAST_Member *mem, * @param flags * Additional flags for the replay request. * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback - * @param result_cb - * Function to call when the replayed message fragment arrives. - * @param result_cls - * Closure for @a result_cb. * * @return Replay request handle. */ struct GNUNET_MULTICAST_MemberReplayHandle * GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, uint64_t fragment_id, - uint64_t flags, - GNUNET_MULTICAST_ResultCallback result_cb, - void *result_cls) + uint64_t flags) { member_replay_request (mem, fragment_id, 0, 0, flags); + // FIXME: return something useful + return NULL; } @@ -1084,10 +1273,6 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem, * @param flags * Additional flags for the replay request. * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback - * @param result_cb - * Function to call for each replayed message fragment. - * @param result_cls - * Closure for @a result_cb. * * @return Replay request handle, NULL on error. */ @@ -1095,23 +1280,11 @@ struct GNUNET_MULTICAST_MemberReplayHandle * GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem, uint64_t message_id, uint64_t fragment_offset, - uint64_t flags, - GNUNET_MULTICAST_ResultCallback result_cb, - void *result_cls) + uint64_t flags) { member_replay_request (mem, 0, message_id, fragment_offset, flags); -} - - -/** - * Cancel a replay request. - * - * @param rh - * Request to cancel. - */ -void -GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh) -{ + // FIXME: return something useful + return NULL; } @@ -1121,9 +1294,14 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n"); struct GNUNET_MULTICAST_Group *grp = &mem->grp; struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; + GNUNET_assert (GNUNET_YES == grp->in_transmit); size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); + struct GNUNET_MULTICAST_RequestHeader *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req), + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); + int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) @@ -1133,24 +1311,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) "MemberTransmitNotify() returned error or invalid message size. " "ret=%d, buf_size=%u\n", ret, buf_size); /* FIXME: handle error */ - GNUNET_free (req); + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) { /* Transmission paused. */ - GNUNET_free (req); + GNUNET_MQ_discard (env); return; } - req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); req->header.size = htons (sizeof (*req) + buf_size); req->request_id = GNUNET_htonll (tmit->request_id); req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*req) + buf_size; - GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); + GNUNET_MQ_send (grp->mq, env); + + if (GNUNET_YES == ret) + grp->in_transmit = GNUNET_NO; } @@ -1174,15 +1354,14 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, GNUNET_MULTICAST_MemberTransmitNotify notify, void *notify_cls) { -/* FIXME if (GNUNET_YES == mem->grp.in_transmit) return NULL; mem->grp.in_transmit = GNUNET_YES; -*/ struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; tmit->member = mem; tmit->request_id = request_id; + tmit->fragment_offset = 0; tmit->notify = notify; tmit->notify_cls = notify_cls; @@ -1200,6 +1379,9 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, void GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { + struct GNUNET_MULTICAST_Group *grp = &th->member->grp; + if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit) + return; member_to_origin (th->member); } @@ -1213,6 +1395,7 @@ GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmit void GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th) { + th->member->grp.in_transmit = GNUNET_NO; }