2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
17 * @file multicast/multicast_api.c
18 * @brief Multicast service; implements multicast groups using CADET connections.
19 * @author Christian Grothoff
20 * @author Gabor X Toth
24 #include "gnunet_util_lib.h"
25 #include "gnunet_multicast_service.h"
26 #include "multicast.h"
28 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
32 * Handle for a request to send a message to all multicast group members
35 struct GNUNET_MULTICAST_OriginTransmitHandle
37 GNUNET_MULTICAST_OriginTransmitNotify notify;
39 struct GNUNET_MULTICAST_Origin *origin;
42 uint64_t group_generation;
43 uint64_t fragment_offset;
48 * Handle for a message to be delivered from a member to the origin.
50 struct GNUNET_MULTICAST_MemberTransmitHandle
52 GNUNET_MULTICAST_MemberTransmitNotify notify;
54 struct GNUNET_MULTICAST_Member *member;
57 uint64_t fragment_offset;
61 struct GNUNET_MULTICAST_Group
64 * Configuration to use.
66 const struct GNUNET_CONFIGURATION_Handle *cfg;
69 * Client connection to the service.
71 struct GNUNET_MQ_Handle *mq;
74 * Message to send on connect.
76 struct GNUNET_MQ_Envelope *connect_env;
79 * Time to wait until we try to reconnect on failure.
81 struct GNUNET_TIME_Relative reconnect_delay;
84 * Task for reconnecting when the listener fails.
86 struct GNUNET_SCHEDULER_Task *reconnect_task;
88 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
89 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
90 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
91 GNUNET_MULTICAST_MessageCallback message_cb;
95 * Function called after disconnected from the service.
97 GNUNET_ContinuationCallback disconnect_cb;
100 * Closure for @a disconnect_cb.
102 void *disconnect_cls;
105 * Are we currently transmitting a message?
110 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
112 uint8_t acks_pending;
115 * Is this the origin or a member?
120 * Is this channel in the process of disconnecting from the service?
121 * #GNUNET_YES or #GNUNET_NO
123 uint8_t is_disconnecting;
128 * Handle for the origin of a multicast group.
130 struct GNUNET_MULTICAST_Origin
132 struct GNUNET_MULTICAST_Group grp;
133 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
135 GNUNET_MULTICAST_RequestCallback request_cb;
140 * Handle for a multicast group member.
142 struct GNUNET_MULTICAST_Member
144 struct GNUNET_MULTICAST_Group grp;
145 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
147 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
150 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
152 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
154 uint64_t next_fragment_id;
159 * Handle that identifies a join request.
161 * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
162 * corresponding calls to #GNUNET_MULTICAST_join_decision().
164 struct GNUNET_MULTICAST_JoinHandle
166 struct GNUNET_MULTICAST_Group *group;
169 * Public key of the member requesting join.
171 struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
174 * Peer identity of the member requesting join.
176 struct GNUNET_PeerIdentity peer;
181 * Opaque handle to a replay request from the multicast service.
183 struct GNUNET_MULTICAST_ReplayHandle
185 struct GNUNET_MULTICAST_Group *grp;
186 struct MulticastReplayRequestMessage req;
191 * Handle for a replay request.
193 struct GNUNET_MULTICAST_MemberReplayHandle
199 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
202 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
206 * Check join request message.
209 check_group_join_request (void *cls,
210 const struct MulticastJoinRequestMessage *jreq)
212 uint16_t size = ntohs (jreq->header.size);
214 if (sizeof (*jreq) == size)
217 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
220 return GNUNET_SYSERR;
225 * Receive join request from service.
228 handle_group_join_request (void *cls,
229 const struct MulticastJoinRequestMessage *jreq)
231 struct GNUNET_MULTICAST_Group *grp = cls;
232 struct GNUNET_MULTICAST_JoinHandle *jh;
233 const struct GNUNET_MessageHeader *jmsg = NULL;
240 if (NULL == grp->join_req_cb)
243 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
244 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
246 jh = GNUNET_malloc (sizeof (*jh));
248 jh->member_pub_key = jreq->member_pub_key;
249 jh->peer = jreq->peer;
250 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
252 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
257 * Check multicast message.
260 check_group_message (void *cls,
261 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
268 * Receive multicast message from service.
271 handle_group_message (void *cls,
272 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
274 struct GNUNET_MULTICAST_Group *grp = cls;
276 if (GNUNET_YES == grp->is_disconnecting)
279 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
280 "Calling message callback with a message of size %u.\n",
281 ntohs (mmsg->header.size));
283 if (NULL != grp->message_cb)
284 grp->message_cb (grp->cb_cls, mmsg);
286 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
291 * Receive message/request fragment acknowledgement from service.
294 handle_group_fragment_ack (void *cls,
295 const struct GNUNET_MessageHeader *msg)
297 struct GNUNET_MULTICAST_Group *grp = cls;
299 LOG (GNUNET_ERROR_TYPE_DEBUG,
300 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
301 grp, grp->in_transmit, grp->acks_pending);
303 if (0 == grp->acks_pending)
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "%p Ignoring extraneous fragment ACK.\n", grp);
311 if (GNUNET_YES != grp->in_transmit)
314 if (GNUNET_YES == grp->is_origin)
315 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
317 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
319 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
324 * Check unicast request.
327 check_origin_request (void *cls,
328 const struct GNUNET_MULTICAST_RequestHeader *req)
335 * Origin receives unicast request from a member.
338 handle_origin_request (void *cls,
339 const struct GNUNET_MULTICAST_RequestHeader *req)
341 struct GNUNET_MULTICAST_Group *grp;
342 struct GNUNET_MULTICAST_Origin *orig = cls;
345 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
346 "Calling request callback with a request of size %u.\n",
347 ntohs (req->header.size));
349 if (NULL != orig->request_cb)
350 orig->request_cb (grp->cb_cls, req);
352 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
357 * Receive multicast replay request from service.
360 handle_group_replay_request (void *cls,
361 const struct MulticastReplayRequestMessage *rep)
364 struct GNUNET_MULTICAST_Group *grp = cls;
366 if (GNUNET_YES == grp->is_disconnecting)
369 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
371 if (0 != rep->fragment_id)
373 if (NULL != grp->replay_frag_cb)
375 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
378 grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
379 GNUNET_ntohll (rep->fragment_id),
380 GNUNET_ntohll (rep->flags), rh);
383 else if (0 != rep->message_id)
385 if (NULL != grp->replay_msg_cb)
387 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
390 grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
391 GNUNET_ntohll (rep->message_id),
392 GNUNET_ntohll (rep->fragment_offset),
393 GNUNET_ntohll (rep->flags), rh);
397 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
402 * Check replay response.
405 check_member_replay_response (void *cls,
406 const struct MulticastReplayResponseMessage *res)
408 uint16_t size = ntohs (res->header.size);
410 if (sizeof (*res) == size)
413 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
416 return GNUNET_SYSERR;
421 * Receive replay response from service.
424 handle_member_replay_response (void *cls,
425 const struct MulticastReplayResponseMessage *res)
427 struct GNUNET_MULTICAST_Group *grp;
428 struct GNUNET_MULTICAST_Member *mem = cls;
431 if (GNUNET_YES == grp->is_disconnecting)
434 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
436 // FIXME: return result
441 * Check join decision.
444 check_member_join_decision (void *cls,
445 const struct MulticastJoinDecisionMessageHeader *hdcsn)
447 return GNUNET_OK; // checked in handle below
452 * Member receives join decision.
455 handle_member_join_decision (void *cls,
456 const struct MulticastJoinDecisionMessageHeader *hdcsn)
458 struct GNUNET_MULTICAST_Group *grp;
459 struct GNUNET_MULTICAST_Member *mem = cls;
462 const struct MulticastJoinDecisionMessage *
463 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
465 uint16_t dcsn_size = ntohs (dcsn->header.size);
466 int is_admitted = ntohl (dcsn->is_admitted);
468 LOG (GNUNET_ERROR_TYPE_DEBUG,
469 "%p Member got join decision from multicast: %d\n",
472 const struct GNUNET_MessageHeader *join_resp = NULL;
473 uint16_t join_resp_size = 0;
475 uint16_t relay_count = ntohl (dcsn->relay_count);
476 const struct GNUNET_PeerIdentity *relays = NULL;
477 uint16_t relay_size = relay_count * sizeof (*relays);
480 if (dcsn_size < sizeof (*dcsn) + relay_size)
483 is_admitted = GNUNET_SYSERR;
487 relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
491 if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
493 join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
494 join_resp_size = ntohs (join_resp->size);
496 if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
498 LOG (GNUNET_ERROR_TYPE_DEBUG,
499 "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
500 dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
502 is_admitted = GNUNET_SYSERR;
505 if (NULL != mem->join_dcsn_cb)
506 mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
507 relay_count, relays, join_resp);
510 //if (GNUNET_YES != is_admitted)
511 // GNUNET_MULTICAST_member_part (mem);
513 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
518 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
520 if (NULL != grp->connect_env)
522 GNUNET_MQ_discard (grp->connect_env);
523 grp->connect_env = NULL;
527 GNUNET_MQ_destroy (grp->mq);
530 if (NULL != grp->disconnect_cb)
532 grp->disconnect_cb (grp->disconnect_cls);
533 grp->disconnect_cb = NULL;
540 handle_group_part_ack (void *cls,
541 const struct GNUNET_MessageHeader *msg)
543 struct GNUNET_MULTICAST_Group *grp = cls;
550 * Function to call with the decision made for a join request.
552 * Must be called once and only once in response to an invocation of the
553 * #GNUNET_MULTICAST_JoinRequestCallback.
556 * Join request handle.
558 * #GNUNET_YES if the join is approved,
559 * #GNUNET_NO if it is disapproved,
560 * #GNUNET_SYSERR if we cannot answer the request.
562 * Number of relays given.
564 * Array of suggested peers that might be useful relays to use
565 * when joining the multicast group (essentially a list of peers that
566 * are already part of the multicast group and might thus be willing
567 * to help with routing). If empty, only this local peer (which must
568 * be the multicast origin) is a good candidate for building the
569 * multicast tree. Note that it is unnecessary to specify our own
570 * peer identity in this array.
572 * Message to send in response to the joining peer;
573 * can also be used to redirect the peer to a different group at the
574 * application layer; this response is to be transmitted to the
575 * peer that issued the request even if admission is denied.
577 struct GNUNET_MULTICAST_ReplayHandle *
578 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
580 uint16_t relay_count,
581 const struct GNUNET_PeerIdentity *relays,
582 const struct GNUNET_MessageHeader *join_resp)
584 struct GNUNET_MULTICAST_Group *grp = join->group;
585 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
586 uint16_t relay_size = relay_count * sizeof (*relays);
588 struct MulticastJoinDecisionMessageHeader *hdcsn;
589 struct MulticastJoinDecisionMessage *dcsn;
590 struct GNUNET_MQ_Envelope *
591 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
592 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
593 hdcsn->member_pub_key = join->member_pub_key;
594 hdcsn->peer = join->peer;
596 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
597 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
598 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
599 dcsn->is_admitted = htonl (is_admitted);
600 dcsn->relay_count = htonl (relay_count);
602 GNUNET_memcpy (&dcsn[1], relays, relay_size);
603 if (0 < join_resp_size)
604 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
606 GNUNET_MQ_send (grp->mq, env);
613 * Replay a message fragment for the multicast group.
616 * Replay handle identifying which replay operation was requested.
618 * Replayed message fragment, NULL if not found / an error occurred.
620 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
621 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
624 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
625 const struct GNUNET_MessageHeader *msg,
626 enum GNUNET_MULTICAST_ReplayErrorCode ec)
628 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
629 struct MulticastReplayResponseMessage *res;
630 struct GNUNET_MQ_Envelope *
631 env = GNUNET_MQ_msg_extra (res, msg_size,
632 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
633 res->fragment_id = rh->req.fragment_id;
634 res->message_id = rh->req.message_id;
635 res->fragment_offset = rh->req.fragment_offset;
636 res->flags = rh->req.flags;
637 res->error_code = htonl (ec);
639 if (GNUNET_MULTICAST_REC_OK == ec)
641 GNUNET_assert (NULL != msg);
642 GNUNET_memcpy (&res[1], msg, msg_size);
645 GNUNET_MQ_send (rh->grp->mq, env);
647 if (GNUNET_MULTICAST_REC_OK != ec)
653 * Indicate the end of the replay session.
655 * Invalidates the replay handle.
658 * Replay session to end.
661 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
663 struct MulticastReplayResponseMessage *end;
664 struct GNUNET_MQ_Envelope *
665 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
667 end->fragment_id = rh->req.fragment_id;
668 end->message_id = rh->req.message_id;
669 end->fragment_offset = rh->req.fragment_offset;
670 end->flags = rh->req.flags;
672 GNUNET_MQ_send (rh->grp->mq, env);
678 * Replay a message for the multicast group.
681 * Replay handle identifying which replay operation was requested.
683 * Function to call to get the message.
685 * Closure for @a notify.
688 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
689 GNUNET_MULTICAST_ReplayTransmitNotify notify,
696 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
700 origin_reconnect (void *cls)
702 origin_connect (cls);
707 * Origin client disconnected from service.
709 * Reconnect after backoff period.
712 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
714 struct GNUNET_MULTICAST_Origin *orig = cls;
715 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
717 LOG (GNUNET_ERROR_TYPE_DEBUG,
718 "Origin client disconnected (%d), re-connecting\n",
722 GNUNET_MQ_destroy (grp->mq);
726 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
729 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
734 * Connect to service as origin.
737 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
739 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
741 struct GNUNET_MQ_MessageHandler handlers[] = {
742 GNUNET_MQ_hd_var_size (group_message,
743 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
744 struct GNUNET_MULTICAST_MessageHeader,
746 GNUNET_MQ_hd_var_size (origin_request,
747 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
748 struct GNUNET_MULTICAST_RequestHeader,
750 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
751 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
752 struct GNUNET_MessageHeader,
754 GNUNET_MQ_hd_var_size (group_join_request,
755 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
756 struct MulticastJoinRequestMessage,
758 GNUNET_MQ_hd_fixed_size (group_part_ack,
759 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
760 struct GNUNET_MessageHeader,
762 GNUNET_MQ_hd_fixed_size (group_replay_request,
763 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
764 struct MulticastReplayRequestMessage,
766 GNUNET_MQ_handler_end ()
769 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
770 handlers, origin_disconnected, orig);
771 GNUNET_assert (NULL != grp->mq);
772 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
777 * Start a multicast group.
779 * Will advertise the origin in the P2P overlay network under the respective
780 * public key so that other peer can find this peer to join it. Peers that
781 * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
782 * either an existing group member or to the origin. If the joining is
783 * approved, the member is cleared for @e replay and will begin to receive
784 * messages transmitted to the group. If joining is disapproved, the failed
785 * candidate will be given a response. Members in the group can send messages
786 * to the origin (one at a time).
789 * Configuration to use.
791 * ECC key that will be used to sign messages for this
792 * multicast session; public key is used to identify the multicast group;
793 * @param max_fragment_id
794 * Maximum fragment ID already sent to the group.
796 * @param join_request_cb
797 * Function called to approve / disapprove joining of a peer.
798 * @param replay_frag_cb
799 * Function that can be called to replay a message fragment.
800 * @param replay_msg_cb
801 * Function that can be called to replay a message.
803 * Function called with message fragments from group members.
805 * Function called with the message fragments sent to the
806 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
807 * should be stored for answering replay requests later.
809 * Closure for the various callbacks that follow.
811 * @return Handle for the origin, NULL on error.
813 struct GNUNET_MULTICAST_Origin *
814 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
815 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
816 uint64_t max_fragment_id,
817 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
818 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
819 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
820 GNUNET_MULTICAST_RequestCallback request_cb,
821 GNUNET_MULTICAST_MessageCallback message_cb,
824 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
825 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
827 struct MulticastOriginStartMessage *start;
828 grp->connect_env = GNUNET_MQ_msg (start,
829 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
830 start->max_fragment_id = max_fragment_id;
831 start->group_key = *priv_key;
834 grp->is_origin = GNUNET_YES;
835 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
838 grp->join_req_cb = join_request_cb;
839 grp->replay_frag_cb = replay_frag_cb;
840 grp->replay_msg_cb = replay_msg_cb;
841 grp->message_cb = message_cb;
843 orig->request_cb = request_cb;
845 origin_connect (orig);
851 * Stop a multicast group.
854 * Multicast group to stop.
857 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
858 GNUNET_ContinuationCallback stop_cb,
861 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
862 struct GNUNET_MQ_Envelope *env;
864 grp->is_disconnecting = GNUNET_YES;
865 grp->disconnect_cb = stop_cb;
866 grp->disconnect_cls = stop_cls;
867 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
868 GNUNET_MQ_send (grp->mq, env);
873 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
875 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
876 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
877 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
878 GNUNET_assert (GNUNET_YES == grp->in_transmit);
880 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
881 struct GNUNET_MULTICAST_MessageHeader *msg;
882 struct GNUNET_MQ_Envelope *
883 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
884 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
886 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
888 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
889 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
891 LOG (GNUNET_ERROR_TYPE_ERROR,
892 "%p OriginTransmitNotify() returned error or invalid message size.\n",
894 /* FIXME: handle error */
895 GNUNET_MQ_discard (env);
899 if (GNUNET_NO == ret && 0 == buf_size)
901 LOG (GNUNET_ERROR_TYPE_DEBUG,
902 "%p OriginTransmitNotify() - transmission paused.\n", orig);
903 GNUNET_MQ_discard (env);
904 return; /* Transmission paused. */
907 msg->header.size = htons (sizeof (*msg) + buf_size);
908 msg->message_id = GNUNET_htonll (tmit->message_id);
909 msg->group_generation = tmit->group_generation;
910 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
911 tmit->fragment_offset += sizeof (*msg) + buf_size;
914 GNUNET_MQ_send (grp->mq, env);
916 if (GNUNET_YES == ret)
917 grp->in_transmit = GNUNET_NO;
922 * Send a message to the multicast group.
925 * Handle to the multicast group.
927 * Application layer ID for the message. Opaque to multicast.
928 * @param group_generation
929 * Group generation of the message.
930 * Documented in struct GNUNET_MULTICAST_MessageHeader.
932 * Function to call to get the message.
934 * Closure for @a notify.
936 * @return Message handle on success,
937 * NULL on error (i.e. another request is already pending).
939 struct GNUNET_MULTICAST_OriginTransmitHandle *
940 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
942 uint64_t group_generation,
943 GNUNET_MULTICAST_OriginTransmitNotify notify,
946 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
947 if (GNUNET_YES == grp->in_transmit)
949 grp->in_transmit = GNUNET_YES;
951 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
953 tmit->message_id = message_id;
954 tmit->fragment_offset = 0;
955 tmit->group_generation = group_generation;
956 tmit->notify = notify;
957 tmit->notify_cls = notify_cls;
959 origin_to_all (orig);
965 * Resume message transmission to multicast group.
968 * Transmission to cancel.
971 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
973 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
974 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
976 origin_to_all (th->origin);
981 * Cancel request for message transmission to multicast group.
984 * Transmission to cancel.
987 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
989 th->origin->grp.in_transmit = GNUNET_NO;
994 member_connect (struct GNUNET_MULTICAST_Member *mem);
998 member_reconnect (void *cls)
1000 member_connect (cls);
1005 * Member client disconnected from service.
1007 * Reconnect after backoff period.
1010 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1012 struct GNUNET_MULTICAST_Member *mem = cls;
1013 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1015 LOG (GNUNET_ERROR_TYPE_DEBUG,
1016 "Member client disconnected (%d), re-connecting\n",
1018 GNUNET_MQ_destroy (grp->mq);
1021 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1024 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1029 * Connect to service as member.
1032 member_connect (struct GNUNET_MULTICAST_Member *mem)
1034 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1036 struct GNUNET_MQ_MessageHandler handlers[] = {
1037 GNUNET_MQ_hd_var_size (group_message,
1038 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1039 struct GNUNET_MULTICAST_MessageHeader,
1041 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1042 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1043 struct GNUNET_MessageHeader,
1045 GNUNET_MQ_hd_var_size (group_join_request,
1046 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1047 struct MulticastJoinRequestMessage,
1049 GNUNET_MQ_hd_var_size (member_join_decision,
1050 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1051 struct MulticastJoinDecisionMessageHeader,
1053 GNUNET_MQ_hd_fixed_size (group_part_ack,
1054 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
1055 struct GNUNET_MessageHeader,
1057 GNUNET_MQ_hd_fixed_size (group_replay_request,
1058 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1059 struct MulticastReplayRequestMessage,
1061 GNUNET_MQ_hd_var_size (member_replay_response,
1062 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1063 struct MulticastReplayResponseMessage,
1065 GNUNET_MQ_handler_end ()
1068 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1069 handlers, member_disconnected, mem);
1070 GNUNET_assert (NULL != grp->mq);
1071 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1076 * Join a multicast group.
1078 * The entity joining is always the local peer. Further information about the
1079 * candidate can be provided in the @a join_request message. If the join fails, the
1080 * @a message_cb is invoked with a (failure) response and then with NULL. If
1081 * the join succeeds, outstanding (state) messages and ongoing multicast
1082 * messages will be given to the @a message_cb until the member decides to part
1083 * the group. The @a replay_cb function may be called at any time by the
1084 * multicast service to support relaying messages to other members of the group.
1087 * Configuration to use.
1089 * ECC public key that identifies the group to join.
1091 * ECC key that identifies the member
1092 * and used to sign requests sent to the origin.
1094 * Peer ID of the origin to send unicast requsets to. If NULL,
1095 * unicast requests are sent back via multiple hops on the reverse path
1096 * of multicast messages.
1097 * @param relay_count
1098 * Number of peers in the @a relays array.
1100 * Peer identities of members of the group, which serve as relays
1101 * and can be used to join the group at. and send the @a join_request to.
1102 * If empty, the @a join_request is sent directly to the @a origin.
1104 * Application-dependent join message to be passed to the peer @a origin.
1105 * @param join_request_cb
1106 * Function called to approve / disapprove joining of a peer.
1107 * @param join_decision_cb
1108 * Function called to inform about the join decision.
1109 * @param replay_frag_cb
1110 * Function that can be called to replay message fragments
1111 * this peer already knows from this group. NULL if this
1112 * client is unable to support replay.
1113 * @param replay_msg_cb
1114 * Function that can be called to replay message fragments
1115 * this peer already knows from this group. NULL if this
1116 * client is unable to support replay.
1118 * Function to be called for all message fragments we
1119 * receive from the group, excluding those our @a replay_cb
1122 * Closure for callbacks.
1124 * @return Handle for the member, NULL on error.
1126 struct GNUNET_MULTICAST_Member *
1127 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1128 const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1129 const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1130 const struct GNUNET_PeerIdentity *origin,
1131 uint16_t relay_count,
1132 const struct GNUNET_PeerIdentity *relays,
1133 const struct GNUNET_MessageHeader *join_msg,
1134 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1135 GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1136 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1137 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1138 GNUNET_MULTICAST_MessageCallback message_cb,
1141 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1142 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1144 uint16_t relay_size = relay_count * sizeof (*relays);
1145 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1146 struct MulticastMemberJoinMessage *join;
1147 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1148 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1149 join->group_pub_key = *group_pub_key;
1150 join->member_key = *member_key;
1151 join->origin = *origin;
1152 join->relay_count = ntohl (relay_count);
1154 GNUNET_memcpy (&join[1], relays, relay_size);
1155 if (0 < join_msg_size)
1156 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1159 grp->is_origin = GNUNET_NO;
1160 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1162 mem->join_dcsn_cb = join_decision_cb;
1163 grp->join_req_cb = join_request_cb;
1164 grp->replay_frag_cb = replay_frag_cb;
1165 grp->replay_msg_cb = replay_msg_cb;
1166 grp->message_cb = message_cb;
1169 member_connect (mem);
1175 * Part a multicast group.
1177 * Disconnects from all group members and invalidates the @a member handle.
1179 * An application-dependent part message can be transmitted beforehand using
1180 * #GNUNET_MULTICAST_member_to_origin())
1183 * Membership handle.
1186 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1187 GNUNET_ContinuationCallback part_cb,
1190 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1191 struct GNUNET_MQ_Envelope *env;
1193 mem->join_dcsn_cb = NULL;
1194 grp->join_req_cb = NULL;
1195 grp->message_cb = NULL;
1196 grp->replay_msg_cb = NULL;
1197 grp->replay_frag_cb = NULL;
1198 grp->is_disconnecting = GNUNET_YES;
1199 grp->disconnect_cb = part_cb;
1200 grp->disconnect_cls = part_cls;
1201 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
1202 GNUNET_MQ_send (grp->mq, env);
1207 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1208 uint64_t fragment_id,
1209 uint64_t message_id,
1210 uint64_t fragment_offset,
1213 struct MulticastReplayRequestMessage *rep;
1214 struct GNUNET_MQ_Envelope *
1215 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1217 rep->fragment_id = GNUNET_htonll (fragment_id);
1218 rep->message_id = GNUNET_htonll (message_id);
1219 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1220 rep->flags = GNUNET_htonll (flags);
1222 GNUNET_MQ_send (mem->grp.mq, env);
1227 * Request a fragment to be replayed by fragment ID.
1229 * Useful if messages below the @e max_known_fragment_id given when joining are
1230 * needed and not known to the client.
1233 * Membership handle.
1234 * @param fragment_id
1235 * ID of a message fragment that this client would like to see replayed.
1237 * Additional flags for the replay request.
1238 * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1240 * @return Replay request handle.
1242 struct GNUNET_MULTICAST_MemberReplayHandle *
1243 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1244 uint64_t fragment_id,
1247 member_replay_request (mem, fragment_id, 0, 0, flags);
1248 // FIXME: return something useful
1254 * Request a message fragment to be replayed.
1256 * Useful if messages below the @e max_known_fragment_id given when joining are
1257 * needed and not known to the client.
1260 * Membership handle.
1262 * ID of the message this client would like to see replayed.
1263 * @param fragment_offset
1264 * Offset of the fragment within the message to replay.
1266 * Additional flags for the replay request.
1267 * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1269 * @return Replay request handle, NULL on error.
1271 struct GNUNET_MULTICAST_MemberReplayHandle *
1272 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1273 uint64_t message_id,
1274 uint64_t fragment_offset,
1277 member_replay_request (mem, 0, message_id, fragment_offset, flags);
1278 // FIXME: return something useful
1284 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1286 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1287 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1288 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1289 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1291 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1292 struct GNUNET_MULTICAST_RequestHeader *req;
1293 struct GNUNET_MQ_Envelope *
1294 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1295 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1297 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1299 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1300 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1302 LOG (GNUNET_ERROR_TYPE_ERROR,
1303 "MemberTransmitNotify() returned error or invalid message size. "
1304 "ret=%d, buf_size=%u\n", ret, buf_size);
1305 /* FIXME: handle error */
1306 GNUNET_MQ_discard (env);
1310 if (GNUNET_NO == ret && 0 == buf_size)
1312 /* Transmission paused. */
1313 GNUNET_MQ_discard (env);
1317 req->header.size = htons (sizeof (*req) + buf_size);
1318 req->request_id = GNUNET_htonll (tmit->request_id);
1319 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1320 tmit->fragment_offset += sizeof (*req) + buf_size;
1322 GNUNET_MQ_send (grp->mq, env);
1324 if (GNUNET_YES == ret)
1325 grp->in_transmit = GNUNET_NO;
1330 * Send a message to the origin of the multicast group.
1333 * Membership handle.
1335 * Application layer ID for the request. Opaque to multicast.
1337 * Callback to call to get the message.
1339 * Closure for @a notify.
1341 * @return Handle to cancel request, NULL on error (i.e. request already pending).
1343 struct GNUNET_MULTICAST_MemberTransmitHandle *
1344 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1345 uint64_t request_id,
1346 GNUNET_MULTICAST_MemberTransmitNotify notify,
1349 if (GNUNET_YES == mem->grp.in_transmit)
1351 mem->grp.in_transmit = GNUNET_YES;
1353 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1355 tmit->request_id = request_id;
1356 tmit->fragment_offset = 0;
1357 tmit->notify = notify;
1358 tmit->notify_cls = notify_cls;
1360 member_to_origin (mem);
1366 * Resume message transmission to origin.
1369 * Transmission to cancel.
1372 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1374 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1375 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1377 member_to_origin (th->member);
1382 * Cancel request for message transmission to origin.
1385 * Transmission to cancel.
1388 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1390 th->member->grp.in_transmit = GNUNET_NO;
1394 /* end of multicast_api.c */