#include "platform.h"
#include "gnunet_util_lib.h"
+#include "gnunet_mq_lib.h"
#include "gnunet_multicast_service.h"
#include "multicast.h"
/**
* Client connection to the service.
*/
- struct GNUNET_CLIENT_MANAGER_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
- * Message to send on reconnect.
+ * Time to wait until we try to reconnect on failure.
*/
- struct GNUNET_MessageHeader *connect_msg;
+ struct GNUNET_TIME_Relative reconnect_backoff;
+
+ /**
+ * Task for reconnecting when the listener fails.
+ */
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
+
+ /**
+ * Message to send on connect.
+ */
+ struct GNUNET_MQ_Envelope *connect_env;
GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
/**
- * Send first message to the service after connecting.
+ * Check join request message.
*/
-static void
-group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
+static int
+check_group_join_request (void *cls,
+ const struct MulticastJoinRequestMessage *jreq)
{
- uint16_t cmsg_size = ntohs (grp->connect_msg->size);
- struct GNUNET_MessageHeader *cmsg = GNUNET_malloc (cmsg_size);
- GNUNET_memcpy (cmsg, grp->connect_msg, cmsg_size);
- GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
- GNUNET_free (cmsg);
-}
+ uint16_t size = ntohs (jreq->header.size);
+ if (sizeof (*jreq) == size)
+ return GNUNET_OK;
-/**
- * Got disconnected from service. Reconnect.
- */
-static void
-group_recv_disconnect (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
-{
- struct GNUNET_MULTICAST_Group *
- grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
- GNUNET_CLIENT_MANAGER_reconnect (client);
- group_send_connect_msg (grp);
+ if (sizeof (*jreq) + sizeof (struct GNUNET_MessageHeader) <= size)
+ return GNUNET_OK;
+
+ return GNUNET_SYSERR;
}
* Receive join request from service.
*/
static void
-group_recv_join_request (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_group_join_request (void *cls,
+ const struct MulticastJoinRequestMessage *jreq)
{
- struct GNUNET_MULTICAST_Group *grp;
- const struct MulticastJoinRequestMessage *jreq;
+ struct GNUNET_MULTICAST_Group *grp = cls;
struct GNUNET_MULTICAST_JoinHandle *jh;
- const struct GNUNET_MessageHeader *jmsg;
+ const struct GNUNET_MessageHeader *jmsg = NULL;
- grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
if (NULL == grp)
{
GNUNET_break (0);
}
if (NULL == grp->join_req_cb)
return;
- /* FIXME: this fails to check that 'msg' is well-formed! */
- jreq = (const struct MulticastJoinRequestMessage *) msg;
+
if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
- else
- jmsg = NULL;
+
jh = GNUNET_malloc (sizeof (*jh));
jh->group = grp;
jh->member_pub_key = jreq->member_pub_key;
jh->peer = jreq->peer;
grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
+
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Check multicast message.
+ */
+static int
+check_group_message (void *cls,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+ return GNUNET_OK;
}
* Receive multicast message from service.
*/
static void
-group_recv_message (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_group_message (void *cls,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
{
- struct GNUNET_MULTICAST_Group *
- grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
- struct GNUNET_MULTICAST_MessageHeader *
- mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+ struct GNUNET_MULTICAST_Group *grp = cls;
if (GNUNET_YES == grp->is_disconnecting)
return;
if (NULL != grp->message_cb)
grp->message_cb (grp->cb_cls, mmsg);
+
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
}
* Receive message/request fragment acknowledgement from service.
*/
static void
-group_recv_fragment_ack (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_group_fragment_ack (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MULTICAST_Group *
- grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_Group *grp = cls;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
else
member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+}
+
+
+/**
+ * Check unicast request.
+ */
+static int
+check_origin_request (void *cls,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+ return GNUNET_OK;
}
+
/**
- * Origin receives uniquest request from a member.
+ * Origin receives unicast request from a member.
*/
static void
-origin_recv_request (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_origin_request (void *cls,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
{
struct GNUNET_MULTICAST_Group *grp;
- struct GNUNET_MULTICAST_Origin *
- orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_Origin *orig = cls;
grp = &orig->grp;
- struct GNUNET_MULTICAST_RequestHeader *
- req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Calling request callback with a request of size %u.\n",
if (NULL != orig->request_cb)
orig->request_cb (grp->cb_cls, req);
+
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
}
* Receive multicast replay request from service.
*/
static void
-group_recv_replay_request (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_group_replay_request (void *cls,
+ const struct MulticastReplayRequestMessage *rep)
+
{
- struct GNUNET_MULTICAST_Group *
- grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
- struct MulticastReplayRequestMessage *
- rep = (struct MulticastReplayRequestMessage *) msg;
+ struct GNUNET_MULTICAST_Group *grp = cls;
if (GNUNET_YES == grp->is_disconnecting)
return;
GNUNET_ntohll (rep->flags), rh);
}
}
+
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
}
/**
- * Receive multicast replay request from service.
+ * Check replay response.
+ */
+static int
+check_member_replay_response (void *cls,
+ const struct MulticastReplayResponseMessage *res)
+{
+ uint16_t size = ntohs (res->header.size);
+
+ if (sizeof (*res) == size)
+ return GNUNET_OK;
+
+ if (sizeof (*res) + sizeof (struct GNUNET_MULTICAST_MessageHeader) <= size)
+ return GNUNET_OK;
+
+ return GNUNET_SYSERR;
+}
+
+
+/**
+ * Receive replay response from service.
*/
static void
-member_recv_replay_response (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_member_replay_response (void *cls,
+ const struct MulticastReplayResponseMessage *res)
{
struct GNUNET_MULTICAST_Group *grp;
- struct GNUNET_MULTICAST_Member *
- mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_Member *mem = cls;
grp = &mem->grp;
- // FIXME: Something is missing here for the code to make sense
- //struct MulticastReplayResponseMessage *
- // res = (struct MulticastReplayResponseMessage *) msg;
+
if (GNUNET_YES == grp->is_disconnecting)
return;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+
+ // FIXME: return result
+}
+
+
+/**
+ * Check join decision.
+ */
+static int
+check_member_join_decision (void *cls,
+ const struct MulticastJoinDecisionMessageHeader *hdcsn)
+{
+ return GNUNET_OK; // checked in handle below
}
+
/**
* Member receives join decision.
*/
static void
-member_recv_join_decision (void *cls,
- struct GNUNET_CLIENT_MANAGER_Connection *client,
- const struct GNUNET_MessageHeader *msg)
+handle_member_join_decision (void *cls,
+ const struct MulticastJoinDecisionMessageHeader *hdcsn)
{
struct GNUNET_MULTICAST_Group *grp;
- struct GNUNET_MULTICAST_Member *
- mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_Member *mem = cls;
grp = &mem->grp;
- const struct MulticastJoinDecisionMessageHeader *
- hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
// FIXME:
//if (GNUNET_YES != is_admitted)
// GNUNET_MULTICAST_member_part (mem);
-}
-
-/**
- * Message handlers for an origin.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
-{
- { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
- { group_recv_message, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
- { origin_recv_request, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
- sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
-
- { group_recv_fragment_ack, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
-
- { group_recv_join_request, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
-
- { group_recv_replay_request, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
- { NULL, NULL, 0, 0, GNUNET_NO }
-};
-
-
-/**
- * Message handlers for a member.
- */
-static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
-{
- { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
-
- { group_recv_message, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
-
- { group_recv_fragment_ack, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
-
- { group_recv_join_request, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
-
- { member_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
- sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
-
- { group_recv_replay_request, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
- { member_recv_replay_response, NULL,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
- sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
-
- { NULL, NULL, 0, 0, GNUNET_NO }
-};
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+}
static void
group_cleanup (struct GNUNET_MULTICAST_Group *grp)
{
- GNUNET_free (grp->connect_msg);
+ GNUNET_free (grp->connect_env);
if (NULL != grp->disconnect_cb)
grp->disconnect_cb (grp->disconnect_cls);
}
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
uint16_t relay_size = relay_count * sizeof (*relays);
- struct MulticastJoinDecisionMessageHeader * hdcsn;
+ struct MulticastJoinDecisionMessageHeader *hdcsn;
struct MulticastJoinDecisionMessage *dcsn;
- hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
- hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
- hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (hdcsn, sizeof (*dcsn) + relay_size + join_resp_size,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
hdcsn->member_pub_key = join->member_pub_key;
hdcsn->peer = join->peer;
if (0 < join_resp_size)
GNUNET_memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
- GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
- GNUNET_free (hdcsn);
+ GNUNET_MQ_send (grp->mq, env);
GNUNET_free (join);
return NULL;
}
enum GNUNET_MULTICAST_ReplayErrorCode ec)
{
uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
- struct MulticastReplayResponseMessage *
- res = GNUNET_malloc (sizeof (*res) + msg_size);
- *res = (struct MulticastReplayResponseMessage) {
- .header = {
- .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
- .size = htons (sizeof (*res) + msg_size),
- },
- .fragment_id = rh->req.fragment_id,
- .message_id = rh->req.message_id,
- .fragment_offset = rh->req.fragment_offset,
- .flags = rh->req.flags,
- .error_code = htonl (ec),
- };
+ struct MulticastReplayResponseMessage *res;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (res, msg_size,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE);
+ res->fragment_id = rh->req.fragment_id;
+ res->message_id = rh->req.message_id;
+ res->fragment_offset = rh->req.fragment_offset;
+ res->flags = rh->req.flags;
+ res->error_code = htonl (ec);
if (GNUNET_MULTICAST_REC_OK == ec)
{
GNUNET_memcpy (&res[1], msg, msg_size);
}
- GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
- GNUNET_free (res);
+ GNUNET_MQ_send (rh->grp->mq, env);
if (GNUNET_MULTICAST_REC_OK != ec)
GNUNET_free (rh);
void
GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
{
- struct MulticastReplayResponseMessage end = {
- .header = {
- .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
- .size = htons (sizeof (end)),
- },
- .fragment_id = rh->req.fragment_id,
- .message_id = rh->req.message_id,
- .fragment_offset = rh->req.fragment_offset,
- .flags = rh->req.flags,
- };
+ struct MulticastReplayResponseMessage *end;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (end, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END);
+
+ end->fragment_id = rh->req.fragment_id;
+ end->message_id = rh->req.message_id;
+ end->fragment_offset = rh->req.fragment_offset;
+ end->flags = rh->req.flags;
- GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+ GNUNET_MQ_send (rh->grp->mq, env);
GNUNET_free (rh);
}
}
+void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig);
+
+
+static void
+origin_reconnect (void *cls)
+{
+ origin_connect (cls);
+}
+
+
+/**
+ * Origin client disconnected from service.
+ *
+ * Reconnect after backoff period.=
+ */
+void
+origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_MULTICAST_Origin *orig = cls;
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Origin client disconnected (%d), re-connecting\n",
+ (int) error);
+ if (NULL != grp->mq)
+ {
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+ }
+
+ grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
+ &origin_reconnect,
+ orig);
+ grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+}
+
+
+/**
+ * Connect to service as origin.
+ */
+void
+origin_connect (struct GNUNET_MULTICAST_Origin *orig)
+{
+ struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader);
+
+ GNUNET_MQ_hd_var_size (origin_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ struct GNUNET_MULTICAST_RequestHeader);
+
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader);
+
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage);
+
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage);
+
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_group_message_handler (grp),
+ make_origin_request_handler (orig),
+ make_group_fragment_ack_handler (grp),
+ make_group_join_request_handler (grp),
+ make_group_replay_request_handler (grp),
+ GNUNET_MQ_handler_end ()
+ };
+
+ grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+ handlers, origin_disconnected, orig);
+ if (NULL == grp->mq)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
/**
* Start a multicast group.
*
{
struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
- struct MulticastOriginStartMessage *start = GNUNET_malloc (sizeof (*start));
- start->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
- start->header.size = htons (sizeof (*start));
+ struct MulticastOriginStartMessage *start;
+ grp->connect_env = GNUNET_MQ_msg (start,
+ GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
start->max_fragment_id = max_fragment_id;
GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
- grp->connect_msg = (struct GNUNET_MessageHeader *) start;
grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
orig->request_cb = request_cb;
- grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
- GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
- group_send_connect_msg (grp);
-
+ origin_connect (orig);
return orig;
}
grp->disconnect_cb = stop_cb;
grp->disconnect_cls = stop_cls;
- GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES,
- &origin_cleanup, orig);
+ // FIXME: wait till queued messages are sent
+ if (NULL != grp->mq)
+ {
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+ }
+ origin_cleanup (orig);
}
GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
- struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
+ struct GNUNET_MULTICAST_MessageHeader *msg;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (msg, buf_size - sizeof(*msg),
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+
int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
"%p OriginTransmitNotify() returned error or invalid message size.\n",
orig);
/* FIXME: handle error */
- GNUNET_free (msg);
+ GNUNET_free (env);
return;
}
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p OriginTransmitNotify() - transmission paused.\n", orig);
- GNUNET_free (msg);
+ GNUNET_free (env);
return; /* Transmission paused. */
}
- msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
msg->header.size = htons (sizeof (*msg) + buf_size);
msg->message_id = GNUNET_htonll (tmit->message_id);
msg->group_generation = tmit->group_generation;
tmit->fragment_offset += sizeof (*msg) + buf_size;
grp->acks_pending++;
- GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
- GNUNET_free (msg);
+ GNUNET_MQ_send (grp->mq, env);
if (GNUNET_YES == ret)
grp->in_transmit = GNUNET_NO;
}
+ void
+member_connect (struct GNUNET_MULTICAST_Member *mem);
+
+
+static void
+member_reconnect (void *cls)
+{
+ member_connect (cls);
+}
+
+
+/**
+ * Member client disconnected from service.
+ *
+ * Reconnect after backoff period.
+ */
+void
+member_disconnected (void *cls, enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_MULTICAST_Member *mem = cls;
+ struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Member client disconnected (%d), re-connecting\n",
+ (int) error);
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+
+ grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
+ &member_reconnect,
+ mem);
+ grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+}
+
+
+/**
+ * Connect to service as member.
+ */
+void
+member_connect (struct GNUNET_MULTICAST_Member *mem)
+{
+ struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader);
+
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader);
+
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage);
+
+ GNUNET_MQ_hd_var_size (member_join_decision,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ struct MulticastJoinDecisionMessageHeader);
+
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage);
+
+ GNUNET_MQ_hd_var_size (member_replay_response,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+ struct MulticastReplayResponseMessage);
+
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_group_message_handler (grp),
+ make_group_fragment_ack_handler (grp),
+ make_group_join_request_handler (grp),
+ make_member_join_decision_handler (mem),
+ make_group_replay_request_handler (grp),
+ make_member_replay_response_handler (mem),
+ GNUNET_MQ_handler_end ()
+ };
+
+ grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
+ handlers, member_disconnected, mem);
+ if (NULL == grp->mq)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
+}
+
+
/**
* Join a multicast group.
*
uint16_t relay_size = relay_count * sizeof (*relays);
uint16_t join_msg_size = (NULL != join_msg) ? ntohs (join_msg->size) : 0;
- struct MulticastMemberJoinMessage *
- join = GNUNET_malloc (sizeof (*join) + relay_size + join_msg_size);
- join->header.size = htons (sizeof (*join) + relay_size + join_msg_size);
- join->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
+ struct MulticastMemberJoinMessage *join;
+ grp->connect_env = GNUNET_MQ_msg_extra (join, relay_size + join_msg_size,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN);
join->group_pub_key = *group_pub_key;
join->member_key = *member_key;
join->origin = *origin;
if (0 < join_msg_size)
GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->connect_msg = (struct GNUNET_MessageHeader *) join;
+ grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
grp->message_cb = message_cb;
grp->cb_cls = cls;
- grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
- GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
- group_send_connect_msg (grp);
-
+ member_connect (mem);
return mem;
}
grp->replay_msg_cb = NULL;
grp->replay_frag_cb = NULL;
- GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
- member_cleanup, mem);
+ // FIXME: wait till queued messages are sent
+ if (NULL != grp->mq)
+ {
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+ }
+ member_cleanup (mem);
}
uint64_t fragment_offset,
uint64_t flags)
{
- struct MulticastReplayRequestMessage rep = {
- .header = {
- .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
- .size = htons (sizeof (rep)),
- },
- .fragment_id = GNUNET_htonll (fragment_id),
- .message_id = GNUNET_htonll (message_id),
- .fragment_offset = GNUNET_htonll (fragment_offset),
- .flags = GNUNET_htonll (flags),
- };
- GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
+ struct MulticastReplayRequestMessage *rep;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (rep, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST);
+
+ rep->fragment_id = GNUNET_htonll (fragment_id);
+ rep->message_id = GNUNET_htonll (message_id);
+ rep->fragment_offset = GNUNET_htonll (fragment_offset);
+ rep->flags = GNUNET_htonll (flags);
+
+ GNUNET_MQ_send (mem->grp.mq, env);
}
GNUNET_assert (GNUNET_YES == grp->in_transmit);
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
- struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
+ struct GNUNET_MULTICAST_RequestHeader *req;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (req, buf_size - sizeof(*req),
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+
int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
return;
}
- req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
req->header.size = htons (sizeof (*req) + buf_size);
req->request_id = GNUNET_htonll (tmit->request_id);
req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*req) + buf_size;
- GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
- GNUNET_free (req);
+ GNUNET_MQ_send (grp->mq, env);
if (GNUNET_YES == ret)
grp->in_transmit = GNUNET_NO;