Fix for #4553
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_peerinfo_service.h"
30 #include "gnunet_nse_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_peers.h"
36 #include "gnunet-service-rps_view.h"
37
38 #include <math.h>
39 #include <inttypes.h>
40
41 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
42
43 // TODO modify @brief in every file
44
45 // TODO check for overflows
46
47 // TODO align message structs
48
49 // TODO connect to friends
50
51 // TODO store peers somewhere persistent
52
53 // TODO blacklist? (-> mal peer detection on top of brahms)
54
55 // hist_size_init, hist_size_max
56
57 /**
58  * Our configuration.
59  */
60 static const struct GNUNET_CONFIGURATION_Handle *cfg;
61
62 /**
63  * Our own identity.
64  */
65 static struct GNUNET_PeerIdentity own_identity;
66
67
68 /***********************************************************************
69  * Housekeeping with clients
70 ***********************************************************************/
71
72 /**
73  * Closure used to pass the client and the id to the callback
74  * that replies to a client's request
75  */
76 struct ReplyCls
77 {
78   /**
79    * DLL
80    */
81   struct ReplyCls *next;
82   struct ReplyCls *prev;
83
84   /**
85    * The identifier of the request
86    */
87   uint32_t id;
88
89   /**
90    * The handle to the request
91    */
92   struct RPS_SamplerRequestHandle *req_handle;
93
94   /**
95    * The client handle to send the reply to
96    */
97   struct GNUNET_SERVER_Client *client;
98 };
99
100
101 /**
102  * Struct used to store the context of a connected client.
103  */
104 struct ClientContext
105 {
106   /**
107    * DLL
108    */
109   struct ClientContext *next;
110   struct ClientContext *prev;
111
112   /**
113    * The message queue to communicate with the client.
114    */
115   struct GNUNET_MQ_Handle *mq;
116
117   /**
118    * DLL with handles to single requests from the client
119    */
120   struct ReplyCls *rep_cls_head;
121   struct ReplyCls *rep_cls_tail;
122 };
123
124 /**
125  * DLL with all clients currently connected to us
126  */
127 struct ClientContext *cli_ctx_head;
128 struct ClientContext *cli_ctx_tail;
129
130 /***********************************************************************
131  * /Housekeeping with clients
132 ***********************************************************************/
133
134
135
136
137
138 /***********************************************************************
139  * Globals
140 ***********************************************************************/
141
142 /**
143  * Sampler used for the Brahms protocol itself.
144  */
145 static struct RPS_Sampler *prot_sampler;
146
147 /**
148  * Sampler used for the clients.
149  */
150 static struct RPS_Sampler *client_sampler;
151
152 /**
153  * Name to log view to
154  */
155 static char *file_name_view_log;
156
157 /**
158  * The size of sampler we need to be able to satisfy the client's need
159  * of random peers.
160  */
161 static unsigned int sampler_size_client_need;
162
163 /**
164  * The size of sampler we need to be able to satisfy the Brahms protocol's
165  * need of random peers.
166  *
167  * This is one minimum size the sampler grows to.
168  */
169 static unsigned int sampler_size_est_need;
170
171 /**
172  * Percentage of total peer number in the view
173  * to send random PUSHes to
174  */
175 static float alpha;
176
177 /**
178  * Percentage of total peer number in the view
179  * to send random PULLs to
180  */
181 static float beta;
182
183 /**
184  * Identifier for the main task that runs periodically.
185  */
186 static struct GNUNET_SCHEDULER_Task *do_round_task;
187
188 /**
189  * Time inverval the do_round task runs in.
190  */
191 static struct GNUNET_TIME_Relative round_interval;
192
193 /**
194  * List to store peers received through pushes temporary.
195  */
196 static struct CustomPeerMap *push_map;
197
198 /**
199  * List to store peers received through pulls temporary.
200  */
201 static struct CustomPeerMap *pull_map;
202
203 /**
204  * Handler to NSE.
205  */
206 static struct GNUNET_NSE_Handle *nse;
207
208 /**
209  * Handler to CADET.
210  */
211 static struct GNUNET_CADET_Handle *cadet_handle;
212
213 /**
214  * Handler to PEERINFO.
215  */
216 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
217
218 /**
219  * Handle for cancellation of iteration over peers.
220  */
221 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
222
223 /**
224  * Request counter.
225  *
226  * Counts how many requets clients already issued.
227  * Only needed in the beginning to check how many of the 64 deltas
228  * we already have
229  */
230 static unsigned int req_counter;
231
232 /**
233  * Time of the last request we received.
234  *
235  * Used to compute the expected request rate.
236  */
237 static struct GNUNET_TIME_Absolute last_request;
238
239 /**
240  * Size of #request_deltas.
241  */
242 #define REQUEST_DELTAS_SIZE 64
243 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
244
245 /**
246  * Last 64 deltas between requests
247  */
248 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
249
250 /**
251  * The prediction of the rate of requests
252  */
253 static struct GNUNET_TIME_Relative request_rate;
254
255
256 #ifdef ENABLE_MALICIOUS
257 /**
258  * Type of malicious peer
259  *
260  * 0 Don't act malicious at all - Default
261  * 1 Try to maximise representation
262  * 2 Try to partition the network
263  * 3 Combined attack
264  */
265 static uint32_t mal_type;
266
267 /**
268  * Other malicious peers
269  */
270 static struct GNUNET_PeerIdentity *mal_peers;
271
272 /**
273  * Hashmap of malicious peers used as set.
274  * Used to more efficiently check whether we know that peer.
275  */
276 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
277
278 /**
279  * Number of other malicious peers
280  */
281 static uint32_t num_mal_peers;
282
283
284 /**
285  * If type is 2 This struct is used to store the attacked peers in a DLL
286  */
287 struct AttackedPeer
288 {
289   /**
290    * DLL
291    */
292   struct AttackedPeer *next;
293   struct AttackedPeer *prev;
294
295   /**
296    * PeerID
297    */
298   struct GNUNET_PeerIdentity peer_id;
299 };
300
301 /**
302  * If type is 2 this is the DLL of attacked peers
303  */
304 static struct AttackedPeer *att_peers_head;
305 static struct AttackedPeer *att_peers_tail;
306
307 /**
308  * This index is used to point to an attacked peer to
309  * implement the round-robin-ish way to select attacked peers.
310  */
311 static struct AttackedPeer *att_peer_index;
312
313 /**
314  * Hashmap of attacked peers used as set.
315  * Used to more efficiently check whether we know that peer.
316  */
317 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
318
319 /**
320  * Number of attacked peers
321  */
322 static uint32_t num_attacked_peers;
323
324 /**
325  * If type is 1 this is the attacked peer
326  */
327 static struct GNUNET_PeerIdentity attacked_peer;
328
329 /**
330  * The limit of PUSHes we can send in one round.
331  * This is an assumption of the Brahms protocol and either implemented
332  * via proof of work
333  * or
334  * assumend to be the bandwidth limitation.
335  */
336 static uint32_t push_limit = 10000;
337 #endif /* ENABLE_MALICIOUS */
338
339
340 /***********************************************************************
341  * /Globals
342 ***********************************************************************/
343
344
345 /***********************************************************************
346  * Util functions
347 ***********************************************************************/
348
349
350 /**
351  * Print peerlist to log.
352  */
353 static void
354 print_peer_list (struct GNUNET_PeerIdentity *list,
355                  unsigned int len)
356 {
357   unsigned int i;
358
359   LOG (GNUNET_ERROR_TYPE_DEBUG,
360        "Printing peer list of length %u at %p:\n",
361        len,
362        list);
363   for (i = 0 ; i < len ; i++)
364   {
365     LOG (GNUNET_ERROR_TYPE_DEBUG,
366          "%u. peer: %s\n",
367          i, GNUNET_i2s (&list[i]));
368   }
369 }
370
371
372 /**
373  * Remove peer from list.
374  */
375 static void
376 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
377                unsigned int *list_size,
378                const struct GNUNET_PeerIdentity *peer)
379 {
380   unsigned int i;
381   struct GNUNET_PeerIdentity *tmp;
382
383   tmp = *peer_list;
384
385   LOG (GNUNET_ERROR_TYPE_DEBUG,
386        "Removing peer %s from list at %p\n",
387        GNUNET_i2s (peer),
388        tmp);
389
390   for ( i = 0 ; i < *list_size ; i++ )
391   {
392     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
393     {
394       if (i < *list_size -1)
395       { /* Not at the last entry -- shift peers left */
396         memcpy (&tmp[i], &tmp[i +1],
397                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
398       }
399       /* Remove last entry (should be now useless PeerID) */
400       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
401     }
402   }
403   *peer_list = tmp;
404 }
405
406
407 /**
408  * Sum all time relatives of an array.
409  */
410 static struct GNUNET_TIME_Relative
411 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
412                 uint32_t arr_size)
413 {
414   struct GNUNET_TIME_Relative sum;
415   uint32_t i;
416
417   sum = GNUNET_TIME_UNIT_ZERO;
418   for ( i = 0 ; i < arr_size ; i++ )
419   {
420     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
421   }
422   return sum;
423 }
424
425
426 /**
427  * Compute the average of given time relatives.
428  */
429 static struct GNUNET_TIME_Relative
430 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
431                 uint32_t arr_size)
432 {
433   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
434                                                       arr_size),
435                                       arr_size);
436 }
437
438
439 /**
440  * Insert PeerID in #view
441  *
442  * Called once we know a peer is live.
443  * Implements #PeerOp
444  *
445  * @return GNUNET_OK if peer was actually inserted
446  *         GNUNET_NO if peer was not inserted
447  */
448 static void
449 insert_in_view_op (void *cls,
450                 const struct GNUNET_PeerIdentity *peer);
451
452 /**
453  * Insert PeerID in #view
454  *
455  * Called once we know a peer is live.
456  *
457  * @return GNUNET_OK if peer was actually inserted
458  *         GNUNET_NO if peer was not inserted
459  */
460 static int
461 insert_in_view (const struct GNUNET_PeerIdentity *peer)
462 {
463   int online;
464
465   online = Peers_check_peer_flag (peer, Peers_ONLINE);
466   if ( (GNUNET_NO == online) ||
467        (GNUNET_SYSERR == online) ) /* peer is not even known */
468   {
469     (void) Peers_check_peer_live (peer);
470     (void) Peers_schedule_operation (peer, insert_in_view_op);
471     return GNUNET_NO;
472   }
473   return View_put (peer);
474 }
475
476 /**
477  * Put random peer from sampler into the view as history update.
478  */
479 static void
480 hist_update (void *cls,
481              struct GNUNET_PeerIdentity *ids,
482              uint32_t num_peers)
483 {
484   unsigned int i;
485
486   for (i = 0; i < num_peers; i++)
487   {
488     (void) insert_in_view (&ids[i]);
489     to_file (file_name_view_log,
490              "+%s\t(hist)",
491              GNUNET_i2s_full (ids));
492   }
493 }
494
495
496 /**
497  * Wrapper around #RPS_sampler_resize()
498  *
499  * If we do not have enough sampler elements, double current sampler size
500  * If we have more than enough sampler elements, halv current sampler size
501  */
502 static void
503 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
504 {
505   unsigned int sampler_size;
506
507   // TODO statistics
508   // TODO respect the min, max
509   sampler_size = RPS_sampler_get_size (sampler);
510   if (sampler_size > new_size * 4)
511   { /* Shrinking */
512     RPS_sampler_resize (sampler, sampler_size / 2);
513   }
514   else if (sampler_size < new_size)
515   { /* Growing */
516     RPS_sampler_resize (sampler, sampler_size * 2);
517   }
518   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
519 }
520
521
522 /**
523  * Wrapper around #RPS_sampler_resize() resizing the client sampler
524  */
525 static void
526 client_resize_wrapper ()
527 {
528   uint32_t bigger_size;
529
530   // TODO statistics
531
532   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
533
534   // TODO respect the min, max
535   resize_wrapper (client_sampler, bigger_size);
536   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
537       bigger_size);
538 }
539
540
541 /**
542  * Estimate request rate
543  *
544  * Called every time we receive a request from the client.
545  */
546 static void
547 est_request_rate()
548 {
549   struct GNUNET_TIME_Relative max_round_duration;
550
551   if (request_deltas_size > req_counter)
552     req_counter++;
553   if ( 1 < req_counter)
554   {
555     /* Shift last request deltas to the right */
556     memcpy (&request_deltas[1],
557         request_deltas,
558         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
559
560     /* Add current delta to beginning */
561     request_deltas[0] =
562         GNUNET_TIME_absolute_get_difference (last_request,
563                                              GNUNET_TIME_absolute_get ());
564     request_rate = T_relative_avg (request_deltas, req_counter);
565     request_rate = (request_rate.rel_value_us < 1) ?
566       GNUNET_TIME_relative_get_unit_ () : request_rate;
567
568     /* Compute the duration a round will maximally take */
569     max_round_duration =
570         GNUNET_TIME_relative_add (round_interval,
571                                   GNUNET_TIME_relative_divide (round_interval, 2));
572
573     /* Set the estimated size the sampler has to have to
574      * satisfy the current client request rate */
575     sampler_size_client_need =
576         max_round_duration.rel_value_us / request_rate.rel_value_us;
577
578     /* Resize the sampler */
579     client_resize_wrapper ();
580   }
581   last_request = GNUNET_TIME_absolute_get ();
582 }
583
584
585 /**
586  * Add all peers in @a peer_array to @a peer_map used as set.
587  *
588  * @param peer_array array containing the peers
589  * @param num_peers number of peers in @peer_array
590  * @param peer_map the peermap to use as set
591  */
592 static void
593 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
594                        unsigned int num_peers,
595                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
596 {
597   unsigned int i;
598   if (NULL == peer_map)
599   {
600     LOG (GNUNET_ERROR_TYPE_WARNING,
601          "Trying to add peers to non-existing peermap.\n");
602     return;
603   }
604
605   for (i = 0; i < num_peers; i++)
606   {
607     GNUNET_CONTAINER_multipeermap_put (peer_map,
608                                        &peer_array[i],
609                                        NULL,
610                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
611   }
612 }
613
614
615 /**
616  * Send a PULL REPLY to @a peer_id
617  *
618  * @param peer_id the peer to send the reply to.
619  * @param peer_ids the peers to send to @a peer_id
620  * @param num_peer_ids the number of peers to send to @a peer_id
621  */
622 static void
623 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
624                  const struct GNUNET_PeerIdentity *peer_ids,
625                  unsigned int num_peer_ids)
626 {
627   uint32_t send_size;
628   struct GNUNET_MQ_Envelope *ev;
629   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
630
631   /* Compute actual size */
632   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
633               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
634
635   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
636     /* Compute number of peers to send
637      * If too long, simply truncate */
638     // TODO select random ones via permutation
639     //      or even better: do good protocol design
640     send_size =
641       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
642        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
643        sizeof (struct GNUNET_PeerIdentity);
644   else
645     send_size = num_peer_ids;
646
647   LOG (GNUNET_ERROR_TYPE_DEBUG,
648       "Going to send PULL REPLY with %u peers to %s\n",
649       send_size, GNUNET_i2s (peer_id));
650
651   ev = GNUNET_MQ_msg_extra (out_msg,
652                             send_size * sizeof (struct GNUNET_PeerIdentity),
653                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
654   out_msg->num_peers = htonl (send_size);
655   memcpy (&out_msg[1], peer_ids,
656          send_size * sizeof (struct GNUNET_PeerIdentity));
657
658   Peers_send_message (peer_id, ev, "PULL REPLY");
659 }
660
661
662 /**
663  * Insert PeerID in #pull_map
664  *
665  * Called once we know a peer is live.
666  */
667 static void
668 insert_in_pull_map (void *cls,
669                     const struct GNUNET_PeerIdentity *peer)
670 {
671   CustomPeerMap_put (pull_map, peer);
672 }
673
674
675 /**
676  * Insert PeerID in #view
677  *
678  * Called once we know a peer is live.
679  * Implements #PeerOp
680  */
681 static void
682 insert_in_view_op (void *cls,
683                 const struct GNUNET_PeerIdentity *peer)
684 {
685   (void) insert_in_view (peer);
686 }
687
688
689 /**
690  * Update sampler with given PeerID.
691  * Implements #PeerOp
692  */
693 static void
694 insert_in_sampler (void *cls,
695                    const struct GNUNET_PeerIdentity *peer)
696 {
697   LOG (GNUNET_ERROR_TYPE_DEBUG,
698        "Updating samplers with peer %s from insert_in_sampler()\n",
699        GNUNET_i2s (peer));
700   RPS_sampler_update (prot_sampler,   peer);
701   RPS_sampler_update (client_sampler, peer);
702   if (0 < RPS_sampler_count_id (prot_sampler, peer))
703   {
704     /* Make sure we 'know' about this peer */
705     (void) Peers_check_peer_live (peer);
706     /* Establish a channel towards that peer to indicate we are going to send
707      * messages to it */
708     Peers_indicate_sending_intention (peer);
709     //Peers_issue_peer_liveliness_check (peer);
710   }
711 }
712
713 /**
714  * @brief If @a peer was unknown, check liveliness and insert it in view and
715  *        sampler
716  *
717  * @param peer peer to insert
718  */
719 static void
720 got_peer (const struct GNUNET_PeerIdentity *peer)
721 {
722   /* If we did not know this peer already, insert it into sampler and view */
723   if (GNUNET_YES == Peers_check_peer_live (peer))
724   {
725     Peers_schedule_operation (peer, insert_in_sampler);
726     Peers_schedule_operation (peer, insert_in_view_op);
727   }
728 }
729
730 /**
731  * @brief Checks if there is a sending channel and if it is needed
732  *
733  * @param peer the peer whose sending channel is checked
734  * @return GNUNET_YES if sending channel exists and is still needed
735  *         GNUNET_NO  otherwise
736  */
737 static int
738 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
739 {
740   /* struct GNUNET_CADET_Channel *channel; */
741   if (GNUNET_NO == Peers_check_peer_known (peer))
742   {
743     return GNUNET_NO;
744   }
745   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
746   {
747     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
748          (GNUNET_YES == View_contains_peer (peer)) ||
749          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
750          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
751          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
752     { /* If we want to keep the connection to peer open */
753       return GNUNET_YES;
754     }
755     return GNUNET_NO;
756   }
757   return GNUNET_NO;
758 }
759
760 /**
761  * @brief remove peer from our knowledge, the view, push and pull maps and
762  * samplers.
763  *
764  * @param peer the peer to remove
765  */
766 static void
767 remove_peer (const struct GNUNET_PeerIdentity *peer)
768 {
769   View_remove_peer (peer);
770   CustomPeerMap_remove_peer (pull_map, peer);
771   CustomPeerMap_remove_peer (push_map, peer);
772   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
773   RPS_sampler_reinitialise_by_value (client_sampler, peer);
774   Peers_remove_peer (peer);
775 }
776
777
778 /**
779  * @brief Remove data that is not needed anymore.
780  *
781  * If the sending channel is no longer needed it is destroyed.
782  *
783  * @param peer the peer whose data is about to be cleaned
784  */
785 static void
786 clean_peer (const struct GNUNET_PeerIdentity *peer)
787 {
788   if (GNUNET_NO == check_sending_channel_needed (peer))
789   {
790     #ifdef ENABLE_MALICIOUS
791     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
792       Peers_destroy_sending_channel (peer);
793     #else /* ENABLE_MALICIOUS */
794     Peers_destroy_sending_channel (peer);
795     #endif /* ENABLE_MALICIOUS */
796   }
797
798   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
799        (GNUNET_NO == View_contains_peer (peer)) &&
800        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
801        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
802        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
803        (0 == RPS_sampler_count_id (client_sampler, peer)) )
804   { /* We can safely remov this peer */
805     remove_peer (peer);
806     return;
807   }
808   Peers_clean_peer (peer);
809 }
810
811 /**
812  * @brief This is called when a channel is destroyed.
813  *
814  * Removes peer completely from our knowledge if the send_channel was destroyed
815  * Otherwise simply delete the recv_channel
816  *
817  * @param cls The closure
818  * @param channel The channel being closed
819  * @param channel_ctx The context associated with this channel
820  */
821 static void
822 cleanup_destroyed_channel (void *cls,
823                            const struct GNUNET_CADET_Channel *channel,
824                            void *channel_ctx)
825 {
826   struct GNUNET_PeerIdentity *peer;
827
828   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
829       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
830        // FIXME wait for cadet to change this function
831
832   if (GNUNET_NO == Peers_check_peer_known (peer))
833   { /* We don't know a context to that peer */
834     LOG (GNUNET_ERROR_TYPE_WARNING,
835          "channel (%s) without associated context was destroyed\n",
836          GNUNET_i2s (peer));
837     return;
838   }
839
840   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
841   { /* We are in the middle of removing that peer from our knowledge. In this
842        case simply make sure that the channels are cleaned. */
843     Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
844     to_file (file_name_view_log,
845              "-%s\t(cleanup channel, ourself)",
846              GNUNET_i2s_full (peer));
847     return;
848   }
849
850   if (GNUNET_YES ==
851       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
852   { /* Channel used for sending was destroyed */
853     /* Possible causes of channel destruction:
854      *  - ourselves  -> cleaning send channel -> clean context
855      *  - other peer -> peer probably went down -> remove
856      */
857     if (GNUNET_YES == Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_CLEAN))
858     { /* We are about to clean the sending channel. Clean the respective
859        * context */
860       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
861       return;
862     }
863     else
864     { /* Other peer destroyed our sending channel that he is supposed to keep
865        * open. It probably went down. Remove it from our knowledge. */
866       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
867       remove_peer (peer);
868       return;
869     }
870   }
871   else if (GNUNET_YES ==
872       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
873   { /* Channel used for receiving was destroyed */
874     /* Possible causes of channel destruction:
875      *  - ourselves  -> peer tried to establish channel twice -> clean context
876      *  - other peer -> peer doesn't want to send us data -> clean
877      */
878     if (GNUNET_YES ==
879         Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_ESTABLISHED_TWICE))
880     { /* Other peer tried to establish a channel to us twice. We do not accept
881        * that. Clean the context. */
882       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
883       return;
884     }
885     else
886     { /* Other peer doesn't want to send us data anymore. We are free to clean
887        * it. */
888       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
889       clean_peer (peer);
890       return;
891     }
892   }
893   else
894   {
895     LOG (GNUNET_ERROR_TYPE_WARNING,
896         "Destroyed channel is neither sending nor receiving channel\n");
897   }
898 }
899
900 /***********************************************************************
901  * /Util functions
902 ***********************************************************************/
903
904 static void
905 destroy_reply_cls (struct ReplyCls *rep_cls)
906 {
907   struct ClientContext *cli_ctx;
908
909   cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client,
910                                                    struct ClientContext);
911   GNUNET_assert (NULL != cli_ctx);
912   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
913                                cli_ctx->rep_cls_tail,
914                                rep_cls);
915   GNUNET_free (rep_cls);
916 }
917
918
919 static void
920 destroy_cli_ctx (struct ClientContext *cli_ctx)
921 {
922   GNUNET_assert (NULL != cli_ctx);
923   if (NULL != cli_ctx->mq)
924   {
925     GNUNET_MQ_destroy (cli_ctx->mq);
926   }
927   if (NULL != cli_ctx->rep_cls_head)
928   {
929     LOG (GNUNET_ERROR_TYPE_WARNING,
930          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
931     while (NULL != cli_ctx->rep_cls_head)
932       destroy_reply_cls (cli_ctx->rep_cls_head);
933   }
934   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
935                                cli_ctx_tail,
936                                cli_ctx);
937   GNUNET_free (cli_ctx);
938 }
939
940
941 /**
942  * Function called by NSE.
943  *
944  * Updates sizes of sampler list and view and adapt those lists
945  * accordingly.
946  */
947 static void
948 nse_callback (void *cls,
949               struct GNUNET_TIME_Absolute timestamp,
950               double logestimate, double std_dev)
951 {
952   double estimate;
953   //double scale; // TODO this might go gloabal/config
954
955   LOG (GNUNET_ERROR_TYPE_DEBUG,
956        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
957        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
958   //scale = .01;
959   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
960   // GNUNET_NSE_log_estimate_to_n (logestimate);
961   estimate = pow (estimate, 1.0 / 3);
962   // TODO add if std_dev is a number
963   // estimate += (std_dev * scale);
964   if (2 < ceil (estimate))
965   {
966     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
967     sampler_size_est_need = estimate;
968   } else
969     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
970
971   /* If the NSE has changed adapt the lists accordingly */
972   resize_wrapper (prot_sampler, sampler_size_est_need);
973   client_resize_wrapper ();
974 }
975
976
977 /**
978  * Callback called once the requested PeerIDs are ready.
979  *
980  * Sends those to the requesting client.
981  */
982 static void
983 client_respond (void *cls,
984                 struct GNUNET_PeerIdentity *peer_ids,
985                 uint32_t num_peers)
986 {
987   uint32_t i;
988   struct GNUNET_MQ_Envelope *ev;
989   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
990   struct ReplyCls *reply_cls = (struct ReplyCls *) cls;
991   uint32_t size_needed;
992   struct ClientContext *cli_ctx;
993
994   GNUNET_assert (NULL != reply_cls);
995   LOG (GNUNET_ERROR_TYPE_DEBUG,
996        "sampler returned %" PRIu32 " peers:\n",
997        num_peers);
998   for (i = 0; i < num_peers; i++)
999   {
1000     LOG (GNUNET_ERROR_TYPE_DEBUG,
1001          "  %" PRIu32 ": %s\n",
1002          i,
1003          GNUNET_i2s (&peer_ids[i]));
1004   }
1005
1006   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1007                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1008
1009   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1010
1011   ev = GNUNET_MQ_msg_extra (out_msg,
1012                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1013                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1014   out_msg->num_peers = htonl (num_peers);
1015   out_msg->id = htonl (reply_cls->id);
1016
1017   memcpy (&out_msg[1],
1018           peer_ids,
1019           num_peers * sizeof (struct GNUNET_PeerIdentity));
1020   GNUNET_free (peer_ids);
1021
1022   cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client,
1023                                                    struct ClientContext);
1024   GNUNET_assert (NULL != cli_ctx);
1025   destroy_reply_cls (reply_cls);
1026   GNUNET_MQ_send (cli_ctx->mq, ev);
1027 }
1028
1029
1030 /**
1031  * Handle RPS request from the client.
1032  *
1033  * @param cls closure
1034  * @param client identification of the client
1035  * @param message the actual message
1036  */
1037 static void
1038 handle_client_request (void *cls,
1039                        struct GNUNET_SERVER_Client *client,
1040                        const struct GNUNET_MessageHeader *message)
1041 {
1042   struct GNUNET_RPS_CS_RequestMessage *msg;
1043   uint32_t num_peers;
1044   uint32_t size_needed;
1045   struct ReplyCls *reply_cls;
1046   uint32_t i;
1047   struct ClientContext *cli_ctx;
1048
1049   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
1050
1051   num_peers = ntohl (msg->num_peers);
1052   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1053                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1054
1055   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1056   {
1057     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1058                 "Message received from client has size larger than expected\n");
1059     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1060     return;
1061   }
1062
1063   for (i = 0 ; i < num_peers ; i++)
1064     est_request_rate();
1065
1066   LOG (GNUNET_ERROR_TYPE_DEBUG,
1067        "Client requested %" PRIu32 " random peer(s).\n",
1068        num_peers);
1069
1070   reply_cls = GNUNET_new (struct ReplyCls);
1071   reply_cls->id = ntohl (msg->id);
1072   reply_cls->client = client;
1073   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1074                                                         client_respond,
1075                                                         reply_cls,
1076                                                         num_peers);
1077
1078   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1079   GNUNET_assert (NULL != cli_ctx);
1080   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1081                                cli_ctx->rep_cls_tail,
1082                                reply_cls);
1083   GNUNET_SERVER_receive_done (client,
1084                               GNUNET_OK);
1085 }
1086
1087
1088 /**
1089  * @brief Handle a message that requests the cancellation of a request
1090  *
1091  * @param cls unused
1092  * @param client the client that requests the cancellation
1093  * @param message the message containing the id of the request
1094  */
1095 static void
1096 handle_client_request_cancel (void *cls,
1097                               struct GNUNET_SERVER_Client *client,
1098                               const struct GNUNET_MessageHeader *message)
1099 {
1100   struct GNUNET_RPS_CS_RequestCancelMessage *msg =
1101     (struct GNUNET_RPS_CS_RequestCancelMessage *) message;
1102   struct ClientContext *cli_ctx;
1103   struct ReplyCls *rep_cls;
1104
1105   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1106   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1107   rep_cls = cli_ctx->rep_cls_head;
1108   LOG (GNUNET_ERROR_TYPE_DEBUG,
1109       "Client cancels request with id %" PRIu32 "\n",
1110       ntohl (msg->id));
1111   while ( (NULL != rep_cls->next) &&
1112           (rep_cls->id != ntohl (msg->id)) )
1113     rep_cls = rep_cls->next;
1114   GNUNET_assert (rep_cls->id == ntohl (msg->id));
1115   RPS_sampler_request_cancel (rep_cls->req_handle);
1116   destroy_reply_cls (rep_cls);
1117   GNUNET_SERVER_receive_done (client,
1118                               GNUNET_OK);
1119 }
1120
1121
1122 /**
1123  * Handle seed from the client.
1124  *
1125  * @param cls closure
1126  * @param client identification of the client
1127  * @param message the actual message
1128  */
1129 static void
1130 handle_client_seed (void *cls,
1131                     struct GNUNET_SERVER_Client *client,
1132                     const struct GNUNET_MessageHeader *message)
1133 {
1134   struct GNUNET_RPS_CS_SeedMessage *in_msg;
1135   struct GNUNET_PeerIdentity *peers;
1136   uint32_t num_peers;
1137   uint32_t i;
1138
1139   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) > ntohs (message->size))
1140   {
1141     GNUNET_break_op (0);
1142     GNUNET_SERVER_receive_done (client,
1143                                 GNUNET_SYSERR);
1144   }
1145
1146   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
1147   num_peers = ntohl (in_msg->num_peers);
1148   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1149   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1150   //memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1151
1152   if ((ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1153       sizeof (struct GNUNET_PeerIdentity) != num_peers)
1154   {
1155     GNUNET_break_op (0);
1156     GNUNET_SERVER_receive_done (client,
1157                                 GNUNET_SYSERR);
1158     return;
1159   }
1160
1161   LOG (GNUNET_ERROR_TYPE_DEBUG,
1162        "Client seeded peers:\n");
1163   print_peer_list (peers, num_peers);
1164
1165   for (i = 0; i < num_peers; i++)
1166   {
1167     LOG (GNUNET_ERROR_TYPE_DEBUG,
1168          "Updating samplers with seed %" PRIu32 ": %s\n",
1169          i,
1170          GNUNET_i2s (&peers[i]));
1171
1172     got_peer (&peers[i]);
1173
1174     //RPS_sampler_update (prot_sampler,   &peers[i]);
1175     //RPS_sampler_update (client_sampler, &peers[i]);
1176   }
1177
1178   ////GNUNET_free (peers);
1179
1180   GNUNET_SERVER_receive_done (client,
1181                               GNUNET_OK);
1182 }
1183
1184
1185 /**
1186  * Handle a PUSH message from another peer.
1187  *
1188  * Check the proof of work and store the PeerID
1189  * in the temporary list for pushed PeerIDs.
1190  *
1191  * @param cls Closure
1192  * @param channel The channel the PUSH was received over
1193  * @param channel_ctx The context associated with this channel
1194  * @param msg The message header
1195  */
1196 static int
1197 handle_peer_push (void *cls,
1198                   struct GNUNET_CADET_Channel *channel,
1199                   void **channel_ctx,
1200                   const struct GNUNET_MessageHeader *msg)
1201 {
1202   const struct GNUNET_PeerIdentity *peer;
1203
1204   // (check the proof of work (?))
1205
1206   peer = (const struct GNUNET_PeerIdentity *)
1207     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1208   // FIXME wait for cadet to change this function
1209
1210   LOG (GNUNET_ERROR_TYPE_DEBUG,
1211        "Received PUSH (%s)\n",
1212        GNUNET_i2s (peer));
1213
1214 #ifdef ENABLE_MALICIOUS
1215   struct AttackedPeer *tmp_att_peer;
1216
1217   tmp_att_peer = GNUNET_new (struct AttackedPeer);
1218   memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1219   if (1 == mal_type
1220       || 3 == mal_type)
1221   { /* Try to maximise representation */
1222     if (NULL == att_peer_set)
1223       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1224     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1225                                                              peer))
1226     {
1227       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1228                                    att_peers_tail,
1229                                    tmp_att_peer);
1230       add_peer_array_to_set (peer, 1, att_peer_set);
1231     }
1232     return GNUNET_OK;
1233   }
1234
1235
1236   else if (2 == mal_type)
1237   { /* We attack one single well-known peer - simply ignore */
1238     return GNUNET_OK;
1239   }
1240   else
1241   {
1242     GNUNET_free (tmp_att_peer);
1243   }
1244
1245   #endif /* ENABLE_MALICIOUS */
1246
1247   /* Add the sending peer to the push_map */
1248   CustomPeerMap_put (push_map, peer);
1249
1250   GNUNET_CADET_receive_done (channel);
1251   return GNUNET_OK;
1252 }
1253
1254
1255 /**
1256  * Handle PULL REQUEST request message from another peer.
1257  *
1258  * Reply with the view of PeerIDs.
1259  *
1260  * @param cls Closure
1261  * @param channel The channel the PULL REQUEST was received over
1262  * @param channel_ctx The context associated with this channel
1263  * @param msg The message header
1264  */
1265 static int
1266 handle_peer_pull_request (void *cls,
1267                           struct GNUNET_CADET_Channel *channel,
1268                           void **channel_ctx,
1269                           const struct GNUNET_MessageHeader *msg)
1270 {
1271   struct GNUNET_PeerIdentity *peer;
1272   const struct GNUNET_PeerIdentity *view_array;
1273
1274   peer = (struct GNUNET_PeerIdentity *)
1275     GNUNET_CADET_channel_get_info (channel,
1276                                    GNUNET_CADET_OPTION_PEER);
1277   // FIXME wait for cadet to change this function
1278
1279   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
1280
1281   #ifdef ENABLE_MALICIOUS
1282   if (1 == mal_type
1283       || 3 == mal_type)
1284   { /* Try to maximise representation */
1285     send_pull_reply (peer, mal_peers, num_mal_peers);
1286     return GNUNET_OK;
1287   }
1288
1289   else if (2 == mal_type)
1290   { /* Try to partition network */
1291     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1292     {
1293       send_pull_reply (peer, mal_peers, num_mal_peers);
1294     }
1295     return GNUNET_OK;
1296   }
1297   #endif /* ENABLE_MALICIOUS */
1298
1299   view_array = View_get_as_array ();
1300
1301   send_pull_reply (peer, view_array, View_size ());
1302
1303   GNUNET_CADET_receive_done (channel);
1304   return GNUNET_OK;
1305 }
1306
1307
1308 /**
1309  * Handle PULL REPLY message from another peer.
1310  *
1311  * Check whether we sent a corresponding request and
1312  * whether this reply is the first one.
1313  *
1314  * @param cls Closure
1315  * @param channel The channel the PUSH was received over
1316  * @param channel_ctx The context associated with this channel
1317  * @param msg The message header
1318  */
1319 static int
1320 handle_peer_pull_reply (void *cls,
1321                         struct GNUNET_CADET_Channel *channel,
1322                         void **channel_ctx,
1323                         const struct GNUNET_MessageHeader *msg)
1324 {
1325   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1326   struct GNUNET_PeerIdentity *peers;
1327   struct GNUNET_PeerIdentity *sender;
1328   uint32_t i;
1329 #ifdef ENABLE_MALICIOUS
1330   struct AttackedPeer *tmp_att_peer;
1331 #endif /* ENABLE_MALICIOUS */
1332
1333   /* Check for protocol violation */
1334   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1335   {
1336     GNUNET_break_op (0);
1337     GNUNET_CADET_receive_done (channel);
1338     return GNUNET_SYSERR;
1339   }
1340
1341   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1342   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1343       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1344   {
1345     LOG (GNUNET_ERROR_TYPE_ERROR,
1346         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1347         ntohl (in_msg->num_peers),
1348         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1349             sizeof (struct GNUNET_PeerIdentity));
1350     GNUNET_break_op (0);
1351     GNUNET_CADET_receive_done (channel);
1352     return GNUNET_SYSERR;
1353   }
1354
1355   // Guess simply casting isn't the nicest way...
1356   // FIXME wait for cadet to change this function
1357   sender = (struct GNUNET_PeerIdentity *)
1358       GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1359
1360   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
1361
1362   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
1363   {
1364     LOG (GNUNET_ERROR_TYPE_WARNING,
1365         "Received a pull reply from a peer we didn't request one from!\n");
1366     GNUNET_break_op (0);
1367     GNUNET_CADET_receive_done (channel);
1368     return GNUNET_OK;
1369   }
1370
1371
1372   #ifdef ENABLE_MALICIOUS
1373   // We shouldn't even receive pull replies as we're not sending
1374   if (2 == mal_type)
1375     return GNUNET_OK;
1376   #endif /* ENABLE_MALICIOUS */
1377
1378   /* Do actual logic */
1379   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1380
1381   LOG (GNUNET_ERROR_TYPE_DEBUG,
1382        "PULL REPLY received, got following %u peers:\n",
1383        ntohl (in_msg->num_peers));
1384
1385   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1386   {
1387     LOG (GNUNET_ERROR_TYPE_DEBUG,
1388          "%u. %s\n",
1389          i,
1390          GNUNET_i2s (&peers[i]));
1391
1392     #ifdef ENABLE_MALICIOUS
1393     if ((NULL != att_peer_set) &&
1394         (1 == mal_type || 3 == mal_type))
1395     { /* Add attacked peer to local list */
1396       // TODO check if we sent a request and this was the first reply
1397       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1398                                                                &peers[i])
1399           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1400                                                                   &peers[i])
1401           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1402                                                    &own_identity))
1403       {
1404         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1405         tmp_att_peer->peer_id = peers[i];
1406         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1407                                      att_peers_tail,
1408                                      tmp_att_peer);
1409         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1410       }
1411       continue;
1412     }
1413     #endif /* ENABLE_MALICIOUS */
1414     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
1415                                               &peers[i]))
1416     {
1417       /* Make sure we 'know' about this peer */
1418       (void) Peers_check_peer_live (&peers[i]);
1419
1420       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
1421       {
1422         CustomPeerMap_put (pull_map, &peers[i]);
1423       }
1424       else
1425       {
1426         Peers_schedule_operation (&peers[i], insert_in_pull_map);
1427         Peers_issue_peer_liveliness_check (&peers[i]);
1428       }
1429     }
1430   }
1431
1432   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
1433   clean_peer (sender);
1434
1435   GNUNET_CADET_receive_done (channel);
1436   return GNUNET_OK;
1437 }
1438
1439
1440 /**
1441  * Compute a random delay.
1442  * A uniformly distributed value between mean + spread and mean - spread.
1443  *
1444  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1445  * It would return a random value between 2 and 6 min.
1446  *
1447  * @param mean the mean
1448  * @param spread the inverse amount of deviation from the mean
1449  */
1450 static struct GNUNET_TIME_Relative
1451 compute_rand_delay (struct GNUNET_TIME_Relative mean,
1452                     unsigned int spread)
1453 {
1454   struct GNUNET_TIME_Relative half_interval;
1455   struct GNUNET_TIME_Relative ret;
1456   unsigned int rand_delay;
1457   unsigned int max_rand_delay;
1458
1459   if (0 == spread)
1460   {
1461     LOG (GNUNET_ERROR_TYPE_WARNING,
1462          "Not accepting spread of 0\n");
1463     GNUNET_break (0);
1464   }
1465
1466   /* Compute random time value between spread * mean and spread * mean */
1467   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1468
1469   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1470   /**
1471    * Compute random value between (0 and 1) * round_interval
1472    * via multiplying round_interval with a 'fraction' (0 to value)/value
1473    */
1474   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1475   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1476   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1477   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1478
1479   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1480     LOG (GNUNET_ERROR_TYPE_WARNING,
1481          "Returning FOREVER_REL\n");
1482
1483   return ret;
1484 }
1485
1486
1487 /**
1488  * Send single pull request
1489  *
1490  * @param peer_id the peer to send the pull request to.
1491  */
1492 static void
1493 send_pull_request (const struct GNUNET_PeerIdentity *peer)
1494 {
1495   struct GNUNET_MQ_Envelope *ev;
1496
1497   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
1498                                                      Peers_PULL_REPLY_PENDING));
1499   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
1500
1501   LOG (GNUNET_ERROR_TYPE_DEBUG,
1502        "Going to send PULL REQUEST to peer %s.\n",
1503        GNUNET_i2s (peer));
1504
1505   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1506   Peers_send_message (peer, ev, "PULL REQUEST");
1507 }
1508
1509
1510 /**
1511  * Send single push
1512  *
1513  * @param peer_id the peer to send the push to.
1514  */
1515 static void
1516 send_push (const struct GNUNET_PeerIdentity *peer_id)
1517 {
1518   struct GNUNET_MQ_Envelope *ev;
1519
1520   LOG (GNUNET_ERROR_TYPE_DEBUG,
1521        "Going to send PUSH to peer %s.\n",
1522        GNUNET_i2s (peer_id));
1523
1524   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1525   Peers_send_message (peer_id, ev, "PUSH");
1526 }
1527
1528
1529 static void
1530 do_round (void *cls);
1531
1532 static void
1533 do_mal_round (void *cls);
1534
1535
1536 #ifdef ENABLE_MALICIOUS
1537 /**
1538  * Turn RPS service to act malicious.
1539  *
1540  * @param cls Closure
1541  * @param client The client that sent the message
1542  * @param msg The message header
1543  */
1544 static void
1545 handle_client_act_malicious (void *cls,
1546                              struct GNUNET_SERVER_Client *client,
1547                              const struct GNUNET_MessageHeader *msg)
1548 {
1549   struct GNUNET_RPS_CS_ActMaliciousMessage *in_msg;
1550   struct GNUNET_PeerIdentity *peers;
1551   uint32_t num_mal_peers_sent;
1552   uint32_t num_mal_peers_old;
1553
1554   /* Check for protocol violation */
1555   if (sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage) > ntohs (msg->size))
1556   {
1557     GNUNET_break_op (0);
1558   }
1559
1560   in_msg = (struct GNUNET_RPS_CS_ActMaliciousMessage *) msg;
1561   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1562       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1563   {
1564     LOG (GNUNET_ERROR_TYPE_ERROR,
1565         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1566         ntohl (in_msg->num_peers),
1567         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1568             sizeof (struct GNUNET_PeerIdentity));
1569     GNUNET_break_op (0);
1570   }
1571
1572
1573   /* Do actual logic */
1574   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1575   mal_type = ntohl (in_msg->type);
1576   if (NULL == mal_peer_set)
1577     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1578
1579   LOG (GNUNET_ERROR_TYPE_DEBUG,
1580        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
1581        mal_type,
1582        ntohl (in_msg->num_peers));
1583
1584   if (1 == mal_type)
1585   { /* Try to maximise representation */
1586     /* Add other malicious peers to those we already know */
1587
1588     num_mal_peers_sent = ntohl (in_msg->num_peers);
1589     num_mal_peers_old = num_mal_peers;
1590     GNUNET_array_grow (mal_peers,
1591                        num_mal_peers,
1592                        num_mal_peers + num_mal_peers_sent);
1593     memcpy (&mal_peers[num_mal_peers_old],
1594             peers,
1595             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1596
1597     /* Add all mal peers to mal_peer_set */
1598     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1599                            num_mal_peers_sent,
1600                            mal_peer_set);
1601
1602     /* Substitute do_round () with do_mal_round () */
1603     GNUNET_SCHEDULER_cancel (do_round_task);
1604     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1605   }
1606
1607   else if ( (2 == mal_type) ||
1608             (3 == mal_type) )
1609   { /* Try to partition the network */
1610     /* Add other malicious peers to those we already know */
1611
1612     num_mal_peers_sent = ntohl (in_msg->num_peers) - 1;
1613     num_mal_peers_old = num_mal_peers;
1614     GNUNET_array_grow (mal_peers,
1615                        num_mal_peers,
1616                        num_mal_peers + num_mal_peers_sent);
1617     if (NULL != mal_peers &&
1618         0 != num_mal_peers)
1619     {
1620       memcpy (&mal_peers[num_mal_peers_old],
1621               peers,
1622               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1623
1624       /* Add all mal peers to mal_peer_set */
1625       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1626                              num_mal_peers_sent,
1627                              mal_peer_set);
1628     }
1629
1630     /* Store the one attacked peer */
1631     memcpy (&attacked_peer,
1632             &in_msg->attacked_peer,
1633             sizeof (struct GNUNET_PeerIdentity));
1634     /* Set the flag of the attacked peer to valid to avoid problems */
1635     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
1636     {
1637       Peers_check_peer_live (&attacked_peer);
1638       Peers_issue_peer_liveliness_check (&attacked_peer);
1639     }
1640
1641     LOG (GNUNET_ERROR_TYPE_DEBUG,
1642          "Attacked peer is %s\n",
1643          GNUNET_i2s (&attacked_peer));
1644
1645     /* Substitute do_round () with do_mal_round () */
1646     GNUNET_SCHEDULER_cancel (do_round_task);
1647     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1648   }
1649   else if (0 == mal_type)
1650   { /* Stop acting malicious */
1651     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
1652
1653     /* Substitute do_mal_round () with do_round () */
1654     GNUNET_SCHEDULER_cancel (do_round_task);
1655     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1656   }
1657   else
1658   {
1659     GNUNET_break (0);
1660   }
1661   GNUNET_SERVER_receive_done (client,
1662                               GNUNET_OK);
1663 }
1664
1665
1666 /**
1667  * Send out PUSHes and PULLs maliciously.
1668  *
1669  * This is executed regylary.
1670  */
1671 static void
1672 do_mal_round (void *cls)
1673 {
1674   uint32_t num_pushes;
1675   uint32_t i;
1676   struct GNUNET_TIME_Relative time_next_round;
1677   struct AttackedPeer *tmp_att_peer;
1678
1679   LOG (GNUNET_ERROR_TYPE_DEBUG,
1680        "Going to execute next round maliciously type %" PRIu32 ".\n",
1681       mal_type);
1682   do_round_task = NULL;
1683   GNUNET_assert (mal_type <= 3);
1684   /* Do malicious actions */
1685   if (1 == mal_type)
1686   { /* Try to maximise representation */
1687
1688     /* The maximum of pushes we're going to send this round */
1689     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
1690                                          num_attacked_peers),
1691                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1692
1693     LOG (GNUNET_ERROR_TYPE_DEBUG,
1694          "Going to send %" PRIu32 " pushes\n",
1695          num_pushes);
1696
1697     /* Send PUSHes to attacked peers */
1698     for (i = 0 ; i < num_pushes ; i++)
1699     {
1700       if (att_peers_tail == att_peer_index)
1701         att_peer_index = att_peers_head;
1702       else
1703         att_peer_index = att_peer_index->next;
1704
1705       send_push (&att_peer_index->peer_id);
1706     }
1707
1708     /* Send PULLs to some peers to learn about additional peers to attack */
1709     tmp_att_peer = att_peer_index;
1710     for (i = 0 ; i < num_pushes * alpha ; i++)
1711     {
1712       if (att_peers_tail == tmp_att_peer)
1713         tmp_att_peer = att_peers_head;
1714       else
1715         att_peer_index = tmp_att_peer->next;
1716
1717       send_pull_request (&tmp_att_peer->peer_id);
1718     }
1719   }
1720
1721
1722   else if (2 == mal_type)
1723   { /**
1724      * Try to partition the network
1725      * Send as many pushes to the attacked peer as possible
1726      * That is one push per round as it will ignore more.
1727      */
1728     Peers_check_peer_live (&attacked_peer);
1729     if (GNUNET_YES == Peers_check_peer_valid (&attacked_peer))
1730       send_push (&attacked_peer);
1731   }
1732
1733
1734   if (3 == mal_type)
1735   { /* Combined attack */
1736
1737     /* Send PUSH to attacked peers */
1738     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
1739     {
1740       Peers_check_peer_live (&attacked_peer);
1741       if (GNUNET_YES == Peers_check_peer_valid (&attacked_peer))
1742       {
1743         LOG (GNUNET_ERROR_TYPE_DEBUG,
1744             "Goding to send push to attacked peer (%s)\n",
1745             GNUNET_i2s (&attacked_peer));
1746         send_push (&attacked_peer);
1747       }
1748       else
1749         Peers_issue_peer_liveliness_check (&attacked_peer);
1750     }
1751     else
1752       Peers_check_peer_live (&attacked_peer);
1753     Peers_issue_peer_liveliness_check (&attacked_peer);
1754
1755     /* The maximum of pushes we're going to send this round */
1756     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
1757                                          num_attacked_peers),
1758                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1759
1760     LOG (GNUNET_ERROR_TYPE_DEBUG,
1761          "Going to send %" PRIu32 " pushes\n",
1762          num_pushes);
1763
1764     for (i = 0; i < num_pushes; i++)
1765     {
1766       if (att_peers_tail == att_peer_index)
1767         att_peer_index = att_peers_head;
1768       else
1769         att_peer_index = att_peer_index->next;
1770
1771       send_push (&att_peer_index->peer_id);
1772     }
1773
1774     /* Send PULLs to some peers to learn about additional peers to attack */
1775     tmp_att_peer = att_peer_index;
1776     for (i = 0; i < num_pushes * alpha; i++)
1777     {
1778       if (att_peers_tail == tmp_att_peer)
1779         tmp_att_peer = att_peers_head;
1780       else
1781         att_peer_index = tmp_att_peer->next;
1782
1783       send_pull_request (&tmp_att_peer->peer_id);
1784     }
1785   }
1786
1787   /* Schedule next round */
1788   time_next_round = compute_rand_delay (round_interval, 2);
1789
1790   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
1791   //NULL);
1792   GNUNET_assert (NULL == do_round_task);
1793   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
1794                                                 &do_mal_round, NULL);
1795   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1796 }
1797 #endif /* ENABLE_MALICIOUS */
1798
1799
1800 /**
1801  * Send out PUSHes and PULLs, possibly update #view, samplers.
1802  *
1803  * This is executed regylary.
1804  */
1805 static void
1806 do_round (void *cls)
1807 {
1808   uint32_t i;
1809   const struct GNUNET_PeerIdentity *view_array;
1810   unsigned int *permut;
1811   unsigned int a_peers; /* Number of peers we send pushes to */
1812   unsigned int b_peers; /* Number of peers we send pull requests to */
1813   uint32_t first_border;
1814   uint32_t second_border;
1815   struct GNUNET_PeerIdentity peer;
1816   struct GNUNET_PeerIdentity *update_peer;
1817
1818   LOG (GNUNET_ERROR_TYPE_DEBUG,
1819        "Going to execute next round.\n");
1820   do_round_task = NULL;
1821   LOG (GNUNET_ERROR_TYPE_DEBUG,
1822        "Printing view:\n");
1823   to_file (file_name_view_log,
1824            "___ new round ___");
1825   view_array = View_get_as_array ();
1826   for (i = 0; i < View_size (); i++)
1827   {
1828     LOG (GNUNET_ERROR_TYPE_DEBUG,
1829          "\t%s\n", GNUNET_i2s (&view_array[i]));
1830     to_file (file_name_view_log,
1831              "=%s\t(do round)",
1832              GNUNET_i2s_full (&view_array[i]));
1833   }
1834
1835
1836   /* Send pushes and pull requests */
1837   if (0 < View_size ())
1838   {
1839     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1840                                            View_size ());
1841
1842     /* Send PUSHes */
1843     a_peers = ceil (alpha * View_size ());
1844
1845     LOG (GNUNET_ERROR_TYPE_DEBUG,
1846          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
1847          a_peers, alpha, View_size ());
1848     for (i = 0; i < a_peers; i++)
1849     {
1850       peer = view_array[permut[i]];
1851       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1852       { // FIXME if this fails schedule/loop this for later
1853         send_push (&peer);
1854       }
1855     }
1856
1857     /* Send PULL requests */
1858     b_peers = ceil (beta * View_size ());
1859     first_border = a_peers;
1860     second_border = a_peers + b_peers;
1861     if (second_border > View_size ())
1862     {
1863       first_border = View_size () - b_peers;
1864       second_border = View_size ();
1865     }
1866     LOG (GNUNET_ERROR_TYPE_DEBUG,
1867         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
1868         b_peers, beta, View_size ());
1869     for (i = first_border; i < second_border; i++)
1870     {
1871       peer = view_array[permut[i]];
1872       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
1873           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
1874       { // FIXME if this fails schedule/loop this for later
1875         send_pull_request (&peer);
1876       }
1877     }
1878
1879     GNUNET_free (permut);
1880     permut = NULL;
1881   }
1882
1883
1884   /* Update view */
1885   /* TODO see how many peers are in push-/pull- list! */
1886
1887   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
1888       (0 < CustomPeerMap_size (push_map)) &&
1889       (0 < CustomPeerMap_size (pull_map)))
1890   { /* If conditions for update are fulfilled, update */
1891     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
1892
1893     uint32_t final_size;
1894     uint32_t peers_to_clean_size;
1895     struct GNUNET_PeerIdentity *peers_to_clean;
1896
1897     peers_to_clean = NULL;
1898     peers_to_clean_size = 0;
1899     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
1900     memcpy (peers_to_clean,
1901             view_array,
1902             View_size () * sizeof (struct GNUNET_PeerIdentity));
1903
1904     /* Seems like recreating is the easiest way of emptying the peermap */
1905     View_clear ();
1906     to_file (file_name_view_log,
1907              "--- emptied ---");
1908
1909     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
1910                                 CustomPeerMap_size (push_map));
1911     second_border = first_border +
1912                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
1913                                 CustomPeerMap_size (pull_map));
1914     final_size    = second_border +
1915       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
1916
1917     /* Update view with peers received through PUSHes */
1918     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1919                                            CustomPeerMap_size (push_map));
1920     for (i = 0; i < first_border; i++)
1921     {
1922       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
1923                                                               permut[i]));
1924       to_file (file_name_view_log,
1925                "+%s\t(push list)",
1926                GNUNET_i2s_full (&view_array[i]));
1927       // TODO change the peer_flags accordingly
1928     }
1929     GNUNET_free (permut);
1930     permut = NULL;
1931
1932     /* Update view with peers received through PULLs */
1933     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1934                                            CustomPeerMap_size (pull_map));
1935     for (i = first_border; i < second_border; i++)
1936     {
1937       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
1938             permut[i - first_border]));
1939       to_file (file_name_view_log,
1940                "+%s\t(pull list)",
1941                GNUNET_i2s_full (&view_array[i]));
1942       // TODO change the peer_flags accordingly
1943     }
1944     GNUNET_free (permut);
1945     permut = NULL;
1946
1947     /* Update view with peers from history */
1948     RPS_sampler_get_n_rand_peers (prot_sampler,
1949                                   hist_update,
1950                                   NULL,
1951                                   final_size - second_border);
1952     // TODO change the peer_flags accordingly
1953
1954     for (i = 0; i < View_size (); i++)
1955       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
1956
1957     /* Clean peers that were removed from the view */
1958     for (i = 0; i < peers_to_clean_size; i++)
1959     {
1960       to_file (file_name_view_log,
1961                "-%s",
1962                GNUNET_i2s_full (&peers_to_clean[i]));
1963       Peers_clean_peer (&peers_to_clean[i]);
1964       //peer_destroy_channel_send (sender);
1965     }
1966
1967     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
1968     peers_to_clean = NULL;
1969   }
1970   else
1971   {
1972     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
1973   }
1974   // TODO independent of that also get some peers from CADET_get_peers()?
1975
1976   LOG (GNUNET_ERROR_TYPE_DEBUG,
1977        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
1978        CustomPeerMap_size (push_map),
1979        CustomPeerMap_size (pull_map),
1980        alpha,
1981        View_size (),
1982        alpha * View_size ());
1983
1984   /* Update samplers */
1985   for (i = 0; i < CustomPeerMap_size (push_map); i++)
1986   {
1987     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
1988     LOG (GNUNET_ERROR_TYPE_DEBUG,
1989          "Updating with peer %s from push list\n",
1990          GNUNET_i2s (update_peer));
1991     insert_in_sampler (NULL, update_peer);
1992     Peers_clean_peer (update_peer); /* This cleans only if it is not in the view */
1993     //peer_destroy_channel_send (sender);
1994   }
1995
1996   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
1997   {
1998     LOG (GNUNET_ERROR_TYPE_DEBUG,
1999          "Updating with peer %s from pull list\n",
2000          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
2001     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
2002     /* This cleans only if it is not in the view */
2003     Peers_clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
2004     //peer_destroy_channel_send (sender);
2005   }
2006
2007
2008   /* Empty push/pull lists */
2009   CustomPeerMap_clear (push_map);
2010   CustomPeerMap_clear (pull_map);
2011
2012   struct GNUNET_TIME_Relative time_next_round;
2013
2014   time_next_round = compute_rand_delay (round_interval, 2);
2015
2016   /* Schedule next round */
2017   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
2018                                                 &do_round, NULL);
2019   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
2020 }
2021
2022
2023 static void
2024 rps_start (struct GNUNET_SERVER_Handle *server);
2025
2026
2027 /**
2028  * This is called from GNUNET_CADET_get_peers().
2029  *
2030  * It is called on every peer(ID) that cadet somehow has contact with.
2031  * We use those to initialise the sampler.
2032  */
2033 void
2034 init_peer_cb (void *cls,
2035               const struct GNUNET_PeerIdentity *peer,
2036               int tunnel, // "Do we have a tunnel towards this peer?"
2037               unsigned int n_paths, // "Number of known paths towards this peer"
2038               unsigned int best_path) // "How long is the best path?
2039                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2040 {
2041   if (NULL != peer)
2042   {
2043     LOG (GNUNET_ERROR_TYPE_DEBUG,
2044          "Got peer_id %s from cadet\n",
2045          GNUNET_i2s (peer));
2046     got_peer (peer);
2047   }
2048 }
2049
2050 /**
2051  * @brief Iterator function over stored, valid peers.
2052  *
2053  * We initialise the sampler with those.
2054  *
2055  * @param cls the closure
2056  * @param peer the peer id
2057  * @return #GNUNET_YES if we should continue to
2058  *         iterate,
2059  *         #GNUNET_NO if not.
2060  */
2061 static int
2062 valid_peers_iterator (void *cls,
2063                       const struct GNUNET_PeerIdentity *peer)
2064 {
2065   if (NULL != peer)
2066   {
2067     LOG (GNUNET_ERROR_TYPE_DEBUG,
2068          "Got stored, valid peer %s\n",
2069          GNUNET_i2s (peer));
2070     got_peer (peer);
2071   }
2072   return GNUNET_YES;
2073 }
2074
2075
2076 /**
2077  * Iterator over peers from peerinfo.
2078  *
2079  * @param cls closure
2080  * @param peer id of the peer, NULL for last call
2081  * @param hello hello message for the peer (can be NULL)
2082  * @param error message
2083  */
2084 void
2085 process_peerinfo_peers (void *cls,
2086                         const struct GNUNET_PeerIdentity *peer,
2087                         const struct GNUNET_HELLO_Message *hello,
2088                         const char *err_msg)
2089 {
2090   if (NULL != peer)
2091   {
2092     LOG (GNUNET_ERROR_TYPE_DEBUG,
2093          "Got peer_id %s from peerinfo\n",
2094          GNUNET_i2s (peer));
2095     got_peer (peer);
2096   }
2097 }
2098
2099
2100 /**
2101  * Task run during shutdown.
2102  *
2103  * @param cls unused
2104  */
2105 static void
2106 shutdown_task (void *cls)
2107 {
2108   LOG (GNUNET_ERROR_TYPE_DEBUG,
2109        "RPS is going down\n");
2110   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
2111   GNUNET_PEERINFO_disconnect (peerinfo_handle);
2112
2113   if (NULL != do_round_task)
2114   {
2115     GNUNET_SCHEDULER_cancel (do_round_task);
2116     do_round_task = NULL;
2117   }
2118
2119   Peers_terminate ();
2120
2121   GNUNET_NSE_disconnect (nse);
2122   RPS_sampler_destroy (prot_sampler);
2123   RPS_sampler_destroy (client_sampler);
2124   GNUNET_CADET_disconnect (cadet_handle);
2125   View_destroy ();
2126   CustomPeerMap_destroy (push_map);
2127   CustomPeerMap_destroy (pull_map);
2128   #ifdef ENABLE_MALICIOUS
2129   struct AttackedPeer *tmp_att_peer;
2130   GNUNET_free (file_name_view_log);
2131   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2132   if (NULL != mal_peer_set)
2133     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2134   if (NULL != att_peer_set)
2135     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2136   while (NULL != att_peers_head)
2137   {
2138     tmp_att_peer = att_peers_head;
2139     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2140   }
2141   #endif /* ENABLE_MALICIOUS */
2142 }
2143
2144
2145 /**
2146  * @brief Get informed about a connecting client.
2147  *
2148  * @param cls unused
2149  * @param client the client that connects
2150  */
2151 static void
2152 handle_client_connect (void *cls,
2153                        struct GNUNET_SERVER_Client *client)
2154 {
2155   struct ClientContext *cli_ctx;
2156
2157   LOG (GNUNET_ERROR_TYPE_DEBUG,
2158        "Client connected\n");
2159   if (NULL == client)
2160     return; /* Server was destroyed before a client connected. Shutting down */
2161   cli_ctx = GNUNET_new (struct ClientContext);
2162   cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
2163   GNUNET_SERVER_client_set_user_context (client, cli_ctx);
2164   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2165                                cli_ctx_tail,
2166                                cli_ctx);
2167 }
2168
2169 /**
2170  * A client disconnected.  Remove all of its data structure entries.
2171  *
2172  * @param cls closure, NULL
2173  * @param client identification of the client
2174  */
2175 static void
2176 handle_client_disconnect (void *cls,
2177                                             struct GNUNET_SERVER_Client *client)
2178 {
2179   struct ClientContext *cli_ctx;
2180
2181   if (NULL == client)
2182   {/* shutdown task */
2183     while (NULL != cli_ctx_head)
2184       destroy_cli_ctx (cli_ctx_head);
2185   }
2186   else
2187   {
2188     cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
2189     destroy_cli_ctx (cli_ctx);
2190   }
2191 }
2192
2193
2194 /**
2195  * Actually start the service.
2196  */
2197   static void
2198 rps_start (struct GNUNET_SERVER_Handle *server)
2199 {
2200   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2201     {&handle_client_request,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2202       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2203     {&handle_client_request_cancel, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2204       sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)},
2205     {&handle_client_seed,           NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2206     #ifdef ENABLE_MALICIOUS
2207     {&handle_client_act_malicious,  NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2208     #endif /* ENABLE_MALICIOUS */
2209     {NULL, NULL, 0, 0}
2210   };
2211
2212   GNUNET_SERVER_add_handlers (server, handlers);
2213   GNUNET_SERVER_connect_notify (server,
2214                                 &handle_client_connect,
2215                                 NULL);
2216   GNUNET_SERVER_disconnect_notify (server,
2217                                    &handle_client_disconnect,
2218                                    NULL);
2219   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
2220
2221
2222   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2223   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2224
2225   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2226                                  NULL);
2227 }
2228
2229
2230 /**
2231  * Process statistics requests.
2232  *
2233  * @param cls closure
2234  * @param server the initialized server
2235  * @param c configuration to use
2236  */
2237   static void
2238 run (void *cls,
2239      struct GNUNET_SERVER_Handle *server,
2240      const struct GNUNET_CONFIGURATION_Handle *c)
2241 {
2242   int size;
2243   int out_size;
2244   char* fn_valid_peers;
2245
2246   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2247   cfg = c;
2248
2249
2250   /* Get own ID */
2251   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2252   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2253               "STARTING SERVICE (rps) for peer [%s]\n",
2254               GNUNET_i2s (&own_identity));
2255   #ifdef ENABLE_MALICIOUS
2256   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2257               "Malicious execution compiled in.\n");
2258   #endif /* ENABLE_MALICIOUS */
2259
2260
2261
2262   /* Get time interval from the configuration */
2263   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2264                                                         "ROUNDINTERVAL",
2265                                                         &round_interval))
2266   {
2267     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2268                                "RPS", "ROUNDINTERVAL");
2269     GNUNET_SCHEDULER_shutdown ();
2270     return;
2271   }
2272
2273   /* Get initial size of sampler/view from the configuration */
2274   if (GNUNET_OK !=
2275       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
2276         (long long unsigned int *) &sampler_size_est_need))
2277   {
2278     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2279                                "RPS", "INITSIZE");
2280     GNUNET_SCHEDULER_shutdown ();
2281     return;
2282   }
2283   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %u\n", sampler_size_est_need);
2284
2285   if (GNUNET_OK !=
2286       GNUNET_CONFIGURATION_get_value_filename (cfg,
2287                                                "rps",
2288                                                "FILENAME_VALID_PEERS",
2289                                                &fn_valid_peers))
2290   {
2291     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2292                                "rps", "FILENAME_VALID_PEERS");
2293   }
2294
2295
2296   View_create (4);
2297
2298   /* file_name_view_log */
2299   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
2300   {
2301     LOG (GNUNET_ERROR_TYPE_WARNING,
2302          "Failed to create directory /tmp/rps/\n");
2303   }
2304
2305   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
2306   file_name_view_log = GNUNET_malloc (size);
2307   out_size = GNUNET_snprintf (file_name_view_log,
2308                               size,
2309                               "/tmp/rps/view-%s",
2310                               GNUNET_i2s_full (&own_identity));
2311   if (size < out_size ||
2312       0 > out_size)
2313   {
2314     LOG (GNUNET_ERROR_TYPE_WARNING,
2315          "Failed to write string to buffer (size: %i, out_size: %i)\n",
2316          size,
2317          out_size);
2318   }
2319
2320
2321   /* connect to NSE */
2322   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2323
2324
2325   alpha = 0.45;
2326   beta  = 0.45;
2327
2328
2329   /* Initialise cadet */
2330   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2331     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        ,
2332       sizeof (struct GNUNET_MessageHeader)},
2333     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2334       sizeof (struct GNUNET_MessageHeader)},
2335     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
2336     {NULL, 0, 0}
2337   };
2338   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
2339   cadet_handle = GNUNET_CADET_connect (cfg,
2340                                        cls,
2341                                        &Peers_handle_inbound_channel,
2342                                        &cleanup_destroyed_channel,
2343                                        cadet_handlers,
2344                                        ports);
2345
2346   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
2347   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
2348   GNUNET_free (fn_valid_peers);
2349
2350   /* Initialise sampler */
2351   struct GNUNET_TIME_Relative half_round_interval;
2352   struct GNUNET_TIME_Relative  max_round_interval;
2353
2354   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2355   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2356
2357   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
2358   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
2359
2360   /* Initialise push and pull maps */
2361   push_map = CustomPeerMap_create (4);
2362   pull_map = CustomPeerMap_create (4);
2363
2364
2365   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2366   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2367   // TODO send push/pull to each of those peers?
2368   // TODO read stored valid peers from last run
2369   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
2370   Peers_get_valid_peers (valid_peers_iterator, NULL);
2371
2372   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
2373                                                    GNUNET_NO,
2374                                                    process_peerinfo_peers,
2375                                                    NULL);
2376
2377   rps_start (server);
2378 }
2379
2380
2381 /**
2382  * The main function for the rps service.
2383  *
2384  * @param argc number of arguments from the command line
2385  * @param argv command line arguments
2386  * @return 0 ok, 1 on error
2387  */
2388 int
2389 main (int argc, char *const *argv)
2390 {
2391   return (GNUNET_OK ==
2392           GNUNET_SERVICE_run (argc,
2393                               argv,
2394                               "rps",
2395                               GNUNET_SERVICE_OPTION_NONE,
2396                               &run, NULL)) ? 0 : 1;
2397 }
2398
2399 /* end of gnunet-service-rps.c */