remove 'illegal' (non-reentrant) log logic from signal handler
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log (kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask) \
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask) \
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88 /**
89  * Pending operation on peer consisting of callback and closure
90  *
91  * When an operation cannot be executed right now this struct is used to store
92  * the callback and closure for later execution.
93  */
94 struct PeerPendingOp
95 {
96   /**
97    * Callback
98    */
99   PeerOp op;
100
101   /**
102    * Closure
103    */
104   void *op_cls;
105 };
106
107 /**
108  * List containing all messages that are yet to be send
109  *
110  * This is used to keep track of all messages that have not been sent yet. When
111  * a peer is to be removed the pending messages can be removed properly.
112  */
113 struct PendingMessage
114 {
115   /**
116    * DLL next, prev
117    */
118   struct PendingMessage *next;
119   struct PendingMessage *prev;
120
121   /**
122    * The envelope to the corresponding message
123    */
124   struct GNUNET_MQ_Envelope *ev;
125
126   /**
127    * The corresponding context
128    */
129   struct PeerContext *peer_ctx;
130
131   /**
132    * The message type
133    */
134   const char *type;
135 };
136
137 /**
138  * @brief Context for a channel
139  */
140 struct ChannelCtx;
141
142 /**
143  * Struct used to keep track of other peer's status
144  *
145  * This is stored in a multipeermap.
146  * It contains information such as cadet channels, a message queue for sending,
147  * status about the channels, the pending operations on this peer and some flags
148  * about the status of the peer itself. (online, valid, ...)
149  */
150 struct PeerContext
151 {
152   /**
153    * The Sub this context belongs to.
154    */
155   struct Sub *sub;
156
157   /**
158    * Message queue open to client
159    */
160   struct GNUNET_MQ_Handle *mq;
161
162   /**
163    * Channel open to client.
164    */
165   struct ChannelCtx *send_channel_ctx;
166
167   /**
168    * Channel open from client.
169    */
170   struct ChannelCtx *recv_channel_ctx;
171
172   /**
173    * Array of pending operations on this peer.
174    */
175   struct PeerPendingOp *pending_ops;
176
177   /**
178    * Handle to the callback given to cadet_ntfy_tmt_rdy()
179    *
180    * To be canceled on shutdown.
181    */
182   struct PendingMessage *online_check_pending;
183
184   /**
185    * Number of pending operations.
186    */
187   unsigned int num_pending_ops;
188
189   /**
190    * Identity of the peer
191    */
192   struct GNUNET_PeerIdentity peer_id;
193
194   /**
195    * Flags indicating status of peer
196    */
197   uint32_t peer_flags;
198
199   /**
200    * Last time we received something from that peer.
201    */
202   struct GNUNET_TIME_Absolute last_message_recv;
203
204   /**
205    * Last time we received a keepalive message.
206    */
207   struct GNUNET_TIME_Absolute last_keepalive;
208
209   /**
210    * DLL with all messages that are yet to be sent
211    */
212   struct PendingMessage *pending_messages_head;
213   struct PendingMessage *pending_messages_tail;
214
215   /**
216    * This is pobably followed by 'statistical' data (when we first saw
217    * it, how did we get its ID, how many pushes (in a timeinterval),
218    * ...)
219    */
220   uint32_t round_pull_req;
221 };
222
223 /**
224  * @brief Closure to #valid_peer_iterator
225  */
226 struct PeersIteratorCls
227 {
228   /**
229    * Iterator function
230    */
231   PeersIterator iterator;
232
233   /**
234    * Closure to iterator
235    */
236   void *cls;
237 };
238
239 /**
240  * @brief Context for a channel
241  */
242 struct ChannelCtx
243 {
244   /**
245    * @brief The channel itself
246    */
247   struct GNUNET_CADET_Channel *channel;
248
249   /**
250    * @brief The peer context associated with the channel
251    */
252   struct PeerContext *peer_ctx;
253
254   /**
255    * @brief When channel destruction needs to be delayed (because it is called
256    * from within the cadet routine of another channel destruction) this task
257    * refers to the respective _SCHEDULER_Task.
258    */
259   struct GNUNET_SCHEDULER_Task *destruction_task;
260 };
261
262
263 #if ENABLE_MALICIOUS
264
265 /**
266  * If type is 2 This struct is used to store the attacked peers in a DLL
267  */
268 struct AttackedPeer
269 {
270   /**
271    * DLL
272    */
273   struct AttackedPeer *next;
274   struct AttackedPeer *prev;
275
276   /**
277    * PeerID
278    */
279   struct GNUNET_PeerIdentity peer_id;
280 };
281
282 #endif /* ENABLE_MALICIOUS */
283
284 /**
285  * @brief This number determines the number of slots for files that represent
286  * histograms
287  */
288 #define HISTOGRAM_FILE_SLOTS 32
289
290 /**
291  * @brief The size (in bytes) a file needs to store the histogram
292  *
293  * Per slot: 1 newline, up to 4 chars,
294  * Additionally: 1 null termination
295  */
296 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
297
298 /**
299  * @brief One Sub.
300  *
301  * Essentially one instance of brahms that only connects to other instances
302  * with the same (secret) value.
303  */
304 struct Sub
305 {
306   /**
307    * @brief Hash of the shared value that defines Subs.
308    */
309   struct GNUNET_HashCode hash;
310
311   /**
312    * @brief Port to communicate to other peers.
313    */
314   struct GNUNET_CADET_Port *cadet_port;
315
316   /**
317    * @brief Hashmap of valid peers.
318    */
319   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
320
321   /**
322    * @brief Filename of the file that stores the valid peers persistently.
323    */
324   char *filename_valid_peers;
325
326   /**
327    * Set of all peers to keep track of them.
328    */
329   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
330
331   /**
332    * @brief This is the minimum estimate used as sampler size.
333    *
334    * It is configured by the user.
335    */
336   unsigned int sampler_size_est_min;
337
338   /**
339    * The size of sampler we need to be able to satisfy the Brahms protocol's
340    * need of random peers.
341    *
342    * This is one minimum size the sampler grows to.
343    */
344   unsigned int sampler_size_est_need;
345
346   /**
347    * Time interval the do_round task runs in.
348    */
349   struct GNUNET_TIME_Relative round_interval;
350
351   /**
352    * Sampler used for the Brahms protocol itself.
353    */
354   struct RPS_Sampler *sampler;
355
356 #ifdef TO_FILE_FULL
357   /**
358    * Name to log view to
359    */
360   char *file_name_view_log;
361 #endif /* TO_FILE_FULL */
362
363 #ifdef TO_FILE
364 #ifdef TO_FILE_FULL
365   /**
366    * Name to log number of observed peers to
367    */
368   char *file_name_observed_log;
369 #endif /* TO_FILE_FULL */
370
371   /**
372    * @brief Count the observed peers
373    */
374   uint32_t num_observed_peers;
375
376   /**
377    * @brief Multipeermap (ab-) used to count unique peer_ids
378    */
379   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
380 #endif /* TO_FILE */
381
382   /**
383    * List to store peers received through pushes temporary.
384    */
385   struct CustomPeerMap *push_map;
386
387   /**
388    * List to store peers received through pulls temporary.
389    */
390   struct CustomPeerMap *pull_map;
391
392   /**
393    * @brief This is the estimate used as view size.
394    *
395    * It is initialised with the minimum
396    */
397   unsigned int view_size_est_need;
398
399   /**
400    * @brief This is the minimum estimate used as view size.
401    *
402    * It is configured by the user.
403    */
404   unsigned int view_size_est_min;
405
406   /**
407    * @brief The view.
408    */
409   struct View *view;
410
411   /**
412    * Identifier for the main task that runs periodically.
413    */
414   struct GNUNET_SCHEDULER_Task *do_round_task;
415
416   /* === stats === */
417
418   /**
419    * @brief Counts the executed rounds.
420    */
421   uint32_t num_rounds;
422
423   /**
424    * @brief This array accumulates the number of received pushes per round.
425    *
426    * Number at index i represents the number of rounds with i observed pushes.
427    */
428   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
429
430   /**
431    * @brief Histogram of deltas between the expected and actual number of
432    * received pushes.
433    *
434    * As half of the entries are expected to be negative, this is shifted by
435    * #HISTOGRAM_FILE_SLOTS/2.
436    */
437   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
438
439   /**
440    * @brief Number of pull replies with this delay measured in rounds.
441    *
442    * Number at index i represents the number of pull replies with a delay of i
443    * rounds.
444    */
445   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
446 };
447
448
449 /***********************************************************************
450 * Globals
451 ***********************************************************************/
452
453 /**
454  * Our configuration.
455  */
456 static const struct GNUNET_CONFIGURATION_Handle *cfg;
457
458 /**
459  * Handle to the statistics service.
460  */
461 struct GNUNET_STATISTICS_Handle *stats;
462
463 /**
464  * Handler to CADET.
465  */
466 struct GNUNET_CADET_Handle *cadet_handle;
467
468 /**
469  * Handle to CORE
470  */
471 struct GNUNET_CORE_Handle *core_handle;
472
473 /**
474  * @brief PeerMap to keep track of connected peers.
475  */
476 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
477
478 /**
479  * Our own identity.
480  */
481 static struct GNUNET_PeerIdentity own_identity;
482
483 /**
484  * Percentage of total peer number in the view
485  * to send random PUSHes to
486  */
487 static float alpha;
488
489 /**
490  * Percentage of total peer number in the view
491  * to send random PULLs to
492  */
493 static float beta;
494
495 /**
496  * Handler to NSE.
497  */
498 static struct GNUNET_NSE_Handle *nse;
499
500 /**
501  * Handler to PEERINFO.
502  */
503 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
504
505 /**
506  * Handle for cancellation of iteration over peers.
507  */
508 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
509
510
511 #if ENABLE_MALICIOUS
512 /**
513  * Type of malicious peer
514  *
515  * 0 Don't act malicious at all - Default
516  * 1 Try to maximise representation
517  * 2 Try to partition the network
518  * 3 Combined attack
519  */
520 static uint32_t mal_type;
521
522 /**
523  * Other malicious peers
524  */
525 static struct GNUNET_PeerIdentity *mal_peers;
526
527 /**
528  * Hashmap of malicious peers used as set.
529  * Used to more efficiently check whether we know that peer.
530  */
531 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
532
533 /**
534  * Number of other malicious peers
535  */
536 static uint32_t num_mal_peers;
537
538
539 /**
540  * If type is 2 this is the DLL of attacked peers
541  */
542 static struct AttackedPeer *att_peers_head;
543 static struct AttackedPeer *att_peers_tail;
544
545 /**
546  * This index is used to point to an attacked peer to
547  * implement the round-robin-ish way to select attacked peers.
548  */
549 static struct AttackedPeer *att_peer_index;
550
551 /**
552  * Hashmap of attacked peers used as set.
553  * Used to more efficiently check whether we know that peer.
554  */
555 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
556
557 /**
558  * Number of attacked peers
559  */
560 static uint32_t num_attacked_peers;
561
562 /**
563  * If type is 1 this is the attacked peer
564  */
565 static struct GNUNET_PeerIdentity attacked_peer;
566
567 /**
568  * The limit of PUSHes we can send in one round.
569  * This is an assumption of the Brahms protocol and either implemented
570  * via proof of work
571  * or
572  * assumend to be the bandwidth limitation.
573  */
574 static uint32_t push_limit = 10000;
575 #endif /* ENABLE_MALICIOUS */
576
577 /**
578  * @brief Main Sub.
579  *
580  * This is run in any case by all peers and connects to all peers without
581  * specifying a shared value.
582  */
583 static struct Sub *msub;
584
585 /**
586  * @brief Maximum number of valid peers to keep.
587  * TODO read from config
588  */
589 static const uint32_t num_valid_peers_max = UINT32_MAX;
590
591 /***********************************************************************
592 * /Globals
593 ***********************************************************************/
594
595
596 static void
597 do_round (void *cls);
598
599 static void
600 do_mal_round (void *cls);
601
602
603 /**
604  * @brief Get the #PeerContext associated with a peer
605  *
606  * @param peer_map The peer map containing the context
607  * @param peer the peer id
608  *
609  * @return the #PeerContext
610  */
611 static struct PeerContext *
612 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
613               const struct GNUNET_PeerIdentity *peer)
614 {
615   struct PeerContext *ctx;
616   int ret;
617
618   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
619   GNUNET_assert (GNUNET_YES == ret);
620   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
621   GNUNET_assert (NULL != ctx);
622   return ctx;
623 }
624
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                            "# known peers",
679                            GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                            GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ((NULL == peer_ctx->send_channel_ctx) &&
729       (NULL == peer_ctx->recv_channel_ctx))
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780
781   (void) value;
782
783   if (0 >= iterator_cls->index)
784   {
785     iterator_cls->peer = peer;
786     return GNUNET_NO;
787   }
788   iterator_cls->index--;
789   return GNUNET_YES;
790 }
791
792
793 /**
794  * @brief Get a random peer from @a peer_map
795  *
796  * @param valid_peers Peer map containing valid peers from which to select a
797  * random one
798  *
799  * @return a random peer
800  */
801 static const struct GNUNET_PeerIdentity *
802 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
803 {
804   struct GetRandPeerIteratorCls *iterator_cls;
805   const struct GNUNET_PeerIdentity *ret;
806
807   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
808   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
809                                                   GNUNET_CONTAINER_multipeermap_size (
810                                                     valid_peers));
811   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
812                                                 get_rand_peer_iterator,
813                                                 iterator_cls);
814   ret = iterator_cls->peer;
815   GNUNET_free (iterator_cls);
816   return ret;
817 }
818
819
820 /**
821  * @brief Add a given @a peer to valid peers.
822  *
823  * If valid peers are already #num_valid_peers_max, delete a peer previously.
824  *
825  * @param peer The peer that is added to the valid peers.
826  * @param valid_peers Peer map of valid peers to which to add the @a peer
827  *
828  * @return #GNUNET_YES if no other peer had to be removed
829  *         #GNUNET_NO  otherwise
830  */
831 static int
832 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
833                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
834 {
835   const struct GNUNET_PeerIdentity *rand_peer;
836   int ret;
837
838   ret = GNUNET_YES;
839   /* Remove random peers until there is space for a new one */
840   while (num_valid_peers_max <=
841          GNUNET_CONTAINER_multipeermap_size (valid_peers))
842   {
843     rand_peer = get_random_peer_from_peermap (valid_peers);
844     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
845     ret = GNUNET_NO;
846   }
847   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
848                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
849   if (valid_peers == msub->valid_peers)
850   {
851     GNUNET_STATISTICS_set (stats,
852                            "# valid peers",
853                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
854                            GNUNET_NO);
855   }
856   return ret;
857 }
858
859
860 static void
861 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
862
863 /**
864  * @brief Set the peer flag to living and
865  *        call the pending operations on this peer.
866  *
867  * Also adds peer to #valid_peers.
868  *
869  * @param peer_ctx the #PeerContext of the peer to set online
870  */
871 static void
872 set_peer_online (struct PeerContext *peer_ctx)
873 {
874   struct GNUNET_PeerIdentity *peer;
875   unsigned int i;
876
877   peer = &peer_ctx->peer_id;
878   LOG (GNUNET_ERROR_TYPE_DEBUG,
879        "Peer %s is online and valid, calling %i pending operations on it\n",
880        GNUNET_i2s (peer),
881        peer_ctx->num_pending_ops);
882
883   if (NULL != peer_ctx->online_check_pending)
884   {
885     LOG (GNUNET_ERROR_TYPE_DEBUG,
886          "Removing pending online check for peer %s\n",
887          GNUNET_i2s (&peer_ctx->peer_id));
888     // TODO wait until cadet sets mq->cancel_impl
889     // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
890     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
891     peer_ctx->online_check_pending = NULL;
892   }
893
894   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
895
896   /* Call pending operations */
897   for (i = 0; i < peer_ctx->num_pending_ops; i++)
898   {
899     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
900   }
901   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
902 }
903
904
905 static void
906 cleanup_destroyed_channel (void *cls,
907                            const struct GNUNET_CADET_Channel *channel);
908
909 /* Declaration of handlers */
910 static void
911 handle_peer_check (void *cls,
912                    const struct GNUNET_MessageHeader *msg);
913
914 static void
915 handle_peer_push (void *cls,
916                   const struct GNUNET_MessageHeader *msg);
917
918 static void
919 handle_peer_pull_request (void *cls,
920                           const struct GNUNET_MessageHeader *msg);
921
922 static int
923 check_peer_pull_reply (void *cls,
924                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
925
926 static void
927 handle_peer_pull_reply (void *cls,
928                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
929
930 /* End declaration of handlers */
931
932 /**
933  * @brief Allocate memory for a new channel context and insert it into DLL
934  *
935  * @param peer_ctx context of the according peer
936  *
937  * @return The channel context
938  */
939 static struct ChannelCtx *
940 add_channel_ctx (struct PeerContext *peer_ctx)
941 {
942   struct ChannelCtx *channel_ctx;
943
944   channel_ctx = GNUNET_new (struct ChannelCtx);
945   channel_ctx->peer_ctx = peer_ctx;
946   return channel_ctx;
947 }
948
949
950 /**
951  * @brief Free memory and NULL pointers.
952  *
953  * @param channel_ctx The channel context.
954  */
955 static void
956 remove_channel_ctx (struct ChannelCtx *channel_ctx)
957 {
958   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
959
960   if (NULL != channel_ctx->destruction_task)
961   {
962     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
963     channel_ctx->destruction_task = NULL;
964   }
965
966   GNUNET_free (channel_ctx);
967
968   if (NULL == peer_ctx)
969     return;
970   if (channel_ctx == peer_ctx->send_channel_ctx)
971   {
972     peer_ctx->send_channel_ctx = NULL;
973     peer_ctx->mq = NULL;
974   }
975   else if (channel_ctx == peer_ctx->recv_channel_ctx)
976   {
977     peer_ctx->recv_channel_ctx = NULL;
978   }
979 }
980
981
982 /**
983  * @brief Get the channel of a peer. If not existing, create.
984  *
985  * @param peer_ctx Context of the peer of which to get the channel
986  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
987  */
988 struct GNUNET_CADET_Channel *
989 get_channel (struct PeerContext *peer_ctx)
990 {
991   /* There exists a copy-paste-clone in run() */
992   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
993     GNUNET_MQ_hd_fixed_size (peer_check,
994                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
995                              struct GNUNET_MessageHeader,
996                              NULL),
997     GNUNET_MQ_hd_fixed_size (peer_push,
998                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
999                              struct GNUNET_MessageHeader,
1000                              NULL),
1001     GNUNET_MQ_hd_fixed_size (peer_pull_request,
1002                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
1003                              struct GNUNET_MessageHeader,
1004                              NULL),
1005     GNUNET_MQ_hd_var_size (peer_pull_reply,
1006                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1007                            struct GNUNET_RPS_P2P_PullReplyMessage,
1008                            NULL),
1009     GNUNET_MQ_handler_end ()
1010   };
1011
1012
1013   if (NULL == peer_ctx->send_channel_ctx)
1014   {
1015     LOG (GNUNET_ERROR_TYPE_DEBUG,
1016          "Trying to establish channel to peer %s\n",
1017          GNUNET_i2s (&peer_ctx->peer_id));
1018     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1019     peer_ctx->send_channel_ctx->channel =
1020       GNUNET_CADET_channel_create (cadet_handle,
1021                                    peer_ctx->send_channel_ctx,  /* context */
1022                                    &peer_ctx->peer_id,
1023                                    &peer_ctx->sub->hash,
1024                                    NULL,  /* WindowSize handler */
1025                                    &cleanup_destroyed_channel,  /* Disconnect handler */
1026                                    cadet_handlers);
1027   }
1028   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1029   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1030   return peer_ctx->send_channel_ctx->channel;
1031 }
1032
1033
1034 /**
1035  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1036  *
1037  * If we already have a message queue open to this client,
1038  * simply return it, otherways create one.
1039  *
1040  * @param peer_ctx Context of the peer of whicht to get the mq
1041  * @return the #GNUNET_MQ_Handle
1042  */
1043 static struct GNUNET_MQ_Handle *
1044 get_mq (struct PeerContext *peer_ctx)
1045 {
1046   if (NULL == peer_ctx->mq)
1047   {
1048     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1049   }
1050   return peer_ctx->mq;
1051 }
1052
1053
1054 /**
1055  * @brief Add an envelope to a message passed to mq to list of pending messages
1056  *
1057  * @param peer_ctx Context of the peer for which to insert the envelope
1058  * @param ev envelope to the message
1059  * @param type type of the message to be sent
1060  * @return pointer to pending message
1061  */
1062 static struct PendingMessage *
1063 insert_pending_message (struct PeerContext *peer_ctx,
1064                         struct GNUNET_MQ_Envelope *ev,
1065                         const char *type)
1066 {
1067   struct PendingMessage *pending_msg;
1068
1069   pending_msg = GNUNET_new (struct PendingMessage);
1070   pending_msg->ev = ev;
1071   pending_msg->peer_ctx = peer_ctx;
1072   pending_msg->type = type;
1073   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1074                                peer_ctx->pending_messages_tail,
1075                                pending_msg);
1076   return pending_msg;
1077 }
1078
1079
1080 /**
1081  * @brief Remove a pending message from the respective DLL
1082  *
1083  * @param pending_msg the pending message to remove
1084  * @param cancel whether to cancel the pending message, too
1085  */
1086 static void
1087 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1088 {
1089   struct PeerContext *peer_ctx;
1090
1091   (void) cancel;
1092
1093   peer_ctx = pending_msg->peer_ctx;
1094   GNUNET_assert (NULL != peer_ctx);
1095   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1096                                peer_ctx->pending_messages_tail,
1097                                pending_msg);
1098   // TODO wait for the cadet implementation of message cancellation
1099   // if (GNUNET_YES == cancel)
1100   // {
1101   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1102   // }
1103   GNUNET_free (pending_msg);
1104 }
1105
1106
1107 /**
1108  * @brief This is called in response to the first message we sent as a
1109  * online check.
1110  *
1111  * @param cls #PeerContext of peer with pending online check
1112  */
1113 static void
1114 mq_online_check_successful (void *cls)
1115 {
1116   struct PeerContext *peer_ctx = cls;
1117
1118   if (NULL != peer_ctx->online_check_pending)
1119   {
1120     LOG (GNUNET_ERROR_TYPE_DEBUG,
1121          "Online check for peer %s was successfull\n",
1122          GNUNET_i2s (&peer_ctx->peer_id));
1123     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1124     peer_ctx->online_check_pending = NULL;
1125     set_peer_online (peer_ctx);
1126     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1127   }
1128 }
1129
1130
1131 /**
1132  * Issue a check whether peer is online
1133  *
1134  * @param peer_ctx the context of the peer
1135  */
1136 static void
1137 check_peer_online (struct PeerContext *peer_ctx)
1138 {
1139   LOG (GNUNET_ERROR_TYPE_DEBUG,
1140        "Get informed about peer %s getting online\n",
1141        GNUNET_i2s (&peer_ctx->peer_id));
1142
1143   struct GNUNET_MQ_Handle *mq;
1144   struct GNUNET_MQ_Envelope *ev;
1145
1146   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1147   peer_ctx->online_check_pending =
1148     insert_pending_message (peer_ctx, ev, "Check online");
1149   mq = get_mq (peer_ctx);
1150   GNUNET_MQ_notify_sent (ev,
1151                          mq_online_check_successful,
1152                          peer_ctx);
1153   GNUNET_MQ_send (mq, ev);
1154   if (peer_ctx->sub == msub)
1155   {
1156     GNUNET_STATISTICS_update (stats,
1157                               "# pending online checks",
1158                               1,
1159                               GNUNET_NO);
1160   }
1161 }
1162
1163
1164 /**
1165  * @brief Check whether function of type #PeerOp was already scheduled
1166  *
1167  * The array with pending operations will probably never grow really big, so
1168  * iterating over it should be ok.
1169  *
1170  * @param peer_ctx Context of the peer to check for the operation
1171  * @param peer_op the operation (#PeerOp) on the peer
1172  *
1173  * @return #GNUNET_YES if this operation is scheduled on that peer
1174  *         #GNUNET_NO  otherwise
1175  */
1176 static int
1177 check_operation_scheduled (const struct PeerContext *peer_ctx,
1178                            const PeerOp peer_op)
1179 {
1180   unsigned int i;
1181
1182   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1183     if (peer_op == peer_ctx->pending_ops[i].op)
1184       return GNUNET_YES;
1185   return GNUNET_NO;
1186 }
1187
1188
1189 /**
1190  * @brief Callback for scheduler to destroy a channel
1191  *
1192  * @param cls Context of the channel
1193  */
1194 static void
1195 destroy_channel (struct ChannelCtx *channel_ctx)
1196 {
1197   struct GNUNET_CADET_Channel *channel;
1198
1199   if (NULL != channel_ctx->destruction_task)
1200   {
1201     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1202     channel_ctx->destruction_task = NULL;
1203   }
1204   GNUNET_assert (channel_ctx->channel != NULL);
1205   channel = channel_ctx->channel;
1206   channel_ctx->channel = NULL;
1207   GNUNET_CADET_channel_destroy (channel);
1208   remove_channel_ctx (channel_ctx);
1209 }
1210
1211
1212 /**
1213  * @brief Destroy a cadet channel.
1214  *
1215  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1216  *
1217  * @param cls
1218  */
1219 static void
1220 destroy_channel_cb (void *cls)
1221 {
1222   struct ChannelCtx *channel_ctx = cls;
1223
1224   channel_ctx->destruction_task = NULL;
1225   destroy_channel (channel_ctx);
1226 }
1227
1228
1229 /**
1230  * @brief Schedule the destruction of a channel for immediately afterwards.
1231  *
1232  * In case a channel is to be destroyed from within the callback to the
1233  * destruction of another channel (send channel), we cannot call
1234  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1235  * construction.
1236  *
1237  * @param channel_ctx channel to be destroyed.
1238  */
1239 static void
1240 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1241 {
1242   GNUNET_assert (NULL ==
1243                  channel_ctx->destruction_task);
1244   GNUNET_assert (NULL !=
1245                  channel_ctx->channel);
1246   channel_ctx->destruction_task =
1247     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1248                               channel_ctx);
1249 }
1250
1251
1252 /**
1253  * @brief Remove peer
1254  *
1255  * - Empties the list with pending operations
1256  * - Empties the list with pending messages
1257  * - Cancels potentially existing online check
1258  * - Schedules closing of send and recv channels
1259  * - Removes peer from peer map
1260  *
1261  * @param peer_ctx Context of the peer to be destroyed
1262  * @return #GNUNET_YES if peer was removed
1263  *         #GNUNET_NO  otherwise
1264  */
1265 static int
1266 destroy_peer (struct PeerContext *peer_ctx)
1267 {
1268   GNUNET_assert (NULL != peer_ctx);
1269   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1270   if (GNUNET_NO ==
1271       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1272                                               &peer_ctx->peer_id))
1273   {
1274     return GNUNET_NO;
1275   }
1276   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1277   LOG (GNUNET_ERROR_TYPE_DEBUG,
1278        "Going to remove peer %s\n",
1279        GNUNET_i2s (&peer_ctx->peer_id));
1280   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1281
1282   /* Clear list of pending operations */
1283   // TODO this probably leaks memory
1284   //      ('only' the cls to the function. Not sure what to do with it)
1285   GNUNET_array_grow (peer_ctx->pending_ops,
1286                      peer_ctx->num_pending_ops,
1287                      0);
1288   /* Remove all pending messages */
1289   while (NULL != peer_ctx->pending_messages_head)
1290   {
1291     LOG (GNUNET_ERROR_TYPE_DEBUG,
1292          "Removing unsent %s\n",
1293          peer_ctx->pending_messages_head->type);
1294     /* Cancle pending message, too */
1295     if ((NULL != peer_ctx->online_check_pending) &&
1296         (0 == memcmp (peer_ctx->pending_messages_head,
1297                       peer_ctx->online_check_pending,
1298                       sizeof(struct PendingMessage))))
1299     {
1300       peer_ctx->online_check_pending = NULL;
1301       if (peer_ctx->sub == msub)
1302       {
1303         GNUNET_STATISTICS_update (stats,
1304                                   "# pending online checks",
1305                                   -1,
1306                                   GNUNET_NO);
1307       }
1308     }
1309     remove_pending_message (peer_ctx->pending_messages_head,
1310                             GNUNET_YES);
1311   }
1312
1313   /* If we are still waiting for notification whether this peer is online
1314    * cancel the according task */
1315   if (NULL != peer_ctx->online_check_pending)
1316   {
1317     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318                 "Removing pending online check for peer %s\n",
1319                 GNUNET_i2s (&peer_ctx->peer_id));
1320     // TODO wait until cadet sets mq->cancel_impl
1321     // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1322     remove_pending_message (peer_ctx->online_check_pending,
1323                             GNUNET_YES);
1324     peer_ctx->online_check_pending = NULL;
1325   }
1326
1327   if (NULL != peer_ctx->send_channel_ctx)
1328   {
1329     /* This is possibly called from within channel destruction */
1330     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1331     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1332     peer_ctx->send_channel_ctx = NULL;
1333     peer_ctx->mq = NULL;
1334   }
1335   if (NULL != peer_ctx->recv_channel_ctx)
1336   {
1337     /* This is possibly called from within channel destruction */
1338     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1339     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1340     peer_ctx->recv_channel_ctx = NULL;
1341   }
1342
1343   if (GNUNET_YES !=
1344       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1345                                                 &peer_ctx->peer_id))
1346   {
1347     LOG (GNUNET_ERROR_TYPE_WARNING,
1348          "removing peer from peer_ctx->sub->peer_map failed\n");
1349   }
1350   if (peer_ctx->sub == msub)
1351   {
1352     GNUNET_STATISTICS_set (stats,
1353                            "# known peers",
1354                            GNUNET_CONTAINER_multipeermap_size (
1355                              peer_ctx->sub->peer_map),
1356                            GNUNET_NO);
1357   }
1358   GNUNET_free (peer_ctx);
1359   return GNUNET_YES;
1360 }
1361
1362
1363 /**
1364  * Iterator over hash map entries. Deletes all contexts of peers.
1365  *
1366  * @param cls closure
1367  * @param key current public key
1368  * @param value value in the hash map
1369  * @return #GNUNET_YES if we should continue to iterate,
1370  *         #GNUNET_NO if not.
1371  */
1372 static int
1373 peermap_clear_iterator (void *cls,
1374                         const struct GNUNET_PeerIdentity *key,
1375                         void *value)
1376 {
1377   struct Sub *sub = cls;
1378
1379   (void) value;
1380
1381   destroy_peer (get_peer_ctx (sub->peer_map, key));
1382   return GNUNET_YES;
1383 }
1384
1385
1386 /**
1387  * @brief This is called once a message is sent.
1388  *
1389  * Removes the pending message
1390  *
1391  * @param cls type of the message that was sent
1392  */
1393 static void
1394 mq_notify_sent_cb (void *cls)
1395 {
1396   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1397
1398   LOG (GNUNET_ERROR_TYPE_DEBUG,
1399        "%s was sent.\n",
1400        pending_msg->type);
1401   if (pending_msg->peer_ctx->sub == msub)
1402   {
1403     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1404       GNUNET_STATISTICS_update (stats, "# pull replys sent", 1, GNUNET_NO);
1405     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1406       GNUNET_STATISTICS_update (stats, "# pull requests sent", 1, GNUNET_NO);
1407     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1408       GNUNET_STATISTICS_update (stats, "# pushes sent", 1, GNUNET_NO);
1409     if ((0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) &&
1410         (NULL != map_single_hop) &&
1411         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1412                                                               &pending_msg->
1413                                                               peer_ctx->peer_id)) )
1414       GNUNET_STATISTICS_update (stats,
1415                                 "# pull requests sent (multi-hop peer)",
1416                                 1,
1417                                 GNUNET_NO);
1418   }
1419   /* Do not cancle message */
1420   remove_pending_message (pending_msg, GNUNET_NO);
1421 }
1422
1423
1424 /**
1425  * @brief Iterator function for #store_valid_peers.
1426  *
1427  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1428  * Writes single peer to disk.
1429  *
1430  * @param cls the file handle to write to.
1431  * @param peer current peer
1432  * @param value unused
1433  *
1434  * @return  #GNUNET_YES if we should continue to
1435  *          iterate,
1436  *          #GNUNET_NO if not.
1437  */
1438 static int
1439 store_peer_presistently_iterator (void *cls,
1440                                   const struct GNUNET_PeerIdentity *peer,
1441                                   void *value)
1442 {
1443   const struct GNUNET_DISK_FileHandle *fh = cls;
1444   char peer_string[128];
1445   int size;
1446   ssize_t ret;
1447
1448   (void) value;
1449
1450   if (NULL == peer)
1451   {
1452     return GNUNET_YES;
1453   }
1454   size = GNUNET_snprintf (peer_string,
1455                           sizeof(peer_string),
1456                           "%s\n",
1457                           GNUNET_i2s_full (peer));
1458   GNUNET_assert (53 == size);
1459   ret = GNUNET_DISK_file_write (fh,
1460                                 peer_string,
1461                                 size);
1462   GNUNET_assert (size == ret);
1463   return GNUNET_YES;
1464 }
1465
1466
1467 /**
1468  * @brief Store the peers currently in #valid_peers to disk.
1469  *
1470  * @param sub Sub for which to store the valid peers
1471  */
1472 static void
1473 store_valid_peers (const struct Sub *sub)
1474 {
1475   struct GNUNET_DISK_FileHandle *fh;
1476   uint32_t number_written_peers;
1477   int ret;
1478
1479   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1480   {
1481     return;
1482   }
1483
1484   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1485   if (GNUNET_SYSERR == ret)
1486   {
1487     LOG (GNUNET_ERROR_TYPE_WARNING,
1488          "Not able to create directory for file `%s'\n",
1489          sub->filename_valid_peers);
1490     GNUNET_break (0);
1491   }
1492   else if (GNUNET_NO == ret)
1493   {
1494     LOG (GNUNET_ERROR_TYPE_WARNING,
1495          "Directory for file `%s' exists but is not writable for us\n",
1496          sub->filename_valid_peers);
1497     GNUNET_break (0);
1498   }
1499   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1500                               GNUNET_DISK_OPEN_WRITE
1501                               | GNUNET_DISK_OPEN_CREATE,
1502                               GNUNET_DISK_PERM_USER_READ
1503                               | GNUNET_DISK_PERM_USER_WRITE);
1504   if (NULL == fh)
1505   {
1506     LOG (GNUNET_ERROR_TYPE_WARNING,
1507          "Not able to write valid peers to file `%s'\n",
1508          sub->filename_valid_peers);
1509     return;
1510   }
1511   LOG (GNUNET_ERROR_TYPE_DEBUG,
1512        "Writing %u valid peers to disk\n",
1513        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1514   number_written_peers =
1515     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1516                                            store_peer_presistently_iterator,
1517                                            fh);
1518   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1519   GNUNET_assert (number_written_peers ==
1520                  GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1521 }
1522
1523
1524 /**
1525  * @brief Convert string representation of peer id to peer id.
1526  *
1527  * Counterpart to #GNUNET_i2s_full.
1528  *
1529  * @param string_repr The string representation of the peer id
1530  *
1531  * @return The peer id
1532  */
1533 static const struct GNUNET_PeerIdentity *
1534 s2i_full (const char *string_repr)
1535 {
1536   struct GNUNET_PeerIdentity *peer;
1537   size_t len;
1538   int ret;
1539
1540   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1541   len = strlen (string_repr);
1542   if (52 > len)
1543   {
1544     LOG (GNUNET_ERROR_TYPE_WARNING,
1545          "Not able to convert string representation of PeerID to PeerID\n"
1546          "Sting representation: %s (len %lu) - too short\n",
1547          string_repr,
1548          len);
1549     GNUNET_break (0);
1550   }
1551   else if (52 < len)
1552   {
1553     len = 52;
1554   }
1555   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1556                                                     len,
1557                                                     &peer->public_key);
1558   if (GNUNET_OK != ret)
1559   {
1560     LOG (GNUNET_ERROR_TYPE_WARNING,
1561          "Not able to convert string representation of PeerID to PeerID\n"
1562          "Sting representation: %s\n",
1563          string_repr);
1564     GNUNET_break (0);
1565   }
1566   return peer;
1567 }
1568
1569
1570 /**
1571  * @brief Restore the peers on disk to #valid_peers.
1572  *
1573  * @param sub Sub for which to restore the valid peers
1574  */
1575 static void
1576 restore_valid_peers (const struct Sub *sub)
1577 {
1578   off_t file_size;
1579   uint32_t num_peers;
1580   struct GNUNET_DISK_FileHandle *fh;
1581   char *buf;
1582   ssize_t size_read;
1583   char *iter_buf;
1584   char *str_repr;
1585   const struct GNUNET_PeerIdentity *peer;
1586
1587   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1588   {
1589     return;
1590   }
1591
1592   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1593   {
1594     return;
1595   }
1596   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1597                               GNUNET_DISK_OPEN_READ,
1598                               GNUNET_DISK_PERM_NONE);
1599   GNUNET_assert (NULL != fh);
1600   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1601   num_peers = file_size / 53;
1602   buf = GNUNET_malloc (file_size);
1603   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1604   GNUNET_assert (size_read == file_size);
1605   LOG (GNUNET_ERROR_TYPE_DEBUG,
1606        "Restoring %" PRIu32 " peers from file `%s'\n",
1607        num_peers,
1608        sub->filename_valid_peers);
1609   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1610   {
1611     str_repr = GNUNET_strndup (iter_buf, 53);
1612     peer = s2i_full (str_repr);
1613     GNUNET_free (str_repr);
1614     add_valid_peer (peer, sub->valid_peers);
1615     LOG (GNUNET_ERROR_TYPE_DEBUG,
1616          "Restored valid peer %s from disk\n",
1617          GNUNET_i2s_full (peer));
1618   }
1619   iter_buf = NULL;
1620   GNUNET_free (buf);
1621   LOG (GNUNET_ERROR_TYPE_DEBUG,
1622        "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1623        num_peers,
1624        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1625   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1626   {
1627     LOG (GNUNET_ERROR_TYPE_WARNING,
1628          "Number of restored peers does not match file size. Have probably duplicates.\n");
1629   }
1630   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1631   LOG (GNUNET_ERROR_TYPE_DEBUG,
1632        "Restored %u valid peers from disk\n",
1633        GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1634 }
1635
1636
1637 /**
1638  * @brief Delete storage of peers that was created with #initialise_peers ()
1639  *
1640  * @param sub Sub for which the storage is deleted
1641  */
1642 static void
1643 peers_terminate (struct Sub *sub)
1644 {
1645   if (GNUNET_SYSERR ==
1646       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1647                                              &peermap_clear_iterator,
1648                                              sub))
1649   {
1650     LOG (GNUNET_ERROR_TYPE_WARNING,
1651          "Iteration destroying peers was aborted.\n");
1652   }
1653   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1654   sub->peer_map = NULL;
1655   store_valid_peers (sub);
1656   GNUNET_free (sub->filename_valid_peers);
1657   sub->filename_valid_peers = NULL;
1658   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1659   sub->valid_peers = NULL;
1660 }
1661
1662
1663 /**
1664  * Iterator over #valid_peers hash map entries.
1665  *
1666  * @param cls Closure that contains iterator function and closure
1667  * @param peer current peer id
1668  * @param value value in the hash map - unused
1669  * @return #GNUNET_YES if we should continue to
1670  *         iterate,
1671  *         #GNUNET_NO if not.
1672  */
1673 static int
1674 valid_peer_iterator (void *cls,
1675                      const struct GNUNET_PeerIdentity *peer,
1676                      void *value)
1677 {
1678   struct PeersIteratorCls *it_cls = cls;
1679
1680   (void) value;
1681
1682   return it_cls->iterator (it_cls->cls, peer);
1683 }
1684
1685
1686 /**
1687  * @brief Get all currently known, valid peer ids.
1688  *
1689  * @param valid_peers Peer map containing the valid peers in question
1690  * @param iterator function to call on each peer id
1691  * @param it_cls extra argument to @a iterator
1692  * @return the number of key value pairs processed,
1693  *         #GNUNET_SYSERR if it aborted iteration
1694  */
1695 static int
1696 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1697                  PeersIterator iterator,
1698                  void *it_cls)
1699 {
1700   struct PeersIteratorCls *cls;
1701   int ret;
1702
1703   cls = GNUNET_new (struct PeersIteratorCls);
1704   cls->iterator = iterator;
1705   cls->cls = it_cls;
1706   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1707                                                valid_peer_iterator,
1708                                                cls);
1709   GNUNET_free (cls);
1710   return ret;
1711 }
1712
1713
1714 /**
1715  * @brief Add peer to known peers.
1716  *
1717  * This function is called on new peer_ids from 'external' sources
1718  * (client seed, cadet get_peers(), ...)
1719  *
1720  * @param sub Sub with the peer map that the @a peer will be added to
1721  * @param peer the new #GNUNET_PeerIdentity
1722  *
1723  * @return #GNUNET_YES if peer was inserted
1724  *         #GNUNET_NO  otherwise
1725  */
1726 static int
1727 insert_peer (struct Sub *sub,
1728              const struct GNUNET_PeerIdentity *peer)
1729 {
1730   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1731   {
1732     return GNUNET_NO;   /* We already know this peer - nothing to do */
1733   }
1734   (void) create_peer_ctx (sub, peer);
1735   return GNUNET_YES;
1736 }
1737
1738
1739 /**
1740  * @brief Check whether flags on a peer are set.
1741  *
1742  * @param peer_map Peer map that is expected to contain the @a peer
1743  * @param peer the peer to check the flag of
1744  * @param flags the flags to check
1745  *
1746  * @return #GNUNET_SYSERR if peer is not known
1747  *         #GNUNET_YES    if all given flags are set
1748  *         #GNUNET_NO     otherwise
1749  */
1750 static int
1751 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1752                  const struct GNUNET_PeerIdentity *peer,
1753                  enum Peers_PeerFlags flags)
1754 {
1755   struct PeerContext *peer_ctx;
1756
1757   if (GNUNET_NO == check_peer_known (peer_map, peer))
1758   {
1759     return GNUNET_SYSERR;
1760   }
1761   peer_ctx = get_peer_ctx (peer_map, peer);
1762   return check_peer_flag_set (peer_ctx, flags);
1763 }
1764
1765
1766 /**
1767  * @brief Try connecting to a peer to see whether it is online
1768  *
1769  * If not known yet, insert into known peers
1770  *
1771  * @param sub Sub which would contain the @a peer
1772  * @param peer the peer whose online is to be checked
1773  * @return #GNUNET_YES if the check was issued
1774  *         #GNUNET_NO  otherwise
1775  */
1776 static int
1777 issue_peer_online_check (struct Sub *sub,
1778                          const struct GNUNET_PeerIdentity *peer)
1779 {
1780   struct PeerContext *peer_ctx;
1781
1782   (void) insert_peer (sub, peer);   // TODO even needed?
1783   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1784   if ((GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1785       (NULL == peer_ctx->online_check_pending))
1786   {
1787     check_peer_online (peer_ctx);
1788     return GNUNET_YES;
1789   }
1790   return GNUNET_NO;
1791 }
1792
1793
1794 /**
1795  * @brief Check if peer is removable.
1796  *
1797  * Check if
1798  *  - a recv channel exists
1799  *  - there are pending messages
1800  *  - there is no pending pull reply
1801  *
1802  * @param peer_ctx Context of the peer in question
1803  * @return #GNUNET_YES    if peer is removable
1804  *         #GNUNET_NO     if peer is NOT removable
1805  *         #GNUNET_SYSERR if peer is not known
1806  */
1807 static int
1808 check_removable (const struct PeerContext *peer_ctx)
1809 {
1810   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (
1811         peer_ctx->sub->peer_map,
1812         &peer_ctx->peer_id))
1813   {
1814     return GNUNET_SYSERR;
1815   }
1816
1817   if ((NULL != peer_ctx->recv_channel_ctx) ||
1818       (NULL != peer_ctx->pending_messages_head) ||
1819       (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)))
1820   {
1821     return GNUNET_NO;
1822   }
1823   return GNUNET_YES;
1824 }
1825
1826
1827 /**
1828  * @brief Check whether @a peer is actually a peer.
1829  *
1830  * A valid peer is a peer that we know exists eg. we were connected to once.
1831  *
1832  * @param valid_peers Peer map that would contain the @a peer
1833  * @param peer peer in question
1834  *
1835  * @return #GNUNET_YES if peer is valid
1836  *         #GNUNET_NO  if peer is not valid
1837  */
1838 static int
1839 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1840                   const struct GNUNET_PeerIdentity *peer)
1841 {
1842   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1843 }
1844
1845
1846 /**
1847  * @brief Indicate that we want to send to the other peer
1848  *
1849  * This establishes a sending channel
1850  *
1851  * @param peer_ctx Context of the target peer
1852  */
1853 static void
1854 indicate_sending_intention (struct PeerContext *peer_ctx)
1855 {
1856   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1857                                                  &peer_ctx->peer_id));
1858   (void) get_channel (peer_ctx);
1859 }
1860
1861
1862 /**
1863  * @brief Check whether other peer has the intention to send/opened channel
1864  *        towars us
1865  *
1866  * @param peer_ctx Context of the peer in question
1867  *
1868  * @return #GNUNET_YES if peer has the intention to send
1869  *         #GNUNET_NO  otherwise
1870  */
1871 static int
1872 check_peer_send_intention (const struct PeerContext *peer_ctx)
1873 {
1874   if (NULL != peer_ctx->recv_channel_ctx)
1875   {
1876     return GNUNET_YES;
1877   }
1878   return GNUNET_NO;
1879 }
1880
1881
1882 /**
1883  * Handle the channel a peer opens to us.
1884  *
1885  * @param cls The closure - Sub
1886  * @param channel The channel the peer wants to establish
1887  * @param initiator The peer's peer ID
1888  *
1889  * @return initial channel context for the channel
1890  *         (can be NULL -- that's not an error)
1891  */
1892 static void *
1893 handle_inbound_channel (void *cls,
1894                         struct GNUNET_CADET_Channel *channel,
1895                         const struct GNUNET_PeerIdentity *initiator)
1896 {
1897   struct PeerContext *peer_ctx;
1898   struct ChannelCtx *channel_ctx;
1899   struct Sub *sub = cls;
1900
1901   LOG (GNUNET_ERROR_TYPE_DEBUG,
1902        "New channel was established to us (Peer %s).\n",
1903        GNUNET_i2s (initiator));
1904   GNUNET_assert (NULL != channel);  /* according to cadet API */
1905   /* Make sure we 'know' about this peer */
1906   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1907   set_peer_online (peer_ctx);
1908   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1909   channel_ctx = add_channel_ctx (peer_ctx);
1910   channel_ctx->channel = channel;
1911   /* We only accept one incoming channel per peer */
1912   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1913                                                              initiator)))
1914   {
1915     LOG (GNUNET_ERROR_TYPE_WARNING,
1916          "Already got one receive channel. Destroying old one.\n");
1917     GNUNET_break_op (0);
1918     destroy_channel (peer_ctx->recv_channel_ctx);
1919     peer_ctx->recv_channel_ctx = channel_ctx;
1920     /* return the channel context */
1921     return channel_ctx;
1922   }
1923   peer_ctx->recv_channel_ctx = channel_ctx;
1924   return channel_ctx;
1925 }
1926
1927
1928 /**
1929  * @brief Check whether a sending channel towards the given peer exists
1930  *
1931  * @param peer_ctx Context of the peer in question
1932  *
1933  * @return #GNUNET_YES if a sending channel towards that peer exists
1934  *         #GNUNET_NO  otherwise
1935  */
1936 static int
1937 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1938 {
1939   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1940                                      &peer_ctx->peer_id))
1941   {   /* If no such peer exists, there is no channel */
1942     return GNUNET_NO;
1943   }
1944   if (NULL == peer_ctx->send_channel_ctx)
1945   {
1946     return GNUNET_NO;
1947   }
1948   return GNUNET_YES;
1949 }
1950
1951
1952 /**
1953  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1954  *        intention to another peer
1955  *
1956  * @param peer_ctx Context to the peer
1957  * @return #GNUNET_YES if channel was destroyed
1958  *         #GNUNET_NO  otherwise
1959  */
1960 static int
1961 destroy_sending_channel (struct PeerContext *peer_ctx)
1962 {
1963   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1964                                      &peer_ctx->peer_id))
1965   {
1966     return GNUNET_NO;
1967   }
1968   if (NULL != peer_ctx->send_channel_ctx)
1969   {
1970     destroy_channel (peer_ctx->send_channel_ctx);
1971     (void) check_connected (peer_ctx);
1972     return GNUNET_YES;
1973   }
1974   return GNUNET_NO;
1975 }
1976
1977
1978 /**
1979  * @brief Send a message to another peer.
1980  *
1981  * Keeps track about pending messages so they can be properly removed when the
1982  * peer is destroyed.
1983  *
1984  * @param peer_ctx Context of the peer to which the message is to be sent
1985  * @param ev envelope of the message
1986  * @param type type of the message
1987  */
1988 static void
1989 send_message (struct PeerContext *peer_ctx,
1990               struct GNUNET_MQ_Envelope *ev,
1991               const char *type)
1992 {
1993   struct PendingMessage *pending_msg;
1994   struct GNUNET_MQ_Handle *mq;
1995
1996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997               "Sending message to %s of type %s\n",
1998               GNUNET_i2s (&peer_ctx->peer_id),
1999               type);
2000   pending_msg = insert_pending_message (peer_ctx, ev, type);
2001   mq = get_mq (peer_ctx);
2002   GNUNET_MQ_notify_sent (ev,
2003                          mq_notify_sent_cb,
2004                          pending_msg);
2005   GNUNET_MQ_send (mq, ev);
2006 }
2007
2008
2009 /**
2010  * @brief Schedule a operation on given peer
2011  *
2012  * Avoids scheduling an operation twice.
2013  *
2014  * @param peer_ctx Context of the peer for which to schedule the operation
2015  * @param peer_op the operation to schedule
2016  * @param cls Closure to @a peer_op
2017  *
2018  * @return #GNUNET_YES if the operation was scheduled
2019  *         #GNUNET_NO  otherwise
2020  */
2021 static int
2022 schedule_operation (struct PeerContext *peer_ctx,
2023                     const PeerOp peer_op,
2024                     void *cls)
2025 {
2026   struct PeerPendingOp pending_op;
2027
2028   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2029                                                  &peer_ctx->peer_id));
2030
2031   // TODO if ONLINE execute immediately
2032
2033   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2034   {
2035     pending_op.op = peer_op;
2036     pending_op.op_cls = cls;
2037     GNUNET_array_append (peer_ctx->pending_ops,
2038                          peer_ctx->num_pending_ops,
2039                          pending_op);
2040     return GNUNET_YES;
2041   }
2042   return GNUNET_NO;
2043 }
2044
2045
2046 /***********************************************************************
2047 * /Old gnunet-service-rps_peers.c
2048 ***********************************************************************/
2049
2050
2051 /***********************************************************************
2052 * Housekeeping with clients
2053 ***********************************************************************/
2054
2055 /**
2056  * Closure used to pass the client and the id to the callback
2057  * that replies to a client's request
2058  */
2059 struct ReplyCls
2060 {
2061   /**
2062    * DLL
2063    */
2064   struct ReplyCls *next;
2065   struct ReplyCls *prev;
2066
2067   /**
2068    * The identifier of the request
2069    */
2070   uint32_t id;
2071
2072   /**
2073    * The handle to the request
2074    */
2075   struct RPS_SamplerRequestHandle *req_handle;
2076
2077   /**
2078    * The client handle to send the reply to
2079    */
2080   struct ClientContext *cli_ctx;
2081 };
2082
2083
2084 /**
2085  * Struct used to store the context of a connected client.
2086  */
2087 struct ClientContext
2088 {
2089   /**
2090    * DLL
2091    */
2092   struct ClientContext *next;
2093   struct ClientContext *prev;
2094
2095   /**
2096    * The message queue to communicate with the client.
2097    */
2098   struct GNUNET_MQ_Handle *mq;
2099
2100   /**
2101    * @brief How many updates this client expects to receive.
2102    */
2103   int64_t view_updates_left;
2104
2105   /**
2106    * @brief Whether this client wants to receive stream updates.
2107    * Either #GNUNET_YES or #GNUNET_NO
2108    */
2109   int8_t stream_update;
2110
2111   /**
2112    * The client handle to send the reply to
2113    */
2114   struct GNUNET_SERVICE_Client *client;
2115
2116   /**
2117    * The #Sub this context belongs to
2118    */
2119   struct Sub *sub;
2120 };
2121
2122 /**
2123  * DLL with all clients currently connected to us
2124  */
2125 struct ClientContext *cli_ctx_head;
2126 struct ClientContext *cli_ctx_tail;
2127
2128 /***********************************************************************
2129 * /Housekeeping with clients
2130 ***********************************************************************/
2131
2132
2133 /***********************************************************************
2134 * Util functions
2135 ***********************************************************************/
2136
2137
2138 /**
2139  * Print peerlist to log.
2140  */
2141 static void
2142 print_peer_list (struct GNUNET_PeerIdentity *list,
2143                  unsigned int len)
2144 {
2145   unsigned int i;
2146
2147   LOG (GNUNET_ERROR_TYPE_DEBUG,
2148        "Printing peer list of length %u at %p:\n",
2149        len,
2150        list);
2151   for (i = 0; i < len; i++)
2152   {
2153     LOG (GNUNET_ERROR_TYPE_DEBUG,
2154          "%u. peer: %s\n",
2155          i, GNUNET_i2s (&list[i]));
2156   }
2157 }
2158
2159
2160 /**
2161  * Remove peer from list.
2162  */
2163 static void
2164 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2165                unsigned int *list_size,
2166                const struct GNUNET_PeerIdentity *peer)
2167 {
2168   unsigned int i;
2169   struct GNUNET_PeerIdentity *tmp;
2170
2171   tmp = *peer_list;
2172
2173   LOG (GNUNET_ERROR_TYPE_DEBUG,
2174        "Removing peer %s from list at %p\n",
2175        GNUNET_i2s (peer),
2176        tmp);
2177
2178   for (i = 0; i < *list_size; i++)
2179   {
2180     if (0 == GNUNET_memcmp (&tmp[i], peer))
2181     {
2182       if (i < *list_size - 1)
2183       {       /* Not at the last entry -- shift peers left */
2184         memmove (&tmp[i], &tmp[i + 1],
2185                  ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
2186       }
2187       /* Remove last entry (should be now useless PeerID) */
2188       GNUNET_array_grow (tmp, *list_size, (*list_size) - 1);
2189     }
2190   }
2191   *peer_list = tmp;
2192 }
2193
2194
2195 /**
2196  * Insert PeerID in #view
2197  *
2198  * Called once we know a peer is online.
2199  * Implements #PeerOp
2200  *
2201  * @return GNUNET_OK if peer was actually inserted
2202  *         GNUNET_NO if peer was not inserted
2203  */
2204 static void
2205 insert_in_view_op (void *cls,
2206                    const struct GNUNET_PeerIdentity *peer);
2207
2208 /**
2209  * Insert PeerID in #view
2210  *
2211  * Called once we know a peer is online.
2212  *
2213  * @param sub Sub in with the view to insert in
2214  * @param peer the peer to insert
2215  *
2216  * @return GNUNET_OK if peer was actually inserted
2217  *         GNUNET_NO if peer was not inserted
2218  */
2219 static int
2220 insert_in_view (struct Sub *sub,
2221                 const struct GNUNET_PeerIdentity *peer)
2222 {
2223   struct PeerContext *peer_ctx;
2224   int online;
2225   int ret;
2226
2227   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2228   peer_ctx = get_peer_ctx (sub->peer_map, peer);  // TODO indirection needed?
2229   if ((GNUNET_NO == online) ||
2230       (GNUNET_SYSERR == online))   /* peer is not even known */
2231   {
2232     (void) issue_peer_online_check (sub, peer);
2233     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2234     return GNUNET_NO;
2235   }
2236   /* Open channel towards peer to keep connection open */
2237   indicate_sending_intention (peer_ctx);
2238   ret = View_put (sub->view, peer);
2239   if (peer_ctx->sub == msub)
2240   {
2241     GNUNET_STATISTICS_set (stats,
2242                            "view size",
2243                            View_size (peer_ctx->sub->view),
2244                            GNUNET_NO);
2245   }
2246   return ret;
2247 }
2248
2249
2250 /**
2251  * @brief Send view to client
2252  *
2253  * @param cli_ctx the context of the client
2254  * @param view_array the peerids of the view as array (can be empty)
2255  * @param view_size the size of the view array (can be 0)
2256  */
2257 static void
2258 send_view (const struct ClientContext *cli_ctx,
2259            const struct GNUNET_PeerIdentity *view_array,
2260            uint64_t view_size)
2261 {
2262   struct GNUNET_MQ_Envelope *ev;
2263   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2264   struct Sub *sub;
2265
2266   if (NULL == view_array)
2267   {
2268     if (NULL == cli_ctx->sub)
2269       sub = msub;
2270     else
2271       sub = cli_ctx->sub;
2272     view_size = View_size (sub->view);
2273     view_array = View_get_as_array (sub->view);
2274   }
2275
2276   ev = GNUNET_MQ_msg_extra (out_msg,
2277                             view_size * sizeof(struct GNUNET_PeerIdentity),
2278                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2279   out_msg->num_peers = htonl (view_size);
2280
2281   GNUNET_memcpy (&out_msg[1],
2282                  view_array,
2283                  view_size * sizeof(struct GNUNET_PeerIdentity));
2284   GNUNET_MQ_send (cli_ctx->mq, ev);
2285 }
2286
2287
2288 /**
2289  * @brief Send peer from biased stream to client.
2290  *
2291  * TODO merge with send_view, parameterise
2292  *
2293  * @param cli_ctx the context of the client
2294  * @param view_array the peerids of the view as array (can be empty)
2295  * @param view_size the size of the view array (can be 0)
2296  */
2297 static void
2298 send_stream_peers (const struct ClientContext *cli_ctx,
2299                    uint64_t num_peers,
2300                    const struct GNUNET_PeerIdentity *peers)
2301 {
2302   struct GNUNET_MQ_Envelope *ev;
2303   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2304
2305   GNUNET_assert (NULL != peers);
2306
2307   ev = GNUNET_MQ_msg_extra (out_msg,
2308                             num_peers * sizeof(struct GNUNET_PeerIdentity),
2309                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2310   out_msg->num_peers = htonl (num_peers);
2311
2312   GNUNET_memcpy (&out_msg[1],
2313                  peers,
2314                  num_peers * sizeof(struct GNUNET_PeerIdentity));
2315   GNUNET_MQ_send (cli_ctx->mq, ev);
2316 }
2317
2318
2319 /**
2320  * @brief sends updates to clients that are interested
2321  *
2322  * @param sub Sub for which to notify clients
2323  */
2324 static void
2325 clients_notify_view_update (const struct Sub *sub)
2326 {
2327   struct ClientContext *cli_ctx_iter;
2328   uint64_t num_peers;
2329   const struct GNUNET_PeerIdentity *view_array;
2330
2331   num_peers = View_size (sub->view);
2332   view_array = View_get_as_array (sub->view);
2333   /* check size of view is small enough */
2334   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2335   {
2336     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2337                 "View is too big to send\n");
2338     return;
2339   }
2340
2341   for (cli_ctx_iter = cli_ctx_head;
2342        NULL != cli_ctx_iter;
2343        cli_ctx_iter = cli_ctx_iter->next)
2344   {
2345     if (1 < cli_ctx_iter->view_updates_left)
2346     {
2347       /* Client wants to receive limited amount of updates */
2348       cli_ctx_iter->view_updates_left -= 1;
2349     }
2350     else if (1 == cli_ctx_iter->view_updates_left)
2351     {
2352       /* Last update of view for client */
2353       cli_ctx_iter->view_updates_left = -1;
2354     }
2355     else if (0 > cli_ctx_iter->view_updates_left)
2356     {
2357       /* Client is not interested in updates */
2358       continue;
2359     }
2360     /* else _updates_left == 0 - infinite amount of updates */
2361
2362     /* send view */
2363     send_view (cli_ctx_iter, view_array, num_peers);
2364   }
2365 }
2366
2367
2368 /**
2369  * @brief sends updates to clients that are interested
2370  *
2371  * @param num_peers Number of peers to send
2372  * @param peers the array of peers to send
2373  */
2374 static void
2375 clients_notify_stream_peer (const struct Sub *sub,
2376                             uint64_t num_peers,
2377                             const struct GNUNET_PeerIdentity *peers)
2378 // TODO enum StreamPeerSource)
2379 {
2380   struct ClientContext *cli_ctx_iter;
2381
2382   LOG (GNUNET_ERROR_TYPE_DEBUG,
2383        "Got peer (%s) from biased stream - update all clients\n",
2384        GNUNET_i2s (peers));
2385
2386   for (cli_ctx_iter = cli_ctx_head;
2387        NULL != cli_ctx_iter;
2388        cli_ctx_iter = cli_ctx_iter->next)
2389   {
2390     if ((GNUNET_YES == cli_ctx_iter->stream_update) &&
2391         ((sub == cli_ctx_iter->sub) || (sub == msub) ))
2392     {
2393       send_stream_peers (cli_ctx_iter, num_peers, peers);
2394     }
2395   }
2396 }
2397
2398
2399 /**
2400  * Put random peer from sampler into the view as history update.
2401  *
2402  * @param ids Array of Peers to insert into view
2403  * @param num_peers Number of peers to insert
2404  * @param cls Closure - The Sub for which this is to be done
2405  */
2406 static void
2407 hist_update (const struct GNUNET_PeerIdentity *ids,
2408              uint32_t num_peers,
2409              void *cls)
2410 {
2411   unsigned int i;
2412   struct Sub *sub = cls;
2413
2414   for (i = 0; i < num_peers; i++)
2415   {
2416     int inserted;
2417     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2418     {
2419       LOG (GNUNET_ERROR_TYPE_WARNING,
2420            "Peer in history update not known!\n");
2421       continue;
2422     }
2423     inserted = insert_in_view (sub, &ids[i]);
2424     if (GNUNET_OK == inserted)
2425     {
2426       clients_notify_stream_peer (sub, 1, &ids[i]);
2427     }
2428 #ifdef TO_FILE_FULL
2429     to_file (sub->file_name_view_log,
2430              "+%s\t(hist)",
2431              GNUNET_i2s_full (ids));
2432 #endif /* TO_FILE_FULL */
2433   }
2434   clients_notify_view_update (sub);
2435 }
2436
2437
2438 /**
2439  * Wrapper around #RPS_sampler_resize()
2440  *
2441  * If we do not have enough sampler elements, double current sampler size
2442  * If we have more than enough sampler elements, halv current sampler size
2443  *
2444  * @param sampler The sampler to resize
2445  * @param new_size New size to which to resize
2446  */
2447 static void
2448 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2449 {
2450   unsigned int sampler_size;
2451
2452   // TODO statistics
2453   // TODO respect the min, max
2454   sampler_size = RPS_sampler_get_size (sampler);
2455   if (sampler_size > new_size * 4)
2456   {   /* Shrinking */
2457     RPS_sampler_resize (sampler, sampler_size / 2);
2458   }
2459   else if (sampler_size < new_size)
2460   {   /* Growing */
2461     RPS_sampler_resize (sampler, sampler_size * 2);
2462   }
2463   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2464 }
2465
2466
2467 /**
2468  * Add all peers in @a peer_array to @a peer_map used as set.
2469  *
2470  * @param peer_array array containing the peers
2471  * @param num_peers number of peers in @peer_array
2472  * @param peer_map the peermap to use as set
2473  */
2474 static void
2475 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2476                        unsigned int num_peers,
2477                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2478 {
2479   unsigned int i;
2480
2481   if (NULL == peer_map)
2482   {
2483     LOG (GNUNET_ERROR_TYPE_WARNING,
2484          "Trying to add peers to non-existing peermap.\n");
2485     return;
2486   }
2487
2488   for (i = 0; i < num_peers; i++)
2489   {
2490     GNUNET_CONTAINER_multipeermap_put (peer_map,
2491                                        &peer_array[i],
2492                                        NULL,
2493                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2494     if (msub->peer_map == peer_map)
2495     {
2496       GNUNET_STATISTICS_set (stats,
2497                              "# known peers",
2498                              GNUNET_CONTAINER_multipeermap_size (peer_map),
2499                              GNUNET_NO);
2500     }
2501   }
2502 }
2503
2504
2505 /**
2506  * Send a PULL REPLY to @a peer_id
2507  *
2508  * @param peer_ctx Context of the peer to send the reply to
2509  * @param peer_ids the peers to send to @a peer_id
2510  * @param num_peer_ids the number of peers to send to @a peer_id
2511  */
2512 static void
2513 send_pull_reply (struct PeerContext *peer_ctx,
2514                  const struct GNUNET_PeerIdentity *peer_ids,
2515                  unsigned int num_peer_ids)
2516 {
2517   uint32_t send_size;
2518   struct GNUNET_MQ_Envelope *ev;
2519   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2520
2521   /* Compute actual size */
2522   send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)
2523               + num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
2524
2525   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2526     /* Compute number of peers to send
2527      * If too long, simply truncate */
2528     // TODO select random ones via permutation
2529     //      or even better: do good protocol design
2530     send_size =
2531       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE
2532        - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage))
2533       / sizeof(struct GNUNET_PeerIdentity);
2534   else
2535     send_size = num_peer_ids;
2536
2537   LOG (GNUNET_ERROR_TYPE_DEBUG,
2538        "Going to send PULL REPLY with %u peers to %s\n",
2539        send_size, GNUNET_i2s (&peer_ctx->peer_id));
2540
2541   ev = GNUNET_MQ_msg_extra (out_msg,
2542                             send_size * sizeof(struct GNUNET_PeerIdentity),
2543                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2544   out_msg->num_peers = htonl (send_size);
2545   GNUNET_memcpy (&out_msg[1], peer_ids,
2546                  send_size * sizeof(struct GNUNET_PeerIdentity));
2547
2548   send_message (peer_ctx, ev, "PULL REPLY");
2549   if (peer_ctx->sub == msub)
2550   {
2551     GNUNET_STATISTICS_update (stats, "# pull reply send issued", 1, GNUNET_NO);
2552   }
2553   // TODO check with send intention: as send_channel is used/opened we indicate
2554   // a sending intention without intending it.
2555   // -> clean peer afterwards?
2556   // -> use recv_channel?
2557 }
2558
2559
2560 /**
2561  * Insert PeerID in #pull_map
2562  *
2563  * Called once we know a peer is online.
2564  *
2565  * @param cls Closure - Sub with the pull map to insert into
2566  * @param peer Peer to insert
2567  */
2568 static void
2569 insert_in_pull_map (void *cls,
2570                     const struct GNUNET_PeerIdentity *peer)
2571 {
2572   struct Sub *sub = cls;
2573
2574   CustomPeerMap_put (sub->pull_map, peer);
2575 }
2576
2577
2578 /**
2579  * Insert PeerID in #view
2580  *
2581  * Called once we know a peer is online.
2582  * Implements #PeerOp
2583  *
2584  * @param cls Closure - Sub with view to insert peer into
2585  * @param peer the peer to insert
2586  */
2587 static void
2588 insert_in_view_op (void *cls,
2589                    const struct GNUNET_PeerIdentity *peer)
2590 {
2591   struct Sub *sub = cls;
2592   int inserted;
2593
2594   inserted = insert_in_view (sub, peer);
2595   if (GNUNET_OK == inserted)
2596   {
2597     clients_notify_stream_peer (sub, 1, peer);
2598   }
2599 }
2600
2601
2602 /**
2603  * Update sampler with given PeerID.
2604  * Implements #PeerOp
2605  *
2606  * @param cls Closure - Sub containing the sampler to insert into
2607  * @param peer Peer to insert
2608  */
2609 static void
2610 insert_in_sampler (void *cls,
2611                    const struct GNUNET_PeerIdentity *peer)
2612 {
2613   struct Sub *sub = cls;
2614
2615   LOG (GNUNET_ERROR_TYPE_DEBUG,
2616        "Updating samplers with peer %s from insert_in_sampler()\n",
2617        GNUNET_i2s (peer));
2618   RPS_sampler_update (sub->sampler, peer);
2619   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2620   {
2621     /* Make sure we 'know' about this peer */
2622     (void) issue_peer_online_check (sub, peer);
2623     /* Establish a channel towards that peer to indicate we are going to send
2624      * messages to it */
2625     // indicate_sending_intention (peer);
2626   }
2627   if (sub == msub)
2628   {
2629     GNUNET_STATISTICS_update (stats,
2630                               "# observed peers in gossip",
2631                               1,
2632                               GNUNET_NO);
2633   }
2634 #ifdef TO_FILE
2635   sub->num_observed_peers++;
2636   GNUNET_CONTAINER_multipeermap_put
2637     (sub->observed_unique_peers,
2638     peer,
2639     NULL,
2640     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2641   uint32_t num_observed_unique_peers =
2642     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2643   GNUNET_STATISTICS_set (stats,
2644                          "# unique peers in gossip",
2645                          num_observed_unique_peers,
2646                          GNUNET_NO);
2647 #ifdef TO_FILE_FULL
2648   to_file (sub->file_name_observed_log,
2649            "%" PRIu32 " %" PRIu32 " %f\n",
2650            sub->num_observed_peers,
2651            num_observed_unique_peers,
2652            1.0 * num_observed_unique_peers / sub->num_observed_peers)
2653 #endif /* TO_FILE_FULL */
2654 #endif /* TO_FILE */
2655 }
2656
2657
2658 /**
2659  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2660  *        If the peer is not known, online check is issued and it is
2661  *        scheduled to be inserted in sampler and view.
2662  *
2663  * "External sources" refer to every source except the gossip.
2664  *
2665  * @param sub Sub for which @a peer was received
2666  * @param peer peer to insert/peer received
2667  */
2668 static void
2669 got_peer (struct Sub *sub,
2670           const struct GNUNET_PeerIdentity *peer)
2671 {
2672   /* If we did not know this peer already, insert it into sampler and view */
2673   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2674   {
2675     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2676                         &insert_in_sampler, sub);
2677     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2678                         &insert_in_view_op, sub);
2679   }
2680   if (sub == msub)
2681   {
2682     GNUNET_STATISTICS_update (stats,
2683                               "# learnd peers",
2684                               1,
2685                               GNUNET_NO);
2686   }
2687 }
2688
2689
2690 /**
2691  * @brief Checks if there is a sending channel and if it is needed
2692  *
2693  * @param peer_ctx Context of the peer to check
2694  * @return GNUNET_YES if sending channel exists and is still needed
2695  *         GNUNET_NO  otherwise
2696  */
2697 static int
2698 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2699 {
2700   /* struct GNUNET_CADET_Channel *channel; */
2701   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2702                                      &peer_ctx->peer_id))
2703   {
2704     return GNUNET_NO;
2705   }
2706   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2707   {
2708     if ((0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2709                                    &peer_ctx->peer_id)) ||
2710         (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2711                                            &peer_ctx->peer_id)) ||
2712         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2713                                                     &peer_ctx->peer_id)) ||
2714         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2715                                                     &peer_ctx->peer_id)) ||
2716         (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2717                                         &peer_ctx->peer_id,
2718                                         Peers_PULL_REPLY_PENDING)))
2719     {     /* If we want to keep the connection to peer open */
2720       return GNUNET_YES;
2721     }
2722     return GNUNET_NO;
2723   }
2724   return GNUNET_NO;
2725 }
2726
2727
2728 /**
2729  * @brief remove peer from our knowledge, the view, push and pull maps and
2730  * samplers.
2731  *
2732  * @param sub Sub with the data structures the peer is to be removed from
2733  * @param peer the peer to remove
2734  */
2735 static void
2736 remove_peer (struct Sub *sub,
2737              const struct GNUNET_PeerIdentity *peer)
2738 {
2739   (void) View_remove_peer (sub->view,
2740                            peer);
2741   CustomPeerMap_remove_peer (sub->pull_map,
2742                              peer);
2743   CustomPeerMap_remove_peer (sub->push_map,
2744                              peer);
2745   RPS_sampler_reinitialise_by_value (sub->sampler,
2746                                      peer);
2747   /* We want to destroy the peer now.
2748    * Sometimes, it just seems that it's already been removed from the peer_map,
2749    * so check the peer_map first. */
2750   if (GNUNET_YES == check_peer_known (sub->peer_map,
2751                                       peer))
2752   {
2753     destroy_peer (get_peer_ctx (sub->peer_map,
2754                                 peer));
2755   }
2756 }
2757
2758
2759 /**
2760  * @brief Remove data that is not needed anymore.
2761  *
2762  * If the sending channel is no longer needed it is destroyed.
2763  *
2764  * @param sub Sub in which the current peer is to be cleaned
2765  * @param peer the peer whose data is about to be cleaned
2766  */
2767 static void
2768 clean_peer (struct Sub *sub,
2769             const struct GNUNET_PeerIdentity *peer)
2770 {
2771   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2772                                                                peer)))
2773   {
2774     LOG (GNUNET_ERROR_TYPE_DEBUG,
2775          "Going to remove send channel to peer %s\n",
2776          GNUNET_i2s (peer));
2777     #if ENABLE_MALICIOUS
2778     if (0 != GNUNET_memcmp (&attacked_peer,
2779                             peer))
2780       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2781                                                     peer));
2782     #else /* ENABLE_MALICIOUS */
2783     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2784                                                   peer));
2785     #endif /* ENABLE_MALICIOUS */
2786   }
2787
2788   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2789                                                            peer))
2790   {
2791     /* Peer was already removed by callback on destroyed channel */
2792     LOG (GNUNET_ERROR_TYPE_WARNING,
2793          "Peer was removed from our knowledge during cleanup\n");
2794     return;
2795   }
2796
2797   if ((GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2798                                                              peer))) &&
2799       (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2800       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2801       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2802       (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2803       (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))))
2804   {   /* We can safely remove this peer */
2805     LOG (GNUNET_ERROR_TYPE_DEBUG,
2806          "Going to remove peer %s\n",
2807          GNUNET_i2s (peer));
2808     remove_peer (sub, peer);
2809     return;
2810   }
2811 }
2812
2813
2814 /**
2815  * @brief This is called when a channel is destroyed.
2816  *
2817  * Removes peer completely from our knowledge if the send_channel was destroyed
2818  * Otherwise simply delete the recv_channel
2819  * Also check if the knowledge about this peer is still needed.
2820  * If not, remove this peer from our knowledge.
2821  *
2822  * @param cls The closure - Context to the channel
2823  * @param channel The channel being closed
2824  */
2825 static void
2826 cleanup_destroyed_channel (void *cls,
2827                            const struct GNUNET_CADET_Channel *channel)
2828 {
2829   struct ChannelCtx *channel_ctx = cls;
2830   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2831
2832   (void) channel;
2833
2834   channel_ctx->channel = NULL;
2835   remove_channel_ctx (channel_ctx);
2836   if ((NULL != peer_ctx) &&
2837       (peer_ctx->send_channel_ctx == channel_ctx) &&
2838       (GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx)) )
2839   {
2840     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2841   }
2842 }
2843
2844
2845 /***********************************************************************
2846 * /Util functions
2847 ***********************************************************************/
2848
2849
2850 /***********************************************************************
2851 * Sub
2852 ***********************************************************************/
2853
2854 /**
2855  * @brief Create a new Sub
2856  *
2857  * @param hash Hash of value shared among rps instances on other hosts that
2858  *        defines a subgroup to sample from.
2859  * @param sampler_size Size of the sampler
2860  * @param round_interval Interval (in average) between two rounds
2861  *
2862  * @return Sub
2863  */
2864 struct Sub *
2865 new_sub (const struct GNUNET_HashCode *hash,
2866          uint32_t sampler_size,
2867          struct GNUNET_TIME_Relative round_interval)
2868 {
2869   struct Sub *sub;
2870
2871   sub = GNUNET_new (struct Sub);
2872
2873   /* With the hash generated from the secret value this service only connects
2874    * to rps instances that share the value */
2875   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2876     GNUNET_MQ_hd_fixed_size (peer_check,
2877                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2878                              struct GNUNET_MessageHeader,
2879                              NULL),
2880     GNUNET_MQ_hd_fixed_size (peer_push,
2881                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2882                              struct GNUNET_MessageHeader,
2883                              NULL),
2884     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2885                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2886                              struct GNUNET_MessageHeader,
2887                              NULL),
2888     GNUNET_MQ_hd_var_size (peer_pull_reply,
2889                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2890                            struct GNUNET_RPS_P2P_PullReplyMessage,
2891                            NULL),
2892     GNUNET_MQ_handler_end ()
2893   };
2894   sub->hash = *hash;
2895   sub->cadet_port =
2896     GNUNET_CADET_open_port (cadet_handle,
2897                             &sub->hash,
2898                             &handle_inbound_channel, /* Connect handler */
2899                             sub, /* cls */
2900                             NULL, /* WindowSize handler */
2901                             &cleanup_destroyed_channel, /* Disconnect handler */
2902                             cadet_handlers);
2903   if (NULL == sub->cadet_port)
2904   {
2905     LOG (GNUNET_ERROR_TYPE_ERROR,
2906          "Cadet port `%s' is already in use.\n",
2907          GNUNET_APPLICATION_PORT_RPS);
2908     GNUNET_assert (0);
2909   }
2910
2911   /* Set up general data structure to keep track about peers */
2912   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2913   if (GNUNET_OK !=
2914       GNUNET_CONFIGURATION_get_value_filename (cfg,
2915                                                "rps",
2916                                                "FILENAME_VALID_PEERS",
2917                                                &sub->filename_valid_peers))
2918   {
2919     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2920                                "rps",
2921                                "FILENAME_VALID_PEERS");
2922   }
2923   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2924   {
2925     char *tmp_filename_valid_peers;
2926     char str_hash[105];
2927
2928     GNUNET_snprintf (str_hash,
2929                      sizeof(str_hash),
2930                      GNUNET_h2s_full (hash));
2931     tmp_filename_valid_peers = sub->filename_valid_peers;
2932     GNUNET_asprintf (&sub->filename_valid_peers,
2933                      "%s%s",
2934                      tmp_filename_valid_peers,
2935                      str_hash);
2936     GNUNET_free (tmp_filename_valid_peers);
2937   }
2938   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2939
2940   /* Set up the sampler */
2941   sub->sampler_size_est_min = sampler_size;
2942   sub->sampler_size_est_need = sampler_size;;
2943   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2944   GNUNET_assert (0 != round_interval.rel_value_us);
2945   sub->round_interval = round_interval;
2946   sub->sampler = RPS_sampler_init (sampler_size,
2947                                    round_interval);
2948
2949   /* Logging of internals */
2950 #ifdef TO_FILE_FULL
2951   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2952 #endif /* TO_FILE_FULL */
2953 #ifdef TO_FILE
2954 #ifdef TO_FILE_FULL
2955   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2956                                                         "observed");
2957 #endif /* TO_FILE_FULL */
2958   sub->num_observed_peers = 0;
2959   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2960                                                                      GNUNET_NO);
2961 #endif /* TO_FILE */
2962
2963   /* Set up data structures for gossip */
2964   sub->push_map = CustomPeerMap_create (4);
2965   sub->pull_map = CustomPeerMap_create (4);
2966   sub->view_size_est_min = sampler_size;;
2967   sub->view = View_create (sub->view_size_est_min);
2968   if (sub == msub)
2969   {
2970     GNUNET_STATISTICS_set (stats,
2971                            "view size aim",
2972                            sub->view_size_est_min,
2973                            GNUNET_NO);
2974   }
2975
2976   /* Start executing rounds */
2977   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2978
2979   return sub;
2980 }
2981
2982
2983 #ifdef TO_FILE
2984 /**
2985  * @brief Write all numbers in the given array into the given file
2986  *
2987  * Single numbers devided by a newline
2988  *
2989  * @param hist_array[] the array to dump
2990  * @param file_name file to dump into
2991  */
2992 static void
2993 write_histogram_to_file (const uint32_t hist_array[],
2994                          const char *file_name)
2995 {
2996   char collect_str[SIZE_DUMP_FILE + 1] = "";
2997   char *recv_str_iter;
2998   char *file_name_full;
2999
3000   recv_str_iter = collect_str;
3001   file_name_full = store_prefix_file_name (&own_identity,
3002                                            file_name);
3003   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
3004   {
3005     char collect_str_tmp[8];
3006
3007     GNUNET_snprintf (collect_str_tmp,
3008                      sizeof(collect_str_tmp),
3009                      "%" PRIu32 "\n",
3010                      hist_array[i]);
3011     recv_str_iter = stpncpy (recv_str_iter,
3012                              collect_str_tmp,
3013                              6);
3014   }
3015   (void) stpcpy (recv_str_iter,
3016                  "\n");
3017   LOG (GNUNET_ERROR_TYPE_DEBUG,
3018        "Writing push stats to disk\n");
3019   to_file_w_len (file_name_full,
3020                  SIZE_DUMP_FILE,
3021                  collect_str);
3022   GNUNET_free (file_name_full);
3023 }
3024
3025
3026 #endif /* TO_FILE */
3027
3028
3029 /**
3030  * @brief Destroy Sub.
3031  *
3032  * @param sub Sub to destroy
3033  */
3034 static void
3035 destroy_sub (struct Sub *sub)
3036 {
3037   GNUNET_assert (NULL != sub);
3038   GNUNET_assert (NULL != sub->do_round_task);
3039   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3040   sub->do_round_task = NULL;
3041
3042   /* Disconnect from cadet */
3043   GNUNET_CADET_close_port (sub->cadet_port);
3044   sub->cadet_port = NULL;
3045
3046   /* Clean up data structures for peers */
3047   RPS_sampler_destroy (sub->sampler);
3048   sub->sampler = NULL;
3049   View_destroy (sub->view);
3050   sub->view = NULL;
3051   CustomPeerMap_destroy (sub->push_map);
3052   sub->push_map = NULL;
3053   CustomPeerMap_destroy (sub->pull_map);
3054   sub->pull_map = NULL;
3055   peers_terminate (sub);
3056
3057   /* Free leftover data structures */
3058 #ifdef TO_FILE_FULL
3059   GNUNET_free (sub->file_name_view_log);
3060   sub->file_name_view_log = NULL;
3061 #endif /* TO_FILE_FULL */
3062 #ifdef TO_FILE
3063 #ifdef TO_FILE_FULL
3064   GNUNET_free (sub->file_name_observed_log);
3065   sub->file_name_observed_log = NULL;
3066 #endif /* TO_FILE_FULL */
3067
3068   /* Write push frequencies to disk */
3069   write_histogram_to_file (sub->push_recv,
3070                            "push_recv");
3071
3072   /* Write push deltas to disk */
3073   write_histogram_to_file (sub->push_delta,
3074                            "push_delta");
3075
3076   /* Write pull delays to disk */
3077   write_histogram_to_file (sub->pull_delays,
3078                            "pull_delays");
3079
3080   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3081   sub->observed_unique_peers = NULL;
3082 #endif /* TO_FILE */
3083
3084   GNUNET_free (sub);
3085 }
3086
3087
3088 /***********************************************************************
3089 * /Sub
3090 ***********************************************************************/
3091
3092
3093 /***********************************************************************
3094 * Core handlers
3095 ***********************************************************************/
3096
3097 /**
3098  * @brief Callback on initialisation of Core.
3099  *
3100  * @param cls - unused
3101  * @param my_identity - unused
3102  */
3103 void
3104 core_init (void *cls,
3105            const struct GNUNET_PeerIdentity *my_identity)
3106 {
3107   (void) cls;
3108   (void) my_identity;
3109
3110   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3111 }
3112
3113
3114 /**
3115  * @brief Callback for core.
3116  * Method called whenever a given peer connects.
3117  *
3118  * @param cls closure - unused
3119  * @param peer peer identity this notification is about
3120  * @return closure given to #core_disconnects as peer_cls
3121  */
3122 void *
3123 core_connects (void *cls,
3124                const struct GNUNET_PeerIdentity *peer,
3125                struct GNUNET_MQ_Handle *mq)
3126 {
3127   (void) cls;
3128   (void) mq;
3129
3130   GNUNET_assert (GNUNET_YES ==
3131                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3132                                                     peer,
3133                                                     NULL,
3134                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3135   return NULL;
3136 }
3137
3138
3139 /**
3140  * @brief Callback for core.
3141  * Method called whenever a peer disconnects.
3142  *
3143  * @param cls closure - unused
3144  * @param peer peer identity this notification is about
3145  * @param peer_cls closure given in #core_connects - unused
3146  */
3147 void
3148 core_disconnects (void *cls,
3149                   const struct GNUNET_PeerIdentity *peer,
3150                   void *peer_cls)
3151 {
3152   (void) cls;
3153   (void) peer_cls;
3154
3155   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3156 }
3157
3158
3159 /***********************************************************************
3160 * /Core handlers
3161 ***********************************************************************/
3162
3163
3164 /**
3165  * @brief Destroy the context for a (connected) client
3166  *
3167  * @param cli_ctx Context to destroy
3168  */
3169 static void
3170 destroy_cli_ctx (struct ClientContext *cli_ctx)
3171 {
3172   GNUNET_assert (NULL != cli_ctx);
3173   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3174                                cli_ctx_tail,
3175                                cli_ctx);
3176   if (NULL != cli_ctx->sub)
3177   {
3178     destroy_sub (cli_ctx->sub);
3179     cli_ctx->sub = NULL;
3180   }
3181   GNUNET_free (cli_ctx);
3182 }
3183
3184
3185 /**
3186  * @brief Update sizes in sampler and view on estimate update from nse service
3187  *
3188  * @param sub Sub
3189  * @param logestimate the log(Base 2) value of the current network size estimate
3190  * @param std_dev standard deviation for the estimate
3191  */
3192 static void
3193 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3194 {
3195   double estimate;
3196
3197   // double scale; // TODO this might go gloabal/config
3198
3199   LOG (GNUNET_ERROR_TYPE_DEBUG,
3200        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3201        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3202   // scale = .01;
3203   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3204   // GNUNET_NSE_log_estimate_to_n (logestimate);
3205   estimate = pow (estimate, 1.0 / 3);
3206   // TODO add if std_dev is a number
3207   // estimate += (std_dev * scale);
3208   if (sub->view_size_est_min < ceil (estimate))
3209   {
3210     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3211     sub->sampler_size_est_need = estimate;
3212     sub->view_size_est_need = estimate;
3213   }
3214   else
3215   {
3216     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3217     // sub->sampler_size_est_need = sub->view_size_est_min;
3218     sub->view_size_est_need = sub->view_size_est_min;
3219   }
3220   if (sub == msub)
3221   {
3222     GNUNET_STATISTICS_set (stats,
3223                            "view size aim",
3224                            sub->view_size_est_need,
3225                            GNUNET_NO);
3226   }
3227
3228   /* If the NSE has changed adapt the lists accordingly */
3229   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3230   View_change_len (sub->view, sub->view_size_est_need);
3231 }
3232
3233
3234 /**
3235  * Function called by NSE.
3236  *
3237  * Updates sizes of sampler list and view and adapt those lists
3238  * accordingly.
3239  *
3240  * implements #GNUNET_NSE_Callback
3241  *
3242  * @param cls Closure - unused
3243  * @param timestamp time when the estimate was received from the server (or created by the server)
3244  * @param logestimate the log(Base 2) value of the current network size estimate
3245  * @param std_dev standard deviation for the estimate
3246  */
3247 static void
3248 nse_callback (void *cls,
3249               struct GNUNET_TIME_Absolute timestamp,
3250               double logestimate, double std_dev)
3251 {
3252   (void) cls;
3253   (void) timestamp;
3254   struct ClientContext *cli_ctx_iter;
3255
3256   adapt_sizes (msub, logestimate, std_dev);
3257   for (cli_ctx_iter = cli_ctx_head;
3258        NULL != cli_ctx_iter;
3259        cli_ctx_iter = cli_ctx_iter->next)
3260   {
3261     if (NULL != cli_ctx_iter->sub)
3262     {
3263       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3264     }
3265   }
3266 }
3267
3268
3269 /**
3270  * @brief This function is called, when the client seeds peers.
3271  * It verifies that @a msg is well-formed.
3272  *
3273  * @param cls the closure (#ClientContext)
3274  * @param msg the message
3275  * @return #GNUNET_OK if @a msg is well-formed
3276  *         #GNUNET_SYSERR otherwise
3277  */
3278 static int
3279 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3280 {
3281   struct ClientContext *cli_ctx = cls;
3282   uint16_t msize = ntohs (msg->header.size);
3283   uint32_t num_peers = ntohl (msg->num_peers);
3284
3285   msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
3286   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3287       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3288   {
3289     LOG (GNUNET_ERROR_TYPE_ERROR,
3290          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3291          ntohl (msg->num_peers),
3292          (msize / sizeof(struct GNUNET_PeerIdentity)));
3293     GNUNET_break (0);
3294     GNUNET_SERVICE_client_drop (cli_ctx->client);
3295     return GNUNET_SYSERR;
3296   }
3297   return GNUNET_OK;
3298 }
3299
3300
3301 /**
3302  * Handle seed from the client.
3303  *
3304  * @param cls closure
3305  * @param message the actual message
3306  */
3307 static void
3308 handle_client_seed (void *cls,
3309                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3310 {
3311   struct ClientContext *cli_ctx = cls;
3312   struct GNUNET_PeerIdentity *peers;
3313   uint32_t num_peers;
3314   uint32_t i;
3315
3316   num_peers = ntohl (msg->num_peers);
3317   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3318
3319   LOG (GNUNET_ERROR_TYPE_DEBUG,
3320        "Client seeded peers:\n");
3321   print_peer_list (peers, num_peers);
3322
3323   for (i = 0; i < num_peers; i++)
3324   {
3325     LOG (GNUNET_ERROR_TYPE_DEBUG,
3326          "Updating samplers with seed %" PRIu32 ": %s\n",
3327          i,
3328          GNUNET_i2s (&peers[i]));
3329
3330     if (NULL != msub)
3331       got_peer (msub, &peers[i]);                 /* Condition needed? */
3332     if (NULL != cli_ctx->sub)
3333       got_peer (cli_ctx->sub, &peers[i]);
3334   }
3335   GNUNET_SERVICE_client_continue (cli_ctx->client);
3336 }
3337
3338
3339 /**
3340  * Handle RPS request from the client.
3341  *
3342  * @param cls Client context
3343  * @param message Message containing the numer of updates the client wants to
3344  * receive
3345  */
3346 static void
3347 handle_client_view_request (void *cls,
3348                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3349 {
3350   struct ClientContext *cli_ctx = cls;
3351   uint64_t num_updates;
3352
3353   num_updates = ntohl (msg->num_updates);
3354
3355   LOG (GNUNET_ERROR_TYPE_DEBUG,
3356        "Client requested %" PRIu64 " updates of view.\n",
3357        num_updates);
3358
3359   GNUNET_assert (NULL != cli_ctx);
3360   cli_ctx->view_updates_left = num_updates;
3361   send_view (cli_ctx, NULL, 0);
3362   GNUNET_SERVICE_client_continue (cli_ctx->client);
3363 }
3364
3365
3366 /**
3367  * @brief Handle the cancellation of the view updates.
3368  *
3369  * @param cls The client context
3370  * @param msg Unused
3371  */
3372 static void
3373 handle_client_view_cancel (void *cls,
3374                            const struct GNUNET_MessageHeader *msg)
3375 {
3376   struct ClientContext *cli_ctx = cls;
3377
3378   (void) msg;
3379
3380   LOG (GNUNET_ERROR_TYPE_DEBUG,
3381        "Client does not want to receive updates of view any more.\n");
3382
3383   GNUNET_assert (NULL != cli_ctx);
3384   cli_ctx->view_updates_left = 0;
3385   GNUNET_SERVICE_client_continue (cli_ctx->client);
3386   if (GNUNET_YES == cli_ctx->stream_update)
3387   {
3388     destroy_cli_ctx (cli_ctx);
3389   }
3390 }
3391
3392
3393 /**
3394  * Handle RPS request for biased stream from the client.
3395  *
3396  * @param cls Client context
3397  * @param message unused
3398  */
3399 static void
3400 handle_client_stream_request (void *cls,
3401                               const struct
3402                               GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3403 {
3404   struct ClientContext *cli_ctx = cls;
3405
3406   (void) msg;
3407
3408   LOG (GNUNET_ERROR_TYPE_DEBUG,
3409        "Client requested peers from biased stream.\n");
3410   cli_ctx->stream_update = GNUNET_YES;
3411
3412   GNUNET_assert (NULL != cli_ctx);
3413   GNUNET_SERVICE_client_continue (cli_ctx->client);
3414 }
3415
3416
3417 /**
3418  * @brief Handles the cancellation of the stream of biased peer ids
3419  *
3420  * @param cls The client context
3421  * @param msg unused
3422  */
3423 static void
3424 handle_client_stream_cancel (void *cls,
3425                              const struct GNUNET_MessageHeader *msg)
3426 {
3427   struct ClientContext *cli_ctx = cls;
3428
3429   (void) msg;
3430
3431   LOG (GNUNET_ERROR_TYPE_DEBUG,
3432        "Client canceled receiving peers from biased stream.\n");
3433   cli_ctx->stream_update = GNUNET_NO;
3434
3435   GNUNET_assert (NULL != cli_ctx);
3436   GNUNET_SERVICE_client_continue (cli_ctx->client);
3437 }
3438
3439
3440 /**
3441  * @brief Create and start a Sub.
3442  *
3443  * @param cls Closure - unused
3444  * @param msg Message containing the necessary information
3445  */
3446 static void
3447 handle_client_start_sub (void *cls,
3448                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3449 {
3450   struct ClientContext *cli_ctx = cls;
3451
3452   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3453   if ((NULL != cli_ctx->sub) &&
3454       (0 != memcmp (&cli_ctx->sub->hash,
3455                     &msg->hash,
3456                     sizeof(struct GNUNET_HashCode))) )
3457   {
3458     LOG (GNUNET_ERROR_TYPE_WARNING,
3459          "Already have a Sub with different share for this client. Remove old one, add new.\n");
3460     destroy_sub (cli_ctx->sub);
3461     cli_ctx->sub = NULL;
3462   }
3463   cli_ctx->sub = new_sub (&msg->hash,
3464                           msub->sampler_size_est_min, // TODO make api input?
3465                           GNUNET_TIME_relative_ntoh (msg->round_interval));
3466   GNUNET_SERVICE_client_continue (cli_ctx->client);
3467 }
3468
3469
3470 /**
3471  * @brief Destroy the Sub
3472  *
3473  * @param cls Closure - unused
3474  * @param msg Message containing the hash that identifies the Sub
3475  */
3476 static void
3477 handle_client_stop_sub (void *cls,
3478                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3479 {
3480   struct ClientContext *cli_ctx = cls;
3481
3482   GNUNET_assert (NULL != cli_ctx->sub);
3483   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof(struct
3484                                                            GNUNET_HashCode)))
3485   {
3486     LOG (GNUNET_ERROR_TYPE_WARNING,
3487          "Share of current sub and request differ!\n");
3488   }
3489   destroy_sub (cli_ctx->sub);
3490   cli_ctx->sub = NULL;
3491   GNUNET_SERVICE_client_continue (cli_ctx->client);
3492 }
3493
3494
3495 /**
3496  * Handle a CHECK_LIVE message from another peer.
3497  *
3498  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3499  * the channel is blocked for all other communication.
3500  *
3501  * @param cls Closure - Context of channel
3502  * @param msg Message - unused
3503  */
3504 static void
3505 handle_peer_check (void *cls,
3506                    const struct GNUNET_MessageHeader *msg)
3507 {
3508   const struct ChannelCtx *channel_ctx = cls;
3509   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3510
3511   (void) msg;
3512
3513   LOG (GNUNET_ERROR_TYPE_DEBUG,
3514        "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3515   if (channel_ctx->peer_ctx->sub == msub)
3516   {
3517     GNUNET_STATISTICS_update (stats,
3518                               "# pending online checks",
3519                               -1,
3520                               GNUNET_NO);
3521   }
3522
3523   GNUNET_CADET_receive_done (channel_ctx->channel);
3524 }
3525
3526
3527 /**
3528  * Handle a PUSH message from another peer.
3529  *
3530  * Check the proof of work and store the PeerID
3531  * in the temporary list for pushed PeerIDs.
3532  *
3533  * @param cls Closure - Context of channel
3534  * @param msg Message - unused
3535  */
3536 static void
3537 handle_peer_push (void *cls,
3538                   const struct GNUNET_MessageHeader *msg)
3539 {
3540   const struct ChannelCtx *channel_ctx = cls;
3541   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3542
3543   (void) msg;
3544
3545   // (check the proof of work (?))
3546
3547   LOG (GNUNET_ERROR_TYPE_DEBUG,
3548        "Received PUSH (%s)\n",
3549        GNUNET_i2s (peer));
3550   if (channel_ctx->peer_ctx->sub == msub)
3551   {
3552     GNUNET_STATISTICS_update (stats, "# push message received", 1, GNUNET_NO);
3553     if ((NULL != map_single_hop) &&
3554         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3555                                                               peer)))
3556     {
3557       GNUNET_STATISTICS_update (stats,
3558                                 "# push message received (multi-hop peer)",
3559                                 1,
3560                                 GNUNET_NO);
3561     }
3562   }
3563
3564   #if ENABLE_MALICIOUS
3565   struct AttackedPeer *tmp_att_peer;
3566
3567   if ((1 == mal_type) ||
3568       (3 == mal_type))
3569   {   /* Try to maximise representation */
3570     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3571     tmp_att_peer->peer_id = *peer;
3572     if (NULL == att_peer_set)
3573       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3574     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3575                                                              peer))
3576     {
3577       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3578                                    att_peers_tail,
3579                                    tmp_att_peer);
3580       add_peer_array_to_set (peer, 1, att_peer_set);
3581     }
3582     else
3583     {
3584       GNUNET_free (tmp_att_peer);
3585     }
3586   }
3587
3588
3589   else if (2 == mal_type)
3590   {
3591     /* We attack one single well-known peer - simply ignore */
3592   }
3593   #endif /* ENABLE_MALICIOUS */
3594
3595   /* Add the sending peer to the push_map */
3596   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3597
3598   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3599                                      &channel_ctx->peer_ctx->peer_id));
3600   GNUNET_CADET_receive_done (channel_ctx->channel);
3601 }
3602
3603
3604 /**
3605  * Handle PULL REQUEST request message from another peer.
3606  *
3607  * Reply with the view of PeerIDs.
3608  *
3609  * @param cls Closure - Context of channel
3610  * @param msg Message - unused
3611  */
3612 static void
3613 handle_peer_pull_request (void *cls,
3614                           const struct GNUNET_MessageHeader *msg)
3615 {
3616   const struct ChannelCtx *channel_ctx = cls;
3617   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3618   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3619   const struct GNUNET_PeerIdentity *view_array;
3620
3621   (void) msg;
3622
3623   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (
3624          peer));
3625   if (peer_ctx->sub == msub)
3626   {
3627     GNUNET_STATISTICS_update (stats,
3628                               "# pull request message received",
3629                               1,
3630                               GNUNET_NO);
3631     if ((NULL != map_single_hop) &&
3632         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3633                                                               &peer_ctx->peer_id)))
3634     {
3635       GNUNET_STATISTICS_update (stats,
3636                                 "# pull request message received (multi-hop peer)",
3637                                 1,
3638                                 GNUNET_NO);
3639     }
3640   }
3641
3642   #if ENABLE_MALICIOUS
3643   if ((1 == mal_type)
3644       || (3 == mal_type))
3645   {   /* Try to maximise representation */
3646     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3647   }
3648
3649   else if (2 == mal_type)
3650   {   /* Try to partition network */
3651     if (0 == GNUNET_memcmp (&attacked_peer, peer))
3652     {
3653       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3654     }
3655   }
3656   #endif /* ENABLE_MALICIOUS */
3657
3658   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3659                                      &channel_ctx->peer_ctx->peer_id));
3660   GNUNET_CADET_receive_done (channel_ctx->channel);
3661   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3662   send_pull_reply (peer_ctx,
3663                    view_array,
3664                    View_size (channel_ctx->peer_ctx->sub->view));
3665 }
3666
3667
3668 /**
3669  * Check whether we sent a corresponding request and
3670  * whether this reply is the first one.
3671  *
3672  * @param cls Closure - Context of channel
3673  * @param msg Message containing the replied peers
3674  */
3675 static int
3676 check_peer_pull_reply (void *cls,
3677                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3678 {
3679   struct ChannelCtx *channel_ctx = cls;
3680   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3681
3682   if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3683   {
3684     GNUNET_break_op (0);
3685     return GNUNET_SYSERR;
3686   }
3687
3688   if ((ntohs (msg->header.size) - sizeof(struct
3689                                          GNUNET_RPS_P2P_PullReplyMessage))
3690       / sizeof(struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3691   {
3692     LOG (GNUNET_ERROR_TYPE_ERROR,
3693          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3694          ntohl (msg->num_peers),
3695          (ntohs (msg->header.size) - sizeof(struct
3696                                             GNUNET_RPS_P2P_PullReplyMessage))
3697          / sizeof(struct GNUNET_PeerIdentity));
3698     GNUNET_break_op (0);
3699     return GNUNET_SYSERR;
3700   }
3701
3702   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3703                                      &sender_ctx->peer_id,
3704                                      Peers_PULL_REPLY_PENDING))
3705   {
3706     LOG (GNUNET_ERROR_TYPE_WARNING,
3707          "Received a pull reply from a peer (%s) we didn't request one from!\n",
3708          GNUNET_i2s (&sender_ctx->peer_id));
3709     if (sender_ctx->sub == msub)
3710     {
3711       GNUNET_STATISTICS_update (stats,
3712                                 "# unrequested pull replies",
3713                                 1,
3714                                 GNUNET_NO);
3715     }
3716   }
3717   return GNUNET_OK;
3718 }
3719
3720
3721 /**
3722  * Handle PULL REPLY message from another peer.
3723  *
3724  * @param cls Closure
3725  * @param msg The message header
3726  */
3727 static void
3728 handle_peer_pull_reply (void *cls,
3729                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3730 {
3731   const struct ChannelCtx *channel_ctx = cls;
3732   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3733   const struct GNUNET_PeerIdentity *peers;
3734   struct Sub *sub = channel_ctx->peer_ctx->sub;
3735   uint32_t i;
3736
3737 #if ENABLE_MALICIOUS
3738   struct AttackedPeer *tmp_att_peer;
3739 #endif /* ENABLE_MALICIOUS */
3740
3741   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3742   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (
3743          sender));
3744   if (channel_ctx->peer_ctx->sub == msub)
3745   {
3746     GNUNET_STATISTICS_update (stats,
3747                               "# pull reply messages received",
3748                               1,
3749                               GNUNET_NO);
3750     if ((NULL != map_single_hop) &&
3751         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3752                                                               &channel_ctx->
3753                                                               peer_ctx->peer_id)) )
3754     {
3755       GNUNET_STATISTICS_update (stats,
3756                                 "# pull reply messages received (multi-hop peer)",
3757                                 1,
3758                                 GNUNET_NO);
3759     }
3760   }
3761
3762   #if ENABLE_MALICIOUS
3763   // We shouldn't even receive pull replies as we're not sending
3764   if (2 == mal_type)
3765   {
3766   }
3767   #endif /* ENABLE_MALICIOUS */
3768
3769   /* Do actual logic */
3770   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3771
3772   LOG (GNUNET_ERROR_TYPE_DEBUG,
3773        "PULL REPLY received, got following %u peers:\n",
3774        ntohl (msg->num_peers));
3775
3776   for (i = 0; i < ntohl (msg->num_peers); i++)
3777   {
3778     LOG (GNUNET_ERROR_TYPE_DEBUG,
3779          "%u. %s\n",
3780          i,
3781          GNUNET_i2s (&peers[i]));
3782
3783     #if ENABLE_MALICIOUS
3784     if ((NULL != att_peer_set) &&
3785         ((1 == mal_type) || (3 == mal_type) ))
3786     {     /* Add attacked peer to local list */
3787           // TODO check if we sent a request and this was the first reply
3788       if ((GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3789                                                                 &peers[i]))
3790           && (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3791                                                                    &peers[i])) )
3792       {
3793         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3794         tmp_att_peer->peer_id = peers[i];
3795         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3796                                      att_peers_tail,
3797                                      tmp_att_peer);
3798         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3799       }
3800       continue;
3801     }
3802     #endif /* ENABLE_MALICIOUS */
3803     /* Make sure we 'know' about this peer */
3804     (void) insert_peer (channel_ctx->peer_ctx->sub,
3805                         &peers[i]);
3806
3807     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3808                                         &peers[i]))
3809     {
3810       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3811                          &peers[i]);
3812     }
3813     else
3814     {
3815       schedule_operation (channel_ctx->peer_ctx,
3816                           insert_in_pull_map,
3817                           channel_ctx->peer_ctx->sub);    /* cls */
3818       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3819                                       &peers[i]);
3820     }
3821   }
3822
3823   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3824                                  sender),
3825                    Peers_PULL_REPLY_PENDING);
3826   clean_peer (channel_ctx->peer_ctx->sub,
3827               sender);
3828
3829   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3830                                      sender));
3831   GNUNET_CADET_receive_done (channel_ctx->channel);
3832 }
3833
3834
3835 /**
3836  * Compute a random delay.
3837  * A uniformly distributed value between mean + spread and mean - spread.
3838  *
3839  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3840  * It would return a random value between 2 and 6 min.
3841  *
3842  * @param mean the mean time until the next round
3843  * @param spread the inverse amount of deviation from the mean
3844  */
3845 static struct GNUNET_TIME_Relative
3846 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3847                     unsigned int spread)
3848 {
3849   struct GNUNET_TIME_Relative half_interval;
3850   struct GNUNET_TIME_Relative ret;
3851   unsigned int rand_delay;
3852   unsigned int max_rand_delay;
3853
3854   if (0 == spread)
3855   {
3856     LOG (GNUNET_ERROR_TYPE_WARNING,
3857          "Not accepting spread of 0\n");
3858     GNUNET_break (0);
3859     GNUNET_assert (0);
3860   }
3861   GNUNET_assert (0 != mean.rel_value_us);
3862
3863   /* Compute random time value between spread * mean and spread * mean */
3864   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3865
3866   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us
3867                    / mean.rel_value_us * (2 / spread);
3868   /**
3869    * Compute random value between (0 and 1) * round_interval
3870    * via multiplying round_interval with a 'fraction' (0 to value)/value
3871    */
3872   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3873                                          max_rand_delay);
3874   ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3875   ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3876   ret = GNUNET_TIME_relative_add (ret, half_interval);
3877
3878   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3879     LOG (GNUNET_ERROR_TYPE_WARNING,
3880          "Returning FOREVER_REL\n");
3881
3882   return ret;
3883 }
3884
3885
3886 /**
3887  * Send single pull request
3888  *
3889  * @param peer_ctx Context to the peer to send request to
3890  */
3891 static void
3892 send_pull_request (struct PeerContext *peer_ctx)
3893 {
3894   struct GNUNET_MQ_Envelope *ev;
3895
3896   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3897                                                &peer_ctx->peer_id,
3898                                                Peers_PULL_REPLY_PENDING));
3899   SET_PEER_FLAG (peer_ctx,
3900                  Peers_PULL_REPLY_PENDING);
3901   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3902
3903   LOG (GNUNET_ERROR_TYPE_DEBUG,
3904        "Going to send PULL REQUEST to peer %s.\n",
3905        GNUNET_i2s (&peer_ctx->peer_id));
3906
3907   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3908   send_message (peer_ctx,
3909                 ev,
3910                 "PULL REQUEST");
3911   if (peer_ctx->sub)
3912   {
3913     GNUNET_STATISTICS_update (stats,
3914                               "# pull request send issued",
3915                               1,
3916                               GNUNET_NO);
3917     if ((NULL != map_single_hop) &&
3918         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3919                                                               &peer_ctx->peer_id)))
3920     {
3921       GNUNET_STATISTICS_update (stats,
3922                                 "# pull request send issued (multi-hop peer)",
3923                                 1,
3924                                 GNUNET_NO);
3925     }
3926   }
3927 }
3928
3929
3930 /**
3931  * Send single push
3932  *
3933  * @param peer_ctx Context of peer to send push to
3934  */
3935 static void
3936 send_push (struct PeerContext *peer_ctx)
3937 {
3938   struct GNUNET_MQ_Envelope *ev;
3939
3940   LOG (GNUNET_ERROR_TYPE_DEBUG,
3941        "Going to send PUSH to peer %s.\n",
3942        GNUNET_i2s (&peer_ctx->peer_id));
3943
3944   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3945   send_message (peer_ctx, ev, "PUSH");
3946   if (peer_ctx->sub)
3947   {
3948     GNUNET_STATISTICS_update (stats,
3949                               "# push send issued",
3950                               1,
3951                               GNUNET_NO);
3952     if ((NULL != map_single_hop) &&
3953         (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3954                                                               &peer_ctx->peer_id)))
3955     {
3956       GNUNET_STATISTICS_update (stats,
3957                                 "# push send issued (multi-hop peer)",
3958                                 1,
3959                                 GNUNET_NO);
3960     }
3961   }
3962 }
3963
3964
3965 #if ENABLE_MALICIOUS
3966
3967
3968 /**
3969  * @brief This function is called, when the client tells us to act malicious.
3970  * It verifies that @a msg is well-formed.
3971  *
3972  * @param cls the closure (#ClientContext)
3973  * @param msg the message
3974  * @return #GNUNET_OK if @a msg is well-formed
3975  */
3976 static int
3977 check_client_act_malicious (void *cls,
3978                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3979 {
3980   struct ClientContext *cli_ctx = cls;
3981   uint16_t msize = ntohs (msg->header.size);
3982   uint32_t num_peers = ntohl (msg->num_peers);
3983
3984   msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
3985   if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3986       (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3987   {
3988     LOG (GNUNET_ERROR_TYPE_ERROR,
3989          "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3990          ntohl (msg->num_peers),
3991          (msize / sizeof(struct GNUNET_PeerIdentity)));
3992     GNUNET_break (0);
3993     GNUNET_SERVICE_client_drop (cli_ctx->client);
3994     return GNUNET_SYSERR;
3995   }
3996   return GNUNET_OK;
3997 }
3998
3999
4000 /**
4001  * Turn RPS service to act malicious.
4002  *
4003  * @param cls Closure
4004  * @param client The client that sent the message
4005  * @param msg The message header
4006  */
4007 static void
4008 handle_client_act_malicious (void *cls,
4009                              const struct
4010                              GNUNET_RPS_CS_ActMaliciousMessage *msg)
4011 {
4012   struct ClientContext *cli_ctx = cls;
4013   struct GNUNET_PeerIdentity *peers;
4014   uint32_t num_mal_peers_sent;
4015   uint32_t num_mal_peers_old;
4016   struct Sub *sub = cli_ctx->sub;
4017
4018   if (NULL == sub)
4019     sub = msub;
4020   /* Do actual logic */
4021   peers = (struct GNUNET_PeerIdentity *) &msg[1];
4022   mal_type = ntohl (msg->type);
4023   if (NULL == mal_peer_set)
4024     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4025
4026   LOG (GNUNET_ERROR_TYPE_DEBUG,
4027        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
4028        mal_type,
4029        ntohl (msg->num_peers));
4030
4031   if (1 == mal_type)
4032   {   /* Try to maximise representation */
4033       /* Add other malicious peers to those we already know */
4034
4035     num_mal_peers_sent = ntohl (msg->num_peers);
4036     num_mal_peers_old = num_mal_peers;
4037     GNUNET_array_grow (mal_peers,
4038                        num_mal_peers,
4039                        num_mal_peers + num_mal_peers_sent);
4040     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4041                    peers,
4042                    num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4043
4044     /* Add all mal peers to mal_peer_set */
4045     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4046                            num_mal_peers_sent,
4047                            mal_peer_set);
4048
4049     /* Substitute do_round () with do_mal_round () */
4050     GNUNET_assert (NULL != sub->do_round_task);
4051     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4052     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4053   }
4054
4055   else if ((2 == mal_type) ||
4056            (3 == mal_type))
4057   {   /* Try to partition the network */
4058       /* Add other malicious peers to those we already know */
4059
4060     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4061     num_mal_peers_old = num_mal_peers;
4062     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4063     GNUNET_array_grow (mal_peers,
4064                        num_mal_peers,
4065                        num_mal_peers + num_mal_peers_sent);
4066     if ((NULL != mal_peers) &&
4067         (0 != num_mal_peers) )
4068     {
4069       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4070                      peers,
4071                      num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4072
4073       /* Add all mal peers to mal_peer_set */
4074       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4075                              num_mal_peers_sent,
4076                              mal_peer_set);
4077     }
4078
4079     /* Store the one attacked peer */
4080     GNUNET_memcpy (&attacked_peer,
4081                    &msg->attacked_peer,
4082                    sizeof(struct GNUNET_PeerIdentity));
4083     /* Set the flag of the attacked peer to valid to avoid problems */
4084     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4085     {
4086       (void) issue_peer_online_check (sub, &attacked_peer);
4087     }
4088
4089     LOG (GNUNET_ERROR_TYPE_DEBUG,
4090          "Attacked peer is %s\n",
4091          GNUNET_i2s (&attacked_peer));
4092
4093     /* Substitute do_round () with do_mal_round () */
4094     if (NULL != sub->do_round_task)
4095     {
4096       /* Probably in shutdown */
4097       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4098       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4099     }
4100   }
4101   else if (0 == mal_type)
4102   {   /* Stop acting malicious */
4103     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4104
4105     /* Substitute do_mal_round () with do_round () */
4106     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4107     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4108   }
4109   else
4110   {
4111     GNUNET_break (0);
4112     GNUNET_SERVICE_client_continue (cli_ctx->client);
4113   }
4114   GNUNET_SERVICE_client_continue (cli_ctx->client);
4115 }
4116
4117
4118 /**
4119  * Send out PUSHes and PULLs maliciously.
4120  *
4121  * This is executed regylary.
4122  *
4123  * @param cls Closure - Sub
4124  */
4125 static void
4126 do_mal_round (void *cls)
4127 {
4128   uint32_t num_pushes;
4129   uint32_t i;
4130   struct GNUNET_TIME_Relative time_next_round;
4131   struct AttackedPeer *tmp_att_peer;
4132   struct Sub *sub = cls;
4133
4134   LOG (GNUNET_ERROR_TYPE_DEBUG,
4135        "Going to execute next round maliciously type %" PRIu32 ".\n",
4136        mal_type);
4137   sub->do_round_task = NULL;
4138   GNUNET_assert (mal_type <= 3);
4139   /* Do malicious actions */
4140   if (1 == mal_type)
4141   {   /* Try to maximise representation */
4142       /* The maximum of pushes we're going to send this round */
4143     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4144                                          num_attacked_peers),
4145                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4146
4147     LOG (GNUNET_ERROR_TYPE_DEBUG,
4148          "Going to send %" PRIu32 " pushes\n",
4149          num_pushes);
4150
4151     /* Send PUSHes to attacked peers */
4152     for (i = 0; i < num_pushes; i++)
4153     {
4154       if (att_peers_tail == att_peer_index)
4155         att_peer_index = att_peers_head;
4156       else
4157         att_peer_index = att_peer_index->next;
4158
4159       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4160     }
4161
4162     /* Send PULLs to some peers to learn about additional peers to attack */
4163     tmp_att_peer = att_peer_index;
4164     for (i = 0; i < num_pushes * alpha; i++)
4165     {
4166       if (att_peers_tail == tmp_att_peer)
4167         tmp_att_peer = att_peers_head;
4168       else
4169         att_peer_index = tmp_att_peer->next;
4170
4171       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4172     }
4173   }
4174
4175
4176   else if (2 == mal_type)
4177   {   /**
4178        * Try to partition the network
4179        * Send as many pushes to the attacked peer as possible
4180        * That is one push per round as it will ignore more.
4181        */
4182     (void) issue_peer_online_check (sub, &attacked_peer);
4183     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4184                                        &attacked_peer,
4185                                        Peers_ONLINE))
4186       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4187   }
4188
4189
4190   if (3 == mal_type)
4191   {   /* Combined attack */
4192       /* Send PUSH to attacked peers */
4193     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4194     {
4195       (void) issue_peer_online_check (sub, &attacked_peer);
4196       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4197                                          &attacked_peer,
4198                                          Peers_ONLINE))
4199       {
4200         LOG (GNUNET_ERROR_TYPE_DEBUG,
4201              "Goding to send push to attacked peer (%s)\n",
4202              GNUNET_i2s (&attacked_peer));
4203         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4204       }
4205     }
4206     (void) issue_peer_online_check (sub, &attacked_peer);
4207
4208     /* The maximum of pushes we're going to send this round */
4209     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4210                                          num_attacked_peers),
4211                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4212
4213     LOG (GNUNET_ERROR_TYPE_DEBUG,
4214          "Going to send %" PRIu32 " pushes\n",
4215          num_pushes);
4216
4217     for (i = 0; i < num_pushes; i++)
4218     {
4219       if (att_peers_tail == att_peer_index)
4220         att_peer_index = att_peers_head;
4221       else
4222         att_peer_index = att_peer_index->next;
4223
4224       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4225     }
4226
4227     /* Send PULLs to some peers to learn about additional peers to attack */
4228     tmp_att_peer = att_peer_index;
4229     for (i = 0; i < num_pushes * alpha; i++)
4230     {
4231       if (att_peers_tail == tmp_att_peer)
4232         tmp_att_peer = att_peers_head;
4233       else
4234         att_peer_index = tmp_att_peer->next;
4235
4236       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4237     }
4238   }
4239
4240   /* Schedule next round */
4241   time_next_round = compute_rand_delay (sub->round_interval, 2);
4242
4243   GNUNET_assert (NULL == sub->do_round_task);
4244   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4245                                                      &do_mal_round, sub);
4246   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4247 }
4248
4249
4250 #endif /* ENABLE_MALICIOUS */
4251
4252
4253 /**
4254  * Send out PUSHes and PULLs, possibly update #view, samplers.
4255  *
4256  * This is executed regylary.
4257  *
4258  * @param cls Closure - Sub
4259  */
4260 static void
4261 do_round (void *cls)
4262 {
4263   unsigned int i;
4264   const struct GNUNET_PeerIdentity *view_array;
4265   unsigned int *permut;
4266   unsigned int a_peers; /* Number of peers we send pushes to */
4267   unsigned int b_peers; /* Number of peers we send pull requests to */
4268   uint32_t first_border;
4269   uint32_t second_border;
4270   struct GNUNET_PeerIdentity peer;
4271   struct GNUNET_PeerIdentity *update_peer;
4272   struct Sub *sub = cls;
4273
4274   sub->num_rounds++;
4275   LOG (GNUNET_ERROR_TYPE_DEBUG,
4276        "Going to execute next round.\n");
4277   if (sub == msub)
4278   {
4279     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4280   }
4281   sub->do_round_task = NULL;
4282 #ifdef TO_FILE_FULL
4283   to_file (sub->file_name_view_log,
4284            "___ new round ___");
4285 #endif /* TO_FILE_FULL */
4286   view_array = View_get_as_array (sub->view);
4287   for (i = 0; i < View_size (sub->view); i++)
4288   {
4289     LOG (GNUNET_ERROR_TYPE_DEBUG,
4290          "\t%s\n", GNUNET_i2s (&view_array[i]));
4291 #ifdef TO_FILE_FULL
4292     to_file (sub->file_name_view_log,
4293              "=%s\t(do round)",
4294              GNUNET_i2s_full (&view_array[i]));
4295 #endif /* TO_FILE_FULL */
4296   }
4297
4298
4299   /* Send pushes and pull requests */
4300   if (0 < View_size (sub->view))
4301   {
4302     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4303                                            View_size (sub->view));
4304
4305     /* Send PUSHes */
4306     a_peers = ceil (alpha * View_size (sub->view));
4307
4308     LOG (GNUNET_ERROR_TYPE_DEBUG,
4309          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4310          a_peers, alpha, View_size (sub->view));
4311     for (i = 0; i < a_peers; i++)
4312     {
4313       peer = view_array[permut[i]];
4314       // FIXME if this fails schedule/loop this for later
4315       send_push (get_peer_ctx (sub->peer_map, &peer));
4316     }
4317
4318     /* Send PULL requests */
4319     b_peers = ceil (beta * View_size (sub->view));
4320     first_border = a_peers;
4321     second_border = a_peers + b_peers;
4322     if (second_border > View_size (sub->view))
4323     {
4324       first_border = View_size (sub->view) - b_peers;
4325       second_border = View_size (sub->view);
4326     }
4327     LOG (GNUNET_ERROR_TYPE_DEBUG,
4328          "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4329          b_peers, beta, View_size (sub->view));
4330     for (i = first_border; i < second_border; i++)
4331     {
4332       peer = view_array[permut[i]];
4333       if (GNUNET_NO == check_peer_flag (sub->peer_map,
4334                                         &peer,
4335                                         Peers_PULL_REPLY_PENDING))
4336       {       // FIXME if this fails schedule/loop this for later
4337         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4338       }
4339     }
4340
4341     GNUNET_free (permut);
4342     permut = NULL;
4343   }
4344
4345
4346   /* Update view */
4347   /* TODO see how many peers are in push-/pull- list! */
4348
4349   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4350       (0 < CustomPeerMap_size (sub->push_map)) &&
4351       (0 < CustomPeerMap_size (sub->pull_map)))
4352   {   /* If conditions for update are fulfilled, update */
4353     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4354
4355     uint32_t final_size;
4356     uint32_t peers_to_clean_size;
4357     struct GNUNET_PeerIdentity *peers_to_clean;
4358
4359     peers_to_clean = NULL;
4360     peers_to_clean_size = 0;
4361     GNUNET_array_grow (peers_to_clean,
4362                        peers_to_clean_size,
4363                        View_size (sub->view));
4364     GNUNET_memcpy (peers_to_clean,
4365                    view_array,
4366                    View_size (sub->view) * sizeof(struct GNUNET_PeerIdentity));
4367
4368     /* Seems like recreating is the easiest way of emptying the peermap */
4369     View_clear (sub->view);
4370 #ifdef TO_FILE_FULL
4371     to_file (sub->file_name_view_log,
4372              "--- emptied ---");
4373 #endif /* TO_FILE_FULL */
4374
4375     first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4376                                CustomPeerMap_size (sub->push_map));
4377     second_border = first_border
4378                     + GNUNET_MIN (floor (beta * sub->view_size_est_need),
4379                                   CustomPeerMap_size (sub->pull_map));
4380     final_size = second_border
4381                  + ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4382     LOG (GNUNET_ERROR_TYPE_DEBUG,
4383          "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"
4384          PRIu32 "\n",
4385          first_border,
4386          second_border,
4387          final_size);
4388
4389     /* Update view with peers received through PUSHes */
4390     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4391                                            CustomPeerMap_size (sub->push_map));
4392     for (i = 0; i < first_border; i++)
4393     {
4394       int inserted;
4395       inserted = insert_in_view (sub,
4396                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4397                                                                   permut[i]));
4398       if (GNUNET_OK == inserted)
4399       {
4400         clients_notify_stream_peer (sub,
4401                                     1,
4402                                     CustomPeerMap_get_peer_by_index (
4403                                       sub->push_map, permut[i]));
4404       }
4405 #ifdef TO_FILE_FULL
4406       to_file (sub->file_name_view_log,
4407                "+%s\t(push list)",
4408                GNUNET_i2s_full (&view_array[i]));
4409 #endif /* TO_FILE_FULL */
4410        // TODO change the peer_flags accordingly
4411     }
4412     GNUNET_free (permut);
4413     permut = NULL;
4414
4415     /* Update view with peers received through PULLs */
4416     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4417                                            CustomPeerMap_size (sub->pull_map));
4418     for (i = first_border; i < second_border; i++)
4419     {
4420       int inserted;
4421       inserted = insert_in_view (sub,
4422                                  CustomPeerMap_get_peer_by_index (sub->pull_map,
4423                                                                   permut[i
4424                                                                          -
4425                                                                          first_border
4426                                                                   ]));
4427       if (GNUNET_OK == inserted)
4428       {
4429         clients_notify_stream_peer (sub,
4430                                     1,
4431                                     CustomPeerMap_get_peer_by_index (
4432                                       sub->pull_map,
4433                                       permut[i
4434                                              - first_border]));
4435       }
4436 #ifdef TO_FILE_FULL
4437       to_file (sub->file_name_view_log,
4438                "+%s\t(pull list)",
4439                GNUNET_i2s_full (&view_array[i]));
4440 #endif /* TO_FILE_FULL */
4441        // TODO change the peer_flags accordingly
4442     }
4443     GNUNET_free (permut);
4444     permut = NULL;
4445
4446     /* Update view with peers from history */
4447     RPS_sampler_get_n_rand_peers (sub->sampler,
4448                                   final_size - second_border,
4449                                   hist_update,
4450                                   sub);
4451     // TODO change the peer_flags accordingly
4452
4453     for (i = 0; i < View_size (sub->view); i++)
4454       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4455
4456     /* Clean peers that were removed from the view */
4457     for (i = 0; i < peers_to_clean_size; i++)
4458     {
4459 #ifdef TO_FILE_FULL
4460       to_file (sub->file_name_view_log,
4461                "-%s",
4462                GNUNET_i2s_full (&peers_to_clean[i]));
4463 #endif /* TO_FILE_FULL */
4464       clean_peer (sub, &peers_to_clean[i]);
4465     }
4466
4467     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4468     clients_notify_view_update (sub);
4469   }
4470   else
4471   {
4472     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4473     if (sub == msub)
4474     {
4475       GNUNET_STATISTICS_update (stats, "# rounds blocked", 1, GNUNET_NO);
4476       if ((CustomPeerMap_size (sub->push_map) > alpha
4477            * sub->view_size_est_need) &&
4478           ! (0 >= CustomPeerMap_size (sub->pull_map)))
4479         GNUNET_STATISTICS_update (stats, "# rounds blocked - too many pushes",
4480                                   1, GNUNET_NO);
4481       if ((CustomPeerMap_size (sub->push_map) > alpha
4482            * sub->view_size_est_need) &&
4483           (0 >= CustomPeerMap_size (sub->pull_map)))
4484         GNUNET_STATISTICS_update (stats,
4485                                   "# rounds blocked - too many pushes, no pull replies",
4486                                   1, GNUNET_NO);
4487       if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4488           ! (0 >= CustomPeerMap_size (sub->pull_map)))
4489         GNUNET_STATISTICS_update (stats, "# rounds blocked - no pushes", 1,
4490                                   GNUNET_NO);
4491       if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4492           (0 >= CustomPeerMap_size (sub->pull_map)))
4493         GNUNET_STATISTICS_update (stats,
4494                                   "# rounds blocked - no pushes, no pull replies",
4495                                   1, GNUNET_NO);
4496       if ((0 >= CustomPeerMap_size (sub->pull_map)) &&
4497           (CustomPeerMap_size (sub->push_map) > alpha
4498            * sub->view_size_est_need) &&
4499           (0 >= CustomPeerMap_size (sub->push_map)) )
4500         GNUNET_STATISTICS_update (stats, "# rounds blocked - no pull replies",
4501                                   1, GNUNET_NO);
4502     }
4503   }
4504   // TODO independent of that also get some peers from CADET_get_peers()?
4505   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4506   {
4507     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4508   }
4509   else
4510   {
4511     LOG (GNUNET_ERROR_TYPE_WARNING,
4512          "Push map size too big for histogram (%u, %u)\n",
4513          CustomPeerMap_size (sub->push_map),
4514          HISTOGRAM_FILE_SLOTS);
4515   }
4516   // FIXME check bounds of histogram
4517   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map)
4518                              - (alpha * sub->view_size_est_need))
4519                   + (HISTOGRAM_FILE_SLOTS / 2)]++;
4520   if (sub == msub)
4521   {
4522     GNUNET_STATISTICS_set (stats,
4523                            "# peers in push map at end of round",
4524                            CustomPeerMap_size (sub->push_map),
4525                            GNUNET_NO);
4526     GNUNET_STATISTICS_set (stats,
4527                            "# peers in pull map at end of round",
4528                            CustomPeerMap_size (sub->pull_map),
4529                            GNUNET_NO);
4530     GNUNET_STATISTICS_set (stats,
4531                            "# peers in view at end of round",
4532                            View_size (sub->view),
4533                            GNUNET_NO);
4534     GNUNET_STATISTICS_set (stats,
4535                            "# expected pushes",
4536                            alpha * sub->view_size_est_need,
4537                            GNUNET_NO);
4538     GNUNET_STATISTICS_set (stats,
4539                            "delta expected - received pushes",
4540                            CustomPeerMap_size (sub->push_map) - (alpha
4541                                                                  * sub->
4542                                                                  view_size_est_need),
4543                            GNUNET_NO);
4544   }
4545
4546   LOG (GNUNET_ERROR_TYPE_DEBUG,
4547        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4548        CustomPeerMap_size (sub->push_map),
4549        CustomPeerMap_size (sub->pull_map),
4550        alpha,
4551        View_size (sub->view),
4552        alpha * View_size (sub->view));
4553
4554   /* Update samplers */
4555   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4556   {
4557     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4558     LOG (GNUNET_ERROR_TYPE_DEBUG,
4559          "Updating with peer %s from push list\n",
4560          GNUNET_i2s (update_peer));
4561     insert_in_sampler (sub, update_peer);
4562     clean_peer (sub, update_peer);  /* This cleans only if it is not in the view */
4563   }
4564
4565   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4566   {
4567     LOG (GNUNET_ERROR_TYPE_DEBUG,
4568          "Updating with peer %s from pull list\n",
4569          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4570     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4571     /* This cleans only if it is not in the view */
4572     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4573   }
4574
4575
4576   /* Empty push/pull lists */
4577   CustomPeerMap_clear (sub->push_map);
4578   CustomPeerMap_clear (sub->pull_map);
4579
4580   if (sub == msub)
4581   {
4582     GNUNET_STATISTICS_set (stats,
4583                            "view size",
4584                            View_size (sub->view),
4585                            GNUNET_NO);
4586   }
4587
4588   struct GNUNET_TIME_Relative time_next_round;
4589
4590   time_next_round = compute_rand_delay (sub->round_interval, 2);
4591
4592   /* Schedule next round */
4593   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4594                                                      &do_round, sub);
4595   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4596 }
4597
4598
4599 /**
4600  * This is called from GNUNET_CADET_get_peers().
4601  *
4602  * It is called on every peer(ID) that cadet somehow has contact with.
4603  * We use those to initialise the sampler.
4604  *
4605  * implements #GNUNET_CADET_PeersCB
4606  *
4607  * @param cls Closure - Sub
4608  * @param peer Peer, or NULL on "EOF".
4609  * @param tunnel Do we have a tunnel towards this peer?
4610  * @param n_paths Number of known paths towards this peer.
4611  * @param best_path How long is the best path?
4612  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4613  */
4614 void
4615 init_peer_cb (void *cls,
4616               const struct GNUNET_PeerIdentity *peer,
4617               int tunnel, /* "Do we have a tunnel towards this peer?" */
4618               unsigned int n_paths, /* "Number of known paths towards this peer" */
4619               unsigned int best_path) /* "How long is the best path?
4620                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4621 {
4622   struct Sub *sub = cls;
4623
4624   (void) tunnel;
4625   (void) n_paths;
4626   (void) best_path;
4627
4628   if (NULL != peer)
4629   {
4630     LOG (GNUNET_ERROR_TYPE_DEBUG,
4631          "Got peer_id %s from cadet\n",
4632          GNUNET_i2s (peer));
4633     got_peer (sub, peer);
4634   }
4635 }
4636
4637
4638 /**
4639  * @brief Iterator function over stored, valid peers.
4640  *
4641  * We initialise the sampler with those.
4642  *
4643  * @param cls Closure - Sub
4644  * @param peer the peer id
4645  * @return #GNUNET_YES if we should continue to
4646  *         iterate,
4647  *         #GNUNET_NO if not.
4648  */
4649 static int
4650 valid_peers_iterator (void *cls,
4651                       const struct GNUNET_PeerIdentity *peer)
4652 {
4653   struct Sub *sub = cls;
4654
4655   if (NULL != peer)
4656   {
4657     LOG (GNUNET_ERROR_TYPE_DEBUG,
4658          "Got stored, valid peer %s\n",
4659          GNUNET_i2s (peer));
4660     got_peer (sub, peer);
4661   }
4662   return GNUNET_YES;
4663 }
4664
4665
4666 /**
4667  * Iterator over peers from peerinfo.
4668  *
4669  * @param cls Closure - Sub
4670  * @param peer id of the peer, NULL for last call
4671  * @param hello hello message for the peer (can be NULL)
4672  * @param error message
4673  */
4674 void
4675 process_peerinfo_peers (void *cls,
4676                         const struct GNUNET_PeerIdentity *peer,
4677                         const struct GNUNET_HELLO_Message *hello,
4678                         const char *err_msg)
4679 {
4680   struct Sub *sub = cls;
4681
4682   (void) hello;
4683   (void) err_msg;
4684
4685   if (NULL != peer)
4686   {
4687     LOG (GNUNET_ERROR_TYPE_DEBUG,
4688          "Got peer_id %s from peerinfo\n",
4689          GNUNET_i2s (peer));
4690     got_peer (sub, peer);
4691   }
4692 }
4693
4694
4695 /**
4696  * Task run during shutdown.
4697  *
4698  * @param cls Closure - unused
4699  */
4700 static void
4701 shutdown_task (void *cls)
4702 {
4703   (void) cls;
4704   struct ClientContext *client_ctx;
4705
4706   LOG (GNUNET_ERROR_TYPE_DEBUG,
4707        "RPS service is going down\n");
4708
4709   /* Clean all clients */
4710   for (client_ctx = cli_ctx_head;
4711        NULL != cli_ctx_head;
4712        client_ctx = cli_ctx_head)
4713   {
4714     destroy_cli_ctx (client_ctx);
4715   }
4716   if (NULL != msub)
4717   {
4718     destroy_sub (msub);
4719     msub = NULL;
4720   }
4721
4722   /* Disconnect from other services */
4723   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4724   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4725   peerinfo_handle = NULL;
4726   GNUNET_NSE_disconnect (nse);
4727   if (NULL != map_single_hop)
4728   {
4729     /* core_init was called - core was initialised */
4730     /* disconnect first, so no callback tries to access missing peermap */
4731     GNUNET_CORE_disconnect (core_handle);
4732     core_handle = NULL;
4733     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4734     map_single_hop = NULL;
4735   }
4736
4737   if (NULL != stats)
4738   {
4739     GNUNET_STATISTICS_destroy (stats,
4740                                GNUNET_NO);
4741     stats = NULL;
4742   }
4743   GNUNET_CADET_disconnect (cadet_handle);
4744   cadet_handle = NULL;
4745 #if ENABLE_MALICIOUS
4746   struct AttackedPeer *tmp_att_peer;
4747   GNUNET_array_grow (mal_peers,
4748                      num_mal_peers,
4749                      0);
4750   if (NULL != mal_peer_set)
4751     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4752   if (NULL != att_peer_set)
4753     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4754   while (NULL != att_peers_head)
4755   {
4756     tmp_att_peer = att_peers_head;
4757     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4758                                  att_peers_tail,
4759                                  tmp_att_peer);
4760     GNUNET_free (tmp_att_peer);
4761   }
4762 #endif /* ENABLE_MALICIOUS */
4763   close_all_files ();
4764 }
4765
4766
4767 /**
4768  * Handle client connecting to the service.
4769  *
4770  * @param cls unused
4771  * @param client the new client
4772  * @param mq the message queue of @a client
4773  * @return @a client
4774  */
4775 static void *
4776 client_connect_cb (void *cls,
4777                    struct GNUNET_SERVICE_Client *client,
4778                    struct GNUNET_MQ_Handle *mq)
4779 {
4780   struct ClientContext *cli_ctx;
4781
4782   (void) cls;
4783
4784   LOG (GNUNET_ERROR_TYPE_DEBUG,
4785        "Client connected\n");
4786   if (NULL == client)
4787     return client; /* Server was destroyed before a client connected. Shutting down */
4788   cli_ctx = GNUNET_new (struct ClientContext);
4789   cli_ctx->mq = mq;
4790   cli_ctx->view_updates_left = -1;
4791   cli_ctx->stream_update = GNUNET_NO;
4792   cli_ctx->client = client;
4793   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4794                                cli_ctx_tail,
4795                                cli_ctx);
4796   return cli_ctx;
4797 }
4798
4799
4800 /**
4801  * Callback called when a client disconnected from the service
4802  *
4803  * @param cls closure for the service
4804  * @param c the client that disconnected
4805  * @param internal_cls should be equal to @a c
4806  */
4807 static void
4808 client_disconnect_cb (void *cls,
4809                       struct GNUNET_SERVICE_Client *client,
4810                       void *internal_cls)
4811 {
4812   struct ClientContext *cli_ctx = internal_cls;
4813
4814   (void) cls;
4815   GNUNET_assert (client == cli_ctx->client);
4816   if (NULL == client)
4817   {  /* shutdown task - destroy all clients */
4818     while (NULL != cli_ctx_head)
4819       destroy_cli_ctx (cli_ctx_head);
4820   }
4821   else
4822   {   /* destroy this client */
4823     LOG (GNUNET_ERROR_TYPE_DEBUG,
4824          "Client disconnected. Destroy its context.\n");
4825     destroy_cli_ctx (cli_ctx);
4826   }
4827 }
4828
4829
4830 /**
4831  * Handle random peer sampling clients.
4832  *
4833  * @param cls closure
4834  * @param c configuration to use
4835  * @param service the initialized service
4836  */
4837 static void
4838 run (void *cls,
4839      const struct GNUNET_CONFIGURATION_Handle *c,
4840      struct GNUNET_SERVICE_Handle *service)
4841 {
4842   struct GNUNET_TIME_Relative round_interval;
4843   long long unsigned int sampler_size;
4844   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4845   struct GNUNET_HashCode hash;
4846
4847   (void) cls;
4848   (void) service;
4849
4850   GNUNET_log_setup ("rps",
4851                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4852                     NULL);
4853   cfg = c;
4854   /* Get own ID */
4855   GNUNET_CRYPTO_get_peer_identity (cfg,
4856                                    &own_identity); // TODO check return value
4857   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4858               "STARTING SERVICE (rps) for peer [%s]\n",
4859               GNUNET_i2s (&own_identity));
4860 #if ENABLE_MALICIOUS
4861   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4862               "Malicious execution compiled in.\n");
4863 #endif /* ENABLE_MALICIOUS */
4864
4865   /* Get time interval from the configuration */
4866   if (GNUNET_OK !=
4867       GNUNET_CONFIGURATION_get_value_time (cfg,
4868                                            "RPS",
4869                                            "ROUNDINTERVAL",
4870                                            &round_interval))
4871   {
4872     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4873                                "RPS", "ROUNDINTERVAL");
4874     GNUNET_SCHEDULER_shutdown ();
4875     return;
4876   }
4877
4878   /* Get initial size of sampler/view from the configuration */
4879   if (GNUNET_OK !=
4880       GNUNET_CONFIGURATION_get_value_number (cfg,
4881                                              "RPS",
4882                                              "MINSIZE",
4883                                              &sampler_size))
4884   {
4885     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4886                                "RPS", "MINSIZE");
4887     GNUNET_SCHEDULER_shutdown ();
4888     return;
4889   }
4890
4891   cadet_handle = GNUNET_CADET_connect (cfg);
4892   GNUNET_assert (NULL != cadet_handle);
4893   core_handle = GNUNET_CORE_connect (cfg,
4894                                      NULL, /* cls */
4895                                      core_init, /* init */
4896                                      core_connects, /* connects */
4897                                      core_disconnects, /* disconnects */
4898                                      NULL); /* handlers */
4899   GNUNET_assert (NULL != core_handle);
4900
4901
4902   alpha = 0.45;
4903   beta = 0.45;
4904
4905
4906   /* Set up main Sub */
4907   GNUNET_CRYPTO_hash (hash_port_string,
4908                       strlen (hash_port_string),
4909                       &hash);
4910   msub = new_sub (&hash,
4911                   sampler_size, /* Will be overwritten by config */
4912                   round_interval);
4913
4914
4915   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4916
4917   /* connect to NSE */
4918   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4919
4920   // LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4921   // GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4922   // TODO send push/pull to each of those peers?
4923   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4924   restore_valid_peers (msub);
4925   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4926
4927   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4928                                                    GNUNET_NO,
4929                                                    process_peerinfo_peers,
4930                                                    msub);
4931
4932   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4933
4934   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4935   stats = GNUNET_STATISTICS_create ("rps", cfg);
4936 }
4937
4938
4939 /**
4940  * Define "main" method using service macro.
4941  */
4942 GNUNET_SERVICE_MAIN
4943   ("rps",
4944   GNUNET_SERVICE_OPTION_NONE,
4945   &run,
4946   &client_connect_cb,
4947   &client_disconnect_cb,
4948   NULL,
4949   GNUNET_MQ_hd_var_size (client_seed,
4950                          GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4951                          struct GNUNET_RPS_CS_SeedMessage,
4952                          NULL),
4953 #if ENABLE_MALICIOUS
4954   GNUNET_MQ_hd_var_size (client_act_malicious,
4955                          GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4956                          struct GNUNET_RPS_CS_ActMaliciousMessage,
4957                          NULL),
4958 #endif /* ENABLE_MALICIOUS */
4959   GNUNET_MQ_hd_fixed_size (client_view_request,
4960                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4961                            struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4962                            NULL),
4963   GNUNET_MQ_hd_fixed_size (client_view_cancel,
4964                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4965                            struct GNUNET_MessageHeader,
4966                            NULL),
4967   GNUNET_MQ_hd_fixed_size (client_stream_request,
4968                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4969                            struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4970                            NULL),
4971   GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4972                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4973                            struct GNUNET_MessageHeader,
4974                            NULL),
4975   GNUNET_MQ_hd_fixed_size (client_start_sub,
4976                            GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4977                            struct GNUNET_RPS_CS_SubStartMessage,
4978                            NULL),
4979   GNUNET_MQ_hd_fixed_size (client_stop_sub,
4980                            GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4981                            struct GNUNET_RPS_CS_SubStopMessage,
4982                            NULL),
4983   GNUNET_MQ_handler_end ());
4984
4985 /* end of gnunet-service-rps.c */