#include "platform.h"
#include "gnunet_util_lib.h"
-#include "gnunet_mq_lib.h"
#include "gnunet_multicast_service.h"
#include "multicast.h"
struct GNUNET_MQ_Handle *mq;
/**
- * Time to wait until we try to reconnect on failure.
+ * Message to send on connect.
*/
- struct GNUNET_TIME_Relative reconnect_backoff;
+ struct GNUNET_MQ_Envelope *connect_env;
/**
- * Task for reconnecting when the listener fails.
+ * Time to wait until we try to reconnect on failure.
*/
- struct GNUNET_SCHEDULER_Task *reconnect_task;
+ struct GNUNET_TIME_Relative reconnect_delay;
/**
- * Message to send on connect.
+ * Task for reconnecting when the listener fails.
*/
- struct GNUNET_MQ_Envelope *connect_env;
+ struct GNUNET_SCHEDULER_Task *reconnect_task;
GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
jh->peer = jreq->peer;
grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
if (NULL != grp->message_cb)
grp->message_cb (grp->cb_cls, mmsg);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
else
member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
if (NULL != orig->request_cb)
orig->request_cb (grp->cb_cls, req);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
}
}
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
//if (GNUNET_YES != is_admitted)
// GNUNET_MULTICAST_member_part (mem);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
}
static void
group_cleanup (struct GNUNET_MULTICAST_Group *grp)
{
- GNUNET_free (grp->connect_env);
+ if (NULL != grp->connect_env)
+ {
+ GNUNET_MQ_discard (grp->connect_env);
+ grp->connect_env = NULL;
+ }
+ if (NULL != grp->mq)
+ {
+ GNUNET_MQ_destroy (grp->mq);
+ grp->mq = NULL;
+ }
if (NULL != grp->disconnect_cb)
+ {
grp->disconnect_cb (grp->disconnect_cls);
+ grp->disconnect_cb = NULL;
+ }
+ GNUNET_free (grp);
}
static void
-origin_cleanup (void *cls)
+group_disconnect (struct GNUNET_MULTICAST_Group *grp,
+ GNUNET_ContinuationCallback cb,
+ void *cls)
{
- struct GNUNET_MULTICAST_Origin *orig = cls;
- group_cleanup (&orig->grp);
- GNUNET_free (orig);
-}
-
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = cb;
+ grp->disconnect_cls = cls;
-static void
-member_cleanup (void *cls)
-{
- struct GNUNET_MULTICAST_Member *mem = cls;
- group_cleanup (&mem->grp);
- GNUNET_free (mem);
+ if (NULL != grp->mq)
+ {
+ struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
+ if (NULL != last)
+ {
+ GNUNET_MQ_notify_sent (last,
+ (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
+ }
+ else
+ {
+ group_cleanup (grp);
+ }
+ }
+ else
+ {
+ group_cleanup (grp);
+ }
}
}
-void
+static void
origin_connect (struct GNUNET_MULTICAST_Origin *orig);
/**
* Origin client disconnected from service.
*
- * Reconnect after backoff period.=
+ * Reconnect after backoff period.
*/
-void
+static void
origin_disconnected (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_MULTICAST_Origin *orig = cls;
grp->mq = NULL;
}
- grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
- &origin_reconnect,
+ grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+ origin_reconnect,
orig);
- grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+ grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
}
/**
* Connect to service as origin.
*/
-void
+static void
origin_connect (struct GNUNET_MULTICAST_Origin *orig)
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
- GNUNET_MQ_hd_var_size (group_message,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- struct GNUNET_MULTICAST_MessageHeader);
-
- GNUNET_MQ_hd_var_size (origin_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
- struct GNUNET_MULTICAST_RequestHeader);
-
- GNUNET_MQ_hd_fixed_size (group_fragment_ack,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- struct GNUNET_MessageHeader);
-
- GNUNET_MQ_hd_var_size (group_join_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- struct MulticastJoinRequestMessage);
-
- GNUNET_MQ_hd_fixed_size (group_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage);
-
struct GNUNET_MQ_MessageHandler handlers[] = {
- make_group_message_handler (grp),
- make_origin_request_handler (orig),
- make_group_fragment_ack_handler (grp),
- make_group_join_request_handler (grp),
- make_group_replay_request_handler (grp),
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (origin_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ struct GNUNET_MULTICAST_RequestHeader,
+ orig),
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage,
+ grp),
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
GNUNET_MQ_handler_end ()
};
grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
handlers, origin_disconnected, orig);
- if (NULL == grp->mq)
- {
- GNUNET_break (0);
- return;
- }
+ GNUNET_assert (NULL != grp->mq);
GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
}
grp->connect_env = GNUNET_MQ_msg (start,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START);
start->max_fragment_id = max_fragment_id;
- GNUNET_memcpy (&start->group_key, priv_key, sizeof (*priv_key));
+ start->group_key = *priv_key;
- grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
+ grp->is_origin = GNUNET_YES;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
grp->cb_cls = cls;
grp->join_req_cb = join_request_cb;
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = stop_cb;
- grp->disconnect_cls = stop_cls;
-
- // FIXME: wait till queued messages are sent
- if (NULL != grp->mq)
- {
- GNUNET_MQ_destroy (grp->mq);
- grp->mq = NULL;
- }
- origin_cleanup (orig);
+ group_disconnect (grp, stop_cb, stop_cls);
}
"%p OriginTransmitNotify() returned error or invalid message size.\n",
orig);
/* FIXME: handle error */
- GNUNET_free (env);
+ GNUNET_MQ_discard (env);
return;
}
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%p OriginTransmitNotify() - transmission paused.\n", orig);
- GNUNET_free (env);
+ GNUNET_MQ_discard (env);
return; /* Transmission paused. */
}
}
- void
+static void
member_connect (struct GNUNET_MULTICAST_Member *mem);
*
* Reconnect after backoff period.
*/
-void
+static void
member_disconnected (void *cls, enum GNUNET_MQ_Error error)
{
struct GNUNET_MULTICAST_Member *mem = cls;
GNUNET_MQ_destroy (grp->mq);
grp->mq = NULL;
- grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff,
- &member_reconnect,
+ grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay,
+ member_reconnect,
mem);
- grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff);
+ grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
}
/**
* Connect to service as member.
*/
-void
+static void
member_connect (struct GNUNET_MULTICAST_Member *mem)
{
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
- GNUNET_MQ_hd_var_size (group_message,
- GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
- struct GNUNET_MULTICAST_MessageHeader);
-
- GNUNET_MQ_hd_fixed_size (group_fragment_ack,
- GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
- struct GNUNET_MessageHeader);
-
- GNUNET_MQ_hd_var_size (group_join_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
- struct MulticastJoinRequestMessage);
-
- GNUNET_MQ_hd_var_size (member_join_decision,
- GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
- struct MulticastJoinDecisionMessageHeader);
-
- GNUNET_MQ_hd_fixed_size (group_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage);
-
- GNUNET_MQ_hd_var_size (member_replay_response,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
- struct MulticastReplayResponseMessage);
-
struct GNUNET_MQ_MessageHandler handlers[] = {
- make_group_message_handler (grp),
- make_group_fragment_ack_handler (grp),
- make_group_join_request_handler (grp),
- make_member_join_decision_handler (mem),
- make_group_replay_request_handler (grp),
- make_member_replay_response_handler (mem),
+ GNUNET_MQ_hd_var_size (group_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ struct GNUNET_MULTICAST_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_fixed_size (group_fragment_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
+ GNUNET_MQ_hd_var_size (group_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ struct MulticastJoinRequestMessage,
+ grp),
+ GNUNET_MQ_hd_var_size (member_join_decision,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ struct MulticastJoinDecisionMessageHeader,
+ mem),
+ GNUNET_MQ_hd_fixed_size (group_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
+ GNUNET_MQ_hd_var_size (member_replay_response,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+ struct MulticastReplayResponseMessage,
+ mem),
GNUNET_MQ_handler_end ()
};
grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast",
handlers, member_disconnected, mem);
- if (NULL == grp->mq)
- {
- GNUNET_break (0);
- return;
- }
+ GNUNET_assert (NULL != grp->mq);
GNUNET_MQ_send_copy (grp->mq, grp->connect_env);
}
if (0 < join_msg_size)
GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
- grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
+ grp->is_origin = GNUNET_NO;
+ grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
mem->join_dcsn_cb = join_decision_cb;
grp->join_req_cb = join_request_cb;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = part_cb;
- grp->disconnect_cls = part_cls;
-
mem->join_dcsn_cb = NULL;
grp->join_req_cb = NULL;
grp->message_cb = NULL;
grp->replay_msg_cb = NULL;
grp->replay_frag_cb = NULL;
- // FIXME: wait till queued messages are sent
- if (NULL != grp->mq)
- {
- GNUNET_MQ_destroy (grp->mq);
- grp->mq = NULL;
- }
- member_cleanup (mem);
+ group_disconnect (grp, part_cb, part_cls);
}
"MemberTransmitNotify() returned error or invalid message size. "
"ret=%d, buf_size=%u\n", ret, buf_size);
/* FIXME: handle error */
- GNUNET_free (req);
+ GNUNET_MQ_discard (env);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
{
/* Transmission paused. */
- GNUNET_free (req);
+ GNUNET_MQ_discard (env);
return;
}