More API function tests...
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "rps.h"
33 #include "rps-test_util.h"
34 #include "gnunet-service-rps_sampler.h"
35 #include "gnunet-service-rps_custommap.h"
36 #include "gnunet-service-rps_peers.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Our own identity.
65  */
66 static struct GNUNET_PeerIdentity own_identity;
67
68
69 /***********************************************************************
70  * Housekeeping with clients
71 ***********************************************************************/
72
73 /**
74  * Closure used to pass the client and the id to the callback
75  * that replies to a client's request
76  */
77 struct ReplyCls
78 {
79   /**
80    * DLL
81    */
82   struct ReplyCls *next;
83   struct ReplyCls *prev;
84
85   /**
86    * The identifier of the request
87    */
88   uint32_t id;
89
90   /**
91    * The handle to the request
92    */
93   struct RPS_SamplerRequestHandle *req_handle;
94
95   /**
96    * The client handle to send the reply to
97    */
98   struct ClientContext *cli_ctx;
99 };
100
101
102 /**
103  * Struct used to store the context of a connected client.
104  */
105 struct ClientContext
106 {
107   /**
108    * DLL
109    */
110   struct ClientContext *next;
111   struct ClientContext *prev;
112
113   /**
114    * The message queue to communicate with the client.
115    */
116   struct GNUNET_MQ_Handle *mq;
117
118   /**
119    * DLL with handles to single requests from the client
120    */
121   struct ReplyCls *rep_cls_head;
122   struct ReplyCls *rep_cls_tail;
123
124   /**
125    * The client handle to send the reply to
126    */
127   struct GNUNET_SERVICE_Client *client;
128 };
129
130 /**
131  * DLL with all clients currently connected to us
132  */
133 struct ClientContext *cli_ctx_head;
134 struct ClientContext *cli_ctx_tail;
135
136 /***********************************************************************
137  * /Housekeeping with clients
138 ***********************************************************************/
139
140
141
142
143
144 /***********************************************************************
145  * Globals
146 ***********************************************************************/
147
148 /**
149  * Sampler used for the Brahms protocol itself.
150  */
151 static struct RPS_Sampler *prot_sampler;
152
153 /**
154  * Sampler used for the clients.
155  */
156 static struct RPS_Sampler *client_sampler;
157
158 /**
159  * Name to log view to
160  */
161 static char *file_name_view_log;
162
163 /**
164  * The size of sampler we need to be able to satisfy the client's need
165  * of random peers.
166  */
167 static unsigned int sampler_size_client_need;
168
169 /**
170  * The size of sampler we need to be able to satisfy the Brahms protocol's
171  * need of random peers.
172  *
173  * This is one minimum size the sampler grows to.
174  */
175 static unsigned int sampler_size_est_need;
176
177 /**
178  * Percentage of total peer number in the view
179  * to send random PUSHes to
180  */
181 static float alpha;
182
183 /**
184  * Percentage of total peer number in the view
185  * to send random PULLs to
186  */
187 static float beta;
188
189 /**
190  * Identifier for the main task that runs periodically.
191  */
192 static struct GNUNET_SCHEDULER_Task *do_round_task;
193
194 /**
195  * Time inverval the do_round task runs in.
196  */
197 static struct GNUNET_TIME_Relative round_interval;
198
199 /**
200  * List to store peers received through pushes temporary.
201  */
202 static struct CustomPeerMap *push_map;
203
204 /**
205  * List to store peers received through pulls temporary.
206  */
207 static struct CustomPeerMap *pull_map;
208
209 /**
210  * Handler to NSE.
211  */
212 static struct GNUNET_NSE_Handle *nse;
213
214 /**
215  * Handler to CADET.
216  */
217 static struct GNUNET_CADET_Handle *cadet_handle;
218
219 /**
220  * Handler to PEERINFO.
221  */
222 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
223
224 /**
225  * Handle for cancellation of iteration over peers.
226  */
227 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
228
229 /**
230  * Request counter.
231  *
232  * Counts how many requets clients already issued.
233  * Only needed in the beginning to check how many of the 64 deltas
234  * we already have
235  */
236 static unsigned int req_counter;
237
238 /**
239  * Time of the last request we received.
240  *
241  * Used to compute the expected request rate.
242  */
243 static struct GNUNET_TIME_Absolute last_request;
244
245 /**
246  * Size of #request_deltas.
247  */
248 #define REQUEST_DELTAS_SIZE 64
249 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
250
251 /**
252  * Last 64 deltas between requests
253  */
254 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
255
256 /**
257  * The prediction of the rate of requests
258  */
259 static struct GNUNET_TIME_Relative request_rate;
260
261
262 #ifdef ENABLE_MALICIOUS
263 /**
264  * Type of malicious peer
265  *
266  * 0 Don't act malicious at all - Default
267  * 1 Try to maximise representation
268  * 2 Try to partition the network
269  * 3 Combined attack
270  */
271 static uint32_t mal_type;
272
273 /**
274  * Other malicious peers
275  */
276 static struct GNUNET_PeerIdentity *mal_peers;
277
278 /**
279  * Hashmap of malicious peers used as set.
280  * Used to more efficiently check whether we know that peer.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
283
284 /**
285  * Number of other malicious peers
286  */
287 static uint32_t num_mal_peers;
288
289
290 /**
291  * If type is 2 This struct is used to store the attacked peers in a DLL
292  */
293 struct AttackedPeer
294 {
295   /**
296    * DLL
297    */
298   struct AttackedPeer *next;
299   struct AttackedPeer *prev;
300
301   /**
302    * PeerID
303    */
304   struct GNUNET_PeerIdentity peer_id;
305 };
306
307 /**
308  * If type is 2 this is the DLL of attacked peers
309  */
310 static struct AttackedPeer *att_peers_head;
311 static struct AttackedPeer *att_peers_tail;
312
313 /**
314  * This index is used to point to an attacked peer to
315  * implement the round-robin-ish way to select attacked peers.
316  */
317 static struct AttackedPeer *att_peer_index;
318
319 /**
320  * Hashmap of attacked peers used as set.
321  * Used to more efficiently check whether we know that peer.
322  */
323 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
324
325 /**
326  * Number of attacked peers
327  */
328 static uint32_t num_attacked_peers;
329
330 /**
331  * If type is 1 this is the attacked peer
332  */
333 static struct GNUNET_PeerIdentity attacked_peer;
334
335 /**
336  * The limit of PUSHes we can send in one round.
337  * This is an assumption of the Brahms protocol and either implemented
338  * via proof of work
339  * or
340  * assumend to be the bandwidth limitation.
341  */
342 static uint32_t push_limit = 10000;
343 #endif /* ENABLE_MALICIOUS */
344
345
346 /***********************************************************************
347  * /Globals
348 ***********************************************************************/
349
350
351 /***********************************************************************
352  * Util functions
353 ***********************************************************************/
354
355
356 /**
357  * Print peerlist to log.
358  */
359 static void
360 print_peer_list (struct GNUNET_PeerIdentity *list,
361                  unsigned int len)
362 {
363   unsigned int i;
364
365   LOG (GNUNET_ERROR_TYPE_DEBUG,
366        "Printing peer list of length %u at %p:\n",
367        len,
368        list);
369   for (i = 0 ; i < len ; i++)
370   {
371     LOG (GNUNET_ERROR_TYPE_DEBUG,
372          "%u. peer: %s\n",
373          i, GNUNET_i2s (&list[i]));
374   }
375 }
376
377
378 /**
379  * Remove peer from list.
380  */
381 static void
382 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
383                unsigned int *list_size,
384                const struct GNUNET_PeerIdentity *peer)
385 {
386   unsigned int i;
387   struct GNUNET_PeerIdentity *tmp;
388
389   tmp = *peer_list;
390
391   LOG (GNUNET_ERROR_TYPE_DEBUG,
392        "Removing peer %s from list at %p\n",
393        GNUNET_i2s (peer),
394        tmp);
395
396   for ( i = 0 ; i < *list_size ; i++ )
397   {
398     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
399     {
400       if (i < *list_size -1)
401       { /* Not at the last entry -- shift peers left */
402         memmove (&tmp[i], &tmp[i +1],
403                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
404       }
405       /* Remove last entry (should be now useless PeerID) */
406       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
407     }
408   }
409   *peer_list = tmp;
410 }
411
412
413 /**
414  * Sum all time relatives of an array.
415  */
416 static struct GNUNET_TIME_Relative
417 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
418                 uint32_t arr_size)
419 {
420   struct GNUNET_TIME_Relative sum;
421   uint32_t i;
422
423   sum = GNUNET_TIME_UNIT_ZERO;
424   for ( i = 0 ; i < arr_size ; i++ )
425   {
426     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
427   }
428   return sum;
429 }
430
431
432 /**
433  * Compute the average of given time relatives.
434  */
435 static struct GNUNET_TIME_Relative
436 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
437                 uint32_t arr_size)
438 {
439   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
440                                                       arr_size),
441                                       arr_size);
442 }
443
444
445 /**
446  * Insert PeerID in #view
447  *
448  * Called once we know a peer is live.
449  * Implements #PeerOp
450  *
451  * @return GNUNET_OK if peer was actually inserted
452  *         GNUNET_NO if peer was not inserted
453  */
454 static void
455 insert_in_view_op (void *cls,
456                 const struct GNUNET_PeerIdentity *peer);
457
458 /**
459  * Insert PeerID in #view
460  *
461  * Called once we know a peer is live.
462  *
463  * @return GNUNET_OK if peer was actually inserted
464  *         GNUNET_NO if peer was not inserted
465  */
466 static int
467 insert_in_view (const struct GNUNET_PeerIdentity *peer)
468 {
469   int online;
470
471   online = Peers_check_peer_flag (peer, Peers_ONLINE);
472   if ( (GNUNET_NO == online) ||
473        (GNUNET_SYSERR == online) ) /* peer is not even known */
474   {
475     (void) Peers_issue_peer_liveliness_check (peer);
476     (void) Peers_schedule_operation (peer, insert_in_view_op);
477     return GNUNET_NO;
478   }
479   /* Open channel towards peer to keep connection open */
480   Peers_indicate_sending_intention (peer);
481   return View_put (peer);
482 }
483
484 /**
485  * Put random peer from sampler into the view as history update.
486  */
487 static void
488 hist_update (void *cls,
489              struct GNUNET_PeerIdentity *ids,
490              uint32_t num_peers)
491 {
492   unsigned int i;
493
494   for (i = 0; i < num_peers; i++)
495   {
496     (void) insert_in_view (&ids[i]);
497     to_file (file_name_view_log,
498              "+%s\t(hist)",
499              GNUNET_i2s_full (ids));
500   }
501 }
502
503
504 /**
505  * Wrapper around #RPS_sampler_resize()
506  *
507  * If we do not have enough sampler elements, double current sampler size
508  * If we have more than enough sampler elements, halv current sampler size
509  */
510 static void
511 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
512 {
513   unsigned int sampler_size;
514
515   // TODO statistics
516   // TODO respect the min, max
517   sampler_size = RPS_sampler_get_size (sampler);
518   if (sampler_size > new_size * 4)
519   { /* Shrinking */
520     RPS_sampler_resize (sampler, sampler_size / 2);
521   }
522   else if (sampler_size < new_size)
523   { /* Growing */
524     RPS_sampler_resize (sampler, sampler_size * 2);
525   }
526   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
527 }
528
529
530 /**
531  * Wrapper around #RPS_sampler_resize() resizing the client sampler
532  */
533 static void
534 client_resize_wrapper ()
535 {
536   uint32_t bigger_size;
537
538   // TODO statistics
539
540   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
541
542   // TODO respect the min, max
543   resize_wrapper (client_sampler, bigger_size);
544   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
545       bigger_size);
546 }
547
548
549 /**
550  * Estimate request rate
551  *
552  * Called every time we receive a request from the client.
553  */
554 static void
555 est_request_rate()
556 {
557   struct GNUNET_TIME_Relative max_round_duration;
558
559   if (request_deltas_size > req_counter)
560     req_counter++;
561   if ( 1 < req_counter)
562   {
563     /* Shift last request deltas to the right */
564     memmove (&request_deltas[1],
565         request_deltas,
566         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
567
568     /* Add current delta to beginning */
569     request_deltas[0] =
570         GNUNET_TIME_absolute_get_difference (last_request,
571                                              GNUNET_TIME_absolute_get ());
572     request_rate = T_relative_avg (request_deltas, req_counter);
573     request_rate = (request_rate.rel_value_us < 1) ?
574       GNUNET_TIME_relative_get_unit_ () : request_rate;
575
576     /* Compute the duration a round will maximally take */
577     max_round_duration =
578         GNUNET_TIME_relative_add (round_interval,
579                                   GNUNET_TIME_relative_divide (round_interval, 2));
580
581     /* Set the estimated size the sampler has to have to
582      * satisfy the current client request rate */
583     sampler_size_client_need =
584         max_round_duration.rel_value_us / request_rate.rel_value_us;
585
586     /* Resize the sampler */
587     client_resize_wrapper ();
588   }
589   last_request = GNUNET_TIME_absolute_get ();
590 }
591
592
593 /**
594  * Add all peers in @a peer_array to @a peer_map used as set.
595  *
596  * @param peer_array array containing the peers
597  * @param num_peers number of peers in @peer_array
598  * @param peer_map the peermap to use as set
599  */
600 static void
601 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
602                        unsigned int num_peers,
603                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
604 {
605   unsigned int i;
606   if (NULL == peer_map)
607   {
608     LOG (GNUNET_ERROR_TYPE_WARNING,
609          "Trying to add peers to non-existing peermap.\n");
610     return;
611   }
612
613   for (i = 0; i < num_peers; i++)
614   {
615     GNUNET_CONTAINER_multipeermap_put (peer_map,
616                                        &peer_array[i],
617                                        NULL,
618                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
619   }
620 }
621
622
623 /**
624  * Send a PULL REPLY to @a peer_id
625  *
626  * @param peer_id the peer to send the reply to.
627  * @param peer_ids the peers to send to @a peer_id
628  * @param num_peer_ids the number of peers to send to @a peer_id
629  */
630 static void
631 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
632                  const struct GNUNET_PeerIdentity *peer_ids,
633                  unsigned int num_peer_ids)
634 {
635   uint32_t send_size;
636   struct GNUNET_MQ_Envelope *ev;
637   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
638
639   /* Compute actual size */
640   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
641               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
642
643   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
644     /* Compute number of peers to send
645      * If too long, simply truncate */
646     // TODO select random ones via permutation
647     //      or even better: do good protocol design
648     send_size =
649       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
650        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
651        sizeof (struct GNUNET_PeerIdentity);
652   else
653     send_size = num_peer_ids;
654
655   LOG (GNUNET_ERROR_TYPE_DEBUG,
656       "Going to send PULL REPLY with %u peers to %s\n",
657       send_size, GNUNET_i2s (peer_id));
658
659   ev = GNUNET_MQ_msg_extra (out_msg,
660                             send_size * sizeof (struct GNUNET_PeerIdentity),
661                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
662   out_msg->num_peers = htonl (send_size);
663   GNUNET_memcpy (&out_msg[1], peer_ids,
664          send_size * sizeof (struct GNUNET_PeerIdentity));
665
666   Peers_send_message (peer_id, ev, "PULL REPLY");
667 }
668
669
670 /**
671  * Insert PeerID in #pull_map
672  *
673  * Called once we know a peer is live.
674  */
675 static void
676 insert_in_pull_map (void *cls,
677                     const struct GNUNET_PeerIdentity *peer)
678 {
679   CustomPeerMap_put (pull_map, peer);
680 }
681
682
683 /**
684  * Insert PeerID in #view
685  *
686  * Called once we know a peer is live.
687  * Implements #PeerOp
688  */
689 static void
690 insert_in_view_op (void *cls,
691                 const struct GNUNET_PeerIdentity *peer)
692 {
693   (void) insert_in_view (peer);
694 }
695
696
697 /**
698  * Update sampler with given PeerID.
699  * Implements #PeerOp
700  */
701 static void
702 insert_in_sampler (void *cls,
703                    const struct GNUNET_PeerIdentity *peer)
704 {
705   LOG (GNUNET_ERROR_TYPE_DEBUG,
706        "Updating samplers with peer %s from insert_in_sampler()\n",
707        GNUNET_i2s (peer));
708   RPS_sampler_update (prot_sampler,   peer);
709   RPS_sampler_update (client_sampler, peer);
710   if (0 < RPS_sampler_count_id (prot_sampler, peer))
711   {
712     /* Make sure we 'know' about this peer */
713     (void) Peers_issue_peer_liveliness_check (peer);
714     /* Establish a channel towards that peer to indicate we are going to send
715      * messages to it */
716     //Peers_indicate_sending_intention (peer);
717   }
718 }
719
720 /**
721  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
722  *        If the peer is not known, liveliness check is issued and it is
723  *        scheduled to be inserted in sampler and view.
724  *
725  * "External sources" refer to every source except the gossip.
726  *
727  * @param peer peer to insert
728  */
729 static void
730 got_peer (const struct GNUNET_PeerIdentity *peer)
731 {
732   /* If we did not know this peer already, insert it into sampler and view */
733   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
734   {
735     Peers_schedule_operation (peer, insert_in_sampler);
736     Peers_schedule_operation (peer, insert_in_view_op);
737   }
738 }
739
740 /**
741  * @brief Checks if there is a sending channel and if it is needed
742  *
743  * @param peer the peer whose sending channel is checked
744  * @return GNUNET_YES if sending channel exists and is still needed
745  *         GNUNET_NO  otherwise
746  */
747 static int
748 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
749 {
750   /* struct GNUNET_CADET_Channel *channel; */
751   if (GNUNET_NO == Peers_check_peer_known (peer))
752   {
753     return GNUNET_NO;
754   }
755   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
756   {
757     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
758          (GNUNET_YES == View_contains_peer (peer)) ||
759          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
760          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
761          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
762     { /* If we want to keep the connection to peer open */
763       return GNUNET_YES;
764     }
765     return GNUNET_NO;
766   }
767   return GNUNET_NO;
768 }
769
770 /**
771  * @brief remove peer from our knowledge, the view, push and pull maps and
772  * samplers.
773  *
774  * @param peer the peer to remove
775  */
776 static void
777 remove_peer (const struct GNUNET_PeerIdentity *peer)
778 {
779   (void) View_remove_peer (peer);
780   CustomPeerMap_remove_peer (pull_map, peer);
781   CustomPeerMap_remove_peer (push_map, peer);
782   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
783   RPS_sampler_reinitialise_by_value (client_sampler, peer);
784   Peers_remove_peer (peer);
785 }
786
787
788 /**
789  * @brief Remove data that is not needed anymore.
790  *
791  * If the sending channel is no longer needed it is destroyed.
792  *
793  * @param peer the peer whose data is about to be cleaned
794  */
795 static void
796 clean_peer (const struct GNUNET_PeerIdentity *peer)
797 {
798   if (GNUNET_NO == check_sending_channel_needed (peer))
799   {
800     LOG (GNUNET_ERROR_TYPE_DEBUG,
801         "Going to remove send channel to peer %s\n",
802         GNUNET_i2s (peer));
803     #ifdef ENABLE_MALICIOUS
804     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
805       (void) Peers_destroy_sending_channel (peer);
806     #else /* ENABLE_MALICIOUS */
807     (void) Peers_destroy_sending_channel (peer);
808     #endif /* ENABLE_MALICIOUS */
809   }
810
811   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
812        (GNUNET_NO == View_contains_peer (peer)) &&
813        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
814        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
815        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
816        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
817        (GNUNET_NO != Peers_check_removable (peer)) )
818   { /* We can safely remove this peer */
819     LOG (GNUNET_ERROR_TYPE_DEBUG,
820         "Going to remove peer %s\n",
821         GNUNET_i2s (peer));
822     remove_peer (peer);
823     return;
824   }
825 }
826
827 /**
828  * @brief This is called when a channel is destroyed.
829  *
830  * Removes peer completely from our knowledge if the send_channel was destroyed
831  * Otherwise simply delete the recv_channel
832  * Also check if the knowledge about this peer is still needed.
833  * If not, remove this peer from our knowledge.
834  *
835  * @param cls The closure
836  * @param channel The channel being closed
837  * @param channel_ctx The context associated with this channel
838  */
839 static void
840 cleanup_destroyed_channel (void *cls,
841                            const struct GNUNET_CADET_Channel *channel,
842                            void *channel_ctx)
843 {
844   struct GNUNET_PeerIdentity *peer;
845
846   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
847       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
848        // FIXME wait for cadet to change this function
849
850   if (GNUNET_NO == Peers_check_peer_known (peer))
851   { /* We don't know a context to that peer */
852     LOG (GNUNET_ERROR_TYPE_WARNING,
853          "channel (%s) without associated context was destroyed\n",
854          GNUNET_i2s (peer));
855     return;
856   }
857
858   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
859   { /* We are in the middle of removing that peer from our knowledge. In this
860        case simply make sure that the channels are cleaned. */
861     Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
862     to_file (file_name_view_log,
863              "-%s\t(cleanup channel, ourself)",
864              GNUNET_i2s_full (peer));
865     return;
866   }
867
868   if (GNUNET_YES ==
869       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
870   { /* Channel used for sending was destroyed */
871     /* Possible causes of channel destruction:
872      *  - ourselves  -> cleaning send channel -> clean context
873      *  - other peer -> peer probably went down -> remove
874      */
875     if (GNUNET_YES == Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_CLEAN))
876     { /* We are about to clean the sending channel. Clean the respective
877        * context */
878       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
879       return;
880     }
881     else
882     { /* Other peer destroyed our sending channel that he is supposed to keep
883        * open. It probably went down. Remove it from our knowledge. */
884       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
885       remove_peer (peer);
886       return;
887     }
888   }
889   else if (GNUNET_YES ==
890       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
891   { /* Channel used for receiving was destroyed */
892     /* Possible causes of channel destruction:
893      *  - ourselves  -> peer tried to establish channel twice -> clean context
894      *  - other peer -> peer doesn't want to send us data -> clean
895      */
896     if (GNUNET_YES ==
897         Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_ESTABLISHED_TWICE))
898     { /* Other peer tried to establish a channel to us twice. We do not accept
899        * that. Clean the context. */
900       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
901       return;
902     }
903     else
904     { /* Other peer doesn't want to send us data anymore. We are free to clean
905        * it. */
906       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
907       clean_peer (peer);
908       return;
909     }
910   }
911   else
912   {
913     LOG (GNUNET_ERROR_TYPE_WARNING,
914         "Destroyed channel is neither sending nor receiving channel\n");
915   }
916 }
917
918 /***********************************************************************
919  * /Util functions
920 ***********************************************************************/
921
922 static void
923 destroy_reply_cls (struct ReplyCls *rep_cls)
924 {
925   struct ClientContext *cli_ctx;
926
927   cli_ctx = rep_cls->cli_ctx;
928   GNUNET_assert (NULL != cli_ctx);
929   if (NULL != rep_cls->req_handle)
930   {
931     RPS_sampler_request_cancel (rep_cls->req_handle);
932   }
933   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
934                                cli_ctx->rep_cls_tail,
935                                rep_cls);
936   GNUNET_free (rep_cls);
937 }
938
939
940 static void
941 destroy_cli_ctx (struct ClientContext *cli_ctx)
942 {
943   GNUNET_assert (NULL != cli_ctx);
944   if (NULL != cli_ctx->rep_cls_head)
945   {
946     LOG (GNUNET_ERROR_TYPE_WARNING,
947          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
948     while (NULL != cli_ctx->rep_cls_head)
949       destroy_reply_cls (cli_ctx->rep_cls_head);
950   }
951   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
952                                cli_ctx_tail,
953                                cli_ctx);
954   GNUNET_free (cli_ctx);
955 }
956
957
958 /**
959  * Function called by NSE.
960  *
961  * Updates sizes of sampler list and view and adapt those lists
962  * accordingly.
963  */
964 static void
965 nse_callback (void *cls,
966               struct GNUNET_TIME_Absolute timestamp,
967               double logestimate, double std_dev)
968 {
969   double estimate;
970   //double scale; // TODO this might go gloabal/config
971
972   LOG (GNUNET_ERROR_TYPE_DEBUG,
973        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
974        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
975   //scale = .01;
976   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
977   // GNUNET_NSE_log_estimate_to_n (logestimate);
978   estimate = pow (estimate, 1.0 / 3);
979   // TODO add if std_dev is a number
980   // estimate += (std_dev * scale);
981   if (2 < ceil (estimate))
982   {
983     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
984     sampler_size_est_need = estimate;
985   } else
986     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
987
988   /* If the NSE has changed adapt the lists accordingly */
989   resize_wrapper (prot_sampler, sampler_size_est_need);
990   client_resize_wrapper ();
991 }
992
993
994 /**
995  * Callback called once the requested PeerIDs are ready.
996  *
997  * Sends those to the requesting client.
998  */
999 static void
1000 client_respond (void *cls,
1001                 struct GNUNET_PeerIdentity *peer_ids,
1002                 uint32_t num_peers)
1003 {
1004   struct ReplyCls *reply_cls = cls;
1005   uint32_t i;
1006   struct GNUNET_MQ_Envelope *ev;
1007   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
1008   uint32_t size_needed;
1009   struct ClientContext *cli_ctx;
1010
1011   GNUNET_assert (NULL != reply_cls);
1012   LOG (GNUNET_ERROR_TYPE_DEBUG,
1013        "sampler returned %" PRIu32 " peers:\n",
1014        num_peers);
1015   for (i = 0; i < num_peers; i++)
1016   {
1017     LOG (GNUNET_ERROR_TYPE_DEBUG,
1018          "  %" PRIu32 ": %s\n",
1019          i,
1020          GNUNET_i2s (&peer_ids[i]));
1021   }
1022
1023   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1024                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1025
1026   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1027
1028   ev = GNUNET_MQ_msg_extra (out_msg,
1029                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1030                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1031   out_msg->num_peers = htonl (num_peers);
1032   out_msg->id = htonl (reply_cls->id);
1033
1034   GNUNET_memcpy (&out_msg[1],
1035           peer_ids,
1036           num_peers * sizeof (struct GNUNET_PeerIdentity));
1037   GNUNET_free (peer_ids);
1038
1039   cli_ctx = reply_cls->cli_ctx;
1040   GNUNET_assert (NULL != cli_ctx);
1041   reply_cls->req_handle = NULL;
1042   destroy_reply_cls (reply_cls);
1043   GNUNET_MQ_send (cli_ctx->mq, ev);
1044 }
1045
1046
1047 /**
1048  * Handle RPS request from the client.
1049  *
1050  * @param cls closure
1051  * @param client identification of the client
1052  * @param message the actual message
1053  */
1054 static void
1055 handle_client_request (void *cls,
1056                        const struct GNUNET_RPS_CS_RequestMessage *msg)
1057 {
1058   struct ClientContext *cli_ctx = cls;
1059   uint32_t num_peers;
1060   uint32_t size_needed;
1061   struct ReplyCls *reply_cls;
1062   uint32_t i;
1063
1064   num_peers = ntohl (msg->num_peers);
1065   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1066                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1067
1068   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1069   {
1070     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1071                 "Message received from client has size larger than expected\n");
1072     GNUNET_SERVICE_client_drop (cli_ctx->client);
1073     return;
1074   }
1075
1076   for (i = 0 ; i < num_peers ; i++)
1077     est_request_rate();
1078
1079   LOG (GNUNET_ERROR_TYPE_DEBUG,
1080        "Client requested %" PRIu32 " random peer(s).\n",
1081        num_peers);
1082
1083   reply_cls = GNUNET_new (struct ReplyCls);
1084   reply_cls->id = ntohl (msg->id);
1085   reply_cls->cli_ctx = cli_ctx;
1086   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1087                                                         client_respond,
1088                                                         reply_cls,
1089                                                         num_peers);
1090
1091   GNUNET_assert (NULL != cli_ctx);
1092   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1093                                cli_ctx->rep_cls_tail,
1094                                reply_cls);
1095   GNUNET_SERVICE_client_continue (cli_ctx->client);
1096 }
1097
1098
1099 /**
1100  * @brief Handle a message that requests the cancellation of a request
1101  *
1102  * @param cls unused
1103  * @param client the client that requests the cancellation
1104  * @param message the message containing the id of the request
1105  */
1106 static void
1107 handle_client_request_cancel (void *cls,
1108                           const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
1109 {
1110   struct ClientContext *cli_ctx = cls;
1111   struct ReplyCls *rep_cls;
1112
1113   GNUNET_assert (NULL != cli_ctx);
1114   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1115   rep_cls = cli_ctx->rep_cls_head;
1116   LOG (GNUNET_ERROR_TYPE_DEBUG,
1117       "Client cancels request with id %" PRIu32 "\n",
1118       ntohl (msg->id));
1119   while ( (NULL != rep_cls->next) &&
1120           (rep_cls->id != ntohl (msg->id)) )
1121     rep_cls = rep_cls->next;
1122   GNUNET_assert (rep_cls->id == ntohl (msg->id));
1123   RPS_sampler_request_cancel (rep_cls->req_handle);
1124   destroy_reply_cls (rep_cls);
1125   GNUNET_SERVICE_client_continue (cli_ctx->client);
1126 }
1127
1128
1129 /**
1130  * @brief This function is called, when the client seeds peers.
1131  * It verifies that @a msg is well-formed.
1132  *
1133  * @param cls the closure (#ClientContext)
1134  * @param msg the message
1135  * @return #GNUNET_OK if @a msg is well-formed
1136  */
1137 static int
1138 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
1139 {
1140   struct ClientContext *cli_ctx = cls;
1141   uint16_t msize = ntohs (msg->header.size);
1142   uint32_t num_peers = ntohl (msg->num_peers);
1143
1144   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
1145   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
1146        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
1147   {
1148     GNUNET_break (0);
1149     GNUNET_SERVICE_client_drop (cli_ctx->client);
1150     return GNUNET_SYSERR;
1151   }
1152   return GNUNET_OK;
1153 }
1154
1155
1156 /**
1157  * Handle seed from the client.
1158  *
1159  * @param cls closure
1160  * @param client identification of the client
1161  * @param message the actual message
1162  */
1163 static void
1164 handle_client_seed (void *cls,
1165                     const struct GNUNET_RPS_CS_SeedMessage *msg)
1166 {
1167   struct ClientContext *cli_ctx = cls;
1168   struct GNUNET_PeerIdentity *peers;
1169   uint32_t num_peers;
1170   uint32_t i;
1171
1172   num_peers = ntohl (msg->num_peers);
1173   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1174   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1175   //GNUNET_memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1176
1177   LOG (GNUNET_ERROR_TYPE_DEBUG,
1178        "Client seeded peers:\n");
1179   print_peer_list (peers, num_peers);
1180
1181   for (i = 0; i < num_peers; i++)
1182   {
1183     LOG (GNUNET_ERROR_TYPE_DEBUG,
1184          "Updating samplers with seed %" PRIu32 ": %s\n",
1185          i,
1186          GNUNET_i2s (&peers[i]));
1187
1188     got_peer (&peers[i]);
1189   }
1190
1191   ////GNUNET_free (peers);
1192
1193   GNUNET_SERVICE_client_continue (cli_ctx->client);
1194 }
1195
1196 /**
1197  * Handle a CHECK_LIVE message from another peer.
1198  *
1199  * This does nothing. But without calling #GNUNET_CADET_receive_done()
1200  * the channel is blocked for all other communication.
1201  *
1202  * @param cls Closure
1203  * @param channel The channel the CHECK was received over
1204  * @param channel_ctx The context associated with this channel
1205  * @param msg The message header
1206  */
1207 static int
1208 handle_peer_check (void *cls,
1209                   struct GNUNET_CADET_Channel *channel,
1210                   void **channel_ctx,
1211                   const struct GNUNET_MessageHeader *msg)
1212 {
1213   GNUNET_CADET_receive_done (channel);
1214   return GNUNET_OK;
1215 }
1216
1217 /**
1218  * Handle a PUSH message from another peer.
1219  *
1220  * Check the proof of work and store the PeerID
1221  * in the temporary list for pushed PeerIDs.
1222  *
1223  * @param cls Closure
1224  * @param channel The channel the PUSH was received over
1225  * @param channel_ctx The context associated with this channel
1226  * @param msg The message header
1227  */
1228 static int
1229 handle_peer_push (void *cls,
1230                   struct GNUNET_CADET_Channel *channel,
1231                   void **channel_ctx,
1232                   const struct GNUNET_MessageHeader *msg)
1233 {
1234   const struct GNUNET_PeerIdentity *peer;
1235
1236   // (check the proof of work (?))
1237
1238   peer = (const struct GNUNET_PeerIdentity *)
1239     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1240   // FIXME wait for cadet to change this function
1241
1242   LOG (GNUNET_ERROR_TYPE_DEBUG,
1243        "Received PUSH (%s)\n",
1244        GNUNET_i2s (peer));
1245
1246   #ifdef ENABLE_MALICIOUS
1247   struct AttackedPeer *tmp_att_peer;
1248
1249   if ( (1 == mal_type) ||
1250        (3 == mal_type) )
1251   { /* Try to maximise representation */
1252     tmp_att_peer = GNUNET_new (struct AttackedPeer);
1253     GNUNET_memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1254     if (NULL == att_peer_set)
1255       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1256     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1257                                                              peer))
1258     {
1259       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1260                                    att_peers_tail,
1261                                    tmp_att_peer);
1262       add_peer_array_to_set (peer, 1, att_peer_set);
1263     }
1264     GNUNET_CADET_receive_done (channel);
1265     return GNUNET_OK;
1266   }
1267
1268
1269   else if (2 == mal_type)
1270   { /* We attack one single well-known peer - simply ignore */
1271     GNUNET_CADET_receive_done (channel);
1272     return GNUNET_OK;
1273   }
1274   #endif /* ENABLE_MALICIOUS */
1275
1276   /* Add the sending peer to the push_map */
1277   CustomPeerMap_put (push_map, peer);
1278
1279   GNUNET_CADET_receive_done (channel);
1280   return GNUNET_OK;
1281 }
1282
1283
1284 /**
1285  * Handle PULL REQUEST request message from another peer.
1286  *
1287  * Reply with the view of PeerIDs.
1288  *
1289  * @param cls Closure
1290  * @param channel The channel the PULL REQUEST was received over
1291  * @param channel_ctx The context associated with this channel
1292  * @param msg The message header
1293  */
1294 static int
1295 handle_peer_pull_request (void *cls,
1296                           struct GNUNET_CADET_Channel *channel,
1297                           void **channel_ctx,
1298                           const struct GNUNET_MessageHeader *msg)
1299 {
1300   struct GNUNET_PeerIdentity *peer;
1301   const struct GNUNET_PeerIdentity *view_array;
1302
1303   peer = (struct GNUNET_PeerIdentity *)
1304     GNUNET_CADET_channel_get_info (channel,
1305                                    GNUNET_CADET_OPTION_PEER);
1306   // FIXME wait for cadet to change this function
1307
1308   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
1309
1310   #ifdef ENABLE_MALICIOUS
1311   if (1 == mal_type
1312       || 3 == mal_type)
1313   { /* Try to maximise representation */
1314     send_pull_reply (peer, mal_peers, num_mal_peers);
1315     GNUNET_CADET_receive_done (channel);
1316     return GNUNET_OK;
1317   }
1318
1319   else if (2 == mal_type)
1320   { /* Try to partition network */
1321     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1322     {
1323       send_pull_reply (peer, mal_peers, num_mal_peers);
1324     }
1325     GNUNET_CADET_receive_done (channel);
1326     return GNUNET_OK;
1327   }
1328   #endif /* ENABLE_MALICIOUS */
1329
1330   view_array = View_get_as_array ();
1331
1332   send_pull_reply (peer, view_array, View_size ());
1333
1334   GNUNET_CADET_receive_done (channel);
1335   return GNUNET_OK;
1336 }
1337
1338
1339 /**
1340  * Handle PULL REPLY message from another peer.
1341  *
1342  * Check whether we sent a corresponding request and
1343  * whether this reply is the first one.
1344  *
1345  * @param cls Closure
1346  * @param channel The channel the PUSH was received over
1347  * @param channel_ctx The context associated with this channel
1348  * @param msg The message header
1349  */
1350 static int
1351 handle_peer_pull_reply (void *cls,
1352                         struct GNUNET_CADET_Channel *channel,
1353                         void **channel_ctx,
1354                         const struct GNUNET_MessageHeader *msg)
1355 {
1356   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1357   struct GNUNET_PeerIdentity *peers;
1358   struct GNUNET_PeerIdentity *sender;
1359   uint32_t i;
1360 #ifdef ENABLE_MALICIOUS
1361   struct AttackedPeer *tmp_att_peer;
1362 #endif /* ENABLE_MALICIOUS */
1363
1364   /* Check for protocol violation */
1365   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1366   {
1367     GNUNET_break_op (0);
1368     GNUNET_CADET_receive_done (channel);
1369     return GNUNET_SYSERR;
1370   }
1371
1372   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1373   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1374       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1375   {
1376     LOG (GNUNET_ERROR_TYPE_ERROR,
1377         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1378         ntohl (in_msg->num_peers),
1379         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1380             sizeof (struct GNUNET_PeerIdentity));
1381     GNUNET_break_op (0);
1382     GNUNET_CADET_receive_done (channel);
1383     return GNUNET_SYSERR;
1384   }
1385
1386   // Guess simply casting isn't the nicest way...
1387   // FIXME wait for cadet to change this function
1388   sender = (struct GNUNET_PeerIdentity *)
1389       GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1390
1391   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
1392
1393   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
1394   {
1395     LOG (GNUNET_ERROR_TYPE_WARNING,
1396         "Received a pull reply from a peer we didn't request one from!\n");
1397     GNUNET_CADET_receive_done (channel);
1398     GNUNET_break_op (0);
1399     return GNUNET_OK;
1400   }
1401
1402
1403   #ifdef ENABLE_MALICIOUS
1404   // We shouldn't even receive pull replies as we're not sending
1405   if (2 == mal_type)
1406   {
1407     GNUNET_CADET_receive_done (channel);
1408     return GNUNET_OK;
1409   }
1410   #endif /* ENABLE_MALICIOUS */
1411
1412   /* Do actual logic */
1413   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1414
1415   LOG (GNUNET_ERROR_TYPE_DEBUG,
1416        "PULL REPLY received, got following %u peers:\n",
1417        ntohl (in_msg->num_peers));
1418
1419   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1420   {
1421     LOG (GNUNET_ERROR_TYPE_DEBUG,
1422          "%u. %s\n",
1423          i,
1424          GNUNET_i2s (&peers[i]));
1425
1426     #ifdef ENABLE_MALICIOUS
1427     if ((NULL != att_peer_set) &&
1428         (1 == mal_type || 3 == mal_type))
1429     { /* Add attacked peer to local list */
1430       // TODO check if we sent a request and this was the first reply
1431       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1432                                                                &peers[i])
1433           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1434                                                                   &peers[i])
1435           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1436                                                    &own_identity))
1437       {
1438         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1439         tmp_att_peer->peer_id = peers[i];
1440         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1441                                      att_peers_tail,
1442                                      tmp_att_peer);
1443         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1444       }
1445       continue;
1446     }
1447     #endif /* ENABLE_MALICIOUS */
1448     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
1449                                               &peers[i]))
1450     {
1451       /* Make sure we 'know' about this peer */
1452       (void) Peers_insert_peer (&peers[i]);
1453
1454       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
1455       {
1456         CustomPeerMap_put (pull_map, &peers[i]);
1457       }
1458       else
1459       {
1460         Peers_schedule_operation (&peers[i], insert_in_pull_map);
1461         (void) Peers_issue_peer_liveliness_check (&peers[i]);
1462       }
1463     }
1464   }
1465
1466   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
1467   clean_peer (sender);
1468
1469   GNUNET_CADET_receive_done (channel);
1470   return GNUNET_OK;
1471 }
1472
1473
1474 /**
1475  * Compute a random delay.
1476  * A uniformly distributed value between mean + spread and mean - spread.
1477  *
1478  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1479  * It would return a random value between 2 and 6 min.
1480  *
1481  * @param mean the mean
1482  * @param spread the inverse amount of deviation from the mean
1483  */
1484 static struct GNUNET_TIME_Relative
1485 compute_rand_delay (struct GNUNET_TIME_Relative mean,
1486                     unsigned int spread)
1487 {
1488   struct GNUNET_TIME_Relative half_interval;
1489   struct GNUNET_TIME_Relative ret;
1490   unsigned int rand_delay;
1491   unsigned int max_rand_delay;
1492
1493   if (0 == spread)
1494   {
1495     LOG (GNUNET_ERROR_TYPE_WARNING,
1496          "Not accepting spread of 0\n");
1497     GNUNET_break (0);
1498     GNUNET_assert (0);
1499   }
1500   GNUNET_assert (0 != mean.rel_value_us);
1501
1502   /* Compute random time value between spread * mean and spread * mean */
1503   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1504
1505   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1506   /**
1507    * Compute random value between (0 and 1) * round_interval
1508    * via multiplying round_interval with a 'fraction' (0 to value)/value
1509    */
1510   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1511   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
1512   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1513   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1514
1515   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1516     LOG (GNUNET_ERROR_TYPE_WARNING,
1517          "Returning FOREVER_REL\n");
1518
1519   return ret;
1520 }
1521
1522
1523 /**
1524  * Send single pull request
1525  *
1526  * @param peer_id the peer to send the pull request to.
1527  */
1528 static void
1529 send_pull_request (const struct GNUNET_PeerIdentity *peer)
1530 {
1531   struct GNUNET_MQ_Envelope *ev;
1532
1533   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
1534                                                      Peers_PULL_REPLY_PENDING));
1535   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
1536
1537   LOG (GNUNET_ERROR_TYPE_DEBUG,
1538        "Going to send PULL REQUEST to peer %s.\n",
1539        GNUNET_i2s (peer));
1540
1541   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1542   Peers_send_message (peer, ev, "PULL REQUEST");
1543 }
1544
1545
1546 /**
1547  * Send single push
1548  *
1549  * @param peer_id the peer to send the push to.
1550  */
1551 static void
1552 send_push (const struct GNUNET_PeerIdentity *peer_id)
1553 {
1554   struct GNUNET_MQ_Envelope *ev;
1555
1556   LOG (GNUNET_ERROR_TYPE_DEBUG,
1557        "Going to send PUSH to peer %s.\n",
1558        GNUNET_i2s (peer_id));
1559
1560   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1561   Peers_send_message (peer_id, ev, "PUSH");
1562 }
1563
1564
1565 static void
1566 do_round (void *cls);
1567
1568 static void
1569 do_mal_round (void *cls);
1570
1571 #ifdef ENABLE_MALICIOUS
1572
1573
1574 /**
1575  * @brief This function is called, when the client tells us to act malicious.
1576  * It verifies that @a msg is well-formed.
1577  *
1578  * @param cls the closure (#ClientContext)
1579  * @param msg the message
1580  * @return #GNUNET_OK if @a msg is well-formed
1581  */
1582 static int
1583 check_client_act_malicious (void *cls,
1584                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
1585 {
1586   struct ClientContext *cli_ctx = cls;
1587   uint16_t msize = ntohs (msg->header.size);
1588   uint32_t num_peers = ntohl (msg->num_peers);
1589
1590   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
1591   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
1592        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
1593   {
1594     LOG (GNUNET_ERROR_TYPE_ERROR,
1595         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1596         ntohl (msg->num_peers),
1597         (msize / sizeof (struct GNUNET_PeerIdentity)));
1598     GNUNET_break (0);
1599     GNUNET_SERVICE_client_drop (cli_ctx->client);
1600     return GNUNET_SYSERR;
1601   }
1602   return GNUNET_OK;
1603 }
1604
1605 /**
1606  * Turn RPS service to act malicious.
1607  *
1608  * @param cls Closure
1609  * @param client The client that sent the message
1610  * @param msg The message header
1611  */
1612 static void
1613 handle_client_act_malicious (void *cls,
1614                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
1615 {
1616   struct ClientContext *cli_ctx = cls;
1617   struct GNUNET_PeerIdentity *peers;
1618   uint32_t num_mal_peers_sent;
1619   uint32_t num_mal_peers_old;
1620
1621   /* Do actual logic */
1622   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1623   mal_type = ntohl (msg->type);
1624   if (NULL == mal_peer_set)
1625     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1626
1627   LOG (GNUNET_ERROR_TYPE_DEBUG,
1628        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
1629        mal_type,
1630        ntohl (msg->num_peers));
1631
1632   if (1 == mal_type)
1633   { /* Try to maximise representation */
1634     /* Add other malicious peers to those we already know */
1635
1636     num_mal_peers_sent = ntohl (msg->num_peers);
1637     num_mal_peers_old = num_mal_peers;
1638     GNUNET_array_grow (mal_peers,
1639                        num_mal_peers,
1640                        num_mal_peers + num_mal_peers_sent);
1641     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
1642             peers,
1643             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1644
1645     /* Add all mal peers to mal_peer_set */
1646     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1647                            num_mal_peers_sent,
1648                            mal_peer_set);
1649
1650     /* Substitute do_round () with do_mal_round () */
1651     GNUNET_SCHEDULER_cancel (do_round_task);
1652     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1653   }
1654
1655   else if ( (2 == mal_type) ||
1656             (3 == mal_type) )
1657   { /* Try to partition the network */
1658     /* Add other malicious peers to those we already know */
1659
1660     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
1661     num_mal_peers_old = num_mal_peers;
1662     GNUNET_array_grow (mal_peers,
1663                        num_mal_peers,
1664                        num_mal_peers + num_mal_peers_sent);
1665     if (NULL != mal_peers &&
1666         0 != num_mal_peers)
1667     {
1668       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
1669               peers,
1670               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1671
1672       /* Add all mal peers to mal_peer_set */
1673       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1674                              num_mal_peers_sent,
1675                              mal_peer_set);
1676     }
1677
1678     /* Store the one attacked peer */
1679     GNUNET_memcpy (&attacked_peer,
1680             &msg->attacked_peer,
1681             sizeof (struct GNUNET_PeerIdentity));
1682     /* Set the flag of the attacked peer to valid to avoid problems */
1683     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
1684     {
1685       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1686     }
1687
1688     LOG (GNUNET_ERROR_TYPE_DEBUG,
1689          "Attacked peer is %s\n",
1690          GNUNET_i2s (&attacked_peer));
1691
1692     /* Substitute do_round () with do_mal_round () */
1693     GNUNET_SCHEDULER_cancel (do_round_task);
1694     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1695   }
1696   else if (0 == mal_type)
1697   { /* Stop acting malicious */
1698     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
1699
1700     /* Substitute do_mal_round () with do_round () */
1701     GNUNET_SCHEDULER_cancel (do_round_task);
1702     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1703   }
1704   else
1705   {
1706     GNUNET_break (0);
1707     GNUNET_SERVICE_client_continue (cli_ctx->client);
1708   }
1709   GNUNET_SERVICE_client_continue (cli_ctx->client);
1710 }
1711
1712
1713 /**
1714  * Send out PUSHes and PULLs maliciously.
1715  *
1716  * This is executed regylary.
1717  */
1718 static void
1719 do_mal_round (void *cls)
1720 {
1721   uint32_t num_pushes;
1722   uint32_t i;
1723   struct GNUNET_TIME_Relative time_next_round;
1724   struct AttackedPeer *tmp_att_peer;
1725
1726   LOG (GNUNET_ERROR_TYPE_DEBUG,
1727        "Going to execute next round maliciously type %" PRIu32 ".\n",
1728       mal_type);
1729   do_round_task = NULL;
1730   GNUNET_assert (mal_type <= 3);
1731   /* Do malicious actions */
1732   if (1 == mal_type)
1733   { /* Try to maximise representation */
1734
1735     /* The maximum of pushes we're going to send this round */
1736     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
1737                                          num_attacked_peers),
1738                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1739
1740     LOG (GNUNET_ERROR_TYPE_DEBUG,
1741          "Going to send %" PRIu32 " pushes\n",
1742          num_pushes);
1743
1744     /* Send PUSHes to attacked peers */
1745     for (i = 0 ; i < num_pushes ; i++)
1746     {
1747       if (att_peers_tail == att_peer_index)
1748         att_peer_index = att_peers_head;
1749       else
1750         att_peer_index = att_peer_index->next;
1751
1752       send_push (&att_peer_index->peer_id);
1753     }
1754
1755     /* Send PULLs to some peers to learn about additional peers to attack */
1756     tmp_att_peer = att_peer_index;
1757     for (i = 0 ; i < num_pushes * alpha ; i++)
1758     {
1759       if (att_peers_tail == tmp_att_peer)
1760         tmp_att_peer = att_peers_head;
1761       else
1762         att_peer_index = tmp_att_peer->next;
1763
1764       send_pull_request (&tmp_att_peer->peer_id);
1765     }
1766   }
1767
1768
1769   else if (2 == mal_type)
1770   { /**
1771      * Try to partition the network
1772      * Send as many pushes to the attacked peer as possible
1773      * That is one push per round as it will ignore more.
1774      */
1775     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1776     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
1777       send_push (&attacked_peer);
1778   }
1779
1780
1781   if (3 == mal_type)
1782   { /* Combined attack */
1783
1784     /* Send PUSH to attacked peers */
1785     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
1786     {
1787       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1788       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
1789       {
1790         LOG (GNUNET_ERROR_TYPE_DEBUG,
1791             "Goding to send push to attacked peer (%s)\n",
1792             GNUNET_i2s (&attacked_peer));
1793         send_push (&attacked_peer);
1794       }
1795     }
1796     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1797
1798     /* The maximum of pushes we're going to send this round */
1799     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
1800                                          num_attacked_peers),
1801                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1802
1803     LOG (GNUNET_ERROR_TYPE_DEBUG,
1804          "Going to send %" PRIu32 " pushes\n",
1805          num_pushes);
1806
1807     for (i = 0; i < num_pushes; i++)
1808     {
1809       if (att_peers_tail == att_peer_index)
1810         att_peer_index = att_peers_head;
1811       else
1812         att_peer_index = att_peer_index->next;
1813
1814       send_push (&att_peer_index->peer_id);
1815     }
1816
1817     /* Send PULLs to some peers to learn about additional peers to attack */
1818     tmp_att_peer = att_peer_index;
1819     for (i = 0; i < num_pushes * alpha; i++)
1820     {
1821       if (att_peers_tail == tmp_att_peer)
1822         tmp_att_peer = att_peers_head;
1823       else
1824         att_peer_index = tmp_att_peer->next;
1825
1826       send_pull_request (&tmp_att_peer->peer_id);
1827     }
1828   }
1829
1830   /* Schedule next round */
1831   time_next_round = compute_rand_delay (round_interval, 2);
1832
1833   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
1834   //NULL);
1835   GNUNET_assert (NULL == do_round_task);
1836   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
1837                                                 &do_mal_round, NULL);
1838   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1839 }
1840 #endif /* ENABLE_MALICIOUS */
1841
1842
1843 /**
1844  * Send out PUSHes and PULLs, possibly update #view, samplers.
1845  *
1846  * This is executed regylary.
1847  */
1848 static void
1849 do_round (void *cls)
1850 {
1851   uint32_t i;
1852   const struct GNUNET_PeerIdentity *view_array;
1853   unsigned int *permut;
1854   unsigned int a_peers; /* Number of peers we send pushes to */
1855   unsigned int b_peers; /* Number of peers we send pull requests to */
1856   uint32_t first_border;
1857   uint32_t second_border;
1858   struct GNUNET_PeerIdentity peer;
1859   struct GNUNET_PeerIdentity *update_peer;
1860
1861   LOG (GNUNET_ERROR_TYPE_DEBUG,
1862        "Going to execute next round.\n");
1863   do_round_task = NULL;
1864   LOG (GNUNET_ERROR_TYPE_DEBUG,
1865        "Printing view:\n");
1866   to_file (file_name_view_log,
1867            "___ new round ___");
1868   view_array = View_get_as_array ();
1869   for (i = 0; i < View_size (); i++)
1870   {
1871     LOG (GNUNET_ERROR_TYPE_DEBUG,
1872          "\t%s\n", GNUNET_i2s (&view_array[i]));
1873     to_file (file_name_view_log,
1874              "=%s\t(do round)",
1875              GNUNET_i2s_full (&view_array[i]));
1876   }
1877
1878
1879   /* Send pushes and pull requests */
1880   if (0 < View_size ())
1881   {
1882     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1883                                            View_size ());
1884
1885     /* Send PUSHes */
1886     a_peers = ceil (alpha * View_size ());
1887
1888     LOG (GNUNET_ERROR_TYPE_DEBUG,
1889          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
1890          a_peers, alpha, View_size ());
1891     for (i = 0; i < a_peers; i++)
1892     {
1893       peer = view_array[permut[i]];
1894       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1895       { // FIXME if this fails schedule/loop this for later
1896         send_push (&peer);
1897       }
1898     }
1899
1900     /* Send PULL requests */
1901     b_peers = ceil (beta * View_size ());
1902     first_border = a_peers;
1903     second_border = a_peers + b_peers;
1904     if (second_border > View_size ())
1905     {
1906       first_border = View_size () - b_peers;
1907       second_border = View_size ();
1908     }
1909     LOG (GNUNET_ERROR_TYPE_DEBUG,
1910         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
1911         b_peers, beta, View_size ());
1912     for (i = first_border; i < second_border; i++)
1913     {
1914       peer = view_array[permut[i]];
1915       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
1916           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
1917       { // FIXME if this fails schedule/loop this for later
1918         send_pull_request (&peer);
1919       }
1920     }
1921
1922     GNUNET_free (permut);
1923     permut = NULL;
1924   }
1925
1926
1927   /* Update view */
1928   /* TODO see how many peers are in push-/pull- list! */
1929
1930   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
1931       (0 < CustomPeerMap_size (push_map)) &&
1932       (0 < CustomPeerMap_size (pull_map)))
1933   { /* If conditions for update are fulfilled, update */
1934     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
1935
1936     uint32_t final_size;
1937     uint32_t peers_to_clean_size;
1938     struct GNUNET_PeerIdentity *peers_to_clean;
1939
1940     peers_to_clean = NULL;
1941     peers_to_clean_size = 0;
1942     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
1943     GNUNET_memcpy (peers_to_clean,
1944             view_array,
1945             View_size () * sizeof (struct GNUNET_PeerIdentity));
1946
1947     /* Seems like recreating is the easiest way of emptying the peermap */
1948     View_clear ();
1949     to_file (file_name_view_log,
1950              "--- emptied ---");
1951
1952     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
1953                                 CustomPeerMap_size (push_map));
1954     second_border = first_border +
1955                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
1956                                 CustomPeerMap_size (pull_map));
1957     final_size    = second_border +
1958       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
1959
1960     /* Update view with peers received through PUSHes */
1961     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1962                                            CustomPeerMap_size (push_map));
1963     for (i = 0; i < first_border; i++)
1964     {
1965       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
1966                                                               permut[i]));
1967       to_file (file_name_view_log,
1968                "+%s\t(push list)",
1969                GNUNET_i2s_full (&view_array[i]));
1970       // TODO change the peer_flags accordingly
1971     }
1972     GNUNET_free (permut);
1973     permut = NULL;
1974
1975     /* Update view with peers received through PULLs */
1976     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1977                                            CustomPeerMap_size (pull_map));
1978     for (i = first_border; i < second_border; i++)
1979     {
1980       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
1981             permut[i - first_border]));
1982       to_file (file_name_view_log,
1983                "+%s\t(pull list)",
1984                GNUNET_i2s_full (&view_array[i]));
1985       // TODO change the peer_flags accordingly
1986     }
1987     GNUNET_free (permut);
1988     permut = NULL;
1989
1990     /* Update view with peers from history */
1991     RPS_sampler_get_n_rand_peers (prot_sampler,
1992                                   hist_update,
1993                                   NULL,
1994                                   final_size - second_border);
1995     // TODO change the peer_flags accordingly
1996
1997     for (i = 0; i < View_size (); i++)
1998       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
1999
2000     /* Clean peers that were removed from the view */
2001     for (i = 0; i < peers_to_clean_size; i++)
2002     {
2003       to_file (file_name_view_log,
2004                "-%s",
2005                GNUNET_i2s_full (&peers_to_clean[i]));
2006       clean_peer (&peers_to_clean[i]);
2007       //peer_destroy_channel_send (sender);
2008     }
2009
2010     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
2011   }
2012   else
2013   {
2014     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
2015   }
2016   // TODO independent of that also get some peers from CADET_get_peers()?
2017
2018   LOG (GNUNET_ERROR_TYPE_DEBUG,
2019        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
2020        CustomPeerMap_size (push_map),
2021        CustomPeerMap_size (pull_map),
2022        alpha,
2023        View_size (),
2024        alpha * View_size ());
2025
2026   /* Update samplers */
2027   for (i = 0; i < CustomPeerMap_size (push_map); i++)
2028   {
2029     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
2030     LOG (GNUNET_ERROR_TYPE_DEBUG,
2031          "Updating with peer %s from push list\n",
2032          GNUNET_i2s (update_peer));
2033     insert_in_sampler (NULL, update_peer);
2034     clean_peer (update_peer); /* This cleans only if it is not in the view */
2035     //peer_destroy_channel_send (sender);
2036   }
2037
2038   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
2039   {
2040     LOG (GNUNET_ERROR_TYPE_DEBUG,
2041          "Updating with peer %s from pull list\n",
2042          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
2043     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
2044     /* This cleans only if it is not in the view */
2045     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
2046     //peer_destroy_channel_send (sender);
2047   }
2048
2049
2050   /* Empty push/pull lists */
2051   CustomPeerMap_clear (push_map);
2052   CustomPeerMap_clear (pull_map);
2053
2054   struct GNUNET_TIME_Relative time_next_round;
2055
2056   time_next_round = compute_rand_delay (round_interval, 2);
2057
2058   /* Schedule next round */
2059   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
2060                                                 &do_round, NULL);
2061   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
2062 }
2063
2064
2065 /**
2066  * This is called from GNUNET_CADET_get_peers().
2067  *
2068  * It is called on every peer(ID) that cadet somehow has contact with.
2069  * We use those to initialise the sampler.
2070  */
2071 void
2072 init_peer_cb (void *cls,
2073               const struct GNUNET_PeerIdentity *peer,
2074               int tunnel, // "Do we have a tunnel towards this peer?"
2075               unsigned int n_paths, // "Number of known paths towards this peer"
2076               unsigned int best_path) // "How long is the best path?
2077                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2078 {
2079   if (NULL != peer)
2080   {
2081     LOG (GNUNET_ERROR_TYPE_DEBUG,
2082          "Got peer_id %s from cadet\n",
2083          GNUNET_i2s (peer));
2084     got_peer (peer);
2085   }
2086 }
2087
2088 /**
2089  * @brief Iterator function over stored, valid peers.
2090  *
2091  * We initialise the sampler with those.
2092  *
2093  * @param cls the closure
2094  * @param peer the peer id
2095  * @return #GNUNET_YES if we should continue to
2096  *         iterate,
2097  *         #GNUNET_NO if not.
2098  */
2099 static int
2100 valid_peers_iterator (void *cls,
2101                       const struct GNUNET_PeerIdentity *peer)
2102 {
2103   if (NULL != peer)
2104   {
2105     LOG (GNUNET_ERROR_TYPE_DEBUG,
2106          "Got stored, valid peer %s\n",
2107          GNUNET_i2s (peer));
2108     got_peer (peer);
2109   }
2110   return GNUNET_YES;
2111 }
2112
2113
2114 /**
2115  * Iterator over peers from peerinfo.
2116  *
2117  * @param cls closure
2118  * @param peer id of the peer, NULL for last call
2119  * @param hello hello message for the peer (can be NULL)
2120  * @param error message
2121  */
2122 void
2123 process_peerinfo_peers (void *cls,
2124                         const struct GNUNET_PeerIdentity *peer,
2125                         const struct GNUNET_HELLO_Message *hello,
2126                         const char *err_msg)
2127 {
2128   if (NULL != peer)
2129   {
2130     LOG (GNUNET_ERROR_TYPE_DEBUG,
2131          "Got peer_id %s from peerinfo\n",
2132          GNUNET_i2s (peer));
2133     got_peer (peer);
2134   }
2135 }
2136
2137
2138 /**
2139  * Task run during shutdown.
2140  *
2141  * @param cls unused
2142  */
2143 static void
2144 shutdown_task (void *cls)
2145 {
2146   struct ClientContext *client_ctx;
2147   struct ReplyCls *reply_cls;
2148
2149   LOG (GNUNET_ERROR_TYPE_DEBUG,
2150        "RPS is going down\n");
2151
2152   /* Clean all clients */
2153   for (client_ctx = cli_ctx_head;
2154        NULL != cli_ctx_head;
2155        client_ctx = cli_ctx_head)
2156   {
2157     /* Clean pending requests to the sampler */
2158     for (reply_cls = client_ctx->rep_cls_head;
2159          NULL != client_ctx->rep_cls_head;
2160          reply_cls = client_ctx->rep_cls_head)
2161     {
2162       RPS_sampler_request_cancel (reply_cls->req_handle);
2163       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
2164                                    client_ctx->rep_cls_tail,
2165                                    reply_cls);
2166       GNUNET_free (reply_cls);
2167     }
2168     GNUNET_MQ_destroy (client_ctx->mq);
2169     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
2170     GNUNET_free (client_ctx);
2171   }
2172   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
2173   GNUNET_PEERINFO_disconnect (peerinfo_handle);
2174
2175   if (NULL != do_round_task)
2176   {
2177     GNUNET_SCHEDULER_cancel (do_round_task);
2178     do_round_task = NULL;
2179   }
2180
2181   Peers_terminate ();
2182
2183   GNUNET_NSE_disconnect (nse);
2184   RPS_sampler_destroy (prot_sampler);
2185   RPS_sampler_destroy (client_sampler);
2186   GNUNET_CADET_disconnect (cadet_handle);
2187   View_destroy ();
2188   CustomPeerMap_destroy (push_map);
2189   CustomPeerMap_destroy (pull_map);
2190   #ifdef ENABLE_MALICIOUS
2191   struct AttackedPeer *tmp_att_peer;
2192   GNUNET_free (file_name_view_log);
2193   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2194   if (NULL != mal_peer_set)
2195     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2196   if (NULL != att_peer_set)
2197     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2198   while (NULL != att_peers_head)
2199   {
2200     tmp_att_peer = att_peers_head;
2201     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2202   }
2203   #endif /* ENABLE_MALICIOUS */
2204 }
2205
2206
2207 /**
2208  * Handle client connecting to the service.
2209  *
2210  * @param cls NULL
2211  * @param client the new client
2212  * @param mq the message queue of @a client
2213  * @return @a client
2214  */
2215 static void *
2216 client_connect_cb (void *cls,
2217                    struct GNUNET_SERVICE_Client *client,
2218                    struct GNUNET_MQ_Handle *mq)
2219 {
2220   struct ClientContext *cli_ctx;
2221
2222   LOG (GNUNET_ERROR_TYPE_DEBUG,
2223        "Client connected\n");
2224   if (NULL == client)
2225     return client; /* Server was destroyed before a client connected. Shutting down */
2226   cli_ctx = GNUNET_new (struct ClientContext);
2227   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
2228   cli_ctx->client = client;
2229   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2230                                cli_ctx_tail,
2231                                cli_ctx);
2232   return cli_ctx;
2233 }
2234
2235 /**
2236  * Callback called when a client disconnected from the service
2237  *
2238  * @param cls closure for the service
2239  * @param c the client that disconnected
2240  * @param internal_cls should be equal to @a c
2241  */
2242 static void
2243 client_disconnect_cb (void *cls,
2244                                   struct GNUNET_SERVICE_Client *client,
2245                                   void *internal_cls)
2246 {
2247   struct ClientContext *cli_ctx = internal_cls;
2248
2249   GNUNET_assert (client == cli_ctx->client);
2250   if (NULL == client)
2251   {/* shutdown task - destroy all clients */
2252     while (NULL != cli_ctx_head)
2253       destroy_cli_ctx (cli_ctx_head);
2254   }
2255   else
2256   { /* destroy this client */
2257     LOG (GNUNET_ERROR_TYPE_DEBUG,
2258         "Client disconnected. Destroy its context.\n");
2259     destroy_cli_ctx (cli_ctx);
2260   }
2261 }
2262
2263
2264 /**
2265  * Handle random peer sampling clients.
2266  *
2267  * @param cls closure
2268  * @param c configuration to use
2269  * @param service the initialized service
2270  */
2271 static void
2272 run (void *cls,
2273      const struct GNUNET_CONFIGURATION_Handle *c,
2274      struct GNUNET_SERVICE_Handle *service)
2275 {
2276   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2277     {&handle_peer_check       , GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2278       sizeof (struct GNUNET_MessageHeader)},
2279     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2280       sizeof (struct GNUNET_MessageHeader)},
2281     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2282       sizeof (struct GNUNET_MessageHeader)},
2283     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY, 0},
2284     {NULL, 0, 0}
2285   };
2286
2287   int size;
2288   int out_size;
2289   char* fn_valid_peers;
2290   struct GNUNET_HashCode port;
2291
2292   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2293   cfg = c;
2294
2295
2296   /* Get own ID */
2297   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2298   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2299               "STARTING SERVICE (rps) for peer [%s]\n",
2300               GNUNET_i2s (&own_identity));
2301   #ifdef ENABLE_MALICIOUS
2302   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2303               "Malicious execution compiled in.\n");
2304   #endif /* ENABLE_MALICIOUS */
2305
2306
2307
2308   /* Get time interval from the configuration */
2309   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2310                                                         "ROUNDINTERVAL",
2311                                                         &round_interval))
2312   {
2313     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2314                                "RPS", "ROUNDINTERVAL");
2315     GNUNET_SCHEDULER_shutdown ();
2316     return;
2317   }
2318
2319   /* Get initial size of sampler/view from the configuration */
2320   if (GNUNET_OK !=
2321       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
2322         (long long unsigned int *) &sampler_size_est_need))
2323   {
2324     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2325                                "RPS", "INITSIZE");
2326     GNUNET_SCHEDULER_shutdown ();
2327     return;
2328   }
2329   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %u\n", sampler_size_est_need);
2330
2331   if (GNUNET_OK !=
2332       GNUNET_CONFIGURATION_get_value_filename (cfg,
2333                                                "rps",
2334                                                "FILENAME_VALID_PEERS",
2335                                                &fn_valid_peers))
2336   {
2337     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2338                                "rps", "FILENAME_VALID_PEERS");
2339   }
2340
2341
2342   View_create (4);
2343
2344   /* file_name_view_log */
2345   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
2346   {
2347     LOG (GNUNET_ERROR_TYPE_WARNING,
2348          "Failed to create directory /tmp/rps/\n");
2349   }
2350
2351   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
2352   file_name_view_log = GNUNET_malloc (size);
2353   out_size = GNUNET_snprintf (file_name_view_log,
2354                               size,
2355                               "/tmp/rps/view-%s",
2356                               GNUNET_i2s_full (&own_identity));
2357   if (size < out_size ||
2358       0 > out_size)
2359   {
2360     LOG (GNUNET_ERROR_TYPE_WARNING,
2361          "Failed to write string to buffer (size: %i, out_size: %i)\n",
2362          size,
2363          out_size);
2364   }
2365
2366
2367   /* connect to NSE */
2368   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2369
2370
2371   alpha = 0.45;
2372   beta  = 0.45;
2373
2374
2375   /* Initialise cadet */
2376   cadet_handle = GNUNET_CADET_connect (cfg,
2377                                        cls,
2378                                        &cleanup_destroyed_channel,
2379                                        cadet_handlers);
2380   GNUNET_assert (NULL != cadet_handle);
2381   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
2382                       strlen (GNUNET_APPLICATION_PORT_RPS),
2383                       &port);
2384   GNUNET_CADET_open_port (cadet_handle,
2385                           &port,
2386                           &Peers_handle_inbound_channel, cls);
2387
2388
2389   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
2390   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
2391   GNUNET_free (fn_valid_peers);
2392
2393   /* Initialise sampler */
2394   struct GNUNET_TIME_Relative half_round_interval;
2395   struct GNUNET_TIME_Relative  max_round_interval;
2396
2397   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
2398   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2399
2400   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
2401   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
2402
2403   /* Initialise push and pull maps */
2404   push_map = CustomPeerMap_create (4);
2405   pull_map = CustomPeerMap_create (4);
2406
2407
2408   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2409   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2410   // TODO send push/pull to each of those peers?
2411   // TODO read stored valid peers from last run
2412   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
2413   Peers_get_valid_peers (valid_peers_iterator, NULL);
2414
2415   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
2416                                                    GNUNET_NO,
2417                                                    process_peerinfo_peers,
2418                                                    NULL);
2419
2420   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
2421
2422   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2423   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2424
2425   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2426 }
2427
2428
2429 /**
2430  * Define "main" method using service macro.
2431  */
2432 GNUNET_SERVICE_MAIN
2433 ("rps",
2434  GNUNET_SERVICE_OPTION_NONE,
2435  &run,
2436  &client_connect_cb,
2437  &client_disconnect_cb,
2438  NULL,
2439  GNUNET_MQ_hd_fixed_size (client_request,
2440    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2441    struct GNUNET_RPS_CS_RequestMessage,
2442    NULL),
2443  GNUNET_MQ_hd_fixed_size (client_request_cancel,
2444    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2445    struct GNUNET_RPS_CS_RequestCancelMessage,
2446    NULL),
2447  GNUNET_MQ_hd_var_size (client_seed,
2448    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
2449    struct GNUNET_RPS_CS_SeedMessage,
2450    NULL),
2451 #ifdef ENABLE_MALICIOUS
2452  GNUNET_MQ_hd_var_size (client_act_malicious,
2453    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
2454    struct GNUNET_RPS_CS_ActMaliciousMessage,
2455    NULL),
2456 #endif /* ENABLE_MALICIOUS */
2457  GNUNET_MQ_handler_end());
2458
2459 /* end of gnunet-service-rps.c */