2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file multicast/multicast_api.c
21 * @brief Multicast service; implements multicast groups using CADET connections.
22 * @author Christian Grothoff
23 * @author Gabor X Toth
27 #include "gnunet_util_lib.h"
28 #include "gnunet_multicast_service.h"
29 #include "multicast.h"
31 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
35 * Handle for a request to send a message to all multicast group members
38 struct GNUNET_MULTICAST_OriginTransmitHandle
40 GNUNET_MULTICAST_OriginTransmitNotify notify;
42 struct GNUNET_MULTICAST_Origin *origin;
45 uint64_t group_generation;
46 uint64_t fragment_offset;
51 * Handle for a message to be delivered from a member to the origin.
53 struct GNUNET_MULTICAST_MemberTransmitHandle
55 GNUNET_MULTICAST_MemberTransmitNotify notify;
57 struct GNUNET_MULTICAST_Member *member;
60 uint64_t fragment_offset;
64 struct GNUNET_MULTICAST_Group
67 * Configuration to use.
69 const struct GNUNET_CONFIGURATION_Handle *cfg;
72 * Client connection to the service.
74 struct GNUNET_MQ_Handle *mq;
77 * Message to send on connect.
79 struct GNUNET_MQ_Envelope *connect_env;
82 * Time to wait until we try to reconnect on failure.
84 struct GNUNET_TIME_Relative reconnect_delay;
87 * Task for reconnecting when the listener fails.
89 struct GNUNET_SCHEDULER_Task *reconnect_task;
91 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
92 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
93 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
94 GNUNET_MULTICAST_MessageCallback message_cb;
98 * Function called after disconnected from the service.
100 GNUNET_ContinuationCallback disconnect_cb;
103 * Closure for @a disconnect_cb.
105 void *disconnect_cls;
108 * Are we currently transmitting a message?
113 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
115 uint8_t acks_pending;
118 * Is this the origin or a member?
123 * Is this channel in the process of disconnecting from the service?
124 * #GNUNET_YES or #GNUNET_NO
126 uint8_t is_disconnecting;
131 * Handle for the origin of a multicast group.
133 struct GNUNET_MULTICAST_Origin
135 struct GNUNET_MULTICAST_Group grp;
136 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
138 GNUNET_MULTICAST_RequestCallback request_cb;
143 * Handle for a multicast group member.
145 struct GNUNET_MULTICAST_Member
147 struct GNUNET_MULTICAST_Group grp;
148 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
150 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
153 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
155 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
157 uint64_t next_fragment_id;
162 * Handle that identifies a join request.
164 * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
165 * corresponding calls to #GNUNET_MULTICAST_join_decision().
167 struct GNUNET_MULTICAST_JoinHandle
169 struct GNUNET_MULTICAST_Group *group;
172 * Public key of the member requesting join.
174 struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
177 * Peer identity of the member requesting join.
179 struct GNUNET_PeerIdentity peer;
184 * Opaque handle to a replay request from the multicast service.
186 struct GNUNET_MULTICAST_ReplayHandle
188 struct GNUNET_MULTICAST_Group *grp;
189 struct MulticastReplayRequestMessage req;
194 * Handle for a replay request.
196 struct GNUNET_MULTICAST_MemberReplayHandle
202 origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
205 member_to_origin (struct GNUNET_MULTICAST_Member *mem);
209 * Check join request message.
212 check_group_join_request (void *cls,
213 const struct MulticastJoinRequestMessage *jreq)
215 uint16_t size = ntohs (jreq->header.size);
217 if (sizeof (*jreq) == size)
220 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
223 return GNUNET_SYSERR;
228 * Receive join request from service.
231 handle_group_join_request (void *cls,
232 const struct MulticastJoinRequestMessage *jreq)
234 struct GNUNET_MULTICAST_Group *grp = cls;
235 struct GNUNET_MULTICAST_JoinHandle *jh;
236 const struct GNUNET_MessageHeader *jmsg = NULL;
243 if (NULL == grp->join_req_cb)
246 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
247 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
249 jh = GNUNET_malloc (sizeof (*jh));
251 jh->member_pub_key = jreq->member_pub_key;
252 jh->peer = jreq->peer;
253 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
255 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
260 * Check multicast message.
263 check_group_message (void *cls,
264 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
271 * Receive multicast message from service.
274 handle_group_message (void *cls,
275 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
277 struct GNUNET_MULTICAST_Group *grp = cls;
279 if (GNUNET_YES == grp->is_disconnecting)
282 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
283 "Calling message callback with a message of size %u.\n",
284 ntohs (mmsg->header.size));
286 if (NULL != grp->message_cb)
287 grp->message_cb (grp->cb_cls, mmsg);
289 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
294 * Receive message/request fragment acknowledgement from service.
297 handle_group_fragment_ack (void *cls,
298 const struct GNUNET_MessageHeader *msg)
300 struct GNUNET_MULTICAST_Group *grp = cls;
302 LOG (GNUNET_ERROR_TYPE_DEBUG,
303 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
304 grp, grp->in_transmit, grp->acks_pending);
306 if (0 == grp->acks_pending)
308 LOG (GNUNET_ERROR_TYPE_DEBUG,
309 "%p Ignoring extraneous fragment ACK.\n", grp);
314 if (GNUNET_YES != grp->in_transmit)
317 if (GNUNET_YES == grp->is_origin)
318 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
320 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
322 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
327 * Check unicast request.
330 check_origin_request (void *cls,
331 const struct GNUNET_MULTICAST_RequestHeader *req)
338 * Origin receives unicast request from a member.
341 handle_origin_request (void *cls,
342 const struct GNUNET_MULTICAST_RequestHeader *req)
344 struct GNUNET_MULTICAST_Group *grp;
345 struct GNUNET_MULTICAST_Origin *orig = cls;
348 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
349 "Calling request callback with a request of size %u.\n",
350 ntohs (req->header.size));
352 if (NULL != orig->request_cb)
353 orig->request_cb (grp->cb_cls, req);
355 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
360 * Receive multicast replay request from service.
363 handle_group_replay_request (void *cls,
364 const struct MulticastReplayRequestMessage *rep)
367 struct GNUNET_MULTICAST_Group *grp = cls;
369 if (GNUNET_YES == grp->is_disconnecting)
372 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
374 if (0 != rep->fragment_id)
376 if (NULL != grp->replay_frag_cb)
378 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
381 grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
382 GNUNET_ntohll (rep->fragment_id),
383 GNUNET_ntohll (rep->flags), rh);
386 else if (0 != rep->message_id)
388 if (NULL != grp->replay_msg_cb)
390 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
393 grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
394 GNUNET_ntohll (rep->message_id),
395 GNUNET_ntohll (rep->fragment_offset),
396 GNUNET_ntohll (rep->flags), rh);
400 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
405 * Check replay response.
408 check_member_replay_response (void *cls,
409 const struct MulticastReplayResponseMessage *res)
411 uint16_t size = ntohs (res->header.size);
413 if (sizeof (*res) == size)
416 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
419 return GNUNET_SYSERR;
424 * Receive replay response from service.
427 handle_member_replay_response (void *cls,
428 const struct MulticastReplayResponseMessage *res)
430 struct GNUNET_MULTICAST_Group *grp;
431 struct GNUNET_MULTICAST_Member *mem = cls;
434 if (GNUNET_YES == grp->is_disconnecting)
437 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
439 // FIXME: return result
444 * Check join decision.
447 check_member_join_decision (void *cls,
448 const struct MulticastJoinDecisionMessageHeader *hdcsn)
450 return GNUNET_OK; // checked in handle below
455 * Member receives join decision.
458 handle_member_join_decision (void *cls,
459 const struct MulticastJoinDecisionMessageHeader *hdcsn)
461 struct GNUNET_MULTICAST_Group *grp;
462 struct GNUNET_MULTICAST_Member *mem = cls;
465 const struct MulticastJoinDecisionMessage *
466 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
468 uint16_t dcsn_size = ntohs (dcsn->header.size);
469 int is_admitted = ntohl (dcsn->is_admitted);
471 LOG (GNUNET_ERROR_TYPE_DEBUG,
472 "%p Member got join decision from multicast: %d\n",
475 const struct GNUNET_MessageHeader *join_resp = NULL;
476 uint16_t join_resp_size = 0;
478 uint16_t relay_count = ntohl (dcsn->relay_count);
479 const struct GNUNET_PeerIdentity *relays = NULL;
480 uint16_t relay_size = relay_count * sizeof (*relays);
483 if (dcsn_size < sizeof (*dcsn) + relay_size)
486 is_admitted = GNUNET_SYSERR;
490 relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
494 if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
496 join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
497 join_resp_size = ntohs (join_resp->size);
499 if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
501 LOG (GNUNET_ERROR_TYPE_DEBUG,
502 "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
503 dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
505 is_admitted = GNUNET_SYSERR;
508 if (NULL != mem->join_dcsn_cb)
509 mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
510 relay_count, relays, join_resp);
513 //if (GNUNET_YES != is_admitted)
514 // GNUNET_MULTICAST_member_part (mem);
516 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
521 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
523 if (NULL != grp->connect_env)
525 GNUNET_MQ_discard (grp->connect_env);
526 grp->connect_env = NULL;
530 GNUNET_MQ_destroy (grp->mq);
533 if (NULL != grp->disconnect_cb)
535 grp->disconnect_cb (grp->disconnect_cls);
536 grp->disconnect_cb = NULL;
543 handle_group_part_ack (void *cls,
544 const struct GNUNET_MessageHeader *msg)
546 struct GNUNET_MULTICAST_Group *grp = cls;
553 * Function to call with the decision made for a join request.
555 * Must be called once and only once in response to an invocation of the
556 * #GNUNET_MULTICAST_JoinRequestCallback.
559 * Join request handle.
561 * #GNUNET_YES if the join is approved,
562 * #GNUNET_NO if it is disapproved,
563 * #GNUNET_SYSERR if we cannot answer the request.
565 * Number of relays given.
567 * Array of suggested peers that might be useful relays to use
568 * when joining the multicast group (essentially a list of peers that
569 * are already part of the multicast group and might thus be willing
570 * to help with routing). If empty, only this local peer (which must
571 * be the multicast origin) is a good candidate for building the
572 * multicast tree. Note that it is unnecessary to specify our own
573 * peer identity in this array.
575 * Message to send in response to the joining peer;
576 * can also be used to redirect the peer to a different group at the
577 * application layer; this response is to be transmitted to the
578 * peer that issued the request even if admission is denied.
580 struct GNUNET_MULTICAST_ReplayHandle *
581 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
583 uint16_t relay_count,
584 const struct GNUNET_PeerIdentity *relays,
585 const struct GNUNET_MessageHeader *join_resp)
587 struct GNUNET_MULTICAST_Group *grp = join->group;
588 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
589 uint16_t relay_size = relay_count * sizeof (*relays);
591 struct MulticastJoinDecisionMessageHeader *hdcsn;
592 struct MulticastJoinDecisionMessage *dcsn;
593 struct GNUNET_MQ_Envelope *
594 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
595 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
596 hdcsn->member_pub_key = join->member_pub_key;
597 hdcsn->peer = join->peer;
599 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
600 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
601 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
602 dcsn->is_admitted = htonl (is_admitted);
603 dcsn->relay_count = htonl (relay_count);
605 GNUNET_memcpy (&dcsn[1], relays, relay_size);
606 if (0 < join_resp_size)
607 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
609 GNUNET_MQ_send (grp->mq, env);
616 * Replay a message fragment for the multicast group.
619 * Replay handle identifying which replay operation was requested.
621 * Replayed message fragment, NULL if not found / an error occurred.
623 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
624 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
627 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
628 const struct GNUNET_MessageHeader *msg,
629 enum GNUNET_MULTICAST_ReplayErrorCode ec)
631 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
632 struct MulticastReplayResponseMessage *res;
633 struct GNUNET_MQ_Envelope *
634 env = GNUNET_MQ_msg_extra (res, msg_size,
635 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
636 res->fragment_id = rh->req.fragment_id;
637 res->message_id = rh->req.message_id;
638 res->fragment_offset = rh->req.fragment_offset;
639 res->flags = rh->req.flags;
640 res->error_code = htonl (ec);
642 if (GNUNET_MULTICAST_REC_OK == ec)
644 GNUNET_assert (NULL != msg);
645 GNUNET_memcpy (&res[1], msg, msg_size);
648 GNUNET_MQ_send (rh->grp->mq, env);
650 if (GNUNET_MULTICAST_REC_OK != ec)
656 * Indicate the end of the replay session.
658 * Invalidates the replay handle.
661 * Replay session to end.
664 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
666 struct MulticastReplayResponseMessage *end;
667 struct GNUNET_MQ_Envelope *
668 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
670 end->fragment_id = rh->req.fragment_id;
671 end->message_id = rh->req.message_id;
672 end->fragment_offset = rh->req.fragment_offset;
673 end->flags = rh->req.flags;
675 GNUNET_MQ_send (rh->grp->mq, env);
681 * Replay a message for the multicast group.
684 * Replay handle identifying which replay operation was requested.
686 * Function to call to get the message.
688 * Closure for @a notify.
691 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
692 GNUNET_MULTICAST_ReplayTransmitNotify notify,
699 origin_connect (struct GNUNET_MULTICAST_Origin *orig);
703 origin_reconnect (void *cls)
705 origin_connect (cls);
710 * Origin client disconnected from service.
712 * Reconnect after backoff period.
715 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
717 struct GNUNET_MULTICAST_Origin *orig = cls;
718 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
720 LOG (GNUNET_ERROR_TYPE_DEBUG,
721 "Origin client disconnected (%d), re-connecting\n",
725 GNUNET_MQ_destroy (grp->mq);
729 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
732 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
737 * Connect to service as origin.
740 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
742 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
744 struct GNUNET_MQ_MessageHandler handlers[] = {
745 GNUNET_MQ_hd_var_size (group_message,
746 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
747 struct GNUNET_MULTICAST_MessageHeader,
749 GNUNET_MQ_hd_var_size (origin_request,
750 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
751 struct GNUNET_MULTICAST_RequestHeader,
753 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
754 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
755 struct GNUNET_MessageHeader,
757 GNUNET_MQ_hd_var_size (group_join_request,
758 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
759 struct MulticastJoinRequestMessage,
761 GNUNET_MQ_hd_fixed_size (group_part_ack,
762 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
763 struct GNUNET_MessageHeader,
765 GNUNET_MQ_hd_fixed_size (group_replay_request,
766 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
767 struct MulticastReplayRequestMessage,
769 GNUNET_MQ_handler_end ()
772 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
773 handlers, origin_disconnected, orig);
774 GNUNET_assert (NULL != grp->mq);
775 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
780 * Start a multicast group.
782 * Will advertise the origin in the P2P overlay network under the respective
783 * public key so that other peer can find this peer to join it. Peers that
784 * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
785 * either an existing group member or to the origin. If the joining is
786 * approved, the member is cleared for @e replay and will begin to receive
787 * messages transmitted to the group. If joining is disapproved, the failed
788 * candidate will be given a response. Members in the group can send messages
789 * to the origin (one at a time).
792 * Configuration to use.
794 * ECC key that will be used to sign messages for this
795 * multicast session; public key is used to identify the multicast group;
796 * @param max_fragment_id
797 * Maximum fragment ID already sent to the group.
799 * @param join_request_cb
800 * Function called to approve / disapprove joining of a peer.
801 * @param replay_frag_cb
802 * Function that can be called to replay a message fragment.
803 * @param replay_msg_cb
804 * Function that can be called to replay a message.
806 * Function called with message fragments from group members.
808 * Function called with the message fragments sent to the
809 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
810 * should be stored for answering replay requests later.
812 * Closure for the various callbacks that follow.
814 * @return Handle for the origin, NULL on error.
816 struct GNUNET_MULTICAST_Origin *
817 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
818 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
819 uint64_t max_fragment_id,
820 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
821 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
822 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
823 GNUNET_MULTICAST_RequestCallback request_cb,
824 GNUNET_MULTICAST_MessageCallback message_cb,
827 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
828 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
830 struct MulticastOriginStartMessage *start;
831 grp->connect_env = GNUNET_MQ_msg (start,
832 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
833 start->max_fragment_id = max_fragment_id;
834 start->group_key = *priv_key;
837 grp->is_origin = GNUNET_YES;
838 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
841 grp->join_req_cb = join_request_cb;
842 grp->replay_frag_cb = replay_frag_cb;
843 grp->replay_msg_cb = replay_msg_cb;
844 grp->message_cb = message_cb;
846 orig->request_cb = request_cb;
848 origin_connect (orig);
854 * Stop a multicast group.
857 * Multicast group to stop.
860 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
861 GNUNET_ContinuationCallback stop_cb,
864 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
865 struct GNUNET_MQ_Envelope *env;
867 grp->is_disconnecting = GNUNET_YES;
868 grp->disconnect_cb = stop_cb;
869 grp->disconnect_cls = stop_cls;
870 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
871 GNUNET_MQ_send (grp->mq, env);
876 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
878 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
879 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
880 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
881 GNUNET_assert (GNUNET_YES == grp->in_transmit);
883 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
884 struct GNUNET_MULTICAST_MessageHeader *msg;
885 struct GNUNET_MQ_Envelope *
886 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
887 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
889 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
891 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
892 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
894 LOG (GNUNET_ERROR_TYPE_ERROR,
895 "%p OriginTransmitNotify() returned error or invalid message size.\n",
897 /* FIXME: handle error */
898 GNUNET_MQ_discard (env);
902 if (GNUNET_NO == ret && 0 == buf_size)
904 LOG (GNUNET_ERROR_TYPE_DEBUG,
905 "%p OriginTransmitNotify() - transmission paused.\n", orig);
906 GNUNET_MQ_discard (env);
907 return; /* Transmission paused. */
910 msg->header.size = htons (sizeof (*msg) + buf_size);
911 msg->message_id = GNUNET_htonll (tmit->message_id);
912 msg->group_generation = tmit->group_generation;
913 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
914 tmit->fragment_offset += sizeof (*msg) + buf_size;
917 GNUNET_MQ_send (grp->mq, env);
919 if (GNUNET_YES == ret)
920 grp->in_transmit = GNUNET_NO;
925 * Send a message to the multicast group.
928 * Handle to the multicast group.
930 * Application layer ID for the message. Opaque to multicast.
931 * @param group_generation
932 * Group generation of the message.
933 * Documented in struct GNUNET_MULTICAST_MessageHeader.
935 * Function to call to get the message.
937 * Closure for @a notify.
939 * @return Message handle on success,
940 * NULL on error (i.e. another request is already pending).
942 struct GNUNET_MULTICAST_OriginTransmitHandle *
943 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
945 uint64_t group_generation,
946 GNUNET_MULTICAST_OriginTransmitNotify notify,
949 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
950 if (GNUNET_YES == grp->in_transmit)
952 grp->in_transmit = GNUNET_YES;
954 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
956 tmit->message_id = message_id;
957 tmit->fragment_offset = 0;
958 tmit->group_generation = group_generation;
959 tmit->notify = notify;
960 tmit->notify_cls = notify_cls;
962 origin_to_all (orig);
968 * Resume message transmission to multicast group.
971 * Transmission to cancel.
974 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
976 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
977 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
979 origin_to_all (th->origin);
984 * Cancel request for message transmission to multicast group.
987 * Transmission to cancel.
990 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
992 th->origin->grp.in_transmit = GNUNET_NO;
997 member_connect (struct GNUNET_MULTICAST_Member *mem);
1001 member_reconnect (void *cls)
1003 member_connect (cls);
1008 * Member client disconnected from service.
1010 * Reconnect after backoff period.
1013 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1015 struct GNUNET_MULTICAST_Member *mem = cls;
1016 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1018 LOG (GNUNET_ERROR_TYPE_DEBUG,
1019 "Member client disconnected (%d), re-connecting\n",
1021 GNUNET_MQ_destroy (grp->mq);
1024 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
1027 grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
1032 * Connect to service as member.
1035 member_connect (struct GNUNET_MULTICAST_Member *mem)
1037 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1039 struct GNUNET_MQ_MessageHandler handlers[] = {
1040 GNUNET_MQ_hd_var_size (group_message,
1041 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1042 struct GNUNET_MULTICAST_MessageHeader,
1044 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1045 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1046 struct GNUNET_MessageHeader,
1048 GNUNET_MQ_hd_var_size (group_join_request,
1049 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1050 struct MulticastJoinRequestMessage,
1052 GNUNET_MQ_hd_var_size (member_join_decision,
1053 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1054 struct MulticastJoinDecisionMessageHeader,
1056 GNUNET_MQ_hd_fixed_size (group_part_ack,
1057 GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
1058 struct GNUNET_MessageHeader,
1060 GNUNET_MQ_hd_fixed_size (group_replay_request,
1061 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1062 struct MulticastReplayRequestMessage,
1064 GNUNET_MQ_hd_var_size (member_replay_response,
1065 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1066 struct MulticastReplayResponseMessage,
1068 GNUNET_MQ_handler_end ()
1071 grp->mq = GNUNET_CLIENT_connect (grp->cfg, "multicast",
1072 handlers, member_disconnected, mem);
1073 GNUNET_assert (NULL != grp->mq);
1074 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1079 * Join a multicast group.
1081 * The entity joining is always the local peer. Further information about the
1082 * candidate can be provided in the @a join_request message. If the join fails, the
1083 * @a message_cb is invoked with a (failure) response and then with NULL. If
1084 * the join succeeds, outstanding (state) messages and ongoing multicast
1085 * messages will be given to the @a message_cb until the member decides to part
1086 * the group. The @a replay_cb function may be called at any time by the
1087 * multicast service to support relaying messages to other members of the group.
1090 * Configuration to use.
1092 * ECC public key that identifies the group to join.
1094 * ECC key that identifies the member
1095 * and used to sign requests sent to the origin.
1097 * Peer ID of the origin to send unicast requsets to. If NULL,
1098 * unicast requests are sent back via multiple hops on the reverse path
1099 * of multicast messages.
1100 * @param relay_count
1101 * Number of peers in the @a relays array.
1103 * Peer identities of members of the group, which serve as relays
1104 * and can be used to join the group at. and send the @a join_request to.
1105 * If empty, the @a join_request is sent directly to the @a origin.
1107 * Application-dependent join message to be passed to the peer @a origin.
1108 * @param join_request_cb
1109 * Function called to approve / disapprove joining of a peer.
1110 * @param join_decision_cb
1111 * Function called to inform about the join decision.
1112 * @param replay_frag_cb
1113 * Function that can be called to replay message fragments
1114 * this peer already knows from this group. NULL if this
1115 * client is unable to support replay.
1116 * @param replay_msg_cb
1117 * Function that can be called to replay message fragments
1118 * this peer already knows from this group. NULL if this
1119 * client is unable to support replay.
1121 * Function to be called for all message fragments we
1122 * receive from the group, excluding those our @a replay_cb
1125 * Closure for callbacks.
1127 * @return Handle for the member, NULL on error.
1129 struct GNUNET_MULTICAST_Member *
1130 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1131 const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1132 const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1133 const struct GNUNET_PeerIdentity *origin,
1134 uint16_t relay_count,
1135 const struct GNUNET_PeerIdentity *relays,
1136 const struct GNUNET_MessageHeader *join_msg,
1137 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1138 GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1139 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1140 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1141 GNUNET_MULTICAST_MessageCallback message_cb,
1144 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1145 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1147 uint16_t relay_size = relay_count * sizeof (*relays);
1148 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1149 struct MulticastMemberJoinMessage *join;
1150 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1151 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1152 join->group_pub_key = *group_pub_key;
1153 join->member_key = *member_key;
1154 join->origin = *origin;
1155 join->relay_count = ntohl (relay_count);
1157 GNUNET_memcpy (&join[1], relays, relay_size);
1158 if (0 < join_msg_size)
1159 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1162 grp->is_origin = GNUNET_NO;
1163 grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1165 mem->join_dcsn_cb = join_decision_cb;
1166 grp->join_req_cb = join_request_cb;
1167 grp->replay_frag_cb = replay_frag_cb;
1168 grp->replay_msg_cb = replay_msg_cb;
1169 grp->message_cb = message_cb;
1172 member_connect (mem);
1178 * Part a multicast group.
1180 * Disconnects from all group members and invalidates the @a member handle.
1182 * An application-dependent part message can be transmitted beforehand using
1183 * #GNUNET_MULTICAST_member_to_origin())
1186 * Membership handle.
1189 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1190 GNUNET_ContinuationCallback part_cb,
1193 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1194 struct GNUNET_MQ_Envelope *env;
1196 mem->join_dcsn_cb = NULL;
1197 grp->join_req_cb = NULL;
1198 grp->message_cb = NULL;
1199 grp->replay_msg_cb = NULL;
1200 grp->replay_frag_cb = NULL;
1201 grp->is_disconnecting = GNUNET_YES;
1202 grp->disconnect_cb = part_cb;
1203 grp->disconnect_cls = part_cls;
1204 env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
1205 GNUNET_MQ_send (grp->mq, env);
1210 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1211 uint64_t fragment_id,
1212 uint64_t message_id,
1213 uint64_t fragment_offset,
1216 struct MulticastReplayRequestMessage *rep;
1217 struct GNUNET_MQ_Envelope *
1218 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1220 rep->fragment_id = GNUNET_htonll (fragment_id);
1221 rep->message_id = GNUNET_htonll (message_id);
1222 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1223 rep->flags = GNUNET_htonll (flags);
1225 GNUNET_MQ_send (mem->grp.mq, env);
1230 * Request a fragment to be replayed by fragment ID.
1232 * Useful if messages below the @e max_known_fragment_id given when joining are
1233 * needed and not known to the client.
1236 * Membership handle.
1237 * @param fragment_id
1238 * ID of a message fragment that this client would like to see replayed.
1240 * Additional flags for the replay request.
1241 * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
1243 * @return Replay request handle.
1245 struct GNUNET_MULTICAST_MemberReplayHandle *
1246 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
1247 uint64_t fragment_id,
1250 member_replay_request (mem, fragment_id, 0, 0, flags);
1251 // FIXME: return something useful
1257 * Request a message fragment to be replayed.
1259 * Useful if messages below the @e max_known_fragment_id given when joining are
1260 * needed and not known to the client.
1263 * Membership handle.
1265 * ID of the message this client would like to see replayed.
1266 * @param fragment_offset
1267 * Offset of the fragment within the message to replay.
1269 * Additional flags for the replay request.
1270 * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
1272 * @return Replay request handle, NULL on error.
1274 struct GNUNET_MULTICAST_MemberReplayHandle *
1275 GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
1276 uint64_t message_id,
1277 uint64_t fragment_offset,
1280 member_replay_request (mem, 0, message_id, fragment_offset, flags);
1281 // FIXME: return something useful
1287 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1289 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1290 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1291 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1292 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1294 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1295 struct GNUNET_MULTICAST_RequestHeader *req;
1296 struct GNUNET_MQ_Envelope *
1297 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1298 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1300 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1302 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1303 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1305 LOG (GNUNET_ERROR_TYPE_ERROR,
1306 "MemberTransmitNotify() returned error or invalid message size. "
1307 "ret=%d, buf_size=%u\n", ret, buf_size);
1308 /* FIXME: handle error */
1309 GNUNET_MQ_discard (env);
1313 if (GNUNET_NO == ret && 0 == buf_size)
1315 /* Transmission paused. */
1316 GNUNET_MQ_discard (env);
1320 req->header.size = htons (sizeof (*req) + buf_size);
1321 req->request_id = GNUNET_htonll (tmit->request_id);
1322 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1323 tmit->fragment_offset += sizeof (*req) + buf_size;
1325 GNUNET_MQ_send (grp->mq, env);
1327 if (GNUNET_YES == ret)
1328 grp->in_transmit = GNUNET_NO;
1333 * Send a message to the origin of the multicast group.
1336 * Membership handle.
1338 * Application layer ID for the request. Opaque to multicast.
1340 * Callback to call to get the message.
1342 * Closure for @a notify.
1344 * @return Handle to cancel request, NULL on error (i.e. request already pending).
1346 struct GNUNET_MULTICAST_MemberTransmitHandle *
1347 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1348 uint64_t request_id,
1349 GNUNET_MULTICAST_MemberTransmitNotify notify,
1352 if (GNUNET_YES == mem->grp.in_transmit)
1354 mem->grp.in_transmit = GNUNET_YES;
1356 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1358 tmit->request_id = request_id;
1359 tmit->fragment_offset = 0;
1360 tmit->notify = notify;
1361 tmit->notify_cls = notify_cls;
1363 member_to_origin (mem);
1369 * Resume message transmission to origin.
1372 * Transmission to cancel.
1375 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1377 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1378 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1380 member_to_origin (th->member);
1385 * Cancel request for message transmission to origin.
1388 * Transmission to cancel.
1391 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1393 th->member->grp.in_transmit = GNUNET_NO;
1397 /* end of multicast_api.c */