RPS: Cast to int32_t instead of uint32_t
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357   /**
358    * Name to log view to
359    */
360   char *file_name_view_log;
361
362 #ifdef TO_FILE
363   /**
364    * Name to log number of observed peers to
365    */
366   char *file_name_observed_log;
367
368   /**
369    * @brief Count the observed peers
370    */
371   uint32_t num_observed_peers;
372
373   /**
374    * @brief Multipeermap (ab-) used to count unique peer_ids
375    */
376   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
377 #endif /* TO_FILE */
378
379   /**
380    * List to store peers received through pushes temporary.
381    */
382   struct CustomPeerMap *push_map;
383
384   /**
385    * List to store peers received through pulls temporary.
386    */
387   struct CustomPeerMap *pull_map;
388
389   /**
390    * @brief This is the estimate used as view size.
391    *
392    * It is initialised with the minimum
393    */
394   unsigned int view_size_est_need;
395
396   /**
397    * @brief This is the minimum estimate used as view size.
398    *
399    * It is configured by the user.
400    */
401   unsigned int view_size_est_min;
402
403   /**
404    * @brief The view.
405    */
406   struct View *view;
407
408   /**
409    * Identifier for the main task that runs periodically.
410    */
411   struct GNUNET_SCHEDULER_Task *do_round_task;
412
413   /* === stats === */
414
415   /**
416    * @brief Counts the executed rounds.
417    */
418   uint32_t num_rounds;
419
420   /**
421    * @brief This array accumulates the number of received pushes per round.
422    *
423    * Number at index i represents the number of rounds with i observed pushes.
424    */
425   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
426
427   /**
428    * @brief Histogram of deltas between the expected and actual number of
429    * received pushes.
430    *
431    * As half of the entries are expected to be negative, this is shifted by
432    * #HISTOGRAM_FILE_SLOTS/2.
433    */
434   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
435
436   /**
437    * @brief Number of pull replies with this delay measured in rounds.
438    *
439    * Number at index i represents the number of pull replies with a delay of i
440    * rounds.
441    */
442   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
443 };
444
445
446 /***********************************************************************
447  * Globals
448 ***********************************************************************/
449
450 /**
451  * Our configuration.
452  */
453 static const struct GNUNET_CONFIGURATION_Handle *cfg;
454
455 /**
456  * Handle to the statistics service.
457  */
458 struct GNUNET_STATISTICS_Handle *stats;
459
460 /**
461  * Handler to CADET.
462  */
463 struct GNUNET_CADET_Handle *cadet_handle;
464
465 /**
466  * Handle to CORE
467  */
468 struct GNUNET_CORE_Handle *core_handle;
469
470 /**
471  * @brief PeerMap to keep track of connected peers.
472  */
473 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
474
475 /**
476  * Our own identity.
477  */
478 static struct GNUNET_PeerIdentity own_identity;
479
480 /**
481  * Percentage of total peer number in the view
482  * to send random PUSHes to
483  */
484 static float alpha;
485
486 /**
487  * Percentage of total peer number in the view
488  * to send random PULLs to
489  */
490 static float beta;
491
492 /**
493  * Handler to NSE.
494  */
495 static struct GNUNET_NSE_Handle *nse;
496
497 /**
498  * Handler to PEERINFO.
499  */
500 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
501
502 /**
503  * Handle for cancellation of iteration over peers.
504  */
505 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
506
507
508 #if ENABLE_MALICIOUS
509 /**
510  * Type of malicious peer
511  *
512  * 0 Don't act malicious at all - Default
513  * 1 Try to maximise representation
514  * 2 Try to partition the network
515  * 3 Combined attack
516  */
517 static uint32_t mal_type;
518
519 /**
520  * Other malicious peers
521  */
522 static struct GNUNET_PeerIdentity *mal_peers;
523
524 /**
525  * Hashmap of malicious peers used as set.
526  * Used to more efficiently check whether we know that peer.
527  */
528 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
529
530 /**
531  * Number of other malicious peers
532  */
533 static uint32_t num_mal_peers;
534
535
536 /**
537  * If type is 2 this is the DLL of attacked peers
538  */
539 static struct AttackedPeer *att_peers_head;
540 static struct AttackedPeer *att_peers_tail;
541
542 /**
543  * This index is used to point to an attacked peer to
544  * implement the round-robin-ish way to select attacked peers.
545  */
546 static struct AttackedPeer *att_peer_index;
547
548 /**
549  * Hashmap of attacked peers used as set.
550  * Used to more efficiently check whether we know that peer.
551  */
552 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
553
554 /**
555  * Number of attacked peers
556  */
557 static uint32_t num_attacked_peers;
558
559 /**
560  * If type is 1 this is the attacked peer
561  */
562 static struct GNUNET_PeerIdentity attacked_peer;
563
564 /**
565  * The limit of PUSHes we can send in one round.
566  * This is an assumption of the Brahms protocol and either implemented
567  * via proof of work
568  * or
569  * assumend to be the bandwidth limitation.
570  */
571 static uint32_t push_limit = 10000;
572 #endif /* ENABLE_MALICIOUS */
573
574 /**
575  * @brief Main Sub.
576  *
577  * This is run in any case by all peers and connects to all peers without
578  * specifying a shared value.
579  */
580 static struct Sub *msub;
581
582 /**
583  * @brief Maximum number of valid peers to keep.
584  * TODO read from config
585  */
586 static const uint32_t num_valid_peers_max = UINT32_MAX;
587
588 /***********************************************************************
589  * /Globals
590 ***********************************************************************/
591
592
593 static void
594 do_round (void *cls);
595
596 static void
597 do_mal_round (void *cls);
598
599
600 /**
601  * @brief Get the #PeerContext associated with a peer
602  *
603  * @param peer_map The peer map containing the context
604  * @param peer the peer id
605  *
606  * @return the #PeerContext
607  */
608 static struct PeerContext *
609 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
610               const struct GNUNET_PeerIdentity *peer)
611 {
612   struct PeerContext *ctx;
613   int ret;
614
615   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
616   GNUNET_assert (GNUNET_YES == ret);
617   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
618   GNUNET_assert (NULL != ctx);
619   return ctx;
620 }
621
622 /**
623  * @brief Check whether we have information about the given peer.
624  *
625  * FIXME probably deprecated. Make this the new _online.
626  *
627  * @param peer_map The peer map to check for the existence of @a peer
628  * @param peer peer in question
629  *
630  * @return #GNUNET_YES if peer is known
631  *         #GNUNET_NO  if peer is not knwon
632  */
633 static int
634 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
635                   const struct GNUNET_PeerIdentity *peer)
636 {
637   if (NULL != peer_map)
638   {
639     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
640   }
641   else
642   {
643     return GNUNET_NO;
644   }
645 }
646
647
648 /**
649  * @brief Create a new #PeerContext and insert it into the peer map
650  *
651  * @param sub The Sub this context belongs to.
652  * @param peer the peer to create the #PeerContext for
653  *
654  * @return the #PeerContext
655  */
656 static struct PeerContext *
657 create_peer_ctx (struct Sub *sub,
658                  const struct GNUNET_PeerIdentity *peer)
659 {
660   struct PeerContext *ctx;
661   int ret;
662
663   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
664
665   ctx = GNUNET_new (struct PeerContext);
666   ctx->peer_id = *peer;
667   ctx->sub = sub;
668   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
669       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
670   GNUNET_assert (GNUNET_OK == ret);
671   if (sub == msub)
672   {
673     GNUNET_STATISTICS_set (stats,
674                           "# known peers",
675                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
676                           GNUNET_NO);
677   }
678   return ctx;
679 }
680
681
682 /**
683  * @brief Create or get a #PeerContext
684  *
685  * @param sub The Sub to which the created context belongs to
686  * @param peer the peer to get the associated context to
687  *
688  * @return the context
689  */
690 static struct PeerContext *
691 create_or_get_peer_ctx (struct Sub *sub,
692                         const struct GNUNET_PeerIdentity *peer)
693 {
694   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
695   {
696     return create_peer_ctx (sub, peer);
697   }
698   return get_peer_ctx (sub->peer_map, peer);
699 }
700
701
702 /**
703  * @brief Check whether we have a connection to this @a peer
704  *
705  * Also sets the #Peers_ONLINE flag accordingly
706  *
707  * @param peer_ctx Context of the peer of which connectivity is to be checked
708  *
709  * @return #GNUNET_YES if we are connected
710  *         #GNUNET_NO  otherwise
711  */
712 static int
713 check_connected (struct PeerContext *peer_ctx)
714 {
715   /* If we don't know about this peer we don't know whether it's online */
716   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
717                                      &peer_ctx->peer_id))
718   {
719     return GNUNET_NO;
720   }
721   /* Get the context */
722   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
723   /* If we have no channel to this peer we don't know whether it's online */
724   if ( (NULL == peer_ctx->send_channel_ctx) &&
725        (NULL == peer_ctx->recv_channel_ctx) )
726   {
727     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
728     return GNUNET_NO;
729   }
730   /* Otherwise (if we have a channel, we know that it's online */
731   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732   return GNUNET_YES;
733 }
734
735
736 /**
737  * @brief The closure to #get_rand_peer_iterator.
738  */
739 struct GetRandPeerIteratorCls
740 {
741   /**
742    * @brief The index of the peer to return.
743    * Will be decreased until 0.
744    * Then current peer is returned.
745    */
746   uint32_t index;
747
748   /**
749    * @brief Pointer to peer to return.
750    */
751   const struct GNUNET_PeerIdentity *peer;
752 };
753
754
755 /**
756  * @brief Iterator function for #get_random_peer_from_peermap.
757  *
758  * Implements #GNUNET_CONTAINER_PeerMapIterator.
759  * Decreases the index until the index is null.
760  * Then returns the current peer.
761  *
762  * @param cls the #GetRandPeerIteratorCls containing index and peer
763  * @param peer current peer
764  * @param value unused
765  *
766  * @return  #GNUNET_YES if we should continue to
767  *          iterate,
768  *          #GNUNET_NO if not.
769  */
770 static int
771 get_rand_peer_iterator (void *cls,
772                         const struct GNUNET_PeerIdentity *peer,
773                         void *value)
774 {
775   struct GetRandPeerIteratorCls *iterator_cls = cls;
776   (void) value;
777
778   if (0 >= iterator_cls->index)
779   {
780     iterator_cls->peer = peer;
781     return GNUNET_NO;
782   }
783   iterator_cls->index--;
784   return GNUNET_YES;
785 }
786
787
788 /**
789  * @brief Get a random peer from @a peer_map
790  *
791  * @param valid_peers Peer map containing valid peers from which to select a
792  * random one
793  *
794  * @return a random peer
795  */
796 static const struct GNUNET_PeerIdentity *
797 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
798 {
799   struct GetRandPeerIteratorCls *iterator_cls;
800   const struct GNUNET_PeerIdentity *ret;
801
802   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
803   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
804       GNUNET_CONTAINER_multipeermap_size (valid_peers));
805   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
806                                                 get_rand_peer_iterator,
807                                                 iterator_cls);
808   ret = iterator_cls->peer;
809   GNUNET_free (iterator_cls);
810   return ret;
811 }
812
813
814 /**
815  * @brief Add a given @a peer to valid peers.
816  *
817  * If valid peers are already #num_valid_peers_max, delete a peer previously.
818  *
819  * @param peer The peer that is added to the valid peers.
820  * @param valid_peers Peer map of valid peers to which to add the @a peer
821  *
822  * @return #GNUNET_YES if no other peer had to be removed
823  *         #GNUNET_NO  otherwise
824  */
825 static int
826 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
827                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
828 {
829   const struct GNUNET_PeerIdentity *rand_peer;
830   int ret;
831
832   ret = GNUNET_YES;
833   /* Remove random peers until there is space for a new one */
834   while (num_valid_peers_max <=
835          GNUNET_CONTAINER_multipeermap_size (valid_peers))
836   {
837     rand_peer = get_random_peer_from_peermap (valid_peers);
838     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
839     ret = GNUNET_NO;
840   }
841   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
842       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
843   if (valid_peers == msub->valid_peers)
844   {
845     GNUNET_STATISTICS_set (stats,
846                            "# valid peers",
847                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
848                            GNUNET_NO);
849   }
850   return ret;
851 }
852
853 static void
854 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
855
856 /**
857  * @brief Set the peer flag to living and
858  *        call the pending operations on this peer.
859  *
860  * Also adds peer to #valid_peers.
861  *
862  * @param peer_ctx the #PeerContext of the peer to set online
863  */
864 static void
865 set_peer_online (struct PeerContext *peer_ctx)
866 {
867   struct GNUNET_PeerIdentity *peer;
868   unsigned int i;
869
870   peer = &peer_ctx->peer_id;
871   LOG (GNUNET_ERROR_TYPE_DEBUG,
872       "Peer %s is online and valid, calling %i pending operations on it\n",
873       GNUNET_i2s (peer),
874       peer_ctx->num_pending_ops);
875
876   if (NULL != peer_ctx->online_check_pending)
877   {
878     LOG (GNUNET_ERROR_TYPE_DEBUG,
879          "Removing pending online check for peer %s\n",
880          GNUNET_i2s (&peer_ctx->peer_id));
881     // TODO wait until cadet sets mq->cancel_impl
882     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
883     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
884     peer_ctx->online_check_pending = NULL;
885   }
886
887   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
888
889   /* Call pending operations */
890   for (i = 0; i < peer_ctx->num_pending_ops; i++)
891   {
892     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
893   }
894   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
895 }
896
897 static void
898 cleanup_destroyed_channel (void *cls,
899                            const struct GNUNET_CADET_Channel *channel);
900
901 /* Declaration of handlers */
902 static void
903 handle_peer_check (void *cls,
904                    const struct GNUNET_MessageHeader *msg);
905
906 static void
907 handle_peer_push (void *cls,
908                   const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_pull_request (void *cls,
912                           const struct GNUNET_MessageHeader *msg);
913
914 static int
915 check_peer_pull_reply (void *cls,
916                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
917
918 static void
919 handle_peer_pull_reply (void *cls,
920                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 /* End declaration of handlers */
923
924 /**
925  * @brief Allocate memory for a new channel context and insert it into DLL
926  *
927  * @param peer_ctx context of the according peer
928  *
929  * @return The channel context
930  */
931 static struct ChannelCtx *
932 add_channel_ctx (struct PeerContext *peer_ctx)
933 {
934   struct ChannelCtx *channel_ctx;
935   channel_ctx = GNUNET_new (struct ChannelCtx);
936   channel_ctx->peer_ctx = peer_ctx;
937   return channel_ctx;
938 }
939
940
941 /**
942  * @brief Free memory and NULL pointers.
943  *
944  * @param channel_ctx The channel context.
945  */
946 static void
947 remove_channel_ctx (struct ChannelCtx *channel_ctx)
948 {
949   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
950
951   if (NULL != channel_ctx->destruction_task)
952   {
953     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
954     channel_ctx->destruction_task = NULL;
955   }
956
957   GNUNET_free (channel_ctx);
958
959   if (NULL == peer_ctx) return;
960   if (channel_ctx == peer_ctx->send_channel_ctx)
961   {
962     peer_ctx->send_channel_ctx = NULL;
963     peer_ctx->mq = NULL;
964   }
965   else if (channel_ctx == peer_ctx->recv_channel_ctx)
966   {
967     peer_ctx->recv_channel_ctx = NULL;
968   }
969 }
970
971
972 /**
973  * @brief Get the channel of a peer. If not existing, create.
974  *
975  * @param peer_ctx Context of the peer of which to get the channel
976  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
977  */
978 struct GNUNET_CADET_Channel *
979 get_channel (struct PeerContext *peer_ctx)
980 {
981   /* There exists a copy-paste-clone in run() */
982   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
983     GNUNET_MQ_hd_fixed_size (peer_check,
984                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
985                              struct GNUNET_MessageHeader,
986                              NULL),
987     GNUNET_MQ_hd_fixed_size (peer_push,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_pull_request,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_var_size (peer_pull_reply,
996                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
997                            struct GNUNET_RPS_P2P_PullReplyMessage,
998                            NULL),
999     GNUNET_MQ_handler_end ()
1000   };
1001
1002
1003   if (NULL == peer_ctx->send_channel_ctx)
1004   {
1005     LOG (GNUNET_ERROR_TYPE_DEBUG,
1006          "Trying to establish channel to peer %s\n",
1007          GNUNET_i2s (&peer_ctx->peer_id));
1008     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1009     peer_ctx->send_channel_ctx->channel =
1010       GNUNET_CADET_channel_create (cadet_handle,
1011                                    peer_ctx->send_channel_ctx, /* context */
1012                                    &peer_ctx->peer_id,
1013                                    &peer_ctx->sub->hash,
1014                                    GNUNET_CADET_OPTION_RELIABLE,
1015                                    NULL, /* WindowSize handler */
1016                                    &cleanup_destroyed_channel, /* Disconnect handler */
1017                                    cadet_handlers);
1018   }
1019   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1020   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1021   return peer_ctx->send_channel_ctx->channel;
1022 }
1023
1024
1025 /**
1026  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1027  *
1028  * If we already have a message queue open to this client,
1029  * simply return it, otherways create one.
1030  *
1031  * @param peer_ctx Context of the peer of whicht to get the mq
1032  * @return the #GNUNET_MQ_Handle
1033  */
1034 static struct GNUNET_MQ_Handle *
1035 get_mq (struct PeerContext *peer_ctx)
1036 {
1037   if (NULL == peer_ctx->mq)
1038   {
1039     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1040   }
1041   return peer_ctx->mq;
1042 }
1043
1044 /**
1045  * @brief Add an envelope to a message passed to mq to list of pending messages
1046  *
1047  * @param peer_ctx Context of the peer for which to insert the envelope
1048  * @param ev envelope to the message
1049  * @param type type of the message to be sent
1050  * @return pointer to pending message
1051  */
1052 static struct PendingMessage *
1053 insert_pending_message (struct PeerContext *peer_ctx,
1054                         struct GNUNET_MQ_Envelope *ev,
1055                         const char *type)
1056 {
1057   struct PendingMessage *pending_msg;
1058
1059   pending_msg = GNUNET_new (struct PendingMessage);
1060   pending_msg->ev = ev;
1061   pending_msg->peer_ctx = peer_ctx;
1062   pending_msg->type = type;
1063   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1064                                peer_ctx->pending_messages_tail,
1065                                pending_msg);
1066   return pending_msg;
1067 }
1068
1069
1070 /**
1071  * @brief Remove a pending message from the respective DLL
1072  *
1073  * @param pending_msg the pending message to remove
1074  * @param cancel whether to cancel the pending message, too
1075  */
1076 static void
1077 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1078 {
1079   struct PeerContext *peer_ctx;
1080   (void) cancel;
1081
1082   peer_ctx = pending_msg->peer_ctx;
1083   GNUNET_assert (NULL != peer_ctx);
1084   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1085                                peer_ctx->pending_messages_tail,
1086                                pending_msg);
1087   // TODO wait for the cadet implementation of message cancellation
1088   //if (GNUNET_YES == cancel)
1089   //{
1090   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1091   //}
1092   GNUNET_free (pending_msg);
1093 }
1094
1095
1096 /**
1097  * @brief This is called in response to the first message we sent as a
1098  * online check.
1099  *
1100  * @param cls #PeerContext of peer with pending online check
1101  */
1102 static void
1103 mq_online_check_successful (void *cls)
1104 {
1105   struct PeerContext *peer_ctx = cls;
1106
1107   if (NULL != peer_ctx->online_check_pending)
1108   {
1109     LOG (GNUNET_ERROR_TYPE_DEBUG,
1110         "Online check for peer %s was successfull\n",
1111         GNUNET_i2s (&peer_ctx->peer_id));
1112     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1113     peer_ctx->online_check_pending = NULL;
1114     set_peer_online (peer_ctx);
1115     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1116   }
1117 }
1118
1119 /**
1120  * Issue a check whether peer is online
1121  *
1122  * @param peer_ctx the context of the peer
1123  */
1124 static void
1125 check_peer_online (struct PeerContext *peer_ctx)
1126 {
1127   LOG (GNUNET_ERROR_TYPE_DEBUG,
1128        "Get informed about peer %s getting online\n",
1129        GNUNET_i2s (&peer_ctx->peer_id));
1130
1131   struct GNUNET_MQ_Handle *mq;
1132   struct GNUNET_MQ_Envelope *ev;
1133
1134   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1135   peer_ctx->online_check_pending =
1136     insert_pending_message (peer_ctx, ev, "Check online");
1137   mq = get_mq (peer_ctx);
1138   GNUNET_MQ_notify_sent (ev,
1139                          mq_online_check_successful,
1140                          peer_ctx);
1141   GNUNET_MQ_send (mq, ev);
1142   if (peer_ctx->sub == msub)
1143   {
1144     GNUNET_STATISTICS_update (stats,
1145                               "# pending online checks",
1146                               1,
1147                               GNUNET_NO);
1148   }
1149 }
1150
1151
1152 /**
1153  * @brief Check whether function of type #PeerOp was already scheduled
1154  *
1155  * The array with pending operations will probably never grow really big, so
1156  * iterating over it should be ok.
1157  *
1158  * @param peer_ctx Context of the peer to check for the operation
1159  * @param peer_op the operation (#PeerOp) on the peer
1160  *
1161  * @return #GNUNET_YES if this operation is scheduled on that peer
1162  *         #GNUNET_NO  otherwise
1163  */
1164 static int
1165 check_operation_scheduled (const struct PeerContext *peer_ctx,
1166                            const PeerOp peer_op)
1167 {
1168   unsigned int i;
1169
1170   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1171     if (peer_op == peer_ctx->pending_ops[i].op)
1172       return GNUNET_YES;
1173   return GNUNET_NO;
1174 }
1175
1176
1177 /**
1178  * @brief Callback for scheduler to destroy a channel
1179  *
1180  * @param cls Context of the channel
1181  */
1182 static void
1183 destroy_channel (struct ChannelCtx *channel_ctx)
1184 {
1185   struct GNUNET_CADET_Channel *channel;
1186
1187   if (NULL != channel_ctx->destruction_task)
1188   {
1189     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1190     channel_ctx->destruction_task = NULL;
1191   }
1192   GNUNET_assert (channel_ctx->channel != NULL);
1193   channel = channel_ctx->channel;
1194   channel_ctx->channel = NULL;
1195   GNUNET_CADET_channel_destroy (channel);
1196   remove_channel_ctx (channel_ctx);
1197 }
1198
1199
1200 /**
1201  * @brief Destroy a cadet channel.
1202  *
1203  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1204  *
1205  * @param cls
1206  */
1207 static void
1208 destroy_channel_cb (void *cls)
1209 {
1210   struct ChannelCtx *channel_ctx = cls;
1211
1212   channel_ctx->destruction_task = NULL;
1213   destroy_channel (channel_ctx);
1214 }
1215
1216
1217 /**
1218  * @brief Schedule the destruction of a channel for immediately afterwards.
1219  *
1220  * In case a channel is to be destroyed from within the callback to the
1221  * destruction of another channel (send channel), we cannot call
1222  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1223  * construction.
1224  *
1225  * @param channel_ctx channel to be destroyed.
1226  */
1227 static void
1228 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1229 {
1230   GNUNET_assert (NULL ==
1231                  channel_ctx->destruction_task);
1232   GNUNET_assert (NULL !=
1233                  channel_ctx->channel);
1234   channel_ctx->destruction_task =
1235     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1236                               channel_ctx);
1237 }
1238
1239
1240 /**
1241  * @brief Remove peer
1242  *
1243  * - Empties the list with pending operations
1244  * - Empties the list with pending messages
1245  * - Cancels potentially existing online check
1246  * - Schedules closing of send and recv channels
1247  * - Removes peer from peer map
1248  *
1249  * @param peer_ctx Context of the peer to be destroyed
1250  * @return #GNUNET_YES if peer was removed
1251  *         #GNUNET_NO  otherwise
1252  */
1253 static int
1254 destroy_peer (struct PeerContext *peer_ctx)
1255 {
1256   GNUNET_assert (NULL != peer_ctx);
1257   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1258   if (GNUNET_NO ==
1259       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1260                                               &peer_ctx->peer_id))
1261   {
1262     return GNUNET_NO;
1263   }
1264   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1265   LOG (GNUNET_ERROR_TYPE_DEBUG,
1266        "Going to remove peer %s\n",
1267        GNUNET_i2s (&peer_ctx->peer_id));
1268   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1269
1270   /* Clear list of pending operations */
1271   // TODO this probably leaks memory
1272   //      ('only' the cls to the function. Not sure what to do with it)
1273   GNUNET_array_grow (peer_ctx->pending_ops,
1274                      peer_ctx->num_pending_ops,
1275                      0);
1276   /* Remove all pending messages */
1277   while (NULL != peer_ctx->pending_messages_head)
1278   {
1279     LOG (GNUNET_ERROR_TYPE_DEBUG,
1280          "Removing unsent %s\n",
1281          peer_ctx->pending_messages_head->type);
1282     /* Cancle pending message, too */
1283     if ( (NULL != peer_ctx->online_check_pending) &&
1284          (0 == memcmp (peer_ctx->pending_messages_head,
1285                      peer_ctx->online_check_pending,
1286                      sizeof (struct PendingMessage))) )
1287       {
1288         peer_ctx->online_check_pending = NULL;
1289         if (peer_ctx->sub == msub)
1290         {
1291           GNUNET_STATISTICS_update (stats,
1292                                     "# pending online checks",
1293                                     -1,
1294                                     GNUNET_NO);
1295         }
1296       }
1297     remove_pending_message (peer_ctx->pending_messages_head,
1298                             GNUNET_YES);
1299   }
1300
1301   /* If we are still waiting for notification whether this peer is online
1302    * cancel the according task */
1303   if (NULL != peer_ctx->online_check_pending)
1304   {
1305     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306                 "Removing pending online check for peer %s\n",
1307                 GNUNET_i2s (&peer_ctx->peer_id));
1308     // TODO wait until cadet sets mq->cancel_impl
1309     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1310     remove_pending_message (peer_ctx->online_check_pending,
1311                             GNUNET_YES);
1312     peer_ctx->online_check_pending = NULL;
1313   }
1314
1315   if (NULL != peer_ctx->send_channel_ctx)
1316   {
1317     /* This is possibly called from within channel destruction */
1318     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1319     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1320     peer_ctx->send_channel_ctx = NULL;
1321     peer_ctx->mq = NULL;
1322   }
1323   if (NULL != peer_ctx->recv_channel_ctx)
1324   {
1325     /* This is possibly called from within channel destruction */
1326     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1327     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1328     peer_ctx->recv_channel_ctx = NULL;
1329   }
1330
1331   if (GNUNET_YES !=
1332       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1333                                                 &peer_ctx->peer_id))
1334   {
1335     LOG (GNUNET_ERROR_TYPE_WARNING,
1336          "removing peer from peer_ctx->sub->peer_map failed\n");
1337   }
1338   if (peer_ctx->sub == msub)
1339   {
1340     GNUNET_STATISTICS_set (stats,
1341                           "# known peers",
1342                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1343                           GNUNET_NO);
1344   }
1345   GNUNET_free (peer_ctx);
1346   return GNUNET_YES;
1347 }
1348
1349
1350 /**
1351  * Iterator over hash map entries. Deletes all contexts of peers.
1352  *
1353  * @param cls closure
1354  * @param key current public key
1355  * @param value value in the hash map
1356  * @return #GNUNET_YES if we should continue to iterate,
1357  *         #GNUNET_NO if not.
1358  */
1359 static int
1360 peermap_clear_iterator (void *cls,
1361                         const struct GNUNET_PeerIdentity *key,
1362                         void *value)
1363 {
1364   struct Sub *sub = cls;
1365   (void) value;
1366
1367   destroy_peer (get_peer_ctx (sub->peer_map, key));
1368   return GNUNET_YES;
1369 }
1370
1371
1372 /**
1373  * @brief This is called once a message is sent.
1374  *
1375  * Removes the pending message
1376  *
1377  * @param cls type of the message that was sent
1378  */
1379 static void
1380 mq_notify_sent_cb (void *cls)
1381 {
1382   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1383   LOG (GNUNET_ERROR_TYPE_DEBUG,
1384       "%s was sent.\n",
1385       pending_msg->type);
1386   if (pending_msg->peer_ctx->sub == msub)
1387   {
1388     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1389       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1390     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1391       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1392     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1393       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1395                       NULL != map_single_hop &&
1396         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1397           &pending_msg->peer_ctx->peer_id))
1398       GNUNET_STATISTICS_update(stats,
1399                                "# pull requests sent (multi-hop peer)",
1400                                1,
1401                                GNUNET_NO);
1402   }
1403   /* Do not cancle message */
1404   remove_pending_message (pending_msg, GNUNET_NO);
1405 }
1406
1407
1408 /**
1409  * @brief Iterator function for #store_valid_peers.
1410  *
1411  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1412  * Writes single peer to disk.
1413  *
1414  * @param cls the file handle to write to.
1415  * @param peer current peer
1416  * @param value unused
1417  *
1418  * @return  #GNUNET_YES if we should continue to
1419  *          iterate,
1420  *          #GNUNET_NO if not.
1421  */
1422 static int
1423 store_peer_presistently_iterator (void *cls,
1424                                   const struct GNUNET_PeerIdentity *peer,
1425                                   void *value)
1426 {
1427   const struct GNUNET_DISK_FileHandle *fh = cls;
1428   char peer_string[128];
1429   int size;
1430   ssize_t ret;
1431   (void) value;
1432
1433   if (NULL == peer)
1434   {
1435     return GNUNET_YES;
1436   }
1437   size = GNUNET_snprintf (peer_string,
1438                           sizeof (peer_string),
1439                           "%s\n",
1440                           GNUNET_i2s_full (peer));
1441   GNUNET_assert (53 == size);
1442   ret = GNUNET_DISK_file_write (fh,
1443                                 peer_string,
1444                                 size);
1445   GNUNET_assert (size == ret);
1446   return GNUNET_YES;
1447 }
1448
1449
1450 /**
1451  * @brief Store the peers currently in #valid_peers to disk.
1452  *
1453  * @param sub Sub for which to store the valid peers
1454  */
1455 static void
1456 store_valid_peers (const struct Sub *sub)
1457 {
1458   struct GNUNET_DISK_FileHandle *fh;
1459   uint32_t number_written_peers;
1460   int ret;
1461
1462   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1463   {
1464     return;
1465   }
1466
1467   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1468   if (GNUNET_SYSERR == ret)
1469   {
1470     LOG (GNUNET_ERROR_TYPE_WARNING,
1471         "Not able to create directory for file `%s'\n",
1472         sub->filename_valid_peers);
1473     GNUNET_break (0);
1474   }
1475   else if (GNUNET_NO == ret)
1476   {
1477     LOG (GNUNET_ERROR_TYPE_WARNING,
1478         "Directory for file `%s' exists but is not writable for us\n",
1479         sub->filename_valid_peers);
1480     GNUNET_break (0);
1481   }
1482   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1483                               GNUNET_DISK_OPEN_WRITE |
1484                                   GNUNET_DISK_OPEN_CREATE,
1485                               GNUNET_DISK_PERM_USER_READ |
1486                                   GNUNET_DISK_PERM_USER_WRITE);
1487   if (NULL == fh)
1488   {
1489     LOG (GNUNET_ERROR_TYPE_WARNING,
1490         "Not able to write valid peers to file `%s'\n",
1491         sub->filename_valid_peers);
1492     return;
1493   }
1494   LOG (GNUNET_ERROR_TYPE_DEBUG,
1495       "Writing %u valid peers to disk\n",
1496       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1497   number_written_peers =
1498     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1499                                            store_peer_presistently_iterator,
1500                                            fh);
1501   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1502   GNUNET_assert (number_written_peers ==
1503       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1504 }
1505
1506
1507 /**
1508  * @brief Convert string representation of peer id to peer id.
1509  *
1510  * Counterpart to #GNUNET_i2s_full.
1511  *
1512  * @param string_repr The string representation of the peer id
1513  *
1514  * @return The peer id
1515  */
1516 static const struct GNUNET_PeerIdentity *
1517 s2i_full (const char *string_repr)
1518 {
1519   struct GNUNET_PeerIdentity *peer;
1520   size_t len;
1521   int ret;
1522
1523   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1524   len = strlen (string_repr);
1525   if (52 > len)
1526   {
1527     LOG (GNUNET_ERROR_TYPE_WARNING,
1528         "Not able to convert string representation of PeerID to PeerID\n"
1529         "Sting representation: %s (len %lu) - too short\n",
1530         string_repr,
1531         len);
1532     GNUNET_break (0);
1533   }
1534   else if (52 < len)
1535   {
1536     len = 52;
1537   }
1538   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1539                                                     len,
1540                                                     &peer->public_key);
1541   if (GNUNET_OK != ret)
1542   {
1543     LOG (GNUNET_ERROR_TYPE_WARNING,
1544         "Not able to convert string representation of PeerID to PeerID\n"
1545         "Sting representation: %s\n",
1546         string_repr);
1547     GNUNET_break (0);
1548   }
1549   return peer;
1550 }
1551
1552
1553 /**
1554  * @brief Restore the peers on disk to #valid_peers.
1555  *
1556  * @param sub Sub for which to restore the valid peers
1557  */
1558 static void
1559 restore_valid_peers (const struct Sub *sub)
1560 {
1561   off_t file_size;
1562   uint32_t num_peers;
1563   struct GNUNET_DISK_FileHandle *fh;
1564   char *buf;
1565   ssize_t size_read;
1566   char *iter_buf;
1567   char *str_repr;
1568   const struct GNUNET_PeerIdentity *peer;
1569
1570   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1571   {
1572     return;
1573   }
1574
1575   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1576   {
1577     return;
1578   }
1579   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1580                               GNUNET_DISK_OPEN_READ,
1581                               GNUNET_DISK_PERM_NONE);
1582   GNUNET_assert (NULL != fh);
1583   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1584   num_peers = file_size / 53;
1585   buf = GNUNET_malloc (file_size);
1586   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1587   GNUNET_assert (size_read == file_size);
1588   LOG (GNUNET_ERROR_TYPE_DEBUG,
1589       "Restoring %" PRIu32 " peers from file `%s'\n",
1590       num_peers,
1591       sub->filename_valid_peers);
1592   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1593   {
1594     str_repr = GNUNET_strndup (iter_buf, 53);
1595     peer = s2i_full (str_repr);
1596     GNUNET_free (str_repr);
1597     add_valid_peer (peer, sub->valid_peers);
1598     LOG (GNUNET_ERROR_TYPE_DEBUG,
1599         "Restored valid peer %s from disk\n",
1600         GNUNET_i2s_full (peer));
1601   }
1602   iter_buf = NULL;
1603   GNUNET_free (buf);
1604   LOG (GNUNET_ERROR_TYPE_DEBUG,
1605       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1606       num_peers,
1607       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1608   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1609   {
1610     LOG (GNUNET_ERROR_TYPE_WARNING,
1611         "Number of restored peers does not match file size. Have probably duplicates.\n");
1612   }
1613   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1614   LOG (GNUNET_ERROR_TYPE_DEBUG,
1615       "Restored %u valid peers from disk\n",
1616       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1617 }
1618
1619
1620 /**
1621  * @brief Delete storage of peers that was created with #initialise_peers ()
1622  *
1623  * @param sub Sub for which the storage is deleted
1624  */
1625 static void
1626 peers_terminate (struct Sub *sub)
1627 {
1628   if (GNUNET_SYSERR ==
1629       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1630                                              &peermap_clear_iterator,
1631                                              sub))
1632   {
1633     LOG (GNUNET_ERROR_TYPE_WARNING,
1634         "Iteration destroying peers was aborted.\n");
1635   }
1636   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1637   sub->peer_map = NULL;
1638   store_valid_peers (sub);
1639   GNUNET_free (sub->filename_valid_peers);
1640   sub->filename_valid_peers = NULL;
1641   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1642   sub->valid_peers = NULL;
1643 }
1644
1645
1646 /**
1647  * Iterator over #valid_peers hash map entries.
1648  *
1649  * @param cls Closure that contains iterator function and closure
1650  * @param peer current peer id
1651  * @param value value in the hash map - unused
1652  * @return #GNUNET_YES if we should continue to
1653  *         iterate,
1654  *         #GNUNET_NO if not.
1655  */
1656 static int
1657 valid_peer_iterator (void *cls,
1658                      const struct GNUNET_PeerIdentity *peer,
1659                      void *value)
1660 {
1661   struct PeersIteratorCls *it_cls = cls;
1662   (void) value;
1663
1664   return it_cls->iterator (it_cls->cls, peer);
1665 }
1666
1667
1668 /**
1669  * @brief Get all currently known, valid peer ids.
1670  *
1671  * @param valid_peers Peer map containing the valid peers in question
1672  * @param iterator function to call on each peer id
1673  * @param it_cls extra argument to @a iterator
1674  * @return the number of key value pairs processed,
1675  *         #GNUNET_SYSERR if it aborted iteration
1676  */
1677 static int
1678 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1679                  PeersIterator iterator,
1680                  void *it_cls)
1681 {
1682   struct PeersIteratorCls *cls;
1683   int ret;
1684
1685   cls = GNUNET_new (struct PeersIteratorCls);
1686   cls->iterator = iterator;
1687   cls->cls = it_cls;
1688   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1689                                                valid_peer_iterator,
1690                                                cls);
1691   GNUNET_free (cls);
1692   return ret;
1693 }
1694
1695
1696 /**
1697  * @brief Add peer to known peers.
1698  *
1699  * This function is called on new peer_ids from 'external' sources
1700  * (client seed, cadet get_peers(), ...)
1701  *
1702  * @param sub Sub with the peer map that the @a peer will be added to
1703  * @param peer the new #GNUNET_PeerIdentity
1704  *
1705  * @return #GNUNET_YES if peer was inserted
1706  *         #GNUNET_NO  otherwise
1707  */
1708 static int
1709 insert_peer (struct Sub *sub,
1710              const struct GNUNET_PeerIdentity *peer)
1711 {
1712   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1713   {
1714     return GNUNET_NO; /* We already know this peer - nothing to do */
1715   }
1716   (void) create_peer_ctx (sub, peer);
1717   return GNUNET_YES;
1718 }
1719
1720
1721 /**
1722  * @brief Check whether flags on a peer are set.
1723  *
1724  * @param peer_map Peer map that is expected to contain the @a peer
1725  * @param peer the peer to check the flag of
1726  * @param flags the flags to check
1727  *
1728  * @return #GNUNET_SYSERR if peer is not known
1729  *         #GNUNET_YES    if all given flags are set
1730  *         #GNUNET_NO     otherwise
1731  */
1732 static int
1733 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1734                  const struct GNUNET_PeerIdentity *peer,
1735                  enum Peers_PeerFlags flags)
1736 {
1737   struct PeerContext *peer_ctx;
1738
1739   if (GNUNET_NO == check_peer_known (peer_map, peer))
1740   {
1741     return GNUNET_SYSERR;
1742   }
1743   peer_ctx = get_peer_ctx (peer_map, peer);
1744   return check_peer_flag_set (peer_ctx, flags);
1745 }
1746
1747 /**
1748  * @brief Try connecting to a peer to see whether it is online
1749  *
1750  * If not known yet, insert into known peers
1751  *
1752  * @param sub Sub which would contain the @a peer
1753  * @param peer the peer whose online is to be checked
1754  * @return #GNUNET_YES if the check was issued
1755  *         #GNUNET_NO  otherwise
1756  */
1757 static int
1758 issue_peer_online_check (struct Sub *sub,
1759                          const struct GNUNET_PeerIdentity *peer)
1760 {
1761   struct PeerContext *peer_ctx;
1762
1763   (void) insert_peer (sub, peer); // TODO even needed?
1764   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1765   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1766        (NULL == peer_ctx->online_check_pending) )
1767   {
1768     check_peer_online (peer_ctx);
1769     return GNUNET_YES;
1770   }
1771   return GNUNET_NO;
1772 }
1773
1774
1775 /**
1776  * @brief Check if peer is removable.
1777  *
1778  * Check if
1779  *  - a recv channel exists
1780  *  - there are pending messages
1781  *  - there is no pending pull reply
1782  *
1783  * @param peer_ctx Context of the peer in question
1784  * @return #GNUNET_YES    if peer is removable
1785  *         #GNUNET_NO     if peer is NOT removable
1786  *         #GNUNET_SYSERR if peer is not known
1787  */
1788 static int
1789 check_removable (const struct PeerContext *peer_ctx)
1790 {
1791   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1792                                                            &peer_ctx->peer_id))
1793   {
1794     return GNUNET_SYSERR;
1795   }
1796
1797   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1798        (NULL != peer_ctx->pending_messages_head) ||
1799        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1800   {
1801     return GNUNET_NO;
1802   }
1803   return GNUNET_YES;
1804 }
1805
1806
1807 /**
1808  * @brief Check whether @a peer is actually a peer.
1809  *
1810  * A valid peer is a peer that we know exists eg. we were connected to once.
1811  *
1812  * @param valid_peers Peer map that would contain the @a peer
1813  * @param peer peer in question
1814  *
1815  * @return #GNUNET_YES if peer is valid
1816  *         #GNUNET_NO  if peer is not valid
1817  */
1818 static int
1819 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1820                   const struct GNUNET_PeerIdentity *peer)
1821 {
1822   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1823 }
1824
1825
1826 /**
1827  * @brief Indicate that we want to send to the other peer
1828  *
1829  * This establishes a sending channel
1830  *
1831  * @param peer_ctx Context of the target peer
1832  */
1833 static void
1834 indicate_sending_intention (struct PeerContext *peer_ctx)
1835 {
1836   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1837                                                  &peer_ctx->peer_id));
1838   (void) get_channel (peer_ctx);
1839 }
1840
1841
1842 /**
1843  * @brief Check whether other peer has the intention to send/opened channel
1844  *        towars us
1845  *
1846  * @param peer_ctx Context of the peer in question
1847  *
1848  * @return #GNUNET_YES if peer has the intention to send
1849  *         #GNUNET_NO  otherwise
1850  */
1851 static int
1852 check_peer_send_intention (const struct PeerContext *peer_ctx)
1853 {
1854   if (NULL != peer_ctx->recv_channel_ctx)
1855   {
1856     return GNUNET_YES;
1857   }
1858   return GNUNET_NO;
1859 }
1860
1861
1862 /**
1863  * Handle the channel a peer opens to us.
1864  *
1865  * @param cls The closure - Sub
1866  * @param channel The channel the peer wants to establish
1867  * @param initiator The peer's peer ID
1868  *
1869  * @return initial channel context for the channel
1870  *         (can be NULL -- that's not an error)
1871  */
1872 static void *
1873 handle_inbound_channel (void *cls,
1874                         struct GNUNET_CADET_Channel *channel,
1875                         const struct GNUNET_PeerIdentity *initiator)
1876 {
1877   struct PeerContext *peer_ctx;
1878   struct ChannelCtx *channel_ctx;
1879   struct Sub *sub = cls;
1880
1881   LOG (GNUNET_ERROR_TYPE_DEBUG,
1882       "New channel was established to us (Peer %s).\n",
1883       GNUNET_i2s (initiator));
1884   GNUNET_assert (NULL != channel); /* according to cadet API */
1885   /* Make sure we 'know' about this peer */
1886   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1887   set_peer_online (peer_ctx);
1888   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1889   channel_ctx = add_channel_ctx (peer_ctx);
1890   channel_ctx->channel = channel;
1891   /* We only accept one incoming channel per peer */
1892   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1893                                                              initiator)))
1894   {
1895     LOG (GNUNET_ERROR_TYPE_WARNING,
1896         "Already got one receive channel. Destroying old one.\n");
1897     GNUNET_break_op (0);
1898     destroy_channel (peer_ctx->recv_channel_ctx);
1899     peer_ctx->recv_channel_ctx = channel_ctx;
1900     /* return the channel context */
1901     return channel_ctx;
1902   }
1903   peer_ctx->recv_channel_ctx = channel_ctx;
1904   return channel_ctx;
1905 }
1906
1907
1908 /**
1909  * @brief Check whether a sending channel towards the given peer exists
1910  *
1911  * @param peer_ctx Context of the peer in question
1912  *
1913  * @return #GNUNET_YES if a sending channel towards that peer exists
1914  *         #GNUNET_NO  otherwise
1915  */
1916 static int
1917 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1918 {
1919   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1920                                      &peer_ctx->peer_id))
1921   { /* If no such peer exists, there is no channel */
1922     return GNUNET_NO;
1923   }
1924   if (NULL == peer_ctx->send_channel_ctx)
1925   {
1926     return GNUNET_NO;
1927   }
1928   return GNUNET_YES;
1929 }
1930
1931
1932 /**
1933  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1934  *        intention to another peer
1935  *
1936  * @param peer_ctx Context to the peer
1937  * @return #GNUNET_YES if channel was destroyed
1938  *         #GNUNET_NO  otherwise
1939  */
1940 static int
1941 destroy_sending_channel (struct PeerContext *peer_ctx)
1942 {
1943   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1944                                      &peer_ctx->peer_id))
1945   {
1946     return GNUNET_NO;
1947   }
1948   if (NULL != peer_ctx->send_channel_ctx)
1949   {
1950     destroy_channel (peer_ctx->send_channel_ctx);
1951     (void) check_connected (peer_ctx);
1952     return GNUNET_YES;
1953   }
1954   return GNUNET_NO;
1955 }
1956
1957 /**
1958  * @brief Send a message to another peer.
1959  *
1960  * Keeps track about pending messages so they can be properly removed when the
1961  * peer is destroyed.
1962  *
1963  * @param peer_ctx Context of the peer to which the message is to be sent
1964  * @param ev envelope of the message
1965  * @param type type of the message
1966  */
1967 static void
1968 send_message (struct PeerContext *peer_ctx,
1969               struct GNUNET_MQ_Envelope *ev,
1970               const char *type)
1971 {
1972   struct PendingMessage *pending_msg;
1973   struct GNUNET_MQ_Handle *mq;
1974
1975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976               "Sending message to %s of type %s\n",
1977               GNUNET_i2s (&peer_ctx->peer_id),
1978               type);
1979   pending_msg = insert_pending_message (peer_ctx, ev, type);
1980   mq = get_mq (peer_ctx);
1981   GNUNET_MQ_notify_sent (ev,
1982                          mq_notify_sent_cb,
1983                          pending_msg);
1984   GNUNET_MQ_send (mq, ev);
1985 }
1986
1987 /**
1988  * @brief Schedule a operation on given peer
1989  *
1990  * Avoids scheduling an operation twice.
1991  *
1992  * @param peer_ctx Context of the peer for which to schedule the operation
1993  * @param peer_op the operation to schedule
1994  * @param cls Closure to @a peer_op
1995  *
1996  * @return #GNUNET_YES if the operation was scheduled
1997  *         #GNUNET_NO  otherwise
1998  */
1999 static int
2000 schedule_operation (struct PeerContext *peer_ctx,
2001                     const PeerOp peer_op,
2002                     void *cls)
2003 {
2004   struct PeerPendingOp pending_op;
2005
2006   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2007                                                  &peer_ctx->peer_id));
2008
2009   //TODO if ONLINE execute immediately
2010
2011   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2012   {
2013     pending_op.op = peer_op;
2014     pending_op.op_cls = cls;
2015     GNUNET_array_append (peer_ctx->pending_ops,
2016                          peer_ctx->num_pending_ops,
2017                          pending_op);
2018     return GNUNET_YES;
2019   }
2020   return GNUNET_NO;
2021 }
2022
2023 /***********************************************************************
2024  * /Old gnunet-service-rps_peers.c
2025 ***********************************************************************/
2026
2027
2028 /***********************************************************************
2029  * Housekeeping with clients
2030 ***********************************************************************/
2031
2032 /**
2033  * Closure used to pass the client and the id to the callback
2034  * that replies to a client's request
2035  */
2036 struct ReplyCls
2037 {
2038   /**
2039    * DLL
2040    */
2041   struct ReplyCls *next;
2042   struct ReplyCls *prev;
2043
2044   /**
2045    * The identifier of the request
2046    */
2047   uint32_t id;
2048
2049   /**
2050    * The handle to the request
2051    */
2052   struct RPS_SamplerRequestHandle *req_handle;
2053
2054   /**
2055    * The client handle to send the reply to
2056    */
2057   struct ClientContext *cli_ctx;
2058 };
2059
2060
2061 /**
2062  * Struct used to store the context of a connected client.
2063  */
2064 struct ClientContext
2065 {
2066   /**
2067    * DLL
2068    */
2069   struct ClientContext *next;
2070   struct ClientContext *prev;
2071
2072   /**
2073    * The message queue to communicate with the client.
2074    */
2075   struct GNUNET_MQ_Handle *mq;
2076
2077   /**
2078    * @brief How many updates this client expects to receive.
2079    */
2080   int64_t view_updates_left;
2081
2082   /**
2083    * @brief Whether this client wants to receive stream updates.
2084    * Either #GNUNET_YES or #GNUNET_NO
2085    */
2086   int8_t stream_update;
2087
2088   /**
2089    * The client handle to send the reply to
2090    */
2091   struct GNUNET_SERVICE_Client *client;
2092
2093   /**
2094    * The #Sub this context belongs to
2095    */
2096   struct Sub *sub;
2097 };
2098
2099 /**
2100  * DLL with all clients currently connected to us
2101  */
2102 struct ClientContext *cli_ctx_head;
2103 struct ClientContext *cli_ctx_tail;
2104
2105 /***********************************************************************
2106  * /Housekeeping with clients
2107 ***********************************************************************/
2108
2109
2110
2111
2112
2113 /***********************************************************************
2114  * Util functions
2115 ***********************************************************************/
2116
2117
2118 /**
2119  * Print peerlist to log.
2120  */
2121 static void
2122 print_peer_list (struct GNUNET_PeerIdentity *list,
2123                  unsigned int len)
2124 {
2125   unsigned int i;
2126
2127   LOG (GNUNET_ERROR_TYPE_DEBUG,
2128        "Printing peer list of length %u at %p:\n",
2129        len,
2130        list);
2131   for (i = 0 ; i < len ; i++)
2132   {
2133     LOG (GNUNET_ERROR_TYPE_DEBUG,
2134          "%u. peer: %s\n",
2135          i, GNUNET_i2s (&list[i]));
2136   }
2137 }
2138
2139
2140 /**
2141  * Remove peer from list.
2142  */
2143 static void
2144 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2145                unsigned int *list_size,
2146                const struct GNUNET_PeerIdentity *peer)
2147 {
2148   unsigned int i;
2149   struct GNUNET_PeerIdentity *tmp;
2150
2151   tmp = *peer_list;
2152
2153   LOG (GNUNET_ERROR_TYPE_DEBUG,
2154        "Removing peer %s from list at %p\n",
2155        GNUNET_i2s (peer),
2156        tmp);
2157
2158   for ( i = 0 ; i < *list_size ; i++ )
2159   {
2160     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2161     {
2162       if (i < *list_size -1)
2163       { /* Not at the last entry -- shift peers left */
2164         memmove (&tmp[i], &tmp[i +1],
2165                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2166       }
2167       /* Remove last entry (should be now useless PeerID) */
2168       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2169     }
2170   }
2171   *peer_list = tmp;
2172 }
2173
2174
2175 /**
2176  * Insert PeerID in #view
2177  *
2178  * Called once we know a peer is online.
2179  * Implements #PeerOp
2180  *
2181  * @return GNUNET_OK if peer was actually inserted
2182  *         GNUNET_NO if peer was not inserted
2183  */
2184 static void
2185 insert_in_view_op (void *cls,
2186                    const struct GNUNET_PeerIdentity *peer);
2187
2188 /**
2189  * Insert PeerID in #view
2190  *
2191  * Called once we know a peer is online.
2192  *
2193  * @param sub Sub in with the view to insert in
2194  * @param peer the peer to insert
2195  *
2196  * @return GNUNET_OK if peer was actually inserted
2197  *         GNUNET_NO if peer was not inserted
2198  */
2199 static int
2200 insert_in_view (struct Sub *sub,
2201                 const struct GNUNET_PeerIdentity *peer)
2202 {
2203   struct PeerContext *peer_ctx;
2204   int online;
2205   int ret;
2206
2207   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2208   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2209   if ( (GNUNET_NO == online) ||
2210        (GNUNET_SYSERR == online) ) /* peer is not even known */
2211   {
2212     (void) issue_peer_online_check (sub, peer);
2213     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2214     return GNUNET_NO;
2215   }
2216   /* Open channel towards peer to keep connection open */
2217   indicate_sending_intention (peer_ctx);
2218   ret = View_put (sub->view, peer);
2219   if (peer_ctx->sub == msub)
2220   {
2221     GNUNET_STATISTICS_set (stats,
2222                            "view size",
2223                            View_size (peer_ctx->sub->view),
2224                            GNUNET_NO);
2225   }
2226   return ret;
2227 }
2228
2229
2230 /**
2231  * @brief Send view to client
2232  *
2233  * @param cli_ctx the context of the client
2234  * @param view_array the peerids of the view as array (can be empty)
2235  * @param view_size the size of the view array (can be 0)
2236  */
2237 static void
2238 send_view (const struct ClientContext *cli_ctx,
2239            const struct GNUNET_PeerIdentity *view_array,
2240            uint64_t view_size)
2241 {
2242   struct GNUNET_MQ_Envelope *ev;
2243   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2244   struct Sub *sub;
2245
2246   if (NULL == view_array)
2247   {
2248     if (NULL == cli_ctx->sub) sub = msub;
2249     else sub = cli_ctx->sub;
2250     view_size = View_size (sub->view);
2251     view_array = View_get_as_array (sub->view);
2252   }
2253
2254   ev = GNUNET_MQ_msg_extra (out_msg,
2255                             view_size * sizeof (struct GNUNET_PeerIdentity),
2256                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2257   out_msg->num_peers = htonl (view_size);
2258
2259   GNUNET_memcpy (&out_msg[1],
2260                  view_array,
2261                  view_size * sizeof (struct GNUNET_PeerIdentity));
2262   GNUNET_MQ_send (cli_ctx->mq, ev);
2263 }
2264
2265
2266 /**
2267  * @brief Send peer from biased stream to client.
2268  *
2269  * TODO merge with send_view, parameterise
2270  *
2271  * @param cli_ctx the context of the client
2272  * @param view_array the peerids of the view as array (can be empty)
2273  * @param view_size the size of the view array (can be 0)
2274  */
2275 static void
2276 send_stream_peers (const struct ClientContext *cli_ctx,
2277                    uint64_t num_peers,
2278                    const struct GNUNET_PeerIdentity *peers)
2279 {
2280   struct GNUNET_MQ_Envelope *ev;
2281   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2282
2283   GNUNET_assert (NULL != peers);
2284
2285   ev = GNUNET_MQ_msg_extra (out_msg,
2286                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2287                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2288   out_msg->num_peers = htonl (num_peers);
2289
2290   GNUNET_memcpy (&out_msg[1],
2291                  peers,
2292                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2293   GNUNET_MQ_send (cli_ctx->mq, ev);
2294 }
2295
2296
2297 /**
2298  * @brief sends updates to clients that are interested
2299  *
2300  * @param sub Sub for which to notify clients
2301  */
2302 static void
2303 clients_notify_view_update (const struct Sub *sub)
2304 {
2305   struct ClientContext *cli_ctx_iter;
2306   uint64_t num_peers;
2307   const struct GNUNET_PeerIdentity *view_array;
2308
2309   num_peers = View_size (sub->view);
2310   view_array = View_get_as_array(sub->view);
2311   /* check size of view is small enough */
2312   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2313   {
2314     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2315                 "View is too big to send\n");
2316     return;
2317   }
2318
2319   for (cli_ctx_iter = cli_ctx_head;
2320        NULL != cli_ctx_iter;
2321        cli_ctx_iter = cli_ctx_iter->next)
2322   {
2323     if (1 < cli_ctx_iter->view_updates_left)
2324     {
2325       /* Client wants to receive limited amount of updates */
2326       cli_ctx_iter->view_updates_left -= 1;
2327     } else if (1 == cli_ctx_iter->view_updates_left)
2328     {
2329       /* Last update of view for client */
2330       cli_ctx_iter->view_updates_left = -1;
2331     } else if (0 > cli_ctx_iter->view_updates_left) {
2332       /* Client is not interested in updates */
2333       continue;
2334     }
2335     /* else _updates_left == 0 - infinite amount of updates */
2336
2337     /* send view */
2338     send_view (cli_ctx_iter, view_array, num_peers);
2339   }
2340 }
2341
2342
2343 /**
2344  * @brief sends updates to clients that are interested
2345  *
2346  * @param num_peers Number of peers to send
2347  * @param peers the array of peers to send
2348  */
2349 static void
2350 clients_notify_stream_peer (const struct Sub *sub,
2351                             uint64_t num_peers,
2352                             const struct GNUNET_PeerIdentity *peers)
2353                             // TODO enum StreamPeerSource)
2354 {
2355   struct ClientContext *cli_ctx_iter;
2356
2357   LOG (GNUNET_ERROR_TYPE_DEBUG,
2358       "Got peer (%s) from biased stream - update all clients\n",
2359       GNUNET_i2s (peers));
2360
2361   for (cli_ctx_iter = cli_ctx_head;
2362        NULL != cli_ctx_iter;
2363        cli_ctx_iter = cli_ctx_iter->next)
2364   {
2365     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2366         (sub == cli_ctx_iter->sub || sub == msub))
2367     {
2368       send_stream_peers (cli_ctx_iter, num_peers, peers);
2369     }
2370   }
2371 }
2372
2373
2374 /**
2375  * Put random peer from sampler into the view as history update.
2376  *
2377  * @param ids Array of Peers to insert into view
2378  * @param num_peers Number of peers to insert
2379  * @param cls Closure - The Sub for which this is to be done
2380  */
2381 static void
2382 hist_update (const struct GNUNET_PeerIdentity *ids,
2383              uint32_t num_peers,
2384              void *cls)
2385 {
2386   unsigned int i;
2387   struct Sub *sub = cls;
2388
2389   for (i = 0; i < num_peers; i++)
2390   {
2391     int inserted;
2392     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2393     {
2394       LOG (GNUNET_ERROR_TYPE_WARNING,
2395            "Peer in history update not known!\n");
2396       continue;
2397     }
2398     inserted = insert_in_view (sub, &ids[i]);
2399     if (GNUNET_OK == inserted)
2400     {
2401       clients_notify_stream_peer (sub, 1, &ids[i]);
2402     }
2403     to_file (sub->file_name_view_log,
2404              "+%s\t(hist)",
2405              GNUNET_i2s_full (ids));
2406   }
2407   clients_notify_view_update (sub);
2408 }
2409
2410
2411 /**
2412  * Wrapper around #RPS_sampler_resize()
2413  *
2414  * If we do not have enough sampler elements, double current sampler size
2415  * If we have more than enough sampler elements, halv current sampler size
2416  *
2417  * @param sampler The sampler to resize
2418  * @param new_size New size to which to resize
2419  */
2420 static void
2421 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2422 {
2423   unsigned int sampler_size;
2424
2425   // TODO statistics
2426   // TODO respect the min, max
2427   sampler_size = RPS_sampler_get_size (sampler);
2428   if (sampler_size > new_size * 4)
2429   { /* Shrinking */
2430     RPS_sampler_resize (sampler, sampler_size / 2);
2431   }
2432   else if (sampler_size < new_size)
2433   { /* Growing */
2434     RPS_sampler_resize (sampler, sampler_size * 2);
2435   }
2436   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2437 }
2438
2439
2440 /**
2441  * Add all peers in @a peer_array to @a peer_map used as set.
2442  *
2443  * @param peer_array array containing the peers
2444  * @param num_peers number of peers in @peer_array
2445  * @param peer_map the peermap to use as set
2446  */
2447 static void
2448 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2449                        unsigned int num_peers,
2450                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2451 {
2452   unsigned int i;
2453   if (NULL == peer_map)
2454   {
2455     LOG (GNUNET_ERROR_TYPE_WARNING,
2456          "Trying to add peers to non-existing peermap.\n");
2457     return;
2458   }
2459
2460   for (i = 0; i < num_peers; i++)
2461   {
2462     GNUNET_CONTAINER_multipeermap_put (peer_map,
2463                                        &peer_array[i],
2464                                        NULL,
2465                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2466     if (msub->peer_map == peer_map)
2467     {
2468       GNUNET_STATISTICS_set (stats,
2469                             "# known peers",
2470                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2471                             GNUNET_NO);
2472     }
2473   }
2474 }
2475
2476
2477 /**
2478  * Send a PULL REPLY to @a peer_id
2479  *
2480  * @param peer_ctx Context of the peer to send the reply to
2481  * @param peer_ids the peers to send to @a peer_id
2482  * @param num_peer_ids the number of peers to send to @a peer_id
2483  */
2484 static void
2485 send_pull_reply (struct PeerContext *peer_ctx,
2486                  const struct GNUNET_PeerIdentity *peer_ids,
2487                  unsigned int num_peer_ids)
2488 {
2489   uint32_t send_size;
2490   struct GNUNET_MQ_Envelope *ev;
2491   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2492
2493   /* Compute actual size */
2494   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2495               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2496
2497   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2498     /* Compute number of peers to send
2499      * If too long, simply truncate */
2500     // TODO select random ones via permutation
2501     //      or even better: do good protocol design
2502     send_size =
2503       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2504        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2505        sizeof (struct GNUNET_PeerIdentity);
2506   else
2507     send_size = num_peer_ids;
2508
2509   LOG (GNUNET_ERROR_TYPE_DEBUG,
2510       "Going to send PULL REPLY with %u peers to %s\n",
2511       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2512
2513   ev = GNUNET_MQ_msg_extra (out_msg,
2514                             send_size * sizeof (struct GNUNET_PeerIdentity),
2515                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2516   out_msg->num_peers = htonl (send_size);
2517   GNUNET_memcpy (&out_msg[1], peer_ids,
2518          send_size * sizeof (struct GNUNET_PeerIdentity));
2519
2520   send_message (peer_ctx, ev, "PULL REPLY");
2521   if (peer_ctx->sub == msub)
2522   {
2523     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2524   }
2525   // TODO check with send intention: as send_channel is used/opened we indicate
2526   // a sending intention without intending it.
2527   // -> clean peer afterwards?
2528   // -> use recv_channel?
2529 }
2530
2531
2532 /**
2533  * Insert PeerID in #pull_map
2534  *
2535  * Called once we know a peer is online.
2536  *
2537  * @param cls Closure - Sub with the pull map to insert into
2538  * @param peer Peer to insert
2539  */
2540 static void
2541 insert_in_pull_map (void *cls,
2542                     const struct GNUNET_PeerIdentity *peer)
2543 {
2544   struct Sub *sub = cls;
2545
2546   CustomPeerMap_put (sub->pull_map, peer);
2547 }
2548
2549
2550 /**
2551  * Insert PeerID in #view
2552  *
2553  * Called once we know a peer is online.
2554  * Implements #PeerOp
2555  *
2556  * @param cls Closure - Sub with view to insert peer into
2557  * @param peer the peer to insert
2558  */
2559 static void
2560 insert_in_view_op (void *cls,
2561                    const struct GNUNET_PeerIdentity *peer)
2562 {
2563   struct Sub *sub = cls;
2564   int inserted;
2565
2566   inserted = insert_in_view (sub, peer);
2567   if (GNUNET_OK == inserted)
2568   {
2569     clients_notify_stream_peer (sub, 1, peer);
2570   }
2571 }
2572
2573
2574 /**
2575  * Update sampler with given PeerID.
2576  * Implements #PeerOp
2577  *
2578  * @param cls Closure - Sub containing the sampler to insert into
2579  * @param peer Peer to insert
2580  */
2581 static void
2582 insert_in_sampler (void *cls,
2583                    const struct GNUNET_PeerIdentity *peer)
2584 {
2585   struct Sub *sub = cls;
2586
2587   LOG (GNUNET_ERROR_TYPE_DEBUG,
2588        "Updating samplers with peer %s from insert_in_sampler()\n",
2589        GNUNET_i2s (peer));
2590   RPS_sampler_update (sub->sampler, peer);
2591   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2592   {
2593     /* Make sure we 'know' about this peer */
2594     (void) issue_peer_online_check (sub, peer);
2595     /* Establish a channel towards that peer to indicate we are going to send
2596      * messages to it */
2597     //indicate_sending_intention (peer);
2598   }
2599 #ifdef TO_FILE
2600   sub->num_observed_peers++;
2601   GNUNET_CONTAINER_multipeermap_put
2602     (sub->observed_unique_peers,
2603      peer,
2604      NULL,
2605      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2606   uint32_t num_observed_unique_peers =
2607     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2608   to_file (sub->file_name_observed_log,
2609           "%" PRIu32 " %" PRIu32 " %f\n",
2610           sub->num_observed_peers,
2611           num_observed_unique_peers,
2612           1.0*num_observed_unique_peers/sub->num_observed_peers)
2613 #endif /* TO_FILE */
2614 }
2615
2616
2617 /**
2618  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2619  *        If the peer is not known, online check is issued and it is
2620  *        scheduled to be inserted in sampler and view.
2621  *
2622  * "External sources" refer to every source except the gossip.
2623  *
2624  * @param sub Sub for which @a peer was received
2625  * @param peer peer to insert/peer received
2626  */
2627 static void
2628 got_peer (struct Sub *sub,
2629           const struct GNUNET_PeerIdentity *peer)
2630 {
2631   /* If we did not know this peer already, insert it into sampler and view */
2632   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2633   {
2634     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2635                         &insert_in_sampler, sub);
2636     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2637                         &insert_in_view_op, sub);
2638   }
2639   if (sub == msub)
2640   {
2641     GNUNET_STATISTICS_update (stats,
2642                               "# learnd peers",
2643                               1,
2644                               GNUNET_NO);
2645   }
2646 }
2647
2648
2649 /**
2650  * @brief Checks if there is a sending channel and if it is needed
2651  *
2652  * @param peer_ctx Context of the peer to check
2653  * @return GNUNET_YES if sending channel exists and is still needed
2654  *         GNUNET_NO  otherwise
2655  */
2656 static int
2657 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2658 {
2659   /* struct GNUNET_CADET_Channel *channel; */
2660   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2661                                      &peer_ctx->peer_id))
2662   {
2663     return GNUNET_NO;
2664   }
2665   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2666   {
2667     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2668                                     &peer_ctx->peer_id)) ||
2669          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2670                                             &peer_ctx->peer_id)) ||
2671          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2672                                                      &peer_ctx->peer_id)) ||
2673          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2674                                                      &peer_ctx->peer_id)) ||
2675          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2676                                          &peer_ctx->peer_id,
2677                                          Peers_PULL_REPLY_PENDING)))
2678     { /* If we want to keep the connection to peer open */
2679       return GNUNET_YES;
2680     }
2681     return GNUNET_NO;
2682   }
2683   return GNUNET_NO;
2684 }
2685
2686
2687 /**
2688  * @brief remove peer from our knowledge, the view, push and pull maps and
2689  * samplers.
2690  *
2691  * @param sub Sub with the data structures the peer is to be removed from
2692  * @param peer the peer to remove
2693  */
2694 static void
2695 remove_peer (struct Sub *sub,
2696              const struct GNUNET_PeerIdentity *peer)
2697 {
2698   (void) View_remove_peer (sub->view,
2699                            peer);
2700   CustomPeerMap_remove_peer (sub->pull_map,
2701                              peer);
2702   CustomPeerMap_remove_peer (sub->push_map,
2703                              peer);
2704   RPS_sampler_reinitialise_by_value (sub->sampler,
2705                                      peer);
2706   /* We want to destroy the peer now.
2707    * Sometimes, it just seems that it's already been removed from the peer_map,
2708    * so check the peer_map first. */
2709   if (GNUNET_YES == check_peer_known (sub->peer_map,
2710                                       peer))
2711   {
2712     destroy_peer (get_peer_ctx (sub->peer_map,
2713                                 peer));
2714   }
2715 }
2716
2717
2718 /**
2719  * @brief Remove data that is not needed anymore.
2720  *
2721  * If the sending channel is no longer needed it is destroyed.
2722  *
2723  * @param sub Sub in which the current peer is to be cleaned
2724  * @param peer the peer whose data is about to be cleaned
2725  */
2726 static void
2727 clean_peer (struct Sub *sub,
2728             const struct GNUNET_PeerIdentity *peer)
2729 {
2730   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2731                                                                peer)))
2732   {
2733     LOG (GNUNET_ERROR_TYPE_DEBUG,
2734         "Going to remove send channel to peer %s\n",
2735         GNUNET_i2s (peer));
2736     #if ENABLE_MALICIOUS
2737     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer,
2738                                               peer))
2739       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2740                                                     peer));
2741     #else /* ENABLE_MALICIOUS */
2742     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2743                                                   peer));
2744     #endif /* ENABLE_MALICIOUS */
2745   }
2746
2747   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2748                                                            peer))
2749   {
2750     /* Peer was already removed by callback on destroyed channel */
2751     LOG (GNUNET_ERROR_TYPE_WARNING,
2752         "Peer was removed from our knowledge during cleanup\n");
2753     return;
2754   }
2755
2756   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2757                                                               peer))) &&
2758        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2759        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2760        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2761        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2762        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2763   { /* We can safely remove this peer */
2764     LOG (GNUNET_ERROR_TYPE_DEBUG,
2765         "Going to remove peer %s\n",
2766         GNUNET_i2s (peer));
2767     remove_peer (sub, peer);
2768     return;
2769   }
2770 }
2771
2772
2773 /**
2774  * @brief This is called when a channel is destroyed.
2775  *
2776  * Removes peer completely from our knowledge if the send_channel was destroyed
2777  * Otherwise simply delete the recv_channel
2778  * Also check if the knowledge about this peer is still needed.
2779  * If not, remove this peer from our knowledge.
2780  *
2781  * @param cls The closure - Context to the channel
2782  * @param channel The channel being closed
2783  */
2784 static void
2785 cleanup_destroyed_channel (void *cls,
2786                            const struct GNUNET_CADET_Channel *channel)
2787 {
2788   struct ChannelCtx *channel_ctx = cls;
2789   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2790   (void) channel;
2791
2792   channel_ctx->channel = NULL;
2793   remove_channel_ctx (channel_ctx);
2794   if (NULL != peer_ctx &&
2795       peer_ctx->send_channel_ctx == channel_ctx &&
2796       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2797   {
2798     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2799   }
2800 }
2801
2802 /***********************************************************************
2803  * /Util functions
2804 ***********************************************************************/
2805
2806
2807
2808 /***********************************************************************
2809  * Sub
2810 ***********************************************************************/
2811
2812 /**
2813  * @brief Create a new Sub
2814  *
2815  * @param hash Hash of value shared among rps instances on other hosts that
2816  *        defines a subgroup to sample from.
2817  * @param sampler_size Size of the sampler
2818  * @param round_interval Interval (in average) between two rounds
2819  *
2820  * @return Sub
2821  */
2822 struct Sub *
2823 new_sub (const struct GNUNET_HashCode *hash,
2824          uint32_t sampler_size,
2825          struct GNUNET_TIME_Relative round_interval)
2826 {
2827   struct Sub *sub;
2828
2829   sub = GNUNET_new (struct Sub);
2830
2831   /* With the hash generated from the secret value this service only connects
2832    * to rps instances that share the value */
2833   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2834     GNUNET_MQ_hd_fixed_size (peer_check,
2835                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2836                              struct GNUNET_MessageHeader,
2837                              NULL),
2838     GNUNET_MQ_hd_fixed_size (peer_push,
2839                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2840                              struct GNUNET_MessageHeader,
2841                              NULL),
2842     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2843                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2844                              struct GNUNET_MessageHeader,
2845                              NULL),
2846     GNUNET_MQ_hd_var_size (peer_pull_reply,
2847                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2848                            struct GNUNET_RPS_P2P_PullReplyMessage,
2849                            NULL),
2850     GNUNET_MQ_handler_end ()
2851   };
2852   sub->hash = *hash;
2853   sub->cadet_port =
2854     GNUNET_CADET_open_port (cadet_handle,
2855                             &sub->hash,
2856                             &handle_inbound_channel, /* Connect handler */
2857                             sub, /* cls */
2858                             NULL, /* WindowSize handler */
2859                             &cleanup_destroyed_channel, /* Disconnect handler */
2860                             cadet_handlers);
2861   if (NULL == sub->cadet_port)
2862   {
2863     LOG (GNUNET_ERROR_TYPE_ERROR,
2864         "Cadet port `%s' is already in use.\n",
2865         GNUNET_APPLICATION_PORT_RPS);
2866     GNUNET_assert (0);
2867   }
2868
2869   /* Set up general data structure to keep track about peers */
2870   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2871   if (GNUNET_OK !=
2872       GNUNET_CONFIGURATION_get_value_filename (cfg,
2873                                                "rps",
2874                                                "FILENAME_VALID_PEERS",
2875                                                &sub->filename_valid_peers))
2876   {
2877     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2878                                "rps",
2879                                "FILENAME_VALID_PEERS");
2880   }
2881   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2882   {
2883     char *tmp_filename_valid_peers;
2884     char str_hash[105];
2885
2886     GNUNET_snprintf (str_hash,
2887                      sizeof (str_hash),
2888                      GNUNET_h2s_full (hash));
2889     tmp_filename_valid_peers = sub->filename_valid_peers;
2890     GNUNET_asprintf (&sub->filename_valid_peers,
2891                      "%s%s",
2892                      tmp_filename_valid_peers,
2893                      str_hash);
2894     GNUNET_free (tmp_filename_valid_peers);
2895   }
2896   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2897
2898   /* Set up the sampler */
2899   sub->sampler_size_est_min = sampler_size;
2900   sub->sampler_size_est_need = sampler_size;;
2901   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2902   GNUNET_assert (0 != round_interval.rel_value_us);
2903   sub->round_interval = round_interval;
2904   sub->sampler = RPS_sampler_init (sampler_size,
2905                                   round_interval);
2906
2907   /* Logging of internals */
2908   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2909 #ifdef TO_FILE
2910   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2911                                                        "observed");
2912   sub->num_observed_peers = 0;
2913   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2914                                                                     GNUNET_NO);
2915 #endif /* TO_FILE */
2916
2917   /* Set up data structures for gossip */
2918   sub->push_map = CustomPeerMap_create (4);
2919   sub->pull_map = CustomPeerMap_create (4);
2920   sub->view_size_est_min = sampler_size;;
2921   sub->view = View_create (sub->view_size_est_min);
2922   if (sub == msub)
2923   {
2924     GNUNET_STATISTICS_set (stats,
2925                            "view size aim",
2926                            sub->view_size_est_min,
2927                            GNUNET_NO);
2928   }
2929
2930   /* Start executing rounds */
2931   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2932
2933   return sub;
2934 }
2935
2936
2937 #ifdef TO_FILE
2938 /**
2939  * @brief Write all numbers in the given array into the given file
2940  *
2941  * Single numbers devided by a newline
2942  *
2943  * @param hist_array[] the array to dump
2944  * @param file_name file to dump into
2945  */
2946 static void
2947 write_histogram_to_file (const uint32_t hist_array[],
2948                          const char *file_name)
2949 {
2950   char collect_str[SIZE_DUMP_FILE + 1] = "";
2951   char *recv_str_iter;
2952   char *file_name_full;
2953
2954   recv_str_iter = collect_str;
2955   file_name_full = store_prefix_file_name (&own_identity,
2956                                            file_name);
2957   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2958   {
2959     char collect_str_tmp[8];
2960
2961     GNUNET_snprintf (collect_str_tmp,
2962                      sizeof (collect_str_tmp),
2963                      "%" PRIu32 "\n",
2964                      hist_array[i]);
2965     recv_str_iter = stpncpy (recv_str_iter,
2966                              collect_str_tmp,
2967                              6);
2968   }
2969   (void) stpcpy (recv_str_iter,
2970                  "\n");
2971   LOG (GNUNET_ERROR_TYPE_DEBUG,
2972        "Writing push stats to disk\n");
2973   to_file_w_len (file_name_full,
2974                  SIZE_DUMP_FILE,
2975                  collect_str);
2976   GNUNET_free (file_name_full);
2977 }
2978 #endif /* TO_FILE */
2979
2980
2981 /**
2982  * @brief Destroy Sub.
2983  *
2984  * @param sub Sub to destroy
2985  */
2986 static void
2987 destroy_sub (struct Sub *sub)
2988 {
2989   GNUNET_assert (NULL != sub);
2990   GNUNET_assert (NULL != sub->do_round_task);
2991   GNUNET_SCHEDULER_cancel (sub->do_round_task);
2992   sub->do_round_task = NULL;
2993
2994   /* Disconnect from cadet */
2995   GNUNET_CADET_close_port (sub->cadet_port);
2996
2997   /* Clean up data structures for peers */
2998   RPS_sampler_destroy (sub->sampler);
2999   sub->sampler = NULL;
3000   View_destroy (sub->view);
3001   sub->view = NULL;
3002   CustomPeerMap_destroy (sub->push_map);
3003   sub->push_map = NULL;
3004   CustomPeerMap_destroy (sub->pull_map);
3005   sub->pull_map = NULL;
3006   peers_terminate (sub);
3007
3008   /* Free leftover data structures */
3009   GNUNET_free (sub->file_name_view_log);
3010   sub->file_name_view_log = NULL;
3011 #ifdef TO_FILE
3012   GNUNET_free (sub->file_name_observed_log);
3013   sub->file_name_observed_log = NULL;
3014
3015   /* Write push frequencies to disk */
3016   write_histogram_to_file (sub->push_recv,
3017                            "push_recv");
3018
3019   /* Write push deltas to disk */
3020   write_histogram_to_file (sub->push_delta,
3021                            "push_delta");
3022
3023   /* Write pull delays to disk */
3024   write_histogram_to_file (sub->pull_delays,
3025                            "pull_delays");
3026
3027   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3028   sub->observed_unique_peers = NULL;
3029 #endif /* TO_FILE */
3030
3031   GNUNET_free (sub);
3032 }
3033
3034
3035 /***********************************************************************
3036  * /Sub
3037 ***********************************************************************/
3038
3039
3040 /***********************************************************************
3041  * Core handlers
3042 ***********************************************************************/
3043
3044 /**
3045  * @brief Callback on initialisation of Core.
3046  *
3047  * @param cls - unused
3048  * @param my_identity - unused
3049  */
3050 void
3051 core_init (void *cls,
3052            const struct GNUNET_PeerIdentity *my_identity)
3053 {
3054   (void) cls;
3055   (void) my_identity;
3056
3057   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3058 }
3059
3060
3061 /**
3062  * @brief Callback for core.
3063  * Method called whenever a given peer connects.
3064  *
3065  * @param cls closure - unused
3066  * @param peer peer identity this notification is about
3067  * @return closure given to #core_disconnects as peer_cls
3068  */
3069 void *
3070 core_connects (void *cls,
3071                const struct GNUNET_PeerIdentity *peer,
3072                struct GNUNET_MQ_Handle *mq)
3073 {
3074   (void) cls;
3075   (void) mq;
3076
3077   GNUNET_assert (GNUNET_YES ==
3078                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3079                                                     peer,
3080                                                     NULL,
3081                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3082   return NULL;
3083 }
3084
3085
3086 /**
3087  * @brief Callback for core.
3088  * Method called whenever a peer disconnects.
3089  *
3090  * @param cls closure - unused
3091  * @param peer peer identity this notification is about
3092  * @param peer_cls closure given in #core_connects - unused
3093  */
3094 void
3095 core_disconnects (void *cls,
3096                   const struct GNUNET_PeerIdentity *peer,
3097                   void *peer_cls)
3098 {
3099   (void) cls;
3100   (void) peer_cls;
3101
3102   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3103 }
3104
3105 /***********************************************************************
3106  * /Core handlers
3107 ***********************************************************************/
3108
3109
3110 /**
3111  * @brief Destroy the context for a (connected) client
3112  *
3113  * @param cli_ctx Context to destroy
3114  */
3115 static void
3116 destroy_cli_ctx (struct ClientContext *cli_ctx)
3117 {
3118   GNUNET_assert (NULL != cli_ctx);
3119   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3120                                cli_ctx_tail,
3121                                cli_ctx);
3122   if (NULL != cli_ctx->sub)
3123   {
3124     destroy_sub (cli_ctx->sub);
3125     cli_ctx->sub = NULL;
3126   }
3127   GNUNET_free (cli_ctx);
3128 }
3129
3130
3131 /**
3132  * @brief Update sizes in sampler and view on estimate update from nse service
3133  *
3134  * @param sub Sub
3135  * @param logestimate the log(Base 2) value of the current network size estimate
3136  * @param std_dev standard deviation for the estimate
3137  */
3138 static void
3139 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3140 {
3141   double estimate;
3142   //double scale; // TODO this might go gloabal/config
3143
3144   LOG (GNUNET_ERROR_TYPE_DEBUG,
3145        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3146        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3147   //scale = .01;
3148   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3149   // GNUNET_NSE_log_estimate_to_n (logestimate);
3150   estimate = pow (estimate, 1.0 / 3);
3151   // TODO add if std_dev is a number
3152   // estimate += (std_dev * scale);
3153   if (sub->view_size_est_min < ceil (estimate))
3154   {
3155     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3156     sub->sampler_size_est_need = estimate;
3157     sub->view_size_est_need = estimate;
3158   } else
3159   {
3160     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3161     //sub->sampler_size_est_need = sub->view_size_est_min;
3162     sub->view_size_est_need = sub->view_size_est_min;
3163   }
3164   if (sub == msub)
3165   {
3166     GNUNET_STATISTICS_set (stats,
3167                            "view size aim",
3168                            sub->view_size_est_need,
3169                            GNUNET_NO);
3170   }
3171
3172   /* If the NSE has changed adapt the lists accordingly */
3173   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3174   View_change_len (sub->view, sub->view_size_est_need);
3175 }
3176
3177
3178 /**
3179  * Function called by NSE.
3180  *
3181  * Updates sizes of sampler list and view and adapt those lists
3182  * accordingly.
3183  *
3184  * implements #GNUNET_NSE_Callback
3185  *
3186  * @param cls Closure - unused
3187  * @param timestamp time when the estimate was received from the server (or created by the server)
3188  * @param logestimate the log(Base 2) value of the current network size estimate
3189  * @param std_dev standard deviation for the estimate
3190  */
3191 static void
3192 nse_callback (void *cls,
3193               struct GNUNET_TIME_Absolute timestamp,
3194               double logestimate, double std_dev)
3195 {
3196   (void) cls;
3197   (void) timestamp;
3198   struct ClientContext *cli_ctx_iter;
3199
3200   adapt_sizes (msub, logestimate, std_dev);
3201   for (cli_ctx_iter = cli_ctx_head;
3202       NULL != cli_ctx_iter;
3203       cli_ctx_iter = cli_ctx_iter->next)
3204   {
3205     if (NULL != cli_ctx_iter->sub)
3206     {
3207       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3208     }
3209   }
3210 }
3211
3212
3213 /**
3214  * @brief This function is called, when the client seeds peers.
3215  * It verifies that @a msg is well-formed.
3216  *
3217  * @param cls the closure (#ClientContext)
3218  * @param msg the message
3219  * @return #GNUNET_OK if @a msg is well-formed
3220  *         #GNUNET_SYSERR otherwise
3221  */
3222 static int
3223 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3224 {
3225   struct ClientContext *cli_ctx = cls;
3226   uint16_t msize = ntohs (msg->header.size);
3227   uint32_t num_peers = ntohl (msg->num_peers);
3228
3229   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3230   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3231        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3232   {
3233     LOG (GNUNET_ERROR_TYPE_ERROR,
3234         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3235         ntohl (msg->num_peers),
3236         (msize / sizeof (struct GNUNET_PeerIdentity)));
3237     GNUNET_break (0);
3238     GNUNET_SERVICE_client_drop (cli_ctx->client);
3239     return GNUNET_SYSERR;
3240   }
3241   return GNUNET_OK;
3242 }
3243
3244
3245 /**
3246  * Handle seed from the client.
3247  *
3248  * @param cls closure
3249  * @param message the actual message
3250  */
3251 static void
3252 handle_client_seed (void *cls,
3253                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3254 {
3255   struct ClientContext *cli_ctx = cls;
3256   struct GNUNET_PeerIdentity *peers;
3257   uint32_t num_peers;
3258   uint32_t i;
3259
3260   num_peers = ntohl (msg->num_peers);
3261   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3262
3263   LOG (GNUNET_ERROR_TYPE_DEBUG,
3264        "Client seeded peers:\n");
3265   print_peer_list (peers, num_peers);
3266
3267   for (i = 0; i < num_peers; i++)
3268   {
3269     LOG (GNUNET_ERROR_TYPE_DEBUG,
3270          "Updating samplers with seed %" PRIu32 ": %s\n",
3271          i,
3272          GNUNET_i2s (&peers[i]));
3273
3274     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3275     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3276   }
3277   GNUNET_SERVICE_client_continue (cli_ctx->client);
3278 }
3279
3280
3281 /**
3282  * Handle RPS request from the client.
3283  *
3284  * @param cls Client context
3285  * @param message Message containing the numer of updates the client wants to
3286  * receive
3287  */
3288 static void
3289 handle_client_view_request (void *cls,
3290                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3291 {
3292   struct ClientContext *cli_ctx = cls;
3293   uint64_t num_updates;
3294
3295   num_updates = ntohl (msg->num_updates);
3296
3297   LOG (GNUNET_ERROR_TYPE_DEBUG,
3298        "Client requested %" PRIu64 " updates of view.\n",
3299        num_updates);
3300
3301   GNUNET_assert (NULL != cli_ctx);
3302   cli_ctx->view_updates_left = num_updates;
3303   send_view (cli_ctx, NULL, 0);
3304   GNUNET_SERVICE_client_continue (cli_ctx->client);
3305 }
3306
3307
3308 /**
3309  * @brief Handle the cancellation of the view updates.
3310  *
3311  * @param cls The client context
3312  * @param msg Unused
3313  */
3314 static void
3315 handle_client_view_cancel (void *cls,
3316                            const struct GNUNET_MessageHeader *msg)
3317 {
3318   struct ClientContext *cli_ctx = cls;
3319   (void) msg;
3320
3321   LOG (GNUNET_ERROR_TYPE_DEBUG,
3322        "Client does not want to receive updates of view any more.\n");
3323
3324   GNUNET_assert (NULL != cli_ctx);
3325   cli_ctx->view_updates_left = 0;
3326   GNUNET_SERVICE_client_continue (cli_ctx->client);
3327   if (GNUNET_YES == cli_ctx->stream_update)
3328   {
3329     destroy_cli_ctx (cli_ctx);
3330   }
3331 }
3332
3333
3334 /**
3335  * Handle RPS request for biased stream from the client.
3336  *
3337  * @param cls Client context
3338  * @param message unused
3339  */
3340 static void
3341 handle_client_stream_request (void *cls,
3342                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3343 {
3344   struct ClientContext *cli_ctx = cls;
3345   (void) msg;
3346
3347   LOG (GNUNET_ERROR_TYPE_DEBUG,
3348        "Client requested peers from biased stream.\n");
3349   cli_ctx->stream_update = GNUNET_YES;
3350
3351   GNUNET_assert (NULL != cli_ctx);
3352   GNUNET_SERVICE_client_continue (cli_ctx->client);
3353 }
3354
3355
3356 /**
3357  * @brief Handles the cancellation of the stream of biased peer ids
3358  *
3359  * @param cls The client context
3360  * @param msg unused
3361  */
3362 static void
3363 handle_client_stream_cancel (void *cls,
3364                              const struct GNUNET_MessageHeader *msg)
3365 {
3366   struct ClientContext *cli_ctx = cls;
3367   (void) msg;
3368
3369   LOG (GNUNET_ERROR_TYPE_DEBUG,
3370        "Client canceled receiving peers from biased stream.\n");
3371   cli_ctx->stream_update = GNUNET_NO;
3372
3373   GNUNET_assert (NULL != cli_ctx);
3374   GNUNET_SERVICE_client_continue (cli_ctx->client);
3375 }
3376
3377
3378 /**
3379  * @brief Create and start a Sub.
3380  *
3381  * @param cls Closure - unused
3382  * @param msg Message containing the necessary information
3383  */
3384 static void
3385 handle_client_start_sub (void *cls,
3386                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3387 {
3388   struct ClientContext *cli_ctx = cls;
3389
3390   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3391   if (NULL != cli_ctx->sub &&
3392       0 != memcmp (&cli_ctx->sub->hash,
3393                    &msg->hash,
3394                    sizeof (struct GNUNET_HashCode)))
3395   {
3396     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3397     destroy_sub (cli_ctx->sub);
3398     cli_ctx->sub = NULL;
3399   }
3400   cli_ctx->sub = new_sub (&msg->hash,
3401                          msub->sampler_size_est_min, // TODO make api input?
3402                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3403   GNUNET_SERVICE_client_continue (cli_ctx->client);
3404 }
3405
3406
3407 /**
3408  * @brief Destroy the Sub
3409  *
3410  * @param cls Closure - unused
3411  * @param msg Message containing the hash that identifies the Sub
3412  */
3413 static void
3414 handle_client_stop_sub (void *cls,
3415                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3416 {
3417   struct ClientContext *cli_ctx = cls;
3418
3419   GNUNET_assert (NULL != cli_ctx->sub);
3420   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3421   {
3422     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3423   }
3424   destroy_sub (cli_ctx->sub);
3425   cli_ctx->sub = NULL;
3426   GNUNET_SERVICE_client_continue (cli_ctx->client);
3427 }
3428
3429
3430 /**
3431  * Handle a CHECK_LIVE message from another peer.
3432  *
3433  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3434  * the channel is blocked for all other communication.
3435  *
3436  * @param cls Closure - Context of channel
3437  * @param msg Message - unused
3438  */
3439 static void
3440 handle_peer_check (void *cls,
3441                    const struct GNUNET_MessageHeader *msg)
3442 {
3443   const struct ChannelCtx *channel_ctx = cls;
3444   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3445   (void) msg;
3446
3447   LOG (GNUNET_ERROR_TYPE_DEBUG,
3448       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3449   if (channel_ctx->peer_ctx->sub == msub)
3450   {
3451     GNUNET_STATISTICS_update (stats,
3452                               "# pending online checks",
3453                               -1,
3454                               GNUNET_NO);
3455   }
3456
3457   GNUNET_CADET_receive_done (channel_ctx->channel);
3458 }
3459
3460
3461 /**
3462  * Handle a PUSH message from another peer.
3463  *
3464  * Check the proof of work and store the PeerID
3465  * in the temporary list for pushed PeerIDs.
3466  *
3467  * @param cls Closure - Context of channel
3468  * @param msg Message - unused
3469  */
3470 static void
3471 handle_peer_push (void *cls,
3472                   const struct GNUNET_MessageHeader *msg)
3473 {
3474   const struct ChannelCtx *channel_ctx = cls;
3475   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3476   (void) msg;
3477
3478   // (check the proof of work (?))
3479
3480   LOG (GNUNET_ERROR_TYPE_DEBUG,
3481        "Received PUSH (%s)\n",
3482        GNUNET_i2s (peer));
3483   if (channel_ctx->peer_ctx->sub == msub)
3484   {
3485     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3486   }
3487
3488   #if ENABLE_MALICIOUS
3489   struct AttackedPeer *tmp_att_peer;
3490
3491   if ( (1 == mal_type) ||
3492        (3 == mal_type) )
3493   { /* Try to maximise representation */
3494     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3495     tmp_att_peer->peer_id = *peer;
3496     if (NULL == att_peer_set)
3497       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3498     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3499                                                              peer))
3500     {
3501       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3502                                    att_peers_tail,
3503                                    tmp_att_peer);
3504       add_peer_array_to_set (peer, 1, att_peer_set);
3505     }
3506     else
3507     {
3508       GNUNET_free (tmp_att_peer);
3509     }
3510   }
3511
3512
3513   else if (2 == mal_type)
3514   {
3515     /* We attack one single well-known peer - simply ignore */
3516   }
3517   #endif /* ENABLE_MALICIOUS */
3518
3519   /* Add the sending peer to the push_map */
3520   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3521
3522   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3523                                      &channel_ctx->peer_ctx->peer_id));
3524   GNUNET_CADET_receive_done (channel_ctx->channel);
3525 }
3526
3527
3528 /**
3529  * Handle PULL REQUEST request message from another peer.
3530  *
3531  * Reply with the view of PeerIDs.
3532  *
3533  * @param cls Closure - Context of channel
3534  * @param msg Message - unused
3535  */
3536 static void
3537 handle_peer_pull_request (void *cls,
3538                           const struct GNUNET_MessageHeader *msg)
3539 {
3540   const struct ChannelCtx *channel_ctx = cls;
3541   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3542   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3543   const struct GNUNET_PeerIdentity *view_array;
3544   (void) msg;
3545
3546   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3547   if (peer_ctx->sub == msub)
3548   {
3549     GNUNET_STATISTICS_update(stats,
3550                              "# pull request message received",
3551                              1,
3552                              GNUNET_NO);
3553     if (NULL != map_single_hop &&
3554         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3555                                                              &peer_ctx->peer_id))
3556     {
3557       GNUNET_STATISTICS_update (stats,
3558                                 "# pull request message received (multi-hop peer)",
3559                                 1,
3560                                 GNUNET_NO);
3561     }
3562   }
3563
3564   #if ENABLE_MALICIOUS
3565   if (1 == mal_type
3566       || 3 == mal_type)
3567   { /* Try to maximise representation */
3568     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3569   }
3570
3571   else if (2 == mal_type)
3572   { /* Try to partition network */
3573     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3574     {
3575       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3576     }
3577   }
3578   #endif /* ENABLE_MALICIOUS */
3579
3580   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3581                                      &channel_ctx->peer_ctx->peer_id));
3582   GNUNET_CADET_receive_done (channel_ctx->channel);
3583   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3584   send_pull_reply (peer_ctx,
3585                    view_array,
3586                    View_size (channel_ctx->peer_ctx->sub->view));
3587 }
3588
3589
3590 /**
3591  * Check whether we sent a corresponding request and
3592  * whether this reply is the first one.
3593  *
3594  * @param cls Closure - Context of channel
3595  * @param msg Message containing the replied peers
3596  */
3597 static int
3598 check_peer_pull_reply (void *cls,
3599                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3600 {
3601   struct ChannelCtx *channel_ctx = cls;
3602   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3603
3604   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3605   {
3606     GNUNET_break_op (0);
3607     return GNUNET_SYSERR;
3608   }
3609
3610   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3611       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3612   {
3613     LOG (GNUNET_ERROR_TYPE_ERROR,
3614         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3615         ntohl (msg->num_peers),
3616         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3617             sizeof (struct GNUNET_PeerIdentity));
3618     GNUNET_break_op (0);
3619     return GNUNET_SYSERR;
3620   }
3621
3622   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3623                                      &sender_ctx->peer_id,
3624                                      Peers_PULL_REPLY_PENDING))
3625   {
3626     LOG (GNUNET_ERROR_TYPE_WARNING,
3627         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3628         GNUNET_i2s (&sender_ctx->peer_id));
3629     if (sender_ctx->sub == msub)
3630     {
3631       GNUNET_STATISTICS_update (stats,
3632                                 "# unrequested pull replies",
3633                                 1,
3634                                 GNUNET_NO);
3635     }
3636   }
3637   return GNUNET_OK;
3638 }
3639
3640
3641 /**
3642  * Handle PULL REPLY message from another peer.
3643  *
3644  * @param cls Closure
3645  * @param msg The message header
3646  */
3647 static void
3648 handle_peer_pull_reply (void *cls,
3649                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3650 {
3651   const struct ChannelCtx *channel_ctx = cls;
3652   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3653   const struct GNUNET_PeerIdentity *peers;
3654   struct Sub *sub = channel_ctx->peer_ctx->sub;
3655   uint32_t i;
3656 #if ENABLE_MALICIOUS
3657   struct AttackedPeer *tmp_att_peer;
3658 #endif /* ENABLE_MALICIOUS */
3659
3660   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3661   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3662   if (channel_ctx->peer_ctx->sub == msub)
3663   {
3664     GNUNET_STATISTICS_update (stats,
3665                               "# pull reply messages received",
3666                               1,
3667                               GNUNET_NO);
3668     if (NULL != map_single_hop &&
3669         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3670           &channel_ctx->peer_ctx->peer_id))
3671     {
3672       GNUNET_STATISTICS_update (stats,
3673                                 "# pull reply messages received (multi-hop peer)",
3674                                 1,
3675                                 GNUNET_NO);
3676     }
3677   }
3678
3679   #if ENABLE_MALICIOUS
3680   // We shouldn't even receive pull replies as we're not sending
3681   if (2 == mal_type)
3682   {
3683   }
3684   #endif /* ENABLE_MALICIOUS */
3685
3686   /* Do actual logic */
3687   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3688
3689   LOG (GNUNET_ERROR_TYPE_DEBUG,
3690        "PULL REPLY received, got following %u peers:\n",
3691        ntohl (msg->num_peers));
3692
3693   for (i = 0; i < ntohl (msg->num_peers); i++)
3694   {
3695     LOG (GNUNET_ERROR_TYPE_DEBUG,
3696          "%u. %s\n",
3697          i,
3698          GNUNET_i2s (&peers[i]));
3699
3700     #if ENABLE_MALICIOUS
3701     if ((NULL != att_peer_set) &&
3702         (1 == mal_type || 3 == mal_type))
3703     { /* Add attacked peer to local list */
3704       // TODO check if we sent a request and this was the first reply
3705       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3706                                                                &peers[i])
3707           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3708                                                                   &peers[i]))
3709       {
3710         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3711         tmp_att_peer->peer_id = peers[i];
3712         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3713                                      att_peers_tail,
3714                                      tmp_att_peer);
3715         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3716       }
3717       continue;
3718     }
3719     #endif /* ENABLE_MALICIOUS */
3720     /* Make sure we 'know' about this peer */
3721     (void) insert_peer (channel_ctx->peer_ctx->sub,
3722                         &peers[i]);
3723
3724     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3725                                         &peers[i]))
3726     {
3727       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3728                          &peers[i]);
3729     }
3730     else
3731     {
3732       schedule_operation (channel_ctx->peer_ctx,
3733                           insert_in_pull_map,
3734                           channel_ctx->peer_ctx->sub); /* cls */
3735       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3736                                       &peers[i]);
3737     }
3738   }
3739
3740   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3741                                  sender),
3742                    Peers_PULL_REPLY_PENDING);
3743   clean_peer (channel_ctx->peer_ctx->sub,
3744               sender);
3745
3746   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3747                                      sender));
3748   GNUNET_CADET_receive_done (channel_ctx->channel);
3749 }
3750
3751
3752 /**
3753  * Compute a random delay.
3754  * A uniformly distributed value between mean + spread and mean - spread.
3755  *
3756  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3757  * It would return a random value between 2 and 6 min.
3758  *
3759  * @param mean the mean time until the next round
3760  * @param spread the inverse amount of deviation from the mean
3761  */
3762 static struct GNUNET_TIME_Relative
3763 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3764                     unsigned int spread)
3765 {
3766   struct GNUNET_TIME_Relative half_interval;
3767   struct GNUNET_TIME_Relative ret;
3768   unsigned int rand_delay;
3769   unsigned int max_rand_delay;
3770
3771   if (0 == spread)
3772   {
3773     LOG (GNUNET_ERROR_TYPE_WARNING,
3774          "Not accepting spread of 0\n");
3775     GNUNET_break (0);
3776     GNUNET_assert (0);
3777   }
3778   GNUNET_assert (0 != mean.rel_value_us);
3779
3780   /* Compute random time value between spread * mean and spread * mean */
3781   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3782
3783   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3784   /**
3785    * Compute random value between (0 and 1) * round_interval
3786    * via multiplying round_interval with a 'fraction' (0 to value)/value
3787    */
3788   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3789   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3790   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3791   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3792
3793   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3794     LOG (GNUNET_ERROR_TYPE_WARNING,
3795          "Returning FOREVER_REL\n");
3796
3797   return ret;
3798 }
3799
3800
3801 /**
3802  * Send single pull request
3803  *
3804  * @param peer_ctx Context to the peer to send request to
3805  */
3806 static void
3807 send_pull_request (struct PeerContext *peer_ctx)
3808 {
3809   struct GNUNET_MQ_Envelope *ev;
3810
3811   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3812                                                &peer_ctx->peer_id,
3813                                                Peers_PULL_REPLY_PENDING));
3814   SET_PEER_FLAG (peer_ctx,
3815                  Peers_PULL_REPLY_PENDING);
3816   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3817
3818   LOG (GNUNET_ERROR_TYPE_DEBUG,
3819        "Going to send PULL REQUEST to peer %s.\n",
3820        GNUNET_i2s (&peer_ctx->peer_id));
3821
3822   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3823   send_message (peer_ctx,
3824                 ev,
3825                 "PULL REQUEST");
3826   if (peer_ctx->sub)
3827   {
3828     GNUNET_STATISTICS_update (stats,
3829                               "# pull request send issued",
3830                               1,
3831                               GNUNET_NO);
3832     if (NULL != map_single_hop &&
3833         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3834                                                              &peer_ctx->peer_id))
3835     {
3836       GNUNET_STATISTICS_update (stats,
3837                                 "# pull request send issued (multi-hop peer)",
3838                                 1,
3839                                 GNUNET_NO);
3840     }
3841   }
3842 }
3843
3844
3845 /**
3846  * Send single push
3847  *
3848  * @param peer_ctx Context of peer to send push to
3849  */
3850 static void
3851 send_push (struct PeerContext *peer_ctx)
3852 {
3853   struct GNUNET_MQ_Envelope *ev;
3854
3855   LOG (GNUNET_ERROR_TYPE_DEBUG,
3856        "Going to send PUSH to peer %s.\n",
3857        GNUNET_i2s (&peer_ctx->peer_id));
3858
3859   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3860   send_message (peer_ctx, ev, "PUSH");
3861   if (peer_ctx->sub)
3862   {
3863     GNUNET_STATISTICS_update (stats,
3864                               "# push send issued",
3865                               1,
3866                               GNUNET_NO);
3867   }
3868 }
3869
3870
3871 #if ENABLE_MALICIOUS
3872
3873
3874 /**
3875  * @brief This function is called, when the client tells us to act malicious.
3876  * It verifies that @a msg is well-formed.
3877  *
3878  * @param cls the closure (#ClientContext)
3879  * @param msg the message
3880  * @return #GNUNET_OK if @a msg is well-formed
3881  */
3882 static int
3883 check_client_act_malicious (void *cls,
3884                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3885 {
3886   struct ClientContext *cli_ctx = cls;
3887   uint16_t msize = ntohs (msg->header.size);
3888   uint32_t num_peers = ntohl (msg->num_peers);
3889
3890   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3891   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3892        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3893   {
3894     LOG (GNUNET_ERROR_TYPE_ERROR,
3895         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3896         ntohl (msg->num_peers),
3897         (msize / sizeof (struct GNUNET_PeerIdentity)));
3898     GNUNET_break (0);
3899     GNUNET_SERVICE_client_drop (cli_ctx->client);
3900     return GNUNET_SYSERR;
3901   }
3902   return GNUNET_OK;
3903 }
3904
3905 /**
3906  * Turn RPS service to act malicious.
3907  *
3908  * @param cls Closure
3909  * @param client The client that sent the message
3910  * @param msg The message header
3911  */
3912 static void
3913 handle_client_act_malicious (void *cls,
3914                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3915 {
3916   struct ClientContext *cli_ctx = cls;
3917   struct GNUNET_PeerIdentity *peers;
3918   uint32_t num_mal_peers_sent;
3919   uint32_t num_mal_peers_old;
3920   struct Sub *sub = cli_ctx->sub;
3921
3922   if (NULL == sub) sub = msub;
3923   /* Do actual logic */
3924   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3925   mal_type = ntohl (msg->type);
3926   if (NULL == mal_peer_set)
3927     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3928
3929   LOG (GNUNET_ERROR_TYPE_DEBUG,
3930        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3931        mal_type,
3932        ntohl (msg->num_peers));
3933
3934   if (1 == mal_type)
3935   { /* Try to maximise representation */
3936     /* Add other malicious peers to those we already know */
3937
3938     num_mal_peers_sent = ntohl (msg->num_peers);
3939     num_mal_peers_old = num_mal_peers;
3940     GNUNET_array_grow (mal_peers,
3941                        num_mal_peers,
3942                        num_mal_peers + num_mal_peers_sent);
3943     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3944             peers,
3945             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3946
3947     /* Add all mal peers to mal_peer_set */
3948     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3949                            num_mal_peers_sent,
3950                            mal_peer_set);
3951
3952     /* Substitute do_round () with do_mal_round () */
3953     GNUNET_assert (NULL != sub->do_round_task);
3954     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3955     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3956   }
3957
3958   else if ( (2 == mal_type) ||
3959             (3 == mal_type) )
3960   { /* Try to partition the network */
3961     /* Add other malicious peers to those we already know */
3962
3963     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3964     num_mal_peers_old = num_mal_peers;
3965     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3966     GNUNET_array_grow (mal_peers,
3967                        num_mal_peers,
3968                        num_mal_peers + num_mal_peers_sent);
3969     if (NULL != mal_peers &&
3970         0 != num_mal_peers)
3971     {
3972       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3973               peers,
3974               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3975
3976       /* Add all mal peers to mal_peer_set */
3977       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3978                              num_mal_peers_sent,
3979                              mal_peer_set);
3980     }
3981
3982     /* Store the one attacked peer */
3983     GNUNET_memcpy (&attacked_peer,
3984             &msg->attacked_peer,
3985             sizeof (struct GNUNET_PeerIdentity));
3986     /* Set the flag of the attacked peer to valid to avoid problems */
3987     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3988     {
3989       (void) issue_peer_online_check (sub, &attacked_peer);
3990     }
3991
3992     LOG (GNUNET_ERROR_TYPE_DEBUG,
3993          "Attacked peer is %s\n",
3994          GNUNET_i2s (&attacked_peer));
3995
3996     /* Substitute do_round () with do_mal_round () */
3997     if (NULL != sub->do_round_task)
3998     {
3999       /* Probably in shutdown */
4000       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4001       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4002     }
4003   }
4004   else if (0 == mal_type)
4005   { /* Stop acting malicious */
4006     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4007
4008     /* Substitute do_mal_round () with do_round () */
4009     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4010     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4011   }
4012   else
4013   {
4014     GNUNET_break (0);
4015     GNUNET_SERVICE_client_continue (cli_ctx->client);
4016   }
4017   GNUNET_SERVICE_client_continue (cli_ctx->client);
4018 }
4019
4020
4021 /**
4022  * Send out PUSHes and PULLs maliciously.
4023  *
4024  * This is executed regylary.
4025  *
4026  * @param cls Closure - Sub
4027  */
4028 static void
4029 do_mal_round (void *cls)
4030 {
4031   uint32_t num_pushes;
4032   uint32_t i;
4033   struct GNUNET_TIME_Relative time_next_round;
4034   struct AttackedPeer *tmp_att_peer;
4035   struct Sub *sub = cls;
4036
4037   LOG (GNUNET_ERROR_TYPE_DEBUG,
4038        "Going to execute next round maliciously type %" PRIu32 ".\n",
4039       mal_type);
4040   sub->do_round_task = NULL;
4041   GNUNET_assert (mal_type <= 3);
4042   /* Do malicious actions */
4043   if (1 == mal_type)
4044   { /* Try to maximise representation */
4045
4046     /* The maximum of pushes we're going to send this round */
4047     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4048                                          num_attacked_peers),
4049                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4050
4051     LOG (GNUNET_ERROR_TYPE_DEBUG,
4052          "Going to send %" PRIu32 " pushes\n",
4053          num_pushes);
4054
4055     /* Send PUSHes to attacked peers */
4056     for (i = 0 ; i < num_pushes ; i++)
4057     {
4058       if (att_peers_tail == att_peer_index)
4059         att_peer_index = att_peers_head;
4060       else
4061         att_peer_index = att_peer_index->next;
4062
4063       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4064     }
4065
4066     /* Send PULLs to some peers to learn about additional peers to attack */
4067     tmp_att_peer = att_peer_index;
4068     for (i = 0 ; i < num_pushes * alpha ; i++)
4069     {
4070       if (att_peers_tail == tmp_att_peer)
4071         tmp_att_peer = att_peers_head;
4072       else
4073         att_peer_index = tmp_att_peer->next;
4074
4075       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4076     }
4077   }
4078
4079
4080   else if (2 == mal_type)
4081   { /**
4082      * Try to partition the network
4083      * Send as many pushes to the attacked peer as possible
4084      * That is one push per round as it will ignore more.
4085      */
4086     (void) issue_peer_online_check (sub, &attacked_peer);
4087     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4088                                        &attacked_peer,
4089                                        Peers_ONLINE))
4090       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4091   }
4092
4093
4094   if (3 == mal_type)
4095   { /* Combined attack */
4096
4097     /* Send PUSH to attacked peers */
4098     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4099     {
4100       (void) issue_peer_online_check (sub, &attacked_peer);
4101       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4102                                          &attacked_peer,
4103                                          Peers_ONLINE))
4104       {
4105         LOG (GNUNET_ERROR_TYPE_DEBUG,
4106             "Goding to send push to attacked peer (%s)\n",
4107             GNUNET_i2s (&attacked_peer));
4108         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4109       }
4110     }
4111     (void) issue_peer_online_check (sub, &attacked_peer);
4112
4113     /* The maximum of pushes we're going to send this round */
4114     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4115                                          num_attacked_peers),
4116                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4117
4118     LOG (GNUNET_ERROR_TYPE_DEBUG,
4119          "Going to send %" PRIu32 " pushes\n",
4120          num_pushes);
4121
4122     for (i = 0; i < num_pushes; i++)
4123     {
4124       if (att_peers_tail == att_peer_index)
4125         att_peer_index = att_peers_head;
4126       else
4127         att_peer_index = att_peer_index->next;
4128
4129       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4130     }
4131
4132     /* Send PULLs to some peers to learn about additional peers to attack */
4133     tmp_att_peer = att_peer_index;
4134     for (i = 0; i < num_pushes * alpha; i++)
4135     {
4136       if (att_peers_tail == tmp_att_peer)
4137         tmp_att_peer = att_peers_head;
4138       else
4139         att_peer_index = tmp_att_peer->next;
4140
4141       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4142     }
4143   }
4144
4145   /* Schedule next round */
4146   time_next_round = compute_rand_delay (sub->round_interval, 2);
4147
4148   GNUNET_assert (NULL == sub->do_round_task);
4149   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4150                                                     &do_mal_round, sub);
4151   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4152 }
4153 #endif /* ENABLE_MALICIOUS */
4154
4155
4156 /**
4157  * Send out PUSHes and PULLs, possibly update #view, samplers.
4158  *
4159  * This is executed regylary.
4160  *
4161  * @param cls Closure - Sub
4162  */
4163 static void
4164 do_round (void *cls)
4165 {
4166   unsigned int i;
4167   const struct GNUNET_PeerIdentity *view_array;
4168   unsigned int *permut;
4169   unsigned int a_peers; /* Number of peers we send pushes to */
4170   unsigned int b_peers; /* Number of peers we send pull requests to */
4171   uint32_t first_border;
4172   uint32_t second_border;
4173   struct GNUNET_PeerIdentity peer;
4174   struct GNUNET_PeerIdentity *update_peer;
4175   struct Sub *sub = cls;
4176
4177   sub->num_rounds++;
4178   LOG (GNUNET_ERROR_TYPE_DEBUG,
4179        "Going to execute next round.\n");
4180   if (sub == msub)
4181   {
4182     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4183   }
4184   sub->do_round_task = NULL;
4185   LOG (GNUNET_ERROR_TYPE_DEBUG,
4186        "Printing view:\n");
4187   to_file (sub->file_name_view_log,
4188            "___ new round ___");
4189   view_array = View_get_as_array (sub->view);
4190   for (i = 0; i < View_size (sub->view); i++)
4191   {
4192     LOG (GNUNET_ERROR_TYPE_DEBUG,
4193          "\t%s\n", GNUNET_i2s (&view_array[i]));
4194     to_file (sub->file_name_view_log,
4195              "=%s\t(do round)",
4196              GNUNET_i2s_full (&view_array[i]));
4197   }
4198
4199
4200   /* Send pushes and pull requests */
4201   if (0 < View_size (sub->view))
4202   {
4203     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4204                                            View_size (sub->view));
4205
4206     /* Send PUSHes */
4207     a_peers = ceil (alpha * View_size (sub->view));
4208
4209     LOG (GNUNET_ERROR_TYPE_DEBUG,
4210          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4211          a_peers, alpha, View_size (sub->view));
4212     for (i = 0; i < a_peers; i++)
4213     {
4214       peer = view_array[permut[i]];
4215       // FIXME if this fails schedule/loop this for later
4216       send_push (get_peer_ctx (sub->peer_map, &peer));
4217     }
4218
4219     /* Send PULL requests */
4220     b_peers = ceil (beta * View_size (sub->view));
4221     first_border = a_peers;
4222     second_border = a_peers + b_peers;
4223     if (second_border > View_size (sub->view))
4224     {
4225       first_border = View_size (sub->view) - b_peers;
4226       second_border = View_size (sub->view);
4227     }
4228     LOG (GNUNET_ERROR_TYPE_DEBUG,
4229         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4230         b_peers, beta, View_size (sub->view));
4231     for (i = first_border; i < second_border; i++)
4232     {
4233       peer = view_array[permut[i]];
4234       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4235                                          &peer,
4236                                          Peers_PULL_REPLY_PENDING))
4237       { // FIXME if this fails schedule/loop this for later
4238         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4239       }
4240     }
4241
4242     GNUNET_free (permut);
4243     permut = NULL;
4244   }
4245
4246
4247   /* Update view */
4248   /* TODO see how many peers are in push-/pull- list! */
4249
4250   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4251       (0 < CustomPeerMap_size (sub->push_map)) &&
4252       (0 < CustomPeerMap_size (sub->pull_map)))
4253   { /* If conditions for update are fulfilled, update */
4254     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4255
4256     uint32_t final_size;
4257     uint32_t peers_to_clean_size;
4258     struct GNUNET_PeerIdentity *peers_to_clean;
4259
4260     peers_to_clean = NULL;
4261     peers_to_clean_size = 0;
4262     GNUNET_array_grow (peers_to_clean,
4263                        peers_to_clean_size,
4264                        View_size (sub->view));
4265     GNUNET_memcpy (peers_to_clean,
4266             view_array,
4267             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4268
4269     /* Seems like recreating is the easiest way of emptying the peermap */
4270     View_clear (sub->view);
4271     to_file (sub->file_name_view_log,
4272              "--- emptied ---");
4273
4274     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4275                                 CustomPeerMap_size (sub->push_map));
4276     second_border = first_border +
4277                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4278                                 CustomPeerMap_size (sub->pull_map));
4279     final_size    = second_border +
4280       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4281     LOG (GNUNET_ERROR_TYPE_DEBUG,
4282         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4283         first_border,
4284         second_border,
4285         final_size);
4286
4287     /* Update view with peers received through PUSHes */
4288     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4289                                            CustomPeerMap_size (sub->push_map));
4290     for (i = 0; i < first_border; i++)
4291     {
4292       int inserted;
4293       inserted = insert_in_view (sub,
4294                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4295                                                                   permut[i]));
4296       if (GNUNET_OK == inserted)
4297       {
4298         clients_notify_stream_peer (sub,
4299             1,
4300             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4301       }
4302       to_file (sub->file_name_view_log,
4303                "+%s\t(push list)",
4304                GNUNET_i2s_full (&view_array[i]));
4305       // TODO change the peer_flags accordingly
4306     }
4307     GNUNET_free (permut);
4308     permut = NULL;
4309
4310     /* Update view with peers received through PULLs */
4311     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4312                                            CustomPeerMap_size (sub->pull_map));
4313     for (i = first_border; i < second_border; i++)
4314     {
4315       int inserted;
4316       inserted = insert_in_view (sub,
4317           CustomPeerMap_get_peer_by_index (sub->pull_map,
4318                                            permut[i - first_border]));
4319       if (GNUNET_OK == inserted)
4320       {
4321         clients_notify_stream_peer (sub,
4322             1,
4323             CustomPeerMap_get_peer_by_index (sub->pull_map,
4324                                              permut[i - first_border]));
4325       }
4326       to_file (sub->file_name_view_log,
4327                "+%s\t(pull list)",
4328                GNUNET_i2s_full (&view_array[i]));
4329       // TODO change the peer_flags accordingly
4330     }
4331     GNUNET_free (permut);
4332     permut = NULL;
4333
4334     /* Update view with peers from history */
4335     RPS_sampler_get_n_rand_peers (sub->sampler,
4336                                   final_size - second_border,
4337                                   hist_update,
4338                                   sub);
4339     // TODO change the peer_flags accordingly
4340
4341     for (i = 0; i < View_size (sub->view); i++)
4342       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4343
4344     /* Clean peers that were removed from the view */
4345     for (i = 0; i < peers_to_clean_size; i++)
4346     {
4347       to_file (sub->file_name_view_log,
4348                "-%s",
4349                GNUNET_i2s_full (&peers_to_clean[i]));
4350       clean_peer (sub, &peers_to_clean[i]);
4351     }
4352
4353     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4354     clients_notify_view_update (sub);
4355   } else {
4356     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4357     if (sub == msub)
4358     {
4359       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4360       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4361           !(0 >= CustomPeerMap_size (sub->pull_map)))
4362         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4363       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4364           (0 >= CustomPeerMap_size (sub->pull_map)))
4365         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4366       if (0 >= CustomPeerMap_size (sub->push_map) &&
4367           !(0 >= CustomPeerMap_size (sub->pull_map)))
4368         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4369       if (0 >= CustomPeerMap_size (sub->push_map) &&
4370           (0 >= CustomPeerMap_size (sub->pull_map)))
4371         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4372       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4373           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4374           0 >= CustomPeerMap_size (sub->push_map))
4375         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4376     }
4377   }
4378   // TODO independent of that also get some peers from CADET_get_peers()?
4379   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4380   {
4381     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4382   }
4383   else
4384   {
4385     LOG (GNUNET_ERROR_TYPE_WARNING,
4386          "Push map size too big for histogram (%u, %u)\n",
4387          CustomPeerMap_size (sub->push_map),
4388          HISTOGRAM_FILE_SLOTS);
4389   }
4390   // FIXME check bounds of histogram
4391   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4392                    (alpha * sub->view_size_est_need)) +
4393                           (HISTOGRAM_FILE_SLOTS/2)]++;
4394   if (sub == msub)
4395   {
4396     GNUNET_STATISTICS_set (stats,
4397         "# peers in push map at end of round",
4398         CustomPeerMap_size (sub->push_map),
4399         GNUNET_NO);
4400     GNUNET_STATISTICS_set (stats,
4401         "# peers in pull map at end of round",
4402         CustomPeerMap_size (sub->pull_map),
4403         GNUNET_NO);
4404     GNUNET_STATISTICS_set (stats,
4405         "# peers in view at end of round",
4406         View_size (sub->view),
4407         GNUNET_NO);
4408     GNUNET_STATISTICS_set (stats,
4409         "# expected pushes",
4410         alpha * sub->view_size_est_need,
4411         GNUNET_NO);
4412     GNUNET_STATISTICS_set (stats,
4413         "delta expected - received pushes",
4414         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4415         GNUNET_NO);
4416   }
4417
4418   LOG (GNUNET_ERROR_TYPE_DEBUG,
4419        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4420        CustomPeerMap_size (sub->push_map),
4421        CustomPeerMap_size (sub->pull_map),
4422        alpha,
4423        View_size (sub->view),
4424        alpha * View_size (sub->view));
4425
4426   /* Update samplers */
4427   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4428   {
4429     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4430     LOG (GNUNET_ERROR_TYPE_DEBUG,
4431          "Updating with peer %s from push list\n",
4432          GNUNET_i2s (update_peer));
4433     insert_in_sampler (sub, update_peer);
4434     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4435   }
4436
4437   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4438   {
4439     LOG (GNUNET_ERROR_TYPE_DEBUG,
4440          "Updating with peer %s from pull list\n",
4441          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4442     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4443     /* This cleans only if it is not in the view */
4444     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4445   }
4446
4447
4448   /* Empty push/pull lists */
4449   CustomPeerMap_clear (sub->push_map);
4450   CustomPeerMap_clear (sub->pull_map);
4451
4452   if (sub == msub)
4453   {
4454     GNUNET_STATISTICS_set (stats,
4455                            "view size",
4456                            View_size(sub->view),
4457                            GNUNET_NO);
4458   }
4459
4460   struct GNUNET_TIME_Relative time_next_round;
4461
4462   time_next_round = compute_rand_delay (sub->round_interval, 2);
4463
4464   /* Schedule next round */
4465   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4466                                                      &do_round, sub);
4467   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4468 }
4469
4470
4471 /**
4472  * This is called from GNUNET_CADET_get_peers().
4473  *
4474  * It is called on every peer(ID) that cadet somehow has contact with.
4475  * We use those to initialise the sampler.
4476  *
4477  * implements #GNUNET_CADET_PeersCB
4478  *
4479  * @param cls Closure - Sub
4480  * @param peer Peer, or NULL on "EOF".
4481  * @param tunnel Do we have a tunnel towards this peer?
4482  * @param n_paths Number of known paths towards this peer.
4483  * @param best_path How long is the best path?
4484  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4485  */
4486 void
4487 init_peer_cb (void *cls,
4488               const struct GNUNET_PeerIdentity *peer,
4489               int tunnel, /* "Do we have a tunnel towards this peer?" */
4490               unsigned int n_paths, /* "Number of known paths towards this peer" */
4491               unsigned int best_path) /* "How long is the best path?
4492                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4493 {
4494   struct Sub *sub = cls;
4495   (void) tunnel;
4496   (void) n_paths;
4497   (void) best_path;
4498
4499   if (NULL != peer)
4500   {
4501     LOG (GNUNET_ERROR_TYPE_DEBUG,
4502          "Got peer_id %s from cadet\n",
4503          GNUNET_i2s (peer));
4504     got_peer (sub, peer);
4505   }
4506 }
4507
4508
4509 /**
4510  * @brief Iterator function over stored, valid peers.
4511  *
4512  * We initialise the sampler with those.
4513  *
4514  * @param cls Closure - Sub
4515  * @param peer the peer id
4516  * @return #GNUNET_YES if we should continue to
4517  *         iterate,
4518  *         #GNUNET_NO if not.
4519  */
4520 static int
4521 valid_peers_iterator (void *cls,
4522                       const struct GNUNET_PeerIdentity *peer)
4523 {
4524   struct Sub *sub = cls;
4525
4526   if (NULL != peer)
4527   {
4528     LOG (GNUNET_ERROR_TYPE_DEBUG,
4529          "Got stored, valid peer %s\n",
4530          GNUNET_i2s (peer));
4531     got_peer (sub, peer);
4532   }
4533   return GNUNET_YES;
4534 }
4535
4536
4537 /**
4538  * Iterator over peers from peerinfo.
4539  *
4540  * @param cls Closure - Sub
4541  * @param peer id of the peer, NULL for last call
4542  * @param hello hello message for the peer (can be NULL)
4543  * @param error message
4544  */
4545 void
4546 process_peerinfo_peers (void *cls,
4547                         const struct GNUNET_PeerIdentity *peer,
4548                         const struct GNUNET_HELLO_Message *hello,
4549                         const char *err_msg)
4550 {
4551   struct Sub *sub = cls;
4552   (void) hello;
4553   (void) err_msg;
4554
4555   if (NULL != peer)
4556   {
4557     LOG (GNUNET_ERROR_TYPE_DEBUG,
4558          "Got peer_id %s from peerinfo\n",
4559          GNUNET_i2s (peer));
4560     got_peer (sub, peer);
4561   }
4562 }
4563
4564
4565 /**
4566  * Task run during shutdown.
4567  *
4568  * @param cls Closure - unused
4569  */
4570 static void
4571 shutdown_task (void *cls)
4572 {
4573   (void) cls;
4574   struct ClientContext *client_ctx;
4575
4576   LOG (GNUNET_ERROR_TYPE_DEBUG,
4577        "RPS service is going down\n");
4578
4579   /* Clean all clients */
4580   for (client_ctx = cli_ctx_head;
4581        NULL != cli_ctx_head;
4582        client_ctx = cli_ctx_head)
4583   {
4584     destroy_cli_ctx (client_ctx);
4585   }
4586   if (NULL != msub)
4587   {
4588     destroy_sub (msub);
4589     msub = NULL;
4590   }
4591
4592   /* Disconnect from other services */
4593   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4594   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4595   peerinfo_handle = NULL;
4596   GNUNET_NSE_disconnect (nse);
4597   if (NULL != map_single_hop)
4598   {
4599     /* core_init was called - core was initialised */
4600     /* disconnect first, so no callback tries to access missing peermap */
4601     GNUNET_CORE_disconnect (core_handle);
4602     core_handle = NULL;
4603     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4604     map_single_hop = NULL;
4605   }
4606
4607   if (NULL != stats)
4608   {
4609     GNUNET_STATISTICS_destroy (stats,
4610                                GNUNET_NO);
4611     stats = NULL;
4612   }
4613   GNUNET_CADET_disconnect (cadet_handle);
4614   cadet_handle = NULL;
4615 #if ENABLE_MALICIOUS
4616   struct AttackedPeer *tmp_att_peer;
4617   GNUNET_array_grow (mal_peers,
4618                      num_mal_peers,
4619                      0);
4620   if (NULL != mal_peer_set)
4621     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4622   if (NULL != att_peer_set)
4623     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4624   while (NULL != att_peers_head)
4625   {
4626     tmp_att_peer = att_peers_head;
4627     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4628                                  att_peers_tail,
4629                                  tmp_att_peer);
4630     GNUNET_free (tmp_att_peer);
4631   }
4632 #endif /* ENABLE_MALICIOUS */
4633   close_all_files();
4634 }
4635
4636
4637 /**
4638  * Handle client connecting to the service.
4639  *
4640  * @param cls unused
4641  * @param client the new client
4642  * @param mq the message queue of @a client
4643  * @return @a client
4644  */
4645 static void *
4646 client_connect_cb (void *cls,
4647                    struct GNUNET_SERVICE_Client *client,
4648                    struct GNUNET_MQ_Handle *mq)
4649 {
4650   struct ClientContext *cli_ctx;
4651   (void) cls;
4652
4653   LOG (GNUNET_ERROR_TYPE_DEBUG,
4654        "Client connected\n");
4655   if (NULL == client)
4656     return client; /* Server was destroyed before a client connected. Shutting down */
4657   cli_ctx = GNUNET_new (struct ClientContext);
4658   cli_ctx->mq = mq;
4659   cli_ctx->view_updates_left = -1;
4660   cli_ctx->stream_update = GNUNET_NO;
4661   cli_ctx->client = client;
4662   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4663                                cli_ctx_tail,
4664                                cli_ctx);
4665   return cli_ctx;
4666 }
4667
4668 /**
4669  * Callback called when a client disconnected from the service
4670  *
4671  * @param cls closure for the service
4672  * @param c the client that disconnected
4673  * @param internal_cls should be equal to @a c
4674  */
4675 static void
4676 client_disconnect_cb (void *cls,
4677                       struct GNUNET_SERVICE_Client *client,
4678                       void *internal_cls)
4679 {
4680   struct ClientContext *cli_ctx = internal_cls;
4681
4682   (void) cls;
4683   GNUNET_assert (client == cli_ctx->client);
4684   if (NULL == client)
4685   {/* shutdown task - destroy all clients */
4686     while (NULL != cli_ctx_head)
4687       destroy_cli_ctx (cli_ctx_head);
4688   }
4689   else
4690   { /* destroy this client */
4691     LOG (GNUNET_ERROR_TYPE_DEBUG,
4692         "Client disconnected. Destroy its context.\n");
4693     destroy_cli_ctx (cli_ctx);
4694   }
4695 }
4696
4697
4698 /**
4699  * Handle random peer sampling clients.
4700  *
4701  * @param cls closure
4702  * @param c configuration to use
4703  * @param service the initialized service
4704  */
4705 static void
4706 run (void *cls,
4707      const struct GNUNET_CONFIGURATION_Handle *c,
4708      struct GNUNET_SERVICE_Handle *service)
4709 {
4710   struct GNUNET_TIME_Relative round_interval;
4711   long long unsigned int sampler_size;
4712   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4713   struct GNUNET_HashCode hash;
4714
4715   (void) cls;
4716   (void) service;
4717
4718   GNUNET_log_setup ("rps",
4719                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4720                     NULL);
4721   cfg = c;
4722   /* Get own ID */
4723   GNUNET_CRYPTO_get_peer_identity (cfg,
4724                                    &own_identity); // TODO check return value
4725   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4726               "STARTING SERVICE (rps) for peer [%s]\n",
4727               GNUNET_i2s (&own_identity));
4728 #if ENABLE_MALICIOUS
4729   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4730               "Malicious execution compiled in.\n");
4731 #endif /* ENABLE_MALICIOUS */
4732
4733   /* Get time interval from the configuration */
4734   if (GNUNET_OK !=
4735       GNUNET_CONFIGURATION_get_value_time (cfg,
4736                                            "RPS",
4737                                            "ROUNDINTERVAL",
4738                                            &round_interval))
4739   {
4740     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4741                                "RPS", "ROUNDINTERVAL");
4742     GNUNET_SCHEDULER_shutdown ();
4743     return;
4744   }
4745
4746   /* Get initial size of sampler/view from the configuration */
4747   if (GNUNET_OK !=
4748       GNUNET_CONFIGURATION_get_value_number (cfg,
4749                                              "RPS",
4750                                              "MINSIZE",
4751                                              &sampler_size))
4752   {
4753     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4754                                "RPS", "MINSIZE");
4755     GNUNET_SCHEDULER_shutdown ();
4756     return;
4757   }
4758
4759   cadet_handle = GNUNET_CADET_connect (cfg);
4760   GNUNET_assert (NULL != cadet_handle);
4761   core_handle = GNUNET_CORE_connect (cfg,
4762                                      NULL, /* cls */
4763                                      core_init, /* init */
4764                                      core_connects, /* connects */
4765                                      core_disconnects, /* disconnects */
4766                                      NULL); /* handlers */
4767   GNUNET_assert (NULL != core_handle);
4768
4769
4770   alpha = 0.45;
4771   beta  = 0.45;
4772
4773
4774   /* Set up main Sub */
4775   GNUNET_CRYPTO_hash (hash_port_string,
4776                       strlen (hash_port_string),
4777                       &hash);
4778   msub = new_sub (&hash,
4779                  sampler_size, /* Will be overwritten by config */
4780                  round_interval);
4781
4782
4783   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4784
4785   /* connect to NSE */
4786   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4787
4788   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4789   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4790   // TODO send push/pull to each of those peers?
4791   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4792   restore_valid_peers (msub);
4793   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4794
4795   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4796                                                    GNUNET_NO,
4797                                                    process_peerinfo_peers,
4798                                                    msub);
4799
4800   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4801
4802   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4803   stats = GNUNET_STATISTICS_create ("rps", cfg);
4804 }
4805
4806
4807 /**
4808  * Define "main" method using service macro.
4809  */
4810 GNUNET_SERVICE_MAIN
4811 ("rps",
4812  GNUNET_SERVICE_OPTION_NONE,
4813  &run,
4814  &client_connect_cb,
4815  &client_disconnect_cb,
4816  NULL,
4817  GNUNET_MQ_hd_var_size (client_seed,
4818    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4819    struct GNUNET_RPS_CS_SeedMessage,
4820    NULL),
4821 #if ENABLE_MALICIOUS
4822  GNUNET_MQ_hd_var_size (client_act_malicious,
4823    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4824    struct GNUNET_RPS_CS_ActMaliciousMessage,
4825    NULL),
4826 #endif /* ENABLE_MALICIOUS */
4827  GNUNET_MQ_hd_fixed_size (client_view_request,
4828    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4829    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4830    NULL),
4831  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4832    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4833    struct GNUNET_MessageHeader,
4834    NULL),
4835  GNUNET_MQ_hd_fixed_size (client_stream_request,
4836    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4837    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4838    NULL),
4839  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4840    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4841    struct GNUNET_MessageHeader,
4842    NULL),
4843  GNUNET_MQ_hd_fixed_size (client_start_sub,
4844    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4845    struct GNUNET_RPS_CS_SubStartMessage,
4846    NULL),
4847  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4848    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4849    struct GNUNET_RPS_CS_SubStopMessage,
4850    NULL),
4851  GNUNET_MQ_handler_end());
4852
4853 /* end of gnunet-service-rps.c */