RPS Profiler: Keep track of push to single vs. multi-hop peer
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357 #ifdef TO_FILE_FULL
358   /**
359    * Name to log view to
360    */
361   char *file_name_view_log;
362 #endif /* TO_FILE_FULL */
363
364 #ifdef TO_FILE
365 #ifdef TO_FILE_FULL
366   /**
367    * Name to log number of observed peers to
368    */
369   char *file_name_observed_log;
370 #endif /* TO_FILE_FULL */
371
372   /**
373    * @brief Count the observed peers
374    */
375   uint32_t num_observed_peers;
376
377   /**
378    * @brief Multipeermap (ab-) used to count unique peer_ids
379    */
380   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
381 #endif /* TO_FILE */
382
383   /**
384    * List to store peers received through pushes temporary.
385    */
386   struct CustomPeerMap *push_map;
387
388   /**
389    * List to store peers received through pulls temporary.
390    */
391   struct CustomPeerMap *pull_map;
392
393   /**
394    * @brief This is the estimate used as view size.
395    *
396    * It is initialised with the minimum
397    */
398   unsigned int view_size_est_need;
399
400   /**
401    * @brief This is the minimum estimate used as view size.
402    *
403    * It is configured by the user.
404    */
405   unsigned int view_size_est_min;
406
407   /**
408    * @brief The view.
409    */
410   struct View *view;
411
412   /**
413    * Identifier for the main task that runs periodically.
414    */
415   struct GNUNET_SCHEDULER_Task *do_round_task;
416
417   /* === stats === */
418
419   /**
420    * @brief Counts the executed rounds.
421    */
422   uint32_t num_rounds;
423
424   /**
425    * @brief This array accumulates the number of received pushes per round.
426    *
427    * Number at index i represents the number of rounds with i observed pushes.
428    */
429   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
430
431   /**
432    * @brief Histogram of deltas between the expected and actual number of
433    * received pushes.
434    *
435    * As half of the entries are expected to be negative, this is shifted by
436    * #HISTOGRAM_FILE_SLOTS/2.
437    */
438   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
439
440   /**
441    * @brief Number of pull replies with this delay measured in rounds.
442    *
443    * Number at index i represents the number of pull replies with a delay of i
444    * rounds.
445    */
446   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
447 };
448
449
450 /***********************************************************************
451  * Globals
452 ***********************************************************************/
453
454 /**
455  * Our configuration.
456  */
457 static const struct GNUNET_CONFIGURATION_Handle *cfg;
458
459 /**
460  * Handle to the statistics service.
461  */
462 struct GNUNET_STATISTICS_Handle *stats;
463
464 /**
465  * Handler to CADET.
466  */
467 struct GNUNET_CADET_Handle *cadet_handle;
468
469 /**
470  * Handle to CORE
471  */
472 struct GNUNET_CORE_Handle *core_handle;
473
474 /**
475  * @brief PeerMap to keep track of connected peers.
476  */
477 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478
479 /**
480  * Our own identity.
481  */
482 static struct GNUNET_PeerIdentity own_identity;
483
484 /**
485  * Percentage of total peer number in the view
486  * to send random PUSHes to
487  */
488 static float alpha;
489
490 /**
491  * Percentage of total peer number in the view
492  * to send random PULLs to
493  */
494 static float beta;
495
496 /**
497  * Handler to NSE.
498  */
499 static struct GNUNET_NSE_Handle *nse;
500
501 /**
502  * Handler to PEERINFO.
503  */
504 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
505
506 /**
507  * Handle for cancellation of iteration over peers.
508  */
509 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510
511
512 #if ENABLE_MALICIOUS
513 /**
514  * Type of malicious peer
515  *
516  * 0 Don't act malicious at all - Default
517  * 1 Try to maximise representation
518  * 2 Try to partition the network
519  * 3 Combined attack
520  */
521 static uint32_t mal_type;
522
523 /**
524  * Other malicious peers
525  */
526 static struct GNUNET_PeerIdentity *mal_peers;
527
528 /**
529  * Hashmap of malicious peers used as set.
530  * Used to more efficiently check whether we know that peer.
531  */
532 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
533
534 /**
535  * Number of other malicious peers
536  */
537 static uint32_t num_mal_peers;
538
539
540 /**
541  * If type is 2 this is the DLL of attacked peers
542  */
543 static struct AttackedPeer *att_peers_head;
544 static struct AttackedPeer *att_peers_tail;
545
546 /**
547  * This index is used to point to an attacked peer to
548  * implement the round-robin-ish way to select attacked peers.
549  */
550 static struct AttackedPeer *att_peer_index;
551
552 /**
553  * Hashmap of attacked peers used as set.
554  * Used to more efficiently check whether we know that peer.
555  */
556 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
557
558 /**
559  * Number of attacked peers
560  */
561 static uint32_t num_attacked_peers;
562
563 /**
564  * If type is 1 this is the attacked peer
565  */
566 static struct GNUNET_PeerIdentity attacked_peer;
567
568 /**
569  * The limit of PUSHes we can send in one round.
570  * This is an assumption of the Brahms protocol and either implemented
571  * via proof of work
572  * or
573  * assumend to be the bandwidth limitation.
574  */
575 static uint32_t push_limit = 10000;
576 #endif /* ENABLE_MALICIOUS */
577
578 /**
579  * @brief Main Sub.
580  *
581  * This is run in any case by all peers and connects to all peers without
582  * specifying a shared value.
583  */
584 static struct Sub *msub;
585
586 /**
587  * @brief Maximum number of valid peers to keep.
588  * TODO read from config
589  */
590 static const uint32_t num_valid_peers_max = UINT32_MAX;
591
592 /***********************************************************************
593  * /Globals
594 ***********************************************************************/
595
596
597 static void
598 do_round (void *cls);
599
600 static void
601 do_mal_round (void *cls);
602
603
604 /**
605  * @brief Get the #PeerContext associated with a peer
606  *
607  * @param peer_map The peer map containing the context
608  * @param peer the peer id
609  *
610  * @return the #PeerContext
611  */
612 static struct PeerContext *
613 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
614               const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *ctx;
617   int ret;
618
619   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
620   GNUNET_assert (GNUNET_YES == ret);
621   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
622   GNUNET_assert (NULL != ctx);
623   return ctx;
624 }
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                           "# known peers",
679                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                           GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ( (NULL == peer_ctx->send_channel_ctx) &&
729        (NULL == peer_ctx->recv_channel_ctx) )
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780   (void) value;
781
782   if (0 >= iterator_cls->index)
783   {
784     iterator_cls->peer = peer;
785     return GNUNET_NO;
786   }
787   iterator_cls->index--;
788   return GNUNET_YES;
789 }
790
791
792 /**
793  * @brief Get a random peer from @a peer_map
794  *
795  * @param valid_peers Peer map containing valid peers from which to select a
796  * random one
797  *
798  * @return a random peer
799  */
800 static const struct GNUNET_PeerIdentity *
801 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
802 {
803   struct GetRandPeerIteratorCls *iterator_cls;
804   const struct GNUNET_PeerIdentity *ret;
805
806   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
807   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
808       GNUNET_CONTAINER_multipeermap_size (valid_peers));
809   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
810                                                 get_rand_peer_iterator,
811                                                 iterator_cls);
812   ret = iterator_cls->peer;
813   GNUNET_free (iterator_cls);
814   return ret;
815 }
816
817
818 /**
819  * @brief Add a given @a peer to valid peers.
820  *
821  * If valid peers are already #num_valid_peers_max, delete a peer previously.
822  *
823  * @param peer The peer that is added to the valid peers.
824  * @param valid_peers Peer map of valid peers to which to add the @a peer
825  *
826  * @return #GNUNET_YES if no other peer had to be removed
827  *         #GNUNET_NO  otherwise
828  */
829 static int
830 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
831                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
832 {
833   const struct GNUNET_PeerIdentity *rand_peer;
834   int ret;
835
836   ret = GNUNET_YES;
837   /* Remove random peers until there is space for a new one */
838   while (num_valid_peers_max <=
839          GNUNET_CONTAINER_multipeermap_size (valid_peers))
840   {
841     rand_peer = get_random_peer_from_peermap (valid_peers);
842     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
843     ret = GNUNET_NO;
844   }
845   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
846       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
847   if (valid_peers == msub->valid_peers)
848   {
849     GNUNET_STATISTICS_set (stats,
850                            "# valid peers",
851                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
852                            GNUNET_NO);
853   }
854   return ret;
855 }
856
857 static void
858 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
859
860 /**
861  * @brief Set the peer flag to living and
862  *        call the pending operations on this peer.
863  *
864  * Also adds peer to #valid_peers.
865  *
866  * @param peer_ctx the #PeerContext of the peer to set online
867  */
868 static void
869 set_peer_online (struct PeerContext *peer_ctx)
870 {
871   struct GNUNET_PeerIdentity *peer;
872   unsigned int i;
873
874   peer = &peer_ctx->peer_id;
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876       "Peer %s is online and valid, calling %i pending operations on it\n",
877       GNUNET_i2s (peer),
878       peer_ctx->num_pending_ops);
879
880   if (NULL != peer_ctx->online_check_pending)
881   {
882     LOG (GNUNET_ERROR_TYPE_DEBUG,
883          "Removing pending online check for peer %s\n",
884          GNUNET_i2s (&peer_ctx->peer_id));
885     // TODO wait until cadet sets mq->cancel_impl
886     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
887     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
888     peer_ctx->online_check_pending = NULL;
889   }
890
891   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
892
893   /* Call pending operations */
894   for (i = 0; i < peer_ctx->num_pending_ops; i++)
895   {
896     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
897   }
898   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
899 }
900
901 static void
902 cleanup_destroyed_channel (void *cls,
903                            const struct GNUNET_CADET_Channel *channel);
904
905 /* Declaration of handlers */
906 static void
907 handle_peer_check (void *cls,
908                    const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_push (void *cls,
912                   const struct GNUNET_MessageHeader *msg);
913
914 static void
915 handle_peer_pull_request (void *cls,
916                           const struct GNUNET_MessageHeader *msg);
917
918 static int
919 check_peer_pull_reply (void *cls,
920                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 static void
923 handle_peer_pull_reply (void *cls,
924                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
925
926 /* End declaration of handlers */
927
928 /**
929  * @brief Allocate memory for a new channel context and insert it into DLL
930  *
931  * @param peer_ctx context of the according peer
932  *
933  * @return The channel context
934  */
935 static struct ChannelCtx *
936 add_channel_ctx (struct PeerContext *peer_ctx)
937 {
938   struct ChannelCtx *channel_ctx;
939   channel_ctx = GNUNET_new (struct ChannelCtx);
940   channel_ctx->peer_ctx = peer_ctx;
941   return channel_ctx;
942 }
943
944
945 /**
946  * @brief Free memory and NULL pointers.
947  *
948  * @param channel_ctx The channel context.
949  */
950 static void
951 remove_channel_ctx (struct ChannelCtx *channel_ctx)
952 {
953   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
954
955   if (NULL != channel_ctx->destruction_task)
956   {
957     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
958     channel_ctx->destruction_task = NULL;
959   }
960
961   GNUNET_free (channel_ctx);
962
963   if (NULL == peer_ctx) return;
964   if (channel_ctx == peer_ctx->send_channel_ctx)
965   {
966     peer_ctx->send_channel_ctx = NULL;
967     peer_ctx->mq = NULL;
968   }
969   else if (channel_ctx == peer_ctx->recv_channel_ctx)
970   {
971     peer_ctx->recv_channel_ctx = NULL;
972   }
973 }
974
975
976 /**
977  * @brief Get the channel of a peer. If not existing, create.
978  *
979  * @param peer_ctx Context of the peer of which to get the channel
980  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
981  */
982 struct GNUNET_CADET_Channel *
983 get_channel (struct PeerContext *peer_ctx)
984 {
985   /* There exists a copy-paste-clone in run() */
986   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
987     GNUNET_MQ_hd_fixed_size (peer_check,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_push,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_fixed_size (peer_pull_request,
996                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
997                              struct GNUNET_MessageHeader,
998                              NULL),
999     GNUNET_MQ_hd_var_size (peer_pull_reply,
1000                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1001                            struct GNUNET_RPS_P2P_PullReplyMessage,
1002                            NULL),
1003     GNUNET_MQ_handler_end ()
1004   };
1005
1006
1007   if (NULL == peer_ctx->send_channel_ctx)
1008   {
1009     LOG (GNUNET_ERROR_TYPE_DEBUG,
1010          "Trying to establish channel to peer %s\n",
1011          GNUNET_i2s (&peer_ctx->peer_id));
1012     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1013     peer_ctx->send_channel_ctx->channel =
1014       GNUNET_CADET_channel_create (cadet_handle,
1015                                    peer_ctx->send_channel_ctx, /* context */
1016                                    &peer_ctx->peer_id,
1017                                    &peer_ctx->sub->hash,
1018                                    GNUNET_CADET_OPTION_RELIABLE,
1019                                    NULL, /* WindowSize handler */
1020                                    &cleanup_destroyed_channel, /* Disconnect handler */
1021                                    cadet_handlers);
1022   }
1023   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1024   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1025   return peer_ctx->send_channel_ctx->channel;
1026 }
1027
1028
1029 /**
1030  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1031  *
1032  * If we already have a message queue open to this client,
1033  * simply return it, otherways create one.
1034  *
1035  * @param peer_ctx Context of the peer of whicht to get the mq
1036  * @return the #GNUNET_MQ_Handle
1037  */
1038 static struct GNUNET_MQ_Handle *
1039 get_mq (struct PeerContext *peer_ctx)
1040 {
1041   if (NULL == peer_ctx->mq)
1042   {
1043     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1044   }
1045   return peer_ctx->mq;
1046 }
1047
1048 /**
1049  * @brief Add an envelope to a message passed to mq to list of pending messages
1050  *
1051  * @param peer_ctx Context of the peer for which to insert the envelope
1052  * @param ev envelope to the message
1053  * @param type type of the message to be sent
1054  * @return pointer to pending message
1055  */
1056 static struct PendingMessage *
1057 insert_pending_message (struct PeerContext *peer_ctx,
1058                         struct GNUNET_MQ_Envelope *ev,
1059                         const char *type)
1060 {
1061   struct PendingMessage *pending_msg;
1062
1063   pending_msg = GNUNET_new (struct PendingMessage);
1064   pending_msg->ev = ev;
1065   pending_msg->peer_ctx = peer_ctx;
1066   pending_msg->type = type;
1067   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1068                                peer_ctx->pending_messages_tail,
1069                                pending_msg);
1070   return pending_msg;
1071 }
1072
1073
1074 /**
1075  * @brief Remove a pending message from the respective DLL
1076  *
1077  * @param pending_msg the pending message to remove
1078  * @param cancel whether to cancel the pending message, too
1079  */
1080 static void
1081 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1082 {
1083   struct PeerContext *peer_ctx;
1084   (void) cancel;
1085
1086   peer_ctx = pending_msg->peer_ctx;
1087   GNUNET_assert (NULL != peer_ctx);
1088   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1089                                peer_ctx->pending_messages_tail,
1090                                pending_msg);
1091   // TODO wait for the cadet implementation of message cancellation
1092   //if (GNUNET_YES == cancel)
1093   //{
1094   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1095   //}
1096   GNUNET_free (pending_msg);
1097 }
1098
1099
1100 /**
1101  * @brief This is called in response to the first message we sent as a
1102  * online check.
1103  *
1104  * @param cls #PeerContext of peer with pending online check
1105  */
1106 static void
1107 mq_online_check_successful (void *cls)
1108 {
1109   struct PeerContext *peer_ctx = cls;
1110
1111   if (NULL != peer_ctx->online_check_pending)
1112   {
1113     LOG (GNUNET_ERROR_TYPE_DEBUG,
1114         "Online check for peer %s was successfull\n",
1115         GNUNET_i2s (&peer_ctx->peer_id));
1116     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1117     peer_ctx->online_check_pending = NULL;
1118     set_peer_online (peer_ctx);
1119     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1120   }
1121 }
1122
1123 /**
1124  * Issue a check whether peer is online
1125  *
1126  * @param peer_ctx the context of the peer
1127  */
1128 static void
1129 check_peer_online (struct PeerContext *peer_ctx)
1130 {
1131   LOG (GNUNET_ERROR_TYPE_DEBUG,
1132        "Get informed about peer %s getting online\n",
1133        GNUNET_i2s (&peer_ctx->peer_id));
1134
1135   struct GNUNET_MQ_Handle *mq;
1136   struct GNUNET_MQ_Envelope *ev;
1137
1138   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1139   peer_ctx->online_check_pending =
1140     insert_pending_message (peer_ctx, ev, "Check online");
1141   mq = get_mq (peer_ctx);
1142   GNUNET_MQ_notify_sent (ev,
1143                          mq_online_check_successful,
1144                          peer_ctx);
1145   GNUNET_MQ_send (mq, ev);
1146   if (peer_ctx->sub == msub)
1147   {
1148     GNUNET_STATISTICS_update (stats,
1149                               "# pending online checks",
1150                               1,
1151                               GNUNET_NO);
1152   }
1153 }
1154
1155
1156 /**
1157  * @brief Check whether function of type #PeerOp was already scheduled
1158  *
1159  * The array with pending operations will probably never grow really big, so
1160  * iterating over it should be ok.
1161  *
1162  * @param peer_ctx Context of the peer to check for the operation
1163  * @param peer_op the operation (#PeerOp) on the peer
1164  *
1165  * @return #GNUNET_YES if this operation is scheduled on that peer
1166  *         #GNUNET_NO  otherwise
1167  */
1168 static int
1169 check_operation_scheduled (const struct PeerContext *peer_ctx,
1170                            const PeerOp peer_op)
1171 {
1172   unsigned int i;
1173
1174   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1175     if (peer_op == peer_ctx->pending_ops[i].op)
1176       return GNUNET_YES;
1177   return GNUNET_NO;
1178 }
1179
1180
1181 /**
1182  * @brief Callback for scheduler to destroy a channel
1183  *
1184  * @param cls Context of the channel
1185  */
1186 static void
1187 destroy_channel (struct ChannelCtx *channel_ctx)
1188 {
1189   struct GNUNET_CADET_Channel *channel;
1190
1191   if (NULL != channel_ctx->destruction_task)
1192   {
1193     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1194     channel_ctx->destruction_task = NULL;
1195   }
1196   GNUNET_assert (channel_ctx->channel != NULL);
1197   channel = channel_ctx->channel;
1198   channel_ctx->channel = NULL;
1199   GNUNET_CADET_channel_destroy (channel);
1200   remove_channel_ctx (channel_ctx);
1201 }
1202
1203
1204 /**
1205  * @brief Destroy a cadet channel.
1206  *
1207  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1208  *
1209  * @param cls
1210  */
1211 static void
1212 destroy_channel_cb (void *cls)
1213 {
1214   struct ChannelCtx *channel_ctx = cls;
1215
1216   channel_ctx->destruction_task = NULL;
1217   destroy_channel (channel_ctx);
1218 }
1219
1220
1221 /**
1222  * @brief Schedule the destruction of a channel for immediately afterwards.
1223  *
1224  * In case a channel is to be destroyed from within the callback to the
1225  * destruction of another channel (send channel), we cannot call
1226  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1227  * construction.
1228  *
1229  * @param channel_ctx channel to be destroyed.
1230  */
1231 static void
1232 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1233 {
1234   GNUNET_assert (NULL ==
1235                  channel_ctx->destruction_task);
1236   GNUNET_assert (NULL !=
1237                  channel_ctx->channel);
1238   channel_ctx->destruction_task =
1239     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1240                               channel_ctx);
1241 }
1242
1243
1244 /**
1245  * @brief Remove peer
1246  *
1247  * - Empties the list with pending operations
1248  * - Empties the list with pending messages
1249  * - Cancels potentially existing online check
1250  * - Schedules closing of send and recv channels
1251  * - Removes peer from peer map
1252  *
1253  * @param peer_ctx Context of the peer to be destroyed
1254  * @return #GNUNET_YES if peer was removed
1255  *         #GNUNET_NO  otherwise
1256  */
1257 static int
1258 destroy_peer (struct PeerContext *peer_ctx)
1259 {
1260   GNUNET_assert (NULL != peer_ctx);
1261   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1262   if (GNUNET_NO ==
1263       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1264                                               &peer_ctx->peer_id))
1265   {
1266     return GNUNET_NO;
1267   }
1268   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1269   LOG (GNUNET_ERROR_TYPE_DEBUG,
1270        "Going to remove peer %s\n",
1271        GNUNET_i2s (&peer_ctx->peer_id));
1272   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1273
1274   /* Clear list of pending operations */
1275   // TODO this probably leaks memory
1276   //      ('only' the cls to the function. Not sure what to do with it)
1277   GNUNET_array_grow (peer_ctx->pending_ops,
1278                      peer_ctx->num_pending_ops,
1279                      0);
1280   /* Remove all pending messages */
1281   while (NULL != peer_ctx->pending_messages_head)
1282   {
1283     LOG (GNUNET_ERROR_TYPE_DEBUG,
1284          "Removing unsent %s\n",
1285          peer_ctx->pending_messages_head->type);
1286     /* Cancle pending message, too */
1287     if ( (NULL != peer_ctx->online_check_pending) &&
1288          (0 == memcmp (peer_ctx->pending_messages_head,
1289                      peer_ctx->online_check_pending,
1290                      sizeof (struct PendingMessage))) )
1291       {
1292         peer_ctx->online_check_pending = NULL;
1293         if (peer_ctx->sub == msub)
1294         {
1295           GNUNET_STATISTICS_update (stats,
1296                                     "# pending online checks",
1297                                     -1,
1298                                     GNUNET_NO);
1299         }
1300       }
1301     remove_pending_message (peer_ctx->pending_messages_head,
1302                             GNUNET_YES);
1303   }
1304
1305   /* If we are still waiting for notification whether this peer is online
1306    * cancel the according task */
1307   if (NULL != peer_ctx->online_check_pending)
1308   {
1309     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310                 "Removing pending online check for peer %s\n",
1311                 GNUNET_i2s (&peer_ctx->peer_id));
1312     // TODO wait until cadet sets mq->cancel_impl
1313     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1314     remove_pending_message (peer_ctx->online_check_pending,
1315                             GNUNET_YES);
1316     peer_ctx->online_check_pending = NULL;
1317   }
1318
1319   if (NULL != peer_ctx->send_channel_ctx)
1320   {
1321     /* This is possibly called from within channel destruction */
1322     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1323     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1324     peer_ctx->send_channel_ctx = NULL;
1325     peer_ctx->mq = NULL;
1326   }
1327   if (NULL != peer_ctx->recv_channel_ctx)
1328   {
1329     /* This is possibly called from within channel destruction */
1330     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1331     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1332     peer_ctx->recv_channel_ctx = NULL;
1333   }
1334
1335   if (GNUNET_YES !=
1336       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1337                                                 &peer_ctx->peer_id))
1338   {
1339     LOG (GNUNET_ERROR_TYPE_WARNING,
1340          "removing peer from peer_ctx->sub->peer_map failed\n");
1341   }
1342   if (peer_ctx->sub == msub)
1343   {
1344     GNUNET_STATISTICS_set (stats,
1345                           "# known peers",
1346                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1347                           GNUNET_NO);
1348   }
1349   GNUNET_free (peer_ctx);
1350   return GNUNET_YES;
1351 }
1352
1353
1354 /**
1355  * Iterator over hash map entries. Deletes all contexts of peers.
1356  *
1357  * @param cls closure
1358  * @param key current public key
1359  * @param value value in the hash map
1360  * @return #GNUNET_YES if we should continue to iterate,
1361  *         #GNUNET_NO if not.
1362  */
1363 static int
1364 peermap_clear_iterator (void *cls,
1365                         const struct GNUNET_PeerIdentity *key,
1366                         void *value)
1367 {
1368   struct Sub *sub = cls;
1369   (void) value;
1370
1371   destroy_peer (get_peer_ctx (sub->peer_map, key));
1372   return GNUNET_YES;
1373 }
1374
1375
1376 /**
1377  * @brief This is called once a message is sent.
1378  *
1379  * Removes the pending message
1380  *
1381  * @param cls type of the message that was sent
1382  */
1383 static void
1384 mq_notify_sent_cb (void *cls)
1385 {
1386   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1387   LOG (GNUNET_ERROR_TYPE_DEBUG,
1388       "%s was sent.\n",
1389       pending_msg->type);
1390   if (pending_msg->peer_ctx->sub == msub)
1391   {
1392     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1393       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1395       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1396     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1397       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1398     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1399                       NULL != map_single_hop &&
1400         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1401           &pending_msg->peer_ctx->peer_id))
1402       GNUNET_STATISTICS_update(stats,
1403                                "# pull requests sent (multi-hop peer)",
1404                                1,
1405                                GNUNET_NO);
1406   }
1407   /* Do not cancle message */
1408   remove_pending_message (pending_msg, GNUNET_NO);
1409 }
1410
1411
1412 /**
1413  * @brief Iterator function for #store_valid_peers.
1414  *
1415  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1416  * Writes single peer to disk.
1417  *
1418  * @param cls the file handle to write to.
1419  * @param peer current peer
1420  * @param value unused
1421  *
1422  * @return  #GNUNET_YES if we should continue to
1423  *          iterate,
1424  *          #GNUNET_NO if not.
1425  */
1426 static int
1427 store_peer_presistently_iterator (void *cls,
1428                                   const struct GNUNET_PeerIdentity *peer,
1429                                   void *value)
1430 {
1431   const struct GNUNET_DISK_FileHandle *fh = cls;
1432   char peer_string[128];
1433   int size;
1434   ssize_t ret;
1435   (void) value;
1436
1437   if (NULL == peer)
1438   {
1439     return GNUNET_YES;
1440   }
1441   size = GNUNET_snprintf (peer_string,
1442                           sizeof (peer_string),
1443                           "%s\n",
1444                           GNUNET_i2s_full (peer));
1445   GNUNET_assert (53 == size);
1446   ret = GNUNET_DISK_file_write (fh,
1447                                 peer_string,
1448                                 size);
1449   GNUNET_assert (size == ret);
1450   return GNUNET_YES;
1451 }
1452
1453
1454 /**
1455  * @brief Store the peers currently in #valid_peers to disk.
1456  *
1457  * @param sub Sub for which to store the valid peers
1458  */
1459 static void
1460 store_valid_peers (const struct Sub *sub)
1461 {
1462   struct GNUNET_DISK_FileHandle *fh;
1463   uint32_t number_written_peers;
1464   int ret;
1465
1466   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1467   {
1468     return;
1469   }
1470
1471   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1472   if (GNUNET_SYSERR == ret)
1473   {
1474     LOG (GNUNET_ERROR_TYPE_WARNING,
1475         "Not able to create directory for file `%s'\n",
1476         sub->filename_valid_peers);
1477     GNUNET_break (0);
1478   }
1479   else if (GNUNET_NO == ret)
1480   {
1481     LOG (GNUNET_ERROR_TYPE_WARNING,
1482         "Directory for file `%s' exists but is not writable for us\n",
1483         sub->filename_valid_peers);
1484     GNUNET_break (0);
1485   }
1486   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1487                               GNUNET_DISK_OPEN_WRITE |
1488                                   GNUNET_DISK_OPEN_CREATE,
1489                               GNUNET_DISK_PERM_USER_READ |
1490                                   GNUNET_DISK_PERM_USER_WRITE);
1491   if (NULL == fh)
1492   {
1493     LOG (GNUNET_ERROR_TYPE_WARNING,
1494         "Not able to write valid peers to file `%s'\n",
1495         sub->filename_valid_peers);
1496     return;
1497   }
1498   LOG (GNUNET_ERROR_TYPE_DEBUG,
1499       "Writing %u valid peers to disk\n",
1500       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1501   number_written_peers =
1502     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1503                                            store_peer_presistently_iterator,
1504                                            fh);
1505   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1506   GNUNET_assert (number_written_peers ==
1507       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1508 }
1509
1510
1511 /**
1512  * @brief Convert string representation of peer id to peer id.
1513  *
1514  * Counterpart to #GNUNET_i2s_full.
1515  *
1516  * @param string_repr The string representation of the peer id
1517  *
1518  * @return The peer id
1519  */
1520 static const struct GNUNET_PeerIdentity *
1521 s2i_full (const char *string_repr)
1522 {
1523   struct GNUNET_PeerIdentity *peer;
1524   size_t len;
1525   int ret;
1526
1527   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1528   len = strlen (string_repr);
1529   if (52 > len)
1530   {
1531     LOG (GNUNET_ERROR_TYPE_WARNING,
1532         "Not able to convert string representation of PeerID to PeerID\n"
1533         "Sting representation: %s (len %lu) - too short\n",
1534         string_repr,
1535         len);
1536     GNUNET_break (0);
1537   }
1538   else if (52 < len)
1539   {
1540     len = 52;
1541   }
1542   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1543                                                     len,
1544                                                     &peer->public_key);
1545   if (GNUNET_OK != ret)
1546   {
1547     LOG (GNUNET_ERROR_TYPE_WARNING,
1548         "Not able to convert string representation of PeerID to PeerID\n"
1549         "Sting representation: %s\n",
1550         string_repr);
1551     GNUNET_break (0);
1552   }
1553   return peer;
1554 }
1555
1556
1557 /**
1558  * @brief Restore the peers on disk to #valid_peers.
1559  *
1560  * @param sub Sub for which to restore the valid peers
1561  */
1562 static void
1563 restore_valid_peers (const struct Sub *sub)
1564 {
1565   off_t file_size;
1566   uint32_t num_peers;
1567   struct GNUNET_DISK_FileHandle *fh;
1568   char *buf;
1569   ssize_t size_read;
1570   char *iter_buf;
1571   char *str_repr;
1572   const struct GNUNET_PeerIdentity *peer;
1573
1574   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1575   {
1576     return;
1577   }
1578
1579   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1580   {
1581     return;
1582   }
1583   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1584                               GNUNET_DISK_OPEN_READ,
1585                               GNUNET_DISK_PERM_NONE);
1586   GNUNET_assert (NULL != fh);
1587   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1588   num_peers = file_size / 53;
1589   buf = GNUNET_malloc (file_size);
1590   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1591   GNUNET_assert (size_read == file_size);
1592   LOG (GNUNET_ERROR_TYPE_DEBUG,
1593       "Restoring %" PRIu32 " peers from file `%s'\n",
1594       num_peers,
1595       sub->filename_valid_peers);
1596   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1597   {
1598     str_repr = GNUNET_strndup (iter_buf, 53);
1599     peer = s2i_full (str_repr);
1600     GNUNET_free (str_repr);
1601     add_valid_peer (peer, sub->valid_peers);
1602     LOG (GNUNET_ERROR_TYPE_DEBUG,
1603         "Restored valid peer %s from disk\n",
1604         GNUNET_i2s_full (peer));
1605   }
1606   iter_buf = NULL;
1607   GNUNET_free (buf);
1608   LOG (GNUNET_ERROR_TYPE_DEBUG,
1609       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1610       num_peers,
1611       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1612   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1613   {
1614     LOG (GNUNET_ERROR_TYPE_WARNING,
1615         "Number of restored peers does not match file size. Have probably duplicates.\n");
1616   }
1617   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1618   LOG (GNUNET_ERROR_TYPE_DEBUG,
1619       "Restored %u valid peers from disk\n",
1620       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1621 }
1622
1623
1624 /**
1625  * @brief Delete storage of peers that was created with #initialise_peers ()
1626  *
1627  * @param sub Sub for which the storage is deleted
1628  */
1629 static void
1630 peers_terminate (struct Sub *sub)
1631 {
1632   if (GNUNET_SYSERR ==
1633       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1634                                              &peermap_clear_iterator,
1635                                              sub))
1636   {
1637     LOG (GNUNET_ERROR_TYPE_WARNING,
1638         "Iteration destroying peers was aborted.\n");
1639   }
1640   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1641   sub->peer_map = NULL;
1642   store_valid_peers (sub);
1643   GNUNET_free (sub->filename_valid_peers);
1644   sub->filename_valid_peers = NULL;
1645   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1646   sub->valid_peers = NULL;
1647 }
1648
1649
1650 /**
1651  * Iterator over #valid_peers hash map entries.
1652  *
1653  * @param cls Closure that contains iterator function and closure
1654  * @param peer current peer id
1655  * @param value value in the hash map - unused
1656  * @return #GNUNET_YES if we should continue to
1657  *         iterate,
1658  *         #GNUNET_NO if not.
1659  */
1660 static int
1661 valid_peer_iterator (void *cls,
1662                      const struct GNUNET_PeerIdentity *peer,
1663                      void *value)
1664 {
1665   struct PeersIteratorCls *it_cls = cls;
1666   (void) value;
1667
1668   return it_cls->iterator (it_cls->cls, peer);
1669 }
1670
1671
1672 /**
1673  * @brief Get all currently known, valid peer ids.
1674  *
1675  * @param valid_peers Peer map containing the valid peers in question
1676  * @param iterator function to call on each peer id
1677  * @param it_cls extra argument to @a iterator
1678  * @return the number of key value pairs processed,
1679  *         #GNUNET_SYSERR if it aborted iteration
1680  */
1681 static int
1682 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1683                  PeersIterator iterator,
1684                  void *it_cls)
1685 {
1686   struct PeersIteratorCls *cls;
1687   int ret;
1688
1689   cls = GNUNET_new (struct PeersIteratorCls);
1690   cls->iterator = iterator;
1691   cls->cls = it_cls;
1692   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1693                                                valid_peer_iterator,
1694                                                cls);
1695   GNUNET_free (cls);
1696   return ret;
1697 }
1698
1699
1700 /**
1701  * @brief Add peer to known peers.
1702  *
1703  * This function is called on new peer_ids from 'external' sources
1704  * (client seed, cadet get_peers(), ...)
1705  *
1706  * @param sub Sub with the peer map that the @a peer will be added to
1707  * @param peer the new #GNUNET_PeerIdentity
1708  *
1709  * @return #GNUNET_YES if peer was inserted
1710  *         #GNUNET_NO  otherwise
1711  */
1712 static int
1713 insert_peer (struct Sub *sub,
1714              const struct GNUNET_PeerIdentity *peer)
1715 {
1716   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1717   {
1718     return GNUNET_NO; /* We already know this peer - nothing to do */
1719   }
1720   (void) create_peer_ctx (sub, peer);
1721   return GNUNET_YES;
1722 }
1723
1724
1725 /**
1726  * @brief Check whether flags on a peer are set.
1727  *
1728  * @param peer_map Peer map that is expected to contain the @a peer
1729  * @param peer the peer to check the flag of
1730  * @param flags the flags to check
1731  *
1732  * @return #GNUNET_SYSERR if peer is not known
1733  *         #GNUNET_YES    if all given flags are set
1734  *         #GNUNET_NO     otherwise
1735  */
1736 static int
1737 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1738                  const struct GNUNET_PeerIdentity *peer,
1739                  enum Peers_PeerFlags flags)
1740 {
1741   struct PeerContext *peer_ctx;
1742
1743   if (GNUNET_NO == check_peer_known (peer_map, peer))
1744   {
1745     return GNUNET_SYSERR;
1746   }
1747   peer_ctx = get_peer_ctx (peer_map, peer);
1748   return check_peer_flag_set (peer_ctx, flags);
1749 }
1750
1751 /**
1752  * @brief Try connecting to a peer to see whether it is online
1753  *
1754  * If not known yet, insert into known peers
1755  *
1756  * @param sub Sub which would contain the @a peer
1757  * @param peer the peer whose online is to be checked
1758  * @return #GNUNET_YES if the check was issued
1759  *         #GNUNET_NO  otherwise
1760  */
1761 static int
1762 issue_peer_online_check (struct Sub *sub,
1763                          const struct GNUNET_PeerIdentity *peer)
1764 {
1765   struct PeerContext *peer_ctx;
1766
1767   (void) insert_peer (sub, peer); // TODO even needed?
1768   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1769   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1770        (NULL == peer_ctx->online_check_pending) )
1771   {
1772     check_peer_online (peer_ctx);
1773     return GNUNET_YES;
1774   }
1775   return GNUNET_NO;
1776 }
1777
1778
1779 /**
1780  * @brief Check if peer is removable.
1781  *
1782  * Check if
1783  *  - a recv channel exists
1784  *  - there are pending messages
1785  *  - there is no pending pull reply
1786  *
1787  * @param peer_ctx Context of the peer in question
1788  * @return #GNUNET_YES    if peer is removable
1789  *         #GNUNET_NO     if peer is NOT removable
1790  *         #GNUNET_SYSERR if peer is not known
1791  */
1792 static int
1793 check_removable (const struct PeerContext *peer_ctx)
1794 {
1795   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1796                                                            &peer_ctx->peer_id))
1797   {
1798     return GNUNET_SYSERR;
1799   }
1800
1801   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1802        (NULL != peer_ctx->pending_messages_head) ||
1803        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1804   {
1805     return GNUNET_NO;
1806   }
1807   return GNUNET_YES;
1808 }
1809
1810
1811 /**
1812  * @brief Check whether @a peer is actually a peer.
1813  *
1814  * A valid peer is a peer that we know exists eg. we were connected to once.
1815  *
1816  * @param valid_peers Peer map that would contain the @a peer
1817  * @param peer peer in question
1818  *
1819  * @return #GNUNET_YES if peer is valid
1820  *         #GNUNET_NO  if peer is not valid
1821  */
1822 static int
1823 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1824                   const struct GNUNET_PeerIdentity *peer)
1825 {
1826   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1827 }
1828
1829
1830 /**
1831  * @brief Indicate that we want to send to the other peer
1832  *
1833  * This establishes a sending channel
1834  *
1835  * @param peer_ctx Context of the target peer
1836  */
1837 static void
1838 indicate_sending_intention (struct PeerContext *peer_ctx)
1839 {
1840   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1841                                                  &peer_ctx->peer_id));
1842   (void) get_channel (peer_ctx);
1843 }
1844
1845
1846 /**
1847  * @brief Check whether other peer has the intention to send/opened channel
1848  *        towars us
1849  *
1850  * @param peer_ctx Context of the peer in question
1851  *
1852  * @return #GNUNET_YES if peer has the intention to send
1853  *         #GNUNET_NO  otherwise
1854  */
1855 static int
1856 check_peer_send_intention (const struct PeerContext *peer_ctx)
1857 {
1858   if (NULL != peer_ctx->recv_channel_ctx)
1859   {
1860     return GNUNET_YES;
1861   }
1862   return GNUNET_NO;
1863 }
1864
1865
1866 /**
1867  * Handle the channel a peer opens to us.
1868  *
1869  * @param cls The closure - Sub
1870  * @param channel The channel the peer wants to establish
1871  * @param initiator The peer's peer ID
1872  *
1873  * @return initial channel context for the channel
1874  *         (can be NULL -- that's not an error)
1875  */
1876 static void *
1877 handle_inbound_channel (void *cls,
1878                         struct GNUNET_CADET_Channel *channel,
1879                         const struct GNUNET_PeerIdentity *initiator)
1880 {
1881   struct PeerContext *peer_ctx;
1882   struct ChannelCtx *channel_ctx;
1883   struct Sub *sub = cls;
1884
1885   LOG (GNUNET_ERROR_TYPE_DEBUG,
1886       "New channel was established to us (Peer %s).\n",
1887       GNUNET_i2s (initiator));
1888   GNUNET_assert (NULL != channel); /* according to cadet API */
1889   /* Make sure we 'know' about this peer */
1890   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1891   set_peer_online (peer_ctx);
1892   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1893   channel_ctx = add_channel_ctx (peer_ctx);
1894   channel_ctx->channel = channel;
1895   /* We only accept one incoming channel per peer */
1896   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1897                                                              initiator)))
1898   {
1899     LOG (GNUNET_ERROR_TYPE_WARNING,
1900         "Already got one receive channel. Destroying old one.\n");
1901     GNUNET_break_op (0);
1902     destroy_channel (peer_ctx->recv_channel_ctx);
1903     peer_ctx->recv_channel_ctx = channel_ctx;
1904     /* return the channel context */
1905     return channel_ctx;
1906   }
1907   peer_ctx->recv_channel_ctx = channel_ctx;
1908   return channel_ctx;
1909 }
1910
1911
1912 /**
1913  * @brief Check whether a sending channel towards the given peer exists
1914  *
1915  * @param peer_ctx Context of the peer in question
1916  *
1917  * @return #GNUNET_YES if a sending channel towards that peer exists
1918  *         #GNUNET_NO  otherwise
1919  */
1920 static int
1921 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1922 {
1923   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1924                                      &peer_ctx->peer_id))
1925   { /* If no such peer exists, there is no channel */
1926     return GNUNET_NO;
1927   }
1928   if (NULL == peer_ctx->send_channel_ctx)
1929   {
1930     return GNUNET_NO;
1931   }
1932   return GNUNET_YES;
1933 }
1934
1935
1936 /**
1937  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1938  *        intention to another peer
1939  *
1940  * @param peer_ctx Context to the peer
1941  * @return #GNUNET_YES if channel was destroyed
1942  *         #GNUNET_NO  otherwise
1943  */
1944 static int
1945 destroy_sending_channel (struct PeerContext *peer_ctx)
1946 {
1947   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1948                                      &peer_ctx->peer_id))
1949   {
1950     return GNUNET_NO;
1951   }
1952   if (NULL != peer_ctx->send_channel_ctx)
1953   {
1954     destroy_channel (peer_ctx->send_channel_ctx);
1955     (void) check_connected (peer_ctx);
1956     return GNUNET_YES;
1957   }
1958   return GNUNET_NO;
1959 }
1960
1961 /**
1962  * @brief Send a message to another peer.
1963  *
1964  * Keeps track about pending messages so they can be properly removed when the
1965  * peer is destroyed.
1966  *
1967  * @param peer_ctx Context of the peer to which the message is to be sent
1968  * @param ev envelope of the message
1969  * @param type type of the message
1970  */
1971 static void
1972 send_message (struct PeerContext *peer_ctx,
1973               struct GNUNET_MQ_Envelope *ev,
1974               const char *type)
1975 {
1976   struct PendingMessage *pending_msg;
1977   struct GNUNET_MQ_Handle *mq;
1978
1979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1980               "Sending message to %s of type %s\n",
1981               GNUNET_i2s (&peer_ctx->peer_id),
1982               type);
1983   pending_msg = insert_pending_message (peer_ctx, ev, type);
1984   mq = get_mq (peer_ctx);
1985   GNUNET_MQ_notify_sent (ev,
1986                          mq_notify_sent_cb,
1987                          pending_msg);
1988   GNUNET_MQ_send (mq, ev);
1989 }
1990
1991 /**
1992  * @brief Schedule a operation on given peer
1993  *
1994  * Avoids scheduling an operation twice.
1995  *
1996  * @param peer_ctx Context of the peer for which to schedule the operation
1997  * @param peer_op the operation to schedule
1998  * @param cls Closure to @a peer_op
1999  *
2000  * @return #GNUNET_YES if the operation was scheduled
2001  *         #GNUNET_NO  otherwise
2002  */
2003 static int
2004 schedule_operation (struct PeerContext *peer_ctx,
2005                     const PeerOp peer_op,
2006                     void *cls)
2007 {
2008   struct PeerPendingOp pending_op;
2009
2010   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2011                                                  &peer_ctx->peer_id));
2012
2013   //TODO if ONLINE execute immediately
2014
2015   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2016   {
2017     pending_op.op = peer_op;
2018     pending_op.op_cls = cls;
2019     GNUNET_array_append (peer_ctx->pending_ops,
2020                          peer_ctx->num_pending_ops,
2021                          pending_op);
2022     return GNUNET_YES;
2023   }
2024   return GNUNET_NO;
2025 }
2026
2027 /***********************************************************************
2028  * /Old gnunet-service-rps_peers.c
2029 ***********************************************************************/
2030
2031
2032 /***********************************************************************
2033  * Housekeeping with clients
2034 ***********************************************************************/
2035
2036 /**
2037  * Closure used to pass the client and the id to the callback
2038  * that replies to a client's request
2039  */
2040 struct ReplyCls
2041 {
2042   /**
2043    * DLL
2044    */
2045   struct ReplyCls *next;
2046   struct ReplyCls *prev;
2047
2048   /**
2049    * The identifier of the request
2050    */
2051   uint32_t id;
2052
2053   /**
2054    * The handle to the request
2055    */
2056   struct RPS_SamplerRequestHandle *req_handle;
2057
2058   /**
2059    * The client handle to send the reply to
2060    */
2061   struct ClientContext *cli_ctx;
2062 };
2063
2064
2065 /**
2066  * Struct used to store the context of a connected client.
2067  */
2068 struct ClientContext
2069 {
2070   /**
2071    * DLL
2072    */
2073   struct ClientContext *next;
2074   struct ClientContext *prev;
2075
2076   /**
2077    * The message queue to communicate with the client.
2078    */
2079   struct GNUNET_MQ_Handle *mq;
2080
2081   /**
2082    * @brief How many updates this client expects to receive.
2083    */
2084   int64_t view_updates_left;
2085
2086   /**
2087    * @brief Whether this client wants to receive stream updates.
2088    * Either #GNUNET_YES or #GNUNET_NO
2089    */
2090   int8_t stream_update;
2091
2092   /**
2093    * The client handle to send the reply to
2094    */
2095   struct GNUNET_SERVICE_Client *client;
2096
2097   /**
2098    * The #Sub this context belongs to
2099    */
2100   struct Sub *sub;
2101 };
2102
2103 /**
2104  * DLL with all clients currently connected to us
2105  */
2106 struct ClientContext *cli_ctx_head;
2107 struct ClientContext *cli_ctx_tail;
2108
2109 /***********************************************************************
2110  * /Housekeeping with clients
2111 ***********************************************************************/
2112
2113
2114
2115
2116
2117 /***********************************************************************
2118  * Util functions
2119 ***********************************************************************/
2120
2121
2122 /**
2123  * Print peerlist to log.
2124  */
2125 static void
2126 print_peer_list (struct GNUNET_PeerIdentity *list,
2127                  unsigned int len)
2128 {
2129   unsigned int i;
2130
2131   LOG (GNUNET_ERROR_TYPE_DEBUG,
2132        "Printing peer list of length %u at %p:\n",
2133        len,
2134        list);
2135   for (i = 0 ; i < len ; i++)
2136   {
2137     LOG (GNUNET_ERROR_TYPE_DEBUG,
2138          "%u. peer: %s\n",
2139          i, GNUNET_i2s (&list[i]));
2140   }
2141 }
2142
2143
2144 /**
2145  * Remove peer from list.
2146  */
2147 static void
2148 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2149                unsigned int *list_size,
2150                const struct GNUNET_PeerIdentity *peer)
2151 {
2152   unsigned int i;
2153   struct GNUNET_PeerIdentity *tmp;
2154
2155   tmp = *peer_list;
2156
2157   LOG (GNUNET_ERROR_TYPE_DEBUG,
2158        "Removing peer %s from list at %p\n",
2159        GNUNET_i2s (peer),
2160        tmp);
2161
2162   for ( i = 0 ; i < *list_size ; i++ )
2163   {
2164     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2165     {
2166       if (i < *list_size -1)
2167       { /* Not at the last entry -- shift peers left */
2168         memmove (&tmp[i], &tmp[i +1],
2169                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2170       }
2171       /* Remove last entry (should be now useless PeerID) */
2172       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2173     }
2174   }
2175   *peer_list = tmp;
2176 }
2177
2178
2179 /**
2180  * Insert PeerID in #view
2181  *
2182  * Called once we know a peer is online.
2183  * Implements #PeerOp
2184  *
2185  * @return GNUNET_OK if peer was actually inserted
2186  *         GNUNET_NO if peer was not inserted
2187  */
2188 static void
2189 insert_in_view_op (void *cls,
2190                    const struct GNUNET_PeerIdentity *peer);
2191
2192 /**
2193  * Insert PeerID in #view
2194  *
2195  * Called once we know a peer is online.
2196  *
2197  * @param sub Sub in with the view to insert in
2198  * @param peer the peer to insert
2199  *
2200  * @return GNUNET_OK if peer was actually inserted
2201  *         GNUNET_NO if peer was not inserted
2202  */
2203 static int
2204 insert_in_view (struct Sub *sub,
2205                 const struct GNUNET_PeerIdentity *peer)
2206 {
2207   struct PeerContext *peer_ctx;
2208   int online;
2209   int ret;
2210
2211   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2212   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2213   if ( (GNUNET_NO == online) ||
2214        (GNUNET_SYSERR == online) ) /* peer is not even known */
2215   {
2216     (void) issue_peer_online_check (sub, peer);
2217     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2218     return GNUNET_NO;
2219   }
2220   /* Open channel towards peer to keep connection open */
2221   indicate_sending_intention (peer_ctx);
2222   ret = View_put (sub->view, peer);
2223   if (peer_ctx->sub == msub)
2224   {
2225     GNUNET_STATISTICS_set (stats,
2226                            "view size",
2227                            View_size (peer_ctx->sub->view),
2228                            GNUNET_NO);
2229   }
2230   return ret;
2231 }
2232
2233
2234 /**
2235  * @brief Send view to client
2236  *
2237  * @param cli_ctx the context of the client
2238  * @param view_array the peerids of the view as array (can be empty)
2239  * @param view_size the size of the view array (can be 0)
2240  */
2241 static void
2242 send_view (const struct ClientContext *cli_ctx,
2243            const struct GNUNET_PeerIdentity *view_array,
2244            uint64_t view_size)
2245 {
2246   struct GNUNET_MQ_Envelope *ev;
2247   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2248   struct Sub *sub;
2249
2250   if (NULL == view_array)
2251   {
2252     if (NULL == cli_ctx->sub) sub = msub;
2253     else sub = cli_ctx->sub;
2254     view_size = View_size (sub->view);
2255     view_array = View_get_as_array (sub->view);
2256   }
2257
2258   ev = GNUNET_MQ_msg_extra (out_msg,
2259                             view_size * sizeof (struct GNUNET_PeerIdentity),
2260                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2261   out_msg->num_peers = htonl (view_size);
2262
2263   GNUNET_memcpy (&out_msg[1],
2264                  view_array,
2265                  view_size * sizeof (struct GNUNET_PeerIdentity));
2266   GNUNET_MQ_send (cli_ctx->mq, ev);
2267 }
2268
2269
2270 /**
2271  * @brief Send peer from biased stream to client.
2272  *
2273  * TODO merge with send_view, parameterise
2274  *
2275  * @param cli_ctx the context of the client
2276  * @param view_array the peerids of the view as array (can be empty)
2277  * @param view_size the size of the view array (can be 0)
2278  */
2279 static void
2280 send_stream_peers (const struct ClientContext *cli_ctx,
2281                    uint64_t num_peers,
2282                    const struct GNUNET_PeerIdentity *peers)
2283 {
2284   struct GNUNET_MQ_Envelope *ev;
2285   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2286
2287   GNUNET_assert (NULL != peers);
2288
2289   ev = GNUNET_MQ_msg_extra (out_msg,
2290                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2291                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2292   out_msg->num_peers = htonl (num_peers);
2293
2294   GNUNET_memcpy (&out_msg[1],
2295                  peers,
2296                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2297   GNUNET_MQ_send (cli_ctx->mq, ev);
2298 }
2299
2300
2301 /**
2302  * @brief sends updates to clients that are interested
2303  *
2304  * @param sub Sub for which to notify clients
2305  */
2306 static void
2307 clients_notify_view_update (const struct Sub *sub)
2308 {
2309   struct ClientContext *cli_ctx_iter;
2310   uint64_t num_peers;
2311   const struct GNUNET_PeerIdentity *view_array;
2312
2313   num_peers = View_size (sub->view);
2314   view_array = View_get_as_array(sub->view);
2315   /* check size of view is small enough */
2316   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2317   {
2318     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2319                 "View is too big to send\n");
2320     return;
2321   }
2322
2323   for (cli_ctx_iter = cli_ctx_head;
2324        NULL != cli_ctx_iter;
2325        cli_ctx_iter = cli_ctx_iter->next)
2326   {
2327     if (1 < cli_ctx_iter->view_updates_left)
2328     {
2329       /* Client wants to receive limited amount of updates */
2330       cli_ctx_iter->view_updates_left -= 1;
2331     } else if (1 == cli_ctx_iter->view_updates_left)
2332     {
2333       /* Last update of view for client */
2334       cli_ctx_iter->view_updates_left = -1;
2335     } else if (0 > cli_ctx_iter->view_updates_left) {
2336       /* Client is not interested in updates */
2337       continue;
2338     }
2339     /* else _updates_left == 0 - infinite amount of updates */
2340
2341     /* send view */
2342     send_view (cli_ctx_iter, view_array, num_peers);
2343   }
2344 }
2345
2346
2347 /**
2348  * @brief sends updates to clients that are interested
2349  *
2350  * @param num_peers Number of peers to send
2351  * @param peers the array of peers to send
2352  */
2353 static void
2354 clients_notify_stream_peer (const struct Sub *sub,
2355                             uint64_t num_peers,
2356                             const struct GNUNET_PeerIdentity *peers)
2357                             // TODO enum StreamPeerSource)
2358 {
2359   struct ClientContext *cli_ctx_iter;
2360
2361   LOG (GNUNET_ERROR_TYPE_DEBUG,
2362       "Got peer (%s) from biased stream - update all clients\n",
2363       GNUNET_i2s (peers));
2364
2365   for (cli_ctx_iter = cli_ctx_head;
2366        NULL != cli_ctx_iter;
2367        cli_ctx_iter = cli_ctx_iter->next)
2368   {
2369     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2370         (sub == cli_ctx_iter->sub || sub == msub))
2371     {
2372       send_stream_peers (cli_ctx_iter, num_peers, peers);
2373     }
2374   }
2375 }
2376
2377
2378 /**
2379  * Put random peer from sampler into the view as history update.
2380  *
2381  * @param ids Array of Peers to insert into view
2382  * @param num_peers Number of peers to insert
2383  * @param cls Closure - The Sub for which this is to be done
2384  */
2385 static void
2386 hist_update (const struct GNUNET_PeerIdentity *ids,
2387              uint32_t num_peers,
2388              void *cls)
2389 {
2390   unsigned int i;
2391   struct Sub *sub = cls;
2392
2393   for (i = 0; i < num_peers; i++)
2394   {
2395     int inserted;
2396     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2397     {
2398       LOG (GNUNET_ERROR_TYPE_WARNING,
2399            "Peer in history update not known!\n");
2400       continue;
2401     }
2402     inserted = insert_in_view (sub, &ids[i]);
2403     if (GNUNET_OK == inserted)
2404     {
2405       clients_notify_stream_peer (sub, 1, &ids[i]);
2406     }
2407 #ifdef TO_FILE_FULL
2408     to_file (sub->file_name_view_log,
2409              "+%s\t(hist)",
2410              GNUNET_i2s_full (ids));
2411 #endif /* TO_FILE_FULL */
2412   }
2413   clients_notify_view_update (sub);
2414 }
2415
2416
2417 /**
2418  * Wrapper around #RPS_sampler_resize()
2419  *
2420  * If we do not have enough sampler elements, double current sampler size
2421  * If we have more than enough sampler elements, halv current sampler size
2422  *
2423  * @param sampler The sampler to resize
2424  * @param new_size New size to which to resize
2425  */
2426 static void
2427 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2428 {
2429   unsigned int sampler_size;
2430
2431   // TODO statistics
2432   // TODO respect the min, max
2433   sampler_size = RPS_sampler_get_size (sampler);
2434   if (sampler_size > new_size * 4)
2435   { /* Shrinking */
2436     RPS_sampler_resize (sampler, sampler_size / 2);
2437   }
2438   else if (sampler_size < new_size)
2439   { /* Growing */
2440     RPS_sampler_resize (sampler, sampler_size * 2);
2441   }
2442   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2443 }
2444
2445
2446 /**
2447  * Add all peers in @a peer_array to @a peer_map used as set.
2448  *
2449  * @param peer_array array containing the peers
2450  * @param num_peers number of peers in @peer_array
2451  * @param peer_map the peermap to use as set
2452  */
2453 static void
2454 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2455                        unsigned int num_peers,
2456                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2457 {
2458   unsigned int i;
2459   if (NULL == peer_map)
2460   {
2461     LOG (GNUNET_ERROR_TYPE_WARNING,
2462          "Trying to add peers to non-existing peermap.\n");
2463     return;
2464   }
2465
2466   for (i = 0; i < num_peers; i++)
2467   {
2468     GNUNET_CONTAINER_multipeermap_put (peer_map,
2469                                        &peer_array[i],
2470                                        NULL,
2471                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2472     if (msub->peer_map == peer_map)
2473     {
2474       GNUNET_STATISTICS_set (stats,
2475                             "# known peers",
2476                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2477                             GNUNET_NO);
2478     }
2479   }
2480 }
2481
2482
2483 /**
2484  * Send a PULL REPLY to @a peer_id
2485  *
2486  * @param peer_ctx Context of the peer to send the reply to
2487  * @param peer_ids the peers to send to @a peer_id
2488  * @param num_peer_ids the number of peers to send to @a peer_id
2489  */
2490 static void
2491 send_pull_reply (struct PeerContext *peer_ctx,
2492                  const struct GNUNET_PeerIdentity *peer_ids,
2493                  unsigned int num_peer_ids)
2494 {
2495   uint32_t send_size;
2496   struct GNUNET_MQ_Envelope *ev;
2497   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2498
2499   /* Compute actual size */
2500   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2501               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2502
2503   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2504     /* Compute number of peers to send
2505      * If too long, simply truncate */
2506     // TODO select random ones via permutation
2507     //      or even better: do good protocol design
2508     send_size =
2509       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2510        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2511        sizeof (struct GNUNET_PeerIdentity);
2512   else
2513     send_size = num_peer_ids;
2514
2515   LOG (GNUNET_ERROR_TYPE_DEBUG,
2516       "Going to send PULL REPLY with %u peers to %s\n",
2517       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2518
2519   ev = GNUNET_MQ_msg_extra (out_msg,
2520                             send_size * sizeof (struct GNUNET_PeerIdentity),
2521                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2522   out_msg->num_peers = htonl (send_size);
2523   GNUNET_memcpy (&out_msg[1], peer_ids,
2524          send_size * sizeof (struct GNUNET_PeerIdentity));
2525
2526   send_message (peer_ctx, ev, "PULL REPLY");
2527   if (peer_ctx->sub == msub)
2528   {
2529     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2530   }
2531   // TODO check with send intention: as send_channel is used/opened we indicate
2532   // a sending intention without intending it.
2533   // -> clean peer afterwards?
2534   // -> use recv_channel?
2535 }
2536
2537
2538 /**
2539  * Insert PeerID in #pull_map
2540  *
2541  * Called once we know a peer is online.
2542  *
2543  * @param cls Closure - Sub with the pull map to insert into
2544  * @param peer Peer to insert
2545  */
2546 static void
2547 insert_in_pull_map (void *cls,
2548                     const struct GNUNET_PeerIdentity *peer)
2549 {
2550   struct Sub *sub = cls;
2551
2552   CustomPeerMap_put (sub->pull_map, peer);
2553 }
2554
2555
2556 /**
2557  * Insert PeerID in #view
2558  *
2559  * Called once we know a peer is online.
2560  * Implements #PeerOp
2561  *
2562  * @param cls Closure - Sub with view to insert peer into
2563  * @param peer the peer to insert
2564  */
2565 static void
2566 insert_in_view_op (void *cls,
2567                    const struct GNUNET_PeerIdentity *peer)
2568 {
2569   struct Sub *sub = cls;
2570   int inserted;
2571
2572   inserted = insert_in_view (sub, peer);
2573   if (GNUNET_OK == inserted)
2574   {
2575     clients_notify_stream_peer (sub, 1, peer);
2576   }
2577 }
2578
2579
2580 /**
2581  * Update sampler with given PeerID.
2582  * Implements #PeerOp
2583  *
2584  * @param cls Closure - Sub containing the sampler to insert into
2585  * @param peer Peer to insert
2586  */
2587 static void
2588 insert_in_sampler (void *cls,
2589                    const struct GNUNET_PeerIdentity *peer)
2590 {
2591   struct Sub *sub = cls;
2592
2593   LOG (GNUNET_ERROR_TYPE_DEBUG,
2594        "Updating samplers with peer %s from insert_in_sampler()\n",
2595        GNUNET_i2s (peer));
2596   RPS_sampler_update (sub->sampler, peer);
2597   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2598   {
2599     /* Make sure we 'know' about this peer */
2600     (void) issue_peer_online_check (sub, peer);
2601     /* Establish a channel towards that peer to indicate we are going to send
2602      * messages to it */
2603     //indicate_sending_intention (peer);
2604   }
2605 #ifdef TO_FILE
2606   sub->num_observed_peers++;
2607   GNUNET_CONTAINER_multipeermap_put
2608     (sub->observed_unique_peers,
2609      peer,
2610      NULL,
2611      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2612   uint32_t num_observed_unique_peers =
2613     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2614 #ifdef TO_FILE_FULL
2615   to_file (sub->file_name_observed_log,
2616           "%" PRIu32 " %" PRIu32 " %f\n",
2617           sub->num_observed_peers,
2618           num_observed_unique_peers,
2619           1.0*num_observed_unique_peers/sub->num_observed_peers)
2620 #endif /* TO_FILE_FULL */
2621 #endif /* TO_FILE */
2622 }
2623
2624
2625 /**
2626  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2627  *        If the peer is not known, online check is issued and it is
2628  *        scheduled to be inserted in sampler and view.
2629  *
2630  * "External sources" refer to every source except the gossip.
2631  *
2632  * @param sub Sub for which @a peer was received
2633  * @param peer peer to insert/peer received
2634  */
2635 static void
2636 got_peer (struct Sub *sub,
2637           const struct GNUNET_PeerIdentity *peer)
2638 {
2639   /* If we did not know this peer already, insert it into sampler and view */
2640   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2641   {
2642     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2643                         &insert_in_sampler, sub);
2644     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2645                         &insert_in_view_op, sub);
2646   }
2647   if (sub == msub)
2648   {
2649     GNUNET_STATISTICS_update (stats,
2650                               "# learnd peers",
2651                               1,
2652                               GNUNET_NO);
2653   }
2654 }
2655
2656
2657 /**
2658  * @brief Checks if there is a sending channel and if it is needed
2659  *
2660  * @param peer_ctx Context of the peer to check
2661  * @return GNUNET_YES if sending channel exists and is still needed
2662  *         GNUNET_NO  otherwise
2663  */
2664 static int
2665 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2666 {
2667   /* struct GNUNET_CADET_Channel *channel; */
2668   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2669                                      &peer_ctx->peer_id))
2670   {
2671     return GNUNET_NO;
2672   }
2673   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2674   {
2675     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2676                                     &peer_ctx->peer_id)) ||
2677          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2678                                             &peer_ctx->peer_id)) ||
2679          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2680                                                      &peer_ctx->peer_id)) ||
2681          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2682                                                      &peer_ctx->peer_id)) ||
2683          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2684                                          &peer_ctx->peer_id,
2685                                          Peers_PULL_REPLY_PENDING)))
2686     { /* If we want to keep the connection to peer open */
2687       return GNUNET_YES;
2688     }
2689     return GNUNET_NO;
2690   }
2691   return GNUNET_NO;
2692 }
2693
2694
2695 /**
2696  * @brief remove peer from our knowledge, the view, push and pull maps and
2697  * samplers.
2698  *
2699  * @param sub Sub with the data structures the peer is to be removed from
2700  * @param peer the peer to remove
2701  */
2702 static void
2703 remove_peer (struct Sub *sub,
2704              const struct GNUNET_PeerIdentity *peer)
2705 {
2706   (void) View_remove_peer (sub->view,
2707                            peer);
2708   CustomPeerMap_remove_peer (sub->pull_map,
2709                              peer);
2710   CustomPeerMap_remove_peer (sub->push_map,
2711                              peer);
2712   RPS_sampler_reinitialise_by_value (sub->sampler,
2713                                      peer);
2714   /* We want to destroy the peer now.
2715    * Sometimes, it just seems that it's already been removed from the peer_map,
2716    * so check the peer_map first. */
2717   if (GNUNET_YES == check_peer_known (sub->peer_map,
2718                                       peer))
2719   {
2720     destroy_peer (get_peer_ctx (sub->peer_map,
2721                                 peer));
2722   }
2723 }
2724
2725
2726 /**
2727  * @brief Remove data that is not needed anymore.
2728  *
2729  * If the sending channel is no longer needed it is destroyed.
2730  *
2731  * @param sub Sub in which the current peer is to be cleaned
2732  * @param peer the peer whose data is about to be cleaned
2733  */
2734 static void
2735 clean_peer (struct Sub *sub,
2736             const struct GNUNET_PeerIdentity *peer)
2737 {
2738   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2739                                                                peer)))
2740   {
2741     LOG (GNUNET_ERROR_TYPE_DEBUG,
2742         "Going to remove send channel to peer %s\n",
2743         GNUNET_i2s (peer));
2744     #if ENABLE_MALICIOUS
2745     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer,
2746                                               peer))
2747       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2748                                                     peer));
2749     #else /* ENABLE_MALICIOUS */
2750     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2751                                                   peer));
2752     #endif /* ENABLE_MALICIOUS */
2753   }
2754
2755   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2756                                                            peer))
2757   {
2758     /* Peer was already removed by callback on destroyed channel */
2759     LOG (GNUNET_ERROR_TYPE_WARNING,
2760         "Peer was removed from our knowledge during cleanup\n");
2761     return;
2762   }
2763
2764   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2765                                                               peer))) &&
2766        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2767        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2768        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2769        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2770        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2771   { /* We can safely remove this peer */
2772     LOG (GNUNET_ERROR_TYPE_DEBUG,
2773         "Going to remove peer %s\n",
2774         GNUNET_i2s (peer));
2775     remove_peer (sub, peer);
2776     return;
2777   }
2778 }
2779
2780
2781 /**
2782  * @brief This is called when a channel is destroyed.
2783  *
2784  * Removes peer completely from our knowledge if the send_channel was destroyed
2785  * Otherwise simply delete the recv_channel
2786  * Also check if the knowledge about this peer is still needed.
2787  * If not, remove this peer from our knowledge.
2788  *
2789  * @param cls The closure - Context to the channel
2790  * @param channel The channel being closed
2791  */
2792 static void
2793 cleanup_destroyed_channel (void *cls,
2794                            const struct GNUNET_CADET_Channel *channel)
2795 {
2796   struct ChannelCtx *channel_ctx = cls;
2797   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2798   (void) channel;
2799
2800   channel_ctx->channel = NULL;
2801   remove_channel_ctx (channel_ctx);
2802   if (NULL != peer_ctx &&
2803       peer_ctx->send_channel_ctx == channel_ctx &&
2804       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2805   {
2806     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2807   }
2808 }
2809
2810 /***********************************************************************
2811  * /Util functions
2812 ***********************************************************************/
2813
2814
2815
2816 /***********************************************************************
2817  * Sub
2818 ***********************************************************************/
2819
2820 /**
2821  * @brief Create a new Sub
2822  *
2823  * @param hash Hash of value shared among rps instances on other hosts that
2824  *        defines a subgroup to sample from.
2825  * @param sampler_size Size of the sampler
2826  * @param round_interval Interval (in average) between two rounds
2827  *
2828  * @return Sub
2829  */
2830 struct Sub *
2831 new_sub (const struct GNUNET_HashCode *hash,
2832          uint32_t sampler_size,
2833          struct GNUNET_TIME_Relative round_interval)
2834 {
2835   struct Sub *sub;
2836
2837   sub = GNUNET_new (struct Sub);
2838
2839   /* With the hash generated from the secret value this service only connects
2840    * to rps instances that share the value */
2841   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2842     GNUNET_MQ_hd_fixed_size (peer_check,
2843                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2844                              struct GNUNET_MessageHeader,
2845                              NULL),
2846     GNUNET_MQ_hd_fixed_size (peer_push,
2847                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2848                              struct GNUNET_MessageHeader,
2849                              NULL),
2850     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2851                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2852                              struct GNUNET_MessageHeader,
2853                              NULL),
2854     GNUNET_MQ_hd_var_size (peer_pull_reply,
2855                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2856                            struct GNUNET_RPS_P2P_PullReplyMessage,
2857                            NULL),
2858     GNUNET_MQ_handler_end ()
2859   };
2860   sub->hash = *hash;
2861   sub->cadet_port =
2862     GNUNET_CADET_open_port (cadet_handle,
2863                             &sub->hash,
2864                             &handle_inbound_channel, /* Connect handler */
2865                             sub, /* cls */
2866                             NULL, /* WindowSize handler */
2867                             &cleanup_destroyed_channel, /* Disconnect handler */
2868                             cadet_handlers);
2869   if (NULL == sub->cadet_port)
2870   {
2871     LOG (GNUNET_ERROR_TYPE_ERROR,
2872         "Cadet port `%s' is already in use.\n",
2873         GNUNET_APPLICATION_PORT_RPS);
2874     GNUNET_assert (0);
2875   }
2876
2877   /* Set up general data structure to keep track about peers */
2878   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2879   if (GNUNET_OK !=
2880       GNUNET_CONFIGURATION_get_value_filename (cfg,
2881                                                "rps",
2882                                                "FILENAME_VALID_PEERS",
2883                                                &sub->filename_valid_peers))
2884   {
2885     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2886                                "rps",
2887                                "FILENAME_VALID_PEERS");
2888   }
2889   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2890   {
2891     char *tmp_filename_valid_peers;
2892     char str_hash[105];
2893
2894     GNUNET_snprintf (str_hash,
2895                      sizeof (str_hash),
2896                      GNUNET_h2s_full (hash));
2897     tmp_filename_valid_peers = sub->filename_valid_peers;
2898     GNUNET_asprintf (&sub->filename_valid_peers,
2899                      "%s%s",
2900                      tmp_filename_valid_peers,
2901                      str_hash);
2902     GNUNET_free (tmp_filename_valid_peers);
2903   }
2904   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2905
2906   /* Set up the sampler */
2907   sub->sampler_size_est_min = sampler_size;
2908   sub->sampler_size_est_need = sampler_size;;
2909   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2910   GNUNET_assert (0 != round_interval.rel_value_us);
2911   sub->round_interval = round_interval;
2912   sub->sampler = RPS_sampler_init (sampler_size,
2913                                   round_interval);
2914
2915   /* Logging of internals */
2916 #ifdef TO_FILE_FULL
2917   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2918 #endif /* TO_FILE_FULL */
2919 #ifdef TO_FILE
2920 #ifdef TO_FILE_FULL
2921   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2922                                                        "observed");
2923 #endif /* TO_FILE_FULL */
2924   sub->num_observed_peers = 0;
2925   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2926                                                                     GNUNET_NO);
2927 #endif /* TO_FILE */
2928
2929   /* Set up data structures for gossip */
2930   sub->push_map = CustomPeerMap_create (4);
2931   sub->pull_map = CustomPeerMap_create (4);
2932   sub->view_size_est_min = sampler_size;;
2933   sub->view = View_create (sub->view_size_est_min);
2934   if (sub == msub)
2935   {
2936     GNUNET_STATISTICS_set (stats,
2937                            "view size aim",
2938                            sub->view_size_est_min,
2939                            GNUNET_NO);
2940   }
2941
2942   /* Start executing rounds */
2943   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2944
2945   return sub;
2946 }
2947
2948
2949 #ifdef TO_FILE
2950 /**
2951  * @brief Write all numbers in the given array into the given file
2952  *
2953  * Single numbers devided by a newline
2954  *
2955  * @param hist_array[] the array to dump
2956  * @param file_name file to dump into
2957  */
2958 static void
2959 write_histogram_to_file (const uint32_t hist_array[],
2960                          const char *file_name)
2961 {
2962   char collect_str[SIZE_DUMP_FILE + 1] = "";
2963   char *recv_str_iter;
2964   char *file_name_full;
2965
2966   recv_str_iter = collect_str;
2967   file_name_full = store_prefix_file_name (&own_identity,
2968                                            file_name);
2969   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2970   {
2971     char collect_str_tmp[8];
2972
2973     GNUNET_snprintf (collect_str_tmp,
2974                      sizeof (collect_str_tmp),
2975                      "%" PRIu32 "\n",
2976                      hist_array[i]);
2977     recv_str_iter = stpncpy (recv_str_iter,
2978                              collect_str_tmp,
2979                              6);
2980   }
2981   (void) stpcpy (recv_str_iter,
2982                  "\n");
2983   LOG (GNUNET_ERROR_TYPE_DEBUG,
2984        "Writing push stats to disk\n");
2985   to_file_w_len (file_name_full,
2986                  SIZE_DUMP_FILE,
2987                  collect_str);
2988   GNUNET_free (file_name_full);
2989 }
2990 #endif /* TO_FILE */
2991
2992
2993 /**
2994  * @brief Destroy Sub.
2995  *
2996  * @param sub Sub to destroy
2997  */
2998 static void
2999 destroy_sub (struct Sub *sub)
3000 {
3001   GNUNET_assert (NULL != sub);
3002   GNUNET_assert (NULL != sub->do_round_task);
3003   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3004   sub->do_round_task = NULL;
3005
3006   /* Disconnect from cadet */
3007   GNUNET_CADET_close_port (sub->cadet_port);
3008   sub->cadet_port= NULL;
3009
3010   /* Clean up data structures for peers */
3011   RPS_sampler_destroy (sub->sampler);
3012   sub->sampler = NULL;
3013   View_destroy (sub->view);
3014   sub->view = NULL;
3015   CustomPeerMap_destroy (sub->push_map);
3016   sub->push_map = NULL;
3017   CustomPeerMap_destroy (sub->pull_map);
3018   sub->pull_map = NULL;
3019   peers_terminate (sub);
3020
3021   /* Free leftover data structures */
3022 #ifdef TO_FILE_FULL
3023   GNUNET_free (sub->file_name_view_log);
3024   sub->file_name_view_log = NULL;
3025 #endif /* TO_FILE_FULL */
3026 #ifdef TO_FILE
3027 #ifdef TO_FILE_FULL
3028   GNUNET_free (sub->file_name_observed_log);
3029   sub->file_name_observed_log = NULL;
3030 #endif /* TO_FILE_FULL */
3031
3032   /* Write push frequencies to disk */
3033   write_histogram_to_file (sub->push_recv,
3034                            "push_recv");
3035
3036   /* Write push deltas to disk */
3037   write_histogram_to_file (sub->push_delta,
3038                            "push_delta");
3039
3040   /* Write pull delays to disk */
3041   write_histogram_to_file (sub->pull_delays,
3042                            "pull_delays");
3043
3044   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3045   sub->observed_unique_peers = NULL;
3046 #endif /* TO_FILE */
3047
3048   GNUNET_free (sub);
3049 }
3050
3051
3052 /***********************************************************************
3053  * /Sub
3054 ***********************************************************************/
3055
3056
3057 /***********************************************************************
3058  * Core handlers
3059 ***********************************************************************/
3060
3061 /**
3062  * @brief Callback on initialisation of Core.
3063  *
3064  * @param cls - unused
3065  * @param my_identity - unused
3066  */
3067 void
3068 core_init (void *cls,
3069            const struct GNUNET_PeerIdentity *my_identity)
3070 {
3071   (void) cls;
3072   (void) my_identity;
3073
3074   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3075 }
3076
3077
3078 /**
3079  * @brief Callback for core.
3080  * Method called whenever a given peer connects.
3081  *
3082  * @param cls closure - unused
3083  * @param peer peer identity this notification is about
3084  * @return closure given to #core_disconnects as peer_cls
3085  */
3086 void *
3087 core_connects (void *cls,
3088                const struct GNUNET_PeerIdentity *peer,
3089                struct GNUNET_MQ_Handle *mq)
3090 {
3091   (void) cls;
3092   (void) mq;
3093
3094   GNUNET_assert (GNUNET_YES ==
3095                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3096                                                     peer,
3097                                                     NULL,
3098                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3099   return NULL;
3100 }
3101
3102
3103 /**
3104  * @brief Callback for core.
3105  * Method called whenever a peer disconnects.
3106  *
3107  * @param cls closure - unused
3108  * @param peer peer identity this notification is about
3109  * @param peer_cls closure given in #core_connects - unused
3110  */
3111 void
3112 core_disconnects (void *cls,
3113                   const struct GNUNET_PeerIdentity *peer,
3114                   void *peer_cls)
3115 {
3116   (void) cls;
3117   (void) peer_cls;
3118
3119   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3120 }
3121
3122 /***********************************************************************
3123  * /Core handlers
3124 ***********************************************************************/
3125
3126
3127 /**
3128  * @brief Destroy the context for a (connected) client
3129  *
3130  * @param cli_ctx Context to destroy
3131  */
3132 static void
3133 destroy_cli_ctx (struct ClientContext *cli_ctx)
3134 {
3135   GNUNET_assert (NULL != cli_ctx);
3136   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3137                                cli_ctx_tail,
3138                                cli_ctx);
3139   if (NULL != cli_ctx->sub)
3140   {
3141     destroy_sub (cli_ctx->sub);
3142     cli_ctx->sub = NULL;
3143   }
3144   GNUNET_free (cli_ctx);
3145 }
3146
3147
3148 /**
3149  * @brief Update sizes in sampler and view on estimate update from nse service
3150  *
3151  * @param sub Sub
3152  * @param logestimate the log(Base 2) value of the current network size estimate
3153  * @param std_dev standard deviation for the estimate
3154  */
3155 static void
3156 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3157 {
3158   double estimate;
3159   //double scale; // TODO this might go gloabal/config
3160
3161   LOG (GNUNET_ERROR_TYPE_DEBUG,
3162        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3163        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3164   //scale = .01;
3165   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3166   // GNUNET_NSE_log_estimate_to_n (logestimate);
3167   estimate = pow (estimate, 1.0 / 3);
3168   // TODO add if std_dev is a number
3169   // estimate += (std_dev * scale);
3170   if (sub->view_size_est_min < ceil (estimate))
3171   {
3172     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3173     sub->sampler_size_est_need = estimate;
3174     sub->view_size_est_need = estimate;
3175   } else
3176   {
3177     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3178     //sub->sampler_size_est_need = sub->view_size_est_min;
3179     sub->view_size_est_need = sub->view_size_est_min;
3180   }
3181   if (sub == msub)
3182   {
3183     GNUNET_STATISTICS_set (stats,
3184                            "view size aim",
3185                            sub->view_size_est_need,
3186                            GNUNET_NO);
3187   }
3188
3189   /* If the NSE has changed adapt the lists accordingly */
3190   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3191   View_change_len (sub->view, sub->view_size_est_need);
3192 }
3193
3194
3195 /**
3196  * Function called by NSE.
3197  *
3198  * Updates sizes of sampler list and view and adapt those lists
3199  * accordingly.
3200  *
3201  * implements #GNUNET_NSE_Callback
3202  *
3203  * @param cls Closure - unused
3204  * @param timestamp time when the estimate was received from the server (or created by the server)
3205  * @param logestimate the log(Base 2) value of the current network size estimate
3206  * @param std_dev standard deviation for the estimate
3207  */
3208 static void
3209 nse_callback (void *cls,
3210               struct GNUNET_TIME_Absolute timestamp,
3211               double logestimate, double std_dev)
3212 {
3213   (void) cls;
3214   (void) timestamp;
3215   struct ClientContext *cli_ctx_iter;
3216
3217   adapt_sizes (msub, logestimate, std_dev);
3218   for (cli_ctx_iter = cli_ctx_head;
3219       NULL != cli_ctx_iter;
3220       cli_ctx_iter = cli_ctx_iter->next)
3221   {
3222     if (NULL != cli_ctx_iter->sub)
3223     {
3224       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3225     }
3226   }
3227 }
3228
3229
3230 /**
3231  * @brief This function is called, when the client seeds peers.
3232  * It verifies that @a msg is well-formed.
3233  *
3234  * @param cls the closure (#ClientContext)
3235  * @param msg the message
3236  * @return #GNUNET_OK if @a msg is well-formed
3237  *         #GNUNET_SYSERR otherwise
3238  */
3239 static int
3240 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3241 {
3242   struct ClientContext *cli_ctx = cls;
3243   uint16_t msize = ntohs (msg->header.size);
3244   uint32_t num_peers = ntohl (msg->num_peers);
3245
3246   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3247   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3248        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3249   {
3250     LOG (GNUNET_ERROR_TYPE_ERROR,
3251         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3252         ntohl (msg->num_peers),
3253         (msize / sizeof (struct GNUNET_PeerIdentity)));
3254     GNUNET_break (0);
3255     GNUNET_SERVICE_client_drop (cli_ctx->client);
3256     return GNUNET_SYSERR;
3257   }
3258   return GNUNET_OK;
3259 }
3260
3261
3262 /**
3263  * Handle seed from the client.
3264  *
3265  * @param cls closure
3266  * @param message the actual message
3267  */
3268 static void
3269 handle_client_seed (void *cls,
3270                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3271 {
3272   struct ClientContext *cli_ctx = cls;
3273   struct GNUNET_PeerIdentity *peers;
3274   uint32_t num_peers;
3275   uint32_t i;
3276
3277   num_peers = ntohl (msg->num_peers);
3278   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3279
3280   LOG (GNUNET_ERROR_TYPE_DEBUG,
3281        "Client seeded peers:\n");
3282   print_peer_list (peers, num_peers);
3283
3284   for (i = 0; i < num_peers; i++)
3285   {
3286     LOG (GNUNET_ERROR_TYPE_DEBUG,
3287          "Updating samplers with seed %" PRIu32 ": %s\n",
3288          i,
3289          GNUNET_i2s (&peers[i]));
3290
3291     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3292     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3293   }
3294   GNUNET_SERVICE_client_continue (cli_ctx->client);
3295 }
3296
3297
3298 /**
3299  * Handle RPS request from the client.
3300  *
3301  * @param cls Client context
3302  * @param message Message containing the numer of updates the client wants to
3303  * receive
3304  */
3305 static void
3306 handle_client_view_request (void *cls,
3307                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3308 {
3309   struct ClientContext *cli_ctx = cls;
3310   uint64_t num_updates;
3311
3312   num_updates = ntohl (msg->num_updates);
3313
3314   LOG (GNUNET_ERROR_TYPE_DEBUG,
3315        "Client requested %" PRIu64 " updates of view.\n",
3316        num_updates);
3317
3318   GNUNET_assert (NULL != cli_ctx);
3319   cli_ctx->view_updates_left = num_updates;
3320   send_view (cli_ctx, NULL, 0);
3321   GNUNET_SERVICE_client_continue (cli_ctx->client);
3322 }
3323
3324
3325 /**
3326  * @brief Handle the cancellation of the view updates.
3327  *
3328  * @param cls The client context
3329  * @param msg Unused
3330  */
3331 static void
3332 handle_client_view_cancel (void *cls,
3333                            const struct GNUNET_MessageHeader *msg)
3334 {
3335   struct ClientContext *cli_ctx = cls;
3336   (void) msg;
3337
3338   LOG (GNUNET_ERROR_TYPE_DEBUG,
3339        "Client does not want to receive updates of view any more.\n");
3340
3341   GNUNET_assert (NULL != cli_ctx);
3342   cli_ctx->view_updates_left = 0;
3343   GNUNET_SERVICE_client_continue (cli_ctx->client);
3344   if (GNUNET_YES == cli_ctx->stream_update)
3345   {
3346     destroy_cli_ctx (cli_ctx);
3347   }
3348 }
3349
3350
3351 /**
3352  * Handle RPS request for biased stream from the client.
3353  *
3354  * @param cls Client context
3355  * @param message unused
3356  */
3357 static void
3358 handle_client_stream_request (void *cls,
3359                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3360 {
3361   struct ClientContext *cli_ctx = cls;
3362   (void) msg;
3363
3364   LOG (GNUNET_ERROR_TYPE_DEBUG,
3365        "Client requested peers from biased stream.\n");
3366   cli_ctx->stream_update = GNUNET_YES;
3367
3368   GNUNET_assert (NULL != cli_ctx);
3369   GNUNET_SERVICE_client_continue (cli_ctx->client);
3370 }
3371
3372
3373 /**
3374  * @brief Handles the cancellation of the stream of biased peer ids
3375  *
3376  * @param cls The client context
3377  * @param msg unused
3378  */
3379 static void
3380 handle_client_stream_cancel (void *cls,
3381                              const struct GNUNET_MessageHeader *msg)
3382 {
3383   struct ClientContext *cli_ctx = cls;
3384   (void) msg;
3385
3386   LOG (GNUNET_ERROR_TYPE_DEBUG,
3387        "Client canceled receiving peers from biased stream.\n");
3388   cli_ctx->stream_update = GNUNET_NO;
3389
3390   GNUNET_assert (NULL != cli_ctx);
3391   GNUNET_SERVICE_client_continue (cli_ctx->client);
3392 }
3393
3394
3395 /**
3396  * @brief Create and start a Sub.
3397  *
3398  * @param cls Closure - unused
3399  * @param msg Message containing the necessary information
3400  */
3401 static void
3402 handle_client_start_sub (void *cls,
3403                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3404 {
3405   struct ClientContext *cli_ctx = cls;
3406
3407   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3408   if (NULL != cli_ctx->sub &&
3409       0 != memcmp (&cli_ctx->sub->hash,
3410                    &msg->hash,
3411                    sizeof (struct GNUNET_HashCode)))
3412   {
3413     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3414     destroy_sub (cli_ctx->sub);
3415     cli_ctx->sub = NULL;
3416   }
3417   cli_ctx->sub = new_sub (&msg->hash,
3418                          msub->sampler_size_est_min, // TODO make api input?
3419                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3420   GNUNET_SERVICE_client_continue (cli_ctx->client);
3421 }
3422
3423
3424 /**
3425  * @brief Destroy the Sub
3426  *
3427  * @param cls Closure - unused
3428  * @param msg Message containing the hash that identifies the Sub
3429  */
3430 static void
3431 handle_client_stop_sub (void *cls,
3432                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3433 {
3434   struct ClientContext *cli_ctx = cls;
3435
3436   GNUNET_assert (NULL != cli_ctx->sub);
3437   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3438   {
3439     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3440   }
3441   destroy_sub (cli_ctx->sub);
3442   cli_ctx->sub = NULL;
3443   GNUNET_SERVICE_client_continue (cli_ctx->client);
3444 }
3445
3446
3447 /**
3448  * Handle a CHECK_LIVE message from another peer.
3449  *
3450  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3451  * the channel is blocked for all other communication.
3452  *
3453  * @param cls Closure - Context of channel
3454  * @param msg Message - unused
3455  */
3456 static void
3457 handle_peer_check (void *cls,
3458                    const struct GNUNET_MessageHeader *msg)
3459 {
3460   const struct ChannelCtx *channel_ctx = cls;
3461   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3462   (void) msg;
3463
3464   LOG (GNUNET_ERROR_TYPE_DEBUG,
3465       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3466   if (channel_ctx->peer_ctx->sub == msub)
3467   {
3468     GNUNET_STATISTICS_update (stats,
3469                               "# pending online checks",
3470                               -1,
3471                               GNUNET_NO);
3472   }
3473
3474   GNUNET_CADET_receive_done (channel_ctx->channel);
3475 }
3476
3477
3478 /**
3479  * Handle a PUSH message from another peer.
3480  *
3481  * Check the proof of work and store the PeerID
3482  * in the temporary list for pushed PeerIDs.
3483  *
3484  * @param cls Closure - Context of channel
3485  * @param msg Message - unused
3486  */
3487 static void
3488 handle_peer_push (void *cls,
3489                   const struct GNUNET_MessageHeader *msg)
3490 {
3491   const struct ChannelCtx *channel_ctx = cls;
3492   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3493   (void) msg;
3494
3495   // (check the proof of work (?))
3496
3497   LOG (GNUNET_ERROR_TYPE_DEBUG,
3498        "Received PUSH (%s)\n",
3499        GNUNET_i2s (peer));
3500   if (channel_ctx->peer_ctx->sub == msub)
3501   {
3502     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3503     if (NULL != map_single_hop &&
3504         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3505                                                              peer))
3506     {
3507       GNUNET_STATISTICS_update (stats,
3508                                 "# push message received (multi-hop peer)",
3509                                 1,
3510                                 GNUNET_NO);
3511     }
3512   }
3513
3514   #if ENABLE_MALICIOUS
3515   struct AttackedPeer *tmp_att_peer;
3516
3517   if ( (1 == mal_type) ||
3518        (3 == mal_type) )
3519   { /* Try to maximise representation */
3520     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3521     tmp_att_peer->peer_id = *peer;
3522     if (NULL == att_peer_set)
3523       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3524     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3525                                                              peer))
3526     {
3527       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3528                                    att_peers_tail,
3529                                    tmp_att_peer);
3530       add_peer_array_to_set (peer, 1, att_peer_set);
3531     }
3532     else
3533     {
3534       GNUNET_free (tmp_att_peer);
3535     }
3536   }
3537
3538
3539   else if (2 == mal_type)
3540   {
3541     /* We attack one single well-known peer - simply ignore */
3542   }
3543   #endif /* ENABLE_MALICIOUS */
3544
3545   /* Add the sending peer to the push_map */
3546   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3547
3548   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3549                                      &channel_ctx->peer_ctx->peer_id));
3550   GNUNET_CADET_receive_done (channel_ctx->channel);
3551 }
3552
3553
3554 /**
3555  * Handle PULL REQUEST request message from another peer.
3556  *
3557  * Reply with the view of PeerIDs.
3558  *
3559  * @param cls Closure - Context of channel
3560  * @param msg Message - unused
3561  */
3562 static void
3563 handle_peer_pull_request (void *cls,
3564                           const struct GNUNET_MessageHeader *msg)
3565 {
3566   const struct ChannelCtx *channel_ctx = cls;
3567   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3568   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3569   const struct GNUNET_PeerIdentity *view_array;
3570   (void) msg;
3571
3572   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3573   if (peer_ctx->sub == msub)
3574   {
3575     GNUNET_STATISTICS_update(stats,
3576                              "# pull request message received",
3577                              1,
3578                              GNUNET_NO);
3579     if (NULL != map_single_hop &&
3580         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3581                                                              &peer_ctx->peer_id))
3582     {
3583       GNUNET_STATISTICS_update (stats,
3584                                 "# pull request message received (multi-hop peer)",
3585                                 1,
3586                                 GNUNET_NO);
3587     }
3588   }
3589
3590   #if ENABLE_MALICIOUS
3591   if (1 == mal_type
3592       || 3 == mal_type)
3593   { /* Try to maximise representation */
3594     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3595   }
3596
3597   else if (2 == mal_type)
3598   { /* Try to partition network */
3599     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3600     {
3601       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3602     }
3603   }
3604   #endif /* ENABLE_MALICIOUS */
3605
3606   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3607                                      &channel_ctx->peer_ctx->peer_id));
3608   GNUNET_CADET_receive_done (channel_ctx->channel);
3609   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3610   send_pull_reply (peer_ctx,
3611                    view_array,
3612                    View_size (channel_ctx->peer_ctx->sub->view));
3613 }
3614
3615
3616 /**
3617  * Check whether we sent a corresponding request and
3618  * whether this reply is the first one.
3619  *
3620  * @param cls Closure - Context of channel
3621  * @param msg Message containing the replied peers
3622  */
3623 static int
3624 check_peer_pull_reply (void *cls,
3625                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3626 {
3627   struct ChannelCtx *channel_ctx = cls;
3628   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3629
3630   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3631   {
3632     GNUNET_break_op (0);
3633     return GNUNET_SYSERR;
3634   }
3635
3636   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3637       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3638   {
3639     LOG (GNUNET_ERROR_TYPE_ERROR,
3640         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3641         ntohl (msg->num_peers),
3642         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3643             sizeof (struct GNUNET_PeerIdentity));
3644     GNUNET_break_op (0);
3645     return GNUNET_SYSERR;
3646   }
3647
3648   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3649                                      &sender_ctx->peer_id,
3650                                      Peers_PULL_REPLY_PENDING))
3651   {
3652     LOG (GNUNET_ERROR_TYPE_WARNING,
3653         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3654         GNUNET_i2s (&sender_ctx->peer_id));
3655     if (sender_ctx->sub == msub)
3656     {
3657       GNUNET_STATISTICS_update (stats,
3658                                 "# unrequested pull replies",
3659                                 1,
3660                                 GNUNET_NO);
3661     }
3662   }
3663   return GNUNET_OK;
3664 }
3665
3666
3667 /**
3668  * Handle PULL REPLY message from another peer.
3669  *
3670  * @param cls Closure
3671  * @param msg The message header
3672  */
3673 static void
3674 handle_peer_pull_reply (void *cls,
3675                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3676 {
3677   const struct ChannelCtx *channel_ctx = cls;
3678   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3679   const struct GNUNET_PeerIdentity *peers;
3680   struct Sub *sub = channel_ctx->peer_ctx->sub;
3681   uint32_t i;
3682 #if ENABLE_MALICIOUS
3683   struct AttackedPeer *tmp_att_peer;
3684 #endif /* ENABLE_MALICIOUS */
3685
3686   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3687   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3688   if (channel_ctx->peer_ctx->sub == msub)
3689   {
3690     GNUNET_STATISTICS_update (stats,
3691                               "# pull reply messages received",
3692                               1,
3693                               GNUNET_NO);
3694     if (NULL != map_single_hop &&
3695         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3696           &channel_ctx->peer_ctx->peer_id))
3697     {
3698       GNUNET_STATISTICS_update (stats,
3699                                 "# pull reply messages received (multi-hop peer)",
3700                                 1,
3701                                 GNUNET_NO);
3702     }
3703   }
3704
3705   #if ENABLE_MALICIOUS
3706   // We shouldn't even receive pull replies as we're not sending
3707   if (2 == mal_type)
3708   {
3709   }
3710   #endif /* ENABLE_MALICIOUS */
3711
3712   /* Do actual logic */
3713   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3714
3715   LOG (GNUNET_ERROR_TYPE_DEBUG,
3716        "PULL REPLY received, got following %u peers:\n",
3717        ntohl (msg->num_peers));
3718
3719   for (i = 0; i < ntohl (msg->num_peers); i++)
3720   {
3721     LOG (GNUNET_ERROR_TYPE_DEBUG,
3722          "%u. %s\n",
3723          i,
3724          GNUNET_i2s (&peers[i]));
3725
3726     #if ENABLE_MALICIOUS
3727     if ((NULL != att_peer_set) &&
3728         (1 == mal_type || 3 == mal_type))
3729     { /* Add attacked peer to local list */
3730       // TODO check if we sent a request and this was the first reply
3731       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3732                                                                &peers[i])
3733           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3734                                                                   &peers[i]))
3735       {
3736         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3737         tmp_att_peer->peer_id = peers[i];
3738         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3739                                      att_peers_tail,
3740                                      tmp_att_peer);
3741         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3742       }
3743       continue;
3744     }
3745     #endif /* ENABLE_MALICIOUS */
3746     /* Make sure we 'know' about this peer */
3747     (void) insert_peer (channel_ctx->peer_ctx->sub,
3748                         &peers[i]);
3749
3750     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3751                                         &peers[i]))
3752     {
3753       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3754                          &peers[i]);
3755     }
3756     else
3757     {
3758       schedule_operation (channel_ctx->peer_ctx,
3759                           insert_in_pull_map,
3760                           channel_ctx->peer_ctx->sub); /* cls */
3761       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3762                                       &peers[i]);
3763     }
3764   }
3765
3766   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3767                                  sender),
3768                    Peers_PULL_REPLY_PENDING);
3769   clean_peer (channel_ctx->peer_ctx->sub,
3770               sender);
3771
3772   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3773                                      sender));
3774   GNUNET_CADET_receive_done (channel_ctx->channel);
3775 }
3776
3777
3778 /**
3779  * Compute a random delay.
3780  * A uniformly distributed value between mean + spread and mean - spread.
3781  *
3782  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3783  * It would return a random value between 2 and 6 min.
3784  *
3785  * @param mean the mean time until the next round
3786  * @param spread the inverse amount of deviation from the mean
3787  */
3788 static struct GNUNET_TIME_Relative
3789 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3790                     unsigned int spread)
3791 {
3792   struct GNUNET_TIME_Relative half_interval;
3793   struct GNUNET_TIME_Relative ret;
3794   unsigned int rand_delay;
3795   unsigned int max_rand_delay;
3796
3797   if (0 == spread)
3798   {
3799     LOG (GNUNET_ERROR_TYPE_WARNING,
3800          "Not accepting spread of 0\n");
3801     GNUNET_break (0);
3802     GNUNET_assert (0);
3803   }
3804   GNUNET_assert (0 != mean.rel_value_us);
3805
3806   /* Compute random time value between spread * mean and spread * mean */
3807   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3808
3809   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3810   /**
3811    * Compute random value between (0 and 1) * round_interval
3812    * via multiplying round_interval with a 'fraction' (0 to value)/value
3813    */
3814   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3815   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3816   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3817   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3818
3819   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3820     LOG (GNUNET_ERROR_TYPE_WARNING,
3821          "Returning FOREVER_REL\n");
3822
3823   return ret;
3824 }
3825
3826
3827 /**
3828  * Send single pull request
3829  *
3830  * @param peer_ctx Context to the peer to send request to
3831  */
3832 static void
3833 send_pull_request (struct PeerContext *peer_ctx)
3834 {
3835   struct GNUNET_MQ_Envelope *ev;
3836
3837   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3838                                                &peer_ctx->peer_id,
3839                                                Peers_PULL_REPLY_PENDING));
3840   SET_PEER_FLAG (peer_ctx,
3841                  Peers_PULL_REPLY_PENDING);
3842   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3843
3844   LOG (GNUNET_ERROR_TYPE_DEBUG,
3845        "Going to send PULL REQUEST to peer %s.\n",
3846        GNUNET_i2s (&peer_ctx->peer_id));
3847
3848   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3849   send_message (peer_ctx,
3850                 ev,
3851                 "PULL REQUEST");
3852   if (peer_ctx->sub)
3853   {
3854     GNUNET_STATISTICS_update (stats,
3855                               "# pull request send issued",
3856                               1,
3857                               GNUNET_NO);
3858     if (NULL != map_single_hop &&
3859         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3860                                                              &peer_ctx->peer_id))
3861     {
3862       GNUNET_STATISTICS_update (stats,
3863                                 "# pull request send issued (multi-hop peer)",
3864                                 1,
3865                                 GNUNET_NO);
3866     }
3867   }
3868 }
3869
3870
3871 /**
3872  * Send single push
3873  *
3874  * @param peer_ctx Context of peer to send push to
3875  */
3876 static void
3877 send_push (struct PeerContext *peer_ctx)
3878 {
3879   struct GNUNET_MQ_Envelope *ev;
3880
3881   LOG (GNUNET_ERROR_TYPE_DEBUG,
3882        "Going to send PUSH to peer %s.\n",
3883        GNUNET_i2s (&peer_ctx->peer_id));
3884
3885   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3886   send_message (peer_ctx, ev, "PUSH");
3887   if (peer_ctx->sub)
3888   {
3889     GNUNET_STATISTICS_update (stats,
3890                               "# push send issued",
3891                               1,
3892                               GNUNET_NO);
3893     if (NULL != map_single_hop &&
3894         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3895                                                              &peer_ctx->peer_id))
3896     {
3897       GNUNET_STATISTICS_update (stats,
3898                                 "# push send issued (multi-hop peer)",
3899                                 1,
3900                                 GNUNET_NO);
3901     }
3902   }
3903 }
3904
3905
3906 #if ENABLE_MALICIOUS
3907
3908
3909 /**
3910  * @brief This function is called, when the client tells us to act malicious.
3911  * It verifies that @a msg is well-formed.
3912  *
3913  * @param cls the closure (#ClientContext)
3914  * @param msg the message
3915  * @return #GNUNET_OK if @a msg is well-formed
3916  */
3917 static int
3918 check_client_act_malicious (void *cls,
3919                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3920 {
3921   struct ClientContext *cli_ctx = cls;
3922   uint16_t msize = ntohs (msg->header.size);
3923   uint32_t num_peers = ntohl (msg->num_peers);
3924
3925   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3926   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3927        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3928   {
3929     LOG (GNUNET_ERROR_TYPE_ERROR,
3930         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3931         ntohl (msg->num_peers),
3932         (msize / sizeof (struct GNUNET_PeerIdentity)));
3933     GNUNET_break (0);
3934     GNUNET_SERVICE_client_drop (cli_ctx->client);
3935     return GNUNET_SYSERR;
3936   }
3937   return GNUNET_OK;
3938 }
3939
3940 /**
3941  * Turn RPS service to act malicious.
3942  *
3943  * @param cls Closure
3944  * @param client The client that sent the message
3945  * @param msg The message header
3946  */
3947 static void
3948 handle_client_act_malicious (void *cls,
3949                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3950 {
3951   struct ClientContext *cli_ctx = cls;
3952   struct GNUNET_PeerIdentity *peers;
3953   uint32_t num_mal_peers_sent;
3954   uint32_t num_mal_peers_old;
3955   struct Sub *sub = cli_ctx->sub;
3956
3957   if (NULL == sub) sub = msub;
3958   /* Do actual logic */
3959   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3960   mal_type = ntohl (msg->type);
3961   if (NULL == mal_peer_set)
3962     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3963
3964   LOG (GNUNET_ERROR_TYPE_DEBUG,
3965        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3966        mal_type,
3967        ntohl (msg->num_peers));
3968
3969   if (1 == mal_type)
3970   { /* Try to maximise representation */
3971     /* Add other malicious peers to those we already know */
3972
3973     num_mal_peers_sent = ntohl (msg->num_peers);
3974     num_mal_peers_old = num_mal_peers;
3975     GNUNET_array_grow (mal_peers,
3976                        num_mal_peers,
3977                        num_mal_peers + num_mal_peers_sent);
3978     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3979             peers,
3980             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3981
3982     /* Add all mal peers to mal_peer_set */
3983     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3984                            num_mal_peers_sent,
3985                            mal_peer_set);
3986
3987     /* Substitute do_round () with do_mal_round () */
3988     GNUNET_assert (NULL != sub->do_round_task);
3989     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3990     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3991   }
3992
3993   else if ( (2 == mal_type) ||
3994             (3 == mal_type) )
3995   { /* Try to partition the network */
3996     /* Add other malicious peers to those we already know */
3997
3998     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3999     num_mal_peers_old = num_mal_peers;
4000     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4001     GNUNET_array_grow (mal_peers,
4002                        num_mal_peers,
4003                        num_mal_peers + num_mal_peers_sent);
4004     if (NULL != mal_peers &&
4005         0 != num_mal_peers)
4006     {
4007       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4008               peers,
4009               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
4010
4011       /* Add all mal peers to mal_peer_set */
4012       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4013                              num_mal_peers_sent,
4014                              mal_peer_set);
4015     }
4016
4017     /* Store the one attacked peer */
4018     GNUNET_memcpy (&attacked_peer,
4019             &msg->attacked_peer,
4020             sizeof (struct GNUNET_PeerIdentity));
4021     /* Set the flag of the attacked peer to valid to avoid problems */
4022     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4023     {
4024       (void) issue_peer_online_check (sub, &attacked_peer);
4025     }
4026
4027     LOG (GNUNET_ERROR_TYPE_DEBUG,
4028          "Attacked peer is %s\n",
4029          GNUNET_i2s (&attacked_peer));
4030
4031     /* Substitute do_round () with do_mal_round () */
4032     if (NULL != sub->do_round_task)
4033     {
4034       /* Probably in shutdown */
4035       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4036       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4037     }
4038   }
4039   else if (0 == mal_type)
4040   { /* Stop acting malicious */
4041     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4042
4043     /* Substitute do_mal_round () with do_round () */
4044     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4045     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4046   }
4047   else
4048   {
4049     GNUNET_break (0);
4050     GNUNET_SERVICE_client_continue (cli_ctx->client);
4051   }
4052   GNUNET_SERVICE_client_continue (cli_ctx->client);
4053 }
4054
4055
4056 /**
4057  * Send out PUSHes and PULLs maliciously.
4058  *
4059  * This is executed regylary.
4060  *
4061  * @param cls Closure - Sub
4062  */
4063 static void
4064 do_mal_round (void *cls)
4065 {
4066   uint32_t num_pushes;
4067   uint32_t i;
4068   struct GNUNET_TIME_Relative time_next_round;
4069   struct AttackedPeer *tmp_att_peer;
4070   struct Sub *sub = cls;
4071
4072   LOG (GNUNET_ERROR_TYPE_DEBUG,
4073        "Going to execute next round maliciously type %" PRIu32 ".\n",
4074       mal_type);
4075   sub->do_round_task = NULL;
4076   GNUNET_assert (mal_type <= 3);
4077   /* Do malicious actions */
4078   if (1 == mal_type)
4079   { /* Try to maximise representation */
4080
4081     /* The maximum of pushes we're going to send this round */
4082     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4083                                          num_attacked_peers),
4084                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4085
4086     LOG (GNUNET_ERROR_TYPE_DEBUG,
4087          "Going to send %" PRIu32 " pushes\n",
4088          num_pushes);
4089
4090     /* Send PUSHes to attacked peers */
4091     for (i = 0 ; i < num_pushes ; i++)
4092     {
4093       if (att_peers_tail == att_peer_index)
4094         att_peer_index = att_peers_head;
4095       else
4096         att_peer_index = att_peer_index->next;
4097
4098       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4099     }
4100
4101     /* Send PULLs to some peers to learn about additional peers to attack */
4102     tmp_att_peer = att_peer_index;
4103     for (i = 0 ; i < num_pushes * alpha ; i++)
4104     {
4105       if (att_peers_tail == tmp_att_peer)
4106         tmp_att_peer = att_peers_head;
4107       else
4108         att_peer_index = tmp_att_peer->next;
4109
4110       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4111     }
4112   }
4113
4114
4115   else if (2 == mal_type)
4116   { /**
4117      * Try to partition the network
4118      * Send as many pushes to the attacked peer as possible
4119      * That is one push per round as it will ignore more.
4120      */
4121     (void) issue_peer_online_check (sub, &attacked_peer);
4122     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4123                                        &attacked_peer,
4124                                        Peers_ONLINE))
4125       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4126   }
4127
4128
4129   if (3 == mal_type)
4130   { /* Combined attack */
4131
4132     /* Send PUSH to attacked peers */
4133     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4134     {
4135       (void) issue_peer_online_check (sub, &attacked_peer);
4136       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4137                                          &attacked_peer,
4138                                          Peers_ONLINE))
4139       {
4140         LOG (GNUNET_ERROR_TYPE_DEBUG,
4141             "Goding to send push to attacked peer (%s)\n",
4142             GNUNET_i2s (&attacked_peer));
4143         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4144       }
4145     }
4146     (void) issue_peer_online_check (sub, &attacked_peer);
4147
4148     /* The maximum of pushes we're going to send this round */
4149     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4150                                          num_attacked_peers),
4151                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4152
4153     LOG (GNUNET_ERROR_TYPE_DEBUG,
4154          "Going to send %" PRIu32 " pushes\n",
4155          num_pushes);
4156
4157     for (i = 0; i < num_pushes; i++)
4158     {
4159       if (att_peers_tail == att_peer_index)
4160         att_peer_index = att_peers_head;
4161       else
4162         att_peer_index = att_peer_index->next;
4163
4164       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4165     }
4166
4167     /* Send PULLs to some peers to learn about additional peers to attack */
4168     tmp_att_peer = att_peer_index;
4169     for (i = 0; i < num_pushes * alpha; i++)
4170     {
4171       if (att_peers_tail == tmp_att_peer)
4172         tmp_att_peer = att_peers_head;
4173       else
4174         att_peer_index = tmp_att_peer->next;
4175
4176       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4177     }
4178   }
4179
4180   /* Schedule next round */
4181   time_next_round = compute_rand_delay (sub->round_interval, 2);
4182
4183   GNUNET_assert (NULL == sub->do_round_task);
4184   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4185                                                     &do_mal_round, sub);
4186   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4187 }
4188 #endif /* ENABLE_MALICIOUS */
4189
4190
4191 /**
4192  * Send out PUSHes and PULLs, possibly update #view, samplers.
4193  *
4194  * This is executed regylary.
4195  *
4196  * @param cls Closure - Sub
4197  */
4198 static void
4199 do_round (void *cls)
4200 {
4201   unsigned int i;
4202   const struct GNUNET_PeerIdentity *view_array;
4203   unsigned int *permut;
4204   unsigned int a_peers; /* Number of peers we send pushes to */
4205   unsigned int b_peers; /* Number of peers we send pull requests to */
4206   uint32_t first_border;
4207   uint32_t second_border;
4208   struct GNUNET_PeerIdentity peer;
4209   struct GNUNET_PeerIdentity *update_peer;
4210   struct Sub *sub = cls;
4211
4212   sub->num_rounds++;
4213   LOG (GNUNET_ERROR_TYPE_DEBUG,
4214        "Going to execute next round.\n");
4215   if (sub == msub)
4216   {
4217     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4218   }
4219   sub->do_round_task = NULL;
4220 #ifdef TO_FILE_FULL
4221   to_file (sub->file_name_view_log,
4222            "___ new round ___");
4223 #endif /* TO_FILE_FULL */
4224   view_array = View_get_as_array (sub->view);
4225   for (i = 0; i < View_size (sub->view); i++)
4226   {
4227     LOG (GNUNET_ERROR_TYPE_DEBUG,
4228          "\t%s\n", GNUNET_i2s (&view_array[i]));
4229 #ifdef TO_FILE_FULL
4230     to_file (sub->file_name_view_log,
4231              "=%s\t(do round)",
4232              GNUNET_i2s_full (&view_array[i]));
4233 #endif /* TO_FILE_FULL */
4234   }
4235
4236
4237   /* Send pushes and pull requests */
4238   if (0 < View_size (sub->view))
4239   {
4240     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4241                                            View_size (sub->view));
4242
4243     /* Send PUSHes */
4244     a_peers = ceil (alpha * View_size (sub->view));
4245
4246     LOG (GNUNET_ERROR_TYPE_DEBUG,
4247          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4248          a_peers, alpha, View_size (sub->view));
4249     for (i = 0; i < a_peers; i++)
4250     {
4251       peer = view_array[permut[i]];
4252       // FIXME if this fails schedule/loop this for later
4253       send_push (get_peer_ctx (sub->peer_map, &peer));
4254     }
4255
4256     /* Send PULL requests */
4257     b_peers = ceil (beta * View_size (sub->view));
4258     first_border = a_peers;
4259     second_border = a_peers + b_peers;
4260     if (second_border > View_size (sub->view))
4261     {
4262       first_border = View_size (sub->view) - b_peers;
4263       second_border = View_size (sub->view);
4264     }
4265     LOG (GNUNET_ERROR_TYPE_DEBUG,
4266         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4267         b_peers, beta, View_size (sub->view));
4268     for (i = first_border; i < second_border; i++)
4269     {
4270       peer = view_array[permut[i]];
4271       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4272                                          &peer,
4273                                          Peers_PULL_REPLY_PENDING))
4274       { // FIXME if this fails schedule/loop this for later
4275         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4276       }
4277     }
4278
4279     GNUNET_free (permut);
4280     permut = NULL;
4281   }
4282
4283
4284   /* Update view */
4285   /* TODO see how many peers are in push-/pull- list! */
4286
4287   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4288       (0 < CustomPeerMap_size (sub->push_map)) &&
4289       (0 < CustomPeerMap_size (sub->pull_map)))
4290   { /* If conditions for update are fulfilled, update */
4291     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4292
4293     uint32_t final_size;
4294     uint32_t peers_to_clean_size;
4295     struct GNUNET_PeerIdentity *peers_to_clean;
4296
4297     peers_to_clean = NULL;
4298     peers_to_clean_size = 0;
4299     GNUNET_array_grow (peers_to_clean,
4300                        peers_to_clean_size,
4301                        View_size (sub->view));
4302     GNUNET_memcpy (peers_to_clean,
4303             view_array,
4304             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4305
4306     /* Seems like recreating is the easiest way of emptying the peermap */
4307     View_clear (sub->view);
4308 #ifdef TO_FILE_FULL
4309     to_file (sub->file_name_view_log,
4310              "--- emptied ---");
4311 #endif /* TO_FILE_FULL */
4312
4313     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4314                                 CustomPeerMap_size (sub->push_map));
4315     second_border = first_border +
4316                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4317                                 CustomPeerMap_size (sub->pull_map));
4318     final_size    = second_border +
4319       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4320     LOG (GNUNET_ERROR_TYPE_DEBUG,
4321         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4322         first_border,
4323         second_border,
4324         final_size);
4325
4326     /* Update view with peers received through PUSHes */
4327     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4328                                            CustomPeerMap_size (sub->push_map));
4329     for (i = 0; i < first_border; i++)
4330     {
4331       int inserted;
4332       inserted = insert_in_view (sub,
4333                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4334                                                                   permut[i]));
4335       if (GNUNET_OK == inserted)
4336       {
4337         clients_notify_stream_peer (sub,
4338             1,
4339             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4340       }
4341 #ifdef TO_FILE_FULL
4342       to_file (sub->file_name_view_log,
4343                "+%s\t(push list)",
4344                GNUNET_i2s_full (&view_array[i]));
4345 #endif /* TO_FILE_FULL */
4346       // TODO change the peer_flags accordingly
4347     }
4348     GNUNET_free (permut);
4349     permut = NULL;
4350
4351     /* Update view with peers received through PULLs */
4352     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4353                                            CustomPeerMap_size (sub->pull_map));
4354     for (i = first_border; i < second_border; i++)
4355     {
4356       int inserted;
4357       inserted = insert_in_view (sub,
4358           CustomPeerMap_get_peer_by_index (sub->pull_map,
4359                                            permut[i - first_border]));
4360       if (GNUNET_OK == inserted)
4361       {
4362         clients_notify_stream_peer (sub,
4363             1,
4364             CustomPeerMap_get_peer_by_index (sub->pull_map,
4365                                              permut[i - first_border]));
4366       }
4367 #ifdef TO_FILE_FULL
4368       to_file (sub->file_name_view_log,
4369                "+%s\t(pull list)",
4370                GNUNET_i2s_full (&view_array[i]));
4371 #endif /* TO_FILE_FULL */
4372       // TODO change the peer_flags accordingly
4373     }
4374     GNUNET_free (permut);
4375     permut = NULL;
4376
4377     /* Update view with peers from history */
4378     RPS_sampler_get_n_rand_peers (sub->sampler,
4379                                   final_size - second_border,
4380                                   hist_update,
4381                                   sub);
4382     // TODO change the peer_flags accordingly
4383
4384     for (i = 0; i < View_size (sub->view); i++)
4385       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4386
4387     /* Clean peers that were removed from the view */
4388     for (i = 0; i < peers_to_clean_size; i++)
4389     {
4390 #ifdef TO_FILE_FULL
4391       to_file (sub->file_name_view_log,
4392                "-%s",
4393                GNUNET_i2s_full (&peers_to_clean[i]));
4394 #endif /* TO_FILE_FULL */
4395       clean_peer (sub, &peers_to_clean[i]);
4396     }
4397
4398     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4399     clients_notify_view_update (sub);
4400   } else {
4401     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4402     if (sub == msub)
4403     {
4404       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4405       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4406           !(0 >= CustomPeerMap_size (sub->pull_map)))
4407         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4408       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4409           (0 >= CustomPeerMap_size (sub->pull_map)))
4410         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4411       if (0 >= CustomPeerMap_size (sub->push_map) &&
4412           !(0 >= CustomPeerMap_size (sub->pull_map)))
4413         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4414       if (0 >= CustomPeerMap_size (sub->push_map) &&
4415           (0 >= CustomPeerMap_size (sub->pull_map)))
4416         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4417       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4418           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4419           0 >= CustomPeerMap_size (sub->push_map))
4420         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4421     }
4422   }
4423   // TODO independent of that also get some peers from CADET_get_peers()?
4424   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4425   {
4426     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4427   }
4428   else
4429   {
4430     LOG (GNUNET_ERROR_TYPE_WARNING,
4431          "Push map size too big for histogram (%u, %u)\n",
4432          CustomPeerMap_size (sub->push_map),
4433          HISTOGRAM_FILE_SLOTS);
4434   }
4435   // FIXME check bounds of histogram
4436   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4437                    (alpha * sub->view_size_est_need)) +
4438                           (HISTOGRAM_FILE_SLOTS/2)]++;
4439   if (sub == msub)
4440   {
4441     GNUNET_STATISTICS_set (stats,
4442         "# peers in push map at end of round",
4443         CustomPeerMap_size (sub->push_map),
4444         GNUNET_NO);
4445     GNUNET_STATISTICS_set (stats,
4446         "# peers in pull map at end of round",
4447         CustomPeerMap_size (sub->pull_map),
4448         GNUNET_NO);
4449     GNUNET_STATISTICS_set (stats,
4450         "# peers in view at end of round",
4451         View_size (sub->view),
4452         GNUNET_NO);
4453     GNUNET_STATISTICS_set (stats,
4454         "# expected pushes",
4455         alpha * sub->view_size_est_need,
4456         GNUNET_NO);
4457     GNUNET_STATISTICS_set (stats,
4458         "delta expected - received pushes",
4459         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4460         GNUNET_NO);
4461   }
4462
4463   LOG (GNUNET_ERROR_TYPE_DEBUG,
4464        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4465        CustomPeerMap_size (sub->push_map),
4466        CustomPeerMap_size (sub->pull_map),
4467        alpha,
4468        View_size (sub->view),
4469        alpha * View_size (sub->view));
4470
4471   /* Update samplers */
4472   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4473   {
4474     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4475     LOG (GNUNET_ERROR_TYPE_DEBUG,
4476          "Updating with peer %s from push list\n",
4477          GNUNET_i2s (update_peer));
4478     insert_in_sampler (sub, update_peer);
4479     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4480   }
4481
4482   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4483   {
4484     LOG (GNUNET_ERROR_TYPE_DEBUG,
4485          "Updating with peer %s from pull list\n",
4486          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4487     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4488     /* This cleans only if it is not in the view */
4489     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4490   }
4491
4492
4493   /* Empty push/pull lists */
4494   CustomPeerMap_clear (sub->push_map);
4495   CustomPeerMap_clear (sub->pull_map);
4496
4497   if (sub == msub)
4498   {
4499     GNUNET_STATISTICS_set (stats,
4500                            "view size",
4501                            View_size(sub->view),
4502                            GNUNET_NO);
4503   }
4504
4505   struct GNUNET_TIME_Relative time_next_round;
4506
4507   time_next_round = compute_rand_delay (sub->round_interval, 2);
4508
4509   /* Schedule next round */
4510   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4511                                                      &do_round, sub);
4512   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4513 }
4514
4515
4516 /**
4517  * This is called from GNUNET_CADET_get_peers().
4518  *
4519  * It is called on every peer(ID) that cadet somehow has contact with.
4520  * We use those to initialise the sampler.
4521  *
4522  * implements #GNUNET_CADET_PeersCB
4523  *
4524  * @param cls Closure - Sub
4525  * @param peer Peer, or NULL on "EOF".
4526  * @param tunnel Do we have a tunnel towards this peer?
4527  * @param n_paths Number of known paths towards this peer.
4528  * @param best_path How long is the best path?
4529  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4530  */
4531 void
4532 init_peer_cb (void *cls,
4533               const struct GNUNET_PeerIdentity *peer,
4534               int tunnel, /* "Do we have a tunnel towards this peer?" */
4535               unsigned int n_paths, /* "Number of known paths towards this peer" */
4536               unsigned int best_path) /* "How long is the best path?
4537                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4538 {
4539   struct Sub *sub = cls;
4540   (void) tunnel;
4541   (void) n_paths;
4542   (void) best_path;
4543
4544   if (NULL != peer)
4545   {
4546     LOG (GNUNET_ERROR_TYPE_DEBUG,
4547          "Got peer_id %s from cadet\n",
4548          GNUNET_i2s (peer));
4549     got_peer (sub, peer);
4550   }
4551 }
4552
4553
4554 /**
4555  * @brief Iterator function over stored, valid peers.
4556  *
4557  * We initialise the sampler with those.
4558  *
4559  * @param cls Closure - Sub
4560  * @param peer the peer id
4561  * @return #GNUNET_YES if we should continue to
4562  *         iterate,
4563  *         #GNUNET_NO if not.
4564  */
4565 static int
4566 valid_peers_iterator (void *cls,
4567                       const struct GNUNET_PeerIdentity *peer)
4568 {
4569   struct Sub *sub = cls;
4570
4571   if (NULL != peer)
4572   {
4573     LOG (GNUNET_ERROR_TYPE_DEBUG,
4574          "Got stored, valid peer %s\n",
4575          GNUNET_i2s (peer));
4576     got_peer (sub, peer);
4577   }
4578   return GNUNET_YES;
4579 }
4580
4581
4582 /**
4583  * Iterator over peers from peerinfo.
4584  *
4585  * @param cls Closure - Sub
4586  * @param peer id of the peer, NULL for last call
4587  * @param hello hello message for the peer (can be NULL)
4588  * @param error message
4589  */
4590 void
4591 process_peerinfo_peers (void *cls,
4592                         const struct GNUNET_PeerIdentity *peer,
4593                         const struct GNUNET_HELLO_Message *hello,
4594                         const char *err_msg)
4595 {
4596   struct Sub *sub = cls;
4597   (void) hello;
4598   (void) err_msg;
4599
4600   if (NULL != peer)
4601   {
4602     LOG (GNUNET_ERROR_TYPE_DEBUG,
4603          "Got peer_id %s from peerinfo\n",
4604          GNUNET_i2s (peer));
4605     got_peer (sub, peer);
4606   }
4607 }
4608
4609
4610 /**
4611  * Task run during shutdown.
4612  *
4613  * @param cls Closure - unused
4614  */
4615 static void
4616 shutdown_task (void *cls)
4617 {
4618   (void) cls;
4619   struct ClientContext *client_ctx;
4620
4621   LOG (GNUNET_ERROR_TYPE_DEBUG,
4622        "RPS service is going down\n");
4623
4624   /* Clean all clients */
4625   for (client_ctx = cli_ctx_head;
4626        NULL != cli_ctx_head;
4627        client_ctx = cli_ctx_head)
4628   {
4629     destroy_cli_ctx (client_ctx);
4630   }
4631   if (NULL != msub)
4632   {
4633     destroy_sub (msub);
4634     msub = NULL;
4635   }
4636
4637   /* Disconnect from other services */
4638   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4639   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4640   peerinfo_handle = NULL;
4641   GNUNET_NSE_disconnect (nse);
4642   if (NULL != map_single_hop)
4643   {
4644     /* core_init was called - core was initialised */
4645     /* disconnect first, so no callback tries to access missing peermap */
4646     GNUNET_CORE_disconnect (core_handle);
4647     core_handle = NULL;
4648     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4649     map_single_hop = NULL;
4650   }
4651
4652   if (NULL != stats)
4653   {
4654     GNUNET_STATISTICS_destroy (stats,
4655                                GNUNET_NO);
4656     stats = NULL;
4657   }
4658   GNUNET_CADET_disconnect (cadet_handle);
4659   cadet_handle = NULL;
4660 #if ENABLE_MALICIOUS
4661   struct AttackedPeer *tmp_att_peer;
4662   GNUNET_array_grow (mal_peers,
4663                      num_mal_peers,
4664                      0);
4665   if (NULL != mal_peer_set)
4666     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4667   if (NULL != att_peer_set)
4668     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4669   while (NULL != att_peers_head)
4670   {
4671     tmp_att_peer = att_peers_head;
4672     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4673                                  att_peers_tail,
4674                                  tmp_att_peer);
4675     GNUNET_free (tmp_att_peer);
4676   }
4677 #endif /* ENABLE_MALICIOUS */
4678   close_all_files();
4679 }
4680
4681
4682 /**
4683  * Handle client connecting to the service.
4684  *
4685  * @param cls unused
4686  * @param client the new client
4687  * @param mq the message queue of @a client
4688  * @return @a client
4689  */
4690 static void *
4691 client_connect_cb (void *cls,
4692                    struct GNUNET_SERVICE_Client *client,
4693                    struct GNUNET_MQ_Handle *mq)
4694 {
4695   struct ClientContext *cli_ctx;
4696   (void) cls;
4697
4698   LOG (GNUNET_ERROR_TYPE_DEBUG,
4699        "Client connected\n");
4700   if (NULL == client)
4701     return client; /* Server was destroyed before a client connected. Shutting down */
4702   cli_ctx = GNUNET_new (struct ClientContext);
4703   cli_ctx->mq = mq;
4704   cli_ctx->view_updates_left = -1;
4705   cli_ctx->stream_update = GNUNET_NO;
4706   cli_ctx->client = client;
4707   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4708                                cli_ctx_tail,
4709                                cli_ctx);
4710   return cli_ctx;
4711 }
4712
4713 /**
4714  * Callback called when a client disconnected from the service
4715  *
4716  * @param cls closure for the service
4717  * @param c the client that disconnected
4718  * @param internal_cls should be equal to @a c
4719  */
4720 static void
4721 client_disconnect_cb (void *cls,
4722                       struct GNUNET_SERVICE_Client *client,
4723                       void *internal_cls)
4724 {
4725   struct ClientContext *cli_ctx = internal_cls;
4726
4727   (void) cls;
4728   GNUNET_assert (client == cli_ctx->client);
4729   if (NULL == client)
4730   {/* shutdown task - destroy all clients */
4731     while (NULL != cli_ctx_head)
4732       destroy_cli_ctx (cli_ctx_head);
4733   }
4734   else
4735   { /* destroy this client */
4736     LOG (GNUNET_ERROR_TYPE_DEBUG,
4737         "Client disconnected. Destroy its context.\n");
4738     destroy_cli_ctx (cli_ctx);
4739   }
4740 }
4741
4742
4743 /**
4744  * Handle random peer sampling clients.
4745  *
4746  * @param cls closure
4747  * @param c configuration to use
4748  * @param service the initialized service
4749  */
4750 static void
4751 run (void *cls,
4752      const struct GNUNET_CONFIGURATION_Handle *c,
4753      struct GNUNET_SERVICE_Handle *service)
4754 {
4755   struct GNUNET_TIME_Relative round_interval;
4756   long long unsigned int sampler_size;
4757   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4758   struct GNUNET_HashCode hash;
4759
4760   (void) cls;
4761   (void) service;
4762
4763   GNUNET_log_setup ("rps",
4764                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4765                     NULL);
4766   cfg = c;
4767   /* Get own ID */
4768   GNUNET_CRYPTO_get_peer_identity (cfg,
4769                                    &own_identity); // TODO check return value
4770   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4771               "STARTING SERVICE (rps) for peer [%s]\n",
4772               GNUNET_i2s (&own_identity));
4773 #if ENABLE_MALICIOUS
4774   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4775               "Malicious execution compiled in.\n");
4776 #endif /* ENABLE_MALICIOUS */
4777
4778   /* Get time interval from the configuration */
4779   if (GNUNET_OK !=
4780       GNUNET_CONFIGURATION_get_value_time (cfg,
4781                                            "RPS",
4782                                            "ROUNDINTERVAL",
4783                                            &round_interval))
4784   {
4785     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4786                                "RPS", "ROUNDINTERVAL");
4787     GNUNET_SCHEDULER_shutdown ();
4788     return;
4789   }
4790
4791   /* Get initial size of sampler/view from the configuration */
4792   if (GNUNET_OK !=
4793       GNUNET_CONFIGURATION_get_value_number (cfg,
4794                                              "RPS",
4795                                              "MINSIZE",
4796                                              &sampler_size))
4797   {
4798     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4799                                "RPS", "MINSIZE");
4800     GNUNET_SCHEDULER_shutdown ();
4801     return;
4802   }
4803
4804   cadet_handle = GNUNET_CADET_connect (cfg);
4805   GNUNET_assert (NULL != cadet_handle);
4806   core_handle = GNUNET_CORE_connect (cfg,
4807                                      NULL, /* cls */
4808                                      core_init, /* init */
4809                                      core_connects, /* connects */
4810                                      core_disconnects, /* disconnects */
4811                                      NULL); /* handlers */
4812   GNUNET_assert (NULL != core_handle);
4813
4814
4815   alpha = 0.45;
4816   beta  = 0.45;
4817
4818
4819   /* Set up main Sub */
4820   GNUNET_CRYPTO_hash (hash_port_string,
4821                       strlen (hash_port_string),
4822                       &hash);
4823   msub = new_sub (&hash,
4824                  sampler_size, /* Will be overwritten by config */
4825                  round_interval);
4826
4827
4828   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4829
4830   /* connect to NSE */
4831   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4832
4833   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4834   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4835   // TODO send push/pull to each of those peers?
4836   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4837   restore_valid_peers (msub);
4838   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4839
4840   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4841                                                    GNUNET_NO,
4842                                                    process_peerinfo_peers,
4843                                                    msub);
4844
4845   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4846
4847   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4848   stats = GNUNET_STATISTICS_create ("rps", cfg);
4849 }
4850
4851
4852 /**
4853  * Define "main" method using service macro.
4854  */
4855 GNUNET_SERVICE_MAIN
4856 ("rps",
4857  GNUNET_SERVICE_OPTION_NONE,
4858  &run,
4859  &client_connect_cb,
4860  &client_disconnect_cb,
4861  NULL,
4862  GNUNET_MQ_hd_var_size (client_seed,
4863    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4864    struct GNUNET_RPS_CS_SeedMessage,
4865    NULL),
4866 #if ENABLE_MALICIOUS
4867  GNUNET_MQ_hd_var_size (client_act_malicious,
4868    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4869    struct GNUNET_RPS_CS_ActMaliciousMessage,
4870    NULL),
4871 #endif /* ENABLE_MALICIOUS */
4872  GNUNET_MQ_hd_fixed_size (client_view_request,
4873    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4874    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4875    NULL),
4876  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4877    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4878    struct GNUNET_MessageHeader,
4879    NULL),
4880  GNUNET_MQ_hd_fixed_size (client_stream_request,
4881    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4882    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4883    NULL),
4884  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4885    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4886    struct GNUNET_MessageHeader,
4887    NULL),
4888  GNUNET_MQ_hd_fixed_size (client_start_sub,
4889    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4890    struct GNUNET_RPS_CS_SubStartMessage,
4891    NULL),
4892  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4893    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4894    struct GNUNET_RPS_CS_SubStopMessage,
4895    NULL),
4896  GNUNET_MQ_handler_end());
4897
4898 /* end of gnunet-service-rps.c */