- never resize to 0
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107 /**
108  * Sampler with its own array of SamplerElements
109  */
110 struct RPS_Sampler
111 {
112   /**
113    * Number of sampler elements we hold.
114    */
115   unsigned int sampler_size;
116   //size_t size;
117
118   /**
119    * All Samplers in one array.
120    */
121   struct RPS_SamplerElement **sampler_elements;
122
123   /**
124    * Max time a round takes
125    *
126    * Used in the context of RPS
127    */
128   struct GNUNET_TIME_Relative max_round_interval;
129
130   /**
131    * Callback to be called when a peer gets inserted into a sampler.
132    */
133   RPS_sampler_insert_cb insert_cb;
134
135   /**
136    * Closure to the insert_cb.
137    */
138   void *insert_cls;
139
140   /**
141    * Callback to be called when a peer gets inserted into a sampler.
142    */
143   RPS_sampler_remove_cb remove_cb;
144
145   /**
146    * Closure to the remove_cb.
147    */
148   void *remove_cls;
149 };
150
151 /**
152  * Closure to _get_n_rand_peers_ready_cb()
153  */
154 struct NRandPeersReadyCls
155 {
156   /**
157    * Number of peers we are waiting for.
158    */
159   uint32_t num_peers;
160
161   /**
162    * Number of peers we currently have.
163    */
164   uint32_t cur_num_peers;
165
166   /**
167    * Pointer to the array holding the ids.
168    */
169   struct GNUNET_PeerIdentity *ids;
170
171   /**
172    * Callback to be called when all ids are available.
173    */
174   RPS_sampler_n_rand_peers_ready_cb callback;
175
176   /**
177    * Closure given to the callback
178    */
179   void *cls;
180 };
181
182 /**
183  * Callback that is called from _get_rand_peer() when the PeerID is ready.
184  *
185  * @param cls the closure given alongside this function.
186  * @param id the PeerID that was returned
187  */
188 typedef void
189 (*RPS_sampler_rand_peer_ready_cb) (void *cls,
190         const struct GNUNET_PeerIdentity *id);
191
192 /**
193  * Closure to #RPS_sampler_get_rand_peer()
194  */
195 struct GetPeerCls
196 {
197   /**
198    * The task for this function.
199    */
200   struct GNUNET_SCHEDULER_Task *get_peer_task;
201
202   /**
203    * The callback
204    */
205   RPS_sampler_rand_peer_ready_cb cb;
206
207   /**
208    * The closure to the callback
209    */
210   void *cb_cls;
211
212   /**
213    * The address of the id to be stored at
214    */
215   struct GNUNET_PeerIdentity *id;
216 };
217
218 /**
219  * Multihashmap that keeps track of all get_peer_tasks that are still scheduled.
220  */
221 struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks;
222
223
224 /**
225  * Global sampler variable.
226  */
227 struct RPS_Sampler *sampler;
228
229
230 /**
231  * The minimal size for the extended sampler elements.
232  */
233 static size_t min_size;
234
235 /**
236  * The maximal size the extended sampler elements should grow to.
237  */
238 static size_t max_size;
239
240 /**
241  * The size the extended sampler elements currently have.
242  */
243 //static size_t extra_size;
244
245 /**
246  * Inedex to the sampler element that is the next to be returned
247  */
248 static uint32_t client_get_index;
249
250
251 /**
252  * Callback to _get_rand_peer() used by _get_n_rand_peers().
253  *
254  * Checks whether all n peers are available. If they are,
255  * give those back.
256  */
257   void
258 check_n_peers_ready (void *cls,
259     const struct GNUNET_PeerIdentity *id)
260 {
261   struct NRandPeersReadyCls *n_peers_cls;
262
263   n_peers_cls = (struct NRandPeersReadyCls *) cls;
264
265   LOG (GNUNET_ERROR_TYPE_DEBUG,
266       "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
267       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
268
269   if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
270   { /* All peers are ready -- return those to the client */
271     GNUNET_assert (NULL != n_peers_cls->callback);
272
273     LOG (GNUNET_ERROR_TYPE_DEBUG,
274         "SAMPLER: returning %" PRIX32 " peers to the client\n",
275         n_peers_cls->num_peers);
276     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
277
278     GNUNET_free (n_peers_cls);
279   }
280 }
281
282
283 /**
284  * Reinitialise a previously initialised sampler element.
285  *
286  * @param sampler pointer to the memory that keeps the value.
287  */
288   static void
289 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
290 {
291   sampler_el->is_empty = EMPTY;
292
293   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
294   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
295                              &(sampler_el->auth_key.key),
296                              GNUNET_CRYPTO_HASH_LENGTH);
297
298   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
299
300   sampler_el->birth = GNUNET_TIME_absolute_get ();
301   sampler_el->num_peers = 0;
302   sampler_el->num_change = 0;
303 }
304
305
306 /**
307  * (Re)Initialise given Sampler with random min-wise independent function.
308  *
309  * In this implementation this means choosing an auth_key for later use in
310  * a hmac at random.
311  *
312  * @return a newly created RPS_SamplerElement which currently holds no id.
313  */
314   struct RPS_SamplerElement *
315 RPS_sampler_elem_create (void)
316 {
317   struct RPS_SamplerElement *s;
318
319   s = GNUNET_new (struct RPS_SamplerElement);
320
321   RPS_sampler_elem_reinit (s);
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
323
324   return s;
325 }
326
327
328 /**
329  * Input an PeerID into the given sampler.
330  */
331   static void
332 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other,
333     RPS_sampler_insert_cb insert_cb, void *insert_cls,
334     RPS_sampler_remove_cb remove_cb, void *remove_cls)
335 {
336   struct GNUNET_HashCode other_hash;
337
338   s_elem->num_peers++;
339
340   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
341   {
342     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
343         GNUNET_i2s (other));
344     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
345         GNUNET_i2s (&(s_elem->peer_id)));
346   }
347   else
348   {
349     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
350         other,
351         sizeof(struct GNUNET_PeerIdentity),
352         &other_hash);
353
354     if ( EMPTY == s_elem->is_empty )
355     {
356       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
357           GNUNET_i2s(other));
358       s_elem->peer_id = *other;
359       s_elem->peer_id_hash = other_hash;
360
361       if (NULL != sampler->insert_cb)
362         sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id));
363
364       s_elem->num_change++;
365     }
366     else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
367     {
368       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
369           GNUNET_i2s (other));
370       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
371           GNUNET_i2s (&s_elem->peer_id));
372
373       if ( NULL != sampler->remove_cb )
374       {
375         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
376             GNUNET_i2s (&s_elem->peer_id));
377         sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
378       }
379
380       s_elem->peer_id = *other;
381       s_elem->peer_id_hash = other_hash;
382
383       if ( NULL != sampler->insert_cb )
384       {
385         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
386             GNUNET_i2s (&s_elem->peer_id));
387         sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
388       }
389
390       s_elem->num_change++;
391     }
392     else
393     {
394       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
395           GNUNET_i2s(other));
396       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
397           GNUNET_i2s(&s_elem->peer_id));
398     }
399   }
400   s_elem->is_empty = NOT_EMPTY;
401 }
402
403 /**
404  * Get the size of the sampler.
405  *
406  * @return the size of the sampler
407  */
408 unsigned int
409 RPS_sampler_get_size ()
410 {
411   return sampler->sampler_size;
412 }
413
414
415 /**
416  * Grow or shrink the size of the sampler.
417  *
418  * @param new_size the new size of the sampler
419  */
420 static void
421 sampler_resize (unsigned int new_size)
422 {
423   unsigned int old_size;
424   uint32_t i;
425   struct RPS_SamplerElement **rem_list;
426
427   // TODO check min and max size
428
429   old_size = sampler->sampler_size;
430
431   if (old_size > new_size)
432   { /* Shrinking */
433     /* Temporary store those to properly call the removeCB on those later */
434     rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
435     memcpy (rem_list,
436         &sampler->sampler_elements[new_size],
437         (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
438
439     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size);
440     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
441     LOG (GNUNET_ERROR_TYPE_DEBUG,
442         "SAMPLER: sampler->sampler_elements now points to %p\n",
443         sampler->sampler_elements);
444
445     for (i = 0 ; i < old_size - new_size ; i++)
446     {/* Remove unneeded rest */
447       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX32 ". sampler\n", i);
448       if (NULL != sampler->remove_cb)
449         sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
450       GNUNET_free (rem_list[i]);
451     }
452     GNUNET_free (rem_list);
453   }
454   else if (old_size < new_size)
455   { /* Growing */
456     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size);
457     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
458     LOG (GNUNET_ERROR_TYPE_DEBUG,
459         "SAMPLER: sampler->sampler_elements now points to %p\n",
460         sampler->sampler_elements);
461
462     for ( i = old_size ; i < new_size ; i++ )
463     { /* Add new sampler elements */
464       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
465       if (NULL != sampler->insert_cb)
466         sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
467       LOG (GNUNET_ERROR_TYPE_DEBUG,
468           "SAMPLER: Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
469           i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
470     }
471   }
472   else
473   {
474     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
475     return;
476   }
477
478   GNUNET_assert (sampler->sampler_size == new_size);
479   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
480 }
481
482
483 /**
484  * Grow or shrink the size of the sampler.
485  *
486  * @param new_size the new size of the sampler
487  */
488 void
489 RPS_sampler_resize (unsigned int new_size)
490 {
491   GNUNET_assert (0 < new_size);
492   sampler_resize (new_size);
493 }
494
495
496 /**
497  * Empty the sampler.
498  *
499  * @param new_size the new size of the sampler
500  */
501 void
502 RPS_sampler_empty ()
503 {
504   sampler_resize (0);
505 }
506
507
508 /**
509  * Initialise a tuple of sampler elements.
510  *
511  * @param init_size the size the sampler is initialised with
512  * @param ins_cb the callback that will be called on every PeerID that is
513  *               newly inserted into a sampler element
514  * @param ins_cls the closure given to #ins_cb
515  * @param rem_cb the callback that will be called on every PeerID that is
516  *               removed from a sampler element
517  * @param rem_cls the closure given to #rem_cb
518  */
519   void
520 RPS_sampler_init (size_t init_size,
521     struct GNUNET_TIME_Relative max_round_interval,
522     RPS_sampler_insert_cb ins_cb, void *ins_cls,
523     RPS_sampler_remove_cb rem_cb, void *rem_cls)
524 {
525   //struct RPS_Sampler *sampler;
526   //uint32_t i;
527
528   /* Initialise context around extended sampler */
529   min_size = 10; // TODO make input to _samplers_init()
530   max_size = 1000; // TODO make input to _samplers_init()
531
532   sampler = GNUNET_new (struct RPS_Sampler);
533   sampler->sampler_size = 0;
534   sampler->sampler_elements = NULL;
535   sampler->max_round_interval = max_round_interval;
536   sampler->insert_cb = ins_cb;
537   sampler->insert_cls = ins_cls;
538   sampler->remove_cb = rem_cb;
539   sampler->remove_cls = rem_cls;
540   get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
541   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
542   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
543   RPS_sampler_resize (init_size);
544
545   client_get_index = 0;
546
547   //GNUNET_assert (init_size == sampler->sampler_size);
548 }
549
550
551 /**
552  * A fuction to update every sampler in the given list
553  *
554  * @param id the PeerID that is put in the sampler
555  */
556   void
557 RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
558 {
559   uint32_t i;
560
561   for ( i = 0 ; i < sampler->sampler_size ; i++ )
562     RPS_sampler_elem_next (sampler->sampler_elements[i], id,
563         sampler->insert_cb, sampler->insert_cls,
564         sampler->remove_cb, sampler->remove_cls);
565 }
566
567
568 /**
569  * Reinitialise all previously initialised sampler elements with the given value.
570  *
571  * Used to get rid of a PeerID.
572  *
573  * @param id the id of the sampler elements to update.
574  */
575   void
576 RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
577 {
578   uint32_t i;
579
580   for ( i = 0 ; i < sampler->sampler_size ; i++ )
581   {
582     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
583     {
584       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
585       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
586     }
587   }
588 }
589
590
591 /**
592  * Get one random peer out of the sampled peers.
593  *
594  * We might want to reinitialise this sampler after giving the
595  * corrsponding peer to the client.
596  * Only used internally
597  */
598   void
599 RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
600 {
601   struct GetPeerCls *gpc;
602   uint32_t r_index;
603   struct GNUNET_HashCode *hash;
604
605   gpc = (struct GetPeerCls *) cls;
606
607   /**;
608    * Choose the r_index of the peer we want to return
609    * at random from the interval of the gossip list
610    */
611   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
612       sampler->sampler_size);
613
614   if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
615   {
616     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
617                                                                    GNUNET_TIME_UNIT_SECONDS,
618                                                                    .1),
619                                                        &RPS_sampler_get_rand_peer_,
620                                                        cls);
621     return;
622   }
623
624   *gpc->id = sampler->sampler_elements[r_index]->peer_id;
625
626   hash = GNUNET_new (struct GNUNET_HashCode);
627   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
628   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
629       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
630   GNUNET_free (gpc->get_peer_task);
631
632   gpc->cb (gpc->cb_cls, gpc->id);
633 }
634
635
636 /**
637  * Get one random peer out of the sampled peers.
638  *
639  * We might want to reinitialise this sampler after giving the
640  * corrsponding peer to the client.
641  *
642  * @return a random PeerID of the PeerIDs previously put into the sampler.
643  */
644   void
645 RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
646 {
647   struct GetPeerCls *gpc;
648   struct GNUNET_PeerIdentity tmp_id;
649   struct RPS_SamplerElement *s_elem;
650   struct GNUNET_TIME_Relative last_request_diff;
651   struct GNUNET_HashCode *hash;
652   uint32_t tmp_client_get_index;
653
654   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
655
656   gpc = (struct GetPeerCls *) cls;
657   hash = GNUNET_new (struct GNUNET_HashCode);
658
659   /* Store the next #client_get_index to check whether we cycled over the whole list */
660   if (0 < client_get_index)
661     tmp_client_get_index = client_get_index - 1;
662   else
663     tmp_client_get_index = sampler->sampler_size - 1;
664
665   LOG (GNUNET_ERROR_TYPE_DEBUG,
666       "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
667       tmp_client_get_index, sampler->sampler_size);
668
669   do
670   { /* Get first non empty sampler */
671     if (tmp_client_get_index == client_get_index)
672     {
673       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: reached tmp_index %" PRIX32 ".\n", client_get_index);
674       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
675                                                          &RPS_sampler_get_rand_peer,
676                                                          cls);
677       return;
678     }
679
680     tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
681     RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
682     RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
683                            NULL, NULL, NULL, NULL);
684
685     /* Cycle the #client_get_index one step further */
686     if ( client_get_index == sampler->sampler_size - 1 )
687       client_get_index = 0;
688     else
689       client_get_index++;
690
691     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
692   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
693
694   s_elem = sampler->sampler_elements[client_get_index];
695   *gpc->id = s_elem->peer_id;
696
697   /* Check whether we may use this sampler to give it back to the client */
698   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
699   {
700     last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
701                                                              GNUNET_TIME_absolute_get ());
702     /* We're not going to give it back now if it was already requested by a client this round */
703     if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
704     {
705       LOG (GNUNET_ERROR_TYPE_DEBUG,
706           "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
707       ///* How many time remains untile the next round has started? */
708       //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
709       //                                                             sampler->max_round_interval);
710       // add a little delay
711       /* Schedule it one round later */
712       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
713                                               &RPS_sampler_get_rand_peer,
714                                               cls);
715       return;
716     }
717     // TODO add other reasons to wait here
718   }
719
720   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
721   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
722       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
723   GNUNET_free (gpc->get_peer_task);
724
725   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
726
727   gpc->cb (gpc->cb_cls, gpc->id);
728 }
729
730
731 /**
732  * Get n random peers out of the sampled peers.
733  *
734  * We might want to reinitialise this sampler after giving the
735  * corrsponding peer to the client.
736  * Random with or without consumption?
737  *
738  * @param cb callback that will be called once the ids are ready.
739  * @param cls closure given to @a cb
740  * @param for_client #GNUNET_YES if result is used for client,
741  *                   #GNUNET_NO if used internally
742  * @param num_peers the number of peers requested
743  */
744   void
745 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
746     void *cls, uint32_t num_peers, int for_client)
747 {
748   GNUNET_assert (GNUNET_YES == for_client ||
749                  GNUNET_NO  == for_client);
750   GNUNET_assert (0 != sampler->sampler_size);
751
752   // TODO check if we have too much (distinct) sampled peers
753   uint32_t i;
754   struct NRandPeersReadyCls *cb_cls;
755   struct GetPeerCls *gpc;
756   struct GNUNET_HashCode *hash;
757
758   hash = GNUNET_new (struct GNUNET_HashCode);
759
760   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
761   cb_cls->num_peers = num_peers;
762   cb_cls->cur_num_peers = 0;
763   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
764   cb_cls->callback = cb;
765   cb_cls->cls = cls;
766
767   LOG (GNUNET_ERROR_TYPE_DEBUG,
768       "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
769
770   for ( i = 0 ; i < num_peers ; i++ )
771   {
772     gpc = GNUNET_new (struct GetPeerCls);
773     gpc->cb = check_n_peers_ready;
774     gpc->cb_cls = cb_cls;
775     gpc->id = &cb_cls->ids[i];
776
777     // maybe add a little delay
778     if (GNUNET_YES == for_client)
779       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
780     else if (GNUNET_NO == for_client)
781       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
782     GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
783     (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
784         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
785   }
786 }
787
788
789 /**
790  * Counts how many Samplers currently hold a given PeerID.
791  *
792  * @param id the PeerID to count.
793  *
794  * @return the number of occurrences of id.
795  */
796   uint32_t
797 RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
798 {
799   uint32_t count;
800   uint32_t i;
801
802   count = 0;
803   for ( i = 0 ; i < sampler->sampler_size ; i++ )
804   {
805     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
806         && EMPTY != sampler->sampler_elements[i]->is_empty)
807       count++;
808   }
809   return count;
810 }
811
812
813 /**
814  * Callback to iterate over the hashmap to cancle the get_peer_tasks.
815  */
816   int
817 clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value)
818 {
819   struct GNUNET_SCHEDULER_Task *task;
820
821   task = (struct GNUNET_SCHEDULER_Task *) value;
822   GNUNET_SCHEDULER_cancel (task);
823
824   GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value);
825
826   return GNUNET_YES;
827 }
828
829
830 /**
831  * Cleans the sampler.
832  */
833   void
834 RPS_sampler_destroy ()
835 {
836   if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks,
837                                                               clear_get_peer_tasks,
838                                                               NULL))
839     LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n");
840   GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks);
841   RPS_sampler_resize (0);
842   GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
843 }
844
845 /* end of gnunet-service-rps.c */