/*
This file is part of GNUnet.
- (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2012, 2013 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ Boston, MA 02110-1301, USA.
*/
/**
* @file multicast/multicast_api.c
- * @brief multicast service; establish tunnels to distant peers
+ * @brief Multicast service; implements multicast groups using CADET connections.
* @author Christian Grothoff
* @author Gabor X Toth
*/
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_multicast_service.h"
#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
-/**
- * Started origins.
- * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
- */
-static struct GNUNET_CONTAINER_MultiHashMap *origins;
-
-/**
- * Joined members.
- * group_key_hash -> struct GNUNET_MULTICAST_Member
- */
-static struct GNUNET_CONTAINER_MultiHashMap *members;
-
-
-struct MessageQueue
-{
- struct MessageQueue *prev;
- struct MessageQueue *next;
-};
-
-
/**
* Handle for a request to send a message to all multicast group members
* (from the origin).
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Currently pending transmission request, or NULL for none.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * Head of operations to transmit.
- */
- struct MessageQueue *tmit_head;
-
- /**
- * Tail of operations to transmit.
- */
- struct MessageQueue *tmit_tail;
-
- /**
- * Message being transmitted to the Multicast service.
- */
- struct MessageQueue *tmit_msg;
+ struct GNUNET_CLIENT_MANAGER_Connection *client;
/**
* Message to send on reconnect.
*/
- struct GNUNET_MessageHeader *reconnect_msg;
-
- /**
- * Task doing exponential back-off trying to reconnect.
- */
- GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
- /**
- * Time for next connect retry.
- */
- struct GNUNET_TIME_Relative reconnect_delay;
-
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
+ struct GNUNET_MessageHeader *connect_msg;
- GNUNET_MULTICAST_JoinCallback join_cb;
- GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
+ GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
GNUNET_MULTICAST_MessageCallback message_cb;
void *cb_cls;
/**
- * Are we polling for incoming messages right now?
+ * Function called after disconnected from the service.
*/
- uint8_t in_receive;
+ GNUNET_ContinuationCallback disconnect_cb;
+
+ /**
+ * Closure for @a disconnect_cb.
+ */
+ void *disconnect_cls;
/**
* Are we currently transmitting a message?
*/
uint8_t in_transmit;
+ /**
+ * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+ */
+ uint8_t acks_pending;
+
/**
* Is this the origin or a member?
*/
uint8_t is_origin;
+
+ /**
+ * Is this channel in the process of disconnecting from the service?
+ * #GNUNET_YES or #GNUNET_NO
+ */
+ uint8_t is_disconnecting;
};
{
struct GNUNET_MULTICAST_Group grp;
struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
GNUNET_MULTICAST_RequestCallback request_cb;
};
struct GNUNET_MULTICAST_Group grp;
struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
- struct GNUNET_PeerIdentity origin;
- struct GNUNET_PeerIdentity relays;
- uint32_t relay_count;
+ GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
- struct GNUNET_MessageHeader *join_request;
+ /**
+ * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
uint64_t next_fragment_id;
};
/**
* Handle that identifies a join request.
*
- * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
+ * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
* corresponding calls to #GNUNET_MULTICAST_join_decision().
*/
struct GNUNET_MULTICAST_JoinHandle
struct GNUNET_MULTICAST_Group *group;
/**
- * Public key of the joining member.
+ * Public key of the member requesting join.
*/
- struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+ struct GNUNET_CRYPTO_EcdsaPublicKey member_pub_key;
/**
- * Peer identity of the joining member.
+ * Peer identity of the member requesting join.
*/
- struct GNUNET_PeerIdentity member_peer;
-};
-
-
-/**
- * Handle to pass back for the answer of a membership test.
- */
-struct GNUNET_MULTICAST_MembershipTestHandle
-{
+ struct GNUNET_PeerIdentity peer;
};
*/
struct GNUNET_MULTICAST_ReplayHandle
{
+ struct GNUNET_MULTICAST_Group *grp;
+ struct MulticastReplayRequestMessage req;
};
static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp PSYC channel handle
+ * Send first message to the service after connecting.
*/
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp);
+group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
+{
+ uint16_t cmsg_size = ntohs (grp->connect_msg->size);
+ struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
+ memcpy (cmsg, grp->connect_msg, cmsg_size);
+ GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
+ GNUNET_free (cmsg);
+}
+/**
+ * Got disconnected from service. Reconnect.
+ */
static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
+group_recv_disconnect (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ GNUNET_CLIENT_MANAGER_reconnect (client);
+ group_send_connect_msg (grp);
+}
/**
- * Reschedule a connect attempt to the service.
- *
- * @param c channel to reconnect
+ * Receive join request from service.
*/
static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
+group_recv_join_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+ struct GNUNET_MULTICAST_Group *grp;
+ const struct MulticastJoinRequestMessage *jreq;
+ struct GNUNET_MULTICAST_JoinHandle *jh;
+ const struct GNUNET_MessageHeader *jmsg;
- if (NULL != grp->th)
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ if (NULL == grp)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
+ GNUNET_break (0);
+ return;
}
- grp->in_receive = GNUNET_NO;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to Multicast service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES));
- grp->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
- grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+ if (NULL == grp->join_req_cb)
+ return;
+ /* FIXME: this fails to check that 'msg' is well-formed! */
+ jreq = (const struct MulticastJoinRequestMessage *) msg;
+ if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
+ jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
+ else
+ jmsg = NULL;
+ jh = GNUNET_malloc (sizeof (*jh));
+ jh->group = grp;
+ jh->member_pub_key = jreq->member_pub_key;
+ jh->peer = jreq->peer;
+ grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
}
/**
- * Reset stored data related to the last received message.
+ * Receive multicast message from service.
*/
static void
-recv_reset (struct GNUNET_MULTICAST_Group *grp)
+group_recv_message (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
-}
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_MessageHeader *
+ mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
-static void
-recv_error (struct GNUNET_MULTICAST_Group *grp)
-{
- if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling message callback with a message of size %u.\n",
+ ntohs (mmsg->header.size));
- recv_reset (grp);
+ if (NULL != grp->message_cb)
+ grp->message_cb (grp->cb_cls, mmsg);
}
/**
- * Transmit next message to service.
- *
- * @param cls The struct GNUNET_MULTICAST_Group.
- * @param size Number of bytes available in @a buf.
- * @param buf Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
+ * Receive message/request fragment acknowledgement from service.
*/
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
+static void
+group_recv_fragment_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
- struct GNUNET_MULTICAST_Group *grp = cls;
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return 0;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- size_t ret = ntohs (qmsg->size);
- grp->th = NULL;
- if (ret > size)
- {
- reschedule_connect (grp);
- return 0;
- }
- memcpy (buf, qmsg, ret);
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
- GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
- GNUNET_free (mq);
-
- if (NULL != grp->tmit_head)
- transmit_next (grp);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+ grp, grp->in_transmit, grp->acks_pending);
- if (GNUNET_NO == grp->in_receive)
+ if (0 == grp->acks_pending)
{
- grp->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Ignoring extraneous fragment ACK.\n", grp);
+ return;
}
- return ret;
-}
+ grp->acks_pending--;
+ if (GNUNET_YES != grp->in_transmit)
+ return;
+
+ if (GNUNET_YES == grp->is_origin)
+ origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+ else
+ member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp Multicast group handle.
+ * Origin receives uniquest request from a member.
*/
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp)
+origin_recv_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
- if (NULL != grp->th || NULL == grp->client)
- return;
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Origin *
+ orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &orig->grp;
+ struct GNUNET_MULTICAST_RequestHeader *
+ req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-
- grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
- ntohs (qmsg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &send_next_message,
- grp);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling request callback with a request of size %u.\n",
+ ntohs (req->header.size));
+
+ if (NULL != orig->request_cb)
+ orig->request_cb (grp->cb_cls, req);
}
/**
- * Try again to connect to the Multicast service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
+ * Receive multicast replay request from service.
*/
static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+group_recv_replay_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MULTICAST_Group *grp = cls;
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct MulticastReplayRequestMessage *
+ rep = (struct MulticastReplayRequestMessage *) msg;
- recv_reset (grp);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to Multicast service.\n");
- GNUNET_assert (NULL == grp->client);
- grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
- GNUNET_assert (NULL != grp->client);
- uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
-
- if (NULL == grp->tmit_head ||
- 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
+
+ if (0 != rep->fragment_id)
{
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
- memcpy (&mq[1], grp->reconnect_msg, reconn_size);
- GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
+ if (NULL != grp->replay_frag_cb)
+ {
+ struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+ rh->grp = grp;
+ rh->req = *rep;
+ grp->replay_frag_cb (grp->cb_cls, &rep->member_pub_key,
+ GNUNET_ntohll (rep->fragment_id),
+ GNUNET_ntohll (rep->flags), rh);
+ }
+ }
+ else if (0 != rep->message_id)
+ {
+ if (NULL != grp->replay_msg_cb)
+ {
+ struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+ rh->grp = grp;
+ rh->req = *rep;
+ grp->replay_msg_cb (grp->cb_cls, &rep->member_pub_key,
+ GNUNET_ntohll (rep->message_id),
+ GNUNET_ntohll (rep->fragment_offset),
+ GNUNET_ntohll (rep->flags), rh);
+ }
}
- transmit_next (grp);
}
/**
- * Disconnect from the Multicast service.
- *
- * @param g Group handle to disconnect.
+ * Receive multicast replay request from service.
*/
static void
-disconnect (void *g)
+member_recv_replay_response (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MULTICAST_Group *grp = g;
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Member *
+ mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &mem->grp;
+ struct MulticastReplayResponseMessage *
+ res = (struct MulticastReplayResponseMessage *) msg;
+
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
- GNUNET_assert (NULL != grp);
- if (grp->tmit_head != grp->tmit_tail)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Disconnecting while there are still outstanding messages!\n");
- GNUNET_break (0);
- }
- if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (grp->reconnect_task);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != grp->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
- }
- if (NULL != grp->reconnect_msg)
- {
- GNUNET_free (grp->reconnect_msg);
- grp->reconnect_msg = NULL;
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
}
-
/**
- * Iterator callback for calling message callbacks for all groups.
+ * Member receives join decision.
*/
-static int
-message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
+static void
+member_recv_join_decision (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_MessageHeader *msg = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Member *
+ mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &mem->grp;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling message callback with a message "
- "of type %u and size %u.\n",
- ntohs (msg->type), ntohs (msg->size));
+ const struct MulticastJoinDecisionMessageHeader *
+ hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
+ const struct MulticastJoinDecisionMessage *
+ dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
- if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, msg);
+ uint16_t dcsn_size = ntohs (dcsn->header.size);
+ int is_admitted = ntohl (dcsn->is_admitted);
- return GNUNET_YES;
-}
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Member got join decision from multicast: %d\n",
+ mem, is_admitted);
+ const struct GNUNET_MessageHeader *join_resp = NULL;
+ uint16_t join_resp_size = 0;
-/**
- * Iterator callback for calling request callbacks of origins.
- */
-static int
-request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin)
-{
- const struct GNUNET_MULTICAST_RequestHeader *req = cls;
- struct GNUNET_MULTICAST_Origin *orig = origin;
+ uint16_t relay_count = ntohl (dcsn->relay_count);
+ const struct GNUNET_PeerIdentity *relays = NULL;
+ uint16_t relay_size = relay_count * sizeof (*relays);
+ if (0 < relay_count)
+ {
+ if (dcsn_size < sizeof (*dcsn) + relay_size)
+ {
+ GNUNET_break_op (0);
+ is_admitted = GNUNET_SYSERR;
+ }
+ else
+ {
+ relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
+ }
+ }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling request callback for a request of type %u and size %u.\n",
- ntohs (req->header.type), ntohs (req->header.size));
+ if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
+ {
+ join_resp = (const struct GNUNET_MessageHeader *) ((char *) &dcsn[1] + relay_size);
+ join_resp_size = ntohs (join_resp->size);
+ }
+ if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received invalid join decision message from multicast: %u < %u + %u + %u\n",
+ dcsn_size , sizeof (*dcsn), relay_size, join_resp_size);
+ GNUNET_break_op (0);
+ is_admitted = GNUNET_SYSERR;
+ }
+
+ if (NULL != mem->join_dcsn_cb)
+ mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
+ relay_count, relays, join_resp);
- orig->request_cb (orig->grp.cb_cls, &req->member_key,
- (const struct GNUNET_MessageHeader *) req, 0);
- return GNUNET_YES;
+ // FIXME:
+ //if (GNUNET_YES != is_admitted)
+ // GNUNET_MULTICAST_member_part (mem);
}
/**
- * Iterator callback for calling join request callbacks of origins.
+ * Message handlers for an origin.
*/
-static int
-join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *group)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
{
- const struct MulticastJoinRequestMessage *req = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
+ { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
- jh->group = grp;
- jh->member_key = req->member_key;
- jh->member_peer = req->member_peer;
+ { group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- const struct GNUNET_MessageHeader *msg = NULL;
- if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
- msg =(const struct GNUNET_MessageHeader *) &req[1];
+ { origin_recv_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
- grp->join_cb (grp->cb_cls, &req->member_key, msg, jh);
- return GNUNET_YES;
-}
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
+ { group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
+
+ { group_recv_replay_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
/**
- * Function called when we receive a message from the service.
- *
- * @param cls struct GNUNET_MULTICAST_Group
- * @param msg Message received, NULL on timeout or fatal error.
+ * Message handlers for a member.
*/
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
{
- struct GNUNET_MULTICAST_Group *grp = cls;
+ { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- if (NULL == msg)
- {
- // timeout / disconnected from service, reconnect
- reschedule_connect (grp);
- return;
- }
+ { group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- uint16_t size_eq = 0;
- uint16_t size_min = 0;
- uint16_t size = ntohs (msg->size);
- uint16_t type = ntohs (msg->type);
+ { group_recv_fragment_ack, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %d and size %u from Multicast service\n",
- type, size);
+ { group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
- break;
+ { member_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
- break;
+ { group_recv_replay_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- size_min = sizeof (struct MulticastJoinRequestMessage);
- break;
+ { member_recv_replay_response, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
- default:
- GNUNET_break_op (0);
- type = 0;
- }
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
- if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size_min <= size)))
- {
- GNUNET_break_op (0);
- type = 0;
- }
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- if (origins != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_cb, (void *) msg);
- if (members != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- message_cb, (void *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- if (GNUNET_YES != grp->is_origin)
- {
- GNUNET_break (0);
- break;
- }
+static void
+group_cleanup (struct GNUNET_MULTICAST_Group *grp)
+{
+ GNUNET_free (grp->connect_msg);
+ if (NULL != grp->disconnect_cb)
+ grp->disconnect_cb (grp->disconnect_cls);
+}
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- request_cb, (void *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- join_request_cb, (void *) msg);
- if (NULL != members)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- join_request_cb, (void *) msg);
- break;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
+static void
+origin_cleanup (void *cls)
+{
+ struct GNUNET_MULTICAST_Origin *orig = cls;
+ group_cleanup (&orig->grp);
+ GNUNET_free (orig);
+}
+
+
+static void
+member_cleanup (void *cls)
+{
+ struct GNUNET_MULTICAST_Member *mem = cls;
+ group_cleanup (&mem->grp);
+ GNUNET_free (mem);
}
* Function to call with the decision made for a join request.
*
* Must be called once and only once in response to an invocation of the
- * #GNUNET_MULTICAST_JoinCallback.
+ * #GNUNET_MULTICAST_JoinRequestCallback.
*
- * @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved
- * @param relay_count Number of relays given.
- * @param relays Array of suggested peers that might be useful relays to use
+ * @param join
+ * Join request handle.
+ * @param is_admitted
+ * #GNUNET_YES if the join is approved,
+ * #GNUNET_NO if it is disapproved,
+ * #GNUNET_SYSERR if we cannot answer the request.
+ * @param relay_count
+ * Number of relays given.
+ * @param relays
+ * Array of suggested peers that might be useful relays to use
* when joining the multicast group (essentially a list of peers that
* are already part of the multicast group and might thus be willing
* to help with routing). If empty, only this local peer (which must
* be the multicast origin) is a good candidate for building the
* multicast tree. Note that it is unnecessary to specify our own
* peer identity in this array.
- * @param join_response Message to send in response to the joining peer;
+ * @param join_resp
+ * Message to send in response to the joining peer;
* can also be used to redirect the peer to a different group at the
* application layer; this response is to be transmitted to the
* peer that issued the request even if admission is denied.
*/
struct GNUNET_MULTICAST_ReplayHandle *
-GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
+GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
int is_admitted,
- unsigned int relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
- const struct GNUNET_MessageHeader *join_response)
+ const struct GNUNET_MessageHeader *join_resp)
{
- GNUNET_free (jh);
- return NULL;
-}
-
+ struct GNUNET_MULTICAST_Group *grp = join->group;
+ uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
+ uint16_t relay_size = relay_count * sizeof (*relays);
-/**
- * Call informing multicast about the decision taken for a membership test.
- *
- * @param mth Handle that was given for the query.
- * @param result #GNUNET_YES if peer was a member, #GNUNET_NO if peer was not a member,
- * #GNUNET_SYSERR if we cannot answer the membership test.
- */
-void
-GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestHandle *mth,
- int result)
-{
+ struct MulticastJoinDecisionMessageHeader * hdcsn;
+ struct MulticastJoinDecisionMessage *dcsn;
+ hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
+ hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
+ hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ hdcsn->member_pub_key = join->member_pub_key;
+ hdcsn->peer = join->peer;
+
+ dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
+ dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
+ dcsn->is_admitted = htonl (is_admitted);
+ dcsn->relay_count = htonl (relay_count);
+ if (0 < relay_size)
+ memcpy (&dcsn[1], relays, relay_size);
+ if (0 < join_resp_size)
+ memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
+
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+ GNUNET_free (hdcsn);
+ GNUNET_free (join);
+ return NULL;
}
/**
* Replay a message fragment for the multicast group.
*
- * @param rh Replay handle identifying which replay operation was requested.
- * @param msg Replayed message fragment, NULL if unknown/error.
- * @param ec Error code.
+ * @param rh
+ * Replay handle identifying which replay operation was requested.
+ * @param msg
+ * Replayed message fragment, NULL if not found / an error occurred.
+ * @param ec
+ * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
+ * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
*/
void
GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
const struct GNUNET_MessageHeader *msg,
enum GNUNET_MULTICAST_ReplayErrorCode ec)
{
+ uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
+ struct MulticastReplayResponseMessage *
+ res = GNUNET_malloc (sizeof (*res) + msg_size);
+ *res = (struct MulticastReplayResponseMessage) {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
+ .size = htons (sizeof (*res) + msg_size),
+ },
+ .fragment_id = rh->req.fragment_id,
+ .message_id = rh->req.message_id,
+ .fragment_offset = rh->req.fragment_offset,
+ .flags = rh->req.flags,
+ .error_code = htonl (ec),
+ };
+
+ if (GNUNET_MULTICAST_REC_OK == ec)
+ {
+ GNUNET_assert (NULL != msg);
+ memcpy (&res[1], msg, msg_size);
+ }
+
+ GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
+ GNUNET_free (res);
+
+ if (GNUNET_MULTICAST_REC_OK != ec)
+ GNUNET_free (rh);
}
*
* Invalidates the replay handle.
*
- * @param rh Replay session to end.
+ * @param rh
+ * Replay session to end.
*/
void
GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
{
+ struct MulticastReplayResponseMessage end = {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
+ .size = htons (sizeof (end)),
+ },
+ .fragment_id = rh->req.fragment_id,
+ .message_id = rh->req.message_id,
+ .fragment_offset = rh->req.fragment_offset,
+ .flags = rh->req.flags,
+ };
+
+ GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+ GNUNET_free (rh);
}
/**
* Replay a message for the multicast group.
*
- * @param rh Replay handle identifying which replay operation was requested.
- * @param notify Function to call to get the message.
- * @param notify_cls Closure for @a notify.
+ * @param rh
+ * Replay handle identifying which replay operation was requested.
+ * @param notify
+ * Function to call to get the message.
+ * @param notify_cls
+ * Closure for @a notify.
*/
void
GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
* candidate will be given a response. Members in the group can send messages
* to the origin (one at a time).
*
- * @param cfg Configuration to use.
- * @param priv_key ECC key that will be used to sign messages for this
+ * @param cfg
+ * Configuration to use.
+ * @param priv_key
+ * ECC key that will be used to sign messages for this
* multicast session; public key is used to identify the multicast group;
- * @param max_fragment_id Maximum fragment ID already sent to the group.
+ * @param max_fragment_id
+ * Maximum fragment ID already sent to the group.
* 0 for a new group.
- * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param member_test_cb Function multicast can use to test group membership.
- * @param replay_frag_cb Function that can be called to replay a message fragment.
- * @param replay_msg_cb Function that can be called to replay a message.
- * @param request_cb Function called with message fragments from group members.
- * @param message_cb Function called with the message fragments sent to the
+ * @param join_request_cb
+ * Function called to approve / disapprove joining of a peer.
+ * @param replay_frag_cb
+ * Function that can be called to replay a message fragment.
+ * @param replay_msg_cb
+ * Function that can be called to replay a message.
+ * @param request_cb
+ * Function called with message fragments from group members.
+ * @param message_cb
+ * Function called with the message fragments sent to the
* network by GNUNET_MULTICAST_origin_to_all(). These message fragments
* should be stored for answering replay requests later.
- * @param cls Closure for the various callbacks that follow.
+ * @param cls
+ * Closure for the various callbacks that follow.
*
* @return Handle for the origin, NULL on error.
*/
GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
uint64_t max_fragment_id,
- GNUNET_MULTICAST_JoinCallback join_cb,
- GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
GNUNET_MULTICAST_RequestCallback request_cb,
start->max_fragment_id = max_fragment_id;
memcpy (&start->group_key, priv_key, sizeof (*priv_key));
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) start;
grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
grp->cb_cls = cls;
- grp->join_cb = join_cb;
- grp->member_test_cb = member_test_cb;
+ grp->join_req_cb = join_request_cb;
grp->replay_frag_cb = replay_frag_cb;
grp->replay_msg_cb = replay_msg_cb;
grp->message_cb = message_cb;
orig->request_cb = request_cb;
- orig->priv_key = *priv_key;
-
- GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
- &grp->pub_key_hash);
-
- if (NULL == origins)
- origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
+ group_send_connect_msg (grp);
return orig;
}
/**
* Stop a multicast group.
*
- * @param origin Multicast group to stop.
+ * @param origin
+ * Multicast group to stop.
*/
void
-GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
+GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
+ GNUNET_ContinuationCallback stop_cb,
+ void *stop_cls)
{
- disconnect (&orig->grp);
- GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig);
- GNUNET_free (orig);
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = stop_cb;
+ grp->disconnect_cls = stop_cls;
+
+ GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES,
+ &origin_cleanup, orig);
}
static void
origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_MessageHeader *
- msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
+ struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "OriginTransmitNotify() returned error or invalid message size.\n");
+ "%p OriginTransmitNotify() returned error or invalid message size.\n",
+ orig);
/* FIXME: handle error */
- GNUNET_free (mq);
+ GNUNET_free (msg);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
{
- GNUNET_free (mq);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p OriginTransmitNotify() - transmission paused.\n", orig);
+ GNUNET_free (msg);
return; /* Transmission paused. */
}
msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*msg) + buf_size;
- transmit_next (grp);
+ grp->acks_pending++;
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+ GNUNET_free (msg);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
/**
* Send a message to the multicast group.
*
- * @param orig Handle to the multicast group.
- * @param message_id Application layer ID for the message. Opaque to multicast.
- * @param group_generation Group generation of the message.
- * Documented in struct GNUNET_MULTICAST_MessageHeader.
- * @param notify Function to call to get the message.
- * @param notify_cls Closure for @a notify.
+ * @param orig
+ * Handle to the multicast group.
+ * @param message_id
+ * Application layer ID for the message. Opaque to multicast.
+ * @param group_generation
+ * Group generation of the message.
+ * Documented in struct GNUNET_MULTICAST_MessageHeader.
+ * @param notify
+ * Function to call to get the message.
+ * @param notify_cls
+ * Closure for @a notify.
*
* @return Message handle on success,
* NULL on error (i.e. another request is already pending).
GNUNET_MULTICAST_OriginTransmitNotify notify,
void *notify_cls)
{
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+ if (GNUNET_YES == grp->in_transmit)
+ return NULL;
+ grp->in_transmit = GNUNET_YES;
+
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
tmit->origin = orig;
tmit->message_id = message_id;
+ tmit->fragment_offset = 0;
tmit->group_generation = group_generation;
tmit->notify = notify;
tmit->notify_cls = notify_cls;
/**
* Resume message transmission to multicast group.
*
- * @param th Transmission to cancel.
+ * @param th
+ * Transmission to cancel.
*/
void
GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
origin_to_all (th->origin);
}
/**
* Cancel request for message transmission to multicast group.
*
- * @param th Transmission to cancel.
+ * @param th
+ * Transmission to cancel.
*/
void
GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHandle *th)
{
+ th->origin->grp.in_transmit = GNUNET_NO;
}
* @a message_cb is invoked with a (failure) response and then with NULL. If
* the join succeeds, outstanding (state) messages and ongoing multicast
* messages will be given to the @a message_cb until the member decides to part
- * the group. The @a test_cb and @a replay_cb functions may be called at
- * anytime by the multicast service to support relaying messages to other
- * members of the group.
+ * the group. The @a replay_cb function may be called at any time by the
+ * multicast service to support relaying messages to other members of the group.
*
- * @param cfg Configuration to use.
- * @param group_key ECC public key that identifies the group to join.
- * @param member_key ECC key that identifies the member and used to sign
- * requests sent to the origin.
- * @param origin Peer ID of the origin to send unicast requsets to. If NULL,
+ * @param cfg
+ * Configuration to use.
+ * @param group_key
+ * ECC public key that identifies the group to join.
+ * @param member_key
+ * ECC key that identifies the member
+ * and used to sign requests sent to the origin.
+ * @param origin
+ * Peer ID of the origin to send unicast requsets to. If NULL,
* unicast requests are sent back via multiple hops on the reverse path
* of multicast messages.
- * @param relay_count Number of peers in the @a relays array.
- * @param relays Peer identities of members of the group, which serve as relays
+ * @param relay_count
+ * Number of peers in the @a relays array.
+ * @param relays
+ * Peer identities of members of the group, which serve as relays
* and can be used to join the group at. and send the @a join_request to.
* If empty, the @a join_request is sent directly to the @a origin.
- * @param join_req Application-dependent join request to be passed to the peer
- * @a relay (might, for example, contain a user, bind user
- * identity/pseudonym to peer identity, application-level message to
- * origin, etc.).
- * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param member_test_cb Function multicast can use to test group membership.
- * @param replay_frag_cb Function that can be called to replay message fragments
+ * @param join_msg
+ * Application-dependent join message to be passed to the peer @a origin.
+ * @param join_request_cb
+ * Function called to approve / disapprove joining of a peer.
+ * @param join_decision_cb
+ * Function called to inform about the join decision.
+ * @param replay_frag_cb
+ * Function that can be called to replay message fragments
* this peer already knows from this group. NULL if this
* client is unable to support replay.
- * @param replay_msg_cb Function that can be called to replay message fragments
+ * @param replay_msg_cb
+ * Function that can be called to replay message fragments
* this peer already knows from this group. NULL if this
* client is unable to support replay.
- * @param message_cb Function to be called for all message fragments we
+ * @param message_cb
+ * Function to be called for all message fragments we
* receive from the group, excluding those our @a replay_cb
* already has.
- * @param cls Closure for callbacks.
+ * @param cls
+ * Closure for callbacks.
+ *
* @return Handle for the member, NULL on error.
*/
struct GNUNET_MULTICAST_Member *
GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
- const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *group_pub_key,
+ const struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key,
const struct GNUNET_PeerIdentity *origin,
- uint32_t relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
- const struct GNUNET_MessageHeader *join_req,
- GNUNET_MULTICAST_JoinCallback join_cb,
- GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
+ const struct GNUNET_MessageHeader *join_msg,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
+ GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
GNUNET_MULTICAST_MessageCallback message_cb,
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
uint16_t relay_size = relay_count * sizeof (*relays);
- uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
+ uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
struct MulticastMemberJoinMessage *
- join = GNUNET_malloc (sizeof (*join) + relay_size + join_req_size);
+ join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
+ join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
- join->header.size = htons (sizeof (*join) + relay_size + join_req_size);
- join->group_key = *group_key;
+ join->group_pub_key = *group_pub_key;
join->member_key = *member_key;
join->origin = *origin;
- memcpy (&join[1], relays, relay_size);
- memcpy (((char *) &join[1]) + relay_size, join_req, join_req_size);
+ join->relay_count = ntohl (relay_count);
+ if (0 < relay_size)
+ memcpy (&join[1], relays, relay_size);
+ if (0 < join_msg_size)
+ memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) join;
grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
- grp->pub_key = *group_key;
- grp->join_cb = join_cb;
- grp->member_test_cb = member_test_cb;
+ mem->join_dcsn_cb = join_decision_cb;
+ grp->join_req_cb = join_request_cb;
grp->replay_frag_cb = replay_frag_cb;
+ grp->replay_msg_cb = replay_msg_cb;
grp->message_cb = message_cb;
grp->cb_cls = cls;
- mem->origin = *origin;
- mem->relay_count = relay_count;
- mem->relays = *relays;
- mem->priv_key = *member_key;
-
- GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
-
- if (NULL == members)
- members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
- GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
+ group_send_connect_msg (grp);
return mem;
}
* An application-dependent part message can be transmitted beforehand using
* #GNUNET_MULTICAST_member_to_origin())
*
- * @param member Membership handle.
+ * @param member
+ * Membership handle.
*/
void
-GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
+GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
+ GNUNET_ContinuationCallback part_cb,
+ void *part_cls)
{
- disconnect (&mem->grp);
- GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
- GNUNET_free (mem);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
+ struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = part_cb;
+ grp->disconnect_cls = part_cls;
+
+ mem->join_dcsn_cb = NULL;
+ grp->join_req_cb = NULL;
+ grp->message_cb = NULL;
+ grp->replay_msg_cb = NULL;
+ grp->replay_frag_cb = NULL;
+
+ GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
+ member_cleanup, mem);
+}
+
+
+void
+member_replay_request (struct GNUNET_MULTICAST_Member *mem,
+ uint64_t fragment_id,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags)
+{
+ struct MulticastReplayRequestMessage rep = {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
+ .size = htons (sizeof (rep)),
+ },
+ .fragment_id = GNUNET_htonll (fragment_id),
+ .message_id = GNUNET_htonll (message_id),
+ .fragment_offset = GNUNET_htonll (fragment_offset),
+ .flags = GNUNET_htonll (flags),
+ };
+ GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
}
* Useful if messages below the @e max_known_fragment_id given when joining are
* needed and not known to the client.
*
- * @param member Membership handle.
- * @param fragment_id ID of a message fragment that this client would like to
- see replayed.
- * @param flags Additional flags for the replay request. It is used and defined
- * by the replay callback. FIXME: which replay callback? FIXME: use enum?
- * FIXME: why not pass reply cb here?
- * @return Replay request handle, NULL on error.
+ * @param member
+ * Membership handle.
+ * @param fragment_id
+ * ID of a message fragment that this client would like to see replayed.
+ * @param flags
+ * Additional flags for the replay request.
+ * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
+ *
+ * @return Replay request handle.
*/
struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
uint64_t fragment_id,
uint64_t flags)
{
- return NULL;
+ member_replay_request (mem, fragment_id, 0, 0, flags);
+ // FIXME: return
}
* Useful if messages below the @e max_known_fragment_id given when joining are
* needed and not known to the client.
*
- * @param member Membership handle.
- * @param message_id ID of the message this client would like to see replayed.
- * @param fragment_offset Offset of the fragment within the message to replay.
- * @param flags Additional flags for the replay request. It is used & defined
- * by the replay callback.
- * @param result_cb Function to be called for the replayed message.
- * @param result_cb_cls Closure for @a result_cb.
+ * @param member
+ * Membership handle.
+ * @param message_id
+ * ID of the message this client would like to see replayed.
+ * @param fragment_offset
+ * Offset of the fragment within the message to replay.
+ * @param flags
+ * Additional flags for the replay request.
+ * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
+ *
* @return Replay request handle, NULL on error.
*/
struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
uint64_t message_id,
uint64_t fragment_offset,
- uint64_t flags,
- GNUNET_MULTICAST_ResultCallback result_cb,
- void *result_cb_cls)
-{
- return NULL;
-}
-
-
-/**
- * Cancel a replay request.
- *
- * @param rh Request to cancel.
- */
-void
-GNUNET_MULTICAST_member_replay_cancel (struct GNUNET_MULTICAST_MemberReplayHandle *rh)
+ uint64_t flags)
{
+ member_replay_request (mem, 0, message_id, fragment_offset, flags);
+ // FIXME: return
}
LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+ GNUNET_assert (GNUNET_YES == grp->in_transmit);
- size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_RequestHeader *
- req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
+ size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+ struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "MemberTransmitNotify() returned error or invalid message size.\n");
+ "MemberTransmitNotify() returned error or invalid message size. "
+ "ret=%d, buf_size=%u\n", ret, buf_size);
/* FIXME: handle error */
+ GNUNET_free (req);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
- return; /* Transmission paused. */
+ {
+ /* Transmission paused. */
+ GNUNET_free (req);
+ return;
+ }
req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
req->header.size = htons (sizeof (*req) + buf_size);
req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*req) + buf_size;
- transmit_next (grp);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+ GNUNET_free (req);
+
+ if (GNUNET_YES == ret)
+ grp->in_transmit = GNUNET_NO;
}
/**
* Send a message to the origin of the multicast group.
*
- * @param mem Membership handle.
- * @param request_id Application layer ID for the request. Opaque to multicast.
- * @param notify Callback to call to get the message.
- * @param notify_cls Closure for @a notify.
+ * @param mem
+ * Membership handle.
+ * @param request_id
+ * Application layer ID for the request. Opaque to multicast.
+ * @param notify
+ * Callback to call to get the message.
+ * @param notify_cls
+ * Closure for @a notify.
+ *
* @return Handle to cancel request, NULL on error (i.e. request already pending).
*/
struct GNUNET_MULTICAST_MemberTransmitHandle *
GNUNET_MULTICAST_MemberTransmitNotify notify,
void *notify_cls)
{
+ if (GNUNET_YES == mem->grp.in_transmit)
+ return NULL;
+ mem->grp.in_transmit = GNUNET_YES;
+
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
tmit->member = mem;
tmit->request_id = request_id;
+ tmit->fragment_offset = 0;
tmit->notify = notify;
tmit->notify_cls = notify_cls;
/**
* Resume message transmission to origin.
*
- * @param th Transmission to cancel.
+ * @param th
+ * Transmission to cancel.
*/
void
GNUNET_MULTICAST_member_to_origin_resume (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
{
-
+ struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+ if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+ return;
+ member_to_origin (th->member);
}
/**
* Cancel request for message transmission to origin.
*
- * @param th Transmission to cancel.
+ * @param th
+ * Transmission to cancel.
*/
void
GNUNET_MULTICAST_member_to_origin_cancel (struct GNUNET_MULTICAST_MemberTransmitHandle *th)
{
+ th->member->grp.in_transmit = GNUNET_NO;
}