Merge branch 'master' of gnunet.org:gnunet
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
34 #include "rps.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
39
40 #include <math.h>
41 #include <inttypes.h>
42 #include <string.h>
43
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /***********************************************************************
57  * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
59
60 /**
61  * Set a peer flag of given peer context.
62  */
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
64
65 /**
66  * Get peer flag of given peer context.
67  */
68 #define check_peer_flag_set(peer_ctx, mask)\
69   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
70
71 /**
72  * Unset flag of given peer context.
73  */
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
75
76 /**
77  * Get channel flag of given channel context.
78  */
79 #define check_channel_flag_set(channel_flags, mask)\
80   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
81
82 /**
83  * Unset flag of given channel context.
84  */
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
86
87
88
89 /**
90  * Pending operation on peer consisting of callback and closure
91  *
92  * When an operation cannot be executed right now this struct is used to store
93  * the callback and closure for later execution.
94  */
95 struct PeerPendingOp
96 {
97   /**
98    * Callback
99    */
100   PeerOp op;
101
102   /**
103    * Closure
104    */
105   void *op_cls;
106 };
107
108 /**
109  * List containing all messages that are yet to be send
110  *
111  * This is used to keep track of all messages that have not been sent yet. When
112  * a peer is to be removed the pending messages can be removed properly.
113  */
114 struct PendingMessage
115 {
116   /**
117    * DLL next, prev
118    */
119   struct PendingMessage *next;
120   struct PendingMessage *prev;
121
122   /**
123    * The envelope to the corresponding message
124    */
125   struct GNUNET_MQ_Envelope *ev;
126
127   /**
128    * The corresponding context
129    */
130   struct PeerContext *peer_ctx;
131
132   /**
133    * The message type
134    */
135   const char *type;
136 };
137
138 /**
139  * @brief Context for a channel
140  */
141 struct ChannelCtx;
142
143 /**
144  * Struct used to keep track of other peer's status
145  *
146  * This is stored in a multipeermap.
147  * It contains information such as cadet channels, a message queue for sending,
148  * status about the channels, the pending operations on this peer and some flags
149  * about the status of the peer itself. (online, valid, ...)
150  */
151 struct PeerContext
152 {
153   /**
154    * The Sub this context belongs to.
155    */
156   struct Sub *sub;
157
158   /**
159    * Message queue open to client
160    */
161   struct GNUNET_MQ_Handle *mq;
162
163   /**
164    * Channel open to client.
165    */
166   struct ChannelCtx *send_channel_ctx;
167
168   /**
169    * Channel open from client.
170    */
171   struct ChannelCtx *recv_channel_ctx;
172
173   /**
174    * Array of pending operations on this peer.
175    */
176   struct PeerPendingOp *pending_ops;
177
178   /**
179    * Handle to the callback given to cadet_ntfy_tmt_rdy()
180    *
181    * To be canceled on shutdown.
182    */
183   struct PendingMessage *online_check_pending;
184
185   /**
186    * Number of pending operations.
187    */
188   unsigned int num_pending_ops;
189
190   /**
191    * Identity of the peer
192    */
193   struct GNUNET_PeerIdentity peer_id;
194
195   /**
196    * Flags indicating status of peer
197    */
198   uint32_t peer_flags;
199
200   /**
201    * Last time we received something from that peer.
202    */
203   struct GNUNET_TIME_Absolute last_message_recv;
204
205   /**
206    * Last time we received a keepalive message.
207    */
208   struct GNUNET_TIME_Absolute last_keepalive;
209
210   /**
211    * DLL with all messages that are yet to be sent
212    */
213   struct PendingMessage *pending_messages_head;
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * This is pobably followed by 'statistical' data (when we first saw
218    * it, how did we get its ID, how many pushes (in a timeinterval),
219    * ...)
220    */
221   uint32_t round_pull_req;
222 };
223
224 /**
225  * @brief Closure to #valid_peer_iterator
226  */
227 struct PeersIteratorCls
228 {
229   /**
230    * Iterator function
231    */
232   PeersIterator iterator;
233
234   /**
235    * Closure to iterator
236    */
237   void *cls;
238 };
239
240 /**
241  * @brief Context for a channel
242  */
243 struct ChannelCtx
244 {
245   /**
246    * @brief The channel itself
247    */
248   struct GNUNET_CADET_Channel *channel;
249
250   /**
251    * @brief The peer context associated with the channel
252    */
253   struct PeerContext *peer_ctx;
254
255   /**
256    * @brief When channel destruction needs to be delayed (because it is called
257    * from within the cadet routine of another channel destruction) this task
258    * refers to the respective _SCHEDULER_Task.
259    */
260   struct GNUNET_SCHEDULER_Task *destruction_task;
261 };
262
263
264 #if ENABLE_MALICIOUS
265
266 /**
267  * If type is 2 This struct is used to store the attacked peers in a DLL
268  */
269 struct AttackedPeer
270 {
271   /**
272    * DLL
273    */
274   struct AttackedPeer *next;
275   struct AttackedPeer *prev;
276
277   /**
278    * PeerID
279    */
280   struct GNUNET_PeerIdentity peer_id;
281 };
282
283 #endif /* ENABLE_MALICIOUS */
284
285 /**
286  * @brief This number determines the number of slots for files that represent
287  * histograms
288  */
289 #define HISTOGRAM_FILE_SLOTS 32
290
291 /**
292  * @brief The size (in bytes) a file needs to store the histogram
293  *
294  * Per slot: 1 newline, up to 4 chars,
295  * Additionally: 1 null termination
296  */
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
298
299 /**
300  * @brief One Sub.
301  *
302  * Essentially one instance of brahms that only connects to other instances
303  * with the same (secret) value.
304  */
305 struct Sub
306 {
307   /**
308    * @brief Hash of the shared value that defines Subs.
309    */
310   struct GNUNET_HashCode hash;
311
312   /**
313    * @brief Port to communicate to other peers.
314    */
315   struct GNUNET_CADET_Port *cadet_port;
316
317   /**
318    * @brief Hashmap of valid peers.
319    */
320   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
321
322   /**
323    * @brief Filename of the file that stores the valid peers persistently.
324    */
325   char *filename_valid_peers;
326
327   /**
328    * Set of all peers to keep track of them.
329    */
330   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
331
332   /**
333    * @brief This is the minimum estimate used as sampler size.
334    *
335    * It is configured by the user.
336    */
337   unsigned int sampler_size_est_min;
338
339   /**
340    * The size of sampler we need to be able to satisfy the Brahms protocol's
341    * need of random peers.
342    *
343    * This is one minimum size the sampler grows to.
344    */
345   unsigned int sampler_size_est_need;
346
347   /**
348    * Time inverval the do_round task runs in.
349    */
350   struct GNUNET_TIME_Relative round_interval;
351
352   /**
353    * Sampler used for the Brahms protocol itself.
354    */
355   struct RPS_Sampler *sampler;
356
357 #ifdef TO_FILE_FULL
358   /**
359    * Name to log view to
360    */
361   char *file_name_view_log;
362 #endif /* TO_FILE_FULL */
363
364 #ifdef TO_FILE
365 #ifdef TO_FILE_FULL
366   /**
367    * Name to log number of observed peers to
368    */
369   char *file_name_observed_log;
370 #endif /* TO_FILE_FULL */
371
372   /**
373    * @brief Count the observed peers
374    */
375   uint32_t num_observed_peers;
376
377   /**
378    * @brief Multipeermap (ab-) used to count unique peer_ids
379    */
380   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
381 #endif /* TO_FILE */
382
383   /**
384    * List to store peers received through pushes temporary.
385    */
386   struct CustomPeerMap *push_map;
387
388   /**
389    * List to store peers received through pulls temporary.
390    */
391   struct CustomPeerMap *pull_map;
392
393   /**
394    * @brief This is the estimate used as view size.
395    *
396    * It is initialised with the minimum
397    */
398   unsigned int view_size_est_need;
399
400   /**
401    * @brief This is the minimum estimate used as view size.
402    *
403    * It is configured by the user.
404    */
405   unsigned int view_size_est_min;
406
407   /**
408    * @brief The view.
409    */
410   struct View *view;
411
412   /**
413    * Identifier for the main task that runs periodically.
414    */
415   struct GNUNET_SCHEDULER_Task *do_round_task;
416
417   /* === stats === */
418
419   /**
420    * @brief Counts the executed rounds.
421    */
422   uint32_t num_rounds;
423
424   /**
425    * @brief This array accumulates the number of received pushes per round.
426    *
427    * Number at index i represents the number of rounds with i observed pushes.
428    */
429   uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
430
431   /**
432    * @brief Histogram of deltas between the expected and actual number of
433    * received pushes.
434    *
435    * As half of the entries are expected to be negative, this is shifted by
436    * #HISTOGRAM_FILE_SLOTS/2.
437    */
438   uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
439
440   /**
441    * @brief Number of pull replies with this delay measured in rounds.
442    *
443    * Number at index i represents the number of pull replies with a delay of i
444    * rounds.
445    */
446   uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
447 };
448
449
450 /***********************************************************************
451  * Globals
452 ***********************************************************************/
453
454 /**
455  * Our configuration.
456  */
457 static const struct GNUNET_CONFIGURATION_Handle *cfg;
458
459 /**
460  * Handle to the statistics service.
461  */
462 struct GNUNET_STATISTICS_Handle *stats;
463
464 /**
465  * Handler to CADET.
466  */
467 struct GNUNET_CADET_Handle *cadet_handle;
468
469 /**
470  * Handle to CORE
471  */
472 struct GNUNET_CORE_Handle *core_handle;
473
474 /**
475  * @brief PeerMap to keep track of connected peers.
476  */
477 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478
479 /**
480  * Our own identity.
481  */
482 static struct GNUNET_PeerIdentity own_identity;
483
484 /**
485  * Percentage of total peer number in the view
486  * to send random PUSHes to
487  */
488 static float alpha;
489
490 /**
491  * Percentage of total peer number in the view
492  * to send random PULLs to
493  */
494 static float beta;
495
496 /**
497  * Handler to NSE.
498  */
499 static struct GNUNET_NSE_Handle *nse;
500
501 /**
502  * Handler to PEERINFO.
503  */
504 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
505
506 /**
507  * Handle for cancellation of iteration over peers.
508  */
509 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510
511
512 #if ENABLE_MALICIOUS
513 /**
514  * Type of malicious peer
515  *
516  * 0 Don't act malicious at all - Default
517  * 1 Try to maximise representation
518  * 2 Try to partition the network
519  * 3 Combined attack
520  */
521 static uint32_t mal_type;
522
523 /**
524  * Other malicious peers
525  */
526 static struct GNUNET_PeerIdentity *mal_peers;
527
528 /**
529  * Hashmap of malicious peers used as set.
530  * Used to more efficiently check whether we know that peer.
531  */
532 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
533
534 /**
535  * Number of other malicious peers
536  */
537 static uint32_t num_mal_peers;
538
539
540 /**
541  * If type is 2 this is the DLL of attacked peers
542  */
543 static struct AttackedPeer *att_peers_head;
544 static struct AttackedPeer *att_peers_tail;
545
546 /**
547  * This index is used to point to an attacked peer to
548  * implement the round-robin-ish way to select attacked peers.
549  */
550 static struct AttackedPeer *att_peer_index;
551
552 /**
553  * Hashmap of attacked peers used as set.
554  * Used to more efficiently check whether we know that peer.
555  */
556 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
557
558 /**
559  * Number of attacked peers
560  */
561 static uint32_t num_attacked_peers;
562
563 /**
564  * If type is 1 this is the attacked peer
565  */
566 static struct GNUNET_PeerIdentity attacked_peer;
567
568 /**
569  * The limit of PUSHes we can send in one round.
570  * This is an assumption of the Brahms protocol and either implemented
571  * via proof of work
572  * or
573  * assumend to be the bandwidth limitation.
574  */
575 static uint32_t push_limit = 10000;
576 #endif /* ENABLE_MALICIOUS */
577
578 /**
579  * @brief Main Sub.
580  *
581  * This is run in any case by all peers and connects to all peers without
582  * specifying a shared value.
583  */
584 static struct Sub *msub;
585
586 /**
587  * @brief Maximum number of valid peers to keep.
588  * TODO read from config
589  */
590 static const uint32_t num_valid_peers_max = UINT32_MAX;
591
592 /***********************************************************************
593  * /Globals
594 ***********************************************************************/
595
596
597 static void
598 do_round (void *cls);
599
600 static void
601 do_mal_round (void *cls);
602
603
604 /**
605  * @brief Get the #PeerContext associated with a peer
606  *
607  * @param peer_map The peer map containing the context
608  * @param peer the peer id
609  *
610  * @return the #PeerContext
611  */
612 static struct PeerContext *
613 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
614               const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *ctx;
617   int ret;
618
619   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
620   GNUNET_assert (GNUNET_YES == ret);
621   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
622   GNUNET_assert (NULL != ctx);
623   return ctx;
624 }
625
626 /**
627  * @brief Check whether we have information about the given peer.
628  *
629  * FIXME probably deprecated. Make this the new _online.
630  *
631  * @param peer_map The peer map to check for the existence of @a peer
632  * @param peer peer in question
633  *
634  * @return #GNUNET_YES if peer is known
635  *         #GNUNET_NO  if peer is not knwon
636  */
637 static int
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639                   const struct GNUNET_PeerIdentity *peer)
640 {
641   if (NULL != peer_map)
642   {
643     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
644   }
645   else
646   {
647     return GNUNET_NO;
648   }
649 }
650
651
652 /**
653  * @brief Create a new #PeerContext and insert it into the peer map
654  *
655  * @param sub The Sub this context belongs to.
656  * @param peer the peer to create the #PeerContext for
657  *
658  * @return the #PeerContext
659  */
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662                  const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *ctx;
665   int ret;
666
667   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
668
669   ctx = GNUNET_new (struct PeerContext);
670   ctx->peer_id = *peer;
671   ctx->sub = sub;
672   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674   GNUNET_assert (GNUNET_OK == ret);
675   if (sub == msub)
676   {
677     GNUNET_STATISTICS_set (stats,
678                           "# known peers",
679                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
680                           GNUNET_NO);
681   }
682   return ctx;
683 }
684
685
686 /**
687  * @brief Create or get a #PeerContext
688  *
689  * @param sub The Sub to which the created context belongs to
690  * @param peer the peer to get the associated context to
691  *
692  * @return the context
693  */
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696                         const struct GNUNET_PeerIdentity *peer)
697 {
698   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
699   {
700     return create_peer_ctx (sub, peer);
701   }
702   return get_peer_ctx (sub->peer_map, peer);
703 }
704
705
706 /**
707  * @brief Check whether we have a connection to this @a peer
708  *
709  * Also sets the #Peers_ONLINE flag accordingly
710  *
711  * @param peer_ctx Context of the peer of which connectivity is to be checked
712  *
713  * @return #GNUNET_YES if we are connected
714  *         #GNUNET_NO  otherwise
715  */
716 static int
717 check_connected (struct PeerContext *peer_ctx)
718 {
719   /* If we don't know about this peer we don't know whether it's online */
720   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721                                      &peer_ctx->peer_id))
722   {
723     return GNUNET_NO;
724   }
725   /* Get the context */
726   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727   /* If we have no channel to this peer we don't know whether it's online */
728   if ( (NULL == peer_ctx->send_channel_ctx) &&
729        (NULL == peer_ctx->recv_channel_ctx) )
730   {
731     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
732     return GNUNET_NO;
733   }
734   /* Otherwise (if we have a channel, we know that it's online */
735   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
736   return GNUNET_YES;
737 }
738
739
740 /**
741  * @brief The closure to #get_rand_peer_iterator.
742  */
743 struct GetRandPeerIteratorCls
744 {
745   /**
746    * @brief The index of the peer to return.
747    * Will be decreased until 0.
748    * Then current peer is returned.
749    */
750   uint32_t index;
751
752   /**
753    * @brief Pointer to peer to return.
754    */
755   const struct GNUNET_PeerIdentity *peer;
756 };
757
758
759 /**
760  * @brief Iterator function for #get_random_peer_from_peermap.
761  *
762  * Implements #GNUNET_CONTAINER_PeerMapIterator.
763  * Decreases the index until the index is null.
764  * Then returns the current peer.
765  *
766  * @param cls the #GetRandPeerIteratorCls containing index and peer
767  * @param peer current peer
768  * @param value unused
769  *
770  * @return  #GNUNET_YES if we should continue to
771  *          iterate,
772  *          #GNUNET_NO if not.
773  */
774 static int
775 get_rand_peer_iterator (void *cls,
776                         const struct GNUNET_PeerIdentity *peer,
777                         void *value)
778 {
779   struct GetRandPeerIteratorCls *iterator_cls = cls;
780   (void) value;
781
782   if (0 >= iterator_cls->index)
783   {
784     iterator_cls->peer = peer;
785     return GNUNET_NO;
786   }
787   iterator_cls->index--;
788   return GNUNET_YES;
789 }
790
791
792 /**
793  * @brief Get a random peer from @a peer_map
794  *
795  * @param valid_peers Peer map containing valid peers from which to select a
796  * random one
797  *
798  * @return a random peer
799  */
800 static const struct GNUNET_PeerIdentity *
801 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
802 {
803   struct GetRandPeerIteratorCls *iterator_cls;
804   const struct GNUNET_PeerIdentity *ret;
805
806   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
807   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
808       GNUNET_CONTAINER_multipeermap_size (valid_peers));
809   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
810                                                 get_rand_peer_iterator,
811                                                 iterator_cls);
812   ret = iterator_cls->peer;
813   GNUNET_free (iterator_cls);
814   return ret;
815 }
816
817
818 /**
819  * @brief Add a given @a peer to valid peers.
820  *
821  * If valid peers are already #num_valid_peers_max, delete a peer previously.
822  *
823  * @param peer The peer that is added to the valid peers.
824  * @param valid_peers Peer map of valid peers to which to add the @a peer
825  *
826  * @return #GNUNET_YES if no other peer had to be removed
827  *         #GNUNET_NO  otherwise
828  */
829 static int
830 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
831                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
832 {
833   const struct GNUNET_PeerIdentity *rand_peer;
834   int ret;
835
836   ret = GNUNET_YES;
837   /* Remove random peers until there is space for a new one */
838   while (num_valid_peers_max <=
839          GNUNET_CONTAINER_multipeermap_size (valid_peers))
840   {
841     rand_peer = get_random_peer_from_peermap (valid_peers);
842     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
843     ret = GNUNET_NO;
844   }
845   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
846       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
847   if (valid_peers == msub->valid_peers)
848   {
849     GNUNET_STATISTICS_set (stats,
850                            "# valid peers",
851                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
852                            GNUNET_NO);
853   }
854   return ret;
855 }
856
857 static void
858 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
859
860 /**
861  * @brief Set the peer flag to living and
862  *        call the pending operations on this peer.
863  *
864  * Also adds peer to #valid_peers.
865  *
866  * @param peer_ctx the #PeerContext of the peer to set online
867  */
868 static void
869 set_peer_online (struct PeerContext *peer_ctx)
870 {
871   struct GNUNET_PeerIdentity *peer;
872   unsigned int i;
873
874   peer = &peer_ctx->peer_id;
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876       "Peer %s is online and valid, calling %i pending operations on it\n",
877       GNUNET_i2s (peer),
878       peer_ctx->num_pending_ops);
879
880   if (NULL != peer_ctx->online_check_pending)
881   {
882     LOG (GNUNET_ERROR_TYPE_DEBUG,
883          "Removing pending online check for peer %s\n",
884          GNUNET_i2s (&peer_ctx->peer_id));
885     // TODO wait until cadet sets mq->cancel_impl
886     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
887     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
888     peer_ctx->online_check_pending = NULL;
889   }
890
891   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
892
893   /* Call pending operations */
894   for (i = 0; i < peer_ctx->num_pending_ops; i++)
895   {
896     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
897   }
898   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
899 }
900
901 static void
902 cleanup_destroyed_channel (void *cls,
903                            const struct GNUNET_CADET_Channel *channel);
904
905 /* Declaration of handlers */
906 static void
907 handle_peer_check (void *cls,
908                    const struct GNUNET_MessageHeader *msg);
909
910 static void
911 handle_peer_push (void *cls,
912                   const struct GNUNET_MessageHeader *msg);
913
914 static void
915 handle_peer_pull_request (void *cls,
916                           const struct GNUNET_MessageHeader *msg);
917
918 static int
919 check_peer_pull_reply (void *cls,
920                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
921
922 static void
923 handle_peer_pull_reply (void *cls,
924                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
925
926 /* End declaration of handlers */
927
928 /**
929  * @brief Allocate memory for a new channel context and insert it into DLL
930  *
931  * @param peer_ctx context of the according peer
932  *
933  * @return The channel context
934  */
935 static struct ChannelCtx *
936 add_channel_ctx (struct PeerContext *peer_ctx)
937 {
938   struct ChannelCtx *channel_ctx;
939   channel_ctx = GNUNET_new (struct ChannelCtx);
940   channel_ctx->peer_ctx = peer_ctx;
941   return channel_ctx;
942 }
943
944
945 /**
946  * @brief Free memory and NULL pointers.
947  *
948  * @param channel_ctx The channel context.
949  */
950 static void
951 remove_channel_ctx (struct ChannelCtx *channel_ctx)
952 {
953   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
954
955   if (NULL != channel_ctx->destruction_task)
956   {
957     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
958     channel_ctx->destruction_task = NULL;
959   }
960
961   GNUNET_free (channel_ctx);
962
963   if (NULL == peer_ctx) return;
964   if (channel_ctx == peer_ctx->send_channel_ctx)
965   {
966     peer_ctx->send_channel_ctx = NULL;
967     peer_ctx->mq = NULL;
968   }
969   else if (channel_ctx == peer_ctx->recv_channel_ctx)
970   {
971     peer_ctx->recv_channel_ctx = NULL;
972   }
973 }
974
975
976 /**
977  * @brief Get the channel of a peer. If not existing, create.
978  *
979  * @param peer_ctx Context of the peer of which to get the channel
980  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
981  */
982 struct GNUNET_CADET_Channel *
983 get_channel (struct PeerContext *peer_ctx)
984 {
985   /* There exists a copy-paste-clone in run() */
986   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
987     GNUNET_MQ_hd_fixed_size (peer_check,
988                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
989                              struct GNUNET_MessageHeader,
990                              NULL),
991     GNUNET_MQ_hd_fixed_size (peer_push,
992                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
993                              struct GNUNET_MessageHeader,
994                              NULL),
995     GNUNET_MQ_hd_fixed_size (peer_pull_request,
996                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
997                              struct GNUNET_MessageHeader,
998                              NULL),
999     GNUNET_MQ_hd_var_size (peer_pull_reply,
1000                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1001                            struct GNUNET_RPS_P2P_PullReplyMessage,
1002                            NULL),
1003     GNUNET_MQ_handler_end ()
1004   };
1005
1006
1007   if (NULL == peer_ctx->send_channel_ctx)
1008   {
1009     LOG (GNUNET_ERROR_TYPE_DEBUG,
1010          "Trying to establish channel to peer %s\n",
1011          GNUNET_i2s (&peer_ctx->peer_id));
1012     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1013     peer_ctx->send_channel_ctx->channel =
1014       GNUNET_CADET_channel_create (cadet_handle,
1015                                    peer_ctx->send_channel_ctx, /* context */
1016                                    &peer_ctx->peer_id,
1017                                    &peer_ctx->sub->hash,
1018                                    GNUNET_CADET_OPTION_RELIABLE,
1019                                    NULL, /* WindowSize handler */
1020                                    &cleanup_destroyed_channel, /* Disconnect handler */
1021                                    cadet_handlers);
1022   }
1023   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1024   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1025   return peer_ctx->send_channel_ctx->channel;
1026 }
1027
1028
1029 /**
1030  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1031  *
1032  * If we already have a message queue open to this client,
1033  * simply return it, otherways create one.
1034  *
1035  * @param peer_ctx Context of the peer of whicht to get the mq
1036  * @return the #GNUNET_MQ_Handle
1037  */
1038 static struct GNUNET_MQ_Handle *
1039 get_mq (struct PeerContext *peer_ctx)
1040 {
1041   if (NULL == peer_ctx->mq)
1042   {
1043     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1044   }
1045   return peer_ctx->mq;
1046 }
1047
1048 /**
1049  * @brief Add an envelope to a message passed to mq to list of pending messages
1050  *
1051  * @param peer_ctx Context of the peer for which to insert the envelope
1052  * @param ev envelope to the message
1053  * @param type type of the message to be sent
1054  * @return pointer to pending message
1055  */
1056 static struct PendingMessage *
1057 insert_pending_message (struct PeerContext *peer_ctx,
1058                         struct GNUNET_MQ_Envelope *ev,
1059                         const char *type)
1060 {
1061   struct PendingMessage *pending_msg;
1062
1063   pending_msg = GNUNET_new (struct PendingMessage);
1064   pending_msg->ev = ev;
1065   pending_msg->peer_ctx = peer_ctx;
1066   pending_msg->type = type;
1067   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1068                                peer_ctx->pending_messages_tail,
1069                                pending_msg);
1070   return pending_msg;
1071 }
1072
1073
1074 /**
1075  * @brief Remove a pending message from the respective DLL
1076  *
1077  * @param pending_msg the pending message to remove
1078  * @param cancel whether to cancel the pending message, too
1079  */
1080 static void
1081 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1082 {
1083   struct PeerContext *peer_ctx;
1084   (void) cancel;
1085
1086   peer_ctx = pending_msg->peer_ctx;
1087   GNUNET_assert (NULL != peer_ctx);
1088   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1089                                peer_ctx->pending_messages_tail,
1090                                pending_msg);
1091   // TODO wait for the cadet implementation of message cancellation
1092   //if (GNUNET_YES == cancel)
1093   //{
1094   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1095   //}
1096   GNUNET_free (pending_msg);
1097 }
1098
1099
1100 /**
1101  * @brief This is called in response to the first message we sent as a
1102  * online check.
1103  *
1104  * @param cls #PeerContext of peer with pending online check
1105  */
1106 static void
1107 mq_online_check_successful (void *cls)
1108 {
1109   struct PeerContext *peer_ctx = cls;
1110
1111   if (NULL != peer_ctx->online_check_pending)
1112   {
1113     LOG (GNUNET_ERROR_TYPE_DEBUG,
1114         "Online check for peer %s was successfull\n",
1115         GNUNET_i2s (&peer_ctx->peer_id));
1116     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1117     peer_ctx->online_check_pending = NULL;
1118     set_peer_online (peer_ctx);
1119     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1120   }
1121 }
1122
1123 /**
1124  * Issue a check whether peer is online
1125  *
1126  * @param peer_ctx the context of the peer
1127  */
1128 static void
1129 check_peer_online (struct PeerContext *peer_ctx)
1130 {
1131   LOG (GNUNET_ERROR_TYPE_DEBUG,
1132        "Get informed about peer %s getting online\n",
1133        GNUNET_i2s (&peer_ctx->peer_id));
1134
1135   struct GNUNET_MQ_Handle *mq;
1136   struct GNUNET_MQ_Envelope *ev;
1137
1138   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1139   peer_ctx->online_check_pending =
1140     insert_pending_message (peer_ctx, ev, "Check online");
1141   mq = get_mq (peer_ctx);
1142   GNUNET_MQ_notify_sent (ev,
1143                          mq_online_check_successful,
1144                          peer_ctx);
1145   GNUNET_MQ_send (mq, ev);
1146   if (peer_ctx->sub == msub)
1147   {
1148     GNUNET_STATISTICS_update (stats,
1149                               "# pending online checks",
1150                               1,
1151                               GNUNET_NO);
1152   }
1153 }
1154
1155
1156 /**
1157  * @brief Check whether function of type #PeerOp was already scheduled
1158  *
1159  * The array with pending operations will probably never grow really big, so
1160  * iterating over it should be ok.
1161  *
1162  * @param peer_ctx Context of the peer to check for the operation
1163  * @param peer_op the operation (#PeerOp) on the peer
1164  *
1165  * @return #GNUNET_YES if this operation is scheduled on that peer
1166  *         #GNUNET_NO  otherwise
1167  */
1168 static int
1169 check_operation_scheduled (const struct PeerContext *peer_ctx,
1170                            const PeerOp peer_op)
1171 {
1172   unsigned int i;
1173
1174   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1175     if (peer_op == peer_ctx->pending_ops[i].op)
1176       return GNUNET_YES;
1177   return GNUNET_NO;
1178 }
1179
1180
1181 /**
1182  * @brief Callback for scheduler to destroy a channel
1183  *
1184  * @param cls Context of the channel
1185  */
1186 static void
1187 destroy_channel (struct ChannelCtx *channel_ctx)
1188 {
1189   struct GNUNET_CADET_Channel *channel;
1190
1191   if (NULL != channel_ctx->destruction_task)
1192   {
1193     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1194     channel_ctx->destruction_task = NULL;
1195   }
1196   GNUNET_assert (channel_ctx->channel != NULL);
1197   channel = channel_ctx->channel;
1198   channel_ctx->channel = NULL;
1199   GNUNET_CADET_channel_destroy (channel);
1200   remove_channel_ctx (channel_ctx);
1201 }
1202
1203
1204 /**
1205  * @brief Destroy a cadet channel.
1206  *
1207  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1208  *
1209  * @param cls
1210  */
1211 static void
1212 destroy_channel_cb (void *cls)
1213 {
1214   struct ChannelCtx *channel_ctx = cls;
1215
1216   channel_ctx->destruction_task = NULL;
1217   destroy_channel (channel_ctx);
1218 }
1219
1220
1221 /**
1222  * @brief Schedule the destruction of a channel for immediately afterwards.
1223  *
1224  * In case a channel is to be destroyed from within the callback to the
1225  * destruction of another channel (send channel), we cannot call
1226  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1227  * construction.
1228  *
1229  * @param channel_ctx channel to be destroyed.
1230  */
1231 static void
1232 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1233 {
1234   GNUNET_assert (NULL ==
1235                  channel_ctx->destruction_task);
1236   GNUNET_assert (NULL !=
1237                  channel_ctx->channel);
1238   channel_ctx->destruction_task =
1239     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1240                               channel_ctx);
1241 }
1242
1243
1244 /**
1245  * @brief Remove peer
1246  *
1247  * - Empties the list with pending operations
1248  * - Empties the list with pending messages
1249  * - Cancels potentially existing online check
1250  * - Schedules closing of send and recv channels
1251  * - Removes peer from peer map
1252  *
1253  * @param peer_ctx Context of the peer to be destroyed
1254  * @return #GNUNET_YES if peer was removed
1255  *         #GNUNET_NO  otherwise
1256  */
1257 static int
1258 destroy_peer (struct PeerContext *peer_ctx)
1259 {
1260   GNUNET_assert (NULL != peer_ctx);
1261   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1262   if (GNUNET_NO ==
1263       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1264                                               &peer_ctx->peer_id))
1265   {
1266     return GNUNET_NO;
1267   }
1268   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1269   LOG (GNUNET_ERROR_TYPE_DEBUG,
1270        "Going to remove peer %s\n",
1271        GNUNET_i2s (&peer_ctx->peer_id));
1272   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1273
1274   /* Clear list of pending operations */
1275   // TODO this probably leaks memory
1276   //      ('only' the cls to the function. Not sure what to do with it)
1277   GNUNET_array_grow (peer_ctx->pending_ops,
1278                      peer_ctx->num_pending_ops,
1279                      0);
1280   /* Remove all pending messages */
1281   while (NULL != peer_ctx->pending_messages_head)
1282   {
1283     LOG (GNUNET_ERROR_TYPE_DEBUG,
1284          "Removing unsent %s\n",
1285          peer_ctx->pending_messages_head->type);
1286     /* Cancle pending message, too */
1287     if ( (NULL != peer_ctx->online_check_pending) &&
1288          (0 == memcmp (peer_ctx->pending_messages_head,
1289                      peer_ctx->online_check_pending,
1290                      sizeof (struct PendingMessage))) )
1291       {
1292         peer_ctx->online_check_pending = NULL;
1293         if (peer_ctx->sub == msub)
1294         {
1295           GNUNET_STATISTICS_update (stats,
1296                                     "# pending online checks",
1297                                     -1,
1298                                     GNUNET_NO);
1299         }
1300       }
1301     remove_pending_message (peer_ctx->pending_messages_head,
1302                             GNUNET_YES);
1303   }
1304
1305   /* If we are still waiting for notification whether this peer is online
1306    * cancel the according task */
1307   if (NULL != peer_ctx->online_check_pending)
1308   {
1309     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1310                 "Removing pending online check for peer %s\n",
1311                 GNUNET_i2s (&peer_ctx->peer_id));
1312     // TODO wait until cadet sets mq->cancel_impl
1313     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1314     remove_pending_message (peer_ctx->online_check_pending,
1315                             GNUNET_YES);
1316     peer_ctx->online_check_pending = NULL;
1317   }
1318
1319   if (NULL != peer_ctx->send_channel_ctx)
1320   {
1321     /* This is possibly called from within channel destruction */
1322     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1323     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1324     peer_ctx->send_channel_ctx = NULL;
1325     peer_ctx->mq = NULL;
1326   }
1327   if (NULL != peer_ctx->recv_channel_ctx)
1328   {
1329     /* This is possibly called from within channel destruction */
1330     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1331     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1332     peer_ctx->recv_channel_ctx = NULL;
1333   }
1334
1335   if (GNUNET_YES !=
1336       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1337                                                 &peer_ctx->peer_id))
1338   {
1339     LOG (GNUNET_ERROR_TYPE_WARNING,
1340          "removing peer from peer_ctx->sub->peer_map failed\n");
1341   }
1342   if (peer_ctx->sub == msub)
1343   {
1344     GNUNET_STATISTICS_set (stats,
1345                           "# known peers",
1346                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1347                           GNUNET_NO);
1348   }
1349   GNUNET_free (peer_ctx);
1350   return GNUNET_YES;
1351 }
1352
1353
1354 /**
1355  * Iterator over hash map entries. Deletes all contexts of peers.
1356  *
1357  * @param cls closure
1358  * @param key current public key
1359  * @param value value in the hash map
1360  * @return #GNUNET_YES if we should continue to iterate,
1361  *         #GNUNET_NO if not.
1362  */
1363 static int
1364 peermap_clear_iterator (void *cls,
1365                         const struct GNUNET_PeerIdentity *key,
1366                         void *value)
1367 {
1368   struct Sub *sub = cls;
1369   (void) value;
1370
1371   destroy_peer (get_peer_ctx (sub->peer_map, key));
1372   return GNUNET_YES;
1373 }
1374
1375
1376 /**
1377  * @brief This is called once a message is sent.
1378  *
1379  * Removes the pending message
1380  *
1381  * @param cls type of the message that was sent
1382  */
1383 static void
1384 mq_notify_sent_cb (void *cls)
1385 {
1386   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1387   LOG (GNUNET_ERROR_TYPE_DEBUG,
1388       "%s was sent.\n",
1389       pending_msg->type);
1390   if (pending_msg->peer_ctx->sub == msub)
1391   {
1392     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1393       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1394     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1395       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1396     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1397       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1398     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1399                       NULL != map_single_hop &&
1400         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1401           &pending_msg->peer_ctx->peer_id))
1402       GNUNET_STATISTICS_update(stats,
1403                                "# pull requests sent (multi-hop peer)",
1404                                1,
1405                                GNUNET_NO);
1406   }
1407   /* Do not cancle message */
1408   remove_pending_message (pending_msg, GNUNET_NO);
1409 }
1410
1411
1412 /**
1413  * @brief Iterator function for #store_valid_peers.
1414  *
1415  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1416  * Writes single peer to disk.
1417  *
1418  * @param cls the file handle to write to.
1419  * @param peer current peer
1420  * @param value unused
1421  *
1422  * @return  #GNUNET_YES if we should continue to
1423  *          iterate,
1424  *          #GNUNET_NO if not.
1425  */
1426 static int
1427 store_peer_presistently_iterator (void *cls,
1428                                   const struct GNUNET_PeerIdentity *peer,
1429                                   void *value)
1430 {
1431   const struct GNUNET_DISK_FileHandle *fh = cls;
1432   char peer_string[128];
1433   int size;
1434   ssize_t ret;
1435   (void) value;
1436
1437   if (NULL == peer)
1438   {
1439     return GNUNET_YES;
1440   }
1441   size = GNUNET_snprintf (peer_string,
1442                           sizeof (peer_string),
1443                           "%s\n",
1444                           GNUNET_i2s_full (peer));
1445   GNUNET_assert (53 == size);
1446   ret = GNUNET_DISK_file_write (fh,
1447                                 peer_string,
1448                                 size);
1449   GNUNET_assert (size == ret);
1450   return GNUNET_YES;
1451 }
1452
1453
1454 /**
1455  * @brief Store the peers currently in #valid_peers to disk.
1456  *
1457  * @param sub Sub for which to store the valid peers
1458  */
1459 static void
1460 store_valid_peers (const struct Sub *sub)
1461 {
1462   struct GNUNET_DISK_FileHandle *fh;
1463   uint32_t number_written_peers;
1464   int ret;
1465
1466   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1467   {
1468     return;
1469   }
1470
1471   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1472   if (GNUNET_SYSERR == ret)
1473   {
1474     LOG (GNUNET_ERROR_TYPE_WARNING,
1475         "Not able to create directory for file `%s'\n",
1476         sub->filename_valid_peers);
1477     GNUNET_break (0);
1478   }
1479   else if (GNUNET_NO == ret)
1480   {
1481     LOG (GNUNET_ERROR_TYPE_WARNING,
1482         "Directory for file `%s' exists but is not writable for us\n",
1483         sub->filename_valid_peers);
1484     GNUNET_break (0);
1485   }
1486   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1487                               GNUNET_DISK_OPEN_WRITE |
1488                                   GNUNET_DISK_OPEN_CREATE,
1489                               GNUNET_DISK_PERM_USER_READ |
1490                                   GNUNET_DISK_PERM_USER_WRITE);
1491   if (NULL == fh)
1492   {
1493     LOG (GNUNET_ERROR_TYPE_WARNING,
1494         "Not able to write valid peers to file `%s'\n",
1495         sub->filename_valid_peers);
1496     return;
1497   }
1498   LOG (GNUNET_ERROR_TYPE_DEBUG,
1499       "Writing %u valid peers to disk\n",
1500       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1501   number_written_peers =
1502     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1503                                            store_peer_presistently_iterator,
1504                                            fh);
1505   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1506   GNUNET_assert (number_written_peers ==
1507       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1508 }
1509
1510
1511 /**
1512  * @brief Convert string representation of peer id to peer id.
1513  *
1514  * Counterpart to #GNUNET_i2s_full.
1515  *
1516  * @param string_repr The string representation of the peer id
1517  *
1518  * @return The peer id
1519  */
1520 static const struct GNUNET_PeerIdentity *
1521 s2i_full (const char *string_repr)
1522 {
1523   struct GNUNET_PeerIdentity *peer;
1524   size_t len;
1525   int ret;
1526
1527   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1528   len = strlen (string_repr);
1529   if (52 > len)
1530   {
1531     LOG (GNUNET_ERROR_TYPE_WARNING,
1532         "Not able to convert string representation of PeerID to PeerID\n"
1533         "Sting representation: %s (len %lu) - too short\n",
1534         string_repr,
1535         len);
1536     GNUNET_break (0);
1537   }
1538   else if (52 < len)
1539   {
1540     len = 52;
1541   }
1542   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1543                                                     len,
1544                                                     &peer->public_key);
1545   if (GNUNET_OK != ret)
1546   {
1547     LOG (GNUNET_ERROR_TYPE_WARNING,
1548         "Not able to convert string representation of PeerID to PeerID\n"
1549         "Sting representation: %s\n",
1550         string_repr);
1551     GNUNET_break (0);
1552   }
1553   return peer;
1554 }
1555
1556
1557 /**
1558  * @brief Restore the peers on disk to #valid_peers.
1559  *
1560  * @param sub Sub for which to restore the valid peers
1561  */
1562 static void
1563 restore_valid_peers (const struct Sub *sub)
1564 {
1565   off_t file_size;
1566   uint32_t num_peers;
1567   struct GNUNET_DISK_FileHandle *fh;
1568   char *buf;
1569   ssize_t size_read;
1570   char *iter_buf;
1571   char *str_repr;
1572   const struct GNUNET_PeerIdentity *peer;
1573
1574   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1575   {
1576     return;
1577   }
1578
1579   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1580   {
1581     return;
1582   }
1583   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1584                               GNUNET_DISK_OPEN_READ,
1585                               GNUNET_DISK_PERM_NONE);
1586   GNUNET_assert (NULL != fh);
1587   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1588   num_peers = file_size / 53;
1589   buf = GNUNET_malloc (file_size);
1590   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1591   GNUNET_assert (size_read == file_size);
1592   LOG (GNUNET_ERROR_TYPE_DEBUG,
1593       "Restoring %" PRIu32 " peers from file `%s'\n",
1594       num_peers,
1595       sub->filename_valid_peers);
1596   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1597   {
1598     str_repr = GNUNET_strndup (iter_buf, 53);
1599     peer = s2i_full (str_repr);
1600     GNUNET_free (str_repr);
1601     add_valid_peer (peer, sub->valid_peers);
1602     LOG (GNUNET_ERROR_TYPE_DEBUG,
1603         "Restored valid peer %s from disk\n",
1604         GNUNET_i2s_full (peer));
1605   }
1606   iter_buf = NULL;
1607   GNUNET_free (buf);
1608   LOG (GNUNET_ERROR_TYPE_DEBUG,
1609       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1610       num_peers,
1611       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1612   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1613   {
1614     LOG (GNUNET_ERROR_TYPE_WARNING,
1615         "Number of restored peers does not match file size. Have probably duplicates.\n");
1616   }
1617   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1618   LOG (GNUNET_ERROR_TYPE_DEBUG,
1619       "Restored %u valid peers from disk\n",
1620       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1621 }
1622
1623
1624 /**
1625  * @brief Delete storage of peers that was created with #initialise_peers ()
1626  *
1627  * @param sub Sub for which the storage is deleted
1628  */
1629 static void
1630 peers_terminate (struct Sub *sub)
1631 {
1632   if (GNUNET_SYSERR ==
1633       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1634                                              &peermap_clear_iterator,
1635                                              sub))
1636   {
1637     LOG (GNUNET_ERROR_TYPE_WARNING,
1638         "Iteration destroying peers was aborted.\n");
1639   }
1640   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1641   sub->peer_map = NULL;
1642   store_valid_peers (sub);
1643   GNUNET_free (sub->filename_valid_peers);
1644   sub->filename_valid_peers = NULL;
1645   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1646   sub->valid_peers = NULL;
1647 }
1648
1649
1650 /**
1651  * Iterator over #valid_peers hash map entries.
1652  *
1653  * @param cls Closure that contains iterator function and closure
1654  * @param peer current peer id
1655  * @param value value in the hash map - unused
1656  * @return #GNUNET_YES if we should continue to
1657  *         iterate,
1658  *         #GNUNET_NO if not.
1659  */
1660 static int
1661 valid_peer_iterator (void *cls,
1662                      const struct GNUNET_PeerIdentity *peer,
1663                      void *value)
1664 {
1665   struct PeersIteratorCls *it_cls = cls;
1666   (void) value;
1667
1668   return it_cls->iterator (it_cls->cls, peer);
1669 }
1670
1671
1672 /**
1673  * @brief Get all currently known, valid peer ids.
1674  *
1675  * @param valid_peers Peer map containing the valid peers in question
1676  * @param iterator function to call on each peer id
1677  * @param it_cls extra argument to @a iterator
1678  * @return the number of key value pairs processed,
1679  *         #GNUNET_SYSERR if it aborted iteration
1680  */
1681 static int
1682 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1683                  PeersIterator iterator,
1684                  void *it_cls)
1685 {
1686   struct PeersIteratorCls *cls;
1687   int ret;
1688
1689   cls = GNUNET_new (struct PeersIteratorCls);
1690   cls->iterator = iterator;
1691   cls->cls = it_cls;
1692   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1693                                                valid_peer_iterator,
1694                                                cls);
1695   GNUNET_free (cls);
1696   return ret;
1697 }
1698
1699
1700 /**
1701  * @brief Add peer to known peers.
1702  *
1703  * This function is called on new peer_ids from 'external' sources
1704  * (client seed, cadet get_peers(), ...)
1705  *
1706  * @param sub Sub with the peer map that the @a peer will be added to
1707  * @param peer the new #GNUNET_PeerIdentity
1708  *
1709  * @return #GNUNET_YES if peer was inserted
1710  *         #GNUNET_NO  otherwise
1711  */
1712 static int
1713 insert_peer (struct Sub *sub,
1714              const struct GNUNET_PeerIdentity *peer)
1715 {
1716   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1717   {
1718     return GNUNET_NO; /* We already know this peer - nothing to do */
1719   }
1720   (void) create_peer_ctx (sub, peer);
1721   return GNUNET_YES;
1722 }
1723
1724
1725 /**
1726  * @brief Check whether flags on a peer are set.
1727  *
1728  * @param peer_map Peer map that is expected to contain the @a peer
1729  * @param peer the peer to check the flag of
1730  * @param flags the flags to check
1731  *
1732  * @return #GNUNET_SYSERR if peer is not known
1733  *         #GNUNET_YES    if all given flags are set
1734  *         #GNUNET_NO     otherwise
1735  */
1736 static int
1737 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1738                  const struct GNUNET_PeerIdentity *peer,
1739                  enum Peers_PeerFlags flags)
1740 {
1741   struct PeerContext *peer_ctx;
1742
1743   if (GNUNET_NO == check_peer_known (peer_map, peer))
1744   {
1745     return GNUNET_SYSERR;
1746   }
1747   peer_ctx = get_peer_ctx (peer_map, peer);
1748   return check_peer_flag_set (peer_ctx, flags);
1749 }
1750
1751 /**
1752  * @brief Try connecting to a peer to see whether it is online
1753  *
1754  * If not known yet, insert into known peers
1755  *
1756  * @param sub Sub which would contain the @a peer
1757  * @param peer the peer whose online is to be checked
1758  * @return #GNUNET_YES if the check was issued
1759  *         #GNUNET_NO  otherwise
1760  */
1761 static int
1762 issue_peer_online_check (struct Sub *sub,
1763                          const struct GNUNET_PeerIdentity *peer)
1764 {
1765   struct PeerContext *peer_ctx;
1766
1767   (void) insert_peer (sub, peer); // TODO even needed?
1768   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1769   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1770        (NULL == peer_ctx->online_check_pending) )
1771   {
1772     check_peer_online (peer_ctx);
1773     return GNUNET_YES;
1774   }
1775   return GNUNET_NO;
1776 }
1777
1778
1779 /**
1780  * @brief Check if peer is removable.
1781  *
1782  * Check if
1783  *  - a recv channel exists
1784  *  - there are pending messages
1785  *  - there is no pending pull reply
1786  *
1787  * @param peer_ctx Context of the peer in question
1788  * @return #GNUNET_YES    if peer is removable
1789  *         #GNUNET_NO     if peer is NOT removable
1790  *         #GNUNET_SYSERR if peer is not known
1791  */
1792 static int
1793 check_removable (const struct PeerContext *peer_ctx)
1794 {
1795   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1796                                                            &peer_ctx->peer_id))
1797   {
1798     return GNUNET_SYSERR;
1799   }
1800
1801   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1802        (NULL != peer_ctx->pending_messages_head) ||
1803        (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1804   {
1805     return GNUNET_NO;
1806   }
1807   return GNUNET_YES;
1808 }
1809
1810
1811 /**
1812  * @brief Check whether @a peer is actually a peer.
1813  *
1814  * A valid peer is a peer that we know exists eg. we were connected to once.
1815  *
1816  * @param valid_peers Peer map that would contain the @a peer
1817  * @param peer peer in question
1818  *
1819  * @return #GNUNET_YES if peer is valid
1820  *         #GNUNET_NO  if peer is not valid
1821  */
1822 static int
1823 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1824                   const struct GNUNET_PeerIdentity *peer)
1825 {
1826   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1827 }
1828
1829
1830 /**
1831  * @brief Indicate that we want to send to the other peer
1832  *
1833  * This establishes a sending channel
1834  *
1835  * @param peer_ctx Context of the target peer
1836  */
1837 static void
1838 indicate_sending_intention (struct PeerContext *peer_ctx)
1839 {
1840   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1841                                                  &peer_ctx->peer_id));
1842   (void) get_channel (peer_ctx);
1843 }
1844
1845
1846 /**
1847  * @brief Check whether other peer has the intention to send/opened channel
1848  *        towars us
1849  *
1850  * @param peer_ctx Context of the peer in question
1851  *
1852  * @return #GNUNET_YES if peer has the intention to send
1853  *         #GNUNET_NO  otherwise
1854  */
1855 static int
1856 check_peer_send_intention (const struct PeerContext *peer_ctx)
1857 {
1858   if (NULL != peer_ctx->recv_channel_ctx)
1859   {
1860     return GNUNET_YES;
1861   }
1862   return GNUNET_NO;
1863 }
1864
1865
1866 /**
1867  * Handle the channel a peer opens to us.
1868  *
1869  * @param cls The closure - Sub
1870  * @param channel The channel the peer wants to establish
1871  * @param initiator The peer's peer ID
1872  *
1873  * @return initial channel context for the channel
1874  *         (can be NULL -- that's not an error)
1875  */
1876 static void *
1877 handle_inbound_channel (void *cls,
1878                         struct GNUNET_CADET_Channel *channel,
1879                         const struct GNUNET_PeerIdentity *initiator)
1880 {
1881   struct PeerContext *peer_ctx;
1882   struct ChannelCtx *channel_ctx;
1883   struct Sub *sub = cls;
1884
1885   LOG (GNUNET_ERROR_TYPE_DEBUG,
1886       "New channel was established to us (Peer %s).\n",
1887       GNUNET_i2s (initiator));
1888   GNUNET_assert (NULL != channel); /* according to cadet API */
1889   /* Make sure we 'know' about this peer */
1890   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1891   set_peer_online (peer_ctx);
1892   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1893   channel_ctx = add_channel_ctx (peer_ctx);
1894   channel_ctx->channel = channel;
1895   /* We only accept one incoming channel per peer */
1896   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1897                                                              initiator)))
1898   {
1899     LOG (GNUNET_ERROR_TYPE_WARNING,
1900         "Already got one receive channel. Destroying old one.\n");
1901     GNUNET_break_op (0);
1902     destroy_channel (peer_ctx->recv_channel_ctx);
1903     peer_ctx->recv_channel_ctx = channel_ctx;
1904     /* return the channel context */
1905     return channel_ctx;
1906   }
1907   peer_ctx->recv_channel_ctx = channel_ctx;
1908   return channel_ctx;
1909 }
1910
1911
1912 /**
1913  * @brief Check whether a sending channel towards the given peer exists
1914  *
1915  * @param peer_ctx Context of the peer in question
1916  *
1917  * @return #GNUNET_YES if a sending channel towards that peer exists
1918  *         #GNUNET_NO  otherwise
1919  */
1920 static int
1921 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1922 {
1923   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1924                                      &peer_ctx->peer_id))
1925   { /* If no such peer exists, there is no channel */
1926     return GNUNET_NO;
1927   }
1928   if (NULL == peer_ctx->send_channel_ctx)
1929   {
1930     return GNUNET_NO;
1931   }
1932   return GNUNET_YES;
1933 }
1934
1935
1936 /**
1937  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1938  *        intention to another peer
1939  *
1940  * @param peer_ctx Context to the peer
1941  * @return #GNUNET_YES if channel was destroyed
1942  *         #GNUNET_NO  otherwise
1943  */
1944 static int
1945 destroy_sending_channel (struct PeerContext *peer_ctx)
1946 {
1947   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1948                                      &peer_ctx->peer_id))
1949   {
1950     return GNUNET_NO;
1951   }
1952   if (NULL != peer_ctx->send_channel_ctx)
1953   {
1954     destroy_channel (peer_ctx->send_channel_ctx);
1955     (void) check_connected (peer_ctx);
1956     return GNUNET_YES;
1957   }
1958   return GNUNET_NO;
1959 }
1960
1961 /**
1962  * @brief Send a message to another peer.
1963  *
1964  * Keeps track about pending messages so they can be properly removed when the
1965  * peer is destroyed.
1966  *
1967  * @param peer_ctx Context of the peer to which the message is to be sent
1968  * @param ev envelope of the message
1969  * @param type type of the message
1970  */
1971 static void
1972 send_message (struct PeerContext *peer_ctx,
1973               struct GNUNET_MQ_Envelope *ev,
1974               const char *type)
1975 {
1976   struct PendingMessage *pending_msg;
1977   struct GNUNET_MQ_Handle *mq;
1978
1979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1980               "Sending message to %s of type %s\n",
1981               GNUNET_i2s (&peer_ctx->peer_id),
1982               type);
1983   pending_msg = insert_pending_message (peer_ctx, ev, type);
1984   mq = get_mq (peer_ctx);
1985   GNUNET_MQ_notify_sent (ev,
1986                          mq_notify_sent_cb,
1987                          pending_msg);
1988   GNUNET_MQ_send (mq, ev);
1989 }
1990
1991 /**
1992  * @brief Schedule a operation on given peer
1993  *
1994  * Avoids scheduling an operation twice.
1995  *
1996  * @param peer_ctx Context of the peer for which to schedule the operation
1997  * @param peer_op the operation to schedule
1998  * @param cls Closure to @a peer_op
1999  *
2000  * @return #GNUNET_YES if the operation was scheduled
2001  *         #GNUNET_NO  otherwise
2002  */
2003 static int
2004 schedule_operation (struct PeerContext *peer_ctx,
2005                     const PeerOp peer_op,
2006                     void *cls)
2007 {
2008   struct PeerPendingOp pending_op;
2009
2010   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2011                                                  &peer_ctx->peer_id));
2012
2013   //TODO if ONLINE execute immediately
2014
2015   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2016   {
2017     pending_op.op = peer_op;
2018     pending_op.op_cls = cls;
2019     GNUNET_array_append (peer_ctx->pending_ops,
2020                          peer_ctx->num_pending_ops,
2021                          pending_op);
2022     return GNUNET_YES;
2023   }
2024   return GNUNET_NO;
2025 }
2026
2027 /***********************************************************************
2028  * /Old gnunet-service-rps_peers.c
2029 ***********************************************************************/
2030
2031
2032 /***********************************************************************
2033  * Housekeeping with clients
2034 ***********************************************************************/
2035
2036 /**
2037  * Closure used to pass the client and the id to the callback
2038  * that replies to a client's request
2039  */
2040 struct ReplyCls
2041 {
2042   /**
2043    * DLL
2044    */
2045   struct ReplyCls *next;
2046   struct ReplyCls *prev;
2047
2048   /**
2049    * The identifier of the request
2050    */
2051   uint32_t id;
2052
2053   /**
2054    * The handle to the request
2055    */
2056   struct RPS_SamplerRequestHandle *req_handle;
2057
2058   /**
2059    * The client handle to send the reply to
2060    */
2061   struct ClientContext *cli_ctx;
2062 };
2063
2064
2065 /**
2066  * Struct used to store the context of a connected client.
2067  */
2068 struct ClientContext
2069 {
2070   /**
2071    * DLL
2072    */
2073   struct ClientContext *next;
2074   struct ClientContext *prev;
2075
2076   /**
2077    * The message queue to communicate with the client.
2078    */
2079   struct GNUNET_MQ_Handle *mq;
2080
2081   /**
2082    * @brief How many updates this client expects to receive.
2083    */
2084   int64_t view_updates_left;
2085
2086   /**
2087    * @brief Whether this client wants to receive stream updates.
2088    * Either #GNUNET_YES or #GNUNET_NO
2089    */
2090   int8_t stream_update;
2091
2092   /**
2093    * The client handle to send the reply to
2094    */
2095   struct GNUNET_SERVICE_Client *client;
2096
2097   /**
2098    * The #Sub this context belongs to
2099    */
2100   struct Sub *sub;
2101 };
2102
2103 /**
2104  * DLL with all clients currently connected to us
2105  */
2106 struct ClientContext *cli_ctx_head;
2107 struct ClientContext *cli_ctx_tail;
2108
2109 /***********************************************************************
2110  * /Housekeeping with clients
2111 ***********************************************************************/
2112
2113
2114
2115
2116
2117 /***********************************************************************
2118  * Util functions
2119 ***********************************************************************/
2120
2121
2122 /**
2123  * Print peerlist to log.
2124  */
2125 static void
2126 print_peer_list (struct GNUNET_PeerIdentity *list,
2127                  unsigned int len)
2128 {
2129   unsigned int i;
2130
2131   LOG (GNUNET_ERROR_TYPE_DEBUG,
2132        "Printing peer list of length %u at %p:\n",
2133        len,
2134        list);
2135   for (i = 0 ; i < len ; i++)
2136   {
2137     LOG (GNUNET_ERROR_TYPE_DEBUG,
2138          "%u. peer: %s\n",
2139          i, GNUNET_i2s (&list[i]));
2140   }
2141 }
2142
2143
2144 /**
2145  * Remove peer from list.
2146  */
2147 static void
2148 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2149                unsigned int *list_size,
2150                const struct GNUNET_PeerIdentity *peer)
2151 {
2152   unsigned int i;
2153   struct GNUNET_PeerIdentity *tmp;
2154
2155   tmp = *peer_list;
2156
2157   LOG (GNUNET_ERROR_TYPE_DEBUG,
2158        "Removing peer %s from list at %p\n",
2159        GNUNET_i2s (peer),
2160        tmp);
2161
2162   for ( i = 0 ; i < *list_size ; i++ )
2163   {
2164     if (0 == GNUNET_memcmp (&tmp[i], peer))
2165     {
2166       if (i < *list_size -1)
2167       { /* Not at the last entry -- shift peers left */
2168         memmove (&tmp[i], &tmp[i +1],
2169                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2170       }
2171       /* Remove last entry (should be now useless PeerID) */
2172       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2173     }
2174   }
2175   *peer_list = tmp;
2176 }
2177
2178
2179 /**
2180  * Insert PeerID in #view
2181  *
2182  * Called once we know a peer is online.
2183  * Implements #PeerOp
2184  *
2185  * @return GNUNET_OK if peer was actually inserted
2186  *         GNUNET_NO if peer was not inserted
2187  */
2188 static void
2189 insert_in_view_op (void *cls,
2190                    const struct GNUNET_PeerIdentity *peer);
2191
2192 /**
2193  * Insert PeerID in #view
2194  *
2195  * Called once we know a peer is online.
2196  *
2197  * @param sub Sub in with the view to insert in
2198  * @param peer the peer to insert
2199  *
2200  * @return GNUNET_OK if peer was actually inserted
2201  *         GNUNET_NO if peer was not inserted
2202  */
2203 static int
2204 insert_in_view (struct Sub *sub,
2205                 const struct GNUNET_PeerIdentity *peer)
2206 {
2207   struct PeerContext *peer_ctx;
2208   int online;
2209   int ret;
2210
2211   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2212   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2213   if ( (GNUNET_NO == online) ||
2214        (GNUNET_SYSERR == online) ) /* peer is not even known */
2215   {
2216     (void) issue_peer_online_check (sub, peer);
2217     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2218     return GNUNET_NO;
2219   }
2220   /* Open channel towards peer to keep connection open */
2221   indicate_sending_intention (peer_ctx);
2222   ret = View_put (sub->view, peer);
2223   if (peer_ctx->sub == msub)
2224   {
2225     GNUNET_STATISTICS_set (stats,
2226                            "view size",
2227                            View_size (peer_ctx->sub->view),
2228                            GNUNET_NO);
2229   }
2230   return ret;
2231 }
2232
2233
2234 /**
2235  * @brief Send view to client
2236  *
2237  * @param cli_ctx the context of the client
2238  * @param view_array the peerids of the view as array (can be empty)
2239  * @param view_size the size of the view array (can be 0)
2240  */
2241 static void
2242 send_view (const struct ClientContext *cli_ctx,
2243            const struct GNUNET_PeerIdentity *view_array,
2244            uint64_t view_size)
2245 {
2246   struct GNUNET_MQ_Envelope *ev;
2247   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2248   struct Sub *sub;
2249
2250   if (NULL == view_array)
2251   {
2252     if (NULL == cli_ctx->sub) sub = msub;
2253     else sub = cli_ctx->sub;
2254     view_size = View_size (sub->view);
2255     view_array = View_get_as_array (sub->view);
2256   }
2257
2258   ev = GNUNET_MQ_msg_extra (out_msg,
2259                             view_size * sizeof (struct GNUNET_PeerIdentity),
2260                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2261   out_msg->num_peers = htonl (view_size);
2262
2263   GNUNET_memcpy (&out_msg[1],
2264                  view_array,
2265                  view_size * sizeof (struct GNUNET_PeerIdentity));
2266   GNUNET_MQ_send (cli_ctx->mq, ev);
2267 }
2268
2269
2270 /**
2271  * @brief Send peer from biased stream to client.
2272  *
2273  * TODO merge with send_view, parameterise
2274  *
2275  * @param cli_ctx the context of the client
2276  * @param view_array the peerids of the view as array (can be empty)
2277  * @param view_size the size of the view array (can be 0)
2278  */
2279 static void
2280 send_stream_peers (const struct ClientContext *cli_ctx,
2281                    uint64_t num_peers,
2282                    const struct GNUNET_PeerIdentity *peers)
2283 {
2284   struct GNUNET_MQ_Envelope *ev;
2285   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2286
2287   GNUNET_assert (NULL != peers);
2288
2289   ev = GNUNET_MQ_msg_extra (out_msg,
2290                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2291                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2292   out_msg->num_peers = htonl (num_peers);
2293
2294   GNUNET_memcpy (&out_msg[1],
2295                  peers,
2296                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2297   GNUNET_MQ_send (cli_ctx->mq, ev);
2298 }
2299
2300
2301 /**
2302  * @brief sends updates to clients that are interested
2303  *
2304  * @param sub Sub for which to notify clients
2305  */
2306 static void
2307 clients_notify_view_update (const struct Sub *sub)
2308 {
2309   struct ClientContext *cli_ctx_iter;
2310   uint64_t num_peers;
2311   const struct GNUNET_PeerIdentity *view_array;
2312
2313   num_peers = View_size (sub->view);
2314   view_array = View_get_as_array(sub->view);
2315   /* check size of view is small enough */
2316   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2317   {
2318     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2319                 "View is too big to send\n");
2320     return;
2321   }
2322
2323   for (cli_ctx_iter = cli_ctx_head;
2324        NULL != cli_ctx_iter;
2325        cli_ctx_iter = cli_ctx_iter->next)
2326   {
2327     if (1 < cli_ctx_iter->view_updates_left)
2328     {
2329       /* Client wants to receive limited amount of updates */
2330       cli_ctx_iter->view_updates_left -= 1;
2331     } else if (1 == cli_ctx_iter->view_updates_left)
2332     {
2333       /* Last update of view for client */
2334       cli_ctx_iter->view_updates_left = -1;
2335     } else if (0 > cli_ctx_iter->view_updates_left) {
2336       /* Client is not interested in updates */
2337       continue;
2338     }
2339     /* else _updates_left == 0 - infinite amount of updates */
2340
2341     /* send view */
2342     send_view (cli_ctx_iter, view_array, num_peers);
2343   }
2344 }
2345
2346
2347 /**
2348  * @brief sends updates to clients that are interested
2349  *
2350  * @param num_peers Number of peers to send
2351  * @param peers the array of peers to send
2352  */
2353 static void
2354 clients_notify_stream_peer (const struct Sub *sub,
2355                             uint64_t num_peers,
2356                             const struct GNUNET_PeerIdentity *peers)
2357                             // TODO enum StreamPeerSource)
2358 {
2359   struct ClientContext *cli_ctx_iter;
2360
2361   LOG (GNUNET_ERROR_TYPE_DEBUG,
2362       "Got peer (%s) from biased stream - update all clients\n",
2363       GNUNET_i2s (peers));
2364
2365   for (cli_ctx_iter = cli_ctx_head;
2366        NULL != cli_ctx_iter;
2367        cli_ctx_iter = cli_ctx_iter->next)
2368   {
2369     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2370         (sub == cli_ctx_iter->sub || sub == msub))
2371     {
2372       send_stream_peers (cli_ctx_iter, num_peers, peers);
2373     }
2374   }
2375 }
2376
2377
2378 /**
2379  * Put random peer from sampler into the view as history update.
2380  *
2381  * @param ids Array of Peers to insert into view
2382  * @param num_peers Number of peers to insert
2383  * @param cls Closure - The Sub for which this is to be done
2384  */
2385 static void
2386 hist_update (const struct GNUNET_PeerIdentity *ids,
2387              uint32_t num_peers,
2388              void *cls)
2389 {
2390   unsigned int i;
2391   struct Sub *sub = cls;
2392
2393   for (i = 0; i < num_peers; i++)
2394   {
2395     int inserted;
2396     if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2397     {
2398       LOG (GNUNET_ERROR_TYPE_WARNING,
2399            "Peer in history update not known!\n");
2400       continue;
2401     }
2402     inserted = insert_in_view (sub, &ids[i]);
2403     if (GNUNET_OK == inserted)
2404     {
2405       clients_notify_stream_peer (sub, 1, &ids[i]);
2406     }
2407 #ifdef TO_FILE_FULL
2408     to_file (sub->file_name_view_log,
2409              "+%s\t(hist)",
2410              GNUNET_i2s_full (ids));
2411 #endif /* TO_FILE_FULL */
2412   }
2413   clients_notify_view_update (sub);
2414 }
2415
2416
2417 /**
2418  * Wrapper around #RPS_sampler_resize()
2419  *
2420  * If we do not have enough sampler elements, double current sampler size
2421  * If we have more than enough sampler elements, halv current sampler size
2422  *
2423  * @param sampler The sampler to resize
2424  * @param new_size New size to which to resize
2425  */
2426 static void
2427 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2428 {
2429   unsigned int sampler_size;
2430
2431   // TODO statistics
2432   // TODO respect the min, max
2433   sampler_size = RPS_sampler_get_size (sampler);
2434   if (sampler_size > new_size * 4)
2435   { /* Shrinking */
2436     RPS_sampler_resize (sampler, sampler_size / 2);
2437   }
2438   else if (sampler_size < new_size)
2439   { /* Growing */
2440     RPS_sampler_resize (sampler, sampler_size * 2);
2441   }
2442   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2443 }
2444
2445
2446 /**
2447  * Add all peers in @a peer_array to @a peer_map used as set.
2448  *
2449  * @param peer_array array containing the peers
2450  * @param num_peers number of peers in @peer_array
2451  * @param peer_map the peermap to use as set
2452  */
2453 static void
2454 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2455                        unsigned int num_peers,
2456                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2457 {
2458   unsigned int i;
2459   if (NULL == peer_map)
2460   {
2461     LOG (GNUNET_ERROR_TYPE_WARNING,
2462          "Trying to add peers to non-existing peermap.\n");
2463     return;
2464   }
2465
2466   for (i = 0; i < num_peers; i++)
2467   {
2468     GNUNET_CONTAINER_multipeermap_put (peer_map,
2469                                        &peer_array[i],
2470                                        NULL,
2471                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2472     if (msub->peer_map == peer_map)
2473     {
2474       GNUNET_STATISTICS_set (stats,
2475                             "# known peers",
2476                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2477                             GNUNET_NO);
2478     }
2479   }
2480 }
2481
2482
2483 /**
2484  * Send a PULL REPLY to @a peer_id
2485  *
2486  * @param peer_ctx Context of the peer to send the reply to
2487  * @param peer_ids the peers to send to @a peer_id
2488  * @param num_peer_ids the number of peers to send to @a peer_id
2489  */
2490 static void
2491 send_pull_reply (struct PeerContext *peer_ctx,
2492                  const struct GNUNET_PeerIdentity *peer_ids,
2493                  unsigned int num_peer_ids)
2494 {
2495   uint32_t send_size;
2496   struct GNUNET_MQ_Envelope *ev;
2497   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2498
2499   /* Compute actual size */
2500   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2501               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2502
2503   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2504     /* Compute number of peers to send
2505      * If too long, simply truncate */
2506     // TODO select random ones via permutation
2507     //      or even better: do good protocol design
2508     send_size =
2509       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2510        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2511        sizeof (struct GNUNET_PeerIdentity);
2512   else
2513     send_size = num_peer_ids;
2514
2515   LOG (GNUNET_ERROR_TYPE_DEBUG,
2516       "Going to send PULL REPLY with %u peers to %s\n",
2517       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2518
2519   ev = GNUNET_MQ_msg_extra (out_msg,
2520                             send_size * sizeof (struct GNUNET_PeerIdentity),
2521                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2522   out_msg->num_peers = htonl (send_size);
2523   GNUNET_memcpy (&out_msg[1], peer_ids,
2524          send_size * sizeof (struct GNUNET_PeerIdentity));
2525
2526   send_message (peer_ctx, ev, "PULL REPLY");
2527   if (peer_ctx->sub == msub)
2528   {
2529     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2530   }
2531   // TODO check with send intention: as send_channel is used/opened we indicate
2532   // a sending intention without intending it.
2533   // -> clean peer afterwards?
2534   // -> use recv_channel?
2535 }
2536
2537
2538 /**
2539  * Insert PeerID in #pull_map
2540  *
2541  * Called once we know a peer is online.
2542  *
2543  * @param cls Closure - Sub with the pull map to insert into
2544  * @param peer Peer to insert
2545  */
2546 static void
2547 insert_in_pull_map (void *cls,
2548                     const struct GNUNET_PeerIdentity *peer)
2549 {
2550   struct Sub *sub = cls;
2551
2552   CustomPeerMap_put (sub->pull_map, peer);
2553 }
2554
2555
2556 /**
2557  * Insert PeerID in #view
2558  *
2559  * Called once we know a peer is online.
2560  * Implements #PeerOp
2561  *
2562  * @param cls Closure - Sub with view to insert peer into
2563  * @param peer the peer to insert
2564  */
2565 static void
2566 insert_in_view_op (void *cls,
2567                    const struct GNUNET_PeerIdentity *peer)
2568 {
2569   struct Sub *sub = cls;
2570   int inserted;
2571
2572   inserted = insert_in_view (sub, peer);
2573   if (GNUNET_OK == inserted)
2574   {
2575     clients_notify_stream_peer (sub, 1, peer);
2576   }
2577 }
2578
2579
2580 /**
2581  * Update sampler with given PeerID.
2582  * Implements #PeerOp
2583  *
2584  * @param cls Closure - Sub containing the sampler to insert into
2585  * @param peer Peer to insert
2586  */
2587 static void
2588 insert_in_sampler (void *cls,
2589                    const struct GNUNET_PeerIdentity *peer)
2590 {
2591   struct Sub *sub = cls;
2592
2593   LOG (GNUNET_ERROR_TYPE_DEBUG,
2594        "Updating samplers with peer %s from insert_in_sampler()\n",
2595        GNUNET_i2s (peer));
2596   RPS_sampler_update (sub->sampler, peer);
2597   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2598   {
2599     /* Make sure we 'know' about this peer */
2600     (void) issue_peer_online_check (sub, peer);
2601     /* Establish a channel towards that peer to indicate we are going to send
2602      * messages to it */
2603     //indicate_sending_intention (peer);
2604   }
2605   if (sub == msub)
2606   {
2607     GNUNET_STATISTICS_update (stats,
2608                               "# observed peers in gossip",
2609                               1,
2610                               GNUNET_NO);
2611   }
2612 #ifdef TO_FILE
2613   sub->num_observed_peers++;
2614   GNUNET_CONTAINER_multipeermap_put
2615     (sub->observed_unique_peers,
2616      peer,
2617      NULL,
2618      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2619   uint32_t num_observed_unique_peers =
2620     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2621   GNUNET_STATISTICS_set (stats,
2622                          "# unique peers in gossip",
2623                          num_observed_unique_peers,
2624                          GNUNET_NO);
2625 #ifdef TO_FILE_FULL
2626   to_file (sub->file_name_observed_log,
2627           "%" PRIu32 " %" PRIu32 " %f\n",
2628           sub->num_observed_peers,
2629           num_observed_unique_peers,
2630           1.0*num_observed_unique_peers/sub->num_observed_peers)
2631 #endif /* TO_FILE_FULL */
2632 #endif /* TO_FILE */
2633 }
2634
2635
2636 /**
2637  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2638  *        If the peer is not known, online check is issued and it is
2639  *        scheduled to be inserted in sampler and view.
2640  *
2641  * "External sources" refer to every source except the gossip.
2642  *
2643  * @param sub Sub for which @a peer was received
2644  * @param peer peer to insert/peer received
2645  */
2646 static void
2647 got_peer (struct Sub *sub,
2648           const struct GNUNET_PeerIdentity *peer)
2649 {
2650   /* If we did not know this peer already, insert it into sampler and view */
2651   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2652   {
2653     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2654                         &insert_in_sampler, sub);
2655     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2656                         &insert_in_view_op, sub);
2657   }
2658   if (sub == msub)
2659   {
2660     GNUNET_STATISTICS_update (stats,
2661                               "# learnd peers",
2662                               1,
2663                               GNUNET_NO);
2664   }
2665 }
2666
2667
2668 /**
2669  * @brief Checks if there is a sending channel and if it is needed
2670  *
2671  * @param peer_ctx Context of the peer to check
2672  * @return GNUNET_YES if sending channel exists and is still needed
2673  *         GNUNET_NO  otherwise
2674  */
2675 static int
2676 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2677 {
2678   /* struct GNUNET_CADET_Channel *channel; */
2679   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2680                                      &peer_ctx->peer_id))
2681   {
2682     return GNUNET_NO;
2683   }
2684   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2685   {
2686     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2687                                     &peer_ctx->peer_id)) ||
2688          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2689                                             &peer_ctx->peer_id)) ||
2690          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2691                                                      &peer_ctx->peer_id)) ||
2692          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2693                                                      &peer_ctx->peer_id)) ||
2694          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2695                                          &peer_ctx->peer_id,
2696                                          Peers_PULL_REPLY_PENDING)))
2697     { /* If we want to keep the connection to peer open */
2698       return GNUNET_YES;
2699     }
2700     return GNUNET_NO;
2701   }
2702   return GNUNET_NO;
2703 }
2704
2705
2706 /**
2707  * @brief remove peer from our knowledge, the view, push and pull maps and
2708  * samplers.
2709  *
2710  * @param sub Sub with the data structures the peer is to be removed from
2711  * @param peer the peer to remove
2712  */
2713 static void
2714 remove_peer (struct Sub *sub,
2715              const struct GNUNET_PeerIdentity *peer)
2716 {
2717   (void) View_remove_peer (sub->view,
2718                            peer);
2719   CustomPeerMap_remove_peer (sub->pull_map,
2720                              peer);
2721   CustomPeerMap_remove_peer (sub->push_map,
2722                              peer);
2723   RPS_sampler_reinitialise_by_value (sub->sampler,
2724                                      peer);
2725   /* We want to destroy the peer now.
2726    * Sometimes, it just seems that it's already been removed from the peer_map,
2727    * so check the peer_map first. */
2728   if (GNUNET_YES == check_peer_known (sub->peer_map,
2729                                       peer))
2730   {
2731     destroy_peer (get_peer_ctx (sub->peer_map,
2732                                 peer));
2733   }
2734 }
2735
2736
2737 /**
2738  * @brief Remove data that is not needed anymore.
2739  *
2740  * If the sending channel is no longer needed it is destroyed.
2741  *
2742  * @param sub Sub in which the current peer is to be cleaned
2743  * @param peer the peer whose data is about to be cleaned
2744  */
2745 static void
2746 clean_peer (struct Sub *sub,
2747             const struct GNUNET_PeerIdentity *peer)
2748 {
2749   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2750                                                                peer)))
2751   {
2752     LOG (GNUNET_ERROR_TYPE_DEBUG,
2753         "Going to remove send channel to peer %s\n",
2754         GNUNET_i2s (peer));
2755     #if ENABLE_MALICIOUS
2756     if (0 != GNUNET_memcmp (&attacked_peer,
2757                                               peer))
2758       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2759                                                     peer));
2760     #else /* ENABLE_MALICIOUS */
2761     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2762                                                   peer));
2763     #endif /* ENABLE_MALICIOUS */
2764   }
2765
2766   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2767                                                            peer))
2768   {
2769     /* Peer was already removed by callback on destroyed channel */
2770     LOG (GNUNET_ERROR_TYPE_WARNING,
2771         "Peer was removed from our knowledge during cleanup\n");
2772     return;
2773   }
2774
2775   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2776                                                               peer))) &&
2777        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2778        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2779        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2780        (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2781        (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2782   { /* We can safely remove this peer */
2783     LOG (GNUNET_ERROR_TYPE_DEBUG,
2784         "Going to remove peer %s\n",
2785         GNUNET_i2s (peer));
2786     remove_peer (sub, peer);
2787     return;
2788   }
2789 }
2790
2791
2792 /**
2793  * @brief This is called when a channel is destroyed.
2794  *
2795  * Removes peer completely from our knowledge if the send_channel was destroyed
2796  * Otherwise simply delete the recv_channel
2797  * Also check if the knowledge about this peer is still needed.
2798  * If not, remove this peer from our knowledge.
2799  *
2800  * @param cls The closure - Context to the channel
2801  * @param channel The channel being closed
2802  */
2803 static void
2804 cleanup_destroyed_channel (void *cls,
2805                            const struct GNUNET_CADET_Channel *channel)
2806 {
2807   struct ChannelCtx *channel_ctx = cls;
2808   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2809   (void) channel;
2810
2811   channel_ctx->channel = NULL;
2812   remove_channel_ctx (channel_ctx);
2813   if (NULL != peer_ctx &&
2814       peer_ctx->send_channel_ctx == channel_ctx &&
2815       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2816   {
2817     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2818   }
2819 }
2820
2821 /***********************************************************************
2822  * /Util functions
2823 ***********************************************************************/
2824
2825
2826
2827 /***********************************************************************
2828  * Sub
2829 ***********************************************************************/
2830
2831 /**
2832  * @brief Create a new Sub
2833  *
2834  * @param hash Hash of value shared among rps instances on other hosts that
2835  *        defines a subgroup to sample from.
2836  * @param sampler_size Size of the sampler
2837  * @param round_interval Interval (in average) between two rounds
2838  *
2839  * @return Sub
2840  */
2841 struct Sub *
2842 new_sub (const struct GNUNET_HashCode *hash,
2843          uint32_t sampler_size,
2844          struct GNUNET_TIME_Relative round_interval)
2845 {
2846   struct Sub *sub;
2847
2848   sub = GNUNET_new (struct Sub);
2849
2850   /* With the hash generated from the secret value this service only connects
2851    * to rps instances that share the value */
2852   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2853     GNUNET_MQ_hd_fixed_size (peer_check,
2854                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2855                              struct GNUNET_MessageHeader,
2856                              NULL),
2857     GNUNET_MQ_hd_fixed_size (peer_push,
2858                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2859                              struct GNUNET_MessageHeader,
2860                              NULL),
2861     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2862                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2863                              struct GNUNET_MessageHeader,
2864                              NULL),
2865     GNUNET_MQ_hd_var_size (peer_pull_reply,
2866                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2867                            struct GNUNET_RPS_P2P_PullReplyMessage,
2868                            NULL),
2869     GNUNET_MQ_handler_end ()
2870   };
2871   sub->hash = *hash;
2872   sub->cadet_port =
2873     GNUNET_CADET_open_port (cadet_handle,
2874                             &sub->hash,
2875                             &handle_inbound_channel, /* Connect handler */
2876                             sub, /* cls */
2877                             NULL, /* WindowSize handler */
2878                             &cleanup_destroyed_channel, /* Disconnect handler */
2879                             cadet_handlers);
2880   if (NULL == sub->cadet_port)
2881   {
2882     LOG (GNUNET_ERROR_TYPE_ERROR,
2883         "Cadet port `%s' is already in use.\n",
2884         GNUNET_APPLICATION_PORT_RPS);
2885     GNUNET_assert (0);
2886   }
2887
2888   /* Set up general data structure to keep track about peers */
2889   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2890   if (GNUNET_OK !=
2891       GNUNET_CONFIGURATION_get_value_filename (cfg,
2892                                                "rps",
2893                                                "FILENAME_VALID_PEERS",
2894                                                &sub->filename_valid_peers))
2895   {
2896     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2897                                "rps",
2898                                "FILENAME_VALID_PEERS");
2899   }
2900   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2901   {
2902     char *tmp_filename_valid_peers;
2903     char str_hash[105];
2904
2905     GNUNET_snprintf (str_hash,
2906                      sizeof (str_hash),
2907                      GNUNET_h2s_full (hash));
2908     tmp_filename_valid_peers = sub->filename_valid_peers;
2909     GNUNET_asprintf (&sub->filename_valid_peers,
2910                      "%s%s",
2911                      tmp_filename_valid_peers,
2912                      str_hash);
2913     GNUNET_free (tmp_filename_valid_peers);
2914   }
2915   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2916
2917   /* Set up the sampler */
2918   sub->sampler_size_est_min = sampler_size;
2919   sub->sampler_size_est_need = sampler_size;;
2920   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2921   GNUNET_assert (0 != round_interval.rel_value_us);
2922   sub->round_interval = round_interval;
2923   sub->sampler = RPS_sampler_init (sampler_size,
2924                                   round_interval);
2925
2926   /* Logging of internals */
2927 #ifdef TO_FILE_FULL
2928   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2929 #endif /* TO_FILE_FULL */
2930 #ifdef TO_FILE
2931 #ifdef TO_FILE_FULL
2932   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2933                                                        "observed");
2934 #endif /* TO_FILE_FULL */
2935   sub->num_observed_peers = 0;
2936   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2937                                                                     GNUNET_NO);
2938 #endif /* TO_FILE */
2939
2940   /* Set up data structures for gossip */
2941   sub->push_map = CustomPeerMap_create (4);
2942   sub->pull_map = CustomPeerMap_create (4);
2943   sub->view_size_est_min = sampler_size;;
2944   sub->view = View_create (sub->view_size_est_min);
2945   if (sub == msub)
2946   {
2947     GNUNET_STATISTICS_set (stats,
2948                            "view size aim",
2949                            sub->view_size_est_min,
2950                            GNUNET_NO);
2951   }
2952
2953   /* Start executing rounds */
2954   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2955
2956   return sub;
2957 }
2958
2959
2960 #ifdef TO_FILE
2961 /**
2962  * @brief Write all numbers in the given array into the given file
2963  *
2964  * Single numbers devided by a newline
2965  *
2966  * @param hist_array[] the array to dump
2967  * @param file_name file to dump into
2968  */
2969 static void
2970 write_histogram_to_file (const uint32_t hist_array[],
2971                          const char *file_name)
2972 {
2973   char collect_str[SIZE_DUMP_FILE + 1] = "";
2974   char *recv_str_iter;
2975   char *file_name_full;
2976
2977   recv_str_iter = collect_str;
2978   file_name_full = store_prefix_file_name (&own_identity,
2979                                            file_name);
2980   for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2981   {
2982     char collect_str_tmp[8];
2983
2984     GNUNET_snprintf (collect_str_tmp,
2985                      sizeof (collect_str_tmp),
2986                      "%" PRIu32 "\n",
2987                      hist_array[i]);
2988     recv_str_iter = stpncpy (recv_str_iter,
2989                              collect_str_tmp,
2990                              6);
2991   }
2992   (void) stpcpy (recv_str_iter,
2993                  "\n");
2994   LOG (GNUNET_ERROR_TYPE_DEBUG,
2995        "Writing push stats to disk\n");
2996   to_file_w_len (file_name_full,
2997                  SIZE_DUMP_FILE,
2998                  collect_str);
2999   GNUNET_free (file_name_full);
3000 }
3001 #endif /* TO_FILE */
3002
3003
3004 /**
3005  * @brief Destroy Sub.
3006  *
3007  * @param sub Sub to destroy
3008  */
3009 static void
3010 destroy_sub (struct Sub *sub)
3011 {
3012   GNUNET_assert (NULL != sub);
3013   GNUNET_assert (NULL != sub->do_round_task);
3014   GNUNET_SCHEDULER_cancel (sub->do_round_task);
3015   sub->do_round_task = NULL;
3016
3017   /* Disconnect from cadet */
3018   GNUNET_CADET_close_port (sub->cadet_port);
3019   sub->cadet_port= NULL;
3020
3021   /* Clean up data structures for peers */
3022   RPS_sampler_destroy (sub->sampler);
3023   sub->sampler = NULL;
3024   View_destroy (sub->view);
3025   sub->view = NULL;
3026   CustomPeerMap_destroy (sub->push_map);
3027   sub->push_map = NULL;
3028   CustomPeerMap_destroy (sub->pull_map);
3029   sub->pull_map = NULL;
3030   peers_terminate (sub);
3031
3032   /* Free leftover data structures */
3033 #ifdef TO_FILE_FULL
3034   GNUNET_free (sub->file_name_view_log);
3035   sub->file_name_view_log = NULL;
3036 #endif /* TO_FILE_FULL */
3037 #ifdef TO_FILE
3038 #ifdef TO_FILE_FULL
3039   GNUNET_free (sub->file_name_observed_log);
3040   sub->file_name_observed_log = NULL;
3041 #endif /* TO_FILE_FULL */
3042
3043   /* Write push frequencies to disk */
3044   write_histogram_to_file (sub->push_recv,
3045                            "push_recv");
3046
3047   /* Write push deltas to disk */
3048   write_histogram_to_file (sub->push_delta,
3049                            "push_delta");
3050
3051   /* Write pull delays to disk */
3052   write_histogram_to_file (sub->pull_delays,
3053                            "pull_delays");
3054
3055   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3056   sub->observed_unique_peers = NULL;
3057 #endif /* TO_FILE */
3058
3059   GNUNET_free (sub);
3060 }
3061
3062
3063 /***********************************************************************
3064  * /Sub
3065 ***********************************************************************/
3066
3067
3068 /***********************************************************************
3069  * Core handlers
3070 ***********************************************************************/
3071
3072 /**
3073  * @brief Callback on initialisation of Core.
3074  *
3075  * @param cls - unused
3076  * @param my_identity - unused
3077  */
3078 void
3079 core_init (void *cls,
3080            const struct GNUNET_PeerIdentity *my_identity)
3081 {
3082   (void) cls;
3083   (void) my_identity;
3084
3085   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3086 }
3087
3088
3089 /**
3090  * @brief Callback for core.
3091  * Method called whenever a given peer connects.
3092  *
3093  * @param cls closure - unused
3094  * @param peer peer identity this notification is about
3095  * @return closure given to #core_disconnects as peer_cls
3096  */
3097 void *
3098 core_connects (void *cls,
3099                const struct GNUNET_PeerIdentity *peer,
3100                struct GNUNET_MQ_Handle *mq)
3101 {
3102   (void) cls;
3103   (void) mq;
3104
3105   GNUNET_assert (GNUNET_YES ==
3106                  GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3107                                                     peer,
3108                                                     NULL,
3109                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3110   return NULL;
3111 }
3112
3113
3114 /**
3115  * @brief Callback for core.
3116  * Method called whenever a peer disconnects.
3117  *
3118  * @param cls closure - unused
3119  * @param peer peer identity this notification is about
3120  * @param peer_cls closure given in #core_connects - unused
3121  */
3122 void
3123 core_disconnects (void *cls,
3124                   const struct GNUNET_PeerIdentity *peer,
3125                   void *peer_cls)
3126 {
3127   (void) cls;
3128   (void) peer_cls;
3129
3130   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3131 }
3132
3133 /***********************************************************************
3134  * /Core handlers
3135 ***********************************************************************/
3136
3137
3138 /**
3139  * @brief Destroy the context for a (connected) client
3140  *
3141  * @param cli_ctx Context to destroy
3142  */
3143 static void
3144 destroy_cli_ctx (struct ClientContext *cli_ctx)
3145 {
3146   GNUNET_assert (NULL != cli_ctx);
3147   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3148                                cli_ctx_tail,
3149                                cli_ctx);
3150   if (NULL != cli_ctx->sub)
3151   {
3152     destroy_sub (cli_ctx->sub);
3153     cli_ctx->sub = NULL;
3154   }
3155   GNUNET_free (cli_ctx);
3156 }
3157
3158
3159 /**
3160  * @brief Update sizes in sampler and view on estimate update from nse service
3161  *
3162  * @param sub Sub
3163  * @param logestimate the log(Base 2) value of the current network size estimate
3164  * @param std_dev standard deviation for the estimate
3165  */
3166 static void
3167 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3168 {
3169   double estimate;
3170   //double scale; // TODO this might go gloabal/config
3171
3172   LOG (GNUNET_ERROR_TYPE_DEBUG,
3173        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3174        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3175   //scale = .01;
3176   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3177   // GNUNET_NSE_log_estimate_to_n (logestimate);
3178   estimate = pow (estimate, 1.0 / 3);
3179   // TODO add if std_dev is a number
3180   // estimate += (std_dev * scale);
3181   if (sub->view_size_est_min < ceil (estimate))
3182   {
3183     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3184     sub->sampler_size_est_need = estimate;
3185     sub->view_size_est_need = estimate;
3186   } else
3187   {
3188     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3189     //sub->sampler_size_est_need = sub->view_size_est_min;
3190     sub->view_size_est_need = sub->view_size_est_min;
3191   }
3192   if (sub == msub)
3193   {
3194     GNUNET_STATISTICS_set (stats,
3195                            "view size aim",
3196                            sub->view_size_est_need,
3197                            GNUNET_NO);
3198   }
3199
3200   /* If the NSE has changed adapt the lists accordingly */
3201   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3202   View_change_len (sub->view, sub->view_size_est_need);
3203 }
3204
3205
3206 /**
3207  * Function called by NSE.
3208  *
3209  * Updates sizes of sampler list and view and adapt those lists
3210  * accordingly.
3211  *
3212  * implements #GNUNET_NSE_Callback
3213  *
3214  * @param cls Closure - unused
3215  * @param timestamp time when the estimate was received from the server (or created by the server)
3216  * @param logestimate the log(Base 2) value of the current network size estimate
3217  * @param std_dev standard deviation for the estimate
3218  */
3219 static void
3220 nse_callback (void *cls,
3221               struct GNUNET_TIME_Absolute timestamp,
3222               double logestimate, double std_dev)
3223 {
3224   (void) cls;
3225   (void) timestamp;
3226   struct ClientContext *cli_ctx_iter;
3227
3228   adapt_sizes (msub, logestimate, std_dev);
3229   for (cli_ctx_iter = cli_ctx_head;
3230       NULL != cli_ctx_iter;
3231       cli_ctx_iter = cli_ctx_iter->next)
3232   {
3233     if (NULL != cli_ctx_iter->sub)
3234     {
3235       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3236     }
3237   }
3238 }
3239
3240
3241 /**
3242  * @brief This function is called, when the client seeds peers.
3243  * It verifies that @a msg is well-formed.
3244  *
3245  * @param cls the closure (#ClientContext)
3246  * @param msg the message
3247  * @return #GNUNET_OK if @a msg is well-formed
3248  *         #GNUNET_SYSERR otherwise
3249  */
3250 static int
3251 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3252 {
3253   struct ClientContext *cli_ctx = cls;
3254   uint16_t msize = ntohs (msg->header.size);
3255   uint32_t num_peers = ntohl (msg->num_peers);
3256
3257   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3258   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3259        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3260   {
3261     LOG (GNUNET_ERROR_TYPE_ERROR,
3262         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3263         ntohl (msg->num_peers),
3264         (msize / sizeof (struct GNUNET_PeerIdentity)));
3265     GNUNET_break (0);
3266     GNUNET_SERVICE_client_drop (cli_ctx->client);
3267     return GNUNET_SYSERR;
3268   }
3269   return GNUNET_OK;
3270 }
3271
3272
3273 /**
3274  * Handle seed from the client.
3275  *
3276  * @param cls closure
3277  * @param message the actual message
3278  */
3279 static void
3280 handle_client_seed (void *cls,
3281                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3282 {
3283   struct ClientContext *cli_ctx = cls;
3284   struct GNUNET_PeerIdentity *peers;
3285   uint32_t num_peers;
3286   uint32_t i;
3287
3288   num_peers = ntohl (msg->num_peers);
3289   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3290
3291   LOG (GNUNET_ERROR_TYPE_DEBUG,
3292        "Client seeded peers:\n");
3293   print_peer_list (peers, num_peers);
3294
3295   for (i = 0; i < num_peers; i++)
3296   {
3297     LOG (GNUNET_ERROR_TYPE_DEBUG,
3298          "Updating samplers with seed %" PRIu32 ": %s\n",
3299          i,
3300          GNUNET_i2s (&peers[i]));
3301
3302     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3303     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3304   }
3305   GNUNET_SERVICE_client_continue (cli_ctx->client);
3306 }
3307
3308
3309 /**
3310  * Handle RPS request from the client.
3311  *
3312  * @param cls Client context
3313  * @param message Message containing the numer of updates the client wants to
3314  * receive
3315  */
3316 static void
3317 handle_client_view_request (void *cls,
3318                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3319 {
3320   struct ClientContext *cli_ctx = cls;
3321   uint64_t num_updates;
3322
3323   num_updates = ntohl (msg->num_updates);
3324
3325   LOG (GNUNET_ERROR_TYPE_DEBUG,
3326        "Client requested %" PRIu64 " updates of view.\n",
3327        num_updates);
3328
3329   GNUNET_assert (NULL != cli_ctx);
3330   cli_ctx->view_updates_left = num_updates;
3331   send_view (cli_ctx, NULL, 0);
3332   GNUNET_SERVICE_client_continue (cli_ctx->client);
3333 }
3334
3335
3336 /**
3337  * @brief Handle the cancellation of the view updates.
3338  *
3339  * @param cls The client context
3340  * @param msg Unused
3341  */
3342 static void
3343 handle_client_view_cancel (void *cls,
3344                            const struct GNUNET_MessageHeader *msg)
3345 {
3346   struct ClientContext *cli_ctx = cls;
3347   (void) msg;
3348
3349   LOG (GNUNET_ERROR_TYPE_DEBUG,
3350        "Client does not want to receive updates of view any more.\n");
3351
3352   GNUNET_assert (NULL != cli_ctx);
3353   cli_ctx->view_updates_left = 0;
3354   GNUNET_SERVICE_client_continue (cli_ctx->client);
3355   if (GNUNET_YES == cli_ctx->stream_update)
3356   {
3357     destroy_cli_ctx (cli_ctx);
3358   }
3359 }
3360
3361
3362 /**
3363  * Handle RPS request for biased stream from the client.
3364  *
3365  * @param cls Client context
3366  * @param message unused
3367  */
3368 static void
3369 handle_client_stream_request (void *cls,
3370                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3371 {
3372   struct ClientContext *cli_ctx = cls;
3373   (void) msg;
3374
3375   LOG (GNUNET_ERROR_TYPE_DEBUG,
3376        "Client requested peers from biased stream.\n");
3377   cli_ctx->stream_update = GNUNET_YES;
3378
3379   GNUNET_assert (NULL != cli_ctx);
3380   GNUNET_SERVICE_client_continue (cli_ctx->client);
3381 }
3382
3383
3384 /**
3385  * @brief Handles the cancellation of the stream of biased peer ids
3386  *
3387  * @param cls The client context
3388  * @param msg unused
3389  */
3390 static void
3391 handle_client_stream_cancel (void *cls,
3392                              const struct GNUNET_MessageHeader *msg)
3393 {
3394   struct ClientContext *cli_ctx = cls;
3395   (void) msg;
3396
3397   LOG (GNUNET_ERROR_TYPE_DEBUG,
3398        "Client canceled receiving peers from biased stream.\n");
3399   cli_ctx->stream_update = GNUNET_NO;
3400
3401   GNUNET_assert (NULL != cli_ctx);
3402   GNUNET_SERVICE_client_continue (cli_ctx->client);
3403 }
3404
3405
3406 /**
3407  * @brief Create and start a Sub.
3408  *
3409  * @param cls Closure - unused
3410  * @param msg Message containing the necessary information
3411  */
3412 static void
3413 handle_client_start_sub (void *cls,
3414                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3415 {
3416   struct ClientContext *cli_ctx = cls;
3417
3418   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3419   if (NULL != cli_ctx->sub &&
3420       0 != memcmp (&cli_ctx->sub->hash,
3421                    &msg->hash,
3422                    sizeof (struct GNUNET_HashCode)))
3423   {
3424     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3425     destroy_sub (cli_ctx->sub);
3426     cli_ctx->sub = NULL;
3427   }
3428   cli_ctx->sub = new_sub (&msg->hash,
3429                          msub->sampler_size_est_min, // TODO make api input?
3430                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3431   GNUNET_SERVICE_client_continue (cli_ctx->client);
3432 }
3433
3434
3435 /**
3436  * @brief Destroy the Sub
3437  *
3438  * @param cls Closure - unused
3439  * @param msg Message containing the hash that identifies the Sub
3440  */
3441 static void
3442 handle_client_stop_sub (void *cls,
3443                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3444 {
3445   struct ClientContext *cli_ctx = cls;
3446
3447   GNUNET_assert (NULL != cli_ctx->sub);
3448   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3449   {
3450     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3451   }
3452   destroy_sub (cli_ctx->sub);
3453   cli_ctx->sub = NULL;
3454   GNUNET_SERVICE_client_continue (cli_ctx->client);
3455 }
3456
3457
3458 /**
3459  * Handle a CHECK_LIVE message from another peer.
3460  *
3461  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3462  * the channel is blocked for all other communication.
3463  *
3464  * @param cls Closure - Context of channel
3465  * @param msg Message - unused
3466  */
3467 static void
3468 handle_peer_check (void *cls,
3469                    const struct GNUNET_MessageHeader *msg)
3470 {
3471   const struct ChannelCtx *channel_ctx = cls;
3472   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3473   (void) msg;
3474
3475   LOG (GNUNET_ERROR_TYPE_DEBUG,
3476       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3477   if (channel_ctx->peer_ctx->sub == msub)
3478   {
3479     GNUNET_STATISTICS_update (stats,
3480                               "# pending online checks",
3481                               -1,
3482                               GNUNET_NO);
3483   }
3484
3485   GNUNET_CADET_receive_done (channel_ctx->channel);
3486 }
3487
3488
3489 /**
3490  * Handle a PUSH message from another peer.
3491  *
3492  * Check the proof of work and store the PeerID
3493  * in the temporary list for pushed PeerIDs.
3494  *
3495  * @param cls Closure - Context of channel
3496  * @param msg Message - unused
3497  */
3498 static void
3499 handle_peer_push (void *cls,
3500                   const struct GNUNET_MessageHeader *msg)
3501 {
3502   const struct ChannelCtx *channel_ctx = cls;
3503   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3504   (void) msg;
3505
3506   // (check the proof of work (?))
3507
3508   LOG (GNUNET_ERROR_TYPE_DEBUG,
3509        "Received PUSH (%s)\n",
3510        GNUNET_i2s (peer));
3511   if (channel_ctx->peer_ctx->sub == msub)
3512   {
3513     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3514     if (NULL != map_single_hop &&
3515         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3516                                                              peer))
3517     {
3518       GNUNET_STATISTICS_update (stats,
3519                                 "# push message received (multi-hop peer)",
3520                                 1,
3521                                 GNUNET_NO);
3522     }
3523   }
3524
3525   #if ENABLE_MALICIOUS
3526   struct AttackedPeer *tmp_att_peer;
3527
3528   if ( (1 == mal_type) ||
3529        (3 == mal_type) )
3530   { /* Try to maximise representation */
3531     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3532     tmp_att_peer->peer_id = *peer;
3533     if (NULL == att_peer_set)
3534       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3535     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3536                                                              peer))
3537     {
3538       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3539                                    att_peers_tail,
3540                                    tmp_att_peer);
3541       add_peer_array_to_set (peer, 1, att_peer_set);
3542     }
3543     else
3544     {
3545       GNUNET_free (tmp_att_peer);
3546     }
3547   }
3548
3549
3550   else if (2 == mal_type)
3551   {
3552     /* We attack one single well-known peer - simply ignore */
3553   }
3554   #endif /* ENABLE_MALICIOUS */
3555
3556   /* Add the sending peer to the push_map */
3557   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3558
3559   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3560                                      &channel_ctx->peer_ctx->peer_id));
3561   GNUNET_CADET_receive_done (channel_ctx->channel);
3562 }
3563
3564
3565 /**
3566  * Handle PULL REQUEST request message from another peer.
3567  *
3568  * Reply with the view of PeerIDs.
3569  *
3570  * @param cls Closure - Context of channel
3571  * @param msg Message - unused
3572  */
3573 static void
3574 handle_peer_pull_request (void *cls,
3575                           const struct GNUNET_MessageHeader *msg)
3576 {
3577   const struct ChannelCtx *channel_ctx = cls;
3578   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3579   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3580   const struct GNUNET_PeerIdentity *view_array;
3581   (void) msg;
3582
3583   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3584   if (peer_ctx->sub == msub)
3585   {
3586     GNUNET_STATISTICS_update(stats,
3587                              "# pull request message received",
3588                              1,
3589                              GNUNET_NO);
3590     if (NULL != map_single_hop &&
3591         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3592                                                              &peer_ctx->peer_id))
3593     {
3594       GNUNET_STATISTICS_update (stats,
3595                                 "# pull request message received (multi-hop peer)",
3596                                 1,
3597                                 GNUNET_NO);
3598     }
3599   }
3600
3601   #if ENABLE_MALICIOUS
3602   if (1 == mal_type
3603       || 3 == mal_type)
3604   { /* Try to maximise representation */
3605     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3606   }
3607
3608   else if (2 == mal_type)
3609   { /* Try to partition network */
3610     if (0 == GNUNET_memcmp (&attacked_peer, peer))
3611     {
3612       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3613     }
3614   }
3615   #endif /* ENABLE_MALICIOUS */
3616
3617   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3618                                      &channel_ctx->peer_ctx->peer_id));
3619   GNUNET_CADET_receive_done (channel_ctx->channel);
3620   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3621   send_pull_reply (peer_ctx,
3622                    view_array,
3623                    View_size (channel_ctx->peer_ctx->sub->view));
3624 }
3625
3626
3627 /**
3628  * Check whether we sent a corresponding request and
3629  * whether this reply is the first one.
3630  *
3631  * @param cls Closure - Context of channel
3632  * @param msg Message containing the replied peers
3633  */
3634 static int
3635 check_peer_pull_reply (void *cls,
3636                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3637 {
3638   struct ChannelCtx *channel_ctx = cls;
3639   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3640
3641   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3642   {
3643     GNUNET_break_op (0);
3644     return GNUNET_SYSERR;
3645   }
3646
3647   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3648       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3649   {
3650     LOG (GNUNET_ERROR_TYPE_ERROR,
3651         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3652         ntohl (msg->num_peers),
3653         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3654             sizeof (struct GNUNET_PeerIdentity));
3655     GNUNET_break_op (0);
3656     return GNUNET_SYSERR;
3657   }
3658
3659   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3660                                      &sender_ctx->peer_id,
3661                                      Peers_PULL_REPLY_PENDING))
3662   {
3663     LOG (GNUNET_ERROR_TYPE_WARNING,
3664         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3665         GNUNET_i2s (&sender_ctx->peer_id));
3666     if (sender_ctx->sub == msub)
3667     {
3668       GNUNET_STATISTICS_update (stats,
3669                                 "# unrequested pull replies",
3670                                 1,
3671                                 GNUNET_NO);
3672     }
3673   }
3674   return GNUNET_OK;
3675 }
3676
3677
3678 /**
3679  * Handle PULL REPLY message from another peer.
3680  *
3681  * @param cls Closure
3682  * @param msg The message header
3683  */
3684 static void
3685 handle_peer_pull_reply (void *cls,
3686                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3687 {
3688   const struct ChannelCtx *channel_ctx = cls;
3689   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3690   const struct GNUNET_PeerIdentity *peers;
3691   struct Sub *sub = channel_ctx->peer_ctx->sub;
3692   uint32_t i;
3693 #if ENABLE_MALICIOUS
3694   struct AttackedPeer *tmp_att_peer;
3695 #endif /* ENABLE_MALICIOUS */
3696
3697   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3698   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3699   if (channel_ctx->peer_ctx->sub == msub)
3700   {
3701     GNUNET_STATISTICS_update (stats,
3702                               "# pull reply messages received",
3703                               1,
3704                               GNUNET_NO);
3705     if (NULL != map_single_hop &&
3706         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3707           &channel_ctx->peer_ctx->peer_id))
3708     {
3709       GNUNET_STATISTICS_update (stats,
3710                                 "# pull reply messages received (multi-hop peer)",
3711                                 1,
3712                                 GNUNET_NO);
3713     }
3714   }
3715
3716   #if ENABLE_MALICIOUS
3717   // We shouldn't even receive pull replies as we're not sending
3718   if (2 == mal_type)
3719   {
3720   }
3721   #endif /* ENABLE_MALICIOUS */
3722
3723   /* Do actual logic */
3724   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3725
3726   LOG (GNUNET_ERROR_TYPE_DEBUG,
3727        "PULL REPLY received, got following %u peers:\n",
3728        ntohl (msg->num_peers));
3729
3730   for (i = 0; i < ntohl (msg->num_peers); i++)
3731   {
3732     LOG (GNUNET_ERROR_TYPE_DEBUG,
3733          "%u. %s\n",
3734          i,
3735          GNUNET_i2s (&peers[i]));
3736
3737     #if ENABLE_MALICIOUS
3738     if ((NULL != att_peer_set) &&
3739         (1 == mal_type || 3 == mal_type))
3740     { /* Add attacked peer to local list */
3741       // TODO check if we sent a request and this was the first reply
3742       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3743                                                                &peers[i])
3744           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3745                                                                   &peers[i]))
3746       {
3747         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3748         tmp_att_peer->peer_id = peers[i];
3749         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3750                                      att_peers_tail,
3751                                      tmp_att_peer);
3752         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3753       }
3754       continue;
3755     }
3756     #endif /* ENABLE_MALICIOUS */
3757     /* Make sure we 'know' about this peer */
3758     (void) insert_peer (channel_ctx->peer_ctx->sub,
3759                         &peers[i]);
3760
3761     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3762                                         &peers[i]))
3763     {
3764       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3765                          &peers[i]);
3766     }
3767     else
3768     {
3769       schedule_operation (channel_ctx->peer_ctx,
3770                           insert_in_pull_map,
3771                           channel_ctx->peer_ctx->sub); /* cls */
3772       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3773                                       &peers[i]);
3774     }
3775   }
3776
3777   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3778                                  sender),
3779                    Peers_PULL_REPLY_PENDING);
3780   clean_peer (channel_ctx->peer_ctx->sub,
3781               sender);
3782
3783   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3784                                      sender));
3785   GNUNET_CADET_receive_done (channel_ctx->channel);
3786 }
3787
3788
3789 /**
3790  * Compute a random delay.
3791  * A uniformly distributed value between mean + spread and mean - spread.
3792  *
3793  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3794  * It would return a random value between 2 and 6 min.
3795  *
3796  * @param mean the mean time until the next round
3797  * @param spread the inverse amount of deviation from the mean
3798  */
3799 static struct GNUNET_TIME_Relative
3800 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3801                     unsigned int spread)
3802 {
3803   struct GNUNET_TIME_Relative half_interval;
3804   struct GNUNET_TIME_Relative ret;
3805   unsigned int rand_delay;
3806   unsigned int max_rand_delay;
3807
3808   if (0 == spread)
3809   {
3810     LOG (GNUNET_ERROR_TYPE_WARNING,
3811          "Not accepting spread of 0\n");
3812     GNUNET_break (0);
3813     GNUNET_assert (0);
3814   }
3815   GNUNET_assert (0 != mean.rel_value_us);
3816
3817   /* Compute random time value between spread * mean and spread * mean */
3818   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3819
3820   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3821   /**
3822    * Compute random value between (0 and 1) * round_interval
3823    * via multiplying round_interval with a 'fraction' (0 to value)/value
3824    */
3825   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3826   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3827   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3828   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3829
3830   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3831     LOG (GNUNET_ERROR_TYPE_WARNING,
3832          "Returning FOREVER_REL\n");
3833
3834   return ret;
3835 }
3836
3837
3838 /**
3839  * Send single pull request
3840  *
3841  * @param peer_ctx Context to the peer to send request to
3842  */
3843 static void
3844 send_pull_request (struct PeerContext *peer_ctx)
3845 {
3846   struct GNUNET_MQ_Envelope *ev;
3847
3848   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3849                                                &peer_ctx->peer_id,
3850                                                Peers_PULL_REPLY_PENDING));
3851   SET_PEER_FLAG (peer_ctx,
3852                  Peers_PULL_REPLY_PENDING);
3853   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3854
3855   LOG (GNUNET_ERROR_TYPE_DEBUG,
3856        "Going to send PULL REQUEST to peer %s.\n",
3857        GNUNET_i2s (&peer_ctx->peer_id));
3858
3859   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3860   send_message (peer_ctx,
3861                 ev,
3862                 "PULL REQUEST");
3863   if (peer_ctx->sub)
3864   {
3865     GNUNET_STATISTICS_update (stats,
3866                               "# pull request send issued",
3867                               1,
3868                               GNUNET_NO);
3869     if (NULL != map_single_hop &&
3870         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3871                                                              &peer_ctx->peer_id))
3872     {
3873       GNUNET_STATISTICS_update (stats,
3874                                 "# pull request send issued (multi-hop peer)",
3875                                 1,
3876                                 GNUNET_NO);
3877     }
3878   }
3879 }
3880
3881
3882 /**
3883  * Send single push
3884  *
3885  * @param peer_ctx Context of peer to send push to
3886  */
3887 static void
3888 send_push (struct PeerContext *peer_ctx)
3889 {
3890   struct GNUNET_MQ_Envelope *ev;
3891
3892   LOG (GNUNET_ERROR_TYPE_DEBUG,
3893        "Going to send PUSH to peer %s.\n",
3894        GNUNET_i2s (&peer_ctx->peer_id));
3895
3896   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3897   send_message (peer_ctx, ev, "PUSH");
3898   if (peer_ctx->sub)
3899   {
3900     GNUNET_STATISTICS_update (stats,
3901                               "# push send issued",
3902                               1,
3903                               GNUNET_NO);
3904     if (NULL != map_single_hop &&
3905         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3906                                                              &peer_ctx->peer_id))
3907     {
3908       GNUNET_STATISTICS_update (stats,
3909                                 "# push send issued (multi-hop peer)",
3910                                 1,
3911                                 GNUNET_NO);
3912     }
3913   }
3914 }
3915
3916
3917 #if ENABLE_MALICIOUS
3918
3919
3920 /**
3921  * @brief This function is called, when the client tells us to act malicious.
3922  * It verifies that @a msg is well-formed.
3923  *
3924  * @param cls the closure (#ClientContext)
3925  * @param msg the message
3926  * @return #GNUNET_OK if @a msg is well-formed
3927  */
3928 static int
3929 check_client_act_malicious (void *cls,
3930                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3931 {
3932   struct ClientContext *cli_ctx = cls;
3933   uint16_t msize = ntohs (msg->header.size);
3934   uint32_t num_peers = ntohl (msg->num_peers);
3935
3936   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3937   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3938        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3939   {
3940     LOG (GNUNET_ERROR_TYPE_ERROR,
3941         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3942         ntohl (msg->num_peers),
3943         (msize / sizeof (struct GNUNET_PeerIdentity)));
3944     GNUNET_break (0);
3945     GNUNET_SERVICE_client_drop (cli_ctx->client);
3946     return GNUNET_SYSERR;
3947   }
3948   return GNUNET_OK;
3949 }
3950
3951 /**
3952  * Turn RPS service to act malicious.
3953  *
3954  * @param cls Closure
3955  * @param client The client that sent the message
3956  * @param msg The message header
3957  */
3958 static void
3959 handle_client_act_malicious (void *cls,
3960                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3961 {
3962   struct ClientContext *cli_ctx = cls;
3963   struct GNUNET_PeerIdentity *peers;
3964   uint32_t num_mal_peers_sent;
3965   uint32_t num_mal_peers_old;
3966   struct Sub *sub = cli_ctx->sub;
3967
3968   if (NULL == sub) sub = msub;
3969   /* Do actual logic */
3970   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3971   mal_type = ntohl (msg->type);
3972   if (NULL == mal_peer_set)
3973     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3974
3975   LOG (GNUNET_ERROR_TYPE_DEBUG,
3976        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3977        mal_type,
3978        ntohl (msg->num_peers));
3979
3980   if (1 == mal_type)
3981   { /* Try to maximise representation */
3982     /* Add other malicious peers to those we already know */
3983
3984     num_mal_peers_sent = ntohl (msg->num_peers);
3985     num_mal_peers_old = num_mal_peers;
3986     GNUNET_array_grow (mal_peers,
3987                        num_mal_peers,
3988                        num_mal_peers + num_mal_peers_sent);
3989     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3990             peers,
3991             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3992
3993     /* Add all mal peers to mal_peer_set */
3994     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3995                            num_mal_peers_sent,
3996                            mal_peer_set);
3997
3998     /* Substitute do_round () with do_mal_round () */
3999     GNUNET_assert (NULL != sub->do_round_task);
4000     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4001     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4002   }
4003
4004   else if ( (2 == mal_type) ||
4005             (3 == mal_type) )
4006   { /* Try to partition the network */
4007     /* Add other malicious peers to those we already know */
4008
4009     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4010     num_mal_peers_old = num_mal_peers;
4011     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4012     GNUNET_array_grow (mal_peers,
4013                        num_mal_peers,
4014                        num_mal_peers + num_mal_peers_sent);
4015     if (NULL != mal_peers &&
4016         0 != num_mal_peers)
4017     {
4018       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4019               peers,
4020               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
4021
4022       /* Add all mal peers to mal_peer_set */
4023       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4024                              num_mal_peers_sent,
4025                              mal_peer_set);
4026     }
4027
4028     /* Store the one attacked peer */
4029     GNUNET_memcpy (&attacked_peer,
4030             &msg->attacked_peer,
4031             sizeof (struct GNUNET_PeerIdentity));
4032     /* Set the flag of the attacked peer to valid to avoid problems */
4033     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4034     {
4035       (void) issue_peer_online_check (sub, &attacked_peer);
4036     }
4037
4038     LOG (GNUNET_ERROR_TYPE_DEBUG,
4039          "Attacked peer is %s\n",
4040          GNUNET_i2s (&attacked_peer));
4041
4042     /* Substitute do_round () with do_mal_round () */
4043     if (NULL != sub->do_round_task)
4044     {
4045       /* Probably in shutdown */
4046       GNUNET_SCHEDULER_cancel (sub->do_round_task);
4047       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4048     }
4049   }
4050   else if (0 == mal_type)
4051   { /* Stop acting malicious */
4052     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4053
4054     /* Substitute do_mal_round () with do_round () */
4055     GNUNET_SCHEDULER_cancel (sub->do_round_task);
4056     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4057   }
4058   else
4059   {
4060     GNUNET_break (0);
4061     GNUNET_SERVICE_client_continue (cli_ctx->client);
4062   }
4063   GNUNET_SERVICE_client_continue (cli_ctx->client);
4064 }
4065
4066
4067 /**
4068  * Send out PUSHes and PULLs maliciously.
4069  *
4070  * This is executed regylary.
4071  *
4072  * @param cls Closure - Sub
4073  */
4074 static void
4075 do_mal_round (void *cls)
4076 {
4077   uint32_t num_pushes;
4078   uint32_t i;
4079   struct GNUNET_TIME_Relative time_next_round;
4080   struct AttackedPeer *tmp_att_peer;
4081   struct Sub *sub = cls;
4082
4083   LOG (GNUNET_ERROR_TYPE_DEBUG,
4084        "Going to execute next round maliciously type %" PRIu32 ".\n",
4085       mal_type);
4086   sub->do_round_task = NULL;
4087   GNUNET_assert (mal_type <= 3);
4088   /* Do malicious actions */
4089   if (1 == mal_type)
4090   { /* Try to maximise representation */
4091
4092     /* The maximum of pushes we're going to send this round */
4093     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4094                                          num_attacked_peers),
4095                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4096
4097     LOG (GNUNET_ERROR_TYPE_DEBUG,
4098          "Going to send %" PRIu32 " pushes\n",
4099          num_pushes);
4100
4101     /* Send PUSHes to attacked peers */
4102     for (i = 0 ; i < num_pushes ; i++)
4103     {
4104       if (att_peers_tail == att_peer_index)
4105         att_peer_index = att_peers_head;
4106       else
4107         att_peer_index = att_peer_index->next;
4108
4109       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4110     }
4111
4112     /* Send PULLs to some peers to learn about additional peers to attack */
4113     tmp_att_peer = att_peer_index;
4114     for (i = 0 ; i < num_pushes * alpha ; i++)
4115     {
4116       if (att_peers_tail == tmp_att_peer)
4117         tmp_att_peer = att_peers_head;
4118       else
4119         att_peer_index = tmp_att_peer->next;
4120
4121       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4122     }
4123   }
4124
4125
4126   else if (2 == mal_type)
4127   { /**
4128      * Try to partition the network
4129      * Send as many pushes to the attacked peer as possible
4130      * That is one push per round as it will ignore more.
4131      */
4132     (void) issue_peer_online_check (sub, &attacked_peer);
4133     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4134                                        &attacked_peer,
4135                                        Peers_ONLINE))
4136       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4137   }
4138
4139
4140   if (3 == mal_type)
4141   { /* Combined attack */
4142
4143     /* Send PUSH to attacked peers */
4144     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4145     {
4146       (void) issue_peer_online_check (sub, &attacked_peer);
4147       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4148                                          &attacked_peer,
4149                                          Peers_ONLINE))
4150       {
4151         LOG (GNUNET_ERROR_TYPE_DEBUG,
4152             "Goding to send push to attacked peer (%s)\n",
4153             GNUNET_i2s (&attacked_peer));
4154         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4155       }
4156     }
4157     (void) issue_peer_online_check (sub, &attacked_peer);
4158
4159     /* The maximum of pushes we're going to send this round */
4160     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4161                                          num_attacked_peers),
4162                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4163
4164     LOG (GNUNET_ERROR_TYPE_DEBUG,
4165          "Going to send %" PRIu32 " pushes\n",
4166          num_pushes);
4167
4168     for (i = 0; i < num_pushes; i++)
4169     {
4170       if (att_peers_tail == att_peer_index)
4171         att_peer_index = att_peers_head;
4172       else
4173         att_peer_index = att_peer_index->next;
4174
4175       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4176     }
4177
4178     /* Send PULLs to some peers to learn about additional peers to attack */
4179     tmp_att_peer = att_peer_index;
4180     for (i = 0; i < num_pushes * alpha; i++)
4181     {
4182       if (att_peers_tail == tmp_att_peer)
4183         tmp_att_peer = att_peers_head;
4184       else
4185         att_peer_index = tmp_att_peer->next;
4186
4187       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4188     }
4189   }
4190
4191   /* Schedule next round */
4192   time_next_round = compute_rand_delay (sub->round_interval, 2);
4193
4194   GNUNET_assert (NULL == sub->do_round_task);
4195   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4196                                                     &do_mal_round, sub);
4197   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4198 }
4199 #endif /* ENABLE_MALICIOUS */
4200
4201
4202 /**
4203  * Send out PUSHes and PULLs, possibly update #view, samplers.
4204  *
4205  * This is executed regylary.
4206  *
4207  * @param cls Closure - Sub
4208  */
4209 static void
4210 do_round (void *cls)
4211 {
4212   unsigned int i;
4213   const struct GNUNET_PeerIdentity *view_array;
4214   unsigned int *permut;
4215   unsigned int a_peers; /* Number of peers we send pushes to */
4216   unsigned int b_peers; /* Number of peers we send pull requests to */
4217   uint32_t first_border;
4218   uint32_t second_border;
4219   struct GNUNET_PeerIdentity peer;
4220   struct GNUNET_PeerIdentity *update_peer;
4221   struct Sub *sub = cls;
4222
4223   sub->num_rounds++;
4224   LOG (GNUNET_ERROR_TYPE_DEBUG,
4225        "Going to execute next round.\n");
4226   if (sub == msub)
4227   {
4228     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4229   }
4230   sub->do_round_task = NULL;
4231 #ifdef TO_FILE_FULL
4232   to_file (sub->file_name_view_log,
4233            "___ new round ___");
4234 #endif /* TO_FILE_FULL */
4235   view_array = View_get_as_array (sub->view);
4236   for (i = 0; i < View_size (sub->view); i++)
4237   {
4238     LOG (GNUNET_ERROR_TYPE_DEBUG,
4239          "\t%s\n", GNUNET_i2s (&view_array[i]));
4240 #ifdef TO_FILE_FULL
4241     to_file (sub->file_name_view_log,
4242              "=%s\t(do round)",
4243              GNUNET_i2s_full (&view_array[i]));
4244 #endif /* TO_FILE_FULL */
4245   }
4246
4247
4248   /* Send pushes and pull requests */
4249   if (0 < View_size (sub->view))
4250   {
4251     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4252                                            View_size (sub->view));
4253
4254     /* Send PUSHes */
4255     a_peers = ceil (alpha * View_size (sub->view));
4256
4257     LOG (GNUNET_ERROR_TYPE_DEBUG,
4258          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4259          a_peers, alpha, View_size (sub->view));
4260     for (i = 0; i < a_peers; i++)
4261     {
4262       peer = view_array[permut[i]];
4263       // FIXME if this fails schedule/loop this for later
4264       send_push (get_peer_ctx (sub->peer_map, &peer));
4265     }
4266
4267     /* Send PULL requests */
4268     b_peers = ceil (beta * View_size (sub->view));
4269     first_border = a_peers;
4270     second_border = a_peers + b_peers;
4271     if (second_border > View_size (sub->view))
4272     {
4273       first_border = View_size (sub->view) - b_peers;
4274       second_border = View_size (sub->view);
4275     }
4276     LOG (GNUNET_ERROR_TYPE_DEBUG,
4277         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4278         b_peers, beta, View_size (sub->view));
4279     for (i = first_border; i < second_border; i++)
4280     {
4281       peer = view_array[permut[i]];
4282       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4283                                          &peer,
4284                                          Peers_PULL_REPLY_PENDING))
4285       { // FIXME if this fails schedule/loop this for later
4286         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4287       }
4288     }
4289
4290     GNUNET_free (permut);
4291     permut = NULL;
4292   }
4293
4294
4295   /* Update view */
4296   /* TODO see how many peers are in push-/pull- list! */
4297
4298   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4299       (0 < CustomPeerMap_size (sub->push_map)) &&
4300       (0 < CustomPeerMap_size (sub->pull_map)))
4301   { /* If conditions for update are fulfilled, update */
4302     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4303
4304     uint32_t final_size;
4305     uint32_t peers_to_clean_size;
4306     struct GNUNET_PeerIdentity *peers_to_clean;
4307
4308     peers_to_clean = NULL;
4309     peers_to_clean_size = 0;
4310     GNUNET_array_grow (peers_to_clean,
4311                        peers_to_clean_size,
4312                        View_size (sub->view));
4313     GNUNET_memcpy (peers_to_clean,
4314             view_array,
4315             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4316
4317     /* Seems like recreating is the easiest way of emptying the peermap */
4318     View_clear (sub->view);
4319 #ifdef TO_FILE_FULL
4320     to_file (sub->file_name_view_log,
4321              "--- emptied ---");
4322 #endif /* TO_FILE_FULL */
4323
4324     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4325                                 CustomPeerMap_size (sub->push_map));
4326     second_border = first_border +
4327                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4328                                 CustomPeerMap_size (sub->pull_map));
4329     final_size    = second_border +
4330       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4331     LOG (GNUNET_ERROR_TYPE_DEBUG,
4332         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4333         first_border,
4334         second_border,
4335         final_size);
4336
4337     /* Update view with peers received through PUSHes */
4338     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4339                                            CustomPeerMap_size (sub->push_map));
4340     for (i = 0; i < first_border; i++)
4341     {
4342       int inserted;
4343       inserted = insert_in_view (sub,
4344                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4345                                                                   permut[i]));
4346       if (GNUNET_OK == inserted)
4347       {
4348         clients_notify_stream_peer (sub,
4349             1,
4350             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4351       }
4352 #ifdef TO_FILE_FULL
4353       to_file (sub->file_name_view_log,
4354                "+%s\t(push list)",
4355                GNUNET_i2s_full (&view_array[i]));
4356 #endif /* TO_FILE_FULL */
4357       // TODO change the peer_flags accordingly
4358     }
4359     GNUNET_free (permut);
4360     permut = NULL;
4361
4362     /* Update view with peers received through PULLs */
4363     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4364                                            CustomPeerMap_size (sub->pull_map));
4365     for (i = first_border; i < second_border; i++)
4366     {
4367       int inserted;
4368       inserted = insert_in_view (sub,
4369           CustomPeerMap_get_peer_by_index (sub->pull_map,
4370                                            permut[i - first_border]));
4371       if (GNUNET_OK == inserted)
4372       {
4373         clients_notify_stream_peer (sub,
4374             1,
4375             CustomPeerMap_get_peer_by_index (sub->pull_map,
4376                                              permut[i - first_border]));
4377       }
4378 #ifdef TO_FILE_FULL
4379       to_file (sub->file_name_view_log,
4380                "+%s\t(pull list)",
4381                GNUNET_i2s_full (&view_array[i]));
4382 #endif /* TO_FILE_FULL */
4383       // TODO change the peer_flags accordingly
4384     }
4385     GNUNET_free (permut);
4386     permut = NULL;
4387
4388     /* Update view with peers from history */
4389     RPS_sampler_get_n_rand_peers (sub->sampler,
4390                                   final_size - second_border,
4391                                   hist_update,
4392                                   sub);
4393     // TODO change the peer_flags accordingly
4394
4395     for (i = 0; i < View_size (sub->view); i++)
4396       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4397
4398     /* Clean peers that were removed from the view */
4399     for (i = 0; i < peers_to_clean_size; i++)
4400     {
4401 #ifdef TO_FILE_FULL
4402       to_file (sub->file_name_view_log,
4403                "-%s",
4404                GNUNET_i2s_full (&peers_to_clean[i]));
4405 #endif /* TO_FILE_FULL */
4406       clean_peer (sub, &peers_to_clean[i]);
4407     }
4408
4409     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4410     clients_notify_view_update (sub);
4411   } else {
4412     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4413     if (sub == msub)
4414     {
4415       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4416       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4417           !(0 >= CustomPeerMap_size (sub->pull_map)))
4418         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4419       if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4420           (0 >= CustomPeerMap_size (sub->pull_map)))
4421         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4422       if (0 >= CustomPeerMap_size (sub->push_map) &&
4423           !(0 >= CustomPeerMap_size (sub->pull_map)))
4424         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4425       if (0 >= CustomPeerMap_size (sub->push_map) &&
4426           (0 >= CustomPeerMap_size (sub->pull_map)))
4427         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4428       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4429           CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4430           0 >= CustomPeerMap_size (sub->push_map))
4431         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4432     }
4433   }
4434   // TODO independent of that also get some peers from CADET_get_peers()?
4435   if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4436   {
4437     sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4438   }
4439   else
4440   {
4441     LOG (GNUNET_ERROR_TYPE_WARNING,
4442          "Push map size too big for histogram (%u, %u)\n",
4443          CustomPeerMap_size (sub->push_map),
4444          HISTOGRAM_FILE_SLOTS);
4445   }
4446   // FIXME check bounds of histogram
4447   sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map) -
4448                    (alpha * sub->view_size_est_need)) +
4449                           (HISTOGRAM_FILE_SLOTS/2)]++;
4450   if (sub == msub)
4451   {
4452     GNUNET_STATISTICS_set (stats,
4453         "# peers in push map at end of round",
4454         CustomPeerMap_size (sub->push_map),
4455         GNUNET_NO);
4456     GNUNET_STATISTICS_set (stats,
4457         "# peers in pull map at end of round",
4458         CustomPeerMap_size (sub->pull_map),
4459         GNUNET_NO);
4460     GNUNET_STATISTICS_set (stats,
4461         "# peers in view at end of round",
4462         View_size (sub->view),
4463         GNUNET_NO);
4464     GNUNET_STATISTICS_set (stats,
4465         "# expected pushes",
4466         alpha * sub->view_size_est_need,
4467         GNUNET_NO);
4468     GNUNET_STATISTICS_set (stats,
4469         "delta expected - received pushes",
4470         CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4471         GNUNET_NO);
4472   }
4473
4474   LOG (GNUNET_ERROR_TYPE_DEBUG,
4475        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4476        CustomPeerMap_size (sub->push_map),
4477        CustomPeerMap_size (sub->pull_map),
4478        alpha,
4479        View_size (sub->view),
4480        alpha * View_size (sub->view));
4481
4482   /* Update samplers */
4483   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4484   {
4485     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4486     LOG (GNUNET_ERROR_TYPE_DEBUG,
4487          "Updating with peer %s from push list\n",
4488          GNUNET_i2s (update_peer));
4489     insert_in_sampler (sub, update_peer);
4490     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4491   }
4492
4493   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4494   {
4495     LOG (GNUNET_ERROR_TYPE_DEBUG,
4496          "Updating with peer %s from pull list\n",
4497          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4498     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4499     /* This cleans only if it is not in the view */
4500     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4501   }
4502
4503
4504   /* Empty push/pull lists */
4505   CustomPeerMap_clear (sub->push_map);
4506   CustomPeerMap_clear (sub->pull_map);
4507
4508   if (sub == msub)
4509   {
4510     GNUNET_STATISTICS_set (stats,
4511                            "view size",
4512                            View_size(sub->view),
4513                            GNUNET_NO);
4514   }
4515
4516   struct GNUNET_TIME_Relative time_next_round;
4517
4518   time_next_round = compute_rand_delay (sub->round_interval, 2);
4519
4520   /* Schedule next round */
4521   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4522                                                      &do_round, sub);
4523   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4524 }
4525
4526
4527 /**
4528  * This is called from GNUNET_CADET_get_peers().
4529  *
4530  * It is called on every peer(ID) that cadet somehow has contact with.
4531  * We use those to initialise the sampler.
4532  *
4533  * implements #GNUNET_CADET_PeersCB
4534  *
4535  * @param cls Closure - Sub
4536  * @param peer Peer, or NULL on "EOF".
4537  * @param tunnel Do we have a tunnel towards this peer?
4538  * @param n_paths Number of known paths towards this peer.
4539  * @param best_path How long is the best path?
4540  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4541  */
4542 void
4543 init_peer_cb (void *cls,
4544               const struct GNUNET_PeerIdentity *peer,
4545               int tunnel, /* "Do we have a tunnel towards this peer?" */
4546               unsigned int n_paths, /* "Number of known paths towards this peer" */
4547               unsigned int best_path) /* "How long is the best path?
4548                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4549 {
4550   struct Sub *sub = cls;
4551   (void) tunnel;
4552   (void) n_paths;
4553   (void) best_path;
4554
4555   if (NULL != peer)
4556   {
4557     LOG (GNUNET_ERROR_TYPE_DEBUG,
4558          "Got peer_id %s from cadet\n",
4559          GNUNET_i2s (peer));
4560     got_peer (sub, peer);
4561   }
4562 }
4563
4564
4565 /**
4566  * @brief Iterator function over stored, valid peers.
4567  *
4568  * We initialise the sampler with those.
4569  *
4570  * @param cls Closure - Sub
4571  * @param peer the peer id
4572  * @return #GNUNET_YES if we should continue to
4573  *         iterate,
4574  *         #GNUNET_NO if not.
4575  */
4576 static int
4577 valid_peers_iterator (void *cls,
4578                       const struct GNUNET_PeerIdentity *peer)
4579 {
4580   struct Sub *sub = cls;
4581
4582   if (NULL != peer)
4583   {
4584     LOG (GNUNET_ERROR_TYPE_DEBUG,
4585          "Got stored, valid peer %s\n",
4586          GNUNET_i2s (peer));
4587     got_peer (sub, peer);
4588   }
4589   return GNUNET_YES;
4590 }
4591
4592
4593 /**
4594  * Iterator over peers from peerinfo.
4595  *
4596  * @param cls Closure - Sub
4597  * @param peer id of the peer, NULL for last call
4598  * @param hello hello message for the peer (can be NULL)
4599  * @param error message
4600  */
4601 void
4602 process_peerinfo_peers (void *cls,
4603                         const struct GNUNET_PeerIdentity *peer,
4604                         const struct GNUNET_HELLO_Message *hello,
4605                         const char *err_msg)
4606 {
4607   struct Sub *sub = cls;
4608   (void) hello;
4609   (void) err_msg;
4610
4611   if (NULL != peer)
4612   {
4613     LOG (GNUNET_ERROR_TYPE_DEBUG,
4614          "Got peer_id %s from peerinfo\n",
4615          GNUNET_i2s (peer));
4616     got_peer (sub, peer);
4617   }
4618 }
4619
4620
4621 /**
4622  * Task run during shutdown.
4623  *
4624  * @param cls Closure - unused
4625  */
4626 static void
4627 shutdown_task (void *cls)
4628 {
4629   (void) cls;
4630   struct ClientContext *client_ctx;
4631
4632   LOG (GNUNET_ERROR_TYPE_DEBUG,
4633        "RPS service is going down\n");
4634
4635   /* Clean all clients */
4636   for (client_ctx = cli_ctx_head;
4637        NULL != cli_ctx_head;
4638        client_ctx = cli_ctx_head)
4639   {
4640     destroy_cli_ctx (client_ctx);
4641   }
4642   if (NULL != msub)
4643   {
4644     destroy_sub (msub);
4645     msub = NULL;
4646   }
4647
4648   /* Disconnect from other services */
4649   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4650   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4651   peerinfo_handle = NULL;
4652   GNUNET_NSE_disconnect (nse);
4653   if (NULL != map_single_hop)
4654   {
4655     /* core_init was called - core was initialised */
4656     /* disconnect first, so no callback tries to access missing peermap */
4657     GNUNET_CORE_disconnect (core_handle);
4658     core_handle = NULL;
4659     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4660     map_single_hop = NULL;
4661   }
4662
4663   if (NULL != stats)
4664   {
4665     GNUNET_STATISTICS_destroy (stats,
4666                                GNUNET_NO);
4667     stats = NULL;
4668   }
4669   GNUNET_CADET_disconnect (cadet_handle);
4670   cadet_handle = NULL;
4671 #if ENABLE_MALICIOUS
4672   struct AttackedPeer *tmp_att_peer;
4673   GNUNET_array_grow (mal_peers,
4674                      num_mal_peers,
4675                      0);
4676   if (NULL != mal_peer_set)
4677     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4678   if (NULL != att_peer_set)
4679     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4680   while (NULL != att_peers_head)
4681   {
4682     tmp_att_peer = att_peers_head;
4683     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4684                                  att_peers_tail,
4685                                  tmp_att_peer);
4686     GNUNET_free (tmp_att_peer);
4687   }
4688 #endif /* ENABLE_MALICIOUS */
4689   close_all_files();
4690 }
4691
4692
4693 /**
4694  * Handle client connecting to the service.
4695  *
4696  * @param cls unused
4697  * @param client the new client
4698  * @param mq the message queue of @a client
4699  * @return @a client
4700  */
4701 static void *
4702 client_connect_cb (void *cls,
4703                    struct GNUNET_SERVICE_Client *client,
4704                    struct GNUNET_MQ_Handle *mq)
4705 {
4706   struct ClientContext *cli_ctx;
4707   (void) cls;
4708
4709   LOG (GNUNET_ERROR_TYPE_DEBUG,
4710        "Client connected\n");
4711   if (NULL == client)
4712     return client; /* Server was destroyed before a client connected. Shutting down */
4713   cli_ctx = GNUNET_new (struct ClientContext);
4714   cli_ctx->mq = mq;
4715   cli_ctx->view_updates_left = -1;
4716   cli_ctx->stream_update = GNUNET_NO;
4717   cli_ctx->client = client;
4718   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4719                                cli_ctx_tail,
4720                                cli_ctx);
4721   return cli_ctx;
4722 }
4723
4724 /**
4725  * Callback called when a client disconnected from the service
4726  *
4727  * @param cls closure for the service
4728  * @param c the client that disconnected
4729  * @param internal_cls should be equal to @a c
4730  */
4731 static void
4732 client_disconnect_cb (void *cls,
4733                       struct GNUNET_SERVICE_Client *client,
4734                       void *internal_cls)
4735 {
4736   struct ClientContext *cli_ctx = internal_cls;
4737
4738   (void) cls;
4739   GNUNET_assert (client == cli_ctx->client);
4740   if (NULL == client)
4741   {/* shutdown task - destroy all clients */
4742     while (NULL != cli_ctx_head)
4743       destroy_cli_ctx (cli_ctx_head);
4744   }
4745   else
4746   { /* destroy this client */
4747     LOG (GNUNET_ERROR_TYPE_DEBUG,
4748         "Client disconnected. Destroy its context.\n");
4749     destroy_cli_ctx (cli_ctx);
4750   }
4751 }
4752
4753
4754 /**
4755  * Handle random peer sampling clients.
4756  *
4757  * @param cls closure
4758  * @param c configuration to use
4759  * @param service the initialized service
4760  */
4761 static void
4762 run (void *cls,
4763      const struct GNUNET_CONFIGURATION_Handle *c,
4764      struct GNUNET_SERVICE_Handle *service)
4765 {
4766   struct GNUNET_TIME_Relative round_interval;
4767   long long unsigned int sampler_size;
4768   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4769   struct GNUNET_HashCode hash;
4770
4771   (void) cls;
4772   (void) service;
4773
4774   GNUNET_log_setup ("rps",
4775                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4776                     NULL);
4777   cfg = c;
4778   /* Get own ID */
4779   GNUNET_CRYPTO_get_peer_identity (cfg,
4780                                    &own_identity); // TODO check return value
4781   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4782               "STARTING SERVICE (rps) for peer [%s]\n",
4783               GNUNET_i2s (&own_identity));
4784 #if ENABLE_MALICIOUS
4785   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4786               "Malicious execution compiled in.\n");
4787 #endif /* ENABLE_MALICIOUS */
4788
4789   /* Get time interval from the configuration */
4790   if (GNUNET_OK !=
4791       GNUNET_CONFIGURATION_get_value_time (cfg,
4792                                            "RPS",
4793                                            "ROUNDINTERVAL",
4794                                            &round_interval))
4795   {
4796     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4797                                "RPS", "ROUNDINTERVAL");
4798     GNUNET_SCHEDULER_shutdown ();
4799     return;
4800   }
4801
4802   /* Get initial size of sampler/view from the configuration */
4803   if (GNUNET_OK !=
4804       GNUNET_CONFIGURATION_get_value_number (cfg,
4805                                              "RPS",
4806                                              "MINSIZE",
4807                                              &sampler_size))
4808   {
4809     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4810                                "RPS", "MINSIZE");
4811     GNUNET_SCHEDULER_shutdown ();
4812     return;
4813   }
4814
4815   cadet_handle = GNUNET_CADET_connect (cfg);
4816   GNUNET_assert (NULL != cadet_handle);
4817   core_handle = GNUNET_CORE_connect (cfg,
4818                                      NULL, /* cls */
4819                                      core_init, /* init */
4820                                      core_connects, /* connects */
4821                                      core_disconnects, /* disconnects */
4822                                      NULL); /* handlers */
4823   GNUNET_assert (NULL != core_handle);
4824
4825
4826   alpha = 0.45;
4827   beta  = 0.45;
4828
4829
4830   /* Set up main Sub */
4831   GNUNET_CRYPTO_hash (hash_port_string,
4832                       strlen (hash_port_string),
4833                       &hash);
4834   msub = new_sub (&hash,
4835                  sampler_size, /* Will be overwritten by config */
4836                  round_interval);
4837
4838
4839   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4840
4841   /* connect to NSE */
4842   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4843
4844   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4845   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4846   // TODO send push/pull to each of those peers?
4847   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4848   restore_valid_peers (msub);
4849   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4850
4851   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4852                                                    GNUNET_NO,
4853                                                    process_peerinfo_peers,
4854                                                    msub);
4855
4856   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4857
4858   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4859   stats = GNUNET_STATISTICS_create ("rps", cfg);
4860 }
4861
4862
4863 /**
4864  * Define "main" method using service macro.
4865  */
4866 GNUNET_SERVICE_MAIN
4867 ("rps",
4868  GNUNET_SERVICE_OPTION_NONE,
4869  &run,
4870  &client_connect_cb,
4871  &client_disconnect_cb,
4872  NULL,
4873  GNUNET_MQ_hd_var_size (client_seed,
4874    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4875    struct GNUNET_RPS_CS_SeedMessage,
4876    NULL),
4877 #if ENABLE_MALICIOUS
4878  GNUNET_MQ_hd_var_size (client_act_malicious,
4879    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4880    struct GNUNET_RPS_CS_ActMaliciousMessage,
4881    NULL),
4882 #endif /* ENABLE_MALICIOUS */
4883  GNUNET_MQ_hd_fixed_size (client_view_request,
4884    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4885    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4886    NULL),
4887  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4888    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4889    struct GNUNET_MessageHeader,
4890    NULL),
4891  GNUNET_MQ_hd_fixed_size (client_stream_request,
4892    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4893    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4894    NULL),
4895  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4896    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4897    struct GNUNET_MessageHeader,
4898    NULL),
4899  GNUNET_MQ_hd_fixed_size (client_start_sub,
4900    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4901    struct GNUNET_RPS_CS_SubStartMessage,
4902    NULL),
4903  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4904    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4905    struct GNUNET_RPS_CS_SubStopMessage,
4906    NULL),
4907  GNUNET_MQ_handler_end());
4908
4909 /* end of gnunet-service-rps.c */