social cli
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_peerinfo_service.h"
30 #include "gnunet_nse_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_peers.h"
36 #include "gnunet-service-rps_view.h"
37
38 #include <math.h>
39 #include <inttypes.h>
40
41 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
42
43 // TODO modify @brief in every file
44
45 // TODO check for overflows
46
47 // TODO align message structs
48
49 // TODO connect to friends
50
51 // TODO store peers somewhere persistent
52
53 // TODO blacklist? (-> mal peer detection on top of brahms)
54
55 // hist_size_init, hist_size_max
56
57 /**
58  * Our configuration.
59  */
60 static const struct GNUNET_CONFIGURATION_Handle *cfg;
61
62 /**
63  * Our own identity.
64  */
65 static struct GNUNET_PeerIdentity own_identity;
66
67
68 /***********************************************************************
69  * Housekeeping with clients
70 ***********************************************************************/
71
72 /**
73  * Closure used to pass the client and the id to the callback
74  * that replies to a client's request
75  */
76 struct ReplyCls
77 {
78   /**
79    * DLL
80    */
81   struct ReplyCls *next;
82   struct ReplyCls *prev;
83
84   /**
85    * The identifier of the request
86    */
87   uint32_t id;
88
89   /**
90    * The handle to the request
91    */
92   struct RPS_SamplerRequestHandle *req_handle;
93
94   /**
95    * The client handle to send the reply to
96    */
97   struct GNUNET_SERVER_Client *client;
98 };
99
100
101 /**
102  * Struct used to store the context of a connected client.
103  */
104 struct ClientContext
105 {
106   /**
107    * DLL
108    */
109   struct ClientContext *next;
110   struct ClientContext *prev;
111
112   /**
113    * The message queue to communicate with the client.
114    */
115   struct GNUNET_MQ_Handle *mq;
116
117   /**
118    * DLL with handles to single requests from the client
119    */
120   struct ReplyCls *rep_cls_head;
121   struct ReplyCls *rep_cls_tail;
122 };
123
124 /**
125  * DLL with all clients currently connected to us
126  */
127 struct ClientContext *cli_ctx_head;
128 struct ClientContext *cli_ctx_tail;
129
130 /***********************************************************************
131  * /Housekeeping with clients
132 ***********************************************************************/
133
134
135
136
137
138 /***********************************************************************
139  * Globals
140 ***********************************************************************/
141
142 /**
143  * Sampler used for the Brahms protocol itself.
144  */
145 static struct RPS_Sampler *prot_sampler;
146
147 /**
148  * Sampler used for the clients.
149  */
150 static struct RPS_Sampler *client_sampler;
151
152 /**
153  * Name to log view to
154  */
155 static char *file_name_view_log;
156
157 /**
158  * The size of sampler we need to be able to satisfy the client's need
159  * of random peers.
160  */
161 static unsigned int sampler_size_client_need;
162
163 /**
164  * The size of sampler we need to be able to satisfy the Brahms protocol's
165  * need of random peers.
166  *
167  * This is one minimum size the sampler grows to.
168  */
169 static unsigned int sampler_size_est_need;
170
171 /**
172  * Percentage of total peer number in the view
173  * to send random PUSHes to
174  */
175 static float alpha;
176
177 /**
178  * Percentage of total peer number in the view
179  * to send random PULLs to
180  */
181 static float beta;
182
183 /**
184  * Identifier for the main task that runs periodically.
185  */
186 static struct GNUNET_SCHEDULER_Task *do_round_task;
187
188 /**
189  * Time inverval the do_round task runs in.
190  */
191 static struct GNUNET_TIME_Relative round_interval;
192
193 /**
194  * List to store peers received through pushes temporary.
195  */
196 static struct CustomPeerMap *push_map;
197
198 /**
199  * List to store peers received through pulls temporary.
200  */
201 static struct CustomPeerMap *pull_map;
202
203 /**
204  * Handler to NSE.
205  */
206 static struct GNUNET_NSE_Handle *nse;
207
208 /**
209  * Handler to CADET.
210  */
211 static struct GNUNET_CADET_Handle *cadet_handle;
212
213 /**
214  * Handler to PEERINFO.
215  */
216 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
217
218 /**
219  * Handle for cancellation of iteration over peers.
220  */
221 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
222
223 /**
224  * Request counter.
225  *
226  * Counts how many requets clients already issued.
227  * Only needed in the beginning to check how many of the 64 deltas
228  * we already have
229  */
230 static unsigned int req_counter;
231
232 /**
233  * Time of the last request we received.
234  *
235  * Used to compute the expected request rate.
236  */
237 static struct GNUNET_TIME_Absolute last_request;
238
239 /**
240  * Size of #request_deltas.
241  */
242 #define REQUEST_DELTAS_SIZE 64
243 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
244
245 /**
246  * Last 64 deltas between requests
247  */
248 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
249
250 /**
251  * The prediction of the rate of requests
252  */
253 static struct GNUNET_TIME_Relative request_rate;
254
255 /**
256  * Number of history update tasks.
257  */
258 static uint32_t num_hist_update_tasks;
259
260
261 #ifdef ENABLE_MALICIOUS
262 /**
263  * Type of malicious peer
264  *
265  * 0 Don't act malicious at all - Default
266  * 1 Try to maximise representation
267  * 2 Try to partition the network
268  * 3 Combined attack
269  */
270 static uint32_t mal_type;
271
272 /**
273  * Other malicious peers
274  */
275 static struct GNUNET_PeerIdentity *mal_peers;
276
277 /**
278  * Hashmap of malicious peers used as set.
279  * Used to more efficiently check whether we know that peer.
280  */
281 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
282
283 /**
284  * Number of other malicious peers
285  */
286 static uint32_t num_mal_peers;
287
288
289 /**
290  * If type is 2 This struct is used to store the attacked peers in a DLL
291  */
292 struct AttackedPeer
293 {
294   /**
295    * DLL
296    */
297   struct AttackedPeer *next;
298   struct AttackedPeer *prev;
299
300   /**
301    * PeerID
302    */
303   struct GNUNET_PeerIdentity peer_id;
304 };
305
306 /**
307  * If type is 2 this is the DLL of attacked peers
308  */
309 static struct AttackedPeer *att_peers_head;
310 static struct AttackedPeer *att_peers_tail;
311
312 /**
313  * This index is used to point to an attacked peer to
314  * implement the round-robin-ish way to select attacked peers.
315  */
316 static struct AttackedPeer *att_peer_index;
317
318 /**
319  * Hashmap of attacked peers used as set.
320  * Used to more efficiently check whether we know that peer.
321  */
322 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
323
324 /**
325  * Number of attacked peers
326  */
327 static uint32_t num_attacked_peers;
328
329 /**
330  * If type is 1 this is the attacked peer
331  */
332 static struct GNUNET_PeerIdentity attacked_peer;
333
334 /**
335  * The limit of PUSHes we can send in one round.
336  * This is an assumption of the Brahms protocol and either implemented
337  * via proof of work
338  * or
339  * assumend to be the bandwidth limitation.
340  */
341 static uint32_t push_limit = 10000;
342 #endif /* ENABLE_MALICIOUS */
343
344
345 /***********************************************************************
346  * /Globals
347 ***********************************************************************/
348
349
350 /***********************************************************************
351  * Util functions
352 ***********************************************************************/
353
354
355 /**
356  * Print peerlist to log.
357  */
358 static void
359 print_peer_list (struct GNUNET_PeerIdentity *list,
360                  unsigned int len)
361 {
362   unsigned int i;
363
364   LOG (GNUNET_ERROR_TYPE_DEBUG,
365        "Printing peer list of length %u at %p:\n",
366        len,
367        list);
368   for (i = 0 ; i < len ; i++)
369   {
370     LOG (GNUNET_ERROR_TYPE_DEBUG,
371          "%u. peer: %s\n",
372          i, GNUNET_i2s (&list[i]));
373   }
374 }
375
376
377 /**
378  * Remove peer from list.
379  */
380 static void
381 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
382                unsigned int *list_size,
383                const struct GNUNET_PeerIdentity *peer)
384 {
385   unsigned int i;
386   struct GNUNET_PeerIdentity *tmp;
387
388   tmp = *peer_list;
389
390   LOG (GNUNET_ERROR_TYPE_DEBUG,
391        "Removing peer %s from list at %p\n",
392        GNUNET_i2s (peer),
393        tmp);
394
395   for ( i = 0 ; i < *list_size ; i++ )
396   {
397     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
398     {
399       if (i < *list_size -1)
400       { /* Not at the last entry -- shift peers left */
401         memcpy (&tmp[i], &tmp[i +1],
402                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
403       }
404       /* Remove last entry (should be now useless PeerID) */
405       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
406     }
407   }
408   *peer_list = tmp;
409 }
410
411
412 /**
413  * Sum all time relatives of an array.
414  */
415 static struct GNUNET_TIME_Relative
416 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
417                 uint32_t arr_size)
418 {
419   struct GNUNET_TIME_Relative sum;
420   uint32_t i;
421
422   sum = GNUNET_TIME_UNIT_ZERO;
423   for ( i = 0 ; i < arr_size ; i++ )
424   {
425     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
426   }
427   return sum;
428 }
429
430
431 /**
432  * Compute the average of given time relatives.
433  */
434 static struct GNUNET_TIME_Relative
435 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
436                 uint32_t arr_size)
437 {
438   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
439                                                       arr_size),
440                                       arr_size);
441 }
442
443
444 /**
445  * Put random peer from sampler into the view as history update.
446  */
447 static void
448 hist_update (void *cls,
449              struct GNUNET_PeerIdentity *ids,
450              uint32_t num_peers)
451 {
452   unsigned int i;
453
454   for (i = 0; i < num_peers; i++)
455   {
456     View_put (&ids[i]);
457     to_file (file_name_view_log,
458              "+%s\t(hist)",
459              GNUNET_i2s_full (ids));
460   }
461   if (0 < num_hist_update_tasks)
462     num_hist_update_tasks--;
463 }
464
465
466 /**
467  * Wrapper around #RPS_sampler_resize()
468  *
469  * If we do not have enough sampler elements, double current sampler size
470  * If we have more than enough sampler elements, halv current sampler size
471  */
472 static void
473 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
474 {
475   unsigned int sampler_size;
476
477   // TODO statistics
478   // TODO respect the min, max
479   sampler_size = RPS_sampler_get_size (sampler);
480   if (sampler_size > new_size * 4)
481   { /* Shrinking */
482     RPS_sampler_resize (sampler, sampler_size / 2);
483   }
484   else if (sampler_size < new_size)
485   { /* Growing */
486     RPS_sampler_resize (sampler, sampler_size * 2);
487   }
488   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
489 }
490
491
492 /**
493  * Wrapper around #RPS_sampler_resize() resizing the client sampler
494  */
495 static void
496 client_resize_wrapper ()
497 {
498   uint32_t bigger_size;
499
500   // TODO statistics
501
502   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
503
504   // TODO respect the min, max
505   resize_wrapper (client_sampler, bigger_size);
506   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
507       bigger_size);
508 }
509
510
511 /**
512  * Estimate request rate
513  *
514  * Called every time we receive a request from the client.
515  */
516 static void
517 est_request_rate()
518 {
519   struct GNUNET_TIME_Relative max_round_duration;
520
521   if (request_deltas_size > req_counter)
522     req_counter++;
523   if ( 1 < req_counter)
524   {
525     /* Shift last request deltas to the right */
526     memcpy (&request_deltas[1],
527         request_deltas,
528         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
529
530     /* Add current delta to beginning */
531     request_deltas[0] =
532         GNUNET_TIME_absolute_get_difference (last_request,
533                                              GNUNET_TIME_absolute_get ());
534     request_rate = T_relative_avg (request_deltas, req_counter);
535     request_rate = (request_rate.rel_value_us < 1) ?
536       GNUNET_TIME_relative_get_unit_ () : request_rate;
537
538     /* Compute the duration a round will maximally take */
539     max_round_duration =
540         GNUNET_TIME_relative_add (round_interval,
541                                   GNUNET_TIME_relative_divide (round_interval, 2));
542
543     /* Set the estimated size the sampler has to have to
544      * satisfy the current client request rate */
545     sampler_size_client_need =
546         max_round_duration.rel_value_us / request_rate.rel_value_us;
547
548     /* Resize the sampler */
549     client_resize_wrapper ();
550   }
551   last_request = GNUNET_TIME_absolute_get ();
552 }
553
554
555 /**
556  * Add all peers in @a peer_array to @a peer_map used as set.
557  *
558  * @param peer_array array containing the peers
559  * @param num_peers number of peers in @peer_array
560  * @param peer_map the peermap to use as set
561  */
562 static void
563 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
564                        unsigned int num_peers,
565                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
566 {
567   unsigned int i;
568   if (NULL == peer_map)
569   {
570     LOG (GNUNET_ERROR_TYPE_WARNING,
571          "Trying to add peers to non-existing peermap.\n");
572     return;
573   }
574
575   for (i = 0; i < num_peers; i++)
576   {
577     GNUNET_CONTAINER_multipeermap_put (peer_map,
578                                        &peer_array[i],
579                                        NULL,
580                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
581   }
582 }
583
584
585 /**
586  * Send a PULL REPLY to @a peer_id
587  *
588  * @param peer_id the peer to send the reply to.
589  * @param peer_ids the peers to send to @a peer_id
590  * @param num_peer_ids the number of peers to send to @a peer_id
591  */
592 static void
593 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
594                  const struct GNUNET_PeerIdentity *peer_ids,
595                  unsigned int num_peer_ids)
596 {
597   uint32_t send_size;
598   struct GNUNET_MQ_Envelope *ev;
599   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
600
601   /* Compute actual size */
602   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
603               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
604
605   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
606     /* Compute number of peers to send
607      * If too long, simply truncate */
608     // TODO select random ones via permutation
609     //      or even better: do good protocol design
610     send_size =
611       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
612        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
613        sizeof (struct GNUNET_PeerIdentity);
614   else
615     send_size = num_peer_ids;
616
617   LOG (GNUNET_ERROR_TYPE_DEBUG,
618       "Going to send PULL REPLY with %u peers to %s\n",
619       send_size, GNUNET_i2s (peer_id));
620
621   ev = GNUNET_MQ_msg_extra (out_msg,
622                             send_size * sizeof (struct GNUNET_PeerIdentity),
623                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
624   out_msg->num_peers = htonl (send_size);
625   memcpy (&out_msg[1], peer_ids,
626          send_size * sizeof (struct GNUNET_PeerIdentity));
627
628   Peers_send_message (peer_id, ev, "PULL REPLY");
629 }
630
631
632 /**
633  * Insert PeerID in #pull_map
634  *
635  * Called once we know a peer is live.
636  */
637 static void
638 insert_in_pull_map (void *cls,
639                     const struct GNUNET_PeerIdentity *peer)
640 {
641   CustomPeerMap_put (pull_map, peer);
642 }
643
644
645 /**
646  * Insert PeerID in #view
647  *
648  * Called once we know a peer is live.
649  */
650 static void
651 insert_in_view (void *cls,
652                 const struct GNUNET_PeerIdentity *peer)
653 {
654   View_put (peer);
655 }
656
657
658 /**
659  * Update sampler with given PeerID.
660  */
661 static void
662 insert_in_sampler (void *cls,
663                    const struct GNUNET_PeerIdentity *peer)
664 {
665   LOG (GNUNET_ERROR_TYPE_DEBUG,
666        "Updating samplers with peer %s from insert_in_sampler()\n",
667        GNUNET_i2s (peer));
668   RPS_sampler_update (prot_sampler,   peer);
669   RPS_sampler_update (client_sampler, peer);
670   if (0 < RPS_sampler_count_id (prot_sampler, peer))
671   {
672     /* Make sure we 'know' about this peer */
673     (void) Peers_insert_peer_check_liveliness (peer);
674     /* Establish a channel towards that peer to indicate we are going to send
675      * messages to it */
676     Peers_indicate_sending_intention (peer);
677     //Peers_issue_peer_liveliness_check (peer);
678   }
679 }
680
681
682 /**
683  * @brief Checks if there is a sending channel and if it is needed
684  *
685  * @param peer the peer whose sending channel is checked
686  * @return GNUNET_YES if sending channel exists and is still needed
687  *         GNUNET_NO  otherwise
688  */
689 static int
690 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
691 {
692   /* struct GNUNET_CADET_Channel *channel; */
693   if (GNUNET_NO == Peers_check_peer_known (peer))
694   {
695     return GNUNET_NO;
696   }
697   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
698   {
699     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
700          (GNUNET_YES == View_contains_peer (peer)) ||
701          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
702          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
703          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
704     { /* If we want to keep the connection to peer open */
705       return GNUNET_YES;
706     }
707     return GNUNET_NO;
708   }
709   return GNUNET_NO;
710 }
711
712 /**
713  * @brief remove peer from our knowledge, the view, push and pull maps and
714  * samplers.
715  *
716  * @param peer the peer to remove
717  */
718 static void
719 remove_peer (const struct GNUNET_PeerIdentity *peer)
720 {
721   View_remove_peer (peer);
722   CustomPeerMap_remove_peer (pull_map, peer);
723   CustomPeerMap_remove_peer (push_map, peer);
724   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
725   RPS_sampler_reinitialise_by_value (client_sampler, peer);
726   Peers_remove_peer (peer);
727 }
728
729
730 /**
731  * @brief Remove data that is not needed anymore.
732  *
733  * If the sending channel is no longer needed it is destroyed.
734  *
735  * @param peer the peer whose data is about to be cleaned
736  */
737 static void
738 clean_peer (const struct GNUNET_PeerIdentity *peer)
739 {
740   if (GNUNET_NO == check_sending_channel_needed (peer))
741   {
742     #ifdef ENABLE_MALICIOUS
743     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
744       Peers_destroy_sending_channel (peer);
745     #else /* ENABLE_MALICIOUS */
746     Peers_destroy_sending_channel (peer);
747     #endif /* ENABLE_MALICIOUS */
748   }
749
750   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
751        (GNUNET_NO == View_contains_peer (peer)) &&
752        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
753        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
754        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
755        (0 == RPS_sampler_count_id (client_sampler, peer)) )
756   { /* We can safely remov this peer */
757     remove_peer (peer);
758     return;
759   }
760   Peers_clean_peer (peer);
761 }
762
763 /**
764  * @brief This is called when a channel is destroyed.
765  *
766  * Removes peer completely from our knowledge if the send_channel was destroyed
767  * Otherwise simply delete the recv_channel
768  *
769  * @param cls The closure
770  * @param channel The channel being closed
771  * @param channel_ctx The context associated with this channel
772  */
773 static void
774 cleanup_destroyed_channel (void *cls,
775                            const struct GNUNET_CADET_Channel *channel,
776                            void *channel_ctx)
777 {
778   struct GNUNET_PeerIdentity *peer;
779
780   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
781       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
782        // FIXME wait for cadet to change this function
783
784   if (GNUNET_NO == Peers_check_peer_known (peer))
785   { /* We don't know a context to that peer */
786     LOG (GNUNET_ERROR_TYPE_WARNING,
787          "channel (%s) without associated context was destroyed\n",
788          GNUNET_i2s (peer));
789     return;
790   }
791
792   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
793   { /* We are in the middle of removing that peer from our knowledge. In this
794        case simply make sure that the channels are cleaned. */
795     Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
796     to_file (file_name_view_log,
797              "-%s\t(cleanup channel, ourself)",
798              GNUNET_i2s_full (peer));
799     return;
800   }
801
802   if (GNUNET_YES ==
803       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
804   { /* Channel used for sending was destroyed */
805     /* Possible causes of channel destruction:
806      *  - ourselves  -> cleaning send channel -> clean context
807      *  - other peer -> peer probably went down -> remove
808      */
809     if (GNUNET_YES == Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_CLEAN))
810     { /* We are about to clean the sending channel. Clean the respective
811        * context */
812       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
813       return;
814     }
815     else
816     { /* Other peer destroyed our sending channel that he is supposed to keep
817        * open. It probably went down. Remove it from our knowledge. */
818       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
819       remove_peer (peer);
820       return;
821     }
822   }
823   else if (GNUNET_YES ==
824       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
825   { /* Channel used for receiving was destroyed */
826     /* Possible causes of channel destruction:
827      *  - ourselves  -> peer tried to establish channel twice -> clean context
828      *  - other peer -> peer doesn't want to send us data -> clean
829      */
830     if (GNUNET_YES ==
831         Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_ESTABLISHED_TWICE))
832     { /* Other peer tried to establish a channel to us twice. We do not accept
833        * that. Clean the context. */
834       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
835       return;
836     }
837     else
838     { /* Other peer doesn't want to send us data anymore. We are free to clean
839        * it. */
840       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
841       clean_peer (peer);
842       return;
843     }
844   }
845   else
846   {
847     LOG (GNUNET_ERROR_TYPE_WARNING,
848         "Destroyed channel is neither sending nor receiving channel\n");
849   }
850 }
851
852 /***********************************************************************
853  * /Util functions
854 ***********************************************************************/
855
856 static void
857 destroy_reply_cls (struct ReplyCls *rep_cls)
858 {
859   struct ClientContext *cli_ctx;
860
861   cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client,
862                                                    struct ClientContext);
863   GNUNET_assert (NULL != cli_ctx);
864   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
865                                cli_ctx->rep_cls_tail,
866                                rep_cls);
867   GNUNET_free (rep_cls);
868 }
869
870
871 static void
872 destroy_cli_ctx (struct ClientContext *cli_ctx)
873 {
874   GNUNET_assert (NULL != cli_ctx);
875   if (NULL != cli_ctx->rep_cls_head)
876   {
877     LOG (GNUNET_ERROR_TYPE_WARNING,
878          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
879     while (NULL != cli_ctx->rep_cls_head)
880       destroy_reply_cls (cli_ctx->rep_cls_head);
881   }
882   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
883                                cli_ctx_tail,
884                                cli_ctx);
885   GNUNET_free (cli_ctx);
886 }
887
888
889 /**
890  * Function called by NSE.
891  *
892  * Updates sizes of sampler list and view and adapt those lists
893  * accordingly.
894  */
895 static void
896 nse_callback (void *cls,
897               struct GNUNET_TIME_Absolute timestamp,
898               double logestimate, double std_dev)
899 {
900   double estimate;
901   //double scale; // TODO this might go gloabal/config
902
903   LOG (GNUNET_ERROR_TYPE_DEBUG,
904        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
905        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
906   //scale = .01;
907   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
908   // GNUNET_NSE_log_estimate_to_n (logestimate);
909   estimate = pow (estimate, 1.0 / 3);
910   // TODO add if std_dev is a number
911   // estimate += (std_dev * scale);
912   if (2 < ceil (estimate))
913   {
914     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
915     sampler_size_est_need = estimate;
916   } else
917     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
918
919   /* If the NSE has changed adapt the lists accordingly */
920   resize_wrapper (prot_sampler, sampler_size_est_need);
921   client_resize_wrapper ();
922 }
923
924
925 /**
926  * Callback called once the requested PeerIDs are ready.
927  *
928  * Sends those to the requesting client.
929  */
930 static void
931 client_respond (void *cls,
932                 struct GNUNET_PeerIdentity *peer_ids,
933                 uint32_t num_peers)
934 {
935   uint32_t i;
936   struct GNUNET_MQ_Envelope *ev;
937   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
938   struct ReplyCls *reply_cls = (struct ReplyCls *) cls;
939   uint32_t size_needed;
940   struct ClientContext *cli_ctx;
941
942   GNUNET_assert (NULL != reply_cls);
943   LOG (GNUNET_ERROR_TYPE_DEBUG,
944        "sampler returned %" PRIu32 " peers:\n",
945        num_peers);
946   for (i = 0; i < num_peers; i++)
947   {
948     LOG (GNUNET_ERROR_TYPE_DEBUG,
949          "  %lu: %s\n",
950          i,
951          GNUNET_i2s (&peer_ids[i]));
952   }
953
954   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
955                 num_peers * sizeof (struct GNUNET_PeerIdentity);
956
957   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
958
959   ev = GNUNET_MQ_msg_extra (out_msg,
960                             num_peers * sizeof (struct GNUNET_PeerIdentity),
961                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
962   out_msg->num_peers = htonl (num_peers);
963   out_msg->id = htonl (reply_cls->id);
964
965   memcpy (&out_msg[1],
966           peer_ids,
967           num_peers * sizeof (struct GNUNET_PeerIdentity));
968   GNUNET_free (peer_ids);
969
970   cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client,
971                                                    struct ClientContext);
972   GNUNET_assert (NULL != cli_ctx);
973   destroy_reply_cls (reply_cls);
974   GNUNET_MQ_send (cli_ctx->mq, ev);
975 }
976
977
978 /**
979  * Handle RPS request from the client.
980  *
981  * @param cls closure
982  * @param client identification of the client
983  * @param message the actual message
984  */
985 static void
986 handle_client_request (void *cls,
987                        struct GNUNET_SERVER_Client *client,
988                        const struct GNUNET_MessageHeader *message)
989 {
990   struct GNUNET_RPS_CS_RequestMessage *msg;
991   uint32_t num_peers;
992   uint32_t size_needed;
993   struct ReplyCls *reply_cls;
994   uint32_t i;
995   struct ClientContext *cli_ctx;
996
997   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
998
999   num_peers = ntohl (msg->num_peers);
1000   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1001                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1002
1003   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1004   {
1005     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1006                 "Message received from client has size larger than expected\n");
1007     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1008     return;
1009   }
1010
1011   for (i = 0 ; i < num_peers ; i++)
1012     est_request_rate();
1013
1014   LOG (GNUNET_ERROR_TYPE_DEBUG,
1015        "Client requested %" PRIu32 " random peer(s).\n",
1016        num_peers);
1017
1018   reply_cls = GNUNET_new (struct ReplyCls);
1019   reply_cls->id = ntohl (msg->id);
1020   reply_cls->client = client;
1021   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1022                                                         client_respond,
1023                                                         reply_cls,
1024                                                         num_peers);
1025
1026   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1027   GNUNET_assert (NULL != cli_ctx);
1028   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1029                                cli_ctx->rep_cls_tail,
1030                                reply_cls);
1031   GNUNET_SERVER_receive_done (client,
1032                               GNUNET_OK);
1033 }
1034
1035
1036 /**
1037  * @brief Handle a message that requests the cancellation of a request
1038  *
1039  * @param cls unused
1040  * @param client the client that requests the cancellation
1041  * @param message the message containing the id of the request
1042  */
1043 static void
1044 handle_client_request_cancel (void *cls,
1045                               struct GNUNET_SERVER_Client *client,
1046                               const struct GNUNET_MessageHeader *message)
1047 {
1048   struct GNUNET_RPS_CS_RequestCancelMessage *msg =
1049     (struct GNUNET_RPS_CS_RequestCancelMessage *) message;
1050   struct ClientContext *cli_ctx;
1051   struct ReplyCls *rep_cls;
1052
1053   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1054   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1055   rep_cls = cli_ctx->rep_cls_head;
1056   LOG (GNUNET_ERROR_TYPE_DEBUG,
1057       "Client cancels request with id %lu\n",
1058       ntohl (msg->id));
1059   while ( (NULL != rep_cls->next) &&
1060           (rep_cls->id != ntohl (msg->id)) )
1061     rep_cls = rep_cls->next;
1062   GNUNET_assert (rep_cls->id == ntohl (msg->id));
1063   RPS_sampler_request_cancel (rep_cls->req_handle);
1064   destroy_reply_cls (rep_cls);
1065   GNUNET_SERVER_receive_done (client,
1066                               GNUNET_OK);
1067 }
1068
1069
1070 /**
1071  * Handle seed from the client.
1072  *
1073  * @param cls closure
1074  * @param client identification of the client
1075  * @param message the actual message
1076  */
1077 static void
1078 handle_client_seed (void *cls,
1079                     struct GNUNET_SERVER_Client *client,
1080                     const struct GNUNET_MessageHeader *message)
1081 {
1082   struct GNUNET_RPS_CS_SeedMessage *in_msg;
1083   struct GNUNET_PeerIdentity *peers;
1084   uint32_t num_peers;
1085   uint32_t i;
1086
1087   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) > ntohs (message->size))
1088   {
1089     GNUNET_break_op (0);
1090     GNUNET_SERVER_receive_done (client,
1091                                 GNUNET_SYSERR);
1092   }
1093
1094   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
1095   num_peers = ntohl (in_msg->num_peers);
1096   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1097   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1098   //memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1099
1100   if ((ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1101       sizeof (struct GNUNET_PeerIdentity) != num_peers)
1102   {
1103     GNUNET_break_op (0);
1104     GNUNET_SERVER_receive_done (client,
1105                                 GNUNET_SYSERR);
1106     return;
1107   }
1108
1109   LOG (GNUNET_ERROR_TYPE_DEBUG,
1110        "Client seeded peers:\n");
1111   print_peer_list (peers, num_peers);
1112
1113   for (i = 0; i < num_peers; i++)
1114   {
1115     LOG (GNUNET_ERROR_TYPE_DEBUG,
1116          "Updating samplers with seed %" PRIu32 ": %s\n",
1117          i,
1118          GNUNET_i2s (&peers[i]));
1119
1120     if (GNUNET_YES == Peers_insert_peer_check_liveliness (&peers[i]))
1121     {
1122       Peers_schedule_operation (&peers[i], insert_in_sampler);
1123       Peers_schedule_operation (&peers[i], insert_in_view);
1124     }
1125
1126     //RPS_sampler_update (prot_sampler,   &peers[i]);
1127     //RPS_sampler_update (client_sampler, &peers[i]);
1128   }
1129
1130   ////GNUNET_free (peers);
1131
1132   GNUNET_SERVER_receive_done (client,
1133                               GNUNET_OK);
1134 }
1135
1136
1137 /**
1138  * Handle a PUSH message from another peer.
1139  *
1140  * Check the proof of work and store the PeerID
1141  * in the temporary list for pushed PeerIDs.
1142  *
1143  * @param cls Closure
1144  * @param channel The channel the PUSH was received over
1145  * @param channel_ctx The context associated with this channel
1146  * @param msg The message header
1147  */
1148 static int
1149 handle_peer_push (void *cls,
1150                   struct GNUNET_CADET_Channel *channel,
1151                   void **channel_ctx,
1152                   const struct GNUNET_MessageHeader *msg)
1153 {
1154   const struct GNUNET_PeerIdentity *peer;
1155
1156   // (check the proof of work (?))
1157
1158   peer = (const struct GNUNET_PeerIdentity *)
1159     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1160   // FIXME wait for cadet to change this function
1161
1162   LOG (GNUNET_ERROR_TYPE_DEBUG,
1163        "Received PUSH (%s)\n",
1164        GNUNET_i2s (peer));
1165
1166 #ifdef ENABLE_MALICIOUS
1167   struct AttackedPeer *tmp_att_peer;
1168
1169   tmp_att_peer = GNUNET_new (struct AttackedPeer);
1170   memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1171   if (1 == mal_type
1172       || 3 == mal_type)
1173   { /* Try to maximise representation */
1174     if (NULL == att_peer_set)
1175       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1176     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1177                                                              peer))
1178     {
1179       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1180                                    att_peers_tail,
1181                                    tmp_att_peer);
1182       add_peer_array_to_set (peer, 1, att_peer_set);
1183     }
1184     return GNUNET_OK;
1185   }
1186
1187
1188   else if (2 == mal_type)
1189   { /* We attack one single well-known peer - simply ignore */
1190     return GNUNET_OK;
1191   }
1192   else
1193   {
1194     GNUNET_free (tmp_att_peer);
1195   }
1196
1197   #endif /* ENABLE_MALICIOUS */
1198
1199   /* Add the sending peer to the push_map */
1200   CustomPeerMap_put (push_map, peer);
1201
1202   GNUNET_CADET_receive_done (channel);
1203   return GNUNET_OK;
1204 }
1205
1206
1207 /**
1208  * Handle PULL REQUEST request message from another peer.
1209  *
1210  * Reply with the view of PeerIDs.
1211  *
1212  * @param cls Closure
1213  * @param channel The channel the PULL REQUEST was received over
1214  * @param channel_ctx The context associated with this channel
1215  * @param msg The message header
1216  */
1217 static int
1218 handle_peer_pull_request (void *cls,
1219                           struct GNUNET_CADET_Channel *channel,
1220                           void **channel_ctx,
1221                           const struct GNUNET_MessageHeader *msg)
1222 {
1223   struct GNUNET_PeerIdentity *peer;
1224   const struct GNUNET_PeerIdentity *view_array;
1225
1226   peer = (struct GNUNET_PeerIdentity *)
1227     GNUNET_CADET_channel_get_info (channel,
1228                                    GNUNET_CADET_OPTION_PEER);
1229   // FIXME wait for cadet to change this function
1230
1231   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
1232
1233   #ifdef ENABLE_MALICIOUS
1234   if (1 == mal_type
1235       || 3 == mal_type)
1236   { /* Try to maximise representation */
1237     send_pull_reply (peer, mal_peers, num_mal_peers);
1238     return GNUNET_OK;
1239   }
1240
1241   else if (2 == mal_type)
1242   { /* Try to partition network */
1243     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1244     {
1245       send_pull_reply (peer, mal_peers, num_mal_peers);
1246     }
1247     return GNUNET_OK;
1248   }
1249   #endif /* ENABLE_MALICIOUS */
1250
1251   view_array = View_get_as_array ();
1252
1253   send_pull_reply (peer, view_array, View_size ());
1254
1255   GNUNET_CADET_receive_done (channel);
1256   return GNUNET_OK;
1257 }
1258
1259
1260 /**
1261  * Handle PULL REPLY message from another peer.
1262  *
1263  * Check whether we sent a corresponding request and
1264  * whether this reply is the first one.
1265  *
1266  * @param cls Closure
1267  * @param channel The channel the PUSH was received over
1268  * @param channel_ctx The context associated with this channel
1269  * @param msg The message header
1270  */
1271 static int
1272 handle_peer_pull_reply (void *cls,
1273                         struct GNUNET_CADET_Channel *channel,
1274                         void **channel_ctx,
1275                         const struct GNUNET_MessageHeader *msg)
1276 {
1277   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1278   struct GNUNET_PeerIdentity *peers;
1279   struct GNUNET_PeerIdentity *sender;
1280   uint32_t i;
1281 #ifdef ENABLE_MALICIOUS
1282   struct AttackedPeer *tmp_att_peer;
1283 #endif /* ENABLE_MALICIOUS */
1284
1285   /* Check for protocol violation */
1286   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1287   {
1288     GNUNET_break_op (0);
1289     GNUNET_CADET_receive_done (channel);
1290     return GNUNET_SYSERR;
1291   }
1292
1293   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1294   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1295       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1296   {
1297     LOG (GNUNET_ERROR_TYPE_ERROR,
1298         "message says it sends %" PRIu32 " peers, have space for %i peers\n",
1299         ntohl (in_msg->num_peers),
1300         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1301             sizeof (struct GNUNET_PeerIdentity));
1302     GNUNET_break_op (0);
1303     GNUNET_CADET_receive_done (channel);
1304     return GNUNET_SYSERR;
1305   }
1306
1307   // Guess simply casting isn't the nicest way...
1308   // FIXME wait for cadet to change this function
1309   sender = (struct GNUNET_PeerIdentity *)
1310       GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1311
1312   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
1313
1314   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
1315   {
1316     LOG (GNUNET_ERROR_TYPE_WARNING,
1317         "Received a pull reply from a peer we didn't request one from!\n");
1318     GNUNET_break_op (0);
1319     GNUNET_CADET_receive_done (channel);
1320     return GNUNET_OK;
1321   }
1322
1323
1324   #ifdef ENABLE_MALICIOUS
1325   // We shouldn't even receive pull replies as we're not sending
1326   if (2 == mal_type)
1327     return GNUNET_OK;
1328   #endif /* ENABLE_MALICIOUS */
1329
1330   /* Do actual logic */
1331   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1332
1333   LOG (GNUNET_ERROR_TYPE_DEBUG,
1334        "PULL REPLY received, got following %u peers:\n",
1335        ntohl (in_msg->num_peers));
1336
1337   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1338   {
1339     LOG (GNUNET_ERROR_TYPE_DEBUG,
1340          "%u. %s\n",
1341          i,
1342          GNUNET_i2s (&peers[i]));
1343
1344     #ifdef ENABLE_MALICIOUS
1345     if ((NULL != att_peer_set) &&
1346         (1 == mal_type || 3 == mal_type))
1347     { /* Add attacked peer to local list */
1348       // TODO check if we sent a request and this was the first reply
1349       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1350                                                                &peers[i])
1351           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1352                                                                   &peers[i])
1353           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1354                                                    &own_identity))
1355       {
1356         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1357         tmp_att_peer->peer_id = peers[i];
1358         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1359                                      att_peers_tail,
1360                                      tmp_att_peer);
1361         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1362       }
1363       continue;
1364     }
1365     #endif /* ENABLE_MALICIOUS */
1366     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
1367                                               &peers[i]))
1368     {
1369       /* Make sure we 'know' about this peer */
1370       (void) Peers_insert_peer_check_liveliness (&peers[i]);
1371
1372       if (GNUNET_YES == Peers_check_peer_flag (&peers[i], Peers_VALID))
1373       {
1374         CustomPeerMap_put (pull_map, &peers[i]);
1375       }
1376       else
1377       {
1378         Peers_schedule_operation (&peers[i], insert_in_pull_map);
1379         Peers_issue_peer_liveliness_check (&peers[i]);
1380       }
1381     }
1382   }
1383
1384   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
1385   clean_peer (sender);
1386
1387   GNUNET_CADET_receive_done (channel);
1388   return GNUNET_OK;
1389 }
1390
1391
1392 /**
1393  * Compute a random delay.
1394  * A uniformly distributed value between mean + spread and mean - spread.
1395  *
1396  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1397  * It would return a random value between 2 and 6 min.
1398  *
1399  * @param mean the mean
1400  * @param spread the inverse amount of deviation from the mean
1401  */
1402 static struct GNUNET_TIME_Relative
1403 compute_rand_delay (struct GNUNET_TIME_Relative mean,
1404                     unsigned int spread)
1405 {
1406   struct GNUNET_TIME_Relative half_interval;
1407   struct GNUNET_TIME_Relative ret;
1408   unsigned int rand_delay;
1409   unsigned int max_rand_delay;
1410
1411   if (0 == spread)
1412   {
1413     LOG (GNUNET_ERROR_TYPE_WARNING,
1414          "Not accepting spread of 0\n");
1415     GNUNET_break (0);
1416   }
1417
1418   /* Compute random time value between spread * mean and spread * mean */
1419   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1420
1421   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1422   /**
1423    * Compute random value between (0 and 1) * round_interval
1424    * via multiplying round_interval with a 'fraction' (0 to value)/value
1425    */
1426   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1427   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1428   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1429   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1430
1431   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1432     LOG (GNUNET_ERROR_TYPE_WARNING,
1433          "Returning FOREVER_REL\n");
1434
1435   return ret;
1436 }
1437
1438
1439 /**
1440  * Send single pull request
1441  *
1442  * @param peer_id the peer to send the pull request to.
1443  */
1444 static void
1445 send_pull_request (const struct GNUNET_PeerIdentity *peer)
1446 {
1447   struct GNUNET_MQ_Envelope *ev;
1448
1449   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
1450                                                      Peers_PULL_REPLY_PENDING));
1451   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
1452
1453   LOG (GNUNET_ERROR_TYPE_DEBUG,
1454        "Going to send PULL REQUEST to peer %s.\n",
1455        GNUNET_i2s (peer));
1456
1457   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1458   Peers_send_message (peer, ev, "PULL REQUEST");
1459 }
1460
1461
1462 /**
1463  * Send single push
1464  *
1465  * @param peer_id the peer to send the push to.
1466  */
1467 static void
1468 send_push (const struct GNUNET_PeerIdentity *peer_id)
1469 {
1470   struct GNUNET_MQ_Envelope *ev;
1471
1472   LOG (GNUNET_ERROR_TYPE_DEBUG,
1473        "Going to send PUSH to peer %s.\n",
1474        GNUNET_i2s (peer_id));
1475
1476   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1477   Peers_send_message (peer_id, ev, "PUSH");
1478 }
1479
1480
1481 static void
1482 do_round (void *cls);
1483
1484 static void
1485 do_mal_round (void *cls);
1486
1487
1488 #ifdef ENABLE_MALICIOUS
1489 /**
1490  * Turn RPS service to act malicious.
1491  *
1492  * @param cls Closure
1493  * @param client The client that sent the message
1494  * @param msg The message header
1495  */
1496 static void
1497 handle_client_act_malicious (void *cls,
1498                              struct GNUNET_SERVER_Client *client,
1499                              const struct GNUNET_MessageHeader *msg)
1500 {
1501   struct GNUNET_RPS_CS_ActMaliciousMessage *in_msg;
1502   struct GNUNET_PeerIdentity *peers;
1503   uint32_t num_mal_peers_sent;
1504   uint32_t num_mal_peers_old;
1505
1506   /* Check for protocol violation */
1507   if (sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage) > ntohs (msg->size))
1508   {
1509     GNUNET_break_op (0);
1510   }
1511
1512   in_msg = (struct GNUNET_RPS_CS_ActMaliciousMessage *) msg;
1513   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1514       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1515   {
1516     LOG (GNUNET_ERROR_TYPE_ERROR,
1517         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1518         ntohl (in_msg->num_peers),
1519         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1520             sizeof (struct GNUNET_PeerIdentity));
1521     GNUNET_break_op (0);
1522   }
1523
1524
1525   /* Do actual logic */
1526   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1527   mal_type = ntohl (in_msg->type);
1528   if (NULL == mal_peer_set)
1529     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1530
1531   LOG (GNUNET_ERROR_TYPE_DEBUG,
1532        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
1533        mal_type,
1534        ntohl (in_msg->num_peers));
1535
1536   if (1 == mal_type)
1537   { /* Try to maximise representation */
1538     /* Add other malicious peers to those we already know */
1539
1540     num_mal_peers_sent = ntohl (in_msg->num_peers);
1541     num_mal_peers_old = num_mal_peers;
1542     GNUNET_array_grow (mal_peers,
1543                        num_mal_peers,
1544                        num_mal_peers + num_mal_peers_sent);
1545     memcpy (&mal_peers[num_mal_peers_old],
1546             peers,
1547             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1548
1549     /* Add all mal peers to mal_peer_set */
1550     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1551                            num_mal_peers_sent,
1552                            mal_peer_set);
1553
1554     /* Substitute do_round () with do_mal_round () */
1555     GNUNET_SCHEDULER_cancel (do_round_task);
1556     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1557   }
1558
1559   else if ( (2 == mal_type) ||
1560             (3 == mal_type) )
1561   { /* Try to partition the network */
1562     /* Add other malicious peers to those we already know */
1563
1564     num_mal_peers_sent = ntohl (in_msg->num_peers) - 1;
1565     num_mal_peers_old = num_mal_peers;
1566     GNUNET_array_grow (mal_peers,
1567                        num_mal_peers,
1568                        num_mal_peers + num_mal_peers_sent);
1569     if (NULL != mal_peers &&
1570         0 != num_mal_peers)
1571     {
1572       memcpy (&mal_peers[num_mal_peers_old],
1573               peers,
1574               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1575
1576       /* Add all mal peers to mal_peer_set */
1577       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1578                              num_mal_peers_sent,
1579                              mal_peer_set);
1580     }
1581
1582     /* Store the one attacked peer */
1583     memcpy (&attacked_peer,
1584             &in_msg->attacked_peer,
1585             sizeof (struct GNUNET_PeerIdentity));
1586     /* Set the flag of the attacked peer to valid to avoid problems */
1587     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
1588     {
1589       Peers_insert_peer_check_liveliness (&attacked_peer);
1590       Peers_issue_peer_liveliness_check (&attacked_peer);
1591     }
1592
1593     LOG (GNUNET_ERROR_TYPE_DEBUG,
1594          "Attacked peer is %s\n",
1595          GNUNET_i2s (&attacked_peer));
1596
1597     /* Substitute do_round () with do_mal_round () */
1598     GNUNET_SCHEDULER_cancel (do_round_task);
1599     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1600   }
1601   else if (0 == mal_type)
1602   { /* Stop acting malicious */
1603     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
1604
1605     /* Substitute do_mal_round () with do_round () */
1606     GNUNET_SCHEDULER_cancel (do_round_task);
1607     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1608   }
1609   else
1610   {
1611     GNUNET_break (0);
1612   }
1613   GNUNET_SERVER_receive_done (client,
1614                               GNUNET_OK);
1615 }
1616
1617
1618 /**
1619  * Send out PUSHes and PULLs maliciously.
1620  *
1621  * This is executed regylary.
1622  */
1623 static void
1624 do_mal_round (void *cls)
1625 {
1626   uint32_t num_pushes;
1627   uint32_t i;
1628   struct GNUNET_TIME_Relative time_next_round;
1629   struct AttackedPeer *tmp_att_peer;
1630
1631   LOG (GNUNET_ERROR_TYPE_DEBUG,
1632        "Going to execute next round maliciously type %" PRIu32 ".\n",
1633       mal_type);
1634   do_round_task = NULL;
1635   GNUNET_assert (mal_type <= 3);
1636   /* Do malicious actions */
1637   if (1 == mal_type)
1638   { /* Try to maximise representation */
1639
1640     /* The maximum of pushes we're going to send this round */
1641     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
1642                                          num_attacked_peers),
1643                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1644
1645     LOG (GNUNET_ERROR_TYPE_DEBUG,
1646          "Going to send %" PRIu32 " pushes\n",
1647          num_pushes);
1648
1649     /* Send PUSHes to attacked peers */
1650     for (i = 0 ; i < num_pushes ; i++)
1651     {
1652       if (att_peers_tail == att_peer_index)
1653         att_peer_index = att_peers_head;
1654       else
1655         att_peer_index = att_peer_index->next;
1656
1657       send_push (&att_peer_index->peer_id);
1658     }
1659
1660     /* Send PULLs to some peers to learn about additional peers to attack */
1661     tmp_att_peer = att_peer_index;
1662     for (i = 0 ; i < num_pushes * alpha ; i++)
1663     {
1664       if (att_peers_tail == tmp_att_peer)
1665         tmp_att_peer = att_peers_head;
1666       else
1667         att_peer_index = tmp_att_peer->next;
1668
1669       send_pull_request (&tmp_att_peer->peer_id);
1670     }
1671   }
1672
1673
1674   else if (2 == mal_type)
1675   { /**
1676      * Try to partition the network
1677      * Send as many pushes to the attacked peer as possible
1678      * That is one push per round as it will ignore more.
1679      */
1680     Peers_insert_peer_check_liveliness (&attacked_peer);
1681     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_VALID))
1682       send_push (&attacked_peer);
1683   }
1684
1685
1686   if (3 == mal_type)
1687   { /* Combined attack */
1688
1689     /* Send PUSH to attacked peers */
1690     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
1691     {
1692       Peers_insert_peer_check_liveliness (&attacked_peer);
1693       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_VALID))
1694       {
1695         LOG (GNUNET_ERROR_TYPE_DEBUG,
1696             "Goding to send push to attacked peer (%s)\n",
1697             GNUNET_i2s (&attacked_peer));
1698         send_push (&attacked_peer);
1699       }
1700       else
1701         Peers_issue_peer_liveliness_check (&attacked_peer);
1702     }
1703     else
1704       Peers_insert_peer_check_liveliness (&attacked_peer);
1705     Peers_issue_peer_liveliness_check (&attacked_peer);
1706
1707     /* The maximum of pushes we're going to send this round */
1708     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
1709                                          num_attacked_peers),
1710                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1711
1712     LOG (GNUNET_ERROR_TYPE_DEBUG,
1713          "Going to send %" PRIu32 " pushes\n",
1714          num_pushes);
1715
1716     for (i = 0; i < num_pushes; i++)
1717     {
1718       if (att_peers_tail == att_peer_index)
1719         att_peer_index = att_peers_head;
1720       else
1721         att_peer_index = att_peer_index->next;
1722
1723       send_push (&att_peer_index->peer_id);
1724     }
1725
1726     /* Send PULLs to some peers to learn about additional peers to attack */
1727     tmp_att_peer = att_peer_index;
1728     for (i = 0; i < num_pushes * alpha; i++)
1729     {
1730       if (att_peers_tail == tmp_att_peer)
1731         tmp_att_peer = att_peers_head;
1732       else
1733         att_peer_index = tmp_att_peer->next;
1734
1735       send_pull_request (&tmp_att_peer->peer_id);
1736     }
1737   }
1738
1739   /* Schedule next round */
1740   time_next_round = compute_rand_delay (round_interval, 2);
1741
1742   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
1743   //NULL);
1744   GNUNET_assert (NULL == do_round_task);
1745   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
1746                                                 &do_mal_round, NULL);
1747   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1748 }
1749 #endif /* ENABLE_MALICIOUS */
1750
1751
1752 /**
1753  * Send out PUSHes and PULLs, possibly update #view, samplers.
1754  *
1755  * This is executed regylary.
1756  */
1757 static void
1758 do_round (void *cls)
1759 {
1760   uint32_t i;
1761   const struct GNUNET_PeerIdentity *view_array;
1762   unsigned int *permut;
1763   unsigned int a_peers; /* Number of peers we send pushes to */
1764   unsigned int b_peers; /* Number of peers we send pull requests to */
1765   uint32_t first_border;
1766   uint32_t second_border;
1767   struct GNUNET_PeerIdentity peer;
1768   struct GNUNET_PeerIdentity *update_peer;
1769
1770   LOG (GNUNET_ERROR_TYPE_DEBUG,
1771        "Going to execute next round.\n");
1772   do_round_task = NULL;
1773   LOG (GNUNET_ERROR_TYPE_DEBUG,
1774        "Printing view:\n");
1775   to_file (file_name_view_log,
1776            "___ new round ___");
1777   view_array = View_get_as_array ();
1778   for (i = 0; i < View_size (); i++)
1779   {
1780     LOG (GNUNET_ERROR_TYPE_DEBUG,
1781          "\t%s\n", GNUNET_i2s (&view_array[i]));
1782     to_file (file_name_view_log,
1783              "=%s\t(do round)",
1784              GNUNET_i2s_full (&view_array[i]));
1785   }
1786
1787
1788   /* Send pushes and pull requests */
1789   if (0 < View_size ())
1790   {
1791     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1792                                            View_size ());
1793
1794     /* Send PUSHes */
1795     a_peers = ceil (alpha * View_size ());
1796
1797     LOG (GNUNET_ERROR_TYPE_DEBUG,
1798          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
1799          a_peers, alpha, View_size ());
1800     for (i = 0; i < a_peers; i++)
1801     {
1802       peer = view_array[permut[i]];
1803       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1804       { // FIXME if this fails schedule/loop this for later
1805         send_push (&peer);
1806       }
1807     }
1808
1809     /* Send PULL requests */
1810     b_peers = ceil (beta * View_size ());
1811     first_border = a_peers;
1812     second_border = a_peers + b_peers;
1813     if (second_border > View_size ())
1814     {
1815       first_border = View_size () - b_peers;
1816       second_border = View_size ();
1817     }
1818     LOG (GNUNET_ERROR_TYPE_DEBUG,
1819         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
1820         b_peers, beta, View_size ());
1821     for (i = first_border; i < second_border; i++)
1822     {
1823       peer = view_array[permut[i]];
1824       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
1825           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
1826       { // FIXME if this fails schedule/loop this for later
1827         send_pull_request (&peer);
1828       }
1829     }
1830
1831     GNUNET_free (permut);
1832     permut = NULL;
1833   }
1834
1835
1836   /* Update view */
1837   /* TODO see how many peers are in push-/pull- list! */
1838
1839   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
1840       (0 < CustomPeerMap_size (push_map)) &&
1841       (0 < CustomPeerMap_size (pull_map)))
1842   { /* If conditions for update are fulfilled, update */
1843     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
1844
1845     uint32_t final_size;
1846     uint32_t peers_to_clean_size;
1847     struct GNUNET_PeerIdentity *peers_to_clean;
1848
1849     peers_to_clean = NULL;
1850     peers_to_clean_size = 0;
1851     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
1852     memcpy (peers_to_clean,
1853             view_array,
1854             View_size () * sizeof (struct GNUNET_PeerIdentity));
1855
1856     /* Seems like recreating is the easiest way of emptying the peermap */
1857     View_clear ();
1858     to_file (file_name_view_log,
1859              "--- emptied ---");
1860
1861     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
1862                                 CustomPeerMap_size (push_map));
1863     second_border = first_border +
1864                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
1865                                 CustomPeerMap_size (pull_map));
1866     final_size    = second_border +
1867       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
1868
1869     /* Update view with peers received through PUSHes */
1870     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1871                                            CustomPeerMap_size (push_map));
1872     for (i = 0; i < first_border; i++)
1873     {
1874       View_put (CustomPeerMap_get_peer_by_index (push_map, permut[i]));
1875       to_file (file_name_view_log,
1876                "+%s\t(push list)",
1877                GNUNET_i2s_full (&view_array[i]));
1878       // TODO change the peer_flags accordingly
1879     }
1880     GNUNET_free (permut);
1881     permut = NULL;
1882
1883     /* Update view with peers received through PULLs */
1884     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1885                                            CustomPeerMap_size (pull_map));
1886     for (i = first_border; i < second_border; i++)
1887     {
1888       View_put (CustomPeerMap_get_peer_by_index (pull_map,
1889                                                  permut[i - first_border]));
1890       to_file (file_name_view_log,
1891                "+%s\t(pull list)",
1892                GNUNET_i2s_full (&view_array[i]));
1893       // TODO change the peer_flags accordingly
1894     }
1895     GNUNET_free (permut);
1896     permut = NULL;
1897
1898     /* Update view with peers from history */
1899     RPS_sampler_get_n_rand_peers (prot_sampler,
1900                                   hist_update,
1901                                   NULL,
1902                                   final_size - second_border);
1903     num_hist_update_tasks = final_size - second_border;
1904     // TODO change the peer_flags accordingly
1905
1906     for (i = 0; i < View_size (); i++)
1907       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
1908
1909     /* Clean peers that were removed from the view */
1910     for (i = 0; i < peers_to_clean_size; i++)
1911     {
1912       to_file (file_name_view_log,
1913                "-%s",
1914                GNUNET_i2s_full (&peers_to_clean[i]));
1915       Peers_clean_peer (&peers_to_clean[i]);
1916       //peer_destroy_channel_send (sender);
1917     }
1918
1919     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
1920     peers_to_clean = NULL;
1921   }
1922   else
1923   {
1924     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
1925   }
1926   // TODO independent of that also get some peers from CADET_get_peers()?
1927
1928   LOG (GNUNET_ERROR_TYPE_DEBUG,
1929        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
1930        CustomPeerMap_size (push_map),
1931        CustomPeerMap_size (pull_map),
1932        alpha,
1933        View_size (),
1934        alpha * View_size ());
1935
1936   /* Update samplers */
1937   for (i = 0; i < CustomPeerMap_size (push_map); i++)
1938   {
1939     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
1940     LOG (GNUNET_ERROR_TYPE_DEBUG,
1941          "Updating with peer %s from push list\n",
1942          GNUNET_i2s (update_peer));
1943     insert_in_sampler (NULL, update_peer);
1944     Peers_clean_peer (update_peer); /* This cleans only if it is not in the view */
1945     //peer_destroy_channel_send (sender);
1946   }
1947
1948   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
1949   {
1950     LOG (GNUNET_ERROR_TYPE_DEBUG,
1951          "Updating with peer %s from pull list\n",
1952          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
1953     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
1954     /* This cleans only if it is not in the view */
1955     Peers_clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
1956     //peer_destroy_channel_send (sender);
1957   }
1958
1959
1960   /* Empty push/pull lists */
1961   CustomPeerMap_clear (push_map);
1962   CustomPeerMap_clear (pull_map);
1963
1964   struct GNUNET_TIME_Relative time_next_round;
1965
1966   time_next_round = compute_rand_delay (round_interval, 2);
1967
1968   /* Schedule next round */
1969   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
1970                                                 &do_round, NULL);
1971   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1972 }
1973
1974
1975 static void
1976 rps_start (struct GNUNET_SERVER_Handle *server);
1977
1978
1979 /**
1980  * This is called from GNUNET_CADET_get_peers().
1981  *
1982  * It is called on every peer(ID) that cadet somehow has contact with.
1983  * We use those to initialise the sampler.
1984  */
1985 void
1986 init_peer_cb (void *cls,
1987               const struct GNUNET_PeerIdentity *peer,
1988               int tunnel, // "Do we have a tunnel towards this peer?"
1989               unsigned int n_paths, // "Number of known paths towards this peer"
1990               unsigned int best_path) // "How long is the best path?
1991                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
1992 {
1993   if (NULL != peer)
1994   {
1995     LOG (GNUNET_ERROR_TYPE_DEBUG,
1996          "Got peer_id %s from cadet\n",
1997          GNUNET_i2s (peer));
1998     Peers_insert_peer_check_liveliness (peer);
1999     Peers_schedule_operation (peer, insert_in_sampler);
2000     Peers_schedule_operation (peer, insert_in_view);
2001   }
2002 }
2003
2004
2005 /**
2006  * Iterator over peers from peerinfo.
2007  *
2008  * @param cls closure
2009  * @param peer id of the peer, NULL for last call
2010  * @param hello hello message for the peer (can be NULL)
2011  * @param error message
2012  */
2013 void
2014 process_peerinfo_peers (void *cls,
2015                         const struct GNUNET_PeerIdentity *peer,
2016                         const struct GNUNET_HELLO_Message *hello,
2017                         const char *err_msg)
2018 {
2019   if (NULL != peer)
2020   {
2021     LOG (GNUNET_ERROR_TYPE_DEBUG,
2022          "Got peer_id %s from peerinfo\n",
2023          GNUNET_i2s (peer));
2024     Peers_insert_peer_check_liveliness (peer);
2025     Peers_schedule_operation (peer, insert_in_sampler);
2026     Peers_schedule_operation (peer, insert_in_view);
2027   }
2028 }
2029
2030
2031 /**
2032  * Task run during shutdown.
2033  *
2034  * @param cls unused
2035  */
2036 static void
2037 shutdown_task (void *cls)
2038 {
2039   LOG (GNUNET_ERROR_TYPE_DEBUG,
2040        "RPS is going down\n");
2041   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
2042   GNUNET_PEERINFO_disconnect (peerinfo_handle);
2043
2044   if (NULL != do_round_task)
2045   {
2046     GNUNET_SCHEDULER_cancel (do_round_task);
2047     do_round_task = NULL;
2048   }
2049
2050   Peers_terminate ();
2051
2052   GNUNET_NSE_disconnect (nse);
2053   RPS_sampler_destroy (prot_sampler);
2054   RPS_sampler_destroy (client_sampler);
2055   GNUNET_CADET_disconnect (cadet_handle);
2056   View_destroy ();
2057   CustomPeerMap_destroy (push_map);
2058   CustomPeerMap_destroy (pull_map);
2059   #ifdef ENABLE_MALICIOUS
2060   struct AttackedPeer *tmp_att_peer;
2061   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2062   if (NULL != mal_peer_set)
2063     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2064   if (NULL != att_peer_set)
2065     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2066   while (NULL != att_peers_head)
2067   {
2068     tmp_att_peer = att_peers_head;
2069     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2070   }
2071   #endif /* ENABLE_MALICIOUS */
2072 }
2073
2074
2075 /**
2076  * @brief Get informed about a connecting client.
2077  *
2078  * @param cls unused
2079  * @param client the client that connects
2080  */
2081 static void
2082 handle_client_connect (void *cls,
2083                        struct GNUNET_SERVER_Client *client)
2084 {
2085   struct ClientContext *cli_ctx;
2086
2087   LOG (GNUNET_ERROR_TYPE_DEBUG,
2088        "Client connected\n");
2089   if (NULL == client)
2090     return; /* Server was destroyed before a client connected. Shutting down */
2091   cli_ctx = GNUNET_new (struct ClientContext);
2092   cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
2093   GNUNET_SERVER_client_set_user_context (client, cli_ctx);
2094   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2095                                cli_ctx_tail,
2096                                cli_ctx);
2097 }
2098
2099 /**
2100  * A client disconnected.  Remove all of its data structure entries.
2101  *
2102  * @param cls closure, NULL
2103  * @param client identification of the client
2104  */
2105 static void
2106 handle_client_disconnect (void *cls,
2107                                             struct GNUNET_SERVER_Client *client)
2108 {
2109   struct ClientContext *cli_ctx;
2110
2111   if (NULL == client)
2112   {/* shutdown task */
2113     while (NULL != cli_ctx_head)
2114       destroy_cli_ctx (cli_ctx_head);
2115   }
2116   else
2117   {
2118     cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
2119     destroy_cli_ctx (cli_ctx);
2120   }
2121 }
2122
2123
2124 /**
2125  * Actually start the service.
2126  */
2127   static void
2128 rps_start (struct GNUNET_SERVER_Handle *server)
2129 {
2130   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2131     {&handle_client_request,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2132       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2133     {&handle_client_request_cancel, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2134       sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)},
2135     {&handle_client_seed,           NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2136     #ifdef ENABLE_MALICIOUS
2137     {&handle_client_act_malicious,  NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2138     #endif /* ENABLE_MALICIOUS */
2139     {NULL, NULL, 0, 0}
2140   };
2141
2142   GNUNET_SERVER_add_handlers (server, handlers);
2143   GNUNET_SERVER_connect_notify (server,
2144                                 &handle_client_connect,
2145                                 NULL);
2146   GNUNET_SERVER_disconnect_notify (server,
2147                                    &handle_client_disconnect,
2148                                    NULL);
2149   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
2150
2151
2152   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2153   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2154
2155   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
2156                                  NULL);
2157 }
2158
2159
2160 /**
2161  * Process statistics requests.
2162  *
2163  * @param cls closure
2164  * @param server the initialized server
2165  * @param c configuration to use
2166  */
2167   static void
2168 run (void *cls,
2169      struct GNUNET_SERVER_Handle *server,
2170      const struct GNUNET_CONFIGURATION_Handle *c)
2171 {
2172   int size;
2173   int out_size;
2174
2175   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2176   cfg = c;
2177
2178
2179   /* Get own ID */
2180   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2181   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2182               "STARTING SERVICE (rps) for peer [%s]\n",
2183               GNUNET_i2s (&own_identity));
2184   #ifdef ENABLE_MALICIOUS
2185   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2186               "Malicious execution compiled in.\n");
2187   #endif /* ENABLE_MALICIOUS */
2188
2189
2190
2191   /* Get time interval from the configuration */
2192   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2193                                                         "ROUNDINTERVAL",
2194                                                         &round_interval))
2195   {
2196     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2197                                "RPS", "ROUNDINTERVAL");
2198     GNUNET_SCHEDULER_shutdown ();
2199     return;
2200   }
2201
2202   /* Get initial size of sampler/view from the configuration */
2203   if (GNUNET_OK !=
2204       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
2205         (long long unsigned int *) &sampler_size_est_need))
2206   {
2207     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2208                                "RPS", "INITSIZE");
2209     GNUNET_SCHEDULER_shutdown ();
2210     return;
2211   }
2212   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
2213
2214
2215   View_create (4);
2216
2217   /* file_name_view_log */
2218   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
2219   {
2220     LOG (GNUNET_ERROR_TYPE_WARNING,
2221          "Failed to create directory /tmp/rps/\n");
2222   }
2223
2224   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
2225   file_name_view_log = GNUNET_malloc (size);
2226   out_size = GNUNET_snprintf (file_name_view_log,
2227                               size,
2228                               "/tmp/rps/view-%s",
2229                               GNUNET_i2s_full (&own_identity));
2230   if (size < out_size ||
2231       0 > out_size)
2232   {
2233     LOG (GNUNET_ERROR_TYPE_WARNING,
2234          "Failed to write string to buffer (size: %i, out_size: %i)\n",
2235          size,
2236          out_size);
2237   }
2238
2239
2240   /* connect to NSE */
2241   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2242
2243
2244   alpha = 0.45;
2245   beta  = 0.45;
2246
2247
2248   /* Initialise cadet */
2249   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2250     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        ,
2251       sizeof (struct GNUNET_MessageHeader)},
2252     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2253       sizeof (struct GNUNET_MessageHeader)},
2254     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
2255     {NULL, 0, 0}
2256   };
2257
2258   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
2259   cadet_handle = GNUNET_CADET_connect (cfg,
2260                                        cls,
2261                                        &Peers_handle_inbound_channel,
2262                                        &cleanup_destroyed_channel,
2263                                        cadet_handlers,
2264                                        ports);
2265   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
2266   Peers_initialise (cadet_handle, &own_identity);
2267
2268   /* Initialise sampler */
2269   struct GNUNET_TIME_Relative half_round_interval;
2270   struct GNUNET_TIME_Relative  max_round_interval;
2271
2272   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2273   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2274
2275   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
2276   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
2277
2278   /* Initialise push and pull maps */
2279   push_map = CustomPeerMap_create (4);
2280   pull_map = CustomPeerMap_create (4);
2281
2282
2283   num_hist_update_tasks = 0;
2284
2285
2286   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2287   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2288   // TODO send push/pull to each of those peers?
2289
2290   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
2291                                                    GNUNET_NO,
2292                                                    process_peerinfo_peers,
2293                                                    NULL);
2294
2295   rps_start (server);
2296 }
2297
2298
2299 /**
2300  * The main function for the rps service.
2301  *
2302  * @param argc number of arguments from the command line
2303  * @param argv command line arguments
2304  * @return 0 ok, 1 on error
2305  */
2306 int
2307 main (int argc, char *const *argv)
2308 {
2309   return (GNUNET_OK ==
2310           GNUNET_SERVICE_run (argc,
2311                               argv,
2312                               "rps",
2313                               GNUNET_SERVICE_OPTION_NONE,
2314                               &run, NULL)) ? 0 : 1;
2315 }
2316
2317 /* end of gnunet-service-rps.c */