RPS service: Record number of unique peers in gossip
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357 #ifdef TO_FILE_FULL
358   /**
359    * Name to log view to
360    */
361   char *file_name_view_log;
362 #endif /* TO_FILE_FULL */
363
364 #ifdef TO_FILE
365 #ifdef TO_FILE_FULL
366   /**
367    * Name to log number of observed peers to
368    */
369   char *file_name_observed_log;
370 #endif /* TO_FILE_FULL */
371
372   /**
373    * @brief Count the observed peers
374    */
375   uint32_t num_observed_peers;
376
377   /**
378    * @brief Multipeermap (ab-) used to count unique peer_ids
379    */
380   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
381 #endif /* TO_FILE */
382
383   /**
384    * List to store peers received through pushes temporary.
385    */
386   struct CustomPeerMap *push_map;
387
388   /**
389    * List to store peers received through pulls temporary.
390    */
391   struct CustomPeerMap *pull_map;
392
393   /**
394    * @brief This is the estimate used as view size.
395    *
396    * It is initialised with the minimum
397    */
398   unsigned int view_size_est_need;
399
400   /**
401    * @brief This is the minimum estimate used as view size.
402    *
403    * It is configured by the user.
404    */
405   unsigned int view_size_est_min;
406
407   /**
408    * @brief The view.
409    */
410   struct View *view;
411
412   /**
413    * Identifier for the main task that runs periodically.
414    */
415   struct GNUNET_SCHEDULER_Task *do_round_task;
416
417   /* === stats === */
418
419   /**
420    * @brief Counts the executed rounds.
421    */
422   uint32_t num_rounds;
423
424   /**
425    * @brief This array accumulates the number of received pushes per round.
426    *
427    * Number at index i represents the number of rounds with i observed pushes.
428    */
429   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
430
431   /**
432    * @brief Histogram of deltas between the expected and actual number of
433    * received pushes.
434    *
435    * As half of the entries are expected to be negative, this is shifted by
436    * #HISTOGRAM_FILE_SLOTS/2.
437    */
438   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
439
440   /**
441    * @brief Number of pull replies with this delay measured in rounds.
442    *
443    * Number at index i represents the number of pull replies with a delay of i
444    * rounds.
445    */
446   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
447 };
448
449
450 /***********************************************************************
451  * Globals
452 ***********************************************************************/
453
454 /**
455  * Our configuration.
456  */
457 static const struct GNUNET_CONFIGURATION_Handle *cfg;
458
459 /**
460  * Handle to the statistics service.
461  */
462 struct GNUNET_STATISTICS_Handle *stats;
463
464 /**
465  * Handler to CADET.
466  */
467 struct GNUNET_CADET_Handle *cadet_handle;
468
469 /**
470  * Handle to CORE
471  */
472 struct GNUNET_CORE_Handle *core_handle;
473
474 /**
475  * @brief PeerMap to keep track of connected peers.
476  */
477 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478
479 /**
480  * Our own identity.
481  */
482 static struct GNUNET_PeerIdentity own_identity;
483
484 /**
485  * Percentage of total peer number in the view
486  * to send random PUSHes to
487  */
488 static float alpha;
489
490 /**
491  * Percentage of total peer number in the view
492  * to send random PULLs to
493  */
494 static float beta;
495
496 /**
497  * Handler to NSE.
498  */
499 static struct GNUNET_NSE_Handle *nse;
500
501 /**
502  * Handler to PEERINFO.
503  */
504 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
505
506 /**
507  * Handle for cancellation of iteration over peers.
508  */
509 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510
511
512 #if ENABLE_MALICIOUS
513 /**
514  * Type of malicious peer
515  *
516  * 0 Don't act malicious at all - Default
517  * 1 Try to maximise representation
518  * 2 Try to partition the network
519  * 3 Combined attack
520  */
521 static uint32_t mal_type;
522
523 /**
524  * Other malicious peers
525  */
526 static struct GNUNET_PeerIdentity *mal_peers;
527
528 /**
529  * Hashmap of malicious peers used as set.
530  * Used to more efficiently check whether we know that peer.
531  */
532 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
533
534 /**
535  * Number of other malicious peers
536  */
537 static uint32_t num_mal_peers;
538
539
540 /**
541  * If type is 2 this is the DLL of attacked peers
542  */
543 static struct AttackedPeer *att_peers_head;
544 static struct AttackedPeer *att_peers_tail;
545
546 /**
547  * This index is used to point to an attacked peer to
548  * implement the round-robin-ish way to select attacked peers.
549  */
550 static struct AttackedPeer *att_peer_index;
551
552 /**
553  * Hashmap of attacked peers used as set.
554  * Used to more efficiently check whether we know that peer.
555  */
556 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
557
558 /**
559  * Number of attacked peers
560  */
561 static uint32_t num_attacked_peers;
562
563 /**
564  * If type is 1 this is the attacked peer
565  */
566 static struct GNUNET_PeerIdentity attacked_peer;
567
568 /**
569  * The limit of PUSHes we can send in one round.
570  * This is an assumption of the Brahms protocol and either implemented
571  * via proof of work
572  * or
573  * assumend to be the bandwidth limitation.
574  */
575 static uint32_t push_limit = 10000;
576 #endif /* ENABLE_MALICIOUS */
577
578 /**
579  * @brief Main Sub.
580  *
581  * This is run in any case by all peers and connects to all peers without
582  * specifying a shared value.
583  */
584 static struct Sub *msub;
585
586 /**
587  * @brief Maximum number of valid peers to keep.
588  * TODO read from config
589  */
590 static const uint32_t num_valid_peers_max = UINT32_MAX;
591
592 /***********************************************************************
593  * /Globals
594 ***********************************************************************/
595
596
597 static void
598 do_round (void *cls);
599
600 static void
601 do_mal_round (void *cls);
602
603
604 /**
605  * @brief Get the #PeerContext associated with a peer
606  *
607  * @param peer_map The peer map containing the context
608  * @param peer the peer id
609  *
610  * @return the #PeerContext
611  */
612 static struct PeerContext *
613 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
614               const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *ctx;
617   int ret;
618
619   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
620   GNUNET_assert (GNUNET_YES == ret);
621   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
622   GNUNET_assert (NULL != ctx);
623   return ctx;
624 }
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                           "# known peers",
679                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                           GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ( (NULL == peer_ctx->send_channel_ctx) &&
729        (NULL == peer_ctx->recv_channel_ctx) )
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780   (void) value;
781
782   if (0 >= iterator_cls->index)
783   {
784     iterator_cls->peer = peer;
785     return GNUNET_NO;
786   }
787   iterator_cls->index--;
788   return GNUNET_YES;
789 }
790
791
792 /**
793  * @brief Get a random peer from @a peer_map
794  *
795  * @param valid_peers Peer map containing valid peers from which to select a
796  * random one
797  *
798  * @return a random peer
799  */
800 static const struct GNUNET_PeerIdentity *
801 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
802 {
803   struct GetRandPeerIteratorCls *iterator_cls;
804   const struct GNUNET_PeerIdentity *ret;
805
806   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
807   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
808       GNUNET_CONTAINER_multipeermap_size (valid_peers));
809   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
810                                                 get_rand_peer_iterator,
811                                                 iterator_cls);
812   ret = iterator_cls->peer;
813   GNUNET_free (iterator_cls);
814   return ret;
815 }
816
817
818 /**
819  * @brief Add a given @a peer to valid peers.
820  *
821  * If valid peers are already #num_valid_peers_max, delete a peer previously.
822  *
823  * @param peer The peer that is added to the valid peers.
824  * @param valid_peers Peer map of valid peers to which to add the @a peer
825  *
826  * @return #GNUNET_YES if no other peer had to be removed
827  *         #GNUNET_NO  otherwise
828  */
829 static int
830 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
831                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
832 {
833   const struct GNUNET_PeerIdentity *rand_peer;
834   int ret;
835
836   ret = GNUNET_YES;
837   /* Remove random peers until there is space for a new one */
838   while (num_valid_peers_max <=
839          GNUNET_CONTAINER_multipeermap_size (valid_peers))
840   {
841     rand_peer = get_random_peer_from_peermap (valid_peers);
842     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
843     ret = GNUNET_NO;
844   }
845   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
846       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
847   if (valid_peers == msub->valid_peers)
848   {
849     GNUNET_STATISTICS_set (stats,
850                            "# valid peers",
851                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
852                            GNUNET_NO);
853   }
854   return ret;
855 }
856
857 static void
858 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
859
860 /**
861  * @brief Set the peer flag to living and
862  *        call the pending operations on this peer.
863  *
864  * Also adds peer to #valid_peers.
865  *
866  * @param peer_ctx the #PeerContext of the peer to set online
867  */
868 static void
869 set_peer_online (struct PeerContext *peer_ctx)
870 {
871   struct GNUNET_PeerIdentity *peer;
872   unsigned int i;
873
874   peer = &peer_ctx->peer_id;
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876       "Peer %s is online and valid, calling %i pending operations on it\n",
877       GNUNET_i2s (peer),
878       peer_ctx->num_pending_ops);
879
880   if (NULL != peer_ctx->online_check_pending)
881   {
882     LOG (GNUNET_ERROR_TYPE_DEBUG,
883          "Removing pending online check for peer %s\n",
884          GNUNET_i2s (&peer_ctx->peer_id));
885     // TODO wait until cadet sets mq->cancel_impl
886     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
887     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
888     peer_ctx->online_check_pending = NULL;
889   }
890
891   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
892
893   /* Call pending operations */
894   for (i = 0; i < peer_ctx->num_pending_ops; i++)
895   {
896     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
897   }
898   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
899 }
900
901 static void
902 cleanup_destroyed_channel (void *cls,
903                            const struct GNUNET_CADET_Channel *channel);
904
905 /* Declaration of handlers */
906 static void
907 handle_peer_check (void *cls,
908                    const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_push (void *cls,
912                   const struct GNUNET_MessageHeader *msg);
913
914 static void
915 handle_peer_pull_request (void *cls,
916                           const struct GNUNET_MessageHeader *msg);
917
918 static int
919 check_peer_pull_reply (void *cls,
920                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 static void
923 handle_peer_pull_reply (void *cls,
924                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
925
926 /* End declaration of handlers */
927
928 /**
929  * @brief Allocate memory for a new channel context and insert it into DLL
930  *
931  * @param peer_ctx context of the according peer
932  *
933  * @return The channel context
934  */
935 static struct ChannelCtx *
936 add_channel_ctx (struct PeerContext *peer_ctx)
937 {
938   struct ChannelCtx *channel_ctx;
939   channel_ctx = GNUNET_new (struct ChannelCtx);
940   channel_ctx->peer_ctx = peer_ctx;
941   return channel_ctx;
942 }
943
944
945 /**
946  * @brief Free memory and NULL pointers.
947  *
948  * @param channel_ctx The channel context.
949  */
950 static void
951 remove_channel_ctx (struct ChannelCtx *channel_ctx)
952 {
953   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
954
955   if (NULL != channel_ctx->destruction_task)
956   {
957     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
958     channel_ctx->destruction_task = NULL;
959   }
960
961   GNUNET_free (channel_ctx);
962
963   if (NULL == peer_ctx) return;
964   if (channel_ctx == peer_ctx->send_channel_ctx)
965   {
966     peer_ctx->send_channel_ctx = NULL;
967     peer_ctx->mq = NULL;
968   }
969   else if (channel_ctx == peer_ctx->recv_channel_ctx)
970   {
971     peer_ctx->recv_channel_ctx = NULL;
972   }
973 }
974
975
976 /**
977  * @brief Get the channel of a peer. If not existing, create.
978  *
979  * @param peer_ctx Context of the peer of which to get the channel
980  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
981  */
982 struct GNUNET_CADET_Channel *
983 get_channel (struct PeerContext *peer_ctx)
984 {
985   /* There exists a copy-paste-clone in run() */
986   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
987     GNUNET_MQ_hd_fixed_size (peer_check,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_push,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_fixed_size (peer_pull_request,
996                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
997                              struct GNUNET_MessageHeader,
998                              NULL),
999     GNUNET_MQ_hd_var_size (peer_pull_reply,
1000                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1001                            struct GNUNET_RPS_P2P_PullReplyMessage,
1002                            NULL),
1003     GNUNET_MQ_handler_end ()
1004   };
1005
1006
1007   if (NULL == peer_ctx->send_channel_ctx)
1008   {
1009     LOG (GNUNET_ERROR_TYPE_DEBUG,
1010          "Trying to establish channel to peer %s\n",
1011          GNUNET_i2s (&peer_ctx->peer_id));
1012     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1013     peer_ctx->send_channel_ctx->channel =
1014       GNUNET_CADET_channel_create (cadet_handle,
1015                                    peer_ctx->send_channel_ctx, /* context */
1016                                    &peer_ctx->peer_id,
1017                                    &peer_ctx->sub->hash,
1018                                    GNUNET_CADET_OPTION_RELIABLE,
1019                                    NULL, /* WindowSize handler */
1020                                    &cleanup_destroyed_channel, /* Disconnect handler */
1021                                    cadet_handlers);
1022   }
1023   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1024   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1025   return peer_ctx->send_channel_ctx->channel;
1026 }
1027
1028
1029 /**
1030  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1031  *
1032  * If we already have a message queue open to this client,
1033  * simply return it, otherways create one.
1034  *
1035  * @param peer_ctx Context of the peer of whicht to get the mq
1036  * @return the #GNUNET_MQ_Handle
1037  */
1038 static struct GNUNET_MQ_Handle *
1039 get_mq (struct PeerContext *peer_ctx)
1040 {
1041   if (NULL == peer_ctx->mq)
1042   {
1043     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1044   }
1045   return peer_ctx->mq;
1046 }
1047
1048 /**
1049  * @brief Add an envelope to a message passed to mq to list of pending messages
1050  *
1051  * @param peer_ctx Context of the peer for which to insert the envelope
1052  * @param ev envelope to the message
1053  * @param type type of the message to be sent
1054  * @return pointer to pending message
1055  */
1056 static struct PendingMessage *
1057 insert_pending_message (struct PeerContext *peer_ctx,
1058                         struct GNUNET_MQ_Envelope *ev,
1059                         const char *type)
1060 {
1061   struct PendingMessage *pending_msg;
1062
1063   pending_msg = GNUNET_new (struct PendingMessage);
1064   pending_msg->ev = ev;
1065   pending_msg->peer_ctx = peer_ctx;
1066   pending_msg->type = type;
1067   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1068                                peer_ctx->pending_messages_tail,
1069                                pending_msg);
1070   return pending_msg;
1071 }
1072
1073
1074 /**
1075  * @brief Remove a pending message from the respective DLL
1076  *
1077  * @param pending_msg the pending message to remove
1078  * @param cancel whether to cancel the pending message, too
1079  */
1080 static void
1081 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1082 {
1083   struct PeerContext *peer_ctx;
1084   (void) cancel;
1085
1086   peer_ctx = pending_msg->peer_ctx;
1087   GNUNET_assert (NULL != peer_ctx);
1088   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1089                                peer_ctx->pending_messages_tail,
1090                                pending_msg);
1091   // TODO wait for the cadet implementation of message cancellation
1092   //if (GNUNET_YES == cancel)
1093   //{
1094   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1095   //}
1096   GNUNET_free (pending_msg);
1097 }
1098
1099
1100 /**
1101  * @brief This is called in response to the first message we sent as a
1102  * online check.
1103  *
1104  * @param cls #PeerContext of peer with pending online check
1105  */
1106 static void
1107 mq_online_check_successful (void *cls)
1108 {
1109   struct PeerContext *peer_ctx = cls;
1110
1111   if (NULL != peer_ctx->online_check_pending)
1112   {
1113     LOG (GNUNET_ERROR_TYPE_DEBUG,
1114         "Online check for peer %s was successfull\n",
1115         GNUNET_i2s (&peer_ctx->peer_id));
1116     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1117     peer_ctx->online_check_pending = NULL;
1118     set_peer_online (peer_ctx);
1119     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1120   }
1121 }
1122
1123 /**
1124  * Issue a check whether peer is online
1125  *
1126  * @param peer_ctx the context of the peer
1127  */
1128 static void
1129 check_peer_online (struct PeerContext *peer_ctx)
1130 {
1131   LOG (GNUNET_ERROR_TYPE_DEBUG,
1132        "Get informed about peer %s getting online\n",
1133        GNUNET_i2s (&peer_ctx->peer_id));
1134
1135   struct GNUNET_MQ_Handle *mq;
1136   struct GNUNET_MQ_Envelope *ev;
1137
1138   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1139   peer_ctx->online_check_pending =
1140     insert_pending_message (peer_ctx, ev, "Check online");
1141   mq = get_mq (peer_ctx);
1142   GNUNET_MQ_notify_sent (ev,
1143                          mq_online_check_successful,
1144                          peer_ctx);
1145   GNUNET_MQ_send (mq, ev);
1146   if (peer_ctx->sub == msub)
1147   {
1148     GNUNET_STATISTICS_update (stats,
1149                               "# pending online checks",
1150                               1,
1151                               GNUNET_NO);
1152   }
1153 }
1154
1155
1156 /**
1157  * @brief Check whether function of type #PeerOp was already scheduled
1158  *
1159  * The array with pending operations will probably never grow really big, so
1160  * iterating over it should be ok.
1161  *
1162  * @param peer_ctx Context of the peer to check for the operation
1163  * @param peer_op the operation (#PeerOp) on the peer
1164  *
1165  * @return #GNUNET_YES if this operation is scheduled on that peer
1166  *         #GNUNET_NO  otherwise
1167  */
1168 static int
1169 check_operation_scheduled (const struct PeerContext *peer_ctx,
1170                            const PeerOp peer_op)
1171 {
1172   unsigned int i;
1173
1174   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1175     if (peer_op == peer_ctx->pending_ops[i].op)
1176       return GNUNET_YES;
1177   return GNUNET_NO;
1178 }
1179
1180
1181 /**
1182  * @brief Callback for scheduler to destroy a channel
1183  *
1184  * @param cls Context of the channel
1185  */
1186 static void
1187 destroy_channel (struct ChannelCtx *channel_ctx)
1188 {
1189   struct GNUNET_CADET_Channel *channel;
1190
1191   if (NULL != channel_ctx->destruction_task)
1192   {
1193     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1194     channel_ctx->destruction_task = NULL;
1195   }
1196   GNUNET_assert (channel_ctx->channel != NULL);
1197   channel = channel_ctx->channel;
1198   channel_ctx->channel = NULL;
1199   GNUNET_CADET_channel_destroy (channel);
1200   remove_channel_ctx (channel_ctx);
1201 }
1202
1203
1204 /**
1205  * @brief Destroy a cadet channel.
1206  *
1207  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1208  *
1209  * @param cls
1210  */
1211 static void
1212 destroy_channel_cb (void *cls)
1213 {
1214   struct ChannelCtx *channel_ctx = cls;
1215
1216   channel_ctx->destruction_task = NULL;
1217   destroy_channel (channel_ctx);
1218 }
1219
1220
1221 /**
1222  * @brief Schedule the destruction of a channel for immediately afterwards.
1223  *
1224  * In case a channel is to be destroyed from within the callback to the
1225  * destruction of another channel (send channel), we cannot call
1226  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1227  * construction.
1228  *
1229  * @param channel_ctx channel to be destroyed.
1230  */
1231 static void
1232 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1233 {
1234   GNUNET_assert (NULL ==
1235                  channel_ctx->destruction_task);
1236   GNUNET_assert (NULL !=
1237                  channel_ctx->channel);
1238   channel_ctx->destruction_task =
1239     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1240                               channel_ctx);
1241 }
1242
1243
1244 /**
1245  * @brief Remove peer
1246  *
1247  * - Empties the list with pending operations
1248  * - Empties the list with pending messages
1249  * - Cancels potentially existing online check
1250  * - Schedules closing of send and recv channels
1251  * - Removes peer from peer map
1252  *
1253  * @param peer_ctx Context of the peer to be destroyed
1254  * @return #GNUNET_YES if peer was removed
1255  *         #GNUNET_NO  otherwise
1256  */
1257 static int
1258 destroy_peer (struct PeerContext *peer_ctx)
1259 {
1260   GNUNET_assert (NULL != peer_ctx);
1261   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1262   if (GNUNET_NO ==
1263       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1264                                               &peer_ctx->peer_id))
1265   {
1266     return GNUNET_NO;
1267   }
1268   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1269   LOG (GNUNET_ERROR_TYPE_DEBUG,
1270        "Going to remove peer %s\n",
1271        GNUNET_i2s (&peer_ctx->peer_id));
1272   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1273
1274   /* Clear list of pending operations */
1275   // TODO this probably leaks memory
1276   //      ('only' the cls to the function. Not sure what to do with it)
1277   GNUNET_array_grow (peer_ctx->pending_ops,
1278                      peer_ctx->num_pending_ops,
1279                      0);
1280   /* Remove all pending messages */
1281   while (NULL != peer_ctx->pending_messages_head)
1282   {
1283     LOG (GNUNET_ERROR_TYPE_DEBUG,
1284          "Removing unsent %s\n",
1285          peer_ctx->pending_messages_head->type);
1286     /* Cancle pending message, too */
1287     if ( (NULL != peer_ctx->online_check_pending) &&
1288          (0 == memcmp (peer_ctx->pending_messages_head,
1289                      peer_ctx->online_check_pending,
1290                      sizeof (struct PendingMessage))) )
1291       {
1292         peer_ctx->online_check_pending = NULL;
1293         if (peer_ctx->sub == msub)
1294         {
1295           GNUNET_STATISTICS_update (stats,
1296                                     "# pending online checks",
1297                                     -1,
1298                                     GNUNET_NO);
1299         }
1300       }
1301     remove_pending_message (peer_ctx->pending_messages_head,
1302                             GNUNET_YES);
1303   }
1304
1305   /* If we are still waiting for notification whether this peer is online
1306    * cancel the according task */
1307   if (NULL != peer_ctx->online_check_pending)
1308   {
1309     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310                 "Removing pending online check for peer %s\n",
1311                 GNUNET_i2s (&peer_ctx->peer_id));
1312     // TODO wait until cadet sets mq->cancel_impl
1313     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1314     remove_pending_message (peer_ctx->online_check_pending,
1315                             GNUNET_YES);
1316     peer_ctx->online_check_pending = NULL;
1317   }
1318
1319   if (NULL != peer_ctx->send_channel_ctx)
1320   {
1321     /* This is possibly called from within channel destruction */
1322     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1323     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1324     peer_ctx->send_channel_ctx = NULL;
1325     peer_ctx->mq = NULL;
1326   }
1327   if (NULL != peer_ctx->recv_channel_ctx)
1328   {
1329     /* This is possibly called from within channel destruction */
1330     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1331     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1332     peer_ctx->recv_channel_ctx = NULL;
1333   }
1334
1335   if (GNUNET_YES !=
1336       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1337                                                 &peer_ctx->peer_id))
1338   {
1339     LOG (GNUNET_ERROR_TYPE_WARNING,
1340          "removing peer from peer_ctx->sub->peer_map failed\n");
1341   }
1342   if (peer_ctx->sub == msub)
1343   {
1344     GNUNET_STATISTICS_set (stats,
1345                           "# known peers",
1346                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1347                           GNUNET_NO);
1348   }
1349   GNUNET_free (peer_ctx);
1350   return GNUNET_YES;
1351 }
1352
1353
1354 /**
1355  * Iterator over hash map entries. Deletes all contexts of peers.
1356  *
1357  * @param cls closure
1358  * @param key current public key
1359  * @param value value in the hash map
1360  * @return #GNUNET_YES if we should continue to iterate,
1361  *         #GNUNET_NO if not.
1362  */
1363 static int
1364 peermap_clear_iterator (void *cls,
1365                         const struct GNUNET_PeerIdentity *key,
1366                         void *value)
1367 {
1368   struct Sub *sub = cls;
1369   (void) value;
1370
1371   destroy_peer (get_peer_ctx (sub->peer_map, key));
1372   return GNUNET_YES;
1373 }
1374
1375
1376 /**
1377  * @brief This is called once a message is sent.
1378  *
1379  * Removes the pending message
1380  *
1381  * @param cls type of the message that was sent
1382  */
1383 static void
1384 mq_notify_sent_cb (void *cls)
1385 {
1386   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1387   LOG (GNUNET_ERROR_TYPE_DEBUG,
1388       "%s was sent.\n",
1389       pending_msg->type);
1390   if (pending_msg->peer_ctx->sub == msub)
1391   {
1392     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1393       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1395       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1396     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1397       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1398     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1399                       NULL != map_single_hop &&
1400         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1401           &pending_msg->peer_ctx->peer_id))
1402       GNUNET_STATISTICS_update(stats,
1403                                "# pull requests sent (multi-hop peer)",
1404                                1,
1405                                GNUNET_NO);
1406   }
1407   /* Do not cancle message */
1408   remove_pending_message (pending_msg, GNUNET_NO);
1409 }
1410
1411
1412 /**
1413  * @brief Iterator function for #store_valid_peers.
1414  *
1415  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1416  * Writes single peer to disk.
1417  *
1418  * @param cls the file handle to write to.
1419  * @param peer current peer
1420  * @param value unused
1421  *
1422  * @return  #GNUNET_YES if we should continue to
1423  *          iterate,
1424  *          #GNUNET_NO if not.
1425  */
1426 static int
1427 store_peer_presistently_iterator (void *cls,
1428                                   const struct GNUNET_PeerIdentity *peer,
1429                                   void *value)
1430 {
1431   const struct GNUNET_DISK_FileHandle *fh = cls;
1432   char peer_string[128];
1433   int size;
1434   ssize_t ret;
1435   (void) value;
1436
1437   if (NULL == peer)
1438   {
1439     return GNUNET_YES;
1440   }
1441   size = GNUNET_snprintf (peer_string,
1442                           sizeof (peer_string),
1443                           "%s\n",
1444                           GNUNET_i2s_full (peer));
1445   GNUNET_assert (53 == size);
1446   ret = GNUNET_DISK_file_write (fh,
1447                                 peer_string,
1448                                 size);
1449   GNUNET_assert (size == ret);
1450   return GNUNET_YES;
1451 }
1452
1453
1454 /**
1455  * @brief Store the peers currently in #valid_peers to disk.
1456  *
1457  * @param sub Sub for which to store the valid peers
1458  */
1459 static void
1460 store_valid_peers (const struct Sub *sub)
1461 {
1462   struct GNUNET_DISK_FileHandle *fh;
1463   uint32_t number_written_peers;
1464   int ret;
1465
1466   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1467   {
1468     return;
1469   }
1470
1471   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1472   if (GNUNET_SYSERR == ret)
1473   {
1474     LOG (GNUNET_ERROR_TYPE_WARNING,
1475         "Not able to create directory for file `%s'\n",
1476         sub->filename_valid_peers);
1477     GNUNET_break (0);
1478   }
1479   else if (GNUNET_NO == ret)
1480   {
1481     LOG (GNUNET_ERROR_TYPE_WARNING,
1482         "Directory for file `%s' exists but is not writable for us\n",
1483         sub->filename_valid_peers);
1484     GNUNET_break (0);
1485   }
1486   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1487                               GNUNET_DISK_OPEN_WRITE |
1488                                   GNUNET_DISK_OPEN_CREATE,
1489                               GNUNET_DISK_PERM_USER_READ |
1490                                   GNUNET_DISK_PERM_USER_WRITE);
1491   if (NULL == fh)
1492   {
1493     LOG (GNUNET_ERROR_TYPE_WARNING,
1494         "Not able to write valid peers to file `%s'\n",
1495         sub->filename_valid_peers);
1496     return;
1497   }
1498   LOG (GNUNET_ERROR_TYPE_DEBUG,
1499       "Writing %u valid peers to disk\n",
1500       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1501   number_written_peers =
1502     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1503                                            store_peer_presistently_iterator,
1504                                            fh);
1505   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1506   GNUNET_assert (number_written_peers ==
1507       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1508 }
1509
1510
1511 /**
1512  * @brief Convert string representation of peer id to peer id.
1513  *
1514  * Counterpart to #GNUNET_i2s_full.
1515  *
1516  * @param string_repr The string representation of the peer id
1517  *
1518  * @return The peer id
1519  */
1520 static const struct GNUNET_PeerIdentity *
1521 s2i_full (const char *string_repr)
1522 {
1523   struct GNUNET_PeerIdentity *peer;
1524   size_t len;
1525   int ret;
1526
1527   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1528   len = strlen (string_repr);
1529   if (52 > len)
1530   {
1531     LOG (GNUNET_ERROR_TYPE_WARNING,
1532         "Not able to convert string representation of PeerID to PeerID\n"
1533         "Sting representation: %s (len %lu) - too short\n",
1534         string_repr,
1535         len);
1536     GNUNET_break (0);
1537   }
1538   else if (52 < len)
1539   {
1540     len = 52;
1541   }
1542   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1543                                                     len,
1544                                                     &peer->public_key);
1545   if (GNUNET_OK != ret)
1546   {
1547     LOG (GNUNET_ERROR_TYPE_WARNING,
1548         "Not able to convert string representation of PeerID to PeerID\n"
1549         "Sting representation: %s\n",
1550         string_repr);
1551     GNUNET_break (0);
1552   }
1553   return peer;
1554 }
1555
1556
1557 /**
1558  * @brief Restore the peers on disk to #valid_peers.
1559  *
1560  * @param sub Sub for which to restore the valid peers
1561  */
1562 static void
1563 restore_valid_peers (const struct Sub *sub)
1564 {
1565   off_t file_size;
1566   uint32_t num_peers;
1567   struct GNUNET_DISK_FileHandle *fh;
1568   char *buf;
1569   ssize_t size_read;
1570   char *iter_buf;
1571   char *str_repr;
1572   const struct GNUNET_PeerIdentity *peer;
1573
1574   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1575   {
1576     return;
1577   }
1578
1579   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1580   {
1581     return;
1582   }
1583   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1584                               GNUNET_DISK_OPEN_READ,
1585                               GNUNET_DISK_PERM_NONE);
1586   GNUNET_assert (NULL != fh);
1587   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1588   num_peers = file_size / 53;
1589   buf = GNUNET_malloc (file_size);
1590   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1591   GNUNET_assert (size_read == file_size);
1592   LOG (GNUNET_ERROR_TYPE_DEBUG,
1593       "Restoring %" PRIu32 " peers from file `%s'\n",
1594       num_peers,
1595       sub->filename_valid_peers);
1596   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1597   {
1598     str_repr = GNUNET_strndup (iter_buf, 53);
1599     peer = s2i_full (str_repr);
1600     GNUNET_free (str_repr);
1601     add_valid_peer (peer, sub->valid_peers);
1602     LOG (GNUNET_ERROR_TYPE_DEBUG,
1603         "Restored valid peer %s from disk\n",
1604         GNUNET_i2s_full (peer));
1605   }
1606   iter_buf = NULL;
1607   GNUNET_free (buf);
1608   LOG (GNUNET_ERROR_TYPE_DEBUG,
1609       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1610       num_peers,
1611       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1612   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1613   {
1614     LOG (GNUNET_ERROR_TYPE_WARNING,
1615         "Number of restored peers does not match file size. Have probably duplicates.\n");
1616   }
1617   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1618   LOG (GNUNET_ERROR_TYPE_DEBUG,
1619       "Restored %u valid peers from disk\n",
1620       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1621 }
1622
1623
1624 /**
1625  * @brief Delete storage of peers that was created with #initialise_peers ()
1626  *
1627  * @param sub Sub for which the storage is deleted
1628  */
1629 static void
1630 peers_terminate (struct Sub *sub)
1631 {
1632   if (GNUNET_SYSERR ==
1633       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1634                                              &peermap_clear_iterator,
1635                                              sub))
1636   {
1637     LOG (GNUNET_ERROR_TYPE_WARNING,
1638         "Iteration destroying peers was aborted.\n");
1639   }
1640   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1641   sub->peer_map = NULL;
1642   store_valid_peers (sub);
1643   GNUNET_free (sub->filename_valid_peers);
1644   sub->filename_valid_peers = NULL;
1645   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1646   sub->valid_peers = NULL;
1647 }
1648
1649
1650 /**
1651  * Iterator over #valid_peers hash map entries.
1652  *
1653  * @param cls Closure that contains iterator function and closure
1654  * @param peer current peer id
1655  * @param value value in the hash map - unused
1656  * @return #GNUNET_YES if we should continue to
1657  *         iterate,
1658  *         #GNUNET_NO if not.
1659  */
1660 static int
1661 valid_peer_iterator (void *cls,
1662                      const struct GNUNET_PeerIdentity *peer,
1663                      void *value)
1664 {
1665   struct PeersIteratorCls *it_cls = cls;
1666   (void) value;
1667
1668   return it_cls->iterator (it_cls->cls, peer);
1669 }
1670
1671
1672 /**
1673  * @brief Get all currently known, valid peer ids.
1674  *
1675  * @param valid_peers Peer map containing the valid peers in question
1676  * @param iterator function to call on each peer id
1677  * @param it_cls extra argument to @a iterator
1678  * @return the number of key value pairs processed,
1679  *         #GNUNET_SYSERR if it aborted iteration
1680  */
1681 static int
1682 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1683                  PeersIterator iterator,
1684                  void *it_cls)
1685 {
1686   struct PeersIteratorCls *cls;
1687   int ret;
1688
1689   cls = GNUNET_new (struct PeersIteratorCls);
1690   cls->iterator = iterator;
1691   cls->cls = it_cls;
1692   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1693                                                valid_peer_iterator,
1694                                                cls);
1695   GNUNET_free (cls);
1696   return ret;
1697 }
1698
1699
1700 /**
1701  * @brief Add peer to known peers.
1702  *
1703  * This function is called on new peer_ids from 'external' sources
1704  * (client seed, cadet get_peers(), ...)
1705  *
1706  * @param sub Sub with the peer map that the @a peer will be added to
1707  * @param peer the new #GNUNET_PeerIdentity
1708  *
1709  * @return #GNUNET_YES if peer was inserted
1710  *         #GNUNET_NO  otherwise
1711  */
1712 static int
1713 insert_peer (struct Sub *sub,
1714              const struct GNUNET_PeerIdentity *peer)
1715 {
1716   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1717   {
1718     return GNUNET_NO; /* We already know this peer - nothing to do */
1719   }
1720   (void) create_peer_ctx (sub, peer);
1721   return GNUNET_YES;
1722 }
1723
1724
1725 /**
1726  * @brief Check whether flags on a peer are set.
1727  *
1728  * @param peer_map Peer map that is expected to contain the @a peer
1729  * @param peer the peer to check the flag of
1730  * @param flags the flags to check
1731  *
1732  * @return #GNUNET_SYSERR if peer is not known
1733  *         #GNUNET_YES    if all given flags are set
1734  *         #GNUNET_NO     otherwise
1735  */
1736 static int
1737 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1738                  const struct GNUNET_PeerIdentity *peer,
1739                  enum Peers_PeerFlags flags)
1740 {
1741   struct PeerContext *peer_ctx;
1742
1743   if (GNUNET_NO == check_peer_known (peer_map, peer))
1744   {
1745     return GNUNET_SYSERR;
1746   }
1747   peer_ctx = get_peer_ctx (peer_map, peer);
1748   return check_peer_flag_set (peer_ctx, flags);
1749 }
1750
1751 /**
1752  * @brief Try connecting to a peer to see whether it is online
1753  *
1754  * If not known yet, insert into known peers
1755  *
1756  * @param sub Sub which would contain the @a peer
1757  * @param peer the peer whose online is to be checked
1758  * @return #GNUNET_YES if the check was issued
1759  *         #GNUNET_NO  otherwise
1760  */
1761 static int
1762 issue_peer_online_check (struct Sub *sub,
1763                          const struct GNUNET_PeerIdentity *peer)
1764 {
1765   struct PeerContext *peer_ctx;
1766
1767   (void) insert_peer (sub, peer); // TODO even needed?
1768   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1769   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1770        (NULL == peer_ctx->online_check_pending) )
1771   {
1772     check_peer_online (peer_ctx);
1773     return GNUNET_YES;
1774   }
1775   return GNUNET_NO;
1776 }
1777
1778
1779 /**
1780  * @brief Check if peer is removable.
1781  *
1782  * Check if
1783  *  - a recv channel exists
1784  *  - there are pending messages
1785  *  - there is no pending pull reply
1786  *
1787  * @param peer_ctx Context of the peer in question
1788  * @return #GNUNET_YES    if peer is removable
1789  *         #GNUNET_NO     if peer is NOT removable
1790  *         #GNUNET_SYSERR if peer is not known
1791  */
1792 static int
1793 check_removable (const struct PeerContext *peer_ctx)
1794 {
1795   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1796                                                            &peer_ctx->peer_id))
1797   {
1798     return GNUNET_SYSERR;
1799   }
1800
1801   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1802        (NULL != peer_ctx->pending_messages_head) ||
1803        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1804   {
1805     return GNUNET_NO;
1806   }
1807   return GNUNET_YES;
1808 }
1809
1810
1811 /**
1812  * @brief Check whether @a peer is actually a peer.
1813  *
1814  * A valid peer is a peer that we know exists eg. we were connected to once.
1815  *
1816  * @param valid_peers Peer map that would contain the @a peer
1817  * @param peer peer in question
1818  *
1819  * @return #GNUNET_YES if peer is valid
1820  *         #GNUNET_NO  if peer is not valid
1821  */
1822 static int
1823 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1824                   const struct GNUNET_PeerIdentity *peer)
1825 {
1826   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1827 }
1828
1829
1830 /**
1831  * @brief Indicate that we want to send to the other peer
1832  *
1833  * This establishes a sending channel
1834  *
1835  * @param peer_ctx Context of the target peer
1836  */
1837 static void
1838 indicate_sending_intention (struct PeerContext *peer_ctx)
1839 {
1840   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1841                                                  &peer_ctx->peer_id));
1842   (void) get_channel (peer_ctx);
1843 }
1844
1845
1846 /**
1847  * @brief Check whether other peer has the intention to send/opened channel
1848  *        towars us
1849  *
1850  * @param peer_ctx Context of the peer in question
1851  *
1852  * @return #GNUNET_YES if peer has the intention to send
1853  *         #GNUNET_NO  otherwise
1854  */
1855 static int
1856 check_peer_send_intention (const struct PeerContext *peer_ctx)
1857 {
1858   if (NULL != peer_ctx->recv_channel_ctx)
1859   {
1860     return GNUNET_YES;
1861   }
1862   return GNUNET_NO;
1863 }
1864
1865
1866 /**
1867  * Handle the channel a peer opens to us.
1868  *
1869  * @param cls The closure - Sub
1870  * @param channel The channel the peer wants to establish
1871  * @param initiator The peer's peer ID
1872  *
1873  * @return initial channel context for the channel
1874  *         (can be NULL -- that's not an error)
1875  */
1876 static void *
1877 handle_inbound_channel (void *cls,
1878                         struct GNUNET_CADET_Channel *channel,
1879                         const struct GNUNET_PeerIdentity *initiator)
1880 {
1881   struct PeerContext *peer_ctx;
1882   struct ChannelCtx *channel_ctx;
1883   struct Sub *sub = cls;
1884
1885   LOG (GNUNET_ERROR_TYPE_DEBUG,
1886       "New channel was established to us (Peer %s).\n",
1887       GNUNET_i2s (initiator));
1888   GNUNET_assert (NULL != channel); /* according to cadet API */
1889   /* Make sure we 'know' about this peer */
1890   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1891   set_peer_online (peer_ctx);
1892   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1893   channel_ctx = add_channel_ctx (peer_ctx);
1894   channel_ctx->channel = channel;
1895   /* We only accept one incoming channel per peer */
1896   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1897                                                              initiator)))
1898   {
1899     LOG (GNUNET_ERROR_TYPE_WARNING,
1900         "Already got one receive channel. Destroying old one.\n");
1901     GNUNET_break_op (0);
1902     destroy_channel (peer_ctx->recv_channel_ctx);
1903     peer_ctx->recv_channel_ctx = channel_ctx;
1904     /* return the channel context */
1905     return channel_ctx;
1906   }
1907   peer_ctx->recv_channel_ctx = channel_ctx;
1908   return channel_ctx;
1909 }
1910
1911
1912 /**
1913  * @brief Check whether a sending channel towards the given peer exists
1914  *
1915  * @param peer_ctx Context of the peer in question
1916  *
1917  * @return #GNUNET_YES if a sending channel towards that peer exists
1918  *         #GNUNET_NO  otherwise
1919  */
1920 static int
1921 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1922 {
1923   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1924                                      &peer_ctx->peer_id))
1925   { /* If no such peer exists, there is no channel */
1926     return GNUNET_NO;
1927   }
1928   if (NULL == peer_ctx->send_channel_ctx)
1929   {
1930     return GNUNET_NO;
1931   }
1932   return GNUNET_YES;
1933 }
1934
1935
1936 /**
1937  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1938  *        intention to another peer
1939  *
1940  * @param peer_ctx Context to the peer
1941  * @return #GNUNET_YES if channel was destroyed
1942  *         #GNUNET_NO  otherwise
1943  */
1944 static int
1945 destroy_sending_channel (struct PeerContext *peer_ctx)
1946 {
1947   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1948                                      &peer_ctx->peer_id))
1949   {
1950     return GNUNET_NO;
1951   }
1952   if (NULL != peer_ctx->send_channel_ctx)
1953   {
1954     destroy_channel (peer_ctx->send_channel_ctx);
1955     (void) check_connected (peer_ctx);
1956     return GNUNET_YES;
1957   }
1958   return GNUNET_NO;
1959 }
1960
1961 /**
1962  * @brief Send a message to another peer.
1963  *
1964  * Keeps track about pending messages so they can be properly removed when the
1965  * peer is destroyed.
1966  *
1967  * @param peer_ctx Context of the peer to which the message is to be sent
1968  * @param ev envelope of the message
1969  * @param type type of the message
1970  */
1971 static void
1972 send_message (struct PeerContext *peer_ctx,
1973               struct GNUNET_MQ_Envelope *ev,
1974               const char *type)
1975 {
1976   struct PendingMessage *pending_msg;
1977   struct GNUNET_MQ_Handle *mq;
1978
1979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1980               "Sending message to %s of type %s\n",
1981               GNUNET_i2s (&peer_ctx->peer_id),
1982               type);
1983   pending_msg = insert_pending_message (peer_ctx, ev, type);
1984   mq = get_mq (peer_ctx);
1985   GNUNET_MQ_notify_sent (ev,
1986                          mq_notify_sent_cb,
1987                          pending_msg);
1988   GNUNET_MQ_send (mq, ev);
1989 }
1990
1991 /**
1992  * @brief Schedule a operation on given peer
1993  *
1994  * Avoids scheduling an operation twice.
1995  *
1996  * @param peer_ctx Context of the peer for which to schedule the operation
1997  * @param peer_op the operation to schedule
1998  * @param cls Closure to @a peer_op
1999  *
2000  * @return #GNUNET_YES if the operation was scheduled
2001  *         #GNUNET_NO  otherwise
2002  */
2003 static int
2004 schedule_operation (struct PeerContext *peer_ctx,
2005                     const PeerOp peer_op,
2006                     void *cls)
2007 {
2008   struct PeerPendingOp pending_op;
2009
2010   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2011                                                  &peer_ctx->peer_id));
2012
2013   //TODO if ONLINE execute immediately
2014
2015   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2016   {
2017     pending_op.op = peer_op;
2018     pending_op.op_cls = cls;
2019     GNUNET_array_append (peer_ctx->pending_ops,
2020                          peer_ctx->num_pending_ops,
2021                          pending_op);
2022     return GNUNET_YES;
2023   }
2024   return GNUNET_NO;
2025 }
2026
2027 /***********************************************************************
2028  * /Old gnunet-service-rps_peers.c
2029 ***********************************************************************/
2030
2031
2032 /***********************************************************************
2033  * Housekeeping with clients
2034 ***********************************************************************/
2035
2036 /**
2037  * Closure used to pass the client and the id to the callback
2038  * that replies to a client's request
2039  */
2040 struct ReplyCls
2041 {
2042   /**
2043    * DLL
2044    */
2045   struct ReplyCls *next;
2046   struct ReplyCls *prev;
2047
2048   /**
2049    * The identifier of the request
2050    */
2051   uint32_t id;
2052
2053   /**
2054    * The handle to the request
2055    */
2056   struct RPS_SamplerRequestHandle *req_handle;
2057
2058   /**
2059    * The client handle to send the reply to
2060    */
2061   struct ClientContext *cli_ctx;
2062 };
2063
2064
2065 /**
2066  * Struct used to store the context of a connected client.
2067  */
2068 struct ClientContext
2069 {
2070   /**
2071    * DLL
2072    */
2073   struct ClientContext *next;
2074   struct ClientContext *prev;
2075
2076   /**
2077    * The message queue to communicate with the client.
2078    */
2079   struct GNUNET_MQ_Handle *mq;
2080
2081   /**
2082    * @brief How many updates this client expects to receive.
2083    */
2084   int64_t view_updates_left;
2085
2086   /**
2087    * @brief Whether this client wants to receive stream updates.
2088    * Either #GNUNET_YES or #GNUNET_NO
2089    */
2090   int8_t stream_update;
2091
2092   /**
2093    * The client handle to send the reply to
2094    */
2095   struct GNUNET_SERVICE_Client *client;
2096
2097   /**
2098    * The #Sub this context belongs to
2099    */
2100   struct Sub *sub;
2101 };
2102
2103 /**
2104  * DLL with all clients currently connected to us
2105  */
2106 struct ClientContext *cli_ctx_head;
2107 struct ClientContext *cli_ctx_tail;
2108
2109 /***********************************************************************
2110  * /Housekeeping with clients
2111 ***********************************************************************/
2112
2113
2114
2115
2116
2117 /***********************************************************************
2118  * Util functions
2119 ***********************************************************************/
2120
2121
2122 /**
2123  * Print peerlist to log.
2124  */
2125 static void
2126 print_peer_list (struct GNUNET_PeerIdentity *list,
2127                  unsigned int len)
2128 {
2129   unsigned int i;
2130
2131   LOG (GNUNET_ERROR_TYPE_DEBUG,
2132        "Printing peer list of length %u at %p:\n",
2133        len,
2134        list);
2135   for (i = 0 ; i < len ; i++)
2136   {
2137     LOG (GNUNET_ERROR_TYPE_DEBUG,
2138          "%u. peer: %s\n",
2139          i, GNUNET_i2s (&list[i]));
2140   }
2141 }
2142
2143
2144 /**
2145  * Remove peer from list.
2146  */
2147 static void
2148 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2149                unsigned int *list_size,
2150                const struct GNUNET_PeerIdentity *peer)
2151 {
2152   unsigned int i;
2153   struct GNUNET_PeerIdentity *tmp;
2154
2155   tmp = *peer_list;
2156
2157   LOG (GNUNET_ERROR_TYPE_DEBUG,
2158        "Removing peer %s from list at %p\n",
2159        GNUNET_i2s (peer),
2160        tmp);
2161
2162   for ( i = 0 ; i < *list_size ; i++ )
2163   {
2164     if (0 == GNUNET_memcmp (&tmp[i], peer))
2165     {
2166       if (i < *list_size -1)
2167       { /* Not at the last entry -- shift peers left */
2168         memmove (&tmp[i], &tmp[i +1],
2169                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2170       }
2171       /* Remove last entry (should be now useless PeerID) */
2172       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2173     }
2174   }
2175   *peer_list = tmp;
2176 }
2177
2178
2179 /**
2180  * Insert PeerID in #view
2181  *
2182  * Called once we know a peer is online.
2183  * Implements #PeerOp
2184  *
2185  * @return GNUNET_OK if peer was actually inserted
2186  *         GNUNET_NO if peer was not inserted
2187  */
2188 static void
2189 insert_in_view_op (void *cls,
2190                    const struct GNUNET_PeerIdentity *peer);
2191
2192 /**
2193  * Insert PeerID in #view
2194  *
2195  * Called once we know a peer is online.
2196  *
2197  * @param sub Sub in with the view to insert in
2198  * @param peer the peer to insert
2199  *
2200  * @return GNUNET_OK if peer was actually inserted
2201  *         GNUNET_NO if peer was not inserted
2202  */
2203 static int
2204 insert_in_view (struct Sub *sub,
2205                 const struct GNUNET_PeerIdentity *peer)
2206 {
2207   struct PeerContext *peer_ctx;
2208   int online;
2209   int ret;
2210
2211   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2212   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2213   if ( (GNUNET_NO == online) ||
2214        (GNUNET_SYSERR == online) ) /* peer is not even known */
2215   {
2216     (void) issue_peer_online_check (sub, peer);
2217     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2218     return GNUNET_NO;
2219   }
2220   /* Open channel towards peer to keep connection open */
2221   indicate_sending_intention (peer_ctx);
2222   ret = View_put (sub->view, peer);
2223   if (peer_ctx->sub == msub)
2224   {
2225     GNUNET_STATISTICS_set (stats,
2226                            "view size",
2227                            View_size (peer_ctx->sub->view),
2228                            GNUNET_NO);
2229   }
2230   return ret;
2231 }
2232
2233
2234 /**
2235  * @brief Send view to client
2236  *
2237  * @param cli_ctx the context of the client
2238  * @param view_array the peerids of the view as array (can be empty)
2239  * @param view_size the size of the view array (can be 0)
2240  */
2241 static void
2242 send_view (const struct ClientContext *cli_ctx,
2243            const struct GNUNET_PeerIdentity *view_array,
2244            uint64_t view_size)
2245 {
2246   struct GNUNET_MQ_Envelope *ev;
2247   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2248   struct Sub *sub;
2249
2250   if (NULL == view_array)
2251   {
2252     if (NULL == cli_ctx->sub) sub = msub;
2253     else sub = cli_ctx->sub;
2254     view_size = View_size (sub->view);
2255     view_array = View_get_as_array (sub->view);
2256   }
2257
2258   ev = GNUNET_MQ_msg_extra (out_msg,
2259                             view_size * sizeof (struct GNUNET_PeerIdentity),
2260                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2261   out_msg->num_peers = htonl (view_size);
2262
2263   GNUNET_memcpy (&out_msg[1],
2264                  view_array,
2265                  view_size * sizeof (struct GNUNET_PeerIdentity));
2266   GNUNET_MQ_send (cli_ctx->mq, ev);
2267 }
2268
2269
2270 /**
2271  * @brief Send peer from biased stream to client.
2272  *
2273  * TODO merge with send_view, parameterise
2274  *
2275  * @param cli_ctx the context of the client
2276  * @param view_array the peerids of the view as array (can be empty)
2277  * @param view_size the size of the view array (can be 0)
2278  */
2279 static void
2280 send_stream_peers (const struct ClientContext *cli_ctx,
2281                    uint64_t num_peers,
2282                    const struct GNUNET_PeerIdentity *peers)
2283 {
2284   struct GNUNET_MQ_Envelope *ev;
2285   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2286
2287   GNUNET_assert (NULL != peers);
2288
2289   ev = GNUNET_MQ_msg_extra (out_msg,
2290                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2291                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2292   out_msg->num_peers = htonl (num_peers);
2293
2294   GNUNET_memcpy (&out_msg[1],
2295                  peers,
2296                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2297   GNUNET_MQ_send (cli_ctx->mq, ev);
2298 }
2299
2300
2301 /**
2302  * @brief sends updates to clients that are interested
2303  *
2304  * @param sub Sub for which to notify clients
2305  */
2306 static void
2307 clients_notify_view_update (const struct Sub *sub)
2308 {
2309   struct ClientContext *cli_ctx_iter;
2310   uint64_t num_peers;
2311   const struct GNUNET_PeerIdentity *view_array;
2312
2313   num_peers = View_size (sub->view);
2314   view_array = View_get_as_array(sub->view);
2315   /* check size of view is small enough */
2316   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2317   {
2318     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2319                 "View is too big to send\n");
2320     return;
2321   }
2322
2323   for (cli_ctx_iter = cli_ctx_head;
2324        NULL != cli_ctx_iter;
2325        cli_ctx_iter = cli_ctx_iter->next)
2326   {
2327     if (1 < cli_ctx_iter->view_updates_left)
2328     {
2329       /* Client wants to receive limited amount of updates */
2330       cli_ctx_iter->view_updates_left -= 1;
2331     } else if (1 == cli_ctx_iter->view_updates_left)
2332     {
2333       /* Last update of view for client */
2334       cli_ctx_iter->view_updates_left = -1;
2335     } else if (0 > cli_ctx_iter->view_updates_left) {
2336       /* Client is not interested in updates */
2337       continue;
2338     }
2339     /* else _updates_left == 0 - infinite amount of updates */
2340
2341     /* send view */
2342     send_view (cli_ctx_iter, view_array, num_peers);
2343   }
2344 }
2345
2346
2347 /**
2348  * @brief sends updates to clients that are interested
2349  *
2350  * @param num_peers Number of peers to send
2351  * @param peers the array of peers to send
2352  */
2353 static void
2354 clients_notify_stream_peer (const struct Sub *sub,
2355                             uint64_t num_peers,
2356                             const struct GNUNET_PeerIdentity *peers)
2357                             // TODO enum StreamPeerSource)
2358 {
2359   struct ClientContext *cli_ctx_iter;
2360
2361   LOG (GNUNET_ERROR_TYPE_DEBUG,
2362       "Got peer (%s) from biased stream - update all clients\n",
2363       GNUNET_i2s (peers));
2364
2365   for (cli_ctx_iter = cli_ctx_head;
2366        NULL != cli_ctx_iter;
2367        cli_ctx_iter = cli_ctx_iter->next)
2368   {
2369     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2370         (sub == cli_ctx_iter->sub || sub == msub))
2371     {
2372       send_stream_peers (cli_ctx_iter, num_peers, peers);
2373     }
2374   }
2375 }
2376
2377
2378 /**
2379  * Put random peer from sampler into the view as history update.
2380  *
2381  * @param ids Array of Peers to insert into view
2382  * @param num_peers Number of peers to insert
2383  * @param cls Closure - The Sub for which this is to be done
2384  */
2385 static void
2386 hist_update (const struct GNUNET_PeerIdentity *ids,
2387              uint32_t num_peers,
2388              void *cls)
2389 {
2390   unsigned int i;
2391   struct Sub *sub = cls;
2392
2393   for (i = 0; i < num_peers; i++)
2394   {
2395     int inserted;
2396     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2397     {
2398       LOG (GNUNET_ERROR_TYPE_WARNING,
2399            "Peer in history update not known!\n");
2400       continue;
2401     }
2402     inserted = insert_in_view (sub, &ids[i]);
2403     if (GNUNET_OK == inserted)
2404     {
2405       clients_notify_stream_peer (sub, 1, &ids[i]);
2406     }
2407 #ifdef TO_FILE_FULL
2408     to_file (sub->file_name_view_log,
2409              "+%s\t(hist)",
2410              GNUNET_i2s_full (ids));
2411 #endif /* TO_FILE_FULL */
2412   }
2413   clients_notify_view_update (sub);
2414 }
2415
2416
2417 /**
2418  * Wrapper around #RPS_sampler_resize()
2419  *
2420  * If we do not have enough sampler elements, double current sampler size
2421  * If we have more than enough sampler elements, halv current sampler size
2422  *
2423  * @param sampler The sampler to resize
2424  * @param new_size New size to which to resize
2425  */
2426 static void
2427 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2428 {
2429   unsigned int sampler_size;
2430
2431   // TODO statistics
2432   // TODO respect the min, max
2433   sampler_size = RPS_sampler_get_size (sampler);
2434   if (sampler_size > new_size * 4)
2435   { /* Shrinking */
2436     RPS_sampler_resize (sampler, sampler_size / 2);
2437   }
2438   else if (sampler_size < new_size)
2439   { /* Growing */
2440     RPS_sampler_resize (sampler, sampler_size * 2);
2441   }
2442   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2443 }
2444
2445
2446 /**
2447  * Add all peers in @a peer_array to @a peer_map used as set.
2448  *
2449  * @param peer_array array containing the peers
2450  * @param num_peers number of peers in @peer_array
2451  * @param peer_map the peermap to use as set
2452  */
2453 static void
2454 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2455                        unsigned int num_peers,
2456                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2457 {
2458   unsigned int i;
2459   if (NULL == peer_map)
2460   {
2461     LOG (GNUNET_ERROR_TYPE_WARNING,
2462          "Trying to add peers to non-existing peermap.\n");
2463     return;
2464   }
2465
2466   for (i = 0; i < num_peers; i++)
2467   {
2468     GNUNET_CONTAINER_multipeermap_put (peer_map,
2469                                        &peer_array[i],
2470                                        NULL,
2471                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2472     if (msub->peer_map == peer_map)
2473     {
2474       GNUNET_STATISTICS_set (stats,
2475                             "# known peers",
2476                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2477                             GNUNET_NO);
2478     }
2479   }
2480 }
2481
2482
2483 /**
2484  * Send a PULL REPLY to @a peer_id
2485  *
2486  * @param peer_ctx Context of the peer to send the reply to
2487  * @param peer_ids the peers to send to @a peer_id
2488  * @param num_peer_ids the number of peers to send to @a peer_id
2489  */
2490 static void
2491 send_pull_reply (struct PeerContext *peer_ctx,
2492                  const struct GNUNET_PeerIdentity *peer_ids,
2493                  unsigned int num_peer_ids)
2494 {
2495   uint32_t send_size;
2496   struct GNUNET_MQ_Envelope *ev;
2497   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2498
2499   /* Compute actual size */
2500   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2501               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2502
2503   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2504     /* Compute number of peers to send
2505      * If too long, simply truncate */
2506     // TODO select random ones via permutation
2507     //      or even better: do good protocol design
2508     send_size =
2509       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2510        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2511        sizeof (struct GNUNET_PeerIdentity);
2512   else
2513     send_size = num_peer_ids;
2514
2515   LOG (GNUNET_ERROR_TYPE_DEBUG,
2516       "Going to send PULL REPLY with %u peers to %s\n",
2517       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2518
2519   ev = GNUNET_MQ_msg_extra (out_msg,
2520                             send_size * sizeof (struct GNUNET_PeerIdentity),
2521                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2522   out_msg->num_peers = htonl (send_size);
2523   GNUNET_memcpy (&out_msg[1], peer_ids,
2524          send_size * sizeof (struct GNUNET_PeerIdentity));
2525
2526   send_message (peer_ctx, ev, "PULL REPLY");
2527   if (peer_ctx->sub == msub)
2528   {
2529     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2530   }
2531   // TODO check with send intention: as send_channel is used/opened we indicate
2532   // a sending intention without intending it.
2533   // -> clean peer afterwards?
2534   // -> use recv_channel?
2535 }
2536
2537
2538 /**
2539  * Insert PeerID in #pull_map
2540  *
2541  * Called once we know a peer is online.
2542  *
2543  * @param cls Closure - Sub with the pull map to insert into
2544  * @param peer Peer to insert
2545  */
2546 static void
2547 insert_in_pull_map (void *cls,
2548                     const struct GNUNET_PeerIdentity *peer)
2549 {
2550   struct Sub *sub = cls;
2551
2552   CustomPeerMap_put (sub->pull_map, peer);
2553 }
2554
2555
2556 /**
2557  * Insert PeerID in #view
2558  *
2559  * Called once we know a peer is online.
2560  * Implements #PeerOp
2561  *
2562  * @param cls Closure - Sub with view to insert peer into
2563  * @param peer the peer to insert
2564  */
2565 static void
2566 insert_in_view_op (void *cls,
2567                    const struct GNUNET_PeerIdentity *peer)
2568 {
2569   struct Sub *sub = cls;
2570   int inserted;
2571
2572   inserted = insert_in_view (sub, peer);
2573   if (GNUNET_OK == inserted)
2574   {
2575     clients_notify_stream_peer (sub, 1, peer);
2576   }
2577 }
2578
2579
2580 /**
2581  * Update sampler with given PeerID.
2582  * Implements #PeerOp
2583  *
2584  * @param cls Closure - Sub containing the sampler to insert into
2585  * @param peer Peer to insert
2586  */
2587 static void
2588 insert_in_sampler (void *cls,
2589                    const struct GNUNET_PeerIdentity *peer)
2590 {
2591   struct Sub *sub = cls;
2592
2593   LOG (GNUNET_ERROR_TYPE_DEBUG,
2594        "Updating samplers with peer %s from insert_in_sampler()\n",
2595        GNUNET_i2s (peer));
2596   RPS_sampler_update (sub->sampler, peer);
2597   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2598   {
2599     /* Make sure we 'know' about this peer */
2600     (void) issue_peer_online_check (sub, peer);
2601     /* Establish a channel towards that peer to indicate we are going to send
2602      * messages to it */
2603     //indicate_sending_intention (peer);
2604   }
2605 #ifdef TO_FILE
2606   sub->num_observed_peers++;
2607   GNUNET_CONTAINER_multipeermap_put
2608     (sub->observed_unique_peers,
2609      peer,
2610      NULL,
2611      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2612   uint32_t num_observed_unique_peers =
2613     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2614   GNUNET_STATISTICS_set (stats,
2615                          "# unique peers in gossip",
2616                          num_observed_unique_peers,
2617                          GNUNET_NO);
2618 #ifdef TO_FILE_FULL
2619   to_file (sub->file_name_observed_log,
2620           "%" PRIu32 " %" PRIu32 " %f\n",
2621           sub->num_observed_peers,
2622           num_observed_unique_peers,
2623           1.0*num_observed_unique_peers/sub->num_observed_peers)
2624 #endif /* TO_FILE_FULL */
2625 #endif /* TO_FILE */
2626 }
2627
2628
2629 /**
2630  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2631  *        If the peer is not known, online check is issued and it is
2632  *        scheduled to be inserted in sampler and view.
2633  *
2634  * "External sources" refer to every source except the gossip.
2635  *
2636  * @param sub Sub for which @a peer was received
2637  * @param peer peer to insert/peer received
2638  */
2639 static void
2640 got_peer (struct Sub *sub,
2641           const struct GNUNET_PeerIdentity *peer)
2642 {
2643   /* If we did not know this peer already, insert it into sampler and view */
2644   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2645   {
2646     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2647                         &insert_in_sampler, sub);
2648     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2649                         &insert_in_view_op, sub);
2650   }
2651   if (sub == msub)
2652   {
2653     GNUNET_STATISTICS_update (stats,
2654                               "# learnd peers",
2655                               1,
2656                               GNUNET_NO);
2657   }
2658 }
2659
2660
2661 /**
2662  * @brief Checks if there is a sending channel and if it is needed
2663  *
2664  * @param peer_ctx Context of the peer to check
2665  * @return GNUNET_YES if sending channel exists and is still needed
2666  *         GNUNET_NO  otherwise
2667  */
2668 static int
2669 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2670 {
2671   /* struct GNUNET_CADET_Channel *channel; */
2672   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2673                                      &peer_ctx->peer_id))
2674   {
2675     return GNUNET_NO;
2676   }
2677   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2678   {
2679     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2680                                     &peer_ctx->peer_id)) ||
2681          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2682                                             &peer_ctx->peer_id)) ||
2683          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2684                                                      &peer_ctx->peer_id)) ||
2685          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2686                                                      &peer_ctx->peer_id)) ||
2687          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2688                                          &peer_ctx->peer_id,
2689                                          Peers_PULL_REPLY_PENDING)))
2690     { /* If we want to keep the connection to peer open */
2691       return GNUNET_YES;
2692     }
2693     return GNUNET_NO;
2694   }
2695   return GNUNET_NO;
2696 }
2697
2698
2699 /**
2700  * @brief remove peer from our knowledge, the view, push and pull maps and
2701  * samplers.
2702  *
2703  * @param sub Sub with the data structures the peer is to be removed from
2704  * @param peer the peer to remove
2705  */
2706 static void
2707 remove_peer (struct Sub *sub,
2708              const struct GNUNET_PeerIdentity *peer)
2709 {
2710   (void) View_remove_peer (sub->view,
2711                            peer);
2712   CustomPeerMap_remove_peer (sub->pull_map,
2713                              peer);
2714   CustomPeerMap_remove_peer (sub->push_map,
2715                              peer);
2716   RPS_sampler_reinitialise_by_value (sub->sampler,
2717                                      peer);
2718   /* We want to destroy the peer now.
2719    * Sometimes, it just seems that it's already been removed from the peer_map,
2720    * so check the peer_map first. */
2721   if (GNUNET_YES == check_peer_known (sub->peer_map,
2722                                       peer))
2723   {
2724     destroy_peer (get_peer_ctx (sub->peer_map,
2725                                 peer));
2726   }
2727 }
2728
2729
2730 /**
2731  * @brief Remove data that is not needed anymore.
2732  *
2733  * If the sending channel is no longer needed it is destroyed.
2734  *
2735  * @param sub Sub in which the current peer is to be cleaned
2736  * @param peer the peer whose data is about to be cleaned
2737  */
2738 static void
2739 clean_peer (struct Sub *sub,
2740             const struct GNUNET_PeerIdentity *peer)
2741 {
2742   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2743                                                                peer)))
2744   {
2745     LOG (GNUNET_ERROR_TYPE_DEBUG,
2746         "Going to remove send channel to peer %s\n",
2747         GNUNET_i2s (peer));
2748     #if ENABLE_MALICIOUS
2749     if (0 != GNUNET_memcmp (&attacked_peer,
2750                                               peer))
2751       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2752                                                     peer));
2753     #else /* ENABLE_MALICIOUS */
2754     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2755                                                   peer));
2756     #endif /* ENABLE_MALICIOUS */
2757   }
2758
2759   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2760                                                            peer))
2761   {
2762     /* Peer was already removed by callback on destroyed channel */
2763     LOG (GNUNET_ERROR_TYPE_WARNING,
2764         "Peer was removed from our knowledge during cleanup\n");
2765     return;
2766   }
2767
2768   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2769                                                               peer))) &&
2770        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2771        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2772        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2773        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2774        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2775   { /* We can safely remove this peer */
2776     LOG (GNUNET_ERROR_TYPE_DEBUG,
2777         "Going to remove peer %s\n",
2778         GNUNET_i2s (peer));
2779     remove_peer (sub, peer);
2780     return;
2781   }
2782 }
2783
2784
2785 /**
2786  * @brief This is called when a channel is destroyed.
2787  *
2788  * Removes peer completely from our knowledge if the send_channel was destroyed
2789  * Otherwise simply delete the recv_channel
2790  * Also check if the knowledge about this peer is still needed.
2791  * If not, remove this peer from our knowledge.
2792  *
2793  * @param cls The closure - Context to the channel
2794  * @param channel The channel being closed
2795  */
2796 static void
2797 cleanup_destroyed_channel (void *cls,
2798                            const struct GNUNET_CADET_Channel *channel)
2799 {
2800   struct ChannelCtx *channel_ctx = cls;
2801   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2802   (void) channel;
2803
2804   channel_ctx->channel = NULL;
2805   remove_channel_ctx (channel_ctx);
2806   if (NULL != peer_ctx &&
2807       peer_ctx->send_channel_ctx == channel_ctx &&
2808       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2809   {
2810     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2811   }
2812 }
2813
2814 /***********************************************************************
2815  * /Util functions
2816 ***********************************************************************/
2817
2818
2819
2820 /***********************************************************************
2821  * Sub
2822 ***********************************************************************/
2823
2824 /**
2825  * @brief Create a new Sub
2826  *
2827  * @param hash Hash of value shared among rps instances on other hosts that
2828  *        defines a subgroup to sample from.
2829  * @param sampler_size Size of the sampler
2830  * @param round_interval Interval (in average) between two rounds
2831  *
2832  * @return Sub
2833  */
2834 struct Sub *
2835 new_sub (const struct GNUNET_HashCode *hash,
2836          uint32_t sampler_size,
2837          struct GNUNET_TIME_Relative round_interval)
2838 {
2839   struct Sub *sub;
2840
2841   sub = GNUNET_new (struct Sub);
2842
2843   /* With the hash generated from the secret value this service only connects
2844    * to rps instances that share the value */
2845   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2846     GNUNET_MQ_hd_fixed_size (peer_check,
2847                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2848                              struct GNUNET_MessageHeader,
2849                              NULL),
2850     GNUNET_MQ_hd_fixed_size (peer_push,
2851                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2852                              struct GNUNET_MessageHeader,
2853                              NULL),
2854     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2855                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2856                              struct GNUNET_MessageHeader,
2857                              NULL),
2858     GNUNET_MQ_hd_var_size (peer_pull_reply,
2859                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2860                            struct GNUNET_RPS_P2P_PullReplyMessage,
2861                            NULL),
2862     GNUNET_MQ_handler_end ()
2863   };
2864   sub->hash = *hash;
2865   sub->cadet_port =
2866     GNUNET_CADET_open_port (cadet_handle,
2867                             &sub->hash,
2868                             &handle_inbound_channel, /* Connect handler */
2869                             sub, /* cls */
2870                             NULL, /* WindowSize handler */
2871                             &cleanup_destroyed_channel, /* Disconnect handler */
2872                             cadet_handlers);
2873   if (NULL == sub->cadet_port)
2874   {
2875     LOG (GNUNET_ERROR_TYPE_ERROR,
2876         "Cadet port `%s' is already in use.\n",
2877         GNUNET_APPLICATION_PORT_RPS);
2878     GNUNET_assert (0);
2879   }
2880
2881   /* Set up general data structure to keep track about peers */
2882   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2883   if (GNUNET_OK !=
2884       GNUNET_CONFIGURATION_get_value_filename (cfg,
2885                                                "rps",
2886                                                "FILENAME_VALID_PEERS",
2887                                                &sub->filename_valid_peers))
2888   {
2889     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2890                                "rps",
2891                                "FILENAME_VALID_PEERS");
2892   }
2893   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2894   {
2895     char *tmp_filename_valid_peers;
2896     char str_hash[105];
2897
2898     GNUNET_snprintf (str_hash,
2899                      sizeof (str_hash),
2900                      GNUNET_h2s_full (hash));
2901     tmp_filename_valid_peers = sub->filename_valid_peers;
2902     GNUNET_asprintf (&sub->filename_valid_peers,
2903                      "%s%s",
2904                      tmp_filename_valid_peers,
2905                      str_hash);
2906     GNUNET_free (tmp_filename_valid_peers);
2907   }
2908   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2909
2910   /* Set up the sampler */
2911   sub->sampler_size_est_min = sampler_size;
2912   sub->sampler_size_est_need = sampler_size;;
2913   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2914   GNUNET_assert (0 != round_interval.rel_value_us);
2915   sub->round_interval = round_interval;
2916   sub->sampler = RPS_sampler_init (sampler_size,
2917                                   round_interval);
2918
2919   /* Logging of internals */
2920 #ifdef TO_FILE_FULL
2921   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2922 #endif /* TO_FILE_FULL */
2923 #ifdef TO_FILE
2924 #ifdef TO_FILE_FULL
2925   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2926                                                        "observed");
2927 #endif /* TO_FILE_FULL */
2928   sub->num_observed_peers = 0;
2929   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2930                                                                     GNUNET_NO);
2931 #endif /* TO_FILE */
2932
2933   /* Set up data structures for gossip */
2934   sub->push_map = CustomPeerMap_create (4);
2935   sub->pull_map = CustomPeerMap_create (4);
2936   sub->view_size_est_min = sampler_size;;
2937   sub->view = View_create (sub->view_size_est_min);
2938   if (sub == msub)
2939   {
2940     GNUNET_STATISTICS_set (stats,
2941                            "view size aim",
2942                            sub->view_size_est_min,
2943                            GNUNET_NO);
2944   }
2945
2946   /* Start executing rounds */
2947   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2948
2949   return sub;
2950 }
2951
2952
2953 #ifdef TO_FILE
2954 /**
2955  * @brief Write all numbers in the given array into the given file
2956  *
2957  * Single numbers devided by a newline
2958  *
2959  * @param hist_array[] the array to dump
2960  * @param file_name file to dump into
2961  */
2962 static void
2963 write_histogram_to_file (const uint32_t hist_array[],
2964                          const char *file_name)
2965 {
2966   char collect_str[SIZE_DUMP_FILE + 1] = "";
2967   char *recv_str_iter;
2968   char *file_name_full;
2969
2970   recv_str_iter = collect_str;
2971   file_name_full = store_prefix_file_name (&own_identity,
2972                                            file_name);
2973   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2974   {
2975     char collect_str_tmp[8];
2976
2977     GNUNET_snprintf (collect_str_tmp,
2978                      sizeof (collect_str_tmp),
2979                      "%" PRIu32 "\n",
2980                      hist_array[i]);
2981     recv_str_iter = stpncpy (recv_str_iter,
2982                              collect_str_tmp,
2983                              6);
2984   }
2985   (void) stpcpy (recv_str_iter,
2986                  "\n");
2987   LOG (GNUNET_ERROR_TYPE_DEBUG,
2988        "Writing push stats to disk\n");
2989   to_file_w_len (file_name_full,
2990                  SIZE_DUMP_FILE,
2991                  collect_str);
2992   GNUNET_free (file_name_full);
2993 }
2994 #endif /* TO_FILE */
2995
2996
2997 /**
2998  * @brief Destroy Sub.
2999  *
3000  * @param sub Sub to destroy
3001  */
3002 static void
3003 destroy_sub (struct Sub *sub)
3004 {
3005   GNUNET_assert (NULL != sub);
3006   GNUNET_assert (NULL != sub->do_round_task);
3007   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3008   sub->do_round_task = NULL;
3009
3010   /* Disconnect from cadet */
3011   GNUNET_CADET_close_port (sub->cadet_port);
3012   sub->cadet_port= NULL;
3013
3014   /* Clean up data structures for peers */
3015   RPS_sampler_destroy (sub->sampler);
3016   sub->sampler = NULL;
3017   View_destroy (sub->view);
3018   sub->view = NULL;
3019   CustomPeerMap_destroy (sub->push_map);
3020   sub->push_map = NULL;
3021   CustomPeerMap_destroy (sub->pull_map);
3022   sub->pull_map = NULL;
3023   peers_terminate (sub);
3024
3025   /* Free leftover data structures */
3026 #ifdef TO_FILE_FULL
3027   GNUNET_free (sub->file_name_view_log);
3028   sub->file_name_view_log = NULL;
3029 #endif /* TO_FILE_FULL */
3030 #ifdef TO_FILE
3031 #ifdef TO_FILE_FULL
3032   GNUNET_free (sub->file_name_observed_log);
3033   sub->file_name_observed_log = NULL;
3034 #endif /* TO_FILE_FULL */
3035
3036   /* Write push frequencies to disk */
3037   write_histogram_to_file (sub->push_recv,
3038                            "push_recv");
3039
3040   /* Write push deltas to disk */
3041   write_histogram_to_file (sub->push_delta,
3042                            "push_delta");
3043
3044   /* Write pull delays to disk */
3045   write_histogram_to_file (sub->pull_delays,
3046                            "pull_delays");
3047
3048   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3049   sub->observed_unique_peers = NULL;
3050 #endif /* TO_FILE */
3051
3052   GNUNET_free (sub);
3053 }
3054
3055
3056 /***********************************************************************
3057  * /Sub
3058 ***********************************************************************/
3059
3060
3061 /***********************************************************************
3062  * Core handlers
3063 ***********************************************************************/
3064
3065 /**
3066  * @brief Callback on initialisation of Core.
3067  *
3068  * @param cls - unused
3069  * @param my_identity - unused
3070  */
3071 void
3072 core_init (void *cls,
3073            const struct GNUNET_PeerIdentity *my_identity)
3074 {
3075   (void) cls;
3076   (void) my_identity;
3077
3078   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3079 }
3080
3081
3082 /**
3083  * @brief Callback for core.
3084  * Method called whenever a given peer connects.
3085  *
3086  * @param cls closure - unused
3087  * @param peer peer identity this notification is about
3088  * @return closure given to #core_disconnects as peer_cls
3089  */
3090 void *
3091 core_connects (void *cls,
3092                const struct GNUNET_PeerIdentity *peer,
3093                struct GNUNET_MQ_Handle *mq)
3094 {
3095   (void) cls;
3096   (void) mq;
3097
3098   GNUNET_assert (GNUNET_YES ==
3099                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3100                                                     peer,
3101                                                     NULL,
3102                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3103   return NULL;
3104 }
3105
3106
3107 /**
3108  * @brief Callback for core.
3109  * Method called whenever a peer disconnects.
3110  *
3111  * @param cls closure - unused
3112  * @param peer peer identity this notification is about
3113  * @param peer_cls closure given in #core_connects - unused
3114  */
3115 void
3116 core_disconnects (void *cls,
3117                   const struct GNUNET_PeerIdentity *peer,
3118                   void *peer_cls)
3119 {
3120   (void) cls;
3121   (void) peer_cls;
3122
3123   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3124 }
3125
3126 /***********************************************************************
3127  * /Core handlers
3128 ***********************************************************************/
3129
3130
3131 /**
3132  * @brief Destroy the context for a (connected) client
3133  *
3134  * @param cli_ctx Context to destroy
3135  */
3136 static void
3137 destroy_cli_ctx (struct ClientContext *cli_ctx)
3138 {
3139   GNUNET_assert (NULL != cli_ctx);
3140   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3141                                cli_ctx_tail,
3142                                cli_ctx);
3143   if (NULL != cli_ctx->sub)
3144   {
3145     destroy_sub (cli_ctx->sub);
3146     cli_ctx->sub = NULL;
3147   }
3148   GNUNET_free (cli_ctx);
3149 }
3150
3151
3152 /**
3153  * @brief Update sizes in sampler and view on estimate update from nse service
3154  *
3155  * @param sub Sub
3156  * @param logestimate the log(Base 2) value of the current network size estimate
3157  * @param std_dev standard deviation for the estimate
3158  */
3159 static void
3160 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3161 {
3162   double estimate;
3163   //double scale; // TODO this might go gloabal/config
3164
3165   LOG (GNUNET_ERROR_TYPE_DEBUG,
3166        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3167        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3168   //scale = .01;
3169   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3170   // GNUNET_NSE_log_estimate_to_n (logestimate);
3171   estimate = pow (estimate, 1.0 / 3);
3172   // TODO add if std_dev is a number
3173   // estimate += (std_dev * scale);
3174   if (sub->view_size_est_min < ceil (estimate))
3175   {
3176     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3177     sub->sampler_size_est_need = estimate;
3178     sub->view_size_est_need = estimate;
3179   } else
3180   {
3181     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3182     //sub->sampler_size_est_need = sub->view_size_est_min;
3183     sub->view_size_est_need = sub->view_size_est_min;
3184   }
3185   if (sub == msub)
3186   {
3187     GNUNET_STATISTICS_set (stats,
3188                            "view size aim",
3189                            sub->view_size_est_need,
3190                            GNUNET_NO);
3191   }
3192
3193   /* If the NSE has changed adapt the lists accordingly */
3194   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3195   View_change_len (sub->view, sub->view_size_est_need);
3196 }
3197
3198
3199 /**
3200  * Function called by NSE.
3201  *
3202  * Updates sizes of sampler list and view and adapt those lists
3203  * accordingly.
3204  *
3205  * implements #GNUNET_NSE_Callback
3206  *
3207  * @param cls Closure - unused
3208  * @param timestamp time when the estimate was received from the server (or created by the server)
3209  * @param logestimate the log(Base 2) value of the current network size estimate
3210  * @param std_dev standard deviation for the estimate
3211  */
3212 static void
3213 nse_callback (void *cls,
3214               struct GNUNET_TIME_Absolute timestamp,
3215               double logestimate, double std_dev)
3216 {
3217   (void) cls;
3218   (void) timestamp;
3219   struct ClientContext *cli_ctx_iter;
3220
3221   adapt_sizes (msub, logestimate, std_dev);
3222   for (cli_ctx_iter = cli_ctx_head;
3223       NULL != cli_ctx_iter;
3224       cli_ctx_iter = cli_ctx_iter->next)
3225   {
3226     if (NULL != cli_ctx_iter->sub)
3227     {
3228       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3229     }
3230   }
3231 }
3232
3233
3234 /**
3235  * @brief This function is called, when the client seeds peers.
3236  * It verifies that @a msg is well-formed.
3237  *
3238  * @param cls the closure (#ClientContext)
3239  * @param msg the message
3240  * @return #GNUNET_OK if @a msg is well-formed
3241  *         #GNUNET_SYSERR otherwise
3242  */
3243 static int
3244 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3245 {
3246   struct ClientContext *cli_ctx = cls;
3247   uint16_t msize = ntohs (msg->header.size);
3248   uint32_t num_peers = ntohl (msg->num_peers);
3249
3250   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3251   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3252        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3253   {
3254     LOG (GNUNET_ERROR_TYPE_ERROR,
3255         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3256         ntohl (msg->num_peers),
3257         (msize / sizeof (struct GNUNET_PeerIdentity)));
3258     GNUNET_break (0);
3259     GNUNET_SERVICE_client_drop (cli_ctx->client);
3260     return GNUNET_SYSERR;
3261   }
3262   return GNUNET_OK;
3263 }
3264
3265
3266 /**
3267  * Handle seed from the client.
3268  *
3269  * @param cls closure
3270  * @param message the actual message
3271  */
3272 static void
3273 handle_client_seed (void *cls,
3274                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3275 {
3276   struct ClientContext *cli_ctx = cls;
3277   struct GNUNET_PeerIdentity *peers;
3278   uint32_t num_peers;
3279   uint32_t i;
3280
3281   num_peers = ntohl (msg->num_peers);
3282   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3283
3284   LOG (GNUNET_ERROR_TYPE_DEBUG,
3285        "Client seeded peers:\n");
3286   print_peer_list (peers, num_peers);
3287
3288   for (i = 0; i < num_peers; i++)
3289   {
3290     LOG (GNUNET_ERROR_TYPE_DEBUG,
3291          "Updating samplers with seed %" PRIu32 ": %s\n",
3292          i,
3293          GNUNET_i2s (&peers[i]));
3294
3295     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3296     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3297   }
3298   GNUNET_SERVICE_client_continue (cli_ctx->client);
3299 }
3300
3301
3302 /**
3303  * Handle RPS request from the client.
3304  *
3305  * @param cls Client context
3306  * @param message Message containing the numer of updates the client wants to
3307  * receive
3308  */
3309 static void
3310 handle_client_view_request (void *cls,
3311                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3312 {
3313   struct ClientContext *cli_ctx = cls;
3314   uint64_t num_updates;
3315
3316   num_updates = ntohl (msg->num_updates);
3317
3318   LOG (GNUNET_ERROR_TYPE_DEBUG,
3319        "Client requested %" PRIu64 " updates of view.\n",
3320        num_updates);
3321
3322   GNUNET_assert (NULL != cli_ctx);
3323   cli_ctx->view_updates_left = num_updates;
3324   send_view (cli_ctx, NULL, 0);
3325   GNUNET_SERVICE_client_continue (cli_ctx->client);
3326 }
3327
3328
3329 /**
3330  * @brief Handle the cancellation of the view updates.
3331  *
3332  * @param cls The client context
3333  * @param msg Unused
3334  */
3335 static void
3336 handle_client_view_cancel (void *cls,
3337                            const struct GNUNET_MessageHeader *msg)
3338 {
3339   struct ClientContext *cli_ctx = cls;
3340   (void) msg;
3341
3342   LOG (GNUNET_ERROR_TYPE_DEBUG,
3343        "Client does not want to receive updates of view any more.\n");
3344
3345   GNUNET_assert (NULL != cli_ctx);
3346   cli_ctx->view_updates_left = 0;
3347   GNUNET_SERVICE_client_continue (cli_ctx->client);
3348   if (GNUNET_YES == cli_ctx->stream_update)
3349   {
3350     destroy_cli_ctx (cli_ctx);
3351   }
3352 }
3353
3354
3355 /**
3356  * Handle RPS request for biased stream from the client.
3357  *
3358  * @param cls Client context
3359  * @param message unused
3360  */
3361 static void
3362 handle_client_stream_request (void *cls,
3363                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3364 {
3365   struct ClientContext *cli_ctx = cls;
3366   (void) msg;
3367
3368   LOG (GNUNET_ERROR_TYPE_DEBUG,
3369        "Client requested peers from biased stream.\n");
3370   cli_ctx->stream_update = GNUNET_YES;
3371
3372   GNUNET_assert (NULL != cli_ctx);
3373   GNUNET_SERVICE_client_continue (cli_ctx->client);
3374 }
3375
3376
3377 /**
3378  * @brief Handles the cancellation of the stream of biased peer ids
3379  *
3380  * @param cls The client context
3381  * @param msg unused
3382  */
3383 static void
3384 handle_client_stream_cancel (void *cls,
3385                              const struct GNUNET_MessageHeader *msg)
3386 {
3387   struct ClientContext *cli_ctx = cls;
3388   (void) msg;
3389
3390   LOG (GNUNET_ERROR_TYPE_DEBUG,
3391        "Client canceled receiving peers from biased stream.\n");
3392   cli_ctx->stream_update = GNUNET_NO;
3393
3394   GNUNET_assert (NULL != cli_ctx);
3395   GNUNET_SERVICE_client_continue (cli_ctx->client);
3396 }
3397
3398
3399 /**
3400  * @brief Create and start a Sub.
3401  *
3402  * @param cls Closure - unused
3403  * @param msg Message containing the necessary information
3404  */
3405 static void
3406 handle_client_start_sub (void *cls,
3407                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3408 {
3409   struct ClientContext *cli_ctx = cls;
3410
3411   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3412   if (NULL != cli_ctx->sub &&
3413       0 != memcmp (&cli_ctx->sub->hash,
3414                    &msg->hash,
3415                    sizeof (struct GNUNET_HashCode)))
3416   {
3417     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3418     destroy_sub (cli_ctx->sub);
3419     cli_ctx->sub = NULL;
3420   }
3421   cli_ctx->sub = new_sub (&msg->hash,
3422                          msub->sampler_size_est_min, // TODO make api input?
3423                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3424   GNUNET_SERVICE_client_continue (cli_ctx->client);
3425 }
3426
3427
3428 /**
3429  * @brief Destroy the Sub
3430  *
3431  * @param cls Closure - unused
3432  * @param msg Message containing the hash that identifies the Sub
3433  */
3434 static void
3435 handle_client_stop_sub (void *cls,
3436                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3437 {
3438   struct ClientContext *cli_ctx = cls;
3439
3440   GNUNET_assert (NULL != cli_ctx->sub);
3441   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3442   {
3443     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3444   }
3445   destroy_sub (cli_ctx->sub);
3446   cli_ctx->sub = NULL;
3447   GNUNET_SERVICE_client_continue (cli_ctx->client);
3448 }
3449
3450
3451 /**
3452  * Handle a CHECK_LIVE message from another peer.
3453  *
3454  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3455  * the channel is blocked for all other communication.
3456  *
3457  * @param cls Closure - Context of channel
3458  * @param msg Message - unused
3459  */
3460 static void
3461 handle_peer_check (void *cls,
3462                    const struct GNUNET_MessageHeader *msg)
3463 {
3464   const struct ChannelCtx *channel_ctx = cls;
3465   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3466   (void) msg;
3467
3468   LOG (GNUNET_ERROR_TYPE_DEBUG,
3469       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3470   if (channel_ctx->peer_ctx->sub == msub)
3471   {
3472     GNUNET_STATISTICS_update (stats,
3473                               "# pending online checks",
3474                               -1,
3475                               GNUNET_NO);
3476   }
3477
3478   GNUNET_CADET_receive_done (channel_ctx->channel);
3479 }
3480
3481
3482 /**
3483  * Handle a PUSH message from another peer.
3484  *
3485  * Check the proof of work and store the PeerID
3486  * in the temporary list for pushed PeerIDs.
3487  *
3488  * @param cls Closure - Context of channel
3489  * @param msg Message - unused
3490  */
3491 static void
3492 handle_peer_push (void *cls,
3493                   const struct GNUNET_MessageHeader *msg)
3494 {
3495   const struct ChannelCtx *channel_ctx = cls;
3496   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3497   (void) msg;
3498
3499   // (check the proof of work (?))
3500
3501   LOG (GNUNET_ERROR_TYPE_DEBUG,
3502        "Received PUSH (%s)\n",
3503        GNUNET_i2s (peer));
3504   if (channel_ctx->peer_ctx->sub == msub)
3505   {
3506     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3507     if (NULL != map_single_hop &&
3508         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3509                                                              peer))
3510     {
3511       GNUNET_STATISTICS_update (stats,
3512                                 "# push message received (multi-hop peer)",
3513                                 1,
3514                                 GNUNET_NO);
3515     }
3516   }
3517
3518   #if ENABLE_MALICIOUS
3519   struct AttackedPeer *tmp_att_peer;
3520
3521   if ( (1 == mal_type) ||
3522        (3 == mal_type) )
3523   { /* Try to maximise representation */
3524     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3525     tmp_att_peer->peer_id = *peer;
3526     if (NULL == att_peer_set)
3527       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3528     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3529                                                              peer))
3530     {
3531       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3532                                    att_peers_tail,
3533                                    tmp_att_peer);
3534       add_peer_array_to_set (peer, 1, att_peer_set);
3535     }
3536     else
3537     {
3538       GNUNET_free (tmp_att_peer);
3539     }
3540   }
3541
3542
3543   else if (2 == mal_type)
3544   {
3545     /* We attack one single well-known peer - simply ignore */
3546   }
3547   #endif /* ENABLE_MALICIOUS */
3548
3549   /* Add the sending peer to the push_map */
3550   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3551
3552   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3553                                      &channel_ctx->peer_ctx->peer_id));
3554   GNUNET_CADET_receive_done (channel_ctx->channel);
3555 }
3556
3557
3558 /**
3559  * Handle PULL REQUEST request message from another peer.
3560  *
3561  * Reply with the view of PeerIDs.
3562  *
3563  * @param cls Closure - Context of channel
3564  * @param msg Message - unused
3565  */
3566 static void
3567 handle_peer_pull_request (void *cls,
3568                           const struct GNUNET_MessageHeader *msg)
3569 {
3570   const struct ChannelCtx *channel_ctx = cls;
3571   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3572   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3573   const struct GNUNET_PeerIdentity *view_array;
3574   (void) msg;
3575
3576   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3577   if (peer_ctx->sub == msub)
3578   {
3579     GNUNET_STATISTICS_update(stats,
3580                              "# pull request message received",
3581                              1,
3582                              GNUNET_NO);
3583     if (NULL != map_single_hop &&
3584         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3585                                                              &peer_ctx->peer_id))
3586     {
3587       GNUNET_STATISTICS_update (stats,
3588                                 "# pull request message received (multi-hop peer)",
3589                                 1,
3590                                 GNUNET_NO);
3591     }
3592   }
3593
3594   #if ENABLE_MALICIOUS
3595   if (1 == mal_type
3596       || 3 == mal_type)
3597   { /* Try to maximise representation */
3598     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3599   }
3600
3601   else if (2 == mal_type)
3602   { /* Try to partition network */
3603     if (0 == GNUNET_memcmp (&attacked_peer, peer))
3604     {
3605       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3606     }
3607   }
3608   #endif /* ENABLE_MALICIOUS */
3609
3610   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3611                                      &channel_ctx->peer_ctx->peer_id));
3612   GNUNET_CADET_receive_done (channel_ctx->channel);
3613   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3614   send_pull_reply (peer_ctx,
3615                    view_array,
3616                    View_size (channel_ctx->peer_ctx->sub->view));
3617 }
3618
3619
3620 /**
3621  * Check whether we sent a corresponding request and
3622  * whether this reply is the first one.
3623  *
3624  * @param cls Closure - Context of channel
3625  * @param msg Message containing the replied peers
3626  */
3627 static int
3628 check_peer_pull_reply (void *cls,
3629                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3630 {
3631   struct ChannelCtx *channel_ctx = cls;
3632   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3633
3634   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3635   {
3636     GNUNET_break_op (0);
3637     return GNUNET_SYSERR;
3638   }
3639
3640   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3641       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3642   {
3643     LOG (GNUNET_ERROR_TYPE_ERROR,
3644         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3645         ntohl (msg->num_peers),
3646         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3647             sizeof (struct GNUNET_PeerIdentity));
3648     GNUNET_break_op (0);
3649     return GNUNET_SYSERR;
3650   }
3651
3652   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3653                                      &sender_ctx->peer_id,
3654                                      Peers_PULL_REPLY_PENDING))
3655   {
3656     LOG (GNUNET_ERROR_TYPE_WARNING,
3657         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3658         GNUNET_i2s (&sender_ctx->peer_id));
3659     if (sender_ctx->sub == msub)
3660     {
3661       GNUNET_STATISTICS_update (stats,
3662                                 "# unrequested pull replies",
3663                                 1,
3664                                 GNUNET_NO);
3665     }
3666   }
3667   return GNUNET_OK;
3668 }
3669
3670
3671 /**
3672  * Handle PULL REPLY message from another peer.
3673  *
3674  * @param cls Closure
3675  * @param msg The message header
3676  */
3677 static void
3678 handle_peer_pull_reply (void *cls,
3679                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3680 {
3681   const struct ChannelCtx *channel_ctx = cls;
3682   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3683   const struct GNUNET_PeerIdentity *peers;
3684   struct Sub *sub = channel_ctx->peer_ctx->sub;
3685   uint32_t i;
3686 #if ENABLE_MALICIOUS
3687   struct AttackedPeer *tmp_att_peer;
3688 #endif /* ENABLE_MALICIOUS */
3689
3690   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3691   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3692   if (channel_ctx->peer_ctx->sub == msub)
3693   {
3694     GNUNET_STATISTICS_update (stats,
3695                               "# pull reply messages received",
3696                               1,
3697                               GNUNET_NO);
3698     if (NULL != map_single_hop &&
3699         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3700           &channel_ctx->peer_ctx->peer_id))
3701     {
3702       GNUNET_STATISTICS_update (stats,
3703                                 "# pull reply messages received (multi-hop peer)",
3704                                 1,
3705                                 GNUNET_NO);
3706     }
3707   }
3708
3709   #if ENABLE_MALICIOUS
3710   // We shouldn't even receive pull replies as we're not sending
3711   if (2 == mal_type)
3712   {
3713   }
3714   #endif /* ENABLE_MALICIOUS */
3715
3716   /* Do actual logic */
3717   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3718
3719   LOG (GNUNET_ERROR_TYPE_DEBUG,
3720        "PULL REPLY received, got following %u peers:\n",
3721        ntohl (msg->num_peers));
3722
3723   for (i = 0; i < ntohl (msg->num_peers); i++)
3724   {
3725     LOG (GNUNET_ERROR_TYPE_DEBUG,
3726          "%u. %s\n",
3727          i,
3728          GNUNET_i2s (&peers[i]));
3729
3730     #if ENABLE_MALICIOUS
3731     if ((NULL != att_peer_set) &&
3732         (1 == mal_type || 3 == mal_type))
3733     { /* Add attacked peer to local list */
3734       // TODO check if we sent a request and this was the first reply
3735       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3736                                                                &peers[i])
3737           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3738                                                                   &peers[i]))
3739       {
3740         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3741         tmp_att_peer->peer_id = peers[i];
3742         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3743                                      att_peers_tail,
3744                                      tmp_att_peer);
3745         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3746       }
3747       continue;
3748     }
3749     #endif /* ENABLE_MALICIOUS */
3750     /* Make sure we 'know' about this peer */
3751     (void) insert_peer (channel_ctx->peer_ctx->sub,
3752                         &peers[i]);
3753
3754     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3755                                         &peers[i]))
3756     {
3757       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3758                          &peers[i]);
3759     }
3760     else
3761     {
3762       schedule_operation (channel_ctx->peer_ctx,
3763                           insert_in_pull_map,
3764                           channel_ctx->peer_ctx->sub); /* cls */
3765       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3766                                       &peers[i]);
3767     }
3768   }
3769
3770   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3771                                  sender),
3772                    Peers_PULL_REPLY_PENDING);
3773   clean_peer (channel_ctx->peer_ctx->sub,
3774               sender);
3775
3776   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3777                                      sender));
3778   GNUNET_CADET_receive_done (channel_ctx->channel);
3779 }
3780
3781
3782 /**
3783  * Compute a random delay.
3784  * A uniformly distributed value between mean + spread and mean - spread.
3785  *
3786  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3787  * It would return a random value between 2 and 6 min.
3788  *
3789  * @param mean the mean time until the next round
3790  * @param spread the inverse amount of deviation from the mean
3791  */
3792 static struct GNUNET_TIME_Relative
3793 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3794                     unsigned int spread)
3795 {
3796   struct GNUNET_TIME_Relative half_interval;
3797   struct GNUNET_TIME_Relative ret;
3798   unsigned int rand_delay;
3799   unsigned int max_rand_delay;
3800
3801   if (0 == spread)
3802   {
3803     LOG (GNUNET_ERROR_TYPE_WARNING,
3804          "Not accepting spread of 0\n");
3805     GNUNET_break (0);
3806     GNUNET_assert (0);
3807   }
3808   GNUNET_assert (0 != mean.rel_value_us);
3809
3810   /* Compute random time value between spread * mean and spread * mean */
3811   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3812
3813   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3814   /**
3815    * Compute random value between (0 and 1) * round_interval
3816    * via multiplying round_interval with a 'fraction' (0 to value)/value
3817    */
3818   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3819   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3820   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3821   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3822
3823   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3824     LOG (GNUNET_ERROR_TYPE_WARNING,
3825          "Returning FOREVER_REL\n");
3826
3827   return ret;
3828 }
3829
3830
3831 /**
3832  * Send single pull request
3833  *
3834  * @param peer_ctx Context to the peer to send request to
3835  */
3836 static void
3837 send_pull_request (struct PeerContext *peer_ctx)
3838 {
3839   struct GNUNET_MQ_Envelope *ev;
3840
3841   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3842                                                &peer_ctx->peer_id,
3843                                                Peers_PULL_REPLY_PENDING));
3844   SET_PEER_FLAG (peer_ctx,
3845                  Peers_PULL_REPLY_PENDING);
3846   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3847
3848   LOG (GNUNET_ERROR_TYPE_DEBUG,
3849        "Going to send PULL REQUEST to peer %s.\n",
3850        GNUNET_i2s (&peer_ctx->peer_id));
3851
3852   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3853   send_message (peer_ctx,
3854                 ev,
3855                 "PULL REQUEST");
3856   if (peer_ctx->sub)
3857   {
3858     GNUNET_STATISTICS_update (stats,
3859                               "# pull request send issued",
3860                               1,
3861                               GNUNET_NO);
3862     if (NULL != map_single_hop &&
3863         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3864                                                              &peer_ctx->peer_id))
3865     {
3866       GNUNET_STATISTICS_update (stats,
3867                                 "# pull request send issued (multi-hop peer)",
3868                                 1,
3869                                 GNUNET_NO);
3870     }
3871   }
3872 }
3873
3874
3875 /**
3876  * Send single push
3877  *
3878  * @param peer_ctx Context of peer to send push to
3879  */
3880 static void
3881 send_push (struct PeerContext *peer_ctx)
3882 {
3883   struct GNUNET_MQ_Envelope *ev;
3884
3885   LOG (GNUNET_ERROR_TYPE_DEBUG,
3886        "Going to send PUSH to peer %s.\n",
3887        GNUNET_i2s (&peer_ctx->peer_id));
3888
3889   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3890   send_message (peer_ctx, ev, "PUSH");
3891   if (peer_ctx->sub)
3892   {
3893     GNUNET_STATISTICS_update (stats,
3894                               "# push send issued",
3895                               1,
3896                               GNUNET_NO);
3897     if (NULL != map_single_hop &&
3898         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3899                                                              &peer_ctx->peer_id))
3900     {
3901       GNUNET_STATISTICS_update (stats,
3902                                 "# push send issued (multi-hop peer)",
3903                                 1,
3904                                 GNUNET_NO);
3905     }
3906   }
3907 }
3908
3909
3910 #if ENABLE_MALICIOUS
3911
3912
3913 /**
3914  * @brief This function is called, when the client tells us to act malicious.
3915  * It verifies that @a msg is well-formed.
3916  *
3917  * @param cls the closure (#ClientContext)
3918  * @param msg the message
3919  * @return #GNUNET_OK if @a msg is well-formed
3920  */
3921 static int
3922 check_client_act_malicious (void *cls,
3923                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3924 {
3925   struct ClientContext *cli_ctx = cls;
3926   uint16_t msize = ntohs (msg->header.size);
3927   uint32_t num_peers = ntohl (msg->num_peers);
3928
3929   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3930   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3931        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3932   {
3933     LOG (GNUNET_ERROR_TYPE_ERROR,
3934         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3935         ntohl (msg->num_peers),
3936         (msize / sizeof (struct GNUNET_PeerIdentity)));
3937     GNUNET_break (0);
3938     GNUNET_SERVICE_client_drop (cli_ctx->client);
3939     return GNUNET_SYSERR;
3940   }
3941   return GNUNET_OK;
3942 }
3943
3944 /**
3945  * Turn RPS service to act malicious.
3946  *
3947  * @param cls Closure
3948  * @param client The client that sent the message
3949  * @param msg The message header
3950  */
3951 static void
3952 handle_client_act_malicious (void *cls,
3953                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3954 {
3955   struct ClientContext *cli_ctx = cls;
3956   struct GNUNET_PeerIdentity *peers;
3957   uint32_t num_mal_peers_sent;
3958   uint32_t num_mal_peers_old;
3959   struct Sub *sub = cli_ctx->sub;
3960
3961   if (NULL == sub) sub = msub;
3962   /* Do actual logic */
3963   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3964   mal_type = ntohl (msg->type);
3965   if (NULL == mal_peer_set)
3966     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3967
3968   LOG (GNUNET_ERROR_TYPE_DEBUG,
3969        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3970        mal_type,
3971        ntohl (msg->num_peers));
3972
3973   if (1 == mal_type)
3974   { /* Try to maximise representation */
3975     /* Add other malicious peers to those we already know */
3976
3977     num_mal_peers_sent = ntohl (msg->num_peers);
3978     num_mal_peers_old = num_mal_peers;
3979     GNUNET_array_grow (mal_peers,
3980                        num_mal_peers,
3981                        num_mal_peers + num_mal_peers_sent);
3982     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3983             peers,
3984             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3985
3986     /* Add all mal peers to mal_peer_set */
3987     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3988                            num_mal_peers_sent,
3989                            mal_peer_set);
3990
3991     /* Substitute do_round () with do_mal_round () */
3992     GNUNET_assert (NULL != sub->do_round_task);
3993     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3994     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3995   }
3996
3997   else if ( (2 == mal_type) ||
3998             (3 == mal_type) )
3999   { /* Try to partition the network */
4000     /* Add other malicious peers to those we already know */
4001
4002     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4003     num_mal_peers_old = num_mal_peers;
4004     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4005     GNUNET_array_grow (mal_peers,
4006                        num_mal_peers,
4007                        num_mal_peers + num_mal_peers_sent);
4008     if (NULL != mal_peers &&
4009         0 != num_mal_peers)
4010     {
4011       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4012               peers,
4013               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
4014
4015       /* Add all mal peers to mal_peer_set */
4016       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4017                              num_mal_peers_sent,
4018                              mal_peer_set);
4019     }
4020
4021     /* Store the one attacked peer */
4022     GNUNET_memcpy (&attacked_peer,
4023             &msg->attacked_peer,
4024             sizeof (struct GNUNET_PeerIdentity));
4025     /* Set the flag of the attacked peer to valid to avoid problems */
4026     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4027     {
4028       (void) issue_peer_online_check (sub, &attacked_peer);
4029     }
4030
4031     LOG (GNUNET_ERROR_TYPE_DEBUG,
4032          "Attacked peer is %s\n",
4033          GNUNET_i2s (&attacked_peer));
4034
4035     /* Substitute do_round () with do_mal_round () */
4036     if (NULL != sub->do_round_task)
4037     {
4038       /* Probably in shutdown */
4039       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4040       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4041     }
4042   }
4043   else if (0 == mal_type)
4044   { /* Stop acting malicious */
4045     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4046
4047     /* Substitute do_mal_round () with do_round () */
4048     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4049     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4050   }
4051   else
4052   {
4053     GNUNET_break (0);
4054     GNUNET_SERVICE_client_continue (cli_ctx->client);
4055   }
4056   GNUNET_SERVICE_client_continue (cli_ctx->client);
4057 }
4058
4059
4060 /**
4061  * Send out PUSHes and PULLs maliciously.
4062  *
4063  * This is executed regylary.
4064  *
4065  * @param cls Closure - Sub
4066  */
4067 static void
4068 do_mal_round (void *cls)
4069 {
4070   uint32_t num_pushes;
4071   uint32_t i;
4072   struct GNUNET_TIME_Relative time_next_round;
4073   struct AttackedPeer *tmp_att_peer;
4074   struct Sub *sub = cls;
4075
4076   LOG (GNUNET_ERROR_TYPE_DEBUG,
4077        "Going to execute next round maliciously type %" PRIu32 ".\n",
4078       mal_type);
4079   sub->do_round_task = NULL;
4080   GNUNET_assert (mal_type <= 3);
4081   /* Do malicious actions */
4082   if (1 == mal_type)
4083   { /* Try to maximise representation */
4084
4085     /* The maximum of pushes we're going to send this round */
4086     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4087                                          num_attacked_peers),
4088                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4089
4090     LOG (GNUNET_ERROR_TYPE_DEBUG,
4091          "Going to send %" PRIu32 " pushes\n",
4092          num_pushes);
4093
4094     /* Send PUSHes to attacked peers */
4095     for (i = 0 ; i < num_pushes ; i++)
4096     {
4097       if (att_peers_tail == att_peer_index)
4098         att_peer_index = att_peers_head;
4099       else
4100         att_peer_index = att_peer_index->next;
4101
4102       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4103     }
4104
4105     /* Send PULLs to some peers to learn about additional peers to attack */
4106     tmp_att_peer = att_peer_index;
4107     for (i = 0 ; i < num_pushes * alpha ; i++)
4108     {
4109       if (att_peers_tail == tmp_att_peer)
4110         tmp_att_peer = att_peers_head;
4111       else
4112         att_peer_index = tmp_att_peer->next;
4113
4114       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4115     }
4116   }
4117
4118
4119   else if (2 == mal_type)
4120   { /**
4121      * Try to partition the network
4122      * Send as many pushes to the attacked peer as possible
4123      * That is one push per round as it will ignore more.
4124      */
4125     (void) issue_peer_online_check (sub, &attacked_peer);
4126     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4127                                        &attacked_peer,
4128                                        Peers_ONLINE))
4129       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4130   }
4131
4132
4133   if (3 == mal_type)
4134   { /* Combined attack */
4135
4136     /* Send PUSH to attacked peers */
4137     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4138     {
4139       (void) issue_peer_online_check (sub, &attacked_peer);
4140       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4141                                          &attacked_peer,
4142                                          Peers_ONLINE))
4143       {
4144         LOG (GNUNET_ERROR_TYPE_DEBUG,
4145             "Goding to send push to attacked peer (%s)\n",
4146             GNUNET_i2s (&attacked_peer));
4147         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4148       }
4149     }
4150     (void) issue_peer_online_check (sub, &attacked_peer);
4151
4152     /* The maximum of pushes we're going to send this round */
4153     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4154                                          num_attacked_peers),
4155                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4156
4157     LOG (GNUNET_ERROR_TYPE_DEBUG,
4158          "Going to send %" PRIu32 " pushes\n",
4159          num_pushes);
4160
4161     for (i = 0; i < num_pushes; i++)
4162     {
4163       if (att_peers_tail == att_peer_index)
4164         att_peer_index = att_peers_head;
4165       else
4166         att_peer_index = att_peer_index->next;
4167
4168       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4169     }
4170
4171     /* Send PULLs to some peers to learn about additional peers to attack */
4172     tmp_att_peer = att_peer_index;
4173     for (i = 0; i < num_pushes * alpha; i++)
4174     {
4175       if (att_peers_tail == tmp_att_peer)
4176         tmp_att_peer = att_peers_head;
4177       else
4178         att_peer_index = tmp_att_peer->next;
4179
4180       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4181     }
4182   }
4183
4184   /* Schedule next round */
4185   time_next_round = compute_rand_delay (sub->round_interval, 2);
4186
4187   GNUNET_assert (NULL == sub->do_round_task);
4188   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4189                                                     &do_mal_round, sub);
4190   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4191 }
4192 #endif /* ENABLE_MALICIOUS */
4193
4194
4195 /**
4196  * Send out PUSHes and PULLs, possibly update #view, samplers.
4197  *
4198  * This is executed regylary.
4199  *
4200  * @param cls Closure - Sub
4201  */
4202 static void
4203 do_round (void *cls)
4204 {
4205   unsigned int i;
4206   const struct GNUNET_PeerIdentity *view_array;
4207   unsigned int *permut;
4208   unsigned int a_peers; /* Number of peers we send pushes to */
4209   unsigned int b_peers; /* Number of peers we send pull requests to */
4210   uint32_t first_border;
4211   uint32_t second_border;
4212   struct GNUNET_PeerIdentity peer;
4213   struct GNUNET_PeerIdentity *update_peer;
4214   struct Sub *sub = cls;
4215
4216   sub->num_rounds++;
4217   LOG (GNUNET_ERROR_TYPE_DEBUG,
4218        "Going to execute next round.\n");
4219   if (sub == msub)
4220   {
4221     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4222   }
4223   sub->do_round_task = NULL;
4224 #ifdef TO_FILE_FULL
4225   to_file (sub->file_name_view_log,
4226            "___ new round ___");
4227 #endif /* TO_FILE_FULL */
4228   view_array = View_get_as_array (sub->view);
4229   for (i = 0; i < View_size (sub->view); i++)
4230   {
4231     LOG (GNUNET_ERROR_TYPE_DEBUG,
4232          "\t%s\n", GNUNET_i2s (&view_array[i]));
4233 #ifdef TO_FILE_FULL
4234     to_file (sub->file_name_view_log,
4235              "=%s\t(do round)",
4236              GNUNET_i2s_full (&view_array[i]));
4237 #endif /* TO_FILE_FULL */
4238   }
4239
4240
4241   /* Send pushes and pull requests */
4242   if (0 < View_size (sub->view))
4243   {
4244     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4245                                            View_size (sub->view));
4246
4247     /* Send PUSHes */
4248     a_peers = ceil (alpha * View_size (sub->view));
4249
4250     LOG (GNUNET_ERROR_TYPE_DEBUG,
4251          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4252          a_peers, alpha, View_size (sub->view));
4253     for (i = 0; i < a_peers; i++)
4254     {
4255       peer = view_array[permut[i]];
4256       // FIXME if this fails schedule/loop this for later
4257       send_push (get_peer_ctx (sub->peer_map, &peer));
4258     }
4259
4260     /* Send PULL requests */
4261     b_peers = ceil (beta * View_size (sub->view));
4262     first_border = a_peers;
4263     second_border = a_peers + b_peers;
4264     if (second_border > View_size (sub->view))
4265     {
4266       first_border = View_size (sub->view) - b_peers;
4267       second_border = View_size (sub->view);
4268     }
4269     LOG (GNUNET_ERROR_TYPE_DEBUG,
4270         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4271         b_peers, beta, View_size (sub->view));
4272     for (i = first_border; i < second_border; i++)
4273     {
4274       peer = view_array[permut[i]];
4275       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4276                                          &peer,
4277                                          Peers_PULL_REPLY_PENDING))
4278       { // FIXME if this fails schedule/loop this for later
4279         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4280       }
4281     }
4282
4283     GNUNET_free (permut);
4284     permut = NULL;
4285   }
4286
4287
4288   /* Update view */
4289   /* TODO see how many peers are in push-/pull- list! */
4290
4291   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4292       (0 < CustomPeerMap_size (sub->push_map)) &&
4293       (0 < CustomPeerMap_size (sub->pull_map)))
4294   { /* If conditions for update are fulfilled, update */
4295     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4296
4297     uint32_t final_size;
4298     uint32_t peers_to_clean_size;
4299     struct GNUNET_PeerIdentity *peers_to_clean;
4300
4301     peers_to_clean = NULL;
4302     peers_to_clean_size = 0;
4303     GNUNET_array_grow (peers_to_clean,
4304                        peers_to_clean_size,
4305                        View_size (sub->view));
4306     GNUNET_memcpy (peers_to_clean,
4307             view_array,
4308             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4309
4310     /* Seems like recreating is the easiest way of emptying the peermap */
4311     View_clear (sub->view);
4312 #ifdef TO_FILE_FULL
4313     to_file (sub->file_name_view_log,
4314              "--- emptied ---");
4315 #endif /* TO_FILE_FULL */
4316
4317     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4318                                 CustomPeerMap_size (sub->push_map));
4319     second_border = first_border +
4320                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4321                                 CustomPeerMap_size (sub->pull_map));
4322     final_size    = second_border +
4323       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4324     LOG (GNUNET_ERROR_TYPE_DEBUG,
4325         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4326         first_border,
4327         second_border,
4328         final_size);
4329
4330     /* Update view with peers received through PUSHes */
4331     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4332                                            CustomPeerMap_size (sub->push_map));
4333     for (i = 0; i < first_border; i++)
4334     {
4335       int inserted;
4336       inserted = insert_in_view (sub,
4337                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4338                                                                   permut[i]));
4339       if (GNUNET_OK == inserted)
4340       {
4341         clients_notify_stream_peer (sub,
4342             1,
4343             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4344       }
4345 #ifdef TO_FILE_FULL
4346       to_file (sub->file_name_view_log,
4347                "+%s\t(push list)",
4348                GNUNET_i2s_full (&view_array[i]));
4349 #endif /* TO_FILE_FULL */
4350       // TODO change the peer_flags accordingly
4351     }
4352     GNUNET_free (permut);
4353     permut = NULL;
4354
4355     /* Update view with peers received through PULLs */
4356     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4357                                            CustomPeerMap_size (sub->pull_map));
4358     for (i = first_border; i < second_border; i++)
4359     {
4360       int inserted;
4361       inserted = insert_in_view (sub,
4362           CustomPeerMap_get_peer_by_index (sub->pull_map,
4363                                            permut[i - first_border]));
4364       if (GNUNET_OK == inserted)
4365       {
4366         clients_notify_stream_peer (sub,
4367             1,
4368             CustomPeerMap_get_peer_by_index (sub->pull_map,
4369                                              permut[i - first_border]));
4370       }
4371 #ifdef TO_FILE_FULL
4372       to_file (sub->file_name_view_log,
4373                "+%s\t(pull list)",
4374                GNUNET_i2s_full (&view_array[i]));
4375 #endif /* TO_FILE_FULL */
4376       // TODO change the peer_flags accordingly
4377     }
4378     GNUNET_free (permut);
4379     permut = NULL;
4380
4381     /* Update view with peers from history */
4382     RPS_sampler_get_n_rand_peers (sub->sampler,
4383                                   final_size - second_border,
4384                                   hist_update,
4385                                   sub);
4386     // TODO change the peer_flags accordingly
4387
4388     for (i = 0; i < View_size (sub->view); i++)
4389       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4390
4391     /* Clean peers that were removed from the view */
4392     for (i = 0; i < peers_to_clean_size; i++)
4393     {
4394 #ifdef TO_FILE_FULL
4395       to_file (sub->file_name_view_log,
4396                "-%s",
4397                GNUNET_i2s_full (&peers_to_clean[i]));
4398 #endif /* TO_FILE_FULL */
4399       clean_peer (sub, &peers_to_clean[i]);
4400     }
4401
4402     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4403     clients_notify_view_update (sub);
4404   } else {
4405     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4406     if (sub == msub)
4407     {
4408       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4409       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4410           !(0 >= CustomPeerMap_size (sub->pull_map)))
4411         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4412       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4413           (0 >= CustomPeerMap_size (sub->pull_map)))
4414         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4415       if (0 >= CustomPeerMap_size (sub->push_map) &&
4416           !(0 >= CustomPeerMap_size (sub->pull_map)))
4417         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4418       if (0 >= CustomPeerMap_size (sub->push_map) &&
4419           (0 >= CustomPeerMap_size (sub->pull_map)))
4420         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4421       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4422           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4423           0 >= CustomPeerMap_size (sub->push_map))
4424         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4425     }
4426   }
4427   // TODO independent of that also get some peers from CADET_get_peers()?
4428   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4429   {
4430     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4431   }
4432   else
4433   {
4434     LOG (GNUNET_ERROR_TYPE_WARNING,
4435          "Push map size too big for histogram (%u, %u)\n",
4436          CustomPeerMap_size (sub->push_map),
4437          HISTOGRAM_FILE_SLOTS);
4438   }
4439   // FIXME check bounds of histogram
4440   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4441                    (alpha * sub->view_size_est_need)) +
4442                           (HISTOGRAM_FILE_SLOTS/2)]++;
4443   if (sub == msub)
4444   {
4445     GNUNET_STATISTICS_set (stats,
4446         "# peers in push map at end of round",
4447         CustomPeerMap_size (sub->push_map),
4448         GNUNET_NO);
4449     GNUNET_STATISTICS_set (stats,
4450         "# peers in pull map at end of round",
4451         CustomPeerMap_size (sub->pull_map),
4452         GNUNET_NO);
4453     GNUNET_STATISTICS_set (stats,
4454         "# peers in view at end of round",
4455         View_size (sub->view),
4456         GNUNET_NO);
4457     GNUNET_STATISTICS_set (stats,
4458         "# expected pushes",
4459         alpha * sub->view_size_est_need,
4460         GNUNET_NO);
4461     GNUNET_STATISTICS_set (stats,
4462         "delta expected - received pushes",
4463         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4464         GNUNET_NO);
4465   }
4466
4467   LOG (GNUNET_ERROR_TYPE_DEBUG,
4468        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4469        CustomPeerMap_size (sub->push_map),
4470        CustomPeerMap_size (sub->pull_map),
4471        alpha,
4472        View_size (sub->view),
4473        alpha * View_size (sub->view));
4474
4475   /* Update samplers */
4476   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4477   {
4478     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4479     LOG (GNUNET_ERROR_TYPE_DEBUG,
4480          "Updating with peer %s from push list\n",
4481          GNUNET_i2s (update_peer));
4482     insert_in_sampler (sub, update_peer);
4483     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4484   }
4485
4486   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4487   {
4488     LOG (GNUNET_ERROR_TYPE_DEBUG,
4489          "Updating with peer %s from pull list\n",
4490          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4491     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4492     /* This cleans only if it is not in the view */
4493     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4494   }
4495
4496
4497   /* Empty push/pull lists */
4498   CustomPeerMap_clear (sub->push_map);
4499   CustomPeerMap_clear (sub->pull_map);
4500
4501   if (sub == msub)
4502   {
4503     GNUNET_STATISTICS_set (stats,
4504                            "view size",
4505                            View_size(sub->view),
4506                            GNUNET_NO);
4507   }
4508
4509   struct GNUNET_TIME_Relative time_next_round;
4510
4511   time_next_round = compute_rand_delay (sub->round_interval, 2);
4512
4513   /* Schedule next round */
4514   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4515                                                      &do_round, sub);
4516   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4517 }
4518
4519
4520 /**
4521  * This is called from GNUNET_CADET_get_peers().
4522  *
4523  * It is called on every peer(ID) that cadet somehow has contact with.
4524  * We use those to initialise the sampler.
4525  *
4526  * implements #GNUNET_CADET_PeersCB
4527  *
4528  * @param cls Closure - Sub
4529  * @param peer Peer, or NULL on "EOF".
4530  * @param tunnel Do we have a tunnel towards this peer?
4531  * @param n_paths Number of known paths towards this peer.
4532  * @param best_path How long is the best path?
4533  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4534  */
4535 void
4536 init_peer_cb (void *cls,
4537               const struct GNUNET_PeerIdentity *peer,
4538               int tunnel, /* "Do we have a tunnel towards this peer?" */
4539               unsigned int n_paths, /* "Number of known paths towards this peer" */
4540               unsigned int best_path) /* "How long is the best path?
4541                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4542 {
4543   struct Sub *sub = cls;
4544   (void) tunnel;
4545   (void) n_paths;
4546   (void) best_path;
4547
4548   if (NULL != peer)
4549   {
4550     LOG (GNUNET_ERROR_TYPE_DEBUG,
4551          "Got peer_id %s from cadet\n",
4552          GNUNET_i2s (peer));
4553     got_peer (sub, peer);
4554   }
4555 }
4556
4557
4558 /**
4559  * @brief Iterator function over stored, valid peers.
4560  *
4561  * We initialise the sampler with those.
4562  *
4563  * @param cls Closure - Sub
4564  * @param peer the peer id
4565  * @return #GNUNET_YES if we should continue to
4566  *         iterate,
4567  *         #GNUNET_NO if not.
4568  */
4569 static int
4570 valid_peers_iterator (void *cls,
4571                       const struct GNUNET_PeerIdentity *peer)
4572 {
4573   struct Sub *sub = cls;
4574
4575   if (NULL != peer)
4576   {
4577     LOG (GNUNET_ERROR_TYPE_DEBUG,
4578          "Got stored, valid peer %s\n",
4579          GNUNET_i2s (peer));
4580     got_peer (sub, peer);
4581   }
4582   return GNUNET_YES;
4583 }
4584
4585
4586 /**
4587  * Iterator over peers from peerinfo.
4588  *
4589  * @param cls Closure - Sub
4590  * @param peer id of the peer, NULL for last call
4591  * @param hello hello message for the peer (can be NULL)
4592  * @param error message
4593  */
4594 void
4595 process_peerinfo_peers (void *cls,
4596                         const struct GNUNET_PeerIdentity *peer,
4597                         const struct GNUNET_HELLO_Message *hello,
4598                         const char *err_msg)
4599 {
4600   struct Sub *sub = cls;
4601   (void) hello;
4602   (void) err_msg;
4603
4604   if (NULL != peer)
4605   {
4606     LOG (GNUNET_ERROR_TYPE_DEBUG,
4607          "Got peer_id %s from peerinfo\n",
4608          GNUNET_i2s (peer));
4609     got_peer (sub, peer);
4610   }
4611 }
4612
4613
4614 /**
4615  * Task run during shutdown.
4616  *
4617  * @param cls Closure - unused
4618  */
4619 static void
4620 shutdown_task (void *cls)
4621 {
4622   (void) cls;
4623   struct ClientContext *client_ctx;
4624
4625   LOG (GNUNET_ERROR_TYPE_DEBUG,
4626        "RPS service is going down\n");
4627
4628   /* Clean all clients */
4629   for (client_ctx = cli_ctx_head;
4630        NULL != cli_ctx_head;
4631        client_ctx = cli_ctx_head)
4632   {
4633     destroy_cli_ctx (client_ctx);
4634   }
4635   if (NULL != msub)
4636   {
4637     destroy_sub (msub);
4638     msub = NULL;
4639   }
4640
4641   /* Disconnect from other services */
4642   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4643   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4644   peerinfo_handle = NULL;
4645   GNUNET_NSE_disconnect (nse);
4646   if (NULL != map_single_hop)
4647   {
4648     /* core_init was called - core was initialised */
4649     /* disconnect first, so no callback tries to access missing peermap */
4650     GNUNET_CORE_disconnect (core_handle);
4651     core_handle = NULL;
4652     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4653     map_single_hop = NULL;
4654   }
4655
4656   if (NULL != stats)
4657   {
4658     GNUNET_STATISTICS_destroy (stats,
4659                                GNUNET_NO);
4660     stats = NULL;
4661   }
4662   GNUNET_CADET_disconnect (cadet_handle);
4663   cadet_handle = NULL;
4664 #if ENABLE_MALICIOUS
4665   struct AttackedPeer *tmp_att_peer;
4666   GNUNET_array_grow (mal_peers,
4667                      num_mal_peers,
4668                      0);
4669   if (NULL != mal_peer_set)
4670     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4671   if (NULL != att_peer_set)
4672     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4673   while (NULL != att_peers_head)
4674   {
4675     tmp_att_peer = att_peers_head;
4676     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4677                                  att_peers_tail,
4678                                  tmp_att_peer);
4679     GNUNET_free (tmp_att_peer);
4680   }
4681 #endif /* ENABLE_MALICIOUS */
4682   close_all_files();
4683 }
4684
4685
4686 /**
4687  * Handle client connecting to the service.
4688  *
4689  * @param cls unused
4690  * @param client the new client
4691  * @param mq the message queue of @a client
4692  * @return @a client
4693  */
4694 static void *
4695 client_connect_cb (void *cls,
4696                    struct GNUNET_SERVICE_Client *client,
4697                    struct GNUNET_MQ_Handle *mq)
4698 {
4699   struct ClientContext *cli_ctx;
4700   (void) cls;
4701
4702   LOG (GNUNET_ERROR_TYPE_DEBUG,
4703        "Client connected\n");
4704   if (NULL == client)
4705     return client; /* Server was destroyed before a client connected. Shutting down */
4706   cli_ctx = GNUNET_new (struct ClientContext);
4707   cli_ctx->mq = mq;
4708   cli_ctx->view_updates_left = -1;
4709   cli_ctx->stream_update = GNUNET_NO;
4710   cli_ctx->client = client;
4711   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4712                                cli_ctx_tail,
4713                                cli_ctx);
4714   return cli_ctx;
4715 }
4716
4717 /**
4718  * Callback called when a client disconnected from the service
4719  *
4720  * @param cls closure for the service
4721  * @param c the client that disconnected
4722  * @param internal_cls should be equal to @a c
4723  */
4724 static void
4725 client_disconnect_cb (void *cls,
4726                       struct GNUNET_SERVICE_Client *client,
4727                       void *internal_cls)
4728 {
4729   struct ClientContext *cli_ctx = internal_cls;
4730
4731   (void) cls;
4732   GNUNET_assert (client == cli_ctx->client);
4733   if (NULL == client)
4734   {/* shutdown task - destroy all clients */
4735     while (NULL != cli_ctx_head)
4736       destroy_cli_ctx (cli_ctx_head);
4737   }
4738   else
4739   { /* destroy this client */
4740     LOG (GNUNET_ERROR_TYPE_DEBUG,
4741         "Client disconnected. Destroy its context.\n");
4742     destroy_cli_ctx (cli_ctx);
4743   }
4744 }
4745
4746
4747 /**
4748  * Handle random peer sampling clients.
4749  *
4750  * @param cls closure
4751  * @param c configuration to use
4752  * @param service the initialized service
4753  */
4754 static void
4755 run (void *cls,
4756      const struct GNUNET_CONFIGURATION_Handle *c,
4757      struct GNUNET_SERVICE_Handle *service)
4758 {
4759   struct GNUNET_TIME_Relative round_interval;
4760   long long unsigned int sampler_size;
4761   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4762   struct GNUNET_HashCode hash;
4763
4764   (void) cls;
4765   (void) service;
4766
4767   GNUNET_log_setup ("rps",
4768                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4769                     NULL);
4770   cfg = c;
4771   /* Get own ID */
4772   GNUNET_CRYPTO_get_peer_identity (cfg,
4773                                    &own_identity); // TODO check return value
4774   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4775               "STARTING SERVICE (rps) for peer [%s]\n",
4776               GNUNET_i2s (&own_identity));
4777 #if ENABLE_MALICIOUS
4778   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4779               "Malicious execution compiled in.\n");
4780 #endif /* ENABLE_MALICIOUS */
4781
4782   /* Get time interval from the configuration */
4783   if (GNUNET_OK !=
4784       GNUNET_CONFIGURATION_get_value_time (cfg,
4785                                            "RPS",
4786                                            "ROUNDINTERVAL",
4787                                            &round_interval))
4788   {
4789     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4790                                "RPS", "ROUNDINTERVAL");
4791     GNUNET_SCHEDULER_shutdown ();
4792     return;
4793   }
4794
4795   /* Get initial size of sampler/view from the configuration */
4796   if (GNUNET_OK !=
4797       GNUNET_CONFIGURATION_get_value_number (cfg,
4798                                              "RPS",
4799                                              "MINSIZE",
4800                                              &sampler_size))
4801   {
4802     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4803                                "RPS", "MINSIZE");
4804     GNUNET_SCHEDULER_shutdown ();
4805     return;
4806   }
4807
4808   cadet_handle = GNUNET_CADET_connect (cfg);
4809   GNUNET_assert (NULL != cadet_handle);
4810   core_handle = GNUNET_CORE_connect (cfg,
4811                                      NULL, /* cls */
4812                                      core_init, /* init */
4813                                      core_connects, /* connects */
4814                                      core_disconnects, /* disconnects */
4815                                      NULL); /* handlers */
4816   GNUNET_assert (NULL != core_handle);
4817
4818
4819   alpha = 0.45;
4820   beta  = 0.45;
4821
4822
4823   /* Set up main Sub */
4824   GNUNET_CRYPTO_hash (hash_port_string,
4825                       strlen (hash_port_string),
4826                       &hash);
4827   msub = new_sub (&hash,
4828                  sampler_size, /* Will be overwritten by config */
4829                  round_interval);
4830
4831
4832   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4833
4834   /* connect to NSE */
4835   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4836
4837   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4838   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4839   // TODO send push/pull to each of those peers?
4840   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4841   restore_valid_peers (msub);
4842   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4843
4844   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4845                                                    GNUNET_NO,
4846                                                    process_peerinfo_peers,
4847                                                    msub);
4848
4849   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4850
4851   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4852   stats = GNUNET_STATISTICS_create ("rps", cfg);
4853 }
4854
4855
4856 /**
4857  * Define "main" method using service macro.
4858  */
4859 GNUNET_SERVICE_MAIN
4860 ("rps",
4861  GNUNET_SERVICE_OPTION_NONE,
4862  &run,
4863  &client_connect_cb,
4864  &client_disconnect_cb,
4865  NULL,
4866  GNUNET_MQ_hd_var_size (client_seed,
4867    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4868    struct GNUNET_RPS_CS_SeedMessage,
4869    NULL),
4870 #if ENABLE_MALICIOUS
4871  GNUNET_MQ_hd_var_size (client_act_malicious,
4872    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4873    struct GNUNET_RPS_CS_ActMaliciousMessage,
4874    NULL),
4875 #endif /* ENABLE_MALICIOUS */
4876  GNUNET_MQ_hd_fixed_size (client_view_request,
4877    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4878    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4879    NULL),
4880  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4881    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4882    struct GNUNET_MessageHeader,
4883    NULL),
4884  GNUNET_MQ_hd_fixed_size (client_stream_request,
4885    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4886    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4887    NULL),
4888  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4889    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4890    struct GNUNET_MessageHeader,
4891    NULL),
4892  GNUNET_MQ_hd_fixed_size (client_start_sub,
4893    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4894    struct GNUNET_RPS_CS_SubStartMessage,
4895    NULL),
4896  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4897    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4898    struct GNUNET_RPS_CS_SubStopMessage,
4899    NULL),
4900  GNUNET_MQ_handler_end());
4901
4902 /* end of gnunet-service-rps.c */