fixed compiler warning in tests
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // TODO modify @brief in every file
38
39 // TODO take care that messages are not longer than 64k
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO malicious peer
48
49 // TODO Change API to accept initialisation peers
50
51 /**
52  * Our configuration.
53  */
54 static const struct GNUNET_CONFIGURATION_Handle *cfg;
55
56 /**
57  * Our own identity.
58  */
59 static struct GNUNET_PeerIdentity *own_identity;
60
61
62   struct GNUNET_PeerIdentity *
63 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
64
65 /***********************************************************************
66  * Sampler
67  *
68  * WARNING: This section needs to be reviewed regarding the use of
69  * functions providing (pseudo)randomness!
70 ***********************************************************************/
71
72 // TODO care about invalid input of the caller (size 0 or less...)
73
74 // It might be interesting to formulate this independent of PeerIDs.
75
76 /**
77  * Callback that is called when a new PeerID is inserted into a sampler.
78  *
79  * @param cls the closure given alongside this function.
80  * @param id the PeerID that is inserted
81  * @param hash the hash the sampler produced of the PeerID
82  */
83 typedef void (* SAMPLER_insertCB) (void *cls,
84     const struct GNUNET_PeerIdentity *id,
85     struct GNUNET_HashCode hash);
86
87 /**
88  * Callback that is called when a new PeerID is removed from a sampler.
89  *
90  * @param cls the closure given alongside this function.
91  * @param id the PeerID that is removed
92  * @param hash the hash the sampler produced of the PeerID
93  */
94 typedef void (* SAMPLER_removeCB) (void *cls,
95     const struct GNUNET_PeerIdentity *id,
96     struct GNUNET_HashCode hash);
97
98 /**
99  * A sampler sampling PeerIDs.
100  */
101 struct Sampler
102 {
103   /**
104    * Min-wise linear permutation used by this sampler.
105    *
106    * This is an key later used by a hmac.
107    */
108   struct GNUNET_CRYPTO_AuthKey auth_key;
109
110   /**
111    * The PeerID this sampler currently samples.
112    */
113   struct GNUNET_PeerIdentity *peer_id;
114
115   /**
116    * The according hash value of this PeerID.
117    */
118   struct GNUNET_HashCode peer_id_hash;
119
120   /**
121    * Samplers are kept in a linked list.
122    */
123   struct Sampler *next;
124
125   /**
126    * Samplers are kept in a linked list.
127    */
128   struct Sampler *prev;
129
130 };
131
132 /**
133  * A n-tuple of samplers.
134  */
135 struct Samplers
136 {
137   /**
138    * Number of samplers we hold.
139    */
140   unsigned int size;
141   //size_t size;
142   
143   /**
144    * All PeerIDs in one array.
145    */
146   struct GNUNET_PeerIdentity *peer_ids;
147
148   /**
149    * Callback to be called when a peer gets inserted into a sampler.
150    */
151   SAMPLER_insertCB insertCB;
152
153   /**
154    * Closure to the insertCB.
155    */
156   void *insertCLS;
157
158   /**
159    * Callback to be called when a peer gets inserted into a sampler.
160    */
161   SAMPLER_removeCB removeCB;
162
163   /**
164    * Closure to the removeCB.
165    */
166   void *removeCLS;
167
168   /**
169    * The head of the DLL.
170    */
171   struct Sampler *head;
172
173   /**
174    * The tail of the DLL.
175    */
176   struct Sampler *tail;
177
178 };
179
180 /**
181  * Reinitialise a previously initialised sampler.
182  *
183  * @param sampler the sampler element.
184  * @param id pointer to the memory that keeps the value.
185  */
186   void
187 SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct GNUNET_PeerIdentity *id)
188 {
189   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
190   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
191                              &(sampler->auth_key.key),
192                              GNUNET_CRYPTO_HASH_LENGTH);
193
194   GNUNET_assert(NULL != id);
195   sampler->peer_id = id;
196   memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
197   // Maybe take a PeerID as second argument.
198
199   GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
200                      sizeof(struct GNUNET_PeerIdentity),
201                      &sampler->peer_id_hash);
202 }
203
204
205 /**
206  * (Re)Initialise given Sampler with random min-wise independent function.
207  *
208  * In this implementation this means choosing an auth_key for later use in
209  * a hmac at random.
210  *
211  * @param id pointer to the place where this sampler will store the PeerID.
212  *           This will be overwritten.
213  */
214   struct Sampler *
215 SAMPLER_init(struct GNUNET_PeerIdentity *id)
216 {
217   struct Sampler *s;
218   
219   s = GNUNET_new(struct Sampler);
220
221   SAMPLER_reinitialise_sampler (s, id);
222   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
223       GNUNET_i2s(s->peer_id), s->peer_id);
224
225   s->prev = NULL;
226   s->next = NULL;
227
228   return s;
229 }
230
231 /**
232  * Input an PeerID into the given sampler.
233  */
234   static void
235 SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
236     SAMPLER_insertCB insertCB, void *insertCLS,
237     SAMPLER_removeCB removeCB, void *removeCLS)
238   // TODO call update herein
239 {
240   struct GNUNET_HashCode other_hash;
241
242   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
243       GNUNET_i2s(other), other);
244   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
245       GNUNET_i2s(s->peer_id), s->peer_id);
246
247   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
248   {
249     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
250         GNUNET_i2s(other));
251     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
252         GNUNET_i2s(s->peer_id));
253   }
254   else
255   {
256     GNUNET_CRYPTO_hmac(&s->auth_key,
257         other,
258         sizeof(struct GNUNET_PeerIdentity),
259         &other_hash);
260
261     if ( NULL == s->peer_id )
262     { // Or whatever is a valid way to say
263       // "we have no PeerID at the moment"
264       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
265           GNUNET_i2s(other));
266       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
267       //s->peer_id = other;
268       s->peer_id_hash = other_hash;
269       if (NULL != insertCB)
270       {
271         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
272       }
273     }
274     else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
275     {
276       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
277           GNUNET_i2s(other));
278       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
279           GNUNET_i2s(s->peer_id));
280
281       if ( NULL != removeCB )
282       {
283         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
284             GNUNET_i2s(s->peer_id));
285         removeCB(removeCLS, s->peer_id, s->peer_id_hash);
286       }
287
288       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
289       //s->peer_id = other;
290       s->peer_id_hash = other_hash;
291
292       if ( NULL != insertCB )
293       {
294         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
295             GNUNET_i2s(s->peer_id));
296         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
297       }
298     }
299     else
300     {
301       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
302           GNUNET_i2s(other));
303       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
304           GNUNET_i2s(s->peer_id));
305     }
306   }
307 }
308
309 /**
310  * Gow or shrink the size of the tuple of samplers.
311  *
312  * @param samplers the samplers to grow
313  * @param new_size the new size of the samplers
314  * @param fill_up_id if growing, that has to point to a
315  *                   valid PeerID and will be used
316  *                   to initialise newly created samplers
317  */
318   void
319 SAMPLER_samplers_resize (struct Samplers * samplers,
320     unsigned int new_size,
321     struct GNUNET_PeerIdentity *fill_up_id)
322 {
323   if ( samplers->size == new_size )
324   {
325     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
326     return;
327   }
328
329   unsigned int old_size;
330   struct Sampler *iter;
331   uint64_t i;
332   struct Sampler *tmp;
333
334   old_size = samplers->size;
335   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
336
337   iter = samplers->head;
338
339   if ( new_size < old_size )
340   {
341     for ( i = new_size ; i < old_size ; i++ )
342     {/* Remove unneeded rest */
343       tmp = iter->next;
344       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
345       if (NULL != samplers->removeCB)
346         samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
347         // FIXME When this is called and counts the amount of peer_ids in the samplers
348         //       this gets a wrong number.
349       GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
350       GNUNET_free(iter);
351       iter = tmp;
352     }
353   }
354
355   GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
356   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
357
358   if ( new_size > old_size )
359   { /* Growing */
360     GNUNET_assert( NULL != fill_up_id );
361     for ( i = 0 ; i < new_size ; i++ )
362     { /* All samplers */
363       if ( i < old_size )
364       { /* Update old samplers */
365         iter->peer_id = &samplers->peer_ids[i];
366         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
367             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
368         iter = iter->next;
369       }
370       else
371       { /* Add new samplers */
372         memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
373         iter = SAMPLER_init(&samplers->peer_ids[i]);
374         if (NULL != samplers->insertCB)
375         {
376           samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
377         }
378         GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
379         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
380             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
381       }
382     }
383   }
384   else// if ( new_size < old_size )
385   { /* Shrinking */
386     for ( i = 0 ; i < new_size ; i++)
387     { /* All samplers */
388       tmp = iter->next;
389       /* Update remaining samplers */
390       iter->peer_id = &samplers->peer_ids[i];
391       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
392           i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
393
394       iter = tmp;
395     }
396   }
397
398   GNUNET_assert(samplers->size == new_size);
399   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
400 }
401
402
403 /**
404  * Initialise a tuple of samplers.
405  */
406 struct Samplers *
407 SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
408     SAMPLER_insertCB insertCB, void *insertCLS,
409     SAMPLER_removeCB removeCB, void *removeCLS)
410 {
411   struct Samplers *samplers;
412   //struct Sampler *s;
413   //uint64_t i;
414
415   samplers = GNUNET_new(struct Samplers);
416   samplers->size = 0;
417   samplers->peer_ids = NULL;
418   samplers->insertCB = insertCB;
419   samplers->insertCLS = insertCLS;
420   samplers->removeCB = removeCB;
421   samplers->removeCLS = removeCLS;
422   samplers->head = samplers->tail = NULL;
423   //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
424
425   SAMPLER_samplers_resize(samplers, init_size, id);
426
427   GNUNET_assert(init_size == samplers->size);
428   return samplers;
429 }
430
431
432 /**
433  * A fuction to update every sampler in the given list
434  */
435   static void
436 SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
437 {
438   struct Sampler *iter;
439
440   iter = samplers->head;
441   while ( NULL != iter->next )
442   {
443     SAMPLER_next(iter, id,
444         samplers->insertCB, samplers->insertCLS,
445         samplers->removeCB, samplers->removeCLS);
446     iter = iter->next;
447   }
448   
449 }
450
451 /**
452  * Reinitialise all previously initialised sampler with the given value.
453  *
454  * @param samplers the sampler list.
455  * @param id the id of the samplers to update.
456  */
457   void
458 SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
459 {
460   uint64_t i;
461   struct Sampler *iter;
462
463   iter = samplers->head;
464   for ( i = 0 ; i < samplers->size ; i++ )
465   {
466     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
467     {
468       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
469       SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
470     }
471     if (NULL != iter->next)
472       iter = iter->next;
473   }
474 }
475
476 /**
477  * Get one random peer out of the sampled peers.
478  *
479  * We might want to reinitialise this sampler after giving the
480  * corrsponding peer to the client.
481  */
482   const struct GNUNET_PeerIdentity* 
483 SAMPLER_get_rand_peer (struct Samplers *samplers)
484 {
485   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
486
487   if ( 0 == samplers->size )
488   {
489     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
490     return own_identity;
491   }
492   else
493   {
494     const struct GNUNET_PeerIdentity *peer;
495
496     peer = get_rand_peer(samplers->peer_ids, samplers->size);
497     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
498     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
499
500     return peer;
501   }
502 }
503
504 /**
505  * Get n random peers out of the sampled peers.
506  *
507  * We might want to reinitialise this sampler after giving the
508  * corrsponding peer to the client.
509  * Random with or without consumption?
510  */
511   const struct GNUNET_PeerIdentity*  // TODO give back simple array
512 SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
513 {
514   // TODO check if we have too much (distinct) sampled peers
515   // If we are not ready yet maybe schedule for later
516   struct GNUNET_PeerIdentity *peers;
517   uint64_t i;
518   
519   peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
520
521   for ( i = 0 ; i < n ; i++ ) {
522     //peers[i] = SAMPLER_get_rand_peer(samplers);
523     memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
524   }
525
526   // TODO something else missing?
527   return peers;
528 }
529
530 /**
531  * Counts how many Samplers currently hold a given PeerID.
532  */
533   uint64_t
534 SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
535 {
536   struct Sampler *iter;
537   uint64_t count;
538
539   iter = samplers->head;
540   count = 0;
541   while ( NULL != iter )
542   {
543     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
544       count++;
545     iter = iter->next;
546   }
547   return count;
548 }
549
550
551 /**
552  * Cleans the samplers.
553  * 
554  * @param samplers the samplers to clean up.
555  */
556   void
557 SAMPLER_samplers_destroy (struct Samplers *samplers)
558 {
559   SAMPLER_samplers_resize(samplers, 0, NULL);
560   GNUNET_free(samplers);
561 }
562
563 /***********************************************************************
564  * /Sampler
565 ***********************************************************************/
566
567
568
569 /***********************************************************************
570  * Housekeeping with peers
571 ***********************************************************************/
572
573 /**
574  * Struct used to store the context of a connected client.
575  */
576 struct client_ctx
577 {
578   /**
579    * The message queue to communicate with the client.
580    */
581   struct GNUNET_MQ_Handle *mq;
582 };
583
584 /**
585  * Used to keep track in what lists single peerIDs are.
586  */
587 enum in_list_flag // probably unneeded
588 {
589   in_other_sampler_list = 0x1,
590   in_other_gossip_list  = 0x2, // unneeded?
591   in_own_sampler_list   = 0x4,
592   in_own_gossip_list    = 0x8 // unneeded?
593 };
594
595 /**
596  * Struct used to keep track of other peer's status
597  *
598  * This is stored in a multipeermap.
599  */
600 struct peer_context
601 {
602   /**
603    * In own gossip/sampler list, in other's gossip/sampler list
604    */
605   uint32_t in_flags; // unneeded?
606
607   /**
608    * Message queue open to client
609    */
610   struct GNUNET_MQ_Handle *mq;
611
612   /**
613    * Channel open to client.
614    */
615   struct GNUNET_CADET_Channel *to_channel;
616
617   /**
618    * Channel open from client.
619    */
620   struct GNUNET_CADET_Channel *from_channel; // unneeded
621
622   /**
623    * This is pobably followed by 'statistical' data (when we first saw
624    * him, how did we get his ID, how many pushes (in a timeinterval),
625    * ...)
626    */
627 };
628
629 /***********************************************************************
630  * /Housekeeping with peers
631 ***********************************************************************/
632
633 /**
634  * Set of all peers to keep track of them.
635  */
636 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
637
638
639 /**
640  * The samplers.
641  */
642 static struct Samplers *sampler_list;
643
644
645 /**
646  * The gossiped list of peers.
647  */
648 static struct GNUNET_PeerIdentity *gossip_list;
649
650 /**
651  * Size of the gossiped list
652  */
653 static unsigned int gossip_list_size;
654
655
656 /**
657  * The estimated size of the network.
658  *
659  * Influenced by the stdev.
660  */
661 static unsigned int est_size;
662 //size_t est_size;
663
664
665 /**
666  * Percentage of total peer number in the gossip list
667  * to send random PUSHes to
668  */
669 static float alpha;
670
671 /**
672  * Percentage of total peer number in the gossip list
673  * to send random PULLs to
674  */
675 static float beta;
676
677 /**
678  * The percentage gamma of history updates.
679  * Simply 1 - alpha - beta
680  */
681
682
683
684
685 /**
686  * Identifier for the main task that runs periodically.
687  */
688 static struct GNUNET_SCHEDULER_Task * do_round_task;
689
690 /**
691  * Time inverval the do_round task runs in.
692  */
693 static struct GNUNET_TIME_Relative round_interval;
694
695
696
697 /**
698  * List to store peers received through pushes temporary.
699  */
700 static struct GNUNET_PeerIdentity *push_list;
701
702 /**
703  * Size of the push_list;
704  */
705 static unsigned int push_list_size;
706 //size_t push_list_size;
707
708 /**
709  * List to store peers received through pulls temporary.
710  */
711 static struct GNUNET_PeerIdentity *pull_list;
712
713 /**
714  * Size of the pull_list;
715  */
716 static unsigned int pull_list_size;
717 //size_t pull_list_size;
718
719
720 /**
721  * Handler to NSE.
722  */
723 static struct GNUNET_NSE_Handle *nse;
724
725 /**
726  * Handler to CADET.
727  */
728 static struct GNUNET_CADET_Handle *cadet_handle;
729
730
731 /***********************************************************************
732  * Util functions
733 ***********************************************************************/
734
735 /**
736  * Get random peer from the gossip list.
737  */
738   struct GNUNET_PeerIdentity *
739 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
740 {
741   uint64_t r_index;
742   struct GNUNET_PeerIdentity *peer;
743
744   // FIXME if we have only NULL in gossip list this will block
745   // but then we might have a problem nevertheless
746
747   do
748   {
749
750     /**;
751      * Choose the r_index of the peer we want to return
752      * at random from the interval of the gossip list
753      */
754     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
755                                      list_size);
756
757     peer = &(peer_list[r_index]);
758   } while (NULL == peer);
759
760   return peer;
761 }
762
763 /**
764  * Make sure the context of a given peer exists in the given peer_map.
765  */
766   void
767 touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
768 {
769   struct peer_context *ctx;
770
771   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
772   {
773     ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
774   }
775   else
776   {
777     ctx = GNUNET_new(struct peer_context);
778     ctx->in_flags = 0;
779     ctx->mq = NULL;
780     ctx->to_channel = NULL;
781     ctx->from_channel = NULL;
782     GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
783   }
784 }
785
786 /**
787  * Get the context of a peer. If not existing, create.
788  */
789   struct peer_context *
790 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
791 {
792   struct peer_context *ctx;
793
794   touch_peer_ctx(peer_map, peer);
795   ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
796   return ctx;
797 }
798
799 /**
800  * Get the channel of a peer. If not existing, create.
801  */
802   void
803 touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
804 {
805   struct peer_context *ctx;
806
807   ctx = get_peer_ctx (peer_map, peer);
808   if (NULL == ctx->to_channel)
809   {
810     ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
811                                                   GNUNET_RPS_CADET_PORT,
812                                                   GNUNET_CADET_OPTION_RELIABLE);
813     // do I have to explicitly put it in the peer_map?
814     GNUNET_CONTAINER_multipeermap_put(peer_map, peer, ctx,
815                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
816   }
817 }
818
819 /**
820  * Get the channel of a peer. If not existing, create.
821  */
822   struct GNUNET_CADET_Channel *
823 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
824 {
825   struct peer_context *ctx;
826
827   ctx = get_peer_ctx (peer_map, peer);
828   touch_channel(peer_map, peer);
829   return ctx->to_channel;
830 }
831
832 /**
833  * Make sure the mq for a given peer exists.
834  *
835  * If we already have a message queue open to this client,
836  * simply return it, otherways create one.
837  */
838   void
839 touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
840 {
841   struct peer_context *ctx;
842
843   ctx = get_peer_ctx(peer_map, peer_id);
844   if (NULL == ctx->mq)
845   {
846     touch_channel(peer_map, peer_id);
847     ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
848     //do I have to explicitly put it in the peer_map?
849     GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
850                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
851   }
852 }
853
854 /**
855  * Get the message queue of a specific peer.
856  *
857  * If we already have a message queue open to this client,
858  * simply return it, otherways create one.
859  */
860   struct GNUNET_MQ_Handle *
861 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
862 {
863   struct peer_context *ctx;
864
865   ctx = get_peer_ctx(peer_map, peer_id);
866   touch_mq(peer_map, peer_id);
867
868   return ctx->mq;
869 }
870
871 /***********************************************************************
872  * /Util functions
873 ***********************************************************************/
874
875 /**
876  * Function called by NSE.
877  *
878  * Updates sizes of sampler list and gossip list and adapt those lists
879  * accordingly.
880  */
881   void
882 nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
883 {
884   double estimate;
885   //double scale; // TODO this might go gloabal/config
886
887   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f\n", logestimate, std_dev);
888   //scale = .01;
889   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
890   // GNUNET_NSE_log_estimate_to_n (logestimate);
891   estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
892   if ( 0 < estimate ) {
893     LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
894     est_size = estimate;
895   } else {
896     LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
897   }
898 }
899
900 /**
901  * Handle RPS request from the client.
902  *
903  * @param cls closure
904  * @param client identification of the client
905  * @param message the actual message
906  */
907 static void
908 // TODO rename
909 handle_cs_request (void *cls,
910             struct GNUNET_SERVER_Client *client,
911             const struct GNUNET_MessageHeader *message)
912 {
913   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
914
915   struct GNUNET_RPS_CS_RequestMessage *msg;
916   //unsigned int n_arr[sampler_list->size];// =
917     //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
918   //struct GNUNET_MQ_Handle *mq;
919   struct client_ctx *cli_ctx;
920   struct GNUNET_MQ_Envelope *ev;
921   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
922   uint64_t num_peers;
923   //uint64_t i;
924
925   // TODO
926   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
927   cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
928   if ( NULL == cli_ctx ) {
929     cli_ctx = GNUNET_new(struct client_ctx);
930     cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
931     GNUNET_SERVER_client_set_user_context(client, cli_ctx);
932   }
933   
934   // How many peers do we give back?
935   // Wait until we have enough random peers?
936
937   ev = GNUNET_MQ_msg_extra(out_msg,
938                            GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
939                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
940   out_msg->num_peers = msg->num_peers; // No conversion between network and host order
941
942   num_peers = GNUNET_ntohll(msg->num_peers);
943   //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
944   memcpy(&out_msg[1],
945       SAMPLER_get_n_rand_peers(sampler_list, num_peers),
946       num_peers * sizeof(struct GNUNET_PeerIdentity));
947   
948   GNUNET_MQ_send(cli_ctx->mq, ev);
949   //GNUNET_MQ_destroy(mq);
950
951   GNUNET_SERVER_receive_done (client,
952                               GNUNET_OK);
953 }
954
955 /**
956  * Handle a PUSH message from another peer.
957  *
958  * Check the proof of work and store the PeerID
959  * in the temporary list for pushed PeerIDs.
960  *
961  * @param cls Closure
962  * @param channel The channel the PUSH was received over
963  * @param channel_ctx The context associated with this channel
964  * @param msg The message header
965  */
966 static int
967 handle_peer_push (void *cls,
968     struct GNUNET_CADET_Channel *channel,
969     void **channel_ctx,
970     const struct GNUNET_MessageHeader *msg)
971 {
972   LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
973
974   struct GNUNET_PeerIdentity *peer;
975
976   // TODO check the proof of work
977   
978   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER );
979   
980   /* Add the sending peer to the push_list */
981   LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
982   GNUNET_array_append(push_list, push_list_size, *peer);
983   LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
984
985   return GNUNET_OK;
986 }
987
988 /**
989  * Handle PULL REQUEST request message from another peer.
990  *
991  * Reply with the gossip list of PeerIDs.
992  *
993  * @param cls Closure
994  * @param channel The channel the PUSH was received over
995  * @param channel_ctx The context associated with this channel
996  * @param msg The message header
997  */
998 static int
999 handle_peer_pull_request (void *cls,
1000     struct GNUNET_CADET_Channel *channel,
1001     void **channel_ctx,
1002     const struct GNUNET_MessageHeader *msg)
1003 {
1004
1005   struct GNUNET_PeerIdentity *peer;
1006   struct GNUNET_MQ_Handle *mq;
1007   //struct GNUNET_RPS_P2P_PullRequestMessage *in_msg;
1008   struct GNUNET_MQ_Envelope *ev;
1009   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1010
1011   // find some way to keep one peer from spamming with pull requests?
1012   // allow only one request per time interval ?
1013   // otherwise remove from peerlist?
1014
1015   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
1016   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
1017
1018   //mq = GNUNET_CADET_mq_create(channel); // without mq?
1019   mq = get_mq(peer_map, peer);
1020
1021   //in_msg = (struct GNUNET_RPS_P2P_PullRequestMessage *) msg;
1022   // TODO how many peers do we actually send?
1023   // GNUNET_ntohll(in_msg->num_peers)
1024   ev = GNUNET_MQ_msg_extra(out_msg,
1025                            gossip_list_size * sizeof(struct GNUNET_PeerIdentity),
1026                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1027   out_msg->num_peers = GNUNET_htonll(gossip_list_size);
1028   memcpy(&out_msg[1], gossip_list,
1029          gossip_list_size * sizeof(struct GNUNET_PeerIdentity));
1030
1031   GNUNET_MQ_send(mq, ev);
1032
1033   return GNUNET_OK;
1034 }
1035
1036 /**
1037  * Handle PULL REPLY message from another peer.
1038  *
1039  * Check whether we sent a corresponding request and
1040  * whether this reply is the first one.
1041  *
1042  * @param cls Closure
1043  * @param channel The channel the PUSH was received over
1044  * @param channel_ctx The context associated with this channel
1045  * @param msg The message header
1046  */
1047 static int
1048 handle_peer_pull_reply (void *cls,
1049     struct GNUNET_CADET_Channel *channel,
1050     void **channel_ctx,
1051     const struct GNUNET_MessageHeader *msg)
1052 {
1053   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1054
1055   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1056   struct GNUNET_PeerIdentity *peers;
1057   uint64_t i;
1058
1059   // TODO check that we sent a request and that it is the first reply
1060
1061   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1062   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1063   for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ )
1064   {
1065     GNUNET_array_append(pull_list, pull_list_size, peers[i]);
1066   }
1067
1068   return GNUNET_OK;
1069 }
1070
1071
1072 /**
1073  * Send out PUSHes and PULLs.
1074  *
1075  * This is executed regylary.
1076  */
1077 static void
1078 do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1079 {
1080   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
1081
1082   uint64_t i;
1083   //unsigned int *n_arr;
1084   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
1085   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
1086   struct GNUNET_MQ_Envelope *ev;
1087   struct GNUNET_PeerIdentity *peer;
1088
1089   // TODO print lists, ...
1090   // TODO cleanup peer_map
1091
1092
1093   /* If the NSE has changed adapt the lists accordingly */
1094   if ( sampler_list->size != est_size )
1095     SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1096
1097   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1098
1099
1100   /* Would it make sense to have one shuffeled gossip list and then
1101    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1102    * use the rest to update sampler?
1103    * in essence get random peers with consumption */
1104
1105   /* Send PUSHes */
1106   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
1107   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %f (%f * %u) peers.\n",
1108       alpha * gossip_list_size, alpha, gossip_list_size);
1109   for ( i = 0 ; i < alpha * gossip_list_size ; i++ )
1110   { // TODO compute length
1111     peer = get_rand_peer(gossip_list, gossip_list_size);
1112     // TODO check NULL == peer
1113     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1114
1115     ev = GNUNET_MQ_msg (push_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1116     //ev = GNUNET_MQ_msg_extra();
1117     /* TODO Compute proof of work here
1118     push_msg; */
1119     push_msg->placeholder = 0;
1120     // FIXME sometimes it returns a pointer to a freed mq
1121     GNUNET_MQ_send (get_mq (peer_map, peer), ev);
1122
1123     // modify in_flags of respective peer?
1124   }
1125
1126
1127   /* Send PULL requests */
1128   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1129   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %f (%f * %u) peers.\n",
1130       beta * gossip_list_size, beta, gossip_list_size);
1131   for ( i = 0 ; i < beta * gossip_list_size ; i++ )
1132   { // TODO compute length
1133     peer = get_rand_peer(gossip_list, gossip_list_size);
1134     // TODO check empty_peer != peer
1135     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1136
1137     ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1138     //ev = GNUNET_MQ_msg_extra();
1139     pull_msg->placeholder = 0;
1140     GNUNET_MQ_send( get_mq(peer_map, peer), ev );
1141     // modify in_flags of respective peer?
1142   }
1143
1144
1145   /* Update gossip list */
1146   uint64_t r_index;
1147
1148   if ( push_list_size <= alpha * gossip_list_size &&
1149        push_list_size != 0 &&
1150        pull_list_size != 0 )
1151   {
1152     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
1153
1154     uint64_t first_border;
1155     uint64_t second_border;
1156
1157     first_border = round(alpha * gossip_list_size);
1158     for ( i = 0 ; i < first_border ; i++ )
1159     { // TODO use SAMPLER_get_n_rand_peers
1160       /* Update gossip list with peers received through PUSHes */
1161       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1162                                        push_list_size);
1163       gossip_list[i] = push_list[r_index];
1164       // TODO change the in_flags accordingly
1165     }
1166
1167     second_border = first_border + round(beta * gossip_list_size);
1168     for ( i = first_border ; i < second_border ; i++ )
1169     {
1170       /* Update gossip list with peers received through PULLs */
1171       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1172                                        pull_list_size);
1173       gossip_list[i] = pull_list[r_index];
1174       // TODO change the in_flags accordingly
1175     }
1176
1177     for ( i = second_border ; i < gossip_list_size ; i++ )
1178     {
1179       /* Update gossip list with peers from history */
1180       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1181                                        sampler_list->size);
1182       gossip_list[i] = sampler_list->peer_ids[r_index];
1183       // TODO change the in_flags accordingly
1184     }
1185
1186   }
1187   else
1188   {
1189     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
1190   }
1191   // TODO independent of that also get some peers from CADET_get_peers()?
1192
1193
1194   /* Update samplers */
1195
1196   for ( i = 0 ; i < push_list_size ; i++ )
1197   {
1198     SAMPLER_update_list(sampler_list, &push_list[i]);
1199     // TODO set in_flag?
1200   }
1201
1202   for ( i = 0 ; i < pull_list_size ; i++ )
1203   {
1204     SAMPLER_update_list(sampler_list, &pull_list[i]);
1205     // TODO set in_flag?
1206   }
1207
1208
1209   /* Empty push/pull lists */
1210   GNUNET_array_grow(push_list, push_list_size, 0);
1211   push_list_size = 0; // I guess that's not necessary but doesn't hurt
1212   GNUNET_array_grow(pull_list, pull_list_size, 0);
1213   pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1214
1215
1216   /* Schedule next round */
1217   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
1218   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1219 }
1220
1221 /**
1222  * Open a connection to given peer and store channel and mq.
1223  */
1224   void
1225 insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1226 {
1227   // We open a channel to be notified when this peer goes down.
1228   touch_channel(peer_map, id);
1229 }
1230
1231 /**
1232  * Close the connection to given peer and delete channel and mq.
1233  */
1234   void
1235 removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1236 {
1237   size_t s;
1238   struct peer_context *ctx;
1239
1240   s = SAMPLER_count_id(sampler_list, id);
1241   if ( 1 >= s ) {
1242     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
1243     {
1244       ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
1245       if (NULL != ctx->to_channel)
1246       {
1247         if (NULL != ctx->mq)
1248         {
1249           GNUNET_MQ_destroy(ctx->mq);
1250         }
1251         GNUNET_CADET_channel_destroy(ctx->to_channel);
1252       }
1253       // TODO cleanup peer
1254       GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
1255     }
1256   }
1257 }
1258
1259 static void
1260 rps_start (struct GNUNET_SERVER_Handle *server);
1261
1262 /**
1263  * This is called from GNUNET_CADET_get_peers().
1264  *
1265  * It is called on every peer(ID) that cadet somehow has contact with.
1266  * We use those to initialise the sampler.
1267  */
1268 void
1269 init_peer_cb (void *cls,
1270               const struct GNUNET_PeerIdentity *peer,
1271               int tunnel, // "Do we have a tunnel towards this peer?"
1272               unsigned int n_paths, // "Number of known paths towards this peer"
1273               unsigned int best_path) // "How long is the best path?
1274                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1275 {
1276   if ( NULL != peer )
1277   {
1278     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1279     SAMPLER_update_list(sampler_list, peer);
1280     touch_peer_ctx(peer_map, peer);
1281
1282     uint64_t i;
1283     i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size);
1284     gossip_list[i] = *peer;
1285     // TODO send push/pull to each of those peers?
1286   }
1287   else
1288   {
1289     rps_start( (struct GNUNET_SERVER_Handle *) cls);
1290   }
1291 }
1292
1293
1294 /**
1295  * Task run during shutdown.
1296  *
1297  * @param cls unused
1298  * @param tc unused
1299  */
1300 static void
1301 shutdown_task (void *cls,
1302                const struct GNUNET_SCHEDULER_TaskContext *tc)
1303 {
1304   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
1305
1306   if ( NULL != do_round_task )
1307   {
1308     GNUNET_SCHEDULER_cancel (do_round_task);
1309     do_round_task = NULL;
1310   }
1311
1312   GNUNET_NSE_disconnect(nse);
1313   GNUNET_CADET_disconnect(cadet_handle);
1314   GNUNET_free(own_identity);
1315   SAMPLER_samplers_destroy(sampler_list);
1316   GNUNET_array_grow(gossip_list, gossip_list_size, 0);
1317   GNUNET_array_grow(push_list, push_list_size, 0);
1318   GNUNET_array_grow(pull_list, pull_list_size, 0);
1319 }
1320
1321
1322 /**
1323  * A client disconnected.  Remove all of its data structure entries.
1324  *
1325  * @param cls closure, NULL
1326  * @param client identification of the client
1327  */
1328 static void
1329 handle_client_disconnect (void *cls,
1330                           struct GNUNET_SERVER_Client * client)
1331 {
1332 }
1333
1334 /**
1335  * Handle the channel a peer opens to us.
1336  *
1337  * @param cls The closure
1338  * @param channel The channel the peer wants to establish
1339  * @param initiator The peer's peer ID
1340  * @param port The port the channel is being established over
1341  * @param options Further options
1342  */
1343   static void *
1344 handle_inbound_channel (void *cls,
1345                         struct GNUNET_CADET_Channel *channel,
1346                         const struct GNUNET_PeerIdentity *initiator,
1347                         uint32_t port,
1348                         enum GNUNET_CADET_ChannelOption options)
1349 {
1350   LOG(GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us.\n");
1351
1352   GNUNET_assert( NULL != channel );
1353
1354   // TODO we might not even store the from_channel
1355
1356   if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
1357     ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel;
1358     // FIXME there might already be an established channel
1359   } else {
1360     struct peer_context *ctx;
1361
1362     ctx = GNUNET_malloc (sizeof(struct peer_context));
1363     ctx->in_flags = in_other_gossip_list;
1364     ctx->mq = NULL; // TODO create mq?
1365     ctx->from_channel = channel;
1366
1367     GNUNET_CONTAINER_multipeermap_put (peer_map, initiator, ctx,
1368                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1369   }
1370   return NULL; // TODO
1371 }
1372
1373 /**
1374  * This is called when a remote peer destroys a channel.
1375  *
1376  * @param cls The closure
1377  * @param channel The channel being closed
1378  * @param channel_ctx The context associated with this channel
1379  */
1380 static void
1381 cleanup_channel(void *cls,
1382                 const struct GNUNET_CADET_Channel *channel,
1383                 void *channel_ctx)
1384 {
1385   struct GNUNET_PeerIdentity *peer;
1386   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel to remote peer was destroyed.\n");
1387
1388   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1389       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1390        // Guess simply casting isn't the nicest way...
1391   SAMPLER_reinitialise_samplers_by_value(sampler_list, peer);
1392 }
1393
1394 /**
1395  * Actually start the service.
1396  */
1397 static void
1398 rps_start (struct GNUNET_SERVER_Handle *server)
1399 {
1400   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1401     {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
1402     {NULL, NULL, 0, 0}
1403   };
1404
1405   GNUNET_SERVER_add_handlers (server, handlers);
1406   GNUNET_SERVER_disconnect_notify (server,
1407                                    &handle_client_disconnect,
1408                                    NULL);
1409   LOG(GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
1410
1411
1412   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1413   LOG(GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
1414
1415   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1416                                 &shutdown_task,
1417                                 NULL);
1418 }
1419
1420
1421 /**
1422  * Process statistics requests.
1423  *
1424  * @param cls closure
1425  * @param server the initialized server
1426  * @param c configuration to use
1427  */
1428 static void
1429 run (void *cls,
1430      struct GNUNET_SERVER_Handle *server,
1431      const struct GNUNET_CONFIGURATION_Handle *c)
1432 {
1433   // TODO check what this does -- copied from gnunet-boss
1434   // - seems to work as expected
1435   GNUNET_log_setup("rps", GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG), NULL);
1436
1437   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1438
1439   cfg = c;
1440
1441
1442   own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
1443
1444   GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
1445
1446   GNUNET_assert(NULL != own_identity);
1447
1448   LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1449
1450
1451
1452   /* Get time interval from the configuration */
1453   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1454                                                         "ROUNDINTERVAL",
1455                                                         &round_interval))
1456   {
1457     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
1458     GNUNET_SCHEDULER_shutdown();
1459     return;
1460   }
1461
1462   /* Get initial size of sampler/gossip list from the configuration */
1463   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1464                                                          "INITSIZE",
1465                                                          (long long unsigned int *) &est_size))
1466   {
1467     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1468     GNUNET_SCHEDULER_shutdown();
1469     return;
1470   }
1471   LOG(GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", est_size);
1472
1473   //gossip_list_size = est_size; // TODO rename est_size
1474
1475   gossip_list = NULL;
1476
1477   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1478
1479
1480   /* connect to NSE */
1481   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
1482   // TODO check whether that was successful
1483   // TODO disconnect on shutdown
1484   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1485
1486
1487   alpha = 0.45;
1488   beta  = 0.45;
1489   // TODO initialise thresholds - ?
1490
1491   ///* Get alpha from the configuration */
1492   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1493   //                                                       "ALPHA",
1494   //                                                       &alpha))
1495   //{
1496   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
1497   //}
1498   //LOG(GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
1499  
1500   ///* Get beta from the configuration */
1501   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1502   //                                                       "BETA",
1503   //                                                       &beta))
1504   //{
1505   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1506   //}
1507   //LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1508
1509
1510   peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
1511
1512
1513   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1514     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
1515     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
1516     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
1517     {NULL, 0, 0}
1518   };
1519
1520   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1521   cadet_handle = GNUNET_CADET_connect(cfg,
1522                                     cls,
1523                                     &handle_inbound_channel,
1524                                     &cleanup_channel,
1525                                     cadet_handlers,
1526                                     ports);
1527   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1528
1529
1530   /* Initialise sampler and gossip list */
1531
1532   sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL);
1533
1534   push_list = NULL;
1535   push_list_size = 0;
1536   pull_list = NULL;
1537   pull_list_size = 0;
1538
1539
1540   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1541   GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
1542   // FIXME use magic 0000 PeerID to _start_ the service
1543
1544   // TODO send push/pull to each of those peers?
1545 }
1546
1547
1548 /**
1549  * The main function for the rps service.
1550  *
1551  * @param argc number of arguments from the command line
1552  * @param argv command line arguments
1553  * @return 0 ok, 1 on error
1554  */
1555 int
1556 main (int argc, char *const *argv)
1557 {
1558   return (GNUNET_OK ==
1559           GNUNET_SERVICE_run (argc,
1560                               argv,
1561                               "rps",
1562                               GNUNET_SERVICE_OPTION_NONE,
1563                               &run, NULL)) ? 0 : 1;
1564 }
1565
1566 /* end of gnunet-service-rps.c */