Move from timer-based to callback-based updates in sampler
authorJulius Bünger <buenger@mytum.de>
Thu, 20 Sep 2018 20:34:18 +0000 (22:34 +0200)
committerJulius Bünger <buenger@mytum.de>
Thu, 20 Sep 2018 20:35:30 +0000 (22:35 +0200)
src/rps/gnunet-service-rps_sampler.c
src/rps/gnunet-service-rps_sampler.h

index 0de15bbc0438c08d609b0380663f0625ec84225d..ff4bc9e4237e4709d6d1e21d8bd8143c570c4463 100644 (file)
@@ -63,6 +63,43 @@ typedef void
                                      const struct GNUNET_PeerIdentity *id);
 
 
+/**
+ * @brief Callback called each time a new peer was put into the sampler
+ *
+ * @param cls A possibly given closure
+ */
+typedef void
+(*SamplerNotifyUpdateCB) (void *cls);
+
+/**
+ * @brief Context for a callback. Contains callback and closure.
+ *
+ * Meant to be an entry in an DLL.
+ */
+struct SamplerNotifyUpdateCTX
+{
+  /**
+   * @brief The Callback to call on updates
+   */
+  SamplerNotifyUpdateCB notify_cb;
+
+  /**
+   * @brief The according closure.
+   */
+  void *cls;
+
+  /**
+   * @brief Next element in DLL.
+   */
+  struct SamplerNotifyUpdateCTX *next;
+
+  /**
+   * @brief Previous element in DLL.
+   */
+  struct SamplerNotifyUpdateCTX *prev;
+};
+
+
 /**
  * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
  */
@@ -84,6 +121,11 @@ struct GetPeerCls
    */
   struct GNUNET_SCHEDULER_Task *get_peer_task;
 
+  /**
+   * @brief Context to the given callback.
+   */
+  struct SamplerNotifyUpdateCTX *notify_ctx;
+
   /**
    * The callback
    */
@@ -164,6 +206,8 @@ struct RPS_Sampler
   struct RPS_SamplerRequestHandle *req_handle_head;
   struct RPS_SamplerRequestHandle *req_handle_tail;
 
+  struct SamplerNotifyUpdateCTX *notify_ctx_head;
+  struct SamplerNotifyUpdateCTX *notify_ctx_tail;
   #ifdef TO_FILE
   /**
    * File name to log to
@@ -247,6 +291,52 @@ static size_t max_size;
 static uint32_t client_get_index;
 
 
+/**
+ * @brief Add a callback that will be called when the next peer is inserted
+ * into the sampler
+ *
+ * @param sampler The sampler on which update it will be called
+ * @param notify_cb The callback
+ * @param cls Closure given to the callback
+ *
+ * @return The context containing callback and closure
+ */
+struct SamplerNotifyUpdateCTX *
+sampler_notify_on_update (struct RPS_Sampler *sampler,
+                          SamplerNotifyUpdateCB notify_cb,
+                          void *cls)
+{
+  struct SamplerNotifyUpdateCTX *notify_ctx;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "Inserting new context for notification\n");
+  notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
+  notify_ctx->notify_cb = notify_cb;
+  notify_ctx->cls = cls;
+  if (NULL != sampler->notify_ctx_head)
+  {
+    for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+        NULL != notify_iter->next;
+        notify_iter = notify_iter->next)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          "Pre: Context\n");
+    }
+  }
+  GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
+                               sampler->notify_ctx_tail,
+                               notify_ctx);
+  for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+       NULL != notify_iter;
+       notify_iter = notify_iter->next)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Post: Context\n");
+  }
+  return notify_ctx;
+}
+
+
 /**
  * Callback to _get_rand_peer() used by _get_n_rand_peers().
  *
@@ -469,7 +559,7 @@ RPS_sampler_mod_init (size_t init_size,
 
 
 /**
- * A fuction to update every sampler in the given list
+ * Update every sampler element of this sampler with given peer
  *
  * @param sampler the sampler to update.
  * @param id the PeerID that is put in the sampler
@@ -478,17 +568,33 @@ RPS_sampler_mod_init (size_t init_size,
 RPS_sampler_update (struct RPS_Sampler *sampler,
                     const struct GNUNET_PeerIdentity *id)
 {
-  uint32_t i;
+  struct SamplerNotifyUpdateCTX *tmp_notify_head;
+  struct SamplerNotifyUpdateCTX *tmp_notify_tail;
 
   to_file (sampler->file_name,
            "Got %s",
            GNUNET_i2s_full (id));
 
-  for (i = 0; i < sampler->sampler_size; i++)
+  for (uint32_t i = 0; i < sampler->sampler_size; i++)
   {
     RPS_sampler_elem_next (sampler->sampler_elements[i],
                            id);
   }
+  tmp_notify_head = sampler->notify_ctx_head;
+  tmp_notify_tail = sampler->notify_ctx_tail;
+  sampler->notify_ctx_head = NULL;
+  sampler->notify_ctx_tail = NULL;
+  for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
+       NULL != tmp_notify_head;
+       notify_iter = tmp_notify_head)
+  {
+    GNUNET_assert (NULL != notify_iter->notify_cb);
+    GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
+                                 tmp_notify_tail,
+                                 notify_iter);
+    notify_iter->notify_cb (notify_iter->cls);
+    GNUNET_free (notify_iter);
+  }
 }
 
 
@@ -535,6 +641,7 @@ sampler_get_rand_peer (void *cls)
   struct RPS_Sampler *sampler;
 
   gpc->get_peer_task = NULL;
+  gpc->notify_ctx = NULL;
   sampler = gpc->req_handle->sampler;
 
   /**;
@@ -549,15 +656,10 @@ sampler_get_rand_peer (void *cls)
     //LOG (GNUNET_ERROR_TYPE_DEBUG,
     //     "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
 
-    /* FIXME no active wait - get notified, when new id arrives?
-     * Might also be a freshly emptied one. Others might still contain ids.
-     * Counter?
-     */
-    gpc->get_peer_task =
-      GNUNET_SCHEDULER_add_delayed (
-          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
-          &sampler_get_rand_peer,
-          cls);
+    gpc->notify_ctx =
+      sampler_notify_on_update (sampler,
+                                &sampler_mod_get_rand_peer,
+                                gpc);
     return;
   }
 
@@ -585,6 +687,7 @@ sampler_mod_get_rand_peer (void *cls)
   struct RPS_Sampler *sampler;
 
   gpc->get_peer_task = NULL;
+  gpc->notify_ctx = NULL;
   sampler = gpc->req_handle->sampler;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
@@ -600,11 +703,11 @@ sampler_mod_get_rand_peer (void *cls)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Sampler_mod element empty, rescheduling.\n");
-    GNUNET_assert (NULL == gpc->get_peer_task);
-    gpc->get_peer_task =
-      GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
-                                    &sampler_mod_get_rand_peer,
-                                    cls);
+    GNUNET_assert (NULL == gpc->notify_ctx);
+    gpc->notify_ctx =
+      sampler_notify_on_update (sampler,
+                                &sampler_mod_get_rand_peer,
+                                gpc);
     return;
   }
 
@@ -626,11 +729,11 @@ sampler_mod_get_rand_peer (void *cls)
       //                                       sampler->max_round_interval);
       // add a little delay
       /* Schedule it one round later */
-      GNUNET_assert (NULL == gpc->get_peer_task);
-      gpc->get_peer_task =
-        GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
-                                      &sampler_mod_get_rand_peer,
-                                      cls);
+      GNUNET_assert (NULL == gpc->notify_ctx);
+      gpc->notify_ctx =
+        sampler_notify_on_update (sampler,
+                                  &sampler_mod_get_rand_peer,
+                                  gpc);
       return;
     }
   }
@@ -638,11 +741,11 @@ sampler_mod_get_rand_peer (void *cls)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "This s_elem saw less than two peers -- scheduling for later\n");
-    GNUNET_assert (NULL == gpc->get_peer_task);
-    gpc->get_peer_task =
-      GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
-                                    &sampler_mod_get_rand_peer,
-                                    cls);
+    GNUNET_assert (NULL == gpc->notify_ctx);
+    gpc->notify_ctx =
+      sampler_notify_on_update (sampler,
+                                &sampler_mod_get_rand_peer,
+                                gpc);
     return;
   }
   /* More reasons to wait could be added here */
@@ -747,6 +850,13 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
     {
       GNUNET_SCHEDULER_cancel (i->get_peer_task);
     }
+    if (NULL != i->notify_ctx)
+    {
+      GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
+                                   req_handle->sampler->notify_ctx_tail,
+                                   i->notify_ctx);
+      GNUNET_free (i->notify_ctx);
+    }
     GNUNET_free (i);
   }
   GNUNET_free (req_handle->ids);
index 6b386596f4cdc9d2df082f5a3e9cd6e20d34d64b..f33aa6eb1cabeaf35b68f5b4c66eaa16a4ee51f7 100644 (file)
@@ -96,7 +96,7 @@ RPS_sampler_mod_init (size_t init_size,
 
 
 /**
- * A fuction to update every sampler in the given list
+ * Update every sampler element of this sampler with given peer
  *
  * @param sampler the sampler to update.
  * @param id the PeerID that is put in the sampler