fix /tmp/rps directory creation, fix linkage, DCE
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357   /**
358    * Name to log view to
359    */
360   char *file_name_view_log;
361
362 #ifdef TO_FILE
363   /**
364    * Name to log number of observed peers to
365    */
366   char *file_name_observed_log;
367
368   /**
369    * @brief Count the observed peers
370    */
371   uint32_t num_observed_peers;
372
373   /**
374    * @brief Multipeermap (ab-) used to count unique peer_ids
375    */
376   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
377 #endif /* TO_FILE */
378
379   /**
380    * List to store peers received through pushes temporary.
381    */
382   struct CustomPeerMap *push_map;
383
384   /**
385    * List to store peers received through pulls temporary.
386    */
387   struct CustomPeerMap *pull_map;
388
389   /**
390    * @brief This is the estimate used as view size.
391    *
392    * It is initialised with the minimum
393    */
394   unsigned int view_size_est_need;
395
396   /**
397    * @brief This is the minimum estimate used as view size.
398    *
399    * It is configured by the user.
400    */
401   unsigned int view_size_est_min;
402
403   /**
404    * @brief The view.
405    */
406   struct View *view;
407
408   /**
409    * Identifier for the main task that runs periodically.
410    */
411   struct GNUNET_SCHEDULER_Task *do_round_task;
412
413   /* === stats === */
414
415   /**
416    * @brief Counts the executed rounds.
417    */
418   uint32_t num_rounds;
419
420   /**
421    * @brief This array accumulates the number of received pushes per round.
422    *
423    * Number at index i represents the number of rounds with i observed pushes.
424    */
425   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
426
427   /**
428    * @brief Histogram of deltas between the expected and actual number of
429    * received pushes.
430    *
431    * As half of the entries are expected to be negative, this is shifted by
432    * #HISTOGRAM_FILE_SLOTS/2.
433    */
434   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
435
436   /**
437    * @brief Number of pull replies with this delay measured in rounds.
438    *
439    * Number at index i represents the number of pull replies with a delay of i
440    * rounds.
441    */
442   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
443 };
444
445
446 /***********************************************************************
447  * Globals
448 ***********************************************************************/
449
450 /**
451  * Our configuration.
452  */
453 static const struct GNUNET_CONFIGURATION_Handle *cfg;
454
455 /**
456  * Handle to the statistics service.
457  */
458 struct GNUNET_STATISTICS_Handle *stats;
459
460 /**
461  * Handler to CADET.
462  */
463 struct GNUNET_CADET_Handle *cadet_handle;
464
465 /**
466  * Handle to CORE
467  */
468 struct GNUNET_CORE_Handle *core_handle;
469
470 /**
471  * @brief PeerMap to keep track of connected peers.
472  */
473 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
474
475 /**
476  * Our own identity.
477  */
478 static struct GNUNET_PeerIdentity own_identity;
479
480 /**
481  * Percentage of total peer number in the view
482  * to send random PUSHes to
483  */
484 static float alpha;
485
486 /**
487  * Percentage of total peer number in the view
488  * to send random PULLs to
489  */
490 static float beta;
491
492 /**
493  * Handler to NSE.
494  */
495 static struct GNUNET_NSE_Handle *nse;
496
497 /**
498  * Handler to PEERINFO.
499  */
500 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
501
502 /**
503  * Handle for cancellation of iteration over peers.
504  */
505 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
506
507
508 #if ENABLE_MALICIOUS
509 /**
510  * Type of malicious peer
511  *
512  * 0 Don't act malicious at all - Default
513  * 1 Try to maximise representation
514  * 2 Try to partition the network
515  * 3 Combined attack
516  */
517 static uint32_t mal_type;
518
519 /**
520  * Other malicious peers
521  */
522 static struct GNUNET_PeerIdentity *mal_peers;
523
524 /**
525  * Hashmap of malicious peers used as set.
526  * Used to more efficiently check whether we know that peer.
527  */
528 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
529
530 /**
531  * Number of other malicious peers
532  */
533 static uint32_t num_mal_peers;
534
535
536 /**
537  * If type is 2 this is the DLL of attacked peers
538  */
539 static struct AttackedPeer *att_peers_head;
540 static struct AttackedPeer *att_peers_tail;
541
542 /**
543  * This index is used to point to an attacked peer to
544  * implement the round-robin-ish way to select attacked peers.
545  */
546 static struct AttackedPeer *att_peer_index;
547
548 /**
549  * Hashmap of attacked peers used as set.
550  * Used to more efficiently check whether we know that peer.
551  */
552 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
553
554 /**
555  * Number of attacked peers
556  */
557 static uint32_t num_attacked_peers;
558
559 /**
560  * If type is 1 this is the attacked peer
561  */
562 static struct GNUNET_PeerIdentity attacked_peer;
563
564 /**
565  * The limit of PUSHes we can send in one round.
566  * This is an assumption of the Brahms protocol and either implemented
567  * via proof of work
568  * or
569  * assumend to be the bandwidth limitation.
570  */
571 static uint32_t push_limit = 10000;
572 #endif /* ENABLE_MALICIOUS */
573
574 /**
575  * @brief Main Sub.
576  *
577  * This is run in any case by all peers and connects to all peers without
578  * specifying a shared value.
579  */
580 static struct Sub *msub;
581
582 /**
583  * @brief Maximum number of valid peers to keep.
584  * TODO read from config
585  */
586 static const uint32_t num_valid_peers_max = UINT32_MAX;
587
588 /***********************************************************************
589  * /Globals
590 ***********************************************************************/
591
592
593 static void
594 do_round (void *cls);
595
596 static void
597 do_mal_round (void *cls);
598
599
600 /**
601  * @brief Get the #PeerContext associated with a peer
602  *
603  * @param peer_map The peer map containing the context
604  * @param peer the peer id
605  *
606  * @return the #PeerContext
607  */
608 static struct PeerContext *
609 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
610               const struct GNUNET_PeerIdentity *peer)
611 {
612   struct PeerContext *ctx;
613   int ret;
614
615   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
616   GNUNET_assert (GNUNET_YES == ret);
617   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
618   GNUNET_assert (NULL != ctx);
619   return ctx;
620 }
621
622 /**
623  * @brief Check whether we have information about the given peer.
624  *
625  * FIXME probably deprecated. Make this the new _online.
626  *
627  * @param peer_map The peer map to check for the existence of @a peer
628  * @param peer peer in question
629  *
630  * @return #GNUNET_YES if peer is known
631  *         #GNUNET_NO  if peer is not knwon
632  */
633 static int
634 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
635                   const struct GNUNET_PeerIdentity *peer)
636 {
637   if (NULL != peer_map)
638   {
639     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
640   }
641   else
642   {
643     return GNUNET_NO;
644   }
645 }
646
647
648 /**
649  * @brief Create a new #PeerContext and insert it into the peer map
650  *
651  * @param sub The Sub this context belongs to.
652  * @param peer the peer to create the #PeerContext for
653  *
654  * @return the #PeerContext
655  */
656 static struct PeerContext *
657 create_peer_ctx (struct Sub *sub,
658                  const struct GNUNET_PeerIdentity *peer)
659 {
660   struct PeerContext *ctx;
661   int ret;
662
663   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
664
665   ctx = GNUNET_new (struct PeerContext);
666   ctx->peer_id = *peer;
667   ctx->sub = sub;
668   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
669       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
670   GNUNET_assert (GNUNET_OK == ret);
671   if (sub == msub)
672   {
673     GNUNET_STATISTICS_set (stats,
674                           "# known peers",
675                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
676                           GNUNET_NO);
677   }
678   return ctx;
679 }
680
681
682 /**
683  * @brief Create or get a #PeerContext
684  *
685  * @param sub The Sub to which the created context belongs to
686  * @param peer the peer to get the associated context to
687  *
688  * @return the context
689  */
690 static struct PeerContext *
691 create_or_get_peer_ctx (struct Sub *sub,
692                         const struct GNUNET_PeerIdentity *peer)
693 {
694   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
695   {
696     return create_peer_ctx (sub, peer);
697   }
698   return get_peer_ctx (sub->peer_map, peer);
699 }
700
701
702 /**
703  * @brief Check whether we have a connection to this @a peer
704  *
705  * Also sets the #Peers_ONLINE flag accordingly
706  *
707  * @param peer_ctx Context of the peer of which connectivity is to be checked
708  *
709  * @return #GNUNET_YES if we are connected
710  *         #GNUNET_NO  otherwise
711  */
712 static int
713 check_connected (struct PeerContext *peer_ctx)
714 {
715   /* If we don't know about this peer we don't know whether it's online */
716   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
717                                      &peer_ctx->peer_id))
718   {
719     return GNUNET_NO;
720   }
721   /* Get the context */
722   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
723   /* If we have no channel to this peer we don't know whether it's online */
724   if ( (NULL == peer_ctx->send_channel_ctx) &&
725        (NULL == peer_ctx->recv_channel_ctx) )
726   {
727     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
728     return GNUNET_NO;
729   }
730   /* Otherwise (if we have a channel, we know that it's online */
731   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732   return GNUNET_YES;
733 }
734
735
736 /**
737  * @brief The closure to #get_rand_peer_iterator.
738  */
739 struct GetRandPeerIteratorCls
740 {
741   /**
742    * @brief The index of the peer to return.
743    * Will be decreased until 0.
744    * Then current peer is returned.
745    */
746   uint32_t index;
747
748   /**
749    * @brief Pointer to peer to return.
750    */
751   const struct GNUNET_PeerIdentity *peer;
752 };
753
754
755 /**
756  * @brief Iterator function for #get_random_peer_from_peermap.
757  *
758  * Implements #GNUNET_CONTAINER_PeerMapIterator.
759  * Decreases the index until the index is null.
760  * Then returns the current peer.
761  *
762  * @param cls the #GetRandPeerIteratorCls containing index and peer
763  * @param peer current peer
764  * @param value unused
765  *
766  * @return  #GNUNET_YES if we should continue to
767  *          iterate,
768  *          #GNUNET_NO if not.
769  */
770 static int
771 get_rand_peer_iterator (void *cls,
772                         const struct GNUNET_PeerIdentity *peer,
773                         void *value)
774 {
775   struct GetRandPeerIteratorCls *iterator_cls = cls;
776   (void) value;
777
778   if (0 >= iterator_cls->index)
779   {
780     iterator_cls->peer = peer;
781     return GNUNET_NO;
782   }
783   iterator_cls->index--;
784   return GNUNET_YES;
785 }
786
787
788 /**
789  * @brief Get a random peer from @a peer_map
790  *
791  * @param valid_peers Peer map containing valid peers from which to select a
792  * random one
793  *
794  * @return a random peer
795  */
796 static const struct GNUNET_PeerIdentity *
797 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
798 {
799   struct GetRandPeerIteratorCls *iterator_cls;
800   const struct GNUNET_PeerIdentity *ret;
801
802   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
803   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
804       GNUNET_CONTAINER_multipeermap_size (valid_peers));
805   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
806                                                 get_rand_peer_iterator,
807                                                 iterator_cls);
808   ret = iterator_cls->peer;
809   GNUNET_free (iterator_cls);
810   return ret;
811 }
812
813
814 /**
815  * @brief Add a given @a peer to valid peers.
816  *
817  * If valid peers are already #num_valid_peers_max, delete a peer previously.
818  *
819  * @param peer The peer that is added to the valid peers.
820  * @param valid_peers Peer map of valid peers to which to add the @a peer
821  *
822  * @return #GNUNET_YES if no other peer had to be removed
823  *         #GNUNET_NO  otherwise
824  */
825 static int
826 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
827                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
828 {
829   const struct GNUNET_PeerIdentity *rand_peer;
830   int ret;
831
832   ret = GNUNET_YES;
833   /* Remove random peers until there is space for a new one */
834   while (num_valid_peers_max <=
835          GNUNET_CONTAINER_multipeermap_size (valid_peers))
836   {
837     rand_peer = get_random_peer_from_peermap (valid_peers);
838     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
839     ret = GNUNET_NO;
840   }
841   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
842       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
843   if (valid_peers == msub->valid_peers)
844   {
845     GNUNET_STATISTICS_set (stats,
846                            "# valid peers",
847                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
848                            GNUNET_NO);
849   }
850   return ret;
851 }
852
853 static void
854 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
855
856 /**
857  * @brief Set the peer flag to living and
858  *        call the pending operations on this peer.
859  *
860  * Also adds peer to #valid_peers.
861  *
862  * @param peer_ctx the #PeerContext of the peer to set online
863  */
864 static void
865 set_peer_online (struct PeerContext *peer_ctx)
866 {
867   struct GNUNET_PeerIdentity *peer;
868   unsigned int i;
869
870   peer = &peer_ctx->peer_id;
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872       "Peer %s is online and valid, calling %i pending operations on it\n",
873       GNUNET_i2s (peer),
874       peer_ctx->num_pending_ops);
875
876   if (NULL != peer_ctx->online_check_pending)
877   {
878     LOG (GNUNET_ERROR_TYPE_DEBUG,
879          "Removing pending online check for peer %s\n",
880          GNUNET_i2s (&peer_ctx->peer_id));
881     // TODO wait until cadet sets mq->cancel_impl
882     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
883     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
884     peer_ctx->online_check_pending = NULL;
885   }
886
887   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
888
889   /* Call pending operations */
890   for (i = 0; i < peer_ctx->num_pending_ops; i++)
891   {
892     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
893   }
894   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
895 }
896
897 static void
898 cleanup_destroyed_channel (void *cls,
899                            const struct GNUNET_CADET_Channel *channel);
900
901 /* Declaration of handlers */
902 static void
903 handle_peer_check (void *cls,
904                    const struct GNUNET_MessageHeader *msg);
905
906 static void
907 handle_peer_push (void *cls,
908                   const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_pull_request (void *cls,
912                           const struct GNUNET_MessageHeader *msg);
913
914 static int
915 check_peer_pull_reply (void *cls,
916                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
917
918 static void
919 handle_peer_pull_reply (void *cls,
920                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 /* End declaration of handlers */
923
924 /**
925  * @brief Allocate memory for a new channel context and insert it into DLL
926  *
927  * @param peer_ctx context of the according peer
928  *
929  * @return The channel context
930  */
931 static struct ChannelCtx *
932 add_channel_ctx (struct PeerContext *peer_ctx)
933 {
934   struct ChannelCtx *channel_ctx;
935   channel_ctx = GNUNET_new (struct ChannelCtx);
936   channel_ctx->peer_ctx = peer_ctx;
937   return channel_ctx;
938 }
939
940
941 /**
942  * @brief Free memory and NULL pointers.
943  *
944  * @param channel_ctx The channel context.
945  */
946 static void
947 remove_channel_ctx (struct ChannelCtx *channel_ctx)
948 {
949   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
950
951   if (NULL != channel_ctx->destruction_task)
952   {
953     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
954     channel_ctx->destruction_task = NULL;
955   }
956
957   GNUNET_free (channel_ctx);
958
959   if (NULL == peer_ctx) return;
960   if (channel_ctx == peer_ctx->send_channel_ctx)
961   {
962     peer_ctx->send_channel_ctx = NULL;
963     peer_ctx->mq = NULL;
964   }
965   else if (channel_ctx == peer_ctx->recv_channel_ctx)
966   {
967     peer_ctx->recv_channel_ctx = NULL;
968   }
969 }
970
971
972 /**
973  * @brief Get the channel of a peer. If not existing, create.
974  *
975  * @param peer_ctx Context of the peer of which to get the channel
976  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
977  */
978 struct GNUNET_CADET_Channel *
979 get_channel (struct PeerContext *peer_ctx)
980 {
981   /* There exists a copy-paste-clone in run() */
982   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
983     GNUNET_MQ_hd_fixed_size (peer_check,
984                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
985                              struct GNUNET_MessageHeader,
986                              NULL),
987     GNUNET_MQ_hd_fixed_size (peer_push,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_pull_request,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_var_size (peer_pull_reply,
996                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
997                            struct GNUNET_RPS_P2P_PullReplyMessage,
998                            NULL),
999     GNUNET_MQ_handler_end ()
1000   };
1001
1002
1003   if (NULL == peer_ctx->send_channel_ctx)
1004   {
1005     LOG (GNUNET_ERROR_TYPE_DEBUG,
1006          "Trying to establish channel to peer %s\n",
1007          GNUNET_i2s (&peer_ctx->peer_id));
1008     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1009     peer_ctx->send_channel_ctx->channel =
1010       GNUNET_CADET_channel_create (cadet_handle,
1011                                    peer_ctx->send_channel_ctx, /* context */
1012                                    &peer_ctx->peer_id,
1013                                    &peer_ctx->sub->hash,
1014                                    GNUNET_CADET_OPTION_RELIABLE,
1015                                    NULL, /* WindowSize handler */
1016                                    &cleanup_destroyed_channel, /* Disconnect handler */
1017                                    cadet_handlers);
1018   }
1019   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1020   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1021   return peer_ctx->send_channel_ctx->channel;
1022 }
1023
1024
1025 /**
1026  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1027  *
1028  * If we already have a message queue open to this client,
1029  * simply return it, otherways create one.
1030  *
1031  * @param peer_ctx Context of the peer of whicht to get the mq
1032  * @return the #GNUNET_MQ_Handle
1033  */
1034 static struct GNUNET_MQ_Handle *
1035 get_mq (struct PeerContext *peer_ctx)
1036 {
1037   if (NULL == peer_ctx->mq)
1038   {
1039     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1040   }
1041   return peer_ctx->mq;
1042 }
1043
1044 /**
1045  * @brief Add an envelope to a message passed to mq to list of pending messages
1046  *
1047  * @param peer_ctx Context of the peer for which to insert the envelope
1048  * @param ev envelope to the message
1049  * @param type type of the message to be sent
1050  * @return pointer to pending message
1051  */
1052 static struct PendingMessage *
1053 insert_pending_message (struct PeerContext *peer_ctx,
1054                         struct GNUNET_MQ_Envelope *ev,
1055                         const char *type)
1056 {
1057   struct PendingMessage *pending_msg;
1058
1059   pending_msg = GNUNET_new (struct PendingMessage);
1060   pending_msg->ev = ev;
1061   pending_msg->peer_ctx = peer_ctx;
1062   pending_msg->type = type;
1063   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1064                                peer_ctx->pending_messages_tail,
1065                                pending_msg);
1066   return pending_msg;
1067 }
1068
1069
1070 /**
1071  * @brief Remove a pending message from the respective DLL
1072  *
1073  * @param pending_msg the pending message to remove
1074  * @param cancel whether to cancel the pending message, too
1075  */
1076 static void
1077 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1078 {
1079   struct PeerContext *peer_ctx;
1080   (void) cancel;
1081
1082   peer_ctx = pending_msg->peer_ctx;
1083   GNUNET_assert (NULL != peer_ctx);
1084   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1085                                peer_ctx->pending_messages_tail,
1086                                pending_msg);
1087   // TODO wait for the cadet implementation of message cancellation
1088   //if (GNUNET_YES == cancel)
1089   //{
1090   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1091   //}
1092   GNUNET_free (pending_msg);
1093 }
1094
1095
1096 /**
1097  * @brief This is called in response to the first message we sent as a
1098  * online check.
1099  *
1100  * @param cls #PeerContext of peer with pending online check
1101  */
1102 static void
1103 mq_online_check_successful (void *cls)
1104 {
1105   struct PeerContext *peer_ctx = cls;
1106
1107   if (NULL != peer_ctx->online_check_pending)
1108   {
1109     LOG (GNUNET_ERROR_TYPE_DEBUG,
1110         "Online check for peer %s was successfull\n",
1111         GNUNET_i2s (&peer_ctx->peer_id));
1112     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1113     peer_ctx->online_check_pending = NULL;
1114     set_peer_online (peer_ctx);
1115     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1116   }
1117 }
1118
1119 /**
1120  * Issue a check whether peer is online
1121  *
1122  * @param peer_ctx the context of the peer
1123  */
1124 static void
1125 check_peer_online (struct PeerContext *peer_ctx)
1126 {
1127   LOG (GNUNET_ERROR_TYPE_DEBUG,
1128        "Get informed about peer %s getting online\n",
1129        GNUNET_i2s (&peer_ctx->peer_id));
1130
1131   struct GNUNET_MQ_Handle *mq;
1132   struct GNUNET_MQ_Envelope *ev;
1133
1134   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1135   peer_ctx->online_check_pending =
1136     insert_pending_message (peer_ctx, ev, "Check online");
1137   mq = get_mq (peer_ctx);
1138   GNUNET_MQ_notify_sent (ev,
1139                          mq_online_check_successful,
1140                          peer_ctx);
1141   GNUNET_MQ_send (mq, ev);
1142   if (peer_ctx->sub == msub)
1143   {
1144     GNUNET_STATISTICS_update (stats,
1145                               "# pending online checks",
1146                               1,
1147                               GNUNET_NO);
1148   }
1149 }
1150
1151
1152 /**
1153  * @brief Check whether function of type #PeerOp was already scheduled
1154  *
1155  * The array with pending operations will probably never grow really big, so
1156  * iterating over it should be ok.
1157  *
1158  * @param peer_ctx Context of the peer to check for the operation
1159  * @param peer_op the operation (#PeerOp) on the peer
1160  *
1161  * @return #GNUNET_YES if this operation is scheduled on that peer
1162  *         #GNUNET_NO  otherwise
1163  */
1164 static int
1165 check_operation_scheduled (const struct PeerContext *peer_ctx,
1166                            const PeerOp peer_op)
1167 {
1168   unsigned int i;
1169
1170   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1171     if (peer_op == peer_ctx->pending_ops[i].op)
1172       return GNUNET_YES;
1173   return GNUNET_NO;
1174 }
1175
1176
1177 /**
1178  * @brief Callback for scheduler to destroy a channel
1179  *
1180  * @param cls Context of the channel
1181  */
1182 static void
1183 destroy_channel (struct ChannelCtx *channel_ctx)
1184 {
1185   struct GNUNET_CADET_Channel *channel;
1186
1187   if (NULL != channel_ctx->destruction_task)
1188   {
1189     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1190     channel_ctx->destruction_task = NULL;
1191   }
1192   GNUNET_assert (channel_ctx->channel != NULL);
1193   channel = channel_ctx->channel;
1194   channel_ctx->channel = NULL;
1195   GNUNET_CADET_channel_destroy (channel);
1196   remove_channel_ctx (channel_ctx);
1197 }
1198
1199
1200 /**
1201  * @brief Destroy a cadet channel.
1202  *
1203  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1204  *
1205  * @param cls
1206  */
1207 static void
1208 destroy_channel_cb (void *cls)
1209 {
1210   struct ChannelCtx *channel_ctx = cls;
1211
1212   channel_ctx->destruction_task = NULL;
1213   destroy_channel (channel_ctx);
1214 }
1215
1216
1217 /**
1218  * @brief Schedule the destruction of a channel for immediately afterwards.
1219  *
1220  * In case a channel is to be destroyed from within the callback to the
1221  * destruction of another channel (send channel), we cannot call
1222  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1223  * construction.
1224  *
1225  * @param channel_ctx channel to be destroyed.
1226  */
1227 static void
1228 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1229 {
1230   GNUNET_assert (NULL ==
1231                  channel_ctx->destruction_task);
1232   GNUNET_assert (NULL !=
1233                  channel_ctx->channel);
1234   channel_ctx->destruction_task =
1235     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1236                               channel_ctx);
1237 }
1238
1239
1240 /**
1241  * @brief Remove peer
1242  *
1243  * - Empties the list with pending operations
1244  * - Empties the list with pending messages
1245  * - Cancels potentially existing online check
1246  * - Schedules closing of send and recv channels
1247  * - Removes peer from peer map
1248  *
1249  * @param peer_ctx Context of the peer to be destroyed
1250  * @return #GNUNET_YES if peer was removed
1251  *         #GNUNET_NO  otherwise
1252  */
1253 static int
1254 destroy_peer (struct PeerContext *peer_ctx)
1255 {
1256   GNUNET_assert (NULL != peer_ctx);
1257   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1258   if (GNUNET_NO ==
1259       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1260                                               &peer_ctx->peer_id))
1261   {
1262     return GNUNET_NO;
1263   }
1264   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1265   LOG (GNUNET_ERROR_TYPE_DEBUG,
1266        "Going to remove peer %s\n",
1267        GNUNET_i2s (&peer_ctx->peer_id));
1268   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1269
1270   /* Clear list of pending operations */
1271   // TODO this probably leaks memory
1272   //      ('only' the cls to the function. Not sure what to do with it)
1273   GNUNET_array_grow (peer_ctx->pending_ops,
1274                      peer_ctx->num_pending_ops,
1275                      0);
1276   /* Remove all pending messages */
1277   while (NULL != peer_ctx->pending_messages_head)
1278   {
1279     LOG (GNUNET_ERROR_TYPE_DEBUG,
1280          "Removing unsent %s\n",
1281          peer_ctx->pending_messages_head->type);
1282     /* Cancle pending message, too */
1283     if ( (NULL != peer_ctx->online_check_pending) &&
1284          (0 == memcmp (peer_ctx->pending_messages_head,
1285                      peer_ctx->online_check_pending,
1286                      sizeof (struct PendingMessage))) )
1287       {
1288         peer_ctx->online_check_pending = NULL;
1289         if (peer_ctx->sub == msub)
1290         {
1291           GNUNET_STATISTICS_update (stats,
1292                                     "# pending online checks",
1293                                     -1,
1294                                     GNUNET_NO);
1295         }
1296       }
1297     remove_pending_message (peer_ctx->pending_messages_head,
1298                             GNUNET_YES);
1299   }
1300
1301   /* If we are still waiting for notification whether this peer is online
1302    * cancel the according task */
1303   if (NULL != peer_ctx->online_check_pending)
1304   {
1305     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306                 "Removing pending online check for peer %s\n",
1307                 GNUNET_i2s (&peer_ctx->peer_id));
1308     // TODO wait until cadet sets mq->cancel_impl
1309     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1310     remove_pending_message (peer_ctx->online_check_pending,
1311                             GNUNET_YES);
1312     peer_ctx->online_check_pending = NULL;
1313   }
1314
1315   if (NULL != peer_ctx->send_channel_ctx)
1316   {
1317     /* This is possibly called from within channel destruction */
1318     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1319     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1320     peer_ctx->send_channel_ctx = NULL;
1321     peer_ctx->mq = NULL;
1322   }
1323   if (NULL != peer_ctx->recv_channel_ctx)
1324   {
1325     /* This is possibly called from within channel destruction */
1326     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1327     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1328     peer_ctx->recv_channel_ctx = NULL;
1329   }
1330
1331   if (GNUNET_YES !=
1332       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1333                                                 &peer_ctx->peer_id))
1334   {
1335     LOG (GNUNET_ERROR_TYPE_WARNING,
1336          "removing peer from peer_ctx->sub->peer_map failed\n");
1337   }
1338   if (peer_ctx->sub == msub)
1339   {
1340     GNUNET_STATISTICS_set (stats,
1341                           "# known peers",
1342                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1343                           GNUNET_NO);
1344   }
1345   GNUNET_free (peer_ctx);
1346   return GNUNET_YES;
1347 }
1348
1349
1350 /**
1351  * Iterator over hash map entries. Deletes all contexts of peers.
1352  *
1353  * @param cls closure
1354  * @param key current public key
1355  * @param value value in the hash map
1356  * @return #GNUNET_YES if we should continue to iterate,
1357  *         #GNUNET_NO if not.
1358  */
1359 static int
1360 peermap_clear_iterator (void *cls,
1361                         const struct GNUNET_PeerIdentity *key,
1362                         void *value)
1363 {
1364   struct Sub *sub = cls;
1365   (void) value;
1366
1367   destroy_peer (get_peer_ctx (sub->peer_map, key));
1368   return GNUNET_YES;
1369 }
1370
1371
1372 /**
1373  * @brief This is called once a message is sent.
1374  *
1375  * Removes the pending message
1376  *
1377  * @param cls type of the message that was sent
1378  */
1379 static void
1380 mq_notify_sent_cb (void *cls)
1381 {
1382   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1383   LOG (GNUNET_ERROR_TYPE_DEBUG,
1384       "%s was sent.\n",
1385       pending_msg->type);
1386   if (pending_msg->peer_ctx->sub == msub)
1387   {
1388     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1389       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1390     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1391       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1392     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1393       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1395                       NULL != map_single_hop &&
1396         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1397           &pending_msg->peer_ctx->peer_id))
1398       GNUNET_STATISTICS_update(stats,
1399                                "# pull requests sent (multi-hop peer)",
1400                                1,
1401                                GNUNET_NO);
1402   }
1403   /* Do not cancle message */
1404   remove_pending_message (pending_msg, GNUNET_NO);
1405 }
1406
1407
1408 /**
1409  * @brief Iterator function for #store_valid_peers.
1410  *
1411  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1412  * Writes single peer to disk.
1413  *
1414  * @param cls the file handle to write to.
1415  * @param peer current peer
1416  * @param value unused
1417  *
1418  * @return  #GNUNET_YES if we should continue to
1419  *          iterate,
1420  *          #GNUNET_NO if not.
1421  */
1422 static int
1423 store_peer_presistently_iterator (void *cls,
1424                                   const struct GNUNET_PeerIdentity *peer,
1425                                   void *value)
1426 {
1427   const struct GNUNET_DISK_FileHandle *fh = cls;
1428   char peer_string[128];
1429   int size;
1430   ssize_t ret;
1431   (void) value;
1432
1433   if (NULL == peer)
1434   {
1435     return GNUNET_YES;
1436   }
1437   size = GNUNET_snprintf (peer_string,
1438                           sizeof (peer_string),
1439                           "%s\n",
1440                           GNUNET_i2s_full (peer));
1441   GNUNET_assert (53 == size);
1442   ret = GNUNET_DISK_file_write (fh,
1443                                 peer_string,
1444                                 size);
1445   GNUNET_assert (size == ret);
1446   return GNUNET_YES;
1447 }
1448
1449
1450 /**
1451  * @brief Store the peers currently in #valid_peers to disk.
1452  *
1453  * @param sub Sub for which to store the valid peers
1454  */
1455 static void
1456 store_valid_peers (const struct Sub *sub)
1457 {
1458   struct GNUNET_DISK_FileHandle *fh;
1459   uint32_t number_written_peers;
1460   int ret;
1461
1462   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1463   {
1464     return;
1465   }
1466
1467   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1468   if (GNUNET_SYSERR == ret)
1469   {
1470     LOG (GNUNET_ERROR_TYPE_WARNING,
1471         "Not able to create directory for file `%s'\n",
1472         sub->filename_valid_peers);
1473     GNUNET_break (0);
1474   }
1475   else if (GNUNET_NO == ret)
1476   {
1477     LOG (GNUNET_ERROR_TYPE_WARNING,
1478         "Directory for file `%s' exists but is not writable for us\n",
1479         sub->filename_valid_peers);
1480     GNUNET_break (0);
1481   }
1482   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1483                               GNUNET_DISK_OPEN_WRITE |
1484                                   GNUNET_DISK_OPEN_CREATE,
1485                               GNUNET_DISK_PERM_USER_READ |
1486                                   GNUNET_DISK_PERM_USER_WRITE);
1487   if (NULL == fh)
1488   {
1489     LOG (GNUNET_ERROR_TYPE_WARNING,
1490         "Not able to write valid peers to file `%s'\n",
1491         sub->filename_valid_peers);
1492     return;
1493   }
1494   LOG (GNUNET_ERROR_TYPE_DEBUG,
1495       "Writing %u valid peers to disk\n",
1496       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1497   number_written_peers =
1498     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1499                                            store_peer_presistently_iterator,
1500                                            fh);
1501   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1502   GNUNET_assert (number_written_peers ==
1503       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1504 }
1505
1506
1507 /**
1508  * @brief Convert string representation of peer id to peer id.
1509  *
1510  * Counterpart to #GNUNET_i2s_full.
1511  *
1512  * @param string_repr The string representation of the peer id
1513  *
1514  * @return The peer id
1515  */
1516 static const struct GNUNET_PeerIdentity *
1517 s2i_full (const char *string_repr)
1518 {
1519   struct GNUNET_PeerIdentity *peer;
1520   size_t len;
1521   int ret;
1522
1523   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1524   len = strlen (string_repr);
1525   if (52 > len)
1526   {
1527     LOG (GNUNET_ERROR_TYPE_WARNING,
1528         "Not able to convert string representation of PeerID to PeerID\n"
1529         "Sting representation: %s (len %lu) - too short\n",
1530         string_repr,
1531         len);
1532     GNUNET_break (0);
1533   }
1534   else if (52 < len)
1535   {
1536     len = 52;
1537   }
1538   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1539                                                     len,
1540                                                     &peer->public_key);
1541   if (GNUNET_OK != ret)
1542   {
1543     LOG (GNUNET_ERROR_TYPE_WARNING,
1544         "Not able to convert string representation of PeerID to PeerID\n"
1545         "Sting representation: %s\n",
1546         string_repr);
1547     GNUNET_break (0);
1548   }
1549   return peer;
1550 }
1551
1552
1553 /**
1554  * @brief Restore the peers on disk to #valid_peers.
1555  *
1556  * @param sub Sub for which to restore the valid peers
1557  */
1558 static void
1559 restore_valid_peers (const struct Sub *sub)
1560 {
1561   off_t file_size;
1562   uint32_t num_peers;
1563   struct GNUNET_DISK_FileHandle *fh;
1564   char *buf;
1565   ssize_t size_read;
1566   char *iter_buf;
1567   char *str_repr;
1568   const struct GNUNET_PeerIdentity *peer;
1569
1570   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1571   {
1572     return;
1573   }
1574
1575   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1576   {
1577     return;
1578   }
1579   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1580                               GNUNET_DISK_OPEN_READ,
1581                               GNUNET_DISK_PERM_NONE);
1582   GNUNET_assert (NULL != fh);
1583   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1584   num_peers = file_size / 53;
1585   buf = GNUNET_malloc (file_size);
1586   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1587   GNUNET_assert (size_read == file_size);
1588   LOG (GNUNET_ERROR_TYPE_DEBUG,
1589       "Restoring %" PRIu32 " peers from file `%s'\n",
1590       num_peers,
1591       sub->filename_valid_peers);
1592   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1593   {
1594     str_repr = GNUNET_strndup (iter_buf, 53);
1595     peer = s2i_full (str_repr);
1596     GNUNET_free (str_repr);
1597     add_valid_peer (peer, sub->valid_peers);
1598     LOG (GNUNET_ERROR_TYPE_DEBUG,
1599         "Restored valid peer %s from disk\n",
1600         GNUNET_i2s_full (peer));
1601   }
1602   iter_buf = NULL;
1603   GNUNET_free (buf);
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1606       num_peers,
1607       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1608   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1609   {
1610     LOG (GNUNET_ERROR_TYPE_WARNING,
1611         "Number of restored peers does not match file size. Have probably duplicates.\n");
1612   }
1613   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1614   LOG (GNUNET_ERROR_TYPE_DEBUG,
1615       "Restored %u valid peers from disk\n",
1616       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1617 }
1618
1619
1620 /**
1621  * @brief Delete storage of peers that was created with #initialise_peers ()
1622  *
1623  * @param sub Sub for which the storage is deleted
1624  */
1625 static void
1626 peers_terminate (struct Sub *sub)
1627 {
1628   if (GNUNET_SYSERR ==
1629       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1630                                              &peermap_clear_iterator,
1631                                              sub))
1632   {
1633     LOG (GNUNET_ERROR_TYPE_WARNING,
1634         "Iteration destroying peers was aborted.\n");
1635   }
1636   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1637   sub->peer_map = NULL;
1638   store_valid_peers (sub);
1639   GNUNET_free (sub->filename_valid_peers);
1640   sub->filename_valid_peers = NULL;
1641   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1642   sub->valid_peers = NULL;
1643 }
1644
1645
1646 /**
1647  * Iterator over #valid_peers hash map entries.
1648  *
1649  * @param cls Closure that contains iterator function and closure
1650  * @param peer current peer id
1651  * @param value value in the hash map - unused
1652  * @return #GNUNET_YES if we should continue to
1653  *         iterate,
1654  *         #GNUNET_NO if not.
1655  */
1656 static int
1657 valid_peer_iterator (void *cls,
1658                      const struct GNUNET_PeerIdentity *peer,
1659                      void *value)
1660 {
1661   struct PeersIteratorCls *it_cls = cls;
1662   (void) value;
1663
1664   return it_cls->iterator (it_cls->cls, peer);
1665 }
1666
1667
1668 /**
1669  * @brief Get all currently known, valid peer ids.
1670  *
1671  * @param valid_peers Peer map containing the valid peers in question
1672  * @param iterator function to call on each peer id
1673  * @param it_cls extra argument to @a iterator
1674  * @return the number of key value pairs processed,
1675  *         #GNUNET_SYSERR if it aborted iteration
1676  */
1677 static int
1678 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1679                  PeersIterator iterator,
1680                  void *it_cls)
1681 {
1682   struct PeersIteratorCls *cls;
1683   int ret;
1684
1685   cls = GNUNET_new (struct PeersIteratorCls);
1686   cls->iterator = iterator;
1687   cls->cls = it_cls;
1688   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1689                                                valid_peer_iterator,
1690                                                cls);
1691   GNUNET_free (cls);
1692   return ret;
1693 }
1694
1695
1696 /**
1697  * @brief Add peer to known peers.
1698  *
1699  * This function is called on new peer_ids from 'external' sources
1700  * (client seed, cadet get_peers(), ...)
1701  *
1702  * @param sub Sub with the peer map that the @a peer will be added to
1703  * @param peer the new #GNUNET_PeerIdentity
1704  *
1705  * @return #GNUNET_YES if peer was inserted
1706  *         #GNUNET_NO  otherwise
1707  */
1708 static int
1709 insert_peer (struct Sub *sub,
1710              const struct GNUNET_PeerIdentity *peer)
1711 {
1712   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1713   {
1714     return GNUNET_NO; /* We already know this peer - nothing to do */
1715   }
1716   (void) create_peer_ctx (sub, peer);
1717   return GNUNET_YES;
1718 }
1719
1720
1721 /**
1722  * @brief Check whether flags on a peer are set.
1723  *
1724  * @param peer_map Peer map that is expected to contain the @a peer
1725  * @param peer the peer to check the flag of
1726  * @param flags the flags to check
1727  *
1728  * @return #GNUNET_SYSERR if peer is not known
1729  *         #GNUNET_YES    if all given flags are set
1730  *         #GNUNET_NO     otherwise
1731  */
1732 static int
1733 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1734                  const struct GNUNET_PeerIdentity *peer,
1735                  enum Peers_PeerFlags flags)
1736 {
1737   struct PeerContext *peer_ctx;
1738
1739   if (GNUNET_NO == check_peer_known (peer_map, peer))
1740   {
1741     return GNUNET_SYSERR;
1742   }
1743   peer_ctx = get_peer_ctx (peer_map, peer);
1744   return check_peer_flag_set (peer_ctx, flags);
1745 }
1746
1747 /**
1748  * @brief Try connecting to a peer to see whether it is online
1749  *
1750  * If not known yet, insert into known peers
1751  *
1752  * @param sub Sub which would contain the @a peer
1753  * @param peer the peer whose online is to be checked
1754  * @return #GNUNET_YES if the check was issued
1755  *         #GNUNET_NO  otherwise
1756  */
1757 static int
1758 issue_peer_online_check (struct Sub *sub,
1759                          const struct GNUNET_PeerIdentity *peer)
1760 {
1761   struct PeerContext *peer_ctx;
1762
1763   (void) insert_peer (sub, peer); // TODO even needed?
1764   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1765   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1766        (NULL == peer_ctx->online_check_pending) )
1767   {
1768     check_peer_online (peer_ctx);
1769     return GNUNET_YES;
1770   }
1771   return GNUNET_NO;
1772 }
1773
1774
1775 /**
1776  * @brief Check if peer is removable.
1777  *
1778  * Check if
1779  *  - a recv channel exists
1780  *  - there are pending messages
1781  *  - there is no pending pull reply
1782  *
1783  * @param peer_ctx Context of the peer in question
1784  * @return #GNUNET_YES    if peer is removable
1785  *         #GNUNET_NO     if peer is NOT removable
1786  *         #GNUNET_SYSERR if peer is not known
1787  */
1788 static int
1789 check_removable (const struct PeerContext *peer_ctx)
1790 {
1791   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1792                                                            &peer_ctx->peer_id))
1793   {
1794     return GNUNET_SYSERR;
1795   }
1796
1797   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1798        (NULL != peer_ctx->pending_messages_head) ||
1799        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1800   {
1801     return GNUNET_NO;
1802   }
1803   return GNUNET_YES;
1804 }
1805
1806
1807 /**
1808  * @brief Check whether @a peer is actually a peer.
1809  *
1810  * A valid peer is a peer that we know exists eg. we were connected to once.
1811  *
1812  * @param valid_peers Peer map that would contain the @a peer
1813  * @param peer peer in question
1814  *
1815  * @return #GNUNET_YES if peer is valid
1816  *         #GNUNET_NO  if peer is not valid
1817  */
1818 static int
1819 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1820                   const struct GNUNET_PeerIdentity *peer)
1821 {
1822   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1823 }
1824
1825
1826 /**
1827  * @brief Indicate that we want to send to the other peer
1828  *
1829  * This establishes a sending channel
1830  *
1831  * @param peer_ctx Context of the target peer
1832  */
1833 static void
1834 indicate_sending_intention (struct PeerContext *peer_ctx)
1835 {
1836   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1837                                                  &peer_ctx->peer_id));
1838   (void) get_channel (peer_ctx);
1839 }
1840
1841
1842 /**
1843  * @brief Check whether other peer has the intention to send/opened channel
1844  *        towars us
1845  *
1846  * @param peer_ctx Context of the peer in question
1847  *
1848  * @return #GNUNET_YES if peer has the intention to send
1849  *         #GNUNET_NO  otherwise
1850  */
1851 static int
1852 check_peer_send_intention (const struct PeerContext *peer_ctx)
1853 {
1854   if (NULL != peer_ctx->recv_channel_ctx)
1855   {
1856     return GNUNET_YES;
1857   }
1858   return GNUNET_NO;
1859 }
1860
1861
1862 /**
1863  * Handle the channel a peer opens to us.
1864  *
1865  * @param cls The closure - Sub
1866  * @param channel The channel the peer wants to establish
1867  * @param initiator The peer's peer ID
1868  *
1869  * @return initial channel context for the channel
1870  *         (can be NULL -- that's not an error)
1871  */
1872 static void *
1873 handle_inbound_channel (void *cls,
1874                         struct GNUNET_CADET_Channel *channel,
1875                         const struct GNUNET_PeerIdentity *initiator)
1876 {
1877   struct PeerContext *peer_ctx;
1878   struct ChannelCtx *channel_ctx;
1879   struct Sub *sub = cls;
1880
1881   LOG (GNUNET_ERROR_TYPE_DEBUG,
1882       "New channel was established to us (Peer %s).\n",
1883       GNUNET_i2s (initiator));
1884   GNUNET_assert (NULL != channel); /* according to cadet API */
1885   /* Make sure we 'know' about this peer */
1886   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1887   set_peer_online (peer_ctx);
1888   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1889   channel_ctx = add_channel_ctx (peer_ctx);
1890   channel_ctx->channel = channel;
1891   /* We only accept one incoming channel per peer */
1892   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1893                                                              initiator)))
1894   {
1895     LOG (GNUNET_ERROR_TYPE_WARNING,
1896         "Already got one receive channel. Destroying old one.\n");
1897     GNUNET_break_op (0);
1898     destroy_channel (peer_ctx->recv_channel_ctx);
1899     peer_ctx->recv_channel_ctx = channel_ctx;
1900     /* return the channel context */
1901     return channel_ctx;
1902   }
1903   peer_ctx->recv_channel_ctx = channel_ctx;
1904   return channel_ctx;
1905 }
1906
1907
1908 /**
1909  * @brief Check whether a sending channel towards the given peer exists
1910  *
1911  * @param peer_ctx Context of the peer in question
1912  *
1913  * @return #GNUNET_YES if a sending channel towards that peer exists
1914  *         #GNUNET_NO  otherwise
1915  */
1916 static int
1917 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1918 {
1919   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1920                                      &peer_ctx->peer_id))
1921   { /* If no such peer exists, there is no channel */
1922     return GNUNET_NO;
1923   }
1924   if (NULL == peer_ctx->send_channel_ctx)
1925   {
1926     return GNUNET_NO;
1927   }
1928   return GNUNET_YES;
1929 }
1930
1931
1932 /**
1933  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1934  *        intention to another peer
1935  *
1936  * @param peer_ctx Context to the peer
1937  * @return #GNUNET_YES if channel was destroyed
1938  *         #GNUNET_NO  otherwise
1939  */
1940 static int
1941 destroy_sending_channel (struct PeerContext *peer_ctx)
1942 {
1943   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1944                                      &peer_ctx->peer_id))
1945   {
1946     return GNUNET_NO;
1947   }
1948   if (NULL != peer_ctx->send_channel_ctx)
1949   {
1950     destroy_channel (peer_ctx->send_channel_ctx);
1951     (void) check_connected (peer_ctx);
1952     return GNUNET_YES;
1953   }
1954   return GNUNET_NO;
1955 }
1956
1957 /**
1958  * @brief Send a message to another peer.
1959  *
1960  * Keeps track about pending messages so they can be properly removed when the
1961  * peer is destroyed.
1962  *
1963  * @param peer_ctx Context of the peer to which the message is to be sent
1964  * @param ev envelope of the message
1965  * @param type type of the message
1966  */
1967 static void
1968 send_message (struct PeerContext *peer_ctx,
1969               struct GNUNET_MQ_Envelope *ev,
1970               const char *type)
1971 {
1972   struct PendingMessage *pending_msg;
1973   struct GNUNET_MQ_Handle *mq;
1974
1975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976               "Sending message to %s of type %s\n",
1977               GNUNET_i2s (&peer_ctx->peer_id),
1978               type);
1979   pending_msg = insert_pending_message (peer_ctx, ev, type);
1980   mq = get_mq (peer_ctx);
1981   GNUNET_MQ_notify_sent (ev,
1982                          mq_notify_sent_cb,
1983                          pending_msg);
1984   GNUNET_MQ_send (mq, ev);
1985 }
1986
1987 /**
1988  * @brief Schedule a operation on given peer
1989  *
1990  * Avoids scheduling an operation twice.
1991  *
1992  * @param peer_ctx Context of the peer for which to schedule the operation
1993  * @param peer_op the operation to schedule
1994  * @param cls Closure to @a peer_op
1995  *
1996  * @return #GNUNET_YES if the operation was scheduled
1997  *         #GNUNET_NO  otherwise
1998  */
1999 static int
2000 schedule_operation (struct PeerContext *peer_ctx,
2001                     const PeerOp peer_op,
2002                     void *cls)
2003 {
2004   struct PeerPendingOp pending_op;
2005
2006   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2007                                                  &peer_ctx->peer_id));
2008
2009   //TODO if ONLINE execute immediately
2010
2011   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2012   {
2013     pending_op.op = peer_op;
2014     pending_op.op_cls = cls;
2015     GNUNET_array_append (peer_ctx->pending_ops,
2016                          peer_ctx->num_pending_ops,
2017                          pending_op);
2018     return GNUNET_YES;
2019   }
2020   return GNUNET_NO;
2021 }
2022
2023 /***********************************************************************
2024  * /Old gnunet-service-rps_peers.c
2025 ***********************************************************************/
2026
2027
2028 /***********************************************************************
2029  * Housekeeping with clients
2030 ***********************************************************************/
2031
2032 /**
2033  * Closure used to pass the client and the id to the callback
2034  * that replies to a client's request
2035  */
2036 struct ReplyCls
2037 {
2038   /**
2039    * DLL
2040    */
2041   struct ReplyCls *next;
2042   struct ReplyCls *prev;
2043
2044   /**
2045    * The identifier of the request
2046    */
2047   uint32_t id;
2048
2049   /**
2050    * The handle to the request
2051    */
2052   struct RPS_SamplerRequestHandle *req_handle;
2053
2054   /**
2055    * The client handle to send the reply to
2056    */
2057   struct ClientContext *cli_ctx;
2058 };
2059
2060
2061 /**
2062  * Struct used to store the context of a connected client.
2063  */
2064 struct ClientContext
2065 {
2066   /**
2067    * DLL
2068    */
2069   struct ClientContext *next;
2070   struct ClientContext *prev;
2071
2072   /**
2073    * The message queue to communicate with the client.
2074    */
2075   struct GNUNET_MQ_Handle *mq;
2076
2077   /**
2078    * @brief How many updates this client expects to receive.
2079    */
2080   int64_t view_updates_left;
2081
2082   /**
2083    * @brief Whether this client wants to receive stream updates.
2084    * Either #GNUNET_YES or #GNUNET_NO
2085    */
2086   int8_t stream_update;
2087
2088   /**
2089    * The client handle to send the reply to
2090    */
2091   struct GNUNET_SERVICE_Client *client;
2092
2093   /**
2094    * The #Sub this context belongs to
2095    */
2096   struct Sub *sub;
2097 };
2098
2099 /**
2100  * DLL with all clients currently connected to us
2101  */
2102 struct ClientContext *cli_ctx_head;
2103 struct ClientContext *cli_ctx_tail;
2104
2105 /***********************************************************************
2106  * /Housekeeping with clients
2107 ***********************************************************************/
2108
2109
2110
2111
2112
2113 /***********************************************************************
2114  * Util functions
2115 ***********************************************************************/
2116
2117
2118 /**
2119  * Print peerlist to log.
2120  */
2121 static void
2122 print_peer_list (struct GNUNET_PeerIdentity *list,
2123                  unsigned int len)
2124 {
2125   unsigned int i;
2126
2127   LOG (GNUNET_ERROR_TYPE_DEBUG,
2128        "Printing peer list of length %u at %p:\n",
2129        len,
2130        list);
2131   for (i = 0 ; i < len ; i++)
2132   {
2133     LOG (GNUNET_ERROR_TYPE_DEBUG,
2134          "%u. peer: %s\n",
2135          i, GNUNET_i2s (&list[i]));
2136   }
2137 }
2138
2139
2140 /**
2141  * Remove peer from list.
2142  */
2143 static void
2144 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2145                unsigned int *list_size,
2146                const struct GNUNET_PeerIdentity *peer)
2147 {
2148   unsigned int i;
2149   struct GNUNET_PeerIdentity *tmp;
2150
2151   tmp = *peer_list;
2152
2153   LOG (GNUNET_ERROR_TYPE_DEBUG,
2154        "Removing peer %s from list at %p\n",
2155        GNUNET_i2s (peer),
2156        tmp);
2157
2158   for ( i = 0 ; i < *list_size ; i++ )
2159   {
2160     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2161     {
2162       if (i < *list_size -1)
2163       { /* Not at the last entry -- shift peers left */
2164         memmove (&tmp[i], &tmp[i +1],
2165                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2166       }
2167       /* Remove last entry (should be now useless PeerID) */
2168       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2169     }
2170   }
2171   *peer_list = tmp;
2172 }
2173
2174
2175 /**
2176  * Insert PeerID in #view
2177  *
2178  * Called once we know a peer is online.
2179  * Implements #PeerOp
2180  *
2181  * @return GNUNET_OK if peer was actually inserted
2182  *         GNUNET_NO if peer was not inserted
2183  */
2184 static void
2185 insert_in_view_op (void *cls,
2186                    const struct GNUNET_PeerIdentity *peer);
2187
2188 /**
2189  * Insert PeerID in #view
2190  *
2191  * Called once we know a peer is online.
2192  *
2193  * @param sub Sub in with the view to insert in
2194  * @param peer the peer to insert
2195  *
2196  * @return GNUNET_OK if peer was actually inserted
2197  *         GNUNET_NO if peer was not inserted
2198  */
2199 static int
2200 insert_in_view (struct Sub *sub,
2201                 const struct GNUNET_PeerIdentity *peer)
2202 {
2203   struct PeerContext *peer_ctx;
2204   int online;
2205   int ret;
2206
2207   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2208   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2209   if ( (GNUNET_NO == online) ||
2210        (GNUNET_SYSERR == online) ) /* peer is not even known */
2211   {
2212     (void) issue_peer_online_check (sub, peer);
2213     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2214     return GNUNET_NO;
2215   }
2216   /* Open channel towards peer to keep connection open */
2217   indicate_sending_intention (peer_ctx);
2218   ret = View_put (sub->view, peer);
2219   if (peer_ctx->sub == msub)
2220   {
2221     GNUNET_STATISTICS_set (stats,
2222                            "view size",
2223                            View_size (peer_ctx->sub->view),
2224                            GNUNET_NO);
2225   }
2226   return ret;
2227 }
2228
2229
2230 /**
2231  * @brief Send view to client
2232  *
2233  * @param cli_ctx the context of the client
2234  * @param view_array the peerids of the view as array (can be empty)
2235  * @param view_size the size of the view array (can be 0)
2236  */
2237 static void
2238 send_view (const struct ClientContext *cli_ctx,
2239            const struct GNUNET_PeerIdentity *view_array,
2240            uint64_t view_size)
2241 {
2242   struct GNUNET_MQ_Envelope *ev;
2243   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2244   struct Sub *sub;
2245
2246   if (NULL == view_array)
2247   {
2248     if (NULL == cli_ctx->sub) sub = msub;
2249     else sub = cli_ctx->sub;
2250     view_size = View_size (sub->view);
2251     view_array = View_get_as_array (sub->view);
2252   }
2253
2254   ev = GNUNET_MQ_msg_extra (out_msg,
2255                             view_size * sizeof (struct GNUNET_PeerIdentity),
2256                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2257   out_msg->num_peers = htonl (view_size);
2258
2259   GNUNET_memcpy (&out_msg[1],
2260                  view_array,
2261                  view_size * sizeof (struct GNUNET_PeerIdentity));
2262   GNUNET_MQ_send (cli_ctx->mq, ev);
2263 }
2264
2265
2266 /**
2267  * @brief Send peer from biased stream to client.
2268  *
2269  * TODO merge with send_view, parameterise
2270  *
2271  * @param cli_ctx the context of the client
2272  * @param view_array the peerids of the view as array (can be empty)
2273  * @param view_size the size of the view array (can be 0)
2274  */
2275 static void
2276 send_stream_peers (const struct ClientContext *cli_ctx,
2277                    uint64_t num_peers,
2278                    const struct GNUNET_PeerIdentity *peers)
2279 {
2280   struct GNUNET_MQ_Envelope *ev;
2281   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2282
2283   GNUNET_assert (NULL != peers);
2284
2285   ev = GNUNET_MQ_msg_extra (out_msg,
2286                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2287                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2288   out_msg->num_peers = htonl (num_peers);
2289
2290   GNUNET_memcpy (&out_msg[1],
2291                  peers,
2292                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2293   GNUNET_MQ_send (cli_ctx->mq, ev);
2294 }
2295
2296
2297 /**
2298  * @brief sends updates to clients that are interested
2299  *
2300  * @param sub Sub for which to notify clients
2301  */
2302 static void
2303 clients_notify_view_update (const struct Sub *sub)
2304 {
2305   struct ClientContext *cli_ctx_iter;
2306   uint64_t num_peers;
2307   const struct GNUNET_PeerIdentity *view_array;
2308
2309   num_peers = View_size (sub->view);
2310   view_array = View_get_as_array(sub->view);
2311   /* check size of view is small enough */
2312   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2313   {
2314     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2315                 "View is too big to send\n");
2316     return;
2317   }
2318
2319   for (cli_ctx_iter = cli_ctx_head;
2320        NULL != cli_ctx_iter;
2321        cli_ctx_iter = cli_ctx_iter->next)
2322   {
2323     if (1 < cli_ctx_iter->view_updates_left)
2324     {
2325       /* Client wants to receive limited amount of updates */
2326       cli_ctx_iter->view_updates_left -= 1;
2327     } else if (1 == cli_ctx_iter->view_updates_left)
2328     {
2329       /* Last update of view for client */
2330       cli_ctx_iter->view_updates_left = -1;
2331     } else if (0 > cli_ctx_iter->view_updates_left) {
2332       /* Client is not interested in updates */
2333       continue;
2334     }
2335     /* else _updates_left == 0 - infinite amount of updates */
2336
2337     /* send view */
2338     send_view (cli_ctx_iter, view_array, num_peers);
2339   }
2340 }
2341
2342
2343 /**
2344  * @brief sends updates to clients that are interested
2345  *
2346  * @param num_peers Number of peers to send
2347  * @param peers the array of peers to send
2348  */
2349 static void
2350 clients_notify_stream_peer (const struct Sub *sub,
2351                             uint64_t num_peers,
2352                             const struct GNUNET_PeerIdentity *peers)
2353                             // TODO enum StreamPeerSource)
2354 {
2355   struct ClientContext *cli_ctx_iter;
2356
2357   LOG (GNUNET_ERROR_TYPE_DEBUG,
2358       "Got peer (%s) from biased stream - update all clients\n",
2359       GNUNET_i2s (peers));
2360
2361   for (cli_ctx_iter = cli_ctx_head;
2362        NULL != cli_ctx_iter;
2363        cli_ctx_iter = cli_ctx_iter->next)
2364   {
2365     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2366         (sub == cli_ctx_iter->sub || sub == msub))
2367     {
2368       send_stream_peers (cli_ctx_iter, num_peers, peers);
2369     }
2370   }
2371 }
2372
2373
2374 /**
2375  * Put random peer from sampler into the view as history update.
2376  *
2377  * @param ids Array of Peers to insert into view
2378  * @param num_peers Number of peers to insert
2379  * @param cls Closure - The Sub for which this is to be done
2380  */
2381 static void
2382 hist_update (const struct GNUNET_PeerIdentity *ids,
2383              uint32_t num_peers,
2384              void *cls)
2385 {
2386   unsigned int i;
2387   struct Sub *sub = cls;
2388
2389   for (i = 0; i < num_peers; i++)
2390   {
2391     int inserted;
2392     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2393     {
2394       LOG (GNUNET_ERROR_TYPE_WARNING,
2395            "Peer in history update not known!\n");
2396       continue;
2397     }
2398     inserted = insert_in_view (sub, &ids[i]);
2399     if (GNUNET_OK == inserted)
2400     {
2401       clients_notify_stream_peer (sub, 1, &ids[i]);
2402     }
2403     to_file (sub->file_name_view_log,
2404              "+%s\t(hist)",
2405              GNUNET_i2s_full (ids));
2406   }
2407   clients_notify_view_update (sub);
2408 }
2409
2410
2411 /**
2412  * Wrapper around #RPS_sampler_resize()
2413  *
2414  * If we do not have enough sampler elements, double current sampler size
2415  * If we have more than enough sampler elements, halv current sampler size
2416  *
2417  * @param sampler The sampler to resize
2418  * @param new_size New size to which to resize
2419  */
2420 static void
2421 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2422 {
2423   unsigned int sampler_size;
2424
2425   // TODO statistics
2426   // TODO respect the min, max
2427   sampler_size = RPS_sampler_get_size (sampler);
2428   if (sampler_size > new_size * 4)
2429   { /* Shrinking */
2430     RPS_sampler_resize (sampler, sampler_size / 2);
2431   }
2432   else if (sampler_size < new_size)
2433   { /* Growing */
2434     RPS_sampler_resize (sampler, sampler_size * 2);
2435   }
2436   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2437 }
2438
2439
2440 /**
2441  * Add all peers in @a peer_array to @a peer_map used as set.
2442  *
2443  * @param peer_array array containing the peers
2444  * @param num_peers number of peers in @peer_array
2445  * @param peer_map the peermap to use as set
2446  */
2447 static void
2448 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2449                        unsigned int num_peers,
2450                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2451 {
2452   unsigned int i;
2453   if (NULL == peer_map)
2454   {
2455     LOG (GNUNET_ERROR_TYPE_WARNING,
2456          "Trying to add peers to non-existing peermap.\n");
2457     return;
2458   }
2459
2460   for (i = 0; i < num_peers; i++)
2461   {
2462     GNUNET_CONTAINER_multipeermap_put (peer_map,
2463                                        &peer_array[i],
2464                                        NULL,
2465                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2466     if (msub->peer_map == peer_map)
2467     {
2468       GNUNET_STATISTICS_set (stats,
2469                             "# known peers",
2470                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2471                             GNUNET_NO);
2472     }
2473   }
2474 }
2475
2476
2477 /**
2478  * Send a PULL REPLY to @a peer_id
2479  *
2480  * @param peer_ctx Context of the peer to send the reply to
2481  * @param peer_ids the peers to send to @a peer_id
2482  * @param num_peer_ids the number of peers to send to @a peer_id
2483  */
2484 static void
2485 send_pull_reply (struct PeerContext *peer_ctx,
2486                  const struct GNUNET_PeerIdentity *peer_ids,
2487                  unsigned int num_peer_ids)
2488 {
2489   uint32_t send_size;
2490   struct GNUNET_MQ_Envelope *ev;
2491   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2492
2493   /* Compute actual size */
2494   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2495               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2496
2497   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2498     /* Compute number of peers to send
2499      * If too long, simply truncate */
2500     // TODO select random ones via permutation
2501     //      or even better: do good protocol design
2502     send_size =
2503       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2504        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2505        sizeof (struct GNUNET_PeerIdentity);
2506   else
2507     send_size = num_peer_ids;
2508
2509   LOG (GNUNET_ERROR_TYPE_DEBUG,
2510       "Going to send PULL REPLY with %u peers to %s\n",
2511       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2512
2513   ev = GNUNET_MQ_msg_extra (out_msg,
2514                             send_size * sizeof (struct GNUNET_PeerIdentity),
2515                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2516   out_msg->num_peers = htonl (send_size);
2517   GNUNET_memcpy (&out_msg[1], peer_ids,
2518          send_size * sizeof (struct GNUNET_PeerIdentity));
2519
2520   send_message (peer_ctx, ev, "PULL REPLY");
2521   if (peer_ctx->sub == msub)
2522   {
2523     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2524   }
2525   // TODO check with send intention: as send_channel is used/opened we indicate
2526   // a sending intention without intending it.
2527   // -> clean peer afterwards?
2528   // -> use recv_channel?
2529 }
2530
2531
2532 /**
2533  * Insert PeerID in #pull_map
2534  *
2535  * Called once we know a peer is online.
2536  *
2537  * @param cls Closure - Sub with the pull map to insert into
2538  * @param peer Peer to insert
2539  */
2540 static void
2541 insert_in_pull_map (void *cls,
2542                     const struct GNUNET_PeerIdentity *peer)
2543 {
2544   struct Sub *sub = cls;
2545
2546   CustomPeerMap_put (sub->pull_map, peer);
2547 }
2548
2549
2550 /**
2551  * Insert PeerID in #view
2552  *
2553  * Called once we know a peer is online.
2554  * Implements #PeerOp
2555  *
2556  * @param cls Closure - Sub with view to insert peer into
2557  * @param peer the peer to insert
2558  */
2559 static void
2560 insert_in_view_op (void *cls,
2561                    const struct GNUNET_PeerIdentity *peer)
2562 {
2563   struct Sub *sub = cls;
2564   int inserted;
2565
2566   inserted = insert_in_view (sub, peer);
2567   if (GNUNET_OK == inserted)
2568   {
2569     clients_notify_stream_peer (sub, 1, peer);
2570   }
2571 }
2572
2573
2574 /**
2575  * Update sampler with given PeerID.
2576  * Implements #PeerOp
2577  *
2578  * @param cls Closure - Sub containing the sampler to insert into
2579  * @param peer Peer to insert
2580  */
2581 static void
2582 insert_in_sampler (void *cls,
2583                    const struct GNUNET_PeerIdentity *peer)
2584 {
2585   struct Sub *sub = cls;
2586
2587   LOG (GNUNET_ERROR_TYPE_DEBUG,
2588        "Updating samplers with peer %s from insert_in_sampler()\n",
2589        GNUNET_i2s (peer));
2590   RPS_sampler_update (sub->sampler, peer);
2591   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2592   {
2593     /* Make sure we 'know' about this peer */
2594     (void) issue_peer_online_check (sub, peer);
2595     /* Establish a channel towards that peer to indicate we are going to send
2596      * messages to it */
2597     //indicate_sending_intention (peer);
2598   }
2599 #ifdef TO_FILE
2600   sub->num_observed_peers++;
2601   GNUNET_CONTAINER_multipeermap_put
2602     (sub->observed_unique_peers,
2603      peer,
2604      NULL,
2605      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2606   uint32_t num_observed_unique_peers =
2607     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2608   to_file (sub->file_name_observed_log,
2609           "%" PRIu32 " %" PRIu32 " %f\n",
2610           sub->num_observed_peers,
2611           num_observed_unique_peers,
2612           1.0*num_observed_unique_peers/sub->num_observed_peers)
2613 #endif /* TO_FILE */
2614 }
2615
2616
2617 /**
2618  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2619  *        If the peer is not known, online check is issued and it is
2620  *        scheduled to be inserted in sampler and view.
2621  *
2622  * "External sources" refer to every source except the gossip.
2623  *
2624  * @param sub Sub for which @a peer was received
2625  * @param peer peer to insert/peer received
2626  */
2627 static void
2628 got_peer (struct Sub *sub,
2629           const struct GNUNET_PeerIdentity *peer)
2630 {
2631   /* If we did not know this peer already, insert it into sampler and view */
2632   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2633   {
2634     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2635                         &insert_in_sampler, sub);
2636     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2637                         &insert_in_view_op, sub);
2638   }
2639   if (sub == msub)
2640   {
2641     GNUNET_STATISTICS_update (stats,
2642                               "# learnd peers",
2643                               1,
2644                               GNUNET_NO);
2645   }
2646 }
2647
2648
2649 /**
2650  * @brief Checks if there is a sending channel and if it is needed
2651  *
2652  * @param peer_ctx Context of the peer to check
2653  * @return GNUNET_YES if sending channel exists and is still needed
2654  *         GNUNET_NO  otherwise
2655  */
2656 static int
2657 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2658 {
2659   /* struct GNUNET_CADET_Channel *channel; */
2660   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2661                                      &peer_ctx->peer_id))
2662   {
2663     return GNUNET_NO;
2664   }
2665   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2666   {
2667     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2668                                     &peer_ctx->peer_id)) ||
2669          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2670                                             &peer_ctx->peer_id)) ||
2671          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2672                                                      &peer_ctx->peer_id)) ||
2673          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2674                                                      &peer_ctx->peer_id)) ||
2675          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2676                                          &peer_ctx->peer_id,
2677                                          Peers_PULL_REPLY_PENDING)))
2678     { /* If we want to keep the connection to peer open */
2679       return GNUNET_YES;
2680     }
2681     return GNUNET_NO;
2682   }
2683   return GNUNET_NO;
2684 }
2685
2686
2687 /**
2688  * @brief remove peer from our knowledge, the view, push and pull maps and
2689  * samplers.
2690  *
2691  * @param sub Sub with the data structures the peer is to be removed from
2692  * @param peer the peer to remove
2693  */
2694 static void
2695 remove_peer (struct Sub *sub,
2696              const struct GNUNET_PeerIdentity *peer)
2697 {
2698   (void) View_remove_peer (sub->view,
2699                            peer);
2700   CustomPeerMap_remove_peer (sub->pull_map,
2701                              peer);
2702   CustomPeerMap_remove_peer (sub->push_map,
2703                              peer);
2704   RPS_sampler_reinitialise_by_value (sub->sampler,
2705                                      peer);
2706   /* We want to destroy the peer now.
2707    * Sometimes, it just seems that it's already been removed from the peer_map,
2708    * so check the peer_map first. */
2709   if (GNUNET_YES == check_peer_known (sub->peer_map,
2710                                       peer))
2711   {
2712           destroy_peer (get_peer_ctx (sub->peer_map,
2713                                       peer));
2714   }
2715 }
2716
2717
2718 /**
2719  * @brief Remove data that is not needed anymore.
2720  *
2721  * If the sending channel is no longer needed it is destroyed.
2722  *
2723  * @param sub Sub in which the current peer is to be cleaned
2724  * @param peer the peer whose data is about to be cleaned
2725  */
2726 static void
2727 clean_peer (struct Sub *sub,
2728             const struct GNUNET_PeerIdentity *peer)
2729 {
2730   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2731                                                                peer)))
2732   {
2733     LOG (GNUNET_ERROR_TYPE_DEBUG,
2734         "Going to remove send channel to peer %s\n",
2735         GNUNET_i2s (peer));
2736     #if ENABLE_MALICIOUS
2737     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer,
2738                                               peer))
2739       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2740                                                     peer));
2741     #else /* ENABLE_MALICIOUS */
2742     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2743                                                   peer));
2744     #endif /* ENABLE_MALICIOUS */
2745   }
2746
2747   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2748                                                            peer))
2749   {
2750     /* Peer was already removed by callback on destroyed channel */
2751     LOG (GNUNET_ERROR_TYPE_WARNING,
2752         "Peer was removed from our knowledge during cleanup\n");
2753     return;
2754   }
2755
2756   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2757                                                               peer))) &&
2758        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2759        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2760        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2761        (0 == RPS_sampler_count_id (sub->sampler,   peer)) &&
2762        (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
2763   { /* We can safely remove this peer */
2764     LOG (GNUNET_ERROR_TYPE_DEBUG,
2765         "Going to remove peer %s\n",
2766         GNUNET_i2s (peer));
2767     remove_peer (sub, peer);
2768     return;
2769   }
2770 }
2771
2772
2773 /**
2774  * @brief This is called when a channel is destroyed.
2775  *
2776  * Removes peer completely from our knowledge if the send_channel was destroyed
2777  * Otherwise simply delete the recv_channel
2778  * Also check if the knowledge about this peer is still needed.
2779  * If not, remove this peer from our knowledge.
2780  *
2781  * @param cls The closure - Context to the channel
2782  * @param channel The channel being closed
2783  */
2784 static void
2785 cleanup_destroyed_channel (void *cls,
2786                            const struct GNUNET_CADET_Channel *channel)
2787 {
2788   struct ChannelCtx *channel_ctx = cls;
2789   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2790   (void) channel;
2791
2792   channel_ctx->channel = NULL;
2793   remove_channel_ctx (channel_ctx);
2794   if (NULL != peer_ctx &&
2795       peer_ctx->send_channel_ctx == channel_ctx &&
2796       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2797   {
2798     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2799   }
2800 }
2801
2802 /***********************************************************************
2803  * /Util functions
2804 ***********************************************************************/
2805
2806
2807
2808 /***********************************************************************
2809  * Sub
2810 ***********************************************************************/
2811
2812 /**
2813  * @brief Create a new Sub
2814  *
2815  * @param hash Hash of value shared among rps instances on other hosts that
2816  *        defines a subgroup to sample from.
2817  * @param sampler_size Size of the sampler
2818  * @param round_interval Interval (in average) between two rounds
2819  *
2820  * @return Sub
2821  */
2822 struct Sub *
2823 new_sub (const struct GNUNET_HashCode *hash,
2824          uint32_t sampler_size,
2825          struct GNUNET_TIME_Relative round_interval)
2826 {
2827   struct Sub *sub;
2828
2829   sub = GNUNET_new (struct Sub);
2830
2831   /* With the hash generated from the secret value this service only connects
2832    * to rps instances that share the value */
2833   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2834     GNUNET_MQ_hd_fixed_size (peer_check,
2835                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2836                              struct GNUNET_MessageHeader,
2837                              NULL),
2838     GNUNET_MQ_hd_fixed_size (peer_push,
2839                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2840                              struct GNUNET_MessageHeader,
2841                              NULL),
2842     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2843                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2844                              struct GNUNET_MessageHeader,
2845                              NULL),
2846     GNUNET_MQ_hd_var_size (peer_pull_reply,
2847                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2848                            struct GNUNET_RPS_P2P_PullReplyMessage,
2849                            NULL),
2850     GNUNET_MQ_handler_end ()
2851   };
2852   sub->hash = *hash;
2853   sub->cadet_port =
2854     GNUNET_CADET_open_port (cadet_handle,
2855                             &sub->hash,
2856                             &handle_inbound_channel, /* Connect handler */
2857                             sub, /* cls */
2858                             NULL, /* WindowSize handler */
2859                             &cleanup_destroyed_channel, /* Disconnect handler */
2860                             cadet_handlers);
2861   if (NULL == sub->cadet_port)
2862   {
2863     LOG (GNUNET_ERROR_TYPE_ERROR,
2864         "Cadet port `%s' is already in use.\n",
2865         GNUNET_APPLICATION_PORT_RPS);
2866     GNUNET_assert (0);
2867   }
2868
2869   /* Set up general data structure to keep track about peers */
2870   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2871   if (GNUNET_OK !=
2872       GNUNET_CONFIGURATION_get_value_filename (cfg,
2873                                                "rps",
2874                                                "FILENAME_VALID_PEERS",
2875                                                &sub->filename_valid_peers))
2876   {
2877     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2878                                "rps",
2879                                "FILENAME_VALID_PEERS");
2880   }
2881   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2882   {
2883     char *tmp_filename_valid_peers;
2884     char str_hash[105];
2885
2886     GNUNET_snprintf (str_hash,
2887                      sizeof (str_hash),
2888                      GNUNET_h2s_full (hash));
2889     tmp_filename_valid_peers = sub->filename_valid_peers;
2890     GNUNET_asprintf (&sub->filename_valid_peers,
2891                      "%s%s",
2892                      tmp_filename_valid_peers,
2893                      str_hash);
2894     GNUNET_free (tmp_filename_valid_peers);
2895   }
2896   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2897
2898   /* Set up the sampler */
2899   sub->sampler_size_est_min = sampler_size;
2900   sub->sampler_size_est_need = sampler_size;;
2901   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2902   GNUNET_assert (0 != round_interval.rel_value_us);
2903   sub->round_interval = round_interval;
2904   sub->sampler = RPS_sampler_init (sampler_size,
2905                                   round_interval);
2906
2907   /* Logging of internals */
2908   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2909 #ifdef TO_FILE
2910   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2911                                                        "observed");
2912   sub->num_observed_peers = 0;
2913   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2914                                                                     GNUNET_NO);
2915 #endif /* TO_FILE */
2916
2917   /* Set up data structures for gossip */
2918   sub->push_map = CustomPeerMap_create (4);
2919   sub->pull_map = CustomPeerMap_create (4);
2920   sub->view_size_est_min = sampler_size;;
2921   sub->view = View_create (sub->view_size_est_min);
2922   if (sub == msub)
2923   {
2924     GNUNET_STATISTICS_set (stats,
2925                            "view size aim",
2926                            sub->view_size_est_min,
2927                            GNUNET_NO);
2928   }
2929
2930   /* Start executing rounds */
2931   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2932
2933   return sub;
2934 }
2935
2936
2937 #ifdef TO_FILE
2938 /**
2939  * @brief Write all numbers in the given array into the given file
2940  *
2941  * Single numbers devided by a newline
2942  *
2943  * @param hist_array[] the array to dump
2944  * @param file_name file to dump into
2945  */
2946 static void
2947 write_histogram_to_file (const uint32_t hist_array[],
2948                          const char *file_name)
2949 {
2950   char collect_str[SIZE_DUMP_FILE + 1] = "";
2951   char *recv_str_iter;
2952   char *file_name_full;
2953
2954   recv_str_iter = collect_str;
2955   file_name_full = store_prefix_file_name (&own_identity,
2956                                            file_name);
2957   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2958   {
2959     char collect_str_tmp[8];
2960
2961     GNUNET_snprintf (collect_str_tmp,
2962                      sizeof (collect_str_tmp),
2963                      "%" PRIu32 "\n",
2964                      hist_array[i]);
2965     recv_str_iter = stpncpy (recv_str_iter,
2966                              collect_str_tmp,
2967                              6);
2968   }
2969   (void) stpcpy (recv_str_iter,
2970                  "\n");
2971   LOG (GNUNET_ERROR_TYPE_DEBUG,
2972        "Writing push stats to disk\n");
2973   to_file_w_len (file_name_full,
2974                  SIZE_DUMP_FILE,
2975                  collect_str);
2976   GNUNET_free (file_name_full);
2977 }
2978 #endif /* TO_FILE */
2979
2980
2981 /**
2982  * @brief Destroy Sub.
2983  *
2984  * @param sub Sub to destroy
2985  */
2986 static void
2987 destroy_sub (struct Sub *sub)
2988 {
2989   GNUNET_assert (NULL != sub);
2990   GNUNET_assert (NULL != sub->do_round_task);
2991   GNUNET_SCHEDULER_cancel (sub->do_round_task);
2992   sub->do_round_task = NULL;
2993
2994   /* Disconnect from cadet */
2995   GNUNET_CADET_close_port (sub->cadet_port);
2996
2997   /* Clean up data structures for peers */
2998   RPS_sampler_destroy (sub->sampler);
2999   sub->sampler = NULL;
3000   View_destroy (sub->view);
3001   sub->view = NULL;
3002   CustomPeerMap_destroy (sub->push_map);
3003   sub->push_map = NULL;
3004   CustomPeerMap_destroy (sub->pull_map);
3005   sub->pull_map = NULL;
3006   peers_terminate (sub);
3007
3008   /* Free leftover data structures */
3009   GNUNET_free (sub->file_name_view_log);
3010   sub->file_name_view_log = NULL;
3011 #ifdef TO_FILE
3012   GNUNET_free (sub->file_name_observed_log);
3013   sub->file_name_observed_log = NULL;
3014
3015   /* Write push frequencies to disk */
3016   write_histogram_to_file (sub->push_recv,
3017                            "push_recv");
3018
3019   /* Write push deltas to disk */
3020   write_histogram_to_file (sub->push_delta,
3021                            "push_delta");
3022
3023   /* Write pull delays to disk */
3024   write_histogram_to_file (sub->pull_delays,
3025                            "pull_delays");
3026
3027   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3028   sub->observed_unique_peers = NULL;
3029 #endif /* TO_FILE */
3030
3031   GNUNET_free (sub);
3032 }
3033
3034
3035 /***********************************************************************
3036  * /Sub
3037 ***********************************************************************/
3038
3039
3040 /***********************************************************************
3041  * Core handlers
3042 ***********************************************************************/
3043
3044 /**
3045  * @brief Callback on initialisation of Core.
3046  *
3047  * @param cls - unused
3048  * @param my_identity - unused
3049  */
3050 void
3051 core_init (void *cls,
3052            const struct GNUNET_PeerIdentity *my_identity)
3053 {
3054   (void) cls;
3055   (void) my_identity;
3056
3057   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3058 }
3059
3060
3061 /**
3062  * @brief Callback for core.
3063  * Method called whenever a given peer connects.
3064  *
3065  * @param cls closure - unused
3066  * @param peer peer identity this notification is about
3067  * @return closure given to #core_disconnects as peer_cls
3068  */
3069 void *
3070 core_connects (void *cls,
3071                const struct GNUNET_PeerIdentity *peer,
3072                struct GNUNET_MQ_Handle *mq)
3073 {
3074   (void) cls;
3075   (void) mq;
3076
3077   GNUNET_assert (GNUNET_YES ==
3078                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3079                                                     peer,
3080                                                     NULL,
3081                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3082   return NULL;
3083 }
3084
3085
3086 /**
3087  * @brief Callback for core.
3088  * Method called whenever a peer disconnects.
3089  *
3090  * @param cls closure - unused
3091  * @param peer peer identity this notification is about
3092  * @param peer_cls closure given in #core_connects - unused
3093  */
3094 void
3095 core_disconnects (void *cls,
3096                   const struct GNUNET_PeerIdentity *peer,
3097                   void *peer_cls)
3098 {
3099   (void) cls;
3100   (void) peer_cls;
3101
3102   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3103 }
3104
3105 /***********************************************************************
3106  * /Core handlers
3107 ***********************************************************************/
3108
3109
3110 /**
3111  * @brief Destroy the context for a (connected) client
3112  *
3113  * @param cli_ctx Context to destroy
3114  */
3115 static void
3116 destroy_cli_ctx (struct ClientContext *cli_ctx)
3117 {
3118   GNUNET_assert (NULL != cli_ctx);
3119   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3120                                cli_ctx_tail,
3121                                cli_ctx);
3122   if (NULL != cli_ctx->sub)
3123   {
3124     destroy_sub (cli_ctx->sub);
3125     cli_ctx->sub = NULL;
3126   }
3127   GNUNET_free (cli_ctx);
3128 }
3129
3130
3131 /**
3132  * @brief Update sizes in sampler and view on estimate update from nse service
3133  *
3134  * @param sub Sub
3135  * @param logestimate the log(Base 2) value of the current network size estimate
3136  * @param std_dev standard deviation for the estimate
3137  */
3138 static void
3139 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3140 {
3141   double estimate;
3142   //double scale; // TODO this might go gloabal/config
3143
3144   LOG (GNUNET_ERROR_TYPE_DEBUG,
3145        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3146        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3147   //scale = .01;
3148   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3149   // GNUNET_NSE_log_estimate_to_n (logestimate);
3150   estimate = pow (estimate, 1.0 / 3);
3151   // TODO add if std_dev is a number
3152   // estimate += (std_dev * scale);
3153   if (sub->view_size_est_min < ceil (estimate))
3154   {
3155     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3156     sub->sampler_size_est_need = estimate;
3157     sub->view_size_est_need = estimate;
3158   } else
3159   {
3160     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3161     //sub->sampler_size_est_need = sub->view_size_est_min;
3162     sub->view_size_est_need = sub->view_size_est_min;
3163   }
3164   if (sub == msub)
3165   {
3166     GNUNET_STATISTICS_set (stats,
3167                            "view size aim",
3168                            sub->view_size_est_need,
3169                            GNUNET_NO);
3170   }
3171
3172   /* If the NSE has changed adapt the lists accordingly */
3173   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3174   View_change_len (sub->view, sub->view_size_est_need);
3175 }
3176
3177
3178 /**
3179  * Function called by NSE.
3180  *
3181  * Updates sizes of sampler list and view and adapt those lists
3182  * accordingly.
3183  *
3184  * implements #GNUNET_NSE_Callback
3185  *
3186  * @param cls Closure - unused
3187  * @param timestamp time when the estimate was received from the server (or created by the server)
3188  * @param logestimate the log(Base 2) value of the current network size estimate
3189  * @param std_dev standard deviation for the estimate
3190  */
3191 static void
3192 nse_callback (void *cls,
3193               struct GNUNET_TIME_Absolute timestamp,
3194               double logestimate, double std_dev)
3195 {
3196   (void) cls;
3197   (void) timestamp;
3198   struct ClientContext *cli_ctx_iter;
3199
3200   adapt_sizes (msub, logestimate, std_dev);
3201   for (cli_ctx_iter = cli_ctx_head;
3202       NULL != cli_ctx_iter;
3203       cli_ctx_iter = cli_ctx_iter->next)
3204   {
3205     if (NULL != cli_ctx_iter->sub)
3206     {
3207       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3208     }
3209   }
3210 }
3211
3212
3213 /**
3214  * @brief This function is called, when the client seeds peers.
3215  * It verifies that @a msg is well-formed.
3216  *
3217  * @param cls the closure (#ClientContext)
3218  * @param msg the message
3219  * @return #GNUNET_OK if @a msg is well-formed
3220  *         #GNUNET_SYSERR otherwise
3221  */
3222 static int
3223 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3224 {
3225   struct ClientContext *cli_ctx = cls;
3226   uint16_t msize = ntohs (msg->header.size);
3227   uint32_t num_peers = ntohl (msg->num_peers);
3228
3229   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3230   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3231        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3232   {
3233     LOG (GNUNET_ERROR_TYPE_ERROR,
3234         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3235         ntohl (msg->num_peers),
3236         (msize / sizeof (struct GNUNET_PeerIdentity)));
3237     GNUNET_break (0);
3238     GNUNET_SERVICE_client_drop (cli_ctx->client);
3239     return GNUNET_SYSERR;
3240   }
3241   return GNUNET_OK;
3242 }
3243
3244
3245 /**
3246  * Handle seed from the client.
3247  *
3248  * @param cls closure
3249  * @param message the actual message
3250  */
3251 static void
3252 handle_client_seed (void *cls,
3253                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3254 {
3255   struct ClientContext *cli_ctx = cls;
3256   struct GNUNET_PeerIdentity *peers;
3257   uint32_t num_peers;
3258   uint32_t i;
3259
3260   num_peers = ntohl (msg->num_peers);
3261   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3262
3263   LOG (GNUNET_ERROR_TYPE_DEBUG,
3264        "Client seeded peers:\n");
3265   print_peer_list (peers, num_peers);
3266
3267   for (i = 0; i < num_peers; i++)
3268   {
3269     LOG (GNUNET_ERROR_TYPE_DEBUG,
3270          "Updating samplers with seed %" PRIu32 ": %s\n",
3271          i,
3272          GNUNET_i2s (&peers[i]));
3273
3274     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3275     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3276   }
3277   GNUNET_SERVICE_client_continue (cli_ctx->client);
3278 }
3279
3280
3281 /**
3282  * Handle RPS request from the client.
3283  *
3284  * @param cls Client context
3285  * @param message Message containing the numer of updates the client wants to
3286  * receive
3287  */
3288 static void
3289 handle_client_view_request (void *cls,
3290                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3291 {
3292   struct ClientContext *cli_ctx = cls;
3293   uint64_t num_updates;
3294
3295   num_updates = ntohl (msg->num_updates);
3296
3297   LOG (GNUNET_ERROR_TYPE_DEBUG,
3298        "Client requested %" PRIu64 " updates of view.\n",
3299        num_updates);
3300
3301   GNUNET_assert (NULL != cli_ctx);
3302   cli_ctx->view_updates_left = num_updates;
3303   send_view (cli_ctx, NULL, 0);
3304   GNUNET_SERVICE_client_continue (cli_ctx->client);
3305 }
3306
3307
3308 /**
3309  * @brief Handle the cancellation of the view updates.
3310  *
3311  * @param cls The client context
3312  * @param msg Unused
3313  */
3314 static void
3315 handle_client_view_cancel (void *cls,
3316                            const struct GNUNET_MessageHeader *msg)
3317 {
3318   struct ClientContext *cli_ctx = cls;
3319   (void) msg;
3320
3321   LOG (GNUNET_ERROR_TYPE_DEBUG,
3322        "Client does not want to receive updates of view any more.\n");
3323
3324   GNUNET_assert (NULL != cli_ctx);
3325   cli_ctx->view_updates_left = 0;
3326   GNUNET_SERVICE_client_continue (cli_ctx->client);
3327   if (GNUNET_YES == cli_ctx->stream_update)
3328   {
3329     destroy_cli_ctx (cli_ctx);
3330   }
3331 }
3332
3333
3334 /**
3335  * Handle RPS request for biased stream from the client.
3336  *
3337  * @param cls Client context
3338  * @param message unused
3339  */
3340 static void
3341 handle_client_stream_request (void *cls,
3342                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3343 {
3344   struct ClientContext *cli_ctx = cls;
3345   (void) msg;
3346
3347   LOG (GNUNET_ERROR_TYPE_DEBUG,
3348        "Client requested peers from biased stream.\n");
3349   cli_ctx->stream_update = GNUNET_YES;
3350
3351   GNUNET_assert (NULL != cli_ctx);
3352   GNUNET_SERVICE_client_continue (cli_ctx->client);
3353 }
3354
3355
3356 /**
3357  * @brief Handles the cancellation of the stream of biased peer ids
3358  *
3359  * @param cls The client context
3360  * @param msg unused
3361  */
3362 static void
3363 handle_client_stream_cancel (void *cls,
3364                              const struct GNUNET_MessageHeader *msg)
3365 {
3366   struct ClientContext *cli_ctx = cls;
3367   (void) msg;
3368
3369   LOG (GNUNET_ERROR_TYPE_DEBUG,
3370        "Client canceled receiving peers from biased stream.\n");
3371   cli_ctx->stream_update = GNUNET_NO;
3372
3373   GNUNET_assert (NULL != cli_ctx);
3374   GNUNET_SERVICE_client_continue (cli_ctx->client);
3375 }
3376
3377
3378 /**
3379  * @brief Create and start a Sub.
3380  *
3381  * @param cls Closure - unused
3382  * @param msg Message containing the necessary information
3383  */
3384 static void
3385 handle_client_start_sub (void *cls,
3386                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3387 {
3388   struct ClientContext *cli_ctx = cls;
3389
3390   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3391   if (NULL != cli_ctx->sub &&
3392       0 != memcmp (&cli_ctx->sub->hash,
3393                    &msg->hash,
3394                    sizeof (struct GNUNET_HashCode)))
3395   {
3396     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3397     destroy_sub (cli_ctx->sub);
3398     cli_ctx->sub = NULL;
3399   }
3400   cli_ctx->sub = new_sub (&msg->hash,
3401                          msub->sampler_size_est_min, // TODO make api input?
3402                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3403   GNUNET_SERVICE_client_continue (cli_ctx->client);
3404 }
3405
3406
3407 /**
3408  * @brief Destroy the Sub
3409  *
3410  * @param cls Closure - unused
3411  * @param msg Message containing the hash that identifies the Sub
3412  */
3413 static void
3414 handle_client_stop_sub (void *cls,
3415                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3416 {
3417   struct ClientContext *cli_ctx = cls;
3418
3419   GNUNET_assert (NULL != cli_ctx->sub);
3420   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3421   {
3422     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3423   }
3424   destroy_sub (cli_ctx->sub);
3425   cli_ctx->sub = NULL;
3426   GNUNET_SERVICE_client_continue (cli_ctx->client);
3427 }
3428
3429
3430 /**
3431  * Handle a CHECK_LIVE message from another peer.
3432  *
3433  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3434  * the channel is blocked for all other communication.
3435  *
3436  * @param cls Closure - Context of channel
3437  * @param msg Message - unused
3438  */
3439 static void
3440 handle_peer_check (void *cls,
3441                    const struct GNUNET_MessageHeader *msg)
3442 {
3443   const struct ChannelCtx *channel_ctx = cls;
3444   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3445   (void) msg;
3446
3447   LOG (GNUNET_ERROR_TYPE_DEBUG,
3448       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3449   if (channel_ctx->peer_ctx->sub == msub)
3450   {
3451     GNUNET_STATISTICS_update (stats,
3452                               "# pending online checks",
3453                               -1,
3454                               GNUNET_NO);
3455   }
3456
3457   GNUNET_CADET_receive_done (channel_ctx->channel);
3458 }
3459
3460
3461 /**
3462  * Handle a PUSH message from another peer.
3463  *
3464  * Check the proof of work and store the PeerID
3465  * in the temporary list for pushed PeerIDs.
3466  *
3467  * @param cls Closure - Context of channel
3468  * @param msg Message - unused
3469  */
3470 static void
3471 handle_peer_push (void *cls,
3472                   const struct GNUNET_MessageHeader *msg)
3473 {
3474   const struct ChannelCtx *channel_ctx = cls;
3475   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3476   (void) msg;
3477
3478   // (check the proof of work (?))
3479
3480   LOG (GNUNET_ERROR_TYPE_DEBUG,
3481        "Received PUSH (%s)\n",
3482        GNUNET_i2s (peer));
3483   if (channel_ctx->peer_ctx->sub == msub)
3484   {
3485     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3486   }
3487
3488   #if ENABLE_MALICIOUS
3489   struct AttackedPeer *tmp_att_peer;
3490
3491   if ( (1 == mal_type) ||
3492        (3 == mal_type) )
3493   { /* Try to maximise representation */
3494     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3495     tmp_att_peer->peer_id = *peer;
3496     if (NULL == att_peer_set)
3497       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3498     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3499                                                              peer))
3500     {
3501       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3502                                    att_peers_tail,
3503                                    tmp_att_peer);
3504       add_peer_array_to_set (peer, 1, att_peer_set);
3505     }
3506     else
3507     {
3508       GNUNET_free (tmp_att_peer);
3509     }
3510   }
3511
3512
3513   else if (2 == mal_type)
3514   {
3515     /* We attack one single well-known peer - simply ignore */
3516   }
3517   #endif /* ENABLE_MALICIOUS */
3518
3519   /* Add the sending peer to the push_map */
3520   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3521
3522   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3523                                      &channel_ctx->peer_ctx->peer_id));
3524   GNUNET_CADET_receive_done (channel_ctx->channel);
3525 }
3526
3527
3528 /**
3529  * Handle PULL REQUEST request message from another peer.
3530  *
3531  * Reply with the view of PeerIDs.
3532  *
3533  * @param cls Closure - Context of channel
3534  * @param msg Message - unused
3535  */
3536 static void
3537 handle_peer_pull_request (void *cls,
3538                           const struct GNUNET_MessageHeader *msg)
3539 {
3540   const struct ChannelCtx *channel_ctx = cls;
3541   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3542   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3543   const struct GNUNET_PeerIdentity *view_array;
3544   (void) msg;
3545
3546   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3547   if (peer_ctx->sub == msub)
3548   {
3549     GNUNET_STATISTICS_update(stats,
3550                              "# pull request message received",
3551                              1,
3552                              GNUNET_NO);
3553     if (NULL != map_single_hop &&
3554         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3555                                                              &peer_ctx->peer_id))
3556     {
3557       GNUNET_STATISTICS_update (stats,
3558                                 "# pull request message received (multi-hop peer)",
3559                                 1,
3560                                 GNUNET_NO);
3561     }
3562   }
3563
3564   #if ENABLE_MALICIOUS
3565   if (1 == mal_type
3566       || 3 == mal_type)
3567   { /* Try to maximise representation */
3568     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3569   }
3570
3571   else if (2 == mal_type)
3572   { /* Try to partition network */
3573     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3574     {
3575       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3576     }
3577   }
3578   #endif /* ENABLE_MALICIOUS */
3579
3580   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3581                                      &channel_ctx->peer_ctx->peer_id));
3582   GNUNET_CADET_receive_done (channel_ctx->channel);
3583   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3584   send_pull_reply (peer_ctx,
3585                    view_array,
3586                    View_size (channel_ctx->peer_ctx->sub->view));
3587 }
3588
3589
3590 /**
3591  * Check whether we sent a corresponding request and
3592  * whether this reply is the first one.
3593  *
3594  * @param cls Closure - Context of channel
3595  * @param msg Message containing the replied peers
3596  */
3597 static int
3598 check_peer_pull_reply (void *cls,
3599                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3600 {
3601   struct ChannelCtx *channel_ctx = cls;
3602   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3603
3604   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3605   {
3606     GNUNET_break_op (0);
3607     return GNUNET_SYSERR;
3608   }
3609
3610   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3611       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3612   {
3613     LOG (GNUNET_ERROR_TYPE_ERROR,
3614         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3615         ntohl (msg->num_peers),
3616         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3617             sizeof (struct GNUNET_PeerIdentity));
3618     GNUNET_break_op (0);
3619     return GNUNET_SYSERR;
3620   }
3621
3622   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3623                                      &sender_ctx->peer_id,
3624                                      Peers_PULL_REPLY_PENDING))
3625   {
3626     LOG (GNUNET_ERROR_TYPE_WARNING,
3627         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3628         GNUNET_i2s (&sender_ctx->peer_id));
3629     if (sender_ctx->sub == msub)
3630     {
3631       GNUNET_STATISTICS_update (stats,
3632                                 "# unrequested pull replies",
3633                                 1,
3634                                 GNUNET_NO);
3635     }
3636     GNUNET_break_op (0);
3637     return GNUNET_SYSERR;
3638   }
3639   return GNUNET_OK;
3640 }
3641
3642
3643 /**
3644  * Handle PULL REPLY message from another peer.
3645  *
3646  * @param cls Closure
3647  * @param msg The message header
3648  */
3649 static void
3650 handle_peer_pull_reply (void *cls,
3651                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3652 {
3653   const struct ChannelCtx *channel_ctx = cls;
3654   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3655   const struct GNUNET_PeerIdentity *peers;
3656   struct Sub *sub = channel_ctx->peer_ctx->sub;
3657   uint32_t i;
3658 #if ENABLE_MALICIOUS
3659   struct AttackedPeer *tmp_att_peer;
3660 #endif /* ENABLE_MALICIOUS */
3661
3662   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3663   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3664   if (channel_ctx->peer_ctx->sub == msub)
3665   {
3666     GNUNET_STATISTICS_update (stats,
3667                               "# pull reply messages received",
3668                               1,
3669                               GNUNET_NO);
3670     if (NULL != map_single_hop &&
3671         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3672           &channel_ctx->peer_ctx->peer_id))
3673     {
3674       GNUNET_STATISTICS_update (stats,
3675                                 "# pull reply messages received (multi-hop peer)",
3676                                 1,
3677                                 GNUNET_NO);
3678     }
3679   }
3680
3681   #if ENABLE_MALICIOUS
3682   // We shouldn't even receive pull replies as we're not sending
3683   if (2 == mal_type)
3684   {
3685   }
3686   #endif /* ENABLE_MALICIOUS */
3687
3688   /* Do actual logic */
3689   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3690
3691   LOG (GNUNET_ERROR_TYPE_DEBUG,
3692        "PULL REPLY received, got following %u peers:\n",
3693        ntohl (msg->num_peers));
3694
3695   for (i = 0; i < ntohl (msg->num_peers); i++)
3696   {
3697     LOG (GNUNET_ERROR_TYPE_DEBUG,
3698          "%u. %s\n",
3699          i,
3700          GNUNET_i2s (&peers[i]));
3701
3702     #if ENABLE_MALICIOUS
3703     if ((NULL != att_peer_set) &&
3704         (1 == mal_type || 3 == mal_type))
3705     { /* Add attacked peer to local list */
3706       // TODO check if we sent a request and this was the first reply
3707       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3708                                                                &peers[i])
3709           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3710                                                                   &peers[i]))
3711       {
3712         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3713         tmp_att_peer->peer_id = peers[i];
3714         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3715                                      att_peers_tail,
3716                                      tmp_att_peer);
3717         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3718       }
3719       continue;
3720     }
3721     #endif /* ENABLE_MALICIOUS */
3722     /* Make sure we 'know' about this peer */
3723     (void) insert_peer (channel_ctx->peer_ctx->sub,
3724                         &peers[i]);
3725
3726     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3727                                         &peers[i]))
3728     {
3729       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3730                          &peers[i]);
3731     }
3732     else
3733     {
3734       schedule_operation (channel_ctx->peer_ctx,
3735                           insert_in_pull_map,
3736                           channel_ctx->peer_ctx->sub); /* cls */
3737       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3738                                       &peers[i]);
3739     }
3740   }
3741
3742   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3743                                  sender),
3744                    Peers_PULL_REPLY_PENDING);
3745   clean_peer (channel_ctx->peer_ctx->sub,
3746               sender);
3747
3748   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3749                                      sender));
3750   GNUNET_CADET_receive_done (channel_ctx->channel);
3751 }
3752
3753
3754 /**
3755  * Compute a random delay.
3756  * A uniformly distributed value between mean + spread and mean - spread.
3757  *
3758  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3759  * It would return a random value between 2 and 6 min.
3760  *
3761  * @param mean the mean time until the next round
3762  * @param spread the inverse amount of deviation from the mean
3763  */
3764 static struct GNUNET_TIME_Relative
3765 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3766                     unsigned int spread)
3767 {
3768   struct GNUNET_TIME_Relative half_interval;
3769   struct GNUNET_TIME_Relative ret;
3770   unsigned int rand_delay;
3771   unsigned int max_rand_delay;
3772
3773   if (0 == spread)
3774   {
3775     LOG (GNUNET_ERROR_TYPE_WARNING,
3776          "Not accepting spread of 0\n");
3777     GNUNET_break (0);
3778     GNUNET_assert (0);
3779   }
3780   GNUNET_assert (0 != mean.rel_value_us);
3781
3782   /* Compute random time value between spread * mean and spread * mean */
3783   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3784
3785   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3786   /**
3787    * Compute random value between (0 and 1) * round_interval
3788    * via multiplying round_interval with a 'fraction' (0 to value)/value
3789    */
3790   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3791   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3792   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3793   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3794
3795   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3796     LOG (GNUNET_ERROR_TYPE_WARNING,
3797          "Returning FOREVER_REL\n");
3798
3799   return ret;
3800 }
3801
3802
3803 /**
3804  * Send single pull request
3805  *
3806  * @param peer_ctx Context to the peer to send request to
3807  */
3808 static void
3809 send_pull_request (struct PeerContext *peer_ctx)
3810 {
3811   struct GNUNET_MQ_Envelope *ev;
3812
3813   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3814                                                &peer_ctx->peer_id,
3815                                                Peers_PULL_REPLY_PENDING));
3816   SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
3817   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3818
3819   LOG (GNUNET_ERROR_TYPE_DEBUG,
3820        "Going to send PULL REQUEST to peer %s.\n",
3821        GNUNET_i2s (&peer_ctx->peer_id));
3822
3823   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3824   send_message (peer_ctx, ev, "PULL REQUEST");
3825   if (peer_ctx->sub)
3826   {
3827     GNUNET_STATISTICS_update (stats,
3828                               "# pull request send issued",
3829                               1,
3830                               GNUNET_NO);
3831     if (NULL != map_single_hop &&
3832         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3833                                                              &peer_ctx->peer_id))
3834     {
3835       GNUNET_STATISTICS_update (stats,
3836                                 "# pull request send issued (multi-hop peer)",
3837                                 1,
3838                                 GNUNET_NO);
3839     }
3840   }
3841 }
3842
3843
3844 /**
3845  * Send single push
3846  *
3847  * @param peer_ctx Context of peer to send push to
3848  */
3849 static void
3850 send_push (struct PeerContext *peer_ctx)
3851 {
3852   struct GNUNET_MQ_Envelope *ev;
3853
3854   LOG (GNUNET_ERROR_TYPE_DEBUG,
3855        "Going to send PUSH to peer %s.\n",
3856        GNUNET_i2s (&peer_ctx->peer_id));
3857
3858   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3859   send_message (peer_ctx, ev, "PUSH");
3860   if (peer_ctx->sub)
3861   {
3862     GNUNET_STATISTICS_update (stats,
3863                               "# push send issued",
3864                               1,
3865                               GNUNET_NO);
3866   }
3867 }
3868
3869
3870 #if ENABLE_MALICIOUS
3871
3872
3873 /**
3874  * @brief This function is called, when the client tells us to act malicious.
3875  * It verifies that @a msg is well-formed.
3876  *
3877  * @param cls the closure (#ClientContext)
3878  * @param msg the message
3879  * @return #GNUNET_OK if @a msg is well-formed
3880  */
3881 static int
3882 check_client_act_malicious (void *cls,
3883                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3884 {
3885   struct ClientContext *cli_ctx = cls;
3886   uint16_t msize = ntohs (msg->header.size);
3887   uint32_t num_peers = ntohl (msg->num_peers);
3888
3889   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3890   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3891        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3892   {
3893     LOG (GNUNET_ERROR_TYPE_ERROR,
3894         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3895         ntohl (msg->num_peers),
3896         (msize / sizeof (struct GNUNET_PeerIdentity)));
3897     GNUNET_break (0);
3898     GNUNET_SERVICE_client_drop (cli_ctx->client);
3899     return GNUNET_SYSERR;
3900   }
3901   return GNUNET_OK;
3902 }
3903
3904 /**
3905  * Turn RPS service to act malicious.
3906  *
3907  * @param cls Closure
3908  * @param client The client that sent the message
3909  * @param msg The message header
3910  */
3911 static void
3912 handle_client_act_malicious (void *cls,
3913                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3914 {
3915   struct ClientContext *cli_ctx = cls;
3916   struct GNUNET_PeerIdentity *peers;
3917   uint32_t num_mal_peers_sent;
3918   uint32_t num_mal_peers_old;
3919   struct Sub *sub = cli_ctx->sub;
3920
3921   if (NULL == sub) sub = msub;
3922   /* Do actual logic */
3923   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3924   mal_type = ntohl (msg->type);
3925   if (NULL == mal_peer_set)
3926     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3927
3928   LOG (GNUNET_ERROR_TYPE_DEBUG,
3929        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3930        mal_type,
3931        ntohl (msg->num_peers));
3932
3933   if (1 == mal_type)
3934   { /* Try to maximise representation */
3935     /* Add other malicious peers to those we already know */
3936
3937     num_mal_peers_sent = ntohl (msg->num_peers);
3938     num_mal_peers_old = num_mal_peers;
3939     GNUNET_array_grow (mal_peers,
3940                        num_mal_peers,
3941                        num_mal_peers + num_mal_peers_sent);
3942     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3943             peers,
3944             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3945
3946     /* Add all mal peers to mal_peer_set */
3947     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3948                            num_mal_peers_sent,
3949                            mal_peer_set);
3950
3951     /* Substitute do_round () with do_mal_round () */
3952     GNUNET_assert (NULL != sub->do_round_task);
3953     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3954     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3955   }
3956
3957   else if ( (2 == mal_type) ||
3958             (3 == mal_type) )
3959   { /* Try to partition the network */
3960     /* Add other malicious peers to those we already know */
3961
3962     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3963     num_mal_peers_old = num_mal_peers;
3964     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3965     GNUNET_array_grow (mal_peers,
3966                        num_mal_peers,
3967                        num_mal_peers + num_mal_peers_sent);
3968     if (NULL != mal_peers &&
3969         0 != num_mal_peers)
3970     {
3971       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3972               peers,
3973               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3974
3975       /* Add all mal peers to mal_peer_set */
3976       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3977                              num_mal_peers_sent,
3978                              mal_peer_set);
3979     }
3980
3981     /* Store the one attacked peer */
3982     GNUNET_memcpy (&attacked_peer,
3983             &msg->attacked_peer,
3984             sizeof (struct GNUNET_PeerIdentity));
3985     /* Set the flag of the attacked peer to valid to avoid problems */
3986     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3987     {
3988       (void) issue_peer_online_check (sub, &attacked_peer);
3989     }
3990
3991     LOG (GNUNET_ERROR_TYPE_DEBUG,
3992          "Attacked peer is %s\n",
3993          GNUNET_i2s (&attacked_peer));
3994
3995     /* Substitute do_round () with do_mal_round () */
3996     if (NULL != sub->do_round_task)
3997     {
3998       /* Probably in shutdown */
3999       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4000       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4001     }
4002   }
4003   else if (0 == mal_type)
4004   { /* Stop acting malicious */
4005     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4006
4007     /* Substitute do_mal_round () with do_round () */
4008     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4009     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4010   }
4011   else
4012   {
4013     GNUNET_break (0);
4014     GNUNET_SERVICE_client_continue (cli_ctx->client);
4015   }
4016   GNUNET_SERVICE_client_continue (cli_ctx->client);
4017 }
4018
4019
4020 /**
4021  * Send out PUSHes and PULLs maliciously.
4022  *
4023  * This is executed regylary.
4024  *
4025  * @param cls Closure - Sub
4026  */
4027 static void
4028 do_mal_round (void *cls)
4029 {
4030   uint32_t num_pushes;
4031   uint32_t i;
4032   struct GNUNET_TIME_Relative time_next_round;
4033   struct AttackedPeer *tmp_att_peer;
4034   struct Sub *sub = cls;
4035
4036   LOG (GNUNET_ERROR_TYPE_DEBUG,
4037        "Going to execute next round maliciously type %" PRIu32 ".\n",
4038       mal_type);
4039   sub->do_round_task = NULL;
4040   GNUNET_assert (mal_type <= 3);
4041   /* Do malicious actions */
4042   if (1 == mal_type)
4043   { /* Try to maximise representation */
4044
4045     /* The maximum of pushes we're going to send this round */
4046     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4047                                          num_attacked_peers),
4048                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4049
4050     LOG (GNUNET_ERROR_TYPE_DEBUG,
4051          "Going to send %" PRIu32 " pushes\n",
4052          num_pushes);
4053
4054     /* Send PUSHes to attacked peers */
4055     for (i = 0 ; i < num_pushes ; i++)
4056     {
4057       if (att_peers_tail == att_peer_index)
4058         att_peer_index = att_peers_head;
4059       else
4060         att_peer_index = att_peer_index->next;
4061
4062       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4063     }
4064
4065     /* Send PULLs to some peers to learn about additional peers to attack */
4066     tmp_att_peer = att_peer_index;
4067     for (i = 0 ; i < num_pushes * alpha ; i++)
4068     {
4069       if (att_peers_tail == tmp_att_peer)
4070         tmp_att_peer = att_peers_head;
4071       else
4072         att_peer_index = tmp_att_peer->next;
4073
4074       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4075     }
4076   }
4077
4078
4079   else if (2 == mal_type)
4080   { /**
4081      * Try to partition the network
4082      * Send as many pushes to the attacked peer as possible
4083      * That is one push per round as it will ignore more.
4084      */
4085     (void) issue_peer_online_check (sub, &attacked_peer);
4086     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4087                                        &attacked_peer,
4088                                        Peers_ONLINE))
4089       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4090   }
4091
4092
4093   if (3 == mal_type)
4094   { /* Combined attack */
4095
4096     /* Send PUSH to attacked peers */
4097     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4098     {
4099       (void) issue_peer_online_check (sub, &attacked_peer);
4100       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4101                                          &attacked_peer,
4102                                          Peers_ONLINE))
4103       {
4104         LOG (GNUNET_ERROR_TYPE_DEBUG,
4105             "Goding to send push to attacked peer (%s)\n",
4106             GNUNET_i2s (&attacked_peer));
4107         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4108       }
4109     }
4110     (void) issue_peer_online_check (sub, &attacked_peer);
4111
4112     /* The maximum of pushes we're going to send this round */
4113     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4114                                          num_attacked_peers),
4115                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4116
4117     LOG (GNUNET_ERROR_TYPE_DEBUG,
4118          "Going to send %" PRIu32 " pushes\n",
4119          num_pushes);
4120
4121     for (i = 0; i < num_pushes; i++)
4122     {
4123       if (att_peers_tail == att_peer_index)
4124         att_peer_index = att_peers_head;
4125       else
4126         att_peer_index = att_peer_index->next;
4127
4128       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4129     }
4130
4131     /* Send PULLs to some peers to learn about additional peers to attack */
4132     tmp_att_peer = att_peer_index;
4133     for (i = 0; i < num_pushes * alpha; i++)
4134     {
4135       if (att_peers_tail == tmp_att_peer)
4136         tmp_att_peer = att_peers_head;
4137       else
4138         att_peer_index = tmp_att_peer->next;
4139
4140       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4141     }
4142   }
4143
4144   /* Schedule next round */
4145   time_next_round = compute_rand_delay (sub->round_interval, 2);
4146
4147   GNUNET_assert (NULL == sub->do_round_task);
4148   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4149                                                     &do_mal_round, sub);
4150   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4151 }
4152 #endif /* ENABLE_MALICIOUS */
4153
4154
4155 /**
4156  * Send out PUSHes and PULLs, possibly update #view, samplers.
4157  *
4158  * This is executed regylary.
4159  *
4160  * @param cls Closure - Sub
4161  */
4162 static void
4163 do_round (void *cls)
4164 {
4165   unsigned int i;
4166   const struct GNUNET_PeerIdentity *view_array;
4167   unsigned int *permut;
4168   unsigned int a_peers; /* Number of peers we send pushes to */
4169   unsigned int b_peers; /* Number of peers we send pull requests to */
4170   uint32_t first_border;
4171   uint32_t second_border;
4172   struct GNUNET_PeerIdentity peer;
4173   struct GNUNET_PeerIdentity *update_peer;
4174   struct Sub *sub = cls;
4175
4176   sub->num_rounds++;
4177   LOG (GNUNET_ERROR_TYPE_DEBUG,
4178        "Going to execute next round.\n");
4179   if (sub == msub)
4180   {
4181     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4182   }
4183   sub->do_round_task = NULL;
4184   LOG (GNUNET_ERROR_TYPE_DEBUG,
4185        "Printing view:\n");
4186   to_file (sub->file_name_view_log,
4187            "___ new round ___");
4188   view_array = View_get_as_array (sub->view);
4189   for (i = 0; i < View_size (sub->view); i++)
4190   {
4191     LOG (GNUNET_ERROR_TYPE_DEBUG,
4192          "\t%s\n", GNUNET_i2s (&view_array[i]));
4193     to_file (sub->file_name_view_log,
4194              "=%s\t(do round)",
4195              GNUNET_i2s_full (&view_array[i]));
4196   }
4197
4198
4199   /* Send pushes and pull requests */
4200   if (0 < View_size (sub->view))
4201   {
4202     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4203                                            View_size (sub->view));
4204
4205     /* Send PUSHes */
4206     a_peers = ceil (alpha * View_size (sub->view));
4207
4208     LOG (GNUNET_ERROR_TYPE_DEBUG,
4209          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4210          a_peers, alpha, View_size (sub->view));
4211     for (i = 0; i < a_peers; i++)
4212     {
4213       peer = view_array[permut[i]];
4214       // FIXME if this fails schedule/loop this for later
4215       send_push (get_peer_ctx (sub->peer_map, &peer));
4216     }
4217
4218     /* Send PULL requests */
4219     b_peers = ceil (beta * View_size (sub->view));
4220     first_border = a_peers;
4221     second_border = a_peers + b_peers;
4222     if (second_border > View_size (sub->view))
4223     {
4224       first_border = View_size (sub->view) - b_peers;
4225       second_border = View_size (sub->view);
4226     }
4227     LOG (GNUNET_ERROR_TYPE_DEBUG,
4228         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4229         b_peers, beta, View_size (sub->view));
4230     for (i = first_border; i < second_border; i++)
4231     {
4232       peer = view_array[permut[i]];
4233       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4234                                          &peer,
4235                                          Peers_PULL_REPLY_PENDING))
4236       { // FIXME if this fails schedule/loop this for later
4237         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4238       }
4239     }
4240
4241     GNUNET_free (permut);
4242     permut = NULL;
4243   }
4244
4245
4246   /* Update view */
4247   /* TODO see how many peers are in push-/pull- list! */
4248
4249   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4250       (0 < CustomPeerMap_size (sub->push_map)) &&
4251       (0 < CustomPeerMap_size (sub->pull_map)))
4252   { /* If conditions for update are fulfilled, update */
4253     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4254
4255     uint32_t final_size;
4256     uint32_t peers_to_clean_size;
4257     struct GNUNET_PeerIdentity *peers_to_clean;
4258
4259     peers_to_clean = NULL;
4260     peers_to_clean_size = 0;
4261     GNUNET_array_grow (peers_to_clean,
4262                        peers_to_clean_size,
4263                        View_size (sub->view));
4264     GNUNET_memcpy (peers_to_clean,
4265             view_array,
4266             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4267
4268     /* Seems like recreating is the easiest way of emptying the peermap */
4269     View_clear (sub->view);
4270     to_file (sub->file_name_view_log,
4271              "--- emptied ---");
4272
4273     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4274                                 CustomPeerMap_size (sub->push_map));
4275     second_border = first_border +
4276                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4277                                 CustomPeerMap_size (sub->pull_map));
4278     final_size    = second_border +
4279       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4280     LOG (GNUNET_ERROR_TYPE_DEBUG,
4281         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4282         first_border,
4283         second_border,
4284         final_size);
4285
4286     /* Update view with peers received through PUSHes */
4287     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4288                                            CustomPeerMap_size (sub->push_map));
4289     for (i = 0; i < first_border; i++)
4290     {
4291       int inserted;
4292       inserted = insert_in_view (sub,
4293                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4294                                                                   permut[i]));
4295       if (GNUNET_OK == inserted)
4296       {
4297         clients_notify_stream_peer (sub,
4298             1,
4299             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4300       }
4301       to_file (sub->file_name_view_log,
4302                "+%s\t(push list)",
4303                GNUNET_i2s_full (&view_array[i]));
4304       // TODO change the peer_flags accordingly
4305     }
4306     GNUNET_free (permut);
4307     permut = NULL;
4308
4309     /* Update view with peers received through PULLs */
4310     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4311                                            CustomPeerMap_size (sub->pull_map));
4312     for (i = first_border; i < second_border; i++)
4313     {
4314       int inserted;
4315       inserted = insert_in_view (sub,
4316           CustomPeerMap_get_peer_by_index (sub->pull_map,
4317                                            permut[i - first_border]));
4318       if (GNUNET_OK == inserted)
4319       {
4320         clients_notify_stream_peer (sub,
4321             1,
4322             CustomPeerMap_get_peer_by_index (sub->pull_map,
4323                                              permut[i - first_border]));
4324       }
4325       to_file (sub->file_name_view_log,
4326                "+%s\t(pull list)",
4327                GNUNET_i2s_full (&view_array[i]));
4328       // TODO change the peer_flags accordingly
4329     }
4330     GNUNET_free (permut);
4331     permut = NULL;
4332
4333     /* Update view with peers from history */
4334     RPS_sampler_get_n_rand_peers (sub->sampler,
4335                                   final_size - second_border,
4336                                   hist_update,
4337                                   sub);
4338     // TODO change the peer_flags accordingly
4339
4340     for (i = 0; i < View_size (sub->view); i++)
4341       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4342
4343     /* Clean peers that were removed from the view */
4344     for (i = 0; i < peers_to_clean_size; i++)
4345     {
4346       to_file (sub->file_name_view_log,
4347                "-%s",
4348                GNUNET_i2s_full (&peers_to_clean[i]));
4349       clean_peer (sub, &peers_to_clean[i]);
4350     }
4351
4352     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4353     clients_notify_view_update (sub);
4354   } else {
4355     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4356     if (sub == msub)
4357     {
4358       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4359       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4360           !(0 >= CustomPeerMap_size (sub->pull_map)))
4361         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4362       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4363           (0 >= CustomPeerMap_size (sub->pull_map)))
4364         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4365       if (0 >= CustomPeerMap_size (sub->push_map) &&
4366           !(0 >= CustomPeerMap_size (sub->pull_map)))
4367         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4368       if (0 >= CustomPeerMap_size (sub->push_map) &&
4369           (0 >= CustomPeerMap_size (sub->pull_map)))
4370         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4371       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4372           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4373           0 >= CustomPeerMap_size (sub->push_map))
4374         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4375     }
4376   }
4377   // TODO independent of that also get some peers from CADET_get_peers()?
4378   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4379   {
4380     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4381   }
4382   else
4383   {
4384     LOG (GNUNET_ERROR_TYPE_WARNING,
4385          "Push map size too big for histogram (%u, %u)\n",
4386          CustomPeerMap_size (sub->push_map),
4387          HISTOGRAM_FILE_SLOTS);
4388   }
4389   // FIXME check bounds of histogram
4390   sub->push_delta[(CustomPeerMap_size (sub->push_map) -
4391                    sub->view_size_est_need) +
4392                           (HISTOGRAM_FILE_SLOTS/2)]++;
4393   if (sub == msub)
4394   {
4395     GNUNET_STATISTICS_set (stats,
4396         "# peers in push map at end of round",
4397         CustomPeerMap_size (sub->push_map),
4398         GNUNET_NO);
4399     GNUNET_STATISTICS_set (stats,
4400         "# peers in pull map at end of round",
4401         CustomPeerMap_size (sub->pull_map),
4402         GNUNET_NO);
4403     GNUNET_STATISTICS_set (stats,
4404         "# peers in view at end of round",
4405         View_size (sub->view),
4406         GNUNET_NO);
4407   }
4408
4409   LOG (GNUNET_ERROR_TYPE_DEBUG,
4410        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4411        CustomPeerMap_size (sub->push_map),
4412        CustomPeerMap_size (sub->pull_map),
4413        alpha,
4414        View_size (sub->view),
4415        alpha * View_size (sub->view));
4416
4417   /* Update samplers */
4418   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4419   {
4420     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4421     LOG (GNUNET_ERROR_TYPE_DEBUG,
4422          "Updating with peer %s from push list\n",
4423          GNUNET_i2s (update_peer));
4424     insert_in_sampler (sub, update_peer);
4425     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4426   }
4427
4428   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4429   {
4430     LOG (GNUNET_ERROR_TYPE_DEBUG,
4431          "Updating with peer %s from pull list\n",
4432          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4433     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4434     /* This cleans only if it is not in the view */
4435     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4436   }
4437
4438
4439   /* Empty push/pull lists */
4440   CustomPeerMap_clear (sub->push_map);
4441   CustomPeerMap_clear (sub->pull_map);
4442
4443   if (sub == msub)
4444   {
4445     GNUNET_STATISTICS_set (stats,
4446                            "view size",
4447                            View_size(sub->view),
4448                            GNUNET_NO);
4449   }
4450
4451   struct GNUNET_TIME_Relative time_next_round;
4452
4453   time_next_round = compute_rand_delay (sub->round_interval, 2);
4454
4455   /* Schedule next round */
4456   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4457                                                      &do_round, sub);
4458   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4459 }
4460
4461
4462 /**
4463  * This is called from GNUNET_CADET_get_peers().
4464  *
4465  * It is called on every peer(ID) that cadet somehow has contact with.
4466  * We use those to initialise the sampler.
4467  *
4468  * implements #GNUNET_CADET_PeersCB
4469  *
4470  * @param cls Closure - Sub
4471  * @param peer Peer, or NULL on "EOF".
4472  * @param tunnel Do we have a tunnel towards this peer?
4473  * @param n_paths Number of known paths towards this peer.
4474  * @param best_path How long is the best path?
4475  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4476  */
4477 void
4478 init_peer_cb (void *cls,
4479               const struct GNUNET_PeerIdentity *peer,
4480               int tunnel, /* "Do we have a tunnel towards this peer?" */
4481               unsigned int n_paths, /* "Number of known paths towards this peer" */
4482               unsigned int best_path) /* "How long is the best path?
4483                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4484 {
4485   struct Sub *sub = cls;
4486   (void) tunnel;
4487   (void) n_paths;
4488   (void) best_path;
4489
4490   if (NULL != peer)
4491   {
4492     LOG (GNUNET_ERROR_TYPE_DEBUG,
4493          "Got peer_id %s from cadet\n",
4494          GNUNET_i2s (peer));
4495     got_peer (sub, peer);
4496   }
4497 }
4498
4499
4500 /**
4501  * @brief Iterator function over stored, valid peers.
4502  *
4503  * We initialise the sampler with those.
4504  *
4505  * @param cls Closure - Sub
4506  * @param peer the peer id
4507  * @return #GNUNET_YES if we should continue to
4508  *         iterate,
4509  *         #GNUNET_NO if not.
4510  */
4511 static int
4512 valid_peers_iterator (void *cls,
4513                       const struct GNUNET_PeerIdentity *peer)
4514 {
4515   struct Sub *sub = cls;
4516
4517   if (NULL != peer)
4518   {
4519     LOG (GNUNET_ERROR_TYPE_DEBUG,
4520          "Got stored, valid peer %s\n",
4521          GNUNET_i2s (peer));
4522     got_peer (sub, peer);
4523   }
4524   return GNUNET_YES;
4525 }
4526
4527
4528 /**
4529  * Iterator over peers from peerinfo.
4530  *
4531  * @param cls Closure - Sub
4532  * @param peer id of the peer, NULL for last call
4533  * @param hello hello message for the peer (can be NULL)
4534  * @param error message
4535  */
4536 void
4537 process_peerinfo_peers (void *cls,
4538                         const struct GNUNET_PeerIdentity *peer,
4539                         const struct GNUNET_HELLO_Message *hello,
4540                         const char *err_msg)
4541 {
4542   struct Sub *sub = cls;
4543   (void) hello;
4544   (void) err_msg;
4545
4546   if (NULL != peer)
4547   {
4548     LOG (GNUNET_ERROR_TYPE_DEBUG,
4549          "Got peer_id %s from peerinfo\n",
4550          GNUNET_i2s (peer));
4551     got_peer (sub, peer);
4552   }
4553 }
4554
4555
4556 /**
4557  * Task run during shutdown.
4558  *
4559  * @param cls Closure - unused
4560  */
4561 static void
4562 shutdown_task (void *cls)
4563 {
4564   (void) cls;
4565   struct ClientContext *client_ctx;
4566
4567   LOG (GNUNET_ERROR_TYPE_DEBUG,
4568        "RPS service is going down\n");
4569
4570   /* Clean all clients */
4571   for (client_ctx = cli_ctx_head;
4572        NULL != cli_ctx_head;
4573        client_ctx = cli_ctx_head)
4574   {
4575     destroy_cli_ctx (client_ctx);
4576   }
4577   if (NULL != msub)
4578   {
4579     destroy_sub (msub);
4580     msub = NULL;
4581   }
4582
4583   /* Disconnect from other services */
4584   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4585   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4586   peerinfo_handle = NULL;
4587   GNUNET_NSE_disconnect (nse);
4588   if (NULL != map_single_hop)
4589   {
4590     /* core_init was called - core was initialised */
4591     /* disconnect first, so no callback tries to access missing peermap */
4592     GNUNET_CORE_disconnect (core_handle);
4593     core_handle = NULL;
4594     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4595     map_single_hop = NULL;
4596   }
4597
4598   if (NULL != stats)
4599   {
4600     GNUNET_STATISTICS_destroy (stats,
4601                                GNUNET_NO);
4602     stats = NULL;
4603   }
4604   GNUNET_CADET_disconnect (cadet_handle);
4605   cadet_handle = NULL;
4606 #if ENABLE_MALICIOUS
4607   struct AttackedPeer *tmp_att_peer;
4608   GNUNET_array_grow (mal_peers,
4609                      num_mal_peers,
4610                      0);
4611   if (NULL != mal_peer_set)
4612     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4613   if (NULL != att_peer_set)
4614     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4615   while (NULL != att_peers_head)
4616   {
4617     tmp_att_peer = att_peers_head;
4618     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4619                                  att_peers_tail,
4620                                  tmp_att_peer);
4621     GNUNET_free (tmp_att_peer);
4622   }
4623 #endif /* ENABLE_MALICIOUS */
4624   close_all_files();
4625 }
4626
4627
4628 /**
4629  * Handle client connecting to the service.
4630  *
4631  * @param cls unused
4632  * @param client the new client
4633  * @param mq the message queue of @a client
4634  * @return @a client
4635  */
4636 static void *
4637 client_connect_cb (void *cls,
4638                    struct GNUNET_SERVICE_Client *client,
4639                    struct GNUNET_MQ_Handle *mq)
4640 {
4641   struct ClientContext *cli_ctx;
4642   (void) cls;
4643
4644   LOG (GNUNET_ERROR_TYPE_DEBUG,
4645        "Client connected\n");
4646   if (NULL == client)
4647     return client; /* Server was destroyed before a client connected. Shutting down */
4648   cli_ctx = GNUNET_new (struct ClientContext);
4649   cli_ctx->mq = mq;
4650   cli_ctx->view_updates_left = -1;
4651   cli_ctx->stream_update = GNUNET_NO;
4652   cli_ctx->client = client;
4653   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4654                                cli_ctx_tail,
4655                                cli_ctx);
4656   return cli_ctx;
4657 }
4658
4659 /**
4660  * Callback called when a client disconnected from the service
4661  *
4662  * @param cls closure for the service
4663  * @param c the client that disconnected
4664  * @param internal_cls should be equal to @a c
4665  */
4666 static void
4667 client_disconnect_cb (void *cls,
4668                       struct GNUNET_SERVICE_Client *client,
4669                       void *internal_cls)
4670 {
4671   struct ClientContext *cli_ctx = internal_cls;
4672
4673   (void) cls;
4674   GNUNET_assert (client == cli_ctx->client);
4675   if (NULL == client)
4676   {/* shutdown task - destroy all clients */
4677     while (NULL != cli_ctx_head)
4678       destroy_cli_ctx (cli_ctx_head);
4679   }
4680   else
4681   { /* destroy this client */
4682     LOG (GNUNET_ERROR_TYPE_DEBUG,
4683         "Client disconnected. Destroy its context.\n");
4684     destroy_cli_ctx (cli_ctx);
4685   }
4686 }
4687
4688
4689 /**
4690  * Handle random peer sampling clients.
4691  *
4692  * @param cls closure
4693  * @param c configuration to use
4694  * @param service the initialized service
4695  */
4696 static void
4697 run (void *cls,
4698      const struct GNUNET_CONFIGURATION_Handle *c,
4699      struct GNUNET_SERVICE_Handle *service)
4700 {
4701   struct GNUNET_TIME_Relative round_interval;
4702   long long unsigned int sampler_size;
4703   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4704   struct GNUNET_HashCode hash;
4705
4706   (void) cls;
4707   (void) service;
4708
4709   GNUNET_log_setup ("rps",
4710                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4711                     NULL);
4712   cfg = c;
4713   /* Get own ID */
4714   GNUNET_CRYPTO_get_peer_identity (cfg,
4715                                    &own_identity); // TODO check return value
4716   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4717               "STARTING SERVICE (rps) for peer [%s]\n",
4718               GNUNET_i2s (&own_identity));
4719 #if ENABLE_MALICIOUS
4720   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4721               "Malicious execution compiled in.\n");
4722 #endif /* ENABLE_MALICIOUS */
4723
4724   /* Get time interval from the configuration */
4725   if (GNUNET_OK !=
4726       GNUNET_CONFIGURATION_get_value_time (cfg,
4727                                            "RPS",
4728                                            "ROUNDINTERVAL",
4729                                            &round_interval))
4730   {
4731     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4732                                "RPS", "ROUNDINTERVAL");
4733     GNUNET_SCHEDULER_shutdown ();
4734     return;
4735   }
4736
4737   /* Get initial size of sampler/view from the configuration */
4738   if (GNUNET_OK !=
4739       GNUNET_CONFIGURATION_get_value_number (cfg,
4740                                              "RPS",
4741                                              "MINSIZE",
4742                                              &sampler_size))
4743   {
4744     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4745                                "RPS", "MINSIZE");
4746     GNUNET_SCHEDULER_shutdown ();
4747     return;
4748   }
4749
4750   cadet_handle = GNUNET_CADET_connect (cfg);
4751   GNUNET_assert (NULL != cadet_handle);
4752   core_handle = GNUNET_CORE_connect (cfg,
4753                                      NULL, /* cls */
4754                                      core_init, /* init */
4755                                      core_connects, /* connects */
4756                                      core_disconnects, /* disconnects */
4757                                      NULL); /* handlers */
4758   GNUNET_assert (NULL != core_handle);
4759
4760
4761   alpha = 0.45;
4762   beta  = 0.45;
4763
4764
4765   /* Set up main Sub */
4766   GNUNET_CRYPTO_hash (hash_port_string,
4767                       strlen (hash_port_string),
4768                       &hash);
4769   msub = new_sub (&hash,
4770                  sampler_size, /* Will be overwritten by config */
4771                  round_interval);
4772
4773
4774   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4775
4776   /* connect to NSE */
4777   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4778
4779   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4780   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4781   // TODO send push/pull to each of those peers?
4782   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4783   restore_valid_peers (msub);
4784   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4785
4786   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4787                                                    GNUNET_NO,
4788                                                    process_peerinfo_peers,
4789                                                    msub);
4790
4791   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4792
4793   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4794   stats = GNUNET_STATISTICS_create ("rps", cfg);
4795 }
4796
4797
4798 /**
4799  * Define "main" method using service macro.
4800  */
4801 GNUNET_SERVICE_MAIN
4802 ("rps",
4803  GNUNET_SERVICE_OPTION_NONE,
4804  &run,
4805  &client_connect_cb,
4806  &client_disconnect_cb,
4807  NULL,
4808  GNUNET_MQ_hd_var_size (client_seed,
4809    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4810    struct GNUNET_RPS_CS_SeedMessage,
4811    NULL),
4812 #if ENABLE_MALICIOUS
4813  GNUNET_MQ_hd_var_size (client_act_malicious,
4814    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4815    struct GNUNET_RPS_CS_ActMaliciousMessage,
4816    NULL),
4817 #endif /* ENABLE_MALICIOUS */
4818  GNUNET_MQ_hd_fixed_size (client_view_request,
4819    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4820    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4821    NULL),
4822  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4823    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4824    struct GNUNET_MessageHeader,
4825    NULL),
4826  GNUNET_MQ_hd_fixed_size (client_stream_request,
4827    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4828    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4829    NULL),
4830  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4831    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4832    struct GNUNET_MessageHeader,
4833    NULL),
4834  GNUNET_MQ_hd_fixed_size (client_start_sub,
4835    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4836    struct GNUNET_RPS_CS_SubStartMessage,
4837    NULL),
4838  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4839    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4840    struct GNUNET_RPS_CS_SubStopMessage,
4841    NULL),
4842  GNUNET_MQ_handler_end());
4843
4844 /* end of gnunet-service-rps.c */