66ffd173af1f6f629bd23d29c6bbdc101e30c160
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log (kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask) \
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask) \
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time interval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357 #ifdef TO_FILE_FULL
358   /**
359    * Name to log view to
360    */
361   char *file_name_view_log;
362 #endif /* TO_FILE_FULL */
363
364 #ifdef TO_FILE
365 #ifdef TO_FILE_FULL
366   /**
367    * Name to log number of observed peers to
368    */
369   char *file_name_observed_log;
370 #endif /* TO_FILE_FULL */
371
372   /**
373    * @brief Count the observed peers
374    */
375   uint32_t num_observed_peers;
376
377   /**
378    * @brief Multipeermap (ab-) used to count unique peer_ids
379    */
380   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
381 #endif /* TO_FILE */
382
383   /**
384    * List to store peers received through pushes temporary.
385    */
386   struct CustomPeerMap *push_map;
387
388   /**
389    * List to store peers received through pulls temporary.
390    */
391   struct CustomPeerMap *pull_map;
392
393   /**
394    * @brief This is the estimate used as view size.
395    *
396    * It is initialised with the minimum
397    */
398   unsigned int view_size_est_need;
399
400   /**
401    * @brief This is the minimum estimate used as view size.
402    *
403    * It is configured by the user.
404    */
405   unsigned int view_size_est_min;
406
407   /**
408    * @brief The view.
409    */
410   struct View *view;
411
412   /**
413    * Identifier for the main task that runs periodically.
414    */
415   struct GNUNET_SCHEDULER_Task *do_round_task;
416
417   /* === stats === */
418
419   /**
420    * @brief Counts the executed rounds.
421    */
422   uint32_t num_rounds;
423
424   /**
425    * @brief This array accumulates the number of received pushes per round.
426    *
427    * Number at index i represents the number of rounds with i observed pushes.
428    */
429   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
430
431   /**
432    * @brief Histogram of deltas between the expected and actual number of
433    * received pushes.
434    *
435    * As half of the entries are expected to be negative, this is shifted by
436    * #HISTOGRAM_FILE_SLOTS/2.
437    */
438   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
439
440   /**
441    * @brief Number of pull replies with this delay measured in rounds.
442    *
443    * Number at index i represents the number of pull replies with a delay of i
444    * rounds.
445    */
446   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
447 };
448
449
450 /***********************************************************************
451 * Globals
452 ***********************************************************************/
453
454 /**
455  * Our configuration.
456  */
457 static const struct GNUNET_CONFIGURATION_Handle *cfg;
458
459 /**
460  * Handle to the statistics service.
461  */
462 struct GNUNET_STATISTICS_Handle *stats;
463
464 /**
465  * Handler to CADET.
466  */
467 struct GNUNET_CADET_Handle *cadet_handle;
468
469 /**
470  * Handle to CORE
471  */
472 struct GNUNET_CORE_Handle *core_handle;
473
474 /**
475  * @brief PeerMap to keep track of connected peers.
476  */
477 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478
479 /**
480  * Our own identity.
481  */
482 static struct GNUNET_PeerIdentity own_identity;
483
484 /**
485  * Percentage of total peer number in the view
486  * to send random PUSHes to
487  */
488 static float alpha;
489
490 /**
491  * Percentage of total peer number in the view
492  * to send random PULLs to
493  */
494 static float beta;
495
496 /**
497  * Handler to NSE.
498  */
499 static struct GNUNET_NSE_Handle *nse;
500
501 /**
502  * Handler to PEERINFO.
503  */
504 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
505
506 /**
507  * Handle for cancellation of iteration over peers.
508  */
509 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510
511
512 #if ENABLE_MALICIOUS
513 /**
514  * Type of malicious peer
515  *
516  * 0 Don't act malicious at all - Default
517  * 1 Try to maximise representation
518  * 2 Try to partition the network
519  * 3 Combined attack
520  */
521 static uint32_t mal_type;
522
523 /**
524  * Other malicious peers
525  */
526 static struct GNUNET_PeerIdentity *mal_peers;
527
528 /**
529  * Hashmap of malicious peers used as set.
530  * Used to more efficiently check whether we know that peer.
531  */
532 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
533
534 /**
535  * Number of other malicious peers
536  */
537 static uint32_t num_mal_peers;
538
539
540 /**
541  * If type is 2 this is the DLL of attacked peers
542  */
543 static struct AttackedPeer *att_peers_head;
544 static struct AttackedPeer *att_peers_tail;
545
546 /**
547  * This index is used to point to an attacked peer to
548  * implement the round-robin-ish way to select attacked peers.
549  */
550 static struct AttackedPeer *att_peer_index;
551
552 /**
553  * Hashmap of attacked peers used as set.
554  * Used to more efficiently check whether we know that peer.
555  */
556 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
557
558 /**
559  * Number of attacked peers
560  */
561 static uint32_t num_attacked_peers;
562
563 /**
564  * If type is 1 this is the attacked peer
565  */
566 static struct GNUNET_PeerIdentity attacked_peer;
567
568 /**
569  * The limit of PUSHes we can send in one round.
570  * This is an assumption of the Brahms protocol and either implemented
571  * via proof of work
572  * or
573  * assumend to be the bandwidth limitation.
574  */
575 static uint32_t push_limit = 10000;
576 #endif /* ENABLE_MALICIOUS */
577
578 /**
579  * @brief Main Sub.
580  *
581  * This is run in any case by all peers and connects to all peers without
582  * specifying a shared value.
583  */
584 static struct Sub *msub;
585
586 /**
587  * @brief Maximum number of valid peers to keep.
588  * TODO read from config
589  */
590 static const uint32_t num_valid_peers_max = UINT32_MAX;
591
592 /***********************************************************************
593 * /Globals
594 ***********************************************************************/
595
596
597 static void
598 do_round (void *cls);
599
600 static void
601 do_mal_round (void *cls);
602
603
604 /**
605  * @brief Get the #PeerContext associated with a peer
606  *
607  * @param peer_map The peer map containing the context
608  * @param peer the peer id
609  *
610  * @return the #PeerContext
611  */
612 static struct PeerContext *
613 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
614               const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *ctx;
617   int ret;
618
619   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
620   GNUNET_assert (GNUNET_YES == ret);
621   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
622   GNUNET_assert (NULL != ctx);
623   return ctx;
624 }
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                            "# known peers",
679                            GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                            GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ((NULL == peer_ctx->send_channel_ctx) &&
729       (NULL == peer_ctx->recv_channel_ctx))
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780
781   (void) value;
782
783   if (0 >= iterator_cls->index)
784   {
785     iterator_cls->peer = peer;
786     return GNUNET_NO;
787   }
788   iterator_cls->index--;
789   return GNUNET_YES;
790 }
791
792
793 /**
794  * @brief Get a random peer from @a peer_map
795  *
796  * @param valid_peers Peer map containing valid peers from which to select a
797  * random one
798  *
799  * @return a random peer
800  */
801 static const struct GNUNET_PeerIdentity *
802 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
803 {
804   struct GetRandPeerIteratorCls *iterator_cls;
805   const struct GNUNET_PeerIdentity *ret;
806
807   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
808   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
809                                                   GNUNET_CONTAINER_multipeermap_size (
810                                                     valid_peers));
811   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
812                                                 get_rand_peer_iterator,
813                                                 iterator_cls);
814   ret = iterator_cls->peer;
815   GNUNET_free (iterator_cls);
816   return ret;
817 }
818
819
820 /**
821  * @brief Add a given @a peer to valid peers.
822  *
823  * If valid peers are already #num_valid_peers_max, delete a peer previously.
824  *
825  * @param peer The peer that is added to the valid peers.
826  * @param valid_peers Peer map of valid peers to which to add the @a peer
827  *
828  * @return #GNUNET_YES if no other peer had to be removed
829  *         #GNUNET_NO  otherwise
830  */
831 static int
832 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
833                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
834 {
835   const struct GNUNET_PeerIdentity *rand_peer;
836   int ret;
837
838   ret = GNUNET_YES;
839   /* Remove random peers until there is space for a new one */
840   while (num_valid_peers_max <=
841          GNUNET_CONTAINER_multipeermap_size (valid_peers))
842   {
843     rand_peer = get_random_peer_from_peermap (valid_peers);
844     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
845     ret = GNUNET_NO;
846   }
847   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
848                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
849   if (valid_peers == msub->valid_peers)
850   {
851     GNUNET_STATISTICS_set (stats,
852                            "# valid peers",
853                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
854                            GNUNET_NO);
855   }
856   return ret;
857 }
858
859 static void
860 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
861
862 /**
863  * @brief Set the peer flag to living and
864  *        call the pending operations on this peer.
865  *
866  * Also adds peer to #valid_peers.
867  *
868  * @param peer_ctx the #PeerContext of the peer to set online
869  */
870 static void
871 set_peer_online (struct PeerContext *peer_ctx)
872 {
873   struct GNUNET_PeerIdentity *peer;
874   unsigned int i;
875
876   peer = &peer_ctx->peer_id;
877   LOG (GNUNET_ERROR_TYPE_DEBUG,
878        "Peer %s is online and valid, calling %i pending operations on it\n",
879        GNUNET_i2s (peer),
880        peer_ctx->num_pending_ops);
881
882   if (NULL != peer_ctx->online_check_pending)
883   {
884     LOG (GNUNET_ERROR_TYPE_DEBUG,
885          "Removing pending online check for peer %s\n",
886          GNUNET_i2s (&peer_ctx->peer_id));
887     // TODO wait until cadet sets mq->cancel_impl
888     // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
889     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
890     peer_ctx->online_check_pending = NULL;
891   }
892
893   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
894
895   /* Call pending operations */
896   for (i = 0; i < peer_ctx->num_pending_ops; i++)
897   {
898     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
899   }
900   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
901 }
902
903 static void
904 cleanup_destroyed_channel (void *cls,
905                            const struct GNUNET_CADET_Channel *channel);
906
907 /* Declaration of handlers */
908 static void
909 handle_peer_check (void *cls,
910                    const struct GNUNET_MessageHeader *msg);
911
912 static void
913 handle_peer_push (void *cls,
914                   const struct GNUNET_MessageHeader *msg);
915
916 static void
917 handle_peer_pull_request (void *cls,
918                           const struct GNUNET_MessageHeader *msg);
919
920 static int
921 check_peer_pull_reply (void *cls,
922                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
923
924 static void
925 handle_peer_pull_reply (void *cls,
926                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
927
928 /* End declaration of handlers */
929
930 /**
931  * @brief Allocate memory for a new channel context and insert it into DLL
932  *
933  * @param peer_ctx context of the according peer
934  *
935  * @return The channel context
936  */
937 static struct ChannelCtx *
938 add_channel_ctx (struct PeerContext *peer_ctx)
939 {
940   struct ChannelCtx *channel_ctx;
941
942   channel_ctx = GNUNET_new (struct ChannelCtx);
943   channel_ctx->peer_ctx = peer_ctx;
944   return channel_ctx;
945 }
946
947
948 /**
949  * @brief Free memory and NULL pointers.
950  *
951  * @param channel_ctx The channel context.
952  */
953 static void
954 remove_channel_ctx (struct ChannelCtx *channel_ctx)
955 {
956   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
957
958   if (NULL != channel_ctx->destruction_task)
959   {
960     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
961     channel_ctx->destruction_task = NULL;
962   }
963
964   GNUNET_free (channel_ctx);
965
966   if (NULL == peer_ctx)
967     return;
968   if (channel_ctx == peer_ctx->send_channel_ctx)
969   {
970     peer_ctx->send_channel_ctx = NULL;
971     peer_ctx->mq = NULL;
972   }
973   else if (channel_ctx == peer_ctx->recv_channel_ctx)
974   {
975     peer_ctx->recv_channel_ctx = NULL;
976   }
977 }
978
979
980 /**
981  * @brief Get the channel of a peer. If not existing, create.
982  *
983  * @param peer_ctx Context of the peer of which to get the channel
984  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
985  */
986 struct GNUNET_CADET_Channel *
987 get_channel (struct PeerContext *peer_ctx)
988 {
989   /* There exists a copy-paste-clone in run() */
990   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
991     GNUNET_MQ_hd_fixed_size (peer_check,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_fixed_size (peer_push,
996                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
997                              struct GNUNET_MessageHeader,
998                              NULL),
999     GNUNET_MQ_hd_fixed_size (peer_pull_request,
1000                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
1001                              struct GNUNET_MessageHeader,
1002                              NULL),
1003     GNUNET_MQ_hd_var_size (peer_pull_reply,
1004                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1005                            struct GNUNET_RPS_P2P_PullReplyMessage,
1006                            NULL),
1007     GNUNET_MQ_handler_end ()
1008   };
1009
1010
1011   if (NULL == peer_ctx->send_channel_ctx)
1012   {
1013     LOG (GNUNET_ERROR_TYPE_DEBUG,
1014          "Trying to establish channel to peer %s\n",
1015          GNUNET_i2s (&peer_ctx->peer_id));
1016     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1017     peer_ctx->send_channel_ctx->channel =
1018       GNUNET_CADET_channel_create (cadet_handle,
1019                                    peer_ctx->send_channel_ctx,  /* context */
1020                                    &peer_ctx->peer_id,
1021                                    &peer_ctx->sub->hash,
1022                                    NULL,  /* WindowSize handler */
1023                                    &cleanup_destroyed_channel,  /* Disconnect handler */
1024                                    cadet_handlers);
1025   }
1026   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1027   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1028   return peer_ctx->send_channel_ctx->channel;
1029 }
1030
1031
1032 /**
1033  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1034  *
1035  * If we already have a message queue open to this client,
1036  * simply return it, otherways create one.
1037  *
1038  * @param peer_ctx Context of the peer of whicht to get the mq
1039  * @return the #GNUNET_MQ_Handle
1040  */
1041 static struct GNUNET_MQ_Handle *
1042 get_mq (struct PeerContext *peer_ctx)
1043 {
1044   if (NULL == peer_ctx->mq)
1045   {
1046     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1047   }
1048   return peer_ctx->mq;
1049 }
1050
1051 /**
1052  * @brief Add an envelope to a message passed to mq to list of pending messages
1053  *
1054  * @param peer_ctx Context of the peer for which to insert the envelope
1055  * @param ev envelope to the message
1056  * @param type type of the message to be sent
1057  * @return pointer to pending message
1058  */
1059 static struct PendingMessage *
1060 insert_pending_message (struct PeerContext *peer_ctx,
1061                         struct GNUNET_MQ_Envelope *ev,
1062                         const char *type)
1063 {
1064   struct PendingMessage *pending_msg;
1065
1066   pending_msg = GNUNET_new (struct PendingMessage);
1067   pending_msg->ev = ev;
1068   pending_msg->peer_ctx = peer_ctx;
1069   pending_msg->type = type;
1070   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1071                                peer_ctx->pending_messages_tail,
1072                                pending_msg);
1073   return pending_msg;
1074 }
1075
1076
1077 /**
1078  * @brief Remove a pending message from the respective DLL
1079  *
1080  * @param pending_msg the pending message to remove
1081  * @param cancel whether to cancel the pending message, too
1082  */
1083 static void
1084 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1085 {
1086   struct PeerContext *peer_ctx;
1087
1088   (void) cancel;
1089
1090   peer_ctx = pending_msg->peer_ctx;
1091   GNUNET_assert (NULL != peer_ctx);
1092   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1093                                peer_ctx->pending_messages_tail,
1094                                pending_msg);
1095   // TODO wait for the cadet implementation of message cancellation
1096   // if (GNUNET_YES == cancel)
1097   // {
1098   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1099   // }
1100   GNUNET_free (pending_msg);
1101 }
1102
1103
1104 /**
1105  * @brief This is called in response to the first message we sent as a
1106  * online check.
1107  *
1108  * @param cls #PeerContext of peer with pending online check
1109  */
1110 static void
1111 mq_online_check_successful (void *cls)
1112 {
1113   struct PeerContext *peer_ctx = cls;
1114
1115   if (NULL != peer_ctx->online_check_pending)
1116   {
1117     LOG (GNUNET_ERROR_TYPE_DEBUG,
1118          "Online check for peer %s was successfull\n",
1119          GNUNET_i2s (&peer_ctx->peer_id));
1120     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1121     peer_ctx->online_check_pending = NULL;
1122     set_peer_online (peer_ctx);
1123     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1124   }
1125 }
1126
1127 /**
1128  * Issue a check whether peer is online
1129  *
1130  * @param peer_ctx the context of the peer
1131  */
1132 static void
1133 check_peer_online (struct PeerContext *peer_ctx)
1134 {
1135   LOG (GNUNET_ERROR_TYPE_DEBUG,
1136        "Get informed about peer %s getting online\n",
1137        GNUNET_i2s (&peer_ctx->peer_id));
1138
1139   struct GNUNET_MQ_Handle *mq;
1140   struct GNUNET_MQ_Envelope *ev;
1141
1142   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1143   peer_ctx->online_check_pending =
1144     insert_pending_message (peer_ctx, ev, "Check online");
1145   mq = get_mq (peer_ctx);
1146   GNUNET_MQ_notify_sent (ev,
1147                          mq_online_check_successful,
1148                          peer_ctx);
1149   GNUNET_MQ_send (mq, ev);
1150   if (peer_ctx->sub == msub)
1151   {
1152     GNUNET_STATISTICS_update (stats,
1153                               "# pending online checks",
1154                               1,
1155                               GNUNET_NO);
1156   }
1157 }
1158
1159
1160 /**
1161  * @brief Check whether function of type #PeerOp was already scheduled
1162  *
1163  * The array with pending operations will probably never grow really big, so
1164  * iterating over it should be ok.
1165  *
1166  * @param peer_ctx Context of the peer to check for the operation
1167  * @param peer_op the operation (#PeerOp) on the peer
1168  *
1169  * @return #GNUNET_YES if this operation is scheduled on that peer
1170  *         #GNUNET_NO  otherwise
1171  */
1172 static int
1173 check_operation_scheduled (const struct PeerContext *peer_ctx,
1174                            const PeerOp peer_op)
1175 {
1176   unsigned int i;
1177
1178   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1179     if (peer_op == peer_ctx->pending_ops[i].op)
1180       return GNUNET_YES;
1181   return GNUNET_NO;
1182 }
1183
1184
1185 /**
1186  * @brief Callback for scheduler to destroy a channel
1187  *
1188  * @param cls Context of the channel
1189  */
1190 static void
1191 destroy_channel (struct ChannelCtx *channel_ctx)
1192 {
1193   struct GNUNET_CADET_Channel *channel;
1194
1195   if (NULL != channel_ctx->destruction_task)
1196   {
1197     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1198     channel_ctx->destruction_task = NULL;
1199   }
1200   GNUNET_assert (channel_ctx->channel != NULL);
1201   channel = channel_ctx->channel;
1202   channel_ctx->channel = NULL;
1203   GNUNET_CADET_channel_destroy (channel);
1204   remove_channel_ctx (channel_ctx);
1205 }
1206
1207
1208 /**
1209  * @brief Destroy a cadet channel.
1210  *
1211  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1212  *
1213  * @param cls
1214  */
1215 static void
1216 destroy_channel_cb (void *cls)
1217 {
1218   struct ChannelCtx *channel_ctx = cls;
1219
1220   channel_ctx->destruction_task = NULL;
1221   destroy_channel (channel_ctx);
1222 }
1223
1224
1225 /**
1226  * @brief Schedule the destruction of a channel for immediately afterwards.
1227  *
1228  * In case a channel is to be destroyed from within the callback to the
1229  * destruction of another channel (send channel), we cannot call
1230  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1231  * construction.
1232  *
1233  * @param channel_ctx channel to be destroyed.
1234  */
1235 static void
1236 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1237 {
1238   GNUNET_assert (NULL ==
1239                  channel_ctx->destruction_task);
1240   GNUNET_assert (NULL !=
1241                  channel_ctx->channel);
1242   channel_ctx->destruction_task =
1243     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1244                               channel_ctx);
1245 }
1246
1247
1248 /**
1249  * @brief Remove peer
1250  *
1251  * - Empties the list with pending operations
1252  * - Empties the list with pending messages
1253  * - Cancels potentially existing online check
1254  * - Schedules closing of send and recv channels
1255  * - Removes peer from peer map
1256  *
1257  * @param peer_ctx Context of the peer to be destroyed
1258  * @return #GNUNET_YES if peer was removed
1259  *         #GNUNET_NO  otherwise
1260  */
1261 static int
1262 destroy_peer (struct PeerContext *peer_ctx)
1263 {
1264   GNUNET_assert (NULL != peer_ctx);
1265   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1266   if (GNUNET_NO ==
1267       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1268                                               &peer_ctx->peer_id))
1269   {
1270     return GNUNET_NO;
1271   }
1272   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1273   LOG (GNUNET_ERROR_TYPE_DEBUG,
1274        "Going to remove peer %s\n",
1275        GNUNET_i2s (&peer_ctx->peer_id));
1276   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1277
1278   /* Clear list of pending operations */
1279   // TODO this probably leaks memory
1280   //      ('only' the cls to the function. Not sure what to do with it)
1281   GNUNET_array_grow (peer_ctx->pending_ops,
1282                      peer_ctx->num_pending_ops,
1283                      0);
1284   /* Remove all pending messages */
1285   while (NULL != peer_ctx->pending_messages_head)
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288          "Removing unsent %s\n",
1289          peer_ctx->pending_messages_head->type);
1290     /* Cancle pending message, too */
1291     if ((NULL != peer_ctx->online_check_pending) &&
1292         (0 == memcmp (peer_ctx->pending_messages_head,
1293                       peer_ctx->online_check_pending,
1294                       sizeof(struct PendingMessage))))
1295     {
1296       peer_ctx->online_check_pending = NULL;
1297       if (peer_ctx->sub == msub)
1298       {
1299         GNUNET_STATISTICS_update (stats,
1300                                   "# pending online checks",
1301                                   -1,
1302                                   GNUNET_NO);
1303       }
1304     }
1305     remove_pending_message (peer_ctx->pending_messages_head,
1306                             GNUNET_YES);
1307   }
1308
1309   /* If we are still waiting for notification whether this peer is online
1310    * cancel the according task */
1311   if (NULL != peer_ctx->online_check_pending)
1312   {
1313     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1314                 "Removing pending online check for peer %s\n",
1315                 GNUNET_i2s (&peer_ctx->peer_id));
1316     // TODO wait until cadet sets mq->cancel_impl
1317     // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1318     remove_pending_message (peer_ctx->online_check_pending,
1319                             GNUNET_YES);
1320     peer_ctx->online_check_pending = NULL;
1321   }
1322
1323   if (NULL != peer_ctx->send_channel_ctx)
1324   {
1325     /* This is possibly called from within channel destruction */
1326     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1327     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1328     peer_ctx->send_channel_ctx = NULL;
1329     peer_ctx->mq = NULL;
1330   }
1331   if (NULL != peer_ctx->recv_channel_ctx)
1332   {
1333     /* This is possibly called from within channel destruction */
1334     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1335     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1336     peer_ctx->recv_channel_ctx = NULL;
1337   }
1338
1339   if (GNUNET_YES !=
1340       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1341                                                 &peer_ctx->peer_id))
1342   {
1343     LOG (GNUNET_ERROR_TYPE_WARNING,
1344          "removing peer from peer_ctx->sub->peer_map failed\n");
1345   }
1346   if (peer_ctx->sub == msub)
1347   {
1348     GNUNET_STATISTICS_set (stats,
1349                            "# known peers",
1350                            GNUNET_CONTAINER_multipeermap_size (
1351                              peer_ctx->sub->peer_map),
1352                            GNUNET_NO);
1353   }
1354   GNUNET_free (peer_ctx);
1355   return GNUNET_YES;
1356 }
1357
1358
1359 /**
1360  * Iterator over hash map entries. Deletes all contexts of peers.
1361  *
1362  * @param cls closure
1363  * @param key current public key
1364  * @param value value in the hash map
1365  * @return #GNUNET_YES if we should continue to iterate,
1366  *         #GNUNET_NO if not.
1367  */
1368 static int
1369 peermap_clear_iterator (void *cls,
1370                         const struct GNUNET_PeerIdentity *key,
1371                         void *value)
1372 {
1373   struct Sub *sub = cls;
1374
1375   (void) value;
1376
1377   destroy_peer (get_peer_ctx (sub->peer_map, key));
1378   return GNUNET_YES;
1379 }
1380
1381
1382 /**
1383  * @brief This is called once a message is sent.
1384  *
1385  * Removes the pending message
1386  *
1387  * @param cls type of the message that was sent
1388  */
1389 static void
1390 mq_notify_sent_cb (void *cls)
1391 {
1392   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1393
1394   LOG (GNUNET_ERROR_TYPE_DEBUG,
1395        "%s was sent.\n",
1396        pending_msg->type);
1397   if (pending_msg->peer_ctx->sub == msub)
1398   {
1399     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1400       GNUNET_STATISTICS_update (stats, "# pull replys sent", 1, GNUNET_NO);
1401     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1402       GNUNET_STATISTICS_update (stats, "# pull requests sent", 1, GNUNET_NO);
1403     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1404       GNUNET_STATISTICS_update (stats, "# pushes sent", 1, GNUNET_NO);
1405     if ((0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) &&
1406         (NULL != map_single_hop) &&
1407         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1408                                                               &pending_msg->
1409                                                               peer_ctx->peer_id)) )
1410       GNUNET_STATISTICS_update (stats,
1411                                 "# pull requests sent (multi-hop peer)",
1412                                 1,
1413                                 GNUNET_NO);
1414   }
1415   /* Do not cancle message */
1416   remove_pending_message (pending_msg, GNUNET_NO);
1417 }
1418
1419
1420 /**
1421  * @brief Iterator function for #store_valid_peers.
1422  *
1423  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1424  * Writes single peer to disk.
1425  *
1426  * @param cls the file handle to write to.
1427  * @param peer current peer
1428  * @param value unused
1429  *
1430  * @return  #GNUNET_YES if we should continue to
1431  *          iterate,
1432  *          #GNUNET_NO if not.
1433  */
1434 static int
1435 store_peer_presistently_iterator (void *cls,
1436                                   const struct GNUNET_PeerIdentity *peer,
1437                                   void *value)
1438 {
1439   const struct GNUNET_DISK_FileHandle *fh = cls;
1440   char peer_string[128];
1441   int size;
1442   ssize_t ret;
1443
1444   (void) value;
1445
1446   if (NULL == peer)
1447   {
1448     return GNUNET_YES;
1449   }
1450   size = GNUNET_snprintf (peer_string,
1451                           sizeof(peer_string),
1452                           "%s\n",
1453                           GNUNET_i2s_full (peer));
1454   GNUNET_assert (53 == size);
1455   ret = GNUNET_DISK_file_write (fh,
1456                                 peer_string,
1457                                 size);
1458   GNUNET_assert (size == ret);
1459   return GNUNET_YES;
1460 }
1461
1462
1463 /**
1464  * @brief Store the peers currently in #valid_peers to disk.
1465  *
1466  * @param sub Sub for which to store the valid peers
1467  */
1468 static void
1469 store_valid_peers (const struct Sub *sub)
1470 {
1471   struct GNUNET_DISK_FileHandle *fh;
1472   uint32_t number_written_peers;
1473   int ret;
1474
1475   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1476   {
1477     return;
1478   }
1479
1480   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1481   if (GNUNET_SYSERR == ret)
1482   {
1483     LOG (GNUNET_ERROR_TYPE_WARNING,
1484          "Not able to create directory for file `%s'\n",
1485          sub->filename_valid_peers);
1486     GNUNET_break (0);
1487   }
1488   else if (GNUNET_NO == ret)
1489   {
1490     LOG (GNUNET_ERROR_TYPE_WARNING,
1491          "Directory for file `%s' exists but is not writable for us\n",
1492          sub->filename_valid_peers);
1493     GNUNET_break (0);
1494   }
1495   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1496                               GNUNET_DISK_OPEN_WRITE
1497                               | GNUNET_DISK_OPEN_CREATE,
1498                               GNUNET_DISK_PERM_USER_READ
1499                               | GNUNET_DISK_PERM_USER_WRITE);
1500   if (NULL == fh)
1501   {
1502     LOG (GNUNET_ERROR_TYPE_WARNING,
1503          "Not able to write valid peers to file `%s'\n",
1504          sub->filename_valid_peers);
1505     return;
1506   }
1507   LOG (GNUNET_ERROR_TYPE_DEBUG,
1508        "Writing %u valid peers to disk\n",
1509        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1510   number_written_peers =
1511     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1512                                            store_peer_presistently_iterator,
1513                                            fh);
1514   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1515   GNUNET_assert (number_written_peers ==
1516                  GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1517 }
1518
1519
1520 /**
1521  * @brief Convert string representation of peer id to peer id.
1522  *
1523  * Counterpart to #GNUNET_i2s_full.
1524  *
1525  * @param string_repr The string representation of the peer id
1526  *
1527  * @return The peer id
1528  */
1529 static const struct GNUNET_PeerIdentity *
1530 s2i_full (const char *string_repr)
1531 {
1532   struct GNUNET_PeerIdentity *peer;
1533   size_t len;
1534   int ret;
1535
1536   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1537   len = strlen (string_repr);
1538   if (52 > len)
1539   {
1540     LOG (GNUNET_ERROR_TYPE_WARNING,
1541          "Not able to convert string representation of PeerID to PeerID\n"
1542          "Sting representation: %s (len %lu) - too short\n",
1543          string_repr,
1544          len);
1545     GNUNET_break (0);
1546   }
1547   else if (52 < len)
1548   {
1549     len = 52;
1550   }
1551   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1552                                                     len,
1553                                                     &peer->public_key);
1554   if (GNUNET_OK != ret)
1555   {
1556     LOG (GNUNET_ERROR_TYPE_WARNING,
1557          "Not able to convert string representation of PeerID to PeerID\n"
1558          "Sting representation: %s\n",
1559          string_repr);
1560     GNUNET_break (0);
1561   }
1562   return peer;
1563 }
1564
1565
1566 /**
1567  * @brief Restore the peers on disk to #valid_peers.
1568  *
1569  * @param sub Sub for which to restore the valid peers
1570  */
1571 static void
1572 restore_valid_peers (const struct Sub *sub)
1573 {
1574   off_t file_size;
1575   uint32_t num_peers;
1576   struct GNUNET_DISK_FileHandle *fh;
1577   char *buf;
1578   ssize_t size_read;
1579   char *iter_buf;
1580   char *str_repr;
1581   const struct GNUNET_PeerIdentity *peer;
1582
1583   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1584   {
1585     return;
1586   }
1587
1588   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1589   {
1590     return;
1591   }
1592   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1593                               GNUNET_DISK_OPEN_READ,
1594                               GNUNET_DISK_PERM_NONE);
1595   GNUNET_assert (NULL != fh);
1596   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1597   num_peers = file_size / 53;
1598   buf = GNUNET_malloc (file_size);
1599   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1600   GNUNET_assert (size_read == file_size);
1601   LOG (GNUNET_ERROR_TYPE_DEBUG,
1602        "Restoring %" PRIu32 " peers from file `%s'\n",
1603        num_peers,
1604        sub->filename_valid_peers);
1605   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1606   {
1607     str_repr = GNUNET_strndup (iter_buf, 53);
1608     peer = s2i_full (str_repr);
1609     GNUNET_free (str_repr);
1610     add_valid_peer (peer, sub->valid_peers);
1611     LOG (GNUNET_ERROR_TYPE_DEBUG,
1612          "Restored valid peer %s from disk\n",
1613          GNUNET_i2s_full (peer));
1614   }
1615   iter_buf = NULL;
1616   GNUNET_free (buf);
1617   LOG (GNUNET_ERROR_TYPE_DEBUG,
1618        "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1619        num_peers,
1620        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1621   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1622   {
1623     LOG (GNUNET_ERROR_TYPE_WARNING,
1624          "Number of restored peers does not match file size. Have probably duplicates.\n");
1625   }
1626   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1627   LOG (GNUNET_ERROR_TYPE_DEBUG,
1628        "Restored %u valid peers from disk\n",
1629        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1630 }
1631
1632
1633 /**
1634  * @brief Delete storage of peers that was created with #initialise_peers ()
1635  *
1636  * @param sub Sub for which the storage is deleted
1637  */
1638 static void
1639 peers_terminate (struct Sub *sub)
1640 {
1641   if (GNUNET_SYSERR ==
1642       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1643                                              &peermap_clear_iterator,
1644                                              sub))
1645   {
1646     LOG (GNUNET_ERROR_TYPE_WARNING,
1647          "Iteration destroying peers was aborted.\n");
1648   }
1649   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1650   sub->peer_map = NULL;
1651   store_valid_peers (sub);
1652   GNUNET_free (sub->filename_valid_peers);
1653   sub->filename_valid_peers = NULL;
1654   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1655   sub->valid_peers = NULL;
1656 }
1657
1658
1659 /**
1660  * Iterator over #valid_peers hash map entries.
1661  *
1662  * @param cls Closure that contains iterator function and closure
1663  * @param peer current peer id
1664  * @param value value in the hash map - unused
1665  * @return #GNUNET_YES if we should continue to
1666  *         iterate,
1667  *         #GNUNET_NO if not.
1668  */
1669 static int
1670 valid_peer_iterator (void *cls,
1671                      const struct GNUNET_PeerIdentity *peer,
1672                      void *value)
1673 {
1674   struct PeersIteratorCls *it_cls = cls;
1675
1676   (void) value;
1677
1678   return it_cls->iterator (it_cls->cls, peer);
1679 }
1680
1681
1682 /**
1683  * @brief Get all currently known, valid peer ids.
1684  *
1685  * @param valid_peers Peer map containing the valid peers in question
1686  * @param iterator function to call on each peer id
1687  * @param it_cls extra argument to @a iterator
1688  * @return the number of key value pairs processed,
1689  *         #GNUNET_SYSERR if it aborted iteration
1690  */
1691 static int
1692 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1693                  PeersIterator iterator,
1694                  void *it_cls)
1695 {
1696   struct PeersIteratorCls *cls;
1697   int ret;
1698
1699   cls = GNUNET_new (struct PeersIteratorCls);
1700   cls->iterator = iterator;
1701   cls->cls = it_cls;
1702   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1703                                                valid_peer_iterator,
1704                                                cls);
1705   GNUNET_free (cls);
1706   return ret;
1707 }
1708
1709
1710 /**
1711  * @brief Add peer to known peers.
1712  *
1713  * This function is called on new peer_ids from 'external' sources
1714  * (client seed, cadet get_peers(), ...)
1715  *
1716  * @param sub Sub with the peer map that the @a peer will be added to
1717  * @param peer the new #GNUNET_PeerIdentity
1718  *
1719  * @return #GNUNET_YES if peer was inserted
1720  *         #GNUNET_NO  otherwise
1721  */
1722 static int
1723 insert_peer (struct Sub *sub,
1724              const struct GNUNET_PeerIdentity *peer)
1725 {
1726   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1727   {
1728     return GNUNET_NO;   /* We already know this peer - nothing to do */
1729   }
1730   (void) create_peer_ctx (sub, peer);
1731   return GNUNET_YES;
1732 }
1733
1734
1735 /**
1736  * @brief Check whether flags on a peer are set.
1737  *
1738  * @param peer_map Peer map that is expected to contain the @a peer
1739  * @param peer the peer to check the flag of
1740  * @param flags the flags to check
1741  *
1742  * @return #GNUNET_SYSERR if peer is not known
1743  *         #GNUNET_YES    if all given flags are set
1744  *         #GNUNET_NO     otherwise
1745  */
1746 static int
1747 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1748                  const struct GNUNET_PeerIdentity *peer,
1749                  enum Peers_PeerFlags flags)
1750 {
1751   struct PeerContext *peer_ctx;
1752
1753   if (GNUNET_NO == check_peer_known (peer_map, peer))
1754   {
1755     return GNUNET_SYSERR;
1756   }
1757   peer_ctx = get_peer_ctx (peer_map, peer);
1758   return check_peer_flag_set (peer_ctx, flags);
1759 }
1760
1761 /**
1762  * @brief Try connecting to a peer to see whether it is online
1763  *
1764  * If not known yet, insert into known peers
1765  *
1766  * @param sub Sub which would contain the @a peer
1767  * @param peer the peer whose online is to be checked
1768  * @return #GNUNET_YES if the check was issued
1769  *         #GNUNET_NO  otherwise
1770  */
1771 static int
1772 issue_peer_online_check (struct Sub *sub,
1773                          const struct GNUNET_PeerIdentity *peer)
1774 {
1775   struct PeerContext *peer_ctx;
1776
1777   (void) insert_peer (sub, peer);   // TODO even needed?
1778   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1779   if ((GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1780       (NULL == peer_ctx->online_check_pending))
1781   {
1782     check_peer_online (peer_ctx);
1783     return GNUNET_YES;
1784   }
1785   return GNUNET_NO;
1786 }
1787
1788
1789 /**
1790  * @brief Check if peer is removable.
1791  *
1792  * Check if
1793  *  - a recv channel exists
1794  *  - there are pending messages
1795  *  - there is no pending pull reply
1796  *
1797  * @param peer_ctx Context of the peer in question
1798  * @return #GNUNET_YES    if peer is removable
1799  *         #GNUNET_NO     if peer is NOT removable
1800  *         #GNUNET_SYSERR if peer is not known
1801  */
1802 static int
1803 check_removable (const struct PeerContext *peer_ctx)
1804 {
1805   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (
1806         peer_ctx->sub->peer_map,
1807         &peer_ctx->peer_id))
1808   {
1809     return GNUNET_SYSERR;
1810   }
1811
1812   if ((NULL != peer_ctx->recv_channel_ctx) ||
1813       (NULL != peer_ctx->pending_messages_head) ||
1814       (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)))
1815   {
1816     return GNUNET_NO;
1817   }
1818   return GNUNET_YES;
1819 }
1820
1821
1822 /**
1823  * @brief Check whether @a peer is actually a peer.
1824  *
1825  * A valid peer is a peer that we know exists eg. we were connected to once.
1826  *
1827  * @param valid_peers Peer map that would contain the @a peer
1828  * @param peer peer in question
1829  *
1830  * @return #GNUNET_YES if peer is valid
1831  *         #GNUNET_NO  if peer is not valid
1832  */
1833 static int
1834 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1835                   const struct GNUNET_PeerIdentity *peer)
1836 {
1837   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1838 }
1839
1840
1841 /**
1842  * @brief Indicate that we want to send to the other peer
1843  *
1844  * This establishes a sending channel
1845  *
1846  * @param peer_ctx Context of the target peer
1847  */
1848 static void
1849 indicate_sending_intention (struct PeerContext *peer_ctx)
1850 {
1851   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1852                                                  &peer_ctx->peer_id));
1853   (void) get_channel (peer_ctx);
1854 }
1855
1856
1857 /**
1858  * @brief Check whether other peer has the intention to send/opened channel
1859  *        towars us
1860  *
1861  * @param peer_ctx Context of the peer in question
1862  *
1863  * @return #GNUNET_YES if peer has the intention to send
1864  *         #GNUNET_NO  otherwise
1865  */
1866 static int
1867 check_peer_send_intention (const struct PeerContext *peer_ctx)
1868 {
1869   if (NULL != peer_ctx->recv_channel_ctx)
1870   {
1871     return GNUNET_YES;
1872   }
1873   return GNUNET_NO;
1874 }
1875
1876
1877 /**
1878  * Handle the channel a peer opens to us.
1879  *
1880  * @param cls The closure - Sub
1881  * @param channel The channel the peer wants to establish
1882  * @param initiator The peer's peer ID
1883  *
1884  * @return initial channel context for the channel
1885  *         (can be NULL -- that's not an error)
1886  */
1887 static void *
1888 handle_inbound_channel (void *cls,
1889                         struct GNUNET_CADET_Channel *channel,
1890                         const struct GNUNET_PeerIdentity *initiator)
1891 {
1892   struct PeerContext *peer_ctx;
1893   struct ChannelCtx *channel_ctx;
1894   struct Sub *sub = cls;
1895
1896   LOG (GNUNET_ERROR_TYPE_DEBUG,
1897        "New channel was established to us (Peer %s).\n",
1898        GNUNET_i2s (initiator));
1899   GNUNET_assert (NULL != channel);  /* according to cadet API */
1900   /* Make sure we 'know' about this peer */
1901   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1902   set_peer_online (peer_ctx);
1903   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1904   channel_ctx = add_channel_ctx (peer_ctx);
1905   channel_ctx->channel = channel;
1906   /* We only accept one incoming channel per peer */
1907   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1908                                                              initiator)))
1909   {
1910     LOG (GNUNET_ERROR_TYPE_WARNING,
1911          "Already got one receive channel. Destroying old one.\n");
1912     GNUNET_break_op (0);
1913     destroy_channel (peer_ctx->recv_channel_ctx);
1914     peer_ctx->recv_channel_ctx = channel_ctx;
1915     /* return the channel context */
1916     return channel_ctx;
1917   }
1918   peer_ctx->recv_channel_ctx = channel_ctx;
1919   return channel_ctx;
1920 }
1921
1922
1923 /**
1924  * @brief Check whether a sending channel towards the given peer exists
1925  *
1926  * @param peer_ctx Context of the peer in question
1927  *
1928  * @return #GNUNET_YES if a sending channel towards that peer exists
1929  *         #GNUNET_NO  otherwise
1930  */
1931 static int
1932 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1933 {
1934   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1935                                      &peer_ctx->peer_id))
1936   {   /* If no such peer exists, there is no channel */
1937     return GNUNET_NO;
1938   }
1939   if (NULL == peer_ctx->send_channel_ctx)
1940   {
1941     return GNUNET_NO;
1942   }
1943   return GNUNET_YES;
1944 }
1945
1946
1947 /**
1948  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1949  *        intention to another peer
1950  *
1951  * @param peer_ctx Context to the peer
1952  * @return #GNUNET_YES if channel was destroyed
1953  *         #GNUNET_NO  otherwise
1954  */
1955 static int
1956 destroy_sending_channel (struct PeerContext *peer_ctx)
1957 {
1958   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1959                                      &peer_ctx->peer_id))
1960   {
1961     return GNUNET_NO;
1962   }
1963   if (NULL != peer_ctx->send_channel_ctx)
1964   {
1965     destroy_channel (peer_ctx->send_channel_ctx);
1966     (void) check_connected (peer_ctx);
1967     return GNUNET_YES;
1968   }
1969   return GNUNET_NO;
1970 }
1971
1972 /**
1973  * @brief Send a message to another peer.
1974  *
1975  * Keeps track about pending messages so they can be properly removed when the
1976  * peer is destroyed.
1977  *
1978  * @param peer_ctx Context of the peer to which the message is to be sent
1979  * @param ev envelope of the message
1980  * @param type type of the message
1981  */
1982 static void
1983 send_message (struct PeerContext *peer_ctx,
1984               struct GNUNET_MQ_Envelope *ev,
1985               const char *type)
1986 {
1987   struct PendingMessage *pending_msg;
1988   struct GNUNET_MQ_Handle *mq;
1989
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "Sending message to %s of type %s\n",
1992               GNUNET_i2s (&peer_ctx->peer_id),
1993               type);
1994   pending_msg = insert_pending_message (peer_ctx, ev, type);
1995   mq = get_mq (peer_ctx);
1996   GNUNET_MQ_notify_sent (ev,
1997                          mq_notify_sent_cb,
1998                          pending_msg);
1999   GNUNET_MQ_send (mq, ev);
2000 }
2001
2002 /**
2003  * @brief Schedule a operation on given peer
2004  *
2005  * Avoids scheduling an operation twice.
2006  *
2007  * @param peer_ctx Context of the peer for which to schedule the operation
2008  * @param peer_op the operation to schedule
2009  * @param cls Closure to @a peer_op
2010  *
2011  * @return #GNUNET_YES if the operation was scheduled
2012  *         #GNUNET_NO  otherwise
2013  */
2014 static int
2015 schedule_operation (struct PeerContext *peer_ctx,
2016                     const PeerOp peer_op,
2017                     void *cls)
2018 {
2019   struct PeerPendingOp pending_op;
2020
2021   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2022                                                  &peer_ctx->peer_id));
2023
2024   // TODO if ONLINE execute immediately
2025
2026   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2027   {
2028     pending_op.op = peer_op;
2029     pending_op.op_cls = cls;
2030     GNUNET_array_append (peer_ctx->pending_ops,
2031                          peer_ctx->num_pending_ops,
2032                          pending_op);
2033     return GNUNET_YES;
2034   }
2035   return GNUNET_NO;
2036 }
2037
2038 /***********************************************************************
2039 * /Old gnunet-service-rps_peers.c
2040 ***********************************************************************/
2041
2042
2043 /***********************************************************************
2044 * Housekeeping with clients
2045 ***********************************************************************/
2046
2047 /**
2048  * Closure used to pass the client and the id to the callback
2049  * that replies to a client's request
2050  */
2051 struct ReplyCls
2052 {
2053   /**
2054    * DLL
2055    */
2056   struct ReplyCls *next;
2057   struct ReplyCls *prev;
2058
2059   /**
2060    * The identifier of the request
2061    */
2062   uint32_t id;
2063
2064   /**
2065    * The handle to the request
2066    */
2067   struct RPS_SamplerRequestHandle *req_handle;
2068
2069   /**
2070    * The client handle to send the reply to
2071    */
2072   struct ClientContext *cli_ctx;
2073 };
2074
2075
2076 /**
2077  * Struct used to store the context of a connected client.
2078  */
2079 struct ClientContext
2080 {
2081   /**
2082    * DLL
2083    */
2084   struct ClientContext *next;
2085   struct ClientContext *prev;
2086
2087   /**
2088    * The message queue to communicate with the client.
2089    */
2090   struct GNUNET_MQ_Handle *mq;
2091
2092   /**
2093    * @brief How many updates this client expects to receive.
2094    */
2095   int64_t view_updates_left;
2096
2097   /**
2098    * @brief Whether this client wants to receive stream updates.
2099    * Either #GNUNET_YES or #GNUNET_NO
2100    */
2101   int8_t stream_update;
2102
2103   /**
2104    * The client handle to send the reply to
2105    */
2106   struct GNUNET_SERVICE_Client *client;
2107
2108   /**
2109    * The #Sub this context belongs to
2110    */
2111   struct Sub *sub;
2112 };
2113
2114 /**
2115  * DLL with all clients currently connected to us
2116  */
2117 struct ClientContext *cli_ctx_head;
2118 struct ClientContext *cli_ctx_tail;
2119
2120 /***********************************************************************
2121 * /Housekeeping with clients
2122 ***********************************************************************/
2123
2124
2125
2126
2127
2128 /***********************************************************************
2129 * Util functions
2130 ***********************************************************************/
2131
2132
2133 /**
2134  * Print peerlist to log.
2135  */
2136 static void
2137 print_peer_list (struct GNUNET_PeerIdentity *list,
2138                  unsigned int len)
2139 {
2140   unsigned int i;
2141
2142   LOG (GNUNET_ERROR_TYPE_DEBUG,
2143        "Printing peer list of length %u at %p:\n",
2144        len,
2145        list);
2146   for (i = 0; i < len; i++)
2147   {
2148     LOG (GNUNET_ERROR_TYPE_DEBUG,
2149          "%u. peer: %s\n",
2150          i, GNUNET_i2s (&list[i]));
2151   }
2152 }
2153
2154
2155 /**
2156  * Remove peer from list.
2157  */
2158 static void
2159 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2160                unsigned int *list_size,
2161                const struct GNUNET_PeerIdentity *peer)
2162 {
2163   unsigned int i;
2164   struct GNUNET_PeerIdentity *tmp;
2165
2166   tmp = *peer_list;
2167
2168   LOG (GNUNET_ERROR_TYPE_DEBUG,
2169        "Removing peer %s from list at %p\n",
2170        GNUNET_i2s (peer),
2171        tmp);
2172
2173   for (i = 0; i < *list_size; i++)
2174   {
2175     if (0 == GNUNET_memcmp (&tmp[i], peer))
2176     {
2177       if (i < *list_size - 1)
2178       {       /* Not at the last entry -- shift peers left */
2179         memmove (&tmp[i], &tmp[i + 1],
2180                  ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
2181       }
2182       /* Remove last entry (should be now useless PeerID) */
2183       GNUNET_array_grow (tmp, *list_size, (*list_size) - 1);
2184     }
2185   }
2186   *peer_list = tmp;
2187 }
2188
2189
2190 /**
2191  * Insert PeerID in #view
2192  *
2193  * Called once we know a peer is online.
2194  * Implements #PeerOp
2195  *
2196  * @return GNUNET_OK if peer was actually inserted
2197  *         GNUNET_NO if peer was not inserted
2198  */
2199 static void
2200 insert_in_view_op (void *cls,
2201                    const struct GNUNET_PeerIdentity *peer);
2202
2203 /**
2204  * Insert PeerID in #view
2205  *
2206  * Called once we know a peer is online.
2207  *
2208  * @param sub Sub in with the view to insert in
2209  * @param peer the peer to insert
2210  *
2211  * @return GNUNET_OK if peer was actually inserted
2212  *         GNUNET_NO if peer was not inserted
2213  */
2214 static int
2215 insert_in_view (struct Sub *sub,
2216                 const struct GNUNET_PeerIdentity *peer)
2217 {
2218   struct PeerContext *peer_ctx;
2219   int online;
2220   int ret;
2221
2222   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2223   peer_ctx = get_peer_ctx (sub->peer_map, peer);  // TODO indirection needed?
2224   if ((GNUNET_NO == online) ||
2225       (GNUNET_SYSERR == online))   /* peer is not even known */
2226   {
2227     (void) issue_peer_online_check (sub, peer);
2228     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2229     return GNUNET_NO;
2230   }
2231   /* Open channel towards peer to keep connection open */
2232   indicate_sending_intention (peer_ctx);
2233   ret = View_put (sub->view, peer);
2234   if (peer_ctx->sub == msub)
2235   {
2236     GNUNET_STATISTICS_set (stats,
2237                            "view size",
2238                            View_size (peer_ctx->sub->view),
2239                            GNUNET_NO);
2240   }
2241   return ret;
2242 }
2243
2244
2245 /**
2246  * @brief Send view to client
2247  *
2248  * @param cli_ctx the context of the client
2249  * @param view_array the peerids of the view as array (can be empty)
2250  * @param view_size the size of the view array (can be 0)
2251  */
2252 static void
2253 send_view (const struct ClientContext *cli_ctx,
2254            const struct GNUNET_PeerIdentity *view_array,
2255            uint64_t view_size)
2256 {
2257   struct GNUNET_MQ_Envelope *ev;
2258   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2259   struct Sub *sub;
2260
2261   if (NULL == view_array)
2262   {
2263     if (NULL == cli_ctx->sub)
2264       sub = msub;
2265     else
2266       sub = cli_ctx->sub;
2267     view_size = View_size (sub->view);
2268     view_array = View_get_as_array (sub->view);
2269   }
2270
2271   ev = GNUNET_MQ_msg_extra (out_msg,
2272                             view_size * sizeof(struct GNUNET_PeerIdentity),
2273                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2274   out_msg->num_peers = htonl (view_size);
2275
2276   GNUNET_memcpy (&out_msg[1],
2277                  view_array,
2278                  view_size * sizeof(struct GNUNET_PeerIdentity));
2279   GNUNET_MQ_send (cli_ctx->mq, ev);
2280 }
2281
2282
2283 /**
2284  * @brief Send peer from biased stream to client.
2285  *
2286  * TODO merge with send_view, parameterise
2287  *
2288  * @param cli_ctx the context of the client
2289  * @param view_array the peerids of the view as array (can be empty)
2290  * @param view_size the size of the view array (can be 0)
2291  */
2292 static void
2293 send_stream_peers (const struct ClientContext *cli_ctx,
2294                    uint64_t num_peers,
2295                    const struct GNUNET_PeerIdentity *peers)
2296 {
2297   struct GNUNET_MQ_Envelope *ev;
2298   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2299
2300   GNUNET_assert (NULL != peers);
2301
2302   ev = GNUNET_MQ_msg_extra (out_msg,
2303                             num_peers * sizeof(struct GNUNET_PeerIdentity),
2304                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2305   out_msg->num_peers = htonl (num_peers);
2306
2307   GNUNET_memcpy (&out_msg[1],
2308                  peers,
2309                  num_peers * sizeof(struct GNUNET_PeerIdentity));
2310   GNUNET_MQ_send (cli_ctx->mq, ev);
2311 }
2312
2313
2314 /**
2315  * @brief sends updates to clients that are interested
2316  *
2317  * @param sub Sub for which to notify clients
2318  */
2319 static void
2320 clients_notify_view_update (const struct Sub *sub)
2321 {
2322   struct ClientContext *cli_ctx_iter;
2323   uint64_t num_peers;
2324   const struct GNUNET_PeerIdentity *view_array;
2325
2326   num_peers = View_size (sub->view);
2327   view_array = View_get_as_array (sub->view);
2328   /* check size of view is small enough */
2329   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2330   {
2331     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2332                 "View is too big to send\n");
2333     return;
2334   }
2335
2336   for (cli_ctx_iter = cli_ctx_head;
2337        NULL != cli_ctx_iter;
2338        cli_ctx_iter = cli_ctx_iter->next)
2339   {
2340     if (1 < cli_ctx_iter->view_updates_left)
2341     {
2342       /* Client wants to receive limited amount of updates */
2343       cli_ctx_iter->view_updates_left -= 1;
2344     }
2345     else if (1 == cli_ctx_iter->view_updates_left)
2346     {
2347       /* Last update of view for client */
2348       cli_ctx_iter->view_updates_left = -1;
2349     }
2350     else if (0 > cli_ctx_iter->view_updates_left)
2351     {
2352       /* Client is not interested in updates */
2353       continue;
2354     }
2355     /* else _updates_left == 0 - infinite amount of updates */
2356
2357     /* send view */
2358     send_view (cli_ctx_iter, view_array, num_peers);
2359   }
2360 }
2361
2362
2363 /**
2364  * @brief sends updates to clients that are interested
2365  *
2366  * @param num_peers Number of peers to send
2367  * @param peers the array of peers to send
2368  */
2369 static void
2370 clients_notify_stream_peer (const struct Sub *sub,
2371                             uint64_t num_peers,
2372                             const struct GNUNET_PeerIdentity *peers)
2373 // TODO enum StreamPeerSource)
2374 {
2375   struct ClientContext *cli_ctx_iter;
2376
2377   LOG (GNUNET_ERROR_TYPE_DEBUG,
2378        "Got peer (%s) from biased stream - update all clients\n",
2379        GNUNET_i2s (peers));
2380
2381   for (cli_ctx_iter = cli_ctx_head;
2382        NULL != cli_ctx_iter;
2383        cli_ctx_iter = cli_ctx_iter->next)
2384   {
2385     if ((GNUNET_YES == cli_ctx_iter->stream_update) &&
2386         ((sub == cli_ctx_iter->sub) ||(sub == msub) ))
2387     {
2388       send_stream_peers (cli_ctx_iter, num_peers, peers);
2389     }
2390   }
2391 }
2392
2393
2394 /**
2395  * Put random peer from sampler into the view as history update.
2396  *
2397  * @param ids Array of Peers to insert into view
2398  * @param num_peers Number of peers to insert
2399  * @param cls Closure - The Sub for which this is to be done
2400  */
2401 static void
2402 hist_update (const struct GNUNET_PeerIdentity *ids,
2403              uint32_t num_peers,
2404              void *cls)
2405 {
2406   unsigned int i;
2407   struct Sub *sub = cls;
2408
2409   for (i = 0; i < num_peers; i++)
2410   {
2411     int inserted;
2412     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2413     {
2414       LOG (GNUNET_ERROR_TYPE_WARNING,
2415            "Peer in history update not known!\n");
2416       continue;
2417     }
2418     inserted = insert_in_view (sub, &ids[i]);
2419     if (GNUNET_OK == inserted)
2420     {
2421       clients_notify_stream_peer (sub, 1, &ids[i]);
2422     }
2423 #ifdef TO_FILE_FULL
2424     to_file (sub->file_name_view_log,
2425              "+%s\t(hist)",
2426              GNUNET_i2s_full (ids));
2427 #endif /* TO_FILE_FULL */
2428   }
2429   clients_notify_view_update (sub);
2430 }
2431
2432
2433 /**
2434  * Wrapper around #RPS_sampler_resize()
2435  *
2436  * If we do not have enough sampler elements, double current sampler size
2437  * If we have more than enough sampler elements, halv current sampler size
2438  *
2439  * @param sampler The sampler to resize
2440  * @param new_size New size to which to resize
2441  */
2442 static void
2443 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2444 {
2445   unsigned int sampler_size;
2446
2447   // TODO statistics
2448   // TODO respect the min, max
2449   sampler_size = RPS_sampler_get_size (sampler);
2450   if (sampler_size > new_size * 4)
2451   {   /* Shrinking */
2452     RPS_sampler_resize (sampler, sampler_size / 2);
2453   }
2454   else if (sampler_size < new_size)
2455   {   /* Growing */
2456     RPS_sampler_resize (sampler, sampler_size * 2);
2457   }
2458   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2459 }
2460
2461
2462 /**
2463  * Add all peers in @a peer_array to @a peer_map used as set.
2464  *
2465  * @param peer_array array containing the peers
2466  * @param num_peers number of peers in @peer_array
2467  * @param peer_map the peermap to use as set
2468  */
2469 static void
2470 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2471                        unsigned int num_peers,
2472                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2473 {
2474   unsigned int i;
2475
2476   if (NULL == peer_map)
2477   {
2478     LOG (GNUNET_ERROR_TYPE_WARNING,
2479          "Trying to add peers to non-existing peermap.\n");
2480     return;
2481   }
2482
2483   for (i = 0; i < num_peers; i++)
2484   {
2485     GNUNET_CONTAINER_multipeermap_put (peer_map,
2486                                        &peer_array[i],
2487                                        NULL,
2488                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2489     if (msub->peer_map == peer_map)
2490     {
2491       GNUNET_STATISTICS_set (stats,
2492                              "# known peers",
2493                              GNUNET_CONTAINER_multipeermap_size (peer_map),
2494                              GNUNET_NO);
2495     }
2496   }
2497 }
2498
2499
2500 /**
2501  * Send a PULL REPLY to @a peer_id
2502  *
2503  * @param peer_ctx Context of the peer to send the reply to
2504  * @param peer_ids the peers to send to @a peer_id
2505  * @param num_peer_ids the number of peers to send to @a peer_id
2506  */
2507 static void
2508 send_pull_reply (struct PeerContext *peer_ctx,
2509                  const struct GNUNET_PeerIdentity *peer_ids,
2510                  unsigned int num_peer_ids)
2511 {
2512   uint32_t send_size;
2513   struct GNUNET_MQ_Envelope *ev;
2514   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2515
2516   /* Compute actual size */
2517   send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)
2518               + num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
2519
2520   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2521     /* Compute number of peers to send
2522      * If too long, simply truncate */
2523     // TODO select random ones via permutation
2524     //      or even better: do good protocol design
2525     send_size =
2526       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE
2527        - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage))
2528       / sizeof(struct GNUNET_PeerIdentity);
2529   else
2530     send_size = num_peer_ids;
2531
2532   LOG (GNUNET_ERROR_TYPE_DEBUG,
2533        "Going to send PULL REPLY with %u peers to %s\n",
2534        send_size, GNUNET_i2s (&peer_ctx->peer_id));
2535
2536   ev = GNUNET_MQ_msg_extra (out_msg,
2537                             send_size * sizeof(struct GNUNET_PeerIdentity),
2538                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2539   out_msg->num_peers = htonl (send_size);
2540   GNUNET_memcpy (&out_msg[1], peer_ids,
2541                  send_size * sizeof(struct GNUNET_PeerIdentity));
2542
2543   send_message (peer_ctx, ev, "PULL REPLY");
2544   if (peer_ctx->sub == msub)
2545   {
2546     GNUNET_STATISTICS_update (stats, "# pull reply send issued", 1, GNUNET_NO);
2547   }
2548   // TODO check with send intention: as send_channel is used/opened we indicate
2549   // a sending intention without intending it.
2550   // -> clean peer afterwards?
2551   // -> use recv_channel?
2552 }
2553
2554
2555 /**
2556  * Insert PeerID in #pull_map
2557  *
2558  * Called once we know a peer is online.
2559  *
2560  * @param cls Closure - Sub with the pull map to insert into
2561  * @param peer Peer to insert
2562  */
2563 static void
2564 insert_in_pull_map (void *cls,
2565                     const struct GNUNET_PeerIdentity *peer)
2566 {
2567   struct Sub *sub = cls;
2568
2569   CustomPeerMap_put (sub->pull_map, peer);
2570 }
2571
2572
2573 /**
2574  * Insert PeerID in #view
2575  *
2576  * Called once we know a peer is online.
2577  * Implements #PeerOp
2578  *
2579  * @param cls Closure - Sub with view to insert peer into
2580  * @param peer the peer to insert
2581  */
2582 static void
2583 insert_in_view_op (void *cls,
2584                    const struct GNUNET_PeerIdentity *peer)
2585 {
2586   struct Sub *sub = cls;
2587   int inserted;
2588
2589   inserted = insert_in_view (sub, peer);
2590   if (GNUNET_OK == inserted)
2591   {
2592     clients_notify_stream_peer (sub, 1, peer);
2593   }
2594 }
2595
2596
2597 /**
2598  * Update sampler with given PeerID.
2599  * Implements #PeerOp
2600  *
2601  * @param cls Closure - Sub containing the sampler to insert into
2602  * @param peer Peer to insert
2603  */
2604 static void
2605 insert_in_sampler (void *cls,
2606                    const struct GNUNET_PeerIdentity *peer)
2607 {
2608   struct Sub *sub = cls;
2609
2610   LOG (GNUNET_ERROR_TYPE_DEBUG,
2611        "Updating samplers with peer %s from insert_in_sampler()\n",
2612        GNUNET_i2s (peer));
2613   RPS_sampler_update (sub->sampler, peer);
2614   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2615   {
2616     /* Make sure we 'know' about this peer */
2617     (void) issue_peer_online_check (sub, peer);
2618     /* Establish a channel towards that peer to indicate we are going to send
2619      * messages to it */
2620     // indicate_sending_intention (peer);
2621   }
2622   if (sub == msub)
2623   {
2624     GNUNET_STATISTICS_update (stats,
2625                               "# observed peers in gossip",
2626                               1,
2627                               GNUNET_NO);
2628   }
2629 #ifdef TO_FILE
2630   sub->num_observed_peers++;
2631   GNUNET_CONTAINER_multipeermap_put
2632     (sub->observed_unique_peers,
2633     peer,
2634     NULL,
2635     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2636   uint32_t num_observed_unique_peers =
2637     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2638   GNUNET_STATISTICS_set (stats,
2639                          "# unique peers in gossip",
2640                          num_observed_unique_peers,
2641                          GNUNET_NO);
2642 #ifdef TO_FILE_FULL
2643   to_file (sub->file_name_observed_log,
2644            "%" PRIu32 " %" PRIu32 " %f\n",
2645            sub->num_observed_peers,
2646            num_observed_unique_peers,
2647            1.0 * num_observed_unique_peers / sub->num_observed_peers)
2648 #endif /* TO_FILE_FULL */
2649 #endif /* TO_FILE */
2650 }
2651
2652
2653 /**
2654  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2655  *        If the peer is not known, online check is issued and it is
2656  *        scheduled to be inserted in sampler and view.
2657  *
2658  * "External sources" refer to every source except the gossip.
2659  *
2660  * @param sub Sub for which @a peer was received
2661  * @param peer peer to insert/peer received
2662  */
2663 static void
2664 got_peer (struct Sub *sub,
2665           const struct GNUNET_PeerIdentity *peer)
2666 {
2667   /* If we did not know this peer already, insert it into sampler and view */
2668   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2669   {
2670     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2671                         &insert_in_sampler, sub);
2672     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2673                         &insert_in_view_op, sub);
2674   }
2675   if (sub == msub)
2676   {
2677     GNUNET_STATISTICS_update (stats,
2678                               "# learnd peers",
2679                               1,
2680                               GNUNET_NO);
2681   }
2682 }
2683
2684
2685 /**
2686  * @brief Checks if there is a sending channel and if it is needed
2687  *
2688  * @param peer_ctx Context of the peer to check
2689  * @return GNUNET_YES if sending channel exists and is still needed
2690  *         GNUNET_NO  otherwise
2691  */
2692 static int
2693 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2694 {
2695   /* struct GNUNET_CADET_Channel *channel; */
2696   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2697                                      &peer_ctx->peer_id))
2698   {
2699     return GNUNET_NO;
2700   }
2701   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2702   {
2703     if ((0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2704                                    &peer_ctx->peer_id)) ||
2705         (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2706                                            &peer_ctx->peer_id)) ||
2707         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2708                                                     &peer_ctx->peer_id)) ||
2709         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2710                                                     &peer_ctx->peer_id)) ||
2711         (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2712                                         &peer_ctx->peer_id,
2713                                         Peers_PULL_REPLY_PENDING)))
2714     {     /* If we want to keep the connection to peer open */
2715       return GNUNET_YES;
2716     }
2717     return GNUNET_NO;
2718   }
2719   return GNUNET_NO;
2720 }
2721
2722
2723 /**
2724  * @brief remove peer from our knowledge, the view, push and pull maps and
2725  * samplers.
2726  *
2727  * @param sub Sub with the data structures the peer is to be removed from
2728  * @param peer the peer to remove
2729  */
2730 static void
2731 remove_peer (struct Sub *sub,
2732              const struct GNUNET_PeerIdentity *peer)
2733 {
2734   (void) View_remove_peer (sub->view,
2735                            peer);
2736   CustomPeerMap_remove_peer (sub->pull_map,
2737                              peer);
2738   CustomPeerMap_remove_peer (sub->push_map,
2739                              peer);
2740   RPS_sampler_reinitialise_by_value (sub->sampler,
2741                                      peer);
2742   /* We want to destroy the peer now.
2743    * Sometimes, it just seems that it's already been removed from the peer_map,
2744    * so check the peer_map first. */
2745   if (GNUNET_YES == check_peer_known (sub->peer_map,
2746                                       peer))
2747   {
2748     destroy_peer (get_peer_ctx (sub->peer_map,
2749                                 peer));
2750   }
2751 }
2752
2753
2754 /**
2755  * @brief Remove data that is not needed anymore.
2756  *
2757  * If the sending channel is no longer needed it is destroyed.
2758  *
2759  * @param sub Sub in which the current peer is to be cleaned
2760  * @param peer the peer whose data is about to be cleaned
2761  */
2762 static void
2763 clean_peer (struct Sub *sub,
2764             const struct GNUNET_PeerIdentity *peer)
2765 {
2766   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2767                                                                peer)))
2768   {
2769     LOG (GNUNET_ERROR_TYPE_DEBUG,
2770          "Going to remove send channel to peer %s\n",
2771          GNUNET_i2s (peer));
2772     #if ENABLE_MALICIOUS
2773     if (0 != GNUNET_memcmp (&attacked_peer,
2774                             peer))
2775       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2776                                                     peer));
2777     #else /* ENABLE_MALICIOUS */
2778     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2779                                                   peer));
2780     #endif /* ENABLE_MALICIOUS */
2781   }
2782
2783   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2784                                                            peer))
2785   {
2786     /* Peer was already removed by callback on destroyed channel */
2787     LOG (GNUNET_ERROR_TYPE_WARNING,
2788          "Peer was removed from our knowledge during cleanup\n");
2789     return;
2790   }
2791
2792   if ((GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2793                                                              peer))) &&
2794       (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2795       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2796       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2797       (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2798       (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))))
2799   {   /* We can safely remove this peer */
2800     LOG (GNUNET_ERROR_TYPE_DEBUG,
2801          "Going to remove peer %s\n",
2802          GNUNET_i2s (peer));
2803     remove_peer (sub, peer);
2804     return;
2805   }
2806 }
2807
2808
2809 /**
2810  * @brief This is called when a channel is destroyed.
2811  *
2812  * Removes peer completely from our knowledge if the send_channel was destroyed
2813  * Otherwise simply delete the recv_channel
2814  * Also check if the knowledge about this peer is still needed.
2815  * If not, remove this peer from our knowledge.
2816  *
2817  * @param cls The closure - Context to the channel
2818  * @param channel The channel being closed
2819  */
2820 static void
2821 cleanup_destroyed_channel (void *cls,
2822                            const struct GNUNET_CADET_Channel *channel)
2823 {
2824   struct ChannelCtx *channel_ctx = cls;
2825   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2826
2827   (void) channel;
2828
2829   channel_ctx->channel = NULL;
2830   remove_channel_ctx (channel_ctx);
2831   if ((NULL != peer_ctx)&&
2832       (peer_ctx->send_channel_ctx == channel_ctx) &&
2833       (GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx)) )
2834   {
2835     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2836   }
2837 }
2838
2839 /***********************************************************************
2840 * /Util functions
2841 ***********************************************************************/
2842
2843
2844
2845 /***********************************************************************
2846 * Sub
2847 ***********************************************************************/
2848
2849 /**
2850  * @brief Create a new Sub
2851  *
2852  * @param hash Hash of value shared among rps instances on other hosts that
2853  *        defines a subgroup to sample from.
2854  * @param sampler_size Size of the sampler
2855  * @param round_interval Interval (in average) between two rounds
2856  *
2857  * @return Sub
2858  */
2859 struct Sub *
2860 new_sub (const struct GNUNET_HashCode *hash,
2861          uint32_t sampler_size,
2862          struct GNUNET_TIME_Relative round_interval)
2863 {
2864   struct Sub *sub;
2865
2866   sub = GNUNET_new (struct Sub);
2867
2868   /* With the hash generated from the secret value this service only connects
2869    * to rps instances that share the value */
2870   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2871     GNUNET_MQ_hd_fixed_size (peer_check,
2872                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2873                              struct GNUNET_MessageHeader,
2874                              NULL),
2875     GNUNET_MQ_hd_fixed_size (peer_push,
2876                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2877                              struct GNUNET_MessageHeader,
2878                              NULL),
2879     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2880                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2881                              struct GNUNET_MessageHeader,
2882                              NULL),
2883     GNUNET_MQ_hd_var_size (peer_pull_reply,
2884                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2885                            struct GNUNET_RPS_P2P_PullReplyMessage,
2886                            NULL),
2887     GNUNET_MQ_handler_end ()
2888   };
2889   sub->hash = *hash;
2890   sub->cadet_port =
2891     GNUNET_CADET_open_port (cadet_handle,
2892                             &sub->hash,
2893                             &handle_inbound_channel, /* Connect handler */
2894                             sub, /* cls */
2895                             NULL, /* WindowSize handler */
2896                             &cleanup_destroyed_channel, /* Disconnect handler */
2897                             cadet_handlers);
2898   if (NULL == sub->cadet_port)
2899   {
2900     LOG (GNUNET_ERROR_TYPE_ERROR,
2901          "Cadet port `%s' is already in use.\n",
2902          GNUNET_APPLICATION_PORT_RPS);
2903     GNUNET_assert (0);
2904   }
2905
2906   /* Set up general data structure to keep track about peers */
2907   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2908   if (GNUNET_OK !=
2909       GNUNET_CONFIGURATION_get_value_filename (cfg,
2910                                                "rps",
2911                                                "FILENAME_VALID_PEERS",
2912                                                &sub->filename_valid_peers))
2913   {
2914     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2915                                "rps",
2916                                "FILENAME_VALID_PEERS");
2917   }
2918   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2919   {
2920     char *tmp_filename_valid_peers;
2921     char str_hash[105];
2922
2923     GNUNET_snprintf (str_hash,
2924                      sizeof(str_hash),
2925                      GNUNET_h2s_full (hash));
2926     tmp_filename_valid_peers = sub->filename_valid_peers;
2927     GNUNET_asprintf (&sub->filename_valid_peers,
2928                      "%s%s",
2929                      tmp_filename_valid_peers,
2930                      str_hash);
2931     GNUNET_free (tmp_filename_valid_peers);
2932   }
2933   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2934
2935   /* Set up the sampler */
2936   sub->sampler_size_est_min = sampler_size;
2937   sub->sampler_size_est_need = sampler_size;;
2938   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2939   GNUNET_assert (0 != round_interval.rel_value_us);
2940   sub->round_interval = round_interval;
2941   sub->sampler = RPS_sampler_init (sampler_size,
2942                                    round_interval);
2943
2944   /* Logging of internals */
2945 #ifdef TO_FILE_FULL
2946   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2947 #endif /* TO_FILE_FULL */
2948 #ifdef TO_FILE
2949 #ifdef TO_FILE_FULL
2950   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2951                                                         "observed");
2952 #endif /* TO_FILE_FULL */
2953   sub->num_observed_peers = 0;
2954   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2955                                                                      GNUNET_NO);
2956 #endif /* TO_FILE */
2957
2958   /* Set up data structures for gossip */
2959   sub->push_map = CustomPeerMap_create (4);
2960   sub->pull_map = CustomPeerMap_create (4);
2961   sub->view_size_est_min = sampler_size;;
2962   sub->view = View_create (sub->view_size_est_min);
2963   if (sub == msub)
2964   {
2965     GNUNET_STATISTICS_set (stats,
2966                            "view size aim",
2967                            sub->view_size_est_min,
2968                            GNUNET_NO);
2969   }
2970
2971   /* Start executing rounds */
2972   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2973
2974   return sub;
2975 }
2976
2977
2978 #ifdef TO_FILE
2979 /**
2980  * @brief Write all numbers in the given array into the given file
2981  *
2982  * Single numbers devided by a newline
2983  *
2984  * @param hist_array[] the array to dump
2985  * @param file_name file to dump into
2986  */
2987 static void
2988 write_histogram_to_file (const uint32_t hist_array[],
2989                          const char *file_name)
2990 {
2991   char collect_str[SIZE_DUMP_FILE + 1] = "";
2992   char *recv_str_iter;
2993   char *file_name_full;
2994
2995   recv_str_iter = collect_str;
2996   file_name_full = store_prefix_file_name (&own_identity,
2997                                            file_name);
2998   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2999   {
3000     char collect_str_tmp[8];
3001
3002     GNUNET_snprintf (collect_str_tmp,
3003                      sizeof(collect_str_tmp),
3004                      "%" PRIu32 "\n",
3005                      hist_array[i]);
3006     recv_str_iter = stpncpy (recv_str_iter,
3007                              collect_str_tmp,
3008                              6);
3009   }
3010   (void) stpcpy (recv_str_iter,
3011                  "\n");
3012   LOG (GNUNET_ERROR_TYPE_DEBUG,
3013        "Writing push stats to disk\n");
3014   to_file_w_len (file_name_full,
3015                  SIZE_DUMP_FILE,
3016                  collect_str);
3017   GNUNET_free (file_name_full);
3018 }
3019 #endif /* TO_FILE */
3020
3021
3022 /**
3023  * @brief Destroy Sub.
3024  *
3025  * @param sub Sub to destroy
3026  */
3027 static void
3028 destroy_sub (struct Sub *sub)
3029 {
3030   GNUNET_assert (NULL != sub);
3031   GNUNET_assert (NULL != sub->do_round_task);
3032   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3033   sub->do_round_task = NULL;
3034
3035   /* Disconnect from cadet */
3036   GNUNET_CADET_close_port (sub->cadet_port);
3037   sub->cadet_port = NULL;
3038
3039   /* Clean up data structures for peers */
3040   RPS_sampler_destroy (sub->sampler);
3041   sub->sampler = NULL;
3042   View_destroy (sub->view);
3043   sub->view = NULL;
3044   CustomPeerMap_destroy (sub->push_map);
3045   sub->push_map = NULL;
3046   CustomPeerMap_destroy (sub->pull_map);
3047   sub->pull_map = NULL;
3048   peers_terminate (sub);
3049
3050   /* Free leftover data structures */
3051 #ifdef TO_FILE_FULL
3052   GNUNET_free (sub->file_name_view_log);
3053   sub->file_name_view_log = NULL;
3054 #endif /* TO_FILE_FULL */
3055 #ifdef TO_FILE
3056 #ifdef TO_FILE_FULL
3057   GNUNET_free (sub->file_name_observed_log);
3058   sub->file_name_observed_log = NULL;
3059 #endif /* TO_FILE_FULL */
3060
3061   /* Write push frequencies to disk */
3062   write_histogram_to_file (sub->push_recv,
3063                            "push_recv");
3064
3065   /* Write push deltas to disk */
3066   write_histogram_to_file (sub->push_delta,
3067                            "push_delta");
3068
3069   /* Write pull delays to disk */
3070   write_histogram_to_file (sub->pull_delays,
3071                            "pull_delays");
3072
3073   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3074   sub->observed_unique_peers = NULL;
3075 #endif /* TO_FILE */
3076
3077   GNUNET_free (sub);
3078 }
3079
3080
3081 /***********************************************************************
3082 * /Sub
3083 ***********************************************************************/
3084
3085
3086 /***********************************************************************
3087 * Core handlers
3088 ***********************************************************************/
3089
3090 /**
3091  * @brief Callback on initialisation of Core.
3092  *
3093  * @param cls - unused
3094  * @param my_identity - unused
3095  */
3096 void
3097 core_init (void *cls,
3098            const struct GNUNET_PeerIdentity *my_identity)
3099 {
3100   (void) cls;
3101   (void) my_identity;
3102
3103   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3104 }
3105
3106
3107 /**
3108  * @brief Callback for core.
3109  * Method called whenever a given peer connects.
3110  *
3111  * @param cls closure - unused
3112  * @param peer peer identity this notification is about
3113  * @return closure given to #core_disconnects as peer_cls
3114  */
3115 void *
3116 core_connects (void *cls,
3117                const struct GNUNET_PeerIdentity *peer,
3118                struct GNUNET_MQ_Handle *mq)
3119 {
3120   (void) cls;
3121   (void) mq;
3122
3123   GNUNET_assert (GNUNET_YES ==
3124                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3125                                                     peer,
3126                                                     NULL,
3127                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3128   return NULL;
3129 }
3130
3131
3132 /**
3133  * @brief Callback for core.
3134  * Method called whenever a peer disconnects.
3135  *
3136  * @param cls closure - unused
3137  * @param peer peer identity this notification is about
3138  * @param peer_cls closure given in #core_connects - unused
3139  */
3140 void
3141 core_disconnects (void *cls,
3142                   const struct GNUNET_PeerIdentity *peer,
3143                   void *peer_cls)
3144 {
3145   (void) cls;
3146   (void) peer_cls;
3147
3148   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3149 }
3150
3151 /***********************************************************************
3152 * /Core handlers
3153 ***********************************************************************/
3154
3155
3156 /**
3157  * @brief Destroy the context for a (connected) client
3158  *
3159  * @param cli_ctx Context to destroy
3160  */
3161 static void
3162 destroy_cli_ctx (struct ClientContext *cli_ctx)
3163 {
3164   GNUNET_assert (NULL != cli_ctx);
3165   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3166                                cli_ctx_tail,
3167                                cli_ctx);
3168   if (NULL != cli_ctx->sub)
3169   {
3170     destroy_sub (cli_ctx->sub);
3171     cli_ctx->sub = NULL;
3172   }
3173   GNUNET_free (cli_ctx);
3174 }
3175
3176
3177 /**
3178  * @brief Update sizes in sampler and view on estimate update from nse service
3179  *
3180  * @param sub Sub
3181  * @param logestimate the log(Base 2) value of the current network size estimate
3182  * @param std_dev standard deviation for the estimate
3183  */
3184 static void
3185 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3186 {
3187   double estimate;
3188
3189   // double scale; // TODO this might go gloabal/config
3190
3191   LOG (GNUNET_ERROR_TYPE_DEBUG,
3192        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3193        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3194   // scale = .01;
3195   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3196   // GNUNET_NSE_log_estimate_to_n (logestimate);
3197   estimate = pow (estimate, 1.0 / 3);
3198   // TODO add if std_dev is a number
3199   // estimate += (std_dev * scale);
3200   if (sub->view_size_est_min < ceil (estimate))
3201   {
3202     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3203     sub->sampler_size_est_need = estimate;
3204     sub->view_size_est_need = estimate;
3205   }
3206   else
3207   {
3208     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3209     // sub->sampler_size_est_need = sub->view_size_est_min;
3210     sub->view_size_est_need = sub->view_size_est_min;
3211   }
3212   if (sub == msub)
3213   {
3214     GNUNET_STATISTICS_set (stats,
3215                            "view size aim",
3216                            sub->view_size_est_need,
3217                            GNUNET_NO);
3218   }
3219
3220   /* If the NSE has changed adapt the lists accordingly */
3221   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3222   View_change_len (sub->view, sub->view_size_est_need);
3223 }
3224
3225
3226 /**
3227  * Function called by NSE.
3228  *
3229  * Updates sizes of sampler list and view and adapt those lists
3230  * accordingly.
3231  *
3232  * implements #GNUNET_NSE_Callback
3233  *
3234  * @param cls Closure - unused
3235  * @param timestamp time when the estimate was received from the server (or created by the server)
3236  * @param logestimate the log(Base 2) value of the current network size estimate
3237  * @param std_dev standard deviation for the estimate
3238  */
3239 static void
3240 nse_callback (void *cls,
3241               struct GNUNET_TIME_Absolute timestamp,
3242               double logestimate, double std_dev)
3243 {
3244   (void) cls;
3245   (void) timestamp;
3246   struct ClientContext *cli_ctx_iter;
3247
3248   adapt_sizes (msub, logestimate, std_dev);
3249   for (cli_ctx_iter = cli_ctx_head;
3250        NULL != cli_ctx_iter;
3251        cli_ctx_iter = cli_ctx_iter->next)
3252   {
3253     if (NULL != cli_ctx_iter->sub)
3254     {
3255       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3256     }
3257   }
3258 }
3259
3260
3261 /**
3262  * @brief This function is called, when the client seeds peers.
3263  * It verifies that @a msg is well-formed.
3264  *
3265  * @param cls the closure (#ClientContext)
3266  * @param msg the message
3267  * @return #GNUNET_OK if @a msg is well-formed
3268  *         #GNUNET_SYSERR otherwise
3269  */
3270 static int
3271 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3272 {
3273   struct ClientContext *cli_ctx = cls;
3274   uint16_t msize = ntohs (msg->header.size);
3275   uint32_t num_peers = ntohl (msg->num_peers);
3276
3277   msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
3278   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3279       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3280   {
3281     LOG (GNUNET_ERROR_TYPE_ERROR,
3282          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3283          ntohl (msg->num_peers),
3284          (msize / sizeof(struct GNUNET_PeerIdentity)));
3285     GNUNET_break (0);
3286     GNUNET_SERVICE_client_drop (cli_ctx->client);
3287     return GNUNET_SYSERR;
3288   }
3289   return GNUNET_OK;
3290 }
3291
3292
3293 /**
3294  * Handle seed from the client.
3295  *
3296  * @param cls closure
3297  * @param message the actual message
3298  */
3299 static void
3300 handle_client_seed (void *cls,
3301                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3302 {
3303   struct ClientContext *cli_ctx = cls;
3304   struct GNUNET_PeerIdentity *peers;
3305   uint32_t num_peers;
3306   uint32_t i;
3307
3308   num_peers = ntohl (msg->num_peers);
3309   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3310
3311   LOG (GNUNET_ERROR_TYPE_DEBUG,
3312        "Client seeded peers:\n");
3313   print_peer_list (peers, num_peers);
3314
3315   for (i = 0; i < num_peers; i++)
3316   {
3317     LOG (GNUNET_ERROR_TYPE_DEBUG,
3318          "Updating samplers with seed %" PRIu32 ": %s\n",
3319          i,
3320          GNUNET_i2s (&peers[i]));
3321
3322     if (NULL != msub)
3323       got_peer (msub, &peers[i]);                 /* Condition needed? */
3324     if (NULL != cli_ctx->sub)
3325       got_peer (cli_ctx->sub, &peers[i]);
3326   }
3327   GNUNET_SERVICE_client_continue (cli_ctx->client);
3328 }
3329
3330
3331 /**
3332  * Handle RPS request from the client.
3333  *
3334  * @param cls Client context
3335  * @param message Message containing the numer of updates the client wants to
3336  * receive
3337  */
3338 static void
3339 handle_client_view_request (void *cls,
3340                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3341 {
3342   struct ClientContext *cli_ctx = cls;
3343   uint64_t num_updates;
3344
3345   num_updates = ntohl (msg->num_updates);
3346
3347   LOG (GNUNET_ERROR_TYPE_DEBUG,
3348        "Client requested %" PRIu64 " updates of view.\n",
3349        num_updates);
3350
3351   GNUNET_assert (NULL != cli_ctx);
3352   cli_ctx->view_updates_left = num_updates;
3353   send_view (cli_ctx, NULL, 0);
3354   GNUNET_SERVICE_client_continue (cli_ctx->client);
3355 }
3356
3357
3358 /**
3359  * @brief Handle the cancellation of the view updates.
3360  *
3361  * @param cls The client context
3362  * @param msg Unused
3363  */
3364 static void
3365 handle_client_view_cancel (void *cls,
3366                            const struct GNUNET_MessageHeader *msg)
3367 {
3368   struct ClientContext *cli_ctx = cls;
3369
3370   (void) msg;
3371
3372   LOG (GNUNET_ERROR_TYPE_DEBUG,
3373        "Client does not want to receive updates of view any more.\n");
3374
3375   GNUNET_assert (NULL != cli_ctx);
3376   cli_ctx->view_updates_left = 0;
3377   GNUNET_SERVICE_client_continue (cli_ctx->client);
3378   if (GNUNET_YES == cli_ctx->stream_update)
3379   {
3380     destroy_cli_ctx (cli_ctx);
3381   }
3382 }
3383
3384
3385 /**
3386  * Handle RPS request for biased stream from the client.
3387  *
3388  * @param cls Client context
3389  * @param message unused
3390  */
3391 static void
3392 handle_client_stream_request (void *cls,
3393                               const struct
3394                               GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3395 {
3396   struct ClientContext *cli_ctx = cls;
3397
3398   (void) msg;
3399
3400   LOG (GNUNET_ERROR_TYPE_DEBUG,
3401        "Client requested peers from biased stream.\n");
3402   cli_ctx->stream_update = GNUNET_YES;
3403
3404   GNUNET_assert (NULL != cli_ctx);
3405   GNUNET_SERVICE_client_continue (cli_ctx->client);
3406 }
3407
3408
3409 /**
3410  * @brief Handles the cancellation of the stream of biased peer ids
3411  *
3412  * @param cls The client context
3413  * @param msg unused
3414  */
3415 static void
3416 handle_client_stream_cancel (void *cls,
3417                              const struct GNUNET_MessageHeader *msg)
3418 {
3419   struct ClientContext *cli_ctx = cls;
3420
3421   (void) msg;
3422
3423   LOG (GNUNET_ERROR_TYPE_DEBUG,
3424        "Client canceled receiving peers from biased stream.\n");
3425   cli_ctx->stream_update = GNUNET_NO;
3426
3427   GNUNET_assert (NULL != cli_ctx);
3428   GNUNET_SERVICE_client_continue (cli_ctx->client);
3429 }
3430
3431
3432 /**
3433  * @brief Create and start a Sub.
3434  *
3435  * @param cls Closure - unused
3436  * @param msg Message containing the necessary information
3437  */
3438 static void
3439 handle_client_start_sub (void *cls,
3440                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3441 {
3442   struct ClientContext *cli_ctx = cls;
3443
3444   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3445   if ((NULL != cli_ctx->sub)&&
3446       (0 != memcmp (&cli_ctx->sub->hash,
3447                     &msg->hash,
3448                     sizeof(struct GNUNET_HashCode))) )
3449   {
3450     LOG (GNUNET_ERROR_TYPE_WARNING,
3451          "Already have a Sub with different share for this client. Remove old one, add new.\n");
3452     destroy_sub (cli_ctx->sub);
3453     cli_ctx->sub = NULL;
3454   }
3455   cli_ctx->sub = new_sub (&msg->hash,
3456                           msub->sampler_size_est_min, // TODO make api input?
3457                           GNUNET_TIME_relative_ntoh (msg->round_interval));
3458   GNUNET_SERVICE_client_continue (cli_ctx->client);
3459 }
3460
3461
3462 /**
3463  * @brief Destroy the Sub
3464  *
3465  * @param cls Closure - unused
3466  * @param msg Message containing the hash that identifies the Sub
3467  */
3468 static void
3469 handle_client_stop_sub (void *cls,
3470                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3471 {
3472   struct ClientContext *cli_ctx = cls;
3473
3474   GNUNET_assert (NULL != cli_ctx->sub);
3475   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof(struct
3476                                                            GNUNET_HashCode)))
3477   {
3478     LOG (GNUNET_ERROR_TYPE_WARNING,
3479          "Share of current sub and request differ!\n");
3480   }
3481   destroy_sub (cli_ctx->sub);
3482   cli_ctx->sub = NULL;
3483   GNUNET_SERVICE_client_continue (cli_ctx->client);
3484 }
3485
3486
3487 /**
3488  * Handle a CHECK_LIVE message from another peer.
3489  *
3490  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3491  * the channel is blocked for all other communication.
3492  *
3493  * @param cls Closure - Context of channel
3494  * @param msg Message - unused
3495  */
3496 static void
3497 handle_peer_check (void *cls,
3498                    const struct GNUNET_MessageHeader *msg)
3499 {
3500   const struct ChannelCtx *channel_ctx = cls;
3501   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3502
3503   (void) msg;
3504
3505   LOG (GNUNET_ERROR_TYPE_DEBUG,
3506        "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3507   if (channel_ctx->peer_ctx->sub == msub)
3508   {
3509     GNUNET_STATISTICS_update (stats,
3510                               "# pending online checks",
3511                               -1,
3512                               GNUNET_NO);
3513   }
3514
3515   GNUNET_CADET_receive_done (channel_ctx->channel);
3516 }
3517
3518
3519 /**
3520  * Handle a PUSH message from another peer.
3521  *
3522  * Check the proof of work and store the PeerID
3523  * in the temporary list for pushed PeerIDs.
3524  *
3525  * @param cls Closure - Context of channel
3526  * @param msg Message - unused
3527  */
3528 static void
3529 handle_peer_push (void *cls,
3530                   const struct GNUNET_MessageHeader *msg)
3531 {
3532   const struct ChannelCtx *channel_ctx = cls;
3533   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3534
3535   (void) msg;
3536
3537   // (check the proof of work (?))
3538
3539   LOG (GNUNET_ERROR_TYPE_DEBUG,
3540        "Received PUSH (%s)\n",
3541        GNUNET_i2s (peer));
3542   if (channel_ctx->peer_ctx->sub == msub)
3543   {
3544     GNUNET_STATISTICS_update (stats, "# push message received", 1, GNUNET_NO);
3545     if ((NULL != map_single_hop) &&
3546         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3547                                                               peer)))
3548     {
3549       GNUNET_STATISTICS_update (stats,
3550                                 "# push message received (multi-hop peer)",
3551                                 1,
3552                                 GNUNET_NO);
3553     }
3554   }
3555
3556   #if ENABLE_MALICIOUS
3557   struct AttackedPeer *tmp_att_peer;
3558
3559   if ((1 == mal_type) ||
3560       (3 == mal_type))
3561   {   /* Try to maximise representation */
3562     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3563     tmp_att_peer->peer_id = *peer;
3564     if (NULL == att_peer_set)
3565       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3566     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3567                                                              peer))
3568     {
3569       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3570                                    att_peers_tail,
3571                                    tmp_att_peer);
3572       add_peer_array_to_set (peer, 1, att_peer_set);
3573     }
3574     else
3575     {
3576       GNUNET_free (tmp_att_peer);
3577     }
3578   }
3579
3580
3581   else if (2 == mal_type)
3582   {
3583     /* We attack one single well-known peer - simply ignore */
3584   }
3585   #endif /* ENABLE_MALICIOUS */
3586
3587   /* Add the sending peer to the push_map */
3588   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3589
3590   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3591                                      &channel_ctx->peer_ctx->peer_id));
3592   GNUNET_CADET_receive_done (channel_ctx->channel);
3593 }
3594
3595
3596 /**
3597  * Handle PULL REQUEST request message from another peer.
3598  *
3599  * Reply with the view of PeerIDs.
3600  *
3601  * @param cls Closure - Context of channel
3602  * @param msg Message - unused
3603  */
3604 static void
3605 handle_peer_pull_request (void *cls,
3606                           const struct GNUNET_MessageHeader *msg)
3607 {
3608   const struct ChannelCtx *channel_ctx = cls;
3609   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3610   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3611   const struct GNUNET_PeerIdentity *view_array;
3612
3613   (void) msg;
3614
3615   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (
3616          peer));
3617   if (peer_ctx->sub == msub)
3618   {
3619     GNUNET_STATISTICS_update (stats,
3620                               "# pull request message received",
3621                               1,
3622                               GNUNET_NO);
3623     if ((NULL != map_single_hop) &&
3624         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3625                                                               &peer_ctx->peer_id)))
3626     {
3627       GNUNET_STATISTICS_update (stats,
3628                                 "# pull request message received (multi-hop peer)",
3629                                 1,
3630                                 GNUNET_NO);
3631     }
3632   }
3633
3634   #if ENABLE_MALICIOUS
3635   if ((1 == mal_type)
3636       ||(3 == mal_type))
3637   {   /* Try to maximise representation */
3638     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3639   }
3640
3641   else if (2 == mal_type)
3642   {   /* Try to partition network */
3643     if (0 == GNUNET_memcmp (&attacked_peer, peer))
3644     {
3645       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3646     }
3647   }
3648   #endif /* ENABLE_MALICIOUS */
3649
3650   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3651                                      &channel_ctx->peer_ctx->peer_id));
3652   GNUNET_CADET_receive_done (channel_ctx->channel);
3653   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3654   send_pull_reply (peer_ctx,
3655                    view_array,
3656                    View_size (channel_ctx->peer_ctx->sub->view));
3657 }
3658
3659
3660 /**
3661  * Check whether we sent a corresponding request and
3662  * whether this reply is the first one.
3663  *
3664  * @param cls Closure - Context of channel
3665  * @param msg Message containing the replied peers
3666  */
3667 static int
3668 check_peer_pull_reply (void *cls,
3669                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3670 {
3671   struct ChannelCtx *channel_ctx = cls;
3672   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3673
3674   if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3675   {
3676     GNUNET_break_op (0);
3677     return GNUNET_SYSERR;
3678   }
3679
3680   if ((ntohs (msg->header.size) - sizeof(struct
3681                                          GNUNET_RPS_P2P_PullReplyMessage))
3682       / sizeof(struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3683   {
3684     LOG (GNUNET_ERROR_TYPE_ERROR,
3685          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3686          ntohl (msg->num_peers),
3687          (ntohs (msg->header.size) - sizeof(struct
3688                                             GNUNET_RPS_P2P_PullReplyMessage))
3689          / sizeof(struct GNUNET_PeerIdentity));
3690     GNUNET_break_op (0);
3691     return GNUNET_SYSERR;
3692   }
3693
3694   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3695                                      &sender_ctx->peer_id,
3696                                      Peers_PULL_REPLY_PENDING))
3697   {
3698     LOG (GNUNET_ERROR_TYPE_WARNING,
3699          "Received a pull reply from a peer (%s) we didn't request one from!\n",
3700          GNUNET_i2s (&sender_ctx->peer_id));
3701     if (sender_ctx->sub == msub)
3702     {
3703       GNUNET_STATISTICS_update (stats,
3704                                 "# unrequested pull replies",
3705                                 1,
3706                                 GNUNET_NO);
3707     }
3708   }
3709   return GNUNET_OK;
3710 }
3711
3712
3713 /**
3714  * Handle PULL REPLY message from another peer.
3715  *
3716  * @param cls Closure
3717  * @param msg The message header
3718  */
3719 static void
3720 handle_peer_pull_reply (void *cls,
3721                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3722 {
3723   const struct ChannelCtx *channel_ctx = cls;
3724   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3725   const struct GNUNET_PeerIdentity *peers;
3726   struct Sub *sub = channel_ctx->peer_ctx->sub;
3727   uint32_t i;
3728
3729 #if ENABLE_MALICIOUS
3730   struct AttackedPeer *tmp_att_peer;
3731 #endif /* ENABLE_MALICIOUS */
3732
3733   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3734   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (
3735          sender));
3736   if (channel_ctx->peer_ctx->sub == msub)
3737   {
3738     GNUNET_STATISTICS_update (stats,
3739                               "# pull reply messages received",
3740                               1,
3741                               GNUNET_NO);
3742     if ((NULL != map_single_hop) &&
3743         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3744                                                               &channel_ctx->
3745                                                               peer_ctx->peer_id)) )
3746     {
3747       GNUNET_STATISTICS_update (stats,
3748                                 "# pull reply messages received (multi-hop peer)",
3749                                 1,
3750                                 GNUNET_NO);
3751     }
3752   }
3753
3754   #if ENABLE_MALICIOUS
3755   // We shouldn't even receive pull replies as we're not sending
3756   if (2 == mal_type)
3757   {
3758   }
3759   #endif /* ENABLE_MALICIOUS */
3760
3761   /* Do actual logic */
3762   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3763
3764   LOG (GNUNET_ERROR_TYPE_DEBUG,
3765        "PULL REPLY received, got following %u peers:\n",
3766        ntohl (msg->num_peers));
3767
3768   for (i = 0; i < ntohl (msg->num_peers); i++)
3769   {
3770     LOG (GNUNET_ERROR_TYPE_DEBUG,
3771          "%u. %s\n",
3772          i,
3773          GNUNET_i2s (&peers[i]));
3774
3775     #if ENABLE_MALICIOUS
3776     if ((NULL != att_peer_set) &&
3777         ((1 == mal_type) ||(3 == mal_type) ))
3778     {     /* Add attacked peer to local list */
3779           // TODO check if we sent a request and this was the first reply
3780       if ((GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3781                                                                 &peers[i]))
3782           &&(GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3783                                                                   &peers[i])) )
3784       {
3785         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3786         tmp_att_peer->peer_id = peers[i];
3787         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3788                                      att_peers_tail,
3789                                      tmp_att_peer);
3790         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3791       }
3792       continue;
3793     }
3794     #endif /* ENABLE_MALICIOUS */
3795     /* Make sure we 'know' about this peer */
3796     (void) insert_peer (channel_ctx->peer_ctx->sub,
3797                         &peers[i]);
3798
3799     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3800                                         &peers[i]))
3801     {
3802       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3803                          &peers[i]);
3804     }
3805     else
3806     {
3807       schedule_operation (channel_ctx->peer_ctx,
3808                           insert_in_pull_map,
3809                           channel_ctx->peer_ctx->sub);    /* cls */
3810       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3811                                       &peers[i]);
3812     }
3813   }
3814
3815   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3816                                  sender),
3817                    Peers_PULL_REPLY_PENDING);
3818   clean_peer (channel_ctx->peer_ctx->sub,
3819               sender);
3820
3821   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3822                                      sender));
3823   GNUNET_CADET_receive_done (channel_ctx->channel);
3824 }
3825
3826
3827 /**
3828  * Compute a random delay.
3829  * A uniformly distributed value between mean + spread and mean - spread.
3830  *
3831  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3832  * It would return a random value between 2 and 6 min.
3833  *
3834  * @param mean the mean time until the next round
3835  * @param spread the inverse amount of deviation from the mean
3836  */
3837 static struct GNUNET_TIME_Relative
3838 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3839                     unsigned int spread)
3840 {
3841   struct GNUNET_TIME_Relative half_interval;
3842   struct GNUNET_TIME_Relative ret;
3843   unsigned int rand_delay;
3844   unsigned int max_rand_delay;
3845
3846   if (0 == spread)
3847   {
3848     LOG (GNUNET_ERROR_TYPE_WARNING,
3849          "Not accepting spread of 0\n");
3850     GNUNET_break (0);
3851     GNUNET_assert (0);
3852   }
3853   GNUNET_assert (0 != mean.rel_value_us);
3854
3855   /* Compute random time value between spread * mean and spread * mean */
3856   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3857
3858   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us
3859                    / mean.rel_value_us * (2 / spread);
3860   /**
3861    * Compute random value between (0 and 1) * round_interval
3862    * via multiplying round_interval with a 'fraction' (0 to value)/value
3863    */
3864   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3865                                          max_rand_delay);
3866   ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3867   ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3868   ret = GNUNET_TIME_relative_add (ret, half_interval);
3869
3870   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3871     LOG (GNUNET_ERROR_TYPE_WARNING,
3872          "Returning FOREVER_REL\n");
3873
3874   return ret;
3875 }
3876
3877
3878 /**
3879  * Send single pull request
3880  *
3881  * @param peer_ctx Context to the peer to send request to
3882  */
3883 static void
3884 send_pull_request (struct PeerContext *peer_ctx)
3885 {
3886   struct GNUNET_MQ_Envelope *ev;
3887
3888   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3889                                                &peer_ctx->peer_id,
3890                                                Peers_PULL_REPLY_PENDING));
3891   SET_PEER_FLAG (peer_ctx,
3892                  Peers_PULL_REPLY_PENDING);
3893   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3894
3895   LOG (GNUNET_ERROR_TYPE_DEBUG,
3896        "Going to send PULL REQUEST to peer %s.\n",
3897        GNUNET_i2s (&peer_ctx->peer_id));
3898
3899   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3900   send_message (peer_ctx,
3901                 ev,
3902                 "PULL REQUEST");
3903   if (peer_ctx->sub)
3904   {
3905     GNUNET_STATISTICS_update (stats,
3906                               "# pull request send issued",
3907                               1,
3908                               GNUNET_NO);
3909     if ((NULL != map_single_hop) &&
3910         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3911                                                               &peer_ctx->peer_id)))
3912     {
3913       GNUNET_STATISTICS_update (stats,
3914                                 "# pull request send issued (multi-hop peer)",
3915                                 1,
3916                                 GNUNET_NO);
3917     }
3918   }
3919 }
3920
3921
3922 /**
3923  * Send single push
3924  *
3925  * @param peer_ctx Context of peer to send push to
3926  */
3927 static void
3928 send_push (struct PeerContext *peer_ctx)
3929 {
3930   struct GNUNET_MQ_Envelope *ev;
3931
3932   LOG (GNUNET_ERROR_TYPE_DEBUG,
3933        "Going to send PUSH to peer %s.\n",
3934        GNUNET_i2s (&peer_ctx->peer_id));
3935
3936   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3937   send_message (peer_ctx, ev, "PUSH");
3938   if (peer_ctx->sub)
3939   {
3940     GNUNET_STATISTICS_update (stats,
3941                               "# push send issued",
3942                               1,
3943                               GNUNET_NO);
3944     if ((NULL != map_single_hop) &&
3945         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3946                                                               &peer_ctx->peer_id)))
3947     {
3948       GNUNET_STATISTICS_update (stats,
3949                                 "# push send issued (multi-hop peer)",
3950                                 1,
3951                                 GNUNET_NO);
3952     }
3953   }
3954 }
3955
3956
3957 #if ENABLE_MALICIOUS
3958
3959
3960 /**
3961  * @brief This function is called, when the client tells us to act malicious.
3962  * It verifies that @a msg is well-formed.
3963  *
3964  * @param cls the closure (#ClientContext)
3965  * @param msg the message
3966  * @return #GNUNET_OK if @a msg is well-formed
3967  */
3968 static int
3969 check_client_act_malicious (void *cls,
3970                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3971 {
3972   struct ClientContext *cli_ctx = cls;
3973   uint16_t msize = ntohs (msg->header.size);
3974   uint32_t num_peers = ntohl (msg->num_peers);
3975
3976   msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
3977   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3978       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3979   {
3980     LOG (GNUNET_ERROR_TYPE_ERROR,
3981          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3982          ntohl (msg->num_peers),
3983          (msize / sizeof(struct GNUNET_PeerIdentity)));
3984     GNUNET_break (0);
3985     GNUNET_SERVICE_client_drop (cli_ctx->client);
3986     return GNUNET_SYSERR;
3987   }
3988   return GNUNET_OK;
3989 }
3990
3991 /**
3992  * Turn RPS service to act malicious.
3993  *
3994  * @param cls Closure
3995  * @param client The client that sent the message
3996  * @param msg The message header
3997  */
3998 static void
3999 handle_client_act_malicious (void *cls,
4000                              const struct
4001                              GNUNET_RPS_CS_ActMaliciousMessage *msg)
4002 {
4003   struct ClientContext *cli_ctx = cls;
4004   struct GNUNET_PeerIdentity *peers;
4005   uint32_t num_mal_peers_sent;
4006   uint32_t num_mal_peers_old;
4007   struct Sub *sub = cli_ctx->sub;
4008
4009   if (NULL == sub)
4010     sub = msub;
4011   /* Do actual logic */
4012   peers = (struct GNUNET_PeerIdentity *) &msg[1];
4013   mal_type = ntohl (msg->type);
4014   if (NULL == mal_peer_set)
4015     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4016
4017   LOG (GNUNET_ERROR_TYPE_DEBUG,
4018        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
4019        mal_type,
4020        ntohl (msg->num_peers));
4021
4022   if (1 == mal_type)
4023   {   /* Try to maximise representation */
4024       /* Add other malicious peers to those we already know */
4025
4026     num_mal_peers_sent = ntohl (msg->num_peers);
4027     num_mal_peers_old = num_mal_peers;
4028     GNUNET_array_grow (mal_peers,
4029                        num_mal_peers,
4030                        num_mal_peers + num_mal_peers_sent);
4031     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4032                    peers,
4033                    num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4034
4035     /* Add all mal peers to mal_peer_set */
4036     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4037                            num_mal_peers_sent,
4038                            mal_peer_set);
4039
4040     /* Substitute do_round () with do_mal_round () */
4041     GNUNET_assert (NULL != sub->do_round_task);
4042     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4043     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4044   }
4045
4046   else if ((2 == mal_type) ||
4047            (3 == mal_type))
4048   {   /* Try to partition the network */
4049       /* Add other malicious peers to those we already know */
4050
4051     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4052     num_mal_peers_old = num_mal_peers;
4053     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4054     GNUNET_array_grow (mal_peers,
4055                        num_mal_peers,
4056                        num_mal_peers + num_mal_peers_sent);
4057     if ((NULL != mal_peers) &&
4058         (0 != num_mal_peers) )
4059     {
4060       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4061                      peers,
4062                      num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4063
4064       /* Add all mal peers to mal_peer_set */
4065       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4066                              num_mal_peers_sent,
4067                              mal_peer_set);
4068     }
4069
4070     /* Store the one attacked peer */
4071     GNUNET_memcpy (&attacked_peer,
4072                    &msg->attacked_peer,
4073                    sizeof(struct GNUNET_PeerIdentity));
4074     /* Set the flag of the attacked peer to valid to avoid problems */
4075     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4076     {
4077       (void) issue_peer_online_check (sub, &attacked_peer);
4078     }
4079
4080     LOG (GNUNET_ERROR_TYPE_DEBUG,
4081          "Attacked peer is %s\n",
4082          GNUNET_i2s (&attacked_peer));
4083
4084     /* Substitute do_round () with do_mal_round () */
4085     if (NULL != sub->do_round_task)
4086     {
4087       /* Probably in shutdown */
4088       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4089       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4090     }
4091   }
4092   else if (0 == mal_type)
4093   {   /* Stop acting malicious */
4094     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4095
4096     /* Substitute do_mal_round () with do_round () */
4097     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4098     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4099   }
4100   else
4101   {
4102     GNUNET_break (0);
4103     GNUNET_SERVICE_client_continue (cli_ctx->client);
4104   }
4105   GNUNET_SERVICE_client_continue (cli_ctx->client);
4106 }
4107
4108
4109 /**
4110  * Send out PUSHes and PULLs maliciously.
4111  *
4112  * This is executed regylary.
4113  *
4114  * @param cls Closure - Sub
4115  */
4116 static void
4117 do_mal_round (void *cls)
4118 {
4119   uint32_t num_pushes;
4120   uint32_t i;
4121   struct GNUNET_TIME_Relative time_next_round;
4122   struct AttackedPeer *tmp_att_peer;
4123   struct Sub *sub = cls;
4124
4125   LOG (GNUNET_ERROR_TYPE_DEBUG,
4126        "Going to execute next round maliciously type %" PRIu32 ".\n",
4127        mal_type);
4128   sub->do_round_task = NULL;
4129   GNUNET_assert (mal_type <= 3);
4130   /* Do malicious actions */
4131   if (1 == mal_type)
4132   {   /* Try to maximise representation */
4133       /* The maximum of pushes we're going to send this round */
4134     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4135                                          num_attacked_peers),
4136                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4137
4138     LOG (GNUNET_ERROR_TYPE_DEBUG,
4139          "Going to send %" PRIu32 " pushes\n",
4140          num_pushes);
4141
4142     /* Send PUSHes to attacked peers */
4143     for (i = 0; i < num_pushes; i++)
4144     {
4145       if (att_peers_tail == att_peer_index)
4146         att_peer_index = att_peers_head;
4147       else
4148         att_peer_index = att_peer_index->next;
4149
4150       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4151     }
4152
4153     /* Send PULLs to some peers to learn about additional peers to attack */
4154     tmp_att_peer = att_peer_index;
4155     for (i = 0; i < num_pushes * alpha; i++)
4156     {
4157       if (att_peers_tail == tmp_att_peer)
4158         tmp_att_peer = att_peers_head;
4159       else
4160         att_peer_index = tmp_att_peer->next;
4161
4162       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4163     }
4164   }
4165
4166
4167   else if (2 == mal_type)
4168   {   /**
4169        * Try to partition the network
4170        * Send as many pushes to the attacked peer as possible
4171        * That is one push per round as it will ignore more.
4172        */
4173     (void) issue_peer_online_check (sub, &attacked_peer);
4174     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4175                                        &attacked_peer,
4176                                        Peers_ONLINE))
4177       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4178   }
4179
4180
4181   if (3 == mal_type)
4182   {   /* Combined attack */
4183       /* Send PUSH to attacked peers */
4184     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4185     {
4186       (void) issue_peer_online_check (sub, &attacked_peer);
4187       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4188                                          &attacked_peer,
4189                                          Peers_ONLINE))
4190       {
4191         LOG (GNUNET_ERROR_TYPE_DEBUG,
4192              "Goding to send push to attacked peer (%s)\n",
4193              GNUNET_i2s (&attacked_peer));
4194         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4195       }
4196     }
4197     (void) issue_peer_online_check (sub, &attacked_peer);
4198
4199     /* The maximum of pushes we're going to send this round */
4200     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4201                                          num_attacked_peers),
4202                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4203
4204     LOG (GNUNET_ERROR_TYPE_DEBUG,
4205          "Going to send %" PRIu32 " pushes\n",
4206          num_pushes);
4207
4208     for (i = 0; i < num_pushes; i++)
4209     {
4210       if (att_peers_tail == att_peer_index)
4211         att_peer_index = att_peers_head;
4212       else
4213         att_peer_index = att_peer_index->next;
4214
4215       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4216     }
4217
4218     /* Send PULLs to some peers to learn about additional peers to attack */
4219     tmp_att_peer = att_peer_index;
4220     for (i = 0; i < num_pushes * alpha; i++)
4221     {
4222       if (att_peers_tail == tmp_att_peer)
4223         tmp_att_peer = att_peers_head;
4224       else
4225         att_peer_index = tmp_att_peer->next;
4226
4227       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4228     }
4229   }
4230
4231   /* Schedule next round */
4232   time_next_round = compute_rand_delay (sub->round_interval, 2);
4233
4234   GNUNET_assert (NULL == sub->do_round_task);
4235   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4236                                                      &do_mal_round, sub);
4237   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4238 }
4239 #endif /* ENABLE_MALICIOUS */
4240
4241
4242 /**
4243  * Send out PUSHes and PULLs, possibly update #view, samplers.
4244  *
4245  * This is executed regylary.
4246  *
4247  * @param cls Closure - Sub
4248  */
4249 static void
4250 do_round (void *cls)
4251 {
4252   unsigned int i;
4253   const struct GNUNET_PeerIdentity *view_array;
4254   unsigned int *permut;
4255   unsigned int a_peers; /* Number of peers we send pushes to */
4256   unsigned int b_peers; /* Number of peers we send pull requests to */
4257   uint32_t first_border;
4258   uint32_t second_border;
4259   struct GNUNET_PeerIdentity peer;
4260   struct GNUNET_PeerIdentity *update_peer;
4261   struct Sub *sub = cls;
4262
4263   sub->num_rounds++;
4264   LOG (GNUNET_ERROR_TYPE_DEBUG,
4265        "Going to execute next round.\n");
4266   if (sub == msub)
4267   {
4268     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4269   }
4270   sub->do_round_task = NULL;
4271 #ifdef TO_FILE_FULL
4272   to_file (sub->file_name_view_log,
4273            "___ new round ___");
4274 #endif /* TO_FILE_FULL */
4275   view_array = View_get_as_array (sub->view);
4276   for (i = 0; i < View_size (sub->view); i++)
4277   {
4278     LOG (GNUNET_ERROR_TYPE_DEBUG,
4279          "\t%s\n", GNUNET_i2s (&view_array[i]));
4280 #ifdef TO_FILE_FULL
4281     to_file (sub->file_name_view_log,
4282              "=%s\t(do round)",
4283              GNUNET_i2s_full (&view_array[i]));
4284 #endif /* TO_FILE_FULL */
4285   }
4286
4287
4288   /* Send pushes and pull requests */
4289   if (0 < View_size (sub->view))
4290   {
4291     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4292                                            View_size (sub->view));
4293
4294     /* Send PUSHes */
4295     a_peers = ceil (alpha * View_size (sub->view));
4296
4297     LOG (GNUNET_ERROR_TYPE_DEBUG,
4298          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4299          a_peers, alpha, View_size (sub->view));
4300     for (i = 0; i < a_peers; i++)
4301     {
4302       peer = view_array[permut[i]];
4303       // FIXME if this fails schedule/loop this for later
4304       send_push (get_peer_ctx (sub->peer_map, &peer));
4305     }
4306
4307     /* Send PULL requests */
4308     b_peers = ceil (beta * View_size (sub->view));
4309     first_border = a_peers;
4310     second_border = a_peers + b_peers;
4311     if (second_border > View_size (sub->view))
4312     {
4313       first_border = View_size (sub->view) - b_peers;
4314       second_border = View_size (sub->view);
4315     }
4316     LOG (GNUNET_ERROR_TYPE_DEBUG,
4317          "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4318          b_peers, beta, View_size (sub->view));
4319     for (i = first_border; i < second_border; i++)
4320     {
4321       peer = view_array[permut[i]];
4322       if (GNUNET_NO == check_peer_flag (sub->peer_map,
4323                                         &peer,
4324                                         Peers_PULL_REPLY_PENDING))
4325       {       // FIXME if this fails schedule/loop this for later
4326         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4327       }
4328     }
4329
4330     GNUNET_free (permut);
4331     permut = NULL;
4332   }
4333
4334
4335   /* Update view */
4336   /* TODO see how many peers are in push-/pull- list! */
4337
4338   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4339       (0 < CustomPeerMap_size (sub->push_map)) &&
4340       (0 < CustomPeerMap_size (sub->pull_map)))
4341   {   /* If conditions for update are fulfilled, update */
4342     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4343
4344     uint32_t final_size;
4345     uint32_t peers_to_clean_size;
4346     struct GNUNET_PeerIdentity *peers_to_clean;
4347
4348     peers_to_clean = NULL;
4349     peers_to_clean_size = 0;
4350     GNUNET_array_grow (peers_to_clean,
4351                        peers_to_clean_size,
4352                        View_size (sub->view));
4353     GNUNET_memcpy (peers_to_clean,
4354                    view_array,
4355                    View_size (sub->view) * sizeof(struct GNUNET_PeerIdentity));
4356
4357     /* Seems like recreating is the easiest way of emptying the peermap */
4358     View_clear (sub->view);
4359 #ifdef TO_FILE_FULL
4360     to_file (sub->file_name_view_log,
4361              "--- emptied ---");
4362 #endif /* TO_FILE_FULL */
4363
4364     first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4365                                CustomPeerMap_size (sub->push_map));
4366     second_border = first_border
4367                     + GNUNET_MIN (floor (beta * sub->view_size_est_need),
4368                                   CustomPeerMap_size (sub->pull_map));
4369     final_size = second_border
4370                  + ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4371     LOG (GNUNET_ERROR_TYPE_DEBUG,
4372          "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"
4373          PRIu32 "\n",
4374          first_border,
4375          second_border,
4376          final_size);
4377
4378     /* Update view with peers received through PUSHes */
4379     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4380                                            CustomPeerMap_size (sub->push_map));
4381     for (i = 0; i < first_border; i++)
4382     {
4383       int inserted;
4384       inserted = insert_in_view (sub,
4385                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4386                                                                   permut[i]));
4387       if (GNUNET_OK == inserted)
4388       {
4389         clients_notify_stream_peer (sub,
4390                                     1,
4391                                     CustomPeerMap_get_peer_by_index (
4392                                       sub->push_map, permut[i]));
4393       }
4394 #ifdef TO_FILE_FULL
4395       to_file (sub->file_name_view_log,
4396                "+%s\t(push list)",
4397                GNUNET_i2s_full (&view_array[i]));
4398 #endif /* TO_FILE_FULL */
4399        // TODO change the peer_flags accordingly
4400     }
4401     GNUNET_free (permut);
4402     permut = NULL;
4403
4404     /* Update view with peers received through PULLs */
4405     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4406                                            CustomPeerMap_size (sub->pull_map));
4407     for (i = first_border; i < second_border; i++)
4408     {
4409       int inserted;
4410       inserted = insert_in_view (sub,
4411                                  CustomPeerMap_get_peer_by_index (sub->pull_map,
4412                                                                   permut[i
4413                                                                          -
4414                                                                          first_border
4415                                                                   ]));
4416       if (GNUNET_OK == inserted)
4417       {
4418         clients_notify_stream_peer (sub,
4419                                     1,
4420                                     CustomPeerMap_get_peer_by_index (
4421                                       sub->pull_map,
4422                                       permut[i
4423                                              - first_border]));
4424       }
4425 #ifdef TO_FILE_FULL
4426       to_file (sub->file_name_view_log,
4427                "+%s\t(pull list)",
4428                GNUNET_i2s_full (&view_array[i]));
4429 #endif /* TO_FILE_FULL */
4430        // TODO change the peer_flags accordingly
4431     }
4432     GNUNET_free (permut);
4433     permut = NULL;
4434
4435     /* Update view with peers from history */
4436     RPS_sampler_get_n_rand_peers (sub->sampler,
4437                                   final_size - second_border,
4438                                   hist_update,
4439                                   sub);
4440     // TODO change the peer_flags accordingly
4441
4442     for (i = 0; i < View_size (sub->view); i++)
4443       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4444
4445     /* Clean peers that were removed from the view */
4446     for (i = 0; i < peers_to_clean_size; i++)
4447     {
4448 #ifdef TO_FILE_FULL
4449       to_file (sub->file_name_view_log,
4450                "-%s",
4451                GNUNET_i2s_full (&peers_to_clean[i]));
4452 #endif /* TO_FILE_FULL */
4453       clean_peer (sub, &peers_to_clean[i]);
4454     }
4455
4456     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4457     clients_notify_view_update (sub);
4458   }
4459   else
4460   {
4461     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4462     if (sub == msub)
4463     {
4464       GNUNET_STATISTICS_update (stats, "# rounds blocked", 1, GNUNET_NO);
4465       if ((CustomPeerMap_size (sub->push_map) > alpha
4466            * sub->view_size_est_need) &&
4467           ! (0 >= CustomPeerMap_size (sub->pull_map)))
4468         GNUNET_STATISTICS_update (stats, "# rounds blocked - too many pushes",
4469                                   1, GNUNET_NO);
4470       if ((CustomPeerMap_size (sub->push_map) > alpha
4471            * sub->view_size_est_need) &&
4472           (0 >= CustomPeerMap_size (sub->pull_map)))
4473         GNUNET_STATISTICS_update (stats,
4474                                   "# rounds blocked - too many pushes, no pull replies",
4475                                   1, GNUNET_NO);
4476       if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4477           ! (0 >= CustomPeerMap_size (sub->pull_map)))
4478         GNUNET_STATISTICS_update (stats, "# rounds blocked - no pushes", 1,
4479                                   GNUNET_NO);
4480       if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4481           (0 >= CustomPeerMap_size (sub->pull_map)))
4482         GNUNET_STATISTICS_update (stats,
4483                                   "# rounds blocked - no pushes, no pull replies",
4484                                   1, GNUNET_NO);
4485       if ((0 >= CustomPeerMap_size (sub->pull_map)) &&
4486           (CustomPeerMap_size (sub->push_map) > alpha
4487            * sub->view_size_est_need) &&
4488           (0 >= CustomPeerMap_size (sub->push_map)) )
4489         GNUNET_STATISTICS_update (stats, "# rounds blocked - no pull replies",
4490                                   1, GNUNET_NO);
4491     }
4492   }
4493   // TODO independent of that also get some peers from CADET_get_peers()?
4494   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4495   {
4496     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4497   }
4498   else
4499   {
4500     LOG (GNUNET_ERROR_TYPE_WARNING,
4501          "Push map size too big for histogram (%u, %u)\n",
4502          CustomPeerMap_size (sub->push_map),
4503          HISTOGRAM_FILE_SLOTS);
4504   }
4505   // FIXME check bounds of histogram
4506   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map)
4507                              - (alpha * sub->view_size_est_need))
4508                   + (HISTOGRAM_FILE_SLOTS / 2)]++;
4509   if (sub == msub)
4510   {
4511     GNUNET_STATISTICS_set (stats,
4512                            "# peers in push map at end of round",
4513                            CustomPeerMap_size (sub->push_map),
4514                            GNUNET_NO);
4515     GNUNET_STATISTICS_set (stats,
4516                            "# peers in pull map at end of round",
4517                            CustomPeerMap_size (sub->pull_map),
4518                            GNUNET_NO);
4519     GNUNET_STATISTICS_set (stats,
4520                            "# peers in view at end of round",
4521                            View_size (sub->view),
4522                            GNUNET_NO);
4523     GNUNET_STATISTICS_set (stats,
4524                            "# expected pushes",
4525                            alpha * sub->view_size_est_need,
4526                            GNUNET_NO);
4527     GNUNET_STATISTICS_set (stats,
4528                            "delta expected - received pushes",
4529                            CustomPeerMap_size (sub->push_map) - (alpha
4530                                                                  * sub->
4531                                                                  view_size_est_need),
4532                            GNUNET_NO);
4533   }
4534
4535   LOG (GNUNET_ERROR_TYPE_DEBUG,
4536        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4537        CustomPeerMap_size (sub->push_map),
4538        CustomPeerMap_size (sub->pull_map),
4539        alpha,
4540        View_size (sub->view),
4541        alpha * View_size (sub->view));
4542
4543   /* Update samplers */
4544   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4545   {
4546     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4547     LOG (GNUNET_ERROR_TYPE_DEBUG,
4548          "Updating with peer %s from push list\n",
4549          GNUNET_i2s (update_peer));
4550     insert_in_sampler (sub, update_peer);
4551     clean_peer (sub, update_peer);  /* This cleans only if it is not in the view */
4552   }
4553
4554   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4555   {
4556     LOG (GNUNET_ERROR_TYPE_DEBUG,
4557          "Updating with peer %s from pull list\n",
4558          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4559     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4560     /* This cleans only if it is not in the view */
4561     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4562   }
4563
4564
4565   /* Empty push/pull lists */
4566   CustomPeerMap_clear (sub->push_map);
4567   CustomPeerMap_clear (sub->pull_map);
4568
4569   if (sub == msub)
4570   {
4571     GNUNET_STATISTICS_set (stats,
4572                            "view size",
4573                            View_size (sub->view),
4574                            GNUNET_NO);
4575   }
4576
4577   struct GNUNET_TIME_Relative time_next_round;
4578
4579   time_next_round = compute_rand_delay (sub->round_interval, 2);
4580
4581   /* Schedule next round */
4582   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4583                                                      &do_round, sub);
4584   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4585 }
4586
4587
4588 /**
4589  * This is called from GNUNET_CADET_get_peers().
4590  *
4591  * It is called on every peer(ID) that cadet somehow has contact with.
4592  * We use those to initialise the sampler.
4593  *
4594  * implements #GNUNET_CADET_PeersCB
4595  *
4596  * @param cls Closure - Sub
4597  * @param peer Peer, or NULL on "EOF".
4598  * @param tunnel Do we have a tunnel towards this peer?
4599  * @param n_paths Number of known paths towards this peer.
4600  * @param best_path How long is the best path?
4601  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4602  */
4603 void
4604 init_peer_cb (void *cls,
4605               const struct GNUNET_PeerIdentity *peer,
4606               int tunnel, /* "Do we have a tunnel towards this peer?" */
4607               unsigned int n_paths, /* "Number of known paths towards this peer" */
4608               unsigned int best_path) /* "How long is the best path?
4609                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4610 {
4611   struct Sub *sub = cls;
4612
4613   (void) tunnel;
4614   (void) n_paths;
4615   (void) best_path;
4616
4617   if (NULL != peer)
4618   {
4619     LOG (GNUNET_ERROR_TYPE_DEBUG,
4620          "Got peer_id %s from cadet\n",
4621          GNUNET_i2s (peer));
4622     got_peer (sub, peer);
4623   }
4624 }
4625
4626
4627 /**
4628  * @brief Iterator function over stored, valid peers.
4629  *
4630  * We initialise the sampler with those.
4631  *
4632  * @param cls Closure - Sub
4633  * @param peer the peer id
4634  * @return #GNUNET_YES if we should continue to
4635  *         iterate,
4636  *         #GNUNET_NO if not.
4637  */
4638 static int
4639 valid_peers_iterator (void *cls,
4640                       const struct GNUNET_PeerIdentity *peer)
4641 {
4642   struct Sub *sub = cls;
4643
4644   if (NULL != peer)
4645   {
4646     LOG (GNUNET_ERROR_TYPE_DEBUG,
4647          "Got stored, valid peer %s\n",
4648          GNUNET_i2s (peer));
4649     got_peer (sub, peer);
4650   }
4651   return GNUNET_YES;
4652 }
4653
4654
4655 /**
4656  * Iterator over peers from peerinfo.
4657  *
4658  * @param cls Closure - Sub
4659  * @param peer id of the peer, NULL for last call
4660  * @param hello hello message for the peer (can be NULL)
4661  * @param error message
4662  */
4663 void
4664 process_peerinfo_peers (void *cls,
4665                         const struct GNUNET_PeerIdentity *peer,
4666                         const struct GNUNET_HELLO_Message *hello,
4667                         const char *err_msg)
4668 {
4669   struct Sub *sub = cls;
4670
4671   (void) hello;
4672   (void) err_msg;
4673
4674   if (NULL != peer)
4675   {
4676     LOG (GNUNET_ERROR_TYPE_DEBUG,
4677          "Got peer_id %s from peerinfo\n",
4678          GNUNET_i2s (peer));
4679     got_peer (sub, peer);
4680   }
4681 }
4682
4683
4684 /**
4685  * Task run during shutdown.
4686  *
4687  * @param cls Closure - unused
4688  */
4689 static void
4690 shutdown_task (void *cls)
4691 {
4692   (void) cls;
4693   struct ClientContext *client_ctx;
4694
4695   LOG (GNUNET_ERROR_TYPE_DEBUG,
4696        "RPS service is going down\n");
4697
4698   /* Clean all clients */
4699   for (client_ctx = cli_ctx_head;
4700        NULL != cli_ctx_head;
4701        client_ctx = cli_ctx_head)
4702   {
4703     destroy_cli_ctx (client_ctx);
4704   }
4705   if (NULL != msub)
4706   {
4707     destroy_sub (msub);
4708     msub = NULL;
4709   }
4710
4711   /* Disconnect from other services */
4712   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4713   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4714   peerinfo_handle = NULL;
4715   GNUNET_NSE_disconnect (nse);
4716   if (NULL != map_single_hop)
4717   {
4718     /* core_init was called - core was initialised */
4719     /* disconnect first, so no callback tries to access missing peermap */
4720     GNUNET_CORE_disconnect (core_handle);
4721     core_handle = NULL;
4722     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4723     map_single_hop = NULL;
4724   }
4725
4726   if (NULL != stats)
4727   {
4728     GNUNET_STATISTICS_destroy (stats,
4729                                GNUNET_NO);
4730     stats = NULL;
4731   }
4732   GNUNET_CADET_disconnect (cadet_handle);
4733   cadet_handle = NULL;
4734 #if ENABLE_MALICIOUS
4735   struct AttackedPeer *tmp_att_peer;
4736   GNUNET_array_grow (mal_peers,
4737                      num_mal_peers,
4738                      0);
4739   if (NULL != mal_peer_set)
4740     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4741   if (NULL != att_peer_set)
4742     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4743   while (NULL != att_peers_head)
4744   {
4745     tmp_att_peer = att_peers_head;
4746     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4747                                  att_peers_tail,
4748                                  tmp_att_peer);
4749     GNUNET_free (tmp_att_peer);
4750   }
4751 #endif /* ENABLE_MALICIOUS */
4752   close_all_files ();
4753 }
4754
4755
4756 /**
4757  * Handle client connecting to the service.
4758  *
4759  * @param cls unused
4760  * @param client the new client
4761  * @param mq the message queue of @a client
4762  * @return @a client
4763  */
4764 static void *
4765 client_connect_cb (void *cls,
4766                    struct GNUNET_SERVICE_Client *client,
4767                    struct GNUNET_MQ_Handle *mq)
4768 {
4769   struct ClientContext *cli_ctx;
4770
4771   (void) cls;
4772
4773   LOG (GNUNET_ERROR_TYPE_DEBUG,
4774        "Client connected\n");
4775   if (NULL == client)
4776     return client; /* Server was destroyed before a client connected. Shutting down */
4777   cli_ctx = GNUNET_new (struct ClientContext);
4778   cli_ctx->mq = mq;
4779   cli_ctx->view_updates_left = -1;
4780   cli_ctx->stream_update = GNUNET_NO;
4781   cli_ctx->client = client;
4782   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4783                                cli_ctx_tail,
4784                                cli_ctx);
4785   return cli_ctx;
4786 }
4787
4788 /**
4789  * Callback called when a client disconnected from the service
4790  *
4791  * @param cls closure for the service
4792  * @param c the client that disconnected
4793  * @param internal_cls should be equal to @a c
4794  */
4795 static void
4796 client_disconnect_cb (void *cls,
4797                       struct GNUNET_SERVICE_Client *client,
4798                       void *internal_cls)
4799 {
4800   struct ClientContext *cli_ctx = internal_cls;
4801
4802   (void) cls;
4803   GNUNET_assert (client == cli_ctx->client);
4804   if (NULL == client)
4805   {  /* shutdown task - destroy all clients */
4806     while (NULL != cli_ctx_head)
4807       destroy_cli_ctx (cli_ctx_head);
4808   }
4809   else
4810   {   /* destroy this client */
4811     LOG (GNUNET_ERROR_TYPE_DEBUG,
4812          "Client disconnected. Destroy its context.\n");
4813     destroy_cli_ctx (cli_ctx);
4814   }
4815 }
4816
4817
4818 /**
4819  * Handle random peer sampling clients.
4820  *
4821  * @param cls closure
4822  * @param c configuration to use
4823  * @param service the initialized service
4824  */
4825 static void
4826 run (void *cls,
4827      const struct GNUNET_CONFIGURATION_Handle *c,
4828      struct GNUNET_SERVICE_Handle *service)
4829 {
4830   struct GNUNET_TIME_Relative round_interval;
4831   long long unsigned int sampler_size;
4832   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4833   struct GNUNET_HashCode hash;
4834
4835   (void) cls;
4836   (void) service;
4837
4838   GNUNET_log_setup ("rps",
4839                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4840                     NULL);
4841   cfg = c;
4842   /* Get own ID */
4843   GNUNET_CRYPTO_get_peer_identity (cfg,
4844                                    &own_identity); // TODO check return value
4845   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4846               "STARTING SERVICE (rps) for peer [%s]\n",
4847               GNUNET_i2s (&own_identity));
4848 #if ENABLE_MALICIOUS
4849   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4850               "Malicious execution compiled in.\n");
4851 #endif /* ENABLE_MALICIOUS */
4852
4853   /* Get time interval from the configuration */
4854   if (GNUNET_OK !=
4855       GNUNET_CONFIGURATION_get_value_time (cfg,
4856                                            "RPS",
4857                                            "ROUNDINTERVAL",
4858                                            &round_interval))
4859   {
4860     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4861                                "RPS", "ROUNDINTERVAL");
4862     GNUNET_SCHEDULER_shutdown ();
4863     return;
4864   }
4865
4866   /* Get initial size of sampler/view from the configuration */
4867   if (GNUNET_OK !=
4868       GNUNET_CONFIGURATION_get_value_number (cfg,
4869                                              "RPS",
4870                                              "MINSIZE",
4871                                              &sampler_size))
4872   {
4873     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4874                                "RPS", "MINSIZE");
4875     GNUNET_SCHEDULER_shutdown ();
4876     return;
4877   }
4878
4879   cadet_handle = GNUNET_CADET_connect (cfg);
4880   GNUNET_assert (NULL != cadet_handle);
4881   core_handle = GNUNET_CORE_connect (cfg,
4882                                      NULL, /* cls */
4883                                      core_init, /* init */
4884                                      core_connects, /* connects */
4885                                      core_disconnects, /* disconnects */
4886                                      NULL); /* handlers */
4887   GNUNET_assert (NULL != core_handle);
4888
4889
4890   alpha = 0.45;
4891   beta = 0.45;
4892
4893
4894   /* Set up main Sub */
4895   GNUNET_CRYPTO_hash (hash_port_string,
4896                       strlen (hash_port_string),
4897                       &hash);
4898   msub = new_sub (&hash,
4899                   sampler_size, /* Will be overwritten by config */
4900                   round_interval);
4901
4902
4903   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4904
4905   /* connect to NSE */
4906   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4907
4908   // LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4909   // GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4910   // TODO send push/pull to each of those peers?
4911   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4912   restore_valid_peers (msub);
4913   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4914
4915   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4916                                                    GNUNET_NO,
4917                                                    process_peerinfo_peers,
4918                                                    msub);
4919
4920   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4921
4922   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4923   stats = GNUNET_STATISTICS_create ("rps", cfg);
4924 }
4925
4926
4927 /**
4928  * Define "main" method using service macro.
4929  */
4930 GNUNET_SERVICE_MAIN
4931   ("rps",
4932   GNUNET_SERVICE_OPTION_NONE,
4933   &run,
4934   &client_connect_cb,
4935   &client_disconnect_cb,
4936   NULL,
4937   GNUNET_MQ_hd_var_size (client_seed,
4938                          GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4939                          struct GNUNET_RPS_CS_SeedMessage,
4940                          NULL),
4941 #if ENABLE_MALICIOUS
4942   GNUNET_MQ_hd_var_size (client_act_malicious,
4943                          GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4944                          struct GNUNET_RPS_CS_ActMaliciousMessage,
4945                          NULL),
4946 #endif /* ENABLE_MALICIOUS */
4947   GNUNET_MQ_hd_fixed_size (client_view_request,
4948                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4949                            struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4950                            NULL),
4951   GNUNET_MQ_hd_fixed_size (client_view_cancel,
4952                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4953                            struct GNUNET_MessageHeader,
4954                            NULL),
4955   GNUNET_MQ_hd_fixed_size (client_stream_request,
4956                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4957                            struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4958                            NULL),
4959   GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4960                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4961                            struct GNUNET_MessageHeader,
4962                            NULL),
4963   GNUNET_MQ_hd_fixed_size (client_start_sub,
4964                            GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4965                            struct GNUNET_RPS_CS_SubStartMessage,
4966                            NULL),
4967   GNUNET_MQ_hd_fixed_size (client_stop_sub,
4968                            GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4969                            struct GNUNET_RPS_CS_SubStopMessage,
4970                            NULL),
4971   GNUNET_MQ_handler_end ());
4972
4973 /* end of gnunet-service-rps.c */