-restructured code
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_nse_service.h"
30 #include "rps.h"
31
32 #include "gnunet-service-rps_sampler.h"
33
34 #include <math.h>
35 #include <inttypes.h>
36
37 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
38
39 // TODO modify @brief in every file
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // (TODO api -- possibility of getting weak random peer immideately)
46
47 // TODO connect to friends
48
49 // TODO store peers somewhere
50
51 // TODO ignore list?
52
53 // hist_size_init, hist_size_max
54
55 /**
56  * Our configuration.
57  */
58 static const struct GNUNET_CONFIGURATION_Handle *cfg;
59
60 /**
61  * Our own identity.
62  */
63 static struct GNUNET_PeerIdentity own_identity;
64
65
66   struct GNUNET_PeerIdentity *
67 get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list, unsigned int size,
68                            const struct GNUNET_PeerIdentity *ignore_list, unsigned int ignore_size);
69
70
71 /***********************************************************************
72  * Housekeeping with peers
73 ***********************************************************************/
74
75 /**
76  * Struct used to store the context of a connected client.
77  */
78 struct client_ctx
79 {
80   /**
81    * The message queue to communicate with the client.
82    */
83   struct GNUNET_MQ_Handle *mq;
84 };
85
86 /**
87  * Used to keep track in what lists single peerIDs are.
88  */
89 enum PeerFlags
90 {
91   PULL_REPLY_PENDING   = 0x01,
92   IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
93   IN_OWN_SAMPLER_LIST  = 0x04, // unneeded?
94   IN_OWN_GOSSIP_LIST   = 0x08, // unneeded?
95
96   /**
97    * We set this bit when we can be sure the other peer is/was live.
98    */
99   VALID                = 0x10
100 };
101
102
103 /**
104  * Functions of this type can be used to be stored at a peer for later execution.
105  */
106 typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
107
108 /**
109  * Outstanding operation on peer consisting of callback and closure
110  */
111 struct PeerOutstandingOp
112 {
113   /**
114    * Callback
115    */
116   PeerOp op;
117
118   /**
119    * Closure
120    */
121   void *op_cls;
122 };
123
124
125 /**
126  * Struct used to keep track of other peer's status
127  *
128  * This is stored in a multipeermap.
129  */
130 struct PeerContext
131 {
132   /**
133    * In own gossip/sampler list, in other's gossip/sampler list
134    */
135   uint32_t peer_flags;
136
137   /**
138    * Message queue open to client
139    */
140   struct GNUNET_MQ_Handle *mq;
141
142   /**
143    * Channel open to client.
144    */
145   struct GNUNET_CADET_Channel *send_channel;
146
147   /**
148    * Channel open from client.
149    */
150   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
151
152   /**
153    * Array of outstanding operations on this peer.
154    */
155   struct PeerOutstandingOp *outstanding_ops;
156
157   /**
158    * Number of outstanding operations.
159    */
160   unsigned int num_outstanding_ops;
161   //size_t num_outstanding_ops;
162
163   /**
164    * Handle to the callback given to cadet_ntfy_tmt_rdy()
165    *
166    * To be canceled on shutdown.
167    */
168   struct GNUNET_CADET_TransmitHandle *is_live_task;
169
170   /**
171    * Identity of the peer
172    */
173   struct GNUNET_PeerIdentity peer_id;
174
175   /**
176    * This is pobably followed by 'statistical' data (when we first saw
177    * him, how did we get his ID, how many pushes (in a timeinterval),
178    * ...)
179    */
180 };
181
182 /***********************************************************************
183  * /Housekeeping with peers
184 ***********************************************************************/
185
186
187
188
189
190 /***********************************************************************
191  * Globals
192 ***********************************************************************/
193
194 /**
195  * Sampler used for the Brahms protocol itself.
196  */
197 static struct RPS_Sampler *prot_sampler;
198
199 /**
200  * Sampler used for the clients.
201  */
202 static struct RPS_Sampler *client_sampler;
203
204 /**
205  * Set of all peers to keep track of them.
206  */
207 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
208
209
210 /**
211  * The gossiped list of peers.
212  */
213 static struct GNUNET_PeerIdentity *gossip_list;
214
215 /**
216  * Size of the gossiped list
217  */
218 //static unsigned int gossip_list_size;
219 static uint32_t gossip_list_size;
220
221
222 /**
223  * The size of sampler we need to be able to satisfy the client's need of
224  * random peers.
225  */
226 static unsigned int sampler_size_client_need;
227
228 /**
229  * The size of sampler we need to be able to satisfy the Brahms protocol's
230  * need of random peers.
231  *
232  * This is directly taken as the #gossip_list_size on update of the
233  * #gossip_list
234  *
235  * This is one minimum size the sampler grows to.
236  */
237 static unsigned int sampler_size_est_need;
238
239
240 /**
241  * Percentage of total peer number in the gossip list
242  * to send random PUSHes to
243  */
244 static float alpha;
245
246 /**
247  * Percentage of total peer number in the gossip list
248  * to send random PULLs to
249  */
250 static float beta;
251
252 /**
253  * The percentage gamma of history updates.
254  * Simply 1 - alpha - beta
255  */
256
257
258 /**
259  * Identifier for the main task that runs periodically.
260  */
261 static struct GNUNET_SCHEDULER_Task *do_round_task;
262
263 /**
264  * Time inverval the do_round task runs in.
265  */
266 static struct GNUNET_TIME_Relative round_interval;
267
268
269
270 /**
271  * List to store peers received through pushes temporary.
272  *
273  * TODO -> multipeermap
274  */
275 static struct GNUNET_PeerIdentity *push_list;
276
277 /**
278  * Size of the push_list;
279  */
280 static unsigned int push_list_size;
281 //size_t push_list_size;
282
283 /**
284  * List to store peers received through pulls temporary.
285  *
286  * TODO -> multipeermap
287  */
288 static struct GNUNET_PeerIdentity *pull_list;
289
290 /**
291  * Size of the pull_list;
292  */
293 static unsigned int pull_list_size;
294 //size_t pull_list_size;
295
296
297 /**
298  * Handler to NSE.
299  */
300 static struct GNUNET_NSE_Handle *nse;
301
302 /**
303  * Handler to CADET.
304  */
305 static struct GNUNET_CADET_Handle *cadet_handle;
306
307
308 /**
309  * Request counter.
310  *
311  * Only needed in the beginning to check how many of the 64 deltas
312  * we already have
313  */
314 static unsigned int req_counter;
315
316 /**
317  * Time of the last request we received.
318  *
319  * Used to compute the expected request rate.
320  */
321 static struct GNUNET_TIME_Absolute last_request;
322
323 /**
324  * Size of #request_deltas.
325  */
326 #define REQUEST_DELTAS_SIZE 64
327 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
328
329 /**
330  * Last 64 deltas between requests
331  */
332 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
333
334 /**
335  * The prediction of the rate of requests
336  */
337 static struct GNUNET_TIME_Relative  request_rate;
338
339
340 /**
341  * List with the peers we sent requests to.
342  */
343 struct GNUNET_PeerIdentity *pending_pull_reply_list;
344
345 /**
346  * Size of #pending_pull_reply_list.
347  */
348 uint32_t pending_pull_reply_list_size;
349
350
351 /**
352  * Number of history update tasks.
353  */
354 uint32_t num_hist_update_tasks;
355
356
357 #ifdef ENABLE_MALICIOUS
358 /**
359  * Type of malicious peer
360  *
361  * 0 Don't act malicious at all - Default
362  * 1 Try to maximise representation
363  * 2 Try to partition the network
364  */
365 uint32_t mal_type = 0;
366
367 /**
368  * Other malicious peers
369  */
370 static struct GNUNET_PeerIdentity *mal_peers = NULL;
371
372 /**
373  * Hashmap of malicious peers used as set.
374  * Used to more efficiently check whether we know that peer.
375  */
376 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set = NULL;
377
378 /**
379  * Number of other malicious peers
380  */
381 static uint32_t num_mal_peers = 0;
382
383
384 /**
385  * If type is 2 This struct is used to store the attacked peers in a DLL
386  */
387 struct AttackedPeer
388 {
389   /**
390    * DLL
391    */
392   struct AttackedPeer *next;
393   struct AttackedPeer *prev;
394
395   /**
396    * PeerID
397    */
398   struct GNUNET_PeerIdentity peer_id;
399 };
400
401 /**
402  * If type is 2 this is the DLL of attacked peers
403  */
404 static struct AttackedPeer *att_peers_head = NULL;
405 static struct AttackedPeer *att_peers_tail = NULL;
406
407 /**
408  * This index is used to point to an attacked peer to
409  * implement the round-robin-ish way to select attacked peers.
410  */
411 static struct AttackedPeer *att_peer_index = NULL;
412
413 /**
414  * Hashmap of attacked peers used as set.
415  * Used to more efficiently check whether we know that peer.
416  */
417 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set = NULL;
418
419 /**
420  * Number of attacked peers
421  */
422 static uint32_t num_attacked_peers = 0;
423
424
425 /**
426  * If type is 1 this is the attacked peer
427  */
428 static struct GNUNET_PeerIdentity attacked_peer;
429
430 /**
431  * The limit of PUSHes we can send in one round.
432  * This is an assumption of the Brahms protocol and either implemented
433  * via proof of work
434  * or
435  * assumend to be the bandwidth limitation.
436  */
437 static uint32_t push_limit = 10000;
438 #endif /* ENABLE_MALICIOUS */
439
440
441 /***********************************************************************
442  * /Globals
443 ***********************************************************************/
444
445
446
447
448
449
450 /***********************************************************************
451  * Util functions
452 ***********************************************************************/
453
454 /**
455  * Set a peer flag of given peer context.
456  */
457 #define set_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags |= mask)
458
459 /**
460  * Get peer flag of given peer context.
461  */
462 #define get_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags & mask ? GNUNET_YES : GNUNET_NO)
463
464 /**
465  * Unset flag of given peer context.
466  */
467 #define unset_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags &= (~mask))
468
469 /**
470  * Compute the minimum of two ints
471  */
472 #define min(x, y) ((x < y) ? x : y)
473
474 /**
475  * Clean the send channel of a peer
476  */
477 void
478 peer_clean (const struct GNUNET_PeerIdentity *peer);
479
480
481 /**
482  * Check if peer is already in peer array.
483  */
484   int
485 in_arr (const struct GNUNET_PeerIdentity *array,
486         unsigned int arr_size,
487         const struct GNUNET_PeerIdentity *peer)
488 {
489   GNUNET_assert (NULL != peer);
490
491   if (0 == arr_size)
492     return GNUNET_NO;
493
494   GNUNET_assert (NULL != array);
495
496   unsigned int i;
497
498   for (i = 0; i < arr_size ; i++)
499     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&array[i], peer))
500       return GNUNET_YES;
501   return GNUNET_NO;
502 }
503
504
505 /**
506  * Print peerlist to log.
507  */
508 void
509 print_peer_list (struct GNUNET_PeerIdentity *list, unsigned int len)
510 {
511   unsigned int i;
512
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514        "Printing peer list of length %u at %p:\n",
515        len,
516        list);
517   for (i = 0 ; i < len ; i++)
518   {
519     LOG (GNUNET_ERROR_TYPE_DEBUG,
520          "%u. peer: %s\n",
521          i, GNUNET_i2s (&list[i]));
522   }
523 }
524
525
526 /**
527  * Remove peer from list.
528  */
529   void
530 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
531                unsigned int *list_size,
532                const struct GNUNET_PeerIdentity *peer)
533 {
534   unsigned int i;
535   struct GNUNET_PeerIdentity *tmp;
536
537   tmp = *peer_list;
538
539   LOG (GNUNET_ERROR_TYPE_DEBUG,
540        "Removing peer %s from list at %p\n",
541        GNUNET_i2s (peer),
542        tmp);
543
544   for ( i = 0 ; i < *list_size ; i++ )
545   {
546     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
547     {
548       if (i < *list_size -1)
549       { /* Not at the last entry -- shift peers left */
550         memcpy (&tmp[i], &tmp[i +1],
551                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
552       }
553       /* Remove last entry (should be now useless PeerID) */
554       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
555     }
556   }
557   *peer_list = tmp;
558 }
559
560 /**
561  * Get random peer from the given list but don't return one from the @a ignore_list.
562  */
563   struct GNUNET_PeerIdentity *
564 get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list,
565                            uint32_t list_size,
566                            const struct GNUNET_PeerIdentity *ignore_list,
567                            uint32_t ignore_size)
568 {
569   uint32_t r_index;
570   uint32_t tmp_size;
571   struct GNUNET_PeerIdentity *tmp_peer_list;
572   struct GNUNET_PeerIdentity *peer;
573
574   GNUNET_assert (NULL != peer_list);
575   if (0 == list_size)
576     return NULL;
577
578   tmp_size = 0;
579   tmp_peer_list = NULL;
580   GNUNET_array_grow (tmp_peer_list, tmp_size, list_size);
581   memcpy (tmp_peer_list,
582           peer_list,
583           list_size * sizeof (struct GNUNET_PeerIdentity));
584   peer = GNUNET_new (struct GNUNET_PeerIdentity);
585
586   /**;
587    * Choose the r_index of the peer we want to return
588    * at random from the interval of the gossip list
589    */
590   r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
591                                       tmp_size);
592   *peer = tmp_peer_list[r_index];
593
594   while (in_arr (ignore_list, ignore_size, peer))
595   {
596     rem_from_list (&tmp_peer_list, &tmp_size, peer);
597
598     print_peer_list (tmp_peer_list, tmp_size);
599
600     if (0 == tmp_size)
601     {
602       GNUNET_free (peer);
603       return NULL;
604     }
605
606     /**;
607      * Choose the r_index of the peer we want to return
608      * at random from the interval of the gossip list
609      */
610     r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
611                                         tmp_size);
612     *peer = tmp_peer_list[r_index];
613   }
614
615
616   GNUNET_array_grow (tmp_peer_list, tmp_size, 0);
617
618   return peer;
619 }
620
621
622 /**
623  * Get the context of a peer. If not existing, create.
624  */
625   struct PeerContext *
626 get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
627               const struct GNUNET_PeerIdentity *peer)
628 {
629   struct PeerContext *ctx;
630
631   if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
632   {
633     ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
634   }
635   else
636   {
637     ctx = GNUNET_new (struct PeerContext);
638     ctx->peer_flags = 0;
639     ctx->mq = NULL;
640     ctx->send_channel = NULL;
641     ctx->recv_channel = NULL;
642     ctx->outstanding_ops = NULL;
643     ctx->num_outstanding_ops = 0;
644     ctx->is_live_task = NULL;
645     ctx->peer_id = *peer;
646     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
647                                               GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
648   }
649   return ctx;
650 }
651
652
653 /**
654  * Put random peer from sampler into the gossip list as history update.
655  */
656   void
657 hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
658 {
659   GNUNET_assert (1 == num_peers);
660
661   if (gossip_list_size < sampler_size_est_need)
662     GNUNET_array_append (gossip_list, gossip_list_size, *ids);
663
664   if (0 < num_hist_update_tasks)
665     num_hist_update_tasks--;
666 }
667
668
669 /**
670  * Set the peer flag to living and call the outstanding operations on this peer.
671  */
672 static size_t
673 peer_is_live (struct PeerContext *peer_ctx)
674 {
675   struct GNUNET_PeerIdentity *peer;
676
677   /* Cancle is_live_task if still scheduled */
678   if (NULL != peer_ctx->is_live_task)
679   {
680     GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
681     peer_ctx->is_live_task = NULL;
682   }
683
684   peer = &peer_ctx->peer_id;
685   set_peer_flag (peer_ctx, VALID);
686
687   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
688
689   if (0 < peer_ctx->num_outstanding_ops)
690   { /* Call outstanding operations */
691     unsigned int i;
692
693     for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
694       peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer);
695     GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
696   }
697
698   return 0;
699 }
700
701
702 /**
703  * Callback that is called when a channel was effectively established.
704  * This is given to ntfy_tmt_rdy and called when the channel was
705  * successfully established.
706  */
707 static size_t
708 cadet_ntfy_tmt_rdy_cb (void *cls, size_t size, void *buf)
709 {
710   struct PeerContext *peer_ctx = (struct PeerContext *) cls;
711
712   peer_ctx->is_live_task = NULL;
713   LOG (GNUNET_ERROR_TYPE_DEBUG,
714        "Set ->is_live_task = NULL for peer %s\n",
715        GNUNET_i2s (&peer_ctx->peer_id));
716
717   if (NULL != buf
718       && 0 != size)
719   {
720     peer_is_live (peer_ctx);
721   }
722   else
723   {
724     LOG (GNUNET_ERROR_TYPE_WARNING,
725          "Problems establishing a connection to peer %s in order to check liveliness\n",
726          GNUNET_i2s (&peer_ctx->peer_id));
727     // TODO reschedule? cleanup?
728   }
729
730   //if (NULL != peer_ctx->is_live_task)
731   //{
732   //  LOG (GNUNET_ERROR_TYPE_DEBUG,
733   //       "Trying to cancle is_live_task for peer %s\n",
734   //       GNUNET_i2s (&peer_ctx->peer_id));
735   //  GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
736   //  peer_ctx->is_live_task = NULL;
737   //}
738
739   return 0;
740 }
741
742
743 /**
744  * Get the channel of a peer. If not existing, create.
745  */
746   struct GNUNET_CADET_Channel *
747 get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
748              const struct GNUNET_PeerIdentity *peer)
749 {
750   struct PeerContext *peer_ctx;
751
752   peer_ctx = get_peer_ctx (peer_map, peer);
753
754   GNUNET_assert (NULL == peer_ctx->is_live_task);
755
756   if (NULL == peer_ctx->send_channel)
757   {
758     LOG (GNUNET_ERROR_TYPE_DEBUG,
759          "Trying to establish channel to peer %s\n",
760          GNUNET_i2s (peer));
761
762     peer_ctx->send_channel =
763       GNUNET_CADET_channel_create (cadet_handle,
764                                    NULL,
765                                    peer,
766                                    GNUNET_RPS_CADET_PORT,
767                                    GNUNET_CADET_OPTION_RELIABLE);
768
769     // do I have to explicitly put it in the peer_map?
770     (void) GNUNET_CONTAINER_multipeermap_put
771       (peer_map,
772        peer,
773        peer_ctx,
774        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
775   }
776   return peer_ctx->send_channel;
777 }
778
779
780 /**
781  * Get the message queue of a specific peer.
782  *
783  * If we already have a message queue open to this client,
784  * simply return it, otherways create one.
785  */
786   struct GNUNET_MQ_Handle *
787 get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
788         const struct GNUNET_PeerIdentity *peer_id)
789 {
790   struct PeerContext *peer_ctx;
791
792   peer_ctx = get_peer_ctx (peer_map, peer_id);
793
794   GNUNET_assert (NULL == peer_ctx->is_live_task);
795
796   if (NULL == peer_ctx->mq)
797   {
798     (void) get_channel (peer_map, peer_id);
799     peer_ctx->mq = GNUNET_CADET_mq_create (peer_ctx->send_channel);
800     //do I have to explicitly put it in the peer_map?
801     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer_id, peer_ctx,
802                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
803   }
804   return peer_ctx->mq;
805 }
806
807
808 /**
809  * Issue check whether peer is live
810  *
811  * @param peer_ctx the context of the peer
812  */
813 void
814 check_peer_live (struct PeerContext *peer_ctx)
815 {
816   (void) get_channel (peer_map, &peer_ctx->peer_id);
817   LOG (GNUNET_ERROR_TYPE_DEBUG,
818        "Get informed about peer %s getting live\n",
819        GNUNET_i2s (&peer_ctx->peer_id));
820   if (NULL == peer_ctx->is_live_task)
821   {
822     peer_ctx->is_live_task =
823         GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel,
824                                             GNUNET_NO,
825                                             GNUNET_TIME_UNIT_FOREVER_REL,
826                                             sizeof (struct GNUNET_MessageHeader),
827                                             cadet_ntfy_tmt_rdy_cb,
828                                             peer_ctx);
829   }
830   else
831   {
832     LOG (GNUNET_ERROR_TYPE_DEBUG,
833          "Already waiting for notification\n");
834   }
835 }
836
837
838 /**
839  * Sum all time relatives of an array.
840   */
841   struct GNUNET_TIME_Relative
842 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
843 {
844   struct GNUNET_TIME_Relative sum;
845   uint32_t i;
846
847   sum = GNUNET_TIME_UNIT_ZERO;
848   for ( i = 0 ; i < arr_size ; i++ )
849   {
850     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
851   }
852   return sum;
853 }
854
855
856 /**
857  * Compute the average of given time relatives.
858  */
859   struct GNUNET_TIME_Relative
860 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
861 {
862   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
863 }
864
865
866 /**
867  * Insert PeerID in #pull_list
868  *
869  * Called once we know a peer is live.
870  */
871   void
872 insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer)
873 {
874   if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer))
875     GNUNET_array_append (pull_list, pull_list_size, *peer);
876
877   peer_clean (peer);
878 }
879
880 /**
881  * Check whether #insert_in_pull_list was already scheduled
882  */
883   int
884 insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx)
885 {
886   unsigned int i;
887
888   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
889     if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op)
890       return GNUNET_YES;
891   return GNUNET_NO;
892 }
893
894
895 /**
896  * Insert PeerID in #gossip_list
897  *
898  * Called once we know a peer is live.
899  */
900   void
901 insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer)
902 {
903   if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer))
904     GNUNET_array_append (gossip_list, gossip_list_size, *peer);
905
906   (void) get_channel (peer_map, peer);
907 }
908
909 /**
910  * Check whether #insert_in_gossip_list was already scheduled
911  */
912   int
913 insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx)
914 {
915   unsigned int i;
916
917   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
918     if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op)
919       return GNUNET_YES;
920   return GNUNET_NO;
921 }
922
923
924 /**
925  * Update sampler with given PeerID.
926  */
927   void
928 insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
929 {
930   LOG (GNUNET_ERROR_TYPE_DEBUG,
931        "Updating samplers with peer %s from insert_in_sampler()\n",
932        GNUNET_i2s (peer));
933   RPS_sampler_update (prot_sampler,   peer);
934   RPS_sampler_update (client_sampler, peer);
935 }
936
937
938 /**
939  * Check whether #insert_in_sampler was already scheduled
940  */
941 static int
942 insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
943 {
944   unsigned int i;
945
946   for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
947     if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
948       return GNUNET_YES;
949   return GNUNET_NO;
950 }
951
952
953 /**
954  * Wrapper around #RPS_sampler_resize()
955  *
956  * If we do not have enough sampler elements, double current sampler size
957  * If we have more than enough sampler elements, halv current sampler size
958  */
959 static void
960 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
961 {
962   unsigned int sampler_size;
963
964   // TODO statistics
965   // TODO respect the min, max
966   sampler_size = RPS_sampler_get_size (sampler);
967   if (sampler_size > new_size * 4)
968   { /* Shrinking */
969     RPS_sampler_resize (sampler, sampler_size / 2);
970   }
971   else if (sampler_size < new_size)
972   { /* Growing */
973     RPS_sampler_resize (sampler, sampler_size * 2);
974   }
975   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
976 }
977
978
979 /**
980  * Wrapper around #RPS_sampler_resize() resizing the client sampler
981  */
982 static void
983 client_resize_wrapper ()
984 {
985   uint32_t bigger_size;
986   unsigned int sampler_size;
987
988   // TODO statistics
989
990   sampler_size = RPS_sampler_get_size (client_sampler);
991
992   if (sampler_size_est_need > sampler_size_client_need)
993     bigger_size = sampler_size_est_need;
994   else
995     bigger_size = sampler_size_client_need;
996
997   // TODO respect the min, max
998   resize_wrapper (client_sampler, bigger_size);
999   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
1000 }
1001
1002
1003 /**
1004  * Estimate request rate
1005  *
1006  * Called every time we receive a request from the client.
1007  */
1008   void
1009 est_request_rate()
1010 {
1011   struct GNUNET_TIME_Relative max_round_duration;
1012
1013   if (request_deltas_size > req_counter)
1014     req_counter++;
1015   if ( 1 < req_counter)
1016   {
1017     /* Shift last request deltas to the right */
1018     memcpy (&request_deltas[1],
1019         request_deltas,
1020         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
1021
1022     /* Add current delta to beginning */
1023     request_deltas[0] =
1024         GNUNET_TIME_absolute_get_difference (last_request,
1025                                              GNUNET_TIME_absolute_get ());
1026     request_rate = T_relative_avg (request_deltas, req_counter);
1027
1028     /* Compute the duration a round will maximally take */
1029     max_round_duration =
1030         GNUNET_TIME_relative_add (round_interval,
1031                                   GNUNET_TIME_relative_divide (round_interval, 2));
1032
1033     /* Set the estimated size the sampler has to have to
1034      * satisfy the current client request rate */
1035     sampler_size_client_need =
1036         max_round_duration.rel_value_us / request_rate.rel_value_us;
1037
1038     /* Resize the sampler */
1039     client_resize_wrapper ();
1040   }
1041   last_request = GNUNET_TIME_absolute_get ();
1042 }
1043
1044
1045 /**
1046  * Add all peers in @a peer_array to @peer_map used as set.
1047  *
1048  * @param peer_array array containing the peers
1049  * @param num_peers number of peers in @peer_array
1050  * @param peer_map the peermap to use as set
1051  */
1052 static void
1053 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
1054                        unsigned int num_peers,
1055                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
1056 {
1057   unsigned int i;
1058   if (NULL == peer_map)
1059     peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers + 1,
1060                                                      GNUNET_NO);
1061   for (i = 0 ; i < num_peers ; i++)
1062   {
1063     GNUNET_CONTAINER_multipeermap_put (peer_map,
1064                                        &peer_array[i],
1065                                        NULL,
1066                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1067   }
1068 }
1069
1070
1071 /**
1072  * Send a PULL REPLY to @a peer_id
1073  *
1074  * @param peer_id the peer to send the reply to.
1075  * @param peer_ids the peers to send to @a peer_id
1076  * @param num_peer_ids the number of peers to send to @a peer_id
1077  */
1078 static void
1079 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
1080                  const struct GNUNET_PeerIdentity *peer_ids,
1081                  unsigned int num_peer_ids)
1082 {
1083   uint32_t send_size;
1084   struct GNUNET_MQ_Handle *mq;
1085   struct GNUNET_MQ_Envelope *ev;
1086   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1087
1088   /* Compute actual size */
1089   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
1090               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
1091
1092   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
1093     /* Compute number of peers to send
1094      * If too long, simply truncate */
1095     // TODO select random ones via permutation
1096     //      or even better: do good protocol design
1097     send_size =
1098       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
1099        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1100        sizeof (struct GNUNET_PeerIdentity);
1101   else
1102     send_size = num_peer_ids;
1103
1104   LOG (GNUNET_ERROR_TYPE_DEBUG,
1105       "PULL REQUEST from peer %s received, going to send %u peers\n",
1106       GNUNET_i2s (peer_id), send_size);
1107
1108   mq = get_mq (peer_map, peer_id);
1109
1110   ev = GNUNET_MQ_msg_extra (out_msg,
1111                             send_size * sizeof (struct GNUNET_PeerIdentity),
1112                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1113   out_msg->num_peers = htonl (send_size);
1114   memcpy (&out_msg[1], peer_ids,
1115          send_size * sizeof (struct GNUNET_PeerIdentity));
1116
1117   GNUNET_MQ_send (mq, ev);
1118 }
1119
1120
1121 /***********************************************************************
1122  * /Util functions
1123 ***********************************************************************/
1124
1125
1126
1127
1128
1129 /**
1130  * Function called by NSE.
1131  *
1132  * Updates sizes of sampler list and gossip list and adapt those lists
1133  * accordingly.
1134  */
1135   void
1136 nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
1137               double logestimate, double std_dev)
1138 {
1139   double estimate;
1140   //double scale; // TODO this might go gloabal/config
1141
1142   LOG (GNUNET_ERROR_TYPE_DEBUG,
1143        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
1144        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
1145   //scale = .01;
1146   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
1147   // GNUNET_NSE_log_estimate_to_n (logestimate);
1148   estimate = pow (estimate, 1.0 / 3);
1149   // TODO add if std_dev is a number
1150   // estimate += (std_dev * scale);
1151   if (2 < ceil (estimate))
1152   {
1153     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
1154     sampler_size_est_need = estimate;
1155   } else
1156     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
1157
1158   /* If the NSE has changed adapt the lists accordingly */
1159   resize_wrapper (prot_sampler, sampler_size_est_need);
1160   client_resize_wrapper ();
1161 }
1162
1163
1164 /**
1165  * Callback called once the requested PeerIDs are ready.
1166  *
1167  * Sends those to the requesting client.
1168  */
1169 void client_respond (void *cls,
1170     struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
1171 {
1172   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler returned %" PRIX32 " peers\n", num_peers);
1173   struct GNUNET_MQ_Envelope *ev;
1174   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
1175   struct GNUNET_SERVER_Client *client;
1176   uint32_t size_needed;
1177   struct client_ctx *cli_ctx;
1178
1179   client = (struct GNUNET_SERVER_Client *) cls;
1180
1181   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1182                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1183
1184   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1185
1186   ev = GNUNET_MQ_msg_extra (out_msg,
1187                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1188                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1189   out_msg->num_peers = htonl (num_peers);
1190
1191   memcpy (&out_msg[1],
1192       ids,
1193       num_peers * sizeof (struct GNUNET_PeerIdentity));
1194   GNUNET_free (ids);
1195
1196   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
1197   if (NULL == cli_ctx) {
1198     cli_ctx = GNUNET_new (struct client_ctx);
1199     cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
1200     GNUNET_SERVER_client_set_user_context (client, cli_ctx);
1201   }
1202
1203   GNUNET_MQ_send (cli_ctx->mq, ev);
1204 }
1205
1206
1207 /**
1208  * Handle RPS request from the client.
1209  *
1210  * @param cls closure
1211  * @param client identification of the client
1212  * @param message the actual message
1213  */
1214 static void
1215 handle_client_request (void *cls,
1216             struct GNUNET_SERVER_Client *client,
1217             const struct GNUNET_MessageHeader *message)
1218 {
1219   struct GNUNET_RPS_CS_RequestMessage *msg;
1220   uint32_t num_peers;
1221   uint32_t size_needed;
1222   uint32_t i;
1223
1224   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
1225
1226   num_peers = ntohl (msg->num_peers);
1227   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1228                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1229
1230   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1231   {
1232     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1233     return;
1234   }
1235
1236   for (i = 0 ; i < num_peers ; i++)
1237     est_request_rate();
1238
1239   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers);
1240
1241   RPS_sampler_get_n_rand_peers (client_sampler, client_respond,
1242                                 client, num_peers, GNUNET_YES);
1243
1244   GNUNET_SERVER_receive_done (client,
1245                               GNUNET_OK);
1246 }
1247
1248
1249 /**
1250  * Handle seed from the client.
1251  *
1252  * @param cls closure
1253  * @param client identification of the client
1254  * @param message the actual message
1255  */
1256   static void
1257 handle_client_seed (void *cls,
1258                     struct GNUNET_SERVER_Client *client,
1259                     const struct GNUNET_MessageHeader *message)
1260 {
1261   struct GNUNET_RPS_CS_SeedMessage *in_msg;
1262   struct GNUNET_PeerIdentity *peers;
1263   uint32_t num_peers;
1264   uint32_t i;
1265
1266   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) > ntohs (message->size))
1267   {
1268     GNUNET_break_op (0);
1269     GNUNET_SERVER_receive_done (client,
1270                                 GNUNET_SYSERR);
1271   }
1272
1273   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
1274   num_peers = ntohl (in_msg->num_peers);
1275   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1276   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1277   //memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1278
1279   if ((ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1280       sizeof (struct GNUNET_PeerIdentity) != num_peers)
1281   {
1282     GNUNET_break_op (0);
1283     GNUNET_SERVER_receive_done (client,
1284                                 GNUNET_SYSERR);
1285   }
1286
1287   LOG (GNUNET_ERROR_TYPE_DEBUG,
1288        "Client seeded peers:\n");
1289   print_peer_list (peers, num_peers);
1290
1291   // TODO check for validity of ids
1292
1293   for (i = 0 ; i < num_peers ; i++)
1294   {
1295     LOG (GNUNET_ERROR_TYPE_DEBUG,
1296          "Updating samplers with seed %" PRIX32 ": %s\n",
1297          i,
1298          GNUNET_i2s (&peers[i]));
1299
1300     RPS_sampler_update (prot_sampler,   &peers[i]);
1301     RPS_sampler_update (client_sampler, &peers[i]);
1302   }
1303
1304   //GNUNET_free (peers);
1305
1306   GNUNET_SERVER_receive_done (client,
1307                                                 GNUNET_OK);
1308 }
1309
1310
1311 /**
1312  * Handle a PUSH message from another peer.
1313  *
1314  * Check the proof of work and store the PeerID
1315  * in the temporary list for pushed PeerIDs.
1316  *
1317  * @param cls Closure
1318  * @param channel The channel the PUSH was received over
1319  * @param channel_ctx The context associated with this channel
1320  * @param msg The message header
1321  */
1322 static int
1323 handle_peer_push (void *cls,
1324     struct GNUNET_CADET_Channel *channel,
1325     void **channel_ctx,
1326     const struct GNUNET_MessageHeader *msg)
1327 {
1328   const struct GNUNET_PeerIdentity *peer;
1329
1330   // (check the proof of work)
1331
1332   peer = (const struct GNUNET_PeerIdentity *)
1333     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1334   // FIXME wait for cadet to change this function
1335
1336   LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received (%s)\n", GNUNET_i2s (peer));
1337
1338   #ifdef ENABLE_MALICIOUS
1339   struct AttackedPeer *tmp_att_peer;
1340
1341   tmp_att_peer = GNUNET_new (struct AttackedPeer);
1342   memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1343   if (1 == mal_type)
1344   { /* Try to maximise representation */
1345     if (NULL == att_peer_set)
1346       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1347     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1348                                                              peer))
1349     {
1350       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1351                                    att_peers_tail,
1352                                    tmp_att_peer);
1353       add_peer_array_to_set (peer, 1, att_peer_set);
1354     }
1355     return GNUNET_OK;
1356   }
1357
1358
1359   else if (2 == mal_type)
1360   { /* We attack one single well-known peer - simply ignore */
1361     return GNUNET_OK;
1362   }
1363
1364   #endif /* ENABLE_MALICIOUS */
1365
1366   /* Add the sending peer to the push_list */
1367   if (GNUNET_NO == in_arr (push_list, push_list_size, peer))
1368     GNUNET_array_append (push_list, push_list_size, *peer);
1369
1370   return GNUNET_OK;
1371 }
1372
1373
1374 /**
1375  * Handle PULL REQUEST request message from another peer.
1376  *
1377  * Reply with the gossip list of PeerIDs.
1378  *
1379  * @param cls Closure
1380  * @param channel The channel the PUSH was received over
1381  * @param channel_ctx The context associated with this channel
1382  * @param msg The message header
1383  */
1384 static int
1385 handle_peer_pull_request (void *cls,
1386     struct GNUNET_CADET_Channel *channel,
1387     void **channel_ctx,
1388     const struct GNUNET_MessageHeader *msg)
1389 {
1390   struct GNUNET_PeerIdentity *peer;
1391
1392   peer = (struct GNUNET_PeerIdentity *)
1393     GNUNET_CADET_channel_get_info (channel,
1394                                    GNUNET_CADET_OPTION_PEER);
1395   // FIXME wait for cadet to change this function
1396
1397   #ifdef ENABLE_MALICIOUS
1398   if (1 == mal_type)
1399   { /* Try to maximise representation */
1400     send_pull_reply (peer, mal_peers, num_mal_peers);
1401     return GNUNET_OK;
1402   }
1403
1404   else if (2 == mal_type)
1405   { /* Try to partition network */
1406     if (GNUNET_YES == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1407     {
1408       send_pull_reply (peer, mal_peers, num_mal_peers);
1409     }
1410     return GNUNET_OK;
1411   }
1412   #endif /* ENABLE_MALICIOUS */
1413
1414   send_pull_reply (peer, gossip_list, gossip_list_size);
1415
1416   return GNUNET_OK;
1417 }
1418
1419
1420 /**
1421  * Handle PULL REPLY message from another peer.
1422  *
1423  * Check whether we sent a corresponding request and
1424  * whether this reply is the first one.
1425  *
1426  * @param cls Closure
1427  * @param channel The channel the PUSH was received over
1428  * @param channel_ctx The context associated with this channel
1429  * @param msg The message header
1430  */
1431   static int
1432 handle_peer_pull_reply (void *cls,
1433                         struct GNUNET_CADET_Channel *channel,
1434                         void **channel_ctx,
1435                         const struct GNUNET_MessageHeader *msg)
1436 {
1437   LOG (GNUNET_ERROR_TYPE_DEBUG, "PULL REPLY received\n");
1438
1439   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1440   struct GNUNET_PeerIdentity *peers;
1441   struct PeerContext *peer_ctx;
1442   struct GNUNET_PeerIdentity *sender;
1443   struct PeerContext *sender_ctx;
1444   struct PeerOutstandingOp out_op;
1445   uint32_t i;
1446 #ifdef ENABLE_MALICIOUS
1447   struct AttackedPeer *tmp_att_peer;
1448 #endif /* ENABLE_MALICIOUS */
1449
1450   /* Check for protocol violation */
1451   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1452   {
1453     GNUNET_break_op (0);
1454     return GNUNET_SYSERR;
1455   }
1456
1457   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1458   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1459       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1460   {
1461     LOG (GNUNET_ERROR_TYPE_ERROR,
1462         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1463         ntohl (in_msg->num_peers),
1464         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1465             sizeof (struct GNUNET_PeerIdentity));
1466     GNUNET_break_op (0);
1467     return GNUNET_SYSERR;
1468   }
1469
1470   sender = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
1471       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
1472        // Guess simply casting isn't the nicest way...
1473        // FIXME wait for cadet to change this function
1474   sender_ctx = get_peer_ctx (peer_map, sender);
1475
1476   if (GNUNET_YES == get_peer_flag (sender_ctx, PULL_REPLY_PENDING))
1477   {
1478     GNUNET_break_op (0);
1479     return GNUNET_OK;
1480   }
1481
1482 #ifdef ENABLE_MALICIOUS
1483   // We shouldn't even receive pull replies as we're not sending
1484   if (2 == mal_type)
1485     return GNUNET_OK;
1486 #endif /* ENABLE_MALICIOUS */
1487
1488   /* Do actual logic */
1489   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1490
1491   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1492   {
1493   #ifdef ENABLE_MALICIOUS
1494     if (1 == mal_type)
1495     {
1496       // TODO check if we sent a request and this was the first reply
1497       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1498                                                                &peers[i])
1499           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1500                                                                   &peers[i]))
1501       {
1502         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1503         tmp_att_peer->peer_id = peers[i];
1504         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1505                                      att_peers_tail,
1506                                      tmp_att_peer);
1507         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1508       }
1509       continue;
1510     }
1511   #endif /* ENABLE_MALICIOUS */
1512     peer_ctx = get_peer_ctx (peer_map, &peers[i]);
1513     if (GNUNET_YES == get_peer_flag (peer_ctx, VALID)
1514         || NULL != peer_ctx->send_channel
1515         || NULL != peer_ctx->recv_channel)
1516     {
1517       if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])
1518           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peers[i]))
1519         GNUNET_array_append (pull_list, pull_list_size, peers[i]);
1520     }
1521     else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx))
1522     {
1523       out_op.op = insert_in_pull_list;
1524       out_op.op_cls = NULL;
1525       GNUNET_array_append (peer_ctx->outstanding_ops,
1526                            peer_ctx->num_outstanding_ops,
1527                            out_op);
1528       check_peer_live (peer_ctx);
1529     }
1530   }
1531
1532   unset_peer_flag (sender_ctx, PULL_REPLY_PENDING);
1533   rem_from_list (&pending_pull_reply_list, &pending_pull_reply_list_size, sender);
1534
1535   return GNUNET_OK;
1536 }
1537
1538
1539 /**
1540  * Compute a random delay.
1541  * A uniformly distributed value between mean + spread and mean - spread.
1542  *
1543  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1544  * It would return a random value between 2 and 6 min.
1545  *
1546  * @param mean the mean
1547  * @param spread the inverse amount of deviation from the mean
1548  */
1549 static struct GNUNET_TIME_Relative
1550 compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread)
1551 {
1552   struct GNUNET_TIME_Relative half_interval;
1553   struct GNUNET_TIME_Relative ret;
1554   unsigned int rand_delay;
1555   unsigned int max_rand_delay;
1556
1557   if (0 == spread)
1558   {
1559     LOG (GNUNET_ERROR_TYPE_WARNING,
1560          "Not accepting spread of 0\n");
1561     GNUNET_break (0);
1562   }
1563
1564   /* Compute random time value between spread * mean and spread * mean */
1565   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1566
1567   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1568   /**
1569    * Compute random value between (0 and 1) * round_interval
1570    * via multiplying round_interval with a 'fraction' (0 to value)/value
1571    */
1572   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1573   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1574   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1575   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1576
1577   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1578     LOG (GNUNET_ERROR_TYPE_WARNING,
1579          "Returning FOREVER_REL\n");
1580
1581   return ret;
1582 }
1583
1584
1585 /**
1586  * Send single pull request
1587  *
1588  * @param peer_id the peer to send the pull request to.
1589  */
1590 static void
1591 send_pull_request (struct GNUNET_PeerIdentity *peer_id)
1592 {
1593   struct GNUNET_MQ_Envelope *ev;
1594   struct GNUNET_MQ_Handle *mq;
1595
1596   LOG (GNUNET_ERROR_TYPE_DEBUG,
1597        "Sending PULL request to peer %s of gossiped list.\n",
1598        GNUNET_i2s (peer_id));
1599
1600   GNUNET_array_append (pending_pull_reply_list, pending_pull_reply_list_size, *peer_id);
1601
1602   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1603   mq = get_mq (peer_map, peer_id);
1604   GNUNET_MQ_send (mq, ev);
1605 }
1606
1607
1608 /**
1609  * Send single push
1610  *
1611  * @param peer_id the peer to send the push to.
1612  */
1613 static void
1614 send_push (struct GNUNET_PeerIdentity *peer_id)
1615 {
1616   struct GNUNET_MQ_Envelope *ev;
1617   struct GNUNET_MQ_Handle *mq;
1618
1619   LOG (GNUNET_ERROR_TYPE_DEBUG,
1620        "Sending PUSH to peer %s of gossiped list.\n",
1621        GNUNET_i2s (peer_id));
1622
1623   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1624   mq = get_mq (peer_map, peer_id);
1625   GNUNET_MQ_send (mq, ev);
1626 }
1627
1628
1629 static void
1630 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1631
1632 static void
1633 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1634
1635
1636 #ifdef ENABLE_MALICIOUS
1637 /**
1638  * Turn RPS service to act malicious.
1639  *
1640  * @param cls Closure
1641  * @param channel The channel the PUSH was received over
1642  * @param channel_ctx The context associated with this channel
1643  * @param msg The message header
1644  */
1645   static void
1646 handle_client_act_malicious (void *cls,
1647                              struct GNUNET_SERVER_Client *client,
1648                              const struct GNUNET_MessageHeader *msg)
1649 {
1650   struct GNUNET_RPS_CS_ActMaliciousMessage *in_msg;
1651   struct GNUNET_PeerIdentity *peers;
1652   uint32_t num_mal_peers_sent;
1653   uint32_t num_mal_peers_old;
1654
1655   /* Check for protocol violation */
1656   if (sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage) > ntohs (msg->size))
1657   {
1658     GNUNET_break_op (0);
1659   }
1660
1661   in_msg = (struct GNUNET_RPS_CS_ActMaliciousMessage *) msg;
1662   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1663       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1664   {
1665     LOG (GNUNET_ERROR_TYPE_ERROR,
1666         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1667         ntohl (in_msg->num_peers),
1668         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1669             sizeof (struct GNUNET_PeerIdentity));
1670     GNUNET_break_op (0);
1671   }
1672
1673
1674   /* Do actual logic */
1675   // FIXME ingore own id
1676   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1677   mal_type = ntohl (in_msg->type);
1678
1679   LOG (GNUNET_ERROR_TYPE_DEBUG,
1680        "Now acting malicious type %" PRIu32 "\n",
1681        mal_type);
1682
1683   if (1 == mal_type)
1684   { /* Try to maximise representation */
1685     /* Add other malicious peers to those we already know */
1686
1687     num_mal_peers_sent = ntohl (in_msg->num_peers);
1688     num_mal_peers_old = num_mal_peers;
1689     GNUNET_array_grow (mal_peers,
1690                        num_mal_peers,
1691                        num_mal_peers + num_mal_peers_sent);
1692     memcpy (&mal_peers[num_mal_peers_old],
1693             peers,
1694             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1695
1696     /* Add all mal peers to mal_peer_set */
1697     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1698                            num_mal_peers_sent,
1699                            mal_peer_set);
1700
1701     /* Substitute do_round () with do_mal_round () */
1702     GNUNET_SCHEDULER_cancel (do_round_task);
1703     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1704   }
1705
1706   else if (2 == mal_type)
1707   { /* Try to partition the network */
1708     /* Add other malicious peers to those we already know */
1709     num_mal_peers_sent = ntohl (in_msg->num_peers) - 1;
1710     num_mal_peers_old = num_mal_peers;
1711     GNUNET_array_grow (mal_peers,
1712                        num_mal_peers,
1713                        num_mal_peers + num_mal_peers_sent);
1714     memcpy (&mal_peers[num_mal_peers_old],
1715             peers,
1716             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1717
1718     /* Add all mal peers to mal_peer_set */
1719     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1720                            num_mal_peers_sent,
1721                            mal_peer_set);
1722
1723     /* Store the one attacked peer */
1724     memcpy (&attacked_peer,
1725             &in_msg->attacked_peer,
1726             sizeof (struct GNUNET_PeerIdentity));
1727
1728     LOG (GNUNET_ERROR_TYPE_DEBUG,
1729          "Attacked peer is %s\n",
1730          GNUNET_i2s (&attacked_peer));
1731
1732     /* Substitute do_round () with do_mal_round () */
1733     GNUNET_SCHEDULER_cancel (do_round_task);
1734     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1735   }
1736   else if (0 == mal_type)
1737   { /* Stop acting malicious */
1738     num_mal_peers = 0;
1739     GNUNET_free (mal_peers);
1740
1741     /* Substitute do_mal_round () with do_round () */
1742     GNUNET_SCHEDULER_cancel (do_round_task);
1743     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1744   }
1745   else
1746   {
1747     GNUNET_break (0);
1748   }
1749 }
1750
1751
1752 /**
1753  * Send out PUSHes and PULLs maliciously.
1754  *
1755  * This is executed regylary.
1756  */
1757 static void
1758 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1759 {
1760   uint32_t num_pushes;
1761   uint32_t i;
1762   struct GNUNET_TIME_Relative time_next_round;
1763   struct AttackedPeer *tmp_att_peer;
1764
1765   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously.\n");
1766
1767   /* Do malicious actions */
1768   if (1 == mal_type)
1769   { /* Try to maximise representation */
1770
1771     /* The maximum of pushes we're going to send this round */
1772     num_pushes = min (min (push_limit,
1773                            num_attacked_peers),
1774                        GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1775
1776     /* Send PUSHes to attacked peers */
1777     for (i = 0 ; i < num_pushes ; i++)
1778     {
1779       if (att_peers_tail == att_peer_index)
1780         att_peer_index = att_peers_head;
1781       else
1782         att_peer_index = att_peer_index->next;
1783
1784       send_push (&att_peer_index->peer_id);
1785     }
1786
1787     /* Send PULLs to some peers to learn about additional peers to attack */
1788     for (i = 0 ; i < num_pushes * alpha ; i++)
1789     {
1790       if (att_peers_tail == tmp_att_peer)
1791         tmp_att_peer = att_peers_head;
1792       else
1793         att_peer_index = tmp_att_peer->next;
1794
1795       send_pull_request (&tmp_att_peer->peer_id);
1796     }
1797   }
1798
1799
1800   else if (2 == mal_type)
1801   { /**
1802      * Try to partition the network
1803      * Send as many pushes to the attacked peer as possible
1804      * That is one push per round as it will ignore more.
1805      */
1806       send_push (&attacked_peer);
1807   }
1808
1809
1810   /* Schedule next round */
1811   time_next_round = compute_rand_delay (round_interval, 2);
1812
1813   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round, NULL);
1814   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_mal_round, NULL);
1815   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1816 }
1817 #endif /* ENABLE_MALICIOUS */
1818
1819
1820 /**
1821  * Send out PUSHes and PULLs, possibly update #gossip_list, samplers.
1822  *
1823  * This is executed regylary.
1824  */
1825 static void
1826 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1827 {
1828   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n");
1829
1830   uint32_t i;
1831   unsigned int *permut;
1832   unsigned int n_peers; /* Number of peers we send pushes/pulls to */
1833   struct GNUNET_PeerIdentity peer;
1834   struct GNUNET_PeerIdentity *tmp_peer;
1835
1836   LOG (GNUNET_ERROR_TYPE_DEBUG,
1837        "Printing gossip list:\n");
1838   for (i = 0 ; i < gossip_list_size ; i++)
1839     LOG (GNUNET_ERROR_TYPE_DEBUG,
1840          "\t%s\n", GNUNET_i2s (&gossip_list[i]));
1841   // TODO log lists, ...
1842
1843   /* Would it make sense to have one shuffeled gossip list and then
1844    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
1845    * use the rest to update sampler?
1846    * in essence get random peers with consumption */
1847
1848   /* Send PUSHes */
1849   if (0 < gossip_list_size)
1850   {
1851     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1852                                            (unsigned int) gossip_list_size);
1853     n_peers = ceil (alpha * gossip_list_size);
1854     LOG (GNUNET_ERROR_TYPE_DEBUG,
1855          "Going to send pushes to %u ceil (%f * %u) peers.\n",
1856          n_peers, alpha, gossip_list_size);
1857     for (i = 0 ; i < n_peers ; i++)
1858     {
1859       peer = gossip_list[permut[i]];
1860       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1861       { // FIXME if this fails schedule/loop this for later
1862         send_push (&peer);
1863       }
1864     }
1865     GNUNET_free (permut);
1866   }
1867
1868
1869   /* Send PULL requests */
1870   //permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size);
1871   n_peers = ceil (beta * gossip_list_size);
1872   LOG (GNUNET_ERROR_TYPE_DEBUG,
1873        "Going to send pulls to %u ceil (%f * %u) peers.\n",
1874        n_peers, beta, gossip_list_size);
1875   for (i = 0 ; i < n_peers ; i++)
1876   {
1877     tmp_peer = get_rand_peer_ignore_list (gossip_list, gossip_list_size,
1878         pending_pull_reply_list, pending_pull_reply_list_size);
1879     if (NULL != tmp_peer)
1880     {
1881       peer = *tmp_peer;
1882       GNUNET_free (tmp_peer);
1883
1884       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer))
1885       {
1886         send_pull_request (&peer);
1887       }
1888     }
1889   }
1890
1891
1892   /* Update gossip list */
1893
1894   if ( push_list_size <= alpha * gossip_list_size &&
1895        push_list_size != 0 &&
1896        pull_list_size != 0 )
1897   {
1898     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n");
1899
1900     uint32_t first_border;
1901     uint32_t second_border;
1902     uint32_t r_index;
1903     uint32_t peers_to_clean_size;
1904     struct GNUNET_PeerIdentity *peers_to_clean;
1905
1906     peers_to_clean = NULL;
1907     peers_to_clean_size = 0;
1908     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, gossip_list_size);
1909     memcpy (peers_to_clean,
1910             gossip_list,
1911             gossip_list_size * sizeof (struct GNUNET_PeerIdentity));
1912
1913     first_border  =                ceil (alpha * sampler_size_est_need);
1914     second_border = first_border + ceil (beta  * sampler_size_est_need);
1915
1916     GNUNET_array_grow (gossip_list, gossip_list_size, second_border);
1917
1918     for (i = 0 ; i < first_border ; i++)
1919     { // TODO use RPS_sampler_get_n_rand_peers
1920       /* Update gossip list with peers received through PUSHes */
1921       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1922                                        push_list_size);
1923       gossip_list[i] = push_list[r_index];
1924       // TODO change the peer_flags accordingly
1925     }
1926
1927     for (i = first_border ; i < second_border ; i++)
1928     {
1929       /* Update gossip list with peers received through PULLs */
1930       r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
1931                                        pull_list_size);
1932       gossip_list[i] = pull_list[r_index];
1933       // TODO change the peer_flags accordingly
1934     }
1935
1936     for (i = second_border ; i < sampler_size_est_need ; i++)
1937     {
1938       /* Update gossip list with peers from history */
1939       RPS_sampler_get_n_rand_peers (prot_sampler, hist_update, NULL, 1, GNUNET_NO);
1940       num_hist_update_tasks++;
1941       // TODO change the peer_flags accordingly
1942     }
1943
1944     for (i = 0 ; i < gossip_list_size ; i++)
1945       rem_from_list (&peers_to_clean, &peers_to_clean_size, &gossip_list[i]);
1946
1947     for (i = 0 ; i < peers_to_clean_size ; i++)
1948       peer_clean (&peers_to_clean[i]);
1949
1950     GNUNET_free (peers_to_clean);
1951   }
1952   else
1953   {
1954     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list.\n");
1955   }
1956   // TODO independent of that also get some peers from CADET_get_peers()?
1957
1958
1959   /* Update samplers */
1960   for ( i = 0 ; i < push_list_size ; i++ )
1961   {
1962     LOG (GNUNET_ERROR_TYPE_DEBUG,
1963          "Updating with peer %s from push list\n",
1964          GNUNET_i2s (&push_list[i]));
1965     RPS_sampler_update (prot_sampler,   &push_list[i]);
1966     RPS_sampler_update (client_sampler, &push_list[i]);
1967     // TODO set in_flag?
1968   }
1969
1970   for ( i = 0 ; i < pull_list_size ; i++ )
1971   {
1972     LOG (GNUNET_ERROR_TYPE_DEBUG,
1973          "Updating with peer %s from pull list\n",
1974          GNUNET_i2s (&pull_list[i]));
1975     RPS_sampler_update (prot_sampler,   &pull_list[i]);
1976     RPS_sampler_update (client_sampler, &pull_list[i]);
1977     // TODO set in_flag?
1978   }
1979
1980
1981   /* Empty push/pull lists */
1982   GNUNET_array_grow (push_list, push_list_size, 0);
1983   GNUNET_array_grow (pull_list, pull_list_size, 0);
1984
1985   struct GNUNET_TIME_Relative time_next_round;
1986
1987   time_next_round = compute_rand_delay (round_interval, 2);
1988
1989   /* Schedule next round */
1990   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round, NULL);
1991   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_round, NULL);
1992   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1993 }
1994
1995
1996 static void
1997 rps_start (struct GNUNET_SERVER_Handle *server);
1998
1999
2000 /**
2001  * This is called from GNUNET_CADET_get_peers().
2002  *
2003  * It is called on every peer(ID) that cadet somehow has contact with.
2004  * We use those to initialise the sampler.
2005  */
2006 void
2007 init_peer_cb (void *cls,
2008               const struct GNUNET_PeerIdentity *peer,
2009               int tunnel, // "Do we have a tunnel towards this peer?"
2010               unsigned int n_paths, // "Number of known paths towards this peer"
2011               unsigned int best_path) // "How long is the best path?
2012                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2013 {
2014   struct PeerOutstandingOp out_op;
2015   struct PeerContext *peer_ctx;
2016
2017   if (NULL != peer
2018       && 0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer))
2019   {
2020     LOG (GNUNET_ERROR_TYPE_DEBUG,
2021         "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n",
2022         GNUNET_i2s (peer), peer, gossip_list_size);
2023
2024     // maybe create a function for that
2025     peer_ctx = get_peer_ctx (peer_map, peer);
2026     if (GNUNET_YES != get_peer_flag (peer_ctx, VALID))
2027     {
2028       if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
2029       {
2030         out_op.op = insert_in_sampler;
2031         out_op.op_cls = NULL;
2032         GNUNET_array_append (peer_ctx->outstanding_ops,
2033                              peer_ctx->num_outstanding_ops,
2034                              out_op);
2035       }
2036
2037       if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx))
2038       {
2039         out_op.op = insert_in_gossip_list;
2040         out_op.op_cls = NULL;
2041         GNUNET_array_append (peer_ctx->outstanding_ops,
2042                              peer_ctx->num_outstanding_ops,
2043                              out_op);
2044       }
2045
2046       /* Trigger livelyness test on peer */
2047       check_peer_live (peer_ctx);
2048     }
2049
2050     // send push/pull to each of those peers?
2051   }
2052 }
2053
2054
2055 /**
2056  * Clean the send channel of a peer
2057  */
2058 void
2059 peer_clean (const struct GNUNET_PeerIdentity *peer)
2060 {
2061   struct PeerContext *peer_ctx;
2062   struct GNUNET_CADET_Channel *channel;
2063
2064   if (GNUNET_YES != in_arr (gossip_list, gossip_list_size, peer)
2065       && GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
2066   {
2067     peer_ctx = get_peer_ctx (peer_map, peer);
2068     if (NULL != peer_ctx->send_channel)
2069     {
2070       channel = peer_ctx->send_channel;
2071       peer_ctx->send_channel = NULL;
2072       GNUNET_CADET_channel_destroy (channel);
2073     }
2074   }
2075 }
2076
2077
2078 /**
2079  * Callback used to remove peers from the multipeermap.
2080  */
2081   int
2082 peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
2083 {
2084   struct PeerContext *peer_ctx;
2085   const struct GNUNET_CADET_Channel *channel =
2086     (const struct GNUNET_CADET_Channel *) cls;
2087   struct GNUNET_CADET_Channel *recv;
2088   struct GNUNET_CADET_Channel *send;
2089
2090   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, value))
2091   {
2092     peer_ctx = (struct PeerContext *) value;
2093
2094     if (0 != peer_ctx->num_outstanding_ops)
2095       GNUNET_array_grow (peer_ctx->outstanding_ops,
2096                          peer_ctx->num_outstanding_ops,
2097                          0);
2098
2099     if (NULL != peer_ctx->mq)
2100     {
2101       GNUNET_MQ_destroy (peer_ctx->mq);
2102       peer_ctx->mq = NULL;
2103     }
2104
2105
2106     if (NULL != peer_ctx->is_live_task)
2107     {
2108     LOG (GNUNET_ERROR_TYPE_DEBUG,
2109          "Trying to cancle is_live_task for peer %s\n",
2110          GNUNET_i2s (key));
2111       GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->is_live_task);
2112       peer_ctx->is_live_task = NULL;
2113     }
2114
2115     send = peer_ctx->send_channel;
2116     peer_ctx->send_channel = NULL;
2117     if (NULL != send
2118         && channel != send)
2119     {
2120       GNUNET_CADET_channel_destroy (send);
2121     }
2122
2123     recv = peer_ctx->send_channel;
2124     peer_ctx->recv_channel = NULL;
2125     if (NULL != recv
2126         && channel != recv)
2127     {
2128       GNUNET_CADET_channel_destroy (recv);
2129     }
2130
2131     if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, key))
2132       LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
2133     else
2134       GNUNET_free (peer_ctx);
2135   }
2136
2137   return GNUNET_YES;
2138 }
2139
2140
2141 /**
2142  * Task run during shutdown.
2143  *
2144  * @param cls unused
2145  * @param tc unused
2146  */
2147 static void
2148 shutdown_task (void *cls,
2149                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2150 {
2151
2152   LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
2153
2154   if (NULL != do_round_task)
2155   {
2156     GNUNET_SCHEDULER_cancel (do_round_task);
2157     do_round_task = NULL;
2158   }
2159
2160
2161   {
2162   if (GNUNET_SYSERR ==
2163         GNUNET_CONTAINER_multipeermap_iterate (peer_map, peer_remove_cb, NULL))
2164     LOG (GNUNET_ERROR_TYPE_WARNING,
2165         "Iterating over peers to disconnect from them was cancelled\n");
2166   }
2167
2168   GNUNET_NSE_disconnect (nse);
2169   GNUNET_CADET_disconnect (cadet_handle);
2170   RPS_sampler_destroy (prot_sampler);
2171   RPS_sampler_destroy (client_sampler);
2172   LOG (GNUNET_ERROR_TYPE_DEBUG,
2173        "Size of the peermap: %u\n",
2174        GNUNET_CONTAINER_multipeermap_size (peer_map));
2175   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
2176   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
2177   GNUNET_array_grow (gossip_list, gossip_list_size, 0);
2178   GNUNET_array_grow (push_list, push_list_size, 0);
2179   GNUNET_array_grow (pull_list, pull_list_size, 0);
2180   #ifdef ENABLE_MALICIOUS
2181   struct AttackedPeer *tmp_att_peer;
2182   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2183   if (NULL != mal_peer_set)
2184     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2185   if (NULL != att_peer_set)
2186     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2187   while (NULL != att_peers_head)
2188   {
2189     tmp_att_peer = att_peers_head;
2190     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2191   }
2192   #endif /* ENABLE_MALICIOUS */
2193 }
2194
2195
2196 /**
2197  * A client disconnected.  Remove all of its data structure entries.
2198  *
2199  * @param cls closure, NULL
2200  * @param client identification of the client
2201  */
2202 static void
2203 handle_client_disconnect (void *cls,
2204                           struct GNUNET_SERVER_Client * client)
2205 {
2206 }
2207
2208
2209 /**
2210  * Handle the channel a peer opens to us.
2211  *
2212  * @param cls The closure
2213  * @param channel The channel the peer wants to establish
2214  * @param initiator The peer's peer ID
2215  * @param port The port the channel is being established over
2216  * @param options Further options
2217  */
2218   static void *
2219 handle_inbound_channel (void *cls,
2220                         struct GNUNET_CADET_Channel *channel,
2221                         const struct GNUNET_PeerIdentity *initiator,
2222                         uint32_t port,
2223                         enum GNUNET_CADET_ChannelOption options)
2224 {
2225   struct PeerContext *peer_ctx;
2226   struct GNUNET_PeerIdentity peer;
2227
2228   peer = *initiator;
2229   LOG (GNUNET_ERROR_TYPE_DEBUG,
2230       "New channel was established to us (Peer %s).\n",
2231       GNUNET_i2s (&peer));
2232
2233   GNUNET_assert (NULL != channel);
2234
2235   // we might not even store the recv_channel
2236
2237   peer_ctx = get_peer_ctx (peer_map, &peer);
2238   // FIXME what do we do if a channel is established twice?
2239   //       overwrite? Clean old channel? ...?
2240   //if (NULL != peer_ctx->recv_channel)
2241   //{
2242   //  peer_ctx->recv_channel = channel;
2243   //}
2244   peer_ctx->recv_channel = channel;
2245
2246   (void) GNUNET_CONTAINER_multipeermap_put (peer_map, &peer, peer_ctx,
2247       GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
2248
2249   peer_is_live (peer_ctx);
2250
2251   return NULL; // TODO
2252 }
2253
2254
2255 /**
2256  * This is called when a remote peer destroys a channel.
2257  *
2258  * @param cls The closure
2259  * @param channel The channel being closed
2260  * @param channel_ctx The context associated with this channel
2261  */
2262   static void
2263 cleanup_channel (void *cls,
2264                 const struct GNUNET_CADET_Channel *channel,
2265                 void *channel_ctx)
2266 {
2267   struct GNUNET_PeerIdentity *peer;
2268   struct PeerContext *peer_ctx;
2269
2270   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
2271       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
2272        // Guess simply casting isn't the nicest way...
2273        // FIXME wait for cadet to change this function
2274
2275   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
2276   {
2277     peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
2278
2279     if (NULL == peer_ctx) /* It could have been removed by shutdown_task */
2280       return;
2281
2282     if (channel == peer_ctx->send_channel)
2283     { /* Peer probably went down */
2284       LOG (GNUNET_ERROR_TYPE_DEBUG,
2285            "Peer %s destroyed send channel - probably went down, cleaning up\n",
2286            GNUNET_i2s (peer));
2287       rem_from_list (&gossip_list, &gossip_list_size, peer);
2288       rem_from_list (&pending_pull_reply_list, &pending_pull_reply_list_size, peer);
2289
2290       peer_ctx->send_channel = NULL;
2291       /* Somwewhat {ab,re}use the iterator function */
2292       /* Cast to void is ok, because it's used as void in peer_remove_cb */
2293       (void) peer_remove_cb ((void *) channel, peer, peer_ctx);
2294     }
2295     else if (channel == peer_ctx->recv_channel)
2296     { /* Other peer doesn't want to send us messages anymore */
2297       LOG (GNUNET_ERROR_TYPE_DEBUG,
2298            "Peer %s destroyed recv channel - cleaning up channel\n",
2299            GNUNET_i2s (peer));
2300       peer_ctx->recv_channel = NULL;
2301     }
2302   }
2303 }
2304
2305
2306 /**
2307  * Actually start the service.
2308  */
2309   static void
2310 rps_start (struct GNUNET_SERVER_Handle *server)
2311 {
2312   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2313     {&handle_client_request,     NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2314       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2315     {&handle_client_seed,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2316     #ifdef ENABLE_MALICIOUS
2317     {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2318     #endif /* ENABLE_MALICIOUS */
2319     {NULL, NULL, 0, 0}
2320   };
2321
2322   GNUNET_SERVER_add_handlers (server, handlers);
2323   GNUNET_SERVER_disconnect_notify (server,
2324                                    &handle_client_disconnect,
2325                                    NULL);
2326   LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n");
2327
2328
2329   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2330   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2331
2332   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2333                                                         &shutdown_task,
2334                                                         NULL);
2335 }
2336
2337
2338 /**
2339  * Process statistics requests.
2340  *
2341  * @param cls closure
2342  * @param server the initialized server
2343  * @param c configuration to use
2344  */
2345   static void
2346 run (void *cls,
2347      struct GNUNET_SERVER_Handle *server,
2348      const struct GNUNET_CONFIGURATION_Handle *c)
2349 {
2350   // TODO check what this does -- copied from gnunet-boss
2351   // - seems to work as expected
2352   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2353   cfg = c;
2354
2355
2356   /* Get own ID */
2357   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2358   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2359               "STARTING SERVICE (rps) for peer [%s]\n",
2360               GNUNET_i2s (&own_identity));
2361   #ifdef ENABLE_MALICIOUS
2362   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363               "Malicious execution compiled in.\n");
2364   #endif /* ENABLE_MALICIOUS */
2365
2366
2367
2368   /* Get time interval from the configuration */
2369   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2370                                                         "ROUNDINTERVAL",
2371                                                         &round_interval))
2372   {
2373     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read ROUNDINTERVAL from config\n");
2374     GNUNET_SCHEDULER_shutdown ();
2375     return;
2376   }
2377
2378   /* Get initial size of sampler/gossip list from the configuration */
2379   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (cfg, "RPS",
2380                                                          "INITSIZE",
2381                                                          (long long unsigned int *) &sampler_size_est_need))
2382   {
2383     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to read INITSIZE from config\n");
2384     GNUNET_SCHEDULER_shutdown ();
2385     return;
2386   }
2387   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
2388
2389
2390   gossip_list = NULL;
2391
2392
2393   /* connect to NSE */
2394   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2395   // TODO check whether that was successful
2396   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n");
2397
2398
2399   alpha = 0.45;
2400   beta  = 0.45;
2401
2402   peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO);
2403
2404
2405   /* Initialise cadet */
2406   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2407     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        ,
2408       sizeof (struct GNUNET_MessageHeader)},
2409     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2410       sizeof (struct GNUNET_MessageHeader)},
2411     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
2412     {NULL, 0, 0}
2413   };
2414
2415   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
2416   cadet_handle = GNUNET_CADET_connect (cfg,
2417                                        cls,
2418                                        &handle_inbound_channel,
2419                                        &cleanup_channel,
2420                                        cadet_handlers,
2421                                        ports);
2422   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
2423
2424
2425   /* Initialise sampler */
2426   struct GNUNET_TIME_Relative half_round_interval;
2427   struct GNUNET_TIME_Relative  max_round_interval;
2428
2429   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2430   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2431
2432   prot_sampler =   RPS_sampler_init (sampler_size_est_need, max_round_interval);
2433   client_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
2434
2435   /* Initialise push and pull maps */
2436   push_list = NULL;
2437   push_list_size = 0;
2438   pull_list = NULL;
2439   pull_list_size = 0;
2440   pending_pull_reply_list = NULL;
2441   pending_pull_reply_list_size = 0;
2442
2443
2444   num_hist_update_tasks = 0;
2445
2446
2447   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2448   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2449   // TODO send push/pull to each of those peers?
2450
2451
2452   rps_start (server);
2453 }
2454
2455
2456 /**
2457  * The main function for the rps service.
2458  *
2459  * @param argc number of arguments from the command line
2460  * @param argv command line arguments
2461  * @return 0 ok, 1 on error
2462  */
2463   int
2464 main (int argc, char *const *argv)
2465 {
2466   return (GNUNET_OK ==
2467           GNUNET_SERVICE_run (argc,
2468                               argv,
2469                               "rps",
2470                               GNUNET_SERVICE_OPTION_NONE,
2471                               &run, NULL)) ? 0 : 1;
2472 }
2473
2474 /* end of gnunet-service-rps.c */