Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / rps / rps-sampler_common.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/rps-sampler_common.c
21  * @brief Code common to client and service sampler
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_util_lib.h"
26 #include "gnunet_statistics_service.h"
27
28 #include "rps-sampler_common.h"
29 #include "gnunet-service-rps_sampler_elem.h"
30
31 #include <math.h>
32 #include <inttypes.h>
33
34 #include "rps-test_util.h"
35
36 #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler_common",__VA_ARGS__)
37
38 /**
39  * @brief Context for a callback. Contains callback and closure.
40  *
41  * Meant to be an entry in an DLL.
42  */
43 struct SamplerNotifyUpdateCTX
44 {
45   /**
46    * @brief The Callback to call on updates
47    */
48   SamplerNotifyUpdateCB notify_cb;
49
50   /**
51    * @brief The according closure.
52    */
53   void *cls;
54
55   /**
56    * @brief Next element in DLL.
57    */
58   struct SamplerNotifyUpdateCTX *next;
59
60   /**
61    * @brief Previous element in DLL.
62    */
63   struct SamplerNotifyUpdateCTX *prev;
64 };
65
66
67 /**
68  * Closure to _get_n_rand_peers_ready_cb()
69  */
70 struct RPS_SamplerRequestHandle
71 {
72   /**
73    * DLL
74    */
75   struct RPS_SamplerRequestHandle *next;
76   struct RPS_SamplerRequestHandle *prev;
77
78   /**
79    * Number of peers we are waiting for.
80    */
81   uint32_t num_peers;
82
83   /**
84    * Number of peers we currently have.
85    */
86   uint32_t cur_num_peers;
87
88   /**
89    * Pointer to the array holding the ids.
90    */
91   struct GNUNET_PeerIdentity *ids;
92
93   /**
94    * Head and tail for the DLL to store the tasks for single requests
95    */
96   struct GetPeerCls *gpc_head;
97   struct GetPeerCls *gpc_tail;
98
99   /**
100    * Sampler.
101    */
102   struct RPS_Sampler *sampler;
103
104   /**
105    * Callback to be called when all ids are available.
106    */
107   RPS_sampler_n_rand_peers_ready_cb callback;
108
109   /**
110    * Closure given to the callback
111    */
112   void *cls;
113 };
114
115
116 /**
117  * @brief Add a callback that will be called when the next peer is inserted
118  * into the sampler
119  *
120  * @param sampler The sampler on which update it will be called
121  * @param notify_cb The callback
122  * @param cls Closure given to the callback
123  *
124  * @return The context containing callback and closure
125  */
126 struct SamplerNotifyUpdateCTX *
127 sampler_notify_on_update (struct RPS_Sampler *sampler,
128                           SamplerNotifyUpdateCB notify_cb,
129                           void *cls)
130 {
131   struct SamplerNotifyUpdateCTX *notify_ctx;
132
133   LOG (GNUNET_ERROR_TYPE_DEBUG,
134       "Inserting new context for notification\n");
135   notify_ctx = GNUNET_new (struct SamplerNotifyUpdateCTX);
136   notify_ctx->notify_cb = notify_cb;
137   notify_ctx->cls = cls;
138   GNUNET_CONTAINER_DLL_insert (sampler->notify_ctx_head,
139                                sampler->notify_ctx_tail,
140                                notify_ctx);
141   return notify_ctx;
142 }
143
144
145 /**
146  * Get the size of the sampler.
147  *
148  * @param sampler the sampler to return the size of.
149  * @return the size of the sampler
150  */
151 unsigned int
152 RPS_sampler_get_size (struct RPS_Sampler *sampler)
153 {
154   return sampler->sampler_size;
155 }
156
157
158 /**
159  * @brief Notify about update of the sampler.
160  *
161  * Call the callbacks that are waiting for notification on updates to the
162  * sampler.
163  *
164  * @param sampler The sampler the updates are waiting for
165  */
166 static void
167 notify_update (struct RPS_Sampler *sampler)
168 {
169   struct SamplerNotifyUpdateCTX *tmp_notify_head;
170   struct SamplerNotifyUpdateCTX *tmp_notify_tail;
171
172   LOG (GNUNET_ERROR_TYPE_DEBUG,
173       "Calling callbacks waiting for update notification.\n");
174   tmp_notify_head = sampler->notify_ctx_head;
175   tmp_notify_tail = sampler->notify_ctx_tail;
176   sampler->notify_ctx_head = NULL;
177   sampler->notify_ctx_tail = NULL;
178   for (struct SamplerNotifyUpdateCTX *notify_iter = tmp_notify_head;
179        NULL != tmp_notify_head;
180        notify_iter = tmp_notify_head)
181   {
182     GNUNET_assert (NULL != notify_iter->notify_cb);
183     GNUNET_CONTAINER_DLL_remove (tmp_notify_head,
184                                  tmp_notify_tail,
185                                  notify_iter);
186     notify_iter->notify_cb (notify_iter->cls);
187     GNUNET_free (notify_iter);
188   }
189 }
190
191
192 /**
193  * Update every sampler element of this sampler with given peer
194  *
195  * @param sampler the sampler to update.
196  * @param id the PeerID that is put in the sampler
197  */
198   void
199 RPS_sampler_update (struct RPS_Sampler *sampler,
200                     const struct GNUNET_PeerIdentity *id)
201 {
202   to_file (sampler->file_name,
203            "Got %s",
204            GNUNET_i2s_full (id));
205
206   for (uint32_t i = 0; i < sampler->sampler_size; i++)
207   {
208     RPS_sampler_elem_next (sampler->sampler_elements[i],
209                            id);
210   }
211   notify_update (sampler);
212 }
213
214
215 /**
216  * Reinitialise all previously initialised sampler elements with the given value.
217  *
218  * Used to get rid of a PeerID.
219  *
220  * @param sampler the sampler to reinitialise a sampler element in.
221  * @param id the id of the sampler elements to update.
222  */
223   void
224 RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
225                                    const struct GNUNET_PeerIdentity *id)
226 {
227   uint32_t i;
228
229   for (i = 0; i < sampler->sampler_size; i++)
230   {
231     if (0 == GNUNET_CRYPTO_cmp_peer_identity(id,
232           &(sampler->sampler_elements[i]->peer_id)) )
233     {
234       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
235       to_file (sampler->sampler_elements[i]->file_name,
236                "--- non-active");
237       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
238     }
239   }
240 }
241
242
243 /**
244  * Counts how many Samplers currently hold a given PeerID.
245  *
246  * @param sampler the sampler to count ids in.
247  * @param id the PeerID to count.
248  *
249  * @return the number of occurrences of id.
250  */
251   uint32_t
252 RPS_sampler_count_id (struct RPS_Sampler *sampler,
253                       const struct GNUNET_PeerIdentity *id)
254 {
255   uint32_t count;
256   uint32_t i;
257
258   count = 0;
259   for ( i = 0 ; i < sampler->sampler_size ; i++ )
260   {
261     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
262         && EMPTY != sampler->sampler_elements[i]->is_empty)
263       count++;
264   }
265   return count;
266 }
267
268
269 /**
270  * Grow or shrink the size of the sampler.
271  *
272  * @param sampler the sampler to resize.
273  * @param new_size the new size of the sampler
274  */
275 static void
276 sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
277 {
278   unsigned int old_size;
279   uint32_t i;
280
281   // TODO check min and max size
282
283   old_size = sampler->sampler_size;
284
285   if (old_size > new_size)
286   { /* Shrinking */
287
288     LOG (GNUNET_ERROR_TYPE_DEBUG,
289          "Shrinking sampler %d -> %d\n",
290          old_size,
291          new_size);
292
293     to_file (sampler->file_name,
294          "Shrinking sampler %d -> %d",
295          old_size,
296          new_size);
297
298     for (i = new_size ; i < old_size ; i++)
299     {
300       to_file (sampler->file_name,
301                "-%" PRIu32 ": %s",
302                i,
303                sampler->sampler_elements[i]->file_name);
304       RPS_sampler_elem_destroy (sampler->sampler_elements[i]);
305     }
306
307     GNUNET_array_grow (sampler->sampler_elements,
308                        sampler->sampler_size,
309                        new_size);
310     LOG (GNUNET_ERROR_TYPE_DEBUG,
311          "sampler->sampler_elements now points to %p\n",
312          sampler->sampler_elements);
313
314   }
315   else if (old_size < new_size)
316   { /* Growing */
317     LOG (GNUNET_ERROR_TYPE_DEBUG,
318          "Growing sampler %d -> %d\n",
319          old_size,
320          new_size);
321
322     to_file (sampler->file_name,
323          "Growing sampler %d -> %d",
324          old_size,
325          new_size);
326
327     GNUNET_array_grow (sampler->sampler_elements,
328         sampler->sampler_size,
329         new_size);
330
331     for (i = old_size ; i < new_size ; i++)
332     { /* Add new sampler elements */
333       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
334
335       to_file (sampler->file_name,
336                "+%" PRIu32 ": %s",
337                i,
338                sampler->sampler_elements[i]->file_name);
339     }
340   }
341   else
342   {
343     LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
344     return;
345   }
346
347   GNUNET_assert (sampler->sampler_size == new_size);
348 }
349
350
351 /**
352  * Grow or shrink the size of the sampler.
353  *
354  * @param sampler the sampler to resize.
355  * @param new_size the new size of the sampler
356  */
357 void
358 RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
359 {
360   GNUNET_assert (0 < new_size);
361   sampler_resize (sampler, new_size);
362 }
363
364
365 /**
366  * Empty the sampler.
367  *
368  * @param sampler the sampler to empty.
369  * @param new_size the new size of the sampler
370  */
371 static void
372 sampler_empty (struct RPS_Sampler *sampler)
373 {
374   sampler_resize (sampler, 0);
375 }
376
377
378 /**
379  * Callback to _get_rand_peer() used by _get_n_rand_peers().
380  *
381  * Checks whether all n peers are available. If they are,
382  * give those back.
383  */
384 static void
385 check_n_peers_ready (void *cls,
386                      const struct GNUNET_PeerIdentity *id)
387 {
388   struct RPS_SamplerRequestHandle *req_handle = cls;
389   (void) id;
390   RPS_sampler_n_rand_peers_ready_cb tmp_cb;
391   struct GNUNET_PeerIdentity *peers;
392   uint32_t num_peers;
393   void *cb_cls;
394
395   req_handle->cur_num_peers++;
396   LOG (GNUNET_ERROR_TYPE_DEBUG,
397       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
398       req_handle->cur_num_peers, req_handle->num_peers);
399
400   if (req_handle->num_peers == req_handle->cur_num_peers)
401   { /* All peers are ready -- return those to the client */
402     GNUNET_assert (NULL != req_handle->callback);
403
404     LOG (GNUNET_ERROR_TYPE_DEBUG,
405         "returning %" PRIX32 " peers to the client\n",
406         req_handle->num_peers);
407
408     /* Copy pointers and peers temporarily as they
409      * might be deleted from within the callback */
410     tmp_cb = req_handle->callback;
411     num_peers = req_handle->num_peers;
412     peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
413     GNUNET_memcpy (peers,
414                    req_handle->ids,
415                    num_peers * sizeof (struct GNUNET_PeerIdentity));
416     cb_cls = req_handle->cls;
417     RPS_sampler_request_cancel (req_handle);
418     req_handle = NULL;
419     tmp_cb (peers, num_peers, cb_cls);
420     GNUNET_free (peers);
421   }
422 }
423
424
425 /**
426  * Get n random peers out of the sampled peers.
427  *
428  * We might want to reinitialise this sampler after giving the
429  * corrsponding peer to the client.
430  * Random with or without consumption?
431  *
432  * @param sampler the sampler to get peers from.
433  * @param cb callback that will be called once the ids are ready.
434  * @param cls closure given to @a cb
435  * @param for_client #GNUNET_YES if result is used for client,
436  *                   #GNUNET_NO if used internally
437  * @param num_peers the number of peers requested
438  */
439 struct RPS_SamplerRequestHandle *
440 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
441                               uint32_t num_peers,
442                               RPS_sampler_n_rand_peers_ready_cb cb,
443                               void *cls)
444 {
445   uint32_t i;
446   struct RPS_SamplerRequestHandle *req_handle;
447   struct GetPeerCls *gpc;
448
449   GNUNET_assert (0 != sampler->sampler_size);
450   if (0 == num_peers)
451     return NULL;
452
453   // TODO check if we have too much (distinct) sampled peers
454   req_handle = GNUNET_new (struct RPS_SamplerRequestHandle);
455   req_handle->num_peers = num_peers;
456   req_handle->cur_num_peers = 0;
457   req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
458   req_handle->sampler = sampler;
459   req_handle->callback = cb;
460   req_handle->cls = cls;
461   GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head,
462                                sampler->req_handle_tail,
463                                req_handle);
464
465   LOG (GNUNET_ERROR_TYPE_DEBUG,
466       "Scheduling requests for %" PRIu32 " peers\n", num_peers);
467
468   for (i = 0; i < num_peers; i++)
469   {
470     gpc = GNUNET_new (struct GetPeerCls);
471     gpc->req_handle = req_handle;
472     gpc->cont = check_n_peers_ready;
473     gpc->cont_cls = req_handle;
474     gpc->id = &req_handle->ids[i];
475
476     GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head,
477                                  req_handle->gpc_tail,
478                                  gpc);
479     // maybe add a little delay
480     gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers,
481                                                    gpc);
482   }
483   return req_handle;
484 }
485
486 /**
487  * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb.
488  *
489  * @param req_handle the handle to the request
490  */
491 void
492 RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle)
493 {
494   struct GetPeerCls *i;
495
496   while (NULL != (i = req_handle->gpc_head) )
497   {
498     GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head,
499                                  req_handle->gpc_tail,
500                                  i);
501     if (NULL != i->get_peer_task)
502     {
503       GNUNET_SCHEDULER_cancel (i->get_peer_task);
504     }
505     if (NULL != i->notify_ctx)
506     {
507       GNUNET_CONTAINER_DLL_remove (req_handle->sampler->notify_ctx_head,
508                                    req_handle->sampler->notify_ctx_tail,
509                                    i->notify_ctx);
510       GNUNET_free (i->notify_ctx);
511       i->notify_ctx = NULL;
512     }
513     GNUNET_free (i);
514   }
515   GNUNET_free (req_handle->ids);
516   req_handle->ids = NULL;
517   GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head,
518                                req_handle->sampler->req_handle_tail,
519                                req_handle);
520   GNUNET_free (req_handle);
521 }
522
523
524 /**
525  * Cleans the sampler.
526  */
527   void
528 RPS_sampler_destroy (struct RPS_Sampler *sampler)
529 {
530   if (NULL != sampler->req_handle_head)
531   {
532     LOG (GNUNET_ERROR_TYPE_WARNING,
533         "There are still pending requests. Going to remove them.\n");
534     while (NULL != sampler->req_handle_head)
535     {
536       RPS_sampler_request_cancel (sampler->req_handle_head);
537     }
538   }
539   sampler_empty (sampler);
540   GNUNET_free (sampler);
541 }
542
543
544 /* end of rps-sampler_common.c */