-split up sampler and sampler element
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31 #include "gnunet-service-rps_sampler_elem.h"
32
33 #include <math.h>
34 #include <inttypes.h>
35
36 #include "rps-test_util.h"
37
38 #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
39
40
41 // multiple 'clients'?
42
43 // TODO check for overflows
44
45 // TODO align message structs
46
47 // hist_size_init, hist_size_max
48
49 /***********************************************************************
50  * WARNING: This section needs to be reviewed regarding the use of
51  * functions providing (pseudo)randomness!
52 ***********************************************************************/
53
54 // TODO care about invalid input of the caller (size 0 or less...)
55
56 /**
57  * Callback that is called from _get_rand_peer() when the PeerID is ready.
58  *
59  * @param cls the closure given alongside this function.
60  * @param id the PeerID that was returned
61  */
62 typedef void
63 (*RPS_sampler_rand_peer_ready_cont) (void *cls,
64                                      const struct GNUNET_PeerIdentity *id);
65
66
67 /**
68  * Closure for #sampler_mod_get_rand_peer() and #sampler_get_rand_peer
69  */
70 struct GetPeerCls
71 {
72   /**
73    * DLL
74    */
75   struct GetPeerCls *next;
76
77   /**
78    * DLL
79    */
80   struct GetPeerCls *prev;
81
82   /**
83    * The sampler this function operates on.
84    */
85   struct RPS_Sampler *sampler;
86
87   /**
88    * The task for this function.
89    */
90   struct GNUNET_SCHEDULER_Task *get_peer_task;
91
92   /**
93    * The callback
94    */
95   RPS_sampler_rand_peer_ready_cont cont;
96
97   /**
98    * The closure to the callback @e cont
99    */
100   void *cont_cls;
101
102   /**
103    * The address of the id to be stored at
104    */
105   struct GNUNET_PeerIdentity *id;
106 };
107
108
109 /**
110  * Type of function used to differentiate between modified and not modified
111  * Sampler.
112  */
113 typedef void
114 (*RPS_get_peers_type) (void *cls,
115                        const struct GNUNET_SCHEDULER_TaskContext *tc);
116
117 /**
118  * Get one random peer out of the sampled peers.
119  *
120  * We might want to reinitialise this sampler after giving the
121  * corrsponding peer to the client.
122  * Only used internally
123  */
124 static void
125 sampler_get_rand_peer (void *cls,
126                         const struct GNUNET_SCHEDULER_TaskContext *tc);
127
128 /**
129  * Get one random peer out of the sampled peers.
130  *
131  * We might want to reinitialise this sampler after giving the
132  * corrsponding peer to the client.
133  */
134 static void
135 sampler_mod_get_rand_peer (void *cls,
136                        const struct GNUNET_SCHEDULER_TaskContext *tc);
137
138
139 /**
140  * Sampler with its own array of SamplerElements
141  */
142 struct RPS_Sampler
143 {
144   /**
145    * Number of sampler elements we hold.
146    */
147   unsigned int sampler_size;
148   //size_t size;
149
150   /**
151    * All sampler elements in one array.
152    */
153   struct RPS_SamplerElement **sampler_elements;
154
155   /**
156    * Maximum time a round takes
157    *
158    * Used in the context of RPS
159    */
160   struct GNUNET_TIME_Relative max_round_interval;
161
162   /**
163    * Stores the function to return peers. Which one it is depends on whether
164    * the Sampler is the modified one or not.
165    */
166   RPS_get_peers_type get_peers;
167
168   /**
169    * Head for the DLL to store the closures to pending requests.
170    */
171   struct GetPeerCls *gpc_head;
172
173   /**
174    * Tail for the DLL to store the closures to pending requests.
175    */
176   struct GetPeerCls *gpc_tail;
177
178   #ifdef TO_FILE
179   /**
180    * File name to log to
181    */
182   char *file_name;
183   #endif /* TO_FILE */
184 };
185
186 /**
187  * Closure to _get_n_rand_peers_ready_cb()
188  */
189 struct NRandPeersReadyCls
190 {
191   /**
192    * Number of peers we are waiting for.
193    */
194   uint32_t num_peers;
195
196   /**
197    * Number of peers we currently have.
198    */
199   uint32_t cur_num_peers;
200
201   /**
202    * Pointer to the array holding the ids.
203    */
204   struct GNUNET_PeerIdentity *ids;
205
206   /**
207    * Callback to be called when all ids are available.
208    */
209   RPS_sampler_n_rand_peers_ready_cb callback;
210
211   /**
212    * Closure given to the callback
213    */
214   void *cls;
215 };
216
217 ///**
218 // * Global sampler variable.
219 // */
220 //struct RPS_Sampler *sampler;
221
222
223 /**
224  * The minimal size for the extended sampler elements.
225  */
226 static size_t min_size;
227
228 /**
229  * The maximal size the extended sampler elements should grow to.
230  */
231 static size_t max_size;
232
233 /**
234  * The size the extended sampler elements currently have.
235  */
236 //static size_t extra_size;
237
238 /**
239  * Inedex to the sampler element that is the next to be returned
240  */
241 static uint32_t client_get_index;
242
243
244 /**
245  * Callback to _get_rand_peer() used by _get_n_rand_peers().
246  *
247  * Checks whether all n peers are available. If they are,
248  * give those back.
249  */
250 static void
251 check_n_peers_ready (void *cls,
252     const struct GNUNET_PeerIdentity *id)
253 {
254   struct NRandPeersReadyCls *n_peers_cls = cls;
255
256   n_peers_cls->cur_num_peers++;
257   LOG (GNUNET_ERROR_TYPE_DEBUG,
258       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
259       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
260
261   if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
262   { /* All peers are ready -- return those to the client */
263     GNUNET_assert (NULL != n_peers_cls->callback);
264
265     LOG (GNUNET_ERROR_TYPE_DEBUG,
266         "returning %" PRIX32 " peers to the client\n",
267         n_peers_cls->num_peers);
268     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
269
270     GNUNET_free (n_peers_cls);
271   }
272 }
273
274
275 /**
276  * Get the size of the sampler.
277  *
278  * @param sampler the sampler to return the size of.
279  * @return the size of the sampler
280  */
281 unsigned int
282 RPS_sampler_get_size (struct RPS_Sampler *sampler)
283 {
284   return sampler->sampler_size;
285 }
286
287
288 /**
289  * Grow or shrink the size of the sampler.
290  *
291  * @param sampler the sampler to resize.
292  * @param new_size the new size of the sampler
293  */
294 static void
295 sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
296 {
297   unsigned int old_size;
298   uint32_t i;
299
300   // TODO check min and max size
301
302   old_size = sampler->sampler_size;
303
304   if (old_size > new_size)
305   { /* Shrinking */
306
307     LOG (GNUNET_ERROR_TYPE_DEBUG,
308          "Shrinking sampler %d -> %d\n",
309          old_size,
310          new_size);
311
312     to_file (sampler->file_name,
313          "Shrinking sampler %d -> %d",
314          old_size,
315          new_size);
316
317     for (i = new_size ; i < old_size ; i++)
318     {
319       to_file (sampler->file_name,
320                "-%" PRIu32 ": %s",
321                i,
322                sampler->sampler_elements[i]->file_name);
323     }
324
325     GNUNET_array_grow (sampler->sampler_elements,
326                        sampler->sampler_size,
327                        new_size);
328     LOG (GNUNET_ERROR_TYPE_DEBUG,
329          "sampler->sampler_elements now points to %p\n",
330          sampler->sampler_elements);
331
332   }
333   else if (old_size < new_size)
334   { /* Growing */
335     LOG (GNUNET_ERROR_TYPE_DEBUG,
336          "Growing sampler %d -> %d\n",
337          old_size,
338          new_size);
339
340     to_file (sampler->file_name,
341          "Growing sampler %d -> %d",
342          old_size,
343          new_size);
344
345     GNUNET_array_grow (sampler->sampler_elements,
346         sampler->sampler_size,
347         new_size);
348
349     for (i = old_size ; i < new_size ; i++)
350     { /* Add new sampler elements */
351       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
352
353       to_file (sampler->file_name,
354                "+%" PRIu32 ": %s",
355                i,
356                sampler->sampler_elements[i]->file_name);
357     }
358   }
359   else
360   {
361     LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
362     return;
363   }
364
365   GNUNET_assert (sampler->sampler_size == new_size);
366 }
367
368
369 /**
370  * Grow or shrink the size of the sampler.
371  *
372  * @param sampler the sampler to resize.
373  * @param new_size the new size of the sampler
374  */
375 void
376 RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
377 {
378   GNUNET_assert (0 < new_size);
379   sampler_resize (sampler, new_size);
380 }
381
382
383 /**
384  * Empty the sampler.
385  *
386  * @param sampler the sampler to empty.
387  * @param new_size the new size of the sampler
388  */
389 static void
390 sampler_empty (struct RPS_Sampler *sampler)
391 {
392   sampler_resize (sampler, 0);
393 }
394
395
396 /**
397  * Initialise a tuple of sampler elements.
398  *
399  * @param init_size the size the sampler is initialised with
400  * @param max_round_interval maximum time a round takes
401  * @return a handle to a sampler that consists of sampler elements.
402  */
403 struct RPS_Sampler *
404 RPS_sampler_init (size_t init_size,
405                   struct GNUNET_TIME_Relative max_round_interval)
406 {
407   struct RPS_Sampler *sampler;
408
409   /* Initialise context around extended sampler */
410   min_size = 10; // TODO make input to _samplers_init()
411   max_size = 1000; // TODO make input to _samplers_init()
412
413   sampler = GNUNET_new (struct RPS_Sampler);
414
415   #ifdef TO_FILE
416   sampler->file_name = create_file ("sampler-");
417
418   LOG (GNUNET_ERROR_TYPE_DEBUG,
419        "Initialised sampler %s\n",
420        sampler->file_name);
421   #endif /* TO_FILE */
422
423   sampler->sampler_size = 0;
424   sampler->sampler_elements = NULL;
425   sampler->max_round_interval = max_round_interval;
426   sampler->get_peers = sampler_get_rand_peer;
427   sampler->gpc_head = NULL;
428   sampler->gpc_tail = NULL;
429   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
430   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
431   RPS_sampler_resize (sampler, init_size);
432
433   client_get_index = 0;
434
435   //GNUNET_assert (init_size == sampler->sampler_size);
436   return sampler;
437 }
438
439 /**
440  * Initialise a modified tuple of sampler elements.
441  *
442  * @param init_size the size the sampler is initialised with
443  * @param max_round_interval maximum time a round takes
444  * @return a handle to a sampler that consists of sampler elements.
445  */
446 struct RPS_Sampler *
447 RPS_sampler_mod_init (size_t init_size,
448                       struct GNUNET_TIME_Relative max_round_interval)
449 {
450   struct RPS_Sampler *sampler;
451
452   sampler = RPS_sampler_init (init_size, max_round_interval);
453   sampler->get_peers = sampler_mod_get_rand_peer;
454
455   LOG (GNUNET_ERROR_TYPE_DEBUG,
456        "Initialised modified sampler %s\n",
457        sampler->file_name);
458   to_file (sampler->file_name,
459            "This is a modified sampler");
460
461   return sampler;
462 }
463
464
465 /**
466  * A fuction to update every sampler in the given list
467  *
468  * @param sampler the sampler to update.
469  * @param id the PeerID that is put in the sampler
470  */
471   void
472 RPS_sampler_update (struct RPS_Sampler *sampler,
473                     const struct GNUNET_PeerIdentity *id)
474 {
475   uint32_t i;
476
477   to_file (sampler->file_name,
478            "Got %s",
479            GNUNET_i2s_full (id));
480
481   for (i = 0 ; i < sampler->sampler_size ; i++)
482   {
483     RPS_sampler_elem_next (sampler->sampler_elements[i],
484                            id);
485   }
486
487 }
488
489
490 /**
491  * Reinitialise all previously initialised sampler elements with the given value.
492  *
493  * Used to get rid of a PeerID.
494  *
495  * @param sampler the sampler to reinitialise a sampler element in.
496  * @param id the id of the sampler elements to update.
497  */
498   void
499 RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
500                                    const struct GNUNET_PeerIdentity *id)
501 {
502   uint32_t i;
503   struct RPS_SamplerElement *trash_entry;
504
505   for ( i = 0 ; i < sampler->sampler_size ; i++ )
506   {
507     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
508     {
509       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
510       trash_entry = GNUNET_new (struct RPS_SamplerElement);
511       *trash_entry = *(sampler->sampler_elements[i]);
512       to_file (trash_entry->file_name,
513                "--- non-active");
514       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
515     }
516   }
517 }
518
519
520 /**
521  * Get one random peer out of the sampled peers.
522  *
523  * We might want to reinitialise this sampler after giving the
524  * corrsponding peer to the client.
525  * Only used internally
526  */
527 static void
528 sampler_get_rand_peer (void *cls,
529                         const struct GNUNET_SCHEDULER_TaskContext *tc)
530 {
531   struct GetPeerCls *gpc = cls;
532   uint32_t r_index;
533
534   gpc->get_peer_task = NULL;
535   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
536     return;
537
538   /**;
539    * Choose the r_index of the peer we want to return
540    * at random from the interval of the gossip list
541    */
542   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
543       gpc->sampler->sampler_size);
544
545   if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty)
546   {
547     //LOG (GNUNET_ERROR_TYPE_DEBUG,
548     //     "Not returning randomly selected, empty PeerID. - Rescheduling.\n");
549
550     /* FIXME no active wait - get notified, when new id arrives?
551      * Might also be a freshly emptied one. Others might still contain ids.
552      * Counter?
553      */
554     gpc->get_peer_task =
555       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (
556                                         GNUNET_TIME_UNIT_SECONDS, 0.1),
557                                     &sampler_get_rand_peer,
558                                     cls);
559     return;
560   }
561
562   *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id;
563
564   gpc->cont (gpc->cont_cls, gpc->id);
565
566   GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
567                                gpc->sampler->gpc_tail,
568                                gpc);
569
570   GNUNET_free (gpc);
571 }
572
573
574 /**
575  * Get one random peer out of the sampled peers.
576  *
577  * We might want to reinitialise this sampler after giving the
578  * corrsponding peer to the client.
579  */
580 static void
581 sampler_mod_get_rand_peer (void *cls,
582                        const struct GNUNET_SCHEDULER_TaskContext *tc)
583 {
584   struct GetPeerCls *gpc = cls;
585   struct RPS_SamplerElement *s_elem;
586   struct GNUNET_TIME_Relative last_request_diff;
587
588   gpc->get_peer_task = NULL;
589   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
590     return;
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
593
594   /* Cycle the #client_get_index one step further */
595   client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size;
596
597   s_elem = gpc->sampler->sampler_elements[client_get_index];
598   *gpc->id = s_elem->peer_id;
599
600   if (NULL == s_elem)
601   {
602     LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n");
603     GNUNET_assert (NULL == gpc->get_peer_task);
604     gpc->get_peer_task =
605       GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
606                                     &sampler_mod_get_rand_peer,
607                                     cls);
608     return;
609   }
610
611   /* Check whether we may use this sampler to give it back to the client */
612   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
613   {
614     last_request_diff =
615       GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
616                                            GNUNET_TIME_absolute_get ());
617     /* We're not going to give it back now if it was
618      * already requested by a client this round */
619     if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us)
620     {
621       LOG (GNUNET_ERROR_TYPE_DEBUG,
622           "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
623       ///* How many time remains untile the next round has started? */
624       //inv_last_request_diff =
625       //  GNUNET_TIME_absolute_get_difference (last_request_diff,
626       //                                       sampler->max_round_interval);
627       // add a little delay
628       /* Schedule it one round later */
629       GNUNET_assert (NULL == gpc->get_peer_task);
630       gpc->get_peer_task =
631         GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
632                                       &sampler_mod_get_rand_peer,
633                                       cls);
634       return;
635     }
636     // TODO add other reasons to wait here
637   }
638
639   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
640
641   GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head,
642                                gpc->sampler->gpc_tail,
643                                gpc);
644   gpc->cont (gpc->cont_cls, gpc->id);
645   GNUNET_free (gpc);
646 }
647
648
649 /**
650  * Get n random peers out of the sampled peers.
651  *
652  * We might want to reinitialise this sampler after giving the
653  * corrsponding peer to the client.
654  * Random with or without consumption?
655  *
656  * @param sampler the sampler to get peers from.
657  * @param cb callback that will be called once the ids are ready.
658  * @param cls closure given to @a cb
659  * @param for_client #GNUNET_YES if result is used for client,
660  *                   #GNUNET_NO if used internally
661  * @param num_peers the number of peers requested
662  */
663   void
664 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
665                               RPS_sampler_n_rand_peers_ready_cb cb,
666                               void *cls, uint32_t num_peers)
667 {
668   GNUNET_assert (0 != sampler->sampler_size);
669   if (0 == num_peers)
670     return;
671
672   // TODO check if we have too much (distinct) sampled peers
673   uint32_t i;
674   struct NRandPeersReadyCls *cb_cls;
675   struct GetPeerCls *gpc;
676
677   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
678   cb_cls->num_peers = num_peers;
679   cb_cls->cur_num_peers = 0;
680   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
681   cb_cls->callback = cb;
682   cb_cls->cls = cls;
683
684   LOG (GNUNET_ERROR_TYPE_DEBUG,
685       "Scheduling requests for %" PRIu32 " peers\n", num_peers);
686
687   for (i = 0 ; i < num_peers ; i++)
688   {
689     gpc = GNUNET_new (struct GetPeerCls);
690     gpc->sampler = sampler;
691     gpc->cont = check_n_peers_ready;
692     gpc->cont_cls = cb_cls;
693     gpc->id = &cb_cls->ids[i];
694
695     // maybe add a little delay
696     gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc);
697
698     GNUNET_CONTAINER_DLL_insert (sampler->gpc_head,
699                                  sampler->gpc_tail,
700                                  gpc);
701   }
702 }
703
704
705 /**
706  * Counts how many Samplers currently hold a given PeerID.
707  *
708  * @param sampler the sampler to count ids in.
709  * @param id the PeerID to count.
710  *
711  * @return the number of occurrences of id.
712  */
713   uint32_t
714 RPS_sampler_count_id (struct RPS_Sampler *sampler,
715                       const struct GNUNET_PeerIdentity *id)
716 {
717   uint32_t count;
718   uint32_t i;
719
720   count = 0;
721   for ( i = 0 ; i < sampler->sampler_size ; i++ )
722   {
723     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
724         && EMPTY != sampler->sampler_elements[i]->is_empty)
725       count++;
726   }
727   return count;
728 }
729
730
731 /**
732  * Cleans the sampler.
733  */
734   void
735 RPS_sampler_destroy (struct RPS_Sampler *sampler)
736 {
737   struct GetPeerCls *i;
738
739   for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head)
740   {
741     GNUNET_CONTAINER_DLL_remove (sampler->gpc_head,
742                                  sampler->gpc_tail,
743                                  i);
744     GNUNET_SCHEDULER_cancel (i->get_peer_task);
745     GNUNET_free (i);
746   }
747
748   sampler_empty (sampler);
749   GNUNET_free (sampler);
750 }
751
752 /* end of gnunet-service-rps.c */