b3151a567d35f28d4966c859b716550454cff25f
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include "gnunet-service-rps_sampler.h"
33
34 #include <math.h>
35 #include <inttypes.h>
36
37 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
38
39 // TODO modify @brief in every file
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO connect to friends
48
49 // TODO store peers somewhere
50
51 // TODO ignore list?
52
53 // hist_size_init, hist_size_max
54
55 /**
56  * Our configuration.
57  */
58 static const struct GNUNET_CONFIGURATION_Handle *cfg;
59
60 /**
61  * Our own identity.
62  */
63 static struct GNUNET_PeerIdentity own_identity;
64
65
66   struct GNUNET_PeerIdentity *
67 get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list, unsigned int size,
68                            const struct GNUNET_PeerIdentity *ignore_list, unsigned int ignore_size);
69
70
71 /***********************************************************************
72  * Housekeeping with peers
73 ***********************************************************************/
74
75 /**
76  * Struct used to store the context of a connected client.
77  */
78 struct client_ctx
79 {
80   /**
81    * The message queue to communicate with the client.
82    */
83   struct GNUNET_MQ_Handle *mq;
84 };
85
86 /**
87  * Used to keep track in what lists single peerIDs are.
88  */
89 enum PeerFlags
90 {
91   PULL_REPLY_PENDING   = 0x01,
92   IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
93   IN_OWN_SAMPLER_LIST  = 0x04, // unneeded?
94   IN_OWN_GOSSIP_LIST   = 0x08, // unneeded?
95
96   /**
97    * We set this bit when we can be sure the other peer is/was live.
98    */
99   VALID                = 0x10
100 };
101
102
103 /**
104  * Functions of this type can be used to be stored at a peer for later execution.
105  */
106 typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
107
108 /**
109  * Outstanding operation on peer consisting of callback and closure
110  */
111 struct PeerOutstandingOp
112 {
113   /**
114    * Callback
115    */
116   PeerOp op;
117
118   /**
119    * Closure
120    */
121   void *op_cls;
122 };
123
124
125 /**
126  * Struct used to keep track of other peer's status
127  *
128  * This is stored in a multipeermap.
129  */
130 struct PeerContext
131 {
132   /**
133    * In own gossip/sampler list, in other's gossip/sampler list
134    */
135   uint32_t peer_flags;
136
137   /**
138    * Message queue open to client
139    */
140   struct GNUNET_MQ_Handle *mq;
141
142   /**
143    * Channel open to client.
144    */
145   struct GNUNET_CADET_Channel *send_channel;
146
147   /**
148    * Channel open from client.
149    */
150   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
151
152   /**
153    * Array of outstanding operations on this peer.
154    */
155   struct PeerOutstandingOp *outstanding_ops;
156
157   /**
158    * Number of outstanding operations.
159    */
160   unsigned int num_outstanding_ops;
161   //size_t num_outstanding_ops;
162
163   /**
164    * Handle to the callback given to cadet_ntfy_tmt_rdy()
165    *
166    * To be canceled on shutdown.
167    */
168   struct GNUNET_CADET_TransmitHandle *is_live_task;
169
170   /**
171    * Identity of the peer
172    */
173   struct GNUNET_PeerIdentity peer_id;
174
175   /**
176    * This is pobably followed by 'statistical' data (when we first saw
177    * him, how did we get his ID, how many pushes (in a timeinterval),
178    * ...)
179    */
180 };
181
182 /***********************************************************************
183  * /Housekeeping with peers
184 ***********************************************************************/
185
186
187
188
189
190 /***********************************************************************
191  * Globals
192 ***********************************************************************/
193
194 /**
195  * Sampler used for the Brahms protocol itself.
196  */
197 static struct RPS_Sampler *prot_sampler;
198
199 /**
200  * Sampler used for the clients.
201  */
202 static struct RPS_Sampler *client_sampler;
203
204 /**
205  * Set of all peers to keep track of them.
206  */
207 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
208
209
210 /**
211  * The gossiped list of peers.
212  */
213 static struct GNUNET_PeerIdentity *gossip_list;
214
215 /**
216  * Size of the gossiped list
217  */
218 //static unsigned int gossip_list_size;
219 static uint32_t gossip_list_size;
220
221
222 /**
223  * The size of sampler we need to be able to satisfy the client's need of
224  * random peers.
225  */
226 static unsigned int sampler_size_client_need;
227
228 /**
229  * The size of sampler we need to be able to satisfy the Brahms protocol's
230  * need of random peers.
231  *
232  * This is directly taken as the #gossip_list_size on update of the
233  * #gossip_list
234  *
235  * This is one minimum size the sampler grows to.
236  */
237 static unsigned int sampler_size_est_need;
238
239
240 /**
241  * Percentage of total peer number in the gossip list
242  * to send random PUSHes to
243  */
244 static float alpha;
245
246 /**
247  * Percentage of total peer number in the gossip list
248  * to send random PULLs to
249  */
250 static float beta;
251
252 /**
253  * The percentage gamma of history updates.
254  * Simply 1 - alpha - beta
255  */
256
257
258 /**
259  * Identifier for the main task that runs periodically.
260  */
261 static struct GNUNET_SCHEDULER_Task *do_round_task;
262
263 /**
264  * Time inverval the do_round task runs in.
265  */
266 static struct GNUNET_TIME_Relative round_interval;
267
268
269
270 /**
271  * List to store peers received through pushes temporary.
272  *
273  * TODO -> multipeermap
274  */
275 static struct GNUNET_PeerIdentity *push_list;
276
277 /**
278  * Size of the push_list;
279  */
280 static unsigned int push_list_size;
281 //size_t push_list_size;
282
283 /**
284  * List to store peers received through pulls temporary.
285  *
286  * TODO -> multipeermap
287  */
288 static struct GNUNET_PeerIdentity *pull_list;
289
290 /**
291  * Size of the pull_list;
292  */
293 static unsigned int pull_list_size;
294 //size_t pull_list_size;
295
296
297 /**
298  * Handler to NSE.
299  */
300 static struct GNUNET_NSE_Handle *nse;
301
302 /**
303  * Handler to CADET.
304  */
305 static struct GNUNET_CADET_Handle *cadet_handle;
306
307
308 /**
309  * Request counter.
310  *
311  * Only needed in the beginning to check how many of the 64 deltas
312  * we already have
313  */
314 static unsigned int req_counter;
315
316 /**
317  * Time of the last request we received.
318  *
319  * Used to compute the expected request rate.
320  */
321 static struct GNUNET_TIME_Absolute last_request;
322
323 /**
324  * Size of #request_deltas.
325  */
326 #define REQUEST_DELTAS_SIZE 64
327 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
328
329 /**
330  * Last 64 deltas between requests
331  */
332 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
333
334 /**
335  * The prediction of the rate of requests
336  */
337 static struct GNUNET_TIME_Relative  request_rate;
338
339
340 /**
341  * List with the peers we sent requests to.
342  */
343 struct GNUNET_PeerIdentity *pending_pull_reply_list;
344
345 /**
346  * Size of #pending_pull_reply_list.
347  */
348 uint32_t pending_pull_reply_list_size;
349
350
351 /**
352  * Number of history update tasks.
353  */
354 uint32_t num_hist_update_tasks;
355
356
357 #ifdef ENABLE_MALICIOUS
358 /**
359  * Type of malicious peer
360  *
361  * 0 Don't act malicious at all - Default
362  * 1 Try to maximise representation
363  * 2 Try to partition the network
364  */
365 uint32_t mal_type = 0;
366
367 /**
368  * Other malicious peers
369  */
370 static struct GNUNET_PeerIdentity *mal_peers = NULL;
371
372 /**
373  * Hashmap of malicious peers used as set.
374  * Used to more efficiently check whether we know that peer.
375  */
376 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set = NULL;
377
378 /**
379  * Number of other malicious peers
380  */
381 static uint32_t num_mal_peers = 0;
382
383
384 /**
385  * If type is 2 This struct is used to store the attacked peers in a DLL
386  */
387 struct AttackedPeer
388 {
389   /**
390    * DLL
391    */
392   struct AttackedPeer *next;
393   struct AttackedPeer *prev;
394
395   /**
396    * PeerID
397    */
398   struct GNUNET_PeerIdentity peer_id;
399 };
400
401 /**
402  * If type is 2 this is the DLL of attacked peers
403  */
404 static struct AttackedPeer *att_peers_head = NULL;
405 static struct AttackedPeer *att_peers_tail = NULL;
406
407 /**
408  * This index is used to point to an attacked peer to
409  * implement the round-robin-ish way to select attacked peers.
410  */
411 static struct AttackedPeer *att_peer_index = NULL;
412
413 /**
414  * Hashmap of attacked peers used as set.
415  * Used to more efficiently check whether we know that peer.
416  */
417 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set = NULL;
418
419 /**
420  * Number of attacked peers
421  */
422 static uint32_t num_attacked_peers = 0;
423
424
425 /**
426  * If type is 1 this is the attacked peer
427  */
428 static struct GNUNET_PeerIdentity attacked_peer;
429
430 /**
431  * The limit of PUSHes we can send in one round.
432  * This is an assumption of the Brahms protocol and either implemented
433  * via proof of work
434  * or
435  * assumend to be the bandwidth limitation.
436  */
437 static uint32_t push_limit = 10000;
438 #endif /* ENABLE_MALICIOUS */
439
440
441 /***********************************************************************
442  * /Globals
443 ***********************************************************************/
444
445
446
447
448
449
450 /***********************************************************************
451  * Util functions
452 ***********************************************************************/
453
454 /**
455  * Set a peer flag of given peer context.
456  */
457 #define set_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags |= mask)
458
459 /**
460  * Get peer flag of given peer context.
461  */
462 #define get_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags & mask ? GNUNET_YES : GNUNET_NO)
463
464 /**
465  * Unset flag of given peer context.
466  */
467 #define unset_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags &= (~mask))
468
469 /**
470  * Compute the minimum of two ints
471  */
472 #define min(x, y) ((x < y) ? x : y)
473
474 /**
475  * Clean the send channel of a peer
476  */
477 void
478 peer_clean (const struct GNUNET_PeerIdentity *peer);
479
480
481 /**
482  * Check if peer is already in peer array.
483  */
484   int
485 in_arr (const struct GNUNET_PeerIdentity *array,
486         unsigned int arr_size,
487         const struct GNUNET_PeerIdentity *peer)
488 {
489   GNUNET_assert (NULL != peer);
490
491   if (0 == arr_size)
492     return GNUNET_NO;
493
494   GNUNET_assert (NULL != array);
495
496   unsigned int i;
497
498   for (i = 0; i < arr_size ; i++)
499     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&array[i], peer))
500       return GNUNET_YES;
501   return GNUNET_NO;
502 }
503
504
505 /**
506  * Print peerlist to log.
507  */
508 void
509 print_peer_list (struct GNUNET_PeerIdentity *list, unsigned int len)
510 {
511   unsigned int i;
512
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514        "Printing peer list of length %u at %p:\n",
515        len,
516        list);
517   for (i = 0 ; i < len ; i++)
518   {
519     LOG (GNUNET_ERROR_TYPE_DEBUG,
520          "%u. peer: %s\n",
521          i, GNUNET_i2s (&list[i]));
522   }
523 }
524
525
526 /**
527  * Remove peer from list.
528  */
529   void
530 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
531                unsigned int *list_size,
532                const struct GNUNET_PeerIdentity *peer)
533 {
534   unsigned int i;
535   struct GNUNET_PeerIdentity *tmp;
536
537   tmp = *peer_list;
538
539   LOG (GNUNET_ERROR_TYPE_DEBUG,
540        "Removing peer %s from list at %p\n",
541        GNUNET_i2s (peer),
542        tmp);
543
544   for ( i = 0 ; i < *list_size ; i++ )
545   {
546     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
547     {
548       if (i < *list_size -1)
549       { /* Not at the last entry -- shift peers left */
550         memcpy (&tmp[i], &tmp[i +1],
551                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
552       }
553       /* Remove last entry (should be now useless PeerID) */
554       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
555     }
556   }
557   *peer_list = tmp;
558 }
559
560 /**
561  * Get random peer from the given list but don't return one from the @a ignore_list.
562  */
563   struct GNUNET_PeerIdentity *
564 get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list,
565                            uint32_t list_size,
566                            const struct GNUNET_PeerIdentity *ignore_list,
567                            uint32_t ignore_size)
568 {
569   uint32_t r_index;
570   uint32_t tmp_size;
571   struct GNUNET_PeerIdentity *tmp_peer_list;
572   struct GNUNET_PeerIdentity *peer;
573
574   GNUNET_assert (NULL != peer_list);
575   if (0 == list_size)
576     return NULL;
577
578   tmp_size = 0;
579   tmp_peer_list = NULL;
580   GNUNET_array_grow (tmp_peer_list, tmp_size, list_size);
581   memcpy (tmp_peer_list,
582           peer_list,
583           list_size * sizeof (struct GNUNET_PeerIdentity));
584   peer = GNUNET_new (struct GNUNET_PeerIdentity);
585
586   /**;
587    * Choose the r_index of the peer we want to return
588    * at random from the interval of the gossip list
589    */
590   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
591                                       tmp_size);
592   *peer = tmp_peer_list[r_index];
593
594   while (in_arr (ignore_list, ignore_size, peer))
595   {
596     rem_from_list (&tmp_peer_list, &tmp_size, peer);
597
598     print_peer_list (tmp_peer_list, tmp_size);
599
600     if (0 == tmp_size)
601     {
602       GNUNET_free (peer);
603       return NULL;
604     }
605
606     /**;
607      * Choose the r_index of the peer we want to return
608      * at random from the interval of the gossip list
609      */
610     r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
611                                         tmp_size);
612     *peer = tmp_peer_list[r_index];
613   }
614
615
616   GNUNET_array_grow (tmp_peer_list, tmp_size, 0);
617
618   return peer;
619 }
620
621
622 /**
623  * Get the context of a peer. If not existing, create.
624  */
625   struct PeerContext *
626 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
627               const struct GNUNET_PeerIdentity *peer)
628 {
629   struct PeerContext *ctx;
630
631   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
632   {
633     ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
634   }
635   else
636   {
637     ctx = GNUNET_new (struct PeerContext);
638     ctx->peer_flags = 0;
639     ctx->mq = NULL;
640     ctx->send_channel = NULL;
641     ctx->recv_channel = NULL;
642     ctx->outstanding_ops = NULL;
643     ctx->num_outstanding_ops = 0;
644     ctx->is_live_task = NULL;
645     ctx->peer_id = *peer;
646     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
647                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
648   }
649   return ctx;
650 }
651
652
653 /**
654  * Put random peer from sampler into the gossip list as history update.
655  */
656   void
657 hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
658 {
659   GNUNET_assert (1 == num_peers);
660
661   if (gossip_list_size < sampler_size_est_need)
662     GNUNET_array_append (gossip_list, gossip_list_size, *ids);
663
664   if (0 < num_hist_update_tasks)
665     num_hist_update_tasks--;
666 }
667
668
669 /**
670  * Set the peer flag to living and call the outstanding operations on this peer.
671  */
672 static size_t
673 peer_is_live (struct PeerContext *peer_ctx)
674 {
675   struct GNUNET_PeerIdentity *peer;
676
677   /* Cancle is_live_task if still scheduled */
678   if (NULL != peer_ctx->is_live_task)
679   {
680     GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
681     peer_ctx->is_live_task = NULL;
682   }
683
684   peer = &peer_ctx->peer_id;
685   set_peer_flag (peer_ctx, VALID);
686
687   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
688
689   if (0 < peer_ctx->num_outstanding_ops)
690   { /* Call outstanding operations */
691     unsigned int i;
692
693     for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
694       peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer);
695     GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
696   }
697
698   return 0;
699 }
700
701
702 /**
703  * Callback that is called when a channel was effectively established.
704  * This is given to ntfy_tmt_rdy and called when the channel was
705  * successfully established.
706  */
707 static size_t
708 cadet_ntfy_tmt_rdy_cb (void *cls, size_t size, void *buf)
709 {
710   struct PeerContext *peer_ctx = (struct PeerContext *) cls;
711
712   peer_ctx->is_live_task = NULL;
713   LOG (GNUNET_ERROR_TYPE_DEBUG,
714        "Set ->is_live_task = NULL for peer %s\n",
715        GNUNET_i2s (&peer_ctx->peer_id));
716
717   if (NULL != buf
718       && 0 != size)
719   {
720     peer_is_live (peer_ctx);
721   }
722   else
723   {
724     LOG (GNUNET_ERROR_TYPE_WARNING,
725          "Problems establishing a connection to peer %s in order to check liveliness\n",
726          GNUNET_i2s (&peer_ctx->peer_id));
727     // TODO reschedule? cleanup?
728   }
729
730   //if (NULL != peer_ctx->is_live_task)
731   //{
732   //  LOG (GNUNET_ERROR_TYPE_DEBUG,
733   //       "Trying to cancle is_live_task for peer %s\n",
734   //       GNUNET_i2s (&peer_ctx->peer_id));
735   //  GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
736   //  peer_ctx->is_live_task = NULL;
737   //}
738
739   return 0;
740 }
741
742
743 /**
744  * Get the channel of a peer. If not existing, create.
745  */
746   struct GNUNET_CADET_Channel *
747 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
748              const struct GNUNET_PeerIdentity *peer)
749 {
750   struct PeerContext *peer_ctx;
751
752   peer_ctx = get_peer_ctx (peer_map, peer);
753
754   GNUNET_assert (NULL == peer_ctx->is_live_task);
755
756   if (NULL == peer_ctx->send_channel)
757   {
758     LOG (GNUNET_ERROR_TYPE_DEBUG,
759          "Trying to establish channel to peer %s\n",
760          GNUNET_i2s (peer));
761
762     peer_ctx->send_channel =
763       GNUNET_CADET_channel_create (cadet_handle,
764                                    NULL,
765                                    peer,
766                                    GNUNET_RPS_CADET_PORT,
767                                    GNUNET_CADET_OPTION_RELIABLE);
768
769     // do I have to explicitly put it in the peer_map?
770     (void) GNUNET_CONTAINER_multipeermap_put
771       (peer_map,
772        peer,
773        peer_ctx,
774        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
775   }
776   return peer_ctx->send_channel;
777 }
778
779
780 /**
781  * Get the message queue of a specific peer.
782  *
783  * If we already have a message queue open to this client,
784  * simply return it, otherways create one.
785  */
786   struct GNUNET_MQ_Handle *
787 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
788         const struct GNUNET_PeerIdentity *peer_id)
789 {
790   struct PeerContext *peer_ctx;
791
792   peer_ctx = get_peer_ctx (peer_map, peer_id);
793
794   GNUNET_assert (NULL == peer_ctx->is_live_task);
795
796   if (NULL == peer_ctx->mq)
797   {
798     (void) get_channel (peer_map, peer_id);
799     peer_ctx->mq = GNUNET_CADET_mq_create (peer_ctx->send_channel);
800     //do I have to explicitly put it in the peer_map?
801     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer_id, peer_ctx,
802                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
803   }
804   return peer_ctx->mq;
805 }
806
807
808 /**
809  * Issue check whether peer is live
810  *
811  * @param peer_ctx the context of the peer
812  */
813 void
814 check_peer_live (struct PeerContext *peer_ctx)
815 {
816   (void) get_channel (peer_map, &peer_ctx->peer_id);
817   LOG (GNUNET_ERROR_TYPE_DEBUG,
818        "Get informed about peer %s getting live\n",
819        GNUNET_i2s (&peer_ctx->peer_id));
820   if (NULL == peer_ctx->is_live_task)
821   {
822     peer_ctx->is_live_task =
823         GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel,
824                                             GNUNET_NO,
825                                             GNUNET_TIME_UNIT_FOREVER_REL,
826                                             sizeof (struct GNUNET_MessageHeader),
827                                             cadet_ntfy_tmt_rdy_cb,
828                                             peer_ctx);
829   }
830   else
831   {
832     LOG (GNUNET_ERROR_TYPE_DEBUG,
833          "Already waiting for notification\n");
834   }
835 }
836
837
838 /**
839  * Sum all time relatives of an array.
840   */
841   struct GNUNET_TIME_Relative
842 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
843 {
844   struct GNUNET_TIME_Relative sum;
845   uint32_t i;
846
847   sum = GNUNET_TIME_UNIT_ZERO;
848   for ( i = 0 ; i < arr_size ; i++ )
849   {
850     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
851   }
852   return sum;
853 }
854
855
856 /**
857  * Compute the average of given time relatives.
858  */
859   struct GNUNET_TIME_Relative
860 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
861 {
862   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
863 }
864
865
866 /**
867  * Insert PeerID in #pull_list
868  *
869  * Called once we know a peer is live.
870  */
871   void
872 insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
873 {
874   if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
875     GNUNET_array_append (pull_list, pull_list_size, *peer);
876
877   peer_clean (peer);
878 }
879
880 /**
881  * Check whether #insert_in_pull_list was already scheduled
882  */
883   int
884 insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
885 {
886   unsigned int i;
887
888   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
889     if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
890       return GNUNET_YES;
891   return GNUNET_NO;
892 }
893
894
895 /**
896  * Insert PeerID in #gossip_list
897  *
898  * Called once we know a peer is live.
899  */
900   void
901 insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
902 {
903   if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
904     GNUNET_array_append (gossip_list, gossip_list_size, *peer);
905
906   (void) get_channel (peer_map, peer);
907 }
908
909 /**
910  * Check whether #insert_in_gossip_list was already scheduled
911  */
912   int
913 insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
914 {
915   unsigned int i;
916
917   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
918     if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
919       return GNUNET_YES;
920   return GNUNET_NO;
921 }
922
923
924 /**
925  * Update sampler with given PeerID.
926  */
927   void
928 insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
929 {
930   LOG (GNUNET_ERROR_TYPE_DEBUG,
931        "Updating samplers with peer %s from insert_in_sampler()\n",
932        GNUNET_i2s (peer));
933   RPS_sampler_update (prot_sampler,   peer);
934   RPS_sampler_update (client_sampler, peer);
935 }
936
937
938 /**
939  * Check whether #insert_in_sampler was already scheduled
940  */
941 static int
942 insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
943 {
944   unsigned int i;
945
946   for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
947     if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
948       return GNUNET_YES;
949   return GNUNET_NO;
950 }
951
952
953 /**
954  * Wrapper around #RPS_sampler_resize()
955  *
956  * If we do not have enough sampler elements, double current sampler size
957  * If we have more than enough sampler elements, halv current sampler size
958  */
959 static void
960 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
961 {
962   unsigned int sampler_size;
963
964   // TODO statistics
965   // TODO respect the min, max
966   sampler_size = RPS_sampler_get_size (sampler);
967   if (sampler_size > new_size * 4)
968   { /* Shrinking */
969     RPS_sampler_resize (sampler, sampler_size / 2);
970   }
971   else if (sampler_size < new_size)
972   { /* Growing */
973     RPS_sampler_resize (sampler, sampler_size * 2);
974   }
975   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
976 }
977
978
979 /**
980  * Wrapper around #RPS_sampler_resize() resizing the client sampler
981  */
982 static void
983 client_resize_wrapper ()
984 {
985   uint32_t bigger_size;
986   unsigned int sampler_size;
987
988   // TODO statistics
989
990   sampler_size = RPS_sampler_get_size (client_sampler);
991
992   if (sampler_size_est_need > sampler_size_client_need)
993     bigger_size = sampler_size_est_need;
994   else
995     bigger_size = sampler_size_client_need;
996
997   // TODO respect the min, max
998   resize_wrapper (client_sampler, bigger_size);
999   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
1000 }
1001
1002
1003 /**
1004  * Estimate request rate
1005  *
1006  * Called every time we receive a request from the client.
1007  */
1008   void
1009 est_request_rate()
1010 {
1011   struct GNUNET_TIME_Relative max_round_duration;
1012
1013   if (request_deltas_size > req_counter)
1014     req_counter++;
1015   if ( 1 < req_counter)
1016   {
1017     /* Shift last request deltas to the right */
1018     memcpy (&request_deltas[1],
1019         request_deltas,
1020         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
1021
1022     /* Add current delta to beginning */
1023     request_deltas[0] =
1024         GNUNET_TIME_absolute_get_difference (last_request,
1025                                              GNUNET_TIME_absolute_get ());
1026     request_rate = T_relative_avg (request_deltas, req_counter);
1027
1028     /* Compute the duration a round will maximally take */
1029     max_round_duration =
1030         GNUNET_TIME_relative_add (round_interval,
1031                                   GNUNET_TIME_relative_divide (round_interval, 2));
1032
1033     /* Set the estimated size the sampler has to have to
1034      * satisfy the current client request rate */
1035     sampler_size_client_need =
1036         max_round_duration.rel_value_us / request_rate.rel_value_us;
1037
1038     /* Resize the sampler */
1039     client_resize_wrapper ();
1040   }
1041   last_request = GNUNET_TIME_absolute_get ();
1042 }
1043
1044
1045 /**
1046  * Add all peers in @a peer_array to @peer_map used as set.
1047  *
1048  * @param peer_array array containing the peers
1049  * @param num_peers number of peers in @peer_array
1050  * @param peer_map the peermap to use as set
1051  */
1052 static void
1053 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
1054                        unsigned int num_peers,
1055                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
1056 {
1057   unsigned int i;
1058   if (NULL == peer_map)
1059     peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers + 1,
1060                                                      GNUNET_NO);
1061   for (i = 0 ; i < num_peers ; i++)
1062   {
1063     GNUNET_CONTAINER_multipeermap_put (peer_map,
1064                                        &peer_array[i],
1065                                        NULL,
1066                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1067   }
1068 }
1069
1070
1071 /**
1072  * Send a PULL REPLY to @a peer_id
1073  *
1074  * @param peer_id the peer to send the reply to.
1075  * @param peer_ids the peers to send to @a peer_id
1076  * @param num_peer_ids the number of peers to send to @a peer_id
1077  */
1078 static void
1079 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
1080                  const struct GNUNET_PeerIdentity *peer_ids,
1081                  unsigned int num_peer_ids)
1082 {
1083   uint32_t send_size;
1084   struct GNUNET_MQ_Handle *mq;
1085   struct GNUNET_MQ_Envelope *ev;
1086   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1087
1088   /* Compute actual size */
1089   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
1090               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
1091
1092   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
1093     /* Compute number of peers to send
1094      * If too long, simply truncate */
1095     // TODO select random ones via permutation
1096     //      or even better: do good protocol design
1097     send_size =
1098       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
1099        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1100        sizeof (struct GNUNET_PeerIdentity);
1101   else
1102     send_size = num_peer_ids;
1103
1104   LOG (GNUNET_ERROR_TYPE_DEBUG,
1105       "PULL REQUEST from peer %s received, going to send %u peers\n",
1106       GNUNET_i2s (peer_id), send_size);
1107
1108   mq = get_mq (peer_map, peer_id);
1109
1110   ev = GNUNET_MQ_msg_extra (out_msg,
1111                             send_size * sizeof (struct GNUNET_PeerIdentity),
1112                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1113   out_msg->num_peers = htonl (send_size);
1114   memcpy (&out_msg[1], peer_ids,
1115          send_size * sizeof (struct GNUNET_PeerIdentity));
1116
1117   GNUNET_MQ_send (mq, ev);
1118 }
1119
1120
1121 /***********************************************************************
1122  * /Util functions
1123 ***********************************************************************/
1124
1125
1126
1127
1128
1129 /**
1130  * Function called by NSE.
1131  *
1132  * Updates sizes of sampler list and gossip list and adapt those lists
1133  * accordingly.
1134  */
1135   void
1136 nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
1137               double logestimate, double std_dev)
1138 {
1139   double estimate;
1140   //double scale; // TODO this might go gloabal/config
1141
1142   LOG (GNUNET_ERROR_TYPE_DEBUG,
1143        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
1144        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
1145   //scale = .01;
1146   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
1147   // GNUNET_NSE_log_estimate_to_n (logestimate);
1148   estimate = pow (estimate, 1.0 / 3);
1149   // TODO add if std_dev is a number
1150   // estimate += (std_dev * scale);
1151   if (2 < ceil (estimate))
1152   {
1153     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
1154     sampler_size_est_need = estimate;
1155   } else
1156     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
1157
1158   /* If the NSE has changed adapt the lists accordingly */
1159   resize_wrapper (prot_sampler, sampler_size_est_need);
1160   client_resize_wrapper ();
1161 }
1162
1163
1164 /**
1165  * Callback called once the requested PeerIDs are ready.
1166  *
1167  * Sends those to the requesting client.
1168  */
1169 void client_respond (void *cls,
1170     struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
1171 {
1172   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler returned %" PRIu32 " peers\n", num_peers);
1173   struct GNUNET_MQ_Envelope *ev;
1174   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
1175   struct GNUNET_SERVER_Client *client;
1176   uint32_t size_needed;
1177   struct client_ctx *cli_ctx;
1178
1179   client = (struct GNUNET_SERVER_Client *) cls;
1180
1181   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1182                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1183
1184   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1185
1186   ev = GNUNET_MQ_msg_extra (out_msg,
1187                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1188                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1189   out_msg->num_peers = htonl (num_peers);
1190
1191   memcpy (&out_msg[1],
1192       ids,
1193       num_peers * sizeof (struct GNUNET_PeerIdentity));
1194   GNUNET_free (ids);
1195
1196   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
1197   if (NULL == cli_ctx) {
1198     cli_ctx = GNUNET_new (struct client_ctx);
1199     cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
1200     GNUNET_SERVER_client_set_user_context (client, cli_ctx);
1201   }
1202
1203   GNUNET_MQ_send (cli_ctx->mq, ev);
1204 }
1205
1206
1207 /**
1208  * Handle RPS request from the client.
1209  *
1210  * @param cls closure
1211  * @param client identification of the client
1212  * @param message the actual message
1213  */
1214 static void
1215 handle_client_request (void *cls,
1216             struct GNUNET_SERVER_Client *client,
1217             const struct GNUNET_MessageHeader *message)
1218 {
1219   struct GNUNET_RPS_CS_RequestMessage *msg;
1220   uint32_t num_peers;
1221   uint32_t size_needed;
1222   uint32_t i;
1223
1224   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
1225
1226   num_peers = ntohl (msg->num_peers);
1227   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1228                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1229
1230   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1231   {
1232     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1233     return;
1234   }
1235
1236   for (i = 0 ; i < num_peers ; i++)
1237     est_request_rate();
1238
1239   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIu32 " random peer(s).\n", num_peers);
1240
1241   RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
1242                                 client, num_peers, GNUNET_YES);
1243
1244   GNUNET_SERVER_receive_done (client,
1245                               GNUNET_OK);
1246 }
1247
1248
1249 /**
1250  * Handle seed from the client.
1251  *
1252  * @param cls closure
1253  * @param client identification of the client
1254  * @param message the actual message
1255  */
1256   static void
1257 handle_client_seed (void *cls,
1258                     struct GNUNET_SERVER_Client *client,
1259                     const struct GNUNET_MessageHeader *message)
1260 {
1261   struct GNUNET_RPS_CS_SeedMessage *in_msg;
1262   struct GNUNET_PeerIdentity *peers;
1263   uint32_t num_peers;
1264   uint32_t i;
1265
1266   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) > ntohs (message->size))
1267   {
1268     GNUNET_break_op (0);
1269     GNUNET_SERVER_receive_done (client,
1270                                 GNUNET_SYSERR);
1271   }
1272
1273   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
1274   num_peers = ntohl (in_msg->num_peers);
1275   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1276   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1277   //memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1278
1279   if ((ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1280       sizeof (struct GNUNET_PeerIdentity) != num_peers)
1281   {
1282     GNUNET_break_op (0);
1283     GNUNET_SERVER_receive_done (client,
1284                                 GNUNET_SYSERR);
1285   }
1286
1287   LOG (GNUNET_ERROR_TYPE_DEBUG,
1288        "Client seeded peers:\n");
1289   print_peer_list (peers, num_peers);
1290
1291   // TODO check for validity of ids
1292
1293   for (i = 0 ; i < num_peers ; i++)
1294   {
1295     LOG (GNUNET_ERROR_TYPE_DEBUG,
1296          "Updating samplers with seed %" PRIu32 ": %s\n",
1297          i,
1298          GNUNET_i2s (&peers[i]));
1299
1300     RPS_sampler_update (prot_sampler,   &peers[i]);
1301     RPS_sampler_update (client_sampler, &peers[i]);
1302   }
1303
1304   //GNUNET_free (peers);
1305
1306   GNUNET_SERVER_receive_done (client,
1307                                                 GNUNET_OK);
1308 }
1309
1310
1311 /**
1312  * Handle a PUSH message from another peer.
1313  *
1314  * Check the proof of work and store the PeerID
1315  * in the temporary list for pushed PeerIDs.
1316  *
1317  * @param cls Closure
1318  * @param channel The channel the PUSH was received over
1319  * @param channel_ctx The context associated with this channel
1320  * @param msg The message header
1321  */
1322 static int
1323 handle_peer_push (void *cls,
1324     struct GNUNET_CADET_Channel *channel,
1325     void **channel_ctx,
1326     const struct GNUNET_MessageHeader *msg)
1327 {
1328   const struct GNUNET_PeerIdentity *peer;
1329
1330   // (check the proof of work)
1331
1332   peer = (const struct GNUNET_PeerIdentity *)
1333     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1334   // FIXME wait for cadet to change this function
1335
1336   LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received (%s)\n", GNUNET_i2s (peer));
1337
1338   #ifdef ENABLE_MALICIOUS
1339   struct AttackedPeer *tmp_att_peer;
1340
1341   tmp_att_peer = GNUNET_new (struct AttackedPeer);
1342   memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1343   if (1 == mal_type)
1344   { /* Try to maximise representation */
1345     if (NULL == att_peer_set)
1346       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1347     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1348                                                              peer))
1349     {
1350       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1351                                    att_peers_tail,
1352                                    tmp_att_peer);
1353       add_peer_array_to_set (peer, 1, att_peer_set);
1354     }
1355     return GNUNET_OK;
1356   }
1357
1358
1359   else if (2 == mal_type)
1360   { /* We attack one single well-known peer - simply ignore */
1361     return GNUNET_OK;
1362   }
1363
1364   #endif /* ENABLE_MALICIOUS */
1365
1366   /* Add the sending peer to the push_list */
1367   if (GNUNET_NO == in_arr (push_list, push_list_size, peer))
1368     GNUNET_array_append (push_list, push_list_size, *peer);
1369
1370   return GNUNET_OK;
1371 }
1372
1373
1374 /**
1375  * Handle PULL REQUEST request message from another peer.
1376  *
1377  * Reply with the gossip list of PeerIDs.
1378  *
1379  * @param cls Closure
1380  * @param channel The channel the PUSH was received over
1381  * @param channel_ctx The context associated with this channel
1382  * @param msg The message header
1383  */
1384 static int
1385 handle_peer_pull_request (void *cls,
1386     struct GNUNET_CADET_Channel *channel,
1387     void **channel_ctx,
1388     const struct GNUNET_MessageHeader *msg)
1389 {
1390   struct GNUNET_PeerIdentity *peer;
1391
1392   peer = (struct GNUNET_PeerIdentity *)
1393     GNUNET_CADET_channel_get_info (channel,
1394                                    GNUNET_CADET_OPTION_PEER);
1395   // FIXME wait for cadet to change this function
1396
1397   #ifdef ENABLE_MALICIOUS
1398   if (1 == mal_type)
1399   { /* Try to maximise representation */
1400     send_pull_reply (peer, mal_peers, num_mal_peers);
1401     return GNUNET_OK;
1402   }
1403
1404   else if (2 == mal_type)
1405   { /* Try to partition network */
1406     if (GNUNET_YES == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1407     {
1408       send_pull_reply (peer, mal_peers, num_mal_peers);
1409     }
1410     return GNUNET_OK;
1411   }
1412   #endif /* ENABLE_MALICIOUS */
1413
1414   send_pull_reply (peer, gossip_list, gossip_list_size);
1415
1416   return GNUNET_OK;
1417 }
1418
1419
1420 /**
1421  * Handle PULL REPLY message from another peer.
1422  *
1423  * Check whether we sent a corresponding request and
1424  * whether this reply is the first one.
1425  *
1426  * @param cls Closure
1427  * @param channel The channel the PUSH was received over
1428  * @param channel_ctx The context associated with this channel
1429  * @param msg The message header
1430  */
1431   static int
1432 handle_peer_pull_reply (void *cls,
1433                         struct GNUNET_CADET_Channel *channel,
1434                         void **channel_ctx,
1435                         const struct GNUNET_MessageHeader *msg)
1436 {
1437   LOG (GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1438
1439   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1440   struct GNUNET_PeerIdentity *peers;
1441   struct PeerContext *peer_ctx;
1442   struct GNUNET_PeerIdentity *sender;
1443   struct PeerContext *sender_ctx;
1444   struct PeerOutstandingOp out_op;
1445   uint32_t i;
1446 #ifdef ENABLE_MALICIOUS
1447   struct AttackedPeer *tmp_att_peer;
1448 #endif /* ENABLE_MALICIOUS */
1449
1450   /* Check for protocol violation */
1451   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1452   {
1453     GNUNET_break_op (0);
1454     return GNUNET_SYSERR;
1455   }
1456
1457   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1458   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1459       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1460   {
1461     LOG (GNUNET_ERROR_TYPE_ERROR,
1462         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1463         ntohl (in_msg->num_peers),
1464         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1465             sizeof (struct GNUNET_PeerIdentity));
1466     GNUNET_break_op (0);
1467     return GNUNET_SYSERR;
1468   }
1469
1470   sender = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1471       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1472        // Guess simply casting isn't the nicest way...
1473        // FIXME wait for cadet to change this function
1474   sender_ctx = get_peer_ctx (peer_map, sender);
1475
1476   if (GNUNET_YES == get_peer_flag (sender_ctx, PULL_REPLY_PENDING))
1477   {
1478     GNUNET_break_op (0);
1479     return GNUNET_OK;
1480   }
1481
1482
1483   #ifdef ENABLE_MALICIOUS
1484   // We shouldn't even receive pull replies as we're not sending
1485   if (2 == mal_type)
1486     return GNUNET_OK;
1487   #endif /* ENABLE_MALICIOUS */
1488
1489   /* Do actual logic */
1490   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1491
1492   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1493   {
1494   #ifdef ENABLE_MALICIOUS
1495     if (1 == mal_type)
1496     {
1497       // TODO check if we sent a request and this was the first reply
1498       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1499                                                                &peers[i])
1500           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1501                                                                   &peers[i])
1502           && GNUNET_NO == GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1503                                                            &own_identity))
1504       {
1505         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1506         tmp_att_peer->peer_id = peers[i];
1507         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1508                                      att_peers_tail,
1509                                      tmp_att_peer);
1510         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1511       }
1512       continue;
1513     }
1514   #endif /* ENABLE_MALICIOUS */
1515     peer_ctx = get_peer_ctx (peer_map, &peers[i]);
1516     if (GNUNET_YES == get_peer_flag (peer_ctx, VALID)
1517         || NULL != peer_ctx->send_channel
1518         || NULL != peer_ctx->recv_channel)
1519     {
1520       if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])
1521           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peers[i]))
1522         GNUNET_array_append (pull_list, pull_list_size, peers[i]);
1523     }
1524     else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
1525     {
1526       out_op.op = insert_in_pull_list;
1527       out_op.op_cls = NULL;
1528       GNUNET_array_append (peer_ctx->outstanding_ops,
1529                            peer_ctx->num_outstanding_ops,
1530                            out_op);
1531       check_peer_live (peer_ctx);
1532     }
1533   }
1534
1535   unset_peer_flag (sender_ctx, PULL_REPLY_PENDING);
1536   rem_from_list (&pending_pull_reply_list, &pending_pull_reply_list_size, sender);
1537
1538   return GNUNET_OK;
1539 }
1540
1541
1542 /**
1543  * Compute a random delay.
1544  * A uniformly distributed value between mean + spread and mean - spread.
1545  *
1546  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1547  * It would return a random value between 2 and 6 min.
1548  *
1549  * @param mean the mean
1550  * @param spread the inverse amount of deviation from the mean
1551  */
1552 static struct GNUNET_TIME_Relative
1553 compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread)
1554 {
1555   struct GNUNET_TIME_Relative half_interval;
1556   struct GNUNET_TIME_Relative ret;
1557   unsigned int rand_delay;
1558   unsigned int max_rand_delay;
1559
1560   if (0 == spread)
1561   {
1562     LOG (GNUNET_ERROR_TYPE_WARNING,
1563          "Not accepting spread of 0\n");
1564     GNUNET_break (0);
1565   }
1566
1567   /* Compute random time value between spread * mean and spread * mean */
1568   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1569
1570   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1571   /**
1572    * Compute random value between (0 and 1) * round_interval
1573    * via multiplying round_interval with a 'fraction' (0 to value)/value
1574    */
1575   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1576   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1577   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1578   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1579
1580   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1581     LOG (GNUNET_ERROR_TYPE_WARNING,
1582          "Returning FOREVER_REL\n");
1583
1584   return ret;
1585 }
1586
1587
1588 /**
1589  * Send single pull request
1590  *
1591  * @param peer_id the peer to send the pull request to.
1592  */
1593 static void
1594 send_pull_request (struct GNUNET_PeerIdentity *peer_id)
1595 {
1596   struct GNUNET_MQ_Envelope *ev;
1597   struct GNUNET_MQ_Handle *mq;
1598
1599   LOG (GNUNET_ERROR_TYPE_DEBUG,
1600        "Sending PULL request to peer %s of gossiped list.\n",
1601        GNUNET_i2s (peer_id));
1602
1603   GNUNET_array_append (pending_pull_reply_list, pending_pull_reply_list_size, *peer_id);
1604
1605   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1606   mq = get_mq (peer_map, peer_id);
1607   GNUNET_MQ_send (mq, ev);
1608 }
1609
1610
1611 /**
1612  * Send single push
1613  *
1614  * @param peer_id the peer to send the push to.
1615  */
1616 static void
1617 send_push (struct GNUNET_PeerIdentity *peer_id)
1618 {
1619   struct GNUNET_MQ_Envelope *ev;
1620   struct GNUNET_MQ_Handle *mq;
1621
1622   LOG (GNUNET_ERROR_TYPE_DEBUG,
1623        "Sending PUSH to peer %s of gossiped list.\n",
1624        GNUNET_i2s (peer_id));
1625
1626   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1627   mq = get_mq (peer_map, peer_id);
1628   GNUNET_MQ_send (mq, ev);
1629 }
1630
1631
1632 static void
1633 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1634
1635 static void
1636 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1637
1638
1639 #ifdef ENABLE_MALICIOUS
1640 /**
1641  * Turn RPS service to act malicious.
1642  *
1643  * @param cls Closure
1644  * @param channel The channel the PUSH was received over
1645  * @param channel_ctx The context associated with this channel
1646  * @param msg The message header
1647  */
1648   static void
1649 handle_client_act_malicious (void *cls,
1650                              struct GNUNET_SERVER_Client *client,
1651                              const struct GNUNET_MessageHeader *msg)
1652 {
1653   struct GNUNET_RPS_CS_ActMaliciousMessage *in_msg;
1654   struct GNUNET_PeerIdentity *peers;
1655   uint32_t num_mal_peers_sent;
1656   uint32_t num_mal_peers_old;
1657
1658   /* Check for protocol violation */
1659   if (sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage) > ntohs (msg->size))
1660   {
1661     GNUNET_break_op (0);
1662   }
1663
1664   in_msg = (struct GNUNET_RPS_CS_ActMaliciousMessage *) msg;
1665   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1666       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1667   {
1668     LOG (GNUNET_ERROR_TYPE_ERROR,
1669         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1670         ntohl (in_msg->num_peers),
1671         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1672             sizeof (struct GNUNET_PeerIdentity));
1673     GNUNET_break_op (0);
1674   }
1675
1676
1677   /* Do actual logic */
1678   // FIXME ingore own id
1679   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1680   mal_type = ntohl (in_msg->type);
1681
1682   LOG (GNUNET_ERROR_TYPE_DEBUG,
1683        "Now acting malicious type %" PRIu32 "\n",
1684        mal_type);
1685
1686   if (1 == mal_type)
1687   { /* Try to maximise representation */
1688     /* Add other malicious peers to those we already know */
1689
1690     num_mal_peers_sent = ntohl (in_msg->num_peers);
1691     num_mal_peers_old = num_mal_peers;
1692     GNUNET_array_grow (mal_peers,
1693                        num_mal_peers,
1694                        num_mal_peers + num_mal_peers_sent);
1695     memcpy (&mal_peers[num_mal_peers_old],
1696             peers,
1697             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1698
1699     /* Add all mal peers to mal_peer_set */
1700     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1701                            num_mal_peers_sent,
1702                            mal_peer_set);
1703
1704     /* Substitute do_round () with do_mal_round () */
1705     GNUNET_SCHEDULER_cancel (do_round_task);
1706     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1707   }
1708
1709   else if (2 == mal_type)
1710   { /* Try to partition the network */
1711     /* Add other malicious peers to those we already know */
1712     num_mal_peers_sent = ntohl (in_msg->num_peers) - 1;
1713     num_mal_peers_old = num_mal_peers;
1714     GNUNET_array_grow (mal_peers,
1715                        num_mal_peers,
1716                        num_mal_peers + num_mal_peers_sent);
1717     memcpy (&mal_peers[num_mal_peers_old],
1718             peers,
1719             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1720
1721     /* Add all mal peers to mal_peer_set */
1722     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1723                            num_mal_peers_sent,
1724                            mal_peer_set);
1725
1726     /* Store the one attacked peer */
1727     memcpy (&attacked_peer,
1728             &in_msg->attacked_peer,
1729             sizeof (struct GNUNET_PeerIdentity));
1730
1731     LOG (GNUNET_ERROR_TYPE_DEBUG,
1732          "Attacked peer is %s\n",
1733          GNUNET_i2s (&attacked_peer));
1734
1735     /* Substitute do_round () with do_mal_round () */
1736     GNUNET_SCHEDULER_cancel (do_round_task);
1737     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1738   }
1739   else if (0 == mal_type)
1740   { /* Stop acting malicious */
1741     num_mal_peers = 0;
1742     if (NULL != mal_peers)
1743       GNUNET_free (mal_peers);
1744
1745     /* Substitute do_mal_round () with do_round () */
1746     GNUNET_SCHEDULER_cancel (do_round_task);
1747     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1748   }
1749   else
1750   {
1751     GNUNET_break (0);
1752   }
1753 }
1754
1755
1756 /**
1757  * Send out PUSHes and PULLs maliciously.
1758  *
1759  * This is executed regylary.
1760  */
1761 static void
1762 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1763 {
1764   uint32_t num_pushes;
1765   uint32_t i;
1766   struct GNUNET_TIME_Relative time_next_round;
1767   struct AttackedPeer *tmp_att_peer;
1768
1769   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously.\n");
1770
1771   /* Do malicious actions */
1772   if (1 == mal_type)
1773   { /* Try to maximise representation */
1774
1775     /* The maximum of pushes we're going to send this round */
1776     num_pushes = min (min (push_limit,
1777                            num_attacked_peers),
1778                        GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1779
1780     /* Send PUSHes to attacked peers */
1781     for (i = 0 ; i < num_pushes ; i++)
1782     {
1783       if (att_peers_tail == att_peer_index)
1784         att_peer_index = att_peers_head;
1785       else
1786         att_peer_index = att_peer_index->next;
1787
1788       send_push (&att_peer_index->peer_id);
1789     }
1790
1791     /* Send PULLs to some peers to learn about additional peers to attack */
1792     for (i = 0 ; i < num_pushes * alpha ; i++)
1793     {
1794       if (att_peers_tail == tmp_att_peer)
1795         tmp_att_peer = att_peers_head;
1796       else
1797         att_peer_index = tmp_att_peer->next;
1798
1799       send_pull_request (&tmp_att_peer->peer_id);
1800     }
1801   }
1802
1803
1804   else if (2 == mal_type)
1805   { /**
1806      * Try to partition the network
1807      * Send as many pushes to the attacked peer as possible
1808      * That is one push per round as it will ignore more.
1809      */
1810       send_push (&attacked_peer);
1811   }
1812
1813
1814   /* Schedule next round */
1815   time_next_round = compute_rand_delay (round_interval, 2);
1816
1817   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round, NULL);
1818   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_mal_round, NULL);
1819   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1820 }
1821 #endif /* ENABLE_MALICIOUS */
1822
1823
1824 /**
1825  * Send out PUSHes and PULLs, possibly update #gossip_list, samplers.
1826  *
1827  * This is executed regylary.
1828  */
1829 static void
1830 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1831 {
1832   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n");
1833
1834   uint32_t i;
1835   unsigned int *permut;
1836   unsigned int n_peers; /* Number of peers we send pushes/pulls to */
1837   struct GNUNET_PeerIdentity peer;
1838   struct GNUNET_PeerIdentity *tmp_peer;
1839
1840   LOG (GNUNET_ERROR_TYPE_DEBUG,
1841        "Printing gossip list:\n");
1842   for (i = 0 ; i < gossip_list_size ; i++)
1843     LOG (GNUNET_ERROR_TYPE_DEBUG,
1844          "\t%s\n", GNUNET_i2s (&gossip_list[i]));
1845   // TODO log lists, ...
1846
1847   /* Would it make sense to have one shuffeled gossip list and then
1848    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1849    * use the rest to update sampler?
1850    * in essence get random peers with consumption */
1851
1852   /* Send PUSHes */
1853   if (0 < gossip_list_size)
1854   {
1855     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1856                                            (unsigned int) gossip_list_size);
1857     n_peers = ceil (alpha * gossip_list_size);
1858     LOG (GNUNET_ERROR_TYPE_DEBUG,
1859          "Going to send pushes to %u ceil (%f * %u) peers.\n",
1860          n_peers, alpha, gossip_list_size);
1861     for (i = 0 ; i < n_peers ; i++)
1862     {
1863       peer = gossip_list[permut[i]];
1864       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1865       { // FIXME if this fails schedule/loop this for later
1866         send_push (&peer);
1867       }
1868     }
1869     GNUNET_free (permut);
1870   }
1871
1872
1873   /* Send PULL requests */
1874   //permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1875   n_peers = ceil (beta * gossip_list_size);
1876   LOG (GNUNET_ERROR_TYPE_DEBUG,
1877        "Going to send pulls to %u ceil (%f * %u) peers.\n",
1878        n_peers, beta, gossip_list_size);
1879   for (i = 0 ; i < n_peers ; i++)
1880   {
1881     tmp_peer = get_rand_peer_ignore_list (gossip_list, gossip_list_size,
1882         pending_pull_reply_list, pending_pull_reply_list_size);
1883     if (NULL != tmp_peer)
1884     {
1885       peer = *tmp_peer;
1886       GNUNET_free (tmp_peer);
1887
1888       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer))
1889       {
1890         send_pull_request (&peer);
1891       }
1892     }
1893   }
1894
1895
1896   /* Update gossip list */
1897
1898   if ( push_list_size <= alpha * gossip_list_size &&
1899        push_list_size != 0 &&
1900        pull_list_size != 0 )
1901   {
1902     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n");
1903
1904     uint32_t first_border;
1905     uint32_t second_border;
1906     uint32_t r_index;
1907     uint32_t peers_to_clean_size;
1908     struct GNUNET_PeerIdentity *peers_to_clean;
1909
1910     peers_to_clean = NULL;
1911     peers_to_clean_size = 0;
1912     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, gossip_list_size);
1913     memcpy (peers_to_clean,
1914             gossip_list,
1915             gossip_list_size * sizeof (struct GNUNET_PeerIdentity));
1916
1917     first_border  =                ceil (alpha * sampler_size_est_need);
1918     second_border = first_border + ceil (beta  * sampler_size_est_need);
1919
1920     GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
1921
1922     for (i = 0 ; i < first_border ; i++)
1923     { // TODO use RPS_sampler_get_n_rand_peers
1924       /* Update gossip list with peers received through PUSHes */
1925       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1926                                        push_list_size);
1927       gossip_list[i] = push_list[r_index];
1928       // TODO change the peer_flags accordingly
1929     }
1930
1931     for (i = first_border ; i < second_border ; i++)
1932     {
1933       /* Update gossip list with peers received through PULLs */
1934       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1935                                        pull_list_size);
1936       gossip_list[i] = pull_list[r_index];
1937       // TODO change the peer_flags accordingly
1938     }
1939
1940     for (i = second_border ; i < sampler_size_est_need ; i++)
1941     {
1942       /* Update gossip list with peers from history */
1943       RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
1944       num_hist_update_tasks++;
1945       // TODO change the peer_flags accordingly
1946     }
1947
1948     for (i = 0 ; i < gossip_list_size ; i++)
1949       rem_from_list (&peers_to_clean, &peers_to_clean_size, &gossip_list[i]);
1950
1951     for (i = 0 ; i < peers_to_clean_size ; i++)
1952       peer_clean (&peers_to_clean[i]);
1953
1954     GNUNET_free (peers_to_clean);
1955   }
1956   else
1957   {
1958     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list.\n");
1959   }
1960   // TODO independent of that also get some peers from CADET_get_peers()?
1961
1962
1963   /* Update samplers */
1964   for ( i = 0 ; i < push_list_size ; i++ )
1965   {
1966     LOG (GNUNET_ERROR_TYPE_DEBUG,
1967          "Updating with peer %s from push list\n",
1968          GNUNET_i2s (&push_list[i]));
1969     RPS_sampler_update (prot_sampler,   &push_list[i]);
1970     RPS_sampler_update (client_sampler, &push_list[i]);
1971     // TODO set in_flag?
1972   }
1973
1974   for ( i = 0 ; i < pull_list_size ; i++ )
1975   {
1976     LOG (GNUNET_ERROR_TYPE_DEBUG,
1977          "Updating with peer %s from pull list\n",
1978          GNUNET_i2s (&pull_list[i]));
1979     RPS_sampler_update (prot_sampler,   &pull_list[i]);
1980     RPS_sampler_update (client_sampler, &pull_list[i]);
1981     // TODO set in_flag?
1982   }
1983
1984
1985   /* Empty push/pull lists */
1986   GNUNET_array_grow (push_list, push_list_size, 0);
1987   GNUNET_array_grow (pull_list, pull_list_size, 0);
1988
1989   struct GNUNET_TIME_Relative time_next_round;
1990
1991   time_next_round = compute_rand_delay (round_interval, 2);
1992
1993   /* Schedule next round */
1994   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round, NULL);
1995   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_round, NULL);
1996   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1997 }
1998
1999
2000 static void
2001 rps_start (struct GNUNET_SERVER_Handle *server);
2002
2003
2004 /**
2005  * This is called from GNUNET_CADET_get_peers().
2006  *
2007  * It is called on every peer(ID) that cadet somehow has contact with.
2008  * We use those to initialise the sampler.
2009  */
2010 void
2011 init_peer_cb (void *cls,
2012               const struct GNUNET_PeerIdentity *peer,
2013               int tunnel, // "Do we have a tunnel towards this peer?"
2014               unsigned int n_paths, // "Number of known paths towards this peer"
2015               unsigned int best_path) // "How long is the best path?
2016                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2017 {
2018   struct PeerOutstandingOp out_op;
2019   struct PeerContext *peer_ctx;
2020
2021   if (NULL != peer
2022       && 0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer))
2023   {
2024     LOG (GNUNET_ERROR_TYPE_DEBUG,
2025         "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
2026         GNUNET_i2s (peer), peer, gossip_list_size);
2027
2028     // maybe create a function for that
2029     peer_ctx = get_peer_ctx (peer_map, peer);
2030     if (GNUNET_YES != get_peer_flag (peer_ctx, VALID))
2031     {
2032       if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
2033       {
2034         out_op.op = insert_in_sampler;
2035         out_op.op_cls = NULL;
2036         GNUNET_array_append (peer_ctx->outstanding_ops,
2037                              peer_ctx->num_outstanding_ops,
2038                              out_op);
2039       }
2040
2041       if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
2042       {
2043         out_op.op = insert_in_gossip_list;
2044         out_op.op_cls = NULL;
2045         GNUNET_array_append (peer_ctx->outstanding_ops,
2046                              peer_ctx->num_outstanding_ops,
2047                              out_op);
2048       }
2049
2050       /* Trigger livelyness test on peer */
2051       check_peer_live (peer_ctx);
2052     }
2053
2054     // send push/pull to each of those peers?
2055   }
2056 }
2057
2058
2059 /**
2060  * Clean the send channel of a peer
2061  */
2062 void
2063 peer_clean (const struct GNUNET_PeerIdentity *peer)
2064 {
2065   struct PeerContext *peer_ctx;
2066   struct GNUNET_CADET_Channel *channel;
2067
2068   if (GNUNET_YES != in_arr (gossip_list, gossip_list_size, peer)
2069       && GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
2070   {
2071     peer_ctx = get_peer_ctx (peer_map, peer);
2072     if (NULL != peer_ctx->send_channel)
2073     {
2074       channel = peer_ctx->send_channel;
2075       peer_ctx->send_channel = NULL;
2076       GNUNET_CADET_channel_destroy (channel);
2077     }
2078   }
2079 }
2080
2081
2082 /**
2083  * Callback used to remove peers from the multipeermap.
2084  */
2085   int
2086 peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
2087 {
2088   struct PeerContext *peer_ctx;
2089   const struct GNUNET_CADET_Channel *channel =
2090     (const struct GNUNET_CADET_Channel *) cls;
2091   struct GNUNET_CADET_Channel *recv;
2092   struct GNUNET_CADET_Channel *send;
2093
2094   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, value))
2095   {
2096     peer_ctx = (struct PeerContext *) value;
2097
2098     if (0 != peer_ctx->num_outstanding_ops)
2099       GNUNET_array_grow (peer_ctx->outstanding_ops,
2100                          peer_ctx->num_outstanding_ops,
2101                          0);
2102
2103     if (NULL != peer_ctx->mq)
2104     {
2105       GNUNET_MQ_destroy (peer_ctx->mq);
2106       peer_ctx->mq = NULL;
2107     }
2108
2109
2110     if (NULL != peer_ctx->is_live_task)
2111     {
2112     LOG (GNUNET_ERROR_TYPE_DEBUG,
2113          "Trying to cancle is_live_task for peer %s\n",
2114          GNUNET_i2s (key));
2115       GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
2116       peer_ctx->is_live_task = NULL;
2117     }
2118
2119     send = peer_ctx->send_channel;
2120     peer_ctx->send_channel = NULL;
2121     if (NULL != send
2122         && channel != send)
2123     {
2124       GNUNET_CADET_channel_destroy (send);
2125     }
2126
2127     recv = peer_ctx->send_channel;
2128     peer_ctx->recv_channel = NULL;
2129     if (NULL != recv
2130         && channel != recv)
2131     {
2132       GNUNET_CADET_channel_destroy (recv);
2133     }
2134
2135     if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, key))
2136       LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
2137     else
2138       GNUNET_free (peer_ctx);
2139   }
2140
2141   return GNUNET_YES;
2142 }
2143
2144
2145 /**
2146  * Task run during shutdown.
2147  *
2148  * @param cls unused
2149  * @param tc unused
2150  */
2151 static void
2152 shutdown_task (void *cls,
2153                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2154 {
2155
2156   LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
2157
2158   if (NULL != do_round_task)
2159   {
2160     GNUNET_SCHEDULER_cancel (do_round_task);
2161     do_round_task = NULL;
2162   }
2163
2164
2165   {
2166   if (GNUNET_SYSERR ==
2167         GNUNET_CONTAINER_multipeermap_iterate (peer_map, peer_remove_cb, NULL))
2168     LOG (GNUNET_ERROR_TYPE_WARNING,
2169         "Iterating over peers to disconnect from them was cancelled\n");
2170   }
2171
2172   GNUNET_NSE_disconnect (nse);
2173   GNUNET_CADET_disconnect (cadet_handle);
2174   RPS_sampler_destroy (prot_sampler);
2175   RPS_sampler_destroy (client_sampler);
2176   LOG (GNUNET_ERROR_TYPE_DEBUG,
2177        "Size of the peermap: %u\n",
2178        GNUNET_CONTAINER_multipeermap_size (peer_map));
2179   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
2180   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
2181   GNUNET_array_grow (gossip_list, gossip_list_size, 0);
2182   GNUNET_array_grow (push_list, push_list_size, 0);
2183   GNUNET_array_grow (pull_list, pull_list_size, 0);
2184   #ifdef ENABLE_MALICIOUS
2185   struct AttackedPeer *tmp_att_peer;
2186   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2187   if (NULL != mal_peer_set)
2188     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2189   if (NULL != att_peer_set)
2190     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2191   while (NULL != att_peers_head)
2192   {
2193     tmp_att_peer = att_peers_head;
2194     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2195   }
2196   #endif /* ENABLE_MALICIOUS */
2197 }
2198
2199
2200 /**
2201  * A client disconnected.  Remove all of its data structure entries.
2202  *
2203  * @param cls closure, NULL
2204  * @param client identification of the client
2205  */
2206 static void
2207 handle_client_disconnect (void *cls,
2208                           struct GNUNET_SERVER_Client * client)
2209 {
2210 }
2211
2212
2213 /**
2214  * Handle the channel a peer opens to us.
2215  *
2216  * @param cls The closure
2217  * @param channel The channel the peer wants to establish
2218  * @param initiator The peer's peer ID
2219  * @param port The port the channel is being established over
2220  * @param options Further options
2221  */
2222   static void *
2223 handle_inbound_channel (void *cls,
2224                         struct GNUNET_CADET_Channel *channel,
2225                         const struct GNUNET_PeerIdentity *initiator,
2226                         uint32_t port,
2227                         enum GNUNET_CADET_ChannelOption options)
2228 {
2229   struct PeerContext *peer_ctx;
2230   struct GNUNET_PeerIdentity peer;
2231
2232   peer = *initiator;
2233   LOG (GNUNET_ERROR_TYPE_DEBUG,
2234       "New channel was established to us (Peer %s).\n",
2235       GNUNET_i2s (&peer));
2236
2237   GNUNET_assert (NULL != channel);
2238
2239   // we might not even store the recv_channel
2240
2241   peer_ctx = get_peer_ctx (peer_map, &peer);
2242   // FIXME what do we do if a channel is established twice?
2243   //       overwrite? Clean old channel? ...?
2244   //if (NULL != peer_ctx->recv_channel)
2245   //{
2246   //  peer_ctx->recv_channel = channel;
2247   //}
2248   peer_ctx->recv_channel = channel;
2249
2250   (void) GNUNET_CONTAINER_multipeermap_put (peer_map, &peer, peer_ctx,
2251       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
2252
2253   peer_is_live (peer_ctx);
2254
2255   return NULL; // TODO
2256 }
2257
2258
2259 /**
2260  * This is called when a remote peer destroys a channel.
2261  *
2262  * @param cls The closure
2263  * @param channel The channel being closed
2264  * @param channel_ctx The context associated with this channel
2265  */
2266   static void
2267 cleanup_channel (void *cls,
2268                 const struct GNUNET_CADET_Channel *channel,
2269                 void *channel_ctx)
2270 {
2271   struct GNUNET_PeerIdentity *peer;
2272   struct PeerContext *peer_ctx;
2273
2274   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
2275       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
2276        // Guess simply casting isn't the nicest way...
2277        // FIXME wait for cadet to change this function
2278
2279   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
2280   {
2281     peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
2282
2283     if (NULL == peer_ctx) /* It could have been removed by shutdown_task */
2284       return;
2285
2286     if (channel == peer_ctx->send_channel)
2287     { /* Peer probably went down */
2288       LOG (GNUNET_ERROR_TYPE_DEBUG,
2289            "Peer %s destroyed send channel - probably went down, cleaning up\n",
2290            GNUNET_i2s (peer));
2291       rem_from_list (&gossip_list, &gossip_list_size, peer);
2292       rem_from_list (&pending_pull_reply_list, &pending_pull_reply_list_size, peer);
2293
2294       peer_ctx->send_channel = NULL;
2295       /* Somwewhat {ab,re}use the iterator function */
2296       /* Cast to void is ok, because it's used as void in peer_remove_cb */
2297       (void) peer_remove_cb ((void *) channel, peer, peer_ctx);
2298     }
2299     else if (channel == peer_ctx->recv_channel)
2300     { /* Other peer doesn't want to send us messages anymore */
2301       LOG (GNUNET_ERROR_TYPE_DEBUG,
2302            "Peer %s destroyed recv channel - cleaning up channel\n",
2303            GNUNET_i2s (peer));
2304       peer_ctx->recv_channel = NULL;
2305     }
2306   }
2307 }
2308
2309
2310 /**
2311  * Actually start the service.
2312  */
2313   static void
2314 rps_start (struct GNUNET_SERVER_Handle *server)
2315 {
2316   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2317     {&handle_client_request,     NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2318       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2319     {&handle_client_seed,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2320     #ifdef ENABLE_MALICIOUS
2321     {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2322     #endif /* ENABLE_MALICIOUS */
2323     {NULL, NULL, 0, 0}
2324   };
2325
2326   GNUNET_SERVER_add_handlers (server, handlers);
2327   GNUNET_SERVER_disconnect_notify (server,
2328                                    &handle_client_disconnect,
2329                                    NULL);
2330   LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
2331
2332
2333   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2334   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2335
2336   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2337                                                         &shutdown_task,
2338                                                         NULL);
2339 }
2340
2341
2342 /**
2343  * Process statistics requests.
2344  *
2345  * @param cls closure
2346  * @param server the initialized server
2347  * @param c configuration to use
2348  */
2349   static void
2350 run (void *cls,
2351      struct GNUNET_SERVER_Handle *server,
2352      const struct GNUNET_CONFIGURATION_Handle *c)
2353 {
2354   // TODO check what this does -- copied from gnunet-boss
2355   // - seems to work as expected
2356   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2357   cfg = c;
2358
2359
2360   /* Get own ID */
2361   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2362   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2363               "STARTING SERVICE (rps) for peer [%s]\n",
2364               GNUNET_i2s (&own_identity));
2365   #ifdef ENABLE_MALICIOUS
2366   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2367               "Malicious execution compiled in.\n");
2368   #endif /* ENABLE_MALICIOUS */
2369
2370
2371
2372   /* Get time interval from the configuration */
2373   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2374                                                         "ROUNDINTERVAL",
2375                                                         &round_interval))
2376   {
2377     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
2378     GNUNET_SCHEDULER_shutdown ();
2379     return;
2380   }
2381
2382   /* Get initial size of sampler/gossip list from the configuration */
2383   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
2384                                                          "INITSIZE",
2385                                                          (long long unsigned int *) &sampler_size_est_need))
2386   {
2387     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
2388     GNUNET_SCHEDULER_shutdown ();
2389     return;
2390   }
2391   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
2392
2393
2394   gossip_list = NULL;
2395
2396
2397   /* connect to NSE */
2398   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2399   // TODO check whether that was successful
2400   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
2401
2402
2403   alpha = 0.45;
2404   beta  = 0.45;
2405
2406   peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO);
2407
2408
2409   /* Initialise cadet */
2410   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2411     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        ,
2412       sizeof (struct GNUNET_MessageHeader)},
2413     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2414       sizeof (struct GNUNET_MessageHeader)},
2415     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
2416     {NULL, 0, 0}
2417   };
2418
2419   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
2420   cadet_handle = GNUNET_CADET_connect (cfg,
2421                                        cls,
2422                                        &handle_inbound_channel,
2423                                        &cleanup_channel,
2424                                        cadet_handlers,
2425                                        ports);
2426   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
2427
2428
2429   /* Initialise sampler */
2430   struct GNUNET_TIME_Relative half_round_interval;
2431   struct GNUNET_TIME_Relative  max_round_interval;
2432
2433   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2434   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2435
2436   prot_sampler =   RPS_sampler_init (sampler_size_est_need, max_round_interval);
2437   client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
2438
2439   /* Initialise push and pull maps */
2440   push_list = NULL;
2441   push_list_size = 0;
2442   pull_list = NULL;
2443   pull_list_size = 0;
2444   pending_pull_reply_list = NULL;
2445   pending_pull_reply_list_size = 0;
2446
2447
2448   num_hist_update_tasks = 0;
2449
2450
2451   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2452   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2453   // TODO send push/pull to each of those peers?
2454
2455
2456   rps_start (server);
2457 }
2458
2459
2460 /**
2461  * The main function for the rps service.
2462  *
2463  * @param argc number of arguments from the command line
2464  * @param argv command line arguments
2465  * @return 0 ok, 1 on error
2466  */
2467   int
2468 main (int argc, char *const *argv)
2469 {
2470   return (GNUNET_OK ==
2471           GNUNET_SERVICE_run (argc,
2472                               argv,
2473                               "rps",
2474                               GNUNET_SERVICE_OPTION_NONE,
2475                               &run, NULL)) ? 0 : 1;
2476 }
2477
2478 /* end of gnunet-service-rps.c */