now using API calls instead of own fkts
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include <math.h>
33 #include <inttypes.h>
34
35 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
36
37 // TODO modify @brief in every file
38
39 // TODO take care that messages are not longer than 64k
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO malicious peer
48
49 // TODO Change API to accept initialisation peers
50
51 /**
52  * Our configuration.
53  */
54 static const struct GNUNET_CONFIGURATION_Handle *cfg;
55
56 /**
57  * Our own identity.
58  */
59 struct GNUNET_PeerIdentity *own_identity;
60
61
62   struct GNUNET_PeerIdentity *
63 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
64
65 /***********************************************************************
66  * Sampler
67  *
68  * WARNING: This section needs to be reviewed regarding the use of
69  * functions providing (pseudo)randomness!
70 ***********************************************************************/
71
72 // TODO care about invalid input of the caller (size 0 or less...)
73
74 // It might be interesting to formulate this independent of PeerIDs.
75
76 /**
77  * Callback that is called when a new PeerID is inserted into a sampler.
78  *
79  * @param cls the closure given alongside this function.
80  * @param id the PeerID that is inserted
81  * @param hash the hash the sampler produced of the PeerID
82  */
83 typedef void (* SAMPLER_insertCB) (void *cls,
84     const struct GNUNET_PeerIdentity *id,
85     struct GNUNET_HashCode hash);
86
87 /**
88  * Callback that is called when a new PeerID is removed from a sampler.
89  *
90  * @param cls the closure given alongside this function.
91  * @param id the PeerID that is removed
92  * @param hash the hash the sampler produced of the PeerID
93  */
94 typedef void (* SAMPLER_removeCB) (void *cls,
95     const struct GNUNET_PeerIdentity *id,
96     struct GNUNET_HashCode hash);
97
98 /**
99  * A sampler sampling PeerIDs.
100  */
101 struct Sampler
102 {
103   /**
104    * Min-wise linear permutation used by this sampler.
105    *
106    * This is an key later used by a hmac.
107    */
108   struct GNUNET_CRYPTO_AuthKey auth_key;
109
110   /**
111    * The PeerID this sampler currently samples.
112    */
113   struct GNUNET_PeerIdentity *peer_id;
114
115   /**
116    * The according hash value of this PeerID.
117    */
118   struct GNUNET_HashCode peer_id_hash;
119
120   /**
121    * Samplers are kept in a linked list.
122    */
123   struct Sampler *next;
124
125   /**
126    * Samplers are kept in a linked list.
127    */
128   struct Sampler *prev;
129
130 };
131
132 /**
133  * A n-tuple of samplers.
134  */
135 struct Samplers
136 {
137   /**
138    * Number of samplers we hold.
139    */
140   unsigned int size;
141   //size_t size;
142   
143   /**
144    * All PeerIDs in one array.
145    */
146   struct GNUNET_PeerIdentity *peer_ids;
147
148   /**
149    * Callback to be called when a peer gets inserted into a sampler.
150    */
151   SAMPLER_insertCB insertCB;
152
153   /**
154    * Closure to the insertCB.
155    */
156   void *insertCLS;
157
158   /**
159    * Callback to be called when a peer gets inserted into a sampler.
160    */
161   SAMPLER_removeCB removeCB;
162
163   /**
164    * Closure to the removeCB.
165    */
166   void *removeCLS;
167
168   /**
169    * The head of the DLL.
170    */
171   struct Sampler *head;
172
173   /**
174    * The tail of the DLL.
175    */
176   struct Sampler *tail;
177
178 };
179
180 /**
181  * (Re)Initialise given Sampler with random min-wise independent function.
182  *
183  * In this implementation this means choosing an auth_key for later use in
184  * a hmac at random.
185  *
186  * @param id pointer to the place where this sampler will store the PeerID.
187  *           This will be overwritten.
188  */
189   struct Sampler *
190 SAMPLER_init(struct GNUNET_PeerIdentity *id)
191 {
192   struct Sampler *s;
193   
194   s = GNUNET_new(struct Sampler);
195
196   // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
197   GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
198                              &(s->auth_key.key),
199                              GNUNET_CRYPTO_HASH_LENGTH);
200
201   GNUNET_assert(NULL != id);
202   s->peer_id = id;
203   memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
204   //s->peer_id = own_identity; // Maybe set to own PeerID. So we always have
205                      // a valid PeerID in the sampler.
206                      // Maybe take a PeerID as second argument.
207   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
208       GNUNET_i2s(s->peer_id), s->peer_id);
209
210   GNUNET_CRYPTO_hmac(&s->auth_key, s->peer_id,
211                      sizeof(struct GNUNET_PeerIdentity),
212                      &s->peer_id_hash);
213
214   s->prev = NULL;
215   s->next = NULL;
216
217   return s;
218 }
219
220 /**
221  * Input an PeerID into the given sampler.
222  */
223   static void
224 SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
225     SAMPLER_insertCB insertCB, void *insertCLS,
226     SAMPLER_removeCB removeCB, void *removeCLS)
227   // TODO call update herein
228 {
229   struct GNUNET_HashCode other_hash;
230
231   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
232       GNUNET_i2s(other), other);
233   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
234       GNUNET_i2s(s->peer_id), s->peer_id);
235
236   if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
237   {
238     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
239         GNUNET_i2s(other));
240     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
241         GNUNET_i2s(s->peer_id));
242   }
243   else
244   {
245     GNUNET_CRYPTO_hmac(&s->auth_key,
246         other,
247         sizeof(struct GNUNET_PeerIdentity),
248         &other_hash);
249
250     if ( NULL == s->peer_id )
251     { // Or whatever is a valid way to say
252       // "we have no PeerID at the moment"
253       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
254           GNUNET_i2s(other));
255       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
256       //s->peer_id = other;
257       s->peer_id_hash = other_hash;
258       if (NULL != insertCB)
259       {
260         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
261       }
262     }
263     else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
264     {
265       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
266           GNUNET_i2s(other));
267       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
268           GNUNET_i2s(s->peer_id));
269
270       if ( NULL != removeCB )
271       {
272         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
273             GNUNET_i2s(s->peer_id));
274         removeCB(removeCLS, s->peer_id, s->peer_id_hash);
275       }
276
277       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
278       //s->peer_id = other;
279       s->peer_id_hash = other_hash;
280
281       if ( NULL != insertCB )
282       {
283         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
284             GNUNET_i2s(s->peer_id));
285         insertCB(insertCLS, s->peer_id, s->peer_id_hash);
286       }
287     }
288     else
289     {
290       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
291           GNUNET_i2s(other));
292       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
293           GNUNET_i2s(s->peer_id));
294     }
295   }
296 }
297
298 /**
299  * Gow or shrink the size of the tuple of samplers.
300  *
301  * @param samplers the samplers to grow
302  * @param new_size the new size of the samplers
303  * @param fill_up_id if growing, that has to point to a
304  *                   valid PeerID and will be used
305  *                   to initialise newly created samplers
306  */
307   void
308 SAMPLER_samplers_resize (struct Samplers * samplers,
309     unsigned int new_size,
310     struct GNUNET_PeerIdentity *fill_up_id)
311 {
312   if ( samplers->size == new_size )
313   {
314     LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
315     return;
316   }
317
318   unsigned int old_size;
319   struct Sampler *iter;
320   uint64_t i;
321   struct Sampler *tmp;
322
323   old_size = samplers->size;
324   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
325
326   iter = samplers->head;
327
328   if ( new_size < old_size )
329   {
330     for ( i = new_size ; i < old_size ; i++ )
331     {/* Remove unneeded rest */
332       tmp = iter->next;
333       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
334       if (NULL != samplers->removeCB)
335         samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
336       GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
337       GNUNET_free(iter);
338       iter = tmp;
339     }
340   }
341
342   GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
343   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
344
345   if ( new_size > old_size )
346   { /* Growing */
347     GNUNET_assert( NULL != fill_up_id );
348     for ( i = 0 ; i < new_size ; i++ )
349     { /* All samplers */
350       if ( i < old_size )
351       { /* Update old samplers */
352         iter->peer_id = &samplers->peer_ids[i];
353         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
354             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
355         iter = iter->next;
356       }
357       else
358       { /* Add new samplers */
359         memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
360         iter = SAMPLER_init(&samplers->peer_ids[i]);
361         if (NULL != samplers->insertCB)
362         {
363           samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
364         }
365         GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
366         LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
367             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
368       }
369     }
370   }
371   else// if ( new_size < old_size )
372   { /* Shrinking */
373     for ( i = 0 ; i < new_size ; i++)
374     { /* All samplers */
375       tmp = iter->next;
376       /* Update remaining samplers */
377       iter->peer_id = &samplers->peer_ids[i];
378       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
379           i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
380
381       iter = tmp;
382     }
383   }
384
385   GNUNET_assert(samplers->size == new_size);
386   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
387 }
388
389
390 /**
391  * Initialise a tuple of samplers.
392  */
393 struct Samplers *
394 SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
395     SAMPLER_insertCB insertCB, void *insertCLS,
396     SAMPLER_removeCB removeCB, void *removeCLS)
397 {
398   struct Samplers *samplers;
399   //struct Sampler *s;
400   //uint64_t i;
401
402   samplers = GNUNET_new(struct Samplers);
403   samplers->size = 0;
404   samplers->peer_ids = NULL;
405   samplers->insertCB = insertCB;
406   samplers->insertCLS = insertCLS;
407   samplers->removeCB = removeCB;
408   samplers->removeCLS = removeCLS;
409   samplers->head = samplers->tail = NULL;
410   //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
411
412   SAMPLER_samplers_resize(samplers, init_size, id);
413
414   //for ( i = 0 ; i < init_size ; i++ )
415   //{
416   //  GNUNET_array_append(samplers->peer_ids,
417   //      samplers->size,
418   //      *id);
419   //  s = SAMPLER_init(&samplers->peer_ids[i]);
420   //  GNUNET_CONTAINER_DLL_insert_tail(samplers->head,
421   //      samplers->tail,
422   //      s);
423   //}
424   //samplers->size = init_size;
425   GNUNET_assert(init_size == samplers->size);
426   return samplers;
427 }
428
429
430 /**
431  * A fuction to update every sampler in the given list
432  */
433   static void
434 SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
435 {
436   struct Sampler *iter;
437
438   iter = samplers->head;
439   while ( NULL != iter->next )
440   {
441     SAMPLER_next(iter, id,
442         samplers->insertCB, samplers->insertCLS,
443         samplers->removeCB, samplers->removeCLS);
444     iter = iter->next;
445   }
446   
447 }
448
449 /**
450  * Get one random peer out of the sampled peers.
451  *
452  * We might want to reinitialise this sampler after giving the
453  * corrsponding peer to the client.
454  */
455   const struct GNUNET_PeerIdentity* 
456 SAMPLER_get_rand_peer (struct Samplers *samplers)
457 {
458   LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
459
460   if ( 0 == samplers->size )
461   {
462     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
463     return own_identity;
464   }
465   else
466   {
467     const struct GNUNET_PeerIdentity *peer;
468
469     peer = get_rand_peer(samplers->peer_ids, samplers->size);
470     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
471     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
472
473     return peer;
474   }
475 }
476
477 /**
478  * Get n random peers out of the sampled peers.
479  *
480  * We might want to reinitialise this sampler after giving the
481  * corrsponding peer to the client.
482  * Random with or without consumption?
483  */
484   const struct GNUNET_PeerIdentity*  // TODO give back simple array
485 SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
486 {
487   // TODO check if we have too much (distinct) sampled peers
488   // If we are not ready yet maybe schedule for later
489   struct GNUNET_PeerIdentity *peers;
490   uint64_t i;
491   
492   peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
493
494   for ( i = 0 ; i < n ; i++ ) {
495     //peers[i] = SAMPLER_get_rand_peer(samplers);
496     memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
497   }
498
499   // TODO something else missing?
500   return peers;
501 }
502
503 /**
504  * Counts how many Samplers currently hold a given PeerID.
505  */
506   uint64_t
507 SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
508 {
509   struct Sampler *iter;
510   uint64_t count;
511
512   iter = samplers->head;
513   count = 0;
514   while ( NULL != iter )
515   {
516     if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
517       count++;
518     iter = iter->next;
519   }
520   return count;
521 }
522
523
524 /**
525  * Cleans the samplers.
526  * 
527  * @param samplers the samplers to clean up.
528  */
529   void
530 SAMPLER_samplers_destroy (struct Samplers *samplers)
531 {
532   //struct Sampler *iter;
533   //struct Sampler *tmp;
534
535   SAMPLER_samplers_resize(samplers, 0, NULL);
536   //iter = samplers->head;
537   //while (NULL != iter) // use _samplers_resize(0)?
538   //{
539   //  iter = iter->next;
540   //  tmp = iter->prev;
541   //  GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter->prev);
542   //  GNUNET_free(tmp);
543   //}
544   //GNUNET_array_grow(samplers->peer_ids, samplers->size, 0);
545   GNUNET_free(samplers);
546 }
547
548 /***********************************************************************
549  * /Sampler
550 ***********************************************************************/
551
552
553
554 /***********************************************************************
555  * Housekeeping with peers
556 ***********************************************************************/
557
558 /**
559  * Struct used to store the context of a connected client.
560  */
561 struct client_ctx
562 {
563   /**
564    * The message queue to communicate with the client.
565    */
566   struct GNUNET_MQ_Handle *mq;
567 };
568
569 /**
570  * Used to keep track in what lists single peerIDs are.
571  */
572 enum in_list_flag // probably unneeded
573 {
574   in_other_sampler_list = 0x1,
575   in_other_gossip_list  = 0x2, // unneeded?
576   in_own_sampler_list   = 0x4,
577   in_own_gossip_list    = 0x8 // unneeded?
578 };
579
580 /**
581  * Struct used to keep track of other peer's status
582  *
583  * This is stored in a multipeermap.
584  */
585 struct peer_context
586 {
587   /**
588    * In own gossip/sampler list, in other's gossip/sampler list
589    */
590   uint32_t in_flags; // unneeded?
591
592   /**
593    * Message queue open to client
594    */
595   struct GNUNET_MQ_Handle *mq;
596
597   /**
598    * Channel open to client.
599    */
600   struct GNUNET_CADET_Channel *to_channel;
601
602   /**
603    * Channel open from client.
604    */
605   struct GNUNET_CADET_Channel *from_channel; // unneeded
606
607   /**
608    * This is pobably followed by 'statistical' data (when we first saw
609    * him, how did we get his ID, how many pushes (in a timeinterval),
610    * ...)
611    */
612 };
613
614 /***********************************************************************
615  * /Housekeeping with peers
616 ***********************************************************************/
617
618 /**
619  * Set of all peers to keep track of them.
620  */
621 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
622
623
624 /**
625  * The samplers.
626  */
627 struct Samplers *sampler_list;
628
629
630 /**
631  * The gossiped list of peers.
632  */
633 struct GNUNET_PeerIdentity *gossip_list;
634
635 /**
636  * Size of the gossiped list
637  */
638 unsigned int gossip_list_size;
639
640
641 /**
642  * The estimated size of the network.
643  *
644  * Influenced by the stdev.
645  */
646 unsigned int est_size;
647 //size_t est_size;
648
649
650 /**
651  * Percentage of total peer number in the gossip list
652  * to send random PUSHes to
653  */
654 float alpha;
655
656 /**
657  * Percentage of total peer number in the gossip list
658  * to send random PULLs to
659  */
660 float beta;
661
662 /**
663  * The percentage gamma of history updates.
664  * Simply 1 - alpha - beta
665  */
666
667
668
669
670 /**
671  * Identifier for the main task that runs periodically.
672  */
673 struct GNUNET_SCHEDULER_Task * do_round_task;
674
675 /**
676  * Time inverval the do_round task runs in.
677  */
678 struct GNUNET_TIME_Relative round_interval;
679
680
681
682 /**
683  * List to store peers received through pushes temporary.
684  */
685 struct GNUNET_PeerIdentity *push_list;
686
687 /**
688  * Size of the push_list;
689  */
690 unsigned int push_list_size;
691 //size_t push_list_size;
692
693 /**
694  * List to store peers received through pulls temporary.
695  */
696 struct GNUNET_PeerIdentity *pull_list;
697
698 /**
699  * Size of the pull_list;
700  */
701 unsigned int pull_list_size;
702 //size_t pull_list_size;
703
704
705 /**
706  * Handler to NSE.
707  */
708 struct GNUNET_NSE_Handle *nse;
709
710 /**
711  * Handler to CADET.
712  */
713 struct GNUNET_CADET_Handle *cadet_handle;
714
715
716 /***********************************************************************
717  * Util functions
718 ***********************************************************************/
719
720 /**
721  * Get random peer from the gossip list.
722  */
723   struct GNUNET_PeerIdentity *
724 get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size)
725 {
726   uint64_t r_index;
727   struct GNUNET_PeerIdentity *peer;
728
729   // TODO find a better solution.
730   // FIXME if we have only own ID in gossip list this will block
731   // but then we might have a problem nevertheless ?
732
733   do {
734
735     /**;
736      * Choose the r_index of the peer we want to return
737      * at random from the interval of the gossip list
738      */
739     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
740                                      size);
741
742     peer = &(peer_list[r_index]);
743   } while ( own_identity == peer || NULL == peer );
744
745   return peer;
746 }
747
748 /**
749  * Make sure the context of a given peer exists.
750  */
751   void
752 touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
753 {
754   struct peer_context *ctx;
755
756   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
757   {
758     ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
759   }
760   else
761   {
762     ctx = GNUNET_malloc(sizeof(struct peer_context));
763     ctx->in_flags = 0;
764     ctx->mq = NULL;
765     ctx->to_channel = NULL;
766     ctx->from_channel = NULL;
767     GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
768   }
769 }
770
771 /**
772  * Get the context of a peer. If not existing, create.
773  */
774   struct peer_context *
775 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
776 {
777   struct peer_context *ctx;
778
779   touch_peer_ctx(peer_map, peer);
780   ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
781   return ctx;
782 }
783
784 /**
785  * Get the channel of a peer. If not existing, create.
786  */
787   void
788 touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
789 {
790   struct peer_context *ctx;
791
792   ctx = get_peer_ctx (peer_map, peer);
793   if (NULL == ctx->to_channel)
794   {
795     ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
796                                                   GNUNET_RPS_CADET_PORT,
797                                                   GNUNET_CADET_OPTION_RELIABLE);
798     //TODO do I have to explicitly put it in the peer_map?
799   }
800 }
801
802 /**
803  * Get the channel of a peer. If not existing, create.
804  */
805   struct GNUNET_CADET_Channel *
806 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer)
807 {
808   struct peer_context *ctx;
809
810   ctx = get_peer_ctx (peer_map, peer);
811   touch_channel(peer_map, peer);
812   return ctx->to_channel;
813 }
814
815 /**
816  * Make sure the mq for a given peer exists.
817  *
818  * If we already have a message queue open to this client,
819  * simply return it, otherways create one.
820  */
821   void
822 touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
823 {
824   struct peer_context *ctx;
825
826   ctx = get_peer_ctx(peer_map, peer_id);
827   if (NULL == ctx->mq)
828   {
829     touch_channel(peer_map, peer_id);
830     ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
831     //TODO do I have to explicitly put it in the peer_map?
832     //GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
833     //                                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
834   }
835 }
836
837 /**
838  * Get the message queue of a specific peer.
839  *
840  * If we already have a message queue open to this client,
841  * simply return it, otherways create one.
842  */
843   struct GNUNET_MQ_Handle *
844 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer_id)
845 {
846   struct peer_context *ctx;
847
848   ctx = get_peer_ctx(peer_map, peer_id);
849   touch_mq(peer_map, peer_id);
850
851   return ctx->mq;
852 }
853
854 /***********************************************************************
855  * /Util functions
856 ***********************************************************************/
857
858 /**
859  * Function called by NSE.
860  *
861  * Updates sizes of sampler list and gossip list and adapt those lists
862  * accordingly.
863  */
864   void
865 nse_callback(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
866 {
867   double estimate;
868   //double scale; // TODO this might go gloabal/config
869
870   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a ns estimate - logest: %f, std_dev: %f\n", logestimate, std_dev);
871   //scale = .01;
872   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
873   // GNUNET_NSE_log_estimate_to_n (logestimate);
874   estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
875   if ( 0 < estimate ) {
876     LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
877     est_size = estimate;
878   } else {
879     LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
880   }
881 }
882
883 /**
884  * Handle RPS request from the client.
885  *
886  * @param cls closure
887  * @param client identification of the client
888  * @param message the actual message
889  */
890 static void
891 // TODO rename
892 handle_cs_request (void *cls,
893             struct GNUNET_SERVER_Client *client,
894             const struct GNUNET_MessageHeader *message)
895 {
896   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested (a) random peer(s).\n");
897
898   struct GNUNET_RPS_CS_RequestMessage *msg;
899   //unsigned int n_arr[sampler_list->size];// =
900     //GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
901   //struct GNUNET_MQ_Handle *mq;
902   struct client_ctx *cli_ctx;
903   struct GNUNET_MQ_Envelope *ev;
904   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
905   uint64_t num_peers;
906   //uint64_t i;
907
908   // TODO
909   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
910   // Does not work because the compiler seems not to find it.
911   cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
912   if ( NULL == cli_ctx ) {
913     cli_ctx = GNUNET_new(struct client_ctx);
914     cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
915     GNUNET_SERVER_client_set_user_context(client, cli_ctx);
916   }
917   
918   // TODO How many peers do we give back?
919   // Wait until we have enough random peers?
920
921   ev = GNUNET_MQ_msg_extra(out_msg,
922                            GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
923                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
924   out_msg->num_peers = msg->num_peers; // No conversion between network and host order
925
926   num_peers = GNUNET_ntohll(msg->num_peers);
927   //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
928   memcpy(&out_msg[1],
929       SAMPLER_get_n_rand_peers(sampler_list, num_peers),
930       num_peers * sizeof(struct GNUNET_PeerIdentity));
931   
932   GNUNET_MQ_send(cli_ctx->mq, ev);
933   //GNUNET_MQ_destroy(mq);
934
935   GNUNET_SERVER_receive_done (client,
936                               GNUNET_OK);
937 }
938
939 /**
940  * Handle a PUSH message from another peer.
941  *
942  * Check the proof of work and store the PeerID
943  * in the temporary list for pushed PeerIDs.
944  *
945  * @param cls Closure
946  * @param channel The channel the PUSH was received over
947  * @param channel_ctx The context associated with this channel
948  * @param msg The message header
949  */
950 static int
951 handle_peer_push (void *cls,
952     struct GNUNET_CADET_Channel *channel,
953     void **channel_ctx,
954     const struct GNUNET_MessageHeader *msg)
955 {
956   LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
957
958   struct GNUNET_PeerIdentity *peer;
959
960   // TODO check the proof of work
961   // and check limit for PUSHes
962   // IF we count per peer PUSHes
963   // maybe remove from gossip/sampler list
964   
965   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER );
966   
967   /* Add the sending peer to the push_list */
968   LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
969   GNUNET_array_append(push_list, push_list_size, *peer);
970   LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
971
972   return GNUNET_OK;
973 }
974
975 /**
976  * Handle PULL REQUEST request message from another peer.
977  *
978  * Reply with the gossip list of PeerIDs.
979  *
980  * @param cls Closure
981  * @param channel The channel the PUSH was received over
982  * @param channel_ctx The context associated with this channel
983  * @param msg The message header
984  */
985 static int
986 handle_peer_pull_request (void *cls,
987     struct GNUNET_CADET_Channel *channel,
988     void **channel_ctx,
989     const struct GNUNET_MessageHeader *msg)
990 {
991
992   struct GNUNET_PeerIdentity *peer;
993   struct GNUNET_MQ_Handle *mq;
994   //struct GNUNET_RPS_P2P_PullRequestMessage *in_msg;
995   struct GNUNET_MQ_Envelope *ev;
996   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
997
998   // TODO find some way to keep one peer from spamming with pull requests
999   // allow only one request per time interval ?
1000   // otherwise remove from peerlist?
1001
1002   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
1003   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
1004
1005   mq = GNUNET_CADET_mq_create(channel); // TODO without mq?
1006   //mq = get_mq(peer_map, peer);
1007
1008   //in_msg = (struct GNUNET_RPS_P2P_PullRequestMessage *) msg;
1009   // TODO how many peers do we actually send?
1010   // GNUNET_ntohll(in_msg->num_peers)
1011   ev = GNUNET_MQ_msg_extra(out_msg,
1012                            gossip_list_size * sizeof(struct GNUNET_PeerIdentity),
1013                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1014   out_msg->num_peers = GNUNET_htonll(gossip_list_size);
1015   memcpy(&out_msg[1], gossip_list,
1016          gossip_list_size * sizeof(struct GNUNET_PeerIdentity));
1017
1018   GNUNET_MQ_send(mq, ev);
1019
1020   GNUNET_MQ_destroy(mq);
1021
1022
1023   return GNUNET_OK;
1024 }
1025
1026 /**
1027  * Handle PULL REPLY message from another peer.
1028  *
1029  * Check whether we sent a corresponding request and
1030  * whether this reply is the first one.
1031  *
1032  * @param cls Closure
1033  * @param channel The channel the PUSH was received over
1034  * @param channel_ctx The context associated with this channel
1035  * @param msg The message header
1036  */
1037 static int
1038 handle_peer_pull_reply (void *cls,
1039     struct GNUNET_CADET_Channel *channel,
1040     void **channel_ctx,
1041     const struct GNUNET_MessageHeader *msg)
1042 {
1043   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1044
1045   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1046   struct GNUNET_PeerIdentity *peers;
1047   uint64_t i;
1048
1049   // TODO check that we sent a request and that it is the first reply
1050
1051   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1052   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1053   for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ ) {
1054     GNUNET_array_append(pull_list, pull_list_size, peers[i]);
1055   }
1056
1057   // TODO maybe a disconnect happens here
1058   
1059   return GNUNET_OK;
1060 }
1061
1062
1063 /**
1064  * Send out PUSHes and PULLs.
1065  *
1066  * This is executed regylary.
1067  */
1068 static void
1069 do_round(void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1070 {
1071   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round\n");
1072
1073   uint64_t i;
1074   //unsigned int *n_arr;
1075   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
1076   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
1077   struct GNUNET_MQ_Envelope *ev;
1078   struct GNUNET_PeerIdentity *peer;
1079
1080   // TODO print lists, ...
1081   // TODO cleanup peer_map
1082
1083
1084   /* If the NSE has changed adapt the lists accordingly */
1085   if ( sampler_list->size != est_size )
1086     SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
1087
1088   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1089
1090
1091   /* Would it make sense to have one shuffeled gossip list and then
1092    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1093    * use the rest to update sampler?
1094    * in essence get random peers with consumption */
1095
1096   /* Send PUSHes */
1097   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size);
1098   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %f (%f * %u) peers.\n",
1099       alpha * gossip_list_size, alpha, gossip_list_size);
1100   for ( i = 0 ; i < alpha * gossip_list_size ; i++ ) { // TODO compute length
1101     peer = get_rand_peer(gossip_list, gossip_list_size);
1102     // TODO check NULL == peer
1103     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1104
1105     ev = GNUNET_MQ_msg(push_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1106     //ev = GNUNET_MQ_msg_extra();
1107     /* TODO Compute proof of work here
1108     push_msg; */
1109     push_msg->placeholder = 0;
1110     GNUNET_MQ_send( get_mq(peer_map, peer), ev );
1111
1112     // TODO modify in_flags of respective peer?
1113   }
1114
1115
1116   /* Send PULL requests */
1117   // TODO
1118   //n_arr = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1119   LOG(GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %f (%f * %u) peers.\n",
1120       beta * gossip_list_size, beta, gossip_list_size);
1121   for ( i = 0 ; i < beta * gossip_list_size ; i++ ){ // TODO compute length
1122     peer = get_rand_peer(gossip_list, gossip_list_size);
1123     // TODO check NULL == peer
1124     LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s(peer));
1125
1126     ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1127     //ev = GNUNET_MQ_msg_extra();
1128     pull_msg->placeholder = 0;
1129     GNUNET_MQ_send( get_mq(peer_map, peer), ev );
1130     // TODO modify in_flags of respective peer?
1131   }
1132
1133
1134   /* Update gossip list */
1135   uint64_t r_index;
1136
1137   if ( push_list_size <= alpha * gossip_list_size &&
1138        push_list_size != 0 &&
1139        pull_list_size != 0 )
1140   {
1141     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
1142
1143     uint64_t first_border;
1144     uint64_t second_border;
1145
1146     first_border = round(alpha * gossip_list_size);
1147     for ( i = 0 ; i < first_border ; i++ )
1148     { // TODO use SAMPLER_get_n_rand_peers
1149       /* Update gossip list with peers received through PUSHes */
1150       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1151                                        push_list_size);
1152       gossip_list[i] = push_list[r_index];
1153       // TODO change the in_flags accordingly
1154     }
1155
1156     second_border = first_border + round(beta * gossip_list_size);
1157     for ( i = first_border ; i < second_border ; i++ )
1158     {
1159       /* Update gossip list with peers received through PULLs */
1160       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1161                                        pull_list_size);
1162       gossip_list[i] = pull_list[r_index];
1163       // TODO change the in_flags accordingly
1164     }
1165
1166     for ( i = second_border ; i < gossip_list_size ; i++ )
1167     {
1168       /* Update gossip list with peers from history */
1169       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
1170                                        sampler_list->size);
1171       gossip_list[i] = sampler_list->peer_ids[r_index];
1172       // TODO change the in_flags accordingly
1173     }
1174
1175   }
1176   else
1177   {
1178     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
1179   }
1180   // TODO independent of that also get some peers from CADET_get_peers()?
1181
1182
1183   /* Update samplers */
1184
1185   for ( i = 0 ; i < push_list_size ; i++ )
1186   {
1187     SAMPLER_update_list(sampler_list, &push_list[i]);
1188     // TODO set in_flag?
1189   }
1190
1191   for ( i = 0 ; i < pull_list_size ; i++ )
1192   {
1193     SAMPLER_update_list(sampler_list, &pull_list[i]);
1194     // TODO set in_flag?
1195   }
1196
1197
1198   /* Empty push/pull lists */
1199   GNUNET_array_grow(push_list, push_list_size, 0);
1200   push_list_size = 0; // I guess that's not necessary but doesn't hurt
1201   GNUNET_array_grow(pull_list, pull_list_size, 0);
1202   pull_list_size = 0; // I guess that's not necessary but doesn't hurt
1203
1204
1205   /* Schedule next round */
1206   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
1207   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1208 }
1209
1210 /**
1211  * Open a connection to given peer and store channel and mq.
1212  */
1213   void
1214 insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1215 {
1216   touch_mq(peer_map, id);
1217 }
1218
1219 /**
1220  * Close the connection to given peer and delete channel and mq.
1221  */
1222   void
1223 removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
1224 {
1225   size_t s;
1226   struct peer_context *ctx;
1227
1228   s = SAMPLER_count_id(sampler_list, id);
1229   if ( 1 >= s ) {
1230     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
1231     {
1232       ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
1233       if (NULL != ctx->to_channel)
1234       {
1235         if (NULL != ctx->mq)
1236         {
1237           GNUNET_MQ_destroy(ctx->mq);
1238         }
1239         GNUNET_CADET_channel_destroy(ctx->to_channel);
1240       }
1241       // TODO cleanup peer
1242       GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
1243     }
1244   }
1245 }
1246
1247 static void
1248 rps_start (struct GNUNET_SERVER_Handle *server);
1249
1250 /**
1251  * This is called from GNUNET_CADET_get_peers().
1252  *
1253  * It is called on every peer(ID) that cadet somehow has contact with.
1254  * We use those to initialise the sampler.
1255  */
1256 void
1257 init_peer_cb (void *cls,
1258               const struct GNUNET_PeerIdentity *peer,
1259               int tunnel, // "Do we have a tunnel towards this peer?"
1260               unsigned int n_paths, // "Number of known paths towards this peer"
1261               unsigned int best_path) // "How long is the best path?
1262                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1263 {
1264   if ( NULL != peer )
1265   {
1266     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
1267     SAMPLER_update_list(sampler_list, peer);
1268     touch_peer_ctx(peer_map, peer);
1269
1270     uint64_t i;
1271     i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, gossip_list_size);
1272     gossip_list[i] = *peer;
1273     // TODO send push/pull to each of those peers?
1274   }
1275   else
1276   {
1277     rps_start( (struct GNUNET_SERVER_Handle *) cls);
1278   }
1279 }
1280
1281
1282
1283
1284 /**
1285  * Task run during shutdown.
1286  *
1287  * @param cls unused
1288  * @param tc unused
1289  */
1290 static void
1291 shutdown_task (void *cls,
1292                const struct GNUNET_SCHEDULER_TaskContext *tc)
1293 {
1294   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
1295
1296   if ( NULL != do_round_task )
1297   {
1298     GNUNET_SCHEDULER_cancel (do_round_task);
1299     do_round_task = NULL;
1300   }
1301
1302   GNUNET_NSE_disconnect(nse);
1303   GNUNET_CADET_disconnect(cadet_handle);
1304   GNUNET_free(own_identity);
1305   //GNUNET_free(round_interval);
1306   //GNUNET_free(gossip_list);
1307   SAMPLER_samplers_destroy(sampler_list);
1308   GNUNET_array_grow(gossip_list, gossip_list_size, 0);
1309   GNUNET_array_grow(push_list, push_list_size, 0);
1310   GNUNET_array_grow(pull_list, pull_list_size, 0);
1311   // TODO delete global data
1312 }
1313
1314
1315 /**
1316  * A client disconnected.  Remove all of its data structure entries.
1317  *
1318  * @param cls closure, NULL
1319  * @param client identification of the client
1320  */
1321 static void
1322 handle_client_disconnect (void *cls,
1323                           struct GNUNET_SERVER_Client * client)
1324 {
1325   // TODO reinitialise that sampler
1326 }
1327
1328 /**
1329  * Handle the channel a peer opens to us.
1330  *
1331  * @param cls The closure
1332  * @param channel The channel the peer wants to establish
1333  * @param initiator The peer's peer ID
1334  * @param port The port the channel is being established over
1335  * @param options Further options
1336  */
1337   static void *
1338 handle_inbound_channel (void *cls,
1339                         struct GNUNET_CADET_Channel *channel,
1340                         const struct GNUNET_PeerIdentity *initiator,
1341                         uint32_t port,
1342                         enum GNUNET_CADET_ChannelOption options)
1343 {
1344   LOG(GNUNET_ERROR_TYPE_DEBUG, "New channel was established to us.\n");
1345
1346   GNUNET_assert( NULL != channel );
1347
1348   // TODO we might not even store the from_channel
1349
1350   if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
1351     ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, initiator ))->from_channel = channel;
1352     // FIXME there might already be an established channel
1353   } else {
1354     struct peer_context *ctx;
1355
1356     ctx = GNUNET_malloc( sizeof(struct peer_context));
1357     ctx->in_flags = in_other_gossip_list;
1358     ctx->mq = NULL; // TODO create mq?
1359     ctx->from_channel = channel;
1360
1361     GNUNET_CONTAINER_multipeermap_put( peer_map, initiator, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1362   }
1363   return NULL; // TODO
1364 }
1365
1366 /**
1367  * This is called when a remote peer destroys a channel.
1368  *
1369  * @param cls The closure
1370  * @param channel The channel being closed
1371  * @param channel_ctx The context associated with this channel
1372  */
1373 static void
1374 cleanup_channel(void *cls,
1375                 const struct GNUNET_CADET_Channel *channel,
1376                 void *channel_ctx)
1377 {
1378   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n");
1379   // TODO test whether that was a peer in the samplers/a peer we opened a connection to
1380   //      and if so, reinitialise the sampler
1381 }
1382
1383 /**
1384  * Actually start the service.
1385  */
1386 static void
1387 rps_start (struct GNUNET_SERVER_Handle *server)
1388 {
1389   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1390     {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
1391     {NULL, NULL, 0, 0}
1392   };
1393
1394   GNUNET_SERVER_add_handlers (server, handlers);
1395   GNUNET_SERVER_disconnect_notify (server,
1396                                    &handle_client_disconnect,
1397                                    NULL);
1398   LOG(GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
1399
1400
1401
1402   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL);
1403   LOG(GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
1404
1405   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1406                                 &shutdown_task,
1407                                 NULL);
1408 }
1409
1410
1411
1412 /**
1413  * Process statistics requests.
1414  *
1415  * @param cls closure
1416  * @param server the initialized server
1417  * @param c configuration to use
1418  */
1419 static void
1420 run (void *cls,
1421      struct GNUNET_SERVER_Handle *server,
1422      const struct GNUNET_CONFIGURATION_Handle *c)
1423 {
1424   // TODO check what this does -- copied from gnunet-boss
1425   // - seems to work as expected
1426   GNUNET_log_setup("rps", GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG), NULL);
1427
1428   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
1429
1430   cfg = c;
1431
1432
1433   own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
1434
1435   GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
1436
1437   GNUNET_assert(NULL != own_identity);
1438
1439   LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
1440
1441
1442
1443   /* Get time interval from the configuration */
1444   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
1445                                                         "ROUNDINTERVAL",
1446                                                         &round_interval))
1447   {
1448     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
1449     GNUNET_SCHEDULER_shutdown();
1450     return;
1451   }
1452
1453   /* Get initial size of sampler/gossip list from the configuration */
1454   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
1455                                                          "INITSIZE",
1456                                                          (long long unsigned int *) &est_size))
1457   {
1458     LOG(GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
1459     GNUNET_SCHEDULER_shutdown();
1460     return;
1461   }
1462   LOG(GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", est_size);
1463
1464   //gossip_list_size = est_size; // TODO rename est_size
1465
1466   gossip_list = NULL;
1467
1468   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
1469
1470
1471   /* connect to NSE */
1472   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
1473   // TODO check whether that was successful
1474   // TODO disconnect on shutdown
1475   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
1476
1477
1478   alpha = 0.45;
1479   beta  = 0.45;
1480   // TODO initialise thresholds - ?
1481
1482   ///* Get alpha from the configuration */
1483   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1484   //                                                       "ALPHA",
1485   //                                                       &alpha))
1486   //{
1487   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No ALPHA specified in the config\n");
1488   //}
1489   //LOG(GNUNET_ERROR_TYPE_DEBUG, "ALPHA is %f\n", alpha);
1490  
1491   ///* Get beta from the configuration */
1492   //if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_float (cfg, "RPS",
1493   //                                                       "BETA",
1494   //                                                       &beta))
1495   //{
1496   //  LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
1497   //}
1498   //LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
1499
1500
1501   peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
1502
1503
1504   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
1505     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
1506     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
1507     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
1508     {NULL, 0, 0}
1509   };
1510
1511   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
1512   cadet_handle = GNUNET_CADET_connect(cfg,
1513                                     cls,
1514                                     &handle_inbound_channel,
1515                                     &cleanup_channel,
1516                                     cadet_handlers,
1517                                     ports);
1518   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
1519
1520
1521   /* Initialise sampler and gossip list */
1522
1523   sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, removeCB, NULL);
1524
1525   push_list = NULL;
1526   push_list_size = 0;
1527   pull_list = NULL;
1528   pull_list_size = 0;
1529
1530
1531   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
1532   GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
1533   // FIXME use magic 0000 PeerID to _start_ the service
1534
1535   // TODO send push/pull to each of those peers?
1536 }
1537
1538
1539 /**
1540  * The main function for the rps service.
1541  *
1542  * @param argc number of arguments from the command line
1543  * @param argv command line arguments
1544  * @return 0 ok, 1 on error
1545  */
1546 int
1547 main (int argc, char *const *argv)
1548 {
1549   return (GNUNET_OK ==
1550           GNUNET_SERVICE_run (argc,
1551                               argv,
1552                               "rps",
1553                               GNUNET_SERVICE_OPTION_NONE,
1554                               &run, NULL)) ? 0 : 1;
1555 }
1556
1557 /* end of gnunet-service-rps.c */