- refactor to check messages from both enc systems
[oweals/gnunet.git] / src / rps / gnunet-service-rps_sampler.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps_sampler.c
23  * @brief sampler implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29
30 #include "gnunet-service-rps_sampler.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log_from(kind,"rps-sampler",__VA_ARGS__)
36
37 // multiple 'clients'?
38
39 // TODO check for overflows
40
41 // TODO align message structs
42
43 // hist_size_init, hist_size_max
44
45 /***********************************************************************
46  * WARNING: This section needs to be reviewed regarding the use of
47  * functions providing (pseudo)randomness!
48 ***********************************************************************/
49
50 // TODO care about invalid input of the caller (size 0 or less...)
51
52 enum RPS_SamplerEmpty
53 {
54   NOT_EMPTY = 0x0,
55       EMPTY = 0x1
56 };
57
58 /**
59  * A sampler element sampling one PeerID at a time.
60  */
61 struct RPS_SamplerElement
62 {
63   /**
64    * Min-wise linear permutation used by this sampler.
65    *
66    * This is an key later used by a hmac.
67    */
68   struct GNUNET_CRYPTO_AuthKey auth_key;
69
70   /**
71    * The PeerID this sampler currently samples.
72    */
73   struct GNUNET_PeerIdentity peer_id;
74
75   /**
76    * The according hash value of this PeerID.
77    */
78   struct GNUNET_HashCode peer_id_hash;
79
80
81   /**
82    * Time of last request.
83    */
84   struct GNUNET_TIME_Absolute last_client_request;
85
86   /**
87    * Flag that indicates that we are not holding a valid PeerID right now.
88    */
89   enum RPS_SamplerEmpty is_empty;
90
91   /**
92    * 'Birth'
93    */
94   struct GNUNET_TIME_Absolute birth;
95
96   /**
97    * How many times a PeerID was put in this sampler.
98    */
99   uint32_t num_peers;
100
101   /**
102    * How many times this sampler changed the peer_id.
103    */
104   uint32_t num_change;
105 };
106
107
108 /**
109  * Sampler with its own array of SamplerElements
110  */
111 struct RPS_Sampler
112 {
113   /**
114    * Number of sampler elements we hold.
115    */
116   unsigned int sampler_size;
117   //size_t size;
118
119   /**
120    * All Samplers in one array.
121    */
122   struct RPS_SamplerElement **sampler_elements;
123
124   /**
125    * Max time a round takes
126    *
127    * Used in the context of RPS
128    */
129   struct GNUNET_TIME_Relative max_round_interval;
130
131   #ifdef TO_FILE
132   /**
133    * File name to log to
134    */
135   char *file_name;
136   #endif /* TO_FILE */
137 };
138
139 /**
140  * Closure to _get_n_rand_peers_ready_cb()
141  */
142 struct NRandPeersReadyCls
143 {
144   /**
145    * Number of peers we are waiting for.
146    */
147   uint32_t num_peers;
148
149   /**
150    * Number of peers we currently have.
151    */
152   uint32_t cur_num_peers;
153
154   /**
155    * Pointer to the array holding the ids.
156    */
157   struct GNUNET_PeerIdentity *ids;
158
159   /**
160    * Callback to be called when all ids are available.
161    */
162   RPS_sampler_n_rand_peers_ready_cb callback;
163
164   /**
165    * Closure given to the callback
166    */
167   void *cls;
168 };
169
170 /**
171  * Callback that is called from _get_rand_peer() when the PeerID is ready.
172  *
173  * @param cls the closure given alongside this function.
174  * @param id the PeerID that was returned
175  */
176 typedef void
177 (*RPS_sampler_rand_peer_ready_cont) (void *cls,
178         const struct GNUNET_PeerIdentity *id);
179
180 /**
181  * Closure for #sampler_get_rand_peer()
182  */
183 struct GetPeerCls
184 {
185   /**
186    * DLL
187    */
188   struct GetPeerCls *next;
189
190   /**
191    * DLL
192    */
193   struct GetPeerCls *prev;
194
195   /**
196    * The sampler this function operates on.
197    */
198   struct RPS_Sampler *sampler;
199
200   /**
201    * The task for this function.
202    */
203   struct GNUNET_SCHEDULER_Task *get_peer_task;
204
205   /**
206    * The callback
207    */
208   RPS_sampler_rand_peer_ready_cont cont;
209
210   /**
211    * The closure to the callback @e cont
212    */
213   void *cont_cls;
214
215   /**
216    * The address of the id to be stored at
217    */
218   struct GNUNET_PeerIdentity *id;
219 };
220
221
222 ///**
223 // * Global sampler variable.
224 // */
225 //struct RPS_Sampler *sampler;
226
227
228 /**
229  * The minimal size for the extended sampler elements.
230  */
231 static size_t min_size;
232
233 /**
234  * The maximal size the extended sampler elements should grow to.
235  */
236 static size_t max_size;
237
238 /**
239  * The size the extended sampler elements currently have.
240  */
241 //static size_t extra_size;
242
243 /**
244  * Inedex to the sampler element that is the next to be returned
245  */
246 static uint32_t client_get_index;
247
248
249 #ifdef TO_FILE
250 /**
251  * This function is used to facilitate writing important information to disk
252  */
253 #define to_file(file_name, ...) do {char tmp_buf[512];\
254   int size;\
255   size = GNUNET_snprintf(tmp_buf,sizeof(tmp_buf),__VA_ARGS__);\
256   if (0 > size)\
257     LOG (GNUNET_ERROR_TYPE_WARNING,\
258          "Failed to create tmp_buf\n");\
259   else\
260     to_file_(file_name,tmp_buf);\
261 } while (0);
262
263 static void
264 to_file_ (char *file_name, char *line)
265 {
266   struct GNUNET_DISK_FileHandle *f;
267   char output_buffer[512];
268   //size_t size;
269   int size;
270   size_t size2;
271
272
273   if (NULL == (f = GNUNET_DISK_file_open (file_name,
274                                           GNUNET_DISK_OPEN_APPEND |
275                                           GNUNET_DISK_OPEN_WRITE |
276                                           GNUNET_DISK_OPEN_CREATE,
277                                           GNUNET_DISK_PERM_USER_WRITE)))
278   {
279     LOG (GNUNET_ERROR_TYPE_WARNING,
280          "Not able to open file %s\n",
281          file_name);
282     return;
283   }
284   size = GNUNET_snprintf (output_buffer,
285                           sizeof (output_buffer),
286                           "%llu %s\n",
287                           GNUNET_TIME_absolute_get ().abs_value_us,
288                           line);
289   if (0 > size)
290   {
291     LOG (GNUNET_ERROR_TYPE_WARNING,
292          "Failed to write string to buffer (size: %i)\n",
293          size);
294     return;
295   }
296
297   size2 = GNUNET_DISK_file_write (f, output_buffer, size);
298   if (size != size2)
299   {
300     LOG (GNUNET_ERROR_TYPE_WARNING,
301          "Unable to write to file! (Size: %u, size2: %u)\n",
302          size,
303          size2);
304     return;
305   }
306
307   if (GNUNET_YES != GNUNET_DISK_file_close (f))
308     LOG (GNUNET_ERROR_TYPE_WARNING,
309          "Unable to close file\n");
310 }
311 #endif /* TO_FILE */
312
313
314 /** FIXME document */
315 static struct GetPeerCls *gpc_head;
316 static struct GetPeerCls *gpc_tail;
317
318
319 /**
320  * Callback to _get_rand_peer() used by _get_n_rand_peers().
321  *
322  * Checks whether all n peers are available. If they are,
323  * give those back.
324  */
325 static void
326 check_n_peers_ready (void *cls,
327     const struct GNUNET_PeerIdentity *id)
328 {
329   struct NRandPeersReadyCls *n_peers_cls = cls;
330
331   n_peers_cls->cur_num_peers++;
332   LOG (GNUNET_ERROR_TYPE_DEBUG,
333       "Got %" PRIX32 ". of %" PRIX32 " peers\n",
334       n_peers_cls->cur_num_peers, n_peers_cls->num_peers);
335
336   if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers)
337   { /* All peers are ready -- return those to the client */
338     GNUNET_assert (NULL != n_peers_cls->callback);
339
340     LOG (GNUNET_ERROR_TYPE_DEBUG,
341         "returning %" PRIX32 " peers to the client\n",
342         n_peers_cls->num_peers);
343     n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers);
344
345     GNUNET_free (n_peers_cls);
346   }
347 }
348
349
350 /**
351  * Reinitialise a previously initialised sampler element.
352  *
353  * @param sampler pointer to the memory that keeps the value.
354  */
355   static void
356 RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
357 {
358   sampler_el->is_empty = EMPTY;
359
360   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
361   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
362                              &(sampler_el->auth_key.key),
363                              GNUNET_CRYPTO_HASH_LENGTH);
364
365   sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS;
366
367   sampler_el->birth = GNUNET_TIME_absolute_get ();
368   sampler_el->num_peers = 0;
369   sampler_el->num_change = 0;
370 }
371
372
373 /**
374  * (Re)Initialise given Sampler with random min-wise independent function.
375  *
376  * In this implementation this means choosing an auth_key for later use in
377  * a hmac at random.
378  *
379  * @return a newly created RPS_SamplerElement which currently holds no id.
380  */
381   struct RPS_SamplerElement *
382 RPS_sampler_elem_create (void)
383 {
384   struct RPS_SamplerElement *s;
385
386   s = GNUNET_new (struct RPS_SamplerElement);
387
388   RPS_sampler_elem_reinit (s);
389
390   return s;
391 }
392
393
394 /**
395  * Input an PeerID into the given sampler element.
396  *
397  * @param sampler the sampler the @a s_elem belongs to.
398  *                Needed to know the
399  */
400 static void
401 RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
402                        struct RPS_Sampler *sampler,
403                        const struct GNUNET_PeerIdentity *other)
404 {
405   struct GNUNET_HashCode other_hash;
406
407   s_elem->num_peers++;
408
409   if (0 == GNUNET_CRYPTO_cmp_peer_identity (other, &(s_elem->peer_id)))
410   {
411     LOG (GNUNET_ERROR_TYPE_DEBUG, "         Got PeerID %s\n",
412         GNUNET_i2s (other));
413     LOG (GNUNET_ERROR_TYPE_DEBUG, "Have already PeerID %s\n",
414         GNUNET_i2s (&(s_elem->peer_id)));
415   }
416   else
417   {
418     GNUNET_CRYPTO_hmac(&s_elem->auth_key,
419         other,
420         sizeof(struct GNUNET_PeerIdentity),
421         &other_hash);
422
423     if (EMPTY == s_elem->is_empty)
424     {
425       LOG (GNUNET_ERROR_TYPE_DEBUG,
426            "Got PeerID %s; Simply accepting (was empty previously).\n",
427            GNUNET_i2s(other));
428       s_elem->peer_id = *other;
429       s_elem->peer_id_hash = other_hash;
430
431       s_elem->num_change++;
432     }
433     else if (0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash))
434     {
435       LOG (GNUNET_ERROR_TYPE_DEBUG, "           Got PeerID %s\n",
436           GNUNET_i2s (other));
437       LOG (GNUNET_ERROR_TYPE_DEBUG, "Discarding old PeerID %s\n",
438           GNUNET_i2s (&s_elem->peer_id));
439       s_elem->peer_id = *other;
440       s_elem->peer_id_hash = other_hash;
441
442       s_elem->num_change++;
443     }
444     else
445     {
446       LOG (GNUNET_ERROR_TYPE_DEBUG, "        Got PeerID %s\n",
447           GNUNET_i2s (other));
448       LOG (GNUNET_ERROR_TYPE_DEBUG, "Keeping old PeerID %s\n",
449           GNUNET_i2s (&s_elem->peer_id));
450     }
451   }
452   s_elem->is_empty = NOT_EMPTY;
453 }
454
455
456 /**
457  * Get the size of the sampler.
458  *
459  * @param sampler the sampler to return the size of.
460  * @return the size of the sampler
461  */
462 unsigned int
463 RPS_sampler_get_size (struct RPS_Sampler *sampler)
464 {
465   return sampler->sampler_size;
466 }
467
468
469 /**
470  * Grow or shrink the size of the sampler.
471  *
472  * @param sampler the sampler to resize.
473  * @param new_size the new size of the sampler
474  */
475 static void
476 sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
477 {
478   unsigned int old_size;
479   uint32_t i;
480
481   // TODO check min and max size
482
483   old_size = sampler->sampler_size;
484
485   if (old_size > new_size)
486   { /* Shrinking */
487     /* Temporary store those to properly call the removeCB on those later */
488
489     LOG (GNUNET_ERROR_TYPE_DEBUG,
490          "Shrinking sampler %d -> %d\n",
491          old_size,
492          new_size);
493     #ifdef TO_FILE
494     to_file (sampler->file_name,
495          "Shrinking sampler %d -> %d\n",
496          old_size,
497          new_size);
498     #endif /* TO_FILE */
499     GNUNET_array_grow (sampler->sampler_elements,
500         sampler->sampler_size,
501         new_size);
502     LOG (GNUNET_ERROR_TYPE_DEBUG,
503         "sampler->sampler_elements now points to %p\n",
504         sampler->sampler_elements);
505
506   }
507   else if (old_size < new_size)
508   { /* Growing */
509     LOG (GNUNET_ERROR_TYPE_DEBUG,
510          "Growing sampler %d -> %d\n",
511          old_size,
512          new_size);
513     #ifdef TO_FILE
514     to_file (sampler->file_name,
515          "Growing sampler %d -> %d\n",
516          old_size,
517          new_size);
518     #endif /* TO_FILE */
519     GNUNET_array_grow (sampler->sampler_elements,
520         sampler->sampler_size,
521         new_size);
522
523     for (i = old_size ; i < new_size ; i++)
524     { /* Add new sampler elements */
525       sampler->sampler_elements[i] = RPS_sampler_elem_create ();
526       #ifdef TO_FILE
527       to_file (sampler->file_name,
528                "%" PRIu32 ": Initialised empty sampler element\n",
529                i);
530                //"New sampler with key %s\n",
531                //GNUNET_h2s_full (sampler->sampler_elements[i]->auth_key));
532       #endif /* TO_FILE */
533     }
534   }
535   else
536   {
537     LOG (GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
538     return;
539   }
540
541   GNUNET_assert (sampler->sampler_size == new_size);
542 }
543
544
545 /**
546  * Grow or shrink the size of the sampler.
547  *
548  * @param sampler the sampler to resize.
549  * @param new_size the new size of the sampler
550  */
551 void
552 RPS_sampler_resize (struct RPS_Sampler *sampler, unsigned int new_size)
553 {
554   GNUNET_assert (0 < new_size);
555   sampler_resize (sampler, new_size);
556 }
557
558
559 /**
560  * Empty the sampler.
561  *
562  * @param sampler the sampler to empty.
563  * @param new_size the new size of the sampler
564  */
565 static void
566 sampler_empty (struct RPS_Sampler *sampler)
567 {
568   sampler_resize (sampler, 0);
569 }
570
571
572 /**
573  * Initialise a tuple of sampler elements.
574  *
575  * @param init_size the size the sampler is initialised with
576  * @param ins_cb the callback that will be called on every PeerID that is
577  *               newly inserted into a sampler element
578  * @param ins_cls the closure given to #ins_cb
579  * @param rem_cb the callback that will be called on every PeerID that is
580  *               removed from a sampler element
581  * @param rem_cls the closure given to #rem_cb
582  * @return a handle to a sampler that consists of sampler elements.
583  */
584 struct RPS_Sampler *
585 RPS_sampler_init (size_t init_size,
586     struct GNUNET_TIME_Relative max_round_interval)
587 {
588   struct RPS_Sampler *sampler;
589   //uint32_t i;
590
591   /* Initialise context around extended sampler */
592   min_size = 10; // TODO make input to _samplers_init()
593   max_size = 1000; // TODO make input to _samplers_init()
594
595   sampler = GNUNET_new (struct RPS_Sampler);
596
597   #ifdef TO_FILE
598   if (NULL == (sampler->file_name = GNUNET_DISK_mktemp ("sampler-")))
599     LOG (GNUNET_ERROR_TYPE_WARNING,
600          "Could not create file\n");
601   #endif /* TO_FILE */
602
603   sampler->sampler_size = 0;
604   sampler->sampler_elements = NULL;
605   sampler->max_round_interval = max_round_interval;
606   //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
607   //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size);
608   RPS_sampler_resize (sampler, init_size);
609
610   client_get_index = 0;
611
612   //GNUNET_assert (init_size == sampler->sampler_size);
613   return sampler;
614 }
615
616
617 /**
618  * A fuction to update every sampler in the given list
619  *
620  * @param sampler the sampler to update.
621  * @param id the PeerID that is put in the sampler
622  */
623   void
624 RPS_sampler_update (struct RPS_Sampler *sampler,
625                     const struct GNUNET_PeerIdentity *id)
626 {
627   uint32_t i;
628
629   for (i = 0 ; i < sampler->sampler_size ; i++)
630   {
631     RPS_sampler_elem_next (sampler->sampler_elements[i],
632                            sampler,
633                            id);
634     #ifdef TO_FILE
635     to_file (sampler->file_name,
636              "%" PRIu32 ": Now contains %s\n",
637              i,
638              GNUNET_i2s_full (&sampler->sampler_elements[i]->peer_id));
639     #endif /* TO_FILE */
640   }
641 }
642
643
644 /**
645  * Reinitialise all previously initialised sampler elements with the given value.
646  *
647  * Used to get rid of a PeerID.
648  *
649  * @param sampler the sampler to reinitialise a sampler element in.
650  * @param id the id of the sampler elements to update.
651  */
652   void
653 RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler,
654                                    const struct GNUNET_PeerIdentity *id)
655 {
656   uint32_t i;
657
658   for ( i = 0 ; i < sampler->sampler_size ; i++ )
659   {
660     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &(sampler->sampler_elements[i]->peer_id)) )
661     {
662       LOG (GNUNET_ERROR_TYPE_DEBUG, "Reinitialising sampler\n");
663       RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
664     }
665   }
666 }
667
668
669 /**
670  * Get one random peer out of the sampled peers.
671  *
672  * We might want to reinitialise this sampler after giving the
673  * corrsponding peer to the client.
674  * Only used internally
675  */
676 static void
677 sampler_get_rand_peer2 (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
678 {
679   struct GetPeerCls *gpc = cls;
680   uint32_t r_index;
681
682   gpc->get_peer_task = NULL;
683   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
684   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
685     return;
686
687   /**;
688    * Choose the r_index of the peer we want to return
689    * at random from the interval of the gossip list
690    */
691   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
692       gpc->sampler->sampler_size);
693
694   if ( EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty )
695   {
696     gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(
697                                                                    GNUNET_TIME_UNIT_SECONDS,
698                                                                    .1),
699                                                        &sampler_get_rand_peer2,
700                                                        cls);
701     return;
702   }
703
704   *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id;
705
706   gpc->cont (gpc->cont_cls, gpc->id);
707   GNUNET_free (gpc);
708 }
709
710
711 /**
712  * Get one random peer out of the sampled peers.
713  *
714  * We might want to reinitialise this sampler after giving the
715  * corrsponding peer to the client.
716  */
717 static void
718 sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
719 {
720   struct GetPeerCls *gpc = cls;
721   struct GNUNET_PeerIdentity tmp_id;
722   unsigned int empty_flag;
723   struct RPS_SamplerElement *s_elem;
724   struct GNUNET_TIME_Relative last_request_diff;
725   uint32_t tmp_client_get_index;
726
727   gpc->get_peer_task = NULL;
728   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
729     return;
730
731   LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n");
732
733
734   /* Store the next #client_get_index to check whether we cycled over the whole list */
735   if (0 < client_get_index)
736     tmp_client_get_index = client_get_index - 1;
737   else
738     tmp_client_get_index = gpc->sampler->sampler_size - 1;
739
740   LOG (GNUNET_ERROR_TYPE_DEBUG,
741       "sched for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n",
742       tmp_client_get_index, gpc->sampler->sampler_size);
743
744   do
745   { /* Get first non empty sampler */
746     if (tmp_client_get_index == client_get_index)
747     { /* We once cycled over the whole list */
748       LOG (GNUNET_ERROR_TYPE_DEBUG, "reached tmp_index %" PRIX32 ".\n",
749            client_get_index);
750       GNUNET_assert (NULL == gpc->get_peer_task);
751       gpc->get_peer_task =
752         GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
753                                       &sampler_get_rand_peer,
754                                       cls);
755       return;
756     }
757
758     tmp_id = gpc->sampler->sampler_elements[client_get_index]->peer_id;
759     empty_flag = gpc->sampler->sampler_elements[client_get_index]->is_empty;
760     RPS_sampler_elem_reinit (gpc->sampler->sampler_elements[client_get_index]);
761     if (EMPTY != empty_flag)
762       RPS_sampler_elem_next (gpc->sampler->sampler_elements[client_get_index],
763                              gpc->sampler,
764                              &tmp_id);
765
766     /* Cycle the #client_get_index one step further */
767     if ( client_get_index == gpc->sampler->sampler_size - 1 )
768       client_get_index = 0;
769     else
770       client_get_index++;
771
772     LOG (GNUNET_ERROR_TYPE_DEBUG, "incremented index to %" PRIX32 ".\n",
773          client_get_index);
774   } while (EMPTY == gpc->sampler->sampler_elements[client_get_index]->is_empty);
775
776   s_elem = gpc->sampler->sampler_elements[client_get_index];
777   *gpc->id = s_elem->peer_id;
778
779   /* Check whether we may use this sampler to give it back to the client */
780   if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us)
781   {
782     last_request_diff =
783       GNUNET_TIME_absolute_get_difference (s_elem->last_client_request,
784                                            GNUNET_TIME_absolute_get ());
785     /* We're not going to give it back now if it was
786      * already requested by a client this round */
787     if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us)
788     {
789       LOG (GNUNET_ERROR_TYPE_DEBUG,
790           "Last client request on this sampler was less than max round interval ago -- scheduling for later\n");
791       ///* How many time remains untile the next round has started? */
792       //inv_last_request_diff =
793       //  GNUNET_TIME_absolute_get_difference (last_request_diff,
794       //                                       sampler->max_round_interval);
795       // add a little delay
796       /* Schedule it one round later */
797       GNUNET_assert (NULL == gpc->get_peer_task);
798       gpc->get_peer_task =
799         GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval,
800                                       &sampler_get_rand_peer,
801                                       cls);
802       return;
803     }
804     // TODO add other reasons to wait here
805   }
806
807   s_elem->last_client_request = GNUNET_TIME_absolute_get ();
808
809   GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, gpc);
810   gpc->cont (gpc->cont_cls, gpc->id);
811   GNUNET_free (gpc);
812 }
813
814
815 /**
816  * Get n random peers out of the sampled peers.
817  *
818  * We might want to reinitialise this sampler after giving the
819  * corrsponding peer to the client.
820  * Random with or without consumption?
821  *
822  * @param sampler the sampler to get peers from.
823  * @param cb callback that will be called once the ids are ready.
824  * @param cls closure given to @a cb
825  * @param for_client #GNUNET_YES if result is used for client,
826  *                   #GNUNET_NO if used internally
827  * @param num_peers the number of peers requested
828  */
829   void
830 RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler,
831                               RPS_sampler_n_rand_peers_ready_cb cb,
832                               void *cls, uint32_t num_peers, int for_client)
833 {
834   GNUNET_assert (0 != sampler->sampler_size);
835
836   // TODO check if we have too much (distinct) sampled peers
837   uint32_t i;
838   struct NRandPeersReadyCls *cb_cls;
839   struct GetPeerCls *gpc;
840
841   cb_cls = GNUNET_new (struct NRandPeersReadyCls);
842   cb_cls->num_peers = num_peers;
843   cb_cls->cur_num_peers = 0;
844   cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
845   cb_cls->callback = cb;
846   cb_cls->cls = cls;
847
848   #ifdef TO_FILE
849   if (GNUNET_NO == for_client)
850   {
851     to_file (sampler->file_name,
852              "This sampler is probably for Brahms itself\n");
853   }
854   #endif /* TO_FILE */
855
856   LOG (GNUNET_ERROR_TYPE_DEBUG,
857       "Scheduling requests for %" PRIX32 " peers\n", num_peers);
858
859   for (i = 0 ; i < num_peers ; i++)
860   {
861     gpc = GNUNET_new (struct GetPeerCls);
862     gpc->sampler = sampler;
863     gpc->cont = check_n_peers_ready;
864     gpc->cont_cls = cb_cls;
865     gpc->id = &cb_cls->ids[i];
866
867     // maybe add a little delay
868     if (GNUNET_YES == for_client)
869       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer, gpc);
870     else if (GNUNET_NO == for_client)
871       gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&sampler_get_rand_peer2, gpc);
872     else
873       GNUNET_assert (0);
874
875     GNUNET_CONTAINER_DLL_insert (gpc_head, gpc_tail, gpc);
876   }
877 }
878
879
880 /**
881  * Counts how many Samplers currently hold a given PeerID.
882  *
883  * @param sampler the sampler to count ids in.
884  * @param id the PeerID to count.
885  *
886  * @return the number of occurrences of id.
887  */
888   uint32_t
889 RPS_sampler_count_id (struct RPS_Sampler *sampler,
890                       const struct GNUNET_PeerIdentity *id)
891 {
892   uint32_t count;
893   uint32_t i;
894
895   count = 0;
896   for ( i = 0 ; i < sampler->sampler_size ; i++ )
897   {
898     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity (&sampler->sampler_elements[i]->peer_id, id)
899         && EMPTY != sampler->sampler_elements[i]->is_empty)
900       count++;
901   }
902   return count;
903 }
904
905
906 /**
907  * Cleans the sampler.
908  */
909   void
910 RPS_sampler_destroy (struct RPS_Sampler *sampler)
911 {
912   struct GetPeerCls *i;
913
914   for (i = gpc_head; NULL != i; i = gpc_head)
915   {
916     GNUNET_CONTAINER_DLL_remove (gpc_head, gpc_tail, i);
917     GNUNET_SCHEDULER_cancel (i->get_peer_task);
918     GNUNET_free (i);
919   }
920
921   sampler_empty (sampler);
922   GNUNET_free (sampler);
923 }
924
925 /* end of gnunet-service-rps.c */