-fix rps service: handle client service types correctly
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "rps.h"
33 #include "rps-test_util.h"
34 #include "gnunet-service-rps_sampler.h"
35 #include "gnunet-service-rps_custommap.h"
36 #include "gnunet-service-rps_peers.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Our own identity.
65  */
66 static struct GNUNET_PeerIdentity own_identity;
67
68
69 /***********************************************************************
70  * Housekeeping with clients
71 ***********************************************************************/
72
73 /**
74  * Closure used to pass the client and the id to the callback
75  * that replies to a client's request
76  */
77 struct ReplyCls
78 {
79   /**
80    * DLL
81    */
82   struct ReplyCls *next;
83   struct ReplyCls *prev;
84
85   /**
86    * The identifier of the request
87    */
88   uint32_t id;
89
90   /**
91    * The handle to the request
92    */
93   struct RPS_SamplerRequestHandle *req_handle;
94
95   /**
96    * The client handle to send the reply to
97    */
98   struct ClientContext *cli_ctx;
99 };
100
101
102 /**
103  * Struct used to store the context of a connected client.
104  */
105 struct ClientContext
106 {
107   /**
108    * DLL
109    */
110   struct ClientContext *next;
111   struct ClientContext *prev;
112
113   /**
114    * The message queue to communicate with the client.
115    */
116   struct GNUNET_MQ_Handle *mq;
117
118   /**
119    * DLL with handles to single requests from the client
120    */
121   struct ReplyCls *rep_cls_head;
122   struct ReplyCls *rep_cls_tail;
123
124   /**
125    * The client handle to send the reply to
126    */
127   struct GNUNET_SERVICE_Client *client;
128 };
129
130 /**
131  * DLL with all clients currently connected to us
132  */
133 struct ClientContext *cli_ctx_head;
134 struct ClientContext *cli_ctx_tail;
135
136 /***********************************************************************
137  * /Housekeeping with clients
138 ***********************************************************************/
139
140
141
142
143
144 /***********************************************************************
145  * Globals
146 ***********************************************************************/
147
148 /**
149  * Sampler used for the Brahms protocol itself.
150  */
151 static struct RPS_Sampler *prot_sampler;
152
153 /**
154  * Sampler used for the clients.
155  */
156 static struct RPS_Sampler *client_sampler;
157
158 /**
159  * Name to log view to
160  */
161 static char *file_name_view_log;
162
163 /**
164  * The size of sampler we need to be able to satisfy the client's need
165  * of random peers.
166  */
167 static unsigned int sampler_size_client_need;
168
169 /**
170  * The size of sampler we need to be able to satisfy the Brahms protocol's
171  * need of random peers.
172  *
173  * This is one minimum size the sampler grows to.
174  */
175 static unsigned int sampler_size_est_need;
176
177 /**
178  * Percentage of total peer number in the view
179  * to send random PUSHes to
180  */
181 static float alpha;
182
183 /**
184  * Percentage of total peer number in the view
185  * to send random PULLs to
186  */
187 static float beta;
188
189 /**
190  * Identifier for the main task that runs periodically.
191  */
192 static struct GNUNET_SCHEDULER_Task *do_round_task;
193
194 /**
195  * Time inverval the do_round task runs in.
196  */
197 static struct GNUNET_TIME_Relative round_interval;
198
199 /**
200  * List to store peers received through pushes temporary.
201  */
202 static struct CustomPeerMap *push_map;
203
204 /**
205  * List to store peers received through pulls temporary.
206  */
207 static struct CustomPeerMap *pull_map;
208
209 /**
210  * Handler to NSE.
211  */
212 static struct GNUNET_NSE_Handle *nse;
213
214 /**
215  * Handler to CADET.
216  */
217 static struct GNUNET_CADET_Handle *cadet_handle;
218
219 /**
220  * Handler to PEERINFO.
221  */
222 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
223
224 /**
225  * Handle for cancellation of iteration over peers.
226  */
227 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
228
229 /**
230  * Request counter.
231  *
232  * Counts how many requets clients already issued.
233  * Only needed in the beginning to check how many of the 64 deltas
234  * we already have
235  */
236 static unsigned int req_counter;
237
238 /**
239  * Time of the last request we received.
240  *
241  * Used to compute the expected request rate.
242  */
243 static struct GNUNET_TIME_Absolute last_request;
244
245 /**
246  * Size of #request_deltas.
247  */
248 #define REQUEST_DELTAS_SIZE 64
249 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
250
251 /**
252  * Last 64 deltas between requests
253  */
254 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
255
256 /**
257  * The prediction of the rate of requests
258  */
259 static struct GNUNET_TIME_Relative request_rate;
260
261
262 #ifdef ENABLE_MALICIOUS
263 /**
264  * Type of malicious peer
265  *
266  * 0 Don't act malicious at all - Default
267  * 1 Try to maximise representation
268  * 2 Try to partition the network
269  * 3 Combined attack
270  */
271 static uint32_t mal_type;
272
273 /**
274  * Other malicious peers
275  */
276 static struct GNUNET_PeerIdentity *mal_peers;
277
278 /**
279  * Hashmap of malicious peers used as set.
280  * Used to more efficiently check whether we know that peer.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
283
284 /**
285  * Number of other malicious peers
286  */
287 static uint32_t num_mal_peers;
288
289
290 /**
291  * If type is 2 This struct is used to store the attacked peers in a DLL
292  */
293 struct AttackedPeer
294 {
295   /**
296    * DLL
297    */
298   struct AttackedPeer *next;
299   struct AttackedPeer *prev;
300
301   /**
302    * PeerID
303    */
304   struct GNUNET_PeerIdentity peer_id;
305 };
306
307 /**
308  * If type is 2 this is the DLL of attacked peers
309  */
310 static struct AttackedPeer *att_peers_head;
311 static struct AttackedPeer *att_peers_tail;
312
313 /**
314  * This index is used to point to an attacked peer to
315  * implement the round-robin-ish way to select attacked peers.
316  */
317 static struct AttackedPeer *att_peer_index;
318
319 /**
320  * Hashmap of attacked peers used as set.
321  * Used to more efficiently check whether we know that peer.
322  */
323 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
324
325 /**
326  * Number of attacked peers
327  */
328 static uint32_t num_attacked_peers;
329
330 /**
331  * If type is 1 this is the attacked peer
332  */
333 static struct GNUNET_PeerIdentity attacked_peer;
334
335 /**
336  * The limit of PUSHes we can send in one round.
337  * This is an assumption of the Brahms protocol and either implemented
338  * via proof of work
339  * or
340  * assumend to be the bandwidth limitation.
341  */
342 static uint32_t push_limit = 10000;
343 #endif /* ENABLE_MALICIOUS */
344
345
346 /***********************************************************************
347  * /Globals
348 ***********************************************************************/
349
350
351 /***********************************************************************
352  * Util functions
353 ***********************************************************************/
354
355
356 /**
357  * Print peerlist to log.
358  */
359 static void
360 print_peer_list (struct GNUNET_PeerIdentity *list,
361                  unsigned int len)
362 {
363   unsigned int i;
364
365   LOG (GNUNET_ERROR_TYPE_DEBUG,
366        "Printing peer list of length %u at %p:\n",
367        len,
368        list);
369   for (i = 0 ; i < len ; i++)
370   {
371     LOG (GNUNET_ERROR_TYPE_DEBUG,
372          "%u. peer: %s\n",
373          i, GNUNET_i2s (&list[i]));
374   }
375 }
376
377
378 /**
379  * Remove peer from list.
380  */
381 static void
382 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
383                unsigned int *list_size,
384                const struct GNUNET_PeerIdentity *peer)
385 {
386   unsigned int i;
387   struct GNUNET_PeerIdentity *tmp;
388
389   tmp = *peer_list;
390
391   LOG (GNUNET_ERROR_TYPE_DEBUG,
392        "Removing peer %s from list at %p\n",
393        GNUNET_i2s (peer),
394        tmp);
395
396   for ( i = 0 ; i < *list_size ; i++ )
397   {
398     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
399     {
400       if (i < *list_size -1)
401       { /* Not at the last entry -- shift peers left */
402         GNUNET_memcpy (&tmp[i], &tmp[i +1],
403                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
404       }
405       /* Remove last entry (should be now useless PeerID) */
406       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
407     }
408   }
409   *peer_list = tmp;
410 }
411
412
413 /**
414  * Sum all time relatives of an array.
415  */
416 static struct GNUNET_TIME_Relative
417 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
418                 uint32_t arr_size)
419 {
420   struct GNUNET_TIME_Relative sum;
421   uint32_t i;
422
423   sum = GNUNET_TIME_UNIT_ZERO;
424   for ( i = 0 ; i < arr_size ; i++ )
425   {
426     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
427   }
428   return sum;
429 }
430
431
432 /**
433  * Compute the average of given time relatives.
434  */
435 static struct GNUNET_TIME_Relative
436 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
437                 uint32_t arr_size)
438 {
439   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
440                                                       arr_size),
441                                       arr_size);
442 }
443
444
445 /**
446  * Insert PeerID in #view
447  *
448  * Called once we know a peer is live.
449  * Implements #PeerOp
450  *
451  * @return GNUNET_OK if peer was actually inserted
452  *         GNUNET_NO if peer was not inserted
453  */
454 static void
455 insert_in_view_op (void *cls,
456                 const struct GNUNET_PeerIdentity *peer);
457
458 /**
459  * Insert PeerID in #view
460  *
461  * Called once we know a peer is live.
462  *
463  * @return GNUNET_OK if peer was actually inserted
464  *         GNUNET_NO if peer was not inserted
465  */
466 static int
467 insert_in_view (const struct GNUNET_PeerIdentity *peer)
468 {
469   int online;
470
471   online = Peers_check_peer_flag (peer, Peers_ONLINE);
472   if ( (GNUNET_NO == online) ||
473        (GNUNET_SYSERR == online) ) /* peer is not even known */
474   {
475     (void) Peers_issue_peer_liveliness_check (peer);
476     (void) Peers_schedule_operation (peer, insert_in_view_op);
477     return GNUNET_NO;
478   }
479   /* Open channel towards peer to keep connection open */
480   Peers_indicate_sending_intention (peer);
481   return View_put (peer);
482 }
483
484 /**
485  * Put random peer from sampler into the view as history update.
486  */
487 static void
488 hist_update (void *cls,
489              struct GNUNET_PeerIdentity *ids,
490              uint32_t num_peers)
491 {
492   unsigned int i;
493
494   for (i = 0; i < num_peers; i++)
495   {
496     (void) insert_in_view (&ids[i]);
497     to_file (file_name_view_log,
498              "+%s\t(hist)",
499              GNUNET_i2s_full (ids));
500   }
501 }
502
503
504 /**
505  * Wrapper around #RPS_sampler_resize()
506  *
507  * If we do not have enough sampler elements, double current sampler size
508  * If we have more than enough sampler elements, halv current sampler size
509  */
510 static void
511 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
512 {
513   unsigned int sampler_size;
514
515   // TODO statistics
516   // TODO respect the min, max
517   sampler_size = RPS_sampler_get_size (sampler);
518   if (sampler_size > new_size * 4)
519   { /* Shrinking */
520     RPS_sampler_resize (sampler, sampler_size / 2);
521   }
522   else if (sampler_size < new_size)
523   { /* Growing */
524     RPS_sampler_resize (sampler, sampler_size * 2);
525   }
526   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
527 }
528
529
530 /**
531  * Wrapper around #RPS_sampler_resize() resizing the client sampler
532  */
533 static void
534 client_resize_wrapper ()
535 {
536   uint32_t bigger_size;
537
538   // TODO statistics
539
540   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
541
542   // TODO respect the min, max
543   resize_wrapper (client_sampler, bigger_size);
544   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
545       bigger_size);
546 }
547
548
549 /**
550  * Estimate request rate
551  *
552  * Called every time we receive a request from the client.
553  */
554 static void
555 est_request_rate()
556 {
557   struct GNUNET_TIME_Relative max_round_duration;
558
559   if (request_deltas_size > req_counter)
560     req_counter++;
561   if ( 1 < req_counter)
562   {
563     /* Shift last request deltas to the right */
564     memmove (&request_deltas[1],
565         request_deltas,
566         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
567
568     /* Add current delta to beginning */
569     request_deltas[0] =
570         GNUNET_TIME_absolute_get_difference (last_request,
571                                              GNUNET_TIME_absolute_get ());
572     request_rate = T_relative_avg (request_deltas, req_counter);
573     request_rate = (request_rate.rel_value_us < 1) ?
574       GNUNET_TIME_relative_get_unit_ () : request_rate;
575
576     /* Compute the duration a round will maximally take */
577     max_round_duration =
578         GNUNET_TIME_relative_add (round_interval,
579                                   GNUNET_TIME_relative_divide (round_interval, 2));
580
581     /* Set the estimated size the sampler has to have to
582      * satisfy the current client request rate */
583     sampler_size_client_need =
584         max_round_duration.rel_value_us / request_rate.rel_value_us;
585
586     /* Resize the sampler */
587     client_resize_wrapper ();
588   }
589   last_request = GNUNET_TIME_absolute_get ();
590 }
591
592
593 /**
594  * Add all peers in @a peer_array to @a peer_map used as set.
595  *
596  * @param peer_array array containing the peers
597  * @param num_peers number of peers in @peer_array
598  * @param peer_map the peermap to use as set
599  */
600 static void
601 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
602                        unsigned int num_peers,
603                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
604 {
605   unsigned int i;
606   if (NULL == peer_map)
607   {
608     LOG (GNUNET_ERROR_TYPE_WARNING,
609          "Trying to add peers to non-existing peermap.\n");
610     return;
611   }
612
613   for (i = 0; i < num_peers; i++)
614   {
615     GNUNET_CONTAINER_multipeermap_put (peer_map,
616                                        &peer_array[i],
617                                        NULL,
618                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
619   }
620 }
621
622
623 /**
624  * Send a PULL REPLY to @a peer_id
625  *
626  * @param peer_id the peer to send the reply to.
627  * @param peer_ids the peers to send to @a peer_id
628  * @param num_peer_ids the number of peers to send to @a peer_id
629  */
630 static void
631 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
632                  const struct GNUNET_PeerIdentity *peer_ids,
633                  unsigned int num_peer_ids)
634 {
635   uint32_t send_size;
636   struct GNUNET_MQ_Envelope *ev;
637   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
638
639   /* Compute actual size */
640   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
641               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
642
643   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
644     /* Compute number of peers to send
645      * If too long, simply truncate */
646     // TODO select random ones via permutation
647     //      or even better: do good protocol design
648     send_size =
649       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
650        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
651        sizeof (struct GNUNET_PeerIdentity);
652   else
653     send_size = num_peer_ids;
654
655   LOG (GNUNET_ERROR_TYPE_DEBUG,
656       "Going to send PULL REPLY with %u peers to %s\n",
657       send_size, GNUNET_i2s (peer_id));
658
659   ev = GNUNET_MQ_msg_extra (out_msg,
660                             send_size * sizeof (struct GNUNET_PeerIdentity),
661                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
662   out_msg->num_peers = htonl (send_size);
663   GNUNET_memcpy (&out_msg[1], peer_ids,
664          send_size * sizeof (struct GNUNET_PeerIdentity));
665
666   Peers_send_message (peer_id, ev, "PULL REPLY");
667 }
668
669
670 /**
671  * Insert PeerID in #pull_map
672  *
673  * Called once we know a peer is live.
674  */
675 static void
676 insert_in_pull_map (void *cls,
677                     const struct GNUNET_PeerIdentity *peer)
678 {
679   CustomPeerMap_put (pull_map, peer);
680 }
681
682
683 /**
684  * Insert PeerID in #view
685  *
686  * Called once we know a peer is live.
687  * Implements #PeerOp
688  */
689 static void
690 insert_in_view_op (void *cls,
691                 const struct GNUNET_PeerIdentity *peer)
692 {
693   (void) insert_in_view (peer);
694 }
695
696
697 /**
698  * Update sampler with given PeerID.
699  * Implements #PeerOp
700  */
701 static void
702 insert_in_sampler (void *cls,
703                    const struct GNUNET_PeerIdentity *peer)
704 {
705   LOG (GNUNET_ERROR_TYPE_DEBUG,
706        "Updating samplers with peer %s from insert_in_sampler()\n",
707        GNUNET_i2s (peer));
708   RPS_sampler_update (prot_sampler,   peer);
709   RPS_sampler_update (client_sampler, peer);
710   if (0 < RPS_sampler_count_id (prot_sampler, peer))
711   {
712     /* Make sure we 'know' about this peer */
713     (void) Peers_issue_peer_liveliness_check (peer);
714     /* Establish a channel towards that peer to indicate we are going to send
715      * messages to it */
716     //Peers_indicate_sending_intention (peer);
717   }
718 }
719
720 /**
721  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
722  *        If the peer is not known, liveliness check is issued and it is
723  *        scheduled to be inserted in sampler and view.
724  *
725  * "External sources" refer to every source except the gossip.
726  *
727  * @param peer peer to insert
728  */
729 static void
730 got_peer (const struct GNUNET_PeerIdentity *peer)
731 {
732   /* If we did not know this peer already, insert it into sampler and view */
733   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
734   {
735     Peers_schedule_operation (peer, insert_in_sampler);
736     Peers_schedule_operation (peer, insert_in_view_op);
737   }
738 }
739
740 /**
741  * @brief Checks if there is a sending channel and if it is needed
742  *
743  * @param peer the peer whose sending channel is checked
744  * @return GNUNET_YES if sending channel exists and is still needed
745  *         GNUNET_NO  otherwise
746  */
747 static int
748 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
749 {
750   /* struct GNUNET_CADET_Channel *channel; */
751   if (GNUNET_NO == Peers_check_peer_known (peer))
752   {
753     return GNUNET_NO;
754   }
755   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
756   {
757     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
758          (GNUNET_YES == View_contains_peer (peer)) ||
759          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
760          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
761          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
762     { /* If we want to keep the connection to peer open */
763       return GNUNET_YES;
764     }
765     return GNUNET_NO;
766   }
767   return GNUNET_NO;
768 }
769
770 /**
771  * @brief remove peer from our knowledge, the view, push and pull maps and
772  * samplers.
773  *
774  * @param peer the peer to remove
775  */
776 static void
777 remove_peer (const struct GNUNET_PeerIdentity *peer)
778 {
779   (void) View_remove_peer (peer);
780   CustomPeerMap_remove_peer (pull_map, peer);
781   CustomPeerMap_remove_peer (push_map, peer);
782   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
783   RPS_sampler_reinitialise_by_value (client_sampler, peer);
784   Peers_remove_peer (peer);
785 }
786
787
788 /**
789  * @brief Remove data that is not needed anymore.
790  *
791  * If the sending channel is no longer needed it is destroyed.
792  *
793  * @param peer the peer whose data is about to be cleaned
794  */
795 static void
796 clean_peer (const struct GNUNET_PeerIdentity *peer)
797 {
798   if (GNUNET_NO == check_sending_channel_needed (peer))
799   {
800     LOG (GNUNET_ERROR_TYPE_DEBUG,
801         "Going to remove send channel to peer %s\n",
802         GNUNET_i2s (peer));
803     #ifdef ENABLE_MALICIOUS
804     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
805       (void) Peers_destroy_sending_channel (peer);
806     #else /* ENABLE_MALICIOUS */
807     (void) Peers_destroy_sending_channel (peer);
808     #endif /* ENABLE_MALICIOUS */
809   }
810
811   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
812        (GNUNET_NO == View_contains_peer (peer)) &&
813        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
814        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
815        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
816        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
817        (GNUNET_NO != Peers_check_removable (peer)) )
818   { /* We can safely remove this peer */
819     LOG (GNUNET_ERROR_TYPE_DEBUG,
820         "Going to remove peer %s\n",
821         GNUNET_i2s (peer));
822     remove_peer (peer);
823     return;
824   }
825 }
826
827 /**
828  * @brief This is called when a channel is destroyed.
829  *
830  * Removes peer completely from our knowledge if the send_channel was destroyed
831  * Otherwise simply delete the recv_channel
832  * Also check if the knowledge about this peer is still needed.
833  * If not, remove this peer from our knowledge.
834  *
835  * @param cls The closure
836  * @param channel The channel being closed
837  * @param channel_ctx The context associated with this channel
838  */
839 static void
840 cleanup_destroyed_channel (void *cls,
841                            const struct GNUNET_CADET_Channel *channel,
842                            void *channel_ctx)
843 {
844   struct GNUNET_PeerIdentity *peer;
845
846   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
847       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
848        // FIXME wait for cadet to change this function
849
850   if (GNUNET_NO == Peers_check_peer_known (peer))
851   { /* We don't know a context to that peer */
852     LOG (GNUNET_ERROR_TYPE_WARNING,
853          "channel (%s) without associated context was destroyed\n",
854          GNUNET_i2s (peer));
855     return;
856   }
857
858   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
859   { /* We are in the middle of removing that peer from our knowledge. In this
860        case simply make sure that the channels are cleaned. */
861     Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
862     to_file (file_name_view_log,
863              "-%s\t(cleanup channel, ourself)",
864              GNUNET_i2s_full (peer));
865     return;
866   }
867
868   if (GNUNET_YES ==
869       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
870   { /* Channel used for sending was destroyed */
871     /* Possible causes of channel destruction:
872      *  - ourselves  -> cleaning send channel -> clean context
873      *  - other peer -> peer probably went down -> remove
874      */
875     if (GNUNET_YES == Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_CLEAN))
876     { /* We are about to clean the sending channel. Clean the respective
877        * context */
878       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
879       return;
880     }
881     else
882     { /* Other peer destroyed our sending channel that he is supposed to keep
883        * open. It probably went down. Remove it from our knowledge. */
884       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
885       remove_peer (peer);
886       return;
887     }
888   }
889   else if (GNUNET_YES ==
890       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
891   { /* Channel used for receiving was destroyed */
892     /* Possible causes of channel destruction:
893      *  - ourselves  -> peer tried to establish channel twice -> clean context
894      *  - other peer -> peer doesn't want to send us data -> clean
895      */
896     if (GNUNET_YES ==
897         Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_ESTABLISHED_TWICE))
898     { /* Other peer tried to establish a channel to us twice. We do not accept
899        * that. Clean the context. */
900       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
901       return;
902     }
903     else
904     { /* Other peer doesn't want to send us data anymore. We are free to clean
905        * it. */
906       Peers_cleanup_destroyed_channel (cls, channel, channel_ctx);
907       clean_peer (peer);
908       return;
909     }
910   }
911   else
912   {
913     LOG (GNUNET_ERROR_TYPE_WARNING,
914         "Destroyed channel is neither sending nor receiving channel\n");
915   }
916 }
917
918 /***********************************************************************
919  * /Util functions
920 ***********************************************************************/
921
922 static void
923 destroy_reply_cls (struct ReplyCls *rep_cls)
924 {
925   struct ClientContext *cli_ctx;
926
927   cli_ctx = rep_cls->cli_ctx;
928   GNUNET_assert (NULL != cli_ctx);
929   RPS_sampler_request_cancel (rep_cls->req_handle);
930   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
931                                cli_ctx->rep_cls_tail,
932                                rep_cls);
933   GNUNET_free (rep_cls);
934 }
935
936
937 static void
938 destroy_cli_ctx (struct ClientContext *cli_ctx)
939 {
940   GNUNET_assert (NULL != cli_ctx);
941   if (NULL != cli_ctx->rep_cls_head)
942   {
943     LOG (GNUNET_ERROR_TYPE_WARNING,
944          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
945     while (NULL != cli_ctx->rep_cls_head)
946       destroy_reply_cls (cli_ctx->rep_cls_head);
947   }
948   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
949                                cli_ctx_tail,
950                                cli_ctx);
951   GNUNET_free (cli_ctx);
952 }
953
954
955 /**
956  * Function called by NSE.
957  *
958  * Updates sizes of sampler list and view and adapt those lists
959  * accordingly.
960  */
961 static void
962 nse_callback (void *cls,
963               struct GNUNET_TIME_Absolute timestamp,
964               double logestimate, double std_dev)
965 {
966   double estimate;
967   //double scale; // TODO this might go gloabal/config
968
969   LOG (GNUNET_ERROR_TYPE_DEBUG,
970        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
971        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
972   //scale = .01;
973   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
974   // GNUNET_NSE_log_estimate_to_n (logestimate);
975   estimate = pow (estimate, 1.0 / 3);
976   // TODO add if std_dev is a number
977   // estimate += (std_dev * scale);
978   if (2 < ceil (estimate))
979   {
980     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
981     sampler_size_est_need = estimate;
982   } else
983     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
984
985   /* If the NSE has changed adapt the lists accordingly */
986   resize_wrapper (prot_sampler, sampler_size_est_need);
987   client_resize_wrapper ();
988 }
989
990
991 /**
992  * Callback called once the requested PeerIDs are ready.
993  *
994  * Sends those to the requesting client.
995  */
996 static void
997 client_respond (void *cls,
998                 struct GNUNET_PeerIdentity *peer_ids,
999                 uint32_t num_peers)
1000 {
1001   struct ReplyCls *reply_cls = cls;
1002   uint32_t i;
1003   struct GNUNET_MQ_Envelope *ev;
1004   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
1005   uint32_t size_needed;
1006   struct ClientContext *cli_ctx;
1007
1008   GNUNET_assert (NULL != reply_cls);
1009   LOG (GNUNET_ERROR_TYPE_DEBUG,
1010        "sampler returned %" PRIu32 " peers:\n",
1011        num_peers);
1012   for (i = 0; i < num_peers; i++)
1013   {
1014     LOG (GNUNET_ERROR_TYPE_DEBUG,
1015          "  %" PRIu32 ": %s\n",
1016          i,
1017          GNUNET_i2s (&peer_ids[i]));
1018   }
1019
1020   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1021                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1022
1023   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1024
1025   ev = GNUNET_MQ_msg_extra (out_msg,
1026                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1027                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1028   out_msg->num_peers = htonl (num_peers);
1029   out_msg->id = htonl (reply_cls->id);
1030
1031   GNUNET_memcpy (&out_msg[1],
1032           peer_ids,
1033           num_peers * sizeof (struct GNUNET_PeerIdentity));
1034   GNUNET_free (peer_ids);
1035
1036   cli_ctx = reply_cls->cli_ctx;
1037   GNUNET_assert (NULL != cli_ctx);
1038   destroy_reply_cls (reply_cls);
1039   GNUNET_MQ_send (cli_ctx->mq, ev);
1040 }
1041
1042
1043 /**
1044  * Handle RPS request from the client.
1045  *
1046  * @param cls closure
1047  * @param client identification of the client
1048  * @param message the actual message
1049  */
1050 static void
1051 handle_client_request (void *cls,
1052                        const struct GNUNET_RPS_CS_RequestMessage *msg)
1053 {
1054   struct ClientContext *cli_ctx = cls;
1055   uint32_t num_peers;
1056   uint32_t size_needed;
1057   struct ReplyCls *reply_cls;
1058   uint32_t i;
1059
1060   num_peers = ntohl (msg->num_peers);
1061   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1062                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1063
1064   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1065   {
1066     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1067                 "Message received from client has size larger than expected\n");
1068     GNUNET_SERVICE_client_drop (cli_ctx->client);
1069     return;
1070   }
1071
1072   for (i = 0 ; i < num_peers ; i++)
1073     est_request_rate();
1074
1075   LOG (GNUNET_ERROR_TYPE_DEBUG,
1076        "Client requested %" PRIu32 " random peer(s).\n",
1077        num_peers);
1078
1079   reply_cls = GNUNET_new (struct ReplyCls);
1080   reply_cls->id = ntohl (msg->id);
1081   reply_cls->cli_ctx = cli_ctx;
1082   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1083                                                         client_respond,
1084                                                         reply_cls,
1085                                                         num_peers);
1086
1087   GNUNET_assert (NULL != cli_ctx);
1088   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1089                                cli_ctx->rep_cls_tail,
1090                                reply_cls);
1091   GNUNET_SERVICE_client_continue (cli_ctx->client);
1092 }
1093
1094
1095 /**
1096  * @brief Handle a message that requests the cancellation of a request
1097  *
1098  * @param cls unused
1099  * @param client the client that requests the cancellation
1100  * @param message the message containing the id of the request
1101  */
1102 static void
1103 handle_client_request_cancel (void *cls,
1104                           const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
1105 {
1106   struct ClientContext *cli_ctx = cls;
1107   struct ReplyCls *rep_cls;
1108
1109   GNUNET_assert (NULL != cli_ctx);
1110   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1111   rep_cls = cli_ctx->rep_cls_head;
1112   LOG (GNUNET_ERROR_TYPE_DEBUG,
1113       "Client cancels request with id %" PRIu32 "\n",
1114       ntohl (msg->id));
1115   while ( (NULL != rep_cls->next) &&
1116           (rep_cls->id != ntohl (msg->id)) )
1117     rep_cls = rep_cls->next;
1118   GNUNET_assert (rep_cls->id == ntohl (msg->id));
1119   RPS_sampler_request_cancel (rep_cls->req_handle);
1120   destroy_reply_cls (rep_cls);
1121   GNUNET_SERVICE_client_continue (cli_ctx->client);
1122 }
1123
1124
1125 /**
1126  * @brief This function is called, when the client seeds peers.
1127  * It verifies that @a msg is well-formed.
1128  *
1129  * @param cls the closure (#ClientContext)
1130  * @param msg the message
1131  * @return #GNUNET_OK if @a msg is well-formed
1132  */
1133 static int
1134 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
1135 {
1136   struct ClientContext *cli_ctx = cls;
1137   uint16_t msize = ntohs (msg->header.size);
1138   uint32_t num_peers = ntohl (msg->num_peers);
1139
1140   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
1141   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
1142        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
1143   {
1144     GNUNET_break (0);
1145     GNUNET_SERVICE_client_drop (cli_ctx->client);
1146     return GNUNET_SYSERR;
1147   }
1148   return GNUNET_OK;
1149 }
1150
1151
1152 /**
1153  * Handle seed from the client.
1154  *
1155  * @param cls closure
1156  * @param client identification of the client
1157  * @param message the actual message
1158  */
1159 static void
1160 handle_client_seed (void *cls,
1161                     const struct GNUNET_RPS_CS_SeedMessage *msg)
1162 {
1163   struct ClientContext *cli_ctx = cls;
1164   struct GNUNET_PeerIdentity *peers;
1165   uint32_t num_peers;
1166   uint32_t i;
1167
1168   num_peers = ntohl (msg->num_peers);
1169   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1170   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1171   //GNUNET_memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1172
1173   LOG (GNUNET_ERROR_TYPE_DEBUG,
1174        "Client seeded peers:\n");
1175   print_peer_list (peers, num_peers);
1176
1177   for (i = 0; i < num_peers; i++)
1178   {
1179     LOG (GNUNET_ERROR_TYPE_DEBUG,
1180          "Updating samplers with seed %" PRIu32 ": %s\n",
1181          i,
1182          GNUNET_i2s (&peers[i]));
1183
1184     got_peer (&peers[i]);
1185   }
1186
1187   ////GNUNET_free (peers);
1188
1189   GNUNET_SERVICE_client_drop (cli_ctx->client);
1190 }
1191
1192 /**
1193  * Handle a CHECK_LIVE message from another peer.
1194  *
1195  * This does nothing. But without calling #GNUNET_CADET_receive_done()
1196  * the channel is blocked for all other communication.
1197  *
1198  * @param cls Closure
1199  * @param channel The channel the CHECK was received over
1200  * @param channel_ctx The context associated with this channel
1201  * @param msg The message header
1202  */
1203 static int
1204 handle_peer_check (void *cls,
1205                   struct GNUNET_CADET_Channel *channel,
1206                   void **channel_ctx,
1207                   const struct GNUNET_MessageHeader *msg)
1208 {
1209   GNUNET_CADET_receive_done (channel);
1210   return GNUNET_OK;
1211 }
1212
1213 /**
1214  * Handle a PUSH message from another peer.
1215  *
1216  * Check the proof of work and store the PeerID
1217  * in the temporary list for pushed PeerIDs.
1218  *
1219  * @param cls Closure
1220  * @param channel The channel the PUSH was received over
1221  * @param channel_ctx The context associated with this channel
1222  * @param msg The message header
1223  */
1224 static int
1225 handle_peer_push (void *cls,
1226                   struct GNUNET_CADET_Channel *channel,
1227                   void **channel_ctx,
1228                   const struct GNUNET_MessageHeader *msg)
1229 {
1230   const struct GNUNET_PeerIdentity *peer;
1231
1232   // (check the proof of work (?))
1233
1234   peer = (const struct GNUNET_PeerIdentity *)
1235     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1236   // FIXME wait for cadet to change this function
1237
1238   LOG (GNUNET_ERROR_TYPE_DEBUG,
1239        "Received PUSH (%s)\n",
1240        GNUNET_i2s (peer));
1241
1242   #ifdef ENABLE_MALICIOUS
1243   struct AttackedPeer *tmp_att_peer;
1244
1245   if ( (1 == mal_type) ||
1246        (3 == mal_type) )
1247   { /* Try to maximise representation */
1248     tmp_att_peer = GNUNET_new (struct AttackedPeer);
1249     GNUNET_memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1250     if (NULL == att_peer_set)
1251       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1252     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1253                                                              peer))
1254     {
1255       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1256                                    att_peers_tail,
1257                                    tmp_att_peer);
1258       add_peer_array_to_set (peer, 1, att_peer_set);
1259     }
1260     GNUNET_CADET_receive_done (channel);
1261     return GNUNET_OK;
1262   }
1263
1264
1265   else if (2 == mal_type)
1266   { /* We attack one single well-known peer - simply ignore */
1267     GNUNET_CADET_receive_done (channel);
1268     return GNUNET_OK;
1269   }
1270   #endif /* ENABLE_MALICIOUS */
1271
1272   /* Add the sending peer to the push_map */
1273   CustomPeerMap_put (push_map, peer);
1274
1275   GNUNET_CADET_receive_done (channel);
1276   return GNUNET_OK;
1277 }
1278
1279
1280 /**
1281  * Handle PULL REQUEST request message from another peer.
1282  *
1283  * Reply with the view of PeerIDs.
1284  *
1285  * @param cls Closure
1286  * @param channel The channel the PULL REQUEST was received over
1287  * @param channel_ctx The context associated with this channel
1288  * @param msg The message header
1289  */
1290 static int
1291 handle_peer_pull_request (void *cls,
1292                           struct GNUNET_CADET_Channel *channel,
1293                           void **channel_ctx,
1294                           const struct GNUNET_MessageHeader *msg)
1295 {
1296   struct GNUNET_PeerIdentity *peer;
1297   const struct GNUNET_PeerIdentity *view_array;
1298
1299   peer = (struct GNUNET_PeerIdentity *)
1300     GNUNET_CADET_channel_get_info (channel,
1301                                    GNUNET_CADET_OPTION_PEER);
1302   // FIXME wait for cadet to change this function
1303
1304   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
1305
1306   #ifdef ENABLE_MALICIOUS
1307   if (1 == mal_type
1308       || 3 == mal_type)
1309   { /* Try to maximise representation */
1310     send_pull_reply (peer, mal_peers, num_mal_peers);
1311     GNUNET_CADET_receive_done (channel);
1312     return GNUNET_OK;
1313   }
1314
1315   else if (2 == mal_type)
1316   { /* Try to partition network */
1317     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1318     {
1319       send_pull_reply (peer, mal_peers, num_mal_peers);
1320     }
1321     GNUNET_CADET_receive_done (channel);
1322     return GNUNET_OK;
1323   }
1324   #endif /* ENABLE_MALICIOUS */
1325
1326   view_array = View_get_as_array ();
1327
1328   send_pull_reply (peer, view_array, View_size ());
1329
1330   GNUNET_CADET_receive_done (channel);
1331   return GNUNET_OK;
1332 }
1333
1334
1335 /**
1336  * Handle PULL REPLY message from another peer.
1337  *
1338  * Check whether we sent a corresponding request and
1339  * whether this reply is the first one.
1340  *
1341  * @param cls Closure
1342  * @param channel The channel the PUSH was received over
1343  * @param channel_ctx The context associated with this channel
1344  * @param msg The message header
1345  */
1346 static int
1347 handle_peer_pull_reply (void *cls,
1348                         struct GNUNET_CADET_Channel *channel,
1349                         void **channel_ctx,
1350                         const struct GNUNET_MessageHeader *msg)
1351 {
1352   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1353   struct GNUNET_PeerIdentity *peers;
1354   struct GNUNET_PeerIdentity *sender;
1355   uint32_t i;
1356 #ifdef ENABLE_MALICIOUS
1357   struct AttackedPeer *tmp_att_peer;
1358 #endif /* ENABLE_MALICIOUS */
1359
1360   /* Check for protocol violation */
1361   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1362   {
1363     GNUNET_break_op (0);
1364     GNUNET_CADET_receive_done (channel);
1365     return GNUNET_SYSERR;
1366   }
1367
1368   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1369   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1370       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1371   {
1372     LOG (GNUNET_ERROR_TYPE_ERROR,
1373         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1374         ntohl (in_msg->num_peers),
1375         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1376             sizeof (struct GNUNET_PeerIdentity));
1377     GNUNET_break_op (0);
1378     GNUNET_CADET_receive_done (channel);
1379     return GNUNET_SYSERR;
1380   }
1381
1382   // Guess simply casting isn't the nicest way...
1383   // FIXME wait for cadet to change this function
1384   sender = (struct GNUNET_PeerIdentity *)
1385       GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1386
1387   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
1388
1389   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
1390   {
1391     LOG (GNUNET_ERROR_TYPE_WARNING,
1392         "Received a pull reply from a peer we didn't request one from!\n");
1393     GNUNET_CADET_receive_done (channel);
1394     GNUNET_break_op (0);
1395     return GNUNET_OK;
1396   }
1397
1398
1399   #ifdef ENABLE_MALICIOUS
1400   // We shouldn't even receive pull replies as we're not sending
1401   if (2 == mal_type)
1402   {
1403     GNUNET_CADET_receive_done (channel);
1404     return GNUNET_OK;
1405   }
1406   #endif /* ENABLE_MALICIOUS */
1407
1408   /* Do actual logic */
1409   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1410
1411   LOG (GNUNET_ERROR_TYPE_DEBUG,
1412        "PULL REPLY received, got following %u peers:\n",
1413        ntohl (in_msg->num_peers));
1414
1415   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1416   {
1417     LOG (GNUNET_ERROR_TYPE_DEBUG,
1418          "%u. %s\n",
1419          i,
1420          GNUNET_i2s (&peers[i]));
1421
1422     #ifdef ENABLE_MALICIOUS
1423     if ((NULL != att_peer_set) &&
1424         (1 == mal_type || 3 == mal_type))
1425     { /* Add attacked peer to local list */
1426       // TODO check if we sent a request and this was the first reply
1427       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1428                                                                &peers[i])
1429           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1430                                                                   &peers[i])
1431           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1432                                                    &own_identity))
1433       {
1434         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1435         tmp_att_peer->peer_id = peers[i];
1436         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1437                                      att_peers_tail,
1438                                      tmp_att_peer);
1439         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1440       }
1441       continue;
1442     }
1443     #endif /* ENABLE_MALICIOUS */
1444     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
1445                                               &peers[i]))
1446     {
1447       /* Make sure we 'know' about this peer */
1448       (void) Peers_insert_peer (&peers[i]);
1449
1450       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
1451       {
1452         CustomPeerMap_put (pull_map, &peers[i]);
1453       }
1454       else
1455       {
1456         Peers_schedule_operation (&peers[i], insert_in_pull_map);
1457         (void) Peers_issue_peer_liveliness_check (&peers[i]);
1458       }
1459     }
1460   }
1461
1462   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
1463   clean_peer (sender);
1464
1465   GNUNET_CADET_receive_done (channel);
1466   return GNUNET_OK;
1467 }
1468
1469
1470 /**
1471  * Compute a random delay.
1472  * A uniformly distributed value between mean + spread and mean - spread.
1473  *
1474  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1475  * It would return a random value between 2 and 6 min.
1476  *
1477  * @param mean the mean
1478  * @param spread the inverse amount of deviation from the mean
1479  */
1480 static struct GNUNET_TIME_Relative
1481 compute_rand_delay (struct GNUNET_TIME_Relative mean,
1482                     unsigned int spread)
1483 {
1484   struct GNUNET_TIME_Relative half_interval;
1485   struct GNUNET_TIME_Relative ret;
1486   unsigned int rand_delay;
1487   unsigned int max_rand_delay;
1488
1489   if (0 == spread)
1490   {
1491     LOG (GNUNET_ERROR_TYPE_WARNING,
1492          "Not accepting spread of 0\n");
1493     GNUNET_break (0);
1494     GNUNET_assert (0);
1495   }
1496   GNUNET_assert (0 != mean.rel_value_us);
1497
1498   /* Compute random time value between spread * mean and spread * mean */
1499   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1500
1501   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1502   /**
1503    * Compute random value between (0 and 1) * round_interval
1504    * via multiplying round_interval with a 'fraction' (0 to value)/value
1505    */
1506   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1507   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1508   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1509   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1510
1511   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1512     LOG (GNUNET_ERROR_TYPE_WARNING,
1513          "Returning FOREVER_REL\n");
1514
1515   return ret;
1516 }
1517
1518
1519 /**
1520  * Send single pull request
1521  *
1522  * @param peer_id the peer to send the pull request to.
1523  */
1524 static void
1525 send_pull_request (const struct GNUNET_PeerIdentity *peer)
1526 {
1527   struct GNUNET_MQ_Envelope *ev;
1528
1529   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
1530                                                      Peers_PULL_REPLY_PENDING));
1531   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
1532
1533   LOG (GNUNET_ERROR_TYPE_DEBUG,
1534        "Going to send PULL REQUEST to peer %s.\n",
1535        GNUNET_i2s (peer));
1536
1537   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1538   Peers_send_message (peer, ev, "PULL REQUEST");
1539 }
1540
1541
1542 /**
1543  * Send single push
1544  *
1545  * @param peer_id the peer to send the push to.
1546  */
1547 static void
1548 send_push (const struct GNUNET_PeerIdentity *peer_id)
1549 {
1550   struct GNUNET_MQ_Envelope *ev;
1551
1552   LOG (GNUNET_ERROR_TYPE_DEBUG,
1553        "Going to send PUSH to peer %s.\n",
1554        GNUNET_i2s (peer_id));
1555
1556   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1557   Peers_send_message (peer_id, ev, "PUSH");
1558 }
1559
1560
1561 static void
1562 do_round (void *cls);
1563
1564 static void
1565 do_mal_round (void *cls);
1566
1567 #ifdef ENABLE_MALICIOUS
1568
1569
1570 /**
1571  * @brief This function is called, when the client tells us to act malicious.
1572  * It verifies that @a msg is well-formed.
1573  *
1574  * @param cls the closure (#ClientContext)
1575  * @param msg the message
1576  * @return #GNUNET_OK if @a msg is well-formed
1577  */
1578 static int
1579 check_client_act_malicious (void *cls,
1580                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
1581 {
1582   struct ClientContext *cli_ctx = cls;
1583   uint16_t msize = ntohs (msg->header.size);
1584   uint32_t num_peers = ntohl (msg->num_peers);
1585
1586   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
1587   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
1588        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
1589   {
1590     LOG (GNUNET_ERROR_TYPE_ERROR,
1591         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
1592         ntohl (msg->num_peers),
1593         (msize / sizeof (struct GNUNET_PeerIdentity)));
1594     GNUNET_break (0);
1595     GNUNET_SERVICE_client_drop (cli_ctx->client);
1596     return GNUNET_SYSERR;
1597   }
1598   return GNUNET_OK;
1599 }
1600
1601 /**
1602  * Turn RPS service to act malicious.
1603  *
1604  * @param cls Closure
1605  * @param client The client that sent the message
1606  * @param msg The message header
1607  */
1608 static void
1609 handle_client_act_malicious (void *cls,
1610                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
1611 {
1612   struct ClientContext *cli_ctx = cls;
1613   struct GNUNET_PeerIdentity *peers;
1614   uint32_t num_mal_peers_sent;
1615   uint32_t num_mal_peers_old;
1616
1617   /* Do actual logic */
1618   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1619   mal_type = ntohl (msg->type);
1620   if (NULL == mal_peer_set)
1621     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1622
1623   LOG (GNUNET_ERROR_TYPE_DEBUG,
1624        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
1625        mal_type,
1626        ntohl (msg->num_peers));
1627
1628   if (1 == mal_type)
1629   { /* Try to maximise representation */
1630     /* Add other malicious peers to those we already know */
1631
1632     num_mal_peers_sent = ntohl (msg->num_peers);
1633     num_mal_peers_old = num_mal_peers;
1634     GNUNET_array_grow (mal_peers,
1635                        num_mal_peers,
1636                        num_mal_peers + num_mal_peers_sent);
1637     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
1638             peers,
1639             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1640
1641     /* Add all mal peers to mal_peer_set */
1642     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1643                            num_mal_peers_sent,
1644                            mal_peer_set);
1645
1646     /* Substitute do_round () with do_mal_round () */
1647     GNUNET_SCHEDULER_cancel (do_round_task);
1648     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1649   }
1650
1651   else if ( (2 == mal_type) ||
1652             (3 == mal_type) )
1653   { /* Try to partition the network */
1654     /* Add other malicious peers to those we already know */
1655
1656     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
1657     num_mal_peers_old = num_mal_peers;
1658     GNUNET_array_grow (mal_peers,
1659                        num_mal_peers,
1660                        num_mal_peers + num_mal_peers_sent);
1661     if (NULL != mal_peers &&
1662         0 != num_mal_peers)
1663     {
1664       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
1665               peers,
1666               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1667
1668       /* Add all mal peers to mal_peer_set */
1669       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1670                              num_mal_peers_sent,
1671                              mal_peer_set);
1672     }
1673
1674     /* Store the one attacked peer */
1675     GNUNET_memcpy (&attacked_peer,
1676             &msg->attacked_peer,
1677             sizeof (struct GNUNET_PeerIdentity));
1678     /* Set the flag of the attacked peer to valid to avoid problems */
1679     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
1680     {
1681       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1682     }
1683
1684     LOG (GNUNET_ERROR_TYPE_DEBUG,
1685          "Attacked peer is %s\n",
1686          GNUNET_i2s (&attacked_peer));
1687
1688     /* Substitute do_round () with do_mal_round () */
1689     GNUNET_SCHEDULER_cancel (do_round_task);
1690     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1691   }
1692   else if (0 == mal_type)
1693   { /* Stop acting malicious */
1694     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
1695
1696     /* Substitute do_mal_round () with do_round () */
1697     GNUNET_SCHEDULER_cancel (do_round_task);
1698     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1699   }
1700   else
1701   {
1702     GNUNET_break (0);
1703     GNUNET_SERVICE_client_continue (cli_ctx->client);
1704   }
1705   GNUNET_SERVICE_client_continue (cli_ctx->client);
1706 }
1707
1708
1709 /**
1710  * Send out PUSHes and PULLs maliciously.
1711  *
1712  * This is executed regylary.
1713  */
1714 static void
1715 do_mal_round (void *cls)
1716 {
1717   uint32_t num_pushes;
1718   uint32_t i;
1719   struct GNUNET_TIME_Relative time_next_round;
1720   struct AttackedPeer *tmp_att_peer;
1721
1722   LOG (GNUNET_ERROR_TYPE_DEBUG,
1723        "Going to execute next round maliciously type %" PRIu32 ".\n",
1724       mal_type);
1725   do_round_task = NULL;
1726   GNUNET_assert (mal_type <= 3);
1727   /* Do malicious actions */
1728   if (1 == mal_type)
1729   { /* Try to maximise representation */
1730
1731     /* The maximum of pushes we're going to send this round */
1732     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
1733                                          num_attacked_peers),
1734                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1735
1736     LOG (GNUNET_ERROR_TYPE_DEBUG,
1737          "Going to send %" PRIu32 " pushes\n",
1738          num_pushes);
1739
1740     /* Send PUSHes to attacked peers */
1741     for (i = 0 ; i < num_pushes ; i++)
1742     {
1743       if (att_peers_tail == att_peer_index)
1744         att_peer_index = att_peers_head;
1745       else
1746         att_peer_index = att_peer_index->next;
1747
1748       send_push (&att_peer_index->peer_id);
1749     }
1750
1751     /* Send PULLs to some peers to learn about additional peers to attack */
1752     tmp_att_peer = att_peer_index;
1753     for (i = 0 ; i < num_pushes * alpha ; i++)
1754     {
1755       if (att_peers_tail == tmp_att_peer)
1756         tmp_att_peer = att_peers_head;
1757       else
1758         att_peer_index = tmp_att_peer->next;
1759
1760       send_pull_request (&tmp_att_peer->peer_id);
1761     }
1762   }
1763
1764
1765   else if (2 == mal_type)
1766   { /**
1767      * Try to partition the network
1768      * Send as many pushes to the attacked peer as possible
1769      * That is one push per round as it will ignore more.
1770      */
1771     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1772     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
1773       send_push (&attacked_peer);
1774   }
1775
1776
1777   if (3 == mal_type)
1778   { /* Combined attack */
1779
1780     /* Send PUSH to attacked peers */
1781     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
1782     {
1783       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1784       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
1785       {
1786         LOG (GNUNET_ERROR_TYPE_DEBUG,
1787             "Goding to send push to attacked peer (%s)\n",
1788             GNUNET_i2s (&attacked_peer));
1789         send_push (&attacked_peer);
1790       }
1791     }
1792     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
1793
1794     /* The maximum of pushes we're going to send this round */
1795     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
1796                                          num_attacked_peers),
1797                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
1798
1799     LOG (GNUNET_ERROR_TYPE_DEBUG,
1800          "Going to send %" PRIu32 " pushes\n",
1801          num_pushes);
1802
1803     for (i = 0; i < num_pushes; i++)
1804     {
1805       if (att_peers_tail == att_peer_index)
1806         att_peer_index = att_peers_head;
1807       else
1808         att_peer_index = att_peer_index->next;
1809
1810       send_push (&att_peer_index->peer_id);
1811     }
1812
1813     /* Send PULLs to some peers to learn about additional peers to attack */
1814     tmp_att_peer = att_peer_index;
1815     for (i = 0; i < num_pushes * alpha; i++)
1816     {
1817       if (att_peers_tail == tmp_att_peer)
1818         tmp_att_peer = att_peers_head;
1819       else
1820         att_peer_index = tmp_att_peer->next;
1821
1822       send_pull_request (&tmp_att_peer->peer_id);
1823     }
1824   }
1825
1826   /* Schedule next round */
1827   time_next_round = compute_rand_delay (round_interval, 2);
1828
1829   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
1830   //NULL);
1831   GNUNET_assert (NULL == do_round_task);
1832   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
1833                                                 &do_mal_round, NULL);
1834   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
1835 }
1836 #endif /* ENABLE_MALICIOUS */
1837
1838
1839 /**
1840  * Send out PUSHes and PULLs, possibly update #view, samplers.
1841  *
1842  * This is executed regylary.
1843  */
1844 static void
1845 do_round (void *cls)
1846 {
1847   uint32_t i;
1848   const struct GNUNET_PeerIdentity *view_array;
1849   unsigned int *permut;
1850   unsigned int a_peers; /* Number of peers we send pushes to */
1851   unsigned int b_peers; /* Number of peers we send pull requests to */
1852   uint32_t first_border;
1853   uint32_t second_border;
1854   struct GNUNET_PeerIdentity peer;
1855   struct GNUNET_PeerIdentity *update_peer;
1856
1857   LOG (GNUNET_ERROR_TYPE_DEBUG,
1858        "Going to execute next round.\n");
1859   do_round_task = NULL;
1860   LOG (GNUNET_ERROR_TYPE_DEBUG,
1861        "Printing view:\n");
1862   to_file (file_name_view_log,
1863            "___ new round ___");
1864   view_array = View_get_as_array ();
1865   for (i = 0; i < View_size (); i++)
1866   {
1867     LOG (GNUNET_ERROR_TYPE_DEBUG,
1868          "\t%s\n", GNUNET_i2s (&view_array[i]));
1869     to_file (file_name_view_log,
1870              "=%s\t(do round)",
1871              GNUNET_i2s_full (&view_array[i]));
1872   }
1873
1874
1875   /* Send pushes and pull requests */
1876   if (0 < View_size ())
1877   {
1878     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1879                                            View_size ());
1880
1881     /* Send PUSHes */
1882     a_peers = ceil (alpha * View_size ());
1883
1884     LOG (GNUNET_ERROR_TYPE_DEBUG,
1885          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
1886          a_peers, alpha, View_size ());
1887     for (i = 0; i < a_peers; i++)
1888     {
1889       peer = view_array[permut[i]];
1890       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
1891       { // FIXME if this fails schedule/loop this for later
1892         send_push (&peer);
1893       }
1894     }
1895
1896     /* Send PULL requests */
1897     b_peers = ceil (beta * View_size ());
1898     first_border = a_peers;
1899     second_border = a_peers + b_peers;
1900     if (second_border > View_size ())
1901     {
1902       first_border = View_size () - b_peers;
1903       second_border = View_size ();
1904     }
1905     LOG (GNUNET_ERROR_TYPE_DEBUG,
1906         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
1907         b_peers, beta, View_size ());
1908     for (i = first_border; i < second_border; i++)
1909     {
1910       peer = view_array[permut[i]];
1911       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
1912           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
1913       { // FIXME if this fails schedule/loop this for later
1914         send_pull_request (&peer);
1915       }
1916     }
1917
1918     GNUNET_free (permut);
1919     permut = NULL;
1920   }
1921
1922
1923   /* Update view */
1924   /* TODO see how many peers are in push-/pull- list! */
1925
1926   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
1927       (0 < CustomPeerMap_size (push_map)) &&
1928       (0 < CustomPeerMap_size (pull_map)))
1929   { /* If conditions for update are fulfilled, update */
1930     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
1931
1932     uint32_t final_size;
1933     uint32_t peers_to_clean_size;
1934     struct GNUNET_PeerIdentity *peers_to_clean;
1935
1936     peers_to_clean = NULL;
1937     peers_to_clean_size = 0;
1938     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
1939     GNUNET_memcpy (peers_to_clean,
1940             view_array,
1941             View_size () * sizeof (struct GNUNET_PeerIdentity));
1942
1943     /* Seems like recreating is the easiest way of emptying the peermap */
1944     View_clear ();
1945     to_file (file_name_view_log,
1946              "--- emptied ---");
1947
1948     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
1949                                 CustomPeerMap_size (push_map));
1950     second_border = first_border +
1951                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
1952                                 CustomPeerMap_size (pull_map));
1953     final_size    = second_border +
1954       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
1955
1956     /* Update view with peers received through PUSHes */
1957     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1958                                            CustomPeerMap_size (push_map));
1959     for (i = 0; i < first_border; i++)
1960     {
1961       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
1962                                                               permut[i]));
1963       to_file (file_name_view_log,
1964                "+%s\t(push list)",
1965                GNUNET_i2s_full (&view_array[i]));
1966       // TODO change the peer_flags accordingly
1967     }
1968     GNUNET_free (permut);
1969     permut = NULL;
1970
1971     /* Update view with peers received through PULLs */
1972     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
1973                                            CustomPeerMap_size (pull_map));
1974     for (i = first_border; i < second_border; i++)
1975     {
1976       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
1977             permut[i - first_border]));
1978       to_file (file_name_view_log,
1979                "+%s\t(pull list)",
1980                GNUNET_i2s_full (&view_array[i]));
1981       // TODO change the peer_flags accordingly
1982     }
1983     GNUNET_free (permut);
1984     permut = NULL;
1985
1986     /* Update view with peers from history */
1987     RPS_sampler_get_n_rand_peers (prot_sampler,
1988                                   hist_update,
1989                                   NULL,
1990                                   final_size - second_border);
1991     // TODO change the peer_flags accordingly
1992
1993     for (i = 0; i < View_size (); i++)
1994       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
1995
1996     /* Clean peers that were removed from the view */
1997     for (i = 0; i < peers_to_clean_size; i++)
1998     {
1999       to_file (file_name_view_log,
2000                "-%s",
2001                GNUNET_i2s_full (&peers_to_clean[i]));
2002       clean_peer (&peers_to_clean[i]);
2003       //peer_destroy_channel_send (sender);
2004     }
2005
2006     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
2007   }
2008   else
2009   {
2010     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
2011   }
2012   // TODO independent of that also get some peers from CADET_get_peers()?
2013
2014   LOG (GNUNET_ERROR_TYPE_DEBUG,
2015        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
2016        CustomPeerMap_size (push_map),
2017        CustomPeerMap_size (pull_map),
2018        alpha,
2019        View_size (),
2020        alpha * View_size ());
2021
2022   /* Update samplers */
2023   for (i = 0; i < CustomPeerMap_size (push_map); i++)
2024   {
2025     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
2026     LOG (GNUNET_ERROR_TYPE_DEBUG,
2027          "Updating with peer %s from push list\n",
2028          GNUNET_i2s (update_peer));
2029     insert_in_sampler (NULL, update_peer);
2030     clean_peer (update_peer); /* This cleans only if it is not in the view */
2031     //peer_destroy_channel_send (sender);
2032   }
2033
2034   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
2035   {
2036     LOG (GNUNET_ERROR_TYPE_DEBUG,
2037          "Updating with peer %s from pull list\n",
2038          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
2039     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
2040     /* This cleans only if it is not in the view */
2041     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
2042     //peer_destroy_channel_send (sender);
2043   }
2044
2045
2046   /* Empty push/pull lists */
2047   CustomPeerMap_clear (push_map);
2048   CustomPeerMap_clear (pull_map);
2049
2050   struct GNUNET_TIME_Relative time_next_round;
2051
2052   time_next_round = compute_rand_delay (round_interval, 2);
2053
2054   /* Schedule next round */
2055   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
2056                                                 &do_round, NULL);
2057   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
2058 }
2059
2060
2061 /**
2062  * This is called from GNUNET_CADET_get_peers().
2063  *
2064  * It is called on every peer(ID) that cadet somehow has contact with.
2065  * We use those to initialise the sampler.
2066  */
2067 void
2068 init_peer_cb (void *cls,
2069               const struct GNUNET_PeerIdentity *peer,
2070               int tunnel, // "Do we have a tunnel towards this peer?"
2071               unsigned int n_paths, // "Number of known paths towards this peer"
2072               unsigned int best_path) // "How long is the best path?
2073                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2074 {
2075   if (NULL != peer)
2076   {
2077     LOG (GNUNET_ERROR_TYPE_DEBUG,
2078          "Got peer_id %s from cadet\n",
2079          GNUNET_i2s (peer));
2080     got_peer (peer);
2081   }
2082 }
2083
2084 /**
2085  * @brief Iterator function over stored, valid peers.
2086  *
2087  * We initialise the sampler with those.
2088  *
2089  * @param cls the closure
2090  * @param peer the peer id
2091  * @return #GNUNET_YES if we should continue to
2092  *         iterate,
2093  *         #GNUNET_NO if not.
2094  */
2095 static int
2096 valid_peers_iterator (void *cls,
2097                       const struct GNUNET_PeerIdentity *peer)
2098 {
2099   if (NULL != peer)
2100   {
2101     LOG (GNUNET_ERROR_TYPE_DEBUG,
2102          "Got stored, valid peer %s\n",
2103          GNUNET_i2s (peer));
2104     got_peer (peer);
2105   }
2106   return GNUNET_YES;
2107 }
2108
2109
2110 /**
2111  * Iterator over peers from peerinfo.
2112  *
2113  * @param cls closure
2114  * @param peer id of the peer, NULL for last call
2115  * @param hello hello message for the peer (can be NULL)
2116  * @param error message
2117  */
2118 void
2119 process_peerinfo_peers (void *cls,
2120                         const struct GNUNET_PeerIdentity *peer,
2121                         const struct GNUNET_HELLO_Message *hello,
2122                         const char *err_msg)
2123 {
2124   if (NULL != peer)
2125   {
2126     LOG (GNUNET_ERROR_TYPE_DEBUG,
2127          "Got peer_id %s from peerinfo\n",
2128          GNUNET_i2s (peer));
2129     got_peer (peer);
2130   }
2131 }
2132
2133
2134 /**
2135  * Task run during shutdown.
2136  *
2137  * @param cls unused
2138  */
2139 static void
2140 shutdown_task (void *cls)
2141 {
2142   struct ClientContext *client_ctx;
2143   struct ReplyCls *reply_cls;
2144
2145   LOG (GNUNET_ERROR_TYPE_DEBUG,
2146        "RPS is going down\n");
2147
2148   /* Clean all clients */
2149   for (client_ctx = cli_ctx_head;
2150        NULL != cli_ctx_head;
2151        client_ctx = cli_ctx_head)
2152   {
2153     /* Clean pending requests to the sampler */
2154     for (reply_cls = client_ctx->rep_cls_head;
2155          NULL != client_ctx->rep_cls_head;
2156          reply_cls = client_ctx->rep_cls_head)
2157     {
2158       RPS_sampler_request_cancel (reply_cls->req_handle);
2159       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
2160                                    client_ctx->rep_cls_tail,
2161                                    reply_cls);
2162       GNUNET_free (reply_cls);
2163     }
2164     GNUNET_MQ_destroy (client_ctx->mq);
2165     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
2166     GNUNET_free (client_ctx);
2167   }
2168   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
2169   GNUNET_PEERINFO_disconnect (peerinfo_handle);
2170
2171   if (NULL != do_round_task)
2172   {
2173     GNUNET_SCHEDULER_cancel (do_round_task);
2174     do_round_task = NULL;
2175   }
2176
2177   Peers_terminate ();
2178
2179   GNUNET_NSE_disconnect (nse);
2180   RPS_sampler_destroy (prot_sampler);
2181   RPS_sampler_destroy (client_sampler);
2182   GNUNET_CADET_disconnect (cadet_handle);
2183   View_destroy ();
2184   CustomPeerMap_destroy (push_map);
2185   CustomPeerMap_destroy (pull_map);
2186   #ifdef ENABLE_MALICIOUS
2187   struct AttackedPeer *tmp_att_peer;
2188   GNUNET_free (file_name_view_log);
2189   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2190   if (NULL != mal_peer_set)
2191     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2192   if (NULL != att_peer_set)
2193     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2194   while (NULL != att_peers_head)
2195   {
2196     tmp_att_peer = att_peers_head;
2197     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2198   }
2199   #endif /* ENABLE_MALICIOUS */
2200 }
2201
2202
2203 /**
2204  * Handle client connecting to the service.
2205  *
2206  * @param cls NULL
2207  * @param client the new client
2208  * @param mq the message queue of @a client
2209  * @return @a client
2210  */
2211 static void *
2212 client_connect_cb (void *cls,
2213                    struct GNUNET_SERVICE_Client *client,
2214                    struct GNUNET_MQ_Handle *mq)
2215 {
2216   struct ClientContext *cli_ctx;
2217
2218   LOG (GNUNET_ERROR_TYPE_DEBUG,
2219        "Client connected\n");
2220   if (NULL == client)
2221     return client; /* Server was destroyed before a client connected. Shutting down */
2222   cli_ctx = GNUNET_new (struct ClientContext);
2223   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
2224   cli_ctx->client = client;
2225   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2226                                cli_ctx_tail,
2227                                cli_ctx);
2228   return cli_ctx;
2229 }
2230
2231 /**
2232  * Callback called when a client disconnected from the service
2233  *
2234  * @param cls closure for the service
2235  * @param c the client that disconnected
2236  * @param internal_cls should be equal to @a c
2237  */
2238 static void
2239 client_disconnect_cb (void *cls,
2240                                   struct GNUNET_SERVICE_Client *client,
2241                                   void *internal_cls)
2242 {
2243   struct ClientContext *cli_ctx = internal_cls;
2244
2245   GNUNET_assert (client == cli_ctx->client);
2246   if (NULL == client)
2247   {/* shutdown task - destroy all clients */
2248     while (NULL != cli_ctx_head)
2249       destroy_cli_ctx (cli_ctx_head);
2250   }
2251   else
2252   { /* destroy this client */
2253     LOG (GNUNET_ERROR_TYPE_DEBUG,
2254         "Client disconnected. Destroy its context.\n");
2255     destroy_cli_ctx (cli_ctx);
2256   }
2257 }
2258
2259
2260 /**
2261  * Handle random peer sampling clients.
2262  *
2263  * @param cls closure
2264  * @param c configuration to use
2265  * @param service the initialized service
2266  */
2267 static void
2268 run (void *cls,
2269      const struct GNUNET_CONFIGURATION_Handle *c,
2270      struct GNUNET_SERVICE_Handle *service)
2271 {
2272   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2273     {&handle_peer_check       , GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2274       sizeof (struct GNUNET_MessageHeader)},
2275     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2276       sizeof (struct GNUNET_MessageHeader)},
2277     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2278       sizeof (struct GNUNET_MessageHeader)},
2279     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY, 0},
2280     {NULL, 0, 0}
2281   };
2282
2283   int size;
2284   int out_size;
2285   char* fn_valid_peers;
2286   struct GNUNET_HashCode port;
2287
2288   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2289   cfg = c;
2290
2291
2292   /* Get own ID */
2293   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2294   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2295               "STARTING SERVICE (rps) for peer [%s]\n",
2296               GNUNET_i2s (&own_identity));
2297   #ifdef ENABLE_MALICIOUS
2298   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2299               "Malicious execution compiled in.\n");
2300   #endif /* ENABLE_MALICIOUS */
2301
2302
2303
2304   /* Get time interval from the configuration */
2305   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2306                                                         "ROUNDINTERVAL",
2307                                                         &round_interval))
2308   {
2309     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2310                                "RPS", "ROUNDINTERVAL");
2311     GNUNET_SCHEDULER_shutdown ();
2312     return;
2313   }
2314
2315   /* Get initial size of sampler/view from the configuration */
2316   if (GNUNET_OK !=
2317       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
2318         (long long unsigned int *) &sampler_size_est_need))
2319   {
2320     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2321                                "RPS", "INITSIZE");
2322     GNUNET_SCHEDULER_shutdown ();
2323     return;
2324   }
2325   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %u\n", sampler_size_est_need);
2326
2327   if (GNUNET_OK !=
2328       GNUNET_CONFIGURATION_get_value_filename (cfg,
2329                                                "rps",
2330                                                "FILENAME_VALID_PEERS",
2331                                                &fn_valid_peers))
2332   {
2333     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2334                                "rps", "FILENAME_VALID_PEERS");
2335   }
2336
2337
2338   View_create (4);
2339
2340   /* file_name_view_log */
2341   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
2342   {
2343     LOG (GNUNET_ERROR_TYPE_WARNING,
2344          "Failed to create directory /tmp/rps/\n");
2345   }
2346
2347   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
2348   file_name_view_log = GNUNET_malloc (size);
2349   out_size = GNUNET_snprintf (file_name_view_log,
2350                               size,
2351                               "/tmp/rps/view-%s",
2352                               GNUNET_i2s_full (&own_identity));
2353   if (size < out_size ||
2354       0 > out_size)
2355   {
2356     LOG (GNUNET_ERROR_TYPE_WARNING,
2357          "Failed to write string to buffer (size: %i, out_size: %i)\n",
2358          size,
2359          out_size);
2360   }
2361
2362
2363   /* connect to NSE */
2364   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2365
2366
2367   alpha = 0.45;
2368   beta  = 0.45;
2369
2370
2371   /* Initialise cadet */
2372   cadet_handle = GNUNET_CADET_connect (cfg,
2373                                        cls,
2374                                        &cleanup_destroyed_channel,
2375                                        cadet_handlers);
2376   GNUNET_assert (NULL != cadet_handle);
2377   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
2378                       strlen (GNUNET_APPLICATION_PORT_RPS),
2379                       &port);
2380   GNUNET_CADET_open_port (cadet_handle,
2381                           &port,
2382                           &Peers_handle_inbound_channel, cls);
2383
2384
2385   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
2386   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
2387   GNUNET_free (fn_valid_peers);
2388
2389   /* Initialise sampler */
2390   struct GNUNET_TIME_Relative half_round_interval;
2391   struct GNUNET_TIME_Relative  max_round_interval;
2392
2393   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2394   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2395
2396   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
2397   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
2398
2399   /* Initialise push and pull maps */
2400   push_map = CustomPeerMap_create (4);
2401   pull_map = CustomPeerMap_create (4);
2402
2403
2404   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2405   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2406   // TODO send push/pull to each of those peers?
2407   // TODO read stored valid peers from last run
2408   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
2409   Peers_get_valid_peers (valid_peers_iterator, NULL);
2410
2411   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
2412                                                    GNUNET_NO,
2413                                                    process_peerinfo_peers,
2414                                                    NULL);
2415
2416   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
2417
2418   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2419   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2420
2421   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
2422 }
2423
2424
2425 /**
2426  * Define "main" method using service macro.
2427  */
2428 GNUNET_SERVICE_MAIN
2429 ("rps",
2430  GNUNET_SERVICE_OPTION_NONE,
2431  &run,
2432  &client_connect_cb,
2433  &client_disconnect_cb,
2434  NULL,
2435  GNUNET_MQ_hd_fixed_size (client_request,
2436    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2437    struct GNUNET_RPS_CS_RequestMessage,
2438    NULL),
2439  GNUNET_MQ_hd_fixed_size (client_request_cancel,
2440    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2441    struct GNUNET_RPS_CS_RequestCancelMessage,
2442    NULL),
2443  GNUNET_MQ_hd_var_size (client_seed,
2444    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
2445    struct GNUNET_RPS_CS_SeedMessage,
2446    NULL),
2447 #ifdef ENABLE_MALICIOUS
2448  GNUNET_MQ_hd_var_size (client_act_malicious,
2449    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
2450    struct GNUNET_RPS_CS_ActMaliciousMessage,
2451    NULL),
2452 #endif /* ENABLE_MALICIOUS */
2453  GNUNET_MQ_handler_end());
2454
2455 /* end of gnunet-service-rps.c */