fixed struct
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // TODO modify @brief in every file
38
39 // TODO take care that messages are not longer than 64k
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO malicious peer
48
49 // TODO Change API to accept initialisation peers
50
51 /**
52  * Our configuration.
53  */
54 static const struct GNUNET_CONFIGURATION_Handle *cfg;
55
56 /**
57  * Our own identity.
58  */
59 static struct GNUNET_PeerIdentity *own_identity;
60
61
62   struct GNUNET_PeerIdentity *
63 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
64
65 /***********************************************************************
66  * Sampler
67  *
68  * WARNING: This section needs to be reviewed regarding the use of
69  * functions providing (pseudo)randomness!
70 ***********************************************************************/
71
72 // TODO care about invalid input of the caller (size 0 or less...)
73
74 // It might be interesting to formulate this independent of PeerIDs.
75
76 /**
77  * Callback that is called when a new PeerID is inserted into a sampler.
78  *
79  * @param cls the closure given alongside this function.
80  * @param id the PeerID that is inserted
81  * @param hash the hash the sampler produced of the PeerID
82  */
83 typedef void (* SAMPLER_insertCB) (void *cls,
84     const struct GNUNET_PeerIdentity *id,
85     struct GNUNET_HashCode hash);
86
87 /**
88  * Callback that is called when a new PeerID is removed from a sampler.
89  *
90  * @param cls the closure given alongside this function.
91  * @param id the PeerID that is removed
92  * @param hash the hash the sampler produced of the PeerID
93  */
94 typedef void (* SAMPLER_removeCB) (void *cls,
95     const struct GNUNET_PeerIdentity *id,
96     struct GNUNET_HashCode hash);
97
98 /**
99  * A sampler sampling PeerIDs.
100  */
101 struct Sampler
102 {
103   /**
104    * Min-wise linear permutation used by this sampler.
105    *
106    * This is an key later used by a hmac.
107    */
108   struct GNUNET_CRYPTO_AuthKey auth_key;
109
110   /**
111    * The PeerID this sampler currently samples.
112    */
113   struct GNUNET_PeerIdentity *peer_id;
114
115   /**
116    * The according hash value of this PeerID.
117    */
118   struct GNUNET_HashCode peer_id_hash;
119
120   /**
121    * Samplers are kept in a linked list.
122    */
123   struct Sampler *next;
124
125   /**
126    * Samplers are kept in a linked list.
127    */
128   struct Sampler *prev;
129
130 };
131
132 /**
133  * A n-tuple of samplers.
134  */
135 struct Samplers
136 {
137   /**
138    * Number of samplers we hold.
139    */
140   unsigned int size;
141   //size_t size;
142   
143   /**
144    * All PeerIDs in one array.
145    */
146   struct GNUNET_PeerIdentity *peer_ids;
147
148   /**
149    * Callback to be called when a peer gets inserted into a sampler.
150    */
151   SAMPLER_insertCB insertCB;
152
153   /**
154    * Closure to the insertCB.
155    */
156   void *insertCLS;
157
158   /**
159    * Callback to be called when a peer gets inserted into a sampler.
160    */
161   SAMPLER_removeCB removeCB;
162
163   /**
164    * Closure to the removeCB.
165    */
166   void *removeCLS;
167
168   /**
169    * The head of the DLL.
170    */
171   struct Sampler *head;
172
173   /**
174    * The tail of the DLL.
175    */
176   struct Sampler *tail;
177
178 };
179
180 /**
181  * Reinitialise a previously initialised sampler.
182  *
183  * @param sampler the sampler element.
184  * @param id pointer to the memory that keeps the value.
185  */
186   void
187 SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct GNUNET_PeerIdentity *id)
188 {
189   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
190   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
191                              &(sampler->auth_key.key),
192                              GNUNET_CRYPTO_HASH_LENGTH);
193
194   GNUNET_assert(NULL != id);
195   sampler->peer_id = id;
196   memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
197   // Maybe take a PeerID as second argument.
198
199   GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
200                      sizeof(struct GNUNET_PeerIdentity),
201                      &sampler->peer_id_hash);
202 }
203
204
205 /**
206  * (Re)Initialise given Sampler with random min-wise independent function.
207  *
208  * In this implementation this means choosing an auth_key for later use in
209  * a hmac at random.
210  *
211  * @param id pointer to the place where this sampler will store the PeerID.
212  *           This will be overwritten.
213  */
214   struct Sampler *
215 SAMPLER_init(struct GNUNET_PeerIdentity *id)
216 {
217   struct Sampler *s;
218   
219   s = GNUNET_new(struct Sampler);
220
221   SAMPLER_reinitialise_sampler (s, id);
222   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
223       GNUNET_i2s(s->peer_id), s->peer_id);
224
225   s->prev = NULL;
226   s->next = NULL;
227
228   return s;
229 }
230
231 /**
232  * Input an PeerID into the given sampler.
233  */
234   static void
235 SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
236     SAMPLER_insertCB insertCB, void *insertCLS,
237     SAMPLER_removeCB removeCB, void *removeCLS)
238   // TODO call update herein
239 {
240   struct GNUNET_HashCode other_hash;
241
242   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
243       GNUNET_i2s(other), other);
244   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
245       GNUNET_i2s(s->peer_id), s->peer_id);
246
247   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
248   {
249     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
250         GNUNET_i2s(other));
251     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
252         GNUNET_i2s(s->peer_id));
253   }
254   else
255   {
256     GNUNET_CRYPTO_hmac(&s->auth_key,
257         other,
258         sizeof(struct GNUNET_PeerIdentity),
259         &other_hash);
260
261     if ( NULL == s->peer_id )
262     { // Or whatever is a valid way to say
263       // "we have no PeerID at the moment"
264       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
265           GNUNET_i2s(other));
266       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
267       //s->peer_id = other;
268       s->peer_id_hash = other_hash;
269       if (NULL != insertCB)
270       {
271         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
272       }
273     }
274     else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
275     {
276       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
277           GNUNET_i2s(other));
278       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
279           GNUNET_i2s(s->peer_id));
280
281       if ( NULL != removeCB )
282       {
283         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
284             GNUNET_i2s(s->peer_id));
285         removeCB(removeCLS, s->peer_id, s->peer_id_hash);
286       }
287
288       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
289       //s->peer_id = other;
290       s->peer_id_hash = other_hash;
291
292       if ( NULL != insertCB )
293       {
294         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
295             GNUNET_i2s(s->peer_id));
296         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
297       }
298     }
299     else
300     {
301       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
302           GNUNET_i2s(other));
303       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
304           GNUNET_i2s(s->peer_id));
305     }
306   }
307 }
308
309 /**
310  * Gow or shrink the size of the tuple of samplers.
311  *
312  * @param samplers the samplers to grow
313  * @param new_size the new size of the samplers
314  * @param fill_up_id if growing, that has to point to a
315  *                   valid PeerID and will be used
316  *                   to initialise newly created samplers
317  */
318   void
319 SAMPLER_samplers_resize (struct Samplers * samplers,
320     unsigned int new_size,
321     struct GNUNET_PeerIdentity *fill_up_id)
322 {
323   if ( samplers->size == new_size )
324   {
325     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
326     return;
327   }
328
329   unsigned int old_size;
330   struct Sampler *iter;
331   uint64_t i;
332   struct Sampler *tmp;
333
334   old_size = samplers->size;
335   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
336
337   iter = samplers->head;
338
339   if ( new_size < old_size )
340   {
341     for ( i = new_size ; i < old_size ; i++ )
342     {/* Remove unneeded rest */
343       tmp = iter->next;
344       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
345       if (NULL != samplers->removeCB)
346         samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
347         // FIXME When this is called and counts the amount of peer_ids in the samplers
348         //       this gets a wrong number.
349       GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
350       GNUNET_free(iter);
351       iter = tmp;
352     }
353   }
354
355   GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
356   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
357
358   if ( new_size > old_size )
359   { /* Growing */
360     GNUNET_assert( NULL != fill_up_id );
361     for ( i = 0 ; i < new_size ; i++ )
362     { /* All samplers */
363       if ( i < old_size )
364       { /* Update old samplers */
365         iter->peer_id = &samplers->peer_ids[i];
366         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
367             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
368         iter = iter->next;
369       }
370       else
371       { /* Add new samplers */
372         memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
373         iter = SAMPLER_init(&samplers->peer_ids[i]);
374         if (NULL != samplers->insertCB)
375         {
376           samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
377         }
378         GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
379         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
380             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
381       }
382     }
383   }
384   else// if ( new_size < old_size )
385   { /* Shrinking */
386     for ( i = 0 ; i < new_size ; i++)
387     { /* All samplers */
388       tmp = iter->next;
389       /* Update remaining samplers */
390       iter->peer_id = &samplers->peer_ids[i];
391       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
392           i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
393
394       iter = tmp;
395     }
396   }
397
398   GNUNET_assert(samplers->size == new_size);
399   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
400 }
401
402
403 /**
404  * Initialise a tuple of samplers.
405  */
406 struct Samplers *
407 SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
408     SAMPLER_insertCB insertCB, void *insertCLS,
409     SAMPLER_removeCB removeCB, void *removeCLS)
410 {
411   struct Samplers *samplers;
412   //struct Sampler *s;
413   //uint64_t i;
414
415   samplers = GNUNET_new(struct Samplers);
416   samplers->size = 0;
417   samplers->peer_ids = NULL;
418   samplers->insertCB = insertCB;
419   samplers->insertCLS = insertCLS;
420   samplers->removeCB = removeCB;
421   samplers->removeCLS = removeCLS;
422   samplers->head = samplers->tail = NULL;
423   //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
424
425   SAMPLER_samplers_resize(samplers, init_size, id);
426
427   GNUNET_assert(init_size == samplers->size);
428   return samplers;
429 }
430
431
432 /**
433  * A fuction to update every sampler in the given list
434  */
435   static void
436 SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
437 {
438   struct Sampler *iter;
439
440   iter = samplers->head;
441   while ( NULL != iter->next )
442   {
443     SAMPLER_next(iter, id,
444         samplers->insertCB, samplers->insertCLS,
445         samplers->removeCB, samplers->removeCLS);
446     iter = iter->next;
447   }
448   
449 }
450
451 /**
452  * Reinitialise all previously initialised sampler with the given value.
453  *
454  * @param samplers the sampler list.
455  * @param id the id of the samplers to update.
456  */
457   void
458 SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
459 {
460   uint64_t i;
461   struct Sampler *iter;
462
463   iter = samplers->head;
464   for ( i = 0 ; i < samplers->size ; i++ )
465   {
466     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
467     {
468       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
469       SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
470     }
471     if (NULL != iter->next)
472       iter = iter->next;
473   }
474 }
475
476 /**
477  * Get one random peer out of the sampled peers.
478  *
479  * We might want to reinitialise this sampler after giving the
480  * corrsponding peer to the client.
481  */
482   const struct GNUNET_PeerIdentity* 
483 SAMPLER_get_rand_peer (struct Samplers *samplers)
484 {
485   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
486
487   if ( 0 == samplers->size )
488   {
489     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
490     return own_identity;
491   }
492   else
493   {
494     const struct GNUNET_PeerIdentity *peer;
495
496     peer = get_rand_peer(samplers->peer_ids, samplers->size);
497     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
498     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
499
500     return peer;
501   }
502 }
503
504 /**
505  * Get n random peers out of the sampled peers.
506  *
507  * We might want to reinitialise this sampler after giving the
508  * corrsponding peer to the client.
509  * Random with or without consumption?
510  */
511   const struct GNUNET_PeerIdentity*  // TODO give back simple array
512 SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
513 {
514   // TODO check if we have too much (distinct) sampled peers
515   // If we are not ready yet maybe schedule for later
516   struct GNUNET_PeerIdentity *peers;
517   uint64_t i;
518   
519   peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
520
521   for ( i = 0 ; i < n ; i++ ) {
522     //peers[i] = SAMPLER_get_rand_peer(samplers);
523     memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
524   }
525
526   // TODO something else missing?
527   return peers;
528 }
529
530 /**
531  * Counts how many Samplers currently hold a given PeerID.
532  */
533   uint64_t
534 SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
535 {
536   struct Sampler *iter;
537   uint64_t count;
538
539   iter = samplers->head;
540   count = 0;
541   while ( NULL != iter )
542   {
543     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
544       count++;
545     iter = iter->next;
546   }
547   return count;
548 }
549
550
551 /**
552  * Cleans the samplers.
553  * 
554  * @param samplers the samplers to clean up.
555  */
556   void
557 SAMPLER_samplers_destroy (struct Samplers *samplers)
558 {
559   SAMPLER_samplers_resize(samplers, 0, NULL);
560   GNUNET_free(samplers);
561 }
562
563 /***********************************************************************
564  * /Sampler
565 ***********************************************************************/
566
567
568
569 /***********************************************************************
570  * Housekeeping with peers
571 ***********************************************************************/
572
573 /**
574  * Struct used to store the context of a connected client.
575  */
576 struct client_ctx
577 {
578   /**
579    * The message queue to communicate with the client.
580    */
581   struct GNUNET_MQ_Handle *mq;
582 };
583
584 /**
585  * Used to keep track in what lists single peerIDs are.
586  */
587 enum in_list_flag // probably unneeded
588 {
589   in_other_sampler_list = 0x1,
590   in_other_gossip_list  = 0x2, // unneeded?
591   in_own_sampler_list   = 0x4,
592   in_own_gossip_list    = 0x8 // unneeded?
593 };
594
595 /**
596  * Struct used to keep track of other peer's status
597  *
598  * This is stored in a multipeermap.
599  */
600 struct peer_context
601 {
602   /**
603    * In own gossip/sampler list, in other's gossip/sampler list
604    */
605   uint32_t in_flags; // unneeded?
606
607   /**
608    * Message queue open to client
609    */
610   struct GNUNET_MQ_Handle *mq;
611
612   /**
613    * Channel open to client.
614    */
615   struct GNUNET_CADET_Channel *to_channel;
616
617   /**
618    * Channel open from client.
619    */
620   struct GNUNET_CADET_Channel *from_channel; // unneeded
621
622   /**
623    * This is pobably followed by 'statistical' data (when we first saw
624    * him, how did we get his ID, how many pushes (in a timeinterval),
625    * ...)
626    */
627 };
628
629 /***********************************************************************
630  * /Housekeeping with peers
631 ***********************************************************************/
632
633 /**
634  * Set of all peers to keep track of them.
635  */
636 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
637
638
639 /**
640  * The samplers.
641  */
642 static struct Samplers *sampler_list;
643
644
645 /**
646  * The gossiped list of peers.
647  */
648 static struct GNUNET_PeerIdentity *gossip_list;
649
650 /**
651  * Size of the gossiped list
652  */
653 static unsigned int gossip_list_size;
654
655
656 /**
657  * The estimated size of the network.
658  *
659  * Influenced by the stdev.
660  */
661 static unsigned int est_size;
662 //size_t est_size;
663
664
665 /**
666  * Percentage of total peer number in the gossip list
667  * to send random PUSHes to
668  */
669 static float alpha;
670
671 /**
672  * Percentage of total peer number in the gossip list
673  * to send random PULLs to
674  */
675 static float beta;
676
677 /**
678  * The percentage gamma of history updates.
679  * Simply 1 - alpha - beta
680  */
681
682
683
684
685 /**
686  * Identifier for the main task that runs periodically.
687  */
688 static struct GNUNET_SCHEDULER_Task * do_round_task;
689
690 /**
691  * Time inverval the do_round task runs in.
692  */
693 static struct GNUNET_TIME_Relative round_interval;
694
695
696
697 /**
698  * List to store peers received through pushes temporary.
699  */
700 static struct GNUNET_PeerIdentity *push_list;
701
702 /**
703  * Size of the push_list;
704  */
705 static unsigned int push_list_size;
706 //size_t push_list_size;
707
708 /**
709  * List to store peers received through pulls temporary.
710  */
711 static struct GNUNET_PeerIdentity *pull_list;
712
713 /**
714  * Size of the pull_list;
715  */
716 static unsigned int pull_list_size;
717 //size_t pull_list_size;
718
719
720 /**
721  * Handler to NSE.
722  */
723 static struct GNUNET_NSE_Handle *nse;
724
725 /**
726  * Handler to CADET.
727  */
728 static struct GNUNET_CADET_Handle *cadet_handle;
729
730 /**
731  * Global counter
732  */
733 uint64_t g_i = 0;
734
735
736 /***********************************************************************
737  * Util functions
738 ***********************************************************************/
739
740 /**
741  * Get random peer from the gossip list.
742  */
743   struct GNUNET_PeerIdentity *
744 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
745 {
746   uint64_t r_index;
747   struct GNUNET_PeerIdentity *peer;
748
749   // FIXME if we have only NULL in gossip list this will block
750   // but then we might have a problem nevertheless
751
752   do
753   {
754
755     /**;
756      * Choose the r_index of the peer we want to return
757      * at random from the interval of the gossip list
758      */
759     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
760                                      list_size);
761
762     peer = &(peer_list[r_index]);
763   } while (NULL == peer);
764
765   return peer;
766 }
767
768 /**
769  * Make sure the context of a given peer exists in the given peer_map.
770  */
771   void
772 touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
773 {
774   struct peer_context *ctx;
775
776   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
777   {
778     ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
779   }
780   else
781   {
782     ctx = GNUNET_new(struct peer_context);
783     ctx->in_flags = 0;
784     ctx->mq = NULL;
785     ctx->to_channel = NULL;
786     ctx->from_channel = NULL;
787     GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
788   }
789 }
790
791 /**
792  * Get the context of a peer. If not existing, create.
793  */
794   struct peer_context *
795 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
796 {
797   struct peer_context *ctx;
798
799   touch_peer_ctx(peer_map, peer);
800   ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
801   return ctx;
802 }
803
804 /**
805  * Get the channel of a peer. If not existing, create.
806  */
807   void
808 touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
809 {
810   struct peer_context *ctx;
811
812   ctx = get_peer_ctx (peer_map, peer);
813   if (NULL == ctx->to_channel)
814   {
815     ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
816                                                   GNUNET_RPS_CADET_PORT,
817                                                   GNUNET_CADET_OPTION_RELIABLE);
818     // do I have to explicitly put it in the peer_map?
819     GNUNET_CONTAINER_multipeermap_put(peer_map, peer, ctx,
820                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
821   }
822 }
823
824 /**
825  * Get the channel of a peer. If not existing, create.
826  */
827   struct GNUNET_CADET_Channel *
828 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
829 {
830   struct peer_context *ctx;
831
832   ctx = get_peer_ctx (peer_map, peer);
833   touch_channel(peer_map, peer);
834   return ctx->to_channel;
835 }
836
837 /**
838  * Make sure the mq for a given peer exists.
839  *
840  * If we already have a message queue open to this client,
841  * simply return it, otherways create one.
842  */
843   void
844 touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
845 {
846   struct peer_context *ctx;
847
848   ctx = get_peer_ctx(peer_map, peer_id);
849   if (NULL == ctx->mq)
850   {
851     touch_channel(peer_map, peer_id);
852     ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
853     //do I have to explicitly put it in the peer_map?
854     GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
855                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
856   }
857 }
858
859 /**
860  * Get the message queue of a specific peer.
861  *
862  * If we already have a message queue open to this client,
863  * simply return it, otherways create one.
864  */
865   struct GNUNET_MQ_Handle *
866 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
867 {
868   struct peer_context *ctx;
869
870   ctx = get_peer_ctx(peer_map, peer_id);
871   touch_mq(peer_map, peer_id);
872
873   return ctx->mq;
874 }
875
876 /***********************************************************************
877  * /Util functions
878 ***********************************************************************/
879
880 /**
881  * Function called by NSE.
882  *
883  * Updates sizes of sampler list and gossip list and adapt those lists
884  * accordingly.
885  */
886   void
887 nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
888 {
889   double estimate;
890   //double scale; // TODO this might go gloabal/config
891
892   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f\n", logestimate, std_dev);
893   //scale = .01;
894   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
895   // GNUNET_NSE_log_estimate_to_n (logestimate);
896   estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
897   if ( 0 < estimate ) {
898     LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
899     est_size = estimate;
900   } else {
901     LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
902   }
903 }
904
905 /**
906  * Handle RPS request from the client.
907  *
908  * @param cls closure
909  * @param client identification of the client
910  * @param message the actual message
911  */
912 static void
913 // TODO rename
914 handle_cs_request (void *cls,
915             struct GNUNET_SERVER_Client *client,
916             const struct GNUNET_MessageHeader *message)
917 {
918   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
919
920   struct GNUNET_RPS_CS_RequestMessage *msg;
921   //unsigned int n_arr[sampler_list->size];// =
922     //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
923   //struct GNUNET_MQ_Handle *mq;
924   struct client_ctx *cli_ctx;
925   struct GNUNET_MQ_Envelope *ev;
926   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
927   uint64_t num_peers;
928   //uint64_t i;
929
930   // TODO
931   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
932   cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
933   if ( NULL == cli_ctx ) {
934     cli_ctx = GNUNET_new(struct client_ctx);
935     cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
936     GNUNET_SERVER_client_set_user_context(client, cli_ctx);
937   }
938   
939   // How many peers do we give back?
940   // Wait until we have enough random peers?
941
942   ev = GNUNET_MQ_msg_extra(out_msg,
943                            GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
944                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
945   out_msg->num_peers = msg->num_peers; // No conversion between network and host order
946
947   num_peers = GNUNET_ntohll(msg->num_peers);
948   //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
949   memcpy(&out_msg[1],
950       SAMPLER_get_n_rand_peers(sampler_list, num_peers),
951       num_peers * sizeof(struct GNUNET_PeerIdentity));
952   
953   GNUNET_MQ_send(cli_ctx->mq, ev);
954   //GNUNET_MQ_destroy(mq);
955
956   GNUNET_SERVER_receive_done (client,
957                               GNUNET_OK);
958 }
959
960 /**
961  * Handle a PUSH message from another peer.
962  *
963  * Check the proof of work and store the PeerID
964  * in the temporary list for pushed PeerIDs.
965  *
966  * @param cls Closure
967  * @param channel The channel the PUSH was received over
968  * @param channel_ctx The context associated with this channel
969  * @param msg The message header
970  */
971 static int
972 handle_peer_push (void *cls,
973     struct GNUNET_CADET_Channel *channel,
974     void **channel_ctx,
975     const struct GNUNET_MessageHeader *msg)
976 {
977   LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
978
979   struct GNUNET_PeerIdentity *peer;
980
981   // TODO check the proof of work
982   
983   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER );
984   
985   /* Add the sending peer to the push_list */
986   LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
987   GNUNET_array_append(push_list, push_list_size, *peer);
988   LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
989
990   return GNUNET_OK;
991 }
992
993 /**
994  * Handle PULL REQUEST request message from another peer.
995  *
996  * Reply with the gossip list of PeerIDs.
997  *
998  * @param cls Closure
999  * @param channel The channel the PUSH was received over
1000  * @param channel_ctx The context associated with this channel
1001  * @param msg The message header
1002  */
1003 static int
1004 handle_peer_pull_request (void *cls,
1005     struct GNUNET_CADET_Channel *channel,
1006     void **channel_ctx,
1007     const struct GNUNET_MessageHeader *msg)
1008 {
1009
1010   struct GNUNET_PeerIdentity *peer;
1011   struct GNUNET_MQ_Handle *mq;
1012   //struct GNUNET_RPS_P2P_PullRequestMessage *in_msg;
1013   struct GNUNET_MQ_Envelope *ev;
1014   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1015
1016   // find some way to keep one peer from spamming with pull requests?
1017   // allow only one request per time interval ?
1018   // otherwise remove from peerlist?
1019
1020   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
1021   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
1022
1023   //mq = GNUNET_CADET_mq_create(channel); // without mq?
1024   mq = get_mq(peer_map, peer);
1025
1026   //in_msg = (struct GNUNET_RPS_P2P_PullRequestMessage *) msg;
1027   // TODO how many peers do we actually send?
1028   // GNUNET_ntohll(in_msg->num_peers)
1029   ev = GNUNET_MQ_msg_extra(out_msg,
1030                            gossip_list_size * sizeof(struct GNUNET_PeerIdentity),
1031                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1032   out_msg->num_peers = GNUNET_htonll(gossip_list_size);
1033   memcpy(&out_msg[1], gossip_list,
1034          gossip_list_size * sizeof(struct GNUNET_PeerIdentity));
1035
1036   GNUNET_MQ_send(mq, ev);
1037
1038   return GNUNET_OK;
1039 }
1040
1041 /**
1042  * Handle PULL REPLY message from another peer.
1043  *
1044  * Check whether we sent a corresponding request and
1045  * whether this reply is the first one.
1046  *
1047  * @param cls Closure
1048  * @param channel The channel the PUSH was received over
1049  * @param channel_ctx The context associated with this channel
1050  * @param msg The message header
1051  */
1052 static int
1053 handle_peer_pull_reply (void *cls,
1054     struct GNUNET_CADET_Channel *channel,
1055     void **channel_ctx,
1056     const struct GNUNET_MessageHeader *msg)
1057 {
1058   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1059
1060   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1061   struct GNUNET_PeerIdentity *peers;
1062   uint64_t i;
1063
1064   // TODO check that we sent a request and that it is the first reply
1065
1066   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1067   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1068   for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ )
1069   {
1070     GNUNET_array_append(pull_list, pull_list_size, peers[i]);
1071   }
1072
1073   return GNUNET_OK;
1074 }
1075
1076
1077 /**
1078  * Send out PUSHes and PULLs.
1079  *
1080  * This is executed regylary.
1081  */
1082 static void
1083 do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1084 {
1085   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
1086
1087   uint64_t i;
1088   //unsigned int *n_arr;
1089   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
1090   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
1091   struct GNUNET_MQ_Envelope *ev;
1092   struct GNUNET_PeerIdentity *peer;
1093
1094   // TODO print lists, ...
1095   // TODO cleanup peer_map
1096
1097
1098   /* Would it make sense to have one shuffeled gossip list and then
1099    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1100    * use the rest to update sampler?
1101    * in essence get random peers with consumption */
1102
1103   /* Send PUSHes */
1104   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
1105   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %f (%f * %u) peers.\n",
1106       alpha * gossip_list_size, alpha, gossip_list_size);
1107   for ( i = 0 ; i < alpha * gossip_list_size ; i++ )
1108   { // TODO compute length
1109     peer = get_rand_peer(gossip_list, gossip_list_size);
1110     if (own_identity != peer)
1111     { // FIXME if this fails schedule/loop this for later
1112       LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1113
1114       ev = GNUNET_MQ_msg (push_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1115       //ev = GNUNET_MQ_msg_extra();
1116       /* TODO Compute proof of work here
1117          push_msg; */
1118       push_msg->placeholder = 0;
1119       // FIXME sometimes it returns a pointer to a freed mq
1120       GNUNET_MQ_send (get_mq (peer_map, peer), ev);
1121
1122       // modify in_flags of respective peer?
1123     }
1124   }
1125
1126
1127   /* Send PULL requests */
1128   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1129   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %f (%f * %u) peers.\n",
1130       beta * gossip_list_size, beta, gossip_list_size);
1131   for ( i = 0 ; i < beta * gossip_list_size ; i++ )
1132   { // TODO compute length
1133     peer = get_rand_peer(gossip_list, gossip_list_size);
1134     if (own_identity != peer)
1135     { // FIXME if this fails schedule/loop this for later
1136       LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1137
1138       ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1139       //ev = GNUNET_MQ_msg_extra();
1140       pull_msg->placeholder = 0;
1141       GNUNET_MQ_send( get_mq(peer_map, peer), ev );
1142       // modify in_flags of respective peer?
1143     }
1144   }
1145
1146
1147   /* If the NSE has changed adapt the lists accordingly */
1148   if ( sampler_list->size != est_size )
1149     SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1150
1151   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1152
1153
1154   /* Update gossip list */
1155   uint64_t r_index;
1156
1157   if ( push_list_size <= alpha * gossip_list_size &&
1158        push_list_size != 0 &&
1159        pull_list_size != 0 )
1160   {
1161     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
1162
1163     uint64_t first_border;
1164     uint64_t second_border;
1165
1166     first_border = round(alpha * gossip_list_size);
1167     for ( i = 0 ; i < first_border ; i++ )
1168     { // TODO use SAMPLER_get_n_rand_peers
1169       /* Update gossip list with peers received through PUSHes */
1170       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1171                                        push_list_size);
1172       gossip_list[i] = push_list[r_index];
1173       // TODO change the in_flags accordingly
1174     }
1175
1176     second_border = first_border + round(beta * gossip_list_size);
1177     for ( i = first_border ; i < second_border ; i++ )
1178     {
1179       /* Update gossip list with peers received through PULLs */
1180       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1181                                        pull_list_size);
1182       gossip_list[i] = pull_list[r_index];
1183       // TODO change the in_flags accordingly
1184     }
1185
1186     for ( i = second_border ; i < gossip_list_size ; i++ )
1187     {
1188       /* Update gossip list with peers from history */
1189       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1190                                        sampler_list->size);
1191       gossip_list[i] = sampler_list->peer_ids[r_index];
1192       // TODO change the in_flags accordingly
1193     }
1194
1195   }
1196   else
1197   {
1198     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
1199   }
1200   // TODO independent of that also get some peers from CADET_get_peers()?
1201
1202
1203   /* Update samplers */
1204
1205   for ( i = 0 ; i < push_list_size ; i++ )
1206   {
1207     SAMPLER_update_list(sampler_list, &push_list[i]);
1208     // TODO set in_flag?
1209   }
1210
1211   for ( i = 0 ; i < pull_list_size ; i++ )
1212   {
1213     SAMPLER_update_list(sampler_list, &pull_list[i]);
1214     // TODO set in_flag?
1215   }
1216
1217
1218   /* Empty push/pull lists */
1219   GNUNET_array_grow(push_list, push_list_size, 0);
1220   push_list_size = 0; // I guess that's not necessary but doesn't hurt
1221   GNUNET_array_grow(pull_list, pull_list_size, 0);
1222   pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1223
1224
1225   /* Schedule next round */
1226   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
1227   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1228 }
1229
1230 /**
1231  * Open a connection to given peer and store channel and mq.
1232  */
1233   void
1234 insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1235 {
1236   // We open a channel to be notified when this peer goes down.
1237   touch_channel(peer_map, id);
1238 }
1239
1240 /**
1241  * Close the connection to given peer and delete channel and mq.
1242  */
1243   void
1244 removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1245 {
1246   size_t s;
1247   struct peer_context *ctx;
1248
1249   s = SAMPLER_count_id(sampler_list, id);
1250   if ( 1 >= s ) {
1251     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
1252     {
1253       ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
1254       if (NULL != ctx->to_channel)
1255       {
1256         if (NULL != ctx->mq)
1257         {
1258           GNUNET_MQ_destroy(ctx->mq);
1259         }
1260         GNUNET_CADET_channel_destroy(ctx->to_channel);
1261       }
1262       // TODO cleanup peer
1263       GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
1264     }
1265   }
1266 }
1267
1268 static void
1269 rps_start (struct GNUNET_SERVER_Handle *server);
1270
1271 /**
1272  * This is called from GNUNET_CADET_get_peers().
1273  *
1274  * It is called on every peer(ID) that cadet somehow has contact with.
1275  * We use those to initialise the sampler.
1276  */
1277 void
1278 init_peer_cb (void *cls,
1279               const struct GNUNET_PeerIdentity *peer,
1280               int tunnel, // "Do we have a tunnel towards this peer?"
1281               unsigned int n_paths, // "Number of known paths towards this peer"
1282               unsigned int best_path) // "How long is the best path?
1283                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1284 {
1285   if ( NULL != peer )
1286   {
1287     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1288     SAMPLER_update_list(sampler_list, peer);
1289     touch_peer_ctx(peer_map, peer); // unneeded? -> insertCB
1290
1291     gossip_list[g_i] = *peer;
1292     g_i++;
1293     // FIXME find a better way to have a global counter
1294
1295     // send push/pull to each of those peers?
1296   }
1297   else
1298   {
1299     if (g_i < sampler_list->size)
1300     {
1301       memcpy(&gossip_list[g_i],
1302           &(sampler_list->peer_ids[g_i]),
1303           (sampler_list->size - g_i) * sizeof(struct GNUNET_PeerIdentity));
1304     }
1305     rps_start( (struct GNUNET_SERVER_Handle *) cls);
1306   }
1307 }
1308
1309
1310 /**
1311  * Task run during shutdown.
1312  *
1313  * @param cls unused
1314  * @param tc unused
1315  */
1316 static void
1317 shutdown_task (void *cls,
1318                const struct GNUNET_SCHEDULER_TaskContext *tc)
1319 {
1320   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
1321
1322   if ( NULL != do_round_task )
1323   {
1324     GNUNET_SCHEDULER_cancel (do_round_task);
1325     do_round_task = NULL;
1326   }
1327
1328   GNUNET_NSE_disconnect(nse);
1329   GNUNET_CADET_disconnect(cadet_handle);
1330   GNUNET_free(own_identity);
1331   SAMPLER_samplers_destroy(sampler_list);
1332   GNUNET_array_grow(gossip_list, gossip_list_size, 0);
1333   GNUNET_array_grow(push_list, push_list_size, 0);
1334   GNUNET_array_grow(pull_list, pull_list_size, 0);
1335 }
1336
1337
1338 /**
1339  * A client disconnected.  Remove all of its data structure entries.
1340  *
1341  * @param cls closure, NULL
1342  * @param client identification of the client
1343  */
1344 static void
1345 handle_client_disconnect (void *cls,
1346                           struct GNUNET_SERVER_Client * client)
1347 {
1348 }
1349
1350 /**
1351  * Handle the channel a peer opens to us.
1352  *
1353  * @param cls The closure
1354  * @param channel The channel the peer wants to establish
1355  * @param initiator The peer's peer ID
1356  * @param port The port the channel is being established over
1357  * @param options Further options
1358  */
1359   static void *
1360 handle_inbound_channel (void *cls,
1361                         struct GNUNET_CADET_Channel *channel,
1362                         const struct GNUNET_PeerIdentity *initiator,
1363                         uint32_t port,
1364                         enum GNUNET_CADET_ChannelOption options)
1365 {
1366   struct peer_context *ctx;
1367
1368   LOG(GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us (Peer %s).\n", GNUNET_i2s(initiator));
1369
1370   GNUNET_assert( NULL != channel );
1371
1372   // we might not even store the from_channel
1373
1374   ctx = get_peer_ctx(peer_map, initiator);
1375   if (NULL != ctx->from_channel)
1376   {
1377     ctx->from_channel = channel;
1378   }
1379
1380   // FIXME there might already be an established channel
1381
1382   //ctx->in_flags = in_other_gossip_list;
1383   ctx->mq = NULL; // TODO create mq?
1384
1385   GNUNET_CONTAINER_multipeermap_put (peer_map, initiator, ctx,
1386       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
1387   return NULL; // TODO
1388 }
1389
1390 /**
1391  * This is called when a remote peer destroys a channel.
1392  *
1393  * @param cls The closure
1394  * @param channel The channel being closed
1395  * @param channel_ctx The context associated with this channel
1396  */
1397 static void
1398 cleanup_channel(void *cls,
1399                 const struct GNUNET_CADET_Channel *channel,
1400                 void *channel_ctx)
1401 {
1402   struct GNUNET_PeerIdentity *peer;
1403   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel to remote peer was destroyed.\n");
1404
1405   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1406       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1407        // Guess simply casting isn't the nicest way...
1408   SAMPLER_reinitialise_samplers_by_value(sampler_list, peer);
1409 }
1410
1411 /**
1412  * Actually start the service.
1413  */
1414 static void
1415 rps_start (struct GNUNET_SERVER_Handle *server)
1416 {
1417   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1418     {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
1419     {NULL, NULL, 0, 0}
1420   };
1421
1422   GNUNET_SERVER_add_handlers (server, handlers);
1423   GNUNET_SERVER_disconnect_notify (server,
1424                                    &handle_client_disconnect,
1425                                    NULL);
1426   LOG(GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
1427
1428
1429   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1430   LOG(GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
1431
1432   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1433                                 &shutdown_task,
1434                                 NULL);
1435 }
1436
1437
1438 /**
1439  * Process statistics requests.
1440  *
1441  * @param cls closure
1442  * @param server the initialized server
1443  * @param c configuration to use
1444  */
1445 static void
1446 run (void *cls,
1447      struct GNUNET_SERVER_Handle *server,
1448      const struct GNUNET_CONFIGURATION_Handle *c)
1449 {
1450   // TODO check what this does -- copied from gnunet-boss
1451   // - seems to work as expected
1452   GNUNET_log_setup("rps", GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG), NULL);
1453
1454   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1455
1456   cfg = c;
1457
1458
1459   own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
1460
1461   GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
1462
1463   GNUNET_assert(NULL != own_identity);
1464
1465   LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1466
1467
1468
1469   /* Get time interval from the configuration */
1470   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1471                                                         "ROUNDINTERVAL",
1472                                                         &round_interval))
1473   {
1474     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
1475     GNUNET_SCHEDULER_shutdown();
1476     return;
1477   }
1478
1479   /* Get initial size of sampler/gossip list from the configuration */
1480   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1481                                                          "INITSIZE",
1482                                                          (long long unsigned int *) &est_size))
1483   {
1484     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1485     GNUNET_SCHEDULER_shutdown();
1486     return;
1487   }
1488   LOG(GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", est_size);
1489
1490   //gossip_list_size = est_size; // TODO rename est_size
1491
1492   gossip_list = NULL;
1493
1494   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1495
1496
1497   /* connect to NSE */
1498   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
1499   // TODO check whether that was successful
1500   // TODO disconnect on shutdown
1501   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1502
1503
1504   alpha = 0.45;
1505   beta  = 0.45;
1506   // TODO initialise thresholds - ?
1507
1508   /* Get alpha from the configuration */
1509   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1510                                                          "ALPHA",
1511                                                          &alpha))
1512   {
1513     LOG(GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
1514   }
1515   LOG(GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
1516  
1517   /* Get beta from the configuration */
1518   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1519                                                          "BETA",
1520                                                          &beta))
1521   {
1522     LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1523   }
1524   LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1525
1526   // TODO check that alpha + beta < 1
1527
1528   peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
1529
1530
1531   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1532     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
1533     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
1534     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
1535     {NULL, 0, 0}
1536   };
1537
1538   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1539   cadet_handle = GNUNET_CADET_connect(cfg,
1540                                     cls,
1541                                     &handle_inbound_channel,
1542                                     &cleanup_channel,
1543                                     cadet_handlers,
1544                                     ports);
1545   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1546
1547
1548   /* Initialise sampler and gossip list */
1549
1550   sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL);
1551
1552   push_list = NULL;
1553   push_list_size = 0;
1554   pull_list = NULL;
1555   pull_list_size = 0;
1556
1557
1558   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1559   GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
1560   // FIXME use magic 0000 PeerID to _start_ the service
1561
1562   // TODO send push/pull to each of those peers?
1563 }
1564
1565
1566 /**
1567  * The main function for the rps service.
1568  *
1569  * @param argc number of arguments from the command line
1570  * @param argv command line arguments
1571  * @return 0 ok, 1 on error
1572  */
1573 int
1574 main (int argc, char *const *argv)
1575 {
1576   return (GNUNET_OK ==
1577           GNUNET_SERVICE_run (argc,
1578                               argv,
1579                               "rps",
1580                               GNUNET_SERVICE_OPTION_NONE,
1581                               &run, NULL)) ? 0 : 1;
1582 }
1583
1584 /* end of gnunet-service-rps.c */