*/
struct GNUNET_MQ_Handle *mq;
+ /**
+ * Message to send on connect.
+ */
+ struct GNUNET_MQ_Envelope *connect_env;
+
/**
* Time to wait until we try to reconnect on failure.
*/
*/
struct GNUNET_SCHEDULER_Task *reconnect_task;
- /**
- * Message to send on connect.
- */
- struct GNUNET_MQ_Envelope *connect_env;
-
GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
static void
group_cleanup (struct GNUNET_MULTICAST_Group *grp)
{
- GNUNET_free (grp->connect_env);
+ if (NULL != grp->connect_env)
+ {
+ GNUNET_MQ_discard (grp->connect_env);
+ grp->connect_env = NULL;
+ }
+ if (NULL != grp->mq)
+ {
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+ }
if (NULL != grp->disconnect_cb)
+ {
grp->disconnect_cb (grp->disconnect_cls);
+ grp->disconnect_cb = NULL;
+ }
+ GNUNET_free (grp);
}
static void
-origin_cleanup (void *cls)
+group_disconnect (struct GNUNET_MULTICAST_Group *grp,
+ GNUNET_ContinuationCallback cb,
+ void *cls)
{
- struct GNUNET_MULTICAST_Origin *orig = cls;
- group_cleanup (&orig->grp);
- GNUNET_free (orig);
-}
-
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = cb;
+ grp->disconnect_cls = cls;
-static void
-member_cleanup (void *cls)
-{
- struct GNUNET_MULTICAST_Member *mem = cls;
- group_cleanup (&mem->grp);
- GNUNET_free (mem);
+ if (NULL != grp->mq)
+ {
+ struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
+ if (NULL != last)
+ {
+ GNUNET_MQ_notify_sent (last,
+ (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
+ }
+ else
+ {
+ group_cleanup (grp);
+ }
+ }
+ else
+ {
+ group_cleanup (grp);
+ }
}
}
grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
- &origin_reconnect,
+ origin_reconnect,
orig);
grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
}
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
- GNUNET_MQ_hd_var_size (group_message,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- struct GNUNET_MULTICAST_MessageHeader);
-
- GNUNET_MQ_hd_var_size (origin_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
- struct GNUNET_MULTICAST_RequestHeader);
-
- GNUNET_MQ_hd_fixed_size (group_fragment_ack,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- struct GNUNET_MessageHeader);
-
- GNUNET_MQ_hd_var_size (group_join_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- struct MulticastJoinRequestMessage);
-
- GNUNET_MQ_hd_fixed_size (group_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage);
-
struct GNUNET_MQ_MessageHandler handlers[] = {
- make_group_message_handler (grp),
- make_origin_request_handler (orig),
- make_group_fragment_ack_handler (grp),
- make_group_join_request_handler (grp),
- make_group_replay_request_handler (grp),
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (origin_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ struct GNUNET_MULTICAST_RequestHeader,
+ orig),
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage,
+ grp),
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
GNUNET_MQ_handler_end ()
};
grp->connect_env = GNUNET_MQ_msg (start,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
start->max_fragment_id = max_fragment_id;
- GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
+ start->group_key = *priv_key;
- grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
+ grp->is_origin = GNUNET_YES;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
grp->cb_cls = cls;
grp->join_req_cb = join_request_cb;
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = stop_cb;
- grp->disconnect_cls = stop_cls;
-
- // FIXME: wait till queued messages are sent
- if (NULL != grp->mq)
- {
- GNUNET_MQ_destroy (grp->mq);
- grp->mq = NULL;
- }
- origin_cleanup (orig);
+ group_disconnect (grp, stop_cb, stop_cls);
}
grp->mq = NULL;
grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
- &member_reconnect,
+ member_reconnect,
mem);
grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
}
{
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
- GNUNET_MQ_hd_var_size (group_message,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- struct GNUNET_MULTICAST_MessageHeader);
-
- GNUNET_MQ_hd_fixed_size (group_fragment_ack,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- struct GNUNET_MessageHeader);
-
- GNUNET_MQ_hd_var_size (group_join_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- struct MulticastJoinRequestMessage);
-
- GNUNET_MQ_hd_var_size (member_join_decision,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
- struct MulticastJoinDecisionMessageHeader);
-
- GNUNET_MQ_hd_fixed_size (group_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage);
-
- GNUNET_MQ_hd_var_size (member_replay_response,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
- struct MulticastReplayResponseMessage);
-
struct GNUNET_MQ_MessageHandler handlers[] = {
- make_group_message_handler (grp),
- make_group_fragment_ack_handler (grp),
- make_group_join_request_handler (grp),
- make_member_join_decision_handler (mem),
- make_group_replay_request_handler (grp),
- make_member_replay_response_handler (mem),
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage,
+ grp),
+ GNUNET_MQ_hd_var_size (member_join_decision,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ struct MulticastJoinDecisionMessageHeader,
+ mem),
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
+ GNUNET_MQ_hd_var_size (member_replay_response,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+ struct MulticastReplayResponseMessage,
+ mem),
GNUNET_MQ_handler_end ()
};
if (0 < join_msg_size)
GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
- grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
+ grp->is_origin = GNUNET_NO;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
mem->join_dcsn_cb = join_decision_cb;
grp->join_req_cb = join_request_cb;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = part_cb;
- grp->disconnect_cls = part_cls;
-
mem->join_dcsn_cb = NULL;
grp->join_req_cb = NULL;
grp->message_cb = NULL;
grp->replay_msg_cb = NULL;
grp->replay_frag_cb = NULL;
- // FIXME: wait till queued messages are sent
- if (NULL != grp->mq)
- {
- GNUNET_MQ_destroy (grp->mq);
- grp->mq = NULL;
- }
- member_cleanup (mem);
+ group_disconnect (grp, part_cb, part_cls);
}