2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file multicast/multicast_api.c
23 * @brief Multicast service; implements multicast groups using CADET connections.
24 * @author Christian Grothoff
25 * @author Gabor X Toth
29 #include "gnunet_util_lib.h"
30 #include "gnunet_multicast_service.h"
31 #include "multicast.h"
33 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
37 * Handle for a request to send a message to all multicast group members
40 struct GNUNET_MULTICAST_OriginTransmitHandle
42 GNUNET_MULTICAST_OriginTransmitNotify notify;
44 struct GNUNET_MULTICAST_Origin *origin;
47 uint64_t group_generation;
48 uint64_t fragment_offset;
53 * Handle for a message to be delivered from a member to the origin.
55 struct GNUNET_MULTICAST_MemberTransmitHandle
57 GNUNET_MULTICAST_MemberTransmitNotify notify;
59 struct GNUNET_MULTICAST_Member *member;
62 uint64_t fragment_offset;
66 struct GNUNET_MULTICAST_Group
69 * Configuration to use.
71 const struct GNUNET_CONFIGURATION_Handle *cfg;
74 * Client connection to the service.
76 struct GNUNET_MQ_Handle *mq;
79 * Message to send on connect.
81 struct GNUNET_MQ_Envelope *connect_env;
84 * Time to wait until we try to reconnect on failure.
86 struct GNUNET_TIME_Relative reconnect_delay;
89 * Task for reconnecting when the listener fails.
91 struct GNUNET_SCHEDULER_Task *reconnect_task;
93 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
94 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
95 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
96 GNUNET_MULTICAST_MessageCallback message_cb;
100 * Function called after disconnected from the service.
102 GNUNET_ContinuationCallback disconnect_cb;
105 * Closure for @a disconnect_cb.
107 void *disconnect_cls;
110 * Are we currently transmitting a message?
115 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
117 uint8_t acks_pending;
120 * Is this the origin or a member?
125 * Is this channel in the process of disconnecting from the service?
126 * #GNUNET_YES or #GNUNET_NO
128 uint8_t is_disconnecting;
133 * Handle for the origin of a multicast group.
135 struct GNUNET_MULTICAST_Origin
137 struct GNUNET_MULTICAST_Group grp;
138 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
140 GNUNET_MULTICAST_RequestCallback request_cb;
145 * Handle for a multicast group member.
147 struct GNUNET_MULTICAST_Member
149 struct GNUNET_MULTICAST_Group grp;
150 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
152 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
155 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
157 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
159 uint64_t next_fragment_id;
164 * Handle that identifies a join request.
166 * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
167 * corresponding calls to #GNUNET_MULTICAST_join_decision().
169 struct GNUNET_MULTICAST_JoinHandle
171 struct GNUNET_MULTICAST_Group *group;
174 * Public key of the member requesting join.
176 struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
179 * Peer identity of the member requesting join.
181 struct GNUNET_PeerIdentity peer;
186 * Opaque handle to a replay request from the multicast service.
188 struct GNUNET_MULTICAST_ReplayHandle
190 struct GNUNET_MULTICAST_Group *grp;
191 struct MulticastReplayRequestMessage req;
196 * Handle for a replay request.
198 struct GNUNET_MULTICAST_MemberReplayHandle
204 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
207 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
211 * Check join request message.
214 check_group_join_request (void *cls,
215 const struct MulticastJoinRequestMessage *jreq)
217 uint16_t size = ntohs (jreq->header.size);
219 if (sizeof (*jreq) == size)
222 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
225 return GNUNET_SYSERR;
230 * Receive join request from service.
233 handle_group_join_request (void *cls,
234 const struct MulticastJoinRequestMessage *jreq)
236 struct GNUNET_MULTICAST_Group *grp = cls;
237 struct GNUNET_MULTICAST_JoinHandle *jh;
238 const struct GNUNET_MessageHeader *jmsg = NULL;
245 if (NULL == grp->join_req_cb)
248 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
249 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
251 jh = GNUNET_malloc (sizeof (*jh));
253 jh->member_pub_key = jreq->member_pub_key;
254 jh->peer = jreq->peer;
255 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
257 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
262 * Check multicast message.
265 check_group_message (void *cls,
266 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
273 * Receive multicast message from service.
276 handle_group_message (void *cls,
277 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
279 struct GNUNET_MULTICAST_Group *grp = cls;
281 if (GNUNET_YES == grp->is_disconnecting)
284 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285 "Calling message callback with a message of size %u.\n",
286 ntohs (mmsg->header.size));
288 if (NULL != grp->message_cb)
289 grp->message_cb (grp->cb_cls, mmsg);
291 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
296 * Receive message/request fragment acknowledgement from service.
299 handle_group_fragment_ack (void *cls,
300 const struct GNUNET_MessageHeader *msg)
302 struct GNUNET_MULTICAST_Group *grp = cls;
304 LOG (GNUNET_ERROR_TYPE_DEBUG,
305 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
306 grp, grp->in_transmit, grp->acks_pending);
308 if (0 == grp->acks_pending)
310 LOG (GNUNET_ERROR_TYPE_DEBUG,
311 "%p Ignoring extraneous fragment ACK.\n", grp);
316 if (GNUNET_YES != grp->in_transmit)
319 if (GNUNET_YES == grp->is_origin)
320 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
322 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
324 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
329 * Check unicast request.
332 check_origin_request (void *cls,
333 const struct GNUNET_MULTICAST_RequestHeader *req)
340 * Origin receives unicast request from a member.
343 handle_origin_request (void *cls,
344 const struct GNUNET_MULTICAST_RequestHeader *req)
346 struct GNUNET_MULTICAST_Group *grp;
347 struct GNUNET_MULTICAST_Origin *orig = cls;
350 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
351 "Calling request callback with a request of size %u.\n",
352 ntohs (req->header.size));
354 if (NULL != orig->request_cb)
355 orig->request_cb (grp->cb_cls, req);
357 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
362 * Receive multicast replay request from service.
365 handle_group_replay_request (void *cls,
366 const struct MulticastReplayRequestMessage *rep)
369 struct GNUNET_MULTICAST_Group *grp = cls;
371 if (GNUNET_YES == grp->is_disconnecting)
374 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
376 if (0 != rep->fragment_id)
378 if (NULL != grp->replay_frag_cb)
380 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
383 grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
384 GNUNET_ntohll (rep->fragment_id),
385 GNUNET_ntohll (rep->flags), rh);
388 else if (0 != rep->message_id)
390 if (NULL != grp->replay_msg_cb)
392 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
395 grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
396 GNUNET_ntohll (rep->message_id),
397 GNUNET_ntohll (rep->fragment_offset),
398 GNUNET_ntohll (rep->flags), rh);
402 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
407 * Check replay response.
410 check_member_replay_response (void *cls,
411 const struct MulticastReplayResponseMessage *res)
413 uint16_t size = ntohs (res->header.size);
415 if (sizeof (*res) == size)
418 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
421 return GNUNET_SYSERR;
426 * Receive replay response from service.
429 handle_member_replay_response (void *cls,
430 const struct MulticastReplayResponseMessage *res)
432 struct GNUNET_MULTICAST_Group *grp;
433 struct GNUNET_MULTICAST_Member *mem = cls;
436 if (GNUNET_YES == grp->is_disconnecting)
439 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
441 // FIXME: return result
446 * Check join decision.
449 check_member_join_decision (void *cls,
450 const struct MulticastJoinDecisionMessageHeader *hdcsn)
452 return GNUNET_OK; // checked in handle below
457 * Member receives join decision.
460 handle_member_join_decision (void *cls,
461 const struct MulticastJoinDecisionMessageHeader *hdcsn)
463 struct GNUNET_MULTICAST_Group *grp;
464 struct GNUNET_MULTICAST_Member *mem = cls;
467 const struct MulticastJoinDecisionMessage *
468 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
470 uint16_t dcsn_size = ntohs (dcsn->header.size);
471 int is_admitted = ntohl (dcsn->is_admitted);
473 LOG (GNUNET_ERROR_TYPE_DEBUG,
474 "%p Member got join decision from multicast: %d\n",
477 const struct GNUNET_MessageHeader *join_resp = NULL;
478 uint16_t join_resp_size = 0;
480 uint16_t relay_count = ntohl (dcsn->relay_count);
481 const struct GNUNET_PeerIdentity *relays = NULL;
482 uint16_t relay_size = relay_count * sizeof (*relays);
485 if (dcsn_size < sizeof (*dcsn) + relay_size)
488 is_admitted = GNUNET_SYSERR;
492 relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
496 if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
498 join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
499 join_resp_size = ntohs (join_resp->size);
501 if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
503 LOG (GNUNET_ERROR_TYPE_DEBUG,
504 "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
505 dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
507 is_admitted = GNUNET_SYSERR;
510 if (NULL != mem->join_dcsn_cb)
511 mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
512 relay_count, relays, join_resp);
515 //if (GNUNET_YES != is_admitted)
516 // GNUNET_MULTICAST_member_part (mem);
518 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
523 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
525 if (NULL != grp->connect_env)
527 GNUNET_MQ_discard (grp->connect_env);
528 grp->connect_env = NULL;
532 GNUNET_MQ_destroy (grp->mq);
535 if (NULL != grp->disconnect_cb)
537 grp->disconnect_cb (grp->disconnect_cls);
538 grp->disconnect_cb = NULL;
545 group_disconnect (struct GNUNET_MULTICAST_Group *grp,
546 GNUNET_ContinuationCallback cb,
549 grp->is_disconnecting = GNUNET_YES;
550 grp->disconnect_cb = cb;
551 grp->disconnect_cls = cls;
555 struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
558 GNUNET_MQ_notify_sent (last,
559 (GNUNET_SCHEDULER_TaskCallback) group_cleanup, grp);
574 * Function to call with the decision made for a join request.
576 * Must be called once and only once in response to an invocation of the
577 * #GNUNET_MULTICAST_JoinRequestCallback.
580 * Join request handle.
582 * #GNUNET_YES if the join is approved,
583 * #GNUNET_NO if it is disapproved,
584 * #GNUNET_SYSERR if we cannot answer the request.
586 * Number of relays given.
588 * Array of suggested peers that might be useful relays to use
589 * when joining the multicast group (essentially a list of peers that
590 * are already part of the multicast group and might thus be willing
591 * to help with routing). If empty, only this local peer (which must
592 * be the multicast origin) is a good candidate for building the
593 * multicast tree. Note that it is unnecessary to specify our own
594 * peer identity in this array.
596 * Message to send in response to the joining peer;
597 * can also be used to redirect the peer to a different group at the
598 * application layer; this response is to be transmitted to the
599 * peer that issued the request even if admission is denied.
601 struct GNUNET_MULTICAST_ReplayHandle *
602 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
604 uint16_t relay_count,
605 const struct GNUNET_PeerIdentity *relays,
606 const struct GNUNET_MessageHeader *join_resp)
608 struct GNUNET_MULTICAST_Group *grp = join->group;
609 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
610 uint16_t relay_size = relay_count * sizeof (*relays);
612 struct MulticastJoinDecisionMessageHeader *hdcsn;
613 struct MulticastJoinDecisionMessage *dcsn;
614 struct GNUNET_MQ_Envelope *
615 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
616 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
617 hdcsn->member_pub_key = join->member_pub_key;
618 hdcsn->peer = join->peer;
620 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
621 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
622 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
623 dcsn->is_admitted = htonl (is_admitted);
624 dcsn->relay_count = htonl (relay_count);
626 GNUNET_memcpy (&dcsn[1], relays, relay_size);
627 if (0 < join_resp_size)
628 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
630 GNUNET_MQ_send (grp->mq, env);
637 * Replay a message fragment for the multicast group.
640 * Replay handle identifying which replay operation was requested.
642 * Replayed message fragment, NULL if not found / an error occurred.
644 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
645 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
648 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
649 const struct GNUNET_MessageHeader *msg,
650 enum GNUNET_MULTICAST_ReplayErrorCode ec)
652 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
653 struct MulticastReplayResponseMessage *res;
654 struct GNUNET_MQ_Envelope *
655 env = GNUNET_MQ_msg_extra (res, msg_size,
656 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
657 res->fragment_id = rh->req.fragment_id;
658 res->message_id = rh->req.message_id;
659 res->fragment_offset = rh->req.fragment_offset;
660 res->flags = rh->req.flags;
661 res->error_code = htonl (ec);
663 if (GNUNET_MULTICAST_REC_OK == ec)
665 GNUNET_assert (NULL != msg);
666 GNUNET_memcpy (&res[1], msg, msg_size);
669 GNUNET_MQ_send (rh->grp->mq, env);
671 if (GNUNET_MULTICAST_REC_OK != ec)
677 * Indicate the end of the replay session.
679 * Invalidates the replay handle.
682 * Replay session to end.
685 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
687 struct MulticastReplayResponseMessage *end;
688 struct GNUNET_MQ_Envelope *
689 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
691 end->fragment_id = rh->req.fragment_id;
692 end->message_id = rh->req.message_id;
693 end->fragment_offset = rh->req.fragment_offset;
694 end->flags = rh->req.flags;
696 GNUNET_MQ_send (rh->grp->mq, env);
702 * Replay a message for the multicast group.
705 * Replay handle identifying which replay operation was requested.
707 * Function to call to get the message.
709 * Closure for @a notify.
712 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
713 GNUNET_MULTICAST_ReplayTransmitNotify notify,
720 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
724 origin_reconnect (void *cls)
726 origin_connect (cls);
731 * Origin client disconnected from service.
733 * Reconnect after backoff period.
736 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
738 struct GNUNET_MULTICAST_Origin *orig = cls;
739 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
741 LOG (GNUNET_ERROR_TYPE_DEBUG,
742 "Origin client disconnected (%d), re-connecting\n",
746 GNUNET_MQ_destroy (grp->mq);
750 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
753 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
758 * Connect to service as origin.
761 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
763 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
765 struct GNUNET_MQ_MessageHandler handlers[] = {
766 GNUNET_MQ_hd_var_size (group_message,
767 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
768 struct GNUNET_MULTICAST_MessageHeader,
770 GNUNET_MQ_hd_var_size (origin_request,
771 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
772 struct GNUNET_MULTICAST_RequestHeader,
774 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
775 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
776 struct GNUNET_MessageHeader,
778 GNUNET_MQ_hd_var_size (group_join_request,
779 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
780 struct MulticastJoinRequestMessage,
782 GNUNET_MQ_hd_fixed_size (group_replay_request,
783 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
784 struct MulticastReplayRequestMessage,
786 GNUNET_MQ_handler_end ()
789 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
790 handlers, origin_disconnected, orig);
791 GNUNET_assert (NULL != grp->mq);
792 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
797 * Start a multicast group.
799 * Will advertise the origin in the P2P overlay network under the respective
800 * public key so that other peer can find this peer to join it. Peers that
801 * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
802 * either an existing group member or to the origin. If the joining is
803 * approved, the member is cleared for @e replay and will begin to receive
804 * messages transmitted to the group. If joining is disapproved, the failed
805 * candidate will be given a response. Members in the group can send messages
806 * to the origin (one at a time).
809 * Configuration to use.
811 * ECC key that will be used to sign messages for this
812 * multicast session; public key is used to identify the multicast group;
813 * @param max_fragment_id
814 * Maximum fragment ID already sent to the group.
816 * @param join_request_cb
817 * Function called to approve / disapprove joining of a peer.
818 * @param replay_frag_cb
819 * Function that can be called to replay a message fragment.
820 * @param replay_msg_cb
821 * Function that can be called to replay a message.
823 * Function called with message fragments from group members.
825 * Function called with the message fragments sent to the
826 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
827 * should be stored for answering replay requests later.
829 * Closure for the various callbacks that follow.
831 * @return Handle for the origin, NULL on error.
833 struct GNUNET_MULTICAST_Origin *
834 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
835 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
836 uint64_t max_fragment_id,
837 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
838 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
839 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
840 GNUNET_MULTICAST_RequestCallback request_cb,
841 GNUNET_MULTICAST_MessageCallback message_cb,
844 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
845 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
847 struct MulticastOriginStartMessage *start;
848 grp->connect_env = GNUNET_MQ_msg (start,
849 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
850 start->max_fragment_id = max_fragment_id;
851 start->group_key = *priv_key;
854 grp->is_origin = GNUNET_YES;
855 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
858 grp->join_req_cb = join_request_cb;
859 grp->replay_frag_cb = replay_frag_cb;
860 grp->replay_msg_cb = replay_msg_cb;
861 grp->message_cb = message_cb;
863 orig->request_cb = request_cb;
865 origin_connect (orig);
871 * Stop a multicast group.
874 * Multicast group to stop.
877 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
878 GNUNET_ContinuationCallback stop_cb,
881 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
883 group_disconnect (grp, stop_cb, stop_cls);
888 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
890 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
891 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
892 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
893 GNUNET_assert (GNUNET_YES == grp->in_transmit);
895 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
896 struct GNUNET_MULTICAST_MessageHeader *msg;
897 struct GNUNET_MQ_Envelope *
898 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
899 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
901 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
903 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
904 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
906 LOG (GNUNET_ERROR_TYPE_ERROR,
907 "%p OriginTransmitNotify() returned error or invalid message size.\n",
909 /* FIXME: handle error */
910 GNUNET_MQ_discard (env);
914 if (GNUNET_NO == ret && 0 == buf_size)
916 LOG (GNUNET_ERROR_TYPE_DEBUG,
917 "%p OriginTransmitNotify() - transmission paused.\n", orig);
918 GNUNET_MQ_discard (env);
919 return; /* Transmission paused. */
922 msg->header.size = htons (sizeof (*msg) + buf_size);
923 msg->message_id = GNUNET_htonll (tmit->message_id);
924 msg->group_generation = tmit->group_generation;
925 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
926 tmit->fragment_offset += sizeof (*msg) + buf_size;
929 GNUNET_MQ_send (grp->mq, env);
931 if (GNUNET_YES == ret)
932 grp->in_transmit = GNUNET_NO;
937 * Send a message to the multicast group.
940 * Handle to the multicast group.
942 * Application layer ID for the message. Opaque to multicast.
943 * @param group_generation
944 * Group generation of the message.
945 * Documented in struct GNUNET_MULTICAST_MessageHeader.
947 * Function to call to get the message.
949 * Closure for @a notify.
951 * @return Message handle on success,
952 * NULL on error (i.e. another request is already pending).
954 struct GNUNET_MULTICAST_OriginTransmitHandle *
955 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
957 uint64_t group_generation,
958 GNUNET_MULTICAST_OriginTransmitNotify notify,
961 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
962 if (GNUNET_YES == grp->in_transmit)
964 grp->in_transmit = GNUNET_YES;
966 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
968 tmit->message_id = message_id;
969 tmit->fragment_offset = 0;
970 tmit->group_generation = group_generation;
971 tmit->notify = notify;
972 tmit->notify_cls = notify_cls;
974 origin_to_all (orig);
980 * Resume message transmission to multicast group.
983 * Transmission to cancel.
986 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
988 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
989 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
991 origin_to_all (th->origin);
996 * Cancel request for message transmission to multicast group.
999 * Transmission to cancel.
1002 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
1004 th->origin->grp.in_transmit = GNUNET_NO;
1009 member_connect (struct GNUNET_MULTICAST_Member *mem);
1013 member_reconnect (void *cls)
1015 member_connect (cls);
1020 * Member client disconnected from service.
1022 * Reconnect after backoff period.
1025 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1027 struct GNUNET_MULTICAST_Member *mem = cls;
1028 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1030 LOG (GNUNET_ERROR_TYPE_DEBUG,
1031 "Member client disconnected (%d), re-connecting\n",
1033 GNUNET_MQ_destroy (grp->mq);
1036 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1039 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1044 * Connect to service as member.
1047 member_connect (struct GNUNET_MULTICAST_Member *mem)
1049 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1051 struct GNUNET_MQ_MessageHandler handlers[] = {
1052 GNUNET_MQ_hd_var_size (group_message,
1053 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1054 struct GNUNET_MULTICAST_MessageHeader,
1056 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1057 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1058 struct GNUNET_MessageHeader,
1060 GNUNET_MQ_hd_var_size (group_join_request,
1061 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1062 struct MulticastJoinRequestMessage,
1064 GNUNET_MQ_hd_var_size (member_join_decision,
1065 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1066 struct MulticastJoinDecisionMessageHeader,
1068 GNUNET_MQ_hd_fixed_size (group_replay_request,
1069 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1070 struct MulticastReplayRequestMessage,
1072 GNUNET_MQ_hd_var_size (member_replay_response,
1073 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1074 struct MulticastReplayResponseMessage,
1076 GNUNET_MQ_handler_end ()
1079 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1080 handlers, member_disconnected, mem);
1081 GNUNET_assert (NULL != grp->mq);
1082 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1087 * Join a multicast group.
1089 * The entity joining is always the local peer. Further information about the
1090 * candidate can be provided in the @a join_request message. If the join fails, the
1091 * @a message_cb is invoked with a (failure) response and then with NULL. If
1092 * the join succeeds, outstanding (state) messages and ongoing multicast
1093 * messages will be given to the @a message_cb until the member decides to part
1094 * the group. The @a replay_cb function may be called at any time by the
1095 * multicast service to support relaying messages to other members of the group.
1098 * Configuration to use.
1100 * ECC public key that identifies the group to join.
1102 * ECC key that identifies the member
1103 * and used to sign requests sent to the origin.
1105 * Peer ID of the origin to send unicast requsets to. If NULL,
1106 * unicast requests are sent back via multiple hops on the reverse path
1107 * of multicast messages.
1108 * @param relay_count
1109 * Number of peers in the @a relays array.
1111 * Peer identities of members of the group, which serve as relays
1112 * and can be used to join the group at. and send the @a join_request to.
1113 * If empty, the @a join_request is sent directly to the @a origin.
1115 * Application-dependent join message to be passed to the peer @a origin.
1116 * @param join_request_cb
1117 * Function called to approve / disapprove joining of a peer.
1118 * @param join_decision_cb
1119 * Function called to inform about the join decision.
1120 * @param replay_frag_cb
1121 * Function that can be called to replay message fragments
1122 * this peer already knows from this group. NULL if this
1123 * client is unable to support replay.
1124 * @param replay_msg_cb
1125 * Function that can be called to replay message fragments
1126 * this peer already knows from this group. NULL if this
1127 * client is unable to support replay.
1129 * Function to be called for all message fragments we
1130 * receive from the group, excluding those our @a replay_cb
1133 * Closure for callbacks.
1135 * @return Handle for the member, NULL on error.
1137 struct GNUNET_MULTICAST_Member *
1138 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1139 const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1140 const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1141 const struct GNUNET_PeerIdentity *origin,
1142 uint16_t relay_count,
1143 const struct GNUNET_PeerIdentity *relays,
1144 const struct GNUNET_MessageHeader *join_msg,
1145 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1146 GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1147 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1148 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1149 GNUNET_MULTICAST_MessageCallback message_cb,
1152 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1153 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1155 uint16_t relay_size = relay_count * sizeof (*relays);
1156 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1157 struct MulticastMemberJoinMessage *join;
1158 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1159 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1160 join->group_pub_key = *group_pub_key;
1161 join->member_key = *member_key;
1162 join->origin = *origin;
1163 join->relay_count = ntohl (relay_count);
1165 GNUNET_memcpy (&join[1], relays, relay_size);
1166 if (0 < join_msg_size)
1167 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1170 grp->is_origin = GNUNET_NO;
1171 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1173 mem->join_dcsn_cb = join_decision_cb;
1174 grp->join_req_cb = join_request_cb;
1175 grp->replay_frag_cb = replay_frag_cb;
1176 grp->replay_msg_cb = replay_msg_cb;
1177 grp->message_cb = message_cb;
1180 member_connect (mem);
1186 * Part a multicast group.
1188 * Disconnects from all group members and invalidates the @a member handle.
1190 * An application-dependent part message can be transmitted beforehand using
1191 * #GNUNET_MULTICAST_member_to_origin())
1194 * Membership handle.
1197 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1198 GNUNET_ContinuationCallback part_cb,
1201 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
1202 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1204 mem->join_dcsn_cb = NULL;
1205 grp->join_req_cb = NULL;
1206 grp->message_cb = NULL;
1207 grp->replay_msg_cb = NULL;
1208 grp->replay_frag_cb = NULL;
1210 group_disconnect (grp, part_cb, part_cls);
1215 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1216 uint64_t fragment_id,
1217 uint64_t message_id,
1218 uint64_t fragment_offset,
1221 struct MulticastReplayRequestMessage *rep;
1222 struct GNUNET_MQ_Envelope *
1223 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1225 rep->fragment_id = GNUNET_htonll (fragment_id);
1226 rep->message_id = GNUNET_htonll (message_id);
1227 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1228 rep->flags = GNUNET_htonll (flags);
1230 GNUNET_MQ_send (mem->grp.mq, env);
1235 * Request a fragment to be replayed by fragment ID.
1237 * Useful if messages below the @e max_known_fragment_id given when joining are
1238 * needed and not known to the client.
1241 * Membership handle.
1242 * @param fragment_id
1243 * ID of a message fragment that this client would like to see replayed.
1245 * Additional flags for the replay request.
1246 * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1248 * @return Replay request handle.
1250 struct GNUNET_MULTICAST_MemberReplayHandle *
1251 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1252 uint64_t fragment_id,
1255 member_replay_request (mem, fragment_id, 0, 0, flags);
1256 // FIXME: return something useful
1262 * Request a message fragment to be replayed.
1264 * Useful if messages below the @e max_known_fragment_id given when joining are
1265 * needed and not known to the client.
1268 * Membership handle.
1270 * ID of the message this client would like to see replayed.
1271 * @param fragment_offset
1272 * Offset of the fragment within the message to replay.
1274 * Additional flags for the replay request.
1275 * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1277 * @return Replay request handle, NULL on error.
1279 struct GNUNET_MULTICAST_MemberReplayHandle *
1280 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1281 uint64_t message_id,
1282 uint64_t fragment_offset,
1285 member_replay_request (mem, 0, message_id, fragment_offset, flags);
1286 // FIXME: return something useful
1292 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1294 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1295 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1296 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1297 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1299 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1300 struct GNUNET_MULTICAST_RequestHeader *req;
1301 struct GNUNET_MQ_Envelope *
1302 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1303 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1305 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1307 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1308 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1310 LOG (GNUNET_ERROR_TYPE_ERROR,
1311 "MemberTransmitNotify() returned error or invalid message size. "
1312 "ret=%d, buf_size=%u\n", ret, buf_size);
1313 /* FIXME: handle error */
1314 GNUNET_MQ_discard (env);
1318 if (GNUNET_NO == ret && 0 == buf_size)
1320 /* Transmission paused. */
1321 GNUNET_MQ_discard (env);
1325 req->header.size = htons (sizeof (*req) + buf_size);
1326 req->request_id = GNUNET_htonll (tmit->request_id);
1327 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1328 tmit->fragment_offset += sizeof (*req) + buf_size;
1330 GNUNET_MQ_send (grp->mq, env);
1332 if (GNUNET_YES == ret)
1333 grp->in_transmit = GNUNET_NO;
1338 * Send a message to the origin of the multicast group.
1341 * Membership handle.
1343 * Application layer ID for the request. Opaque to multicast.
1345 * Callback to call to get the message.
1347 * Closure for @a notify.
1349 * @return Handle to cancel request, NULL on error (i.e. request already pending).
1351 struct GNUNET_MULTICAST_MemberTransmitHandle *
1352 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1353 uint64_t request_id,
1354 GNUNET_MULTICAST_MemberTransmitNotify notify,
1357 if (GNUNET_YES == mem->grp.in_transmit)
1359 mem->grp.in_transmit = GNUNET_YES;
1361 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1363 tmit->request_id = request_id;
1364 tmit->fragment_offset = 0;
1365 tmit->notify = notify;
1366 tmit->notify_cls = notify_cls;
1368 member_to_origin (mem);
1374 * Resume message transmission to origin.
1377 * Transmission to cancel.
1380 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1382 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1383 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1385 member_to_origin (th->member);
1390 * Cancel request for message transmission to origin.
1393 * Transmission to cancel.
1396 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1398 th->member->grp.in_transmit = GNUNET_NO;
1402 /* end of multicast_api.c */