const struct GNUNET_PeerIdentity *id);
+/**
+ * @brief Callback called each time a new peer was put into the sampler
+ *
+ * @param cls A possibly given closure
+ */
+typedef void
+(*SamplerNotifyUpdateCB) (void *cls);
+
+/**
+ * @brief Context for a callback. Contains callback and closure.
+ *
+ * Meant to be an entry in an DLL.
+ */
+struct SamplerNotifyUpdateCTX
+{
+ /**
+ * @brief The Callback to call on updates
+ */
+ SamplerNotifyUpdateCB notify_cb;
+
+ /**
+ * @brief The according closure.
+ */
+ void *cls;
+
+ /**
+ * @brief Next element in DLL.
+ */
+ struct SamplerNotifyUpdateCTX *next;
+
+ /**
+ * @brief Previous element in DLL.
+ */
+ struct SamplerNotifyUpdateCTX *prev;
+};
+
+
/**
* Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
*/
*/
struct GNUNET_SCHEDULER_Task *get_peer_task;
+ /**
+ * @brief Context to the given callback.
+ */
+ struct SamplerNotifyUpdateCTX *notify_ctx;
+
/**
* The callback
*/
struct RPS_SamplerRequestHandle *req_handle_head;
struct RPS_SamplerRequestHandle *req_handle_tail;
+ struct SamplerNotifyUpdateCTX *notify_ctx_head;
+ struct SamplerNotifyUpdateCTX *notify_ctx_tail;
#ifdef TO_FILE
/**
* File name to log to
static uint32_t client_get_index;
+/**
+ * @brief Add a callback that will be called when the next peer is inserted
+ * into the sampler
+ *
+ * @param sampler The sampler on which update it will be called
+ * @param notify_cb The callback
+ * @param cls Closure given to the callback
+ *
+ * @return The context containing callback and closure
+ */
+struct SamplerNotifyUpdateCTX *
+sampler_notify_on_update (struct RPS_Sampler *sampler,
+ SamplerNotifyUpdateCB notify_cb,
+ void *cls)
+{
+ struct SamplerNotifyUpdateCTX *notify_ctx;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Inserting new context for notification\n");
+ notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
+ notify_ctx->notify_cb = notify_cb;
+ notify_ctx->cls = cls;
+ if (NULL != sampler->notify_ctx_head)
+ {
+ for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+ NULL != notify_iter->next;
+ notify_iter = notify_iter->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Pre: Context\n");
+ }
+ }
+ GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
+ sampler->notify_ctx_tail,
+ notify_ctx);
+ for (struct SamplerNotifyUpdateCTX *notify_iter = sampler->notify_ctx_head;
+ NULL != notify_iter;
+ notify_iter = notify_iter->next)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Post: Context\n");
+ }
+ return notify_ctx;
+}
+
+
/**
* Callback to _get_rand_peer() used by _get_n_rand_peers().
*
/**
- * A fuction to update every sampler in the given list
+ * Update every sampler element of this sampler with given peer
*
* @param sampler the sampler to update.
* @param id the PeerID that is put in the sampler
RPS_sampler_update (struct RPS_Sampler *sampler,
const struct GNUNET_PeerIdentity *id)
{
- uint32_t i;
+ struct SamplerNotifyUpdateCTX *tmp_notify_head;
+ struct SamplerNotifyUpdateCTX *tmp_notify_tail;
to_file (sampler->file_name,
"Got %s",
GNUNET_i2s_full (id));
- for (i = 0; i < sampler->sampler_size; i++)
+ for (uint32_t i = 0; i < sampler->sampler_size; i++)
{
RPS_sampler_elem_next (sampler->sampler_elements[i],
id);
}
+ tmp_notify_head = sampler->notify_ctx_head;
+ tmp_notify_tail = sampler->notify_ctx_tail;
+ sampler->notify_ctx_head = NULL;
+ sampler->notify_ctx_tail = NULL;
+ for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
+ NULL != tmp_notify_head;
+ notify_iter = tmp_notify_head)
+ {
+ GNUNET_assert (NULL != notify_iter->notify_cb);
+ GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
+ tmp_notify_tail,
+ notify_iter);
+ notify_iter->notify_cb (notify_iter->cls);
+ GNUNET_free (notify_iter);
+ }
}
struct RPS_Sampler *sampler;
gpc->get_peer_task = NULL;
+ gpc->notify_ctx = NULL;
sampler = gpc->req_handle->sampler;
/**;
//LOG (GNUNET_ERROR_TYPE_DEBUG,
// "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
- /* FIXME no active wait - get notified, when new id arrives?
- * Might also be a freshly emptied one. Others might still contain ids.
- * Counter?
- */
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
- &sampler_get_rand_peer,
- cls);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
struct RPS_Sampler *sampler;
gpc->get_peer_task = NULL;
+ gpc->notify_ctx = NULL;
sampler = gpc->req_handle->sampler;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sampler_mod element empty, rescheduling.\n");
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
// sampler->max_round_interval);
// add a little delay
/* Schedule it one round later */
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
}
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"This s_elem saw less than two peers -- scheduling for later\n");
- GNUNET_assert (NULL == gpc->get_peer_task);
- gpc->get_peer_task =
- GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
- &sampler_mod_get_rand_peer,
- cls);
+ GNUNET_assert (NULL == gpc->notify_ctx);
+ gpc->notify_ctx =
+ sampler_notify_on_update (sampler,
+ &sampler_mod_get_rand_peer,
+ gpc);
return;
}
/* More reasons to wait could be added here */
{
GNUNET_SCHEDULER_cancel (i->get_peer_task);
}
+ if (NULL != i->notify_ctx)
+ {
+ GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
+ req_handle->sampler->notify_ctx_tail,
+ i->notify_ctx);
+ GNUNET_free (i->notify_ctx);
+ }
GNUNET_free (i);
}
GNUNET_free (req_handle->ids);