01d8aa74853341cc31d190afd3ee648b169f9e81
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107 /**
108  * Sampler with its own array of SamplerElements
109  */
110 struct RPS_Sampler
111 {
112   /**
113    * Number of sampler elements we hold.
114    */
115   unsigned int sampler_size;
116   //size_t size;
117
118   /**
119    * All Samplers in one array.
120    */
121   struct RPS_SamplerElement **sampler_elements;
122
123   /**
124    * Max time a round takes
125    *
126    * Used in the context of RPS
127    */
128   struct GNUNET_TIME_Relative max_round_interval;
129
130   /**
131    * Callback to be called when a peer gets inserted into a sampler.
132    */
133   RPS_sampler_insert_cb insert_cb;
134
135   /**
136    * Closure to the insert_cb.
137    */
138   void *insert_cls;
139
140   /**
141    * Callback to be called when a peer gets inserted into a sampler.
142    */
143   RPS_sampler_remove_cb remove_cb;
144
145   /**
146    * Closure to the remove_cb.
147    */
148   void *remove_cls;
149 };
150
151 /**
152  * Closure to _get_n_rand_peers_ready_cb()
153  */
154 struct NRandPeersReadyCls
155 {
156   /**
157    * Number of peers we are waiting for.
158    */
159   uint32_t num_peers;
160
161   /**
162    * Number of peers we currently have.
163    */
164   uint32_t cur_num_peers;
165
166   /**
167    * Pointer to the array holding the ids.
168    */
169   struct GNUNET_PeerIdentity *ids;
170
171   /**
172    * Callback to be called when all ids are available.
173    */
174   RPS_sampler_n_rand_peers_ready_cb callback;
175
176   /**
177    * Closure given to the callback
178    */
179   void *cls;
180 };
181
182 /**
183  * Callback that is called from _get_rand_peer() when the PeerID is ready.
184  *
185  * @param cls the closure given alongside this function.
186  * @param id the PeerID that was returned
187  */
188 typedef void
189 (*RPS_sampler_rand_peer_ready_cb) (void *cls,
190         const struct GNUNET_PeerIdentity *id);
191
192 /**
193  * Closure to #RPS_sampler_get_rand_peer()
194  */
195 struct GetPeerCls
196 {
197   /**
198    * The task for this function.
199    */
200   struct GNUNET_SCHEDULER_Task *get_peer_task;
201
202   /**
203    * The callback
204    */
205   RPS_sampler_rand_peer_ready_cb cb;
206
207   /**
208    * The closure to the callback
209    */
210   void *cb_cls;
211
212   /**
213    * The address of the id to be stored at
214    */
215   struct GNUNET_PeerIdentity *id;
216 };
217
218 /**
219  * Multihashmap that keeps track of all get_peer_tasks that are still scheduled.
220  */
221 struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks;
222
223
224 /**
225  * Global sampler variable.
226  */
227 struct RPS_Sampler *sampler;
228
229
230 /**
231  * The minimal size for the extended sampler elements.
232  */
233 static size_t min_size;
234
235 /**
236  * The maximal size the extended sampler elements should grow to.
237  */
238 static size_t max_size;
239
240 /**
241  * The size the extended sampler elements currently have.
242  */
243 //static size_t extra_size;
244
245 /**
246  * Inedex to the sampler element that is the next to be returned
247  */
248 static uint32_t client_get_index;
249
250
251 /**
252  * Callback to _get_rand_peer() used by _get_n_rand_peers().
253  *
254  * Checks whether all n peers are available. If they are,
255  * give those back.
256  */
257   void
258 check_n_peers_ready (void *cls,
259     const struct GNUNET_PeerIdentity *id)
260 {
261   struct NRandPeersReadyCls *n_peers_cls;
262
263   n_peers_cls = (struct NRandPeersReadyCls *) cls;
264
265   LOG (GNUNET_ERROR_TYPE_DEBUG,
266       "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
267       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
268
269   if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
270   { /* All peers are ready -- return those to the client */
271     GNUNET_assert (NULL != n_peers_cls->callback);
272
273     LOG (GNUNET_ERROR_TYPE_DEBUG,
274         "SAMPLER: returning %" PRIX32 " peers to the client\n",
275         n_peers_cls->num_peers);
276     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
277
278     GNUNET_free (n_peers_cls);
279   }
280 }
281
282
283 /**
284  * Reinitialise a previously initialised sampler element.
285  *
286  * @param sampler pointer to the memory that keeps the value.
287  */
288   static void
289 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
290 {
291   sampler_el->is_empty = EMPTY;
292
293   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
294   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
295                              &(sampler_el->auth_key.key),
296                              GNUNET_CRYPTO_HASH_LENGTH);
297
298   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
299
300   sampler_el->birth = GNUNET_TIME_absolute_get ();
301   sampler_el->num_peers = 0;
302   sampler_el->num_change = 0;
303 }
304
305
306 /**
307  * (Re)Initialise given Sampler with random min-wise independent function.
308  *
309  * In this implementation this means choosing an auth_key for later use in
310  * a hmac at random.
311  *
312  * @return a newly created RPS_SamplerElement which currently holds no id.
313  */
314   struct RPS_SamplerElement *
315 RPS_sampler_elem_create (void)
316 {
317   struct RPS_SamplerElement *s;
318
319   s = GNUNET_new (struct RPS_SamplerElement);
320
321   RPS_sampler_elem_reinit (s);
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
323
324   return s;
325 }
326
327
328 /**
329  * Input an PeerID into the given sampler.
330  */
331   static void
332 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other,
333     RPS_sampler_insert_cb insert_cb, void *insert_cls,
334     RPS_sampler_remove_cb remove_cb, void *remove_cls)
335 {
336   struct GNUNET_HashCode other_hash;
337
338   s_elem->num_peers++;
339
340   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
341   {
342     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
343         GNUNET_i2s (other));
344     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
345         GNUNET_i2s (&(s_elem->peer_id)));
346   }
347   else
348   {
349     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
350         other,
351         sizeof(struct GNUNET_PeerIdentity),
352         &other_hash);
353
354     if ( EMPTY == s_elem->is_empty )
355     {
356       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
357           GNUNET_i2s(other));
358       s_elem->peer_id = *other;
359       s_elem->peer_id_hash = other_hash;
360
361       if (NULL != sampler->insert_cb)
362         sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id));
363
364       s_elem->num_change++;
365     }
366     else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
367     {
368       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
369           GNUNET_i2s (other));
370       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
371           GNUNET_i2s (&s_elem->peer_id));
372
373       if ( NULL != sampler->remove_cb )
374       {
375         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
376             GNUNET_i2s (&s_elem->peer_id));
377         sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
378       }
379
380       s_elem->peer_id = *other;
381       s_elem->peer_id_hash = other_hash;
382
383       if ( NULL != sampler->insert_cb )
384       {
385         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
386             GNUNET_i2s (&s_elem->peer_id));
387         sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
388       }
389
390       s_elem->num_change++;
391     }
392     else
393     {
394       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
395           GNUNET_i2s(other));
396       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
397           GNUNET_i2s(&s_elem->peer_id));
398     }
399   }
400   s_elem->is_empty = NOT_EMPTY;
401 }
402
403 /**
404  * Get the size of the sampler.
405  *
406  * @return the size of the sampler
407  */
408 unsigned int
409 RPS_sampler_get_size ()
410 {
411   return sampler->sampler_size;
412 }
413
414
415 /**
416  * Grow or shrink the size of the sampler.
417  *
418  * @param new_size the new size of the sampler
419  */
420   void
421 RPS_sampler_resize (unsigned int new_size)
422 {
423   unsigned int old_size;
424   uint32_t i;
425   struct RPS_SamplerElement **rem_list;
426
427   // TODO check min and max size
428
429   old_size = sampler->sampler_size;
430
431   if (old_size > new_size)
432   { /* Shrinking */
433     /* Temporary store those to properly call the removeCB on those later */
434     rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
435     memcpy (rem_list,
436         &sampler->sampler_elements[new_size],
437         (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
438
439     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size);
440     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
441     LOG (GNUNET_ERROR_TYPE_DEBUG,
442         "SAMPLER: sampler->sampler_elements now points to %p\n",
443         sampler->sampler_elements);
444
445     for (i = 0 ; i < old_size - new_size ; i++)
446     {/* Remove unneeded rest */
447       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX32 ". sampler\n", i);
448       if (NULL != sampler->remove_cb)
449         sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
450       GNUNET_free (rem_list[i]);
451     }
452     GNUNET_free (rem_list);
453   }
454   else if (old_size < new_size)
455   { /* Growing */
456     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size);
457     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
458     LOG (GNUNET_ERROR_TYPE_DEBUG,
459         "SAMPLER: sampler->sampler_elements now points to %p\n",
460         sampler->sampler_elements);
461
462     for ( i = old_size ; i < new_size ; i++ )
463     { /* Add new sampler elements */
464       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
465       if (NULL != sampler->insert_cb)
466         sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
467       LOG (GNUNET_ERROR_TYPE_DEBUG,
468           "SAMPLER: Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
469           i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
470     }
471   }
472   else
473   {
474     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
475     return;
476   }
477
478   GNUNET_assert (sampler->sampler_size == new_size);
479   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
480 }
481
482
483 /**
484  * Initialise a tuple of sampler elements.
485  *
486  * @param init_size the size the sampler is initialised with
487  * @param ins_cb the callback that will be called on every PeerID that is
488  *               newly inserted into a sampler element
489  * @param ins_cls the closure given to #ins_cb
490  * @param rem_cb the callback that will be called on every PeerID that is
491  *               removed from a sampler element
492  * @param rem_cls the closure given to #rem_cb
493  */
494   void
495 RPS_sampler_init (size_t init_size,
496     struct GNUNET_TIME_Relative max_round_interval,
497     RPS_sampler_insert_cb ins_cb, void *ins_cls,
498     RPS_sampler_remove_cb rem_cb, void *rem_cls)
499 {
500   //struct RPS_Sampler *sampler;
501   //uint32_t i;
502
503   /* Initialise context around extended sampler */
504   min_size = 10; // TODO make input to _samplers_init()
505   max_size = 1000; // TODO make input to _samplers_init()
506
507   sampler = GNUNET_new (struct RPS_Sampler);
508   sampler->sampler_size = 0;
509   sampler->sampler_elements = NULL;
510   sampler->max_round_interval = max_round_interval;
511   sampler->insert_cb = ins_cb;
512   sampler->insert_cls = ins_cls;
513   sampler->remove_cb = rem_cb;
514   sampler->remove_cls = rem_cls;
515   get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
516   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
517   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
518   RPS_sampler_resize (init_size);
519
520   client_get_index = 0;
521
522   //GNUNET_assert (init_size == sampler->sampler_size);
523 }
524
525
526 /**
527  * A fuction to update every sampler in the given list
528  *
529  * @param id the PeerID that is put in the sampler
530  */
531   void
532 RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
533 {
534   uint32_t i;
535
536   for ( i = 0 ; i < sampler->sampler_size ; i++ )
537     RPS_sampler_elem_next (sampler->sampler_elements[i], id,
538         sampler->insert_cb, sampler->insert_cls,
539         sampler->remove_cb, sampler->remove_cls);
540 }
541
542
543 /**
544  * Reinitialise all previously initialised sampler elements with the given value.
545  *
546  * Used to get rid of a PeerID.
547  *
548  * @param id the id of the sampler elements to update.
549  */
550   void
551 RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
552 {
553   uint32_t i;
554
555   for ( i = 0 ; i < sampler->sampler_size ; i++ )
556   {
557     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
558     {
559       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
560       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
561     }
562   }
563 }
564
565
566 /**
567  * Get one random peer out of the sampled peers.
568  *
569  * We might want to reinitialise this sampler after giving the
570  * corrsponding peer to the client.
571  * Only used internally
572  */
573   void
574 RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
575 {
576   struct GetPeerCls *gpc;
577   uint32_t r_index;
578   struct GNUNET_HashCode *hash;
579
580   gpc = (struct GetPeerCls *) cls;
581
582   /**;
583    * Choose the r_index of the peer we want to return
584    * at random from the interval of the gossip list
585    */
586   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
587       sampler->sampler_size);
588
589   if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
590   {
591     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
592                                                                    GNUNET_TIME_UNIT_SECONDS,
593                                                                    .1),
594                                                        &RPS_sampler_get_rand_peer_,
595                                                        cls);
596     return;
597   }
598
599   *gpc->id = sampler->sampler_elements[r_index]->peer_id;
600
601   hash = GNUNET_new (struct GNUNET_HashCode);
602   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
603   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
604       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
605   GNUNET_free (gpc->get_peer_task);
606
607   gpc->cb (gpc->cb_cls, gpc->id);
608 }
609
610
611 /**
612  * Get one random peer out of the sampled peers.
613  *
614  * We might want to reinitialise this sampler after giving the
615  * corrsponding peer to the client.
616  *
617  * @return a random PeerID of the PeerIDs previously put into the sampler.
618  */
619   void
620 RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
621 {
622   struct GetPeerCls *gpc;
623   struct GNUNET_PeerIdentity tmp_id;
624   struct RPS_SamplerElement *s_elem;
625   struct GNUNET_TIME_Relative last_request_diff;
626   struct GNUNET_HashCode *hash;
627   uint32_t tmp_client_get_index;
628
629   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
630
631   gpc = (struct GetPeerCls *) cls;
632   hash = GNUNET_new (struct GNUNET_HashCode);
633
634   /* Store the next #client_get_index to check whether we cycled over the whole list */
635   if (0 < client_get_index)
636     tmp_client_get_index = client_get_index - 1;
637   else
638     tmp_client_get_index = sampler->sampler_size - 1;
639
640   LOG (GNUNET_ERROR_TYPE_DEBUG,
641       "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
642       tmp_client_get_index, sampler->sampler_size);
643
644   do
645   { /* Get first non empty sampler */
646     if (tmp_client_get_index == client_get_index)
647     {
648       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: reached tmp_index %" PRIX32 ".\n", client_get_index);
649       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
650                                                          &RPS_sampler_get_rand_peer,
651                                                          cls);
652       return;
653     }
654
655     tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
656     RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
657     RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
658                            NULL, NULL, NULL, NULL);
659
660     /* Cycle the #client_get_index one step further */
661     if ( client_get_index == sampler->sampler_size - 1 )
662       client_get_index = 0;
663     else
664       client_get_index++;
665
666     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
667   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
668
669   s_elem = sampler->sampler_elements[client_get_index];
670   *gpc->id = s_elem->peer_id;
671
672   /* Check whether we may use this sampler to give it back to the client */
673   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
674   {
675     last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
676                                                              GNUNET_TIME_absolute_get ());
677     /* We're not going to give it back now if it was already requested by a client this round */
678     if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
679     {
680       LOG (GNUNET_ERROR_TYPE_DEBUG,
681           "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
682       ///* How many time remains untile the next round has started? */
683       //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
684       //                                                             sampler->max_round_interval);
685       // add a little delay
686       /* Schedule it one round later */
687       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
688                                               &RPS_sampler_get_rand_peer,
689                                               cls);
690       return;
691     }
692     // TODO add other reasons to wait here
693   }
694
695   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
696   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
697       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
698   GNUNET_free (gpc->get_peer_task);
699
700   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
701
702   gpc->cb (gpc->cb_cls, gpc->id);
703 }
704
705
706 /**
707  * Get n random peers out of the sampled peers.
708  *
709  * We might want to reinitialise this sampler after giving the
710  * corrsponding peer to the client.
711  * Random with or without consumption?
712  *
713  * @param cb callback that will be called once the ids are ready.
714  * @param cls closure given to @a cb
715  * @param for_client #GNUNET_YES if result is used for client,
716  *                   #GNUNET_NO if used internally
717  * @param num_peers the number of peers requested
718  */
719   void
720 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
721     void *cls, uint32_t num_peers, int for_client)
722 {
723   GNUNET_assert (GNUNET_YES == for_client ||
724                  GNUNET_NO  == for_client);
725   GNUNET_assert (0 != sampler->sampler_size);
726
727   // TODO check if we have too much (distinct) sampled peers
728   uint32_t i;
729   struct NRandPeersReadyCls *cb_cls;
730   struct GetPeerCls *gpc;
731   struct GNUNET_HashCode *hash;
732
733   hash = GNUNET_new (struct GNUNET_HashCode);
734
735   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
736   cb_cls->num_peers = num_peers;
737   cb_cls->cur_num_peers = 0;
738   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
739   cb_cls->callback = cb;
740   cb_cls->cls = cls;
741
742   LOG (GNUNET_ERROR_TYPE_DEBUG,
743       "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
744
745   for ( i = 0 ; i < num_peers ; i++ )
746   {
747     gpc = GNUNET_new (struct GetPeerCls);
748     gpc->cb = check_n_peers_ready;
749     gpc->cb_cls = cb_cls;
750     gpc->id = &cb_cls->ids[i];
751
752     // maybe add a little delay
753     if (GNUNET_YES == for_client)
754       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
755     else if (GNUNET_NO == for_client)
756       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
757     GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
758     (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
759         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
760   }
761 }
762
763
764 /**
765  * Counts how many Samplers currently hold a given PeerID.
766  *
767  * @param id the PeerID to count.
768  *
769  * @return the number of occurrences of id.
770  */
771   uint32_t
772 RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
773 {
774   uint32_t count;
775   uint32_t i;
776
777   count = 0;
778   for ( i = 0 ; i < sampler->sampler_size ; i++ )
779   {
780     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
781         && EMPTY != sampler->sampler_elements[i]->is_empty)
782       count++;
783   }
784   return count;
785 }
786
787
788 /**
789  * Callback to iterate over the hashmap to cancle the get_peer_tasks.
790  */
791   int
792 clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value)
793 {
794   struct GNUNET_SCHEDULER_Task *task;
795
796   task = (struct GNUNET_SCHEDULER_Task *) value;
797   GNUNET_SCHEDULER_cancel (task);
798
799   GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value);
800
801   return GNUNET_YES;
802 }
803
804
805 /**
806  * Cleans the sampler.
807  */
808   void
809 RPS_sampler_destroy ()
810 {
811   if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks,
812                                                               clear_get_peer_tasks,
813                                                               NULL))
814     LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n");
815   GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks);
816   RPS_sampler_resize (0);
817   GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
818 }
819
820 /* end of gnunet-service-rps.c */