first step to remove plibc
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time interval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357 #ifdef TO_FILE_FULL
358   /**
359    * Name to log view to
360    */
361   char *file_name_view_log;
362 #endif /* TO_FILE_FULL */
363
364 #ifdef TO_FILE
365 #ifdef TO_FILE_FULL
366   /**
367    * Name to log number of observed peers to
368    */
369   char *file_name_observed_log;
370 #endif /* TO_FILE_FULL */
371
372   /**
373    * @brief Count the observed peers
374    */
375   uint32_t num_observed_peers;
376
377   /**
378    * @brief Multipeermap (ab-) used to count unique peer_ids
379    */
380   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
381 #endif /* TO_FILE */
382
383   /**
384    * List to store peers received through pushes temporary.
385    */
386   struct CustomPeerMap *push_map;
387
388   /**
389    * List to store peers received through pulls temporary.
390    */
391   struct CustomPeerMap *pull_map;
392
393   /**
394    * @brief This is the estimate used as view size.
395    *
396    * It is initialised with the minimum
397    */
398   unsigned int view_size_est_need;
399
400   /**
401    * @brief This is the minimum estimate used as view size.
402    *
403    * It is configured by the user.
404    */
405   unsigned int view_size_est_min;
406
407   /**
408    * @brief The view.
409    */
410   struct View *view;
411
412   /**
413    * Identifier for the main task that runs periodically.
414    */
415   struct GNUNET_SCHEDULER_Task *do_round_task;
416
417   /* === stats === */
418
419   /**
420    * @brief Counts the executed rounds.
421    */
422   uint32_t num_rounds;
423
424   /**
425    * @brief This array accumulates the number of received pushes per round.
426    *
427    * Number at index i represents the number of rounds with i observed pushes.
428    */
429   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
430
431   /**
432    * @brief Histogram of deltas between the expected and actual number of
433    * received pushes.
434    *
435    * As half of the entries are expected to be negative, this is shifted by
436    * #HISTOGRAM_FILE_SLOTS/2.
437    */
438   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
439
440   /**
441    * @brief Number of pull replies with this delay measured in rounds.
442    *
443    * Number at index i represents the number of pull replies with a delay of i
444    * rounds.
445    */
446   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
447 };
448
449
450 /***********************************************************************
451  * Globals
452 ***********************************************************************/
453
454 /**
455  * Our configuration.
456  */
457 static const struct GNUNET_CONFIGURATION_Handle *cfg;
458
459 /**
460  * Handle to the statistics service.
461  */
462 struct GNUNET_STATISTICS_Handle *stats;
463
464 /**
465  * Handler to CADET.
466  */
467 struct GNUNET_CADET_Handle *cadet_handle;
468
469 /**
470  * Handle to CORE
471  */
472 struct GNUNET_CORE_Handle *core_handle;
473
474 /**
475  * @brief PeerMap to keep track of connected peers.
476  */
477 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478
479 /**
480  * Our own identity.
481  */
482 static struct GNUNET_PeerIdentity own_identity;
483
484 /**
485  * Percentage of total peer number in the view
486  * to send random PUSHes to
487  */
488 static float alpha;
489
490 /**
491  * Percentage of total peer number in the view
492  * to send random PULLs to
493  */
494 static float beta;
495
496 /**
497  * Handler to NSE.
498  */
499 static struct GNUNET_NSE_Handle *nse;
500
501 /**
502  * Handler to PEERINFO.
503  */
504 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
505
506 /**
507  * Handle for cancellation of iteration over peers.
508  */
509 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510
511
512 #if ENABLE_MALICIOUS
513 /**
514  * Type of malicious peer
515  *
516  * 0 Don't act malicious at all - Default
517  * 1 Try to maximise representation
518  * 2 Try to partition the network
519  * 3 Combined attack
520  */
521 static uint32_t mal_type;
522
523 /**
524  * Other malicious peers
525  */
526 static struct GNUNET_PeerIdentity *mal_peers;
527
528 /**
529  * Hashmap of malicious peers used as set.
530  * Used to more efficiently check whether we know that peer.
531  */
532 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
533
534 /**
535  * Number of other malicious peers
536  */
537 static uint32_t num_mal_peers;
538
539
540 /**
541  * If type is 2 this is the DLL of attacked peers
542  */
543 static struct AttackedPeer *att_peers_head;
544 static struct AttackedPeer *att_peers_tail;
545
546 /**
547  * This index is used to point to an attacked peer to
548  * implement the round-robin-ish way to select attacked peers.
549  */
550 static struct AttackedPeer *att_peer_index;
551
552 /**
553  * Hashmap of attacked peers used as set.
554  * Used to more efficiently check whether we know that peer.
555  */
556 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
557
558 /**
559  * Number of attacked peers
560  */
561 static uint32_t num_attacked_peers;
562
563 /**
564  * If type is 1 this is the attacked peer
565  */
566 static struct GNUNET_PeerIdentity attacked_peer;
567
568 /**
569  * The limit of PUSHes we can send in one round.
570  * This is an assumption of the Brahms protocol and either implemented
571  * via proof of work
572  * or
573  * assumend to be the bandwidth limitation.
574  */
575 static uint32_t push_limit = 10000;
576 #endif /* ENABLE_MALICIOUS */
577
578 /**
579  * @brief Main Sub.
580  *
581  * This is run in any case by all peers and connects to all peers without
582  * specifying a shared value.
583  */
584 static struct Sub *msub;
585
586 /**
587  * @brief Maximum number of valid peers to keep.
588  * TODO read from config
589  */
590 static const uint32_t num_valid_peers_max = UINT32_MAX;
591
592 /***********************************************************************
593  * /Globals
594 ***********************************************************************/
595
596
597 static void
598 do_round (void *cls);
599
600 static void
601 do_mal_round (void *cls);
602
603
604 /**
605  * @brief Get the #PeerContext associated with a peer
606  *
607  * @param peer_map The peer map containing the context
608  * @param peer the peer id
609  *
610  * @return the #PeerContext
611  */
612 static struct PeerContext *
613 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
614               const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *ctx;
617   int ret;
618
619   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
620   GNUNET_assert (GNUNET_YES == ret);
621   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
622   GNUNET_assert (NULL != ctx);
623   return ctx;
624 }
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                           "# known peers",
679                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                           GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ( (NULL == peer_ctx->send_channel_ctx) &&
729        (NULL == peer_ctx->recv_channel_ctx) )
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780   (void) value;
781
782   if (0 >= iterator_cls->index)
783   {
784     iterator_cls->peer = peer;
785     return GNUNET_NO;
786   }
787   iterator_cls->index--;
788   return GNUNET_YES;
789 }
790
791
792 /**
793  * @brief Get a random peer from @a peer_map
794  *
795  * @param valid_peers Peer map containing valid peers from which to select a
796  * random one
797  *
798  * @return a random peer
799  */
800 static const struct GNUNET_PeerIdentity *
801 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
802 {
803   struct GetRandPeerIteratorCls *iterator_cls;
804   const struct GNUNET_PeerIdentity *ret;
805
806   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
807   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
808       GNUNET_CONTAINER_multipeermap_size (valid_peers));
809   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
810                                                 get_rand_peer_iterator,
811                                                 iterator_cls);
812   ret = iterator_cls->peer;
813   GNUNET_free (iterator_cls);
814   return ret;
815 }
816
817
818 /**
819  * @brief Add a given @a peer to valid peers.
820  *
821  * If valid peers are already #num_valid_peers_max, delete a peer previously.
822  *
823  * @param peer The peer that is added to the valid peers.
824  * @param valid_peers Peer map of valid peers to which to add the @a peer
825  *
826  * @return #GNUNET_YES if no other peer had to be removed
827  *         #GNUNET_NO  otherwise
828  */
829 static int
830 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
831                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
832 {
833   const struct GNUNET_PeerIdentity *rand_peer;
834   int ret;
835
836   ret = GNUNET_YES;
837   /* Remove random peers until there is space for a new one */
838   while (num_valid_peers_max <=
839          GNUNET_CONTAINER_multipeermap_size (valid_peers))
840   {
841     rand_peer = get_random_peer_from_peermap (valid_peers);
842     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
843     ret = GNUNET_NO;
844   }
845   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
846       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
847   if (valid_peers == msub->valid_peers)
848   {
849     GNUNET_STATISTICS_set (stats,
850                            "# valid peers",
851                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
852                            GNUNET_NO);
853   }
854   return ret;
855 }
856
857 static void
858 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
859
860 /**
861  * @brief Set the peer flag to living and
862  *        call the pending operations on this peer.
863  *
864  * Also adds peer to #valid_peers.
865  *
866  * @param peer_ctx the #PeerContext of the peer to set online
867  */
868 static void
869 set_peer_online (struct PeerContext *peer_ctx)
870 {
871   struct GNUNET_PeerIdentity *peer;
872   unsigned int i;
873
874   peer = &peer_ctx->peer_id;
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876       "Peer %s is online and valid, calling %i pending operations on it\n",
877       GNUNET_i2s (peer),
878       peer_ctx->num_pending_ops);
879
880   if (NULL != peer_ctx->online_check_pending)
881   {
882     LOG (GNUNET_ERROR_TYPE_DEBUG,
883          "Removing pending online check for peer %s\n",
884          GNUNET_i2s (&peer_ctx->peer_id));
885     // TODO wait until cadet sets mq->cancel_impl
886     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
887     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
888     peer_ctx->online_check_pending = NULL;
889   }
890
891   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
892
893   /* Call pending operations */
894   for (i = 0; i < peer_ctx->num_pending_ops; i++)
895   {
896     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
897   }
898   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
899 }
900
901 static void
902 cleanup_destroyed_channel (void *cls,
903                            const struct GNUNET_CADET_Channel *channel);
904
905 /* Declaration of handlers */
906 static void
907 handle_peer_check (void *cls,
908                    const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_push (void *cls,
912                   const struct GNUNET_MessageHeader *msg);
913
914 static void
915 handle_peer_pull_request (void *cls,
916                           const struct GNUNET_MessageHeader *msg);
917
918 static int
919 check_peer_pull_reply (void *cls,
920                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 static void
923 handle_peer_pull_reply (void *cls,
924                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
925
926 /* End declaration of handlers */
927
928 /**
929  * @brief Allocate memory for a new channel context and insert it into DLL
930  *
931  * @param peer_ctx context of the according peer
932  *
933  * @return The channel context
934  */
935 static struct ChannelCtx *
936 add_channel_ctx (struct PeerContext *peer_ctx)
937 {
938   struct ChannelCtx *channel_ctx;
939   channel_ctx = GNUNET_new (struct ChannelCtx);
940   channel_ctx->peer_ctx = peer_ctx;
941   return channel_ctx;
942 }
943
944
945 /**
946  * @brief Free memory and NULL pointers.
947  *
948  * @param channel_ctx The channel context.
949  */
950 static void
951 remove_channel_ctx (struct ChannelCtx *channel_ctx)
952 {
953   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
954
955   if (NULL != channel_ctx->destruction_task)
956   {
957     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
958     channel_ctx->destruction_task = NULL;
959   }
960
961   GNUNET_free (channel_ctx);
962
963   if (NULL == peer_ctx) return;
964   if (channel_ctx == peer_ctx->send_channel_ctx)
965   {
966     peer_ctx->send_channel_ctx = NULL;
967     peer_ctx->mq = NULL;
968   }
969   else if (channel_ctx == peer_ctx->recv_channel_ctx)
970   {
971     peer_ctx->recv_channel_ctx = NULL;
972   }
973 }
974
975
976 /**
977  * @brief Get the channel of a peer. If not existing, create.
978  *
979  * @param peer_ctx Context of the peer of which to get the channel
980  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
981  */
982 struct GNUNET_CADET_Channel *
983 get_channel (struct PeerContext *peer_ctx)
984 {
985   /* There exists a copy-paste-clone in run() */
986   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
987     GNUNET_MQ_hd_fixed_size (peer_check,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_push,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_fixed_size (peer_pull_request,
996                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
997                              struct GNUNET_MessageHeader,
998                              NULL),
999     GNUNET_MQ_hd_var_size (peer_pull_reply,
1000                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1001                            struct GNUNET_RPS_P2P_PullReplyMessage,
1002                            NULL),
1003     GNUNET_MQ_handler_end ()
1004   };
1005
1006
1007   if (NULL == peer_ctx->send_channel_ctx)
1008   {
1009     LOG (GNUNET_ERROR_TYPE_DEBUG,
1010          "Trying to establish channel to peer %s\n",
1011          GNUNET_i2s (&peer_ctx->peer_id));
1012     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1013     peer_ctx->send_channel_ctx->channel =
1014       GNUNET_CADET_channel_create (cadet_handle,
1015                                    peer_ctx->send_channel_ctx, /* context */
1016                                    &peer_ctx->peer_id,
1017                                    &peer_ctx->sub->hash,
1018                                    NULL, /* WindowSize handler */
1019                                    &cleanup_destroyed_channel, /* Disconnect handler */
1020                                    cadet_handlers);
1021   }
1022   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1023   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1024   return peer_ctx->send_channel_ctx->channel;
1025 }
1026
1027
1028 /**
1029  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1030  *
1031  * If we already have a message queue open to this client,
1032  * simply return it, otherways create one.
1033  *
1034  * @param peer_ctx Context of the peer of whicht to get the mq
1035  * @return the #GNUNET_MQ_Handle
1036  */
1037 static struct GNUNET_MQ_Handle *
1038 get_mq (struct PeerContext *peer_ctx)
1039 {
1040   if (NULL == peer_ctx->mq)
1041   {
1042     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1043   }
1044   return peer_ctx->mq;
1045 }
1046
1047 /**
1048  * @brief Add an envelope to a message passed to mq to list of pending messages
1049  *
1050  * @param peer_ctx Context of the peer for which to insert the envelope
1051  * @param ev envelope to the message
1052  * @param type type of the message to be sent
1053  * @return pointer to pending message
1054  */
1055 static struct PendingMessage *
1056 insert_pending_message (struct PeerContext *peer_ctx,
1057                         struct GNUNET_MQ_Envelope *ev,
1058                         const char *type)
1059 {
1060   struct PendingMessage *pending_msg;
1061
1062   pending_msg = GNUNET_new (struct PendingMessage);
1063   pending_msg->ev = ev;
1064   pending_msg->peer_ctx = peer_ctx;
1065   pending_msg->type = type;
1066   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1067                                peer_ctx->pending_messages_tail,
1068                                pending_msg);
1069   return pending_msg;
1070 }
1071
1072
1073 /**
1074  * @brief Remove a pending message from the respective DLL
1075  *
1076  * @param pending_msg the pending message to remove
1077  * @param cancel whether to cancel the pending message, too
1078  */
1079 static void
1080 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1081 {
1082   struct PeerContext *peer_ctx;
1083   (void) cancel;
1084
1085   peer_ctx = pending_msg->peer_ctx;
1086   GNUNET_assert (NULL != peer_ctx);
1087   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1088                                peer_ctx->pending_messages_tail,
1089                                pending_msg);
1090   // TODO wait for the cadet implementation of message cancellation
1091   //if (GNUNET_YES == cancel)
1092   //{
1093   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1094   //}
1095   GNUNET_free (pending_msg);
1096 }
1097
1098
1099 /**
1100  * @brief This is called in response to the first message we sent as a
1101  * online check.
1102  *
1103  * @param cls #PeerContext of peer with pending online check
1104  */
1105 static void
1106 mq_online_check_successful (void *cls)
1107 {
1108   struct PeerContext *peer_ctx = cls;
1109
1110   if (NULL != peer_ctx->online_check_pending)
1111   {
1112     LOG (GNUNET_ERROR_TYPE_DEBUG,
1113         "Online check for peer %s was successfull\n",
1114         GNUNET_i2s (&peer_ctx->peer_id));
1115     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1116     peer_ctx->online_check_pending = NULL;
1117     set_peer_online (peer_ctx);
1118     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1119   }
1120 }
1121
1122 /**
1123  * Issue a check whether peer is online
1124  *
1125  * @param peer_ctx the context of the peer
1126  */
1127 static void
1128 check_peer_online (struct PeerContext *peer_ctx)
1129 {
1130   LOG (GNUNET_ERROR_TYPE_DEBUG,
1131        "Get informed about peer %s getting online\n",
1132        GNUNET_i2s (&peer_ctx->peer_id));
1133
1134   struct GNUNET_MQ_Handle *mq;
1135   struct GNUNET_MQ_Envelope *ev;
1136
1137   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1138   peer_ctx->online_check_pending =
1139     insert_pending_message (peer_ctx, ev, "Check online");
1140   mq = get_mq (peer_ctx);
1141   GNUNET_MQ_notify_sent (ev,
1142                          mq_online_check_successful,
1143                          peer_ctx);
1144   GNUNET_MQ_send (mq, ev);
1145   if (peer_ctx->sub == msub)
1146   {
1147     GNUNET_STATISTICS_update (stats,
1148                               "# pending online checks",
1149                               1,
1150                               GNUNET_NO);
1151   }
1152 }
1153
1154
1155 /**
1156  * @brief Check whether function of type #PeerOp was already scheduled
1157  *
1158  * The array with pending operations will probably never grow really big, so
1159  * iterating over it should be ok.
1160  *
1161  * @param peer_ctx Context of the peer to check for the operation
1162  * @param peer_op the operation (#PeerOp) on the peer
1163  *
1164  * @return #GNUNET_YES if this operation is scheduled on that peer
1165  *         #GNUNET_NO  otherwise
1166  */
1167 static int
1168 check_operation_scheduled (const struct PeerContext *peer_ctx,
1169                            const PeerOp peer_op)
1170 {
1171   unsigned int i;
1172
1173   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1174     if (peer_op == peer_ctx->pending_ops[i].op)
1175       return GNUNET_YES;
1176   return GNUNET_NO;
1177 }
1178
1179
1180 /**
1181  * @brief Callback for scheduler to destroy a channel
1182  *
1183  * @param cls Context of the channel
1184  */
1185 static void
1186 destroy_channel (struct ChannelCtx *channel_ctx)
1187 {
1188   struct GNUNET_CADET_Channel *channel;
1189
1190   if (NULL != channel_ctx->destruction_task)
1191   {
1192     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1193     channel_ctx->destruction_task = NULL;
1194   }
1195   GNUNET_assert (channel_ctx->channel != NULL);
1196   channel = channel_ctx->channel;
1197   channel_ctx->channel = NULL;
1198   GNUNET_CADET_channel_destroy (channel);
1199   remove_channel_ctx (channel_ctx);
1200 }
1201
1202
1203 /**
1204  * @brief Destroy a cadet channel.
1205  *
1206  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1207  *
1208  * @param cls
1209  */
1210 static void
1211 destroy_channel_cb (void *cls)
1212 {
1213   struct ChannelCtx *channel_ctx = cls;
1214
1215   channel_ctx->destruction_task = NULL;
1216   destroy_channel (channel_ctx);
1217 }
1218
1219
1220 /**
1221  * @brief Schedule the destruction of a channel for immediately afterwards.
1222  *
1223  * In case a channel is to be destroyed from within the callback to the
1224  * destruction of another channel (send channel), we cannot call
1225  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1226  * construction.
1227  *
1228  * @param channel_ctx channel to be destroyed.
1229  */
1230 static void
1231 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1232 {
1233   GNUNET_assert (NULL ==
1234                  channel_ctx->destruction_task);
1235   GNUNET_assert (NULL !=
1236                  channel_ctx->channel);
1237   channel_ctx->destruction_task =
1238     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1239                               channel_ctx);
1240 }
1241
1242
1243 /**
1244  * @brief Remove peer
1245  *
1246  * - Empties the list with pending operations
1247  * - Empties the list with pending messages
1248  * - Cancels potentially existing online check
1249  * - Schedules closing of send and recv channels
1250  * - Removes peer from peer map
1251  *
1252  * @param peer_ctx Context of the peer to be destroyed
1253  * @return #GNUNET_YES if peer was removed
1254  *         #GNUNET_NO  otherwise
1255  */
1256 static int
1257 destroy_peer (struct PeerContext *peer_ctx)
1258 {
1259   GNUNET_assert (NULL != peer_ctx);
1260   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1261   if (GNUNET_NO ==
1262       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1263                                               &peer_ctx->peer_id))
1264   {
1265     return GNUNET_NO;
1266   }
1267   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1268   LOG (GNUNET_ERROR_TYPE_DEBUG,
1269        "Going to remove peer %s\n",
1270        GNUNET_i2s (&peer_ctx->peer_id));
1271   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1272
1273   /* Clear list of pending operations */
1274   // TODO this probably leaks memory
1275   //      ('only' the cls to the function. Not sure what to do with it)
1276   GNUNET_array_grow (peer_ctx->pending_ops,
1277                      peer_ctx->num_pending_ops,
1278                      0);
1279   /* Remove all pending messages */
1280   while (NULL != peer_ctx->pending_messages_head)
1281   {
1282     LOG (GNUNET_ERROR_TYPE_DEBUG,
1283          "Removing unsent %s\n",
1284          peer_ctx->pending_messages_head->type);
1285     /* Cancle pending message, too */
1286     if ( (NULL != peer_ctx->online_check_pending) &&
1287          (0 == memcmp (peer_ctx->pending_messages_head,
1288                      peer_ctx->online_check_pending,
1289                      sizeof (struct PendingMessage))) )
1290       {
1291         peer_ctx->online_check_pending = NULL;
1292         if (peer_ctx->sub == msub)
1293         {
1294           GNUNET_STATISTICS_update (stats,
1295                                     "# pending online checks",
1296                                     -1,
1297                                     GNUNET_NO);
1298         }
1299       }
1300     remove_pending_message (peer_ctx->pending_messages_head,
1301                             GNUNET_YES);
1302   }
1303
1304   /* If we are still waiting for notification whether this peer is online
1305    * cancel the according task */
1306   if (NULL != peer_ctx->online_check_pending)
1307   {
1308     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1309                 "Removing pending online check for peer %s\n",
1310                 GNUNET_i2s (&peer_ctx->peer_id));
1311     // TODO wait until cadet sets mq->cancel_impl
1312     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1313     remove_pending_message (peer_ctx->online_check_pending,
1314                             GNUNET_YES);
1315     peer_ctx->online_check_pending = NULL;
1316   }
1317
1318   if (NULL != peer_ctx->send_channel_ctx)
1319   {
1320     /* This is possibly called from within channel destruction */
1321     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1322     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1323     peer_ctx->send_channel_ctx = NULL;
1324     peer_ctx->mq = NULL;
1325   }
1326   if (NULL != peer_ctx->recv_channel_ctx)
1327   {
1328     /* This is possibly called from within channel destruction */
1329     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1330     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1331     peer_ctx->recv_channel_ctx = NULL;
1332   }
1333
1334   if (GNUNET_YES !=
1335       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1336                                                 &peer_ctx->peer_id))
1337   {
1338     LOG (GNUNET_ERROR_TYPE_WARNING,
1339          "removing peer from peer_ctx->sub->peer_map failed\n");
1340   }
1341   if (peer_ctx->sub == msub)
1342   {
1343     GNUNET_STATISTICS_set (stats,
1344                           "# known peers",
1345                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1346                           GNUNET_NO);
1347   }
1348   GNUNET_free (peer_ctx);
1349   return GNUNET_YES;
1350 }
1351
1352
1353 /**
1354  * Iterator over hash map entries. Deletes all contexts of peers.
1355  *
1356  * @param cls closure
1357  * @param key current public key
1358  * @param value value in the hash map
1359  * @return #GNUNET_YES if we should continue to iterate,
1360  *         #GNUNET_NO if not.
1361  */
1362 static int
1363 peermap_clear_iterator (void *cls,
1364                         const struct GNUNET_PeerIdentity *key,
1365                         void *value)
1366 {
1367   struct Sub *sub = cls;
1368   (void) value;
1369
1370   destroy_peer (get_peer_ctx (sub->peer_map, key));
1371   return GNUNET_YES;
1372 }
1373
1374
1375 /**
1376  * @brief This is called once a message is sent.
1377  *
1378  * Removes the pending message
1379  *
1380  * @param cls type of the message that was sent
1381  */
1382 static void
1383 mq_notify_sent_cb (void *cls)
1384 {
1385   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1386   LOG (GNUNET_ERROR_TYPE_DEBUG,
1387       "%s was sent.\n",
1388       pending_msg->type);
1389   if (pending_msg->peer_ctx->sub == msub)
1390   {
1391     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1392       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1393     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1394       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1395     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1396       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1397     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1398                       NULL != map_single_hop &&
1399         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1400           &pending_msg->peer_ctx->peer_id))
1401       GNUNET_STATISTICS_update(stats,
1402                                "# pull requests sent (multi-hop peer)",
1403                                1,
1404                                GNUNET_NO);
1405   }
1406   /* Do not cancle message */
1407   remove_pending_message (pending_msg, GNUNET_NO);
1408 }
1409
1410
1411 /**
1412  * @brief Iterator function for #store_valid_peers.
1413  *
1414  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1415  * Writes single peer to disk.
1416  *
1417  * @param cls the file handle to write to.
1418  * @param peer current peer
1419  * @param value unused
1420  *
1421  * @return  #GNUNET_YES if we should continue to
1422  *          iterate,
1423  *          #GNUNET_NO if not.
1424  */
1425 static int
1426 store_peer_presistently_iterator (void *cls,
1427                                   const struct GNUNET_PeerIdentity *peer,
1428                                   void *value)
1429 {
1430   const struct GNUNET_DISK_FileHandle *fh = cls;
1431   char peer_string[128];
1432   int size;
1433   ssize_t ret;
1434   (void) value;
1435
1436   if (NULL == peer)
1437   {
1438     return GNUNET_YES;
1439   }
1440   size = GNUNET_snprintf (peer_string,
1441                           sizeof (peer_string),
1442                           "%s\n",
1443                           GNUNET_i2s_full (peer));
1444   GNUNET_assert (53 == size);
1445   ret = GNUNET_DISK_file_write (fh,
1446                                 peer_string,
1447                                 size);
1448   GNUNET_assert (size == ret);
1449   return GNUNET_YES;
1450 }
1451
1452
1453 /**
1454  * @brief Store the peers currently in #valid_peers to disk.
1455  *
1456  * @param sub Sub for which to store the valid peers
1457  */
1458 static void
1459 store_valid_peers (const struct Sub *sub)
1460 {
1461   struct GNUNET_DISK_FileHandle *fh;
1462   uint32_t number_written_peers;
1463   int ret;
1464
1465   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1466   {
1467     return;
1468   }
1469
1470   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1471   if (GNUNET_SYSERR == ret)
1472   {
1473     LOG (GNUNET_ERROR_TYPE_WARNING,
1474         "Not able to create directory for file `%s'\n",
1475         sub->filename_valid_peers);
1476     GNUNET_break (0);
1477   }
1478   else if (GNUNET_NO == ret)
1479   {
1480     LOG (GNUNET_ERROR_TYPE_WARNING,
1481         "Directory for file `%s' exists but is not writable for us\n",
1482         sub->filename_valid_peers);
1483     GNUNET_break (0);
1484   }
1485   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1486                               GNUNET_DISK_OPEN_WRITE |
1487                                   GNUNET_DISK_OPEN_CREATE,
1488                               GNUNET_DISK_PERM_USER_READ |
1489                                   GNUNET_DISK_PERM_USER_WRITE);
1490   if (NULL == fh)
1491   {
1492     LOG (GNUNET_ERROR_TYPE_WARNING,
1493         "Not able to write valid peers to file `%s'\n",
1494         sub->filename_valid_peers);
1495     return;
1496   }
1497   LOG (GNUNET_ERROR_TYPE_DEBUG,
1498       "Writing %u valid peers to disk\n",
1499       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1500   number_written_peers =
1501     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1502                                            store_peer_presistently_iterator,
1503                                            fh);
1504   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1505   GNUNET_assert (number_written_peers ==
1506       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1507 }
1508
1509
1510 /**
1511  * @brief Convert string representation of peer id to peer id.
1512  *
1513  * Counterpart to #GNUNET_i2s_full.
1514  *
1515  * @param string_repr The string representation of the peer id
1516  *
1517  * @return The peer id
1518  */
1519 static const struct GNUNET_PeerIdentity *
1520 s2i_full (const char *string_repr)
1521 {
1522   struct GNUNET_PeerIdentity *peer;
1523   size_t len;
1524   int ret;
1525
1526   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1527   len = strlen (string_repr);
1528   if (52 > len)
1529   {
1530     LOG (GNUNET_ERROR_TYPE_WARNING,
1531         "Not able to convert string representation of PeerID to PeerID\n"
1532         "Sting representation: %s (len %lu) - too short\n",
1533         string_repr,
1534         len);
1535     GNUNET_break (0);
1536   }
1537   else if (52 < len)
1538   {
1539     len = 52;
1540   }
1541   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1542                                                     len,
1543                                                     &peer->public_key);
1544   if (GNUNET_OK != ret)
1545   {
1546     LOG (GNUNET_ERROR_TYPE_WARNING,
1547         "Not able to convert string representation of PeerID to PeerID\n"
1548         "Sting representation: %s\n",
1549         string_repr);
1550     GNUNET_break (0);
1551   }
1552   return peer;
1553 }
1554
1555
1556 /**
1557  * @brief Restore the peers on disk to #valid_peers.
1558  *
1559  * @param sub Sub for which to restore the valid peers
1560  */
1561 static void
1562 restore_valid_peers (const struct Sub *sub)
1563 {
1564   off_t file_size;
1565   uint32_t num_peers;
1566   struct GNUNET_DISK_FileHandle *fh;
1567   char *buf;
1568   ssize_t size_read;
1569   char *iter_buf;
1570   char *str_repr;
1571   const struct GNUNET_PeerIdentity *peer;
1572
1573   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1574   {
1575     return;
1576   }
1577
1578   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1579   {
1580     return;
1581   }
1582   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1583                               GNUNET_DISK_OPEN_READ,
1584                               GNUNET_DISK_PERM_NONE);
1585   GNUNET_assert (NULL != fh);
1586   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1587   num_peers = file_size / 53;
1588   buf = GNUNET_malloc (file_size);
1589   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1590   GNUNET_assert (size_read == file_size);
1591   LOG (GNUNET_ERROR_TYPE_DEBUG,
1592       "Restoring %" PRIu32 " peers from file `%s'\n",
1593       num_peers,
1594       sub->filename_valid_peers);
1595   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1596   {
1597     str_repr = GNUNET_strndup (iter_buf, 53);
1598     peer = s2i_full (str_repr);
1599     GNUNET_free (str_repr);
1600     add_valid_peer (peer, sub->valid_peers);
1601     LOG (GNUNET_ERROR_TYPE_DEBUG,
1602         "Restored valid peer %s from disk\n",
1603         GNUNET_i2s_full (peer));
1604   }
1605   iter_buf = NULL;
1606   GNUNET_free (buf);
1607   LOG (GNUNET_ERROR_TYPE_DEBUG,
1608       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1609       num_peers,
1610       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1611   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1612   {
1613     LOG (GNUNET_ERROR_TYPE_WARNING,
1614         "Number of restored peers does not match file size. Have probably duplicates.\n");
1615   }
1616   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1617   LOG (GNUNET_ERROR_TYPE_DEBUG,
1618       "Restored %u valid peers from disk\n",
1619       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1620 }
1621
1622
1623 /**
1624  * @brief Delete storage of peers that was created with #initialise_peers ()
1625  *
1626  * @param sub Sub for which the storage is deleted
1627  */
1628 static void
1629 peers_terminate (struct Sub *sub)
1630 {
1631   if (GNUNET_SYSERR ==
1632       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1633                                              &peermap_clear_iterator,
1634                                              sub))
1635   {
1636     LOG (GNUNET_ERROR_TYPE_WARNING,
1637         "Iteration destroying peers was aborted.\n");
1638   }
1639   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1640   sub->peer_map = NULL;
1641   store_valid_peers (sub);
1642   GNUNET_free (sub->filename_valid_peers);
1643   sub->filename_valid_peers = NULL;
1644   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1645   sub->valid_peers = NULL;
1646 }
1647
1648
1649 /**
1650  * Iterator over #valid_peers hash map entries.
1651  *
1652  * @param cls Closure that contains iterator function and closure
1653  * @param peer current peer id
1654  * @param value value in the hash map - unused
1655  * @return #GNUNET_YES if we should continue to
1656  *         iterate,
1657  *         #GNUNET_NO if not.
1658  */
1659 static int
1660 valid_peer_iterator (void *cls,
1661                      const struct GNUNET_PeerIdentity *peer,
1662                      void *value)
1663 {
1664   struct PeersIteratorCls *it_cls = cls;
1665   (void) value;
1666
1667   return it_cls->iterator (it_cls->cls, peer);
1668 }
1669
1670
1671 /**
1672  * @brief Get all currently known, valid peer ids.
1673  *
1674  * @param valid_peers Peer map containing the valid peers in question
1675  * @param iterator function to call on each peer id
1676  * @param it_cls extra argument to @a iterator
1677  * @return the number of key value pairs processed,
1678  *         #GNUNET_SYSERR if it aborted iteration
1679  */
1680 static int
1681 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1682                  PeersIterator iterator,
1683                  void *it_cls)
1684 {
1685   struct PeersIteratorCls *cls;
1686   int ret;
1687
1688   cls = GNUNET_new (struct PeersIteratorCls);
1689   cls->iterator = iterator;
1690   cls->cls = it_cls;
1691   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1692                                                valid_peer_iterator,
1693                                                cls);
1694   GNUNET_free (cls);
1695   return ret;
1696 }
1697
1698
1699 /**
1700  * @brief Add peer to known peers.
1701  *
1702  * This function is called on new peer_ids from 'external' sources
1703  * (client seed, cadet get_peers(), ...)
1704  *
1705  * @param sub Sub with the peer map that the @a peer will be added to
1706  * @param peer the new #GNUNET_PeerIdentity
1707  *
1708  * @return #GNUNET_YES if peer was inserted
1709  *         #GNUNET_NO  otherwise
1710  */
1711 static int
1712 insert_peer (struct Sub *sub,
1713              const struct GNUNET_PeerIdentity *peer)
1714 {
1715   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1716   {
1717     return GNUNET_NO; /* We already know this peer - nothing to do */
1718   }
1719   (void) create_peer_ctx (sub, peer);
1720   return GNUNET_YES;
1721 }
1722
1723
1724 /**
1725  * @brief Check whether flags on a peer are set.
1726  *
1727  * @param peer_map Peer map that is expected to contain the @a peer
1728  * @param peer the peer to check the flag of
1729  * @param flags the flags to check
1730  *
1731  * @return #GNUNET_SYSERR if peer is not known
1732  *         #GNUNET_YES    if all given flags are set
1733  *         #GNUNET_NO     otherwise
1734  */
1735 static int
1736 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1737                  const struct GNUNET_PeerIdentity *peer,
1738                  enum Peers_PeerFlags flags)
1739 {
1740   struct PeerContext *peer_ctx;
1741
1742   if (GNUNET_NO == check_peer_known (peer_map, peer))
1743   {
1744     return GNUNET_SYSERR;
1745   }
1746   peer_ctx = get_peer_ctx (peer_map, peer);
1747   return check_peer_flag_set (peer_ctx, flags);
1748 }
1749
1750 /**
1751  * @brief Try connecting to a peer to see whether it is online
1752  *
1753  * If not known yet, insert into known peers
1754  *
1755  * @param sub Sub which would contain the @a peer
1756  * @param peer the peer whose online is to be checked
1757  * @return #GNUNET_YES if the check was issued
1758  *         #GNUNET_NO  otherwise
1759  */
1760 static int
1761 issue_peer_online_check (struct Sub *sub,
1762                          const struct GNUNET_PeerIdentity *peer)
1763 {
1764   struct PeerContext *peer_ctx;
1765
1766   (void) insert_peer (sub, peer); // TODO even needed?
1767   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1768   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1769        (NULL == peer_ctx->online_check_pending) )
1770   {
1771     check_peer_online (peer_ctx);
1772     return GNUNET_YES;
1773   }
1774   return GNUNET_NO;
1775 }
1776
1777
1778 /**
1779  * @brief Check if peer is removable.
1780  *
1781  * Check if
1782  *  - a recv channel exists
1783  *  - there are pending messages
1784  *  - there is no pending pull reply
1785  *
1786  * @param peer_ctx Context of the peer in question
1787  * @return #GNUNET_YES    if peer is removable
1788  *         #GNUNET_NO     if peer is NOT removable
1789  *         #GNUNET_SYSERR if peer is not known
1790  */
1791 static int
1792 check_removable (const struct PeerContext *peer_ctx)
1793 {
1794   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1795                                                            &peer_ctx->peer_id))
1796   {
1797     return GNUNET_SYSERR;
1798   }
1799
1800   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1801        (NULL != peer_ctx->pending_messages_head) ||
1802        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1803   {
1804     return GNUNET_NO;
1805   }
1806   return GNUNET_YES;
1807 }
1808
1809
1810 /**
1811  * @brief Check whether @a peer is actually a peer.
1812  *
1813  * A valid peer is a peer that we know exists eg. we were connected to once.
1814  *
1815  * @param valid_peers Peer map that would contain the @a peer
1816  * @param peer peer in question
1817  *
1818  * @return #GNUNET_YES if peer is valid
1819  *         #GNUNET_NO  if peer is not valid
1820  */
1821 static int
1822 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1823                   const struct GNUNET_PeerIdentity *peer)
1824 {
1825   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1826 }
1827
1828
1829 /**
1830  * @brief Indicate that we want to send to the other peer
1831  *
1832  * This establishes a sending channel
1833  *
1834  * @param peer_ctx Context of the target peer
1835  */
1836 static void
1837 indicate_sending_intention (struct PeerContext *peer_ctx)
1838 {
1839   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1840                                                  &peer_ctx->peer_id));
1841   (void) get_channel (peer_ctx);
1842 }
1843
1844
1845 /**
1846  * @brief Check whether other peer has the intention to send/opened channel
1847  *        towars us
1848  *
1849  * @param peer_ctx Context of the peer in question
1850  *
1851  * @return #GNUNET_YES if peer has the intention to send
1852  *         #GNUNET_NO  otherwise
1853  */
1854 static int
1855 check_peer_send_intention (const struct PeerContext *peer_ctx)
1856 {
1857   if (NULL != peer_ctx->recv_channel_ctx)
1858   {
1859     return GNUNET_YES;
1860   }
1861   return GNUNET_NO;
1862 }
1863
1864
1865 /**
1866  * Handle the channel a peer opens to us.
1867  *
1868  * @param cls The closure - Sub
1869  * @param channel The channel the peer wants to establish
1870  * @param initiator The peer's peer ID
1871  *
1872  * @return initial channel context for the channel
1873  *         (can be NULL -- that's not an error)
1874  */
1875 static void *
1876 handle_inbound_channel (void *cls,
1877                         struct GNUNET_CADET_Channel *channel,
1878                         const struct GNUNET_PeerIdentity *initiator)
1879 {
1880   struct PeerContext *peer_ctx;
1881   struct ChannelCtx *channel_ctx;
1882   struct Sub *sub = cls;
1883
1884   LOG (GNUNET_ERROR_TYPE_DEBUG,
1885       "New channel was established to us (Peer %s).\n",
1886       GNUNET_i2s (initiator));
1887   GNUNET_assert (NULL != channel); /* according to cadet API */
1888   /* Make sure we 'know' about this peer */
1889   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1890   set_peer_online (peer_ctx);
1891   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1892   channel_ctx = add_channel_ctx (peer_ctx);
1893   channel_ctx->channel = channel;
1894   /* We only accept one incoming channel per peer */
1895   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1896                                                              initiator)))
1897   {
1898     LOG (GNUNET_ERROR_TYPE_WARNING,
1899         "Already got one receive channel. Destroying old one.\n");
1900     GNUNET_break_op (0);
1901     destroy_channel (peer_ctx->recv_channel_ctx);
1902     peer_ctx->recv_channel_ctx = channel_ctx;
1903     /* return the channel context */
1904     return channel_ctx;
1905   }
1906   peer_ctx->recv_channel_ctx = channel_ctx;
1907   return channel_ctx;
1908 }
1909
1910
1911 /**
1912  * @brief Check whether a sending channel towards the given peer exists
1913  *
1914  * @param peer_ctx Context of the peer in question
1915  *
1916  * @return #GNUNET_YES if a sending channel towards that peer exists
1917  *         #GNUNET_NO  otherwise
1918  */
1919 static int
1920 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1921 {
1922   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1923                                      &peer_ctx->peer_id))
1924   { /* If no such peer exists, there is no channel */
1925     return GNUNET_NO;
1926   }
1927   if (NULL == peer_ctx->send_channel_ctx)
1928   {
1929     return GNUNET_NO;
1930   }
1931   return GNUNET_YES;
1932 }
1933
1934
1935 /**
1936  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1937  *        intention to another peer
1938  *
1939  * @param peer_ctx Context to the peer
1940  * @return #GNUNET_YES if channel was destroyed
1941  *         #GNUNET_NO  otherwise
1942  */
1943 static int
1944 destroy_sending_channel (struct PeerContext *peer_ctx)
1945 {
1946   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1947                                      &peer_ctx->peer_id))
1948   {
1949     return GNUNET_NO;
1950   }
1951   if (NULL != peer_ctx->send_channel_ctx)
1952   {
1953     destroy_channel (peer_ctx->send_channel_ctx);
1954     (void) check_connected (peer_ctx);
1955     return GNUNET_YES;
1956   }
1957   return GNUNET_NO;
1958 }
1959
1960 /**
1961  * @brief Send a message to another peer.
1962  *
1963  * Keeps track about pending messages so they can be properly removed when the
1964  * peer is destroyed.
1965  *
1966  * @param peer_ctx Context of the peer to which the message is to be sent
1967  * @param ev envelope of the message
1968  * @param type type of the message
1969  */
1970 static void
1971 send_message (struct PeerContext *peer_ctx,
1972               struct GNUNET_MQ_Envelope *ev,
1973               const char *type)
1974 {
1975   struct PendingMessage *pending_msg;
1976   struct GNUNET_MQ_Handle *mq;
1977
1978   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1979               "Sending message to %s of type %s\n",
1980               GNUNET_i2s (&peer_ctx->peer_id),
1981               type);
1982   pending_msg = insert_pending_message (peer_ctx, ev, type);
1983   mq = get_mq (peer_ctx);
1984   GNUNET_MQ_notify_sent (ev,
1985                          mq_notify_sent_cb,
1986                          pending_msg);
1987   GNUNET_MQ_send (mq, ev);
1988 }
1989
1990 /**
1991  * @brief Schedule a operation on given peer
1992  *
1993  * Avoids scheduling an operation twice.
1994  *
1995  * @param peer_ctx Context of the peer for which to schedule the operation
1996  * @param peer_op the operation to schedule
1997  * @param cls Closure to @a peer_op
1998  *
1999  * @return #GNUNET_YES if the operation was scheduled
2000  *         #GNUNET_NO  otherwise
2001  */
2002 static int
2003 schedule_operation (struct PeerContext *peer_ctx,
2004                     const PeerOp peer_op,
2005                     void *cls)
2006 {
2007   struct PeerPendingOp pending_op;
2008
2009   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2010                                                  &peer_ctx->peer_id));
2011
2012   //TODO if ONLINE execute immediately
2013
2014   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2015   {
2016     pending_op.op = peer_op;
2017     pending_op.op_cls = cls;
2018     GNUNET_array_append (peer_ctx->pending_ops,
2019                          peer_ctx->num_pending_ops,
2020                          pending_op);
2021     return GNUNET_YES;
2022   }
2023   return GNUNET_NO;
2024 }
2025
2026 /***********************************************************************
2027  * /Old gnunet-service-rps_peers.c
2028 ***********************************************************************/
2029
2030
2031 /***********************************************************************
2032  * Housekeeping with clients
2033 ***********************************************************************/
2034
2035 /**
2036  * Closure used to pass the client and the id to the callback
2037  * that replies to a client's request
2038  */
2039 struct ReplyCls
2040 {
2041   /**
2042    * DLL
2043    */
2044   struct ReplyCls *next;
2045   struct ReplyCls *prev;
2046
2047   /**
2048    * The identifier of the request
2049    */
2050   uint32_t id;
2051
2052   /**
2053    * The handle to the request
2054    */
2055   struct RPS_SamplerRequestHandle *req_handle;
2056
2057   /**
2058    * The client handle to send the reply to
2059    */
2060   struct ClientContext *cli_ctx;
2061 };
2062
2063
2064 /**
2065  * Struct used to store the context of a connected client.
2066  */
2067 struct ClientContext
2068 {
2069   /**
2070    * DLL
2071    */
2072   struct ClientContext *next;
2073   struct ClientContext *prev;
2074
2075   /**
2076    * The message queue to communicate with the client.
2077    */
2078   struct GNUNET_MQ_Handle *mq;
2079
2080   /**
2081    * @brief How many updates this client expects to receive.
2082    */
2083   int64_t view_updates_left;
2084
2085   /**
2086    * @brief Whether this client wants to receive stream updates.
2087    * Either #GNUNET_YES or #GNUNET_NO
2088    */
2089   int8_t stream_update;
2090
2091   /**
2092    * The client handle to send the reply to
2093    */
2094   struct GNUNET_SERVICE_Client *client;
2095
2096   /**
2097    * The #Sub this context belongs to
2098    */
2099   struct Sub *sub;
2100 };
2101
2102 /**
2103  * DLL with all clients currently connected to us
2104  */
2105 struct ClientContext *cli_ctx_head;
2106 struct ClientContext *cli_ctx_tail;
2107
2108 /***********************************************************************
2109  * /Housekeeping with clients
2110 ***********************************************************************/
2111
2112
2113
2114
2115
2116 /***********************************************************************
2117  * Util functions
2118 ***********************************************************************/
2119
2120
2121 /**
2122  * Print peerlist to log.
2123  */
2124 static void
2125 print_peer_list (struct GNUNET_PeerIdentity *list,
2126                  unsigned int len)
2127 {
2128   unsigned int i;
2129
2130   LOG (GNUNET_ERROR_TYPE_DEBUG,
2131        "Printing peer list of length %u at %p:\n",
2132        len,
2133        list);
2134   for (i = 0 ; i < len ; i++)
2135   {
2136     LOG (GNUNET_ERROR_TYPE_DEBUG,
2137          "%u. peer: %s\n",
2138          i, GNUNET_i2s (&list[i]));
2139   }
2140 }
2141
2142
2143 /**
2144  * Remove peer from list.
2145  */
2146 static void
2147 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2148                unsigned int *list_size,
2149                const struct GNUNET_PeerIdentity *peer)
2150 {
2151   unsigned int i;
2152   struct GNUNET_PeerIdentity *tmp;
2153
2154   tmp = *peer_list;
2155
2156   LOG (GNUNET_ERROR_TYPE_DEBUG,
2157        "Removing peer %s from list at %p\n",
2158        GNUNET_i2s (peer),
2159        tmp);
2160
2161   for ( i = 0 ; i < *list_size ; i++ )
2162   {
2163     if (0 == GNUNET_memcmp (&tmp[i], peer))
2164     {
2165       if (i < *list_size -1)
2166       { /* Not at the last entry -- shift peers left */
2167         memmove (&tmp[i], &tmp[i +1],
2168                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2169       }
2170       /* Remove last entry (should be now useless PeerID) */
2171       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2172     }
2173   }
2174   *peer_list = tmp;
2175 }
2176
2177
2178 /**
2179  * Insert PeerID in #view
2180  *
2181  * Called once we know a peer is online.
2182  * Implements #PeerOp
2183  *
2184  * @return GNUNET_OK if peer was actually inserted
2185  *         GNUNET_NO if peer was not inserted
2186  */
2187 static void
2188 insert_in_view_op (void *cls,
2189                    const struct GNUNET_PeerIdentity *peer);
2190
2191 /**
2192  * Insert PeerID in #view
2193  *
2194  * Called once we know a peer is online.
2195  *
2196  * @param sub Sub in with the view to insert in
2197  * @param peer the peer to insert
2198  *
2199  * @return GNUNET_OK if peer was actually inserted
2200  *         GNUNET_NO if peer was not inserted
2201  */
2202 static int
2203 insert_in_view (struct Sub *sub,
2204                 const struct GNUNET_PeerIdentity *peer)
2205 {
2206   struct PeerContext *peer_ctx;
2207   int online;
2208   int ret;
2209
2210   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2211   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2212   if ( (GNUNET_NO == online) ||
2213        (GNUNET_SYSERR == online) ) /* peer is not even known */
2214   {
2215     (void) issue_peer_online_check (sub, peer);
2216     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2217     return GNUNET_NO;
2218   }
2219   /* Open channel towards peer to keep connection open */
2220   indicate_sending_intention (peer_ctx);
2221   ret = View_put (sub->view, peer);
2222   if (peer_ctx->sub == msub)
2223   {
2224     GNUNET_STATISTICS_set (stats,
2225                            "view size",
2226                            View_size (peer_ctx->sub->view),
2227                            GNUNET_NO);
2228   }
2229   return ret;
2230 }
2231
2232
2233 /**
2234  * @brief Send view to client
2235  *
2236  * @param cli_ctx the context of the client
2237  * @param view_array the peerids of the view as array (can be empty)
2238  * @param view_size the size of the view array (can be 0)
2239  */
2240 static void
2241 send_view (const struct ClientContext *cli_ctx,
2242            const struct GNUNET_PeerIdentity *view_array,
2243            uint64_t view_size)
2244 {
2245   struct GNUNET_MQ_Envelope *ev;
2246   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2247   struct Sub *sub;
2248
2249   if (NULL == view_array)
2250   {
2251     if (NULL == cli_ctx->sub) sub = msub;
2252     else sub = cli_ctx->sub;
2253     view_size = View_size (sub->view);
2254     view_array = View_get_as_array (sub->view);
2255   }
2256
2257   ev = GNUNET_MQ_msg_extra (out_msg,
2258                             view_size * sizeof (struct GNUNET_PeerIdentity),
2259                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2260   out_msg->num_peers = htonl (view_size);
2261
2262   GNUNET_memcpy (&out_msg[1],
2263                  view_array,
2264                  view_size * sizeof (struct GNUNET_PeerIdentity));
2265   GNUNET_MQ_send (cli_ctx->mq, ev);
2266 }
2267
2268
2269 /**
2270  * @brief Send peer from biased stream to client.
2271  *
2272  * TODO merge with send_view, parameterise
2273  *
2274  * @param cli_ctx the context of the client
2275  * @param view_array the peerids of the view as array (can be empty)
2276  * @param view_size the size of the view array (can be 0)
2277  */
2278 static void
2279 send_stream_peers (const struct ClientContext *cli_ctx,
2280                    uint64_t num_peers,
2281                    const struct GNUNET_PeerIdentity *peers)
2282 {
2283   struct GNUNET_MQ_Envelope *ev;
2284   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2285
2286   GNUNET_assert (NULL != peers);
2287
2288   ev = GNUNET_MQ_msg_extra (out_msg,
2289                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2290                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2291   out_msg->num_peers = htonl (num_peers);
2292
2293   GNUNET_memcpy (&out_msg[1],
2294                  peers,
2295                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2296   GNUNET_MQ_send (cli_ctx->mq, ev);
2297 }
2298
2299
2300 /**
2301  * @brief sends updates to clients that are interested
2302  *
2303  * @param sub Sub for which to notify clients
2304  */
2305 static void
2306 clients_notify_view_update (const struct Sub *sub)
2307 {
2308   struct ClientContext *cli_ctx_iter;
2309   uint64_t num_peers;
2310   const struct GNUNET_PeerIdentity *view_array;
2311
2312   num_peers = View_size (sub->view);
2313   view_array = View_get_as_array(sub->view);
2314   /* check size of view is small enough */
2315   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2316   {
2317     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2318                 "View is too big to send\n");
2319     return;
2320   }
2321
2322   for (cli_ctx_iter = cli_ctx_head;
2323        NULL != cli_ctx_iter;
2324        cli_ctx_iter = cli_ctx_iter->next)
2325   {
2326     if (1 < cli_ctx_iter->view_updates_left)
2327     {
2328       /* Client wants to receive limited amount of updates */
2329       cli_ctx_iter->view_updates_left -= 1;
2330     } else if (1 == cli_ctx_iter->view_updates_left)
2331     {
2332       /* Last update of view for client */
2333       cli_ctx_iter->view_updates_left = -1;
2334     } else if (0 > cli_ctx_iter->view_updates_left) {
2335       /* Client is not interested in updates */
2336       continue;
2337     }
2338     /* else _updates_left == 0 - infinite amount of updates */
2339
2340     /* send view */
2341     send_view (cli_ctx_iter, view_array, num_peers);
2342   }
2343 }
2344
2345
2346 /**
2347  * @brief sends updates to clients that are interested
2348  *
2349  * @param num_peers Number of peers to send
2350  * @param peers the array of peers to send
2351  */
2352 static void
2353 clients_notify_stream_peer (const struct Sub *sub,
2354                             uint64_t num_peers,
2355                             const struct GNUNET_PeerIdentity *peers)
2356                             // TODO enum StreamPeerSource)
2357 {
2358   struct ClientContext *cli_ctx_iter;
2359
2360   LOG (GNUNET_ERROR_TYPE_DEBUG,
2361       "Got peer (%s) from biased stream - update all clients\n",
2362       GNUNET_i2s (peers));
2363
2364   for (cli_ctx_iter = cli_ctx_head;
2365        NULL != cli_ctx_iter;
2366        cli_ctx_iter = cli_ctx_iter->next)
2367   {
2368     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2369         (sub == cli_ctx_iter->sub || sub == msub))
2370     {
2371       send_stream_peers (cli_ctx_iter, num_peers, peers);
2372     }
2373   }
2374 }
2375
2376
2377 /**
2378  * Put random peer from sampler into the view as history update.
2379  *
2380  * @param ids Array of Peers to insert into view
2381  * @param num_peers Number of peers to insert
2382  * @param cls Closure - The Sub for which this is to be done
2383  */
2384 static void
2385 hist_update (const struct GNUNET_PeerIdentity *ids,
2386              uint32_t num_peers,
2387              void *cls)
2388 {
2389   unsigned int i;
2390   struct Sub *sub = cls;
2391
2392   for (i = 0; i < num_peers; i++)
2393   {
2394     int inserted;
2395     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2396     {
2397       LOG (GNUNET_ERROR_TYPE_WARNING,
2398            "Peer in history update not known!\n");
2399       continue;
2400     }
2401     inserted = insert_in_view (sub, &ids[i]);
2402     if (GNUNET_OK == inserted)
2403     {
2404       clients_notify_stream_peer (sub, 1, &ids[i]);
2405     }
2406 #ifdef TO_FILE_FULL
2407     to_file (sub->file_name_view_log,
2408              "+%s\t(hist)",
2409              GNUNET_i2s_full (ids));
2410 #endif /* TO_FILE_FULL */
2411   }
2412   clients_notify_view_update (sub);
2413 }
2414
2415
2416 /**
2417  * Wrapper around #RPS_sampler_resize()
2418  *
2419  * If we do not have enough sampler elements, double current sampler size
2420  * If we have more than enough sampler elements, halv current sampler size
2421  *
2422  * @param sampler The sampler to resize
2423  * @param new_size New size to which to resize
2424  */
2425 static void
2426 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2427 {
2428   unsigned int sampler_size;
2429
2430   // TODO statistics
2431   // TODO respect the min, max
2432   sampler_size = RPS_sampler_get_size (sampler);
2433   if (sampler_size > new_size * 4)
2434   { /* Shrinking */
2435     RPS_sampler_resize (sampler, sampler_size / 2);
2436   }
2437   else if (sampler_size < new_size)
2438   { /* Growing */
2439     RPS_sampler_resize (sampler, sampler_size * 2);
2440   }
2441   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2442 }
2443
2444
2445 /**
2446  * Add all peers in @a peer_array to @a peer_map used as set.
2447  *
2448  * @param peer_array array containing the peers
2449  * @param num_peers number of peers in @peer_array
2450  * @param peer_map the peermap to use as set
2451  */
2452 static void
2453 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2454                        unsigned int num_peers,
2455                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2456 {
2457   unsigned int i;
2458   if (NULL == peer_map)
2459   {
2460     LOG (GNUNET_ERROR_TYPE_WARNING,
2461          "Trying to add peers to non-existing peermap.\n");
2462     return;
2463   }
2464
2465   for (i = 0; i < num_peers; i++)
2466   {
2467     GNUNET_CONTAINER_multipeermap_put (peer_map,
2468                                        &peer_array[i],
2469                                        NULL,
2470                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2471     if (msub->peer_map == peer_map)
2472     {
2473       GNUNET_STATISTICS_set (stats,
2474                             "# known peers",
2475                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2476                             GNUNET_NO);
2477     }
2478   }
2479 }
2480
2481
2482 /**
2483  * Send a PULL REPLY to @a peer_id
2484  *
2485  * @param peer_ctx Context of the peer to send the reply to
2486  * @param peer_ids the peers to send to @a peer_id
2487  * @param num_peer_ids the number of peers to send to @a peer_id
2488  */
2489 static void
2490 send_pull_reply (struct PeerContext *peer_ctx,
2491                  const struct GNUNET_PeerIdentity *peer_ids,
2492                  unsigned int num_peer_ids)
2493 {
2494   uint32_t send_size;
2495   struct GNUNET_MQ_Envelope *ev;
2496   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2497
2498   /* Compute actual size */
2499   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2500               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2501
2502   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2503     /* Compute number of peers to send
2504      * If too long, simply truncate */
2505     // TODO select random ones via permutation
2506     //      or even better: do good protocol design
2507     send_size =
2508       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2509        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2510        sizeof (struct GNUNET_PeerIdentity);
2511   else
2512     send_size = num_peer_ids;
2513
2514   LOG (GNUNET_ERROR_TYPE_DEBUG,
2515       "Going to send PULL REPLY with %u peers to %s\n",
2516       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2517
2518   ev = GNUNET_MQ_msg_extra (out_msg,
2519                             send_size * sizeof (struct GNUNET_PeerIdentity),
2520                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2521   out_msg->num_peers = htonl (send_size);
2522   GNUNET_memcpy (&out_msg[1], peer_ids,
2523          send_size * sizeof (struct GNUNET_PeerIdentity));
2524
2525   send_message (peer_ctx, ev, "PULL REPLY");
2526   if (peer_ctx->sub == msub)
2527   {
2528     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2529   }
2530   // TODO check with send intention: as send_channel is used/opened we indicate
2531   // a sending intention without intending it.
2532   // -> clean peer afterwards?
2533   // -> use recv_channel?
2534 }
2535
2536
2537 /**
2538  * Insert PeerID in #pull_map
2539  *
2540  * Called once we know a peer is online.
2541  *
2542  * @param cls Closure - Sub with the pull map to insert into
2543  * @param peer Peer to insert
2544  */
2545 static void
2546 insert_in_pull_map (void *cls,
2547                     const struct GNUNET_PeerIdentity *peer)
2548 {
2549   struct Sub *sub = cls;
2550
2551   CustomPeerMap_put (sub->pull_map, peer);
2552 }
2553
2554
2555 /**
2556  * Insert PeerID in #view
2557  *
2558  * Called once we know a peer is online.
2559  * Implements #PeerOp
2560  *
2561  * @param cls Closure - Sub with view to insert peer into
2562  * @param peer the peer to insert
2563  */
2564 static void
2565 insert_in_view_op (void *cls,
2566                    const struct GNUNET_PeerIdentity *peer)
2567 {
2568   struct Sub *sub = cls;
2569   int inserted;
2570
2571   inserted = insert_in_view (sub, peer);
2572   if (GNUNET_OK == inserted)
2573   {
2574     clients_notify_stream_peer (sub, 1, peer);
2575   }
2576 }
2577
2578
2579 /**
2580  * Update sampler with given PeerID.
2581  * Implements #PeerOp
2582  *
2583  * @param cls Closure - Sub containing the sampler to insert into
2584  * @param peer Peer to insert
2585  */
2586 static void
2587 insert_in_sampler (void *cls,
2588                    const struct GNUNET_PeerIdentity *peer)
2589 {
2590   struct Sub *sub = cls;
2591
2592   LOG (GNUNET_ERROR_TYPE_DEBUG,
2593        "Updating samplers with peer %s from insert_in_sampler()\n",
2594        GNUNET_i2s (peer));
2595   RPS_sampler_update (sub->sampler, peer);
2596   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2597   {
2598     /* Make sure we 'know' about this peer */
2599     (void) issue_peer_online_check (sub, peer);
2600     /* Establish a channel towards that peer to indicate we are going to send
2601      * messages to it */
2602     //indicate_sending_intention (peer);
2603   }
2604   if (sub == msub)
2605   {
2606     GNUNET_STATISTICS_update (stats,
2607                               "# observed peers in gossip",
2608                               1,
2609                               GNUNET_NO);
2610   }
2611 #ifdef TO_FILE
2612   sub->num_observed_peers++;
2613   GNUNET_CONTAINER_multipeermap_put
2614     (sub->observed_unique_peers,
2615      peer,
2616      NULL,
2617      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2618   uint32_t num_observed_unique_peers =
2619     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2620   GNUNET_STATISTICS_set (stats,
2621                          "# unique peers in gossip",
2622                          num_observed_unique_peers,
2623                          GNUNET_NO);
2624 #ifdef TO_FILE_FULL
2625   to_file (sub->file_name_observed_log,
2626           "%" PRIu32 " %" PRIu32 " %f\n",
2627           sub->num_observed_peers,
2628           num_observed_unique_peers,
2629           1.0*num_observed_unique_peers/sub->num_observed_peers)
2630 #endif /* TO_FILE_FULL */
2631 #endif /* TO_FILE */
2632 }
2633
2634
2635 /**
2636  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2637  *        If the peer is not known, online check is issued and it is
2638  *        scheduled to be inserted in sampler and view.
2639  *
2640  * "External sources" refer to every source except the gossip.
2641  *
2642  * @param sub Sub for which @a peer was received
2643  * @param peer peer to insert/peer received
2644  */
2645 static void
2646 got_peer (struct Sub *sub,
2647           const struct GNUNET_PeerIdentity *peer)
2648 {
2649   /* If we did not know this peer already, insert it into sampler and view */
2650   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2651   {
2652     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2653                         &insert_in_sampler, sub);
2654     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2655                         &insert_in_view_op, sub);
2656   }
2657   if (sub == msub)
2658   {
2659     GNUNET_STATISTICS_update (stats,
2660                               "# learnd peers",
2661                               1,
2662                               GNUNET_NO);
2663   }
2664 }
2665
2666
2667 /**
2668  * @brief Checks if there is a sending channel and if it is needed
2669  *
2670  * @param peer_ctx Context of the peer to check
2671  * @return GNUNET_YES if sending channel exists and is still needed
2672  *         GNUNET_NO  otherwise
2673  */
2674 static int
2675 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2676 {
2677   /* struct GNUNET_CADET_Channel *channel; */
2678   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2679                                      &peer_ctx->peer_id))
2680   {
2681     return GNUNET_NO;
2682   }
2683   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2684   {
2685     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2686                                     &peer_ctx->peer_id)) ||
2687          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2688                                             &peer_ctx->peer_id)) ||
2689          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2690                                                      &peer_ctx->peer_id)) ||
2691          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2692                                                      &peer_ctx->peer_id)) ||
2693          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2694                                          &peer_ctx->peer_id,
2695                                          Peers_PULL_REPLY_PENDING)))
2696     { /* If we want to keep the connection to peer open */
2697       return GNUNET_YES;
2698     }
2699     return GNUNET_NO;
2700   }
2701   return GNUNET_NO;
2702 }
2703
2704
2705 /**
2706  * @brief remove peer from our knowledge, the view, push and pull maps and
2707  * samplers.
2708  *
2709  * @param sub Sub with the data structures the peer is to be removed from
2710  * @param peer the peer to remove
2711  */
2712 static void
2713 remove_peer (struct Sub *sub,
2714              const struct GNUNET_PeerIdentity *peer)
2715 {
2716   (void) View_remove_peer (sub->view,
2717                            peer);
2718   CustomPeerMap_remove_peer (sub->pull_map,
2719                              peer);
2720   CustomPeerMap_remove_peer (sub->push_map,
2721                              peer);
2722   RPS_sampler_reinitialise_by_value (sub->sampler,
2723                                      peer);
2724   /* We want to destroy the peer now.
2725    * Sometimes, it just seems that it's already been removed from the peer_map,
2726    * so check the peer_map first. */
2727   if (GNUNET_YES == check_peer_known (sub->peer_map,
2728                                       peer))
2729   {
2730     destroy_peer (get_peer_ctx (sub->peer_map,
2731                                 peer));
2732   }
2733 }
2734
2735
2736 /**
2737  * @brief Remove data that is not needed anymore.
2738  *
2739  * If the sending channel is no longer needed it is destroyed.
2740  *
2741  * @param sub Sub in which the current peer is to be cleaned
2742  * @param peer the peer whose data is about to be cleaned
2743  */
2744 static void
2745 clean_peer (struct Sub *sub,
2746             const struct GNUNET_PeerIdentity *peer)
2747 {
2748   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2749                                                                peer)))
2750   {
2751     LOG (GNUNET_ERROR_TYPE_DEBUG,
2752         "Going to remove send channel to peer %s\n",
2753         GNUNET_i2s (peer));
2754     #if ENABLE_MALICIOUS
2755     if (0 != GNUNET_memcmp (&attacked_peer,
2756                                               peer))
2757       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2758                                                     peer));
2759     #else /* ENABLE_MALICIOUS */
2760     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2761                                                   peer));
2762     #endif /* ENABLE_MALICIOUS */
2763   }
2764
2765   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2766                                                            peer))
2767   {
2768     /* Peer was already removed by callback on destroyed channel */
2769     LOG (GNUNET_ERROR_TYPE_WARNING,
2770         "Peer was removed from our knowledge during cleanup\n");
2771     return;
2772   }
2773
2774   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2775                                                               peer))) &&
2776        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2777        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2778        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2779        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2780        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2781   { /* We can safely remove this peer */
2782     LOG (GNUNET_ERROR_TYPE_DEBUG,
2783         "Going to remove peer %s\n",
2784         GNUNET_i2s (peer));
2785     remove_peer (sub, peer);
2786     return;
2787   }
2788 }
2789
2790
2791 /**
2792  * @brief This is called when a channel is destroyed.
2793  *
2794  * Removes peer completely from our knowledge if the send_channel was destroyed
2795  * Otherwise simply delete the recv_channel
2796  * Also check if the knowledge about this peer is still needed.
2797  * If not, remove this peer from our knowledge.
2798  *
2799  * @param cls The closure - Context to the channel
2800  * @param channel The channel being closed
2801  */
2802 static void
2803 cleanup_destroyed_channel (void *cls,
2804                            const struct GNUNET_CADET_Channel *channel)
2805 {
2806   struct ChannelCtx *channel_ctx = cls;
2807   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2808   (void) channel;
2809
2810   channel_ctx->channel = NULL;
2811   remove_channel_ctx (channel_ctx);
2812   if (NULL != peer_ctx &&
2813       peer_ctx->send_channel_ctx == channel_ctx &&
2814       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2815   {
2816     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2817   }
2818 }
2819
2820 /***********************************************************************
2821  * /Util functions
2822 ***********************************************************************/
2823
2824
2825
2826 /***********************************************************************
2827  * Sub
2828 ***********************************************************************/
2829
2830 /**
2831  * @brief Create a new Sub
2832  *
2833  * @param hash Hash of value shared among rps instances on other hosts that
2834  *        defines a subgroup to sample from.
2835  * @param sampler_size Size of the sampler
2836  * @param round_interval Interval (in average) between two rounds
2837  *
2838  * @return Sub
2839  */
2840 struct Sub *
2841 new_sub (const struct GNUNET_HashCode *hash,
2842          uint32_t sampler_size,
2843          struct GNUNET_TIME_Relative round_interval)
2844 {
2845   struct Sub *sub;
2846
2847   sub = GNUNET_new (struct Sub);
2848
2849   /* With the hash generated from the secret value this service only connects
2850    * to rps instances that share the value */
2851   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2852     GNUNET_MQ_hd_fixed_size (peer_check,
2853                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2854                              struct GNUNET_MessageHeader,
2855                              NULL),
2856     GNUNET_MQ_hd_fixed_size (peer_push,
2857                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2858                              struct GNUNET_MessageHeader,
2859                              NULL),
2860     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2861                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2862                              struct GNUNET_MessageHeader,
2863                              NULL),
2864     GNUNET_MQ_hd_var_size (peer_pull_reply,
2865                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2866                            struct GNUNET_RPS_P2P_PullReplyMessage,
2867                            NULL),
2868     GNUNET_MQ_handler_end ()
2869   };
2870   sub->hash = *hash;
2871   sub->cadet_port =
2872     GNUNET_CADET_open_port (cadet_handle,
2873                             &sub->hash,
2874                             &handle_inbound_channel, /* Connect handler */
2875                             sub, /* cls */
2876                             NULL, /* WindowSize handler */
2877                             &cleanup_destroyed_channel, /* Disconnect handler */
2878                             cadet_handlers);
2879   if (NULL == sub->cadet_port)
2880   {
2881     LOG (GNUNET_ERROR_TYPE_ERROR,
2882         "Cadet port `%s' is already in use.\n",
2883         GNUNET_APPLICATION_PORT_RPS);
2884     GNUNET_assert (0);
2885   }
2886
2887   /* Set up general data structure to keep track about peers */
2888   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2889   if (GNUNET_OK !=
2890       GNUNET_CONFIGURATION_get_value_filename (cfg,
2891                                                "rps",
2892                                                "FILENAME_VALID_PEERS",
2893                                                &sub->filename_valid_peers))
2894   {
2895     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2896                                "rps",
2897                                "FILENAME_VALID_PEERS");
2898   }
2899   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2900   {
2901     char *tmp_filename_valid_peers;
2902     char str_hash[105];
2903
2904     GNUNET_snprintf (str_hash,
2905                      sizeof (str_hash),
2906                      GNUNET_h2s_full (hash));
2907     tmp_filename_valid_peers = sub->filename_valid_peers;
2908     GNUNET_asprintf (&sub->filename_valid_peers,
2909                      "%s%s",
2910                      tmp_filename_valid_peers,
2911                      str_hash);
2912     GNUNET_free (tmp_filename_valid_peers);
2913   }
2914   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2915
2916   /* Set up the sampler */
2917   sub->sampler_size_est_min = sampler_size;
2918   sub->sampler_size_est_need = sampler_size;;
2919   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2920   GNUNET_assert (0 != round_interval.rel_value_us);
2921   sub->round_interval = round_interval;
2922   sub->sampler = RPS_sampler_init (sampler_size,
2923                                   round_interval);
2924
2925   /* Logging of internals */
2926 #ifdef TO_FILE_FULL
2927   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2928 #endif /* TO_FILE_FULL */
2929 #ifdef TO_FILE
2930 #ifdef TO_FILE_FULL
2931   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2932                                                        "observed");
2933 #endif /* TO_FILE_FULL */
2934   sub->num_observed_peers = 0;
2935   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2936                                                                     GNUNET_NO);
2937 #endif /* TO_FILE */
2938
2939   /* Set up data structures for gossip */
2940   sub->push_map = CustomPeerMap_create (4);
2941   sub->pull_map = CustomPeerMap_create (4);
2942   sub->view_size_est_min = sampler_size;;
2943   sub->view = View_create (sub->view_size_est_min);
2944   if (sub == msub)
2945   {
2946     GNUNET_STATISTICS_set (stats,
2947                            "view size aim",
2948                            sub->view_size_est_min,
2949                            GNUNET_NO);
2950   }
2951
2952   /* Start executing rounds */
2953   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2954
2955   return sub;
2956 }
2957
2958
2959 #ifdef TO_FILE
2960 /**
2961  * @brief Write all numbers in the given array into the given file
2962  *
2963  * Single numbers devided by a newline
2964  *
2965  * @param hist_array[] the array to dump
2966  * @param file_name file to dump into
2967  */
2968 static void
2969 write_histogram_to_file (const uint32_t hist_array[],
2970                          const char *file_name)
2971 {
2972   char collect_str[SIZE_DUMP_FILE + 1] = "";
2973   char *recv_str_iter;
2974   char *file_name_full;
2975
2976   recv_str_iter = collect_str;
2977   file_name_full = store_prefix_file_name (&own_identity,
2978                                            file_name);
2979   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2980   {
2981     char collect_str_tmp[8];
2982
2983     GNUNET_snprintf (collect_str_tmp,
2984                      sizeof (collect_str_tmp),
2985                      "%" PRIu32 "\n",
2986                      hist_array[i]);
2987     recv_str_iter = stpncpy (recv_str_iter,
2988                              collect_str_tmp,
2989                              6);
2990   }
2991   (void) stpcpy (recv_str_iter,
2992                  "\n");
2993   LOG (GNUNET_ERROR_TYPE_DEBUG,
2994        "Writing push stats to disk\n");
2995   to_file_w_len (file_name_full,
2996                  SIZE_DUMP_FILE,
2997                  collect_str);
2998   GNUNET_free (file_name_full);
2999 }
3000 #endif /* TO_FILE */
3001
3002
3003 /**
3004  * @brief Destroy Sub.
3005  *
3006  * @param sub Sub to destroy
3007  */
3008 static void
3009 destroy_sub (struct Sub *sub)
3010 {
3011   GNUNET_assert (NULL != sub);
3012   GNUNET_assert (NULL != sub->do_round_task);
3013   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3014   sub->do_round_task = NULL;
3015
3016   /* Disconnect from cadet */
3017   GNUNET_CADET_close_port (sub->cadet_port);
3018   sub->cadet_port= NULL;
3019
3020   /* Clean up data structures for peers */
3021   RPS_sampler_destroy (sub->sampler);
3022   sub->sampler = NULL;
3023   View_destroy (sub->view);
3024   sub->view = NULL;
3025   CustomPeerMap_destroy (sub->push_map);
3026   sub->push_map = NULL;
3027   CustomPeerMap_destroy (sub->pull_map);
3028   sub->pull_map = NULL;
3029   peers_terminate (sub);
3030
3031   /* Free leftover data structures */
3032 #ifdef TO_FILE_FULL
3033   GNUNET_free (sub->file_name_view_log);
3034   sub->file_name_view_log = NULL;
3035 #endif /* TO_FILE_FULL */
3036 #ifdef TO_FILE
3037 #ifdef TO_FILE_FULL
3038   GNUNET_free (sub->file_name_observed_log);
3039   sub->file_name_observed_log = NULL;
3040 #endif /* TO_FILE_FULL */
3041
3042   /* Write push frequencies to disk */
3043   write_histogram_to_file (sub->push_recv,
3044                            "push_recv");
3045
3046   /* Write push deltas to disk */
3047   write_histogram_to_file (sub->push_delta,
3048                            "push_delta");
3049
3050   /* Write pull delays to disk */
3051   write_histogram_to_file (sub->pull_delays,
3052                            "pull_delays");
3053
3054   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3055   sub->observed_unique_peers = NULL;
3056 #endif /* TO_FILE */
3057
3058   GNUNET_free (sub);
3059 }
3060
3061
3062 /***********************************************************************
3063  * /Sub
3064 ***********************************************************************/
3065
3066
3067 /***********************************************************************
3068  * Core handlers
3069 ***********************************************************************/
3070
3071 /**
3072  * @brief Callback on initialisation of Core.
3073  *
3074  * @param cls - unused
3075  * @param my_identity - unused
3076  */
3077 void
3078 core_init (void *cls,
3079            const struct GNUNET_PeerIdentity *my_identity)
3080 {
3081   (void) cls;
3082   (void) my_identity;
3083
3084   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3085 }
3086
3087
3088 /**
3089  * @brief Callback for core.
3090  * Method called whenever a given peer connects.
3091  *
3092  * @param cls closure - unused
3093  * @param peer peer identity this notification is about
3094  * @return closure given to #core_disconnects as peer_cls
3095  */
3096 void *
3097 core_connects (void *cls,
3098                const struct GNUNET_PeerIdentity *peer,
3099                struct GNUNET_MQ_Handle *mq)
3100 {
3101   (void) cls;
3102   (void) mq;
3103
3104   GNUNET_assert (GNUNET_YES ==
3105                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3106                                                     peer,
3107                                                     NULL,
3108                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3109   return NULL;
3110 }
3111
3112
3113 /**
3114  * @brief Callback for core.
3115  * Method called whenever a peer disconnects.
3116  *
3117  * @param cls closure - unused
3118  * @param peer peer identity this notification is about
3119  * @param peer_cls closure given in #core_connects - unused
3120  */
3121 void
3122 core_disconnects (void *cls,
3123                   const struct GNUNET_PeerIdentity *peer,
3124                   void *peer_cls)
3125 {
3126   (void) cls;
3127   (void) peer_cls;
3128
3129   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3130 }
3131
3132 /***********************************************************************
3133  * /Core handlers
3134 ***********************************************************************/
3135
3136
3137 /**
3138  * @brief Destroy the context for a (connected) client
3139  *
3140  * @param cli_ctx Context to destroy
3141  */
3142 static void
3143 destroy_cli_ctx (struct ClientContext *cli_ctx)
3144 {
3145   GNUNET_assert (NULL != cli_ctx);
3146   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3147                                cli_ctx_tail,
3148                                cli_ctx);
3149   if (NULL != cli_ctx->sub)
3150   {
3151     destroy_sub (cli_ctx->sub);
3152     cli_ctx->sub = NULL;
3153   }
3154   GNUNET_free (cli_ctx);
3155 }
3156
3157
3158 /**
3159  * @brief Update sizes in sampler and view on estimate update from nse service
3160  *
3161  * @param sub Sub
3162  * @param logestimate the log(Base 2) value of the current network size estimate
3163  * @param std_dev standard deviation for the estimate
3164  */
3165 static void
3166 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3167 {
3168   double estimate;
3169   //double scale; // TODO this might go gloabal/config
3170
3171   LOG (GNUNET_ERROR_TYPE_DEBUG,
3172        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3173        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3174   //scale = .01;
3175   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3176   // GNUNET_NSE_log_estimate_to_n (logestimate);
3177   estimate = pow (estimate, 1.0 / 3);
3178   // TODO add if std_dev is a number
3179   // estimate += (std_dev * scale);
3180   if (sub->view_size_est_min < ceil (estimate))
3181   {
3182     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3183     sub->sampler_size_est_need = estimate;
3184     sub->view_size_est_need = estimate;
3185   } else
3186   {
3187     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3188     //sub->sampler_size_est_need = sub->view_size_est_min;
3189     sub->view_size_est_need = sub->view_size_est_min;
3190   }
3191   if (sub == msub)
3192   {
3193     GNUNET_STATISTICS_set (stats,
3194                            "view size aim",
3195                            sub->view_size_est_need,
3196                            GNUNET_NO);
3197   }
3198
3199   /* If the NSE has changed adapt the lists accordingly */
3200   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3201   View_change_len (sub->view, sub->view_size_est_need);
3202 }
3203
3204
3205 /**
3206  * Function called by NSE.
3207  *
3208  * Updates sizes of sampler list and view and adapt those lists
3209  * accordingly.
3210  *
3211  * implements #GNUNET_NSE_Callback
3212  *
3213  * @param cls Closure - unused
3214  * @param timestamp time when the estimate was received from the server (or created by the server)
3215  * @param logestimate the log(Base 2) value of the current network size estimate
3216  * @param std_dev standard deviation for the estimate
3217  */
3218 static void
3219 nse_callback (void *cls,
3220               struct GNUNET_TIME_Absolute timestamp,
3221               double logestimate, double std_dev)
3222 {
3223   (void) cls;
3224   (void) timestamp;
3225   struct ClientContext *cli_ctx_iter;
3226
3227   adapt_sizes (msub, logestimate, std_dev);
3228   for (cli_ctx_iter = cli_ctx_head;
3229       NULL != cli_ctx_iter;
3230       cli_ctx_iter = cli_ctx_iter->next)
3231   {
3232     if (NULL != cli_ctx_iter->sub)
3233     {
3234       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3235     }
3236   }
3237 }
3238
3239
3240 /**
3241  * @brief This function is called, when the client seeds peers.
3242  * It verifies that @a msg is well-formed.
3243  *
3244  * @param cls the closure (#ClientContext)
3245  * @param msg the message
3246  * @return #GNUNET_OK if @a msg is well-formed
3247  *         #GNUNET_SYSERR otherwise
3248  */
3249 static int
3250 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3251 {
3252   struct ClientContext *cli_ctx = cls;
3253   uint16_t msize = ntohs (msg->header.size);
3254   uint32_t num_peers = ntohl (msg->num_peers);
3255
3256   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3257   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3258        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3259   {
3260     LOG (GNUNET_ERROR_TYPE_ERROR,
3261         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3262         ntohl (msg->num_peers),
3263         (msize / sizeof (struct GNUNET_PeerIdentity)));
3264     GNUNET_break (0);
3265     GNUNET_SERVICE_client_drop (cli_ctx->client);
3266     return GNUNET_SYSERR;
3267   }
3268   return GNUNET_OK;
3269 }
3270
3271
3272 /**
3273  * Handle seed from the client.
3274  *
3275  * @param cls closure
3276  * @param message the actual message
3277  */
3278 static void
3279 handle_client_seed (void *cls,
3280                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3281 {
3282   struct ClientContext *cli_ctx = cls;
3283   struct GNUNET_PeerIdentity *peers;
3284   uint32_t num_peers;
3285   uint32_t i;
3286
3287   num_peers = ntohl (msg->num_peers);
3288   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3289
3290   LOG (GNUNET_ERROR_TYPE_DEBUG,
3291        "Client seeded peers:\n");
3292   print_peer_list (peers, num_peers);
3293
3294   for (i = 0; i < num_peers; i++)
3295   {
3296     LOG (GNUNET_ERROR_TYPE_DEBUG,
3297          "Updating samplers with seed %" PRIu32 ": %s\n",
3298          i,
3299          GNUNET_i2s (&peers[i]));
3300
3301     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3302     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3303   }
3304   GNUNET_SERVICE_client_continue (cli_ctx->client);
3305 }
3306
3307
3308 /**
3309  * Handle RPS request from the client.
3310  *
3311  * @param cls Client context
3312  * @param message Message containing the numer of updates the client wants to
3313  * receive
3314  */
3315 static void
3316 handle_client_view_request (void *cls,
3317                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3318 {
3319   struct ClientContext *cli_ctx = cls;
3320   uint64_t num_updates;
3321
3322   num_updates = ntohl (msg->num_updates);
3323
3324   LOG (GNUNET_ERROR_TYPE_DEBUG,
3325        "Client requested %" PRIu64 " updates of view.\n",
3326        num_updates);
3327
3328   GNUNET_assert (NULL != cli_ctx);
3329   cli_ctx->view_updates_left = num_updates;
3330   send_view (cli_ctx, NULL, 0);
3331   GNUNET_SERVICE_client_continue (cli_ctx->client);
3332 }
3333
3334
3335 /**
3336  * @brief Handle the cancellation of the view updates.
3337  *
3338  * @param cls The client context
3339  * @param msg Unused
3340  */
3341 static void
3342 handle_client_view_cancel (void *cls,
3343                            const struct GNUNET_MessageHeader *msg)
3344 {
3345   struct ClientContext *cli_ctx = cls;
3346   (void) msg;
3347
3348   LOG (GNUNET_ERROR_TYPE_DEBUG,
3349        "Client does not want to receive updates of view any more.\n");
3350
3351   GNUNET_assert (NULL != cli_ctx);
3352   cli_ctx->view_updates_left = 0;
3353   GNUNET_SERVICE_client_continue (cli_ctx->client);
3354   if (GNUNET_YES == cli_ctx->stream_update)
3355   {
3356     destroy_cli_ctx (cli_ctx);
3357   }
3358 }
3359
3360
3361 /**
3362  * Handle RPS request for biased stream from the client.
3363  *
3364  * @param cls Client context
3365  * @param message unused
3366  */
3367 static void
3368 handle_client_stream_request (void *cls,
3369                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3370 {
3371   struct ClientContext *cli_ctx = cls;
3372   (void) msg;
3373
3374   LOG (GNUNET_ERROR_TYPE_DEBUG,
3375        "Client requested peers from biased stream.\n");
3376   cli_ctx->stream_update = GNUNET_YES;
3377
3378   GNUNET_assert (NULL != cli_ctx);
3379   GNUNET_SERVICE_client_continue (cli_ctx->client);
3380 }
3381
3382
3383 /**
3384  * @brief Handles the cancellation of the stream of biased peer ids
3385  *
3386  * @param cls The client context
3387  * @param msg unused
3388  */
3389 static void
3390 handle_client_stream_cancel (void *cls,
3391                              const struct GNUNET_MessageHeader *msg)
3392 {
3393   struct ClientContext *cli_ctx = cls;
3394   (void) msg;
3395
3396   LOG (GNUNET_ERROR_TYPE_DEBUG,
3397        "Client canceled receiving peers from biased stream.\n");
3398   cli_ctx->stream_update = GNUNET_NO;
3399
3400   GNUNET_assert (NULL != cli_ctx);
3401   GNUNET_SERVICE_client_continue (cli_ctx->client);
3402 }
3403
3404
3405 /**
3406  * @brief Create and start a Sub.
3407  *
3408  * @param cls Closure - unused
3409  * @param msg Message containing the necessary information
3410  */
3411 static void
3412 handle_client_start_sub (void *cls,
3413                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3414 {
3415   struct ClientContext *cli_ctx = cls;
3416
3417   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3418   if (NULL != cli_ctx->sub &&
3419       0 != memcmp (&cli_ctx->sub->hash,
3420                    &msg->hash,
3421                    sizeof (struct GNUNET_HashCode)))
3422   {
3423     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3424     destroy_sub (cli_ctx->sub);
3425     cli_ctx->sub = NULL;
3426   }
3427   cli_ctx->sub = new_sub (&msg->hash,
3428                          msub->sampler_size_est_min, // TODO make api input?
3429                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3430   GNUNET_SERVICE_client_continue (cli_ctx->client);
3431 }
3432
3433
3434 /**
3435  * @brief Destroy the Sub
3436  *
3437  * @param cls Closure - unused
3438  * @param msg Message containing the hash that identifies the Sub
3439  */
3440 static void
3441 handle_client_stop_sub (void *cls,
3442                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3443 {
3444   struct ClientContext *cli_ctx = cls;
3445
3446   GNUNET_assert (NULL != cli_ctx->sub);
3447   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3448   {
3449     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3450   }
3451   destroy_sub (cli_ctx->sub);
3452   cli_ctx->sub = NULL;
3453   GNUNET_SERVICE_client_continue (cli_ctx->client);
3454 }
3455
3456
3457 /**
3458  * Handle a CHECK_LIVE message from another peer.
3459  *
3460  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3461  * the channel is blocked for all other communication.
3462  *
3463  * @param cls Closure - Context of channel
3464  * @param msg Message - unused
3465  */
3466 static void
3467 handle_peer_check (void *cls,
3468                    const struct GNUNET_MessageHeader *msg)
3469 {
3470   const struct ChannelCtx *channel_ctx = cls;
3471   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3472   (void) msg;
3473
3474   LOG (GNUNET_ERROR_TYPE_DEBUG,
3475       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3476   if (channel_ctx->peer_ctx->sub == msub)
3477   {
3478     GNUNET_STATISTICS_update (stats,
3479                               "# pending online checks",
3480                               -1,
3481                               GNUNET_NO);
3482   }
3483
3484   GNUNET_CADET_receive_done (channel_ctx->channel);
3485 }
3486
3487
3488 /**
3489  * Handle a PUSH message from another peer.
3490  *
3491  * Check the proof of work and store the PeerID
3492  * in the temporary list for pushed PeerIDs.
3493  *
3494  * @param cls Closure - Context of channel
3495  * @param msg Message - unused
3496  */
3497 static void
3498 handle_peer_push (void *cls,
3499                   const struct GNUNET_MessageHeader *msg)
3500 {
3501   const struct ChannelCtx *channel_ctx = cls;
3502   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3503   (void) msg;
3504
3505   // (check the proof of work (?))
3506
3507   LOG (GNUNET_ERROR_TYPE_DEBUG,
3508        "Received PUSH (%s)\n",
3509        GNUNET_i2s (peer));
3510   if (channel_ctx->peer_ctx->sub == msub)
3511   {
3512     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3513     if (NULL != map_single_hop &&
3514         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3515                                                              peer))
3516     {
3517       GNUNET_STATISTICS_update (stats,
3518                                 "# push message received (multi-hop peer)",
3519                                 1,
3520                                 GNUNET_NO);
3521     }
3522   }
3523
3524   #if ENABLE_MALICIOUS
3525   struct AttackedPeer *tmp_att_peer;
3526
3527   if ( (1 == mal_type) ||
3528        (3 == mal_type) )
3529   { /* Try to maximise representation */
3530     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3531     tmp_att_peer->peer_id = *peer;
3532     if (NULL == att_peer_set)
3533       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3534     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3535                                                              peer))
3536     {
3537       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3538                                    att_peers_tail,
3539                                    tmp_att_peer);
3540       add_peer_array_to_set (peer, 1, att_peer_set);
3541     }
3542     else
3543     {
3544       GNUNET_free (tmp_att_peer);
3545     }
3546   }
3547
3548
3549   else if (2 == mal_type)
3550   {
3551     /* We attack one single well-known peer - simply ignore */
3552   }
3553   #endif /* ENABLE_MALICIOUS */
3554
3555   /* Add the sending peer to the push_map */
3556   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3557
3558   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3559                                      &channel_ctx->peer_ctx->peer_id));
3560   GNUNET_CADET_receive_done (channel_ctx->channel);
3561 }
3562
3563
3564 /**
3565  * Handle PULL REQUEST request message from another peer.
3566  *
3567  * Reply with the view of PeerIDs.
3568  *
3569  * @param cls Closure - Context of channel
3570  * @param msg Message - unused
3571  */
3572 static void
3573 handle_peer_pull_request (void *cls,
3574                           const struct GNUNET_MessageHeader *msg)
3575 {
3576   const struct ChannelCtx *channel_ctx = cls;
3577   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3578   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3579   const struct GNUNET_PeerIdentity *view_array;
3580   (void) msg;
3581
3582   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3583   if (peer_ctx->sub == msub)
3584   {
3585     GNUNET_STATISTICS_update(stats,
3586                              "# pull request message received",
3587                              1,
3588                              GNUNET_NO);
3589     if (NULL != map_single_hop &&
3590         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3591                                                              &peer_ctx->peer_id))
3592     {
3593       GNUNET_STATISTICS_update (stats,
3594                                 "# pull request message received (multi-hop peer)",
3595                                 1,
3596                                 GNUNET_NO);
3597     }
3598   }
3599
3600   #if ENABLE_MALICIOUS
3601   if (1 == mal_type
3602       || 3 == mal_type)
3603   { /* Try to maximise representation */
3604     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3605   }
3606
3607   else if (2 == mal_type)
3608   { /* Try to partition network */
3609     if (0 == GNUNET_memcmp (&attacked_peer, peer))
3610     {
3611       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3612     }
3613   }
3614   #endif /* ENABLE_MALICIOUS */
3615
3616   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3617                                      &channel_ctx->peer_ctx->peer_id));
3618   GNUNET_CADET_receive_done (channel_ctx->channel);
3619   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3620   send_pull_reply (peer_ctx,
3621                    view_array,
3622                    View_size (channel_ctx->peer_ctx->sub->view));
3623 }
3624
3625
3626 /**
3627  * Check whether we sent a corresponding request and
3628  * whether this reply is the first one.
3629  *
3630  * @param cls Closure - Context of channel
3631  * @param msg Message containing the replied peers
3632  */
3633 static int
3634 check_peer_pull_reply (void *cls,
3635                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3636 {
3637   struct ChannelCtx *channel_ctx = cls;
3638   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3639
3640   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3641   {
3642     GNUNET_break_op (0);
3643     return GNUNET_SYSERR;
3644   }
3645
3646   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3647       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3648   {
3649     LOG (GNUNET_ERROR_TYPE_ERROR,
3650         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3651         ntohl (msg->num_peers),
3652         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3653             sizeof (struct GNUNET_PeerIdentity));
3654     GNUNET_break_op (0);
3655     return GNUNET_SYSERR;
3656   }
3657
3658   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3659                                      &sender_ctx->peer_id,
3660                                      Peers_PULL_REPLY_PENDING))
3661   {
3662     LOG (GNUNET_ERROR_TYPE_WARNING,
3663         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3664         GNUNET_i2s (&sender_ctx->peer_id));
3665     if (sender_ctx->sub == msub)
3666     {
3667       GNUNET_STATISTICS_update (stats,
3668                                 "# unrequested pull replies",
3669                                 1,
3670                                 GNUNET_NO);
3671     }
3672   }
3673   return GNUNET_OK;
3674 }
3675
3676
3677 /**
3678  * Handle PULL REPLY message from another peer.
3679  *
3680  * @param cls Closure
3681  * @param msg The message header
3682  */
3683 static void
3684 handle_peer_pull_reply (void *cls,
3685                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3686 {
3687   const struct ChannelCtx *channel_ctx = cls;
3688   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3689   const struct GNUNET_PeerIdentity *peers;
3690   struct Sub *sub = channel_ctx->peer_ctx->sub;
3691   uint32_t i;
3692 #if ENABLE_MALICIOUS
3693   struct AttackedPeer *tmp_att_peer;
3694 #endif /* ENABLE_MALICIOUS */
3695
3696   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3697   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3698   if (channel_ctx->peer_ctx->sub == msub)
3699   {
3700     GNUNET_STATISTICS_update (stats,
3701                               "# pull reply messages received",
3702                               1,
3703                               GNUNET_NO);
3704     if (NULL != map_single_hop &&
3705         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3706           &channel_ctx->peer_ctx->peer_id))
3707     {
3708       GNUNET_STATISTICS_update (stats,
3709                                 "# pull reply messages received (multi-hop peer)",
3710                                 1,
3711                                 GNUNET_NO);
3712     }
3713   }
3714
3715   #if ENABLE_MALICIOUS
3716   // We shouldn't even receive pull replies as we're not sending
3717   if (2 == mal_type)
3718   {
3719   }
3720   #endif /* ENABLE_MALICIOUS */
3721
3722   /* Do actual logic */
3723   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3724
3725   LOG (GNUNET_ERROR_TYPE_DEBUG,
3726        "PULL REPLY received, got following %u peers:\n",
3727        ntohl (msg->num_peers));
3728
3729   for (i = 0; i < ntohl (msg->num_peers); i++)
3730   {
3731     LOG (GNUNET_ERROR_TYPE_DEBUG,
3732          "%u. %s\n",
3733          i,
3734          GNUNET_i2s (&peers[i]));
3735
3736     #if ENABLE_MALICIOUS
3737     if ((NULL != att_peer_set) &&
3738         (1 == mal_type || 3 == mal_type))
3739     { /* Add attacked peer to local list */
3740       // TODO check if we sent a request and this was the first reply
3741       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3742                                                                &peers[i])
3743           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3744                                                                   &peers[i]))
3745       {
3746         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3747         tmp_att_peer->peer_id = peers[i];
3748         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3749                                      att_peers_tail,
3750                                      tmp_att_peer);
3751         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3752       }
3753       continue;
3754     }
3755     #endif /* ENABLE_MALICIOUS */
3756     /* Make sure we 'know' about this peer */
3757     (void) insert_peer (channel_ctx->peer_ctx->sub,
3758                         &peers[i]);
3759
3760     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3761                                         &peers[i]))
3762     {
3763       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3764                          &peers[i]);
3765     }
3766     else
3767     {
3768       schedule_operation (channel_ctx->peer_ctx,
3769                           insert_in_pull_map,
3770                           channel_ctx->peer_ctx->sub); /* cls */
3771       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3772                                       &peers[i]);
3773     }
3774   }
3775
3776   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3777                                  sender),
3778                    Peers_PULL_REPLY_PENDING);
3779   clean_peer (channel_ctx->peer_ctx->sub,
3780               sender);
3781
3782   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3783                                      sender));
3784   GNUNET_CADET_receive_done (channel_ctx->channel);
3785 }
3786
3787
3788 /**
3789  * Compute a random delay.
3790  * A uniformly distributed value between mean + spread and mean - spread.
3791  *
3792  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3793  * It would return a random value between 2 and 6 min.
3794  *
3795  * @param mean the mean time until the next round
3796  * @param spread the inverse amount of deviation from the mean
3797  */
3798 static struct GNUNET_TIME_Relative
3799 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3800                     unsigned int spread)
3801 {
3802   struct GNUNET_TIME_Relative half_interval;
3803   struct GNUNET_TIME_Relative ret;
3804   unsigned int rand_delay;
3805   unsigned int max_rand_delay;
3806
3807   if (0 == spread)
3808   {
3809     LOG (GNUNET_ERROR_TYPE_WARNING,
3810          "Not accepting spread of 0\n");
3811     GNUNET_break (0);
3812     GNUNET_assert (0);
3813   }
3814   GNUNET_assert (0 != mean.rel_value_us);
3815
3816   /* Compute random time value between spread * mean and spread * mean */
3817   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3818
3819   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3820   /**
3821    * Compute random value between (0 and 1) * round_interval
3822    * via multiplying round_interval with a 'fraction' (0 to value)/value
3823    */
3824   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3825   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3826   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3827   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3828
3829   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3830     LOG (GNUNET_ERROR_TYPE_WARNING,
3831          "Returning FOREVER_REL\n");
3832
3833   return ret;
3834 }
3835
3836
3837 /**
3838  * Send single pull request
3839  *
3840  * @param peer_ctx Context to the peer to send request to
3841  */
3842 static void
3843 send_pull_request (struct PeerContext *peer_ctx)
3844 {
3845   struct GNUNET_MQ_Envelope *ev;
3846
3847   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3848                                                &peer_ctx->peer_id,
3849                                                Peers_PULL_REPLY_PENDING));
3850   SET_PEER_FLAG (peer_ctx,
3851                  Peers_PULL_REPLY_PENDING);
3852   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3853
3854   LOG (GNUNET_ERROR_TYPE_DEBUG,
3855        "Going to send PULL REQUEST to peer %s.\n",
3856        GNUNET_i2s (&peer_ctx->peer_id));
3857
3858   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3859   send_message (peer_ctx,
3860                 ev,
3861                 "PULL REQUEST");
3862   if (peer_ctx->sub)
3863   {
3864     GNUNET_STATISTICS_update (stats,
3865                               "# pull request send issued",
3866                               1,
3867                               GNUNET_NO);
3868     if (NULL != map_single_hop &&
3869         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3870                                                              &peer_ctx->peer_id))
3871     {
3872       GNUNET_STATISTICS_update (stats,
3873                                 "# pull request send issued (multi-hop peer)",
3874                                 1,
3875                                 GNUNET_NO);
3876     }
3877   }
3878 }
3879
3880
3881 /**
3882  * Send single push
3883  *
3884  * @param peer_ctx Context of peer to send push to
3885  */
3886 static void
3887 send_push (struct PeerContext *peer_ctx)
3888 {
3889   struct GNUNET_MQ_Envelope *ev;
3890
3891   LOG (GNUNET_ERROR_TYPE_DEBUG,
3892        "Going to send PUSH to peer %s.\n",
3893        GNUNET_i2s (&peer_ctx->peer_id));
3894
3895   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3896   send_message (peer_ctx, ev, "PUSH");
3897   if (peer_ctx->sub)
3898   {
3899     GNUNET_STATISTICS_update (stats,
3900                               "# push send issued",
3901                               1,
3902                               GNUNET_NO);
3903     if (NULL != map_single_hop &&
3904         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3905                                                              &peer_ctx->peer_id))
3906     {
3907       GNUNET_STATISTICS_update (stats,
3908                                 "# push send issued (multi-hop peer)",
3909                                 1,
3910                                 GNUNET_NO);
3911     }
3912   }
3913 }
3914
3915
3916 #if ENABLE_MALICIOUS
3917
3918
3919 /**
3920  * @brief This function is called, when the client tells us to act malicious.
3921  * It verifies that @a msg is well-formed.
3922  *
3923  * @param cls the closure (#ClientContext)
3924  * @param msg the message
3925  * @return #GNUNET_OK if @a msg is well-formed
3926  */
3927 static int
3928 check_client_act_malicious (void *cls,
3929                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3930 {
3931   struct ClientContext *cli_ctx = cls;
3932   uint16_t msize = ntohs (msg->header.size);
3933   uint32_t num_peers = ntohl (msg->num_peers);
3934
3935   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3936   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3937        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3938   {
3939     LOG (GNUNET_ERROR_TYPE_ERROR,
3940         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3941         ntohl (msg->num_peers),
3942         (msize / sizeof (struct GNUNET_PeerIdentity)));
3943     GNUNET_break (0);
3944     GNUNET_SERVICE_client_drop (cli_ctx->client);
3945     return GNUNET_SYSERR;
3946   }
3947   return GNUNET_OK;
3948 }
3949
3950 /**
3951  * Turn RPS service to act malicious.
3952  *
3953  * @param cls Closure
3954  * @param client The client that sent the message
3955  * @param msg The message header
3956  */
3957 static void
3958 handle_client_act_malicious (void *cls,
3959                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3960 {
3961   struct ClientContext *cli_ctx = cls;
3962   struct GNUNET_PeerIdentity *peers;
3963   uint32_t num_mal_peers_sent;
3964   uint32_t num_mal_peers_old;
3965   struct Sub *sub = cli_ctx->sub;
3966
3967   if (NULL == sub) sub = msub;
3968   /* Do actual logic */
3969   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3970   mal_type = ntohl (msg->type);
3971   if (NULL == mal_peer_set)
3972     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3973
3974   LOG (GNUNET_ERROR_TYPE_DEBUG,
3975        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3976        mal_type,
3977        ntohl (msg->num_peers));
3978
3979   if (1 == mal_type)
3980   { /* Try to maximise representation */
3981     /* Add other malicious peers to those we already know */
3982
3983     num_mal_peers_sent = ntohl (msg->num_peers);
3984     num_mal_peers_old = num_mal_peers;
3985     GNUNET_array_grow (mal_peers,
3986                        num_mal_peers,
3987                        num_mal_peers + num_mal_peers_sent);
3988     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3989             peers,
3990             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3991
3992     /* Add all mal peers to mal_peer_set */
3993     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3994                            num_mal_peers_sent,
3995                            mal_peer_set);
3996
3997     /* Substitute do_round () with do_mal_round () */
3998     GNUNET_assert (NULL != sub->do_round_task);
3999     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4000     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4001   }
4002
4003   else if ( (2 == mal_type) ||
4004             (3 == mal_type) )
4005   { /* Try to partition the network */
4006     /* Add other malicious peers to those we already know */
4007
4008     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4009     num_mal_peers_old = num_mal_peers;
4010     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4011     GNUNET_array_grow (mal_peers,
4012                        num_mal_peers,
4013                        num_mal_peers + num_mal_peers_sent);
4014     if (NULL != mal_peers &&
4015         0 != num_mal_peers)
4016     {
4017       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4018               peers,
4019               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
4020
4021       /* Add all mal peers to mal_peer_set */
4022       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4023                              num_mal_peers_sent,
4024                              mal_peer_set);
4025     }
4026
4027     /* Store the one attacked peer */
4028     GNUNET_memcpy (&attacked_peer,
4029             &msg->attacked_peer,
4030             sizeof (struct GNUNET_PeerIdentity));
4031     /* Set the flag of the attacked peer to valid to avoid problems */
4032     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4033     {
4034       (void) issue_peer_online_check (sub, &attacked_peer);
4035     }
4036
4037     LOG (GNUNET_ERROR_TYPE_DEBUG,
4038          "Attacked peer is %s\n",
4039          GNUNET_i2s (&attacked_peer));
4040
4041     /* Substitute do_round () with do_mal_round () */
4042     if (NULL != sub->do_round_task)
4043     {
4044       /* Probably in shutdown */
4045       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4046       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4047     }
4048   }
4049   else if (0 == mal_type)
4050   { /* Stop acting malicious */
4051     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4052
4053     /* Substitute do_mal_round () with do_round () */
4054     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4055     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4056   }
4057   else
4058   {
4059     GNUNET_break (0);
4060     GNUNET_SERVICE_client_continue (cli_ctx->client);
4061   }
4062   GNUNET_SERVICE_client_continue (cli_ctx->client);
4063 }
4064
4065
4066 /**
4067  * Send out PUSHes and PULLs maliciously.
4068  *
4069  * This is executed regylary.
4070  *
4071  * @param cls Closure - Sub
4072  */
4073 static void
4074 do_mal_round (void *cls)
4075 {
4076   uint32_t num_pushes;
4077   uint32_t i;
4078   struct GNUNET_TIME_Relative time_next_round;
4079   struct AttackedPeer *tmp_att_peer;
4080   struct Sub *sub = cls;
4081
4082   LOG (GNUNET_ERROR_TYPE_DEBUG,
4083        "Going to execute next round maliciously type %" PRIu32 ".\n",
4084       mal_type);
4085   sub->do_round_task = NULL;
4086   GNUNET_assert (mal_type <= 3);
4087   /* Do malicious actions */
4088   if (1 == mal_type)
4089   { /* Try to maximise representation */
4090
4091     /* The maximum of pushes we're going to send this round */
4092     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4093                                          num_attacked_peers),
4094                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4095
4096     LOG (GNUNET_ERROR_TYPE_DEBUG,
4097          "Going to send %" PRIu32 " pushes\n",
4098          num_pushes);
4099
4100     /* Send PUSHes to attacked peers */
4101     for (i = 0 ; i < num_pushes ; i++)
4102     {
4103       if (att_peers_tail == att_peer_index)
4104         att_peer_index = att_peers_head;
4105       else
4106         att_peer_index = att_peer_index->next;
4107
4108       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4109     }
4110
4111     /* Send PULLs to some peers to learn about additional peers to attack */
4112     tmp_att_peer = att_peer_index;
4113     for (i = 0 ; i < num_pushes * alpha ; i++)
4114     {
4115       if (att_peers_tail == tmp_att_peer)
4116         tmp_att_peer = att_peers_head;
4117       else
4118         att_peer_index = tmp_att_peer->next;
4119
4120       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4121     }
4122   }
4123
4124
4125   else if (2 == mal_type)
4126   { /**
4127      * Try to partition the network
4128      * Send as many pushes to the attacked peer as possible
4129      * That is one push per round as it will ignore more.
4130      */
4131     (void) issue_peer_online_check (sub, &attacked_peer);
4132     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4133                                        &attacked_peer,
4134                                        Peers_ONLINE))
4135       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4136   }
4137
4138
4139   if (3 == mal_type)
4140   { /* Combined attack */
4141
4142     /* Send PUSH to attacked peers */
4143     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4144     {
4145       (void) issue_peer_online_check (sub, &attacked_peer);
4146       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4147                                          &attacked_peer,
4148                                          Peers_ONLINE))
4149       {
4150         LOG (GNUNET_ERROR_TYPE_DEBUG,
4151             "Goding to send push to attacked peer (%s)\n",
4152             GNUNET_i2s (&attacked_peer));
4153         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4154       }
4155     }
4156     (void) issue_peer_online_check (sub, &attacked_peer);
4157
4158     /* The maximum of pushes we're going to send this round */
4159     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4160                                          num_attacked_peers),
4161                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4162
4163     LOG (GNUNET_ERROR_TYPE_DEBUG,
4164          "Going to send %" PRIu32 " pushes\n",
4165          num_pushes);
4166
4167     for (i = 0; i < num_pushes; i++)
4168     {
4169       if (att_peers_tail == att_peer_index)
4170         att_peer_index = att_peers_head;
4171       else
4172         att_peer_index = att_peer_index->next;
4173
4174       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4175     }
4176
4177     /* Send PULLs to some peers to learn about additional peers to attack */
4178     tmp_att_peer = att_peer_index;
4179     for (i = 0; i < num_pushes * alpha; i++)
4180     {
4181       if (att_peers_tail == tmp_att_peer)
4182         tmp_att_peer = att_peers_head;
4183       else
4184         att_peer_index = tmp_att_peer->next;
4185
4186       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4187     }
4188   }
4189
4190   /* Schedule next round */
4191   time_next_round = compute_rand_delay (sub->round_interval, 2);
4192
4193   GNUNET_assert (NULL == sub->do_round_task);
4194   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4195                                                     &do_mal_round, sub);
4196   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4197 }
4198 #endif /* ENABLE_MALICIOUS */
4199
4200
4201 /**
4202  * Send out PUSHes and PULLs, possibly update #view, samplers.
4203  *
4204  * This is executed regylary.
4205  *
4206  * @param cls Closure - Sub
4207  */
4208 static void
4209 do_round (void *cls)
4210 {
4211   unsigned int i;
4212   const struct GNUNET_PeerIdentity *view_array;
4213   unsigned int *permut;
4214   unsigned int a_peers; /* Number of peers we send pushes to */
4215   unsigned int b_peers; /* Number of peers we send pull requests to */
4216   uint32_t first_border;
4217   uint32_t second_border;
4218   struct GNUNET_PeerIdentity peer;
4219   struct GNUNET_PeerIdentity *update_peer;
4220   struct Sub *sub = cls;
4221
4222   sub->num_rounds++;
4223   LOG (GNUNET_ERROR_TYPE_DEBUG,
4224        "Going to execute next round.\n");
4225   if (sub == msub)
4226   {
4227     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4228   }
4229   sub->do_round_task = NULL;
4230 #ifdef TO_FILE_FULL
4231   to_file (sub->file_name_view_log,
4232            "___ new round ___");
4233 #endif /* TO_FILE_FULL */
4234   view_array = View_get_as_array (sub->view);
4235   for (i = 0; i < View_size (sub->view); i++)
4236   {
4237     LOG (GNUNET_ERROR_TYPE_DEBUG,
4238          "\t%s\n", GNUNET_i2s (&view_array[i]));
4239 #ifdef TO_FILE_FULL
4240     to_file (sub->file_name_view_log,
4241              "=%s\t(do round)",
4242              GNUNET_i2s_full (&view_array[i]));
4243 #endif /* TO_FILE_FULL */
4244   }
4245
4246
4247   /* Send pushes and pull requests */
4248   if (0 < View_size (sub->view))
4249   {
4250     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4251                                            View_size (sub->view));
4252
4253     /* Send PUSHes */
4254     a_peers = ceil (alpha * View_size (sub->view));
4255
4256     LOG (GNUNET_ERROR_TYPE_DEBUG,
4257          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4258          a_peers, alpha, View_size (sub->view));
4259     for (i = 0; i < a_peers; i++)
4260     {
4261       peer = view_array[permut[i]];
4262       // FIXME if this fails schedule/loop this for later
4263       send_push (get_peer_ctx (sub->peer_map, &peer));
4264     }
4265
4266     /* Send PULL requests */
4267     b_peers = ceil (beta * View_size (sub->view));
4268     first_border = a_peers;
4269     second_border = a_peers + b_peers;
4270     if (second_border > View_size (sub->view))
4271     {
4272       first_border = View_size (sub->view) - b_peers;
4273       second_border = View_size (sub->view);
4274     }
4275     LOG (GNUNET_ERROR_TYPE_DEBUG,
4276         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4277         b_peers, beta, View_size (sub->view));
4278     for (i = first_border; i < second_border; i++)
4279     {
4280       peer = view_array[permut[i]];
4281       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4282                                          &peer,
4283                                          Peers_PULL_REPLY_PENDING))
4284       { // FIXME if this fails schedule/loop this for later
4285         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4286       }
4287     }
4288
4289     GNUNET_free (permut);
4290     permut = NULL;
4291   }
4292
4293
4294   /* Update view */
4295   /* TODO see how many peers are in push-/pull- list! */
4296
4297   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4298       (0 < CustomPeerMap_size (sub->push_map)) &&
4299       (0 < CustomPeerMap_size (sub->pull_map)))
4300   { /* If conditions for update are fulfilled, update */
4301     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4302
4303     uint32_t final_size;
4304     uint32_t peers_to_clean_size;
4305     struct GNUNET_PeerIdentity *peers_to_clean;
4306
4307     peers_to_clean = NULL;
4308     peers_to_clean_size = 0;
4309     GNUNET_array_grow (peers_to_clean,
4310                        peers_to_clean_size,
4311                        View_size (sub->view));
4312     GNUNET_memcpy (peers_to_clean,
4313             view_array,
4314             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4315
4316     /* Seems like recreating is the easiest way of emptying the peermap */
4317     View_clear (sub->view);
4318 #ifdef TO_FILE_FULL
4319     to_file (sub->file_name_view_log,
4320              "--- emptied ---");
4321 #endif /* TO_FILE_FULL */
4322
4323     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4324                                 CustomPeerMap_size (sub->push_map));
4325     second_border = first_border +
4326                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4327                                 CustomPeerMap_size (sub->pull_map));
4328     final_size    = second_border +
4329       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4330     LOG (GNUNET_ERROR_TYPE_DEBUG,
4331         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4332         first_border,
4333         second_border,
4334         final_size);
4335
4336     /* Update view with peers received through PUSHes */
4337     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4338                                            CustomPeerMap_size (sub->push_map));
4339     for (i = 0; i < first_border; i++)
4340     {
4341       int inserted;
4342       inserted = insert_in_view (sub,
4343                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4344                                                                   permut[i]));
4345       if (GNUNET_OK == inserted)
4346       {
4347         clients_notify_stream_peer (sub,
4348             1,
4349             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4350       }
4351 #ifdef TO_FILE_FULL
4352       to_file (sub->file_name_view_log,
4353                "+%s\t(push list)",
4354                GNUNET_i2s_full (&view_array[i]));
4355 #endif /* TO_FILE_FULL */
4356       // TODO change the peer_flags accordingly
4357     }
4358     GNUNET_free (permut);
4359     permut = NULL;
4360
4361     /* Update view with peers received through PULLs */
4362     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4363                                            CustomPeerMap_size (sub->pull_map));
4364     for (i = first_border; i < second_border; i++)
4365     {
4366       int inserted;
4367       inserted = insert_in_view (sub,
4368           CustomPeerMap_get_peer_by_index (sub->pull_map,
4369                                            permut[i - first_border]));
4370       if (GNUNET_OK == inserted)
4371       {
4372         clients_notify_stream_peer (sub,
4373             1,
4374             CustomPeerMap_get_peer_by_index (sub->pull_map,
4375                                              permut[i - first_border]));
4376       }
4377 #ifdef TO_FILE_FULL
4378       to_file (sub->file_name_view_log,
4379                "+%s\t(pull list)",
4380                GNUNET_i2s_full (&view_array[i]));
4381 #endif /* TO_FILE_FULL */
4382       // TODO change the peer_flags accordingly
4383     }
4384     GNUNET_free (permut);
4385     permut = NULL;
4386
4387     /* Update view with peers from history */
4388     RPS_sampler_get_n_rand_peers (sub->sampler,
4389                                   final_size - second_border,
4390                                   hist_update,
4391                                   sub);
4392     // TODO change the peer_flags accordingly
4393
4394     for (i = 0; i < View_size (sub->view); i++)
4395       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4396
4397     /* Clean peers that were removed from the view */
4398     for (i = 0; i < peers_to_clean_size; i++)
4399     {
4400 #ifdef TO_FILE_FULL
4401       to_file (sub->file_name_view_log,
4402                "-%s",
4403                GNUNET_i2s_full (&peers_to_clean[i]));
4404 #endif /* TO_FILE_FULL */
4405       clean_peer (sub, &peers_to_clean[i]);
4406     }
4407
4408     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4409     clients_notify_view_update (sub);
4410   } else {
4411     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4412     if (sub == msub)
4413     {
4414       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4415       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4416           !(0 >= CustomPeerMap_size (sub->pull_map)))
4417         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4418       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4419           (0 >= CustomPeerMap_size (sub->pull_map)))
4420         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4421       if (0 >= CustomPeerMap_size (sub->push_map) &&
4422           !(0 >= CustomPeerMap_size (sub->pull_map)))
4423         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4424       if (0 >= CustomPeerMap_size (sub->push_map) &&
4425           (0 >= CustomPeerMap_size (sub->pull_map)))
4426         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4427       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4428           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4429           0 >= CustomPeerMap_size (sub->push_map))
4430         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4431     }
4432   }
4433   // TODO independent of that also get some peers from CADET_get_peers()?
4434   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4435   {
4436     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4437   }
4438   else
4439   {
4440     LOG (GNUNET_ERROR_TYPE_WARNING,
4441          "Push map size too big for histogram (%u, %u)\n",
4442          CustomPeerMap_size (sub->push_map),
4443          HISTOGRAM_FILE_SLOTS);
4444   }
4445   // FIXME check bounds of histogram
4446   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4447                    (alpha * sub->view_size_est_need)) +
4448                           (HISTOGRAM_FILE_SLOTS/2)]++;
4449   if (sub == msub)
4450   {
4451     GNUNET_STATISTICS_set (stats,
4452         "# peers in push map at end of round",
4453         CustomPeerMap_size (sub->push_map),
4454         GNUNET_NO);
4455     GNUNET_STATISTICS_set (stats,
4456         "# peers in pull map at end of round",
4457         CustomPeerMap_size (sub->pull_map),
4458         GNUNET_NO);
4459     GNUNET_STATISTICS_set (stats,
4460         "# peers in view at end of round",
4461         View_size (sub->view),
4462         GNUNET_NO);
4463     GNUNET_STATISTICS_set (stats,
4464         "# expected pushes",
4465         alpha * sub->view_size_est_need,
4466         GNUNET_NO);
4467     GNUNET_STATISTICS_set (stats,
4468         "delta expected - received pushes",
4469         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4470         GNUNET_NO);
4471   }
4472
4473   LOG (GNUNET_ERROR_TYPE_DEBUG,
4474        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4475        CustomPeerMap_size (sub->push_map),
4476        CustomPeerMap_size (sub->pull_map),
4477        alpha,
4478        View_size (sub->view),
4479        alpha * View_size (sub->view));
4480
4481   /* Update samplers */
4482   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4483   {
4484     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4485     LOG (GNUNET_ERROR_TYPE_DEBUG,
4486          "Updating with peer %s from push list\n",
4487          GNUNET_i2s (update_peer));
4488     insert_in_sampler (sub, update_peer);
4489     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4490   }
4491
4492   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4493   {
4494     LOG (GNUNET_ERROR_TYPE_DEBUG,
4495          "Updating with peer %s from pull list\n",
4496          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4497     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4498     /* This cleans only if it is not in the view */
4499     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4500   }
4501
4502
4503   /* Empty push/pull lists */
4504   CustomPeerMap_clear (sub->push_map);
4505   CustomPeerMap_clear (sub->pull_map);
4506
4507   if (sub == msub)
4508   {
4509     GNUNET_STATISTICS_set (stats,
4510                            "view size",
4511                            View_size(sub->view),
4512                            GNUNET_NO);
4513   }
4514
4515   struct GNUNET_TIME_Relative time_next_round;
4516
4517   time_next_round = compute_rand_delay (sub->round_interval, 2);
4518
4519   /* Schedule next round */
4520   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4521                                                      &do_round, sub);
4522   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4523 }
4524
4525
4526 /**
4527  * This is called from GNUNET_CADET_get_peers().
4528  *
4529  * It is called on every peer(ID) that cadet somehow has contact with.
4530  * We use those to initialise the sampler.
4531  *
4532  * implements #GNUNET_CADET_PeersCB
4533  *
4534  * @param cls Closure - Sub
4535  * @param peer Peer, or NULL on "EOF".
4536  * @param tunnel Do we have a tunnel towards this peer?
4537  * @param n_paths Number of known paths towards this peer.
4538  * @param best_path How long is the best path?
4539  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4540  */
4541 void
4542 init_peer_cb (void *cls,
4543               const struct GNUNET_PeerIdentity *peer,
4544               int tunnel, /* "Do we have a tunnel towards this peer?" */
4545               unsigned int n_paths, /* "Number of known paths towards this peer" */
4546               unsigned int best_path) /* "How long is the best path?
4547                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4548 {
4549   struct Sub *sub = cls;
4550   (void) tunnel;
4551   (void) n_paths;
4552   (void) best_path;
4553
4554   if (NULL != peer)
4555   {
4556     LOG (GNUNET_ERROR_TYPE_DEBUG,
4557          "Got peer_id %s from cadet\n",
4558          GNUNET_i2s (peer));
4559     got_peer (sub, peer);
4560   }
4561 }
4562
4563
4564 /**
4565  * @brief Iterator function over stored, valid peers.
4566  *
4567  * We initialise the sampler with those.
4568  *
4569  * @param cls Closure - Sub
4570  * @param peer the peer id
4571  * @return #GNUNET_YES if we should continue to
4572  *         iterate,
4573  *         #GNUNET_NO if not.
4574  */
4575 static int
4576 valid_peers_iterator (void *cls,
4577                       const struct GNUNET_PeerIdentity *peer)
4578 {
4579   struct Sub *sub = cls;
4580
4581   if (NULL != peer)
4582   {
4583     LOG (GNUNET_ERROR_TYPE_DEBUG,
4584          "Got stored, valid peer %s\n",
4585          GNUNET_i2s (peer));
4586     got_peer (sub, peer);
4587   }
4588   return GNUNET_YES;
4589 }
4590
4591
4592 /**
4593  * Iterator over peers from peerinfo.
4594  *
4595  * @param cls Closure - Sub
4596  * @param peer id of the peer, NULL for last call
4597  * @param hello hello message for the peer (can be NULL)
4598  * @param error message
4599  */
4600 void
4601 process_peerinfo_peers (void *cls,
4602                         const struct GNUNET_PeerIdentity *peer,
4603                         const struct GNUNET_HELLO_Message *hello,
4604                         const char *err_msg)
4605 {
4606   struct Sub *sub = cls;
4607   (void) hello;
4608   (void) err_msg;
4609
4610   if (NULL != peer)
4611   {
4612     LOG (GNUNET_ERROR_TYPE_DEBUG,
4613          "Got peer_id %s from peerinfo\n",
4614          GNUNET_i2s (peer));
4615     got_peer (sub, peer);
4616   }
4617 }
4618
4619
4620 /**
4621  * Task run during shutdown.
4622  *
4623  * @param cls Closure - unused
4624  */
4625 static void
4626 shutdown_task (void *cls)
4627 {
4628   (void) cls;
4629   struct ClientContext *client_ctx;
4630
4631   LOG (GNUNET_ERROR_TYPE_DEBUG,
4632        "RPS service is going down\n");
4633
4634   /* Clean all clients */
4635   for (client_ctx = cli_ctx_head;
4636        NULL != cli_ctx_head;
4637        client_ctx = cli_ctx_head)
4638   {
4639     destroy_cli_ctx (client_ctx);
4640   }
4641   if (NULL != msub)
4642   {
4643     destroy_sub (msub);
4644     msub = NULL;
4645   }
4646
4647   /* Disconnect from other services */
4648   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4649   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4650   peerinfo_handle = NULL;
4651   GNUNET_NSE_disconnect (nse);
4652   if (NULL != map_single_hop)
4653   {
4654     /* core_init was called - core was initialised */
4655     /* disconnect first, so no callback tries to access missing peermap */
4656     GNUNET_CORE_disconnect (core_handle);
4657     core_handle = NULL;
4658     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4659     map_single_hop = NULL;
4660   }
4661
4662   if (NULL != stats)
4663   {
4664     GNUNET_STATISTICS_destroy (stats,
4665                                GNUNET_NO);
4666     stats = NULL;
4667   }
4668   GNUNET_CADET_disconnect (cadet_handle);
4669   cadet_handle = NULL;
4670 #if ENABLE_MALICIOUS
4671   struct AttackedPeer *tmp_att_peer;
4672   GNUNET_array_grow (mal_peers,
4673                      num_mal_peers,
4674                      0);
4675   if (NULL != mal_peer_set)
4676     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4677   if (NULL != att_peer_set)
4678     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4679   while (NULL != att_peers_head)
4680   {
4681     tmp_att_peer = att_peers_head;
4682     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4683                                  att_peers_tail,
4684                                  tmp_att_peer);
4685     GNUNET_free (tmp_att_peer);
4686   }
4687 #endif /* ENABLE_MALICIOUS */
4688   close_all_files();
4689 }
4690
4691
4692 /**
4693  * Handle client connecting to the service.
4694  *
4695  * @param cls unused
4696  * @param client the new client
4697  * @param mq the message queue of @a client
4698  * @return @a client
4699  */
4700 static void *
4701 client_connect_cb (void *cls,
4702                    struct GNUNET_SERVICE_Client *client,
4703                    struct GNUNET_MQ_Handle *mq)
4704 {
4705   struct ClientContext *cli_ctx;
4706   (void) cls;
4707
4708   LOG (GNUNET_ERROR_TYPE_DEBUG,
4709        "Client connected\n");
4710   if (NULL == client)
4711     return client; /* Server was destroyed before a client connected. Shutting down */
4712   cli_ctx = GNUNET_new (struct ClientContext);
4713   cli_ctx->mq = mq;
4714   cli_ctx->view_updates_left = -1;
4715   cli_ctx->stream_update = GNUNET_NO;
4716   cli_ctx->client = client;
4717   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4718                                cli_ctx_tail,
4719                                cli_ctx);
4720   return cli_ctx;
4721 }
4722
4723 /**
4724  * Callback called when a client disconnected from the service
4725  *
4726  * @param cls closure for the service
4727  * @param c the client that disconnected
4728  * @param internal_cls should be equal to @a c
4729  */
4730 static void
4731 client_disconnect_cb (void *cls,
4732                       struct GNUNET_SERVICE_Client *client,
4733                       void *internal_cls)
4734 {
4735   struct ClientContext *cli_ctx = internal_cls;
4736
4737   (void) cls;
4738   GNUNET_assert (client == cli_ctx->client);
4739   if (NULL == client)
4740   {/* shutdown task - destroy all clients */
4741     while (NULL != cli_ctx_head)
4742       destroy_cli_ctx (cli_ctx_head);
4743   }
4744   else
4745   { /* destroy this client */
4746     LOG (GNUNET_ERROR_TYPE_DEBUG,
4747         "Client disconnected. Destroy its context.\n");
4748     destroy_cli_ctx (cli_ctx);
4749   }
4750 }
4751
4752
4753 /**
4754  * Handle random peer sampling clients.
4755  *
4756  * @param cls closure
4757  * @param c configuration to use
4758  * @param service the initialized service
4759  */
4760 static void
4761 run (void *cls,
4762      const struct GNUNET_CONFIGURATION_Handle *c,
4763      struct GNUNET_SERVICE_Handle *service)
4764 {
4765   struct GNUNET_TIME_Relative round_interval;
4766   long long unsigned int sampler_size;
4767   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4768   struct GNUNET_HashCode hash;
4769
4770   (void) cls;
4771   (void) service;
4772
4773   GNUNET_log_setup ("rps",
4774                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4775                     NULL);
4776   cfg = c;
4777   /* Get own ID */
4778   GNUNET_CRYPTO_get_peer_identity (cfg,
4779                                    &own_identity); // TODO check return value
4780   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4781               "STARTING SERVICE (rps) for peer [%s]\n",
4782               GNUNET_i2s (&own_identity));
4783 #if ENABLE_MALICIOUS
4784   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4785               "Malicious execution compiled in.\n");
4786 #endif /* ENABLE_MALICIOUS */
4787
4788   /* Get time interval from the configuration */
4789   if (GNUNET_OK !=
4790       GNUNET_CONFIGURATION_get_value_time (cfg,
4791                                            "RPS",
4792                                            "ROUNDINTERVAL",
4793                                            &round_interval))
4794   {
4795     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4796                                "RPS", "ROUNDINTERVAL");
4797     GNUNET_SCHEDULER_shutdown ();
4798     return;
4799   }
4800
4801   /* Get initial size of sampler/view from the configuration */
4802   if (GNUNET_OK !=
4803       GNUNET_CONFIGURATION_get_value_number (cfg,
4804                                              "RPS",
4805                                              "MINSIZE",
4806                                              &sampler_size))
4807   {
4808     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4809                                "RPS", "MINSIZE");
4810     GNUNET_SCHEDULER_shutdown ();
4811     return;
4812   }
4813
4814   cadet_handle = GNUNET_CADET_connect (cfg);
4815   GNUNET_assert (NULL != cadet_handle);
4816   core_handle = GNUNET_CORE_connect (cfg,
4817                                      NULL, /* cls */
4818                                      core_init, /* init */
4819                                      core_connects, /* connects */
4820                                      core_disconnects, /* disconnects */
4821                                      NULL); /* handlers */
4822   GNUNET_assert (NULL != core_handle);
4823
4824
4825   alpha = 0.45;
4826   beta  = 0.45;
4827
4828
4829   /* Set up main Sub */
4830   GNUNET_CRYPTO_hash (hash_port_string,
4831                       strlen (hash_port_string),
4832                       &hash);
4833   msub = new_sub (&hash,
4834                  sampler_size, /* Will be overwritten by config */
4835                  round_interval);
4836
4837
4838   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4839
4840   /* connect to NSE */
4841   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4842
4843   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4844   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4845   // TODO send push/pull to each of those peers?
4846   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4847   restore_valid_peers (msub);
4848   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4849
4850   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4851                                                    GNUNET_NO,
4852                                                    process_peerinfo_peers,
4853                                                    msub);
4854
4855   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4856
4857   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4858   stats = GNUNET_STATISTICS_create ("rps", cfg);
4859 }
4860
4861
4862 /**
4863  * Define "main" method using service macro.
4864  */
4865 GNUNET_SERVICE_MAIN
4866 ("rps",
4867  GNUNET_SERVICE_OPTION_NONE,
4868  &run,
4869  &client_connect_cb,
4870  &client_disconnect_cb,
4871  NULL,
4872  GNUNET_MQ_hd_var_size (client_seed,
4873    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4874    struct GNUNET_RPS_CS_SeedMessage,
4875    NULL),
4876 #if ENABLE_MALICIOUS
4877  GNUNET_MQ_hd_var_size (client_act_malicious,
4878    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4879    struct GNUNET_RPS_CS_ActMaliciousMessage,
4880    NULL),
4881 #endif /* ENABLE_MALICIOUS */
4882  GNUNET_MQ_hd_fixed_size (client_view_request,
4883    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4884    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4885    NULL),
4886  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4887    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4888    struct GNUNET_MessageHeader,
4889    NULL),
4890  GNUNET_MQ_hd_fixed_size (client_stream_request,
4891    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4892    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4893    NULL),
4894  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4895    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4896    struct GNUNET_MessageHeader,
4897    NULL),
4898  GNUNET_MQ_hd_fixed_size (client_start_sub,
4899    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4900    struct GNUNET_RPS_CS_SubStartMessage,
4901    NULL),
4902  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4903    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4904    struct GNUNET_RPS_CS_SubStopMessage,
4905    NULL),
4906  GNUNET_MQ_handler_end());
4907
4908 /* end of gnunet-service-rps.c */