multicast, psyc, social: wait till last message sent before disconnect
authorGabor X Toth <*@tg-x.net>
Wed, 17 Aug 2016 22:28:52 +0000 (22:28 +0000)
committerGabor X Toth <*@tg-x.net>
Wed, 17 Aug 2016 22:28:52 +0000 (22:28 +0000)
src/multicast/multicast_api.c
src/multicast/test_multicast.c
src/psyc/psyc_api.c
src/social/social_api.c

index 89a9bf5e156e860c8f5f4f7b6eb7ac6c7f6d041b..ad9929dca1050f51ba1b628e9e053e2dc71881dd 100644 (file)
@@ -522,27 +522,51 @@ handle_member_join_decision (void *cls,
 static void
 group_cleanup (struct GNUNET_MULTICAST_Group *grp)
 {
-  GNUNET_MQ_discard (grp->connect_env);
+  if (NULL != grp->connect_env)
+  {
+    GNUNET_MQ_discard (grp->connect_env);
+    grp->connect_env = NULL;
+  }
+  if (NULL != grp->mq)
+  {
+    GNUNET_MQ_destroy (grp->mq);
+    grp->mq = NULL;
+  }
   if (NULL != grp->disconnect_cb)
+  {
     grp->disconnect_cb (grp->disconnect_cls);
+    grp->disconnect_cb = NULL;
+  }
+  GNUNET_free (grp);
 }
 
 
 static void
-origin_cleanup (void *cls)
+group_disconnect (struct GNUNET_MULTICAST_Group *grp,
+                  GNUNET_ContinuationCallback cb,
+                  void *cls)
 {
-  struct GNUNET_MULTICAST_Origin *orig = cls;
-  group_cleanup (&orig->grp);
-  GNUNET_free (orig);
-}
-
+  grp->is_disconnecting = GNUNET_YES;
+  grp->disconnect_cb = cb;
+  grp->disconnect_cls = cls;
 
-static void
-member_cleanup (void *cls)
-{
-  struct GNUNET_MULTICAST_Member *mem = cls;
-  group_cleanup (&mem->grp);
-  GNUNET_free (mem);
+  if (NULL != grp->mq)
+  {
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) group_cleanup, grp);
+    }
+    else
+    {
+      group_cleanup (grp);
+    }
+  }
+  else
+  {
+    group_cleanup (grp);
+  }
 }
 
 
@@ -861,17 +885,7 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
 {
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
 
-  grp->is_disconnecting = GNUNET_YES;
-  grp->disconnect_cb = stop_cb;
-  grp->disconnect_cls = stop_cls;
-
-  // FIXME: wait till queued messages are sent
-  if (NULL != grp->mq)
-  {
-    GNUNET_MQ_destroy (grp->mq);
-    grp->mq = NULL;
-  }
-  origin_cleanup (orig);
+  group_disconnect (grp, stop_cb, stop_cls);
 }
 
 
@@ -1198,23 +1212,13 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
 
-  grp->is_disconnecting = GNUNET_YES;
-  grp->disconnect_cb = part_cb;
-  grp->disconnect_cls = part_cls;
-
   mem->join_dcsn_cb = NULL;
   grp->join_req_cb = NULL;
   grp->message_cb = NULL;
   grp->replay_msg_cb = NULL;
   grp->replay_frag_cb = NULL;
 
-  // FIXME: wait till queued messages are sent
-  if (NULL != grp->mq)
-  {
-    GNUNET_MQ_destroy (grp->mq);
-    grp->mq = NULL;
-  }
-  member_cleanup (mem);
+  group_disconnect (grp, part_cb, part_cls);
 }
 
 
index 63f162d0040c7a3b3c14e5f377776a974b751d4e..a4288e93d5527c62f6990fe1f2658e71cb421684 100644 (file)
@@ -239,7 +239,7 @@ member_recv_join_request (void *cls,
                           const struct GNUNET_MessageHeader *join_msg,
                           struct GNUNET_MULTICAST_JoinHandle *jh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_recv_join_request()\n", test);
 }
 
@@ -247,7 +247,7 @@ member_recv_join_request (void *cls,
 static void
 origin_stopped (void *cls)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_stopped()\n", test);
   end ();
 }
@@ -267,7 +267,7 @@ schedule_origin_stop (void *cls)
 static void
 member_parted (void *cls)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_parted()\n", test);
   member = NULL;
 
@@ -283,7 +283,7 @@ member_parted (void *cls)
 
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Invalid test #%d in member_recv_join_decision()\n", test);
+                "Invalid test #%d in member_parted()\n", test);
     GNUNET_assert (0);
   }
 }
@@ -292,7 +292,7 @@ member_parted (void *cls)
 static void
 schedule_member_part (void *cls)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: schedule_member_part()\n", test);
   GNUNET_MULTICAST_member_part (member, member_parted, NULL);
 }
@@ -312,7 +312,7 @@ static void
 member_replay_ok ()
 {
   test = TEST_MEMBER_REPLAY_OK;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_replay_ok()\n", test);
   replay_fragment_id = 1;
   replay_flags = 1 | 1<<11;
@@ -325,7 +325,7 @@ static void
 member_replay_error ()
 {
   test = TEST_MEMBER_REPLAY_ERROR;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_replay_error()\n", test);
   replay_fragment_id = 1234;
   replay_flags = 11 | 1<<11;
@@ -342,7 +342,7 @@ origin_recv_replay_msg (void *cls,
                         uint64_t flags,
                         struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_recv_replay_msg()\n", test);
   GNUNET_assert (0);
 }
@@ -356,7 +356,7 @@ member_recv_replay_msg (void *cls,
                         uint64_t flags,
                         struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_recv_replay_msg()\n", test);
   GNUNET_assert (0);
 }
@@ -369,7 +369,7 @@ origin_recv_replay_frag (void *cls,
                          uint64_t flags,
                          struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_recv_replay_frag()"
               " - fragment_id=%" PRIu64 " flags=%" PRIu64 "\n",
               test, fragment_id, flags);
@@ -416,7 +416,7 @@ member_recv_replay_frag (void *cls,
                          uint64_t flags,
                          struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_recv_replay_frag()\n", test);
   GNUNET_assert (0);
 }
@@ -427,7 +427,7 @@ origin_recv_request (void *cls,
                      const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   struct OriginClosure *ocls = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_recv_request()\n", test);
   if (++ocls->n != ocls->msgs_expected)
     return;
@@ -446,7 +446,7 @@ static void
 member_to_origin ()
 {
   test = TEST_MEMBER_TO_ORIGIN;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_to_origin()\n", test);
 
   struct TransmitClosure *tmit = &tmit_cls;
@@ -471,7 +471,7 @@ member_recv_message (void *cls,
 {
   struct MemberClosure *mcls = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_recv_message() %u/%u\n",
               test,
               (unsigned int) (mcls->n + 1),
@@ -505,7 +505,7 @@ origin_recv_message (void *cls,
                      const struct GNUNET_MULTICAST_MessageHeader *msg)
 {
   struct OriginClosure *ocls = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_recv_message() %u/%u\n",
               test, ocls->n + 1, ocls->msgs_expected);
   if (++ocls->n != ocls->msgs_expected)
@@ -562,7 +562,7 @@ member_recv_join_decision (void *cls,
                            const struct GNUNET_PeerIdentity *relays,
                            const struct GNUNET_MessageHeader *join_msg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: member_recv_join_decision() - is_admitted: %d\n",
               test, is_admitted);
 
@@ -597,7 +597,7 @@ origin_recv_join_request (void *cls,
                           const struct GNUNET_MessageHeader *join_msg,
                           struct GNUNET_MULTICAST_JoinHandle *jh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Test #%u: origin_recv_join_request()\n", test);
 
   GNUNET_assert (0 == memcmp (mem_key, &member_pub_key, sizeof (member_pub_key)));
index 2f6a15bab8f3b96ee52ac92326e454f66d4ab4b1..8e960c7be4e290ee7f8436bb0402f2ee65bbe84b 100644 (file)
@@ -548,29 +548,17 @@ channel_cleanup (struct GNUNET_PSYC_Channel *chn)
     GNUNET_MQ_discard (chn->connect_env);
     chn->connect_env = NULL;
   }
+  if (NULL != chn->mq)
+  {
+    GNUNET_MQ_destroy (chn->mq);
+    chn->mq = NULL;
+  }
   if (NULL != chn->disconnect_cb)
   {
     chn->disconnect_cb (chn->disconnect_cls);
     chn->disconnect_cb = NULL;
   }
-}
-
-
-static void
-master_cleanup (void *cls)
-{
-  struct GNUNET_PSYC_Master *mst = cls;
-  channel_cleanup (&mst->chn);
-  GNUNET_free (mst);
-}
-
-
-static void
-slave_cleanup (void *cls)
-{
-  struct GNUNET_PSYC_Slave *slv = cls;
-  channel_cleanup (&slv->chn);
-  GNUNET_free (slv);
+  GNUNET_free (chn);
 }
 
 
@@ -583,11 +571,22 @@ channel_disconnect (struct GNUNET_PSYC_Channel *chn,
   chn->disconnect_cb = cb;
   chn->disconnect_cls = cls;
 
-  // FIXME: wait till queued messages are sent
   if (NULL != chn->mq)
   {
-    GNUNET_MQ_destroy (chn->mq);
-    chn->mq = NULL;
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (chn->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) channel_cleanup, chn);
+    }
+    else
+    {
+      channel_cleanup (chn);
+    }
+  }
+  else
+  {
+    channel_cleanup (chn);
   }
 }
 
@@ -772,7 +771,6 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst,
   /* FIXME: send msg to service */
 
   channel_disconnect (chn, stop_cb, stop_cls);
-  master_cleanup (mst);
 }
 
 
@@ -1107,7 +1105,6 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv,
   /* FIXME: send msg to service */
 
   channel_disconnect (chn, part_cb, part_cls);
-  slave_cleanup (slv);
 }
 
 
index 9f15b4146562fed0702c307b858b9ed7b48854b7..c08de0356532a1563bbfa53f3d6498fd093e9a66 100644 (file)
@@ -359,7 +359,6 @@ struct GNUNET_SOCIAL_LookHandle
 
 struct ZoneAddPlaceHandle
 {
-  struct ZoneAddPlaceRequest *req;
   GNUNET_ResultCallback result_cb;
   void *result_cls;
 };
@@ -1007,6 +1006,28 @@ handle_app_place_end (void *cls,
 }
 
 
+/*** CLEANUP / DISCONNECT ***/
+
+
+static void
+host_cleanup (struct GNUNET_SOCIAL_Host *hst)
+{
+  if (NULL != hst->slicer)
+  {
+    GNUNET_PSYC_slicer_destroy (hst->slicer);
+    hst->slicer = NULL;
+  }
+  GNUNET_free (hst);
+}
+
+
+static void
+guest_cleanup (struct GNUNET_SOCIAL_Guest *gst)
+{
+  GNUNET_free (gst);
+}
+
+
 static void
 place_cleanup (struct GNUNET_SOCIAL_Place *plc)
 {
@@ -1027,32 +1048,59 @@ place_cleanup (struct GNUNET_SOCIAL_Place *plc)
     GNUNET_MQ_discard (plc->connect_env);
     plc->connect_env = NULL;
   }
+  if (NULL != plc->mq)
+  {
+    GNUNET_MQ_destroy (plc->mq);
+    plc->mq = NULL;
+  }
   if (NULL != plc->disconnect_cb)
   {
     plc->disconnect_cb (plc->disconnect_cls);
     plc->disconnect_cb = NULL;
   }
+
+  (GNUNET_YES == plc->is_host)
+    ? host_cleanup ((struct GNUNET_SOCIAL_Host *) plc)
+    : guest_cleanup ((struct GNUNET_SOCIAL_Guest *) plc);
 }
 
 
-static void
-host_cleanup (struct GNUNET_SOCIAL_Host *hst)
+void
+place_disconnect (struct GNUNET_SOCIAL_Place *plc,
+                  GNUNET_ContinuationCallback disconnect_cb,
+                  void *disconnect_cls)
 {
-  place_cleanup (&hst->plc);
-  if (NULL != hst->slicer)
+  plc->disconnect_cb = disconnect_cb;
+  plc->disconnect_cls = disconnect_cls;
+
+  if (NULL != plc->mq)
   {
-    GNUNET_PSYC_slicer_destroy (hst->slicer);
-    hst->slicer = NULL;
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (plc->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) place_cleanup, plc);
+    }
+    else
+    {
+      place_cleanup (plc);
+    }
+  }
+  else
+  {
+    place_cleanup (plc);
   }
-  GNUNET_free (hst);
 }
 
 
-static void
-guest_cleanup (struct GNUNET_SOCIAL_Guest *gst)
+void
+place_leave (struct GNUNET_SOCIAL_Place *plc)
 {
-  place_cleanup (&gst->plc);
-  GNUNET_free (gst);
+  struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *
+    env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE);
+
+  GNUNET_MQ_send (plc->mq, env);
 }
 
 
@@ -1518,34 +1566,6 @@ GNUNET_SOCIAL_host_get_place (struct GNUNET_SOCIAL_Host *hst)
 }
 
 
-void
-place_leave (struct GNUNET_SOCIAL_Place *plc)
-{
-  struct GNUNET_MessageHeader *msg;
-  struct GNUNET_MQ_Envelope *
-    env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SOCIAL_PLACE_LEAVE);
-
-  GNUNET_MQ_send (plc->mq, env);
-}
-
-
-void
-place_disconnect (struct GNUNET_SOCIAL_Place *plc,
-                  GNUNET_ContinuationCallback disconnect_cb,
-                  void *disconnect_cls)
-{
-  plc->disconnect_cb = disconnect_cb;
-  plc->disconnect_cls = disconnect_cls;
-
-  // FIXME: wait till queued messages are sent
-  if (NULL != plc->mq)
-  {
-    GNUNET_MQ_destroy (plc->mq);
-    plc->mq = NULL;
-  }
-}
-
-
 /**
  * Disconnect from a home.
  *
@@ -1560,7 +1580,6 @@ GNUNET_SOCIAL_host_disconnect (struct GNUNET_SOCIAL_Host *hst,
                                void *cls)
 {
   place_disconnect (&hst->plc, disconnect_cb, cls);
-  host_cleanup (hst);
 }
 
 
@@ -2016,7 +2035,6 @@ GNUNET_SOCIAL_guest_disconnect (struct GNUNET_SOCIAL_Guest *gst,
                                 void *cls)
 {
   place_disconnect (&gst->plc, disconnect_cb, cls);
-  guest_cleanup (gst);
 }
 
 
@@ -2363,7 +2381,6 @@ op_recv_zone_add_place_result (void *cls, int64_t result,
   if (NULL != add_plc->result_cb)
     add_plc->result_cb (add_plc->result_cls, result, err_msg, err_msg_size);
 
-  GNUNET_free (add_plc->req);
   GNUNET_free (add_plc);
 }
 
@@ -2435,7 +2452,6 @@ GNUNET_SOCIAL_zone_add_place (const struct GNUNET_SOCIAL_App *app,
   GNUNET_memcpy (p, relays, relay_size);
 
   struct ZoneAddPlaceHandle * add_plc = GNUNET_malloc (sizeof (*add_plc));
-  add_plc->req = preq;
   add_plc->result_cb = result_cb;
   add_plc->result_cls = result_cls;
 
@@ -2657,6 +2673,22 @@ GNUNET_SOCIAL_app_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
 }
 
 
+static void
+app_cleanup (struct GNUNET_SOCIAL_App *app)
+{
+  if (NULL != app->mq)
+  {
+    GNUNET_MQ_destroy (app->mq);
+    app->mq = NULL;
+  }
+  if (NULL != app->disconnect_cb)
+  {
+    app->disconnect_cb (app->disconnect_cls);
+    app->disconnect_cb = NULL;
+  }
+  GNUNET_free (app);
+}
+
 /**
  * Disconnect application.
  *
@@ -2672,15 +2704,26 @@ GNUNET_SOCIAL_app_disconnect (struct GNUNET_SOCIAL_App *app,
                               GNUNET_ContinuationCallback disconnect_cb,
                               void *disconnect_cls)
 {
-  // FIXME: wait till queued messages are sent
+  app->disconnect_cb = disconnect_cb;
+  app->disconnect_cls = disconnect_cls;
+
   if (NULL != app->mq)
   {
-    GNUNET_MQ_destroy (app->mq);
-    app->mq = NULL;
+    struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (app->mq);
+    if (NULL != last)
+    {
+      GNUNET_MQ_notify_sent (last,
+                             (GNUNET_MQ_NotifyCallback) app_cleanup, app);
+    }
+    else
+    {
+      app_cleanup (app);
+    }
+  }
+  else
+  {
+    app_cleanup (app);
   }
-
-  if (NULL != disconnect_cb)
-    disconnect_cb (disconnect_cls);
 }