make sure fkt is called with right argument
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85   
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107 /**
108  * Sampler with its own array of SamplerElements
109  */
110 struct RPS_Sampler
111 {
112   /**
113    * Number of sampler elements we hold.
114    */
115   unsigned int sampler_size;
116   //size_t size;
117
118   /**
119    * All Samplers in one array.
120    */
121   struct RPS_SamplerElement **sampler_elements;
122
123   /**
124    * Max time a round takes
125    *
126    * Used in the context of RPS
127    */
128   struct GNUNET_TIME_Relative max_round_interval;
129
130   /**
131    * Callback to be called when a peer gets inserted into a sampler.
132    */
133   RPS_sampler_insert_cb insert_cb;
134
135   /**
136    * Closure to the insert_cb.
137    */
138   void *insert_cls;
139
140   /**
141    * Callback to be called when a peer gets inserted into a sampler.
142    */
143   RPS_sampler_remove_cb remove_cb;
144
145   /**
146    * Closure to the remove_cb.
147    */
148   void *remove_cls;
149 };
150
151 /**
152  * Closure to _get_n_rand_peers_ready_cb()
153  */
154 struct NRandPeersReadyCls
155 {
156   /**
157    * Number of peers we are waiting for.
158    */
159   uint32_t num_peers;
160
161   /**
162    * Number of peers we currently have.
163    */
164   uint32_t cur_num_peers;
165
166   /**
167    * Pointer to the array holding the ids.
168    */
169   struct GNUNET_PeerIdentity *ids;
170
171   /**
172    * Callback to be called when all ids are available.
173    */
174   RPS_sampler_n_rand_peers_ready_cb callback;
175
176   /**
177    * Closure given to the callback
178    */
179   void *cls;
180 };
181
182 /**
183  * Callback that is called from _get_rand_peer() when the PeerID is ready.
184  *
185  * @param cls the closure given alongside this function.
186  * @param id the PeerID that was returned
187  */
188 typedef void
189 (*RPS_sampler_rand_peer_ready_cb) (void *cls,
190         const struct GNUNET_PeerIdentity *id);
191
192 /**
193  * Closure to #RPS_sampler_get_rand_peer()
194  */
195 struct GetPeerCls
196 {
197   /**
198    * The task for this function.
199    */
200   struct GNUNET_SCHEDULER_Task *get_peer_task;
201
202   /**
203    * The callback
204    */
205   RPS_sampler_rand_peer_ready_cb cb;
206
207   /**
208    * The closure to the callback
209    */
210   void *cb_cls;
211
212   /**
213    * The address of the id to be stored at
214    */
215   struct GNUNET_PeerIdentity *id;
216 };
217
218 /**
219  * Multihashmap that keeps track of all get_peer_tasks that are still scheduled.
220  */
221 struct GNUNET_CONTAINER_MultiHashMap *get_peer_tasks;
222
223
224 /**
225  * Global sampler variable.
226  */
227 struct RPS_Sampler *sampler;
228
229
230 /**
231  * The minimal size for the extended sampler elements.
232  */
233 static size_t min_size;
234
235 /**
236  * The maximal size the extended sampler elements should grow to.
237  */
238 static size_t max_size;
239
240 /**
241  * The size the extended sampler elements currently have.
242  */
243 //static size_t extra_size;
244
245 /**
246  * Inedex to the sampler element that is the next to be returned
247  */
248 static uint32_t client_get_index;
249
250
251 /**
252  * Callback to _get_rand_peer() used by _get_n_rand_peers().
253  *
254  * Checks whether all n peers are available. If they are, 
255  * give those back.
256  */
257   void
258 check_n_peers_ready (void *cls,
259     const struct GNUNET_PeerIdentity *id)
260 {
261   struct NRandPeersReadyCls *n_peers_cls;
262
263   n_peers_cls = (struct NRandPeersReadyCls *) cls;
264
265   LOG (GNUNET_ERROR_TYPE_DEBUG,
266       "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n",
267       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
268
269   if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers)
270   { /* All peers are ready -- return those to the client */
271     GNUNET_assert (NULL != n_peers_cls->callback);
272
273     LOG (GNUNET_ERROR_TYPE_DEBUG,
274         "SAMPLER: returning %" PRIX32 " peers to the client\n",
275         n_peers_cls->num_peers);
276     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
277     
278     GNUNET_free (n_peers_cls);
279   }
280 }
281
282
283 /**
284  * Reinitialise a previously initialised sampler element.
285  *
286  * @param sampler pointer to the memory that keeps the value.
287  */
288   static void
289 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
290 {
291   sampler_el->is_empty = EMPTY;
292
293   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
294   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
295                              &(sampler_el->auth_key.key),
296                              GNUNET_CRYPTO_HASH_LENGTH);
297
298   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
299
300   sampler_el->birth = GNUNET_TIME_absolute_get ();
301   sampler_el->num_peers = 0;
302   sampler_el->num_change = 0;
303 }
304
305
306 /**
307  * (Re)Initialise given Sampler with random min-wise independent function.
308  *
309  * In this implementation this means choosing an auth_key for later use in
310  * a hmac at random.
311  *
312  * @return a newly created RPS_SamplerElement which currently holds no id.
313  */
314   struct RPS_SamplerElement *
315 RPS_sampler_elem_create (void)
316 {
317   struct RPS_SamplerElement *s;
318   
319   s = GNUNET_new (struct RPS_SamplerElement);
320
321   RPS_sampler_elem_reinit (s);
322   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
323
324   return s;
325 }
326
327
328 /**
329  * Input an PeerID into the given sampler.
330  */
331   static void
332 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other,
333     RPS_sampler_insert_cb insert_cb, void *insert_cls,
334     RPS_sampler_remove_cb remove_cb, void *remove_cls)
335 {
336   struct GNUNET_HashCode other_hash;
337
338   s_elem->num_peers++;
339
340   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
341   {
342     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
343         GNUNET_i2s (other));
344     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
345         GNUNET_i2s (&(s_elem->peer_id)));
346   }
347   else
348   {
349     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
350         other,
351         sizeof(struct GNUNET_PeerIdentity),
352         &other_hash);
353
354     if ( EMPTY == s_elem->is_empty )
355     { 
356       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
357           GNUNET_i2s(other));
358       s_elem->peer_id = *other;
359       s_elem->peer_id_hash = other_hash;
360
361       if (NULL != sampler->insert_cb)
362         sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id));
363
364       s_elem->num_change++;
365     }
366     else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
367     {
368       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
369           GNUNET_i2s (other));
370       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
371           GNUNET_i2s (&s_elem->peer_id));
372
373       if ( NULL != sampler->remove_cb )
374       {
375         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
376             GNUNET_i2s (&s_elem->peer_id));
377         sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
378       }
379
380       s_elem->peer_id = *other;
381       s_elem->peer_id_hash = other_hash;
382
383       if ( NULL != sampler->insert_cb )
384       {
385         LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
386             GNUNET_i2s (&s_elem->peer_id));
387         sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
388       }
389
390       s_elem->num_change++;
391     }
392     else
393     {
394       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
395           GNUNET_i2s(other));
396       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
397           GNUNET_i2s(&s_elem->peer_id));
398     }
399   }
400   s_elem->is_empty = NOT_EMPTY;
401 }
402
403
404 /**
405  * Grow or shrink the size of the sampler.
406  *
407  * @param new_size the new size of the sampler
408  */
409   void
410 RPS_sampler_resize (unsigned int new_size)
411 {
412   unsigned int old_size;
413   uint32_t i;
414   struct RPS_SamplerElement **rem_list;
415
416   // TODO check min and max size
417
418   old_size = sampler->sampler_size;
419
420   if (old_size > new_size)
421   { /* Shrinking */
422     /* Temporary store those to properly call the removeCB on those later */
423     rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
424     memcpy (rem_list,
425         &sampler->sampler_elements[new_size],
426         (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
427
428     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", old_size, new_size);
429     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
430     LOG (GNUNET_ERROR_TYPE_DEBUG,
431         "SAMPLER: sampler->sampler_elements now points to %p\n",
432         sampler->sampler_elements);
433
434     for (i = 0 ; i < old_size - new_size ; i++)
435     {/* Remove unneeded rest */
436       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX32 ". sampler\n", i);
437       if (NULL != sampler->remove_cb)
438         sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
439       GNUNET_free (rem_list[i]);
440     }
441     GNUNET_free (rem_list);
442   }
443   else if (old_size < new_size)
444   { /* Growing */
445     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", old_size, new_size);
446     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
447     LOG (GNUNET_ERROR_TYPE_DEBUG,
448         "SAMPLER: sampler->sampler_elements now points to %p\n",
449         sampler->sampler_elements);
450
451     for ( i = old_size ; i < new_size ; i++ )
452     { /* Add new sampler elements */
453       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
454       if (NULL != sampler->insert_cb)
455         sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
456       LOG (GNUNET_ERROR_TYPE_DEBUG,
457           "SAMPLER: Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
458           i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
459     }
460   }
461   else
462   {
463     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
464     return;
465   }
466
467   GNUNET_assert(sampler->sampler_size == new_size);
468   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
469 }
470
471
472 /**
473  * Initialise a tuple of sampler elements.
474  *
475  * @param init_size the size the sampler is initialised with
476  * @param ins_cb the callback that will be called on every PeerID that is 
477  *               newly inserted into a sampler element
478  * @param ins_cls the closure given to #ins_cb
479  * @param rem_cb the callback that will be called on every PeerID that is
480  *               removed from a sampler element
481  * @param rem_cls the closure given to #rem_cb
482  */
483   void
484 RPS_sampler_init (size_t init_size,
485     struct GNUNET_TIME_Relative max_round_interval,
486     RPS_sampler_insert_cb ins_cb, void *ins_cls,
487     RPS_sampler_remove_cb rem_cb, void *rem_cls)
488 {
489   //struct RPS_Sampler *sampler;
490   //uint32_t i;
491
492   /* Initialise context around extended sampler */
493   min_size = 10; // TODO make input to _samplers_init()
494   max_size = 1000; // TODO make input to _samplers_init()
495
496   sampler = GNUNET_new (struct RPS_Sampler);
497   sampler->sampler_size = 0;
498   sampler->sampler_elements = NULL;
499   sampler->max_round_interval = max_round_interval;
500   sampler->insert_cb = ins_cb;
501   sampler->insert_cls = ins_cls;
502   sampler->remove_cb = rem_cb;
503   sampler->remove_cls = rem_cls;
504   get_peer_tasks = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_NO);
505   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
506   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
507   RPS_sampler_resize (init_size);
508
509   client_get_index = 0;
510
511   //GNUNET_assert (init_size == sampler->sampler_size);
512 }
513
514
515 /**
516  * A fuction to update every sampler in the given list
517  *
518  * @param id the PeerID that is put in the sampler
519  */
520   void
521 RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
522 {
523   uint32_t i;
524
525   for ( i = 0 ; i < sampler->sampler_size ; i++ )
526     RPS_sampler_elem_next (sampler->sampler_elements[i], id,
527         sampler->insert_cb, sampler->insert_cls,
528         sampler->remove_cb, sampler->remove_cls);
529 }
530
531
532 /**
533  * Reinitialise all previously initialised sampler elements with the given value.
534  *
535  * Used to get rid of a PeerID.
536  *
537  * @param id the id of the sampler elements to update.
538  */
539   void
540 RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
541 {
542   uint32_t i;
543
544   for ( i = 0 ; i < sampler->sampler_size ; i++ )
545   {
546     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
547     {
548       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
549       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
550     }
551   }
552 }
553
554
555 /**
556  * Get one random peer out of the sampled peers.
557  *
558  * We might want to reinitialise this sampler after giving the
559  * corrsponding peer to the client.
560  * Only used internally
561  */
562   void
563 RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
564 {
565   struct GetPeerCls *gpc;
566   uint32_t r_index;
567   struct GNUNET_HashCode *hash;
568
569   gpc = (struct GetPeerCls *) cls;
570
571   /**;
572    * Choose the r_index of the peer we want to return
573    * at random from the interval of the gossip list
574    */
575   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
576       sampler->sampler_size);
577
578   if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
579   {
580     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
581                                                                    GNUNET_TIME_UNIT_SECONDS,
582                                                                    .1),
583                                                        &RPS_sampler_get_rand_peer_,
584                                                        cls);
585     return;
586   }
587
588   *gpc->id = sampler->sampler_elements[r_index]->peer_id;
589
590   hash = GNUNET_new (struct GNUNET_HashCode);
591   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
592   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
593       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
594   GNUNET_free (gpc->get_peer_task);
595
596   gpc->cb (gpc->cb_cls, gpc->id);
597 }
598
599
600 /**
601  * Get one random peer out of the sampled peers.
602  *
603  * We might want to reinitialise this sampler after giving the
604  * corrsponding peer to the client.
605  *
606  * @return a random PeerID of the PeerIDs previously put into the sampler.
607  */
608   void
609 RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
610 {
611   struct GetPeerCls *gpc;
612   struct GNUNET_PeerIdentity tmp_id;
613   struct RPS_SamplerElement *s_elem;
614   struct GNUNET_TIME_Relative last_request_diff;
615   struct GNUNET_HashCode *hash;
616   uint32_t tmp_client_get_index;
617
618   LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n");
619
620   gpc = (struct GetPeerCls *) cls;
621   hash = GNUNET_new (struct GNUNET_HashCode);
622
623   /* Store the next #client_get_index to check whether we cycled over the whole list */
624   if (0 < client_get_index)
625     tmp_client_get_index = client_get_index - 1;
626   else
627     tmp_client_get_index = sampler->sampler_size - 1;
628
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630       "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
631       tmp_client_get_index, sampler->sampler_size);
632
633   do
634   { /* Get first non empty sampler */
635     if (tmp_client_get_index == client_get_index)
636     {
637       LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: reached tmp_index %" PRIX32 ".\n", client_get_index);
638       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
639                                                          &RPS_sampler_get_rand_peer,
640                                                          cls);
641       return;
642     }
643
644     tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
645     RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
646     RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
647                            NULL, NULL, NULL, NULL);
648
649     /* Cycle the #client_get_index one step further */
650     if ( client_get_index == sampler->sampler_size - 1 )
651       client_get_index = 0;
652     else
653       client_get_index++;
654
655     LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index);
656   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
657
658   s_elem = sampler->sampler_elements[client_get_index];
659   *gpc->id = s_elem->peer_id;
660
661   /* Check whether we may use this sampler to give it back to the client */
662   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
663   {
664     last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
665                                                              GNUNET_TIME_absolute_get ());
666     /* We're not going to give it back now if it was already requested by a client this round */
667     if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
668     {
669       LOG (GNUNET_ERROR_TYPE_DEBUG,
670           "SAMPLER: Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
671       ///* How many time remains untile the next round has started? */
672       //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
673       //                                                             sampler->max_round_interval);
674       // add a little delay
675       /* Schedule it one round later */
676       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
677                                               &RPS_sampler_get_rand_peer,
678                                               cls);
679       return;
680     }
681     // TODO add other reasons to wait here
682   }
683
684   GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
685   if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task))
686       LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n");
687   GNUNET_free (gpc->get_peer_task);
688
689   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
690
691   gpc->cb (gpc->cb_cls, gpc->id);
692 }
693
694
695 /**
696  * Get n random peers out of the sampled peers.
697  *
698  * We might want to reinitialise this sampler after giving the
699  * corrsponding peer to the client.
700  * Random with or without consumption?
701  *
702  * @param cb callback that will be called once the ids are ready.
703  * @param cls closure given to @a cb
704  * @param for_client #GNUNET_YES if result is used for client,
705  *                   #GNUNET_NO if used internally
706  * @param num_peers the number of peers requested
707  */
708   void
709 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
710     void *cls, uint32_t num_peers, int for_client)
711 {
712   GNUNET_assert (GNUNET_YES == for_client ||
713                  GNUNET_NO  == for_client);
714   GNUNET_assert (0 != sampler->sampler_size);
715
716   // TODO check if we have too much (distinct) sampled peers
717   uint32_t i;
718   struct NRandPeersReadyCls *cb_cls;
719   struct GetPeerCls *gpc;
720   struct GNUNET_HashCode *hash;
721
722   hash = GNUNET_new (struct GNUNET_HashCode);
723
724   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
725   cb_cls->num_peers = num_peers;
726   cb_cls->cur_num_peers = 0;
727   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
728   cb_cls->callback = cb;
729   cb_cls->cls = cls;
730
731   LOG (GNUNET_ERROR_TYPE_DEBUG,
732       "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers);
733
734   for ( i = 0 ; i < num_peers ; i++ )
735   {
736     gpc = GNUNET_new (struct GetPeerCls);
737     gpc->cb = check_n_peers_ready;
738     gpc->cb_cls = cb_cls;
739     gpc->id = &cb_cls->ids[i];
740
741     // maybe add a little delay
742     if (GNUNET_YES == for_client)
743       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc);
744     else if (GNUNET_NO == for_client)
745       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc);
746     GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash);
747     (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task,
748         GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
749   }
750 }
751
752
753 /**
754  * Counts how many Samplers currently hold a given PeerID.
755  *
756  * @param id the PeerID to count.
757  *
758  * @return the number of occurrences of id.
759  */
760   uint32_t
761 RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
762 {
763   uint32_t count;
764   uint32_t i;
765
766   count = 0;
767   for ( i = 0 ; i < sampler->sampler_size ; i++ )
768   {
769     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id) 
770         && EMPTY != sampler->sampler_elements[i]->is_empty)
771       count++;
772   }
773   return count;
774 }
775
776
777 /**
778  * Callback to iterate over the hashmap to cancle the get_peer_tasks.
779  */
780   int
781 clear_get_peer_tasks (void *cls, const struct GNUNET_HashCode *key, void *value)
782 {
783   struct GNUNET_SCHEDULER_Task *task;
784
785   task = (struct GNUNET_SCHEDULER_Task *) value;
786   GNUNET_SCHEDULER_cancel (task);
787
788   GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, key, value);
789   
790   return GNUNET_YES;
791 }
792
793
794 /**
795  * Cleans the sampler.
796  */
797   void
798 RPS_sampler_destroy ()
799 {
800   if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_iterate (get_peer_tasks,
801                                                               clear_get_peer_tasks,
802                                                               NULL))
803     LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: iteration over hashmap was cancelled\n");
804   GNUNET_CONTAINER_multihashmap_destroy (get_peer_tasks);
805   RPS_sampler_resize (0);
806   GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
807 }
808
809 /* end of gnunet-service-rps.c */