92826d8756f714d77c5022954b53f65cbd441ce6
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107 /**
108  * Sampler with its own array of SamplerElements
109  */
110 struct RPS_Sampler
111 {
112   /**
113    * Number of sampler elements we hold.
114    */
115   unsigned int sampler_size;
116   //size_t size;
117
118   /**
119    * All Samplers in one array.
120    */
121   struct RPS_SamplerElement **sampler_elements;
122
123   /**
124    * Max time a round takes
125    *
126    * Used in the context of RPS
127    */
128   struct GNUNET_TIME_Relative max_round_interval;
129
130   /**
131    * Callback to be called when a peer gets inserted into a sampler.
132    */
133   RPS_sampler_insert_cb insert_cb;
134
135   /**
136    * Closure to the insert_cb.
137    */
138   void *insert_cls;
139
140   /**
141    * Callback to be called when a peer gets inserted into a sampler.
142    */
143   RPS_sampler_remove_cb remove_cb;
144
145   /**
146    * Closure to the remove_cb.
147    */
148   void *remove_cls;
149 };
150
151 /**
152  * Closure to _get_n_rand_peers_ready_cb()
153  */
154 struct NRandPeersReadyCls
155 {
156   /**
157    * Number of peers we are waiting for.
158    */
159   uint32_t num_peers;
160
161   /**
162    * Number of peers we currently have.
163    */
164   uint32_t cur_num_peers;
165
166   /**
167    * Pointer to the array holding the ids.
168    */
169   struct GNUNET_PeerIdentity *ids;
170
171   /**
172    * Callback to be called when all ids are available.
173    */
174   RPS_sampler_n_rand_peers_ready_cb callback;
175
176   /**
177    * Closure given to the callback
178    */
179   void *cls;
180 };
181
182 /**
183  * Callback that is called from _get_rand_peer() when the PeerID is ready.
184  *
185  * @param cls the closure given alongside this function.
186  * @param id the PeerID that was returned
187  */
188 typedef void
189 (*RPS_sampler_rand_peer_ready_cont) (void *cls,
190         const struct GNUNET_PeerIdentity *id);
191
192 /**
193  * Closure to #RPS_sampler_get_rand_peer()
194  */
195 struct GetPeerCls
196 {
197   /** DLL */
198   struct GetPeerCls *next;
199   struct GetPeerCls *prev;
200
201   /**
202    * The task for this function.
203    */
204   struct GNUNET_SCHEDULER_Task *get_peer_task;
205
206   /**
207    * The callback
208    */
209   RPS_sampler_rand_peer_ready_cont cont;
210
211   /**
212    * The closure to the callback
213    */
214   void *cont_cls;
215
216   /**
217    * The address of the id to be stored at
218    */
219   struct GNUNET_PeerIdentity *id;
220 };
221
222
223 /**
224  * Global sampler variable.
225  */
226 struct RPS_Sampler *sampler;
227
228
229 /**
230  * The minimal size for the extended sampler elements.
231  */
232 static size_t min_size;
233
234 /**
235  * The maximal size the extended sampler elements should grow to.
236  */
237 static size_t max_size;
238
239 /**
240  * The size the extended sampler elements currently have.
241  */
242 //static size_t extra_size;
243
244 /**
245  * Inedex to the sampler element that is the next to be returned
246  */
247 static uint32_t client_get_index;
248
249
250 /** FIXME document */
251 struct GetPeerCls *gpc_head;
252 struct GetPeerCls *gpc_tail;
253
254
255 /**
256  * Callback to _get_rand_peer() used by _get_n_rand_peers().
257  *
258  * Checks whether all n peers are available. If they are,
259  * give those back.
260  */
261   void
262 check_n_peers_ready (void *cls,
263     const struct GNUNET_PeerIdentity *id)
264 {
265   struct NRandPeersReadyCls *n_peers_cls;
266
267   n_peers_cls = (struct NRandPeersReadyCls *) cls;
268
269   n_peers_cls->cur_num_peers++;
270   LOG (GNUNET_ERROR_TYPE_DEBUG,
271       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
272       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
273
274   if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
275   { /* All peers are ready -- return those to the client */
276     GNUNET_assert (NULL != n_peers_cls->callback);
277
278     LOG (GNUNET_ERROR_TYPE_DEBUG,
279         "returning %" PRIX32 " peers to the client\n",
280         n_peers_cls->num_peers);
281     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
282
283     GNUNET_free (n_peers_cls);
284   }
285 }
286
287
288 /**
289  * Reinitialise a previously initialised sampler element.
290  *
291  * @param sampler pointer to the memory that keeps the value.
292  */
293   static void
294 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
295 {
296   sampler_el->is_empty = EMPTY;
297
298   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
299   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
300                              &(sampler_el->auth_key.key),
301                              GNUNET_CRYPTO_HASH_LENGTH);
302
303   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
304
305   sampler_el->birth = GNUNET_TIME_absolute_get ();
306   sampler_el->num_peers = 0;
307   sampler_el->num_change = 0;
308 }
309
310
311 /**
312  * (Re)Initialise given Sampler with random min-wise independent function.
313  *
314  * In this implementation this means choosing an auth_key for later use in
315  * a hmac at random.
316  *
317  * @return a newly created RPS_SamplerElement which currently holds no id.
318  */
319   struct RPS_SamplerElement *
320 RPS_sampler_elem_create (void)
321 {
322   struct RPS_SamplerElement *s;
323
324   s = GNUNET_new (struct RPS_SamplerElement);
325
326   RPS_sampler_elem_reinit (s);
327   LOG (GNUNET_ERROR_TYPE_DEBUG, "initialised with empty PeerID\n");
328
329   return s;
330 }
331
332
333 /**
334  * Input an PeerID into the given sampler.
335  */
336   static void
337 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct GNUNET_PeerIdentity *other,
338     RPS_sampler_insert_cb insert_cb, void *insert_cls,
339     RPS_sampler_remove_cb remove_cb, void *remove_cls)
340 {
341   struct GNUNET_HashCode other_hash;
342
343   s_elem->num_peers++;
344
345   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)) )
346   {
347     LOG (GNUNET_ERROR_TYPE_DEBUG, "         Got PeerID %s\n",
348         GNUNET_i2s (other));
349     LOG (GNUNET_ERROR_TYPE_DEBUG, "Have already PeerID %s\n",
350         GNUNET_i2s (&(s_elem->peer_id)));
351   }
352   else
353   {
354     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
355         other,
356         sizeof(struct GNUNET_PeerIdentity),
357         &other_hash);
358
359     if ( EMPTY == s_elem->is_empty )
360     {
361       LOG (GNUNET_ERROR_TYPE_DEBUG, "Got PeerID %s; Simply accepting (was empty previously).\n",
362           GNUNET_i2s(other));
363       s_elem->peer_id = *other;
364       s_elem->peer_id_hash = other_hash;
365
366       if (NULL != sampler->insert_cb)
367         sampler->insert_cb (sampler->insert_cls, &(s_elem->peer_id));
368
369       s_elem->num_change++;
370     }
371     else if ( 0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash) )
372     {
373       LOG (GNUNET_ERROR_TYPE_DEBUG, "           Got PeerID %s\n",
374           GNUNET_i2s (other));
375       LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
376           GNUNET_i2s (&s_elem->peer_id));
377
378       if ( NULL != sampler->remove_cb )
379       {
380         LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing old PeerID %s with the remove callback.\n",
381             GNUNET_i2s (&s_elem->peer_id));
382         sampler->remove_cb (sampler->remove_cls, &s_elem->peer_id);
383       }
384
385       s_elem->peer_id = *other;
386       s_elem->peer_id_hash = other_hash;
387
388       if ( NULL != sampler->insert_cb )
389       {
390         LOG (GNUNET_ERROR_TYPE_DEBUG, "Inserting new PeerID %s with the insert callback.\n",
391             GNUNET_i2s (&s_elem->peer_id));
392         sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
393       }
394
395       s_elem->num_change++;
396     }
397     else
398     {
399       LOG (GNUNET_ERROR_TYPE_DEBUG, "        Got PeerID %s\n",
400           GNUNET_i2s(other));
401       LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n",
402           GNUNET_i2s(&s_elem->peer_id));
403     }
404   }
405   s_elem->is_empty = NOT_EMPTY;
406 }
407
408 /**
409  * Get the size of the sampler.
410  *
411  * @return the size of the sampler
412  */
413 unsigned int
414 RPS_sampler_get_size ()
415 {
416   return sampler->sampler_size;
417 }
418
419
420 /**
421  * Grow or shrink the size of the sampler.
422  *
423  * @param new_size the new size of the sampler
424  */
425 static void
426 sampler_resize (unsigned int new_size)
427 {
428   unsigned int old_size;
429   uint32_t i;
430   struct RPS_SamplerElement **rem_list;
431
432   // TODO check min and max size
433
434   old_size = sampler->sampler_size;
435
436   if (old_size > new_size)
437   { /* Shrinking */
438     /* Temporary store those to properly call the removeCB on those later */
439     rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct RPS_SamplerElement *));
440     memcpy (rem_list,
441         &sampler->sampler_elements[new_size],
442         (old_size - new_size) * sizeof (struct RPS_SamplerElement *));
443
444     LOG (GNUNET_ERROR_TYPE_DEBUG, "Shrinking sampler %d -> %d\n", old_size, new_size);
445     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
446     LOG (GNUNET_ERROR_TYPE_DEBUG,
447         "sampler->sampler_elements now points to %p\n",
448         sampler->sampler_elements);
449
450     for (i = 0 ; i < old_size - new_size ; i++)
451     {/* Remove unneeded rest */
452       LOG (GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX32 ". sampler\n", i);
453       if (NULL != sampler->remove_cb)
454         sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
455       GNUNET_free (rem_list[i]);
456     }
457     GNUNET_free (rem_list);
458   }
459   else if (old_size < new_size)
460   { /* Growing */
461     LOG (GNUNET_ERROR_TYPE_DEBUG, "Growing sampler %d -> %d\n", old_size, new_size);
462     GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, new_size);
463     LOG (GNUNET_ERROR_TYPE_DEBUG,
464         "sampler->sampler_elements now points to %p\n",
465         sampler->sampler_elements);
466
467     for ( i = old_size ; i < new_size ; i++ )
468     { /* Add new sampler elements */
469       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
470       if (NULL != sampler->insert_cb)
471         sampler->insert_cb (sampler->insert_cls, &sampler->sampler_elements[i]->peer_id);
472       LOG (GNUNET_ERROR_TYPE_DEBUG,
473           "Added %" PRIX32 ". sampler, now pointing to %p, contains %s\n",
474           i, &sampler->sampler_elements[i], GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
475     }
476   }
477   else
478   {
479     LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
480     return;
481   }
482
483   GNUNET_assert (sampler->sampler_size == new_size);
484   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n"); // remove
485 }
486
487
488 /**
489  * Grow or shrink the size of the sampler.
490  *
491  * @param new_size the new size of the sampler
492  */
493 void
494 RPS_sampler_resize (unsigned int new_size)
495 {
496   GNUNET_assert (0 < new_size);
497   sampler_resize (new_size);
498 }
499
500
501 /**
502  * Empty the sampler.
503  *
504  * @param new_size the new size of the sampler
505  */
506 static void
507 sampler_empty ()
508 {
509   sampler_resize (0);
510 }
511
512
513 /**
514  * Initialise a tuple of sampler elements.
515  *
516  * @param init_size the size the sampler is initialised with
517  * @param ins_cb the callback that will be called on every PeerID that is
518  *               newly inserted into a sampler element
519  * @param ins_cls the closure given to #ins_cb
520  * @param rem_cb the callback that will be called on every PeerID that is
521  *               removed from a sampler element
522  * @param rem_cls the closure given to #rem_cb
523  */
524   void
525 RPS_sampler_init (size_t init_size,
526     struct GNUNET_TIME_Relative max_round_interval,
527     RPS_sampler_insert_cb ins_cb, void *ins_cls,
528     RPS_sampler_remove_cb rem_cb, void *rem_cls)
529 {
530   //struct RPS_Sampler *sampler;
531   //uint32_t i;
532
533   /* Initialise context around extended sampler */
534   min_size = 10; // TODO make input to _samplers_init()
535   max_size = 1000; // TODO make input to _samplers_init()
536
537   sampler = GNUNET_new (struct RPS_Sampler);
538   sampler->sampler_size = 0;
539   sampler->sampler_elements = NULL;
540   sampler->max_round_interval = max_round_interval;
541   sampler->insert_cb = ins_cb;
542   sampler->insert_cls = ins_cls;
543   sampler->remove_cb = rem_cb;
544   sampler->remove_cls = rem_cls;
545   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
546   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
547   RPS_sampler_resize (init_size);
548
549   client_get_index = 0;
550
551   //GNUNET_assert (init_size == sampler->sampler_size);
552 }
553
554
555 /**
556  * A fuction to update every sampler in the given list
557  *
558  * @param id the PeerID that is put in the sampler
559  */
560   void
561 RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
562 {
563   uint32_t i;
564
565   for ( i = 0 ; i < sampler->sampler_size ; i++ )
566     RPS_sampler_elem_next (sampler->sampler_elements[i], id,
567         sampler->insert_cb, sampler->insert_cls,
568         sampler->remove_cb, sampler->remove_cls);
569 }
570
571
572 /**
573  * Reinitialise all previously initialised sampler elements with the given value.
574  *
575  * Used to get rid of a PeerID.
576  *
577  * @param id the id of the sampler elements to update.
578  */
579   void
580 RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
581 {
582   uint32_t i;
583
584   for ( i = 0 ; i < sampler->sampler_size ; i++ )
585   {
586     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
587     {
588       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
589       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
590     }
591   }
592 }
593
594
595 /**
596  * Get one random peer out of the sampled peers.
597  *
598  * We might want to reinitialise this sampler after giving the
599  * corrsponding peer to the client.
600  * Only used internally
601  */
602 static void
603 sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
604 {
605   struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
606   uint32_t r_index;
607
608   gpc->get_peer_task = NULL;
609   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
610   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
611     return;
612
613   /**;
614    * Choose the r_index of the peer we want to return
615    * at random from the interval of the gossip list
616    */
617   r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
618       sampler->sampler_size);
619
620   if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
621   {
622     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
623                                                                    GNUNET_TIME_UNIT_SECONDS,
624                                                                    .1),
625                                                        &sampler_get_rand_peer2,
626                                                        cls);
627     return;
628   }
629
630   *gpc->id = sampler->sampler_elements[r_index]->peer_id;
631
632   gpc->cont (gpc->cont_cls, gpc->id);
633   GNUNET_free (gpc);
634 }
635
636
637 /**
638  * Get one random peer out of the sampled peers.
639  *
640  * We might want to reinitialise this sampler after giving the
641  * corrsponding peer to the client.
642  *
643  * @return a random PeerID of the PeerIDs previously put into the sampler.
644  */
645 static void
646 sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
647 {
648   struct GetPeerCls *gpc = (struct GetPeerCls *) cls;
649   struct GNUNET_PeerIdentity tmp_id;
650   struct RPS_SamplerElement *s_elem;
651   struct GNUNET_TIME_Relative last_request_diff;
652   uint32_t tmp_client_get_index;
653
654   gpc->get_peer_task = NULL;
655   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
656   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
657     return;
658
659   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
660
661
662   /* Store the next #client_get_index to check whether we cycled over the whole list */
663   if (0 < client_get_index)
664     tmp_client_get_index = client_get_index - 1;
665   else
666     tmp_client_get_index = sampler->sampler_size - 1;
667
668   LOG (GNUNET_ERROR_TYPE_DEBUG,
669       "scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
670       tmp_client_get_index, sampler->sampler_size);
671
672   do
673   { /* Get first non empty sampler */
674     if (tmp_client_get_index == client_get_index)
675     {
676       LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n", client_get_index);
677       GNUNET_assert (NULL == gpc->get_peer_task);
678       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
679                                                          &sampler_get_rand_peer,
680                                                          cls);
681       return;
682     }
683
684     tmp_id = sampler->sampler_elements[client_get_index]->peer_id;
685     RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]);
686     RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id,
687                            NULL, NULL, NULL, NULL);
688
689     /* Cycle the #client_get_index one step further */
690     if ( client_get_index == sampler->sampler_size - 1 )
691       client_get_index = 0;
692     else
693       client_get_index++;
694
695     LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n", client_get_index);
696   } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty);
697
698   s_elem = sampler->sampler_elements[client_get_index];
699   *gpc->id = s_elem->peer_id;
700
701   /* Check whether we may use this sampler to give it back to the client */
702   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
703   {
704     last_request_diff = GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
705                                                              GNUNET_TIME_absolute_get ());
706     /* We're not going to give it back now if it was already requested by a client this round */
707     if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us)
708     {
709       LOG (GNUNET_ERROR_TYPE_DEBUG,
710           "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
711       ///* How many time remains untile the next round has started? */
712       //inv_last_request_diff = GNUNET_TIME_absolute_get_difference (last_request_diff,
713       //                                                             sampler->max_round_interval);
714       // add a little delay
715       /* Schedule it one round later */
716       GNUNET_assert (NULL == gpc->get_peer_task);
717       gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval,
718                                               &sampler_get_rand_peer,
719                                               cls);
720       return;
721     }
722     // TODO add other reasons to wait here
723   }
724
725   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
726
727   gpc->cont (gpc->cont_cls, gpc->id);
728   GNUNET_free (gpc);
729 }
730
731
732 /**
733  * Get n random peers out of the sampled peers.
734  *
735  * We might want to reinitialise this sampler after giving the
736  * corrsponding peer to the client.
737  * Random with or without consumption?
738  *
739  * @param cb callback that will be called once the ids are ready.
740  * @param cls closure given to @a cb
741  * @param for_client #GNUNET_YES if result is used for client,
742  *                   #GNUNET_NO if used internally
743  * @param num_peers the number of peers requested
744  */
745   void
746 RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb,
747                               void *cls, uint32_t num_peers, int for_client)
748 {
749   GNUNET_assert (0 != sampler->sampler_size);
750
751   // TODO check if we have too much (distinct) sampled peers
752   uint32_t i;
753   struct NRandPeersReadyCls *cb_cls;
754   struct GetPeerCls *gpc;
755
756   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
757   cb_cls->num_peers = num_peers;
758   cb_cls->cur_num_peers = 0;
759   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
760   cb_cls->callback = cb;
761   cb_cls->cls = cls;
762
763   LOG (GNUNET_ERROR_TYPE_DEBUG,
764       "Scheduling requests for %" PRIX32 " peers\n", num_peers);
765
766   for ( i = 0 ; i < num_peers ; i++ )
767   {
768     gpc = GNUNET_new (struct GetPeerCls);
769     gpc->cont = check_n_peers_ready;
770     gpc->cont_cls = cb_cls;
771     gpc->id = &cb_cls->ids[i];
772
773     // maybe add a little delay
774     if (GNUNET_YES == for_client)
775       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer, gpc);
776     else if (GNUNET_NO == for_client)
777       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer2, gpc);
778     else
779       GNUNET_abort ();
780
781     GNUNET_CONTAINER_DLL_insert (gpc_head, gpc_tail, gpc);
782   }
783 }
784
785
786 /**
787  * Counts how many Samplers currently hold a given PeerID.
788  *
789  * @param id the PeerID to count.
790  *
791  * @return the number of occurrences of id.
792  */
793   uint32_t
794 RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
795 {
796   uint32_t count;
797   uint32_t i;
798
799   count = 0;
800   for ( i = 0 ; i < sampler->sampler_size ; i++ )
801   {
802     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
803         && EMPTY != sampler->sampler_elements[i]->is_empty)
804       count++;
805   }
806   return count;
807 }
808
809
810 /**
811  * Cleans the sampler.
812  */
813   void
814 RPS_sampler_destroy ()
815 {
816   struct GetPeerCls *i;
817
818   for (i = gpc_head; NULL != i; i = gpc_head)
819   {
820     GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, i);
821     GNUNET_SCHEDULER_cancel (i->get_peer_task);
822     GNUNET_free (i);
823   }
824
825   sampler_empty ();
826 }
827
828 /* end of gnunet-service-rps.c */