2 This file is part of GNUnet.
3 Copyright (C) 2012, 2013 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file multicast/multicast_api.c
23 * @brief Multicast service; implements multicast groups using CADET connections.
24 * @author Christian Grothoff
25 * @author Gabor X Toth
29 #include "gnunet_util_lib.h"
30 #include "gnunet_mq_lib.h"
31 #include "gnunet_multicast_service.h"
32 #include "multicast.h"
34 #define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
38 * Handle for a request to send a message to all multicast group members
41 struct GNUNET_MULTICAST_OriginTransmitHandle
43 GNUNET_MULTICAST_OriginTransmitNotify notify;
45 struct GNUNET_MULTICAST_Origin *origin;
48 uint64_t group_generation;
49 uint64_t fragment_offset;
54 * Handle for a message to be delivered from a member to the origin.
56 struct GNUNET_MULTICAST_MemberTransmitHandle
58 GNUNET_MULTICAST_MemberTransmitNotify notify;
60 struct GNUNET_MULTICAST_Member *member;
63 uint64_t fragment_offset;
67 struct GNUNET_MULTICAST_Group
70 * Configuration to use.
72 const struct GNUNET_CONFIGURATION_Handle *cfg;
75 * Client connection to the service.
77 struct GNUNET_MQ_Handle *mq;
80 * Time to wait until we try to reconnect on failure.
82 struct GNUNET_TIME_Relative reconnect_backoff;
85 * Task for reconnecting when the listener fails.
87 struct GNUNET_SCHEDULER_Task *reconnect_task;
90 * Message to send on connect.
92 struct GNUNET_MQ_Envelope *connect_env;
94 GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
95 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
96 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
97 GNUNET_MULTICAST_MessageCallback message_cb;
101 * Function called after disconnected from the service.
103 GNUNET_ContinuationCallback disconnect_cb;
106 * Closure for @a disconnect_cb.
108 void *disconnect_cls;
111 * Are we currently transmitting a message?
116 * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
118 uint8_t acks_pending;
121 * Is this the origin or a member?
126 * Is this channel in the process of disconnecting from the service?
127 * #GNUNET_YES or #GNUNET_NO
129 uint8_t is_disconnecting;
134 * Handle for the origin of a multicast group.
136 struct GNUNET_MULTICAST_Origin
138 struct GNUNET_MULTICAST_Group grp;
139 struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
141 GNUNET_MULTICAST_RequestCallback request_cb;
146 * Handle for a multicast group member.
148 struct GNUNET_MULTICAST_Member
150 struct GNUNET_MULTICAST_Group grp;
151 struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
153 GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
156 * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
158 struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
160 uint64_t next_fragment_id;
165 * Handle that identifies a join request.
167 * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
168 * corresponding calls to #GNUNET_MULTICAST_join_decision().
170 struct GNUNET_MULTICAST_JoinHandle
172 struct GNUNET_MULTICAST_Group *group;
175 * Public key of the member requesting join.
177 struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
180 * Peer identity of the member requesting join.
182 struct GNUNET_PeerIdentity peer;
187 * Opaque handle to a replay request from the multicast service.
189 struct GNUNET_MULTICAST_ReplayHandle
191 struct GNUNET_MULTICAST_Group *grp;
192 struct MulticastReplayRequestMessage req;
/**
 * Handle for a replay request.
 *
 * Currently carries no state; the replay request functions below return NULL
 * instead of an instance of this type.
 */
struct GNUNET_MULTICAST_MemberReplayHandle
{
};
/* Forward declarations: both are called from the fragment ACK handler,
   which precedes their definitions. */

static void
origin_to_all (struct GNUNET_MULTICAST_Origin *orig);


static void
member_to_origin (struct GNUNET_MULTICAST_Member *mem);
212 * Check join request message.
215 check_group_join_request (void *cls,
216 const struct MulticastJoinRequestMessage *jreq)
218 uint16_t size = ntohs (jreq->header.size);
220 if (sizeof (*jreq) == size)
223 if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
226 return GNUNET_SYSERR;
231 * Receive join request from service.
234 handle_group_join_request (void *cls,
235 const struct MulticastJoinRequestMessage *jreq)
237 struct GNUNET_MULTICAST_Group *grp = cls;
238 struct GNUNET_MULTICAST_JoinHandle *jh;
239 const struct GNUNET_MessageHeader *jmsg = NULL;
246 if (NULL == grp->join_req_cb)
249 if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
250 jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
252 jh = GNUNET_malloc (sizeof (*jh));
254 jh->member_pub_key = jreq->member_pub_key;
255 jh->peer = jreq->peer;
256 grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
258 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
263 * Check multicast message.
266 check_group_message (void *cls,
267 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
274 * Receive multicast message from service.
277 handle_group_message (void *cls,
278 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
280 struct GNUNET_MULTICAST_Group *grp = cls;
282 if (GNUNET_YES == grp->is_disconnecting)
285 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
286 "Calling message callback with a message of size %u.\n",
287 ntohs (mmsg->header.size));
289 if (NULL != grp->message_cb)
290 grp->message_cb (grp->cb_cls, mmsg);
292 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
297 * Receive message/request fragment acknowledgement from service.
300 handle_group_fragment_ack (void *cls,
301 const struct GNUNET_MessageHeader *msg)
303 struct GNUNET_MULTICAST_Group *grp = cls;
305 LOG (GNUNET_ERROR_TYPE_DEBUG,
306 "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
307 grp, grp->in_transmit, grp->acks_pending);
309 if (0 == grp->acks_pending)
311 LOG (GNUNET_ERROR_TYPE_DEBUG,
312 "%p Ignoring extraneous fragment ACK.\n", grp);
317 if (GNUNET_YES != grp->in_transmit)
320 if (GNUNET_YES == grp->is_origin)
321 origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
323 member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
325 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
330 * Check unicast request.
333 check_origin_request (void *cls,
334 const struct GNUNET_MULTICAST_RequestHeader *req)
341 * Origin receives unicast request from a member.
344 handle_origin_request (void *cls,
345 const struct GNUNET_MULTICAST_RequestHeader *req)
347 struct GNUNET_MULTICAST_Group *grp;
348 struct GNUNET_MULTICAST_Origin *orig = cls;
351 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
352 "Calling request callback with a request of size %u.\n",
353 ntohs (req->header.size));
355 if (NULL != orig->request_cb)
356 orig->request_cb (grp->cb_cls, req);
358 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
363 * Receive multicast replay request from service.
366 handle_group_replay_request (void *cls,
367 const struct MulticastReplayRequestMessage *rep)
370 struct GNUNET_MULTICAST_Group *grp = cls;
372 if (GNUNET_YES == grp->is_disconnecting)
375 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
377 if (0 != rep->fragment_id)
379 if (NULL != grp->replay_frag_cb)
381 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
384 grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
385 GNUNET_ntohll (rep->fragment_id),
386 GNUNET_ntohll (rep->flags), rh);
389 else if (0 != rep->message_id)
391 if (NULL != grp->replay_msg_cb)
393 struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
396 grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
397 GNUNET_ntohll (rep->message_id),
398 GNUNET_ntohll (rep->fragment_offset),
399 GNUNET_ntohll (rep->flags), rh);
403 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
408 * Check replay response.
411 check_member_replay_response (void *cls,
412 const struct MulticastReplayResponseMessage *res)
414 uint16_t size = ntohs (res->header.size);
416 if (sizeof (*res) == size)
419 if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
422 return GNUNET_SYSERR;
427 * Receive replay response from service.
430 handle_member_replay_response (void *cls,
431 const struct MulticastReplayResponseMessage *res)
433 struct GNUNET_MULTICAST_Group *grp;
434 struct GNUNET_MULTICAST_Member *mem = cls;
437 if (GNUNET_YES == grp->is_disconnecting)
440 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
442 // FIXME: return result
447 * Check join decision.
450 check_member_join_decision (void *cls,
451 const struct MulticastJoinDecisionMessageHeader *hdcsn)
453 return GNUNET_OK; // checked in handle below
458 * Member receives join decision.
461 handle_member_join_decision (void *cls,
462 const struct MulticastJoinDecisionMessageHeader *hdcsn)
464 struct GNUNET_MULTICAST_Group *grp;
465 struct GNUNET_MULTICAST_Member *mem = cls;
468 const struct MulticastJoinDecisionMessage *
469 dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
471 uint16_t dcsn_size = ntohs (dcsn->header.size);
472 int is_admitted = ntohl (dcsn->is_admitted);
474 LOG (GNUNET_ERROR_TYPE_DEBUG,
475 "%p Member got join decision from multicast: %d\n",
478 const struct GNUNET_MessageHeader *join_resp = NULL;
479 uint16_t join_resp_size = 0;
481 uint16_t relay_count = ntohl (dcsn->relay_count);
482 const struct GNUNET_PeerIdentity *relays = NULL;
483 uint16_t relay_size = relay_count * sizeof (*relays);
486 if (dcsn_size < sizeof (*dcsn) + relay_size)
489 is_admitted = GNUNET_SYSERR;
493 relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
497 if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
499 join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
500 join_resp_size = ntohs (join_resp->size);
502 if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
504 LOG (GNUNET_ERROR_TYPE_DEBUG,
505 "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
506 dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
508 is_admitted = GNUNET_SYSERR;
511 if (NULL != mem->join_dcsn_cb)
512 mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
513 relay_count, relays, join_resp);
516 //if (GNUNET_YES != is_admitted)
517 // GNUNET_MULTICAST_member_part (mem);
519 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
524 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
526 GNUNET_free (grp->connect_env);
527 if (NULL != grp->disconnect_cb)
528 grp->disconnect_cb (grp->disconnect_cls);
533 origin_cleanup (void *cls)
535 struct GNUNET_MULTICAST_Origin *orig = cls;
536 group_cleanup (&orig->grp);
542 member_cleanup (void *cls)
544 struct GNUNET_MULTICAST_Member *mem = cls;
545 group_cleanup (&mem->grp);
551 * Function to call with the decision made for a join request.
553 * Must be called once and only once in response to an invocation of the
554 * #GNUNET_MULTICAST_JoinRequestCallback.
557 * Join request handle.
559 * #GNUNET_YES if the join is approved,
560 * #GNUNET_NO if it is disapproved,
561 * #GNUNET_SYSERR if we cannot answer the request.
563 * Number of relays given.
565 * Array of suggested peers that might be useful relays to use
566 * when joining the multicast group (essentially a list of peers that
567 * are already part of the multicast group and might thus be willing
568 * to help with routing). If empty, only this local peer (which must
569 * be the multicast origin) is a good candidate for building the
570 * multicast tree. Note that it is unnecessary to specify our own
571 * peer identity in this array.
573 * Message to send in response to the joining peer;
574 * can also be used to redirect the peer to a different group at the
575 * application layer; this response is to be transmitted to the
576 * peer that issued the request even if admission is denied.
578 struct GNUNET_MULTICAST_ReplayHandle *
579 GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
581 uint16_t relay_count,
582 const struct GNUNET_PeerIdentity *relays,
583 const struct GNUNET_MessageHeader *join_resp)
585 struct GNUNET_MULTICAST_Group *grp = join->group;
586 uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
587 uint16_t relay_size = relay_count * sizeof (*relays);
589 struct MulticastJoinDecisionMessageHeader *hdcsn;
590 struct MulticastJoinDecisionMessage *dcsn;
591 struct GNUNET_MQ_Envelope *
592 env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
593 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
594 hdcsn->member_pub_key = join->member_pub_key;
595 hdcsn->peer = join->peer;
597 dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
598 dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
599 dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
600 dcsn->is_admitted = htonl (is_admitted);
601 dcsn->relay_count = htonl (relay_count);
603 GNUNET_memcpy (&dcsn[1], relays, relay_size);
604 if (0 < join_resp_size)
605 GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
607 GNUNET_MQ_send (grp->mq, env);
614 * Replay a message fragment for the multicast group.
617 * Replay handle identifying which replay operation was requested.
619 * Replayed message fragment, NULL if not found / an error occurred.
621 * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
622 * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
625 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
626 const struct GNUNET_MessageHeader *msg,
627 enum GNUNET_MULTICAST_ReplayErrorCode ec)
629 uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
630 struct MulticastReplayResponseMessage *res;
631 struct GNUNET_MQ_Envelope *
632 env = GNUNET_MQ_msg_extra (res, msg_size,
633 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
634 res->fragment_id = rh->req.fragment_id;
635 res->message_id = rh->req.message_id;
636 res->fragment_offset = rh->req.fragment_offset;
637 res->flags = rh->req.flags;
638 res->error_code = htonl (ec);
640 if (GNUNET_MULTICAST_REC_OK == ec)
642 GNUNET_assert (NULL != msg);
643 GNUNET_memcpy (&res[1], msg, msg_size);
646 GNUNET_MQ_send (rh->grp->mq, env);
648 if (GNUNET_MULTICAST_REC_OK != ec)
654 * Indicate the end of the replay session.
656 * Invalidates the replay handle.
659 * Replay session to end.
662 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
664 struct MulticastReplayResponseMessage *end;
665 struct GNUNET_MQ_Envelope *
666 env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
668 end->fragment_id = rh->req.fragment_id;
669 end->message_id = rh->req.message_id;
670 end->fragment_offset = rh->req.fragment_offset;
671 end->flags = rh->req.flags;
673 GNUNET_MQ_send (rh->grp->mq, env);
679 * Replay a message for the multicast group.
682 * Replay handle identifying which replay operation was requested.
684 * Function to call to get the message.
686 * Closure for @a notify.
689 GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
690 GNUNET_MULTICAST_ReplayTransmitNotify notify,
/* Defined below; declared here because the reconnect task needs it. */
static void
origin_connect (struct GNUNET_MULTICAST_Origin *orig);


/**
 * Scheduler task: connect the origin to the service again.
 *
 * @param cls  The origin (struct GNUNET_MULTICAST_Origin *).
 */
static void
origin_reconnect (void *cls)
{
  struct GNUNET_MULTICAST_Origin *origin = cls;

  origin_connect (origin);
}
708 * Origin client disconnected from service.
710 * Reconnect after backoff period.=
713 origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
715 struct GNUNET_MULTICAST_Origin *orig = cls;
716 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
718 LOG (GNUNET_ERROR_TYPE_DEBUG,
719 "Origin client disconnected (%d), re-connecting\n",
723 GNUNET_MQ_destroy (grp->mq);
727 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
730 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
735 * Connect to service as origin.
738 origin_connect (struct GNUNET_MULTICAST_Origin *orig)
740 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
742 GNUNET_MQ_hd_var_size (group_message,
743 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
744 struct GNUNET_MULTICAST_MessageHeader);
746 GNUNET_MQ_hd_var_size (origin_request,
747 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
748 struct GNUNET_MULTICAST_RequestHeader);
750 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
751 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
752 struct GNUNET_MessageHeader);
754 GNUNET_MQ_hd_var_size (group_join_request,
755 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
756 struct MulticastJoinRequestMessage);
758 GNUNET_MQ_hd_fixed_size (group_replay_request,
759 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
760 struct MulticastReplayRequestMessage);
762 struct GNUNET_MQ_MessageHandler handlers[] = {
763 make_group_message_handler (grp),
764 make_origin_request_handler (orig),
765 make_group_fragment_ack_handler (grp),
766 make_group_join_request_handler (grp),
767 make_group_replay_request_handler (grp),
768 GNUNET_MQ_handler_end ()
771 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
772 handlers, origin_disconnected, orig);
778 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
783 * Start a multicast group.
785 * Will advertise the origin in the P2P overlay network under the respective
786 * public key so that other peer can find this peer to join it. Peers that
787 * issue GNUNET_MULTICAST_member_join() can then transmit a join request to
788 * either an existing group member or to the origin. If the joining is
789 * approved, the member is cleared for @e replay and will begin to receive
790 * messages transmitted to the group. If joining is disapproved, the failed
791 * candidate will be given a response. Members in the group can send messages
792 * to the origin (one at a time).
795 * Configuration to use.
797 * ECC key that will be used to sign messages for this
798 * multicast session; public key is used to identify the multicast group;
799 * @param max_fragment_id
800 * Maximum fragment ID already sent to the group.
802 * @param join_request_cb
803 * Function called to approve / disapprove joining of a peer.
804 * @param replay_frag_cb
805 * Function that can be called to replay a message fragment.
806 * @param replay_msg_cb
807 * Function that can be called to replay a message.
809 * Function called with message fragments from group members.
811 * Function called with the message fragments sent to the
812 * network by GNUNET_MULTICAST_origin_to_all(). These message fragments
813 * should be stored for answering replay requests later.
815 * Closure for the various callbacks that follow.
817 * @return Handle for the origin, NULL on error.
819 struct GNUNET_MULTICAST_Origin *
820 GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
821 const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
822 uint64_t max_fragment_id,
823 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
824 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
825 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
826 GNUNET_MULTICAST_RequestCallback request_cb,
827 GNUNET_MULTICAST_MessageCallback message_cb,
830 struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
831 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
833 struct MulticastOriginStartMessage *start;
834 grp->connect_env = GNUNET_MQ_msg (start,
835 GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
836 start->max_fragment_id = max_fragment_id;
837 GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
839 grp->is_origin = GNUNET_YES;
843 grp->join_req_cb = join_request_cb;
844 grp->replay_frag_cb = replay_frag_cb;
845 grp->replay_msg_cb = replay_msg_cb;
846 grp->message_cb = message_cb;
848 orig->request_cb = request_cb;
850 origin_connect (orig);
856 * Stop a multicast group.
859 * Multicast group to stop.
862 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
863 GNUNET_ContinuationCallback stop_cb,
866 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
868 grp->is_disconnecting = GNUNET_YES;
869 grp->disconnect_cb = stop_cb;
870 grp->disconnect_cls = stop_cls;
872 // FIXME: wait till queued messages are sent
875 GNUNET_MQ_destroy (grp->mq);
878 origin_cleanup (orig);
883 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
885 LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
886 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
887 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
888 GNUNET_assert (GNUNET_YES == grp->in_transmit);
890 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
891 struct GNUNET_MULTICAST_MessageHeader *msg;
892 struct GNUNET_MQ_Envelope *
893 env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
894 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
896 int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
898 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
899 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
901 LOG (GNUNET_ERROR_TYPE_ERROR,
902 "%p OriginTransmitNotify() returned error or invalid message size.\n",
904 /* FIXME: handle error */
909 if (GNUNET_NO == ret && 0 == buf_size)
911 LOG (GNUNET_ERROR_TYPE_DEBUG,
912 "%p OriginTransmitNotify() - transmission paused.\n", orig);
914 return; /* Transmission paused. */
917 msg->header.size = htons (sizeof (*msg) + buf_size);
918 msg->message_id = GNUNET_htonll (tmit->message_id);
919 msg->group_generation = tmit->group_generation;
920 msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
921 tmit->fragment_offset += sizeof (*msg) + buf_size;
924 GNUNET_MQ_send (grp->mq, env);
926 if (GNUNET_YES == ret)
927 grp->in_transmit = GNUNET_NO;
932 * Send a message to the multicast group.
935 * Handle to the multicast group.
937 * Application layer ID for the message. Opaque to multicast.
938 * @param group_generation
939 * Group generation of the message.
940 * Documented in struct GNUNET_MULTICAST_MessageHeader.
942 * Function to call to get the message.
944 * Closure for @a notify.
946 * @return Message handle on success,
947 * NULL on error (i.e. another request is already pending).
949 struct GNUNET_MULTICAST_OriginTransmitHandle *
950 GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
952 uint64_t group_generation,
953 GNUNET_MULTICAST_OriginTransmitNotify notify,
956 struct GNUNET_MULTICAST_Group *grp = &orig->grp;
957 if (GNUNET_YES == grp->in_transmit)
959 grp->in_transmit = GNUNET_YES;
961 struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
963 tmit->message_id = message_id;
964 tmit->fragment_offset = 0;
965 tmit->group_generation = group_generation;
966 tmit->notify = notify;
967 tmit->notify_cls = notify_cls;
969 origin_to_all (orig);
975 * Resume message transmission to multicast group.
978 * Transmission to cancel.
981 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
983 struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
984 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
986 origin_to_all (th->origin);
991 * Cancel request for message transmission to multicast group.
994 * Transmission to cancel.
997 GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
999 th->origin->grp.in_transmit = GNUNET_NO;
/* Defined below; declared here because the reconnect task needs it. */
static void
member_connect (struct GNUNET_MULTICAST_Member *mem);


/**
 * Scheduler task: connect the member to the service again.
 *
 * @param cls  The member (struct GNUNET_MULTICAST_Member *).
 */
static void
member_reconnect (void *cls)
{
  struct GNUNET_MULTICAST_Member *member = cls;

  member_connect (member);
}
1015 * Member client disconnected from service.
1017 * Reconnect after backoff period.
1020 member_disconnected (void *cls, enum GNUNET_MQ_Error error)
1022 struct GNUNET_MULTICAST_Member *mem = cls;
1023 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1025 LOG (GNUNET_ERROR_TYPE_DEBUG,
1026 "Member client disconnected (%d), re-connecting\n",
1028 GNUNET_MQ_destroy (grp->mq);
1031 grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
1034 grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
1039 * Connect to service as member.
1042 member_connect (struct GNUNET_MULTICAST_Member *mem)
1044 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1046 GNUNET_MQ_hd_var_size (group_message,
1047 GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
1048 struct GNUNET_MULTICAST_MessageHeader);
1050 GNUNET_MQ_hd_fixed_size (group_fragment_ack,
1051 GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
1052 struct GNUNET_MessageHeader);
1054 GNUNET_MQ_hd_var_size (group_join_request,
1055 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
1056 struct MulticastJoinRequestMessage);
1058 GNUNET_MQ_hd_var_size (member_join_decision,
1059 GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
1060 struct MulticastJoinDecisionMessageHeader);
1062 GNUNET_MQ_hd_fixed_size (group_replay_request,
1063 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
1064 struct MulticastReplayRequestMessage);
1066 GNUNET_MQ_hd_var_size (member_replay_response,
1067 GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
1068 struct MulticastReplayResponseMessage);
1070 struct GNUNET_MQ_MessageHandler handlers[] = {
1071 make_group_message_handler (grp),
1072 make_group_fragment_ack_handler (grp),
1073 make_group_join_request_handler (grp),
1074 make_member_join_decision_handler (mem),
1075 make_group_replay_request_handler (grp),
1076 make_member_replay_response_handler (mem),
1077 GNUNET_MQ_handler_end ()
1080 grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
1081 handlers, member_disconnected, mem);
1082 if (NULL == grp->mq)
1087 GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
1092 * Join a multicast group.
1094 * The entity joining is always the local peer. Further information about the
1095 * candidate can be provided in the @a join_request message. If the join fails, the
1096 * @a message_cb is invoked with a (failure) response and then with NULL. If
1097 * the join succeeds, outstanding (state) messages and ongoing multicast
1098 * messages will be given to the @a message_cb until the member decides to part
1099 * the group. The @a replay_cb function may be called at any time by the
1100 * multicast service to support relaying messages to other members of the group.
1103 * Configuration to use.
1105 * ECC public key that identifies the group to join.
1107 * ECC key that identifies the member
1108 * and used to sign requests sent to the origin.
1110 * Peer ID of the origin to send unicast requsets to. If NULL,
1111 * unicast requests are sent back via multiple hops on the reverse path
1112 * of multicast messages.
1113 * @param relay_count
1114 * Number of peers in the @a relays array.
1116 * Peer identities of members of the group, which serve as relays
1117 * and can be used to join the group at. and send the @a join_request to.
1118 * If empty, the @a join_request is sent directly to the @a origin.
1120 * Application-dependent join message to be passed to the peer @a origin.
1121 * @param join_request_cb
1122 * Function called to approve / disapprove joining of a peer.
1123 * @param join_decision_cb
1124 * Function called to inform about the join decision.
1125 * @param replay_frag_cb
1126 * Function that can be called to replay message fragments
1127 * this peer already knows from this group. NULL if this
1128 * client is unable to support replay.
1129 * @param replay_msg_cb
1130 * Function that can be called to replay message fragments
1131 * this peer already knows from this group. NULL if this
1132 * client is unable to support replay.
1134 * Function to be called for all message fragments we
1135 * receive from the group, excluding those our @a replay_cb
1138 * Closure for callbacks.
1140 * @return Handle for the member, NULL on error.
1142 struct GNUNET_MULTICAST_Member *
1143 GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
1144 const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
1145 const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
1146 const struct GNUNET_PeerIdentity *origin,
1147 uint16_t relay_count,
1148 const struct GNUNET_PeerIdentity *relays,
1149 const struct GNUNET_MessageHeader *join_msg,
1150 GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
1151 GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
1152 GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
1153 GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
1154 GNUNET_MULTICAST_MessageCallback message_cb,
1157 struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
1158 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1160 uint16_t relay_size = relay_count * sizeof (*relays);
1161 uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
1162 struct MulticastMemberJoinMessage *join;
1163 grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
1164 GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
1165 join->group_pub_key = *group_pub_key;
1166 join->member_key = *member_key;
1167 join->origin = *origin;
1168 join->relay_count = ntohl (relay_count);
1170 GNUNET_memcpy (&join[1], relays, relay_size);
1171 if (0 < join_msg_size)
1172 GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
1174 grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
1175 grp->is_origin = GNUNET_NO;
1178 mem->join_dcsn_cb = join_decision_cb;
1179 grp->join_req_cb = join_request_cb;
1180 grp->replay_frag_cb = replay_frag_cb;
1181 grp->replay_msg_cb = replay_msg_cb;
1182 grp->message_cb = message_cb;
1185 member_connect (mem);
1191 * Part a multicast group.
1193 * Disconnects from all group members and invalidates the @a member handle.
1195 * An application-dependent part message can be transmitted beforehand using
1196 * #GNUNET_MULTICAST_member_to_origin())
1199 * Membership handle.
1202 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
1203 GNUNET_ContinuationCallback part_cb,
1206 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
1207 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1209 grp->is_disconnecting = GNUNET_YES;
1210 grp->disconnect_cb = part_cb;
1211 grp->disconnect_cls = part_cls;
1213 mem->join_dcsn_cb = NULL;
1214 grp->join_req_cb = NULL;
1215 grp->message_cb = NULL;
1216 grp->replay_msg_cb = NULL;
1217 grp->replay_frag_cb = NULL;
1219 // FIXME: wait till queued messages are sent
1220 if (NULL != grp->mq)
1222 GNUNET_MQ_destroy (grp->mq);
1225 member_cleanup (mem);
1230 member_replay_request (struct GNUNET_MULTICAST_Member *mem,
1231 uint64_t fragment_id,
1232 uint64_t message_id,
1233 uint64_t fragment_offset,
1236 struct MulticastReplayRequestMessage *rep;
1237 struct GNUNET_MQ_Envelope *
1238 env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
1240 rep->fragment_id = GNUNET_htonll (fragment_id);
1241 rep->message_id = GNUNET_htonll (message_id);
1242 rep->fragment_offset = GNUNET_htonll (fragment_offset);
1243 rep->flags = GNUNET_htonll (flags);
1245 GNUNET_MQ_send (mem->grp.mq, env);
/**
 * Request a fragment to be replayed by fragment ID.
 *
 * Useful if messages below the @e max_known_fragment_id given when joining are
 * needed and not known to the client.
 *
 * @param mem
 *        Membership handle.
 * @param fragment_id
 *        ID of a message fragment that this client would like to see replayed.
 * @param flags
 *        Additional flags for the replay request.
 *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
 *
 * @return Replay request handle; currently always NULL.
 */
struct GNUNET_MULTICAST_MemberReplayHandle *
GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t fragment_id,
                                         uint64_t flags)
{
  const uint64_t no_message_id = 0;
  const uint64_t no_offset = 0;

  member_replay_request (mem, fragment_id, no_message_id, no_offset, flags);
  // FIXME: return something useful
  return NULL;
}
/**
 * Request a message fragment to be replayed.
 *
 * Useful if messages below the @e max_known_fragment_id given when joining are
 * needed and not known to the client.
 *
 * @param mem
 *        Membership handle.
 * @param message_id
 *        ID of the message this client would like to see replayed.
 * @param fragment_offset
 *        Offset of the fragment within the message to replay.
 * @param flags
 *        Additional flags for the replay request.
 *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
 *
 * @return Replay request handle, NULL on error; currently always NULL.
 */
struct GNUNET_MULTICAST_MemberReplayHandle *
GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                        uint64_t message_id,
                                        uint64_t fragment_offset,
                                        uint64_t flags)
{
  const uint64_t no_fragment_id = 0;

  member_replay_request (mem, no_fragment_id, message_id, fragment_offset, flags);
  // FIXME: return something useful
  return NULL;
}
1307 member_to_origin (struct GNUNET_MULTICAST_Member *mem)
1309 LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
1310 struct GNUNET_MULTICAST_Group *grp = &mem->grp;
1311 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1312 GNUNET_assert (GNUNET_YES == grp->in_transmit);
1314 size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
1315 struct GNUNET_MULTICAST_RequestHeader *req;
1316 struct GNUNET_MQ_Envelope *
1317 env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
1318 GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
1320 int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
1322 if (! (GNUNET_YES == ret || GNUNET_NO == ret)
1323 || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
1325 LOG (GNUNET_ERROR_TYPE_ERROR,
1326 "MemberTransmitNotify() returned error or invalid message size. "
1327 "ret=%d, buf_size=%u\n", ret, buf_size);
1328 /* FIXME: handle error */
1333 if (GNUNET_NO == ret && 0 == buf_size)
1335 /* Transmission paused. */
1340 req->header.size = htons (sizeof (*req) + buf_size);
1341 req->request_id = GNUNET_htonll (tmit->request_id);
1342 req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
1343 tmit->fragment_offset += sizeof (*req) + buf_size;
1345 GNUNET_MQ_send (grp->mq, env);
1347 if (GNUNET_YES == ret)
1348 grp->in_transmit = GNUNET_NO;
1353 * Send a message to the origin of the multicast group.
1356 * Membership handle.
1358 * Application layer ID for the request. Opaque to multicast.
1360 * Callback to call to get the message.
1362 * Closure for @a notify.
1364 * @return Handle to cancel request, NULL on error (i.e. request already pending).
1366 struct GNUNET_MULTICAST_MemberTransmitHandle *
1367 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
1368 uint64_t request_id,
1369 GNUNET_MULTICAST_MemberTransmitNotify notify,
1372 if (GNUNET_YES == mem->grp.in_transmit)
1374 mem->grp.in_transmit = GNUNET_YES;
1376 struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
1378 tmit->request_id = request_id;
1379 tmit->fragment_offset = 0;
1380 tmit->notify = notify;
1381 tmit->notify_cls = notify_cls;
1383 member_to_origin (mem);
1389 * Resume message transmission to origin.
1392 * Transmission to cancel.
1395 GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1397 struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
1398 if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
1400 member_to_origin (th->member);
1405 * Cancel request for message transmission to origin.
1408 * Transmission to cancel.
1411 GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
1413 th->member->grp.in_transmit = GNUNET_NO;
1417 /* end of multicast_api.c */