-removed obsolete functions
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_cadet_service.h"
29 #include "gnunet_peerinfo_service.h"
30 #include "gnunet_nse_service.h"
31 #include "rps.h"
32 #include "gnunet-service-rps_custommap.h"
33 #include "gnunet-service-rps_view.h"
34 #include "rps-test_util.h"
35
36 #include "gnunet-service-rps_sampler.h"
37
38 #include <math.h>
39 #include <inttypes.h>
40
41 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
42
43 // TODO modify @brief in every file
44
45 // TODO check for overflows
46
47 // TODO align message structs
48
49 // TODO connect to friends
50
51 // TODO store peers somewhere persistent
52
53 // TODO blacklist? (-> mal peer detection on top of brahms)
54
55 // TODO API request_cancel
56
57 // hist_size_init, hist_size_max
58
59 /**
60  * Our configuration.
61  */
62 static const struct GNUNET_CONFIGURATION_Handle *cfg;
63
64 /**
65  * Our own identity.
66  */
67 static struct GNUNET_PeerIdentity own_identity;
68
69
70 /***********************************************************************
71  * Housekeeping with peers
72 ***********************************************************************/
73
74 /**
75  * Closure used to pass the client and the id to the callback
76  * that replies to a client's request
77  */
78 struct ReplyCls
79 {
80   /**
81    * DLL
82    */
83   struct ReplyCls *next;
84   struct ReplyCls *prev;
85
86   /**
87    * The identifier of the request
88    */
89   uint32_t id;
90
91   /**
92    * The handle to the request
93    */
94   struct RPS_SamplerRequestHandle *req_handle;
95
96   /**
97    * The client handle to send the reply to
98    */
99   struct GNUNET_SERVER_Client *client;
100 };
101
102
103 /**
104  * Struct used to store the context of a connected client.
105  */
106 struct ClientContext
107 {
108   /**
109    * DLL
110    */
111   struct ClientContext *next;
112   struct ClientContext *prev;
113
114   /**
115    * The message queue to communicate with the client.
116    */
117   struct GNUNET_MQ_Handle *mq;
118
119   /**
120    * DLL with handles to single requests from the client
121    */
122   struct ReplyCls *rep_cls_head;
123   struct ReplyCls *rep_cls_tail;
124 };
125
126 /**
127  * DLL with all clients currently connected to us
128  */
129 struct ClientContext *cli_ctx_head;
130 struct ClientContext *cli_ctx_tail;
131
132 /**
133  * Used to keep track in what lists single peerIDs are.
134  */
135 enum PeerFlags
136 {
137   /**
138    * If we are waiting for a reply from that peer (sent a pull request).
139    */
140   PULL_REPLY_PENDING   = 0x01,
141
142   IN_OTHER_GOSSIP_LIST = 0x02, // unneeded?
143   IN_OWN_SAMPLER_LIST  = 0x04, // unneeded?
144   IN_OWN_GOSSIP_LIST   = 0x08, // unneeded?
145
146   /**
147    * We set this bit when we can be sure the other peer is/was live.
148    */
149   VALID                = 0x10,
150
151   /**
152    * We set this bit when we are going to destroy the channel to this peer.
153    * When cleanup_channel is called, we know that we wanted to destroy it.
154    * Otherwise the channel to the other peer was destroyed.
155    */
156   TO_DESTROY           = 0x20,
157 };
158
159
160 /**
161  * Functions of this type can be used to be stored at a peer for later execution.
162  */
163 typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer);
164
165 /**
166  * Outstanding operation on peer consisting of callback and closure
167  */
168 struct PeerOutstandingOp
169 {
170   /**
171    * Callback
172    */
173   PeerOp op;
174
175   /**
176    * Closure
177    */
178   void *op_cls;
179 };
180
181
182 /**
183  * List containing all messages that are yet to be send
184  */
185 struct PendingMessage
186 {
187   /**
188    * DLL next, prev
189    */
190   struct PendingMessage *next;
191   struct PendingMessage *prev;
192
193   /**
194    * The envelope to the corresponding message
195    */
196   struct GNUNET_MQ_Envelope *ev;
197
198   /**
199    * The corresponding context
200    */
201   struct PeerContext *peer_ctx;
202
203   /**
204    * The message type
205    */
206   const char *type;
207 };
208
209
210 /**
211  * Struct used to keep track of other peer's status
212  *
213  * This is stored in a multipeermap.
214  */
215 struct PeerContext
216 {
217   /**
218    * Message queue open to client
219    */
220   struct GNUNET_MQ_Handle *mq;
221
222   /**
223    * Channel open to client.
224    */
225   struct GNUNET_CADET_Channel *send_channel;
226
227   /**
228    * Channel open from client.
229    */
230   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
231
232   /**
233    * Array of outstanding operations on this peer.
234    */
235   struct PeerOutstandingOp *outstanding_ops;
236
237   /**
238    * Handle to the callback given to cadet_ntfy_tmt_rdy()
239    *
240    * To be canceled on shutdown.
241    */
242   struct GNUNET_CADET_TransmitHandle *transmit_handle;
243
244   /**
245    * Number of outstanding operations.
246    */
247   unsigned int num_outstanding_ops;
248
249   /**
250    * Identity of the peer
251    */
252   struct GNUNET_PeerIdentity peer_id;
253   
254   /**
255    * Flags indicating status of peer
256    */
257   uint32_t peer_flags;
258
259   /**
260    * DLL with all messages that are yet to be sent
261    */
262   struct PendingMessage *pending_messages_head;
263   struct PendingMessage *pending_messages_tail;
264
265   /**
266    * This is pobably followed by 'statistical' data (when we first saw
267    * him, how did we get his ID, how many pushes (in a timeinterval),
268    * ...)
269    */
270 };
271
272 /***********************************************************************
273  * /Housekeeping with peers
274 ***********************************************************************/
275
276
277
278
279
280 /***********************************************************************
281  * Globals
282 ***********************************************************************/
283
284 /**
285  * Sampler used for the Brahms protocol itself.
286  */
287 static struct RPS_Sampler *prot_sampler;
288
289 /**
290  * Sampler used for the clients.
291  */
292 static struct RPS_Sampler *client_sampler;
293
294 /**
295  * Set of all peers to keep track of them.
296  */
297 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
298
299
300 /**
301  * Name to log view to
302  */
303 static char *file_name_view_log;
304
305
306 /**
307  * The size of sampler we need to be able to satisfy the client's need of
308  * random peers.
309  */
310 static unsigned int sampler_size_client_need;
311
312 /**
313  * The size of sampler we need to be able to satisfy the Brahms protocol's
314  * need of random peers.
315  *
316  * This is one minimum size the sampler grows to.
317  */
318 static unsigned int sampler_size_est_need;
319
320
321 /**
322  * Percentage of total peer number in the view
323  * to send random PUSHes to
324  */
325 static float alpha;
326
327 /**
328  * Percentage of total peer number in the view
329  * to send random PULLs to
330  */
331 static float beta;
332
333 /**
334  * The percentage gamma of history updates.
335  * Simply 1 - alpha - beta
336  */
337
338
339 /**
340  * Identifier for the main task that runs periodically.
341  */
342 static struct GNUNET_SCHEDULER_Task *do_round_task;
343
344 /**
345  * Time inverval the do_round task runs in.
346  */
347 static struct GNUNET_TIME_Relative round_interval;
348
349
350
351 /**
352  * List to store peers received through pushes temporary.
353  */
354 static struct CustomPeerMap *push_map;
355
356 /**
357  * List to store peers received through pulls temporary.
358  */
359 static struct CustomPeerMap *pull_map;
360
361
362 /**
363  * Handler to NSE.
364  */
365 static struct GNUNET_NSE_Handle *nse;
366
367 /**
368  * Handler to CADET.
369  */
370 static struct GNUNET_CADET_Handle *cadet_handle;
371
372 /**
373  * Handler to PEERINFO.
374  */
375 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
376
377 /**
378  * Handle for cancellation of iteration over peers.
379  */
380 struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
381
382
383 /**
384  * Request counter.
385  *
386  * Only needed in the beginning to check how many of the 64 deltas
387  * we already have
388  */
389 static unsigned int req_counter;
390
391 /**
392  * Time of the last request we received.
393  *
394  * Used to compute the expected request rate.
395  */
396 static struct GNUNET_TIME_Absolute last_request;
397
398 /**
399  * Size of #request_deltas.
400  */
401 #define REQUEST_DELTAS_SIZE 64
402 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
403
404 /**
405  * Last 64 deltas between requests
406  */
407 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
408
409 /**
410  * The prediction of the rate of requests
411  */
412 static struct GNUNET_TIME_Relative  request_rate;
413
414
415 /**
416  * Number of history update tasks.
417  */
418 uint32_t num_hist_update_tasks;
419
420
421 #ifdef ENABLE_MALICIOUS
422 /**
423  * Type of malicious peer
424  *
425  * 0 Don't act malicious at all - Default
426  * 1 Try to maximise representation
427  * 2 Try to partition the network
428  * 3 Combined attack
429  */
430 uint32_t mal_type = 0;
431
432 /**
433  * Other malicious peers
434  */
435 static struct GNUNET_PeerIdentity *mal_peers = NULL;
436
437 /**
438  * Hashmap of malicious peers used as set.
439  * Used to more efficiently check whether we know that peer.
440  */
441 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set = NULL;
442
443 /**
444  * Number of other malicious peers
445  */
446 static uint32_t num_mal_peers;
447
448
449 /**
450  * If type is 2 This struct is used to store the attacked peers in a DLL
451  */
452 struct AttackedPeer
453 {
454   /**
455    * DLL
456    */
457   struct AttackedPeer *next;
458   struct AttackedPeer *prev;
459
460   /**
461    * PeerID
462    */
463   struct GNUNET_PeerIdentity peer_id;
464 };
465
466 /**
467  * If type is 2 this is the DLL of attacked peers
468  */
469 static struct AttackedPeer *att_peers_head = NULL;
470 static struct AttackedPeer *att_peers_tail = NULL;
471
472 /**
473  * This index is used to point to an attacked peer to
474  * implement the round-robin-ish way to select attacked peers.
475  */
476 static struct AttackedPeer *att_peer_index = NULL;
477
478 /**
479  * Hashmap of attacked peers used as set.
480  * Used to more efficiently check whether we know that peer.
481  */
482 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set = NULL;
483
484 /**
485  * Number of attacked peers
486  */
487 static uint32_t num_attacked_peers = 0;
488
489
490 /**
491  * If type is 1 this is the attacked peer
492  */
493 static struct GNUNET_PeerIdentity attacked_peer;
494
495 /**
496  * The limit of PUSHes we can send in one round.
497  * This is an assumption of the Brahms protocol and either implemented
498  * via proof of work
499  * or
500  * assumend to be the bandwidth limitation.
501  */
502 static uint32_t push_limit = 10000;
503 #endif /* ENABLE_MALICIOUS */
504
505
506 /***********************************************************************
507  * /Globals
508 ***********************************************************************/
509
510
511
512
513
514
515 /***********************************************************************
516  * Util functions
517 ***********************************************************************/
518
519 /**
520  * Set a peer flag of given peer context.
521  */
522 #define set_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags |= mask)
523
524 /**
525  * Get peer flag of given peer context.
526  */
527 #define get_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags & mask ? GNUNET_YES : GNUNET_NO)
528
529 /**
530  * Unset flag of given peer context.
531  */
532 #define unset_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags &= (~mask))
533
534 /**
535  * Clean the send channel of a peer
536  */
537 void
538 peer_clean (const struct GNUNET_PeerIdentity *peer);
539
540
541 /**
542  * Print peerlist to log.
543  */
544 void
545 print_peer_list (struct GNUNET_PeerIdentity *list, unsigned int len)
546 {
547   unsigned int i;
548
549   LOG (GNUNET_ERROR_TYPE_DEBUG,
550        "Printing peer list of length %u at %p:\n",
551        len,
552        list);
553   for (i = 0 ; i < len ; i++)
554   {
555     LOG (GNUNET_ERROR_TYPE_DEBUG,
556          "%u. peer: %s\n",
557          i, GNUNET_i2s (&list[i]));
558   }
559 }
560
561
562 /**
563  * Remove peer from list.
564  */
565   void
566 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
567                unsigned int *list_size,
568                const struct GNUNET_PeerIdentity *peer)
569 {
570   unsigned int i;
571   struct GNUNET_PeerIdentity *tmp;
572
573   tmp = *peer_list;
574
575   LOG (GNUNET_ERROR_TYPE_DEBUG,
576        "Removing peer %s from list at %p\n",
577        GNUNET_i2s (peer),
578        tmp);
579
580   for ( i = 0 ; i < *list_size ; i++ )
581   {
582     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
583     {
584       if (i < *list_size -1)
585       { /* Not at the last entry -- shift peers left */
586         memcpy (&tmp[i], &tmp[i +1],
587                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
588       }
589       /* Remove last entry (should be now useless PeerID) */
590       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
591     }
592   }
593   *peer_list = tmp;
594 }
595
596
597 /**
598  * Get the context of a peer. If not existing, create.
599  */
600   struct PeerContext *
601 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
602 {
603   struct PeerContext *ctx;
604   int ret;
605
606   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
607   GNUNET_assert (GNUNET_YES == ret);
608   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
609   GNUNET_assert (NULL != ctx);
610   return ctx;
611 }
612
613 /**
614  * Create a new peer context and insert it into the peer map
615  */
616 struct PeerContext *
617 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
618 {
619   struct PeerContext *ctx;
620   int ret;
621
622   ctx = GNUNET_new (struct PeerContext);
623   ctx->peer_id = *peer;
624   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
625                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
626   GNUNET_assert (GNUNET_OK == ret);
627   return ctx;
628 }
629
630
631 /**
632  * Set the peer flag to living and call the outstanding operations on this peer.
633  */
634 static size_t
635 peer_is_live (struct PeerContext *peer_ctx)
636 {
637   struct GNUNET_PeerIdentity *peer;
638
639   /* Cancle transmit_handle if still scheduled */
640   if (NULL != peer_ctx->transmit_handle)
641   {
642     GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle);
643     peer_ctx->transmit_handle = NULL;
644   }
645
646   peer = &peer_ctx->peer_id;
647   set_peer_flag (peer_ctx, VALID);
648
649   LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer));
650
651   if (0 < peer_ctx->num_outstanding_ops)
652   { /* Call outstanding operations */
653     unsigned int i;
654
655     for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
656       peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer);
657     GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0);
658   }
659
660   return 0;
661 }
662
663
664 /**
665  * Callback that is called when a channel was effectively established.
666  * This is given to ntfy_tmt_rdy and called when the channel was
667  * successfully established.
668  */
669 static size_t
670 cadet_ntfy_tmt_rdy_cb (void *cls, size_t size, void *buf)
671 {
672   struct PeerContext *peer_ctx = (struct PeerContext *) cls;
673
674   peer_ctx->transmit_handle = NULL;
675   LOG (GNUNET_ERROR_TYPE_DEBUG,
676        "Set ->transmit_handle = NULL for peer %s\n",
677        GNUNET_i2s (&peer_ctx->peer_id));
678
679   if (NULL != buf
680       && 0 != size)
681   {
682     peer_is_live (peer_ctx);
683   }
684   else
685   {
686     LOG (GNUNET_ERROR_TYPE_WARNING,
687          "Problems establishing a connection to peer %s in order to check liveliness\n",
688          GNUNET_i2s (&peer_ctx->peer_id));
689     // TODO reschedule? cleanup?
690   }
691
692   //if (NULL != peer_ctx->transmit_handle)
693   //{
694   //  LOG (GNUNET_ERROR_TYPE_DEBUG,
695   //       "Trying to cancle transmit_handle for peer %s\n",
696   //       GNUNET_i2s (&peer_ctx->peer_id));
697   //  GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle);
698   //  peer_ctx->transmit_handle = NULL;
699   //}
700
701   return 0;
702 }
703
704
705 /**
706  * Get the channel of a peer. If not existing, create.
707  */
708   struct GNUNET_CADET_Channel *
709 get_channel (const struct GNUNET_PeerIdentity *peer)
710 {
711   struct PeerContext *peer_ctx;
712
713   peer_ctx = get_peer_ctx (peer);
714   if (NULL == peer_ctx->send_channel)
715   {
716     LOG (GNUNET_ERROR_TYPE_DEBUG,
717          "Trying to establish channel to peer %s\n",
718          GNUNET_i2s (peer));
719
720     peer_ctx->send_channel =
721       GNUNET_CADET_channel_create (cadet_handle,
722                                    NULL,
723                                    peer,
724                                    GNUNET_RPS_CADET_PORT,
725                                    GNUNET_CADET_OPTION_RELIABLE);
726
727   }
728   return peer_ctx->send_channel;
729 }
730
731
732 /**
733  * Get the message queue of a specific peer.
734  *
735  * If we already have a message queue open to this client,
736  * simply return it, otherways create one.
737  */
738   struct GNUNET_MQ_Handle *
739 get_mq (const struct GNUNET_PeerIdentity *peer_id)
740 {
741   struct PeerContext *peer_ctx;
742
743   peer_ctx = get_peer_ctx (peer_id);
744
745   GNUNET_assert (NULL == peer_ctx->transmit_handle);
746
747   if (NULL == peer_ctx->mq)
748   {
749     (void) get_channel (peer_id);
750     peer_ctx->mq = GNUNET_CADET_mq_create (peer_ctx->send_channel);
751     //do I have to explicitly put it in the peer_map?
752     (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer_id, peer_ctx,
753                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
754   }
755   return peer_ctx->mq;
756 }
757
758
759 /**
760  * Issue check whether peer is live
761  *
762  * @param peer_ctx the context of the peer
763  */
764 void
765 check_peer_live (struct PeerContext *peer_ctx)
766 {
767   LOG (GNUNET_ERROR_TYPE_DEBUG,
768        "Get informed about peer %s getting live\n",
769        GNUNET_i2s (&peer_ctx->peer_id));
770
771   if (NULL == peer_ctx->transmit_handle &&
772       NULL == peer_ctx->send_channel)
773   {
774     (void) get_channel (&peer_ctx->peer_id);
775     peer_ctx->transmit_handle =
776         GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel,
777                                             GNUNET_NO,
778                                             GNUNET_TIME_UNIT_FOREVER_REL,
779                                             sizeof (struct GNUNET_MessageHeader),
780                                             cadet_ntfy_tmt_rdy_cb,
781                                             peer_ctx);
782   }
783   else if (NULL != peer_ctx->transmit_handle)
784     LOG (GNUNET_ERROR_TYPE_DEBUG,
785          "Already waiting for notification\n");
786   else if (NULL != peer_ctx->send_channel)
787     LOG (GNUNET_ERROR_TYPE_DEBUG,
788          "Already have established channel to peer\n");
789 }
790
791
792 /**
793  * Sum all time relatives of an array.
794   */
795   struct GNUNET_TIME_Relative
796 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
797 {
798   struct GNUNET_TIME_Relative sum;
799   uint32_t i;
800
801   sum = GNUNET_TIME_UNIT_ZERO;
802   for ( i = 0 ; i < arr_size ; i++ )
803   {
804     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
805   }
806   return sum;
807 }
808
809
810 /**
811  * Compute the average of given time relatives.
812  */
813   struct GNUNET_TIME_Relative
814 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size)
815 {
816   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size);
817 }
818
819
820 /**
821  * Insert PeerID in #pull_map
822  *
823  * Called once we know a peer is live.
824  */
825   void
826 insert_in_pull_map (void *cls, const struct GNUNET_PeerIdentity *peer)
827 {
828   CustomPeerMap_put (pull_map, peer);
829 }
830
831 /**
832  * Check whether #insert_in_pull_map was already scheduled
833  */
834   int
835 insert_in_pull_map_scheduled (const struct PeerContext *peer_ctx)
836 {
837   unsigned int i;
838
839   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
840     if (insert_in_pull_map == peer_ctx->outstanding_ops[i].op)
841       return GNUNET_YES;
842   return GNUNET_NO;
843 }
844
845
846 /**
847  * Insert PeerID in #view
848  *
849  * Called once we know a peer is live.
850  */
851   void
852 insert_in_view (void *cls, const struct GNUNET_PeerIdentity *peer)
853 {
854   View_put (peer);
855 }
856
857 /**
858  * Check whether #insert_in_view was already scheduled
859  */
860   int
861 insert_in_view_scheduled (const struct PeerContext *peer_ctx)
862 {
863   unsigned int i;
864
865   for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ )
866     if (insert_in_view == peer_ctx->outstanding_ops[i].op)
867       return GNUNET_YES;
868   return GNUNET_NO;
869 }
870
871
872 /**
873  * Update sampler with given PeerID.
874  */
875   void
876 insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer)
877 {
878   LOG (GNUNET_ERROR_TYPE_DEBUG,
879        "Updating samplers with peer %s from insert_in_sampler()\n",
880        GNUNET_i2s (peer));
881   RPS_sampler_update (prot_sampler,   peer);
882   RPS_sampler_update (client_sampler, peer);
883   if (0 < RPS_sampler_count_id (prot_sampler, peer))
884   {
885     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
886       (void) create_peer_ctx (peer);
887     (void) get_channel (peer);
888   }
889 }
890
891
892 /**
893  * Check whether #insert_in_sampler was already scheduled
894  */
895 static int
896 insert_in_sampler_scheduled (const struct PeerContext *peer_ctx)
897 {
898   unsigned int i;
899
900   for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++)
901     if (insert_in_sampler== peer_ctx->outstanding_ops[i].op)
902       return GNUNET_YES;
903   return GNUNET_NO;
904 }
905
906 /**
907  * Put random peer from sampler into the view as history update.
908  */
909   void
910 hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers)
911 {
912   unsigned int i;
913
914   for (i = 0; i < num_peers; i++)
915   {
916     View_put (&ids[i]);
917     to_file (file_name_view_log,
918              "+%s\t(hist)",
919              GNUNET_i2s_full (ids));
920   }
921   if (0 < num_hist_update_tasks)
922     num_hist_update_tasks--;
923 }
924
925
926
927 /**
928  * Wrapper around #RPS_sampler_resize()
929  *
930  * If we do not have enough sampler elements, double current sampler size
931  * If we have more than enough sampler elements, halv current sampler size
932  */
933 static void
934 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
935 {
936   unsigned int sampler_size;
937
938   // TODO statistics
939   // TODO respect the min, max
940   sampler_size = RPS_sampler_get_size (sampler);
941   if (sampler_size > new_size * 4)
942   { /* Shrinking */
943     RPS_sampler_resize (sampler, sampler_size / 2);
944   }
945   else if (sampler_size < new_size)
946   { /* Growing */
947     RPS_sampler_resize (sampler, sampler_size * 2);
948   }
949   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
950 }
951
952
953 /**
954  * Wrapper around #RPS_sampler_resize() resizing the client sampler
955  */
956 static void
957 client_resize_wrapper ()
958 {
959   uint32_t bigger_size;
960
961   // TODO statistics
962
963   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
964
965   // TODO respect the min, max
966   resize_wrapper (client_sampler, bigger_size);
967   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
968       bigger_size);
969 }
970
971
972 /**
973  * Estimate request rate
974  *
975  * Called every time we receive a request from the client.
976  */
977   void
978 est_request_rate()
979 {
980   struct GNUNET_TIME_Relative max_round_duration;
981
982   if (request_deltas_size > req_counter)
983     req_counter++;
984   if ( 1 < req_counter)
985   {
986     /* Shift last request deltas to the right */
987     memcpy (&request_deltas[1],
988         request_deltas,
989         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
990
991     /* Add current delta to beginning */
992     request_deltas[0] =
993         GNUNET_TIME_absolute_get_difference (last_request,
994                                              GNUNET_TIME_absolute_get ());
995     request_rate = T_relative_avg (request_deltas, req_counter);
996     request_rate = (request_rate.rel_value_us < 1) ?
997       GNUNET_TIME_relative_get_unit_ () : request_rate;
998
999     /* Compute the duration a round will maximally take */
1000     max_round_duration =
1001         GNUNET_TIME_relative_add (round_interval,
1002                                   GNUNET_TIME_relative_divide (round_interval, 2));
1003
1004     /* Set the estimated size the sampler has to have to
1005      * satisfy the current client request rate */
1006     sampler_size_client_need =
1007         max_round_duration.rel_value_us / request_rate.rel_value_us;
1008
1009     /* Resize the sampler */
1010     client_resize_wrapper ();
1011   }
1012   last_request = GNUNET_TIME_absolute_get ();
1013 }
1014
1015
1016 /**
1017  * Add all peers in @a peer_array to @a peer_map used as set.
1018  *
1019  * @param peer_array array containing the peers
1020  * @param num_peers number of peers in @peer_array
1021  * @param peer_map the peermap to use as set
1022  */
1023 static void
1024 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
1025                        unsigned int num_peers,
1026                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
1027 {
1028   unsigned int i;
1029   if (NULL == peer_map)
1030   {
1031     LOG (GNUNET_ERROR_TYPE_WARNING,
1032          "Trying to add peers to non-existing peermap.\n");
1033     return;
1034   }
1035
1036   for (i = 0; i < num_peers; i++)
1037   {
1038     GNUNET_CONTAINER_multipeermap_put (peer_map,
1039                                        &peer_array[i],
1040                                        NULL,
1041                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
1042   }
1043 }
1044
1045
1046 /**
1047  * @brief Add an envelope to a message passed to mq to list of pending messages
1048  *
1049  * @param peer peer the message was sent to
1050  * @param ev envelope to the message
1051  * @param type type of the message to be sent
1052  */
1053 static struct PendingMessage *
1054 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
1055                         struct GNUNET_MQ_Envelope *ev,
1056                         const char *type)
1057 {
1058   struct PendingMessage *pending_msg;
1059   struct PeerContext *peer_ctx;
1060
1061   peer_ctx = get_peer_ctx (peer);
1062   pending_msg = GNUNET_new (struct PendingMessage);
1063   pending_msg->ev = ev;
1064   pending_msg->peer_ctx = peer_ctx;
1065   pending_msg->type = type;
1066   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1067                                peer_ctx->pending_messages_tail,
1068                                pending_msg);
1069   return pending_msg;
1070 }
1071
1072 static void
1073 remove_pending_message (struct PendingMessage *pending_msg)
1074 {
1075   struct PeerContext *peer_ctx;
1076
1077   peer_ctx = pending_msg->peer_ctx;
1078   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1079                                peer_ctx->pending_messages_tail,
1080                                pending_msg);
1081   GNUNET_free (pending_msg);
1082 }
1083
1084
1085 /**
1086  * @brief This is called once a message is sent.
1087  *
1088  * @param cls type of the message that was sent
1089  */
1090 static void
1091 mq_notify_sent_cb (void *cls)
1092 {
1093   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1094   LOG (GNUNET_ERROR_TYPE_DEBUG,
1095       "%s was sent.\n",
1096       pending_msg->type);
1097   remove_pending_message (pending_msg);
1098 }
1099
1100
1101 /**
1102  * Send a PULL REPLY to @a peer_id
1103  *
1104  * @param peer_id the peer to send the reply to.
1105  * @param peer_ids the peers to send to @a peer_id
1106  * @param num_peer_ids the number of peers to send to @a peer_id
1107  */
1108 static void
1109 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
1110                  const struct GNUNET_PeerIdentity *peer_ids,
1111                  unsigned int num_peer_ids)
1112 {
1113   uint32_t send_size;
1114   struct GNUNET_MQ_Handle *mq;
1115   struct GNUNET_MQ_Envelope *ev;
1116   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
1117   struct PendingMessage *pending_msg;
1118
1119   /* Compute actual size */
1120   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
1121               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
1122
1123   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
1124     /* Compute number of peers to send
1125      * If too long, simply truncate */
1126     // TODO select random ones via permutation
1127     //      or even better: do good protocol design
1128     send_size =
1129       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
1130        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1131        sizeof (struct GNUNET_PeerIdentity);
1132   else
1133     send_size = num_peer_ids;
1134
1135   LOG (GNUNET_ERROR_TYPE_DEBUG,
1136       "Going to send PULL REPLY with %u peers to %s\n",
1137       send_size, GNUNET_i2s (peer_id));
1138
1139   mq = get_mq (peer_id);
1140
1141   ev = GNUNET_MQ_msg_extra (out_msg,
1142                             send_size * sizeof (struct GNUNET_PeerIdentity),
1143                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
1144   out_msg->num_peers = htonl (send_size);
1145   memcpy (&out_msg[1], peer_ids,
1146          send_size * sizeof (struct GNUNET_PeerIdentity));
1147
1148   pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY");
1149   GNUNET_MQ_notify_sent (ev,
1150       mq_notify_sent_cb,
1151       pending_msg);
1152   GNUNET_MQ_send (mq, ev);
1153 }
1154
1155
1156 /**
1157  * This function is called on new peer_ids from 'external' sources
1158  * (client seed, cadet get_peers(), ...)
1159  *
1160  * @param peer_id the new peer_id
1161  */
1162 static void
1163 new_peer_id (const struct GNUNET_PeerIdentity *peer_id)
1164 {
1165   struct PeerOutstandingOp out_op;
1166   struct PeerContext *peer_ctx;
1167
1168   if ((NULL == peer_id) ||
1169       (0 == GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer_id)))
1170     return;
1171   LOG (GNUNET_ERROR_TYPE_DEBUG,
1172        "Got peer_id %s (at %p, view size: %u)\n",
1173        GNUNET_i2s (peer_id),
1174        peer_id,
1175        View_size ());
1176
1177   /* if the seed peer is already know, skip context creation */
1178   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer_id))
1179     peer_ctx = create_peer_ctx (peer_id);
1180   else
1181     peer_ctx = get_peer_ctx (peer_id);
1182
1183   if (GNUNET_NO == get_peer_flag (peer_ctx, VALID))
1184   {
1185     if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx))
1186     {
1187       out_op.op = insert_in_sampler;
1188       out_op.op_cls = NULL;
1189       GNUNET_array_append (peer_ctx->outstanding_ops,
1190                            peer_ctx->num_outstanding_ops,
1191                            out_op);
1192     }
1193
1194     if (GNUNET_NO == insert_in_view_scheduled (peer_ctx))
1195     {
1196       out_op.op = insert_in_view;
1197       out_op.op_cls = NULL;
1198       GNUNET_array_append (peer_ctx->outstanding_ops,
1199                            peer_ctx->num_outstanding_ops,
1200                            out_op);
1201     }
1202
1203     /* Trigger livelyness test on peer */
1204     check_peer_live (peer_ctx);
1205   }
1206   // else...?
1207
1208   // send push/pull to each of those peers?
1209 }
1210
1211
1212 /***********************************************************************
1213  * /Util functions
1214 ***********************************************************************/
1215
1216 static void
1217 destroy_reply_cls (struct ReplyCls *rep_cls)
1218 {
1219   struct ClientContext *cli_ctx;
1220
1221   cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client,
1222                                                    struct ClientContext);
1223   GNUNET_assert (NULL != cli_ctx);
1224   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
1225                                cli_ctx->rep_cls_tail,
1226                                rep_cls);
1227   GNUNET_free (rep_cls);
1228 }
1229
1230 static void
1231 destroy_cli_ctx (struct ClientContext *cli_ctx)
1232 {
1233   GNUNET_assert (NULL != cli_ctx);
1234   if (NULL != cli_ctx->rep_cls_head)
1235   {
1236     LOG (GNUNET_ERROR_TYPE_WARNING,
1237          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
1238     while (NULL != cli_ctx->rep_cls_head)
1239       destroy_reply_cls (cli_ctx->rep_cls_head);
1240   }
1241   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
1242                                cli_ctx_tail,
1243                                cli_ctx);
1244   GNUNET_free (cli_ctx);
1245 }
1246
1247
1248 /**
1249  * Function called by NSE.
1250  *
1251  * Updates sizes of sampler list and view and adapt those lists
1252  * accordingly.
1253  */
1254   void
1255 nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp,
1256               double logestimate, double std_dev)
1257 {
1258   double estimate;
1259   //double scale; // TODO this might go gloabal/config
1260
1261   LOG (GNUNET_ERROR_TYPE_DEBUG,
1262        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
1263        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
1264   //scale = .01;
1265   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
1266   // GNUNET_NSE_log_estimate_to_n (logestimate);
1267   estimate = pow (estimate, 1.0 / 3);
1268   // TODO add if std_dev is a number
1269   // estimate += (std_dev * scale);
1270   if (2 < ceil (estimate))
1271   {
1272     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
1273     sampler_size_est_need = estimate;
1274   } else
1275     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
1276
1277   /* If the NSE has changed adapt the lists accordingly */
1278   resize_wrapper (prot_sampler, sampler_size_est_need);
1279   client_resize_wrapper ();
1280 }
1281
1282
1283 /**
1284  * Callback called once the requested PeerIDs are ready.
1285  *
1286  * Sends those to the requesting client.
1287  */
1288 void client_respond (void *cls,
1289     struct GNUNET_PeerIdentity *peer_ids, uint32_t num_peers)
1290 {
1291   uint32_t i;
1292   struct GNUNET_MQ_Envelope *ev;
1293   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
1294   struct ReplyCls *reply_cls = (struct ReplyCls *) cls;
1295   uint32_t size_needed;
1296   struct ClientContext *cli_ctx;
1297
1298   GNUNET_assert (NULL != reply_cls);
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "sampler returned %" PRIu32 " peers:\n",
1301        num_peers);
1302   for (i = 0; i < num_peers; i++)
1303   {
1304     LOG (GNUNET_ERROR_TYPE_DEBUG,
1305          "  %lu: %s\n",
1306          i,
1307          GNUNET_i2s (&peer_ids[i]));
1308   }
1309
1310   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
1311                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1312
1313   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= size_needed);
1314
1315   ev = GNUNET_MQ_msg_extra (out_msg,
1316                             num_peers * sizeof (struct GNUNET_PeerIdentity),
1317                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
1318   out_msg->num_peers = htonl (num_peers);
1319   out_msg->id = htonl (reply_cls->id);
1320
1321   memcpy (&out_msg[1],
1322           peer_ids,
1323           num_peers * sizeof (struct GNUNET_PeerIdentity));
1324   GNUNET_free (peer_ids);
1325
1326   cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client,
1327                                                    struct ClientContext);
1328   GNUNET_assert (NULL != cli_ctx);
1329   destroy_reply_cls (reply_cls);
1330   GNUNET_MQ_send (cli_ctx->mq, ev);
1331 }
1332
1333
1334 /**
1335  * Handle RPS request from the client.
1336  *
1337  * @param cls closure
1338  * @param client identification of the client
1339  * @param message the actual message
1340  */
1341 static void
1342 handle_client_request (void *cls,
1343                        struct GNUNET_SERVER_Client *client,
1344                        const struct GNUNET_MessageHeader *message)
1345 {
1346   struct GNUNET_RPS_CS_RequestMessage *msg;
1347   uint32_t num_peers;
1348   uint32_t size_needed;
1349   struct ReplyCls *reply_cls;
1350   uint32_t i;
1351   struct ClientContext *cli_ctx;
1352
1353   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
1354
1355   num_peers = ntohl (msg->num_peers);
1356   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
1357                 num_peers * sizeof (struct GNUNET_PeerIdentity);
1358
1359   if (GNUNET_SERVER_MAX_MESSAGE_SIZE < size_needed)
1360   {
1361     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1362                 "Message received from client has size larger than expected\n");
1363     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1364     return;
1365   }
1366
1367   for (i = 0 ; i < num_peers ; i++)
1368     est_request_rate();
1369
1370   LOG (GNUNET_ERROR_TYPE_DEBUG,
1371        "Client requested %" PRIu32 " random peer(s).\n",
1372        num_peers);
1373
1374   reply_cls = GNUNET_new (struct ReplyCls);
1375   reply_cls->id = ntohl (msg->id);
1376   reply_cls->client = client;
1377   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
1378                                                         client_respond,
1379                                                         reply_cls,
1380                                                         num_peers);
1381
1382   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1383   GNUNET_assert (NULL != cli_ctx);
1384   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
1385                                cli_ctx->rep_cls_tail,
1386                                reply_cls);
1387   GNUNET_SERVER_receive_done (client,
1388                               GNUNET_OK);
1389 }
1390
1391
1392 /**
1393  * @brief Handle a message that requests the cancellation of a request
1394  *
1395  * @param cls unused
1396  * @param client the client that requests the cancellation
1397  * @param message the message containing the id of the request
1398  */
1399 static void
1400 handle_client_request_cancel (void *cls,
1401                               struct GNUNET_SERVER_Client *client,
1402                               const struct GNUNET_MessageHeader *message)
1403 {
1404   struct GNUNET_RPS_CS_RequestCancelMessage *msg =
1405     (struct GNUNET_RPS_CS_RequestCancelMessage *) message;
1406   struct ClientContext *cli_ctx;
1407   struct ReplyCls *rep_cls;
1408
1409   cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
1410   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
1411   rep_cls = cli_ctx->rep_cls_head;
1412   LOG (GNUNET_ERROR_TYPE_DEBUG,
1413       "Client cancels request with id %lu\n",
1414       ntohl (msg->id));
1415   while ( (NULL != rep_cls->next) &&
1416           (rep_cls->id != ntohl (msg->id)) )
1417     rep_cls = rep_cls->next;
1418   GNUNET_assert (rep_cls->id == ntohl (msg->id));
1419   RPS_sampler_request_cancel (rep_cls->req_handle);
1420   destroy_reply_cls (rep_cls);
1421   GNUNET_SERVER_receive_done (client,
1422                               GNUNET_OK);
1423 }
1424
1425
1426 /**
1427  * Handle seed from the client.
1428  *
1429  * @param cls closure
1430  * @param client identification of the client
1431  * @param message the actual message
1432  */
1433   static void
1434 handle_client_seed (void *cls,
1435                     struct GNUNET_SERVER_Client *client,
1436                     const struct GNUNET_MessageHeader *message)
1437 {
1438   struct GNUNET_RPS_CS_SeedMessage *in_msg;
1439   struct GNUNET_PeerIdentity *peers;
1440   uint32_t num_peers;
1441   uint32_t i;
1442
1443   if (sizeof (struct GNUNET_RPS_CS_SeedMessage) > ntohs (message->size))
1444   {
1445     GNUNET_break_op (0);
1446     GNUNET_SERVER_receive_done (client,
1447                                 GNUNET_SYSERR);
1448   }
1449
1450   in_msg = (struct GNUNET_RPS_CS_SeedMessage *) message;
1451   num_peers = ntohl (in_msg->num_peers);
1452   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1453   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
1454   //memcpy (peers, &in_msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
1455
1456   if ((ntohs (message->size) - sizeof (struct GNUNET_RPS_CS_SeedMessage)) /
1457       sizeof (struct GNUNET_PeerIdentity) != num_peers)
1458   {
1459     GNUNET_break_op (0);
1460     GNUNET_SERVER_receive_done (client,
1461                                 GNUNET_SYSERR);
1462     return;
1463   }
1464
1465   LOG (GNUNET_ERROR_TYPE_DEBUG,
1466        "Client seeded peers:\n");
1467   print_peer_list (peers, num_peers);
1468
1469   for (i = 0 ; i < num_peers ; i++)
1470   {
1471     LOG (GNUNET_ERROR_TYPE_DEBUG,
1472          "Updating samplers with seed %" PRIu32 ": %s\n",
1473          i,
1474          GNUNET_i2s (&peers[i]));
1475
1476     new_peer_id (&peers[i]);
1477
1478     //RPS_sampler_update (prot_sampler,   &peers[i]);
1479     //RPS_sampler_update (client_sampler, &peers[i]);
1480   }
1481
1482   ////GNUNET_free (peers);
1483
1484   GNUNET_SERVER_receive_done (client,
1485                                                 GNUNET_OK);
1486 }
1487
1488
1489 /**
1490  * Handle a PUSH message from another peer.
1491  *
1492  * Check the proof of work and store the PeerID
1493  * in the temporary list for pushed PeerIDs.
1494  *
1495  * @param cls Closure
1496  * @param channel The channel the PUSH was received over
1497  * @param channel_ctx The context associated with this channel
1498  * @param msg The message header
1499  */
1500 static int
1501 handle_peer_push (void *cls,
1502     struct GNUNET_CADET_Channel *channel,
1503     void **channel_ctx,
1504     const struct GNUNET_MessageHeader *msg)
1505 {
1506   const struct GNUNET_PeerIdentity *peer;
1507
1508   // (check the proof of work)
1509
1510   peer = (const struct GNUNET_PeerIdentity *)
1511     GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1512   // FIXME wait for cadet to change this function
1513
1514   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PUSH (%s)\n", GNUNET_i2s (peer));
1515
1516   #ifdef ENABLE_MALICIOUS
1517   struct AttackedPeer *tmp_att_peer;
1518
1519   tmp_att_peer = GNUNET_new (struct AttackedPeer);
1520   memcpy (&tmp_att_peer->peer_id, peer, sizeof (struct GNUNET_PeerIdentity));
1521   if (1 == mal_type
1522       || 3 == mal_type)
1523   { /* Try to maximise representation */
1524     if (NULL == att_peer_set)
1525       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1526     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1527                                                              peer))
1528     {
1529       GNUNET_CONTAINER_DLL_insert (att_peers_head,
1530                                    att_peers_tail,
1531                                    tmp_att_peer);
1532       add_peer_array_to_set (peer, 1, att_peer_set);
1533     }
1534     return GNUNET_OK;
1535   }
1536
1537
1538   else if (2 == mal_type)
1539   { /* We attack one single well-known peer - simply ignore */
1540     return GNUNET_OK;
1541   }
1542   else
1543   {
1544     GNUNET_free (tmp_att_peer);
1545   }
1546
1547   #endif /* ENABLE_MALICIOUS */
1548
1549   /* Add the sending peer to the push_map */
1550   CustomPeerMap_put (push_map, peer);
1551
1552   GNUNET_CADET_receive_done (channel);
1553   return GNUNET_OK;
1554 }
1555
1556
1557 /**
1558  * Handle PULL REQUEST request message from another peer.
1559  *
1560  * Reply with the view of PeerIDs.
1561  *
1562  * @param cls Closure
1563  * @param channel The channel the PULL REQUEST was received over
1564  * @param channel_ctx The context associated with this channel
1565  * @param msg The message header
1566  */
1567 static int
1568 handle_peer_pull_request (void *cls,
1569     struct GNUNET_CADET_Channel *channel,
1570     void **channel_ctx,
1571     const struct GNUNET_MessageHeader *msg)
1572 {
1573   struct GNUNET_PeerIdentity *peer;
1574   const struct GNUNET_PeerIdentity *view_array;
1575
1576   peer = (struct GNUNET_PeerIdentity *)
1577     GNUNET_CADET_channel_get_info (channel,
1578                                    GNUNET_CADET_OPTION_PEER);
1579   // FIXME wait for cadet to change this function
1580
1581   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
1582
1583   #ifdef ENABLE_MALICIOUS
1584   if (1 == mal_type
1585       || 3 == mal_type)
1586   { /* Try to maximise representation */
1587     send_pull_reply (peer, mal_peers, num_mal_peers);
1588     return GNUNET_OK;
1589   }
1590
1591   else if (2 == mal_type)
1592   { /* Try to partition network */
1593     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
1594     {
1595       send_pull_reply (peer, mal_peers, num_mal_peers);
1596     }
1597     return GNUNET_OK;
1598   }
1599   #endif /* ENABLE_MALICIOUS */
1600
1601   view_array = View_get_as_array ();
1602
1603   send_pull_reply (peer, view_array, View_size ());
1604
1605   GNUNET_CADET_receive_done (channel);
1606   return GNUNET_OK;
1607 }
1608
1609
1610 /**
1611  * Handle PULL REPLY message from another peer.
1612  *
1613  * Check whether we sent a corresponding request and
1614  * whether this reply is the first one.
1615  *
1616  * @param cls Closure
1617  * @param channel The channel the PUSH was received over
1618  * @param channel_ctx The context associated with this channel
1619  * @param msg The message header
1620  */
1621   static int
1622 handle_peer_pull_reply (void *cls,
1623                         struct GNUNET_CADET_Channel *channel,
1624                         void **channel_ctx,
1625                         const struct GNUNET_MessageHeader *msg)
1626 {
1627   struct GNUNET_RPS_P2P_PullReplyMessage *in_msg;
1628   struct GNUNET_PeerIdentity *peers;
1629   struct PeerContext *peer_ctx;
1630   struct GNUNET_PeerIdentity *sender;
1631   struct PeerContext *sender_ctx;
1632   struct PeerOutstandingOp out_op;
1633   uint32_t i;
1634 #ifdef ENABLE_MALICIOUS
1635   struct AttackedPeer *tmp_att_peer;
1636 #endif /* ENABLE_MALICIOUS */
1637
1638   /* Check for protocol violation */
1639   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size))
1640   {
1641     GNUNET_break_op (0);
1642     GNUNET_CADET_receive_done (channel);
1643     return GNUNET_SYSERR;
1644   }
1645
1646   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
1647   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1648       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1649   {
1650     LOG (GNUNET_ERROR_TYPE_ERROR,
1651         "message says it sends %" PRIu32 " peers, have space for %i peers\n",
1652         ntohl (in_msg->num_peers),
1653         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
1654             sizeof (struct GNUNET_PeerIdentity));
1655     GNUNET_break_op (0);
1656     GNUNET_CADET_receive_done (channel);
1657     return GNUNET_SYSERR;
1658   }
1659
1660   // Guess simply casting isn't the nicest way...
1661   // FIXME wait for cadet to change this function
1662   sender = (struct GNUNET_PeerIdentity *)
1663       GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
1664   sender_ctx = get_peer_ctx (sender);
1665
1666   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
1667
1668   if (GNUNET_YES != get_peer_flag (sender_ctx, PULL_REPLY_PENDING))
1669   {
1670     LOG (GNUNET_ERROR_TYPE_WARNING,
1671         "Received a pull reply from a peer we didn't request one from!\n");
1672     GNUNET_break_op (0);
1673     GNUNET_CADET_receive_done (channel);
1674     return GNUNET_OK;
1675   }
1676
1677
1678   #ifdef ENABLE_MALICIOUS
1679   // We shouldn't even receive pull replies as we're not sending
1680   if (2 == mal_type)
1681     return GNUNET_OK;
1682   #endif /* ENABLE_MALICIOUS */
1683
1684   /* Do actual logic */
1685   peers = (struct GNUNET_PeerIdentity *) &in_msg[1];
1686
1687   LOG (GNUNET_ERROR_TYPE_DEBUG,
1688        "PULL REPLY received, got following %u peers:\n",
1689        ntohl (in_msg->num_peers));
1690
1691   for (i = 0 ; i < ntohl (in_msg->num_peers) ; i++)
1692   {
1693     LOG (GNUNET_ERROR_TYPE_DEBUG,
1694          "%u. %s\n",
1695          i,
1696          GNUNET_i2s (&peers[i]));
1697
1698     #ifdef ENABLE_MALICIOUS
1699     if ((NULL != att_peer_set) &&
1700         (1 == mal_type || 3 == mal_type))
1701     { /* Add attacked peer to local list */
1702       // TODO check if we sent a request and this was the first reply
1703       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
1704                                                                &peers[i])
1705           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
1706                                                                   &peers[i])
1707           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
1708                                                    &own_identity))
1709       {
1710         tmp_att_peer = GNUNET_new (struct AttackedPeer);
1711         tmp_att_peer->peer_id = peers[i];
1712         GNUNET_CONTAINER_DLL_insert (att_peers_head,
1713                                      att_peers_tail,
1714                                      tmp_att_peer);
1715         add_peer_array_to_set (&peers[i], 1, att_peer_set);
1716       }
1717       continue;
1718     }
1719     #endif /* ENABLE_MALICIOUS */
1720     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
1721                                               &peers[i]))
1722     {
1723       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map,
1724             &peers[i]))
1725         peer_ctx = create_peer_ctx (&peers[i]);
1726       else
1727         peer_ctx = get_peer_ctx (&peers[i]);
1728
1729       if (GNUNET_YES == get_peer_flag (peer_ctx, VALID))
1730       {
1731         CustomPeerMap_put (pull_map, &peers[i]);
1732       }
1733       else if (GNUNET_NO == insert_in_pull_map_scheduled (peer_ctx))
1734       {
1735         out_op.op = insert_in_pull_map;
1736         out_op.op_cls = NULL;
1737         GNUNET_array_append (peer_ctx->outstanding_ops,
1738                              peer_ctx->num_outstanding_ops,
1739                              out_op);
1740         check_peer_live (peer_ctx);
1741       }
1742     }
1743   }
1744
1745   unset_peer_flag (sender_ctx, PULL_REPLY_PENDING);
1746
1747   peer_clean (sender);
1748
1749   GNUNET_CADET_receive_done (channel);
1750   return GNUNET_OK;
1751 }
1752
1753
1754 /**
1755  * Compute a random delay.
1756  * A uniformly distributed value between mean + spread and mean - spread.
1757  *
1758  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
1759  * It would return a random value between 2 and 6 min.
1760  *
1761  * @param mean the mean
1762  * @param spread the inverse amount of deviation from the mean
1763  */
1764 static struct GNUNET_TIME_Relative
1765 compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread)
1766 {
1767   struct GNUNET_TIME_Relative half_interval;
1768   struct GNUNET_TIME_Relative ret;
1769   unsigned int rand_delay;
1770   unsigned int max_rand_delay;
1771
1772   if (0 == spread)
1773   {
1774     LOG (GNUNET_ERROR_TYPE_WARNING,
1775          "Not accepting spread of 0\n");
1776     GNUNET_break (0);
1777   }
1778
1779   /* Compute random time value between spread * mean and spread * mean */
1780   half_interval = GNUNET_TIME_relative_divide (mean, spread);
1781
1782   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
1783   /**
1784    * Compute random value between (0 and 1) * round_interval
1785    * via multiplying round_interval with a 'fraction' (0 to value)/value
1786    */
1787   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
1788   ret = GNUNET_TIME_relative_multiply (mean,  rand_delay);
1789   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
1790   ret = GNUNET_TIME_relative_add      (ret, half_interval);
1791
1792   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
1793     LOG (GNUNET_ERROR_TYPE_WARNING,
1794          "Returning FOREVER_REL\n");
1795
1796   return ret;
1797 }
1798
1799
1800 /**
1801  * Send single pull request
1802  *
1803  * @param peer_id the peer to send the pull request to.
1804  */
1805 static void
1806 send_pull_request (const struct GNUNET_PeerIdentity *peer_id)
1807 {
1808   struct GNUNET_MQ_Envelope *ev;
1809   struct GNUNET_MQ_Handle *mq;
1810   struct PeerContext *peer_ctx;
1811   struct PendingMessage *pending_msg;
1812
1813   peer_ctx = get_peer_ctx (peer_id);
1814   GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING));
1815   set_peer_flag (peer_ctx, PULL_REPLY_PENDING);
1816
1817   LOG (GNUNET_ERROR_TYPE_DEBUG,
1818        "Going to send PULL REQUEST to peer %s.\n",
1819        GNUNET_i2s (peer_id));
1820
1821   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
1822   mq = get_mq (peer_id);
1823   pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST");
1824   GNUNET_MQ_notify_sent (ev,
1825       mq_notify_sent_cb,
1826       pending_msg);
1827   GNUNET_MQ_send (mq, ev);
1828 }
1829
1830
1831 /**
1832  * Send single push
1833  *
1834  * @param peer_id the peer to send the push to.
1835  */
1836 static void
1837 send_push (const struct GNUNET_PeerIdentity *peer_id)
1838 {
1839   struct GNUNET_MQ_Envelope *ev;
1840   struct GNUNET_MQ_Handle *mq;
1841   struct PendingMessage *pending_msg;
1842
1843   LOG (GNUNET_ERROR_TYPE_DEBUG,
1844        "Going to send PUSH to peer %s.\n",
1845        GNUNET_i2s (peer_id));
1846
1847   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
1848   mq = get_mq (peer_id);
1849   pending_msg = insert_pending_message (peer_id, ev, "PUSH");
1850   GNUNET_MQ_notify_sent (ev,
1851       mq_notify_sent_cb,
1852       pending_msg);
1853   GNUNET_MQ_send (mq, ev);
1854 }
1855
1856
1857 static void
1858 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1859
1860 static void
1861 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
1862
1863
1864 #ifdef ENABLE_MALICIOUS
1865 /**
1866  * Turn RPS service to act malicious.
1867  *
1868  * @param cls Closure
1869  * @param client The client that sent the message
1870  * @param msg The message header
1871  */
1872   static void
1873 handle_client_act_malicious (void *cls,
1874                              struct GNUNET_SERVER_Client *client,
1875                              const struct GNUNET_MessageHeader *msg)
1876 {
1877   struct GNUNET_RPS_CS_ActMaliciousMessage *in_msg;
1878   struct GNUNET_PeerIdentity *peers;
1879   uint32_t num_mal_peers_sent;
1880   uint32_t num_mal_peers_old;
1881
1882   /* Check for protocol violation */
1883   if (sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage) > ntohs (msg->size))
1884   {
1885     GNUNET_break_op (0);
1886   }
1887
1888   in_msg = (struct GNUNET_RPS_CS_ActMaliciousMessage *) msg;
1889   if ((ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1890       sizeof (struct GNUNET_PeerIdentity) != ntohl (in_msg->num_peers))
1891   {
1892     LOG (GNUNET_ERROR_TYPE_ERROR,
1893         "message says it sends %" PRIu64 " peers, have space for %i peers\n",
1894         ntohl (in_msg->num_peers),
1895         (ntohs (msg->size) - sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage)) /
1896             sizeof (struct GNUNET_PeerIdentity));
1897     GNUNET_break_op (0);
1898   }
1899
1900
1901   /* Do actual logic */
1902   peers = (struct GNUNET_PeerIdentity *) &msg[1];
1903   mal_type = ntohl (in_msg->type);
1904   if (NULL == mal_peer_set)
1905     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
1906
1907   LOG (GNUNET_ERROR_TYPE_DEBUG,
1908        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
1909        mal_type,
1910        ntohl (in_msg->num_peers));
1911
1912   if (1 == mal_type)
1913   { /* Try to maximise representation */
1914     /* Add other malicious peers to those we already know */
1915
1916     num_mal_peers_sent = ntohl (in_msg->num_peers);
1917     num_mal_peers_old = num_mal_peers;
1918     GNUNET_array_grow (mal_peers,
1919                        num_mal_peers,
1920                        num_mal_peers + num_mal_peers_sent);
1921     memcpy (&mal_peers[num_mal_peers_old],
1922             peers,
1923             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1924
1925     /* Add all mal peers to mal_peer_set */
1926     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1927                            num_mal_peers_sent,
1928                            mal_peer_set);
1929
1930     /* Substitute do_round () with do_mal_round () */
1931     GNUNET_SCHEDULER_cancel (do_round_task);
1932     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1933   }
1934
1935   else if (2 == mal_type
1936            || 3 == mal_type)
1937   { /* Try to partition the network */
1938     /* Add other malicious peers to those we already know */
1939     struct PeerContext *att_ctx;
1940
1941     num_mal_peers_sent = ntohl (in_msg->num_peers) - 1;
1942     num_mal_peers_old = num_mal_peers;
1943     GNUNET_array_grow (mal_peers,
1944                        num_mal_peers,
1945                        num_mal_peers + num_mal_peers_sent);
1946     if (NULL != mal_peers &&
1947         0 != num_mal_peers)
1948     {
1949       memcpy (&mal_peers[num_mal_peers_old],
1950               peers,
1951               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
1952
1953       /* Add all mal peers to mal_peer_set */
1954       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
1955                              num_mal_peers_sent,
1956                              mal_peer_set);
1957     }
1958
1959     /* Store the one attacked peer */
1960     memcpy (&attacked_peer,
1961             &in_msg->attacked_peer,
1962             sizeof (struct GNUNET_PeerIdentity));
1963     /* Set the flag of the attacked peer to valid to avoid problems */
1964     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map,
1965           &attacked_peer))
1966     {
1967       att_ctx = create_peer_ctx (&attacked_peer);
1968       check_peer_live (att_ctx);
1969     }
1970
1971     LOG (GNUNET_ERROR_TYPE_DEBUG,
1972          "Attacked peer is %s\n",
1973          GNUNET_i2s (&attacked_peer));
1974
1975     /* Substitute do_round () with do_mal_round () */
1976     GNUNET_SCHEDULER_cancel (do_round_task);
1977     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
1978   }
1979   else if (0 == mal_type)
1980   { /* Stop acting malicious */
1981     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
1982
1983     /* Substitute do_mal_round () with do_round () */
1984     GNUNET_SCHEDULER_cancel (do_round_task);
1985     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
1986   }
1987   else
1988   {
1989     GNUNET_break (0);
1990   }
1991
1992   GNUNET_SERVER_receive_done (client,   GNUNET_OK);
1993 }
1994
1995
1996 /**
1997  * Send out PUSHes and PULLs maliciously.
1998  *
1999  * This is executed regylary.
2000  */
2001 static void
2002 do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2003 {
2004   uint32_t num_pushes;
2005   uint32_t i;
2006   struct GNUNET_TIME_Relative time_next_round;
2007   struct AttackedPeer *tmp_att_peer;
2008   struct PeerContext *peer_ctx;
2009
2010   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously type %" PRIu32 ".\n",
2011       mal_type);
2012   do_round_task = NULL;
2013   GNUNET_assert (mal_type <= 3);
2014   /* Do malicious actions */
2015   if (1 == mal_type)
2016   { /* Try to maximise representation */
2017
2018     /* The maximum of pushes we're going to send this round */
2019     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
2020                                          num_attacked_peers),
2021                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
2022
2023     LOG (GNUNET_ERROR_TYPE_DEBUG,
2024          "Going to send %" PRIu32 " pushes\n",
2025          num_pushes);
2026
2027     /* Send PUSHes to attacked peers */
2028     for (i = 0 ; i < num_pushes ; i++)
2029     {
2030       if (att_peers_tail == att_peer_index)
2031         att_peer_index = att_peers_head;
2032       else
2033         att_peer_index = att_peer_index->next;
2034
2035       send_push (&att_peer_index->peer_id);
2036     }
2037
2038     /* Send PULLs to some peers to learn about additional peers to attack */
2039     tmp_att_peer = att_peer_index;
2040     for (i = 0 ; i < num_pushes * alpha ; i++)
2041     {
2042       if (att_peers_tail == tmp_att_peer)
2043         tmp_att_peer = att_peers_head;
2044       else
2045         att_peer_index = tmp_att_peer->next;
2046
2047       send_pull_request (&tmp_att_peer->peer_id);
2048     }
2049   }
2050
2051
2052   else if (2 == mal_type)
2053   { /**
2054      * Try to partition the network
2055      * Send as many pushes to the attacked peer as possible
2056      * That is one push per round as it will ignore more.
2057      */
2058     peer_ctx = get_peer_ctx (&attacked_peer);
2059     if (GNUNET_YES == get_peer_flag (peer_ctx, VALID))
2060       send_push (&attacked_peer);
2061   }
2062
2063
2064   if (3 == mal_type)
2065   { /* Combined attack */
2066
2067     /* Send PUSH to attacked peers */
2068     if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map,
2069           &attacked_peer))
2070     {
2071       peer_ctx = get_peer_ctx (&attacked_peer);
2072       if (GNUNET_YES == get_peer_flag (peer_ctx, VALID))
2073       {
2074         LOG (GNUNET_ERROR_TYPE_DEBUG,
2075             "Goding to send push to attacked peer (%s)\n",
2076             GNUNET_i2s (&attacked_peer));
2077         send_push (&attacked_peer);
2078       }
2079       else
2080         check_peer_live (peer_ctx);
2081     }
2082     else
2083       peer_ctx = create_peer_ctx (&attacked_peer);
2084     check_peer_live (peer_ctx);
2085
2086     /* The maximum of pushes we're going to send this round */
2087     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
2088                                          num_attacked_peers),
2089                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
2090
2091     LOG (GNUNET_ERROR_TYPE_DEBUG,
2092          "Going to send %" PRIu32 " pushes\n",
2093          num_pushes);
2094
2095     for (i = 0 ; i < num_pushes ; i++)
2096     {
2097       if (att_peers_tail == att_peer_index)
2098         att_peer_index = att_peers_head;
2099       else
2100         att_peer_index = att_peer_index->next;
2101
2102       send_push (&att_peer_index->peer_id);
2103     }
2104
2105     /* Send PULLs to some peers to learn about additional peers to attack */
2106     tmp_att_peer = att_peer_index;
2107     for (i = 0 ; i < num_pushes * alpha ; i++)
2108     {
2109       if (att_peers_tail == tmp_att_peer)
2110         tmp_att_peer = att_peers_head;
2111       else
2112         att_peer_index = tmp_att_peer->next;
2113
2114       send_pull_request (&tmp_att_peer->peer_id);
2115     }
2116   }
2117
2118   /* Schedule next round */
2119   time_next_round = compute_rand_delay (round_interval, 2);
2120
2121   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
2122   //NULL);
2123   GNUNET_assert (NULL == do_round_task);
2124   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_mal_round, NULL);
2125   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
2126 }
2127 #endif /* ENABLE_MALICIOUS */
2128
2129
2130 /**
2131  * Send out PUSHes and PULLs, possibly update #view, samplers.
2132  *
2133  * This is executed regylary.
2134  */
2135 static void
2136 do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2137 {
2138   LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round.\n");
2139
2140   uint32_t i;
2141   const struct GNUNET_PeerIdentity *view_array;
2142   unsigned int *permut;
2143   unsigned int a_peers; /* Number of peers we send pushes to */
2144   unsigned int b_peers; /* Number of peers we send pull requests to */
2145   uint32_t first_border;
2146   uint32_t second_border;
2147   struct GNUNET_PeerIdentity peer;
2148   struct PeerContext *peer_ctx;
2149   struct GNUNET_PeerIdentity *update_peer;
2150
2151   do_round_task = NULL;
2152   LOG (GNUNET_ERROR_TYPE_DEBUG,
2153        "Printing view:\n");
2154   to_file (file_name_view_log,
2155            "___ new round ___");
2156   view_array = View_get_as_array ();
2157   for (i = 0; i < View_size (); i++)
2158   {
2159     LOG (GNUNET_ERROR_TYPE_DEBUG,
2160          "\t%s\n", GNUNET_i2s (&view_array[i]));
2161     to_file (file_name_view_log,
2162              "=%s\t(do round)",
2163              GNUNET_i2s_full (&view_array[i]));
2164   }
2165
2166
2167   /* Send pushes and pull requests */
2168   if (0 < View_size ())
2169   {
2170     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
2171                                            View_size ());
2172
2173     /* Send PUSHes */
2174     a_peers = ceil (alpha * View_size ());
2175
2176     LOG (GNUNET_ERROR_TYPE_DEBUG,
2177          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
2178          a_peers, alpha, View_size ());
2179     for (i = 0; i < a_peers; i++)
2180     {
2181       peer = view_array[permut[i]];
2182       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
2183       { // FIXME if this fails schedule/loop this for later
2184         send_push (&peer);
2185       }
2186     }
2187
2188     /* Send PULL requests */
2189     b_peers = ceil (beta * View_size ());
2190     first_border = a_peers;
2191     second_border = a_peers + b_peers;
2192     if (second_border > View_size ())
2193     {
2194       first_border = View_size () - b_peers;
2195       second_border = View_size ();
2196     }
2197     LOG (GNUNET_ERROR_TYPE_DEBUG,
2198         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
2199         b_peers, beta, View_size ());
2200     for (i = first_border; i < second_border; i++)
2201     {
2202       peer = view_array[permut[i]];
2203       peer_ctx = get_peer_ctx (&peer);
2204       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
2205           GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) // TODO
2206       { // FIXME if this fails schedule/loop this for later
2207         send_pull_request (&peer);
2208       }
2209     }
2210
2211     GNUNET_free (permut);
2212     permut = NULL;
2213   }
2214
2215
2216   /* Update view */
2217   /* TODO see how many peers are in push-/pull- list! */
2218
2219   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
2220       (0 < CustomPeerMap_size (push_map)) &&
2221       (0 < CustomPeerMap_size (pull_map)))
2222   { /* If conditions for update are fulfilled, update */
2223     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
2224
2225     uint32_t final_size;
2226     uint32_t peers_to_clean_size;
2227     struct GNUNET_PeerIdentity *peers_to_clean;
2228
2229     peers_to_clean = NULL;
2230     peers_to_clean_size = 0;
2231     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
2232     memcpy (peers_to_clean,
2233             view_array,
2234             View_size () * sizeof (struct GNUNET_PeerIdentity));
2235
2236     /* Seems like recreating is the easiest way of emptying the peermap */
2237     View_clear ();
2238     to_file (file_name_view_log,
2239              "--- emptied ---");
2240
2241     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
2242                                 CustomPeerMap_size (push_map));
2243     second_border = first_border +
2244                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
2245                                 CustomPeerMap_size (pull_map));
2246     final_size    = second_border +
2247       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
2248
2249     /* Update view with peers received through PUSHes */
2250     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
2251                                            CustomPeerMap_size (push_map));
2252     for (i = 0; i < first_border; i++)
2253     {
2254       View_put (CustomPeerMap_get_peer_by_index (push_map, permut[i]));
2255       to_file (file_name_view_log,
2256                "+%s\t(push list)",
2257                GNUNET_i2s_full (&view_array[i]));
2258       // TODO change the peer_flags accordingly
2259     }
2260     GNUNET_free (permut);
2261     permut = NULL;
2262
2263     /* Update view with peers received through PULLs */
2264     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
2265                                            CustomPeerMap_size (pull_map));
2266     for (i = first_border; i < second_border; i++)
2267     {
2268       View_put (CustomPeerMap_get_peer_by_index (pull_map,
2269                                                  permut[i - first_border]));
2270       to_file (file_name_view_log,
2271                "+%s\t(pull list)",
2272                GNUNET_i2s_full (&view_array[i]));
2273       // TODO change the peer_flags accordingly
2274     }
2275     GNUNET_free (permut);
2276     permut = NULL;
2277
2278     /* Update view with peers from history */
2279     RPS_sampler_get_n_rand_peers (prot_sampler,
2280                                   hist_update,
2281                                   NULL,
2282                                   final_size - second_border);
2283     num_hist_update_tasks = final_size - second_border;
2284     // TODO change the peer_flags accordingly
2285
2286     for (i = 0; i < View_size (); i++)
2287       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
2288
2289     /* Clean peers that were removed from the view */
2290     for (i = 0; i < peers_to_clean_size; i++)
2291     {
2292       to_file (file_name_view_log,
2293                "-%s",
2294                GNUNET_i2s_full (&peers_to_clean[i]));
2295       peer_clean (&peers_to_clean[i]);
2296     }
2297
2298     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
2299     peers_to_clean = NULL;
2300   }
2301   else
2302   {
2303     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
2304   }
2305   // TODO independent of that also get some peers from CADET_get_peers()?
2306
2307   LOG (GNUNET_ERROR_TYPE_DEBUG,
2308        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
2309        CustomPeerMap_size (push_map),
2310        CustomPeerMap_size (pull_map),
2311        alpha,
2312        View_size (),
2313        alpha * View_size ());
2314
2315   /* Update samplers */
2316   for (i = 0; i < CustomPeerMap_size (push_map); i++)
2317   {
2318     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
2319     LOG (GNUNET_ERROR_TYPE_DEBUG,
2320          "Updating with peer %s from push list\n",
2321          GNUNET_i2s (update_peer));
2322     insert_in_sampler (NULL, update_peer);
2323     peer_clean (update_peer); /* This cleans only if it is not in the view */
2324   }
2325
2326   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
2327   {
2328     LOG (GNUNET_ERROR_TYPE_DEBUG,
2329          "Updating with peer %s from pull list\n",
2330          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
2331     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
2332     /* This cleans only if it is not in the view */
2333     peer_clean (CustomPeerMap_get_peer_by_index (pull_map, i));
2334   }
2335
2336
2337   /* Empty push/pull lists */
2338   CustomPeerMap_clear (push_map);
2339   CustomPeerMap_clear (pull_map);
2340
2341   struct GNUNET_TIME_Relative time_next_round;
2342
2343   time_next_round = compute_rand_delay (round_interval, 2);
2344
2345   /* Schedule next round */
2346   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round, &do_round, NULL);
2347   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
2348 }
2349
2350
2351 static void
2352 rps_start (struct GNUNET_SERVER_Handle *server);
2353
2354
2355 /**
2356  * This is called from GNUNET_CADET_get_peers().
2357  *
2358  * It is called on every peer(ID) that cadet somehow has contact with.
2359  * We use those to initialise the sampler.
2360  */
2361 void
2362 init_peer_cb (void *cls,
2363               const struct GNUNET_PeerIdentity *peer,
2364               int tunnel, // "Do we have a tunnel towards this peer?"
2365               unsigned int n_paths, // "Number of known paths towards this peer"
2366               unsigned int best_path) // "How long is the best path?
2367                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
2368 {
2369   if (NULL != peer)
2370   {
2371     LOG (GNUNET_ERROR_TYPE_DEBUG,
2372          "Got peer_id %s from cadet\n",
2373          GNUNET_i2s (peer));
2374     new_peer_id (peer);
2375   }
2376 }
2377
2378
2379 /**
2380  * Iterator over peers from peerinfo.
2381  *
2382  * @param cls closure
2383  * @param peer id of the peer, NULL for last call
2384  * @param hello hello message for the peer (can be NULL)
2385  * @param error message
2386  */
2387 void
2388 process_peerinfo_peers (void *cls,
2389                         const struct GNUNET_PeerIdentity *peer,
2390                         const struct GNUNET_HELLO_Message *hello,
2391                         const char *err_msg)
2392 {
2393   if (NULL != peer)
2394   {
2395     LOG (GNUNET_ERROR_TYPE_DEBUG,
2396          "Got peer_id %s from peerinfo\n",
2397          GNUNET_i2s (peer));
2398     new_peer_id (peer);
2399   }
2400 }
2401
2402
2403 /**
2404  * Callback used to remove peers from the multipeermap.
2405  */
2406   int
2407 peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value)
2408 {
2409   struct PeerContext *peer_ctx;
2410   const struct GNUNET_CADET_Channel *channel =
2411     (const struct GNUNET_CADET_Channel *) cls;
2412
2413   peer_ctx = (struct PeerContext *) value;
2414   set_peer_flag (peer_ctx, TO_DESTROY);
2415
2416   LOG (GNUNET_ERROR_TYPE_DEBUG,
2417        "Going to remove peer %s\n",
2418        GNUNET_i2s (&peer_ctx->peer_id));
2419
2420   /* Remove it from the sampler used for the Brahms protocol */
2421   RPS_sampler_reinitialise_by_value (prot_sampler, key);
2422
2423   /* If operations are still scheduled for this peer cancel those */
2424   if (0 != peer_ctx->num_outstanding_ops)
2425   {
2426     GNUNET_array_grow (peer_ctx->outstanding_ops,
2427                        peer_ctx->num_outstanding_ops,
2428                        0);
2429   }
2430
2431   /* If we are still waiting for notification whether this peer is live
2432    * cancel the according task */
2433   if (NULL != peer_ctx->transmit_handle)
2434   {
2435     LOG (GNUNET_ERROR_TYPE_DEBUG,
2436          "Trying to cancle transmit_handle for peer %s\n",
2437          GNUNET_i2s (key));
2438     GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle);
2439     peer_ctx->transmit_handle = NULL;
2440   }
2441
2442   unset_peer_flag (peer_ctx, PULL_REPLY_PENDING);
2443
2444   /* Remove peer from view */
2445   if (View_contains_peer (key))
2446   {
2447     to_file (file_name_view_log,
2448         "-%s\t(cleanup channel, other peer)",
2449         GNUNET_i2s_full (key));
2450     View_remove_peer (key);
2451   }
2452
2453   /* Remove from push and pull lists */
2454   CustomPeerMap_remove_peer (push_map, key);
2455   CustomPeerMap_remove_peer (pull_map, key);
2456
2457   /* Cancle messages that have not been sent yet */
2458   while (NULL != peer_ctx->pending_messages_head)
2459   {
2460     LOG (GNUNET_ERROR_TYPE_DEBUG,
2461         "Removing unsent %s\n",
2462         peer_ctx->pending_messages_head->type);
2463     /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does not
2464      * set a #GNUNET_MQ_CancelImpl */
2465     /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */
2466     remove_pending_message (peer_ctx->pending_messages_head);
2467   }
2468
2469   /* If there is still a mq destroy it */
2470   if (NULL != peer_ctx->mq)
2471   {
2472     GNUNET_MQ_destroy (peer_ctx->mq);
2473     peer_ctx->mq = NULL;
2474   }
2475
2476
2477   /* Remove the send_channel
2478    * This function should be called again from #cleanup_channel (callback
2479    * called on the destruction of channels) and clean up the rest. */
2480   if (NULL != peer_ctx->send_channel &&
2481       channel != peer_ctx->send_channel)
2482   {
2483     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
2484     peer_ctx->send_channel = NULL;
2485   }
2486
2487   /* Remove the recv_channel
2488    * This function should be called again from #cleanup_channel (callback
2489    * called on the destruction of channels) and clean up the rest. */
2490   if (NULL != peer_ctx->recv_channel &&
2491       channel != peer_ctx->recv_channel)
2492   {
2493     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
2494     peer_ctx->recv_channel = NULL;
2495   }
2496
2497   /* If there is no channel we have to remove the context now */
2498   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, key))
2499     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
2500
2501   GNUNET_free (peer_ctx);
2502
2503   return GNUNET_YES;
2504 }
2505
2506
2507 /**
2508  * Clean the send channel of a peer
2509  * If there is also no channel to receive messages from that peer, remove it
2510  * from the peermap.
2511  */
2512 void
2513 peer_clean (const struct GNUNET_PeerIdentity *peer)
2514 {
2515   struct PeerContext *peer_ctx;
2516   /* struct GNUNET_CADET_Channel *channel; */
2517
2518   if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
2519        (GNUNET_NO  == View_contains_peer (peer)) &&
2520        (GNUNET_NO  == CustomPeerMap_contains_peer (push_map, peer)) &&
2521        (GNUNET_NO  == CustomPeerMap_contains_peer (pull_map, peer)) &&
2522        (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) )
2523   {
2524     peer_ctx = get_peer_ctx (peer);
2525
2526     if ( (NULL == peer_ctx->recv_channel) &&
2527          (NULL == peer_ctx->pending_messages_head) &&
2528          (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) )
2529     {
2530       #ifdef ENABLE_MALICIOUS
2531       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2532         peer_remove_cb (NULL, peer, peer_ctx);
2533       #else /* ENABLE_MALICIOUS */
2534       peer_remove_cb (NULL, peer, peer_ctx);
2535       #endif /* ENABLE_MALICIOUS */
2536     }
2537   }
2538 }
2539
2540
2541 /**
2542  * Task run during shutdown.
2543  *
2544  * @param cls unused
2545  * @param tc unused
2546  */
2547 static void
2548 shutdown_task (void *cls,
2549                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2550 {
2551
2552   LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS is going down\n");
2553
2554   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
2555   GNUNET_PEERINFO_disconnect (peerinfo_handle);
2556
2557   if (NULL != do_round_task)
2558   {
2559     GNUNET_SCHEDULER_cancel (do_round_task);
2560     do_round_task = NULL;
2561   }
2562
2563   {
2564   if (GNUNET_SYSERR ==
2565         GNUNET_CONTAINER_multipeermap_iterate (peer_map, peer_remove_cb, NULL))
2566     LOG (GNUNET_ERROR_TYPE_WARNING,
2567         "Iterating over peers to disconnect from them was cancelled\n");
2568   }
2569
2570   GNUNET_NSE_disconnect (nse);
2571   RPS_sampler_destroy (prot_sampler);
2572   RPS_sampler_destroy (client_sampler);
2573   LOG (GNUNET_ERROR_TYPE_DEBUG,
2574        "Size of the peermap: %u\n",
2575        GNUNET_CONTAINER_multipeermap_size (peer_map));
2576   GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map));
2577   GNUNET_CADET_disconnect (cadet_handle);
2578   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
2579   View_destroy ();
2580   CustomPeerMap_destroy (push_map);
2581   CustomPeerMap_destroy (pull_map);
2582   #ifdef ENABLE_MALICIOUS
2583   struct AttackedPeer *tmp_att_peer;
2584   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
2585   if (NULL != mal_peer_set)
2586     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
2587   if (NULL != att_peer_set)
2588     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
2589   while (NULL != att_peers_head)
2590   {
2591     tmp_att_peer = att_peers_head;
2592     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
2593   }
2594   #endif /* ENABLE_MALICIOUS */
2595 }
2596
2597
2598 /**
2599  * @brief Get informed about a connecting client.
2600  *
2601  * @param cls unused
2602  * @param client the client that connects
2603  */
2604 static void
2605 handle_client_connect (void *cls,
2606                        struct GNUNET_SERVER_Client *client)
2607 {
2608   struct ClientContext *cli_ctx;
2609
2610   LOG (GNUNET_ERROR_TYPE_DEBUG,
2611        "Client connected\n");
2612   if (NULL == client)
2613     return; /* Server was destroyed before a client connected. Shutting down */
2614   cli_ctx = GNUNET_new (struct ClientContext);
2615   cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
2616   GNUNET_SERVER_client_set_user_context (client, cli_ctx);
2617   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
2618                                cli_ctx_tail,
2619                                cli_ctx);
2620 }
2621
2622 /**
2623  * A client disconnected.  Remove all of its data structure entries.
2624  *
2625  * @param cls closure, NULL
2626  * @param client identification of the client
2627  */
2628 static void
2629 handle_client_disconnect (void *cls,
2630                                             struct GNUNET_SERVER_Client *client)
2631 {
2632   struct ClientContext *cli_ctx;
2633
2634   if (NULL == client)
2635   {/* shutdown task */
2636     while (NULL != cli_ctx_head)
2637       destroy_cli_ctx (cli_ctx_head);
2638   }
2639   else
2640   {
2641     cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext);
2642     destroy_cli_ctx (cli_ctx);
2643   }
2644 }
2645
2646
2647 /**
2648  * Handle the channel a peer opens to us.
2649  *
2650  * @param cls The closure
2651  * @param channel The channel the peer wants to establish
2652  * @param initiator The peer's peer ID
2653  * @param port The port the channel is being established over
2654  * @param options Further options
2655  */
2656   static void *
2657 handle_inbound_channel (void *cls,
2658                         struct GNUNET_CADET_Channel *channel,
2659                         const struct GNUNET_PeerIdentity *initiator,
2660                         uint32_t port,
2661                         enum GNUNET_CADET_ChannelOption options)
2662 {
2663   struct PeerContext *peer_ctx;
2664
2665   LOG (GNUNET_ERROR_TYPE_DEBUG,
2666       "New channel was established to us (Peer %s).\n",
2667       GNUNET_i2s (initiator));
2668   GNUNET_assert (NULL != channel); /* according to cadet API */
2669   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, initiator))
2670     peer_ctx = create_peer_ctx (initiator);
2671   else
2672     peer_ctx = get_peer_ctx (initiator);
2673   /* We only accept one incoming channel from peers */
2674   if (NULL != peer_ctx->recv_channel)
2675   {
2676     GNUNET_CADET_channel_destroy (channel);
2677     return NULL;
2678   }
2679   peer_ctx->recv_channel = channel;
2680   peer_is_live (peer_ctx);
2681   return NULL;
2682 }
2683
2684
2685 /**
2686  * This is called when a channel is destroyed.
2687  *
2688  * @param cls The closure
2689  * @param channel The channel being closed
2690  * @param channel_ctx The context associated with this channel
2691  */
2692   static void
2693 cleanup_channel (void *cls,
2694                 const struct GNUNET_CADET_Channel *channel,
2695                 void *channel_ctx)
2696 {
2697   struct GNUNET_PeerIdentity *peer;
2698   struct PeerContext *peer_ctx;
2699
2700   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
2701       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
2702        // Guess simply casting isn't the nicest way...
2703        // FIXME wait for cadet to change this function
2704
2705   if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
2706   {/* We don't want to implicitly create a context that we're about to kill */
2707     peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
2708     if (NULL == peer_ctx) /* It could have been removed by shutdown_task */
2709       return;
2710
2711     if (get_peer_flag (peer_ctx, TO_DESTROY))
2712     {/* We initiatad the destruction of this particular peer */
2713       if (channel == peer_ctx->send_channel)
2714         peer_ctx->send_channel = NULL;
2715       else if (channel == peer_ctx->recv_channel)
2716         peer_ctx->recv_channel = NULL;
2717
2718       to_file (file_name_view_log,
2719                "-%s\t(cleanup channel, ourself)",
2720                GNUNET_i2s_full (peer));
2721     }
2722
2723     else
2724     { /* We did not initiate the destruction of this peer */
2725       if (channel == peer_ctx->send_channel)
2726       { /* Something (but us) killd the channel - clean up peer */
2727         LOG (GNUNET_ERROR_TYPE_DEBUG,
2728             "send channel (%s) was destroyed - cleaning up\n",
2729             GNUNET_i2s (peer));
2730         peer_ctx->send_channel = NULL;
2731         /* Somwewhat {ab,re}use the iterator function */
2732         /* Cast to void is ok, because it's used as void in peer_remove_cb */
2733         (void) peer_remove_cb ((void *) channel, peer, peer_ctx);
2734       }
2735       else if (channel == peer_ctx->recv_channel)
2736       { /* Other peer doesn't want to send us messages anymore */
2737         LOG (GNUNET_ERROR_TYPE_DEBUG,
2738              "Peer %s destroyed recv channel - cleaning up channel\n",
2739              GNUNET_i2s (peer));
2740         peer_ctx->recv_channel = NULL;
2741       }
2742       else
2743       {
2744         LOG (GNUNET_ERROR_TYPE_WARNING,
2745              "unknown channel (%s) was destroyed\n",
2746              GNUNET_i2s (peer));
2747       }
2748     }
2749   }
2750
2751   else
2752   { /* We don't know a context to that peer */
2753     LOG (GNUNET_ERROR_TYPE_DEBUG,
2754          "channel (%s) without associated context was destroyed\n",
2755          GNUNET_i2s (peer));
2756   }
2757 }
2758
2759
2760 /**
2761  * Actually start the service.
2762  */
2763   static void
2764 rps_start (struct GNUNET_SERVER_Handle *server)
2765 {
2766   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
2767     {&handle_client_request,        NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
2768       sizeof (struct GNUNET_RPS_CS_RequestMessage)},
2769     {&handle_client_request_cancel, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
2770       sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)},
2771     {&handle_client_seed,           NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0},
2772     #ifdef ENABLE_MALICIOUS
2773     {&handle_client_act_malicious,  NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0},
2774     #endif /* ENABLE_MALICIOUS */
2775     {NULL, NULL, 0, 0}
2776   };
2777
2778   GNUNET_SERVER_add_handlers (server, handlers);
2779   GNUNET_SERVER_connect_notify (server,
2780                                 &handle_client_connect,
2781                                 NULL);
2782   GNUNET_SERVER_disconnect_notify (server,
2783                                    &handle_client_disconnect,
2784                                    NULL);
2785   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
2786
2787
2788   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
2789   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
2790
2791   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
2792                                                         &shutdown_task,
2793                                                         NULL);
2794 }
2795
2796
2797 /**
2798  * Process statistics requests.
2799  *
2800  * @param cls closure
2801  * @param server the initialized server
2802  * @param c configuration to use
2803  */
2804   static void
2805 run (void *cls,
2806      struct GNUNET_SERVER_Handle *server,
2807      const struct GNUNET_CONFIGURATION_Handle *c)
2808 {
2809   int size;
2810   int out_size;
2811
2812   // TODO check what this does -- copied from gnunet-boss
2813   // - seems to work as expected
2814   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
2815   cfg = c;
2816
2817
2818   /* Get own ID */
2819   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
2820   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2821               "STARTING SERVICE (rps) for peer [%s]\n",
2822               GNUNET_i2s (&own_identity));
2823   #ifdef ENABLE_MALICIOUS
2824   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2825               "Malicious execution compiled in.\n");
2826   #endif /* ENABLE_MALICIOUS */
2827
2828
2829
2830   /* Get time interval from the configuration */
2831   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
2832                                                         "ROUNDINTERVAL",
2833                                                         &round_interval))
2834   {
2835     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2836                                "RPS", "ROUNDINTERVAL");
2837     GNUNET_SCHEDULER_shutdown ();
2838     return;
2839   }
2840
2841   /* Get initial size of sampler/view from the configuration */
2842   if (GNUNET_OK !=
2843       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
2844         (long long unsigned int *) &sampler_size_est_need))
2845   {
2846     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2847                                "RPS", "INITSIZE");
2848     GNUNET_SCHEDULER_shutdown ();
2849     return;
2850   }
2851   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need);
2852
2853
2854   View_create (4);
2855
2856   /* file_name_view_log */
2857   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
2858   {
2859     LOG (GNUNET_ERROR_TYPE_WARNING,
2860          "Failed to create directory /tmp/rps/\n");
2861   }
2862
2863   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
2864   file_name_view_log = GNUNET_malloc (size);
2865   out_size = GNUNET_snprintf (file_name_view_log,
2866                               size,
2867                               "/tmp/rps/view-%s",
2868                               GNUNET_i2s_full (&own_identity));
2869   if (size < out_size ||
2870       0 > out_size)
2871   {
2872     LOG (GNUNET_ERROR_TYPE_WARNING,
2873          "Failed to write string to buffer (size: %i, out_size: %i)\n",
2874          size,
2875          out_size);
2876   }
2877
2878
2879   /* connect to NSE */
2880   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
2881
2882
2883   alpha = 0.45;
2884   beta  = 0.45;
2885
2886   peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO);
2887
2888
2889   /* Initialise cadet */
2890   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
2891     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        ,
2892       sizeof (struct GNUNET_MessageHeader)},
2893     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2894       sizeof (struct GNUNET_MessageHeader)},
2895     {&handle_peer_pull_reply  , GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY  , 0},
2896     {NULL, 0, 0}
2897   };
2898
2899   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h
2900   cadet_handle = GNUNET_CADET_connect (cfg,
2901                                        cls,
2902                                        &handle_inbound_channel,
2903                                        &cleanup_channel,
2904                                        cadet_handlers,
2905                                        ports);
2906
2907   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
2908
2909   /* Initialise sampler */
2910   struct GNUNET_TIME_Relative half_round_interval;
2911   struct GNUNET_TIME_Relative  max_round_interval;
2912
2913   half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5);
2914   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
2915
2916   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
2917   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
2918
2919   /* Initialise push and pull maps */
2920   push_map = CustomPeerMap_create (4);
2921   pull_map = CustomPeerMap_create (4);
2922
2923
2924   num_hist_update_tasks = 0;
2925
2926
2927   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
2928   GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
2929   // TODO send push/pull to each of those peers?
2930
2931   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
2932                                                    GNUNET_NO,
2933                                                    process_peerinfo_peers,
2934                                                    NULL);
2935
2936   rps_start (server);
2937 }
2938
2939
2940 /**
2941  * The main function for the rps service.
2942  *
2943  * @param argc number of arguments from the command line
2944  * @param argv command line arguments
2945  * @return 0 ok, 1 on error
2946  */
2947   int
2948 main (int argc, char *const *argv)
2949 {
2950   return (GNUNET_OK ==
2951           GNUNET_SERVICE_run (argc,
2952                               argv,
2953                               "rps",
2954                               GNUNET_SERVICE_OPTION_NONE,
2955                               &run, NULL)) ? 0 : 1;
2956 }
2957
2958 /* end of gnunet-service-rps.c */