fixed some issues
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // TODO modify @brief in every file
38
39 // TODO take care that messages are not longer than 64k
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO malicious peer
48
49 // TODO Change API to accept initialisation peers
50
51 /**
52  * Our configuration.
53  */
54 static const struct GNUNET_CONFIGURATION_Handle *cfg;
55
56 /**
57  * Our own identity.
58  */
59 static struct GNUNET_PeerIdentity *own_identity;
60
61
62   struct GNUNET_PeerIdentity *
63 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
64
65 /***********************************************************************
66  * Sampler
67  *
68  * WARNING: This section needs to be reviewed regarding the use of
69  * functions providing (pseudo)randomness!
70 ***********************************************************************/
71
72 // TODO care about invalid input of the caller (size 0 or less...)
73
74 // It might be interesting to formulate this independent of PeerIDs.
75
76 /**
77  * Callback that is called when a new PeerID is inserted into a sampler.
78  *
79  * @param cls the closure given alongside this function.
80  * @param id the PeerID that is inserted
81  * @param hash the hash the sampler produced of the PeerID
82  */
83 typedef void (* SAMPLER_insertCB) (void *cls,
84     const struct GNUNET_PeerIdentity *id,
85     struct GNUNET_HashCode hash);
86
87 /**
88  * Callback that is called when a new PeerID is removed from a sampler.
89  *
90  * @param cls the closure given alongside this function.
91  * @param id the PeerID that is removed
92  * @param hash the hash the sampler produced of the PeerID
93  */
94 typedef void (* SAMPLER_removeCB) (void *cls,
95     const struct GNUNET_PeerIdentity *id,
96     struct GNUNET_HashCode hash);
97
98 /**
99  * A sampler sampling PeerIDs.
100  */
101 struct Sampler
102 {
103   /**
104    * Min-wise linear permutation used by this sampler.
105    *
106    * This is an key later used by a hmac.
107    */
108   struct GNUNET_CRYPTO_AuthKey auth_key;
109
110   /**
111    * The PeerID this sampler currently samples.
112    */
113   struct GNUNET_PeerIdentity *peer_id;
114
115   /**
116    * The according hash value of this PeerID.
117    */
118   struct GNUNET_HashCode peer_id_hash;
119
120   /**
121    * Samplers are kept in a linked list.
122    */
123   struct Sampler *next;
124
125   /**
126    * Samplers are kept in a linked list.
127    */
128   struct Sampler *prev;
129
130 };
131
132 /**
133  * A n-tuple of samplers.
134  */
135 struct Samplers
136 {
137   /**
138    * Number of samplers we hold.
139    */
140   unsigned int size;
141   //size_t size;
142   
143   /**
144    * All PeerIDs in one array.
145    */
146   struct GNUNET_PeerIdentity *peer_ids;
147
148   /**
149    * Callback to be called when a peer gets inserted into a sampler.
150    */
151   SAMPLER_insertCB insertCB;
152
153   /**
154    * Closure to the insertCB.
155    */
156   void *insertCLS;
157
158   /**
159    * Callback to be called when a peer gets inserted into a sampler.
160    */
161   SAMPLER_removeCB removeCB;
162
163   /**
164    * Closure to the removeCB.
165    */
166   void *removeCLS;
167
168   /**
169    * The head of the DLL.
170    */
171   struct Sampler *head;
172
173   /**
174    * The tail of the DLL.
175    */
176   struct Sampler *tail;
177
178 };
179
180 /**
181  * Reinitialise a previously initialised sampler.
182  *
183  * @param sampler the sampler element.
184  * @param id pointer to the memory that keeps the value.
185  */
186   void
187 SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct GNUNET_PeerIdentity *id)
188 {
189   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
190   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
191                              &(sampler->auth_key.key),
192                              GNUNET_CRYPTO_HASH_LENGTH);
193
194   GNUNET_assert(NULL != id);
195   sampler->peer_id = id;
196   memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
197   // Maybe take a PeerID as second argument.
198
199   GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
200                      sizeof(struct GNUNET_PeerIdentity),
201                      &sampler->peer_id_hash);
202 }
203
204
205 /**
206  * (Re)Initialise given Sampler with random min-wise independent function.
207  *
208  * In this implementation this means choosing an auth_key for later use in
209  * a hmac at random.
210  *
211  * @param id pointer to the place where this sampler will store the PeerID.
212  *           This will be overwritten.
213  */
214   struct Sampler *
215 SAMPLER_init(struct GNUNET_PeerIdentity *id)
216 {
217   struct Sampler *s;
218   
219   s = GNUNET_new(struct Sampler);
220
221   SAMPLER_reinitialise_sampler (s, id);
222   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
223       GNUNET_i2s(s->peer_id), s->peer_id);
224
225   s->prev = NULL;
226   s->next = NULL;
227
228   return s;
229 }
230
231 /**
232  * Input an PeerID into the given sampler.
233  */
234   static void
235 SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
236     SAMPLER_insertCB insertCB, void *insertCLS,
237     SAMPLER_removeCB removeCB, void *removeCLS)
238   // TODO call update herein
239 {
240   struct GNUNET_HashCode other_hash;
241
242   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
243       GNUNET_i2s(other), other);
244   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
245       GNUNET_i2s(s->peer_id), s->peer_id);
246
247   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
248   {
249     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
250         GNUNET_i2s(other));
251     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
252         GNUNET_i2s(s->peer_id));
253   }
254   else
255   {
256     GNUNET_CRYPTO_hmac(&s->auth_key,
257         other,
258         sizeof(struct GNUNET_PeerIdentity),
259         &other_hash);
260
261     if ( NULL == s->peer_id )
262     { // Or whatever is a valid way to say
263       // "we have no PeerID at the moment"
264       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
265           GNUNET_i2s(other));
266       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
267       //s->peer_id = other;
268       s->peer_id_hash = other_hash;
269       if (NULL != insertCB)
270       {
271         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
272       }
273     }
274     else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
275     {
276       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
277           GNUNET_i2s(other));
278       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
279           GNUNET_i2s(s->peer_id));
280
281       if ( NULL != removeCB )
282       {
283         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
284             GNUNET_i2s(s->peer_id));
285         removeCB(removeCLS, s->peer_id, s->peer_id_hash);
286       }
287
288       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
289       //s->peer_id = other;
290       s->peer_id_hash = other_hash;
291
292       if ( NULL != insertCB )
293       {
294         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
295             GNUNET_i2s(s->peer_id));
296         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
297       }
298     }
299     else
300     {
301       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
302           GNUNET_i2s(other));
303       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
304           GNUNET_i2s(s->peer_id));
305     }
306   }
307 }
308
309 /**
310  * Gow or shrink the size of the tuple of samplers.
311  *
312  * @param samplers the samplers to grow
313  * @param new_size the new size of the samplers
314  * @param fill_up_id if growing, that has to point to a
315  *                   valid PeerID and will be used
316  *                   to initialise newly created samplers
317  */
318   void
319 SAMPLER_samplers_resize (struct Samplers * samplers,
320     unsigned int new_size,
321     struct GNUNET_PeerIdentity *fill_up_id)
322 {
323   if ( samplers->size == new_size )
324   {
325     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
326     return;
327   }
328
329   unsigned int old_size;
330   struct Sampler *iter;
331   uint64_t i;
332   struct Sampler *tmp;
333
334   old_size = samplers->size;
335   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
336
337   iter = samplers->head;
338
339   if ( new_size < old_size )
340   {
341     for ( i = new_size ; i < old_size ; i++ )
342     {/* Remove unneeded rest */
343       tmp = iter->next;
344       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
345       if (NULL != samplers->removeCB)
346         samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
347         // FIXME When this is called and counts the amount of peer_ids in the samplers
348         //       this gets a wrong number.
349       GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
350       GNUNET_free(iter);
351       iter = tmp;
352     }
353   }
354
355   GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
356   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
357
358   if ( new_size > old_size )
359   { /* Growing */
360     GNUNET_assert( NULL != fill_up_id );
361     for ( i = 0 ; i < new_size ; i++ )
362     { /* All samplers */
363       if ( i < old_size )
364       { /* Update old samplers */
365         iter->peer_id = &samplers->peer_ids[i];
366         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
367             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
368         iter = iter->next;
369       }
370       else
371       { /* Add new samplers */
372         memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
373         iter = SAMPLER_init(&samplers->peer_ids[i]);
374         if (NULL != samplers->insertCB)
375         {
376           samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
377         }
378         GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
379         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
380             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
381       }
382     }
383   }
384   else// if ( new_size < old_size )
385   { /* Shrinking */
386     for ( i = 0 ; i < new_size ; i++)
387     { /* All samplers */
388       tmp = iter->next;
389       /* Update remaining samplers */
390       iter->peer_id = &samplers->peer_ids[i];
391       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
392           i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
393
394       iter = tmp;
395     }
396   }
397
398   GNUNET_assert(samplers->size == new_size);
399   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
400 }
401
402
403 /**
404  * Initialise a tuple of samplers.
405  */
406 struct Samplers *
407 SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
408     SAMPLER_insertCB insertCB, void *insertCLS,
409     SAMPLER_removeCB removeCB, void *removeCLS)
410 {
411   struct Samplers *samplers;
412   //struct Sampler *s;
413   //uint64_t i;
414
415   samplers = GNUNET_new(struct Samplers);
416   samplers->size = 0;
417   samplers->peer_ids = NULL;
418   samplers->insertCB = insertCB;
419   samplers->insertCLS = insertCLS;
420   samplers->removeCB = removeCB;
421   samplers->removeCLS = removeCLS;
422   samplers->head = samplers->tail = NULL;
423   //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
424
425   SAMPLER_samplers_resize(samplers, init_size, id);
426
427   GNUNET_assert(init_size == samplers->size);
428   return samplers;
429 }
430
431
432 /**
433  * A fuction to update every sampler in the given list
434  */
435   static void
436 SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
437 {
438   struct Sampler *iter;
439
440   iter = samplers->head;
441   while ( NULL != iter->next )
442   {
443     SAMPLER_next(iter, id,
444         samplers->insertCB, samplers->insertCLS,
445         samplers->removeCB, samplers->removeCLS);
446     iter = iter->next;
447   }
448   
449 }
450
451 /**
452  * Reinitialise all previously initialised sampler with the given value.
453  *
454  * @param samplers the sampler list.
455  * @param id the id of the samplers to update.
456  */
457   void
458 SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
459 {
460   uint64_t i;
461   struct Sampler *iter;
462
463   iter = samplers->head;
464   for ( i = 0 ; i < samplers->size ; i++ )
465   {
466     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
467     {
468       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
469       SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
470     }
471     if (NULL != iter->next)
472       iter = iter->next;
473   }
474 }
475
476 /**
477  * Get one random peer out of the sampled peers.
478  *
479  * We might want to reinitialise this sampler after giving the
480  * corrsponding peer to the client.
481  */
482   const struct GNUNET_PeerIdentity* 
483 SAMPLER_get_rand_peer (struct Samplers *samplers)
484 {
485   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
486
487   if ( 0 == samplers->size )
488   {
489     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
490     return own_identity;
491   }
492   else
493   {
494     const struct GNUNET_PeerIdentity *peer;
495
496     peer = get_rand_peer(samplers->peer_ids, samplers->size);
497     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
498     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
499
500     return peer;
501   }
502 }
503
504 /**
505  * Get n random peers out of the sampled peers.
506  *
507  * We might want to reinitialise this sampler after giving the
508  * corrsponding peer to the client.
509  * Random with or without consumption?
510  */
511   const struct GNUNET_PeerIdentity*  // TODO give back simple array
512 SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
513 {
514   // TODO check if we have too much (distinct) sampled peers
515   // If we are not ready yet maybe schedule for later
516   struct GNUNET_PeerIdentity *peers;
517   uint64_t i;
518   
519   peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
520
521   for ( i = 0 ; i < n ; i++ ) {
522     //peers[i] = SAMPLER_get_rand_peer(samplers);
523     memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
524   }
525
526   // TODO something else missing?
527   return peers;
528 }
529
530 /**
531  * Counts how many Samplers currently hold a given PeerID.
532  */
533   uint64_t
534 SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
535 {
536   struct Sampler *iter;
537   uint64_t count;
538
539   iter = samplers->head;
540   count = 0;
541   while ( NULL != iter )
542   {
543     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
544       count++;
545     iter = iter->next;
546   }
547   return count;
548 }
549
550
551 /**
552  * Cleans the samplers.
553  * 
554  * @param samplers the samplers to clean up.
555  */
556   void
557 SAMPLER_samplers_destroy (struct Samplers *samplers)
558 {
559   SAMPLER_samplers_resize(samplers, 0, NULL);
560   GNUNET_free(samplers);
561 }
562
563 /***********************************************************************
564  * /Sampler
565 ***********************************************************************/
566
567
568
569 /***********************************************************************
570  * Housekeeping with peers
571 ***********************************************************************/
572
573 /**
574  * Struct used to store the context of a connected client.
575  */
576 struct client_ctx
577 {
578   /**
579    * The message queue to communicate with the client.
580    */
581   struct GNUNET_MQ_Handle *mq;
582 };
583
584 /**
585  * Used to keep track in what lists single peerIDs are.
586  */
587 enum in_list_flag // probably unneeded
588 {
589   in_other_sampler_list = 0x1,
590   in_other_gossip_list  = 0x2, // unneeded?
591   in_own_sampler_list   = 0x4,
592   in_own_gossip_list    = 0x8 // unneeded?
593 };
594
595 /**
596  * Struct used to keep track of other peer's status
597  *
598  * This is stored in a multipeermap.
599  */
600 struct peer_context
601 {
602   /**
603    * In own gossip/sampler list, in other's gossip/sampler list
604    */
605   uint32_t in_flags; // unneeded?
606
607   /**
608    * Message queue open to client
609    */
610   struct GNUNET_MQ_Handle *mq;
611
612   /**
613    * Channel open to client.
614    */
615   struct GNUNET_CADET_Channel *to_channel;
616
617   /**
618    * Channel open from client.
619    */
620   struct GNUNET_CADET_Channel *from_channel; // unneeded
621
622   /**
623    * This is pobably followed by 'statistical' data (when we first saw
624    * him, how did we get his ID, how many pushes (in a timeinterval),
625    * ...)
626    */
627 };
628
629 /***********************************************************************
630  * /Housekeeping with peers
631 ***********************************************************************/
632
633 /**
634  * Set of all peers to keep track of them.
635  */
636 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
637
638
639 /**
640  * The samplers.
641  */
642 static struct Samplers *sampler_list;
643
644
645 /**
646  * The gossiped list of peers.
647  */
648 static struct GNUNET_PeerIdentity *gossip_list;
649
650 /**
651  * Size of the gossiped list
652  */
653 static unsigned int gossip_list_size;
654
655
656 /**
657  * The estimated size of the network.
658  *
659  * Influenced by the stdev.
660  */
661 static unsigned int est_size;
662 //size_t est_size;
663
664
665 /**
666  * Percentage of total peer number in the gossip list
667  * to send random PUSHes to
668  */
669 static float alpha;
670
671 /**
672  * Percentage of total peer number in the gossip list
673  * to send random PULLs to
674  */
675 static float beta;
676
677 /**
678  * The percentage gamma of history updates.
679  * Simply 1 - alpha - beta
680  */
681
682
683
684
685 /**
686  * Identifier for the main task that runs periodically.
687  */
688 static struct GNUNET_SCHEDULER_Task * do_round_task;
689
690 /**
691  * Time inverval the do_round task runs in.
692  */
693 static struct GNUNET_TIME_Relative round_interval;
694
695
696
697 /**
698  * List to store peers received through pushes temporary.
699  */
700 static struct GNUNET_PeerIdentity *push_list;
701
702 /**
703  * Size of the push_list;
704  */
705 static unsigned int push_list_size;
706 //size_t push_list_size;
707
708 /**
709  * List to store peers received through pulls temporary.
710  */
711 static struct GNUNET_PeerIdentity *pull_list;
712
713 /**
714  * Size of the pull_list;
715  */
716 static unsigned int pull_list_size;
717 //size_t pull_list_size;
718
719
720 /**
721  * Handler to NSE.
722  */
723 static struct GNUNET_NSE_Handle *nse;
724
725 /**
726  * Handler to CADET.
727  */
728 static struct GNUNET_CADET_Handle *cadet_handle;
729
730
731 /***********************************************************************
732  * Util functions
733 ***********************************************************************/
734
735 /**
736  * Get random peer from the gossip list.
737  */
738   struct GNUNET_PeerIdentity *
739 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
740 {
741   uint64_t r_index;
742   struct GNUNET_PeerIdentity *peer;
743
744   // FIXME if we have only NULL in gossip list this will block
745   // but then we might have a problem nevertheless
746
747   do
748   {
749
750     /**;
751      * Choose the r_index of the peer we want to return
752      * at random from the interval of the gossip list
753      */
754     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
755                                      list_size);
756
757     peer = &(peer_list[r_index]);
758   } while (NULL == peer);
759
760   return peer;
761 }
762
763 /**
764  * Make sure the context of a given peer exists in the given peer_map.
765  */
766   void
767 touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
768 {
769   struct peer_context *ctx;
770
771   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
772   {
773     ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
774   }
775   else
776   {
777     ctx = GNUNET_malloc(sizeof(struct peer_context));
778     ctx->in_flags = 0;
779     ctx->mq = NULL;
780     ctx->to_channel = NULL;
781     ctx->from_channel = NULL;
782     GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
783   }
784 }
785
786 /**
787  * Get the context of a peer. If not existing, create.
788  */
789   struct peer_context *
790 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
791 {
792   struct peer_context *ctx;
793
794   touch_peer_ctx(peer_map, peer);
795   ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
796   return ctx;
797 }
798
799 /**
800  * Get the channel of a peer. If not existing, create.
801  */
802   void
803 touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
804 {
805   struct peer_context *ctx;
806
807   ctx = get_peer_ctx (peer_map, peer);
808   if (NULL == ctx->to_channel)
809   {
810     ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
811                                                   GNUNET_RPS_CADET_PORT,
812                                                   GNUNET_CADET_OPTION_RELIABLE);
813     // do I have to explicitly put it in the peer_map?
814     GNUNET_CONTAINER_multipeermap_put(peer_map, peer, ctx,
815                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
816   }
817 }
818
819 /**
820  * Get the channel of a peer. If not existing, create.
821  */
822   struct GNUNET_CADET_Channel *
823 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
824 {
825   struct peer_context *ctx;
826
827   ctx = get_peer_ctx (peer_map, peer);
828   touch_channel(peer_map, peer);
829   return ctx->to_channel;
830 }
831
832 /**
833  * Make sure the mq for a given peer exists.
834  *
835  * If we already have a message queue open to this client,
836  * simply return it, otherways create one.
837  */
838   void
839 touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
840 {
841   struct peer_context *ctx;
842
843   ctx = get_peer_ctx(peer_map, peer_id);
844   if (NULL == ctx->mq)
845   {
846     touch_channel(peer_map, peer_id);
847     ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
848     //do I have to explicitly put it in the peer_map?
849     GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
850                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
851   }
852 }
853
854 /**
855  * Get the message queue of a specific peer.
856  *
857  * If we already have a message queue open to this client,
858  * simply return it, otherways create one.
859  */
860   struct GNUNET_MQ_Handle *
861 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
862 {
863   struct peer_context *ctx;
864
865   ctx = get_peer_ctx(peer_map, peer_id);
866   touch_mq(peer_map, peer_id);
867
868   return ctx->mq;
869 }
870
871 /***********************************************************************
872  * /Util functions
873 ***********************************************************************/
874
875 /**
876  * Function called by NSE.
877  *
878  * Updates sizes of sampler list and gossip list and adapt those lists
879  * accordingly.
880  */
881   void
882 nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
883 {
884   double estimate;
885   //double scale; // TODO this might go gloabal/config
886
887   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f\n", logestimate, std_dev);
888   //scale = .01;
889   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
890   // GNUNET_NSE_log_estimate_to_n (logestimate);
891   estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
892   if ( 0 < estimate ) {
893     LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
894     est_size = estimate;
895   } else {
896     LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
897   }
898 }
899
900 /**
901  * Handle RPS request from the client.
902  *
903  * @param cls closure
904  * @param client identification of the client
905  * @param message the actual message
906  */
907 static void
908 // TODO rename
909 handle_cs_request (void *cls,
910             struct GNUNET_SERVER_Client *client,
911             const struct GNUNET_MessageHeader *message)
912 {
913   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
914
915   struct GNUNET_RPS_CS_RequestMessage *msg;
916   //unsigned int n_arr[sampler_list->size];// =
917     //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
918   //struct GNUNET_MQ_Handle *mq;
919   struct client_ctx *cli_ctx;
920   struct GNUNET_MQ_Envelope *ev;
921   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
922   uint64_t num_peers;
923   //uint64_t i;
924
925   // TODO
926   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
927   // Does not work because the compiler seems not to find it.
928   cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
929   if ( NULL == cli_ctx ) {
930     cli_ctx = GNUNET_new(struct client_ctx);
931     cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
932     GNUNET_SERVER_client_set_user_context(client, cli_ctx);
933   }
934   
935   // TODO How many peers do we give back?
936   // Wait until we have enough random peers?
937
938   ev = GNUNET_MQ_msg_extra(out_msg,
939                            GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
940                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
941   out_msg->num_peers = msg->num_peers; // No conversion between network and host order
942
943   num_peers = GNUNET_ntohll(msg->num_peers);
944   //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
945   memcpy(&out_msg[1],
946       SAMPLER_get_n_rand_peers(sampler_list, num_peers),
947       num_peers * sizeof(struct GNUNET_PeerIdentity));
948   
949   GNUNET_MQ_send(cli_ctx->mq, ev);
950   //GNUNET_MQ_destroy(mq);
951
952   GNUNET_SERVER_receive_done (client,
953                               GNUNET_OK);
954 }
955
956 /**
957  * Handle a PUSH message from another peer.
958  *
959  * Check the proof of work and store the PeerID
960  * in the temporary list for pushed PeerIDs.
961  *
962  * @param cls Closure
963  * @param channel The channel the PUSH was received over
964  * @param channel_ctx The context associated with this channel
965  * @param msg The message header
966  */
967 static int
968 handle_peer_push (void *cls,
969     struct GNUNET_CADET_Channel *channel,
970     void **channel_ctx,
971     const struct GNUNET_MessageHeader *msg)
972 {
973   LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
974
975   struct GNUNET_PeerIdentity *peer;
976
977   // TODO check the proof of work
978   // and check limit for PUSHes
979   // IF we count per peer PUSHes
980   // maybe remove from gossip/sampler list
981   
982   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER );
983   
984   /* Add the sending peer to the push_list */
985   LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
986   GNUNET_array_append(push_list, push_list_size, *peer);
987   LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
988
989   return GNUNET_OK;
990 }
991
992 /**
993  * Handle PULL REQUEST request message from another peer.
994  *
995  * Reply with the gossip list of PeerIDs.
996  *
997  * @param cls Closure
998  * @param channel The channel the PUSH was received over
999  * @param channel_ctx The context associated with this channel
1000  * @param msg The message header
1001  */
1002 static int
1003 handle_peer_pull_request (void *cls,
1004     struct GNUNET_CADET_Channel *channel,
1005     void **channel_ctx,
1006     const struct GNUNET_MessageHeader *msg)
1007 {
1008
1009   struct GNUNET_PeerIdentity *peer;
1010   struct GNUNET_MQ_Handle *mq;
1011   //struct GNUNET_RPS_P2P_PullRequestMessage *in_msg;
1012   struct GNUNET_MQ_Envelope *ev;
1013   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1014
1015   // TODO find some way to keep one peer from spamming with pull requests
1016   // allow only one request per time interval ?
1017   // otherwise remove from peerlist?
1018
1019   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
1020   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
1021
1022   mq = GNUNET_CADET_mq_create(channel); // TODO without mq?
1023   //mq = get_mq(peer_map, peer);
1024
1025   //in_msg = (struct GNUNET_RPS_P2P_PullRequestMessage *) msg;
1026   // TODO how many peers do we actually send?
1027   // GNUNET_ntohll(in_msg->num_peers)
1028   ev = GNUNET_MQ_msg_extra(out_msg,
1029                            gossip_list_size * sizeof(struct GNUNET_PeerIdentity),
1030                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1031   out_msg->num_peers = GNUNET_htonll(gossip_list_size);
1032   memcpy(&out_msg[1], gossip_list,
1033          gossip_list_size * sizeof(struct GNUNET_PeerIdentity));
1034
1035   GNUNET_MQ_send(mq, ev);
1036
1037   GNUNET_MQ_destroy(mq);
1038
1039
1040   return GNUNET_OK;
1041 }
1042
1043 /**
1044  * Handle PULL REPLY message from another peer.
1045  *
1046  * Check whether we sent a corresponding request and
1047  * whether this reply is the first one.
1048  *
1049  * @param cls Closure
1050  * @param channel The channel the PUSH was received over
1051  * @param channel_ctx The context associated with this channel
1052  * @param msg The message header
1053  */
1054 static int
1055 handle_peer_pull_reply (void *cls,
1056     struct GNUNET_CADET_Channel *channel,
1057     void **channel_ctx,
1058     const struct GNUNET_MessageHeader *msg)
1059 {
1060   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1061
1062   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1063   struct GNUNET_PeerIdentity *peers;
1064   uint64_t i;
1065
1066   // TODO check that we sent a request and that it is the first reply
1067
1068   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1069   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1070   for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ ) {
1071     GNUNET_array_append(pull_list, pull_list_size, peers[i]);
1072   }
1073
1074   // TODO maybe a disconnect happens here
1075   
1076   return GNUNET_OK;
1077 }
1078
1079
1080 /**
1081  * Send out PUSHes and PULLs.
1082  *
1083  * This is executed regylary.
1084  */
1085 static void
1086 do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1087 {
1088   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
1089
1090   uint64_t i;
1091   //unsigned int *n_arr;
1092   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
1093   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
1094   struct GNUNET_MQ_Envelope *ev;
1095   struct GNUNET_PeerIdentity *peer;
1096
1097   // TODO print lists, ...
1098   // TODO cleanup peer_map
1099
1100
1101   /* If the NSE has changed adapt the lists accordingly */
1102   if ( sampler_list->size != est_size )
1103     SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1104
1105   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1106
1107
1108   /* Would it make sense to have one shuffeled gossip list and then
1109    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1110    * use the rest to update sampler?
1111    * in essence get random peers with consumption */
1112
1113   /* Send PUSHes */
1114   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
1115   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %f (%f * %u) peers.\n",
1116       alpha * gossip_list_size, alpha, gossip_list_size);
1117   for ( i = 0 ; i < alpha * gossip_list_size ; i++ )
1118   { // TODO compute length
1119     peer = get_rand_peer(gossip_list, gossip_list_size);
1120     // TODO check NULL == peer
1121     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1122
1123     ev = GNUNET_MQ_msg (push_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1124     //ev = GNUNET_MQ_msg_extra();
1125     /* TODO Compute proof of work here
1126     push_msg; */
1127     push_msg->placeholder = 0;
1128     // FIXME sometimes it returns a pointer to a freed mq
1129     GNUNET_MQ_send (get_mq (peer_map, peer), ev);
1130
1131     // modify in_flags of respective peer?
1132   }
1133
1134
1135   /* Send PULL requests */
1136   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1137   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %f (%f * %u) peers.\n",
1138       beta * gossip_list_size, beta, gossip_list_size);
1139   for ( i = 0 ; i < beta * gossip_list_size ; i++ )
1140   { // TODO compute length
1141     peer = get_rand_peer(gossip_list, gossip_list_size);
1142     // TODO check empty_peer != peer
1143     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1144
1145     ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1146     //ev = GNUNET_MQ_msg_extra();
1147     pull_msg->placeholder = 0;
1148     GNUNET_MQ_send( get_mq(peer_map, peer), ev );
1149     // modify in_flags of respective peer?
1150   }
1151
1152
1153   /* Update gossip list */
1154   uint64_t r_index;
1155
1156   if ( push_list_size <= alpha * gossip_list_size &&
1157        push_list_size != 0 &&
1158        pull_list_size != 0 )
1159   {
1160     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
1161
1162     uint64_t first_border;
1163     uint64_t second_border;
1164
1165     first_border = round(alpha * gossip_list_size);
1166     for ( i = 0 ; i < first_border ; i++ )
1167     { // TODO use SAMPLER_get_n_rand_peers
1168       /* Update gossip list with peers received through PUSHes */
1169       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1170                                        push_list_size);
1171       gossip_list[i] = push_list[r_index];
1172       // TODO change the in_flags accordingly
1173     }
1174
1175     second_border = first_border + round(beta * gossip_list_size);
1176     for ( i = first_border ; i < second_border ; i++ )
1177     {
1178       /* Update gossip list with peers received through PULLs */
1179       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1180                                        pull_list_size);
1181       gossip_list[i] = pull_list[r_index];
1182       // TODO change the in_flags accordingly
1183     }
1184
1185     for ( i = second_border ; i < gossip_list_size ; i++ )
1186     {
1187       /* Update gossip list with peers from history */
1188       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1189                                        sampler_list->size);
1190       gossip_list[i] = sampler_list->peer_ids[r_index];
1191       // TODO change the in_flags accordingly
1192     }
1193
1194   }
1195   else
1196   {
1197     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
1198   }
1199   // TODO independent of that also get some peers from CADET_get_peers()?
1200
1201
1202   /* Update samplers */
1203
1204   for ( i = 0 ; i < push_list_size ; i++ )
1205   {
1206     SAMPLER_update_list(sampler_list, &push_list[i]);
1207     // TODO set in_flag?
1208   }
1209
1210   for ( i = 0 ; i < pull_list_size ; i++ )
1211   {
1212     SAMPLER_update_list(sampler_list, &pull_list[i]);
1213     // TODO set in_flag?
1214   }
1215
1216
1217   /* Empty push/pull lists */
1218   GNUNET_array_grow(push_list, push_list_size, 0);
1219   push_list_size = 0; // I guess that's not necessary but doesn't hurt
1220   GNUNET_array_grow(pull_list, pull_list_size, 0);
1221   pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1222
1223
1224   /* Schedule next round */
1225   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
1226   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1227 }
1228
1229 /**
1230  * Open a connection to given peer and store channel and mq.
1231  */
1232   void
1233 insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1234 {
1235   touch_mq(peer_map, id);
1236 }
1237
1238 /**
1239  * Close the connection to given peer and delete channel and mq.
1240  */
1241   void
1242 removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1243 {
1244   size_t s;
1245   struct peer_context *ctx;
1246
1247   s = SAMPLER_count_id(sampler_list, id);
1248   if ( 1 >= s ) {
1249     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
1250     {
1251       ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
1252       if (NULL != ctx->to_channel)
1253       {
1254         if (NULL != ctx->mq)
1255         {
1256           GNUNET_MQ_destroy(ctx->mq);
1257         }
1258         GNUNET_CADET_channel_destroy(ctx->to_channel);
1259       }
1260       // TODO cleanup peer
1261       GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
1262     }
1263   }
1264 }
1265
1266 static void
1267 rps_start (struct GNUNET_SERVER_Handle *server);
1268
1269 /**
1270  * This is called from GNUNET_CADET_get_peers().
1271  *
1272  * It is called on every peer(ID) that cadet somehow has contact with.
1273  * We use those to initialise the sampler.
1274  */
1275 void
1276 init_peer_cb (void *cls,
1277               const struct GNUNET_PeerIdentity *peer,
1278               int tunnel, // "Do we have a tunnel towards this peer?"
1279               unsigned int n_paths, // "Number of known paths towards this peer"
1280               unsigned int best_path) // "How long is the best path?
1281                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1282 {
1283   if ( NULL != peer )
1284   {
1285     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1286     SAMPLER_update_list(sampler_list, peer);
1287     touch_peer_ctx(peer_map, peer);
1288
1289     uint64_t i;
1290     i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size);
1291     gossip_list[i] = *peer;
1292     // TODO send push/pull to each of those peers?
1293   }
1294   else
1295   {
1296     rps_start( (struct GNUNET_SERVER_Handle *) cls);
1297   }
1298 }
1299
1300
1301
1302
1303 /**
1304  * Task run during shutdown.
1305  *
1306  * @param cls unused
1307  * @param tc unused
1308  */
1309 static void
1310 shutdown_task (void *cls,
1311                const struct GNUNET_SCHEDULER_TaskContext *tc)
1312 {
1313   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
1314
1315   if ( NULL != do_round_task )
1316   {
1317     GNUNET_SCHEDULER_cancel (do_round_task);
1318     do_round_task = NULL;
1319   }
1320
1321   GNUNET_NSE_disconnect(nse);
1322   GNUNET_CADET_disconnect(cadet_handle);
1323   GNUNET_free(own_identity);
1324   //GNUNET_free(round_interval);
1325   //GNUNET_free(gossip_list);
1326   SAMPLER_samplers_destroy(sampler_list);
1327   GNUNET_array_grow(gossip_list, gossip_list_size, 0);
1328   GNUNET_array_grow(push_list, push_list_size, 0);
1329   GNUNET_array_grow(pull_list, pull_list_size, 0);
1330   // TODO delete global data
1331 }
1332
1333
1334 /**
1335  * A client disconnected.  Remove all of its data structure entries.
1336  *
1337  * @param cls closure, NULL
1338  * @param client identification of the client
1339  */
1340 static void
1341 handle_client_disconnect (void *cls,
1342                           struct GNUNET_SERVER_Client * client)
1343 {
1344 }
1345
1346 /**
1347  * Handle the channel a peer opens to us.
1348  *
1349  * @param cls The closure
1350  * @param channel The channel the peer wants to establish
1351  * @param initiator The peer's peer ID
1352  * @param port The port the channel is being established over
1353  * @param options Further options
1354  */
1355   static void *
1356 handle_inbound_channel (void *cls,
1357                         struct GNUNET_CADET_Channel *channel,
1358                         const struct GNUNET_PeerIdentity *initiator,
1359                         uint32_t port,
1360                         enum GNUNET_CADET_ChannelOption options)
1361 {
1362   LOG(GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us.\n");
1363
1364   GNUNET_assert( NULL != channel );
1365
1366   // TODO we might not even store the from_channel
1367
1368   if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
1369     ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel;
1370     // FIXME there might already be an established channel
1371   } else {
1372     struct peer_context *ctx;
1373
1374     ctx = GNUNET_malloc (sizeof(struct peer_context));
1375     ctx->in_flags = in_other_gossip_list;
1376     ctx->mq = NULL; // TODO create mq?
1377     ctx->from_channel = channel;
1378
1379     GNUNET_CONTAINER_multipeermap_put (peer_map, initiator, ctx,
1380                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1381   }
1382   return NULL; // TODO
1383 }
1384
1385 /**
1386  * This is called when a remote peer destroys a channel.
1387  *
1388  * @param cls The closure
1389  * @param channel The channel being closed
1390  * @param channel_ctx The context associated with this channel
1391  */
1392 static void
1393 cleanup_channel(void *cls,
1394                 const struct GNUNET_CADET_Channel *channel,
1395                 void *channel_ctx)
1396 {
1397   struct GNUNET_PeerIdentity *peer;
1398   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel to remote peer was destroyed.\n");
1399
1400   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1401       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1402        // Guess simply casting isn't the nicest way...
1403   SAMPLER_reinitialise_samplers_by_value(sampler_list, peer);
1404 }
1405
1406 /**
1407  * Actually start the service.
1408  */
1409 static void
1410 rps_start (struct GNUNET_SERVER_Handle *server)
1411 {
1412   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1413     {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
1414     {NULL, NULL, 0, 0}
1415   };
1416
1417   GNUNET_SERVER_add_handlers (server, handlers);
1418   GNUNET_SERVER_disconnect_notify (server,
1419                                    &handle_client_disconnect,
1420                                    NULL);
1421   LOG(GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
1422
1423
1424
1425   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1426   LOG(GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
1427
1428   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1429                                 &shutdown_task,
1430                                 NULL);
1431 }
1432
1433
1434
1435 /**
1436  * Process statistics requests.
1437  *
1438  * @param cls closure
1439  * @param server the initialized server
1440  * @param c configuration to use
1441  */
1442 static void
1443 run (void *cls,
1444      struct GNUNET_SERVER_Handle *server,
1445      const struct GNUNET_CONFIGURATION_Handle *c)
1446 {
1447   // TODO check what this does -- copied from gnunet-boss
1448   // - seems to work as expected
1449   GNUNET_log_setup("rps", GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG), NULL);
1450
1451   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1452
1453   cfg = c;
1454
1455
1456   own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
1457
1458   GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
1459
1460   GNUNET_assert(NULL != own_identity);
1461
1462   LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1463
1464
1465
1466   /* Get time interval from the configuration */
1467   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1468                                                         "ROUNDINTERVAL",
1469                                                         &round_interval))
1470   {
1471     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
1472     GNUNET_SCHEDULER_shutdown();
1473     return;
1474   }
1475
1476   /* Get initial size of sampler/gossip list from the configuration */
1477   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1478                                                          "INITSIZE",
1479                                                          (long long unsigned int *) &est_size))
1480   {
1481     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1482     GNUNET_SCHEDULER_shutdown();
1483     return;
1484   }
1485   LOG(GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", est_size);
1486
1487   //gossip_list_size = est_size; // TODO rename est_size
1488
1489   gossip_list = NULL;
1490
1491   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1492
1493
1494   /* connect to NSE */
1495   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
1496   // TODO check whether that was successful
1497   // TODO disconnect on shutdown
1498   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1499
1500
1501   alpha = 0.45;
1502   beta  = 0.45;
1503   // TODO initialise thresholds - ?
1504
1505   ///* Get alpha from the configuration */
1506   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1507   //                                                       "ALPHA",
1508   //                                                       &alpha))
1509   //{
1510   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
1511   //}
1512   //LOG(GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
1513  
1514   ///* Get beta from the configuration */
1515   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1516   //                                                       "BETA",
1517   //                                                       &beta))
1518   //{
1519   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1520   //}
1521   //LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1522
1523
1524   peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
1525
1526
1527   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1528     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
1529     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
1530     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
1531     {NULL, 0, 0}
1532   };
1533
1534   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1535   cadet_handle = GNUNET_CADET_connect(cfg,
1536                                     cls,
1537                                     &handle_inbound_channel,
1538                                     &cleanup_channel,
1539                                     cadet_handlers,
1540                                     ports);
1541   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1542
1543
1544   /* Initialise sampler and gossip list */
1545
1546   sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL);
1547
1548   push_list = NULL;
1549   push_list_size = 0;
1550   pull_list = NULL;
1551   pull_list_size = 0;
1552
1553
1554   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1555   GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
1556   // FIXME use magic 0000 PeerID to _start_ the service
1557
1558   // TODO send push/pull to each of those peers?
1559 }
1560
1561
1562 /**
1563  * The main function for the rps service.
1564  *
1565  * @param argc number of arguments from the command line
1566  * @param argv command line arguments
1567  * @return 0 ok, 1 on error
1568  */
1569 int
1570 main (int argc, char *const *argv)
1571 {
1572   return (GNUNET_OK ==
1573           GNUNET_SERVICE_run (argc,
1574                               argv,
1575                               "rps",
1576                               GNUNET_SERVICE_OPTION_NONE,
1577                               &run, NULL)) ? 0 : 1;
1578 }
1579
1580 /* end of gnunet-service-rps.c */