-improve UDP logging
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107 /**
108  * Sampler with its own array of SamplerElements
109  */
110 struct RPS_Sampler
111 {
112   /**
113    * Number of sampler elements we hold.
114    */
115   unsigned int sampler_size;
116   //size_t size;
117
118   /**
119    * All Samplers in one array.
120    */
121   struct RPS_SamplerElement **sampler_elements;
122
123   /**
124    * Max time a round takes
125    *
126    * Used in the context of RPS
127    */
128   struct GNUNET_TIME_Relative max_round_interval;
129
130   /**
131    * Callback to be called when a peer gets inserted into a sampler.
132    */
133   RPS_sampler_insert_cb insert_cb;
134
135   /**
136    * Closure to the insert_cb.
137    */
138   void *insert_cls;
139
140   /**
141    * Callback to be called when a peer gets inserted into a sampler.
142    */
143   RPS_sampler_remove_cb remove_cb;
144
145   /**
146    * Closure to the remove_cb.
147    */
148   void *remove_cls;
149 };
150
151 /**
152  * Closure to _get_n_rand_peers_ready_cb()
153  */
154 struct NRandPeersReadyCls
155 {
156   /**
157    * Number of peers we are waiting for.
158    */
159   uint32_t num_peers;
160
161   /**
162    * Number of peers we currently have.
163    */
164   uint32_t cur_num_peers;
165
166   /**
167    * Pointer to the array holding the ids.
168    */
169   struct GNUNET_PeerIdentity *ids;
170
171   /**
172    * Callback to be called when all ids are available.
173    */
174   RPS_sampler_n_rand_peers_ready_cb callback;
175
176   /**
177    * Closure given to the callback
178    */
179   void *cls;
180 };
181
182 /**
183  * Callback that is called from _get_rand_peer() when the PeerID is ready.
184  *
185  * @param cls the closure given alongside this function.
186  * @param id the PeerID that was returned
187  */
188 typedef void
189 (*RPS_sampler_rand_peer_ready_cont) (void *cls,
190         const struct GNUNET_PeerIdentity *id);
191
192 /**
193  * Closure to #RPS_sampler_get_rand_peer()
194  */
195 struct GetPeerCls
196 {
197   /** DLL */
198   struct GetPeerCls *next;
199   struct GetPeerCls *prev;
200
201   /**
202    * The sampler this function operates on.
203    */
204   struct RPS_Sampler *sampler;
205
206   /**
207    * The task for this function.
208    */
209   struct GNUNET_SCHEDULER_Task *get_peer_task;
210
211   /**
212    * The callback
213    */
214   RPS_sampler_rand_peer_ready_cont cont;
215
216   /**
217    * The closure to the callback
218    */
219   void *cont_cls;
220
221   /**
222    * The address of the id to be stored at
223    */
224   struct GNUNET_PeerIdentity *id;
225 };
226
227
228 ///**
229 // * Global sampler variable.
230 // */
231 //struct RPS_Sampler *sampler;
232
233
234 /**
235  * The minimal size for the extended sampler elements.
236  */
237 static size_t min_size;
238
239 /**
240  * The maximal size the extended sampler elements should grow to.
241  */
242 static size_t max_size;
243
244 /**
245  * The size the extended sampler elements currently have.
246  */
247 //static size_t extra_size;
248
249 /**
250  * Inedex to the sampler element that is the next to be returned
251  */
252 static uint32_t client_get_index;
253
254
255 /** FIXME document */
256 struct GetPeerCls *gpc_head;
257 struct GetPeerCls *gpc_tail;
258
259
260 /**
261  * Callback to _get_rand_peer() used by _get_n_rand_peers().
262  *
263  * Checks whether all n peers are available. If they are,
264  * give those back.
265  */
266   void
267 check_n_peers_ready (void *cls,
268     const struct GNUNET_PeerIdentity *id)
269 {
270   struct NRandPeersReadyCls *n_peers_cls;
271
272   n_peers_cls = (struct NRandPeersReadyCls *) cls;
273
274   n_peers_cls->cur_num_peers++;
275   LOG (GNUNET_ERROR_TYPE_DEBUG,
276       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
277       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
278
279   if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
280   { /* All peers are ready -- return those to the client */
281     GNUNET_assert (NULL != n_peers_cls->callback);
282
283     LOG (GNUNET_ERROR_TYPE_DEBUG,
284         "returning %" PRIX32 " peers to the client\n",
285         n_peers_cls->num_peers);
286     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
287
288     GNUNET_free (n_peers_cls);
289   }
290 }
291
292
293 /**
294  * Reinitialise a previously initialised sampler element.
295  *
296  * @param sampler pointer to the memory that keeps the value.
297  */
298   static void
299 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
300 {
301   sampler_el->is_empty = EMPTY;
302
303   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
304   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
305                              &(sampler_el->auth_key.key),
306                              GNUNET_CRYPTO_HASH_LENGTH);
307
308   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
309
310   sampler_el->birth = GNUNET_TIME_absolute_get ();
311   sampler_el->num_peers = 0;
312   sampler_el->num_change = 0;
313 }
314
315
316 /**
317  * (Re)Initialise given Sampler with random min-wise independent function.
318  *
319  * In this implementation this means choosing an auth_key for later use in
320  * a hmac at random.
321  *
322  * @return a newly created RPS_SamplerElement which currently holds no id.
323  */
324   struct RPS_SamplerElement *
325 RPS_sampler_elem_create (void)
326 {
327   struct RPS_SamplerElement *s;
328
329   s = GNUNET_new (struct RPS_SamplerElement);
330
331   RPS_sampler_elem_reinit (s);
332   LOG (GNUNET_ERROR_TYPE_DEBUG, "initialised with empty PeerID\n");
333
334   return s;
335 }
336
337
338 /**
339  * Input an PeerID into the given sampler element.
340  *
341  * @param sampler the sampler the @a s_elem belongs to.
342  *                Needed to know the 
343  */
344 static void
345 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
346                        struct RPS_Sampler *sampler,
347                        const struct GNUNET_PeerIdentity *other,
348                        RPS_sampler_insert_cb insert_cb, void *insert_cls,
349                        RPS_sampler_remove_cb remove_cb, void *remove_cls)
350 {
351   struct GNUNET_HashCode other_hash;
352
353   s_elem->num_peers++;
354
355   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
356   {
357     LOG (GNUNET_ERROR_TYPE_DEBUG, "         Got PeerID %s\n",
358         GNUNET_i2s (other));
359     LOG (GNUNET_ERROR_TYPE_DEBUG, "Have already PeerID %s\n",
360         GNUNET_i2s (&(s_elem->peer_id)));
361   }
362   else
363   {
364     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
365         other,
366         sizeof(struct GNUNET_PeerIdentity),
367         &other_hash);
368
369     if ( EMPTY == s_elem->is_empty )
370     {
371       LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerID %s; Simply accepting (was empty previously).\n",
372           GNUNET_i2s(other));
373       s_elem->peer_id = *other;
374       s_elem->peer_id_hash = other_hash;
375
376       if (NULL != insert_cb)
377         insert_cb (insert_cls, sampler, &(s_elem->peer_id));
378
379       s_elem->num_change++;
380     }
381     else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
382     {
383       LOG (GNUNET_ERROR_TYPE_DEBUG, "           Got PeerID %s\n",
384           GNUNET_i2s (other));
385       LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
386           GNUNET_i2s (&s_elem->peer_id));
387
388       if ( NULL != remove_cb )
389       {
390         LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n",
391             GNUNET_i2s (&s_elem->peer_id));
392         remove_cb (remove_cls, sampler, &s_elem->peer_id);
393       }
394
395       s_elem->peer_id = *other;
396       s_elem->peer_id_hash = other_hash;
397
398       if ( NULL != insert_cb )
399       {
400         LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n",
401             GNUNET_i2s (&s_elem->peer_id));
402         insert_cb (insert_cls, sampler, &s_elem->peer_id);
403       }
404
405       s_elem->num_change++;
406     }
407     else
408     {
409       LOG (GNUNET_ERROR_TYPE_DEBUG, "        Got PeerID %s\n",
410           GNUNET_i2s(other));
411       LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n",
412           GNUNET_i2s(&s_elem->peer_id));
413     }
414   }
415   s_elem->is_empty = NOT_EMPTY;
416 }
417
418
419 /**
420  * Get the size of the sampler.
421  *
422  * @param sampler the sampler to return the size of.
423  * @return the size of the sampler
424  */
425 unsigned int
426 RPS_sampler_get_size (struct RPS_Sampler *sampler)
427 {
428   return sampler->sampler_size;
429 }
430
431
432 /**
433  * Grow or shrink the size of the sampler.
434  *
435  * @param sampler the sampler to resize.
436  * @param new_size the new size of the sampler
437  */
438 static void
439 sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
440 {
441   unsigned int old_size;
442   uint32_t i;
443   struct RPS_SamplerElement **rem_list;
444
445   // TODO check min and max size
446
447   old_size = sampler->sampler_size;
448
449   if (old_size > new_size)
450   { /* Shrinking */
451     /* Temporary store those to properly call the removeCB on those later */
452     rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
453     memcpy (rem_list,
454         &sampler->sampler_elements[new_size],
455         (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
456
457     LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size);
458     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
459     LOG (GNUNET_ERROR_TYPE_DEBUG,
460         "sampler->sampler_elements now points to %p\n",
461         sampler->sampler_elements);
462
463     for (i = 0 ; i < old_size - new_size ; i++)
464     {/* Remove unneeded rest */
465       LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i);
466       if (NULL != sampler->remove_cb)
467         sampler->remove_cb (sampler->remove_cls, sampler, &rem_list[i]->peer_id);
468       GNUNET_free (rem_list[i]);
469     }
470     GNUNET_free (rem_list);
471   }
472   else if (old_size < new_size)
473   { /* Growing */
474     LOG (GNUNET_ERROR_TYPE_DEBUG, "Growing sampler %d -> %d\n", old_size, new_size);
475     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
476     LOG (GNUNET_ERROR_TYPE_DEBUG,
477         "sampler->sampler_elements now points to %p\n",
478         sampler->sampler_elements);
479
480     for ( i = old_size ; i < new_size ; i++ )
481     { /* Add new sampler elements */
482       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
483       if (NULL != sampler->insert_cb)
484         sampler->insert_cb (sampler->insert_cls, sampler, &sampler->sampler_elements[i]->peer_id);
485       LOG (GNUNET_ERROR_TYPE_DEBUG,
486           "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
487           i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
488     }
489   }
490   else
491   {
492     LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
493     return;
494   }
495
496   GNUNET_assert (sampler->sampler_size == new_size);
497   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); // remove
498 }
499
500
501 /**
502  * Grow or shrink the size of the sampler.
503  *
504  * @param sampler the sampler to resize.
505  * @param new_size the new size of the sampler
506  */
507 void
508 RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
509 {
510   GNUNET_assert (0 < new_size);
511   sampler_resize (sampler, new_size);
512 }
513
514
515 /**
516  * Empty the sampler.
517  *
518  * @param sampler the sampler to empty.
519  * @param new_size the new size of the sampler
520  */
521 static void
522 sampler_empty (struct RPS_Sampler *sampler)
523 {
524   sampler_resize (sampler, 0);
525 }
526
527
528 /**
529  * Initialise a tuple of sampler elements.
530  *
531  * @param init_size the size the sampler is initialised with
532  * @param ins_cb the callback that will be called on every PeerID that is
533  *               newly inserted into a sampler element
534  * @param ins_cls the closure given to #ins_cb
535  * @param rem_cb the callback that will be called on every PeerID that is
536  *               removed from a sampler element
537  * @param rem_cls the closure given to #rem_cb
538  * @return a handle to a sampler that consists of sampler elements.
539  */
540 struct RPS_Sampler *
541 RPS_sampler_init (size_t init_size,
542     struct GNUNET_TIME_Relative max_round_interval,
543     RPS_sampler_insert_cb ins_cb, void *ins_cls,
544     RPS_sampler_remove_cb rem_cb, void *rem_cls)
545 {
546   struct RPS_Sampler *sampler;
547   //uint32_t i;
548
549   /* Initialise context around extended sampler */
550   min_size = 10; // TODO make input to _samplers_init()
551   max_size = 1000; // TODO make input to _samplers_init()
552
553   sampler = GNUNET_new (struct RPS_Sampler);
554   sampler->sampler_size = 0;
555   sampler->sampler_elements = NULL;
556   sampler->max_round_interval = max_round_interval;
557   sampler->insert_cb = ins_cb;
558   sampler->insert_cls = ins_cls;
559   sampler->remove_cb = rem_cb;
560   sampler->remove_cls = rem_cls;
561   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
562   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
563   RPS_sampler_resize (sampler, init_size);
564
565   client_get_index = 0;
566
567   //GNUNET_assert (init_size == sampler->sampler_size);
568   return sampler;
569 }
570
571
572 /**
573  * A fuction to update every sampler in the given list
574  *
575  * @param sampler the sampler to update.
576  * @param id the PeerID that is put in the sampler
577  */
578   void
579 RPS_sampler_update (struct RPS_Sampler *sampler,
580                     const struct GNUNET_PeerIdentity *id)
581 {
582   uint32_t i;
583
584   for ( i = 0 ; i < sampler->sampler_size ; i++ )
585     RPS_sampler_elem_next (sampler->sampler_elements[i],
586         sampler, id,
587         sampler->insert_cb, sampler->insert_cls,
588         sampler->remove_cb, sampler->remove_cls);
589 }
590
591
592 /**
593  * Reinitialise all previously initialised sampler elements with the given value.
594  *
595  * Used to get rid of a PeerID.
596  *
597  * @param sampler the sampler to reinitialise a sampler element in.
598  * @param id the id of the sampler elements to update.
599  */
600   void
601 RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
602                                    const struct GNUNET_PeerIdentity *id)
603 {
604   uint32_t i;
605
606   for ( i = 0 ; i < sampler->sampler_size ; i++ )
607   {
608     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
609     {
610       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
611       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
612     }
613   }
614 }
615
616
617 /**
618  * Get one random peer out of the sampled peers.
619  *
620  * We might want to reinitialise this sampler after giving the
621  * corrsponding peer to the client.
622  * Only used internally
623  */
624 static void
625 sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
626 {
627   struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
628   uint32_t r_index;
629
630   gpc->get_peer_task = NULL;
631   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
632   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
633     return;
634
635   /**;
636    * Choose the r_index of the peer we want to return
637    * at random from the interval of the gossip list
638    */
639   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
640       gpc->sampler->sampler_size);
641
642   if ( EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty )
643   {
644     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
645                                                                    GNUNET_TIME_UNIT_SECONDS,
646                                                                    .1),
647                                                        &sampler_get_rand_peer2,
648                                                        cls);
649     return;
650   }
651
652   *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id;
653
654   gpc->cont (gpc->cont_cls, gpc->id);
655   GNUNET_free (gpc);
656 }
657
658
659 /**
660  * Get one random peer out of the sampled peers.
661  *
662  * We might want to reinitialise this sampler after giving the
663  * corrsponding peer to the client.
664  */
665 static void
666 sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
667 {
668   struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
669   struct GNUNET_PeerIdentity tmp_id;
670   struct RPS_SamplerElement *s_elem;
671   struct GNUNET_TIME_Relative last_request_diff;
672   uint32_t tmp_client_get_index;
673
674   gpc->get_peer_task = NULL;
675   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
676   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
677     return;
678
679   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
680
681
682   /* Store the next #client_get_index to check whether we cycled over the whole list */
683   if (0 < client_get_index)
684     tmp_client_get_index = client_get_index - 1;
685   else
686     tmp_client_get_index = gpc->sampler->sampler_size - 1;
687
688   LOG (GNUNET_ERROR_TYPE_DEBUG,
689       "sched for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
690       tmp_client_get_index, gpc->sampler->sampler_size);
691
692   do
693   { /* Get first non empty sampler */
694     if (tmp_client_get_index == client_get_index)
695     {
696       LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n",
697            client_get_index);
698       GNUNET_assert (NULL == gpc->get_peer_task);
699       gpc->get_peer_task =
700         GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
701                                       &sampler_get_rand_peer, cls);
702       return;
703     }
704
705     tmp_id = gpc->sampler->sampler_elements[client_get_index]->peer_id;
706     RPS_sampler_elem_reinit (gpc->sampler->sampler_elements[client_get_index]);
707     RPS_sampler_elem_next (gpc->sampler->sampler_elements[client_get_index],
708                            gpc->sampler, &tmp_id, NULL, NULL, NULL, NULL);
709
710     /* Cycle the #client_get_index one step further */
711     if ( client_get_index == gpc->sampler->sampler_size - 1 )
712       client_get_index = 0;
713     else
714       client_get_index++;
715
716     LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n",
717          client_get_index);
718   } while (EMPTY == gpc->sampler->sampler_elements[client_get_index]->is_empty);
719
720   s_elem = gpc->sampler->sampler_elements[client_get_index];
721   *gpc->id = s_elem->peer_id;
722
723   /* Check whether we may use this sampler to give it back to the client */
724   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
725   {
726     last_request_diff =
727       GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
728                                            GNUNET_TIME_absolute_get ());
729     /* We're not going to give it back now if it was
730      * already requested by a client this round */
731     if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us)
732     {
733       LOG (GNUNET_ERROR_TYPE_DEBUG,
734           "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
735       ///* How many time remains untile the next round has started? */
736       //inv_last_request_diff =
737       //  GNUNET_TIME_absolute_get_difference (last_request_diff,
738       //                                       sampler->max_round_interval);
739       // add a little delay
740       /* Schedule it one round later */
741       GNUNET_assert (NULL == gpc->get_peer_task);
742       gpc->get_peer_task =
743         GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
744                                       &sampler_get_rand_peer, cls);
745       return;
746     }
747     // TODO add other reasons to wait here
748   }
749
750   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
751
752   gpc->cont (gpc->cont_cls, gpc->id);
753   GNUNET_free (gpc);
754 }
755
756
757 /**
758  * Get n random peers out of the sampled peers.
759  *
760  * We might want to reinitialise this sampler after giving the
761  * corrsponding peer to the client.
762  * Random with or without consumption?
763  *
764  * @param sampler the sampler to get peers from.
765  * @param cb callback that will be called once the ids are ready.
766  * @param cls closure given to @a cb
767  * @param for_client #GNUNET_YES if result is used for client,
768  *                   #GNUNET_NO if used internally
769  * @param num_peers the number of peers requested
770  */
771   void
772 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
773                               RPS_sampler_n_rand_peers_ready_cb cb,
774                               void *cls, uint32_t num_peers, int for_client)
775 {
776   GNUNET_assert (0 != sampler->sampler_size);
777
778   // TODO check if we have too much (distinct) sampled peers
779   uint32_t i;
780   struct NRandPeersReadyCls *cb_cls;
781   struct GetPeerCls *gpc;
782
783   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
784   cb_cls->num_peers = num_peers;
785   cb_cls->cur_num_peers = 0;
786   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
787   cb_cls->callback = cb;
788   cb_cls->cls = cls;
789
790   LOG (GNUNET_ERROR_TYPE_DEBUG,
791       "Scheduling requests for %" PRIX32 " peers\n", num_peers);
792
793   for ( i = 0 ; i < num_peers ; i++ )
794   {
795     gpc = GNUNET_new (struct GetPeerCls);
796     gpc->sampler = sampler;
797     gpc->cont = check_n_peers_ready;
798     gpc->cont_cls = cb_cls;
799     gpc->id = &cb_cls->ids[i];
800
801     // maybe add a little delay
802     if (GNUNET_YES == for_client)
803       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer, gpc);
804     else if (GNUNET_NO == for_client)
805       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer2, gpc);
806     else
807       GNUNET_abort ();
808
809     GNUNET_CONTAINER_DLL_insert (gpc_head, gpc_tail, gpc);
810   }
811 }
812
813
814 /**
815  * Counts how many Samplers currently hold a given PeerID.
816  *
817  * @param sampler the sampler to count ids in.
818  * @param id the PeerID to count.
819  *
820  * @return the number of occurrences of id.
821  */
822   uint32_t
823 RPS_sampler_count_id (struct RPS_Sampler *sampler,
824                       const struct GNUNET_PeerIdentity *id)
825 {
826   uint32_t count;
827   uint32_t i;
828
829   count = 0;
830   for ( i = 0 ; i < sampler->sampler_size ; i++ )
831   {
832     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
833         && EMPTY != sampler->sampler_elements[i]->is_empty)
834       count++;
835   }
836   return count;
837 }
838
839
840 /**
841  * Cleans the sampler.
842  */
843   void
844 RPS_sampler_destroy (struct RPS_Sampler *sampler)
845 {
846   struct GetPeerCls *i;
847
848   for (i = gpc_head; NULL != i; i = gpc_head)
849   {
850     GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, i);
851     GNUNET_SCHEDULER_cancel (i->get_peer_task);
852     GNUNET_free (i);
853   }
854
855   sampler_empty (sampler);
856   GNUNET_free (sampler);
857 }
858
859 /* end of gnunet-service-rps.c */