32f3103ed5e2fba0132bce135e9a6b7dcc74765f
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask) \
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask) \
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp {
96   /**
97    * Callback
98    */
99   PeerOp op;
100
101   /**
102    * Closure
103    */
104   void *op_cls;
105 };
106
107 /**
108  * List containing all messages that are yet to be send
109  *
110  * This is used to keep track of all messages that have not been sent yet. When
111  * a peer is to be removed the pending messages can be removed properly.
112  */
113 struct PendingMessage {
114   /**
115    * DLL next, prev
116    */
117   struct PendingMessage *next;
118   struct PendingMessage *prev;
119
120   /**
121    * The envelope to the corresponding message
122    */
123   struct GNUNET_MQ_Envelope *ev;
124
125   /**
126    * The corresponding context
127    */
128   struct PeerContext *peer_ctx;
129
130   /**
131    * The message type
132    */
133   const char *type;
134 };
135
136 /**
137  * @brief Context for a channel
138  */
139 struct ChannelCtx;
140
141 /**
142  * Struct used to keep track of other peer's status
143  *
144  * This is stored in a multipeermap.
145  * It contains information such as cadet channels, a message queue for sending,
146  * status about the channels, the pending operations on this peer and some flags
147  * about the status of the peer itself. (online, valid, ...)
148  */
149 struct PeerContext {
150   /**
151    * The Sub this context belongs to.
152    */
153   struct Sub *sub;
154
155   /**
156    * Message queue open to client
157    */
158   struct GNUNET_MQ_Handle *mq;
159
160   /**
161    * Channel open to client.
162    */
163   struct ChannelCtx *send_channel_ctx;
164
165   /**
166    * Channel open from client.
167    */
168   struct ChannelCtx *recv_channel_ctx;
169
170   /**
171    * Array of pending operations on this peer.
172    */
173   struct PeerPendingOp *pending_ops;
174
175   /**
176    * Handle to the callback given to cadet_ntfy_tmt_rdy()
177    *
178    * To be canceled on shutdown.
179    */
180   struct PendingMessage *online_check_pending;
181
182   /**
183    * Number of pending operations.
184    */
185   unsigned int num_pending_ops;
186
187   /**
188    * Identity of the peer
189    */
190   struct GNUNET_PeerIdentity peer_id;
191
192   /**
193    * Flags indicating status of peer
194    */
195   uint32_t peer_flags;
196
197   /**
198    * Last time we received something from that peer.
199    */
200   struct GNUNET_TIME_Absolute last_message_recv;
201
202   /**
203    * Last time we received a keepalive message.
204    */
205   struct GNUNET_TIME_Absolute last_keepalive;
206
207   /**
208    * DLL with all messages that are yet to be sent
209    */
210   struct PendingMessage *pending_messages_head;
211   struct PendingMessage *pending_messages_tail;
212
213   /**
214    * This is pobably followed by 'statistical' data (when we first saw
215    * it, how did we get its ID, how many pushes (in a timeinterval),
216    * ...)
217    */
218   uint32_t round_pull_req;
219 };
220
221 /**
222  * @brief Closure to #valid_peer_iterator
223  */
224 struct PeersIteratorCls {
225   /**
226    * Iterator function
227    */
228   PeersIterator iterator;
229
230   /**
231    * Closure to iterator
232    */
233   void *cls;
234 };
235
236 /**
237  * @brief Context for a channel
238  */
239 struct ChannelCtx {
240   /**
241    * @brief The channel itself
242    */
243   struct GNUNET_CADET_Channel *channel;
244
245   /**
246    * @brief The peer context associated with the channel
247    */
248   struct PeerContext *peer_ctx;
249
250   /**
251    * @brief When channel destruction needs to be delayed (because it is called
252    * from within the cadet routine of another channel destruction) this task
253    * refers to the respective _SCHEDULER_Task.
254    */
255   struct GNUNET_SCHEDULER_Task *destruction_task;
256 };
257
258
259 #if ENABLE_MALICIOUS
260
261 /**
262  * If type is 2 This struct is used to store the attacked peers in a DLL
263  */
264 struct AttackedPeer {
265   /**
266    * DLL
267    */
268   struct AttackedPeer *next;
269   struct AttackedPeer *prev;
270
271   /**
272    * PeerID
273    */
274   struct GNUNET_PeerIdentity peer_id;
275 };
276
277 #endif /* ENABLE_MALICIOUS */
278
279 /**
280  * @brief This number determines the number of slots for files that represent
281  * histograms
282  */
283 #define HISTOGRAM_FILE_SLOTS 32
284
285 /**
286  * @brief The size (in bytes) a file needs to store the histogram
287  *
288  * Per slot: 1 newline, up to 4 chars,
289  * Additionally: 1 null termination
290  */
291 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
292
293 /**
294  * @brief One Sub.
295  *
296  * Essentially one instance of brahms that only connects to other instances
297  * with the same (secret) value.
298  */
299 struct Sub {
300   /**
301    * @brief Hash of the shared value that defines Subs.
302    */
303   struct GNUNET_HashCode hash;
304
305   /**
306    * @brief Port to communicate to other peers.
307    */
308   struct GNUNET_CADET_Port *cadet_port;
309
310   /**
311    * @brief Hashmap of valid peers.
312    */
313   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
314
315   /**
316    * @brief Filename of the file that stores the valid peers persistently.
317    */
318   char *filename_valid_peers;
319
320   /**
321    * Set of all peers to keep track of them.
322    */
323   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
324
325   /**
326    * @brief This is the minimum estimate used as sampler size.
327    *
328    * It is configured by the user.
329    */
330   unsigned int sampler_size_est_min;
331
332   /**
333    * The size of sampler we need to be able to satisfy the Brahms protocol's
334    * need of random peers.
335    *
336    * This is one minimum size the sampler grows to.
337    */
338   unsigned int sampler_size_est_need;
339
340   /**
341    * Time interval the do_round task runs in.
342    */
343   struct GNUNET_TIME_Relative round_interval;
344
345   /**
346    * Sampler used for the Brahms protocol itself.
347    */
348   struct RPS_Sampler *sampler;
349
350 #ifdef TO_FILE_FULL
351   /**
352    * Name to log view to
353    */
354   char *file_name_view_log;
355 #endif /* TO_FILE_FULL */
356
357 #ifdef TO_FILE
358 #ifdef TO_FILE_FULL
359   /**
360    * Name to log number of observed peers to
361    */
362   char *file_name_observed_log;
363 #endif /* TO_FILE_FULL */
364
365   /**
366    * @brief Count the observed peers
367    */
368   uint32_t num_observed_peers;
369
370   /**
371    * @brief Multipeermap (ab-) used to count unique peer_ids
372    */
373   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
374 #endif /* TO_FILE */
375
376   /**
377    * List to store peers received through pushes temporary.
378    */
379   struct CustomPeerMap *push_map;
380
381   /**
382    * List to store peers received through pulls temporary.
383    */
384   struct CustomPeerMap *pull_map;
385
386   /**
387    * @brief This is the estimate used as view size.
388    *
389    * It is initialised with the minimum
390    */
391   unsigned int view_size_est_need;
392
393   /**
394    * @brief This is the minimum estimate used as view size.
395    *
396    * It is configured by the user.
397    */
398   unsigned int view_size_est_min;
399
400   /**
401    * @brief The view.
402    */
403   struct View *view;
404
405   /**
406    * Identifier for the main task that runs periodically.
407    */
408   struct GNUNET_SCHEDULER_Task *do_round_task;
409
410   /* === stats === */
411
412   /**
413    * @brief Counts the executed rounds.
414    */
415   uint32_t num_rounds;
416
417   /**
418    * @brief This array accumulates the number of received pushes per round.
419    *
420    * Number at index i represents the number of rounds with i observed pushes.
421    */
422   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
423
424   /**
425    * @brief Histogram of deltas between the expected and actual number of
426    * received pushes.
427    *
428    * As half of the entries are expected to be negative, this is shifted by
429    * #HISTOGRAM_FILE_SLOTS/2.
430    */
431   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
432
433   /**
434    * @brief Number of pull replies with this delay measured in rounds.
435    *
436    * Number at index i represents the number of pull replies with a delay of i
437    * rounds.
438    */
439   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
440 };
441
442
443 /***********************************************************************
444 * Globals
445 ***********************************************************************/
446
447 /**
448  * Our configuration.
449  */
450 static const struct GNUNET_CONFIGURATION_Handle *cfg;
451
452 /**
453  * Handle to the statistics service.
454  */
455 struct GNUNET_STATISTICS_Handle *stats;
456
457 /**
458  * Handler to CADET.
459  */
460 struct GNUNET_CADET_Handle *cadet_handle;
461
462 /**
463  * Handle to CORE
464  */
465 struct GNUNET_CORE_Handle *core_handle;
466
467 /**
468  * @brief PeerMap to keep track of connected peers.
469  */
470 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
471
472 /**
473  * Our own identity.
474  */
475 static struct GNUNET_PeerIdentity own_identity;
476
477 /**
478  * Percentage of total peer number in the view
479  * to send random PUSHes to
480  */
481 static float alpha;
482
483 /**
484  * Percentage of total peer number in the view
485  * to send random PULLs to
486  */
487 static float beta;
488
489 /**
490  * Handler to NSE.
491  */
492 static struct GNUNET_NSE_Handle *nse;
493
494 /**
495  * Handler to PEERINFO.
496  */
497 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
498
499 /**
500  * Handle for cancellation of iteration over peers.
501  */
502 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
503
504
505 #if ENABLE_MALICIOUS
506 /**
507  * Type of malicious peer
508  *
509  * 0 Don't act malicious at all - Default
510  * 1 Try to maximise representation
511  * 2 Try to partition the network
512  * 3 Combined attack
513  */
514 static uint32_t mal_type;
515
516 /**
517  * Other malicious peers
518  */
519 static struct GNUNET_PeerIdentity *mal_peers;
520
521 /**
522  * Hashmap of malicious peers used as set.
523  * Used to more efficiently check whether we know that peer.
524  */
525 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
526
527 /**
528  * Number of other malicious peers
529  */
530 static uint32_t num_mal_peers;
531
532
533 /**
534  * If type is 2 this is the DLL of attacked peers
535  */
536 static struct AttackedPeer *att_peers_head;
537 static struct AttackedPeer *att_peers_tail;
538
539 /**
540  * This index is used to point to an attacked peer to
541  * implement the round-robin-ish way to select attacked peers.
542  */
543 static struct AttackedPeer *att_peer_index;
544
545 /**
546  * Hashmap of attacked peers used as set.
547  * Used to more efficiently check whether we know that peer.
548  */
549 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
550
551 /**
552  * Number of attacked peers
553  */
554 static uint32_t num_attacked_peers;
555
556 /**
557  * If type is 1 this is the attacked peer
558  */
559 static struct GNUNET_PeerIdentity attacked_peer;
560
561 /**
562  * The limit of PUSHes we can send in one round.
563  * This is an assumption of the Brahms protocol and either implemented
564  * via proof of work
565  * or
566  * assumend to be the bandwidth limitation.
567  */
568 static uint32_t push_limit = 10000;
569 #endif /* ENABLE_MALICIOUS */
570
571 /**
572  * @brief Main Sub.
573  *
574  * This is run in any case by all peers and connects to all peers without
575  * specifying a shared value.
576  */
577 static struct Sub *msub;
578
579 /**
580  * @brief Maximum number of valid peers to keep.
581  * TODO read from config
582  */
583 static const uint32_t num_valid_peers_max = UINT32_MAX;
584
585 /***********************************************************************
586 * /Globals
587 ***********************************************************************/
588
589
590 static void
591 do_round(void *cls);
592
593 static void
594 do_mal_round(void *cls);
595
596
597 /**
598  * @brief Get the #PeerContext associated with a peer
599  *
600  * @param peer_map The peer map containing the context
601  * @param peer the peer id
602  *
603  * @return the #PeerContext
604  */
605 static struct PeerContext *
606 get_peer_ctx(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
607              const struct GNUNET_PeerIdentity *peer)
608 {
609   struct PeerContext *ctx;
610   int ret;
611
612   ret = GNUNET_CONTAINER_multipeermap_contains(peer_map, peer);
613   GNUNET_assert(GNUNET_YES == ret);
614   ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
615   GNUNET_assert(NULL != ctx);
616   return ctx;
617 }
618
619 /**
620  * @brief Check whether we have information about the given peer.
621  *
622  * FIXME probably deprecated. Make this the new _online.
623  *
624  * @param peer_map The peer map to check for the existence of @a peer
625  * @param peer peer in question
626  *
627  * @return #GNUNET_YES if peer is known
628  *         #GNUNET_NO  if peer is not knwon
629  */
630 static int
631 check_peer_known(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
632                  const struct GNUNET_PeerIdentity *peer)
633 {
634   if (NULL != peer_map)
635     {
636       return GNUNET_CONTAINER_multipeermap_contains(peer_map, peer);
637     }
638   else
639     {
640       return GNUNET_NO;
641     }
642 }
643
644
645 /**
646  * @brief Create a new #PeerContext and insert it into the peer map
647  *
648  * @param sub The Sub this context belongs to.
649  * @param peer the peer to create the #PeerContext for
650  *
651  * @return the #PeerContext
652  */
653 static struct PeerContext *
654 create_peer_ctx(struct Sub *sub,
655                 const struct GNUNET_PeerIdentity *peer)
656 {
657   struct PeerContext *ctx;
658   int ret;
659
660   GNUNET_assert(GNUNET_NO == check_peer_known(sub->peer_map, peer));
661
662   ctx = GNUNET_new(struct PeerContext);
663   ctx->peer_id = *peer;
664   ctx->sub = sub;
665   ret = GNUNET_CONTAINER_multipeermap_put(sub->peer_map, peer, ctx,
666                                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
667   GNUNET_assert(GNUNET_OK == ret);
668   if (sub == msub)
669     {
670       GNUNET_STATISTICS_set(stats,
671                             "# known peers",
672                             GNUNET_CONTAINER_multipeermap_size(sub->peer_map),
673                             GNUNET_NO);
674     }
675   return ctx;
676 }
677
678
679 /**
680  * @brief Create or get a #PeerContext
681  *
682  * @param sub The Sub to which the created context belongs to
683  * @param peer the peer to get the associated context to
684  *
685  * @return the context
686  */
687 static struct PeerContext *
688 create_or_get_peer_ctx(struct Sub *sub,
689                        const struct GNUNET_PeerIdentity *peer)
690 {
691   if (GNUNET_NO == check_peer_known(sub->peer_map, peer))
692     {
693       return create_peer_ctx(sub, peer);
694     }
695   return get_peer_ctx(sub->peer_map, peer);
696 }
697
698
699 /**
700  * @brief Check whether we have a connection to this @a peer
701  *
702  * Also sets the #Peers_ONLINE flag accordingly
703  *
704  * @param peer_ctx Context of the peer of which connectivity is to be checked
705  *
706  * @return #GNUNET_YES if we are connected
707  *         #GNUNET_NO  otherwise
708  */
709 static int
710 check_connected(struct PeerContext *peer_ctx)
711 {
712   /* If we don't know about this peer we don't know whether it's online */
713   if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
714                                     &peer_ctx->peer_id))
715     {
716       return GNUNET_NO;
717     }
718   /* Get the context */
719   peer_ctx = get_peer_ctx(peer_ctx->sub->peer_map, &peer_ctx->peer_id);
720   /* If we have no channel to this peer we don't know whether it's online */
721   if ((NULL == peer_ctx->send_channel_ctx) &&
722       (NULL == peer_ctx->recv_channel_ctx))
723     {
724       UNSET_PEER_FLAG(peer_ctx, Peers_ONLINE);
725       return GNUNET_NO;
726     }
727   /* Otherwise (if we have a channel, we know that it's online */
728   SET_PEER_FLAG(peer_ctx, Peers_ONLINE);
729   return GNUNET_YES;
730 }
731
732
733 /**
734  * @brief The closure to #get_rand_peer_iterator.
735  */
736 struct GetRandPeerIteratorCls {
737   /**
738    * @brief The index of the peer to return.
739    * Will be decreased until 0.
740    * Then current peer is returned.
741    */
742   uint32_t index;
743
744   /**
745    * @brief Pointer to peer to return.
746    */
747   const struct GNUNET_PeerIdentity *peer;
748 };
749
750
751 /**
752  * @brief Iterator function for #get_random_peer_from_peermap.
753  *
754  * Implements #GNUNET_CONTAINER_PeerMapIterator.
755  * Decreases the index until the index is null.
756  * Then returns the current peer.
757  *
758  * @param cls the #GetRandPeerIteratorCls containing index and peer
759  * @param peer current peer
760  * @param value unused
761  *
762  * @return  #GNUNET_YES if we should continue to
763  *          iterate,
764  *          #GNUNET_NO if not.
765  */
766 static int
767 get_rand_peer_iterator(void *cls,
768                        const struct GNUNET_PeerIdentity *peer,
769                        void *value)
770 {
771   struct GetRandPeerIteratorCls *iterator_cls = cls;
772
773   (void)value;
774
775   if (0 >= iterator_cls->index)
776     {
777       iterator_cls->peer = peer;
778       return GNUNET_NO;
779     }
780   iterator_cls->index--;
781   return GNUNET_YES;
782 }
783
784
785 /**
786  * @brief Get a random peer from @a peer_map
787  *
788  * @param valid_peers Peer map containing valid peers from which to select a
789  * random one
790  *
791  * @return a random peer
792  */
793 static const struct GNUNET_PeerIdentity *
794 get_random_peer_from_peermap(struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
795 {
796   struct GetRandPeerIteratorCls *iterator_cls;
797   const struct GNUNET_PeerIdentity *ret;
798
799   iterator_cls = GNUNET_new(struct GetRandPeerIteratorCls);
800   iterator_cls->index = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
801                                                  GNUNET_CONTAINER_multipeermap_size(valid_peers));
802   (void)GNUNET_CONTAINER_multipeermap_iterate(valid_peers,
803                                               get_rand_peer_iterator,
804                                               iterator_cls);
805   ret = iterator_cls->peer;
806   GNUNET_free(iterator_cls);
807   return ret;
808 }
809
810
811 /**
812  * @brief Add a given @a peer to valid peers.
813  *
814  * If valid peers are already #num_valid_peers_max, delete a peer previously.
815  *
816  * @param peer The peer that is added to the valid peers.
817  * @param valid_peers Peer map of valid peers to which to add the @a peer
818  *
819  * @return #GNUNET_YES if no other peer had to be removed
820  *         #GNUNET_NO  otherwise
821  */
822 static int
823 add_valid_peer(const struct GNUNET_PeerIdentity *peer,
824                struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
825 {
826   const struct GNUNET_PeerIdentity *rand_peer;
827   int ret;
828
829   ret = GNUNET_YES;
830   /* Remove random peers until there is space for a new one */
831   while (num_valid_peers_max <=
832          GNUNET_CONTAINER_multipeermap_size(valid_peers))
833     {
834       rand_peer = get_random_peer_from_peermap(valid_peers);
835       GNUNET_CONTAINER_multipeermap_remove_all(valid_peers, rand_peer);
836       ret = GNUNET_NO;
837     }
838   (void)GNUNET_CONTAINER_multipeermap_put(valid_peers, peer, NULL,
839                                           GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
840   if (valid_peers == msub->valid_peers)
841     {
842       GNUNET_STATISTICS_set(stats,
843                             "# valid peers",
844                             GNUNET_CONTAINER_multipeermap_size(valid_peers),
845                             GNUNET_NO);
846     }
847   return ret;
848 }
849
850 static void
851 remove_pending_message(struct PendingMessage *pending_msg, int cancel);
852
853 /**
854  * @brief Set the peer flag to living and
855  *        call the pending operations on this peer.
856  *
857  * Also adds peer to #valid_peers.
858  *
859  * @param peer_ctx the #PeerContext of the peer to set online
860  */
861 static void
862 set_peer_online(struct PeerContext *peer_ctx)
863 {
864   struct GNUNET_PeerIdentity *peer;
865   unsigned int i;
866
867   peer = &peer_ctx->peer_id;
868   LOG(GNUNET_ERROR_TYPE_DEBUG,
869       "Peer %s is online and valid, calling %i pending operations on it\n",
870       GNUNET_i2s(peer),
871       peer_ctx->num_pending_ops);
872
873   if (NULL != peer_ctx->online_check_pending)
874     {
875       LOG(GNUNET_ERROR_TYPE_DEBUG,
876           "Removing pending online check for peer %s\n",
877           GNUNET_i2s(&peer_ctx->peer_id));
878       // TODO wait until cadet sets mq->cancel_impl
879       //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
880       remove_pending_message(peer_ctx->online_check_pending, GNUNET_YES);
881       peer_ctx->online_check_pending = NULL;
882     }
883
884   SET_PEER_FLAG(peer_ctx, Peers_ONLINE);
885
886   /* Call pending operations */
887   for (i = 0; i < peer_ctx->num_pending_ops; i++)
888     {
889       peer_ctx->pending_ops[i].op(peer_ctx->pending_ops[i].op_cls, peer);
890     }
891   GNUNET_array_grow(peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
892 }
893
894 static void
895 cleanup_destroyed_channel(void *cls,
896                           const struct GNUNET_CADET_Channel *channel);
897
898 /* Declaration of handlers */
899 static void
900 handle_peer_check(void *cls,
901                   const struct GNUNET_MessageHeader *msg);
902
903 static void
904 handle_peer_push(void *cls,
905                  const struct GNUNET_MessageHeader *msg);
906
907 static void
908 handle_peer_pull_request(void *cls,
909                          const struct GNUNET_MessageHeader *msg);
910
911 static int
912 check_peer_pull_reply(void *cls,
913                       const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
914
915 static void
916 handle_peer_pull_reply(void *cls,
917                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
918
919 /* End declaration of handlers */
920
921 /**
922  * @brief Allocate memory for a new channel context and insert it into DLL
923  *
924  * @param peer_ctx context of the according peer
925  *
926  * @return The channel context
927  */
928 static struct ChannelCtx *
929 add_channel_ctx(struct PeerContext *peer_ctx)
930 {
931   struct ChannelCtx *channel_ctx;
932
933   channel_ctx = GNUNET_new(struct ChannelCtx);
934   channel_ctx->peer_ctx = peer_ctx;
935   return channel_ctx;
936 }
937
938
939 /**
940  * @brief Free memory and NULL pointers.
941  *
942  * @param channel_ctx The channel context.
943  */
944 static void
945 remove_channel_ctx(struct ChannelCtx *channel_ctx)
946 {
947   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
948
949   if (NULL != channel_ctx->destruction_task)
950     {
951       GNUNET_SCHEDULER_cancel(channel_ctx->destruction_task);
952       channel_ctx->destruction_task = NULL;
953     }
954
955   GNUNET_free(channel_ctx);
956
957   if (NULL == peer_ctx)
958     return;
959   if (channel_ctx == peer_ctx->send_channel_ctx)
960     {
961       peer_ctx->send_channel_ctx = NULL;
962       peer_ctx->mq = NULL;
963     }
964   else if (channel_ctx == peer_ctx->recv_channel_ctx)
965     {
966       peer_ctx->recv_channel_ctx = NULL;
967     }
968 }
969
970
971 /**
972  * @brief Get the channel of a peer. If not existing, create.
973  *
974  * @param peer_ctx Context of the peer of which to get the channel
975  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
976  */
977 struct GNUNET_CADET_Channel *
978 get_channel(struct PeerContext *peer_ctx)
979 {
980   /* There exists a copy-paste-clone in run() */
981   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
982     GNUNET_MQ_hd_fixed_size(peer_check,
983                             GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
984                             struct GNUNET_MessageHeader,
985                             NULL),
986     GNUNET_MQ_hd_fixed_size(peer_push,
987                             GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
988                             struct GNUNET_MessageHeader,
989                             NULL),
990     GNUNET_MQ_hd_fixed_size(peer_pull_request,
991                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
992                             struct GNUNET_MessageHeader,
993                             NULL),
994     GNUNET_MQ_hd_var_size(peer_pull_reply,
995                           GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
996                           struct GNUNET_RPS_P2P_PullReplyMessage,
997                           NULL),
998     GNUNET_MQ_handler_end()
999   };
1000
1001
1002   if (NULL == peer_ctx->send_channel_ctx)
1003     {
1004       LOG(GNUNET_ERROR_TYPE_DEBUG,
1005           "Trying to establish channel to peer %s\n",
1006           GNUNET_i2s(&peer_ctx->peer_id));
1007       peer_ctx->send_channel_ctx = add_channel_ctx(peer_ctx);
1008       peer_ctx->send_channel_ctx->channel =
1009         GNUNET_CADET_channel_create(cadet_handle,
1010                                     peer_ctx->send_channel_ctx, /* context */
1011                                     &peer_ctx->peer_id,
1012                                     &peer_ctx->sub->hash,
1013                                     NULL, /* WindowSize handler */
1014                                     &cleanup_destroyed_channel, /* Disconnect handler */
1015                                     cadet_handlers);
1016     }
1017   GNUNET_assert(NULL != peer_ctx->send_channel_ctx);
1018   GNUNET_assert(NULL != peer_ctx->send_channel_ctx->channel);
1019   return peer_ctx->send_channel_ctx->channel;
1020 }
1021
1022
1023 /**
1024  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1025  *
1026  * If we already have a message queue open to this client,
1027  * simply return it, otherways create one.
1028  *
1029  * @param peer_ctx Context of the peer of whicht to get the mq
1030  * @return the #GNUNET_MQ_Handle
1031  */
1032 static struct GNUNET_MQ_Handle *
1033 get_mq(struct PeerContext *peer_ctx)
1034 {
1035   if (NULL == peer_ctx->mq)
1036     {
1037       peer_ctx->mq = GNUNET_CADET_get_mq(get_channel(peer_ctx));
1038     }
1039   return peer_ctx->mq;
1040 }
1041
1042 /**
1043  * @brief Add an envelope to a message passed to mq to list of pending messages
1044  *
1045  * @param peer_ctx Context of the peer for which to insert the envelope
1046  * @param ev envelope to the message
1047  * @param type type of the message to be sent
1048  * @return pointer to pending message
1049  */
1050 static struct PendingMessage *
1051 insert_pending_message(struct PeerContext *peer_ctx,
1052                        struct GNUNET_MQ_Envelope *ev,
1053                        const char *type)
1054 {
1055   struct PendingMessage *pending_msg;
1056
1057   pending_msg = GNUNET_new(struct PendingMessage);
1058   pending_msg->ev = ev;
1059   pending_msg->peer_ctx = peer_ctx;
1060   pending_msg->type = type;
1061   GNUNET_CONTAINER_DLL_insert(peer_ctx->pending_messages_head,
1062                               peer_ctx->pending_messages_tail,
1063                               pending_msg);
1064   return pending_msg;
1065 }
1066
1067
1068 /**
1069  * @brief Remove a pending message from the respective DLL
1070  *
1071  * @param pending_msg the pending message to remove
1072  * @param cancel whether to cancel the pending message, too
1073  */
1074 static void
1075 remove_pending_message(struct PendingMessage *pending_msg, int cancel)
1076 {
1077   struct PeerContext *peer_ctx;
1078
1079   (void)cancel;
1080
1081   peer_ctx = pending_msg->peer_ctx;
1082   GNUNET_assert(NULL != peer_ctx);
1083   GNUNET_CONTAINER_DLL_remove(peer_ctx->pending_messages_head,
1084                               peer_ctx->pending_messages_tail,
1085                               pending_msg);
1086   // TODO wait for the cadet implementation of message cancellation
1087   //if (GNUNET_YES == cancel)
1088   //{
1089   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1090   //}
1091   GNUNET_free(pending_msg);
1092 }
1093
1094
1095 /**
1096  * @brief This is called in response to the first message we sent as a
1097  * online check.
1098  *
1099  * @param cls #PeerContext of peer with pending online check
1100  */
1101 static void
1102 mq_online_check_successful(void *cls)
1103 {
1104   struct PeerContext *peer_ctx = cls;
1105
1106   if (NULL != peer_ctx->online_check_pending)
1107     {
1108       LOG(GNUNET_ERROR_TYPE_DEBUG,
1109           "Online check for peer %s was successfull\n",
1110           GNUNET_i2s(&peer_ctx->peer_id));
1111       remove_pending_message(peer_ctx->online_check_pending, GNUNET_YES);
1112       peer_ctx->online_check_pending = NULL;
1113       set_peer_online(peer_ctx);
1114       (void)add_valid_peer(&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1115     }
1116 }
1117
1118 /**
1119  * Issue a check whether peer is online
1120  *
1121  * @param peer_ctx the context of the peer
1122  */
1123 static void
1124 check_peer_online(struct PeerContext *peer_ctx)
1125 {
1126   LOG(GNUNET_ERROR_TYPE_DEBUG,
1127       "Get informed about peer %s getting online\n",
1128       GNUNET_i2s(&peer_ctx->peer_id));
1129
1130   struct GNUNET_MQ_Handle *mq;
1131   struct GNUNET_MQ_Envelope *ev;
1132
1133   ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1134   peer_ctx->online_check_pending =
1135     insert_pending_message(peer_ctx, ev, "Check online");
1136   mq = get_mq(peer_ctx);
1137   GNUNET_MQ_notify_sent(ev,
1138                         mq_online_check_successful,
1139                         peer_ctx);
1140   GNUNET_MQ_send(mq, ev);
1141   if (peer_ctx->sub == msub)
1142     {
1143       GNUNET_STATISTICS_update(stats,
1144                                "# pending online checks",
1145                                1,
1146                                GNUNET_NO);
1147     }
1148 }
1149
1150
1151 /**
1152  * @brief Check whether function of type #PeerOp was already scheduled
1153  *
1154  * The array with pending operations will probably never grow really big, so
1155  * iterating over it should be ok.
1156  *
1157  * @param peer_ctx Context of the peer to check for the operation
1158  * @param peer_op the operation (#PeerOp) on the peer
1159  *
1160  * @return #GNUNET_YES if this operation is scheduled on that peer
1161  *         #GNUNET_NO  otherwise
1162  */
1163 static int
1164 check_operation_scheduled(const struct PeerContext *peer_ctx,
1165                           const PeerOp peer_op)
1166 {
1167   unsigned int i;
1168
1169   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1170     if (peer_op == peer_ctx->pending_ops[i].op)
1171       return GNUNET_YES;
1172   return GNUNET_NO;
1173 }
1174
1175
1176 /**
1177  * @brief Callback for scheduler to destroy a channel
1178  *
1179  * @param cls Context of the channel
1180  */
1181 static void
1182 destroy_channel(struct ChannelCtx *channel_ctx)
1183 {
1184   struct GNUNET_CADET_Channel *channel;
1185
1186   if (NULL != channel_ctx->destruction_task)
1187     {
1188       GNUNET_SCHEDULER_cancel(channel_ctx->destruction_task);
1189       channel_ctx->destruction_task = NULL;
1190     }
1191   GNUNET_assert(channel_ctx->channel != NULL);
1192   channel = channel_ctx->channel;
1193   channel_ctx->channel = NULL;
1194   GNUNET_CADET_channel_destroy(channel);
1195   remove_channel_ctx(channel_ctx);
1196 }
1197
1198
1199 /**
1200  * @brief Destroy a cadet channel.
1201  *
1202  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1203  *
1204  * @param cls
1205  */
1206 static void
1207 destroy_channel_cb(void *cls)
1208 {
1209   struct ChannelCtx *channel_ctx = cls;
1210
1211   channel_ctx->destruction_task = NULL;
1212   destroy_channel(channel_ctx);
1213 }
1214
1215
1216 /**
1217  * @brief Schedule the destruction of a channel for immediately afterwards.
1218  *
1219  * In case a channel is to be destroyed from within the callback to the
1220  * destruction of another channel (send channel), we cannot call
1221  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1222  * construction.
1223  *
1224  * @param channel_ctx channel to be destroyed.
1225  */
1226 static void
1227 schedule_channel_destruction(struct ChannelCtx *channel_ctx)
1228 {
1229   GNUNET_assert(NULL ==
1230                 channel_ctx->destruction_task);
1231   GNUNET_assert(NULL !=
1232                 channel_ctx->channel);
1233   channel_ctx->destruction_task =
1234     GNUNET_SCHEDULER_add_now(&destroy_channel_cb,
1235                              channel_ctx);
1236 }
1237
1238
1239 /**
1240  * @brief Remove peer
1241  *
1242  * - Empties the list with pending operations
1243  * - Empties the list with pending messages
1244  * - Cancels potentially existing online check
1245  * - Schedules closing of send and recv channels
1246  * - Removes peer from peer map
1247  *
1248  * @param peer_ctx Context of the peer to be destroyed
1249  * @return #GNUNET_YES if peer was removed
1250  *         #GNUNET_NO  otherwise
1251  */
1252 static int
1253 destroy_peer(struct PeerContext *peer_ctx)
1254 {
1255   GNUNET_assert(NULL != peer_ctx);
1256   GNUNET_assert(NULL != peer_ctx->sub->peer_map);
1257   if (GNUNET_NO ==
1258       GNUNET_CONTAINER_multipeermap_contains(peer_ctx->sub->peer_map,
1259                                              &peer_ctx->peer_id))
1260     {
1261       return GNUNET_NO;
1262     }
1263   SET_PEER_FLAG(peer_ctx, Peers_TO_DESTROY);
1264   LOG(GNUNET_ERROR_TYPE_DEBUG,
1265       "Going to remove peer %s\n",
1266       GNUNET_i2s(&peer_ctx->peer_id));
1267   UNSET_PEER_FLAG(peer_ctx, Peers_ONLINE);
1268
1269   /* Clear list of pending operations */
1270   // TODO this probably leaks memory
1271   //      ('only' the cls to the function. Not sure what to do with it)
1272   GNUNET_array_grow(peer_ctx->pending_ops,
1273                     peer_ctx->num_pending_ops,
1274                     0);
1275   /* Remove all pending messages */
1276   while (NULL != peer_ctx->pending_messages_head)
1277     {
1278       LOG(GNUNET_ERROR_TYPE_DEBUG,
1279           "Removing unsent %s\n",
1280           peer_ctx->pending_messages_head->type);
1281       /* Cancle pending message, too */
1282       if ((NULL != peer_ctx->online_check_pending) &&
1283           (0 == memcmp(peer_ctx->pending_messages_head,
1284                        peer_ctx->online_check_pending,
1285                        sizeof(struct PendingMessage))))
1286         {
1287           peer_ctx->online_check_pending = NULL;
1288           if (peer_ctx->sub == msub)
1289             {
1290               GNUNET_STATISTICS_update(stats,
1291                                        "# pending online checks",
1292                                        -1,
1293                                        GNUNET_NO);
1294             }
1295         }
1296       remove_pending_message(peer_ctx->pending_messages_head,
1297                              GNUNET_YES);
1298     }
1299
1300   /* If we are still waiting for notification whether this peer is online
1301    * cancel the according task */
1302   if (NULL != peer_ctx->online_check_pending)
1303     {
1304       GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1305                  "Removing pending online check for peer %s\n",
1306                  GNUNET_i2s(&peer_ctx->peer_id));
1307       // TODO wait until cadet sets mq->cancel_impl
1308       //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1309       remove_pending_message(peer_ctx->online_check_pending,
1310                              GNUNET_YES);
1311       peer_ctx->online_check_pending = NULL;
1312     }
1313
1314   if (NULL != peer_ctx->send_channel_ctx)
1315     {
1316       /* This is possibly called from within channel destruction */
1317       peer_ctx->send_channel_ctx->peer_ctx = NULL;
1318       schedule_channel_destruction(peer_ctx->send_channel_ctx);
1319       peer_ctx->send_channel_ctx = NULL;
1320       peer_ctx->mq = NULL;
1321     }
1322   if (NULL != peer_ctx->recv_channel_ctx)
1323     {
1324       /* This is possibly called from within channel destruction */
1325       peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1326       schedule_channel_destruction(peer_ctx->recv_channel_ctx);
1327       peer_ctx->recv_channel_ctx = NULL;
1328     }
1329
1330   if (GNUNET_YES !=
1331       GNUNET_CONTAINER_multipeermap_remove_all(peer_ctx->sub->peer_map,
1332                                                &peer_ctx->peer_id))
1333     {
1334       LOG(GNUNET_ERROR_TYPE_WARNING,
1335           "removing peer from peer_ctx->sub->peer_map failed\n");
1336     }
1337   if (peer_ctx->sub == msub)
1338     {
1339       GNUNET_STATISTICS_set(stats,
1340                             "# known peers",
1341                             GNUNET_CONTAINER_multipeermap_size(peer_ctx->sub->peer_map),
1342                             GNUNET_NO);
1343     }
1344   GNUNET_free(peer_ctx);
1345   return GNUNET_YES;
1346 }
1347
1348
1349 /**
1350  * Iterator over hash map entries. Deletes all contexts of peers.
1351  *
1352  * @param cls closure
1353  * @param key current public key
1354  * @param value value in the hash map
1355  * @return #GNUNET_YES if we should continue to iterate,
1356  *         #GNUNET_NO if not.
1357  */
1358 static int
1359 peermap_clear_iterator(void *cls,
1360                        const struct GNUNET_PeerIdentity *key,
1361                        void *value)
1362 {
1363   struct Sub *sub = cls;
1364
1365   (void)value;
1366
1367   destroy_peer(get_peer_ctx(sub->peer_map, key));
1368   return GNUNET_YES;
1369 }
1370
1371
1372 /**
1373  * @brief This is called once a message is sent.
1374  *
1375  * Removes the pending message
1376  *
1377  * @param cls type of the message that was sent
1378  */
1379 static void
1380 mq_notify_sent_cb(void *cls)
1381 {
1382   struct PendingMessage *pending_msg = (struct PendingMessage *)cls;
1383
1384   LOG(GNUNET_ERROR_TYPE_DEBUG,
1385       "%s was sent.\n",
1386       pending_msg->type);
1387   if (pending_msg->peer_ctx->sub == msub)
1388     {
1389       if (0 == strncmp("PULL REPLY", pending_msg->type, 10))
1390         GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1391       if (0 == strncmp("PULL REQUEST", pending_msg->type, 12))
1392         GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1393       if (0 == strncmp("PUSH", pending_msg->type, 4))
1394         GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1395       if (0 == strncmp("PULL REQUEST", pending_msg->type, 12) &&
1396           NULL != map_single_hop &&
1397           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
1398                                                               &pending_msg->peer_ctx->peer_id))
1399         GNUNET_STATISTICS_update(stats,
1400                                  "# pull requests sent (multi-hop peer)",
1401                                  1,
1402                                  GNUNET_NO);
1403     }
1404   /* Do not cancle message */
1405   remove_pending_message(pending_msg, GNUNET_NO);
1406 }
1407
1408
1409 /**
1410  * @brief Iterator function for #store_valid_peers.
1411  *
1412  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1413  * Writes single peer to disk.
1414  *
1415  * @param cls the file handle to write to.
1416  * @param peer current peer
1417  * @param value unused
1418  *
1419  * @return  #GNUNET_YES if we should continue to
1420  *          iterate,
1421  *          #GNUNET_NO if not.
1422  */
1423 static int
1424 store_peer_presistently_iterator(void *cls,
1425                                  const struct GNUNET_PeerIdentity *peer,
1426                                  void *value)
1427 {
1428   const struct GNUNET_DISK_FileHandle *fh = cls;
1429   char peer_string[128];
1430   int size;
1431   ssize_t ret;
1432
1433   (void)value;
1434
1435   if (NULL == peer)
1436     {
1437       return GNUNET_YES;
1438     }
1439   size = GNUNET_snprintf(peer_string,
1440                          sizeof(peer_string),
1441                          "%s\n",
1442                          GNUNET_i2s_full(peer));
1443   GNUNET_assert(53 == size);
1444   ret = GNUNET_DISK_file_write(fh,
1445                                peer_string,
1446                                size);
1447   GNUNET_assert(size == ret);
1448   return GNUNET_YES;
1449 }
1450
1451
1452 /**
1453  * @brief Store the peers currently in #valid_peers to disk.
1454  *
1455  * @param sub Sub for which to store the valid peers
1456  */
1457 static void
1458 store_valid_peers(const struct Sub *sub)
1459 {
1460   struct GNUNET_DISK_FileHandle *fh;
1461   uint32_t number_written_peers;
1462   int ret;
1463
1464   if (0 == strncmp("DISABLE", sub->filename_valid_peers, 7))
1465     {
1466       return;
1467     }
1468
1469   ret = GNUNET_DISK_directory_create_for_file(sub->filename_valid_peers);
1470   if (GNUNET_SYSERR == ret)
1471     {
1472       LOG(GNUNET_ERROR_TYPE_WARNING,
1473           "Not able to create directory for file `%s'\n",
1474           sub->filename_valid_peers);
1475       GNUNET_break(0);
1476     }
1477   else if (GNUNET_NO == ret)
1478     {
1479       LOG(GNUNET_ERROR_TYPE_WARNING,
1480           "Directory for file `%s' exists but is not writable for us\n",
1481           sub->filename_valid_peers);
1482       GNUNET_break(0);
1483     }
1484   fh = GNUNET_DISK_file_open(sub->filename_valid_peers,
1485                              GNUNET_DISK_OPEN_WRITE |
1486                              GNUNET_DISK_OPEN_CREATE,
1487                              GNUNET_DISK_PERM_USER_READ |
1488                              GNUNET_DISK_PERM_USER_WRITE);
1489   if (NULL == fh)
1490     {
1491       LOG(GNUNET_ERROR_TYPE_WARNING,
1492           "Not able to write valid peers to file `%s'\n",
1493           sub->filename_valid_peers);
1494       return;
1495     }
1496   LOG(GNUNET_ERROR_TYPE_DEBUG,
1497       "Writing %u valid peers to disk\n",
1498       GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1499   number_written_peers =
1500     GNUNET_CONTAINER_multipeermap_iterate(sub->valid_peers,
1501                                           store_peer_presistently_iterator,
1502                                           fh);
1503   GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_close(fh));
1504   GNUNET_assert(number_written_peers ==
1505                 GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1506 }
1507
1508
1509 /**
1510  * @brief Convert string representation of peer id to peer id.
1511  *
1512  * Counterpart to #GNUNET_i2s_full.
1513  *
1514  * @param string_repr The string representation of the peer id
1515  *
1516  * @return The peer id
1517  */
1518 static const struct GNUNET_PeerIdentity *
1519 s2i_full(const char *string_repr)
1520 {
1521   struct GNUNET_PeerIdentity *peer;
1522   size_t len;
1523   int ret;
1524
1525   peer = GNUNET_new(struct GNUNET_PeerIdentity);
1526   len = strlen(string_repr);
1527   if (52 > len)
1528     {
1529       LOG(GNUNET_ERROR_TYPE_WARNING,
1530           "Not able to convert string representation of PeerID to PeerID\n"
1531           "Sting representation: %s (len %lu) - too short\n",
1532           string_repr,
1533           len);
1534       GNUNET_break(0);
1535     }
1536   else if (52 < len)
1537     {
1538       len = 52;
1539     }
1540   ret = GNUNET_CRYPTO_eddsa_public_key_from_string(string_repr,
1541                                                    len,
1542                                                    &peer->public_key);
1543   if (GNUNET_OK != ret)
1544     {
1545       LOG(GNUNET_ERROR_TYPE_WARNING,
1546           "Not able to convert string representation of PeerID to PeerID\n"
1547           "Sting representation: %s\n",
1548           string_repr);
1549       GNUNET_break(0);
1550     }
1551   return peer;
1552 }
1553
1554
1555 /**
1556  * @brief Restore the peers on disk to #valid_peers.
1557  *
1558  * @param sub Sub for which to restore the valid peers
1559  */
1560 static void
1561 restore_valid_peers(const struct Sub *sub)
1562 {
1563   off_t file_size;
1564   uint32_t num_peers;
1565   struct GNUNET_DISK_FileHandle *fh;
1566   char *buf;
1567   ssize_t size_read;
1568   char *iter_buf;
1569   char *str_repr;
1570   const struct GNUNET_PeerIdentity *peer;
1571
1572   if (0 == strncmp("DISABLE", sub->filename_valid_peers, 7))
1573     {
1574       return;
1575     }
1576
1577   if (GNUNET_OK != GNUNET_DISK_file_test(sub->filename_valid_peers))
1578     {
1579       return;
1580     }
1581   fh = GNUNET_DISK_file_open(sub->filename_valid_peers,
1582                              GNUNET_DISK_OPEN_READ,
1583                              GNUNET_DISK_PERM_NONE);
1584   GNUNET_assert(NULL != fh);
1585   GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_handle_size(fh, &file_size));
1586   num_peers = file_size / 53;
1587   buf = GNUNET_malloc(file_size);
1588   size_read = GNUNET_DISK_file_read(fh, buf, file_size);
1589   GNUNET_assert(size_read == file_size);
1590   LOG(GNUNET_ERROR_TYPE_DEBUG,
1591       "Restoring %" PRIu32 " peers from file `%s'\n",
1592       num_peers,
1593       sub->filename_valid_peers);
1594   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1595     {
1596       str_repr = GNUNET_strndup(iter_buf, 53);
1597       peer = s2i_full(str_repr);
1598       GNUNET_free(str_repr);
1599       add_valid_peer(peer, sub->valid_peers);
1600       LOG(GNUNET_ERROR_TYPE_DEBUG,
1601           "Restored valid peer %s from disk\n",
1602           GNUNET_i2s_full(peer));
1603     }
1604   iter_buf = NULL;
1605   GNUNET_free(buf);
1606   LOG(GNUNET_ERROR_TYPE_DEBUG,
1607       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1608       num_peers,
1609       GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1610   if (num_peers != GNUNET_CONTAINER_multipeermap_size(sub->valid_peers))
1611     {
1612       LOG(GNUNET_ERROR_TYPE_WARNING,
1613           "Number of restored peers does not match file size. Have probably duplicates.\n");
1614     }
1615   GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_close(fh));
1616   LOG(GNUNET_ERROR_TYPE_DEBUG,
1617       "Restored %u valid peers from disk\n",
1618       GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1619 }
1620
1621
1622 /**
1623  * @brief Delete storage of peers that was created with #initialise_peers ()
1624  *
1625  * @param sub Sub for which the storage is deleted
1626  */
1627 static void
1628 peers_terminate(struct Sub *sub)
1629 {
1630   if (GNUNET_SYSERR ==
1631       GNUNET_CONTAINER_multipeermap_iterate(sub->peer_map,
1632                                             &peermap_clear_iterator,
1633                                             sub))
1634     {
1635       LOG(GNUNET_ERROR_TYPE_WARNING,
1636           "Iteration destroying peers was aborted.\n");
1637     }
1638   GNUNET_CONTAINER_multipeermap_destroy(sub->peer_map);
1639   sub->peer_map = NULL;
1640   store_valid_peers(sub);
1641   GNUNET_free(sub->filename_valid_peers);
1642   sub->filename_valid_peers = NULL;
1643   GNUNET_CONTAINER_multipeermap_destroy(sub->valid_peers);
1644   sub->valid_peers = NULL;
1645 }
1646
1647
1648 /**
1649  * Iterator over #valid_peers hash map entries.
1650  *
1651  * @param cls Closure that contains iterator function and closure
1652  * @param peer current peer id
1653  * @param value value in the hash map - unused
1654  * @return #GNUNET_YES if we should continue to
1655  *         iterate,
1656  *         #GNUNET_NO if not.
1657  */
1658 static int
1659 valid_peer_iterator(void *cls,
1660                     const struct GNUNET_PeerIdentity *peer,
1661                     void *value)
1662 {
1663   struct PeersIteratorCls *it_cls = cls;
1664
1665   (void)value;
1666
1667   return it_cls->iterator(it_cls->cls, peer);
1668 }
1669
1670
1671 /**
1672  * @brief Get all currently known, valid peer ids.
1673  *
1674  * @param valid_peers Peer map containing the valid peers in question
1675  * @param iterator function to call on each peer id
1676  * @param it_cls extra argument to @a iterator
1677  * @return the number of key value pairs processed,
1678  *         #GNUNET_SYSERR if it aborted iteration
1679  */
1680 static int
1681 get_valid_peers(struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1682                 PeersIterator iterator,
1683                 void *it_cls)
1684 {
1685   struct PeersIteratorCls *cls;
1686   int ret;
1687
1688   cls = GNUNET_new(struct PeersIteratorCls);
1689   cls->iterator = iterator;
1690   cls->cls = it_cls;
1691   ret = GNUNET_CONTAINER_multipeermap_iterate(valid_peers,
1692                                               valid_peer_iterator,
1693                                               cls);
1694   GNUNET_free(cls);
1695   return ret;
1696 }
1697
1698
1699 /**
1700  * @brief Add peer to known peers.
1701  *
1702  * This function is called on new peer_ids from 'external' sources
1703  * (client seed, cadet get_peers(), ...)
1704  *
1705  * @param sub Sub with the peer map that the @a peer will be added to
1706  * @param peer the new #GNUNET_PeerIdentity
1707  *
1708  * @return #GNUNET_YES if peer was inserted
1709  *         #GNUNET_NO  otherwise
1710  */
1711 static int
1712 insert_peer(struct Sub *sub,
1713             const struct GNUNET_PeerIdentity *peer)
1714 {
1715   if (GNUNET_YES == check_peer_known(sub->peer_map, peer))
1716     {
1717       return GNUNET_NO; /* We already know this peer - nothing to do */
1718     }
1719   (void)create_peer_ctx(sub, peer);
1720   return GNUNET_YES;
1721 }
1722
1723
1724 /**
1725  * @brief Check whether flags on a peer are set.
1726  *
1727  * @param peer_map Peer map that is expected to contain the @a peer
1728  * @param peer the peer to check the flag of
1729  * @param flags the flags to check
1730  *
1731  * @return #GNUNET_SYSERR if peer is not known
1732  *         #GNUNET_YES    if all given flags are set
1733  *         #GNUNET_NO     otherwise
1734  */
1735 static int
1736 check_peer_flag(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1737                 const struct GNUNET_PeerIdentity *peer,
1738                 enum Peers_PeerFlags flags)
1739 {
1740   struct PeerContext *peer_ctx;
1741
1742   if (GNUNET_NO == check_peer_known(peer_map, peer))
1743     {
1744       return GNUNET_SYSERR;
1745     }
1746   peer_ctx = get_peer_ctx(peer_map, peer);
1747   return check_peer_flag_set(peer_ctx, flags);
1748 }
1749
1750 /**
1751  * @brief Try connecting to a peer to see whether it is online
1752  *
1753  * If not known yet, insert into known peers
1754  *
1755  * @param sub Sub which would contain the @a peer
1756  * @param peer the peer whose online is to be checked
1757  * @return #GNUNET_YES if the check was issued
1758  *         #GNUNET_NO  otherwise
1759  */
1760 static int
1761 issue_peer_online_check(struct Sub *sub,
1762                         const struct GNUNET_PeerIdentity *peer)
1763 {
1764   struct PeerContext *peer_ctx;
1765
1766   (void)insert_peer(sub, peer);   // TODO even needed?
1767   peer_ctx = get_peer_ctx(sub->peer_map, peer);
1768   if ((GNUNET_NO == check_peer_flag(sub->peer_map, peer, Peers_ONLINE)) &&
1769       (NULL == peer_ctx->online_check_pending))
1770     {
1771       check_peer_online(peer_ctx);
1772       return GNUNET_YES;
1773     }
1774   return GNUNET_NO;
1775 }
1776
1777
1778 /**
1779  * @brief Check if peer is removable.
1780  *
1781  * Check if
1782  *  - a recv channel exists
1783  *  - there are pending messages
1784  *  - there is no pending pull reply
1785  *
1786  * @param peer_ctx Context of the peer in question
1787  * @return #GNUNET_YES    if peer is removable
1788  *         #GNUNET_NO     if peer is NOT removable
1789  *         #GNUNET_SYSERR if peer is not known
1790  */
1791 static int
1792 check_removable(const struct PeerContext *peer_ctx)
1793 {
1794   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(peer_ctx->sub->peer_map,
1795                                                           &peer_ctx->peer_id))
1796     {
1797       return GNUNET_SYSERR;
1798     }
1799
1800   if ((NULL != peer_ctx->recv_channel_ctx) ||
1801       (NULL != peer_ctx->pending_messages_head) ||
1802       (GNUNET_YES == check_peer_flag_set(peer_ctx, Peers_PULL_REPLY_PENDING)))
1803     {
1804       return GNUNET_NO;
1805     }
1806   return GNUNET_YES;
1807 }
1808
1809
1810 /**
1811  * @brief Check whether @a peer is actually a peer.
1812  *
1813  * A valid peer is a peer that we know exists eg. we were connected to once.
1814  *
1815  * @param valid_peers Peer map that would contain the @a peer
1816  * @param peer peer in question
1817  *
1818  * @return #GNUNET_YES if peer is valid
1819  *         #GNUNET_NO  if peer is not valid
1820  */
1821 static int
1822 check_peer_valid(const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1823                  const struct GNUNET_PeerIdentity *peer)
1824 {
1825   return GNUNET_CONTAINER_multipeermap_contains(valid_peers, peer);
1826 }
1827
1828
1829 /**
1830  * @brief Indicate that we want to send to the other peer
1831  *
1832  * This establishes a sending channel
1833  *
1834  * @param peer_ctx Context of the target peer
1835  */
1836 static void
1837 indicate_sending_intention(struct PeerContext *peer_ctx)
1838 {
1839   GNUNET_assert(GNUNET_YES == check_peer_known(peer_ctx->sub->peer_map,
1840                                                &peer_ctx->peer_id));
1841   (void)get_channel(peer_ctx);
1842 }
1843
1844
1845 /**
1846  * @brief Check whether other peer has the intention to send/opened channel
1847  *        towars us
1848  *
1849  * @param peer_ctx Context of the peer in question
1850  *
1851  * @return #GNUNET_YES if peer has the intention to send
1852  *         #GNUNET_NO  otherwise
1853  */
1854 static int
1855 check_peer_send_intention(const struct PeerContext *peer_ctx)
1856 {
1857   if (NULL != peer_ctx->recv_channel_ctx)
1858     {
1859       return GNUNET_YES;
1860     }
1861   return GNUNET_NO;
1862 }
1863
1864
1865 /**
1866  * Handle the channel a peer opens to us.
1867  *
1868  * @param cls The closure - Sub
1869  * @param channel The channel the peer wants to establish
1870  * @param initiator The peer's peer ID
1871  *
1872  * @return initial channel context for the channel
1873  *         (can be NULL -- that's not an error)
1874  */
1875 static void *
1876 handle_inbound_channel(void *cls,
1877                        struct GNUNET_CADET_Channel *channel,
1878                        const struct GNUNET_PeerIdentity *initiator)
1879 {
1880   struct PeerContext *peer_ctx;
1881   struct ChannelCtx *channel_ctx;
1882   struct Sub *sub = cls;
1883
1884   LOG(GNUNET_ERROR_TYPE_DEBUG,
1885       "New channel was established to us (Peer %s).\n",
1886       GNUNET_i2s(initiator));
1887   GNUNET_assert(NULL != channel);  /* according to cadet API */
1888   /* Make sure we 'know' about this peer */
1889   peer_ctx = create_or_get_peer_ctx(sub, initiator);
1890   set_peer_online(peer_ctx);
1891   (void)add_valid_peer(&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1892   channel_ctx = add_channel_ctx(peer_ctx);
1893   channel_ctx->channel = channel;
1894   /* We only accept one incoming channel per peer */
1895   if (GNUNET_YES == check_peer_send_intention(get_peer_ctx(sub->peer_map,
1896                                                            initiator)))
1897     {
1898       LOG(GNUNET_ERROR_TYPE_WARNING,
1899           "Already got one receive channel. Destroying old one.\n");
1900       GNUNET_break_op(0);
1901       destroy_channel(peer_ctx->recv_channel_ctx);
1902       peer_ctx->recv_channel_ctx = channel_ctx;
1903       /* return the channel context */
1904       return channel_ctx;
1905     }
1906   peer_ctx->recv_channel_ctx = channel_ctx;
1907   return channel_ctx;
1908 }
1909
1910
1911 /**
1912  * @brief Check whether a sending channel towards the given peer exists
1913  *
1914  * @param peer_ctx Context of the peer in question
1915  *
1916  * @return #GNUNET_YES if a sending channel towards that peer exists
1917  *         #GNUNET_NO  otherwise
1918  */
1919 static int
1920 check_sending_channel_exists(const struct PeerContext *peer_ctx)
1921 {
1922   if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
1923                                     &peer_ctx->peer_id))
1924     { /* If no such peer exists, there is no channel */
1925       return GNUNET_NO;
1926     }
1927   if (NULL == peer_ctx->send_channel_ctx)
1928     {
1929       return GNUNET_NO;
1930     }
1931   return GNUNET_YES;
1932 }
1933
1934
1935 /**
1936  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1937  *        intention to another peer
1938  *
1939  * @param peer_ctx Context to the peer
1940  * @return #GNUNET_YES if channel was destroyed
1941  *         #GNUNET_NO  otherwise
1942  */
1943 static int
1944 destroy_sending_channel(struct PeerContext *peer_ctx)
1945 {
1946   if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
1947                                     &peer_ctx->peer_id))
1948     {
1949       return GNUNET_NO;
1950     }
1951   if (NULL != peer_ctx->send_channel_ctx)
1952     {
1953       destroy_channel(peer_ctx->send_channel_ctx);
1954       (void)check_connected(peer_ctx);
1955       return GNUNET_YES;
1956     }
1957   return GNUNET_NO;
1958 }
1959
1960 /**
1961  * @brief Send a message to another peer.
1962  *
1963  * Keeps track about pending messages so they can be properly removed when the
1964  * peer is destroyed.
1965  *
1966  * @param peer_ctx Context of the peer to which the message is to be sent
1967  * @param ev envelope of the message
1968  * @param type type of the message
1969  */
1970 static void
1971 send_message(struct PeerContext *peer_ctx,
1972              struct GNUNET_MQ_Envelope *ev,
1973              const char *type)
1974 {
1975   struct PendingMessage *pending_msg;
1976   struct GNUNET_MQ_Handle *mq;
1977
1978   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1979              "Sending message to %s of type %s\n",
1980              GNUNET_i2s(&peer_ctx->peer_id),
1981              type);
1982   pending_msg = insert_pending_message(peer_ctx, ev, type);
1983   mq = get_mq(peer_ctx);
1984   GNUNET_MQ_notify_sent(ev,
1985                         mq_notify_sent_cb,
1986                         pending_msg);
1987   GNUNET_MQ_send(mq, ev);
1988 }
1989
1990 /**
1991  * @brief Schedule a operation on given peer
1992  *
1993  * Avoids scheduling an operation twice.
1994  *
1995  * @param peer_ctx Context of the peer for which to schedule the operation
1996  * @param peer_op the operation to schedule
1997  * @param cls Closure to @a peer_op
1998  *
1999  * @return #GNUNET_YES if the operation was scheduled
2000  *         #GNUNET_NO  otherwise
2001  */
2002 static int
2003 schedule_operation(struct PeerContext *peer_ctx,
2004                    const PeerOp peer_op,
2005                    void *cls)
2006 {
2007   struct PeerPendingOp pending_op;
2008
2009   GNUNET_assert(GNUNET_YES == check_peer_known(peer_ctx->sub->peer_map,
2010                                                &peer_ctx->peer_id));
2011
2012   //TODO if ONLINE execute immediately
2013
2014   if (GNUNET_NO == check_operation_scheduled(peer_ctx, peer_op))
2015     {
2016       pending_op.op = peer_op;
2017       pending_op.op_cls = cls;
2018       GNUNET_array_append(peer_ctx->pending_ops,
2019                           peer_ctx->num_pending_ops,
2020                           pending_op);
2021       return GNUNET_YES;
2022     }
2023   return GNUNET_NO;
2024 }
2025
2026 /***********************************************************************
2027 * /Old gnunet-service-rps_peers.c
2028 ***********************************************************************/
2029
2030
2031 /***********************************************************************
2032 * Housekeeping with clients
2033 ***********************************************************************/
2034
2035 /**
2036  * Closure used to pass the client and the id to the callback
2037  * that replies to a client's request
2038  */
2039 struct ReplyCls {
2040   /**
2041    * DLL
2042    */
2043   struct ReplyCls *next;
2044   struct ReplyCls *prev;
2045
2046   /**
2047    * The identifier of the request
2048    */
2049   uint32_t id;
2050
2051   /**
2052    * The handle to the request
2053    */
2054   struct RPS_SamplerRequestHandle *req_handle;
2055
2056   /**
2057    * The client handle to send the reply to
2058    */
2059   struct ClientContext *cli_ctx;
2060 };
2061
2062
2063 /**
2064  * Struct used to store the context of a connected client.
2065  */
2066 struct ClientContext {
2067   /**
2068    * DLL
2069    */
2070   struct ClientContext *next;
2071   struct ClientContext *prev;
2072
2073   /**
2074    * The message queue to communicate with the client.
2075    */
2076   struct GNUNET_MQ_Handle *mq;
2077
2078   /**
2079    * @brief How many updates this client expects to receive.
2080    */
2081   int64_t view_updates_left;
2082
2083   /**
2084    * @brief Whether this client wants to receive stream updates.
2085    * Either #GNUNET_YES or #GNUNET_NO
2086    */
2087   int8_t stream_update;
2088
2089   /**
2090    * The client handle to send the reply to
2091    */
2092   struct GNUNET_SERVICE_Client *client;
2093
2094   /**
2095    * The #Sub this context belongs to
2096    */
2097   struct Sub *sub;
2098 };
2099
2100 /**
2101  * DLL with all clients currently connected to us
2102  */
2103 struct ClientContext *cli_ctx_head;
2104 struct ClientContext *cli_ctx_tail;
2105
2106 /***********************************************************************
2107 * /Housekeeping with clients
2108 ***********************************************************************/
2109
2110
2111
2112
2113
2114 /***********************************************************************
2115 * Util functions
2116 ***********************************************************************/
2117
2118
2119 /**
2120  * Print peerlist to log.
2121  */
2122 static void
2123 print_peer_list(struct GNUNET_PeerIdentity *list,
2124                 unsigned int len)
2125 {
2126   unsigned int i;
2127
2128   LOG(GNUNET_ERROR_TYPE_DEBUG,
2129       "Printing peer list of length %u at %p:\n",
2130       len,
2131       list);
2132   for (i = 0; i < len; i++)
2133     {
2134       LOG(GNUNET_ERROR_TYPE_DEBUG,
2135           "%u. peer: %s\n",
2136           i, GNUNET_i2s(&list[i]));
2137     }
2138 }
2139
2140
2141 /**
2142  * Remove peer from list.
2143  */
2144 static void
2145 rem_from_list(struct GNUNET_PeerIdentity **peer_list,
2146               unsigned int *list_size,
2147               const struct GNUNET_PeerIdentity *peer)
2148 {
2149   unsigned int i;
2150   struct GNUNET_PeerIdentity *tmp;
2151
2152   tmp = *peer_list;
2153
2154   LOG(GNUNET_ERROR_TYPE_DEBUG,
2155       "Removing peer %s from list at %p\n",
2156       GNUNET_i2s(peer),
2157       tmp);
2158
2159   for (i = 0; i < *list_size; i++)
2160     {
2161       if (0 == GNUNET_memcmp(&tmp[i], peer))
2162         {
2163           if (i < *list_size - 1)
2164             { /* Not at the last entry -- shift peers left */
2165               memmove(&tmp[i], &tmp[i + 1],
2166                       ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
2167             }
2168           /* Remove last entry (should be now useless PeerID) */
2169           GNUNET_array_grow(tmp, *list_size, (*list_size) - 1);
2170         }
2171     }
2172   *peer_list = tmp;
2173 }
2174
2175
2176 /**
2177  * Insert PeerID in #view
2178  *
2179  * Called once we know a peer is online.
2180  * Implements #PeerOp
2181  *
2182  * @return GNUNET_OK if peer was actually inserted
2183  *         GNUNET_NO if peer was not inserted
2184  */
2185 static void
2186 insert_in_view_op(void *cls,
2187                   const struct GNUNET_PeerIdentity *peer);
2188
2189 /**
2190  * Insert PeerID in #view
2191  *
2192  * Called once we know a peer is online.
2193  *
2194  * @param sub Sub in with the view to insert in
2195  * @param peer the peer to insert
2196  *
2197  * @return GNUNET_OK if peer was actually inserted
2198  *         GNUNET_NO if peer was not inserted
2199  */
2200 static int
2201 insert_in_view(struct Sub *sub,
2202                const struct GNUNET_PeerIdentity *peer)
2203 {
2204   struct PeerContext *peer_ctx;
2205   int online;
2206   int ret;
2207
2208   online = check_peer_flag(sub->peer_map, peer, Peers_ONLINE);
2209   peer_ctx = get_peer_ctx(sub->peer_map, peer);  // TODO indirection needed?
2210   if ((GNUNET_NO == online) ||
2211       (GNUNET_SYSERR == online))   /* peer is not even known */
2212     {
2213       (void)issue_peer_online_check(sub, peer);
2214       (void)schedule_operation(peer_ctx, insert_in_view_op, sub);
2215       return GNUNET_NO;
2216     }
2217   /* Open channel towards peer to keep connection open */
2218   indicate_sending_intention(peer_ctx);
2219   ret = View_put(sub->view, peer);
2220   if (peer_ctx->sub == msub)
2221     {
2222       GNUNET_STATISTICS_set(stats,
2223                             "view size",
2224                             View_size(peer_ctx->sub->view),
2225                             GNUNET_NO);
2226     }
2227   return ret;
2228 }
2229
2230
2231 /**
2232  * @brief Send view to client
2233  *
2234  * @param cli_ctx the context of the client
2235  * @param view_array the peerids of the view as array (can be empty)
2236  * @param view_size the size of the view array (can be 0)
2237  */
2238 static void
2239 send_view(const struct ClientContext *cli_ctx,
2240           const struct GNUNET_PeerIdentity *view_array,
2241           uint64_t view_size)
2242 {
2243   struct GNUNET_MQ_Envelope *ev;
2244   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2245   struct Sub *sub;
2246
2247   if (NULL == view_array)
2248     {
2249       if (NULL == cli_ctx->sub)
2250         sub = msub;
2251       else
2252         sub = cli_ctx->sub;
2253       view_size = View_size(sub->view);
2254       view_array = View_get_as_array(sub->view);
2255     }
2256
2257   ev = GNUNET_MQ_msg_extra(out_msg,
2258                            view_size * sizeof(struct GNUNET_PeerIdentity),
2259                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2260   out_msg->num_peers = htonl(view_size);
2261
2262   GNUNET_memcpy(&out_msg[1],
2263                 view_array,
2264                 view_size * sizeof(struct GNUNET_PeerIdentity));
2265   GNUNET_MQ_send(cli_ctx->mq, ev);
2266 }
2267
2268
2269 /**
2270  * @brief Send peer from biased stream to client.
2271  *
2272  * TODO merge with send_view, parameterise
2273  *
2274  * @param cli_ctx the context of the client
2275  * @param view_array the peerids of the view as array (can be empty)
2276  * @param view_size the size of the view array (can be 0)
2277  */
2278 static void
2279 send_stream_peers(const struct ClientContext *cli_ctx,
2280                   uint64_t num_peers,
2281                   const struct GNUNET_PeerIdentity *peers)
2282 {
2283   struct GNUNET_MQ_Envelope *ev;
2284   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2285
2286   GNUNET_assert(NULL != peers);
2287
2288   ev = GNUNET_MQ_msg_extra(out_msg,
2289                            num_peers * sizeof(struct GNUNET_PeerIdentity),
2290                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2291   out_msg->num_peers = htonl(num_peers);
2292
2293   GNUNET_memcpy(&out_msg[1],
2294                 peers,
2295                 num_peers * sizeof(struct GNUNET_PeerIdentity));
2296   GNUNET_MQ_send(cli_ctx->mq, ev);
2297 }
2298
2299
2300 /**
2301  * @brief sends updates to clients that are interested
2302  *
2303  * @param sub Sub for which to notify clients
2304  */
2305 static void
2306 clients_notify_view_update(const struct Sub *sub)
2307 {
2308   struct ClientContext *cli_ctx_iter;
2309   uint64_t num_peers;
2310   const struct GNUNET_PeerIdentity *view_array;
2311
2312   num_peers = View_size(sub->view);
2313   view_array = View_get_as_array(sub->view);
2314   /* check size of view is small enough */
2315   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2316     {
2317       GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
2318                  "View is too big to send\n");
2319       return;
2320     }
2321
2322   for (cli_ctx_iter = cli_ctx_head;
2323        NULL != cli_ctx_iter;
2324        cli_ctx_iter = cli_ctx_iter->next)
2325     {
2326       if (1 < cli_ctx_iter->view_updates_left)
2327         {
2328           /* Client wants to receive limited amount of updates */
2329           cli_ctx_iter->view_updates_left -= 1;
2330         }
2331       else if (1 == cli_ctx_iter->view_updates_left)
2332         {
2333           /* Last update of view for client */
2334           cli_ctx_iter->view_updates_left = -1;
2335         }
2336       else if (0 > cli_ctx_iter->view_updates_left)
2337         {
2338           /* Client is not interested in updates */
2339           continue;
2340         }
2341       /* else _updates_left == 0 - infinite amount of updates */
2342
2343       /* send view */
2344       send_view(cli_ctx_iter, view_array, num_peers);
2345     }
2346 }
2347
2348
2349 /**
2350  * @brief sends updates to clients that are interested
2351  *
2352  * @param num_peers Number of peers to send
2353  * @param peers the array of peers to send
2354  */
2355 static void
2356 clients_notify_stream_peer(const struct Sub *sub,
2357                            uint64_t num_peers,
2358                            const struct GNUNET_PeerIdentity *peers)
2359 // TODO enum StreamPeerSource)
2360 {
2361   struct ClientContext *cli_ctx_iter;
2362
2363   LOG(GNUNET_ERROR_TYPE_DEBUG,
2364       "Got peer (%s) from biased stream - update all clients\n",
2365       GNUNET_i2s(peers));
2366
2367   for (cli_ctx_iter = cli_ctx_head;
2368        NULL != cli_ctx_iter;
2369        cli_ctx_iter = cli_ctx_iter->next)
2370     {
2371       if (GNUNET_YES == cli_ctx_iter->stream_update &&
2372           (sub == cli_ctx_iter->sub || sub == msub))
2373         {
2374           send_stream_peers(cli_ctx_iter, num_peers, peers);
2375         }
2376     }
2377 }
2378
2379
2380 /**
2381  * Put random peer from sampler into the view as history update.
2382  *
2383  * @param ids Array of Peers to insert into view
2384  * @param num_peers Number of peers to insert
2385  * @param cls Closure - The Sub for which this is to be done
2386  */
2387 static void
2388 hist_update(const struct GNUNET_PeerIdentity *ids,
2389             uint32_t num_peers,
2390             void *cls)
2391 {
2392   unsigned int i;
2393   struct Sub *sub = cls;
2394
2395   for (i = 0; i < num_peers; i++)
2396     {
2397       int inserted;
2398       if (GNUNET_YES != check_peer_known(sub->peer_map, &ids[i]))
2399         {
2400           LOG(GNUNET_ERROR_TYPE_WARNING,
2401               "Peer in history update not known!\n");
2402           continue;
2403         }
2404       inserted = insert_in_view(sub, &ids[i]);
2405       if (GNUNET_OK == inserted)
2406         {
2407           clients_notify_stream_peer(sub, 1, &ids[i]);
2408         }
2409 #ifdef TO_FILE_FULL
2410       to_file(sub->file_name_view_log,
2411               "+%s\t(hist)",
2412               GNUNET_i2s_full(ids));
2413 #endif /* TO_FILE_FULL */
2414     }
2415   clients_notify_view_update(sub);
2416 }
2417
2418
2419 /**
2420  * Wrapper around #RPS_sampler_resize()
2421  *
2422  * If we do not have enough sampler elements, double current sampler size
2423  * If we have more than enough sampler elements, halv current sampler size
2424  *
2425  * @param sampler The sampler to resize
2426  * @param new_size New size to which to resize
2427  */
2428 static void
2429 resize_wrapper(struct RPS_Sampler *sampler, uint32_t new_size)
2430 {
2431   unsigned int sampler_size;
2432
2433   // TODO statistics
2434   // TODO respect the min, max
2435   sampler_size = RPS_sampler_get_size(sampler);
2436   if (sampler_size > new_size * 4)
2437     { /* Shrinking */
2438       RPS_sampler_resize(sampler, sampler_size / 2);
2439     }
2440   else if (sampler_size < new_size)
2441     { /* Growing */
2442       RPS_sampler_resize(sampler, sampler_size * 2);
2443     }
2444   LOG(GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2445 }
2446
2447
2448 /**
2449  * Add all peers in @a peer_array to @a peer_map used as set.
2450  *
2451  * @param peer_array array containing the peers
2452  * @param num_peers number of peers in @peer_array
2453  * @param peer_map the peermap to use as set
2454  */
2455 static void
2456 add_peer_array_to_set(const struct GNUNET_PeerIdentity *peer_array,
2457                       unsigned int num_peers,
2458                       struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2459 {
2460   unsigned int i;
2461
2462   if (NULL == peer_map)
2463     {
2464       LOG(GNUNET_ERROR_TYPE_WARNING,
2465           "Trying to add peers to non-existing peermap.\n");
2466       return;
2467     }
2468
2469   for (i = 0; i < num_peers; i++)
2470     {
2471       GNUNET_CONTAINER_multipeermap_put(peer_map,
2472                                         &peer_array[i],
2473                                         NULL,
2474                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2475       if (msub->peer_map == peer_map)
2476         {
2477           GNUNET_STATISTICS_set(stats,
2478                                 "# known peers",
2479                                 GNUNET_CONTAINER_multipeermap_size(peer_map),
2480                                 GNUNET_NO);
2481         }
2482     }
2483 }
2484
2485
2486 /**
2487  * Send a PULL REPLY to @a peer_id
2488  *
2489  * @param peer_ctx Context of the peer to send the reply to
2490  * @param peer_ids the peers to send to @a peer_id
2491  * @param num_peer_ids the number of peers to send to @a peer_id
2492  */
2493 static void
2494 send_pull_reply(struct PeerContext *peer_ctx,
2495                 const struct GNUNET_PeerIdentity *peer_ids,
2496                 unsigned int num_peer_ids)
2497 {
2498   uint32_t send_size;
2499   struct GNUNET_MQ_Envelope *ev;
2500   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2501
2502   /* Compute actual size */
2503   send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) +
2504               num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
2505
2506   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2507     /* Compute number of peers to send
2508      * If too long, simply truncate */
2509     // TODO select random ones via permutation
2510     //      or even better: do good protocol design
2511     send_size =
2512       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2513        sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
2514       sizeof(struct GNUNET_PeerIdentity);
2515   else
2516     send_size = num_peer_ids;
2517
2518   LOG(GNUNET_ERROR_TYPE_DEBUG,
2519       "Going to send PULL REPLY with %u peers to %s\n",
2520       send_size, GNUNET_i2s(&peer_ctx->peer_id));
2521
2522   ev = GNUNET_MQ_msg_extra(out_msg,
2523                            send_size * sizeof(struct GNUNET_PeerIdentity),
2524                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2525   out_msg->num_peers = htonl(send_size);
2526   GNUNET_memcpy(&out_msg[1], peer_ids,
2527                 send_size * sizeof(struct GNUNET_PeerIdentity));
2528
2529   send_message(peer_ctx, ev, "PULL REPLY");
2530   if (peer_ctx->sub == msub)
2531     {
2532       GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2533     }
2534   // TODO check with send intention: as send_channel is used/opened we indicate
2535   // a sending intention without intending it.
2536   // -> clean peer afterwards?
2537   // -> use recv_channel?
2538 }
2539
2540
2541 /**
2542  * Insert PeerID in #pull_map
2543  *
2544  * Called once we know a peer is online.
2545  *
2546  * @param cls Closure - Sub with the pull map to insert into
2547  * @param peer Peer to insert
2548  */
2549 static void
2550 insert_in_pull_map(void *cls,
2551                    const struct GNUNET_PeerIdentity *peer)
2552 {
2553   struct Sub *sub = cls;
2554
2555   CustomPeerMap_put(sub->pull_map, peer);
2556 }
2557
2558
2559 /**
2560  * Insert PeerID in #view
2561  *
2562  * Called once we know a peer is online.
2563  * Implements #PeerOp
2564  *
2565  * @param cls Closure - Sub with view to insert peer into
2566  * @param peer the peer to insert
2567  */
2568 static void
2569 insert_in_view_op(void *cls,
2570                   const struct GNUNET_PeerIdentity *peer)
2571 {
2572   struct Sub *sub = cls;
2573   int inserted;
2574
2575   inserted = insert_in_view(sub, peer);
2576   if (GNUNET_OK == inserted)
2577     {
2578       clients_notify_stream_peer(sub, 1, peer);
2579     }
2580 }
2581
2582
2583 /**
2584  * Update sampler with given PeerID.
2585  * Implements #PeerOp
2586  *
2587  * @param cls Closure - Sub containing the sampler to insert into
2588  * @param peer Peer to insert
2589  */
2590 static void
2591 insert_in_sampler(void *cls,
2592                   const struct GNUNET_PeerIdentity *peer)
2593 {
2594   struct Sub *sub = cls;
2595
2596   LOG(GNUNET_ERROR_TYPE_DEBUG,
2597       "Updating samplers with peer %s from insert_in_sampler()\n",
2598       GNUNET_i2s(peer));
2599   RPS_sampler_update(sub->sampler, peer);
2600   if (0 < RPS_sampler_count_id(sub->sampler, peer))
2601     {
2602       /* Make sure we 'know' about this peer */
2603       (void)issue_peer_online_check(sub, peer);
2604       /* Establish a channel towards that peer to indicate we are going to send
2605        * messages to it */
2606       //indicate_sending_intention (peer);
2607     }
2608   if (sub == msub)
2609     {
2610       GNUNET_STATISTICS_update(stats,
2611                                "# observed peers in gossip",
2612                                1,
2613                                GNUNET_NO);
2614     }
2615 #ifdef TO_FILE
2616   sub->num_observed_peers++;
2617   GNUNET_CONTAINER_multipeermap_put
2618     (sub->observed_unique_peers,
2619     peer,
2620     NULL,
2621     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2622   uint32_t num_observed_unique_peers =
2623     GNUNET_CONTAINER_multipeermap_size(sub->observed_unique_peers);
2624   GNUNET_STATISTICS_set(stats,
2625                         "# unique peers in gossip",
2626                         num_observed_unique_peers,
2627                         GNUNET_NO);
2628 #ifdef TO_FILE_FULL
2629   to_file(sub->file_name_observed_log,
2630           "%" PRIu32 " %" PRIu32 " %f\n",
2631           sub->num_observed_peers,
2632           num_observed_unique_peers,
2633           1.0 * num_observed_unique_peers / sub->num_observed_peers)
2634 #endif /* TO_FILE_FULL */
2635 #endif /* TO_FILE */
2636 }
2637
2638
2639 /**
2640  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2641  *        If the peer is not known, online check is issued and it is
2642  *        scheduled to be inserted in sampler and view.
2643  *
2644  * "External sources" refer to every source except the gossip.
2645  *
2646  * @param sub Sub for which @a peer was received
2647  * @param peer peer to insert/peer received
2648  */
2649 static void
2650 got_peer(struct Sub *sub,
2651          const struct GNUNET_PeerIdentity *peer)
2652 {
2653   /* If we did not know this peer already, insert it into sampler and view */
2654   if (GNUNET_YES == issue_peer_online_check(sub, peer))
2655     {
2656       schedule_operation(get_peer_ctx(sub->peer_map, peer),
2657                          &insert_in_sampler, sub);
2658       schedule_operation(get_peer_ctx(sub->peer_map, peer),
2659                          &insert_in_view_op, sub);
2660     }
2661   if (sub == msub)
2662     {
2663       GNUNET_STATISTICS_update(stats,
2664                                "# learnd peers",
2665                                1,
2666                                GNUNET_NO);
2667     }
2668 }
2669
2670
2671 /**
2672  * @brief Checks if there is a sending channel and if it is needed
2673  *
2674  * @param peer_ctx Context of the peer to check
2675  * @return GNUNET_YES if sending channel exists and is still needed
2676  *         GNUNET_NO  otherwise
2677  */
2678 static int
2679 check_sending_channel_needed(const struct PeerContext *peer_ctx)
2680 {
2681   /* struct GNUNET_CADET_Channel *channel; */
2682   if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
2683                                     &peer_ctx->peer_id))
2684     {
2685       return GNUNET_NO;
2686     }
2687   if (GNUNET_YES == check_sending_channel_exists(peer_ctx))
2688     {
2689       if ((0 < RPS_sampler_count_id(peer_ctx->sub->sampler,
2690                                     &peer_ctx->peer_id)) ||
2691           (GNUNET_YES == View_contains_peer(peer_ctx->sub->view,
2692                                             &peer_ctx->peer_id)) ||
2693           (GNUNET_YES == CustomPeerMap_contains_peer(peer_ctx->sub->push_map,
2694                                                      &peer_ctx->peer_id)) ||
2695           (GNUNET_YES == CustomPeerMap_contains_peer(peer_ctx->sub->pull_map,
2696                                                      &peer_ctx->peer_id)) ||
2697           (GNUNET_YES == check_peer_flag(peer_ctx->sub->peer_map,
2698                                          &peer_ctx->peer_id,
2699                                          Peers_PULL_REPLY_PENDING)))
2700         { /* If we want to keep the connection to peer open */
2701           return GNUNET_YES;
2702         }
2703       return GNUNET_NO;
2704     }
2705   return GNUNET_NO;
2706 }
2707
2708
2709 /**
2710  * @brief remove peer from our knowledge, the view, push and pull maps and
2711  * samplers.
2712  *
2713  * @param sub Sub with the data structures the peer is to be removed from
2714  * @param peer the peer to remove
2715  */
2716 static void
2717 remove_peer(struct Sub *sub,
2718             const struct GNUNET_PeerIdentity *peer)
2719 {
2720   (void)View_remove_peer(sub->view,
2721                          peer);
2722   CustomPeerMap_remove_peer(sub->pull_map,
2723                             peer);
2724   CustomPeerMap_remove_peer(sub->push_map,
2725                             peer);
2726   RPS_sampler_reinitialise_by_value(sub->sampler,
2727                                     peer);
2728   /* We want to destroy the peer now.
2729    * Sometimes, it just seems that it's already been removed from the peer_map,
2730    * so check the peer_map first. */
2731   if (GNUNET_YES == check_peer_known(sub->peer_map,
2732                                      peer))
2733     {
2734       destroy_peer(get_peer_ctx(sub->peer_map,
2735                                 peer));
2736     }
2737 }
2738
2739
2740 /**
2741  * @brief Remove data that is not needed anymore.
2742  *
2743  * If the sending channel is no longer needed it is destroyed.
2744  *
2745  * @param sub Sub in which the current peer is to be cleaned
2746  * @param peer the peer whose data is about to be cleaned
2747  */
2748 static void
2749 clean_peer(struct Sub *sub,
2750            const struct GNUNET_PeerIdentity *peer)
2751 {
2752   if (GNUNET_NO == check_sending_channel_needed(get_peer_ctx(sub->peer_map,
2753                                                              peer)))
2754     {
2755       LOG(GNUNET_ERROR_TYPE_DEBUG,
2756           "Going to remove send channel to peer %s\n",
2757           GNUNET_i2s(peer));
2758     #if ENABLE_MALICIOUS
2759       if (0 != GNUNET_memcmp(&attacked_peer,
2760                              peer))
2761         (void)destroy_sending_channel(get_peer_ctx(sub->peer_map,
2762                                                    peer));
2763     #else /* ENABLE_MALICIOUS */
2764       (void)destroy_sending_channel(get_peer_ctx(sub->peer_map,
2765                                                  peer));
2766     #endif /* ENABLE_MALICIOUS */
2767     }
2768
2769   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(sub->peer_map,
2770                                                           peer))
2771     {
2772       /* Peer was already removed by callback on destroyed channel */
2773       LOG(GNUNET_ERROR_TYPE_WARNING,
2774           "Peer was removed from our knowledge during cleanup\n");
2775       return;
2776     }
2777
2778   if ((GNUNET_NO == check_peer_send_intention(get_peer_ctx(sub->peer_map,
2779                                                            peer))) &&
2780       (GNUNET_NO == View_contains_peer(sub->view, peer)) &&
2781       (GNUNET_NO == CustomPeerMap_contains_peer(sub->push_map, peer)) &&
2782       (GNUNET_NO == CustomPeerMap_contains_peer(sub->push_map, peer)) &&
2783       (0 == RPS_sampler_count_id(sub->sampler, peer)) &&
2784       (GNUNET_YES == check_removable(get_peer_ctx(sub->peer_map, peer))))
2785     { /* We can safely remove this peer */
2786       LOG(GNUNET_ERROR_TYPE_DEBUG,
2787           "Going to remove peer %s\n",
2788           GNUNET_i2s(peer));
2789       remove_peer(sub, peer);
2790       return;
2791     }
2792 }
2793
2794
2795 /**
2796  * @brief This is called when a channel is destroyed.
2797  *
2798  * Removes peer completely from our knowledge if the send_channel was destroyed
2799  * Otherwise simply delete the recv_channel
2800  * Also check if the knowledge about this peer is still needed.
2801  * If not, remove this peer from our knowledge.
2802  *
2803  * @param cls The closure - Context to the channel
2804  * @param channel The channel being closed
2805  */
2806 static void
2807 cleanup_destroyed_channel(void *cls,
2808                           const struct GNUNET_CADET_Channel *channel)
2809 {
2810   struct ChannelCtx *channel_ctx = cls;
2811   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2812
2813   (void)channel;
2814
2815   channel_ctx->channel = NULL;
2816   remove_channel_ctx(channel_ctx);
2817   if (NULL != peer_ctx &&
2818       peer_ctx->send_channel_ctx == channel_ctx &&
2819       GNUNET_YES == check_sending_channel_needed(channel_ctx->peer_ctx))
2820     {
2821       remove_peer(peer_ctx->sub, &peer_ctx->peer_id);
2822     }
2823 }
2824
2825 /***********************************************************************
2826 * /Util functions
2827 ***********************************************************************/
2828
2829
2830
2831 /***********************************************************************
2832 * Sub
2833 ***********************************************************************/
2834
2835 /**
2836  * @brief Create a new Sub
2837  *
2838  * @param hash Hash of value shared among rps instances on other hosts that
2839  *        defines a subgroup to sample from.
2840  * @param sampler_size Size of the sampler
2841  * @param round_interval Interval (in average) between two rounds
2842  *
2843  * @return Sub
2844  */
2845 struct Sub *
2846 new_sub(const struct GNUNET_HashCode *hash,
2847         uint32_t sampler_size,
2848         struct GNUNET_TIME_Relative round_interval)
2849 {
2850   struct Sub *sub;
2851
2852   sub = GNUNET_new(struct Sub);
2853
2854   /* With the hash generated from the secret value this service only connects
2855    * to rps instances that share the value */
2856   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2857     GNUNET_MQ_hd_fixed_size(peer_check,
2858                             GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2859                             struct GNUNET_MessageHeader,
2860                             NULL),
2861     GNUNET_MQ_hd_fixed_size(peer_push,
2862                             GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2863                             struct GNUNET_MessageHeader,
2864                             NULL),
2865     GNUNET_MQ_hd_fixed_size(peer_pull_request,
2866                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2867                             struct GNUNET_MessageHeader,
2868                             NULL),
2869     GNUNET_MQ_hd_var_size(peer_pull_reply,
2870                           GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2871                           struct GNUNET_RPS_P2P_PullReplyMessage,
2872                           NULL),
2873     GNUNET_MQ_handler_end()
2874   };
2875   sub->hash = *hash;
2876   sub->cadet_port =
2877     GNUNET_CADET_open_port(cadet_handle,
2878                            &sub->hash,
2879                            &handle_inbound_channel,  /* Connect handler */
2880                            sub,  /* cls */
2881                            NULL,  /* WindowSize handler */
2882                            &cleanup_destroyed_channel,  /* Disconnect handler */
2883                            cadet_handlers);
2884   if (NULL == sub->cadet_port)
2885     {
2886       LOG(GNUNET_ERROR_TYPE_ERROR,
2887           "Cadet port `%s' is already in use.\n",
2888           GNUNET_APPLICATION_PORT_RPS);
2889       GNUNET_assert(0);
2890     }
2891
2892   /* Set up general data structure to keep track about peers */
2893   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
2894   if (GNUNET_OK !=
2895       GNUNET_CONFIGURATION_get_value_filename(cfg,
2896                                               "rps",
2897                                               "FILENAME_VALID_PEERS",
2898                                               &sub->filename_valid_peers))
2899     {
2900       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
2901                                 "rps",
2902                                 "FILENAME_VALID_PEERS");
2903     }
2904   if (0 != strncmp("DISABLE", sub->filename_valid_peers, 7))
2905     {
2906       char *tmp_filename_valid_peers;
2907       char str_hash[105];
2908
2909       GNUNET_snprintf(str_hash,
2910                       sizeof(str_hash),
2911                       GNUNET_h2s_full(hash));
2912       tmp_filename_valid_peers = sub->filename_valid_peers;
2913       GNUNET_asprintf(&sub->filename_valid_peers,
2914                       "%s%s",
2915                       tmp_filename_valid_peers,
2916                       str_hash);
2917       GNUNET_free(tmp_filename_valid_peers);
2918     }
2919   sub->peer_map = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
2920
2921   /* Set up the sampler */
2922   sub->sampler_size_est_min = sampler_size;
2923   sub->sampler_size_est_need = sampler_size;;
2924   LOG(GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2925   GNUNET_assert(0 != round_interval.rel_value_us);
2926   sub->round_interval = round_interval;
2927   sub->sampler = RPS_sampler_init(sampler_size,
2928                                   round_interval);
2929
2930   /* Logging of internals */
2931 #ifdef TO_FILE_FULL
2932   sub->file_name_view_log = store_prefix_file_name(&own_identity, "view");
2933 #endif /* TO_FILE_FULL */
2934 #ifdef TO_FILE
2935 #ifdef TO_FILE_FULL
2936   sub->file_name_observed_log = store_prefix_file_name(&own_identity,
2937                                                        "observed");
2938 #endif /* TO_FILE_FULL */
2939   sub->num_observed_peers = 0;
2940   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create(1,
2941                                                                     GNUNET_NO);
2942 #endif /* TO_FILE */
2943
2944   /* Set up data structures for gossip */
2945   sub->push_map = CustomPeerMap_create(4);
2946   sub->pull_map = CustomPeerMap_create(4);
2947   sub->view_size_est_min = sampler_size;;
2948   sub->view = View_create(sub->view_size_est_min);
2949   if (sub == msub)
2950     {
2951       GNUNET_STATISTICS_set(stats,
2952                             "view size aim",
2953                             sub->view_size_est_min,
2954                             GNUNET_NO);
2955     }
2956
2957   /* Start executing rounds */
2958   sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_round, sub);
2959
2960   return sub;
2961 }
2962
2963
2964 #ifdef TO_FILE
2965 /**
2966  * @brief Write all numbers in the given array into the given file
2967  *
2968  * Single numbers devided by a newline
2969  *
2970  * @param hist_array[] the array to dump
2971  * @param file_name file to dump into
2972  */
2973 static void
2974 write_histogram_to_file(const uint32_t hist_array[],
2975                         const char *file_name)
2976 {
2977   char collect_str[SIZE_DUMP_FILE + 1] = "";
2978   char *recv_str_iter;
2979   char *file_name_full;
2980
2981   recv_str_iter = collect_str;
2982   file_name_full = store_prefix_file_name(&own_identity,
2983                                           file_name);
2984   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2985     {
2986       char collect_str_tmp[8];
2987
2988       GNUNET_snprintf(collect_str_tmp,
2989                       sizeof(collect_str_tmp),
2990                       "%" PRIu32 "\n",
2991                       hist_array[i]);
2992       recv_str_iter = stpncpy(recv_str_iter,
2993                               collect_str_tmp,
2994                               6);
2995     }
2996   (void)stpcpy(recv_str_iter,
2997                "\n");
2998   LOG(GNUNET_ERROR_TYPE_DEBUG,
2999       "Writing push stats to disk\n");
3000   to_file_w_len(file_name_full,
3001                 SIZE_DUMP_FILE,
3002                 collect_str);
3003   GNUNET_free(file_name_full);
3004 }
3005 #endif /* TO_FILE */
3006
3007
3008 /**
3009  * @brief Destroy Sub.
3010  *
3011  * @param sub Sub to destroy
3012  */
3013 static void
3014 destroy_sub(struct Sub *sub)
3015 {
3016   GNUNET_assert(NULL != sub);
3017   GNUNET_assert(NULL != sub->do_round_task);
3018   GNUNET_SCHEDULER_cancel(sub->do_round_task);
3019   sub->do_round_task = NULL;
3020
3021   /* Disconnect from cadet */
3022   GNUNET_CADET_close_port(sub->cadet_port);
3023   sub->cadet_port = NULL;
3024
3025   /* Clean up data structures for peers */
3026   RPS_sampler_destroy(sub->sampler);
3027   sub->sampler = NULL;
3028   View_destroy(sub->view);
3029   sub->view = NULL;
3030   CustomPeerMap_destroy(sub->push_map);
3031   sub->push_map = NULL;
3032   CustomPeerMap_destroy(sub->pull_map);
3033   sub->pull_map = NULL;
3034   peers_terminate(sub);
3035
3036   /* Free leftover data structures */
3037 #ifdef TO_FILE_FULL
3038   GNUNET_free(sub->file_name_view_log);
3039   sub->file_name_view_log = NULL;
3040 #endif /* TO_FILE_FULL */
3041 #ifdef TO_FILE
3042 #ifdef TO_FILE_FULL
3043   GNUNET_free(sub->file_name_observed_log);
3044   sub->file_name_observed_log = NULL;
3045 #endif /* TO_FILE_FULL */
3046
3047   /* Write push frequencies to disk */
3048   write_histogram_to_file(sub->push_recv,
3049                           "push_recv");
3050
3051   /* Write push deltas to disk */
3052   write_histogram_to_file(sub->push_delta,
3053                           "push_delta");
3054
3055   /* Write pull delays to disk */
3056   write_histogram_to_file(sub->pull_delays,
3057                           "pull_delays");
3058
3059   GNUNET_CONTAINER_multipeermap_destroy(sub->observed_unique_peers);
3060   sub->observed_unique_peers = NULL;
3061 #endif /* TO_FILE */
3062
3063   GNUNET_free(sub);
3064 }
3065
3066
3067 /***********************************************************************
3068 * /Sub
3069 ***********************************************************************/
3070
3071
3072 /***********************************************************************
3073 * Core handlers
3074 ***********************************************************************/
3075
3076 /**
3077  * @brief Callback on initialisation of Core.
3078  *
3079  * @param cls - unused
3080  * @param my_identity - unused
3081  */
3082 void
3083 core_init(void *cls,
3084           const struct GNUNET_PeerIdentity *my_identity)
3085 {
3086   (void)cls;
3087   (void)my_identity;
3088
3089   map_single_hop = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
3090 }
3091
3092
3093 /**
3094  * @brief Callback for core.
3095  * Method called whenever a given peer connects.
3096  *
3097  * @param cls closure - unused
3098  * @param peer peer identity this notification is about
3099  * @return closure given to #core_disconnects as peer_cls
3100  */
3101 void *
3102 core_connects(void *cls,
3103               const struct GNUNET_PeerIdentity *peer,
3104               struct GNUNET_MQ_Handle *mq)
3105 {
3106   (void)cls;
3107   (void)mq;
3108
3109   GNUNET_assert(GNUNET_YES ==
3110                 GNUNET_CONTAINER_multipeermap_put(map_single_hop,
3111                                                   peer,
3112                                                   NULL,
3113                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3114   return NULL;
3115 }
3116
3117
3118 /**
3119  * @brief Callback for core.
3120  * Method called whenever a peer disconnects.
3121  *
3122  * @param cls closure - unused
3123  * @param peer peer identity this notification is about
3124  * @param peer_cls closure given in #core_connects - unused
3125  */
3126 void
3127 core_disconnects(void *cls,
3128                  const struct GNUNET_PeerIdentity *peer,
3129                  void *peer_cls)
3130 {
3131   (void)cls;
3132   (void)peer_cls;
3133
3134   GNUNET_CONTAINER_multipeermap_remove_all(map_single_hop, peer);
3135 }
3136
3137 /***********************************************************************
3138 * /Core handlers
3139 ***********************************************************************/
3140
3141
3142 /**
3143  * @brief Destroy the context for a (connected) client
3144  *
3145  * @param cli_ctx Context to destroy
3146  */
3147 static void
3148 destroy_cli_ctx(struct ClientContext *cli_ctx)
3149 {
3150   GNUNET_assert(NULL != cli_ctx);
3151   GNUNET_CONTAINER_DLL_remove(cli_ctx_head,
3152                               cli_ctx_tail,
3153                               cli_ctx);
3154   if (NULL != cli_ctx->sub)
3155     {
3156       destroy_sub(cli_ctx->sub);
3157       cli_ctx->sub = NULL;
3158     }
3159   GNUNET_free(cli_ctx);
3160 }
3161
3162
3163 /**
3164  * @brief Update sizes in sampler and view on estimate update from nse service
3165  *
3166  * @param sub Sub
3167  * @param logestimate the log(Base 2) value of the current network size estimate
3168  * @param std_dev standard deviation for the estimate
3169  */
3170 static void
3171 adapt_sizes(struct Sub *sub, double logestimate, double std_dev)
3172 {
3173   double estimate;
3174
3175   //double scale; // TODO this might go gloabal/config
3176
3177   LOG(GNUNET_ERROR_TYPE_DEBUG,
3178       "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3179       logestimate, std_dev, RPS_sampler_get_size(sub->sampler));
3180   //scale = .01;
3181   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
3182   // GNUNET_NSE_log_estimate_to_n (logestimate);
3183   estimate = pow(estimate, 1.0 / 3);
3184   // TODO add if std_dev is a number
3185   // estimate += (std_dev * scale);
3186   if (sub->view_size_est_min < ceil(estimate))
3187     {
3188       LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3189       sub->sampler_size_est_need = estimate;
3190       sub->view_size_est_need = estimate;
3191     }
3192   else
3193     {
3194       LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3195       //sub->sampler_size_est_need = sub->view_size_est_min;
3196       sub->view_size_est_need = sub->view_size_est_min;
3197     }
3198   if (sub == msub)
3199     {
3200       GNUNET_STATISTICS_set(stats,
3201                             "view size aim",
3202                             sub->view_size_est_need,
3203                             GNUNET_NO);
3204     }
3205
3206   /* If the NSE has changed adapt the lists accordingly */
3207   resize_wrapper(sub->sampler, sub->sampler_size_est_need);
3208   View_change_len(sub->view, sub->view_size_est_need);
3209 }
3210
3211
3212 /**
3213  * Function called by NSE.
3214  *
3215  * Updates sizes of sampler list and view and adapt those lists
3216  * accordingly.
3217  *
3218  * implements #GNUNET_NSE_Callback
3219  *
3220  * @param cls Closure - unused
3221  * @param timestamp time when the estimate was received from the server (or created by the server)
3222  * @param logestimate the log(Base 2) value of the current network size estimate
3223  * @param std_dev standard deviation for the estimate
3224  */
3225 static void
3226 nse_callback(void *cls,
3227              struct GNUNET_TIME_Absolute timestamp,
3228              double logestimate, double std_dev)
3229 {
3230   (void)cls;
3231   (void)timestamp;
3232   struct ClientContext *cli_ctx_iter;
3233
3234   adapt_sizes(msub, logestimate, std_dev);
3235   for (cli_ctx_iter = cli_ctx_head;
3236        NULL != cli_ctx_iter;
3237        cli_ctx_iter = cli_ctx_iter->next)
3238     {
3239       if (NULL != cli_ctx_iter->sub)
3240         {
3241           adapt_sizes(cli_ctx_iter->sub, logestimate, std_dev);
3242         }
3243     }
3244 }
3245
3246
3247 /**
3248  * @brief This function is called, when the client seeds peers.
3249  * It verifies that @a msg is well-formed.
3250  *
3251  * @param cls the closure (#ClientContext)
3252  * @param msg the message
3253  * @return #GNUNET_OK if @a msg is well-formed
3254  *         #GNUNET_SYSERR otherwise
3255  */
3256 static int
3257 check_client_seed(void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3258 {
3259   struct ClientContext *cli_ctx = cls;
3260   uint16_t msize = ntohs(msg->header.size);
3261   uint32_t num_peers = ntohl(msg->num_peers);
3262
3263   msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
3264   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3265       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3266     {
3267       LOG(GNUNET_ERROR_TYPE_ERROR,
3268           "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3269           ntohl(msg->num_peers),
3270           (msize / sizeof(struct GNUNET_PeerIdentity)));
3271       GNUNET_break(0);
3272       GNUNET_SERVICE_client_drop(cli_ctx->client);
3273       return GNUNET_SYSERR;
3274     }
3275   return GNUNET_OK;
3276 }
3277
3278
3279 /**
3280  * Handle seed from the client.
3281  *
3282  * @param cls closure
3283  * @param message the actual message
3284  */
3285 static void
3286 handle_client_seed(void *cls,
3287                    const struct GNUNET_RPS_CS_SeedMessage *msg)
3288 {
3289   struct ClientContext *cli_ctx = cls;
3290   struct GNUNET_PeerIdentity *peers;
3291   uint32_t num_peers;
3292   uint32_t i;
3293
3294   num_peers = ntohl(msg->num_peers);
3295   peers = (struct GNUNET_PeerIdentity *)&msg[1];
3296
3297   LOG(GNUNET_ERROR_TYPE_DEBUG,
3298       "Client seeded peers:\n");
3299   print_peer_list(peers, num_peers);
3300
3301   for (i = 0; i < num_peers; i++)
3302     {
3303       LOG(GNUNET_ERROR_TYPE_DEBUG,
3304           "Updating samplers with seed %" PRIu32 ": %s\n",
3305           i,
3306           GNUNET_i2s(&peers[i]));
3307
3308       if (NULL != msub)
3309         got_peer(msub, &peers[i]);                /* Condition needed? */
3310       if (NULL != cli_ctx->sub)
3311         got_peer(cli_ctx->sub, &peers[i]);
3312     }
3313   GNUNET_SERVICE_client_continue(cli_ctx->client);
3314 }
3315
3316
3317 /**
3318  * Handle RPS request from the client.
3319  *
3320  * @param cls Client context
3321  * @param message Message containing the numer of updates the client wants to
3322  * receive
3323  */
3324 static void
3325 handle_client_view_request(void *cls,
3326                            const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3327 {
3328   struct ClientContext *cli_ctx = cls;
3329   uint64_t num_updates;
3330
3331   num_updates = ntohl(msg->num_updates);
3332
3333   LOG(GNUNET_ERROR_TYPE_DEBUG,
3334       "Client requested %" PRIu64 " updates of view.\n",
3335       num_updates);
3336
3337   GNUNET_assert(NULL != cli_ctx);
3338   cli_ctx->view_updates_left = num_updates;
3339   send_view(cli_ctx, NULL, 0);
3340   GNUNET_SERVICE_client_continue(cli_ctx->client);
3341 }
3342
3343
3344 /**
3345  * @brief Handle the cancellation of the view updates.
3346  *
3347  * @param cls The client context
3348  * @param msg Unused
3349  */
3350 static void
3351 handle_client_view_cancel(void *cls,
3352                           const struct GNUNET_MessageHeader *msg)
3353 {
3354   struct ClientContext *cli_ctx = cls;
3355
3356   (void)msg;
3357
3358   LOG(GNUNET_ERROR_TYPE_DEBUG,
3359       "Client does not want to receive updates of view any more.\n");
3360
3361   GNUNET_assert(NULL != cli_ctx);
3362   cli_ctx->view_updates_left = 0;
3363   GNUNET_SERVICE_client_continue(cli_ctx->client);
3364   if (GNUNET_YES == cli_ctx->stream_update)
3365     {
3366       destroy_cli_ctx(cli_ctx);
3367     }
3368 }
3369
3370
3371 /**
3372  * Handle RPS request for biased stream from the client.
3373  *
3374  * @param cls Client context
3375  * @param message unused
3376  */
3377 static void
3378 handle_client_stream_request(void *cls,
3379                              const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3380 {
3381   struct ClientContext *cli_ctx = cls;
3382
3383   (void)msg;
3384
3385   LOG(GNUNET_ERROR_TYPE_DEBUG,
3386       "Client requested peers from biased stream.\n");
3387   cli_ctx->stream_update = GNUNET_YES;
3388
3389   GNUNET_assert(NULL != cli_ctx);
3390   GNUNET_SERVICE_client_continue(cli_ctx->client);
3391 }
3392
3393
3394 /**
3395  * @brief Handles the cancellation of the stream of biased peer ids
3396  *
3397  * @param cls The client context
3398  * @param msg unused
3399  */
3400 static void
3401 handle_client_stream_cancel(void *cls,
3402                             const struct GNUNET_MessageHeader *msg)
3403 {
3404   struct ClientContext *cli_ctx = cls;
3405
3406   (void)msg;
3407
3408   LOG(GNUNET_ERROR_TYPE_DEBUG,
3409       "Client canceled receiving peers from biased stream.\n");
3410   cli_ctx->stream_update = GNUNET_NO;
3411
3412   GNUNET_assert(NULL != cli_ctx);
3413   GNUNET_SERVICE_client_continue(cli_ctx->client);
3414 }
3415
3416
3417 /**
3418  * @brief Create and start a Sub.
3419  *
3420  * @param cls Closure - unused
3421  * @param msg Message containing the necessary information
3422  */
3423 static void
3424 handle_client_start_sub(void *cls,
3425                         const struct GNUNET_RPS_CS_SubStartMessage *msg)
3426 {
3427   struct ClientContext *cli_ctx = cls;
3428
3429   LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3430   if (NULL != cli_ctx->sub &&
3431       0 != memcmp(&cli_ctx->sub->hash,
3432                   &msg->hash,
3433                   sizeof(struct GNUNET_HashCode)))
3434     {
3435       LOG(GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3436       destroy_sub(cli_ctx->sub);
3437       cli_ctx->sub = NULL;
3438     }
3439   cli_ctx->sub = new_sub(&msg->hash,
3440                          msub->sampler_size_est_min, // TODO make api input?
3441                          GNUNET_TIME_relative_ntoh(msg->round_interval));
3442   GNUNET_SERVICE_client_continue(cli_ctx->client);
3443 }
3444
3445
3446 /**
3447  * @brief Destroy the Sub
3448  *
3449  * @param cls Closure - unused
3450  * @param msg Message containing the hash that identifies the Sub
3451  */
3452 static void
3453 handle_client_stop_sub(void *cls,
3454                        const struct GNUNET_RPS_CS_SubStopMessage *msg)
3455 {
3456   struct ClientContext *cli_ctx = cls;
3457
3458   GNUNET_assert(NULL != cli_ctx->sub);
3459   if (0 != memcmp(&cli_ctx->sub->hash, &msg->hash, sizeof(struct GNUNET_HashCode)))
3460     {
3461       LOG(GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3462     }
3463   destroy_sub(cli_ctx->sub);
3464   cli_ctx->sub = NULL;
3465   GNUNET_SERVICE_client_continue(cli_ctx->client);
3466 }
3467
3468
3469 /**
3470  * Handle a CHECK_LIVE message from another peer.
3471  *
3472  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3473  * the channel is blocked for all other communication.
3474  *
3475  * @param cls Closure - Context of channel
3476  * @param msg Message - unused
3477  */
3478 static void
3479 handle_peer_check(void *cls,
3480                   const struct GNUNET_MessageHeader *msg)
3481 {
3482   const struct ChannelCtx *channel_ctx = cls;
3483   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3484
3485   (void)msg;
3486
3487   LOG(GNUNET_ERROR_TYPE_DEBUG,
3488       "Received CHECK_LIVE (%s)\n", GNUNET_i2s(peer));
3489   if (channel_ctx->peer_ctx->sub == msub)
3490     {
3491       GNUNET_STATISTICS_update(stats,
3492                                "# pending online checks",
3493                                -1,
3494                                GNUNET_NO);
3495     }
3496
3497   GNUNET_CADET_receive_done(channel_ctx->channel);
3498 }
3499
3500
3501 /**
3502  * Handle a PUSH message from another peer.
3503  *
3504  * Check the proof of work and store the PeerID
3505  * in the temporary list for pushed PeerIDs.
3506  *
3507  * @param cls Closure - Context of channel
3508  * @param msg Message - unused
3509  */
3510 static void
3511 handle_peer_push(void *cls,
3512                  const struct GNUNET_MessageHeader *msg)
3513 {
3514   const struct ChannelCtx *channel_ctx = cls;
3515   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3516
3517   (void)msg;
3518
3519   // (check the proof of work (?))
3520
3521   LOG(GNUNET_ERROR_TYPE_DEBUG,
3522       "Received PUSH (%s)\n",
3523       GNUNET_i2s(peer));
3524   if (channel_ctx->peer_ctx->sub == msub)
3525     {
3526       GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3527       if (NULL != map_single_hop &&
3528           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3529                                                               peer))
3530         {
3531           GNUNET_STATISTICS_update(stats,
3532                                    "# push message received (multi-hop peer)",
3533                                    1,
3534                                    GNUNET_NO);
3535         }
3536     }
3537
3538   #if ENABLE_MALICIOUS
3539   struct AttackedPeer *tmp_att_peer;
3540
3541   if ((1 == mal_type) ||
3542       (3 == mal_type))
3543     { /* Try to maximise representation */
3544       tmp_att_peer = GNUNET_new(struct AttackedPeer);
3545       tmp_att_peer->peer_id = *peer;
3546       if (NULL == att_peer_set)
3547         att_peer_set = GNUNET_CONTAINER_multipeermap_create(1, GNUNET_NO);
3548       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(att_peer_set,
3549                                                               peer))
3550         {
3551           GNUNET_CONTAINER_DLL_insert(att_peers_head,
3552                                       att_peers_tail,
3553                                       tmp_att_peer);
3554           add_peer_array_to_set(peer, 1, att_peer_set);
3555         }
3556       else
3557         {
3558           GNUNET_free(tmp_att_peer);
3559         }
3560     }
3561
3562
3563   else if (2 == mal_type)
3564     {
3565       /* We attack one single well-known peer - simply ignore */
3566     }
3567   #endif /* ENABLE_MALICIOUS */
3568
3569   /* Add the sending peer to the push_map */
3570   CustomPeerMap_put(channel_ctx->peer_ctx->sub->push_map, peer);
3571
3572   GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3573                                    &channel_ctx->peer_ctx->peer_id));
3574   GNUNET_CADET_receive_done(channel_ctx->channel);
3575 }
3576
3577
3578 /**
3579  * Handle PULL REQUEST request message from another peer.
3580  *
3581  * Reply with the view of PeerIDs.
3582  *
3583  * @param cls Closure - Context of channel
3584  * @param msg Message - unused
3585  */
3586 static void
3587 handle_peer_pull_request(void *cls,
3588                          const struct GNUNET_MessageHeader *msg)
3589 {
3590   const struct ChannelCtx *channel_ctx = cls;
3591   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3592   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3593   const struct GNUNET_PeerIdentity *view_array;
3594
3595   (void)msg;
3596
3597   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s(peer));
3598   if (peer_ctx->sub == msub)
3599     {
3600       GNUNET_STATISTICS_update(stats,
3601                                "# pull request message received",
3602                                1,
3603                                GNUNET_NO);
3604       if (NULL != map_single_hop &&
3605           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3606                                                               &peer_ctx->peer_id))
3607         {
3608           GNUNET_STATISTICS_update(stats,
3609                                    "# pull request message received (multi-hop peer)",
3610                                    1,
3611                                    GNUNET_NO);
3612         }
3613     }
3614
3615   #if ENABLE_MALICIOUS
3616   if (1 == mal_type
3617       || 3 == mal_type)
3618     { /* Try to maximise representation */
3619       send_pull_reply(peer_ctx, mal_peers, num_mal_peers);
3620     }
3621
3622   else if (2 == mal_type)
3623     { /* Try to partition network */
3624       if (0 == GNUNET_memcmp(&attacked_peer, peer))
3625         {
3626           send_pull_reply(peer_ctx, mal_peers, num_mal_peers);
3627         }
3628     }
3629   #endif /* ENABLE_MALICIOUS */
3630
3631   GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3632                                    &channel_ctx->peer_ctx->peer_id));
3633   GNUNET_CADET_receive_done(channel_ctx->channel);
3634   view_array = View_get_as_array(channel_ctx->peer_ctx->sub->view);
3635   send_pull_reply(peer_ctx,
3636                   view_array,
3637                   View_size(channel_ctx->peer_ctx->sub->view));
3638 }
3639
3640
3641 /**
3642  * Check whether we sent a corresponding request and
3643  * whether this reply is the first one.
3644  *
3645  * @param cls Closure - Context of channel
3646  * @param msg Message containing the replied peers
3647  */
3648 static int
3649 check_peer_pull_reply(void *cls,
3650                       const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3651 {
3652   struct ChannelCtx *channel_ctx = cls;
3653   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3654
3655   if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs(msg->header.size))
3656     {
3657       GNUNET_break_op(0);
3658       return GNUNET_SYSERR;
3659     }
3660
3661   if ((ntohs(msg->header.size) - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
3662       sizeof(struct GNUNET_PeerIdentity) != ntohl(msg->num_peers))
3663     {
3664       LOG(GNUNET_ERROR_TYPE_ERROR,
3665           "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3666           ntohl(msg->num_peers),
3667           (ntohs(msg->header.size) - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
3668           sizeof(struct GNUNET_PeerIdentity));
3669       GNUNET_break_op(0);
3670       return GNUNET_SYSERR;
3671     }
3672
3673   if (GNUNET_YES != check_peer_flag(sender_ctx->sub->peer_map,
3674                                     &sender_ctx->peer_id,
3675                                     Peers_PULL_REPLY_PENDING))
3676     {
3677       LOG(GNUNET_ERROR_TYPE_WARNING,
3678           "Received a pull reply from a peer (%s) we didn't request one from!\n",
3679           GNUNET_i2s(&sender_ctx->peer_id));
3680       if (sender_ctx->sub == msub)
3681         {
3682           GNUNET_STATISTICS_update(stats,
3683                                    "# unrequested pull replies",
3684                                    1,
3685                                    GNUNET_NO);
3686         }
3687     }
3688   return GNUNET_OK;
3689 }
3690
3691
3692 /**
3693  * Handle PULL REPLY message from another peer.
3694  *
3695  * @param cls Closure
3696  * @param msg The message header
3697  */
3698 static void
3699 handle_peer_pull_reply(void *cls,
3700                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3701 {
3702   const struct ChannelCtx *channel_ctx = cls;
3703   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3704   const struct GNUNET_PeerIdentity *peers;
3705   struct Sub *sub = channel_ctx->peer_ctx->sub;
3706   uint32_t i;
3707
3708 #if ENABLE_MALICIOUS
3709   struct AttackedPeer *tmp_att_peer;
3710 #endif /* ENABLE_MALICIOUS */
3711
3712   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3713   LOG(GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s(sender));
3714   if (channel_ctx->peer_ctx->sub == msub)
3715     {
3716       GNUNET_STATISTICS_update(stats,
3717                                "# pull reply messages received",
3718                                1,
3719                                GNUNET_NO);
3720       if (NULL != map_single_hop &&
3721           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3722                                                               &channel_ctx->peer_ctx->peer_id))
3723         {
3724           GNUNET_STATISTICS_update(stats,
3725                                    "# pull reply messages received (multi-hop peer)",
3726                                    1,
3727                                    GNUNET_NO);
3728         }
3729     }
3730
3731   #if ENABLE_MALICIOUS
3732   // We shouldn't even receive pull replies as we're not sending
3733   if (2 == mal_type)
3734     {
3735     }
3736   #endif /* ENABLE_MALICIOUS */
3737
3738   /* Do actual logic */
3739   peers = (const struct GNUNET_PeerIdentity *)&msg[1];
3740
3741   LOG(GNUNET_ERROR_TYPE_DEBUG,
3742       "PULL REPLY received, got following %u peers:\n",
3743       ntohl(msg->num_peers));
3744
3745   for (i = 0; i < ntohl(msg->num_peers); i++)
3746     {
3747       LOG(GNUNET_ERROR_TYPE_DEBUG,
3748           "%u. %s\n",
3749           i,
3750           GNUNET_i2s(&peers[i]));
3751
3752     #if ENABLE_MALICIOUS
3753       if ((NULL != att_peer_set) &&
3754           (1 == mal_type || 3 == mal_type))
3755         { /* Add attacked peer to local list */
3756           // TODO check if we sent a request and this was the first reply
3757           if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(att_peer_set,
3758                                                                   &peers[i])
3759               && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(mal_peer_set,
3760                                                                      &peers[i]))
3761             {
3762               tmp_att_peer = GNUNET_new(struct AttackedPeer);
3763               tmp_att_peer->peer_id = peers[i];
3764               GNUNET_CONTAINER_DLL_insert(att_peers_head,
3765                                           att_peers_tail,
3766                                           tmp_att_peer);
3767               add_peer_array_to_set(&peers[i], 1, att_peer_set);
3768             }
3769           continue;
3770         }
3771     #endif /* ENABLE_MALICIOUS */
3772       /* Make sure we 'know' about this peer */
3773       (void)insert_peer(channel_ctx->peer_ctx->sub,
3774                         &peers[i]);
3775
3776       if (GNUNET_YES == check_peer_valid(channel_ctx->peer_ctx->sub->valid_peers,
3777                                          &peers[i]))
3778         {
3779           CustomPeerMap_put(channel_ctx->peer_ctx->sub->pull_map,
3780                             &peers[i]);
3781         }
3782       else
3783         {
3784           schedule_operation(channel_ctx->peer_ctx,
3785                              insert_in_pull_map,
3786                              channel_ctx->peer_ctx->sub); /* cls */
3787           (void)issue_peer_online_check(channel_ctx->peer_ctx->sub,
3788                                         &peers[i]);
3789         }
3790     }
3791
3792   UNSET_PEER_FLAG(get_peer_ctx(channel_ctx->peer_ctx->sub->peer_map,
3793                                sender),
3794                   Peers_PULL_REPLY_PENDING);
3795   clean_peer(channel_ctx->peer_ctx->sub,
3796              sender);
3797
3798   GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3799                                    sender));
3800   GNUNET_CADET_receive_done(channel_ctx->channel);
3801 }
3802
3803
3804 /**
3805  * Compute a random delay.
3806  * A uniformly distributed value between mean + spread and mean - spread.
3807  *
3808  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3809  * It would return a random value between 2 and 6 min.
3810  *
3811  * @param mean the mean time until the next round
3812  * @param spread the inverse amount of deviation from the mean
3813  */
3814 static struct GNUNET_TIME_Relative
3815 compute_rand_delay(struct GNUNET_TIME_Relative mean,
3816                    unsigned int spread)
3817 {
3818   struct GNUNET_TIME_Relative half_interval;
3819   struct GNUNET_TIME_Relative ret;
3820   unsigned int rand_delay;
3821   unsigned int max_rand_delay;
3822
3823   if (0 == spread)
3824     {
3825       LOG(GNUNET_ERROR_TYPE_WARNING,
3826           "Not accepting spread of 0\n");
3827       GNUNET_break(0);
3828       GNUNET_assert(0);
3829     }
3830   GNUNET_assert(0 != mean.rel_value_us);
3831
3832   /* Compute random time value between spread * mean and spread * mean */
3833   half_interval = GNUNET_TIME_relative_divide(mean, spread);
3834
3835   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2 / spread);
3836   /**
3837    * Compute random value between (0 and 1) * round_interval
3838    * via multiplying round_interval with a 'fraction' (0 to value)/value
3839    */
3840   rand_delay = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3841   ret = GNUNET_TIME_relative_saturating_multiply(mean, rand_delay);
3842   ret = GNUNET_TIME_relative_divide(ret, max_rand_delay);
3843   ret = GNUNET_TIME_relative_add(ret, half_interval);
3844
3845   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3846     LOG(GNUNET_ERROR_TYPE_WARNING,
3847         "Returning FOREVER_REL\n");
3848
3849   return ret;
3850 }
3851
3852
3853 /**
3854  * Send single pull request
3855  *
3856  * @param peer_ctx Context to the peer to send request to
3857  */
3858 static void
3859 send_pull_request(struct PeerContext *peer_ctx)
3860 {
3861   struct GNUNET_MQ_Envelope *ev;
3862
3863   GNUNET_assert(GNUNET_NO == check_peer_flag(peer_ctx->sub->peer_map,
3864                                              &peer_ctx->peer_id,
3865                                              Peers_PULL_REPLY_PENDING));
3866   SET_PEER_FLAG(peer_ctx,
3867                 Peers_PULL_REPLY_PENDING);
3868   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3869
3870   LOG(GNUNET_ERROR_TYPE_DEBUG,
3871       "Going to send PULL REQUEST to peer %s.\n",
3872       GNUNET_i2s(&peer_ctx->peer_id));
3873
3874   ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3875   send_message(peer_ctx,
3876                ev,
3877                "PULL REQUEST");
3878   if (peer_ctx->sub)
3879     {
3880       GNUNET_STATISTICS_update(stats,
3881                                "# pull request send issued",
3882                                1,
3883                                GNUNET_NO);
3884       if (NULL != map_single_hop &&
3885           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3886                                                               &peer_ctx->peer_id))
3887         {
3888           GNUNET_STATISTICS_update(stats,
3889                                    "# pull request send issued (multi-hop peer)",
3890                                    1,
3891                                    GNUNET_NO);
3892         }
3893     }
3894 }
3895
3896
3897 /**
3898  * Send single push
3899  *
3900  * @param peer_ctx Context of peer to send push to
3901  */
3902 static void
3903 send_push(struct PeerContext *peer_ctx)
3904 {
3905   struct GNUNET_MQ_Envelope *ev;
3906
3907   LOG(GNUNET_ERROR_TYPE_DEBUG,
3908       "Going to send PUSH to peer %s.\n",
3909       GNUNET_i2s(&peer_ctx->peer_id));
3910
3911   ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3912   send_message(peer_ctx, ev, "PUSH");
3913   if (peer_ctx->sub)
3914     {
3915       GNUNET_STATISTICS_update(stats,
3916                                "# push send issued",
3917                                1,
3918                                GNUNET_NO);
3919       if (NULL != map_single_hop &&
3920           GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3921                                                               &peer_ctx->peer_id))
3922         {
3923           GNUNET_STATISTICS_update(stats,
3924                                    "# push send issued (multi-hop peer)",
3925                                    1,
3926                                    GNUNET_NO);
3927         }
3928     }
3929 }
3930
3931
3932 #if ENABLE_MALICIOUS
3933
3934
3935 /**
3936  * @brief This function is called, when the client tells us to act malicious.
3937  * It verifies that @a msg is well-formed.
3938  *
3939  * @param cls the closure (#ClientContext)
3940  * @param msg the message
3941  * @return #GNUNET_OK if @a msg is well-formed
3942  */
3943 static int
3944 check_client_act_malicious(void *cls,
3945                            const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3946 {
3947   struct ClientContext *cli_ctx = cls;
3948   uint16_t msize = ntohs(msg->header.size);
3949   uint32_t num_peers = ntohl(msg->num_peers);
3950
3951   msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
3952   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3953       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3954     {
3955       LOG(GNUNET_ERROR_TYPE_ERROR,
3956           "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3957           ntohl(msg->num_peers),
3958           (msize / sizeof(struct GNUNET_PeerIdentity)));
3959       GNUNET_break(0);
3960       GNUNET_SERVICE_client_drop(cli_ctx->client);
3961       return GNUNET_SYSERR;
3962     }
3963   return GNUNET_OK;
3964 }
3965
3966 /**
3967  * Turn RPS service to act malicious.
3968  *
3969  * @param cls Closure
3970  * @param client The client that sent the message
3971  * @param msg The message header
3972  */
3973 static void
3974 handle_client_act_malicious(void *cls,
3975                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3976 {
3977   struct ClientContext *cli_ctx = cls;
3978   struct GNUNET_PeerIdentity *peers;
3979   uint32_t num_mal_peers_sent;
3980   uint32_t num_mal_peers_old;
3981   struct Sub *sub = cli_ctx->sub;
3982
3983   if (NULL == sub)
3984     sub = msub;
3985   /* Do actual logic */
3986   peers = (struct GNUNET_PeerIdentity *)&msg[1];
3987   mal_type = ntohl(msg->type);
3988   if (NULL == mal_peer_set)
3989     mal_peer_set = GNUNET_CONTAINER_multipeermap_create(1, GNUNET_NO);
3990
3991   LOG(GNUNET_ERROR_TYPE_DEBUG,
3992       "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3993       mal_type,
3994       ntohl(msg->num_peers));
3995
3996   if (1 == mal_type)
3997     { /* Try to maximise representation */
3998       /* Add other malicious peers to those we already know */
3999
4000       num_mal_peers_sent = ntohl(msg->num_peers);
4001       num_mal_peers_old = num_mal_peers;
4002       GNUNET_array_grow(mal_peers,
4003                         num_mal_peers,
4004                         num_mal_peers + num_mal_peers_sent);
4005       GNUNET_memcpy(&mal_peers[num_mal_peers_old],
4006                     peers,
4007                     num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4008
4009       /* Add all mal peers to mal_peer_set */
4010       add_peer_array_to_set(&mal_peers[num_mal_peers_old],
4011                             num_mal_peers_sent,
4012                             mal_peer_set);
4013
4014       /* Substitute do_round () with do_mal_round () */
4015       GNUNET_assert(NULL != sub->do_round_task);
4016       GNUNET_SCHEDULER_cancel(sub->do_round_task);
4017       sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_mal_round, sub);
4018     }
4019
4020   else if ((2 == mal_type) ||
4021            (3 == mal_type))
4022     { /* Try to partition the network */
4023       /* Add other malicious peers to those we already know */
4024
4025       num_mal_peers_sent = ntohl(msg->num_peers) - 1;
4026       num_mal_peers_old = num_mal_peers;
4027       GNUNET_assert(GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4028       GNUNET_array_grow(mal_peers,
4029                         num_mal_peers,
4030                         num_mal_peers + num_mal_peers_sent);
4031       if (NULL != mal_peers &&
4032           0 != num_mal_peers)
4033         {
4034           GNUNET_memcpy(&mal_peers[num_mal_peers_old],
4035                         peers,
4036                         num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4037
4038           /* Add all mal peers to mal_peer_set */
4039           add_peer_array_to_set(&mal_peers[num_mal_peers_old],
4040                                 num_mal_peers_sent,
4041                                 mal_peer_set);
4042         }
4043
4044       /* Store the one attacked peer */
4045       GNUNET_memcpy(&attacked_peer,
4046                     &msg->attacked_peer,
4047                     sizeof(struct GNUNET_PeerIdentity));
4048       /* Set the flag of the attacked peer to valid to avoid problems */
4049       if (GNUNET_NO == check_peer_known(sub->peer_map, &attacked_peer))
4050         {
4051           (void)issue_peer_online_check(sub, &attacked_peer);
4052         }
4053
4054       LOG(GNUNET_ERROR_TYPE_DEBUG,
4055           "Attacked peer is %s\n",
4056           GNUNET_i2s(&attacked_peer));
4057
4058       /* Substitute do_round () with do_mal_round () */
4059       if (NULL != sub->do_round_task)
4060         {
4061           /* Probably in shutdown */
4062           GNUNET_SCHEDULER_cancel(sub->do_round_task);
4063           sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_mal_round, sub);
4064         }
4065     }
4066   else if (0 == mal_type)
4067     { /* Stop acting malicious */
4068       GNUNET_array_grow(mal_peers, num_mal_peers, 0);
4069
4070       /* Substitute do_mal_round () with do_round () */
4071       GNUNET_SCHEDULER_cancel(sub->do_round_task);
4072       sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_round, sub);
4073     }
4074   else
4075     {
4076       GNUNET_break(0);
4077       GNUNET_SERVICE_client_continue(cli_ctx->client);
4078     }
4079   GNUNET_SERVICE_client_continue(cli_ctx->client);
4080 }
4081
4082
4083 /**
4084  * Send out PUSHes and PULLs maliciously.
4085  *
4086  * This is executed regylary.
4087  *
4088  * @param cls Closure - Sub
4089  */
4090 static void
4091 do_mal_round(void *cls)
4092 {
4093   uint32_t num_pushes;
4094   uint32_t i;
4095   struct GNUNET_TIME_Relative time_next_round;
4096   struct AttackedPeer *tmp_att_peer;
4097   struct Sub *sub = cls;
4098
4099   LOG(GNUNET_ERROR_TYPE_DEBUG,
4100       "Going to execute next round maliciously type %" PRIu32 ".\n",
4101       mal_type);
4102   sub->do_round_task = NULL;
4103   GNUNET_assert(mal_type <= 3);
4104   /* Do malicious actions */
4105   if (1 == mal_type)
4106     { /* Try to maximise representation */
4107       /* The maximum of pushes we're going to send this round */
4108       num_pushes = GNUNET_MIN(GNUNET_MIN(push_limit,
4109                                          num_attacked_peers),
4110                               GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4111
4112       LOG(GNUNET_ERROR_TYPE_DEBUG,
4113           "Going to send %" PRIu32 " pushes\n",
4114           num_pushes);
4115
4116       /* Send PUSHes to attacked peers */
4117       for (i = 0; i < num_pushes; i++)
4118         {
4119           if (att_peers_tail == att_peer_index)
4120             att_peer_index = att_peers_head;
4121           else
4122             att_peer_index = att_peer_index->next;
4123
4124           send_push(get_peer_ctx(sub->peer_map, &att_peer_index->peer_id));
4125         }
4126
4127       /* Send PULLs to some peers to learn about additional peers to attack */
4128       tmp_att_peer = att_peer_index;
4129       for (i = 0; i < num_pushes * alpha; i++)
4130         {
4131           if (att_peers_tail == tmp_att_peer)
4132             tmp_att_peer = att_peers_head;
4133           else
4134             att_peer_index = tmp_att_peer->next;
4135
4136           send_pull_request(get_peer_ctx(sub->peer_map, &tmp_att_peer->peer_id));
4137         }
4138     }
4139
4140
4141   else if (2 == mal_type)
4142     { /**
4143        * Try to partition the network
4144        * Send as many pushes to the attacked peer as possible
4145        * That is one push per round as it will ignore more.
4146        */
4147       (void)issue_peer_online_check(sub, &attacked_peer);
4148       if (GNUNET_YES == check_peer_flag(sub->peer_map,
4149                                         &attacked_peer,
4150                                         Peers_ONLINE))
4151         send_push(get_peer_ctx(sub->peer_map, &attacked_peer));
4152     }
4153
4154
4155   if (3 == mal_type)
4156     { /* Combined attack */
4157       /* Send PUSH to attacked peers */
4158       if (GNUNET_YES == check_peer_known(sub->peer_map, &attacked_peer))
4159         {
4160           (void)issue_peer_online_check(sub, &attacked_peer);
4161           if (GNUNET_YES == check_peer_flag(sub->peer_map,
4162                                             &attacked_peer,
4163                                             Peers_ONLINE))
4164             {
4165               LOG(GNUNET_ERROR_TYPE_DEBUG,
4166                   "Goding to send push to attacked peer (%s)\n",
4167                   GNUNET_i2s(&attacked_peer));
4168               send_push(get_peer_ctx(sub->peer_map, &attacked_peer));
4169             }
4170         }
4171       (void)issue_peer_online_check(sub, &attacked_peer);
4172
4173       /* The maximum of pushes we're going to send this round */
4174       num_pushes = GNUNET_MIN(GNUNET_MIN(push_limit - 1,
4175                                          num_attacked_peers),
4176                               GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4177
4178       LOG(GNUNET_ERROR_TYPE_DEBUG,
4179           "Going to send %" PRIu32 " pushes\n",
4180           num_pushes);
4181
4182       for (i = 0; i < num_pushes; i++)
4183         {
4184           if (att_peers_tail == att_peer_index)
4185             att_peer_index = att_peers_head;
4186           else
4187             att_peer_index = att_peer_index->next;
4188
4189           send_push(get_peer_ctx(sub->peer_map, &att_peer_index->peer_id));
4190         }
4191
4192       /* Send PULLs to some peers to learn about additional peers to attack */
4193       tmp_att_peer = att_peer_index;
4194       for (i = 0; i < num_pushes * alpha; i++)
4195         {
4196           if (att_peers_tail == tmp_att_peer)
4197             tmp_att_peer = att_peers_head;
4198           else
4199             att_peer_index = tmp_att_peer->next;
4200
4201           send_pull_request(get_peer_ctx(sub->peer_map, &tmp_att_peer->peer_id));
4202         }
4203     }
4204
4205   /* Schedule next round */
4206   time_next_round = compute_rand_delay(sub->round_interval, 2);
4207
4208   GNUNET_assert(NULL == sub->do_round_task);
4209   sub->do_round_task = GNUNET_SCHEDULER_add_delayed(time_next_round,
4210                                                     &do_mal_round, sub);
4211   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4212 }
4213 #endif /* ENABLE_MALICIOUS */
4214
4215
4216 /**
4217  * Send out PUSHes and PULLs, possibly update #view, samplers.
4218  *
4219  * This is executed regylary.
4220  *
4221  * @param cls Closure - Sub
4222  */
4223 static void
4224 do_round(void *cls)
4225 {
4226   unsigned int i;
4227   const struct GNUNET_PeerIdentity *view_array;
4228   unsigned int *permut;
4229   unsigned int a_peers; /* Number of peers we send pushes to */
4230   unsigned int b_peers; /* Number of peers we send pull requests to */
4231   uint32_t first_border;
4232   uint32_t second_border;
4233   struct GNUNET_PeerIdentity peer;
4234   struct GNUNET_PeerIdentity *update_peer;
4235   struct Sub *sub = cls;
4236
4237   sub->num_rounds++;
4238   LOG(GNUNET_ERROR_TYPE_DEBUG,
4239       "Going to execute next round.\n");
4240   if (sub == msub)
4241     {
4242       GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
4243     }
4244   sub->do_round_task = NULL;
4245 #ifdef TO_FILE_FULL
4246   to_file(sub->file_name_view_log,
4247           "___ new round ___");
4248 #endif /* TO_FILE_FULL */
4249   view_array = View_get_as_array(sub->view);
4250   for (i = 0; i < View_size(sub->view); i++)
4251     {
4252       LOG(GNUNET_ERROR_TYPE_DEBUG,
4253           "\t%s\n", GNUNET_i2s(&view_array[i]));
4254 #ifdef TO_FILE_FULL
4255       to_file(sub->file_name_view_log,
4256               "=%s\t(do round)",
4257               GNUNET_i2s_full(&view_array[i]));
4258 #endif /* TO_FILE_FULL */
4259     }
4260
4261
4262   /* Send pushes and pull requests */
4263   if (0 < View_size(sub->view))
4264     {
4265       permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4266                                             View_size(sub->view));
4267
4268       /* Send PUSHes */
4269       a_peers = ceil(alpha * View_size(sub->view));
4270
4271       LOG(GNUNET_ERROR_TYPE_DEBUG,
4272           "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4273           a_peers, alpha, View_size(sub->view));
4274       for (i = 0; i < a_peers; i++)
4275         {
4276           peer = view_array[permut[i]];
4277           // FIXME if this fails schedule/loop this for later
4278           send_push(get_peer_ctx(sub->peer_map, &peer));
4279         }
4280
4281       /* Send PULL requests */
4282       b_peers = ceil(beta * View_size(sub->view));
4283       first_border = a_peers;
4284       second_border = a_peers + b_peers;
4285       if (second_border > View_size(sub->view))
4286         {
4287           first_border = View_size(sub->view) - b_peers;
4288           second_border = View_size(sub->view);
4289         }
4290       LOG(GNUNET_ERROR_TYPE_DEBUG,
4291           "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4292           b_peers, beta, View_size(sub->view));
4293       for (i = first_border; i < second_border; i++)
4294         {
4295           peer = view_array[permut[i]];
4296           if (GNUNET_NO == check_peer_flag(sub->peer_map,
4297                                            &peer,
4298                                            Peers_PULL_REPLY_PENDING))
4299             { // FIXME if this fails schedule/loop this for later
4300               send_pull_request(get_peer_ctx(sub->peer_map, &peer));
4301             }
4302         }
4303
4304       GNUNET_free(permut);
4305       permut = NULL;
4306     }
4307
4308
4309   /* Update view */
4310   /* TODO see how many peers are in push-/pull- list! */
4311
4312   if ((CustomPeerMap_size(sub->push_map) <= alpha * sub->view_size_est_need) &&
4313       (0 < CustomPeerMap_size(sub->push_map)) &&
4314       (0 < CustomPeerMap_size(sub->pull_map)))
4315     { /* If conditions for update are fulfilled, update */
4316       LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4317
4318       uint32_t final_size;
4319       uint32_t peers_to_clean_size;
4320       struct GNUNET_PeerIdentity *peers_to_clean;
4321
4322       peers_to_clean = NULL;
4323       peers_to_clean_size = 0;
4324       GNUNET_array_grow(peers_to_clean,
4325                         peers_to_clean_size,
4326                         View_size(sub->view));
4327       GNUNET_memcpy(peers_to_clean,
4328                     view_array,
4329                     View_size(sub->view) * sizeof(struct GNUNET_PeerIdentity));
4330
4331       /* Seems like recreating is the easiest way of emptying the peermap */
4332       View_clear(sub->view);
4333 #ifdef TO_FILE_FULL
4334       to_file(sub->file_name_view_log,
4335               "--- emptied ---");
4336 #endif /* TO_FILE_FULL */
4337
4338       first_border = GNUNET_MIN(ceil(alpha * sub->view_size_est_need),
4339                                 CustomPeerMap_size(sub->push_map));
4340       second_border = first_border +
4341                       GNUNET_MIN(floor(beta * sub->view_size_est_need),
4342                                  CustomPeerMap_size(sub->pull_map));
4343       final_size = second_border +
4344                    ceil((1 - (alpha + beta)) * sub->view_size_est_need);
4345       LOG(GNUNET_ERROR_TYPE_DEBUG,
4346           "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %" PRIu32 "\n",
4347           first_border,
4348           second_border,
4349           final_size);
4350
4351       /* Update view with peers received through PUSHes */
4352       permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4353                                             CustomPeerMap_size(sub->push_map));
4354       for (i = 0; i < first_border; i++)
4355         {
4356           int inserted;
4357           inserted = insert_in_view(sub,
4358                                     CustomPeerMap_get_peer_by_index(sub->push_map,
4359                                                                     permut[i]));
4360           if (GNUNET_OK == inserted)
4361             {
4362               clients_notify_stream_peer(sub,
4363                                          1,
4364                                          CustomPeerMap_get_peer_by_index(sub->push_map, permut[i]));
4365             }
4366 #ifdef TO_FILE_FULL
4367           to_file(sub->file_name_view_log,
4368                   "+%s\t(push list)",
4369                   GNUNET_i2s_full(&view_array[i]));
4370 #endif /* TO_FILE_FULL */
4371        // TODO change the peer_flags accordingly
4372         }
4373       GNUNET_free(permut);
4374       permut = NULL;
4375
4376       /* Update view with peers received through PULLs */
4377       permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4378                                             CustomPeerMap_size(sub->pull_map));
4379       for (i = first_border; i < second_border; i++)
4380         {
4381           int inserted;
4382           inserted = insert_in_view(sub,
4383                                     CustomPeerMap_get_peer_by_index(sub->pull_map,
4384                                                                     permut[i - first_border]));
4385           if (GNUNET_OK == inserted)
4386             {
4387               clients_notify_stream_peer(sub,
4388                                          1,
4389                                          CustomPeerMap_get_peer_by_index(sub->pull_map,
4390                                                                          permut[i - first_border]));
4391             }
4392 #ifdef TO_FILE_FULL
4393           to_file(sub->file_name_view_log,
4394                   "+%s\t(pull list)",
4395                   GNUNET_i2s_full(&view_array[i]));
4396 #endif /* TO_FILE_FULL */
4397        // TODO change the peer_flags accordingly
4398         }
4399       GNUNET_free(permut);
4400       permut = NULL;
4401
4402       /* Update view with peers from history */
4403       RPS_sampler_get_n_rand_peers(sub->sampler,
4404                                    final_size - second_border,
4405                                    hist_update,
4406                                    sub);
4407       // TODO change the peer_flags accordingly
4408
4409       for (i = 0; i < View_size(sub->view); i++)
4410         rem_from_list(&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4411
4412       /* Clean peers that were removed from the view */
4413       for (i = 0; i < peers_to_clean_size; i++)
4414         {
4415 #ifdef TO_FILE_FULL
4416           to_file(sub->file_name_view_log,
4417                   "-%s",
4418                   GNUNET_i2s_full(&peers_to_clean[i]));
4419 #endif /* TO_FILE_FULL */
4420           clean_peer(sub, &peers_to_clean[i]);
4421         }
4422
4423       GNUNET_array_grow(peers_to_clean, peers_to_clean_size, 0);
4424       clients_notify_view_update(sub);
4425     }
4426   else
4427     {
4428       LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4429       if (sub == msub)
4430         {
4431           GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4432           if (CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4433               !(0 >= CustomPeerMap_size(sub->pull_map)))
4434             GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4435           if (CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4436               (0 >= CustomPeerMap_size(sub->pull_map)))
4437             GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4438           if (0 >= CustomPeerMap_size(sub->push_map) &&
4439               !(0 >= CustomPeerMap_size(sub->pull_map)))
4440             GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4441           if (0 >= CustomPeerMap_size(sub->push_map) &&
4442               (0 >= CustomPeerMap_size(sub->pull_map)))
4443             GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4444           if (0 >= CustomPeerMap_size(sub->pull_map) &&
4445               CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4446               0 >= CustomPeerMap_size(sub->push_map))
4447             GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4448         }
4449     }
4450   // TODO independent of that also get some peers from CADET_get_peers()?
4451   if (CustomPeerMap_size(sub->push_map) < HISTOGRAM_FILE_SLOTS)
4452     {
4453       sub->push_recv[CustomPeerMap_size(sub->push_map)]++;
4454     }
4455   else
4456     {
4457       LOG(GNUNET_ERROR_TYPE_WARNING,
4458           "Push map size too big for histogram (%u, %u)\n",
4459           CustomPeerMap_size(sub->push_map),
4460           HISTOGRAM_FILE_SLOTS);
4461     }
4462   // FIXME check bounds of histogram
4463   sub->push_delta[(int32_t)(CustomPeerMap_size(sub->push_map) -
4464                             (alpha * sub->view_size_est_need)) +
4465                   (HISTOGRAM_FILE_SLOTS / 2)]++;
4466   if (sub == msub)
4467     {
4468       GNUNET_STATISTICS_set(stats,
4469                             "# peers in push map at end of round",
4470                             CustomPeerMap_size(sub->push_map),
4471                             GNUNET_NO);
4472       GNUNET_STATISTICS_set(stats,
4473                             "# peers in pull map at end of round",
4474                             CustomPeerMap_size(sub->pull_map),
4475                             GNUNET_NO);
4476       GNUNET_STATISTICS_set(stats,
4477                             "# peers in view at end of round",
4478                             View_size(sub->view),
4479                             GNUNET_NO);
4480       GNUNET_STATISTICS_set(stats,
4481                             "# expected pushes",
4482                             alpha * sub->view_size_est_need,
4483                             GNUNET_NO);
4484       GNUNET_STATISTICS_set(stats,
4485                             "delta expected - received pushes",
4486                             CustomPeerMap_size(sub->push_map) - (alpha * sub->view_size_est_need),
4487                             GNUNET_NO);
4488     }
4489
4490   LOG(GNUNET_ERROR_TYPE_DEBUG,
4491       "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4492       CustomPeerMap_size(sub->push_map),
4493       CustomPeerMap_size(sub->pull_map),
4494       alpha,
4495       View_size(sub->view),
4496       alpha * View_size(sub->view));
4497
4498   /* Update samplers */
4499   for (i = 0; i < CustomPeerMap_size(sub->push_map); i++)
4500     {
4501       update_peer = CustomPeerMap_get_peer_by_index(sub->push_map, i);
4502       LOG(GNUNET_ERROR_TYPE_DEBUG,
4503           "Updating with peer %s from push list\n",
4504           GNUNET_i2s(update_peer));
4505       insert_in_sampler(sub, update_peer);
4506       clean_peer(sub, update_peer); /* This cleans only if it is not in the view */
4507     }
4508
4509   for (i = 0; i < CustomPeerMap_size(sub->pull_map); i++)
4510     {
4511       LOG(GNUNET_ERROR_TYPE_DEBUG,
4512           "Updating with peer %s from pull list\n",
4513           GNUNET_i2s(CustomPeerMap_get_peer_by_index(sub->pull_map, i)));
4514       insert_in_sampler(sub, CustomPeerMap_get_peer_by_index(sub->pull_map, i));
4515       /* This cleans only if it is not in the view */
4516       clean_peer(sub, CustomPeerMap_get_peer_by_index(sub->pull_map, i));
4517     }
4518
4519
4520   /* Empty push/pull lists */
4521   CustomPeerMap_clear(sub->push_map);
4522   CustomPeerMap_clear(sub->pull_map);
4523
4524   if (sub == msub)
4525     {
4526       GNUNET_STATISTICS_set(stats,
4527                             "view size",
4528                             View_size(sub->view),
4529                             GNUNET_NO);
4530     }
4531
4532   struct GNUNET_TIME_Relative time_next_round;
4533
4534   time_next_round = compute_rand_delay(sub->round_interval, 2);
4535
4536   /* Schedule next round */
4537   sub->do_round_task = GNUNET_SCHEDULER_add_delayed(time_next_round,
4538                                                     &do_round, sub);
4539   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4540 }
4541
4542
4543 /**
4544  * This is called from GNUNET_CADET_get_peers().
4545  *
4546  * It is called on every peer(ID) that cadet somehow has contact with.
4547  * We use those to initialise the sampler.
4548  *
4549  * implements #GNUNET_CADET_PeersCB
4550  *
4551  * @param cls Closure - Sub
4552  * @param peer Peer, or NULL on "EOF".
4553  * @param tunnel Do we have a tunnel towards this peer?
4554  * @param n_paths Number of known paths towards this peer.
4555  * @param best_path How long is the best path?
4556  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4557  */
4558 void
4559 init_peer_cb(void *cls,
4560              const struct GNUNET_PeerIdentity *peer,
4561              int tunnel,  /* "Do we have a tunnel towards this peer?" */
4562              unsigned int n_paths,  /* "Number of known paths towards this peer" */
4563              unsigned int best_path)  /* "How long is the best path?
4564                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4565 {
4566   struct Sub *sub = cls;
4567
4568   (void)tunnel;
4569   (void)n_paths;
4570   (void)best_path;
4571
4572   if (NULL != peer)
4573     {
4574       LOG(GNUNET_ERROR_TYPE_DEBUG,
4575           "Got peer_id %s from cadet\n",
4576           GNUNET_i2s(peer));
4577       got_peer(sub, peer);
4578     }
4579 }
4580
4581
4582 /**
4583  * @brief Iterator function over stored, valid peers.
4584  *
4585  * We initialise the sampler with those.
4586  *
4587  * @param cls Closure - Sub
4588  * @param peer the peer id
4589  * @return #GNUNET_YES if we should continue to
4590  *         iterate,
4591  *         #GNUNET_NO if not.
4592  */
4593 static int
4594 valid_peers_iterator(void *cls,
4595                      const struct GNUNET_PeerIdentity *peer)
4596 {
4597   struct Sub *sub = cls;
4598
4599   if (NULL != peer)
4600     {
4601       LOG(GNUNET_ERROR_TYPE_DEBUG,
4602           "Got stored, valid peer %s\n",
4603           GNUNET_i2s(peer));
4604       got_peer(sub, peer);
4605     }
4606   return GNUNET_YES;
4607 }
4608
4609
4610 /**
4611  * Iterator over peers from peerinfo.
4612  *
4613  * @param cls Closure - Sub
4614  * @param peer id of the peer, NULL for last call
4615  * @param hello hello message for the peer (can be NULL)
4616  * @param error message
4617  */
4618 void
4619 process_peerinfo_peers(void *cls,
4620                        const struct GNUNET_PeerIdentity *peer,
4621                        const struct GNUNET_HELLO_Message *hello,
4622                        const char *err_msg)
4623 {
4624   struct Sub *sub = cls;
4625
4626   (void)hello;
4627   (void)err_msg;
4628
4629   if (NULL != peer)
4630     {
4631       LOG(GNUNET_ERROR_TYPE_DEBUG,
4632           "Got peer_id %s from peerinfo\n",
4633           GNUNET_i2s(peer));
4634       got_peer(sub, peer);
4635     }
4636 }
4637
4638
4639 /**
4640  * Task run during shutdown.
4641  *
4642  * @param cls Closure - unused
4643  */
4644 static void
4645 shutdown_task(void *cls)
4646 {
4647   (void)cls;
4648   struct ClientContext *client_ctx;
4649
4650   LOG(GNUNET_ERROR_TYPE_DEBUG,
4651       "RPS service is going down\n");
4652
4653   /* Clean all clients */
4654   for (client_ctx = cli_ctx_head;
4655        NULL != cli_ctx_head;
4656        client_ctx = cli_ctx_head)
4657     {
4658       destroy_cli_ctx(client_ctx);
4659     }
4660   if (NULL != msub)
4661     {
4662       destroy_sub(msub);
4663       msub = NULL;
4664     }
4665
4666   /* Disconnect from other services */
4667   GNUNET_PEERINFO_notify_cancel(peerinfo_notify_handle);
4668   GNUNET_PEERINFO_disconnect(peerinfo_handle);
4669   peerinfo_handle = NULL;
4670   GNUNET_NSE_disconnect(nse);
4671   if (NULL != map_single_hop)
4672     {
4673       /* core_init was called - core was initialised */
4674       /* disconnect first, so no callback tries to access missing peermap */
4675       GNUNET_CORE_disconnect(core_handle);
4676       core_handle = NULL;
4677       GNUNET_CONTAINER_multipeermap_destroy(map_single_hop);
4678       map_single_hop = NULL;
4679     }
4680
4681   if (NULL != stats)
4682     {
4683       GNUNET_STATISTICS_destroy(stats,
4684                                 GNUNET_NO);
4685       stats = NULL;
4686     }
4687   GNUNET_CADET_disconnect(cadet_handle);
4688   cadet_handle = NULL;
4689 #if ENABLE_MALICIOUS
4690   struct AttackedPeer *tmp_att_peer;
4691   GNUNET_array_grow(mal_peers,
4692                     num_mal_peers,
4693                     0);
4694   if (NULL != mal_peer_set)
4695     GNUNET_CONTAINER_multipeermap_destroy(mal_peer_set);
4696   if (NULL != att_peer_set)
4697     GNUNET_CONTAINER_multipeermap_destroy(att_peer_set);
4698   while (NULL != att_peers_head)
4699     {
4700       tmp_att_peer = att_peers_head;
4701       GNUNET_CONTAINER_DLL_remove(att_peers_head,
4702                                   att_peers_tail,
4703                                   tmp_att_peer);
4704       GNUNET_free(tmp_att_peer);
4705     }
4706 #endif /* ENABLE_MALICIOUS */
4707   close_all_files();
4708 }
4709
4710
4711 /**
4712  * Handle client connecting to the service.
4713  *
4714  * @param cls unused
4715  * @param client the new client
4716  * @param mq the message queue of @a client
4717  * @return @a client
4718  */
4719 static void *
4720 client_connect_cb(void *cls,
4721                   struct GNUNET_SERVICE_Client *client,
4722                   struct GNUNET_MQ_Handle *mq)
4723 {
4724   struct ClientContext *cli_ctx;
4725
4726   (void)cls;
4727
4728   LOG(GNUNET_ERROR_TYPE_DEBUG,
4729       "Client connected\n");
4730   if (NULL == client)
4731     return client; /* Server was destroyed before a client connected. Shutting down */
4732   cli_ctx = GNUNET_new(struct ClientContext);
4733   cli_ctx->mq = mq;
4734   cli_ctx->view_updates_left = -1;
4735   cli_ctx->stream_update = GNUNET_NO;
4736   cli_ctx->client = client;
4737   GNUNET_CONTAINER_DLL_insert(cli_ctx_head,
4738                               cli_ctx_tail,
4739                               cli_ctx);
4740   return cli_ctx;
4741 }
4742
4743 /**
4744  * Callback called when a client disconnected from the service
4745  *
4746  * @param cls closure for the service
4747  * @param c the client that disconnected
4748  * @param internal_cls should be equal to @a c
4749  */
4750 static void
4751 client_disconnect_cb(void *cls,
4752                      struct GNUNET_SERVICE_Client *client,
4753                      void *internal_cls)
4754 {
4755   struct ClientContext *cli_ctx = internal_cls;
4756
4757   (void)cls;
4758   GNUNET_assert(client == cli_ctx->client);
4759   if (NULL == client)
4760     {/* shutdown task - destroy all clients */
4761       while (NULL != cli_ctx_head)
4762         destroy_cli_ctx(cli_ctx_head);
4763     }
4764   else
4765     { /* destroy this client */
4766       LOG(GNUNET_ERROR_TYPE_DEBUG,
4767           "Client disconnected. Destroy its context.\n");
4768       destroy_cli_ctx(cli_ctx);
4769     }
4770 }
4771
4772
4773 /**
4774  * Handle random peer sampling clients.
4775  *
4776  * @param cls closure
4777  * @param c configuration to use
4778  * @param service the initialized service
4779  */
4780 static void
4781 run(void *cls,
4782     const struct GNUNET_CONFIGURATION_Handle *c,
4783     struct GNUNET_SERVICE_Handle *service)
4784 {
4785   struct GNUNET_TIME_Relative round_interval;
4786   long long unsigned int sampler_size;
4787   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4788   struct GNUNET_HashCode hash;
4789
4790   (void)cls;
4791   (void)service;
4792
4793   GNUNET_log_setup("rps",
4794                    GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG),
4795                    NULL);
4796   cfg = c;
4797   /* Get own ID */
4798   GNUNET_CRYPTO_get_peer_identity(cfg,
4799                                   &own_identity);  // TODO check return value
4800   GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4801              "STARTING SERVICE (rps) for peer [%s]\n",
4802              GNUNET_i2s(&own_identity));
4803 #if ENABLE_MALICIOUS
4804   GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
4805              "Malicious execution compiled in.\n");
4806 #endif /* ENABLE_MALICIOUS */
4807
4808   /* Get time interval from the configuration */
4809   if (GNUNET_OK !=
4810       GNUNET_CONFIGURATION_get_value_time(cfg,
4811                                           "RPS",
4812                                           "ROUNDINTERVAL",
4813                                           &round_interval))
4814     {
4815       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
4816                                 "RPS", "ROUNDINTERVAL");
4817       GNUNET_SCHEDULER_shutdown();
4818       return;
4819     }
4820
4821   /* Get initial size of sampler/view from the configuration */
4822   if (GNUNET_OK !=
4823       GNUNET_CONFIGURATION_get_value_number(cfg,
4824                                             "RPS",
4825                                             "MINSIZE",
4826                                             &sampler_size))
4827     {
4828       GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
4829                                 "RPS", "MINSIZE");
4830       GNUNET_SCHEDULER_shutdown();
4831       return;
4832     }
4833
4834   cadet_handle = GNUNET_CADET_connect(cfg);
4835   GNUNET_assert(NULL != cadet_handle);
4836   core_handle = GNUNET_CORE_connect(cfg,
4837                                     NULL,  /* cls */
4838                                     core_init,  /* init */
4839                                     core_connects,  /* connects */
4840                                     core_disconnects,  /* disconnects */
4841                                     NULL);  /* handlers */
4842   GNUNET_assert(NULL != core_handle);
4843
4844
4845   alpha = 0.45;
4846   beta = 0.45;
4847
4848
4849   /* Set up main Sub */
4850   GNUNET_CRYPTO_hash(hash_port_string,
4851                      strlen(hash_port_string),
4852                      &hash);
4853   msub = new_sub(&hash,
4854                  sampler_size, /* Will be overwritten by config */
4855                  round_interval);
4856
4857
4858   peerinfo_handle = GNUNET_PEERINFO_connect(cfg);
4859
4860   /* connect to NSE */
4861   nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
4862
4863   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4864   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4865   // TODO send push/pull to each of those peers?
4866   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4867   restore_valid_peers(msub);
4868   get_valid_peers(msub->valid_peers, valid_peers_iterator, msub);
4869
4870   peerinfo_notify_handle = GNUNET_PEERINFO_notify(cfg,
4871                                                   GNUNET_NO,
4872                                                   process_peerinfo_peers,
4873                                                   msub);
4874
4875   LOG(GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4876
4877   GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL);
4878   stats = GNUNET_STATISTICS_create("rps", cfg);
4879 }
4880
4881
4882 /**
4883  * Define "main" method using service macro.
4884  */
4885 GNUNET_SERVICE_MAIN
4886   ("rps",
4887   GNUNET_SERVICE_OPTION_NONE,
4888   &run,
4889   &client_connect_cb,
4890   &client_disconnect_cb,
4891   NULL,
4892   GNUNET_MQ_hd_var_size(client_seed,
4893                         GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4894                         struct GNUNET_RPS_CS_SeedMessage,
4895                         NULL),
4896 #if ENABLE_MALICIOUS
4897   GNUNET_MQ_hd_var_size(client_act_malicious,
4898                         GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4899                         struct GNUNET_RPS_CS_ActMaliciousMessage,
4900                         NULL),
4901 #endif /* ENABLE_MALICIOUS */
4902   GNUNET_MQ_hd_fixed_size(client_view_request,
4903                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4904                           struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4905                           NULL),
4906   GNUNET_MQ_hd_fixed_size(client_view_cancel,
4907                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4908                           struct GNUNET_MessageHeader,
4909                           NULL),
4910   GNUNET_MQ_hd_fixed_size(client_stream_request,
4911                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4912                           struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4913                           NULL),
4914   GNUNET_MQ_hd_fixed_size(client_stream_cancel,
4915                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4916                           struct GNUNET_MessageHeader,
4917                           NULL),
4918   GNUNET_MQ_hd_fixed_size(client_start_sub,
4919                           GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4920                           struct GNUNET_RPS_CS_SubStartMessage,
4921                           NULL),
4922   GNUNET_MQ_hd_fixed_size(client_stop_sub,
4923                           GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4924                           struct GNUNET_RPS_CS_SubStopMessage,
4925                           NULL),
4926   GNUNET_MQ_handler_end());
4927
4928 /* end of gnunet-service-rps.c */