From 319422c2d2ae7f88f931fae0bd0e7b1efe2ad68d Mon Sep 17 00:00:00 2001 From: Gabor X Toth <*@tg-x.net> Date: Thu, 4 Aug 2016 20:10:22 +0000 Subject: [PATCH] multicast: switch to MQ --- src/multicast/multicast_api.c | 59 +++++++++++++++-------------------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index db0f0e759..6fb45d722 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -27,7 +27,6 @@ #include "platform.h" #include "gnunet_util_lib.h" -#include "gnunet_mq_lib.h" #include "gnunet_multicast_service.h" #include "multicast.h" @@ -79,7 +78,7 @@ struct GNUNET_MULTICAST_Group /** * Time to wait until we try to reconnect on failure. */ - struct GNUNET_TIME_Relative reconnect_backoff; + struct GNUNET_TIME_Relative reconnect_delay; /** * Task for reconnecting when the listener fails. @@ -255,7 +254,7 @@ handle_group_join_request (void *cls, jh->peer = jreq->peer; grp->join_req_cb (grp->cb_cls, &jreq->member_pub_key, jmsg, jh); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -289,7 +288,7 @@ handle_group_message (void *cls, if (NULL != grp->message_cb) grp->message_cb (grp->cb_cls, mmsg); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -322,7 +321,7 @@ handle_group_fragment_ack (void *cls, else member_to_origin ((struct GNUNET_MULTICAST_Member *) grp); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -355,7 +354,7 @@ handle_origin_request (void *cls, if (NULL != orig->request_cb) orig->request_cb (grp->cb_cls, req); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -400,7 +399,7 @@ handle_group_replay_request (void *cls, } } - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -516,7 +515,7 @@ handle_member_join_decision (void *cls, //if (GNUNET_YES != is_admitted) // GNUNET_MULTICAST_member_part (mem); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; } @@ -693,7 +692,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, } -void +static void origin_connect (struct GNUNET_MULTICAST_Origin *orig); @@ -707,9 +706,9 @@ origin_reconnect (void *cls) /** * Origin client disconnected from service. * - * Reconnect after backoff period.= + * Reconnect after backoff period. */ -void +static void origin_disconnected (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_MULTICAST_Origin *orig = cls; @@ -724,17 +723,17 @@ origin_disconnected (void *cls, enum GNUNET_MQ_Error error) grp->mq = NULL; } - grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &origin_reconnect, orig); - grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); } /** * Connect to service as origin. */ -void +static void origin_connect (struct GNUNET_MULTICAST_Origin *orig) { struct GNUNET_MULTICAST_Group *grp = &orig->grp; @@ -770,11 +769,7 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig) grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", handlers, origin_disconnected, orig); - if (NULL == grp->mq) - { - GNUNET_break (0); - return; - } + GNUNET_assert (NULL != grp->mq); GNUNET_MQ_send_copy (grp->mq, grp->connect_env); } @@ -902,7 +897,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) "%p OriginTransmitNotify() returned error or invalid message size.\n", orig); /* FIXME: handle error */ - GNUNET_free (env); + GNUNET_MQ_discard (env); return; } @@ -910,7 +905,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) { LOG (GNUNET_ERROR_TYPE_DEBUG, "%p OriginTransmitNotify() - transmission paused.\n", orig); - GNUNET_free (env); + GNUNET_MQ_discard (env); return; /* Transmission paused. */ } @@ -1000,7 +995,7 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan } - void +static void member_connect (struct GNUNET_MULTICAST_Member *mem); @@ -1016,7 +1011,7 @@ member_reconnect (void *cls) * * Reconnect after backoff period. */ -void +static void member_disconnected (void *cls, enum GNUNET_MQ_Error error) { struct GNUNET_MULTICAST_Member *mem = cls; @@ -1028,17 +1023,17 @@ member_disconnected (void *cls, enum GNUNET_MQ_Error error) GNUNET_MQ_destroy (grp->mq); grp->mq = NULL; - grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_backoff, + grp->reconnect_task = GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &member_reconnect, mem); - grp->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (grp->reconnect_backoff); + grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); } /** * Connect to service as member. */ -void +static void member_connect (struct GNUNET_MULTICAST_Member *mem) { struct GNUNET_MULTICAST_Group *grp = &mem->grp; @@ -1079,11 +1074,7 @@ member_connect (struct GNUNET_MULTICAST_Member *mem) grp->mq = GNUNET_CLIENT_connecT (grp->cfg, "multicast", handlers, member_disconnected, mem); - if (NULL == grp->mq) - { - GNUNET_break (0); - return; - } + GNUNET_assert (NULL != grp->mq); GNUNET_MQ_send_copy (grp->mq, grp->connect_env); } @@ -1171,7 +1162,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, if (0 < join_msg_size) GNUNET_memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); - grp->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + grp->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; grp->is_origin = GNUNET_NO; grp->cfg = cfg; @@ -1326,14 +1317,14 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) "MemberTransmitNotify() returned error or invalid message size. " "ret=%d, buf_size=%u\n", ret, buf_size); /* FIXME: handle error */ - GNUNET_free (req); + GNUNET_MQ_discard (env); return; } if (GNUNET_NO == ret && 0 == buf_size) { /* Transmission paused. */ - GNUNET_free (req); + GNUNET_MQ_discard (env); return; } -- 2.25.1