RPS: NULL out cadet port after closing
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357   /**
358    * Name to log view to
359    */
360   char *file_name_view_log;
361
362 #ifdef TO_FILE
363   /**
364    * Name to log number of observed peers to
365    */
366   char *file_name_observed_log;
367
368   /**
369    * @brief Count the observed peers
370    */
371   uint32_t num_observed_peers;
372
373   /**
374    * @brief Multipeermap (ab-) used to count unique peer_ids
375    */
376   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
377 #endif /* TO_FILE */
378
379   /**
380    * List to store peers received through pushes temporary.
381    */
382   struct CustomPeerMap *push_map;
383
384   /**
385    * List to store peers received through pulls temporary.
386    */
387   struct CustomPeerMap *pull_map;
388
389   /**
390    * @brief This is the estimate used as view size.
391    *
392    * It is initialised with the minimum
393    */
394   unsigned int view_size_est_need;
395
396   /**
397    * @brief This is the minimum estimate used as view size.
398    *
399    * It is configured by the user.
400    */
401   unsigned int view_size_est_min;
402
403   /**
404    * @brief The view.
405    */
406   struct View *view;
407
408   /**
409    * Identifier for the main task that runs periodically.
410    */
411   struct GNUNET_SCHEDULER_Task *do_round_task;
412
413   /* === stats === */
414
415   /**
416    * @brief Counts the executed rounds.
417    */
418   uint32_t num_rounds;
419
420   /**
421    * @brief This array accumulates the number of received pushes per round.
422    *
423    * Number at index i represents the number of rounds with i observed pushes.
424    */
425   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
426
427   /**
428    * @brief Histogram of deltas between the expected and actual number of
429    * received pushes.
430    *
431    * As half of the entries are expected to be negative, this is shifted by
432    * #HISTOGRAM_FILE_SLOTS/2.
433    */
434   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
435
436   /**
437    * @brief Number of pull replies with this delay measured in rounds.
438    *
439    * Number at index i represents the number of pull replies with a delay of i
440    * rounds.
441    */
442   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
443 };
444
445
446 /***********************************************************************
447  * Globals
448 ***********************************************************************/
449
450 /**
451  * Our configuration.
452  */
453 static const struct GNUNET_CONFIGURATION_Handle *cfg;
454
455 /**
456  * Handle to the statistics service.
457  */
458 struct GNUNET_STATISTICS_Handle *stats;
459
460 /**
461  * Handler to CADET.
462  */
463 struct GNUNET_CADET_Handle *cadet_handle;
464
465 /**
466  * Handle to CORE
467  */
468 struct GNUNET_CORE_Handle *core_handle;
469
470 /**
471  * @brief PeerMap to keep track of connected peers.
472  */
473 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
474
475 /**
476  * Our own identity.
477  */
478 static struct GNUNET_PeerIdentity own_identity;
479
480 /**
481  * Percentage of total peer number in the view
482  * to send random PUSHes to
483  */
484 static float alpha;
485
486 /**
487  * Percentage of total peer number in the view
488  * to send random PULLs to
489  */
490 static float beta;
491
492 /**
493  * Handler to NSE.
494  */
495 static struct GNUNET_NSE_Handle *nse;
496
497 /**
498  * Handler to PEERINFO.
499  */
500 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
501
502 /**
503  * Handle for cancellation of iteration over peers.
504  */
505 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
506
507
508 #if ENABLE_MALICIOUS
509 /**
510  * Type of malicious peer
511  *
512  * 0 Don't act malicious at all - Default
513  * 1 Try to maximise representation
514  * 2 Try to partition the network
515  * 3 Combined attack
516  */
517 static uint32_t mal_type;
518
519 /**
520  * Other malicious peers
521  */
522 static struct GNUNET_PeerIdentity *mal_peers;
523
524 /**
525  * Hashmap of malicious peers used as set.
526  * Used to more efficiently check whether we know that peer.
527  */
528 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
529
530 /**
531  * Number of other malicious peers
532  */
533 static uint32_t num_mal_peers;
534
535
536 /**
537  * If type is 2 this is the DLL of attacked peers
538  */
539 static struct AttackedPeer *att_peers_head;
540 static struct AttackedPeer *att_peers_tail;
541
542 /**
543  * This index is used to point to an attacked peer to
544  * implement the round-robin-ish way to select attacked peers.
545  */
546 static struct AttackedPeer *att_peer_index;
547
548 /**
549  * Hashmap of attacked peers used as set.
550  * Used to more efficiently check whether we know that peer.
551  */
552 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
553
554 /**
555  * Number of attacked peers
556  */
557 static uint32_t num_attacked_peers;
558
559 /**
560  * If type is 1 this is the attacked peer
561  */
562 static struct GNUNET_PeerIdentity attacked_peer;
563
564 /**
565  * The limit of PUSHes we can send in one round.
566  * This is an assumption of the Brahms protocol and either implemented
567  * via proof of work
568  * or
569  * assumend to be the bandwidth limitation.
570  */
571 static uint32_t push_limit = 10000;
572 #endif /* ENABLE_MALICIOUS */
573
574 /**
575  * @brief Main Sub.
576  *
577  * This is run in any case by all peers and connects to all peers without
578  * specifying a shared value.
579  */
580 static struct Sub *msub;
581
582 /**
583  * @brief Maximum number of valid peers to keep.
584  * TODO read from config
585  */
586 static const uint32_t num_valid_peers_max = UINT32_MAX;
587
588 /***********************************************************************
589  * /Globals
590 ***********************************************************************/
591
592
593 static void
594 do_round (void *cls);
595
596 static void
597 do_mal_round (void *cls);
598
599
600 /**
601  * @brief Get the #PeerContext associated with a peer
602  *
603  * @param peer_map The peer map containing the context
604  * @param peer the peer id
605  *
606  * @return the #PeerContext
607  */
608 static struct PeerContext *
609 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
610               const struct GNUNET_PeerIdentity *peer)
611 {
612   struct PeerContext *ctx;
613   int ret;
614
615   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
616   GNUNET_assert (GNUNET_YES == ret);
617   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
618   GNUNET_assert (NULL != ctx);
619   return ctx;
620 }
621
622 /**
623  * @brief Check whether we have information about the given peer.
624  *
625  * FIXME probably deprecated. Make this the new _online.
626  *
627  * @param peer_map The peer map to check for the existence of @a peer
628  * @param peer peer in question
629  *
630  * @return #GNUNET_YES if peer is known
631  *         #GNUNET_NO  if peer is not knwon
632  */
633 static int
634 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
635                   const struct GNUNET_PeerIdentity *peer)
636 {
637   if (NULL != peer_map)
638   {
639     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
640   }
641   else
642   {
643     return GNUNET_NO;
644   }
645 }
646
647
648 /**
649  * @brief Create a new #PeerContext and insert it into the peer map
650  *
651  * @param sub The Sub this context belongs to.
652  * @param peer the peer to create the #PeerContext for
653  *
654  * @return the #PeerContext
655  */
656 static struct PeerContext *
657 create_peer_ctx (struct Sub *sub,
658                  const struct GNUNET_PeerIdentity *peer)
659 {
660   struct PeerContext *ctx;
661   int ret;
662
663   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
664
665   ctx = GNUNET_new (struct PeerContext);
666   ctx->peer_id = *peer;
667   ctx->sub = sub;
668   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
669       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
670   GNUNET_assert (GNUNET_OK == ret);
671   if (sub == msub)
672   {
673     GNUNET_STATISTICS_set (stats,
674                           "# known peers",
675                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
676                           GNUNET_NO);
677   }
678   return ctx;
679 }
680
681
682 /**
683  * @brief Create or get a #PeerContext
684  *
685  * @param sub The Sub to which the created context belongs to
686  * @param peer the peer to get the associated context to
687  *
688  * @return the context
689  */
690 static struct PeerContext *
691 create_or_get_peer_ctx (struct Sub *sub,
692                         const struct GNUNET_PeerIdentity *peer)
693 {
694   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
695   {
696     return create_peer_ctx (sub, peer);
697   }
698   return get_peer_ctx (sub->peer_map, peer);
699 }
700
701
702 /**
703  * @brief Check whether we have a connection to this @a peer
704  *
705  * Also sets the #Peers_ONLINE flag accordingly
706  *
707  * @param peer_ctx Context of the peer of which connectivity is to be checked
708  *
709  * @return #GNUNET_YES if we are connected
710  *         #GNUNET_NO  otherwise
711  */
712 static int
713 check_connected (struct PeerContext *peer_ctx)
714 {
715   /* If we don't know about this peer we don't know whether it's online */
716   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
717                                      &peer_ctx->peer_id))
718   {
719     return GNUNET_NO;
720   }
721   /* Get the context */
722   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
723   /* If we have no channel to this peer we don't know whether it's online */
724   if ( (NULL == peer_ctx->send_channel_ctx) &&
725        (NULL == peer_ctx->recv_channel_ctx) )
726   {
727     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
728     return GNUNET_NO;
729   }
730   /* Otherwise (if we have a channel, we know that it's online */
731   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732   return GNUNET_YES;
733 }
734
735
736 /**
737  * @brief The closure to #get_rand_peer_iterator.
738  */
739 struct GetRandPeerIteratorCls
740 {
741   /**
742    * @brief The index of the peer to return.
743    * Will be decreased until 0.
744    * Then current peer is returned.
745    */
746   uint32_t index;
747
748   /**
749    * @brief Pointer to peer to return.
750    */
751   const struct GNUNET_PeerIdentity *peer;
752 };
753
754
755 /**
756  * @brief Iterator function for #get_random_peer_from_peermap.
757  *
758  * Implements #GNUNET_CONTAINER_PeerMapIterator.
759  * Decreases the index until the index is null.
760  * Then returns the current peer.
761  *
762  * @param cls the #GetRandPeerIteratorCls containing index and peer
763  * @param peer current peer
764  * @param value unused
765  *
766  * @return  #GNUNET_YES if we should continue to
767  *          iterate,
768  *          #GNUNET_NO if not.
769  */
770 static int
771 get_rand_peer_iterator (void *cls,
772                         const struct GNUNET_PeerIdentity *peer,
773                         void *value)
774 {
775   struct GetRandPeerIteratorCls *iterator_cls = cls;
776   (void) value;
777
778   if (0 >= iterator_cls->index)
779   {
780     iterator_cls->peer = peer;
781     return GNUNET_NO;
782   }
783   iterator_cls->index--;
784   return GNUNET_YES;
785 }
786
787
788 /**
789  * @brief Get a random peer from @a peer_map
790  *
791  * @param valid_peers Peer map containing valid peers from which to select a
792  * random one
793  *
794  * @return a random peer
795  */
796 static const struct GNUNET_PeerIdentity *
797 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
798 {
799   struct GetRandPeerIteratorCls *iterator_cls;
800   const struct GNUNET_PeerIdentity *ret;
801
802   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
803   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
804       GNUNET_CONTAINER_multipeermap_size (valid_peers));
805   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
806                                                 get_rand_peer_iterator,
807                                                 iterator_cls);
808   ret = iterator_cls->peer;
809   GNUNET_free (iterator_cls);
810   return ret;
811 }
812
813
814 /**
815  * @brief Add a given @a peer to valid peers.
816  *
817  * If valid peers are already #num_valid_peers_max, delete a peer previously.
818  *
819  * @param peer The peer that is added to the valid peers.
820  * @param valid_peers Peer map of valid peers to which to add the @a peer
821  *
822  * @return #GNUNET_YES if no other peer had to be removed
823  *         #GNUNET_NO  otherwise
824  */
825 static int
826 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
827                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
828 {
829   const struct GNUNET_PeerIdentity *rand_peer;
830   int ret;
831
832   ret = GNUNET_YES;
833   /* Remove random peers until there is space for a new one */
834   while (num_valid_peers_max <=
835          GNUNET_CONTAINER_multipeermap_size (valid_peers))
836   {
837     rand_peer = get_random_peer_from_peermap (valid_peers);
838     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
839     ret = GNUNET_NO;
840   }
841   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
842       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
843   if (valid_peers == msub->valid_peers)
844   {
845     GNUNET_STATISTICS_set (stats,
846                            "# valid peers",
847                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
848                            GNUNET_NO);
849   }
850   return ret;
851 }
852
853 static void
854 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
855
856 /**
857  * @brief Set the peer flag to living and
858  *        call the pending operations on this peer.
859  *
860  * Also adds peer to #valid_peers.
861  *
862  * @param peer_ctx the #PeerContext of the peer to set online
863  */
864 static void
865 set_peer_online (struct PeerContext *peer_ctx)
866 {
867   struct GNUNET_PeerIdentity *peer;
868   unsigned int i;
869
870   peer = &peer_ctx->peer_id;
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872       "Peer %s is online and valid, calling %i pending operations on it\n",
873       GNUNET_i2s (peer),
874       peer_ctx->num_pending_ops);
875
876   if (NULL != peer_ctx->online_check_pending)
877   {
878     LOG (GNUNET_ERROR_TYPE_DEBUG,
879          "Removing pending online check for peer %s\n",
880          GNUNET_i2s (&peer_ctx->peer_id));
881     // TODO wait until cadet sets mq->cancel_impl
882     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
883     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
884     peer_ctx->online_check_pending = NULL;
885   }
886
887   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
888
889   /* Call pending operations */
890   for (i = 0; i < peer_ctx->num_pending_ops; i++)
891   {
892     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
893   }
894   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
895 }
896
897 static void
898 cleanup_destroyed_channel (void *cls,
899                            const struct GNUNET_CADET_Channel *channel);
900
901 /* Declaration of handlers */
902 static void
903 handle_peer_check (void *cls,
904                    const struct GNUNET_MessageHeader *msg);
905
906 static void
907 handle_peer_push (void *cls,
908                   const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_pull_request (void *cls,
912                           const struct GNUNET_MessageHeader *msg);
913
914 static int
915 check_peer_pull_reply (void *cls,
916                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
917
918 static void
919 handle_peer_pull_reply (void *cls,
920                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 /* End declaration of handlers */
923
924 /**
925  * @brief Allocate memory for a new channel context and insert it into DLL
926  *
927  * @param peer_ctx context of the according peer
928  *
929  * @return The channel context
930  */
931 static struct ChannelCtx *
932 add_channel_ctx (struct PeerContext *peer_ctx)
933 {
934   struct ChannelCtx *channel_ctx;
935   channel_ctx = GNUNET_new (struct ChannelCtx);
936   channel_ctx->peer_ctx = peer_ctx;
937   return channel_ctx;
938 }
939
940
941 /**
942  * @brief Free memory and NULL pointers.
943  *
944  * @param channel_ctx The channel context.
945  */
946 static void
947 remove_channel_ctx (struct ChannelCtx *channel_ctx)
948 {
949   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
950
951   if (NULL != channel_ctx->destruction_task)
952   {
953     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
954     channel_ctx->destruction_task = NULL;
955   }
956
957   GNUNET_free (channel_ctx);
958
959   if (NULL == peer_ctx) return;
960   if (channel_ctx == peer_ctx->send_channel_ctx)
961   {
962     peer_ctx->send_channel_ctx = NULL;
963     peer_ctx->mq = NULL;
964   }
965   else if (channel_ctx == peer_ctx->recv_channel_ctx)
966   {
967     peer_ctx->recv_channel_ctx = NULL;
968   }
969 }
970
971
972 /**
973  * @brief Get the channel of a peer. If not existing, create.
974  *
975  * @param peer_ctx Context of the peer of which to get the channel
976  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
977  */
978 struct GNUNET_CADET_Channel *
979 get_channel (struct PeerContext *peer_ctx)
980 {
981   /* There exists a copy-paste-clone in run() */
982   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
983     GNUNET_MQ_hd_fixed_size (peer_check,
984                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
985                              struct GNUNET_MessageHeader,
986                              NULL),
987     GNUNET_MQ_hd_fixed_size (peer_push,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_pull_request,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_var_size (peer_pull_reply,
996                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
997                            struct GNUNET_RPS_P2P_PullReplyMessage,
998                            NULL),
999     GNUNET_MQ_handler_end ()
1000   };
1001
1002
1003   if (NULL == peer_ctx->send_channel_ctx)
1004   {
1005     LOG (GNUNET_ERROR_TYPE_DEBUG,
1006          "Trying to establish channel to peer %s\n",
1007          GNUNET_i2s (&peer_ctx->peer_id));
1008     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1009     peer_ctx->send_channel_ctx->channel =
1010       GNUNET_CADET_channel_create (cadet_handle,
1011                                    peer_ctx->send_channel_ctx, /* context */
1012                                    &peer_ctx->peer_id,
1013                                    &peer_ctx->sub->hash,
1014                                    GNUNET_CADET_OPTION_RELIABLE,
1015                                    NULL, /* WindowSize handler */
1016                                    &cleanup_destroyed_channel, /* Disconnect handler */
1017                                    cadet_handlers);
1018   }
1019   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1020   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1021   return peer_ctx->send_channel_ctx->channel;
1022 }
1023
1024
1025 /**
1026  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1027  *
1028  * If we already have a message queue open to this client,
1029  * simply return it, otherways create one.
1030  *
1031  * @param peer_ctx Context of the peer of whicht to get the mq
1032  * @return the #GNUNET_MQ_Handle
1033  */
1034 static struct GNUNET_MQ_Handle *
1035 get_mq (struct PeerContext *peer_ctx)
1036 {
1037   if (NULL == peer_ctx->mq)
1038   {
1039     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1040   }
1041   return peer_ctx->mq;
1042 }
1043
1044 /**
1045  * @brief Add an envelope to a message passed to mq to list of pending messages
1046  *
1047  * @param peer_ctx Context of the peer for which to insert the envelope
1048  * @param ev envelope to the message
1049  * @param type type of the message to be sent
1050  * @return pointer to pending message
1051  */
1052 static struct PendingMessage *
1053 insert_pending_message (struct PeerContext *peer_ctx,
1054                         struct GNUNET_MQ_Envelope *ev,
1055                         const char *type)
1056 {
1057   struct PendingMessage *pending_msg;
1058
1059   pending_msg = GNUNET_new (struct PendingMessage);
1060   pending_msg->ev = ev;
1061   pending_msg->peer_ctx = peer_ctx;
1062   pending_msg->type = type;
1063   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1064                                peer_ctx->pending_messages_tail,
1065                                pending_msg);
1066   return pending_msg;
1067 }
1068
1069
1070 /**
1071  * @brief Remove a pending message from the respective DLL
1072  *
1073  * @param pending_msg the pending message to remove
1074  * @param cancel whether to cancel the pending message, too
1075  */
1076 static void
1077 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1078 {
1079   struct PeerContext *peer_ctx;
1080   (void) cancel;
1081
1082   peer_ctx = pending_msg->peer_ctx;
1083   GNUNET_assert (NULL != peer_ctx);
1084   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1085                                peer_ctx->pending_messages_tail,
1086                                pending_msg);
1087   // TODO wait for the cadet implementation of message cancellation
1088   //if (GNUNET_YES == cancel)
1089   //{
1090   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1091   //}
1092   GNUNET_free (pending_msg);
1093 }
1094
1095
1096 /**
1097  * @brief This is called in response to the first message we sent as a
1098  * online check.
1099  *
1100  * @param cls #PeerContext of peer with pending online check
1101  */
1102 static void
1103 mq_online_check_successful (void *cls)
1104 {
1105   struct PeerContext *peer_ctx = cls;
1106
1107   if (NULL != peer_ctx->online_check_pending)
1108   {
1109     LOG (GNUNET_ERROR_TYPE_DEBUG,
1110         "Online check for peer %s was successfull\n",
1111         GNUNET_i2s (&peer_ctx->peer_id));
1112     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1113     peer_ctx->online_check_pending = NULL;
1114     set_peer_online (peer_ctx);
1115     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1116   }
1117 }
1118
1119 /**
1120  * Issue a check whether peer is online
1121  *
1122  * @param peer_ctx the context of the peer
1123  */
1124 static void
1125 check_peer_online (struct PeerContext *peer_ctx)
1126 {
1127   LOG (GNUNET_ERROR_TYPE_DEBUG,
1128        "Get informed about peer %s getting online\n",
1129        GNUNET_i2s (&peer_ctx->peer_id));
1130
1131   struct GNUNET_MQ_Handle *mq;
1132   struct GNUNET_MQ_Envelope *ev;
1133
1134   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1135   peer_ctx->online_check_pending =
1136     insert_pending_message (peer_ctx, ev, "Check online");
1137   mq = get_mq (peer_ctx);
1138   GNUNET_MQ_notify_sent (ev,
1139                          mq_online_check_successful,
1140                          peer_ctx);
1141   GNUNET_MQ_send (mq, ev);
1142   if (peer_ctx->sub == msub)
1143   {
1144     GNUNET_STATISTICS_update (stats,
1145                               "# pending online checks",
1146                               1,
1147                               GNUNET_NO);
1148   }
1149 }
1150
1151
1152 /**
1153  * @brief Check whether function of type #PeerOp was already scheduled
1154  *
1155  * The array with pending operations will probably never grow really big, so
1156  * iterating over it should be ok.
1157  *
1158  * @param peer_ctx Context of the peer to check for the operation
1159  * @param peer_op the operation (#PeerOp) on the peer
1160  *
1161  * @return #GNUNET_YES if this operation is scheduled on that peer
1162  *         #GNUNET_NO  otherwise
1163  */
1164 static int
1165 check_operation_scheduled (const struct PeerContext *peer_ctx,
1166                            const PeerOp peer_op)
1167 {
1168   unsigned int i;
1169
1170   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1171     if (peer_op == peer_ctx->pending_ops[i].op)
1172       return GNUNET_YES;
1173   return GNUNET_NO;
1174 }
1175
1176
1177 /**
1178  * @brief Callback for scheduler to destroy a channel
1179  *
1180  * @param cls Context of the channel
1181  */
1182 static void
1183 destroy_channel (struct ChannelCtx *channel_ctx)
1184 {
1185   struct GNUNET_CADET_Channel *channel;
1186
1187   if (NULL != channel_ctx->destruction_task)
1188   {
1189     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1190     channel_ctx->destruction_task = NULL;
1191   }
1192   GNUNET_assert (channel_ctx->channel != NULL);
1193   channel = channel_ctx->channel;
1194   channel_ctx->channel = NULL;
1195   GNUNET_CADET_channel_destroy (channel);
1196   remove_channel_ctx (channel_ctx);
1197 }
1198
1199
1200 /**
1201  * @brief Destroy a cadet channel.
1202  *
1203  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1204  *
1205  * @param cls
1206  */
1207 static void
1208 destroy_channel_cb (void *cls)
1209 {
1210   struct ChannelCtx *channel_ctx = cls;
1211
1212   channel_ctx->destruction_task = NULL;
1213   destroy_channel (channel_ctx);
1214 }
1215
1216
1217 /**
1218  * @brief Schedule the destruction of a channel for immediately afterwards.
1219  *
1220  * In case a channel is to be destroyed from within the callback to the
1221  * destruction of another channel (send channel), we cannot call
1222  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1223  * construction.
1224  *
1225  * @param channel_ctx channel to be destroyed.
1226  */
1227 static void
1228 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1229 {
1230   GNUNET_assert (NULL ==
1231                  channel_ctx->destruction_task);
1232   GNUNET_assert (NULL !=
1233                  channel_ctx->channel);
1234   channel_ctx->destruction_task =
1235     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1236                               channel_ctx);
1237 }
1238
1239
1240 /**
1241  * @brief Remove peer
1242  *
1243  * - Empties the list with pending operations
1244  * - Empties the list with pending messages
1245  * - Cancels potentially existing online check
1246  * - Schedules closing of send and recv channels
1247  * - Removes peer from peer map
1248  *
1249  * @param peer_ctx Context of the peer to be destroyed
1250  * @return #GNUNET_YES if peer was removed
1251  *         #GNUNET_NO  otherwise
1252  */
1253 static int
1254 destroy_peer (struct PeerContext *peer_ctx)
1255 {
1256   GNUNET_assert (NULL != peer_ctx);
1257   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1258   if (GNUNET_NO ==
1259       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1260                                               &peer_ctx->peer_id))
1261   {
1262     return GNUNET_NO;
1263   }
1264   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1265   LOG (GNUNET_ERROR_TYPE_DEBUG,
1266        "Going to remove peer %s\n",
1267        GNUNET_i2s (&peer_ctx->peer_id));
1268   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1269
1270   /* Clear list of pending operations */
1271   // TODO this probably leaks memory
1272   //      ('only' the cls to the function. Not sure what to do with it)
1273   GNUNET_array_grow (peer_ctx->pending_ops,
1274                      peer_ctx->num_pending_ops,
1275                      0);
1276   /* Remove all pending messages */
1277   while (NULL != peer_ctx->pending_messages_head)
1278   {
1279     LOG (GNUNET_ERROR_TYPE_DEBUG,
1280          "Removing unsent %s\n",
1281          peer_ctx->pending_messages_head->type);
1282     /* Cancle pending message, too */
1283     if ( (NULL != peer_ctx->online_check_pending) &&
1284          (0 == memcmp (peer_ctx->pending_messages_head,
1285                      peer_ctx->online_check_pending,
1286                      sizeof (struct PendingMessage))) )
1287       {
1288         peer_ctx->online_check_pending = NULL;
1289         if (peer_ctx->sub == msub)
1290         {
1291           GNUNET_STATISTICS_update (stats,
1292                                     "# pending online checks",
1293                                     -1,
1294                                     GNUNET_NO);
1295         }
1296       }
1297     remove_pending_message (peer_ctx->pending_messages_head,
1298                             GNUNET_YES);
1299   }
1300
1301   /* If we are still waiting for notification whether this peer is online
1302    * cancel the according task */
1303   if (NULL != peer_ctx->online_check_pending)
1304   {
1305     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306                 "Removing pending online check for peer %s\n",
1307                 GNUNET_i2s (&peer_ctx->peer_id));
1308     // TODO wait until cadet sets mq->cancel_impl
1309     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1310     remove_pending_message (peer_ctx->online_check_pending,
1311                             GNUNET_YES);
1312     peer_ctx->online_check_pending = NULL;
1313   }
1314
1315   if (NULL != peer_ctx->send_channel_ctx)
1316   {
1317     /* This is possibly called from within channel destruction */
1318     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1319     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1320     peer_ctx->send_channel_ctx = NULL;
1321     peer_ctx->mq = NULL;
1322   }
1323   if (NULL != peer_ctx->recv_channel_ctx)
1324   {
1325     /* This is possibly called from within channel destruction */
1326     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1327     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1328     peer_ctx->recv_channel_ctx = NULL;
1329   }
1330
1331   if (GNUNET_YES !=
1332       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1333                                                 &peer_ctx->peer_id))
1334   {
1335     LOG (GNUNET_ERROR_TYPE_WARNING,
1336          "removing peer from peer_ctx->sub->peer_map failed\n");
1337   }
1338   if (peer_ctx->sub == msub)
1339   {
1340     GNUNET_STATISTICS_set (stats,
1341                           "# known peers",
1342                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1343                           GNUNET_NO);
1344   }
1345   GNUNET_free (peer_ctx);
1346   return GNUNET_YES;
1347 }
1348
1349
1350 /**
1351  * Iterator over hash map entries. Deletes all contexts of peers.
1352  *
1353  * @param cls closure
1354  * @param key current public key
1355  * @param value value in the hash map
1356  * @return #GNUNET_YES if we should continue to iterate,
1357  *         #GNUNET_NO if not.
1358  */
1359 static int
1360 peermap_clear_iterator (void *cls,
1361                         const struct GNUNET_PeerIdentity *key,
1362                         void *value)
1363 {
1364   struct Sub *sub = cls;
1365   (void) value;
1366
1367   destroy_peer (get_peer_ctx (sub->peer_map, key));
1368   return GNUNET_YES;
1369 }
1370
1371
1372 /**
1373  * @brief This is called once a message is sent.
1374  *
1375  * Removes the pending message
1376  *
1377  * @param cls type of the message that was sent
1378  */
1379 static void
1380 mq_notify_sent_cb (void *cls)
1381 {
1382   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1383   LOG (GNUNET_ERROR_TYPE_DEBUG,
1384       "%s was sent.\n",
1385       pending_msg->type);
1386   if (pending_msg->peer_ctx->sub == msub)
1387   {
1388     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1389       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1390     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1391       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1392     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1393       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1395                       NULL != map_single_hop &&
1396         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1397           &pending_msg->peer_ctx->peer_id))
1398       GNUNET_STATISTICS_update(stats,
1399                                "# pull requests sent (multi-hop peer)",
1400                                1,
1401                                GNUNET_NO);
1402   }
1403   /* Do not cancle message */
1404   remove_pending_message (pending_msg, GNUNET_NO);
1405 }
1406
1407
1408 /**
1409  * @brief Iterator function for #store_valid_peers.
1410  *
1411  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1412  * Writes single peer to disk.
1413  *
1414  * @param cls the file handle to write to.
1415  * @param peer current peer
1416  * @param value unused
1417  *
1418  * @return  #GNUNET_YES if we should continue to
1419  *          iterate,
1420  *          #GNUNET_NO if not.
1421  */
1422 static int
1423 store_peer_presistently_iterator (void *cls,
1424                                   const struct GNUNET_PeerIdentity *peer,
1425                                   void *value)
1426 {
1427   const struct GNUNET_DISK_FileHandle *fh = cls;
1428   char peer_string[128];
1429   int size;
1430   ssize_t ret;
1431   (void) value;
1432
1433   if (NULL == peer)
1434   {
1435     return GNUNET_YES;
1436   }
1437   size = GNUNET_snprintf (peer_string,
1438                           sizeof (peer_string),
1439                           "%s\n",
1440                           GNUNET_i2s_full (peer));
1441   GNUNET_assert (53 == size);
1442   ret = GNUNET_DISK_file_write (fh,
1443                                 peer_string,
1444                                 size);
1445   GNUNET_assert (size == ret);
1446   return GNUNET_YES;
1447 }
1448
1449
1450 /**
1451  * @brief Store the peers currently in #valid_peers to disk.
1452  *
1453  * @param sub Sub for which to store the valid peers
1454  */
1455 static void
1456 store_valid_peers (const struct Sub *sub)
1457 {
1458   struct GNUNET_DISK_FileHandle *fh;
1459   uint32_t number_written_peers;
1460   int ret;
1461
1462   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1463   {
1464     return;
1465   }
1466
1467   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1468   if (GNUNET_SYSERR == ret)
1469   {
1470     LOG (GNUNET_ERROR_TYPE_WARNING,
1471         "Not able to create directory for file `%s'\n",
1472         sub->filename_valid_peers);
1473     GNUNET_break (0);
1474   }
1475   else if (GNUNET_NO == ret)
1476   {
1477     LOG (GNUNET_ERROR_TYPE_WARNING,
1478         "Directory for file `%s' exists but is not writable for us\n",
1479         sub->filename_valid_peers);
1480     GNUNET_break (0);
1481   }
1482   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1483                               GNUNET_DISK_OPEN_WRITE |
1484                                   GNUNET_DISK_OPEN_CREATE,
1485                               GNUNET_DISK_PERM_USER_READ |
1486                                   GNUNET_DISK_PERM_USER_WRITE);
1487   if (NULL == fh)
1488   {
1489     LOG (GNUNET_ERROR_TYPE_WARNING,
1490         "Not able to write valid peers to file `%s'\n",
1491         sub->filename_valid_peers);
1492     return;
1493   }
1494   LOG (GNUNET_ERROR_TYPE_DEBUG,
1495       "Writing %u valid peers to disk\n",
1496       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1497   number_written_peers =
1498     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1499                                            store_peer_presistently_iterator,
1500                                            fh);
1501   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1502   GNUNET_assert (number_written_peers ==
1503       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1504 }
1505
1506
1507 /**
1508  * @brief Convert string representation of peer id to peer id.
1509  *
1510  * Counterpart to #GNUNET_i2s_full.
1511  *
1512  * @param string_repr The string representation of the peer id
1513  *
1514  * @return The peer id
1515  */
1516 static const struct GNUNET_PeerIdentity *
1517 s2i_full (const char *string_repr)
1518 {
1519   struct GNUNET_PeerIdentity *peer;
1520   size_t len;
1521   int ret;
1522
1523   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1524   len = strlen (string_repr);
1525   if (52 > len)
1526   {
1527     LOG (GNUNET_ERROR_TYPE_WARNING,
1528         "Not able to convert string representation of PeerID to PeerID\n"
1529         "Sting representation: %s (len %lu) - too short\n",
1530         string_repr,
1531         len);
1532     GNUNET_break (0);
1533   }
1534   else if (52 < len)
1535   {
1536     len = 52;
1537   }
1538   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1539                                                     len,
1540                                                     &peer->public_key);
1541   if (GNUNET_OK != ret)
1542   {
1543     LOG (GNUNET_ERROR_TYPE_WARNING,
1544         "Not able to convert string representation of PeerID to PeerID\n"
1545         "Sting representation: %s\n",
1546         string_repr);
1547     GNUNET_break (0);
1548   }
1549   return peer;
1550 }
1551
1552
1553 /**
1554  * @brief Restore the peers on disk to #valid_peers.
1555  *
1556  * @param sub Sub for which to restore the valid peers
1557  */
1558 static void
1559 restore_valid_peers (const struct Sub *sub)
1560 {
1561   off_t file_size;
1562   uint32_t num_peers;
1563   struct GNUNET_DISK_FileHandle *fh;
1564   char *buf;
1565   ssize_t size_read;
1566   char *iter_buf;
1567   char *str_repr;
1568   const struct GNUNET_PeerIdentity *peer;
1569
1570   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1571   {
1572     return;
1573   }
1574
1575   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1576   {
1577     return;
1578   }
1579   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1580                               GNUNET_DISK_OPEN_READ,
1581                               GNUNET_DISK_PERM_NONE);
1582   GNUNET_assert (NULL != fh);
1583   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1584   num_peers = file_size / 53;
1585   buf = GNUNET_malloc (file_size);
1586   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1587   GNUNET_assert (size_read == file_size);
1588   LOG (GNUNET_ERROR_TYPE_DEBUG,
1589       "Restoring %" PRIu32 " peers from file `%s'\n",
1590       num_peers,
1591       sub->filename_valid_peers);
1592   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1593   {
1594     str_repr = GNUNET_strndup (iter_buf, 53);
1595     peer = s2i_full (str_repr);
1596     GNUNET_free (str_repr);
1597     add_valid_peer (peer, sub->valid_peers);
1598     LOG (GNUNET_ERROR_TYPE_DEBUG,
1599         "Restored valid peer %s from disk\n",
1600         GNUNET_i2s_full (peer));
1601   }
1602   iter_buf = NULL;
1603   GNUNET_free (buf);
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1606       num_peers,
1607       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1608   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1609   {
1610     LOG (GNUNET_ERROR_TYPE_WARNING,
1611         "Number of restored peers does not match file size. Have probably duplicates.\n");
1612   }
1613   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1614   LOG (GNUNET_ERROR_TYPE_DEBUG,
1615       "Restored %u valid peers from disk\n",
1616       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1617 }
1618
1619
1620 /**
1621  * @brief Delete storage of peers that was created with #initialise_peers ()
1622  *
1623  * @param sub Sub for which the storage is deleted
1624  */
1625 static void
1626 peers_terminate (struct Sub *sub)
1627 {
1628   if (GNUNET_SYSERR ==
1629       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1630                                              &peermap_clear_iterator,
1631                                              sub))
1632   {
1633     LOG (GNUNET_ERROR_TYPE_WARNING,
1634         "Iteration destroying peers was aborted.\n");
1635   }
1636   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1637   sub->peer_map = NULL;
1638   store_valid_peers (sub);
1639   GNUNET_free (sub->filename_valid_peers);
1640   sub->filename_valid_peers = NULL;
1641   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1642   sub->valid_peers = NULL;
1643 }
1644
1645
1646 /**
1647  * Iterator over #valid_peers hash map entries.
1648  *
1649  * @param cls Closure that contains iterator function and closure
1650  * @param peer current peer id
1651  * @param value value in the hash map - unused
1652  * @return #GNUNET_YES if we should continue to
1653  *         iterate,
1654  *         #GNUNET_NO if not.
1655  */
1656 static int
1657 valid_peer_iterator (void *cls,
1658                      const struct GNUNET_PeerIdentity *peer,
1659                      void *value)
1660 {
1661   struct PeersIteratorCls *it_cls = cls;
1662   (void) value;
1663
1664   return it_cls->iterator (it_cls->cls, peer);
1665 }
1666
1667
1668 /**
1669  * @brief Get all currently known, valid peer ids.
1670  *
1671  * @param valid_peers Peer map containing the valid peers in question
1672  * @param iterator function to call on each peer id
1673  * @param it_cls extra argument to @a iterator
1674  * @return the number of key value pairs processed,
1675  *         #GNUNET_SYSERR if it aborted iteration
1676  */
1677 static int
1678 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1679                  PeersIterator iterator,
1680                  void *it_cls)
1681 {
1682   struct PeersIteratorCls *cls;
1683   int ret;
1684
1685   cls = GNUNET_new (struct PeersIteratorCls);
1686   cls->iterator = iterator;
1687   cls->cls = it_cls;
1688   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1689                                                valid_peer_iterator,
1690                                                cls);
1691   GNUNET_free (cls);
1692   return ret;
1693 }
1694
1695
1696 /**
1697  * @brief Add peer to known peers.
1698  *
1699  * This function is called on new peer_ids from 'external' sources
1700  * (client seed, cadet get_peers(), ...)
1701  *
1702  * @param sub Sub with the peer map that the @a peer will be added to
1703  * @param peer the new #GNUNET_PeerIdentity
1704  *
1705  * @return #GNUNET_YES if peer was inserted
1706  *         #GNUNET_NO  otherwise
1707  */
1708 static int
1709 insert_peer (struct Sub *sub,
1710              const struct GNUNET_PeerIdentity *peer)
1711 {
1712   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1713   {
1714     return GNUNET_NO; /* We already know this peer - nothing to do */
1715   }
1716   (void) create_peer_ctx (sub, peer);
1717   return GNUNET_YES;
1718 }
1719
1720
1721 /**
1722  * @brief Check whether flags on a peer are set.
1723  *
1724  * @param peer_map Peer map that is expected to contain the @a peer
1725  * @param peer the peer to check the flag of
1726  * @param flags the flags to check
1727  *
1728  * @return #GNUNET_SYSERR if peer is not known
1729  *         #GNUNET_YES    if all given flags are set
1730  *         #GNUNET_NO     otherwise
1731  */
1732 static int
1733 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1734                  const struct GNUNET_PeerIdentity *peer,
1735                  enum Peers_PeerFlags flags)
1736 {
1737   struct PeerContext *peer_ctx;
1738
1739   if (GNUNET_NO == check_peer_known (peer_map, peer))
1740   {
1741     return GNUNET_SYSERR;
1742   }
1743   peer_ctx = get_peer_ctx (peer_map, peer);
1744   return check_peer_flag_set (peer_ctx, flags);
1745 }
1746
1747 /**
1748  * @brief Try connecting to a peer to see whether it is online
1749  *
1750  * If not known yet, insert into known peers
1751  *
1752  * @param sub Sub which would contain the @a peer
1753  * @param peer the peer whose online is to be checked
1754  * @return #GNUNET_YES if the check was issued
1755  *         #GNUNET_NO  otherwise
1756  */
1757 static int
1758 issue_peer_online_check (struct Sub *sub,
1759                          const struct GNUNET_PeerIdentity *peer)
1760 {
1761   struct PeerContext *peer_ctx;
1762
1763   (void) insert_peer (sub, peer); // TODO even needed?
1764   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1765   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1766        (NULL == peer_ctx->online_check_pending) )
1767   {
1768     check_peer_online (peer_ctx);
1769     return GNUNET_YES;
1770   }
1771   return GNUNET_NO;
1772 }
1773
1774
1775 /**
1776  * @brief Check if peer is removable.
1777  *
1778  * Check if
1779  *  - a recv channel exists
1780  *  - there are pending messages
1781  *  - there is no pending pull reply
1782  *
1783  * @param peer_ctx Context of the peer in question
1784  * @return #GNUNET_YES    if peer is removable
1785  *         #GNUNET_NO     if peer is NOT removable
1786  *         #GNUNET_SYSERR if peer is not known
1787  */
1788 static int
1789 check_removable (const struct PeerContext *peer_ctx)
1790 {
1791   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1792                                                            &peer_ctx->peer_id))
1793   {
1794     return GNUNET_SYSERR;
1795   }
1796
1797   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1798        (NULL != peer_ctx->pending_messages_head) ||
1799        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1800   {
1801     return GNUNET_NO;
1802   }
1803   return GNUNET_YES;
1804 }
1805
1806
1807 /**
1808  * @brief Check whether @a peer is actually a peer.
1809  *
1810  * A valid peer is a peer that we know exists eg. we were connected to once.
1811  *
1812  * @param valid_peers Peer map that would contain the @a peer
1813  * @param peer peer in question
1814  *
1815  * @return #GNUNET_YES if peer is valid
1816  *         #GNUNET_NO  if peer is not valid
1817  */
1818 static int
1819 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1820                   const struct GNUNET_PeerIdentity *peer)
1821 {
1822   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1823 }
1824
1825
1826 /**
1827  * @brief Indicate that we want to send to the other peer
1828  *
1829  * This establishes a sending channel
1830  *
1831  * @param peer_ctx Context of the target peer
1832  */
1833 static void
1834 indicate_sending_intention (struct PeerContext *peer_ctx)
1835 {
1836   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1837                                                  &peer_ctx->peer_id));
1838   (void) get_channel (peer_ctx);
1839 }
1840
1841
1842 /**
1843  * @brief Check whether other peer has the intention to send/opened channel
1844  *        towars us
1845  *
1846  * @param peer_ctx Context of the peer in question
1847  *
1848  * @return #GNUNET_YES if peer has the intention to send
1849  *         #GNUNET_NO  otherwise
1850  */
1851 static int
1852 check_peer_send_intention (const struct PeerContext *peer_ctx)
1853 {
1854   if (NULL != peer_ctx->recv_channel_ctx)
1855   {
1856     return GNUNET_YES;
1857   }
1858   return GNUNET_NO;
1859 }
1860
1861
1862 /**
1863  * Handle the channel a peer opens to us.
1864  *
1865  * @param cls The closure - Sub
1866  * @param channel The channel the peer wants to establish
1867  * @param initiator The peer's peer ID
1868  *
1869  * @return initial channel context for the channel
1870  *         (can be NULL -- that's not an error)
1871  */
1872 static void *
1873 handle_inbound_channel (void *cls,
1874                         struct GNUNET_CADET_Channel *channel,
1875                         const struct GNUNET_PeerIdentity *initiator)
1876 {
1877   struct PeerContext *peer_ctx;
1878   struct ChannelCtx *channel_ctx;
1879   struct Sub *sub = cls;
1880
1881   LOG (GNUNET_ERROR_TYPE_DEBUG,
1882       "New channel was established to us (Peer %s).\n",
1883       GNUNET_i2s (initiator));
1884   GNUNET_assert (NULL != channel); /* according to cadet API */
1885   /* Make sure we 'know' about this peer */
1886   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1887   set_peer_online (peer_ctx);
1888   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1889   channel_ctx = add_channel_ctx (peer_ctx);
1890   channel_ctx->channel = channel;
1891   /* We only accept one incoming channel per peer */
1892   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1893                                                              initiator)))
1894   {
1895     LOG (GNUNET_ERROR_TYPE_WARNING,
1896         "Already got one receive channel. Destroying old one.\n");
1897     GNUNET_break_op (0);
1898     destroy_channel (peer_ctx->recv_channel_ctx);
1899     peer_ctx->recv_channel_ctx = channel_ctx;
1900     /* return the channel context */
1901     return channel_ctx;
1902   }
1903   peer_ctx->recv_channel_ctx = channel_ctx;
1904   return channel_ctx;
1905 }
1906
1907
1908 /**
1909  * @brief Check whether a sending channel towards the given peer exists
1910  *
1911  * @param peer_ctx Context of the peer in question
1912  *
1913  * @return #GNUNET_YES if a sending channel towards that peer exists
1914  *         #GNUNET_NO  otherwise
1915  */
1916 static int
1917 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1918 {
1919   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1920                                      &peer_ctx->peer_id))
1921   { /* If no such peer exists, there is no channel */
1922     return GNUNET_NO;
1923   }
1924   if (NULL == peer_ctx->send_channel_ctx)
1925   {
1926     return GNUNET_NO;
1927   }
1928   return GNUNET_YES;
1929 }
1930
1931
1932 /**
1933  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1934  *        intention to another peer
1935  *
1936  * @param peer_ctx Context to the peer
1937  * @return #GNUNET_YES if channel was destroyed
1938  *         #GNUNET_NO  otherwise
1939  */
1940 static int
1941 destroy_sending_channel (struct PeerContext *peer_ctx)
1942 {
1943   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1944                                      &peer_ctx->peer_id))
1945   {
1946     return GNUNET_NO;
1947   }
1948   if (NULL != peer_ctx->send_channel_ctx)
1949   {
1950     destroy_channel (peer_ctx->send_channel_ctx);
1951     (void) check_connected (peer_ctx);
1952     return GNUNET_YES;
1953   }
1954   return GNUNET_NO;
1955 }
1956
1957 /**
1958  * @brief Send a message to another peer.
1959  *
1960  * Keeps track about pending messages so they can be properly removed when the
1961  * peer is destroyed.
1962  *
1963  * @param peer_ctx Context of the peer to which the message is to be sent
1964  * @param ev envelope of the message
1965  * @param type type of the message
1966  */
1967 static void
1968 send_message (struct PeerContext *peer_ctx,
1969               struct GNUNET_MQ_Envelope *ev,
1970               const char *type)
1971 {
1972   struct PendingMessage *pending_msg;
1973   struct GNUNET_MQ_Handle *mq;
1974
1975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976               "Sending message to %s of type %s\n",
1977               GNUNET_i2s (&peer_ctx->peer_id),
1978               type);
1979   pending_msg = insert_pending_message (peer_ctx, ev, type);
1980   mq = get_mq (peer_ctx);
1981   GNUNET_MQ_notify_sent (ev,
1982                          mq_notify_sent_cb,
1983                          pending_msg);
1984   GNUNET_MQ_send (mq, ev);
1985 }
1986
1987 /**
1988  * @brief Schedule a operation on given peer
1989  *
1990  * Avoids scheduling an operation twice.
1991  *
1992  * @param peer_ctx Context of the peer for which to schedule the operation
1993  * @param peer_op the operation to schedule
1994  * @param cls Closure to @a peer_op
1995  *
1996  * @return #GNUNET_YES if the operation was scheduled
1997  *         #GNUNET_NO  otherwise
1998  */
1999 static int
2000 schedule_operation (struct PeerContext *peer_ctx,
2001                     const PeerOp peer_op,
2002                     void *cls)
2003 {
2004   struct PeerPendingOp pending_op;
2005
2006   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2007                                                  &peer_ctx->peer_id));
2008
2009   //TODO if ONLINE execute immediately
2010
2011   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2012   {
2013     pending_op.op = peer_op;
2014     pending_op.op_cls = cls;
2015     GNUNET_array_append (peer_ctx->pending_ops,
2016                          peer_ctx->num_pending_ops,
2017                          pending_op);
2018     return GNUNET_YES;
2019   }
2020   return GNUNET_NO;
2021 }
2022
2023 /***********************************************************************
2024  * /Old gnunet-service-rps_peers.c
2025 ***********************************************************************/
2026
2027
2028 /***********************************************************************
2029  * Housekeeping with clients
2030 ***********************************************************************/
2031
2032 /**
2033  * Closure used to pass the client and the id to the callback
2034  * that replies to a client's request
2035  */
2036 struct ReplyCls
2037 {
2038   /**
2039    * DLL
2040    */
2041   struct ReplyCls *next;
2042   struct ReplyCls *prev;
2043
2044   /**
2045    * The identifier of the request
2046    */
2047   uint32_t id;
2048
2049   /**
2050    * The handle to the request
2051    */
2052   struct RPS_SamplerRequestHandle *req_handle;
2053
2054   /**
2055    * The client handle to send the reply to
2056    */
2057   struct ClientContext *cli_ctx;
2058 };
2059
2060
2061 /**
2062  * Struct used to store the context of a connected client.
2063  */
2064 struct ClientContext
2065 {
2066   /**
2067    * DLL
2068    */
2069   struct ClientContext *next;
2070   struct ClientContext *prev;
2071
2072   /**
2073    * The message queue to communicate with the client.
2074    */
2075   struct GNUNET_MQ_Handle *mq;
2076
2077   /**
2078    * @brief How many updates this client expects to receive.
2079    */
2080   int64_t view_updates_left;
2081
2082   /**
2083    * @brief Whether this client wants to receive stream updates.
2084    * Either #GNUNET_YES or #GNUNET_NO
2085    */
2086   int8_t stream_update;
2087
2088   /**
2089    * The client handle to send the reply to
2090    */
2091   struct GNUNET_SERVICE_Client *client;
2092
2093   /**
2094    * The #Sub this context belongs to
2095    */
2096   struct Sub *sub;
2097 };
2098
2099 /**
2100  * DLL with all clients currently connected to us
2101  */
2102 struct ClientContext *cli_ctx_head;
2103 struct ClientContext *cli_ctx_tail;
2104
2105 /***********************************************************************
2106  * /Housekeeping with clients
2107 ***********************************************************************/
2108
2109
2110
2111
2112
2113 /***********************************************************************
2114  * Util functions
2115 ***********************************************************************/
2116
2117
2118 /**
2119  * Print peerlist to log.
2120  */
2121 static void
2122 print_peer_list (struct GNUNET_PeerIdentity *list,
2123                  unsigned int len)
2124 {
2125   unsigned int i;
2126
2127   LOG (GNUNET_ERROR_TYPE_DEBUG,
2128        "Printing peer list of length %u at %p:\n",
2129        len,
2130        list);
2131   for (i = 0 ; i < len ; i++)
2132   {
2133     LOG (GNUNET_ERROR_TYPE_DEBUG,
2134          "%u. peer: %s\n",
2135          i, GNUNET_i2s (&list[i]));
2136   }
2137 }
2138
2139
2140 /**
2141  * Remove peer from list.
2142  */
2143 static void
2144 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2145                unsigned int *list_size,
2146                const struct GNUNET_PeerIdentity *peer)
2147 {
2148   unsigned int i;
2149   struct GNUNET_PeerIdentity *tmp;
2150
2151   tmp = *peer_list;
2152
2153   LOG (GNUNET_ERROR_TYPE_DEBUG,
2154        "Removing peer %s from list at %p\n",
2155        GNUNET_i2s (peer),
2156        tmp);
2157
2158   for ( i = 0 ; i < *list_size ; i++ )
2159   {
2160     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2161     {
2162       if (i < *list_size -1)
2163       { /* Not at the last entry -- shift peers left */
2164         memmove (&tmp[i], &tmp[i +1],
2165                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2166       }
2167       /* Remove last entry (should be now useless PeerID) */
2168       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2169     }
2170   }
2171   *peer_list = tmp;
2172 }
2173
2174
2175 /**
2176  * Insert PeerID in #view
2177  *
2178  * Called once we know a peer is online.
2179  * Implements #PeerOp
2180  *
2181  * @return GNUNET_OK if peer was actually inserted
2182  *         GNUNET_NO if peer was not inserted
2183  */
2184 static void
2185 insert_in_view_op (void *cls,
2186                    const struct GNUNET_PeerIdentity *peer);
2187
2188 /**
2189  * Insert PeerID in #view
2190  *
2191  * Called once we know a peer is online.
2192  *
2193  * @param sub Sub in with the view to insert in
2194  * @param peer the peer to insert
2195  *
2196  * @return GNUNET_OK if peer was actually inserted
2197  *         GNUNET_NO if peer was not inserted
2198  */
2199 static int
2200 insert_in_view (struct Sub *sub,
2201                 const struct GNUNET_PeerIdentity *peer)
2202 {
2203   struct PeerContext *peer_ctx;
2204   int online;
2205   int ret;
2206
2207   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2208   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2209   if ( (GNUNET_NO == online) ||
2210        (GNUNET_SYSERR == online) ) /* peer is not even known */
2211   {
2212     (void) issue_peer_online_check (sub, peer);
2213     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2214     return GNUNET_NO;
2215   }
2216   /* Open channel towards peer to keep connection open */
2217   indicate_sending_intention (peer_ctx);
2218   ret = View_put (sub->view, peer);
2219   if (peer_ctx->sub == msub)
2220   {
2221     GNUNET_STATISTICS_set (stats,
2222                            "view size",
2223                            View_size (peer_ctx->sub->view),
2224                            GNUNET_NO);
2225   }
2226   return ret;
2227 }
2228
2229
2230 /**
2231  * @brief Send view to client
2232  *
2233  * @param cli_ctx the context of the client
2234  * @param view_array the peerids of the view as array (can be empty)
2235  * @param view_size the size of the view array (can be 0)
2236  */
2237 static void
2238 send_view (const struct ClientContext *cli_ctx,
2239            const struct GNUNET_PeerIdentity *view_array,
2240            uint64_t view_size)
2241 {
2242   struct GNUNET_MQ_Envelope *ev;
2243   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2244   struct Sub *sub;
2245
2246   if (NULL == view_array)
2247   {
2248     if (NULL == cli_ctx->sub) sub = msub;
2249     else sub = cli_ctx->sub;
2250     view_size = View_size (sub->view);
2251     view_array = View_get_as_array (sub->view);
2252   }
2253
2254   ev = GNUNET_MQ_msg_extra (out_msg,
2255                             view_size * sizeof (struct GNUNET_PeerIdentity),
2256                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2257   out_msg->num_peers = htonl (view_size);
2258
2259   GNUNET_memcpy (&out_msg[1],
2260                  view_array,
2261                  view_size * sizeof (struct GNUNET_PeerIdentity));
2262   GNUNET_MQ_send (cli_ctx->mq, ev);
2263 }
2264
2265
2266 /**
2267  * @brief Send peer from biased stream to client.
2268  *
2269  * TODO merge with send_view, parameterise
2270  *
2271  * @param cli_ctx the context of the client
2272  * @param view_array the peerids of the view as array (can be empty)
2273  * @param view_size the size of the view array (can be 0)
2274  */
2275 static void
2276 send_stream_peers (const struct ClientContext *cli_ctx,
2277                    uint64_t num_peers,
2278                    const struct GNUNET_PeerIdentity *peers)
2279 {
2280   struct GNUNET_MQ_Envelope *ev;
2281   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2282
2283   GNUNET_assert (NULL != peers);
2284
2285   ev = GNUNET_MQ_msg_extra (out_msg,
2286                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2287                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2288   out_msg->num_peers = htonl (num_peers);
2289
2290   GNUNET_memcpy (&out_msg[1],
2291                  peers,
2292                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2293   GNUNET_MQ_send (cli_ctx->mq, ev);
2294 }
2295
2296
2297 /**
2298  * @brief sends updates to clients that are interested
2299  *
2300  * @param sub Sub for which to notify clients
2301  */
2302 static void
2303 clients_notify_view_update (const struct Sub *sub)
2304 {
2305   struct ClientContext *cli_ctx_iter;
2306   uint64_t num_peers;
2307   const struct GNUNET_PeerIdentity *view_array;
2308
2309   num_peers = View_size (sub->view);
2310   view_array = View_get_as_array(sub->view);
2311   /* check size of view is small enough */
2312   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2313   {
2314     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2315                 "View is too big to send\n");
2316     return;
2317   }
2318
2319   for (cli_ctx_iter = cli_ctx_head;
2320        NULL != cli_ctx_iter;
2321        cli_ctx_iter = cli_ctx_iter->next)
2322   {
2323     if (1 < cli_ctx_iter->view_updates_left)
2324     {
2325       /* Client wants to receive limited amount of updates */
2326       cli_ctx_iter->view_updates_left -= 1;
2327     } else if (1 == cli_ctx_iter->view_updates_left)
2328     {
2329       /* Last update of view for client */
2330       cli_ctx_iter->view_updates_left = -1;
2331     } else if (0 > cli_ctx_iter->view_updates_left) {
2332       /* Client is not interested in updates */
2333       continue;
2334     }
2335     /* else _updates_left == 0 - infinite amount of updates */
2336
2337     /* send view */
2338     send_view (cli_ctx_iter, view_array, num_peers);
2339   }
2340 }
2341
2342
2343 /**
2344  * @brief sends updates to clients that are interested
2345  *
2346  * @param num_peers Number of peers to send
2347  * @param peers the array of peers to send
2348  */
2349 static void
2350 clients_notify_stream_peer (const struct Sub *sub,
2351                             uint64_t num_peers,
2352                             const struct GNUNET_PeerIdentity *peers)
2353                             // TODO enum StreamPeerSource)
2354 {
2355   struct ClientContext *cli_ctx_iter;
2356
2357   LOG (GNUNET_ERROR_TYPE_DEBUG,
2358       "Got peer (%s) from biased stream - update all clients\n",
2359       GNUNET_i2s (peers));
2360
2361   for (cli_ctx_iter = cli_ctx_head;
2362        NULL != cli_ctx_iter;
2363        cli_ctx_iter = cli_ctx_iter->next)
2364   {
2365     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2366         (sub == cli_ctx_iter->sub || sub == msub))
2367     {
2368       send_stream_peers (cli_ctx_iter, num_peers, peers);
2369     }
2370   }
2371 }
2372
2373
2374 /**
2375  * Put random peer from sampler into the view as history update.
2376  *
2377  * @param ids Array of Peers to insert into view
2378  * @param num_peers Number of peers to insert
2379  * @param cls Closure - The Sub for which this is to be done
2380  */
2381 static void
2382 hist_update (const struct GNUNET_PeerIdentity *ids,
2383              uint32_t num_peers,
2384              void *cls)
2385 {
2386   unsigned int i;
2387   struct Sub *sub = cls;
2388
2389   for (i = 0; i < num_peers; i++)
2390   {
2391     int inserted;
2392     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2393     {
2394       LOG (GNUNET_ERROR_TYPE_WARNING,
2395            "Peer in history update not known!\n");
2396       continue;
2397     }
2398     inserted = insert_in_view (sub, &ids[i]);
2399     if (GNUNET_OK == inserted)
2400     {
2401       clients_notify_stream_peer (sub, 1, &ids[i]);
2402     }
2403     to_file (sub->file_name_view_log,
2404              "+%s\t(hist)",
2405              GNUNET_i2s_full (ids));
2406   }
2407   clients_notify_view_update (sub);
2408 }
2409
2410
2411 /**
2412  * Wrapper around #RPS_sampler_resize()
2413  *
2414  * If we do not have enough sampler elements, double current sampler size
2415  * If we have more than enough sampler elements, halv current sampler size
2416  *
2417  * @param sampler The sampler to resize
2418  * @param new_size New size to which to resize
2419  */
2420 static void
2421 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2422 {
2423   unsigned int sampler_size;
2424
2425   // TODO statistics
2426   // TODO respect the min, max
2427   sampler_size = RPS_sampler_get_size (sampler);
2428   if (sampler_size > new_size * 4)
2429   { /* Shrinking */
2430     RPS_sampler_resize (sampler, sampler_size / 2);
2431   }
2432   else if (sampler_size < new_size)
2433   { /* Growing */
2434     RPS_sampler_resize (sampler, sampler_size * 2);
2435   }
2436   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2437 }
2438
2439
2440 /**
2441  * Add all peers in @a peer_array to @a peer_map used as set.
2442  *
2443  * @param peer_array array containing the peers
2444  * @param num_peers number of peers in @peer_array
2445  * @param peer_map the peermap to use as set
2446  */
2447 static void
2448 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2449                        unsigned int num_peers,
2450                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2451 {
2452   unsigned int i;
2453   if (NULL == peer_map)
2454   {
2455     LOG (GNUNET_ERROR_TYPE_WARNING,
2456          "Trying to add peers to non-existing peermap.\n");
2457     return;
2458   }
2459
2460   for (i = 0; i < num_peers; i++)
2461   {
2462     GNUNET_CONTAINER_multipeermap_put (peer_map,
2463                                        &peer_array[i],
2464                                        NULL,
2465                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2466     if (msub->peer_map == peer_map)
2467     {
2468       GNUNET_STATISTICS_set (stats,
2469                             "# known peers",
2470                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2471                             GNUNET_NO);
2472     }
2473   }
2474 }
2475
2476
2477 /**
2478  * Send a PULL REPLY to @a peer_id
2479  *
2480  * @param peer_ctx Context of the peer to send the reply to
2481  * @param peer_ids the peers to send to @a peer_id
2482  * @param num_peer_ids the number of peers to send to @a peer_id
2483  */
2484 static void
2485 send_pull_reply (struct PeerContext *peer_ctx,
2486                  const struct GNUNET_PeerIdentity *peer_ids,
2487                  unsigned int num_peer_ids)
2488 {
2489   uint32_t send_size;
2490   struct GNUNET_MQ_Envelope *ev;
2491   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2492
2493   /* Compute actual size */
2494   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2495               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2496
2497   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2498     /* Compute number of peers to send
2499      * If too long, simply truncate */
2500     // TODO select random ones via permutation
2501     //      or even better: do good protocol design
2502     send_size =
2503       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2504        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2505        sizeof (struct GNUNET_PeerIdentity);
2506   else
2507     send_size = num_peer_ids;
2508
2509   LOG (GNUNET_ERROR_TYPE_DEBUG,
2510       "Going to send PULL REPLY with %u peers to %s\n",
2511       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2512
2513   ev = GNUNET_MQ_msg_extra (out_msg,
2514                             send_size * sizeof (struct GNUNET_PeerIdentity),
2515                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2516   out_msg->num_peers = htonl (send_size);
2517   GNUNET_memcpy (&out_msg[1], peer_ids,
2518          send_size * sizeof (struct GNUNET_PeerIdentity));
2519
2520   send_message (peer_ctx, ev, "PULL REPLY");
2521   if (peer_ctx->sub == msub)
2522   {
2523     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2524   }
2525   // TODO check with send intention: as send_channel is used/opened we indicate
2526   // a sending intention without intending it.
2527   // -> clean peer afterwards?
2528   // -> use recv_channel?
2529 }
2530
2531
2532 /**
2533  * Insert PeerID in #pull_map
2534  *
2535  * Called once we know a peer is online.
2536  *
2537  * @param cls Closure - Sub with the pull map to insert into
2538  * @param peer Peer to insert
2539  */
2540 static void
2541 insert_in_pull_map (void *cls,
2542                     const struct GNUNET_PeerIdentity *peer)
2543 {
2544   struct Sub *sub = cls;
2545
2546   CustomPeerMap_put (sub->pull_map, peer);
2547 }
2548
2549
2550 /**
2551  * Insert PeerID in #view
2552  *
2553  * Called once we know a peer is online.
2554  * Implements #PeerOp
2555  *
2556  * @param cls Closure - Sub with view to insert peer into
2557  * @param peer the peer to insert
2558  */
2559 static void
2560 insert_in_view_op (void *cls,
2561                    const struct GNUNET_PeerIdentity *peer)
2562 {
2563   struct Sub *sub = cls;
2564   int inserted;
2565
2566   inserted = insert_in_view (sub, peer);
2567   if (GNUNET_OK == inserted)
2568   {
2569     clients_notify_stream_peer (sub, 1, peer);
2570   }
2571 }
2572
2573
2574 /**
2575  * Update sampler with given PeerID.
2576  * Implements #PeerOp
2577  *
2578  * @param cls Closure - Sub containing the sampler to insert into
2579  * @param peer Peer to insert
2580  */
2581 static void
2582 insert_in_sampler (void *cls,
2583                    const struct GNUNET_PeerIdentity *peer)
2584 {
2585   struct Sub *sub = cls;
2586
2587   LOG (GNUNET_ERROR_TYPE_DEBUG,
2588        "Updating samplers with peer %s from insert_in_sampler()\n",
2589        GNUNET_i2s (peer));
2590   RPS_sampler_update (sub->sampler, peer);
2591   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2592   {
2593     /* Make sure we 'know' about this peer */
2594     (void) issue_peer_online_check (sub, peer);
2595     /* Establish a channel towards that peer to indicate we are going to send
2596      * messages to it */
2597     //indicate_sending_intention (peer);
2598   }
2599 #ifdef TO_FILE
2600   sub->num_observed_peers++;
2601   GNUNET_CONTAINER_multipeermap_put
2602     (sub->observed_unique_peers,
2603      peer,
2604      NULL,
2605      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2606   uint32_t num_observed_unique_peers =
2607     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2608   to_file (sub->file_name_observed_log,
2609           "%" PRIu32 " %" PRIu32 " %f\n",
2610           sub->num_observed_peers,
2611           num_observed_unique_peers,
2612           1.0*num_observed_unique_peers/sub->num_observed_peers)
2613 #endif /* TO_FILE */
2614 }
2615
2616
2617 /**
2618  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2619  *        If the peer is not known, online check is issued and it is
2620  *        scheduled to be inserted in sampler and view.
2621  *
2622  * "External sources" refer to every source except the gossip.
2623  *
2624  * @param sub Sub for which @a peer was received
2625  * @param peer peer to insert/peer received
2626  */
2627 static void
2628 got_peer (struct Sub *sub,
2629           const struct GNUNET_PeerIdentity *peer)
2630 {
2631   /* If we did not know this peer already, insert it into sampler and view */
2632   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2633   {
2634     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2635                         &insert_in_sampler, sub);
2636     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2637                         &insert_in_view_op, sub);
2638   }
2639   if (sub == msub)
2640   {
2641     GNUNET_STATISTICS_update (stats,
2642                               "# learnd peers",
2643                               1,
2644                               GNUNET_NO);
2645   }
2646 }
2647
2648
2649 /**
2650  * @brief Checks if there is a sending channel and if it is needed
2651  *
2652  * @param peer_ctx Context of the peer to check
2653  * @return GNUNET_YES if sending channel exists and is still needed
2654  *         GNUNET_NO  otherwise
2655  */
2656 static int
2657 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2658 {
2659   /* struct GNUNET_CADET_Channel *channel; */
2660   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2661                                      &peer_ctx->peer_id))
2662   {
2663     return GNUNET_NO;
2664   }
2665   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2666   {
2667     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2668                                     &peer_ctx->peer_id)) ||
2669          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2670                                             &peer_ctx->peer_id)) ||
2671          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2672                                                      &peer_ctx->peer_id)) ||
2673          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2674                                                      &peer_ctx->peer_id)) ||
2675          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2676                                          &peer_ctx->peer_id,
2677                                          Peers_PULL_REPLY_PENDING)))
2678     { /* If we want to keep the connection to peer open */
2679       return GNUNET_YES;
2680     }
2681     return GNUNET_NO;
2682   }
2683   return GNUNET_NO;
2684 }
2685
2686
2687 /**
2688  * @brief remove peer from our knowledge, the view, push and pull maps and
2689  * samplers.
2690  *
2691  * @param sub Sub with the data structures the peer is to be removed from
2692  * @param peer the peer to remove
2693  */
2694 static void
2695 remove_peer (struct Sub *sub,
2696              const struct GNUNET_PeerIdentity *peer)
2697 {
2698   (void) View_remove_peer (sub->view,
2699                            peer);
2700   CustomPeerMap_remove_peer (sub->pull_map,
2701                              peer);
2702   CustomPeerMap_remove_peer (sub->push_map,
2703                              peer);
2704   RPS_sampler_reinitialise_by_value (sub->sampler,
2705                                      peer);
2706   /* We want to destroy the peer now.
2707    * Sometimes, it just seems that it's already been removed from the peer_map,
2708    * so check the peer_map first. */
2709   if (GNUNET_YES == check_peer_known (sub->peer_map,
2710                                       peer))
2711   {
2712     destroy_peer (get_peer_ctx (sub->peer_map,
2713                                 peer));
2714   }
2715 }
2716
2717
2718 /**
2719  * @brief Remove data that is not needed anymore.
2720  *
2721  * If the sending channel is no longer needed it is destroyed.
2722  *
2723  * @param sub Sub in which the current peer is to be cleaned
2724  * @param peer the peer whose data is about to be cleaned
2725  */
2726 static void
2727 clean_peer (struct Sub *sub,
2728             const struct GNUNET_PeerIdentity *peer)
2729 {
2730   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2731                                                                peer)))
2732   {
2733     LOG (GNUNET_ERROR_TYPE_DEBUG,
2734         "Going to remove send channel to peer %s\n",
2735         GNUNET_i2s (peer));
2736     #if ENABLE_MALICIOUS
2737     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer,
2738                                               peer))
2739       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2740                                                     peer));
2741     #else /* ENABLE_MALICIOUS */
2742     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2743                                                   peer));
2744     #endif /* ENABLE_MALICIOUS */
2745   }
2746
2747   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2748                                                            peer))
2749   {
2750     /* Peer was already removed by callback on destroyed channel */
2751     LOG (GNUNET_ERROR_TYPE_WARNING,
2752         "Peer was removed from our knowledge during cleanup\n");
2753     return;
2754   }
2755
2756   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2757                                                               peer))) &&
2758        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2759        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2760        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2761        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2762        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2763   { /* We can safely remove this peer */
2764     LOG (GNUNET_ERROR_TYPE_DEBUG,
2765         "Going to remove peer %s\n",
2766         GNUNET_i2s (peer));
2767     remove_peer (sub, peer);
2768     return;
2769   }
2770 }
2771
2772
2773 /**
2774  * @brief This is called when a channel is destroyed.
2775  *
2776  * Removes peer completely from our knowledge if the send_channel was destroyed
2777  * Otherwise simply delete the recv_channel
2778  * Also check if the knowledge about this peer is still needed.
2779  * If not, remove this peer from our knowledge.
2780  *
2781  * @param cls The closure - Context to the channel
2782  * @param channel The channel being closed
2783  */
2784 static void
2785 cleanup_destroyed_channel (void *cls,
2786                            const struct GNUNET_CADET_Channel *channel)
2787 {
2788   struct ChannelCtx *channel_ctx = cls;
2789   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2790   (void) channel;
2791
2792   channel_ctx->channel = NULL;
2793   remove_channel_ctx (channel_ctx);
2794   if (NULL != peer_ctx &&
2795       peer_ctx->send_channel_ctx == channel_ctx &&
2796       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2797   {
2798     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2799   }
2800 }
2801
2802 /***********************************************************************
2803  * /Util functions
2804 ***********************************************************************/
2805
2806
2807
2808 /***********************************************************************
2809  * Sub
2810 ***********************************************************************/
2811
2812 /**
2813  * @brief Create a new Sub
2814  *
2815  * @param hash Hash of value shared among rps instances on other hosts that
2816  *        defines a subgroup to sample from.
2817  * @param sampler_size Size of the sampler
2818  * @param round_interval Interval (in average) between two rounds
2819  *
2820  * @return Sub
2821  */
2822 struct Sub *
2823 new_sub (const struct GNUNET_HashCode *hash,
2824          uint32_t sampler_size,
2825          struct GNUNET_TIME_Relative round_interval)
2826 {
2827   struct Sub *sub;
2828
2829   sub = GNUNET_new (struct Sub);
2830
2831   /* With the hash generated from the secret value this service only connects
2832    * to rps instances that share the value */
2833   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2834     GNUNET_MQ_hd_fixed_size (peer_check,
2835                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2836                              struct GNUNET_MessageHeader,
2837                              NULL),
2838     GNUNET_MQ_hd_fixed_size (peer_push,
2839                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2840                              struct GNUNET_MessageHeader,
2841                              NULL),
2842     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2843                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2844                              struct GNUNET_MessageHeader,
2845                              NULL),
2846     GNUNET_MQ_hd_var_size (peer_pull_reply,
2847                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2848                            struct GNUNET_RPS_P2P_PullReplyMessage,
2849                            NULL),
2850     GNUNET_MQ_handler_end ()
2851   };
2852   sub->hash = *hash;
2853   sub->cadet_port =
2854     GNUNET_CADET_open_port (cadet_handle,
2855                             &sub->hash,
2856                             &handle_inbound_channel, /* Connect handler */
2857                             sub, /* cls */
2858                             NULL, /* WindowSize handler */
2859                             &cleanup_destroyed_channel, /* Disconnect handler */
2860                             cadet_handlers);
2861   if (NULL == sub->cadet_port)
2862   {
2863     LOG (GNUNET_ERROR_TYPE_ERROR,
2864         "Cadet port `%s' is already in use.\n",
2865         GNUNET_APPLICATION_PORT_RPS);
2866     GNUNET_assert (0);
2867   }
2868
2869   /* Set up general data structure to keep track about peers */
2870   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2871   if (GNUNET_OK !=
2872       GNUNET_CONFIGURATION_get_value_filename (cfg,
2873                                                "rps",
2874                                                "FILENAME_VALID_PEERS",
2875                                                &sub->filename_valid_peers))
2876   {
2877     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2878                                "rps",
2879                                "FILENAME_VALID_PEERS");
2880   }
2881   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2882   {
2883     char *tmp_filename_valid_peers;
2884     char str_hash[105];
2885
2886     GNUNET_snprintf (str_hash,
2887                      sizeof (str_hash),
2888                      GNUNET_h2s_full (hash));
2889     tmp_filename_valid_peers = sub->filename_valid_peers;
2890     GNUNET_asprintf (&sub->filename_valid_peers,
2891                      "%s%s",
2892                      tmp_filename_valid_peers,
2893                      str_hash);
2894     GNUNET_free (tmp_filename_valid_peers);
2895   }
2896   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2897
2898   /* Set up the sampler */
2899   sub->sampler_size_est_min = sampler_size;
2900   sub->sampler_size_est_need = sampler_size;;
2901   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2902   GNUNET_assert (0 != round_interval.rel_value_us);
2903   sub->round_interval = round_interval;
2904   sub->sampler = RPS_sampler_init (sampler_size,
2905                                   round_interval);
2906
2907   /* Logging of internals */
2908   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2909 #ifdef TO_FILE
2910   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2911                                                        "observed");
2912   sub->num_observed_peers = 0;
2913   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2914                                                                     GNUNET_NO);
2915 #endif /* TO_FILE */
2916
2917   /* Set up data structures for gossip */
2918   sub->push_map = CustomPeerMap_create (4);
2919   sub->pull_map = CustomPeerMap_create (4);
2920   sub->view_size_est_min = sampler_size;;
2921   sub->view = View_create (sub->view_size_est_min);
2922   if (sub == msub)
2923   {
2924     GNUNET_STATISTICS_set (stats,
2925                            "view size aim",
2926                            sub->view_size_est_min,
2927                            GNUNET_NO);
2928   }
2929
2930   /* Start executing rounds */
2931   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2932
2933   return sub;
2934 }
2935
2936
2937 #ifdef TO_FILE
2938 /**
2939  * @brief Write all numbers in the given array into the given file
2940  *
2941  * Single numbers devided by a newline
2942  *
2943  * @param hist_array[] the array to dump
2944  * @param file_name file to dump into
2945  */
2946 static void
2947 write_histogram_to_file (const uint32_t hist_array[],
2948                          const char *file_name)
2949 {
2950   char collect_str[SIZE_DUMP_FILE + 1] = "";
2951   char *recv_str_iter;
2952   char *file_name_full;
2953
2954   recv_str_iter = collect_str;
2955   file_name_full = store_prefix_file_name (&own_identity,
2956                                            file_name);
2957   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2958   {
2959     char collect_str_tmp[8];
2960
2961     GNUNET_snprintf (collect_str_tmp,
2962                      sizeof (collect_str_tmp),
2963                      "%" PRIu32 "\n",
2964                      hist_array[i]);
2965     recv_str_iter = stpncpy (recv_str_iter,
2966                              collect_str_tmp,
2967                              6);
2968   }
2969   (void) stpcpy (recv_str_iter,
2970                  "\n");
2971   LOG (GNUNET_ERROR_TYPE_DEBUG,
2972        "Writing push stats to disk\n");
2973   to_file_w_len (file_name_full,
2974                  SIZE_DUMP_FILE,
2975                  collect_str);
2976   GNUNET_free (file_name_full);
2977 }
2978 #endif /* TO_FILE */
2979
2980
2981 /**
2982  * @brief Destroy Sub.
2983  *
2984  * @param sub Sub to destroy
2985  */
2986 static void
2987 destroy_sub (struct Sub *sub)
2988 {
2989   GNUNET_assert (NULL != sub);
2990   GNUNET_assert (NULL != sub->do_round_task);
2991   GNUNET_SCHEDULER_cancel (sub->do_round_task);
2992   sub->do_round_task = NULL;
2993
2994   /* Disconnect from cadet */
2995   GNUNET_CADET_close_port (sub->cadet_port);
2996   sub->cadet_port= NULL;
2997
2998   /* Clean up data structures for peers */
2999   RPS_sampler_destroy (sub->sampler);
3000   sub->sampler = NULL;
3001   View_destroy (sub->view);
3002   sub->view = NULL;
3003   CustomPeerMap_destroy (sub->push_map);
3004   sub->push_map = NULL;
3005   CustomPeerMap_destroy (sub->pull_map);
3006   sub->pull_map = NULL;
3007   peers_terminate (sub);
3008
3009   /* Free leftover data structures */
3010   GNUNET_free (sub->file_name_view_log);
3011   sub->file_name_view_log = NULL;
3012 #ifdef TO_FILE
3013   GNUNET_free (sub->file_name_observed_log);
3014   sub->file_name_observed_log = NULL;
3015
3016   /* Write push frequencies to disk */
3017   write_histogram_to_file (sub->push_recv,
3018                            "push_recv");
3019
3020   /* Write push deltas to disk */
3021   write_histogram_to_file (sub->push_delta,
3022                            "push_delta");
3023
3024   /* Write pull delays to disk */
3025   write_histogram_to_file (sub->pull_delays,
3026                            "pull_delays");
3027
3028   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3029   sub->observed_unique_peers = NULL;
3030 #endif /* TO_FILE */
3031
3032   GNUNET_free (sub);
3033 }
3034
3035
3036 /***********************************************************************
3037  * /Sub
3038 ***********************************************************************/
3039
3040
3041 /***********************************************************************
3042  * Core handlers
3043 ***********************************************************************/
3044
3045 /**
3046  * @brief Callback on initialisation of Core.
3047  *
3048  * @param cls - unused
3049  * @param my_identity - unused
3050  */
3051 void
3052 core_init (void *cls,
3053            const struct GNUNET_PeerIdentity *my_identity)
3054 {
3055   (void) cls;
3056   (void) my_identity;
3057
3058   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3059 }
3060
3061
3062 /**
3063  * @brief Callback for core.
3064  * Method called whenever a given peer connects.
3065  *
3066  * @param cls closure - unused
3067  * @param peer peer identity this notification is about
3068  * @return closure given to #core_disconnects as peer_cls
3069  */
3070 void *
3071 core_connects (void *cls,
3072                const struct GNUNET_PeerIdentity *peer,
3073                struct GNUNET_MQ_Handle *mq)
3074 {
3075   (void) cls;
3076   (void) mq;
3077
3078   GNUNET_assert (GNUNET_YES ==
3079                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3080                                                     peer,
3081                                                     NULL,
3082                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3083   return NULL;
3084 }
3085
3086
3087 /**
3088  * @brief Callback for core.
3089  * Method called whenever a peer disconnects.
3090  *
3091  * @param cls closure - unused
3092  * @param peer peer identity this notification is about
3093  * @param peer_cls closure given in #core_connects - unused
3094  */
3095 void
3096 core_disconnects (void *cls,
3097                   const struct GNUNET_PeerIdentity *peer,
3098                   void *peer_cls)
3099 {
3100   (void) cls;
3101   (void) peer_cls;
3102
3103   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3104 }
3105
3106 /***********************************************************************
3107  * /Core handlers
3108 ***********************************************************************/
3109
3110
3111 /**
3112  * @brief Destroy the context for a (connected) client
3113  *
3114  * @param cli_ctx Context to destroy
3115  */
3116 static void
3117 destroy_cli_ctx (struct ClientContext *cli_ctx)
3118 {
3119   GNUNET_assert (NULL != cli_ctx);
3120   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3121                                cli_ctx_tail,
3122                                cli_ctx);
3123   if (NULL != cli_ctx->sub)
3124   {
3125     destroy_sub (cli_ctx->sub);
3126     cli_ctx->sub = NULL;
3127   }
3128   GNUNET_free (cli_ctx);
3129 }
3130
3131
3132 /**
3133  * @brief Update sizes in sampler and view on estimate update from nse service
3134  *
3135  * @param sub Sub
3136  * @param logestimate the log(Base 2) value of the current network size estimate
3137  * @param std_dev standard deviation for the estimate
3138  */
3139 static void
3140 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3141 {
3142   double estimate;
3143   //double scale; // TODO this might go gloabal/config
3144
3145   LOG (GNUNET_ERROR_TYPE_DEBUG,
3146        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3147        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3148   //scale = .01;
3149   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3150   // GNUNET_NSE_log_estimate_to_n (logestimate);
3151   estimate = pow (estimate, 1.0 / 3);
3152   // TODO add if std_dev is a number
3153   // estimate += (std_dev * scale);
3154   if (sub->view_size_est_min < ceil (estimate))
3155   {
3156     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3157     sub->sampler_size_est_need = estimate;
3158     sub->view_size_est_need = estimate;
3159   } else
3160   {
3161     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3162     //sub->sampler_size_est_need = sub->view_size_est_min;
3163     sub->view_size_est_need = sub->view_size_est_min;
3164   }
3165   if (sub == msub)
3166   {
3167     GNUNET_STATISTICS_set (stats,
3168                            "view size aim",
3169                            sub->view_size_est_need,
3170                            GNUNET_NO);
3171   }
3172
3173   /* If the NSE has changed adapt the lists accordingly */
3174   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3175   View_change_len (sub->view, sub->view_size_est_need);
3176 }
3177
3178
3179 /**
3180  * Function called by NSE.
3181  *
3182  * Updates sizes of sampler list and view and adapt those lists
3183  * accordingly.
3184  *
3185  * implements #GNUNET_NSE_Callback
3186  *
3187  * @param cls Closure - unused
3188  * @param timestamp time when the estimate was received from the server (or created by the server)
3189  * @param logestimate the log(Base 2) value of the current network size estimate
3190  * @param std_dev standard deviation for the estimate
3191  */
3192 static void
3193 nse_callback (void *cls,
3194               struct GNUNET_TIME_Absolute timestamp,
3195               double logestimate, double std_dev)
3196 {
3197   (void) cls;
3198   (void) timestamp;
3199   struct ClientContext *cli_ctx_iter;
3200
3201   adapt_sizes (msub, logestimate, std_dev);
3202   for (cli_ctx_iter = cli_ctx_head;
3203       NULL != cli_ctx_iter;
3204       cli_ctx_iter = cli_ctx_iter->next)
3205   {
3206     if (NULL != cli_ctx_iter->sub)
3207     {
3208       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3209     }
3210   }
3211 }
3212
3213
3214 /**
3215  * @brief This function is called, when the client seeds peers.
3216  * It verifies that @a msg is well-formed.
3217  *
3218  * @param cls the closure (#ClientContext)
3219  * @param msg the message
3220  * @return #GNUNET_OK if @a msg is well-formed
3221  *         #GNUNET_SYSERR otherwise
3222  */
3223 static int
3224 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3225 {
3226   struct ClientContext *cli_ctx = cls;
3227   uint16_t msize = ntohs (msg->header.size);
3228   uint32_t num_peers = ntohl (msg->num_peers);
3229
3230   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3231   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3232        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3233   {
3234     LOG (GNUNET_ERROR_TYPE_ERROR,
3235         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3236         ntohl (msg->num_peers),
3237         (msize / sizeof (struct GNUNET_PeerIdentity)));
3238     GNUNET_break (0);
3239     GNUNET_SERVICE_client_drop (cli_ctx->client);
3240     return GNUNET_SYSERR;
3241   }
3242   return GNUNET_OK;
3243 }
3244
3245
3246 /**
3247  * Handle seed from the client.
3248  *
3249  * @param cls closure
3250  * @param message the actual message
3251  */
3252 static void
3253 handle_client_seed (void *cls,
3254                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3255 {
3256   struct ClientContext *cli_ctx = cls;
3257   struct GNUNET_PeerIdentity *peers;
3258   uint32_t num_peers;
3259   uint32_t i;
3260
3261   num_peers = ntohl (msg->num_peers);
3262   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3263
3264   LOG (GNUNET_ERROR_TYPE_DEBUG,
3265        "Client seeded peers:\n");
3266   print_peer_list (peers, num_peers);
3267
3268   for (i = 0; i < num_peers; i++)
3269   {
3270     LOG (GNUNET_ERROR_TYPE_DEBUG,
3271          "Updating samplers with seed %" PRIu32 ": %s\n",
3272          i,
3273          GNUNET_i2s (&peers[i]));
3274
3275     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3276     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3277   }
3278   GNUNET_SERVICE_client_continue (cli_ctx->client);
3279 }
3280
3281
3282 /**
3283  * Handle RPS request from the client.
3284  *
3285  * @param cls Client context
3286  * @param message Message containing the numer of updates the client wants to
3287  * receive
3288  */
3289 static void
3290 handle_client_view_request (void *cls,
3291                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3292 {
3293   struct ClientContext *cli_ctx = cls;
3294   uint64_t num_updates;
3295
3296   num_updates = ntohl (msg->num_updates);
3297
3298   LOG (GNUNET_ERROR_TYPE_DEBUG,
3299        "Client requested %" PRIu64 " updates of view.\n",
3300        num_updates);
3301
3302   GNUNET_assert (NULL != cli_ctx);
3303   cli_ctx->view_updates_left = num_updates;
3304   send_view (cli_ctx, NULL, 0);
3305   GNUNET_SERVICE_client_continue (cli_ctx->client);
3306 }
3307
3308
3309 /**
3310  * @brief Handle the cancellation of the view updates.
3311  *
3312  * @param cls The client context
3313  * @param msg Unused
3314  */
3315 static void
3316 handle_client_view_cancel (void *cls,
3317                            const struct GNUNET_MessageHeader *msg)
3318 {
3319   struct ClientContext *cli_ctx = cls;
3320   (void) msg;
3321
3322   LOG (GNUNET_ERROR_TYPE_DEBUG,
3323        "Client does not want to receive updates of view any more.\n");
3324
3325   GNUNET_assert (NULL != cli_ctx);
3326   cli_ctx->view_updates_left = 0;
3327   GNUNET_SERVICE_client_continue (cli_ctx->client);
3328   if (GNUNET_YES == cli_ctx->stream_update)
3329   {
3330     destroy_cli_ctx (cli_ctx);
3331   }
3332 }
3333
3334
3335 /**
3336  * Handle RPS request for biased stream from the client.
3337  *
3338  * @param cls Client context
3339  * @param message unused
3340  */
3341 static void
3342 handle_client_stream_request (void *cls,
3343                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3344 {
3345   struct ClientContext *cli_ctx = cls;
3346   (void) msg;
3347
3348   LOG (GNUNET_ERROR_TYPE_DEBUG,
3349        "Client requested peers from biased stream.\n");
3350   cli_ctx->stream_update = GNUNET_YES;
3351
3352   GNUNET_assert (NULL != cli_ctx);
3353   GNUNET_SERVICE_client_continue (cli_ctx->client);
3354 }
3355
3356
3357 /**
3358  * @brief Handles the cancellation of the stream of biased peer ids
3359  *
3360  * @param cls The client context
3361  * @param msg unused
3362  */
3363 static void
3364 handle_client_stream_cancel (void *cls,
3365                              const struct GNUNET_MessageHeader *msg)
3366 {
3367   struct ClientContext *cli_ctx = cls;
3368   (void) msg;
3369
3370   LOG (GNUNET_ERROR_TYPE_DEBUG,
3371        "Client canceled receiving peers from biased stream.\n");
3372   cli_ctx->stream_update = GNUNET_NO;
3373
3374   GNUNET_assert (NULL != cli_ctx);
3375   GNUNET_SERVICE_client_continue (cli_ctx->client);
3376 }
3377
3378
3379 /**
3380  * @brief Create and start a Sub.
3381  *
3382  * @param cls Closure - unused
3383  * @param msg Message containing the necessary information
3384  */
3385 static void
3386 handle_client_start_sub (void *cls,
3387                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3388 {
3389   struct ClientContext *cli_ctx = cls;
3390
3391   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3392   if (NULL != cli_ctx->sub &&
3393       0 != memcmp (&cli_ctx->sub->hash,
3394                    &msg->hash,
3395                    sizeof (struct GNUNET_HashCode)))
3396   {
3397     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3398     destroy_sub (cli_ctx->sub);
3399     cli_ctx->sub = NULL;
3400   }
3401   cli_ctx->sub = new_sub (&msg->hash,
3402                          msub->sampler_size_est_min, // TODO make api input?
3403                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3404   GNUNET_SERVICE_client_continue (cli_ctx->client);
3405 }
3406
3407
3408 /**
3409  * @brief Destroy the Sub
3410  *
3411  * @param cls Closure - unused
3412  * @param msg Message containing the hash that identifies the Sub
3413  */
3414 static void
3415 handle_client_stop_sub (void *cls,
3416                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3417 {
3418   struct ClientContext *cli_ctx = cls;
3419
3420   GNUNET_assert (NULL != cli_ctx->sub);
3421   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3422   {
3423     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3424   }
3425   destroy_sub (cli_ctx->sub);
3426   cli_ctx->sub = NULL;
3427   GNUNET_SERVICE_client_continue (cli_ctx->client);
3428 }
3429
3430
3431 /**
3432  * Handle a CHECK_LIVE message from another peer.
3433  *
3434  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3435  * the channel is blocked for all other communication.
3436  *
3437  * @param cls Closure - Context of channel
3438  * @param msg Message - unused
3439  */
3440 static void
3441 handle_peer_check (void *cls,
3442                    const struct GNUNET_MessageHeader *msg)
3443 {
3444   const struct ChannelCtx *channel_ctx = cls;
3445   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3446   (void) msg;
3447
3448   LOG (GNUNET_ERROR_TYPE_DEBUG,
3449       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3450   if (channel_ctx->peer_ctx->sub == msub)
3451   {
3452     GNUNET_STATISTICS_update (stats,
3453                               "# pending online checks",
3454                               -1,
3455                               GNUNET_NO);
3456   }
3457
3458   GNUNET_CADET_receive_done (channel_ctx->channel);
3459 }
3460
3461
3462 /**
3463  * Handle a PUSH message from another peer.
3464  *
3465  * Check the proof of work and store the PeerID
3466  * in the temporary list for pushed PeerIDs.
3467  *
3468  * @param cls Closure - Context of channel
3469  * @param msg Message - unused
3470  */
3471 static void
3472 handle_peer_push (void *cls,
3473                   const struct GNUNET_MessageHeader *msg)
3474 {
3475   const struct ChannelCtx *channel_ctx = cls;
3476   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3477   (void) msg;
3478
3479   // (check the proof of work (?))
3480
3481   LOG (GNUNET_ERROR_TYPE_DEBUG,
3482        "Received PUSH (%s)\n",
3483        GNUNET_i2s (peer));
3484   if (channel_ctx->peer_ctx->sub == msub)
3485   {
3486     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3487   }
3488
3489   #if ENABLE_MALICIOUS
3490   struct AttackedPeer *tmp_att_peer;
3491
3492   if ( (1 == mal_type) ||
3493        (3 == mal_type) )
3494   { /* Try to maximise representation */
3495     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3496     tmp_att_peer->peer_id = *peer;
3497     if (NULL == att_peer_set)
3498       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3499     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3500                                                              peer))
3501     {
3502       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3503                                    att_peers_tail,
3504                                    tmp_att_peer);
3505       add_peer_array_to_set (peer, 1, att_peer_set);
3506     }
3507     else
3508     {
3509       GNUNET_free (tmp_att_peer);
3510     }
3511   }
3512
3513
3514   else if (2 == mal_type)
3515   {
3516     /* We attack one single well-known peer - simply ignore */
3517   }
3518   #endif /* ENABLE_MALICIOUS */
3519
3520   /* Add the sending peer to the push_map */
3521   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3522
3523   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3524                                      &channel_ctx->peer_ctx->peer_id));
3525   GNUNET_CADET_receive_done (channel_ctx->channel);
3526 }
3527
3528
3529 /**
3530  * Handle PULL REQUEST request message from another peer.
3531  *
3532  * Reply with the view of PeerIDs.
3533  *
3534  * @param cls Closure - Context of channel
3535  * @param msg Message - unused
3536  */
3537 static void
3538 handle_peer_pull_request (void *cls,
3539                           const struct GNUNET_MessageHeader *msg)
3540 {
3541   const struct ChannelCtx *channel_ctx = cls;
3542   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3543   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3544   const struct GNUNET_PeerIdentity *view_array;
3545   (void) msg;
3546
3547   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3548   if (peer_ctx->sub == msub)
3549   {
3550     GNUNET_STATISTICS_update(stats,
3551                              "# pull request message received",
3552                              1,
3553                              GNUNET_NO);
3554     if (NULL != map_single_hop &&
3555         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3556                                                              &peer_ctx->peer_id))
3557     {
3558       GNUNET_STATISTICS_update (stats,
3559                                 "# pull request message received (multi-hop peer)",
3560                                 1,
3561                                 GNUNET_NO);
3562     }
3563   }
3564
3565   #if ENABLE_MALICIOUS
3566   if (1 == mal_type
3567       || 3 == mal_type)
3568   { /* Try to maximise representation */
3569     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3570   }
3571
3572   else if (2 == mal_type)
3573   { /* Try to partition network */
3574     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3575     {
3576       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3577     }
3578   }
3579   #endif /* ENABLE_MALICIOUS */
3580
3581   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3582                                      &channel_ctx->peer_ctx->peer_id));
3583   GNUNET_CADET_receive_done (channel_ctx->channel);
3584   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3585   send_pull_reply (peer_ctx,
3586                    view_array,
3587                    View_size (channel_ctx->peer_ctx->sub->view));
3588 }
3589
3590
3591 /**
3592  * Check whether we sent a corresponding request and
3593  * whether this reply is the first one.
3594  *
3595  * @param cls Closure - Context of channel
3596  * @param msg Message containing the replied peers
3597  */
3598 static int
3599 check_peer_pull_reply (void *cls,
3600                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3601 {
3602   struct ChannelCtx *channel_ctx = cls;
3603   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3604
3605   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3606   {
3607     GNUNET_break_op (0);
3608     return GNUNET_SYSERR;
3609   }
3610
3611   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3612       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3613   {
3614     LOG (GNUNET_ERROR_TYPE_ERROR,
3615         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3616         ntohl (msg->num_peers),
3617         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3618             sizeof (struct GNUNET_PeerIdentity));
3619     GNUNET_break_op (0);
3620     return GNUNET_SYSERR;
3621   }
3622
3623   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3624                                      &sender_ctx->peer_id,
3625                                      Peers_PULL_REPLY_PENDING))
3626   {
3627     LOG (GNUNET_ERROR_TYPE_WARNING,
3628         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3629         GNUNET_i2s (&sender_ctx->peer_id));
3630     if (sender_ctx->sub == msub)
3631     {
3632       GNUNET_STATISTICS_update (stats,
3633                                 "# unrequested pull replies",
3634                                 1,
3635                                 GNUNET_NO);
3636     }
3637   }
3638   return GNUNET_OK;
3639 }
3640
3641
3642 /**
3643  * Handle PULL REPLY message from another peer.
3644  *
3645  * @param cls Closure
3646  * @param msg The message header
3647  */
3648 static void
3649 handle_peer_pull_reply (void *cls,
3650                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3651 {
3652   const struct ChannelCtx *channel_ctx = cls;
3653   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3654   const struct GNUNET_PeerIdentity *peers;
3655   struct Sub *sub = channel_ctx->peer_ctx->sub;
3656   uint32_t i;
3657 #if ENABLE_MALICIOUS
3658   struct AttackedPeer *tmp_att_peer;
3659 #endif /* ENABLE_MALICIOUS */
3660
3661   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3662   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3663   if (channel_ctx->peer_ctx->sub == msub)
3664   {
3665     GNUNET_STATISTICS_update (stats,
3666                               "# pull reply messages received",
3667                               1,
3668                               GNUNET_NO);
3669     if (NULL != map_single_hop &&
3670         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3671           &channel_ctx->peer_ctx->peer_id))
3672     {
3673       GNUNET_STATISTICS_update (stats,
3674                                 "# pull reply messages received (multi-hop peer)",
3675                                 1,
3676                                 GNUNET_NO);
3677     }
3678   }
3679
3680   #if ENABLE_MALICIOUS
3681   // We shouldn't even receive pull replies as we're not sending
3682   if (2 == mal_type)
3683   {
3684   }
3685   #endif /* ENABLE_MALICIOUS */
3686
3687   /* Do actual logic */
3688   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3689
3690   LOG (GNUNET_ERROR_TYPE_DEBUG,
3691        "PULL REPLY received, got following %u peers:\n",
3692        ntohl (msg->num_peers));
3693
3694   for (i = 0; i < ntohl (msg->num_peers); i++)
3695   {
3696     LOG (GNUNET_ERROR_TYPE_DEBUG,
3697          "%u. %s\n",
3698          i,
3699          GNUNET_i2s (&peers[i]));
3700
3701     #if ENABLE_MALICIOUS
3702     if ((NULL != att_peer_set) &&
3703         (1 == mal_type || 3 == mal_type))
3704     { /* Add attacked peer to local list */
3705       // TODO check if we sent a request and this was the first reply
3706       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3707                                                                &peers[i])
3708           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3709                                                                   &peers[i]))
3710       {
3711         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3712         tmp_att_peer->peer_id = peers[i];
3713         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3714                                      att_peers_tail,
3715                                      tmp_att_peer);
3716         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3717       }
3718       continue;
3719     }
3720     #endif /* ENABLE_MALICIOUS */
3721     /* Make sure we 'know' about this peer */
3722     (void) insert_peer (channel_ctx->peer_ctx->sub,
3723                         &peers[i]);
3724
3725     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3726                                         &peers[i]))
3727     {
3728       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3729                          &peers[i]);
3730     }
3731     else
3732     {
3733       schedule_operation (channel_ctx->peer_ctx,
3734                           insert_in_pull_map,
3735                           channel_ctx->peer_ctx->sub); /* cls */
3736       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3737                                       &peers[i]);
3738     }
3739   }
3740
3741   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3742                                  sender),
3743                    Peers_PULL_REPLY_PENDING);
3744   clean_peer (channel_ctx->peer_ctx->sub,
3745               sender);
3746
3747   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3748                                      sender));
3749   GNUNET_CADET_receive_done (channel_ctx->channel);
3750 }
3751
3752
3753 /**
3754  * Compute a random delay.
3755  * A uniformly distributed value between mean + spread and mean - spread.
3756  *
3757  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3758  * It would return a random value between 2 and 6 min.
3759  *
3760  * @param mean the mean time until the next round
3761  * @param spread the inverse amount of deviation from the mean
3762  */
3763 static struct GNUNET_TIME_Relative
3764 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3765                     unsigned int spread)
3766 {
3767   struct GNUNET_TIME_Relative half_interval;
3768   struct GNUNET_TIME_Relative ret;
3769   unsigned int rand_delay;
3770   unsigned int max_rand_delay;
3771
3772   if (0 == spread)
3773   {
3774     LOG (GNUNET_ERROR_TYPE_WARNING,
3775          "Not accepting spread of 0\n");
3776     GNUNET_break (0);
3777     GNUNET_assert (0);
3778   }
3779   GNUNET_assert (0 != mean.rel_value_us);
3780
3781   /* Compute random time value between spread * mean and spread * mean */
3782   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3783
3784   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3785   /**
3786    * Compute random value between (0 and 1) * round_interval
3787    * via multiplying round_interval with a 'fraction' (0 to value)/value
3788    */
3789   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3790   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3791   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3792   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3793
3794   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3795     LOG (GNUNET_ERROR_TYPE_WARNING,
3796          "Returning FOREVER_REL\n");
3797
3798   return ret;
3799 }
3800
3801
3802 /**
3803  * Send single pull request
3804  *
3805  * @param peer_ctx Context to the peer to send request to
3806  */
3807 static void
3808 send_pull_request (struct PeerContext *peer_ctx)
3809 {
3810   struct GNUNET_MQ_Envelope *ev;
3811
3812   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3813                                                &peer_ctx->peer_id,
3814                                                Peers_PULL_REPLY_PENDING));
3815   SET_PEER_FLAG (peer_ctx,
3816                  Peers_PULL_REPLY_PENDING);
3817   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3818
3819   LOG (GNUNET_ERROR_TYPE_DEBUG,
3820        "Going to send PULL REQUEST to peer %s.\n",
3821        GNUNET_i2s (&peer_ctx->peer_id));
3822
3823   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3824   send_message (peer_ctx,
3825                 ev,
3826                 "PULL REQUEST");
3827   if (peer_ctx->sub)
3828   {
3829     GNUNET_STATISTICS_update (stats,
3830                               "# pull request send issued",
3831                               1,
3832                               GNUNET_NO);
3833     if (NULL != map_single_hop &&
3834         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3835                                                              &peer_ctx->peer_id))
3836     {
3837       GNUNET_STATISTICS_update (stats,
3838                                 "# pull request send issued (multi-hop peer)",
3839                                 1,
3840                                 GNUNET_NO);
3841     }
3842   }
3843 }
3844
3845
3846 /**
3847  * Send single push
3848  *
3849  * @param peer_ctx Context of peer to send push to
3850  */
3851 static void
3852 send_push (struct PeerContext *peer_ctx)
3853 {
3854   struct GNUNET_MQ_Envelope *ev;
3855
3856   LOG (GNUNET_ERROR_TYPE_DEBUG,
3857        "Going to send PUSH to peer %s.\n",
3858        GNUNET_i2s (&peer_ctx->peer_id));
3859
3860   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3861   send_message (peer_ctx, ev, "PUSH");
3862   if (peer_ctx->sub)
3863   {
3864     GNUNET_STATISTICS_update (stats,
3865                               "# push send issued",
3866                               1,
3867                               GNUNET_NO);
3868   }
3869 }
3870
3871
3872 #if ENABLE_MALICIOUS
3873
3874
3875 /**
3876  * @brief This function is called, when the client tells us to act malicious.
3877  * It verifies that @a msg is well-formed.
3878  *
3879  * @param cls the closure (#ClientContext)
3880  * @param msg the message
3881  * @return #GNUNET_OK if @a msg is well-formed
3882  */
3883 static int
3884 check_client_act_malicious (void *cls,
3885                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3886 {
3887   struct ClientContext *cli_ctx = cls;
3888   uint16_t msize = ntohs (msg->header.size);
3889   uint32_t num_peers = ntohl (msg->num_peers);
3890
3891   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3892   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3893        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3894   {
3895     LOG (GNUNET_ERROR_TYPE_ERROR,
3896         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3897         ntohl (msg->num_peers),
3898         (msize / sizeof (struct GNUNET_PeerIdentity)));
3899     GNUNET_break (0);
3900     GNUNET_SERVICE_client_drop (cli_ctx->client);
3901     return GNUNET_SYSERR;
3902   }
3903   return GNUNET_OK;
3904 }
3905
3906 /**
3907  * Turn RPS service to act malicious.
3908  *
3909  * @param cls Closure
3910  * @param client The client that sent the message
3911  * @param msg The message header
3912  */
3913 static void
3914 handle_client_act_malicious (void *cls,
3915                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3916 {
3917   struct ClientContext *cli_ctx = cls;
3918   struct GNUNET_PeerIdentity *peers;
3919   uint32_t num_mal_peers_sent;
3920   uint32_t num_mal_peers_old;
3921   struct Sub *sub = cli_ctx->sub;
3922
3923   if (NULL == sub) sub = msub;
3924   /* Do actual logic */
3925   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3926   mal_type = ntohl (msg->type);
3927   if (NULL == mal_peer_set)
3928     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3929
3930   LOG (GNUNET_ERROR_TYPE_DEBUG,
3931        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3932        mal_type,
3933        ntohl (msg->num_peers));
3934
3935   if (1 == mal_type)
3936   { /* Try to maximise representation */
3937     /* Add other malicious peers to those we already know */
3938
3939     num_mal_peers_sent = ntohl (msg->num_peers);
3940     num_mal_peers_old = num_mal_peers;
3941     GNUNET_array_grow (mal_peers,
3942                        num_mal_peers,
3943                        num_mal_peers + num_mal_peers_sent);
3944     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3945             peers,
3946             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3947
3948     /* Add all mal peers to mal_peer_set */
3949     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3950                            num_mal_peers_sent,
3951                            mal_peer_set);
3952
3953     /* Substitute do_round () with do_mal_round () */
3954     GNUNET_assert (NULL != sub->do_round_task);
3955     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3956     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3957   }
3958
3959   else if ( (2 == mal_type) ||
3960             (3 == mal_type) )
3961   { /* Try to partition the network */
3962     /* Add other malicious peers to those we already know */
3963
3964     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3965     num_mal_peers_old = num_mal_peers;
3966     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3967     GNUNET_array_grow (mal_peers,
3968                        num_mal_peers,
3969                        num_mal_peers + num_mal_peers_sent);
3970     if (NULL != mal_peers &&
3971         0 != num_mal_peers)
3972     {
3973       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3974               peers,
3975               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3976
3977       /* Add all mal peers to mal_peer_set */
3978       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3979                              num_mal_peers_sent,
3980                              mal_peer_set);
3981     }
3982
3983     /* Store the one attacked peer */
3984     GNUNET_memcpy (&attacked_peer,
3985             &msg->attacked_peer,
3986             sizeof (struct GNUNET_PeerIdentity));
3987     /* Set the flag of the attacked peer to valid to avoid problems */
3988     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3989     {
3990       (void) issue_peer_online_check (sub, &attacked_peer);
3991     }
3992
3993     LOG (GNUNET_ERROR_TYPE_DEBUG,
3994          "Attacked peer is %s\n",
3995          GNUNET_i2s (&attacked_peer));
3996
3997     /* Substitute do_round () with do_mal_round () */
3998     if (NULL != sub->do_round_task)
3999     {
4000       /* Probably in shutdown */
4001       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4002       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4003     }
4004   }
4005   else if (0 == mal_type)
4006   { /* Stop acting malicious */
4007     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4008
4009     /* Substitute do_mal_round () with do_round () */
4010     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4011     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4012   }
4013   else
4014   {
4015     GNUNET_break (0);
4016     GNUNET_SERVICE_client_continue (cli_ctx->client);
4017   }
4018   GNUNET_SERVICE_client_continue (cli_ctx->client);
4019 }
4020
4021
4022 /**
4023  * Send out PUSHes and PULLs maliciously.
4024  *
4025  * This is executed regylary.
4026  *
4027  * @param cls Closure - Sub
4028  */
4029 static void
4030 do_mal_round (void *cls)
4031 {
4032   uint32_t num_pushes;
4033   uint32_t i;
4034   struct GNUNET_TIME_Relative time_next_round;
4035   struct AttackedPeer *tmp_att_peer;
4036   struct Sub *sub = cls;
4037
4038   LOG (GNUNET_ERROR_TYPE_DEBUG,
4039        "Going to execute next round maliciously type %" PRIu32 ".\n",
4040       mal_type);
4041   sub->do_round_task = NULL;
4042   GNUNET_assert (mal_type <= 3);
4043   /* Do malicious actions */
4044   if (1 == mal_type)
4045   { /* Try to maximise representation */
4046
4047     /* The maximum of pushes we're going to send this round */
4048     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4049                                          num_attacked_peers),
4050                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4051
4052     LOG (GNUNET_ERROR_TYPE_DEBUG,
4053          "Going to send %" PRIu32 " pushes\n",
4054          num_pushes);
4055
4056     /* Send PUSHes to attacked peers */
4057     for (i = 0 ; i < num_pushes ; i++)
4058     {
4059       if (att_peers_tail == att_peer_index)
4060         att_peer_index = att_peers_head;
4061       else
4062         att_peer_index = att_peer_index->next;
4063
4064       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4065     }
4066
4067     /* Send PULLs to some peers to learn about additional peers to attack */
4068     tmp_att_peer = att_peer_index;
4069     for (i = 0 ; i < num_pushes * alpha ; i++)
4070     {
4071       if (att_peers_tail == tmp_att_peer)
4072         tmp_att_peer = att_peers_head;
4073       else
4074         att_peer_index = tmp_att_peer->next;
4075
4076       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4077     }
4078   }
4079
4080
4081   else if (2 == mal_type)
4082   { /**
4083      * Try to partition the network
4084      * Send as many pushes to the attacked peer as possible
4085      * That is one push per round as it will ignore more.
4086      */
4087     (void) issue_peer_online_check (sub, &attacked_peer);
4088     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4089                                        &attacked_peer,
4090                                        Peers_ONLINE))
4091       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4092   }
4093
4094
4095   if (3 == mal_type)
4096   { /* Combined attack */
4097
4098     /* Send PUSH to attacked peers */
4099     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4100     {
4101       (void) issue_peer_online_check (sub, &attacked_peer);
4102       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4103                                          &attacked_peer,
4104                                          Peers_ONLINE))
4105       {
4106         LOG (GNUNET_ERROR_TYPE_DEBUG,
4107             "Goding to send push to attacked peer (%s)\n",
4108             GNUNET_i2s (&attacked_peer));
4109         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4110       }
4111     }
4112     (void) issue_peer_online_check (sub, &attacked_peer);
4113
4114     /* The maximum of pushes we're going to send this round */
4115     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4116                                          num_attacked_peers),
4117                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4118
4119     LOG (GNUNET_ERROR_TYPE_DEBUG,
4120          "Going to send %" PRIu32 " pushes\n",
4121          num_pushes);
4122
4123     for (i = 0; i < num_pushes; i++)
4124     {
4125       if (att_peers_tail == att_peer_index)
4126         att_peer_index = att_peers_head;
4127       else
4128         att_peer_index = att_peer_index->next;
4129
4130       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4131     }
4132
4133     /* Send PULLs to some peers to learn about additional peers to attack */
4134     tmp_att_peer = att_peer_index;
4135     for (i = 0; i < num_pushes * alpha; i++)
4136     {
4137       if (att_peers_tail == tmp_att_peer)
4138         tmp_att_peer = att_peers_head;
4139       else
4140         att_peer_index = tmp_att_peer->next;
4141
4142       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4143     }
4144   }
4145
4146   /* Schedule next round */
4147   time_next_round = compute_rand_delay (sub->round_interval, 2);
4148
4149   GNUNET_assert (NULL == sub->do_round_task);
4150   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4151                                                     &do_mal_round, sub);
4152   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4153 }
4154 #endif /* ENABLE_MALICIOUS */
4155
4156
4157 /**
4158  * Send out PUSHes and PULLs, possibly update #view, samplers.
4159  *
4160  * This is executed regylary.
4161  *
4162  * @param cls Closure - Sub
4163  */
4164 static void
4165 do_round (void *cls)
4166 {
4167   unsigned int i;
4168   const struct GNUNET_PeerIdentity *view_array;
4169   unsigned int *permut;
4170   unsigned int a_peers; /* Number of peers we send pushes to */
4171   unsigned int b_peers; /* Number of peers we send pull requests to */
4172   uint32_t first_border;
4173   uint32_t second_border;
4174   struct GNUNET_PeerIdentity peer;
4175   struct GNUNET_PeerIdentity *update_peer;
4176   struct Sub *sub = cls;
4177
4178   sub->num_rounds++;
4179   LOG (GNUNET_ERROR_TYPE_DEBUG,
4180        "Going to execute next round.\n");
4181   if (sub == msub)
4182   {
4183     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4184   }
4185   sub->do_round_task = NULL;
4186   LOG (GNUNET_ERROR_TYPE_DEBUG,
4187        "Printing view:\n");
4188   to_file (sub->file_name_view_log,
4189            "___ new round ___");
4190   view_array = View_get_as_array (sub->view);
4191   for (i = 0; i < View_size (sub->view); i++)
4192   {
4193     LOG (GNUNET_ERROR_TYPE_DEBUG,
4194          "\t%s\n", GNUNET_i2s (&view_array[i]));
4195     to_file (sub->file_name_view_log,
4196              "=%s\t(do round)",
4197              GNUNET_i2s_full (&view_array[i]));
4198   }
4199
4200
4201   /* Send pushes and pull requests */
4202   if (0 < View_size (sub->view))
4203   {
4204     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4205                                            View_size (sub->view));
4206
4207     /* Send PUSHes */
4208     a_peers = ceil (alpha * View_size (sub->view));
4209
4210     LOG (GNUNET_ERROR_TYPE_DEBUG,
4211          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4212          a_peers, alpha, View_size (sub->view));
4213     for (i = 0; i < a_peers; i++)
4214     {
4215       peer = view_array[permut[i]];
4216       // FIXME if this fails schedule/loop this for later
4217       send_push (get_peer_ctx (sub->peer_map, &peer));
4218     }
4219
4220     /* Send PULL requests */
4221     b_peers = ceil (beta * View_size (sub->view));
4222     first_border = a_peers;
4223     second_border = a_peers + b_peers;
4224     if (second_border > View_size (sub->view))
4225     {
4226       first_border = View_size (sub->view) - b_peers;
4227       second_border = View_size (sub->view);
4228     }
4229     LOG (GNUNET_ERROR_TYPE_DEBUG,
4230         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4231         b_peers, beta, View_size (sub->view));
4232     for (i = first_border; i < second_border; i++)
4233     {
4234       peer = view_array[permut[i]];
4235       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4236                                          &peer,
4237                                          Peers_PULL_REPLY_PENDING))
4238       { // FIXME if this fails schedule/loop this for later
4239         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4240       }
4241     }
4242
4243     GNUNET_free (permut);
4244     permut = NULL;
4245   }
4246
4247
4248   /* Update view */
4249   /* TODO see how many peers are in push-/pull- list! */
4250
4251   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4252       (0 < CustomPeerMap_size (sub->push_map)) &&
4253       (0 < CustomPeerMap_size (sub->pull_map)))
4254   { /* If conditions for update are fulfilled, update */
4255     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4256
4257     uint32_t final_size;
4258     uint32_t peers_to_clean_size;
4259     struct GNUNET_PeerIdentity *peers_to_clean;
4260
4261     peers_to_clean = NULL;
4262     peers_to_clean_size = 0;
4263     GNUNET_array_grow (peers_to_clean,
4264                        peers_to_clean_size,
4265                        View_size (sub->view));
4266     GNUNET_memcpy (peers_to_clean,
4267             view_array,
4268             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4269
4270     /* Seems like recreating is the easiest way of emptying the peermap */
4271     View_clear (sub->view);
4272     to_file (sub->file_name_view_log,
4273              "--- emptied ---");
4274
4275     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4276                                 CustomPeerMap_size (sub->push_map));
4277     second_border = first_border +
4278                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4279                                 CustomPeerMap_size (sub->pull_map));
4280     final_size    = second_border +
4281       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4282     LOG (GNUNET_ERROR_TYPE_DEBUG,
4283         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4284         first_border,
4285         second_border,
4286         final_size);
4287
4288     /* Update view with peers received through PUSHes */
4289     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4290                                            CustomPeerMap_size (sub->push_map));
4291     for (i = 0; i < first_border; i++)
4292     {
4293       int inserted;
4294       inserted = insert_in_view (sub,
4295                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4296                                                                   permut[i]));
4297       if (GNUNET_OK == inserted)
4298       {
4299         clients_notify_stream_peer (sub,
4300             1,
4301             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4302       }
4303       to_file (sub->file_name_view_log,
4304                "+%s\t(push list)",
4305                GNUNET_i2s_full (&view_array[i]));
4306       // TODO change the peer_flags accordingly
4307     }
4308     GNUNET_free (permut);
4309     permut = NULL;
4310
4311     /* Update view with peers received through PULLs */
4312     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4313                                            CustomPeerMap_size (sub->pull_map));
4314     for (i = first_border; i < second_border; i++)
4315     {
4316       int inserted;
4317       inserted = insert_in_view (sub,
4318           CustomPeerMap_get_peer_by_index (sub->pull_map,
4319                                            permut[i - first_border]));
4320       if (GNUNET_OK == inserted)
4321       {
4322         clients_notify_stream_peer (sub,
4323             1,
4324             CustomPeerMap_get_peer_by_index (sub->pull_map,
4325                                              permut[i - first_border]));
4326       }
4327       to_file (sub->file_name_view_log,
4328                "+%s\t(pull list)",
4329                GNUNET_i2s_full (&view_array[i]));
4330       // TODO change the peer_flags accordingly
4331     }
4332     GNUNET_free (permut);
4333     permut = NULL;
4334
4335     /* Update view with peers from history */
4336     RPS_sampler_get_n_rand_peers (sub->sampler,
4337                                   final_size - second_border,
4338                                   hist_update,
4339                                   sub);
4340     // TODO change the peer_flags accordingly
4341
4342     for (i = 0; i < View_size (sub->view); i++)
4343       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4344
4345     /* Clean peers that were removed from the view */
4346     for (i = 0; i < peers_to_clean_size; i++)
4347     {
4348       to_file (sub->file_name_view_log,
4349                "-%s",
4350                GNUNET_i2s_full (&peers_to_clean[i]));
4351       clean_peer (sub, &peers_to_clean[i]);
4352     }
4353
4354     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4355     clients_notify_view_update (sub);
4356   } else {
4357     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4358     if (sub == msub)
4359     {
4360       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4361       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4362           !(0 >= CustomPeerMap_size (sub->pull_map)))
4363         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4364       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4365           (0 >= CustomPeerMap_size (sub->pull_map)))
4366         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4367       if (0 >= CustomPeerMap_size (sub->push_map) &&
4368           !(0 >= CustomPeerMap_size (sub->pull_map)))
4369         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4370       if (0 >= CustomPeerMap_size (sub->push_map) &&
4371           (0 >= CustomPeerMap_size (sub->pull_map)))
4372         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4373       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4374           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4375           0 >= CustomPeerMap_size (sub->push_map))
4376         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4377     }
4378   }
4379   // TODO independent of that also get some peers from CADET_get_peers()?
4380   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4381   {
4382     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4383   }
4384   else
4385   {
4386     LOG (GNUNET_ERROR_TYPE_WARNING,
4387          "Push map size too big for histogram (%u, %u)\n",
4388          CustomPeerMap_size (sub->push_map),
4389          HISTOGRAM_FILE_SLOTS);
4390   }
4391   // FIXME check bounds of histogram
4392   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4393                    (alpha * sub->view_size_est_need)) +
4394                           (HISTOGRAM_FILE_SLOTS/2)]++;
4395   if (sub == msub)
4396   {
4397     GNUNET_STATISTICS_set (stats,
4398         "# peers in push map at end of round",
4399         CustomPeerMap_size (sub->push_map),
4400         GNUNET_NO);
4401     GNUNET_STATISTICS_set (stats,
4402         "# peers in pull map at end of round",
4403         CustomPeerMap_size (sub->pull_map),
4404         GNUNET_NO);
4405     GNUNET_STATISTICS_set (stats,
4406         "# peers in view at end of round",
4407         View_size (sub->view),
4408         GNUNET_NO);
4409     GNUNET_STATISTICS_set (stats,
4410         "# expected pushes",
4411         alpha * sub->view_size_est_need,
4412         GNUNET_NO);
4413     GNUNET_STATISTICS_set (stats,
4414         "delta expected - received pushes",
4415         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4416         GNUNET_NO);
4417   }
4418
4419   LOG (GNUNET_ERROR_TYPE_DEBUG,
4420        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4421        CustomPeerMap_size (sub->push_map),
4422        CustomPeerMap_size (sub->pull_map),
4423        alpha,
4424        View_size (sub->view),
4425        alpha * View_size (sub->view));
4426
4427   /* Update samplers */
4428   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4429   {
4430     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4431     LOG (GNUNET_ERROR_TYPE_DEBUG,
4432          "Updating with peer %s from push list\n",
4433          GNUNET_i2s (update_peer));
4434     insert_in_sampler (sub, update_peer);
4435     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4436   }
4437
4438   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4439   {
4440     LOG (GNUNET_ERROR_TYPE_DEBUG,
4441          "Updating with peer %s from pull list\n",
4442          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4443     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4444     /* This cleans only if it is not in the view */
4445     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4446   }
4447
4448
4449   /* Empty push/pull lists */
4450   CustomPeerMap_clear (sub->push_map);
4451   CustomPeerMap_clear (sub->pull_map);
4452
4453   if (sub == msub)
4454   {
4455     GNUNET_STATISTICS_set (stats,
4456                            "view size",
4457                            View_size(sub->view),
4458                            GNUNET_NO);
4459   }
4460
4461   struct GNUNET_TIME_Relative time_next_round;
4462
4463   time_next_round = compute_rand_delay (sub->round_interval, 2);
4464
4465   /* Schedule next round */
4466   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4467                                                      &do_round, sub);
4468   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4469 }
4470
4471
4472 /**
4473  * This is called from GNUNET_CADET_get_peers().
4474  *
4475  * It is called on every peer(ID) that cadet somehow has contact with.
4476  * We use those to initialise the sampler.
4477  *
4478  * implements #GNUNET_CADET_PeersCB
4479  *
4480  * @param cls Closure - Sub
4481  * @param peer Peer, or NULL on "EOF".
4482  * @param tunnel Do we have a tunnel towards this peer?
4483  * @param n_paths Number of known paths towards this peer.
4484  * @param best_path How long is the best path?
4485  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4486  */
4487 void
4488 init_peer_cb (void *cls,
4489               const struct GNUNET_PeerIdentity *peer,
4490               int tunnel, /* "Do we have a tunnel towards this peer?" */
4491               unsigned int n_paths, /* "Number of known paths towards this peer" */
4492               unsigned int best_path) /* "How long is the best path?
4493                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4494 {
4495   struct Sub *sub = cls;
4496   (void) tunnel;
4497   (void) n_paths;
4498   (void) best_path;
4499
4500   if (NULL != peer)
4501   {
4502     LOG (GNUNET_ERROR_TYPE_DEBUG,
4503          "Got peer_id %s from cadet\n",
4504          GNUNET_i2s (peer));
4505     got_peer (sub, peer);
4506   }
4507 }
4508
4509
4510 /**
4511  * @brief Iterator function over stored, valid peers.
4512  *
4513  * We initialise the sampler with those.
4514  *
4515  * @param cls Closure - Sub
4516  * @param peer the peer id
4517  * @return #GNUNET_YES if we should continue to
4518  *         iterate,
4519  *         #GNUNET_NO if not.
4520  */
4521 static int
4522 valid_peers_iterator (void *cls,
4523                       const struct GNUNET_PeerIdentity *peer)
4524 {
4525   struct Sub *sub = cls;
4526
4527   if (NULL != peer)
4528   {
4529     LOG (GNUNET_ERROR_TYPE_DEBUG,
4530          "Got stored, valid peer %s\n",
4531          GNUNET_i2s (peer));
4532     got_peer (sub, peer);
4533   }
4534   return GNUNET_YES;
4535 }
4536
4537
4538 /**
4539  * Iterator over peers from peerinfo.
4540  *
4541  * @param cls Closure - Sub
4542  * @param peer id of the peer, NULL for last call
4543  * @param hello hello message for the peer (can be NULL)
4544  * @param error message
4545  */
4546 void
4547 process_peerinfo_peers (void *cls,
4548                         const struct GNUNET_PeerIdentity *peer,
4549                         const struct GNUNET_HELLO_Message *hello,
4550                         const char *err_msg)
4551 {
4552   struct Sub *sub = cls;
4553   (void) hello;
4554   (void) err_msg;
4555
4556   if (NULL != peer)
4557   {
4558     LOG (GNUNET_ERROR_TYPE_DEBUG,
4559          "Got peer_id %s from peerinfo\n",
4560          GNUNET_i2s (peer));
4561     got_peer (sub, peer);
4562   }
4563 }
4564
4565
4566 /**
4567  * Task run during shutdown.
4568  *
4569  * @param cls Closure - unused
4570  */
4571 static void
4572 shutdown_task (void *cls)
4573 {
4574   (void) cls;
4575   struct ClientContext *client_ctx;
4576
4577   LOG (GNUNET_ERROR_TYPE_DEBUG,
4578        "RPS service is going down\n");
4579
4580   /* Clean all clients */
4581   for (client_ctx = cli_ctx_head;
4582        NULL != cli_ctx_head;
4583        client_ctx = cli_ctx_head)
4584   {
4585     destroy_cli_ctx (client_ctx);
4586   }
4587   if (NULL != msub)
4588   {
4589     destroy_sub (msub);
4590     msub = NULL;
4591   }
4592
4593   /* Disconnect from other services */
4594   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4595   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4596   peerinfo_handle = NULL;
4597   GNUNET_NSE_disconnect (nse);
4598   if (NULL != map_single_hop)
4599   {
4600     /* core_init was called - core was initialised */
4601     /* disconnect first, so no callback tries to access missing peermap */
4602     GNUNET_CORE_disconnect (core_handle);
4603     core_handle = NULL;
4604     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4605     map_single_hop = NULL;
4606   }
4607
4608   if (NULL != stats)
4609   {
4610     GNUNET_STATISTICS_destroy (stats,
4611                                GNUNET_NO);
4612     stats = NULL;
4613   }
4614   GNUNET_CADET_disconnect (cadet_handle);
4615   cadet_handle = NULL;
4616 #if ENABLE_MALICIOUS
4617   struct AttackedPeer *tmp_att_peer;
4618   GNUNET_array_grow (mal_peers,
4619                      num_mal_peers,
4620                      0);
4621   if (NULL != mal_peer_set)
4622     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4623   if (NULL != att_peer_set)
4624     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4625   while (NULL != att_peers_head)
4626   {
4627     tmp_att_peer = att_peers_head;
4628     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4629                                  att_peers_tail,
4630                                  tmp_att_peer);
4631     GNUNET_free (tmp_att_peer);
4632   }
4633 #endif /* ENABLE_MALICIOUS */
4634   close_all_files();
4635 }
4636
4637
4638 /**
4639  * Handle client connecting to the service.
4640  *
4641  * @param cls unused
4642  * @param client the new client
4643  * @param mq the message queue of @a client
4644  * @return @a client
4645  */
4646 static void *
4647 client_connect_cb (void *cls,
4648                    struct GNUNET_SERVICE_Client *client,
4649                    struct GNUNET_MQ_Handle *mq)
4650 {
4651   struct ClientContext *cli_ctx;
4652   (void) cls;
4653
4654   LOG (GNUNET_ERROR_TYPE_DEBUG,
4655        "Client connected\n");
4656   if (NULL == client)
4657     return client; /* Server was destroyed before a client connected. Shutting down */
4658   cli_ctx = GNUNET_new (struct ClientContext);
4659   cli_ctx->mq = mq;
4660   cli_ctx->view_updates_left = -1;
4661   cli_ctx->stream_update = GNUNET_NO;
4662   cli_ctx->client = client;
4663   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4664                                cli_ctx_tail,
4665                                cli_ctx);
4666   return cli_ctx;
4667 }
4668
4669 /**
4670  * Callback called when a client disconnected from the service
4671  *
4672  * @param cls closure for the service
4673  * @param c the client that disconnected
4674  * @param internal_cls should be equal to @a c
4675  */
4676 static void
4677 client_disconnect_cb (void *cls,
4678                       struct GNUNET_SERVICE_Client *client,
4679                       void *internal_cls)
4680 {
4681   struct ClientContext *cli_ctx = internal_cls;
4682
4683   (void) cls;
4684   GNUNET_assert (client == cli_ctx->client);
4685   if (NULL == client)
4686   {/* shutdown task - destroy all clients */
4687     while (NULL != cli_ctx_head)
4688       destroy_cli_ctx (cli_ctx_head);
4689   }
4690   else
4691   { /* destroy this client */
4692     LOG (GNUNET_ERROR_TYPE_DEBUG,
4693         "Client disconnected. Destroy its context.\n");
4694     destroy_cli_ctx (cli_ctx);
4695   }
4696 }
4697
4698
4699 /**
4700  * Handle random peer sampling clients.
4701  *
4702  * @param cls closure
4703  * @param c configuration to use
4704  * @param service the initialized service
4705  */
4706 static void
4707 run (void *cls,
4708      const struct GNUNET_CONFIGURATION_Handle *c,
4709      struct GNUNET_SERVICE_Handle *service)
4710 {
4711   struct GNUNET_TIME_Relative round_interval;
4712   long long unsigned int sampler_size;
4713   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4714   struct GNUNET_HashCode hash;
4715
4716   (void) cls;
4717   (void) service;
4718
4719   GNUNET_log_setup ("rps",
4720                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4721                     NULL);
4722   cfg = c;
4723   /* Get own ID */
4724   GNUNET_CRYPTO_get_peer_identity (cfg,
4725                                    &own_identity); // TODO check return value
4726   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4727               "STARTING SERVICE (rps) for peer [%s]\n",
4728               GNUNET_i2s (&own_identity));
4729 #if ENABLE_MALICIOUS
4730   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4731               "Malicious execution compiled in.\n");
4732 #endif /* ENABLE_MALICIOUS */
4733
4734   /* Get time interval from the configuration */
4735   if (GNUNET_OK !=
4736       GNUNET_CONFIGURATION_get_value_time (cfg,
4737                                            "RPS",
4738                                            "ROUNDINTERVAL",
4739                                            &round_interval))
4740   {
4741     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4742                                "RPS", "ROUNDINTERVAL");
4743     GNUNET_SCHEDULER_shutdown ();
4744     return;
4745   }
4746
4747   /* Get initial size of sampler/view from the configuration */
4748   if (GNUNET_OK !=
4749       GNUNET_CONFIGURATION_get_value_number (cfg,
4750                                              "RPS",
4751                                              "MINSIZE",
4752                                              &sampler_size))
4753   {
4754     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4755                                "RPS", "MINSIZE");
4756     GNUNET_SCHEDULER_shutdown ();
4757     return;
4758   }
4759
4760   cadet_handle = GNUNET_CADET_connect (cfg);
4761   GNUNET_assert (NULL != cadet_handle);
4762   core_handle = GNUNET_CORE_connect (cfg,
4763                                      NULL, /* cls */
4764                                      core_init, /* init */
4765                                      core_connects, /* connects */
4766                                      core_disconnects, /* disconnects */
4767                                      NULL); /* handlers */
4768   GNUNET_assert (NULL != core_handle);
4769
4770
4771   alpha = 0.45;
4772   beta  = 0.45;
4773
4774
4775   /* Set up main Sub */
4776   GNUNET_CRYPTO_hash (hash_port_string,
4777                       strlen (hash_port_string),
4778                       &hash);
4779   msub = new_sub (&hash,
4780                  sampler_size, /* Will be overwritten by config */
4781                  round_interval);
4782
4783
4784   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4785
4786   /* connect to NSE */
4787   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4788
4789   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4790   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4791   // TODO send push/pull to each of those peers?
4792   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4793   restore_valid_peers (msub);
4794   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4795
4796   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4797                                                    GNUNET_NO,
4798                                                    process_peerinfo_peers,
4799                                                    msub);
4800
4801   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4802
4803   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4804   stats = GNUNET_STATISTICS_create ("rps", cfg);
4805 }
4806
4807
4808 /**
4809  * Define "main" method using service macro.
4810  */
4811 GNUNET_SERVICE_MAIN
4812 ("rps",
4813  GNUNET_SERVICE_OPTION_NONE,
4814  &run,
4815  &client_connect_cb,
4816  &client_disconnect_cb,
4817  NULL,
4818  GNUNET_MQ_hd_var_size (client_seed,
4819    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4820    struct GNUNET_RPS_CS_SeedMessage,
4821    NULL),
4822 #if ENABLE_MALICIOUS
4823  GNUNET_MQ_hd_var_size (client_act_malicious,
4824    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4825    struct GNUNET_RPS_CS_ActMaliciousMessage,
4826    NULL),
4827 #endif /* ENABLE_MALICIOUS */
4828  GNUNET_MQ_hd_fixed_size (client_view_request,
4829    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4830    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4831    NULL),
4832  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4833    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4834    struct GNUNET_MessageHeader,
4835    NULL),
4836  GNUNET_MQ_hd_fixed_size (client_stream_request,
4837    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4838    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4839    NULL),
4840  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4841    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4842    struct GNUNET_MessageHeader,
4843    NULL),
4844  GNUNET_MQ_hd_fixed_size (client_start_sub,
4845    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4846    struct GNUNET_RPS_CS_SubStartMessage,
4847    NULL),
4848  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4849    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4850    struct GNUNET_RPS_CS_SubStopMessage,
4851    NULL),
4852  GNUNET_MQ_handler_end());
4853
4854 /* end of gnunet-service-rps.c */