RPS API: Fix whitespaces
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_core_service.h"
29 #include "gnunet_peerinfo_service.h"
30 #include "gnunet_nse_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "rps.h"
33 #include "rps-test_util.h"
34 #include "gnunet-service-rps_sampler.h"
35 #include "gnunet-service-rps_custommap.h"
36 #include "gnunet-service-rps_view.h"
37
38 #include <math.h>
39 #include <inttypes.h>
40 #include <string.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO blacklist? (-> mal peer detection on top of brahms)
51
52 // hist_size_init, hist_size_max
53
54 /***********************************************************************
55  * Old gnunet-service-rps_peers.c
56 ***********************************************************************/
57
58 /**
59  * Set a peer flag of given peer context.
60  */
61 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
62
63 /**
64  * Get peer flag of given peer context.
65  */
66 #define check_peer_flag_set(peer_ctx, mask)\
67   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
68
69 /**
70  * Unset flag of given peer context.
71  */
72 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
73
74 /**
75  * Get channel flag of given channel context.
76  */
77 #define check_channel_flag_set(channel_flags, mask)\
78   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
79
80 /**
81  * Unset flag of given channel context.
82  */
83 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
84
85
86
87 /**
88  * Pending operation on peer consisting of callback and closure
89  *
90  * When an operation cannot be executed right now this struct is used to store
91  * the callback and closure for later execution.
92  */
93 struct PeerPendingOp
94 {
95   /**
96    * Callback
97    */
98   PeerOp op;
99
100   /**
101    * Closure
102    */
103   void *op_cls;
104 };
105
106 /**
107  * List containing all messages that are yet to be send
108  *
109  * This is used to keep track of all messages that have not been sent yet. When
110  * a peer is to be removed the pending messages can be removed properly.
111  */
112 struct PendingMessage
113 {
114   /**
115    * DLL next, prev
116    */
117   struct PendingMessage *next;
118   struct PendingMessage *prev;
119
120   /**
121    * The envelope to the corresponding message
122    */
123   struct GNUNET_MQ_Envelope *ev;
124
125   /**
126    * The corresponding context
127    */
128   struct PeerContext *peer_ctx;
129
130   /**
131    * The message type
132    */
133   const char *type;
134 };
135
136 /**
137  * @brief Context for a channel
138  */
139 struct ChannelCtx;
140
141 /**
142  * Struct used to keep track of other peer's status
143  *
144  * This is stored in a multipeermap.
145  * It contains information such as cadet channels, a message queue for sending,
146  * status about the channels, the pending operations on this peer and some flags
147  * about the status of the peer itself. (online, valid, ...)
148  */
149 struct PeerContext
150 {
151   /**
152    * The Sub this context belongs to.
153    */
154   struct Sub *sub;
155
156   /**
157    * Message queue open to client
158    */
159   struct GNUNET_MQ_Handle *mq;
160
161   /**
162    * Channel open to client.
163    */
164   struct ChannelCtx *send_channel_ctx;
165
166   /**
167    * Channel open from client.
168    */
169   struct ChannelCtx *recv_channel_ctx;
170
171   /**
172    * Array of pending operations on this peer.
173    */
174   struct PeerPendingOp *pending_ops;
175
176   /**
177    * Handle to the callback given to cadet_ntfy_tmt_rdy()
178    *
179    * To be canceled on shutdown.
180    */
181   struct PendingMessage *online_check_pending;
182
183   /**
184    * Number of pending operations.
185    */
186   unsigned int num_pending_ops;
187
188   /**
189    * Identity of the peer
190    */
191   struct GNUNET_PeerIdentity peer_id;
192
193   /**
194    * Flags indicating status of peer
195    */
196   uint32_t peer_flags;
197
198   /**
199    * Last time we received something from that peer.
200    */
201   struct GNUNET_TIME_Absolute last_message_recv;
202
203   /**
204    * Last time we received a keepalive message.
205    */
206   struct GNUNET_TIME_Absolute last_keepalive;
207
208   /**
209    * DLL with all messages that are yet to be sent
210    */
211   struct PendingMessage *pending_messages_head;
212   struct PendingMessage *pending_messages_tail;
213
214   /**
215    * This is pobably followed by 'statistical' data (when we first saw
216    * it, how did we get its ID, how many pushes (in a timeinterval),
217    * ...)
218    */
219   uint32_t round_pull_req;
220 };
221
222 /**
223  * @brief Closure to #valid_peer_iterator
224  */
225 struct PeersIteratorCls
226 {
227   /**
228    * Iterator function
229    */
230   PeersIterator iterator;
231
232   /**
233    * Closure to iterator
234    */
235   void *cls;
236 };
237
238 /**
239  * @brief Context for a channel
240  */
241 struct ChannelCtx
242 {
243   /**
244    * @brief The channel itself
245    */
246   struct GNUNET_CADET_Channel *channel;
247
248   /**
249    * @brief The peer context associated with the channel
250    */
251   struct PeerContext *peer_ctx;
252
253   /**
254    * @brief When channel destruction needs to be delayed (because it is called
255    * from within the cadet routine of another channel destruction) this task
256    * refers to the respective _SCHEDULER_Task.
257    */
258   struct GNUNET_SCHEDULER_Task *destruction_task;
259 };
260
261
262 #ifdef ENABLE_MALICIOUS
263
264 /**
265  * If type is 2 This struct is used to store the attacked peers in a DLL
266  */
267 struct AttackedPeer
268 {
269   /**
270    * DLL
271    */
272   struct AttackedPeer *next;
273   struct AttackedPeer *prev;
274
275   /**
276    * PeerID
277    */
278   struct GNUNET_PeerIdentity peer_id;
279 };
280
281 #endif /* ENABLE_MALICIOUS */
282
283 /**
284  * @brief One Sub.
285  *
286  * Essentially one instance of brahms that only connects to other instances
287  * with the same (secret) value.
288  */
289 struct Sub
290 {
291   /**
292    * @brief Hash of the shared value that defines Subs.
293    */
294   struct GNUNET_HashCode hash;
295
296   /**
297    * @brief Port to communicate to other peers.
298    */
299   struct GNUNET_CADET_Port *cadet_port;
300
301   /**
302    * @brief Hashmap of valid peers.
303    */
304   struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
305
306   /**
307    * @brief Filename of the file that stores the valid peers persistently.
308    */
309   char *filename_valid_peers;
310
311   /**
312    * Set of all peers to keep track of them.
313    */
314   struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
315
316   /**
317    * @brief This is the minimum estimate used as sampler size.
318    *
319    * It is configured by the user.
320    */
321   unsigned int sampler_size_est_min;
322
323   /**
324    * The size of sampler we need to be able to satisfy the Brahms protocol's
325    * need of random peers.
326    *
327    * This is one minimum size the sampler grows to.
328    */
329   unsigned int sampler_size_est_need;
330
331   /**
332    * Time inverval the do_round task runs in.
333    */
334   struct GNUNET_TIME_Relative round_interval;
335
336   /**
337    * Sampler used for the Brahms protocol itself.
338    */
339   struct RPS_Sampler *sampler;
340
341   /**
342    * Name to log view to
343    */
344   char *file_name_view_log;
345
346 #ifdef TO_FILE
347   /**
348    * Name to log number of observed peers to
349    */
350   char *file_name_observed_log;
351
352   /**
353    * @brief Count the observed peers
354    */
355   uint32_t num_observed_peers;
356
357   /**
358    * @brief File name to log number of pushes per round to
359    */
360   char *file_name_push_recv;
361
362   /**
363    * @brief File name to log number of pushes per round to
364    */
365   char *file_name_pull_delays;
366
367   /**
368    * @brief Multipeermap (ab-) used to count unique peer_ids
369    */
370   struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
371 #endif /* TO_FILE */
372
373   /**
374    * List to store peers received through pushes temporary.
375    */
376   struct CustomPeerMap *push_map;
377
378   /**
379    * List to store peers received through pulls temporary.
380    */
381   struct CustomPeerMap *pull_map;
382
383   /**
384    * @brief This is the estimate used as view size.
385    *
386    * It is initialised with the minimum
387    */
388   unsigned int view_size_est_need;
389
390   /**
391    * @brief This is the minimum estimate used as view size.
392    *
393    * It is configured by the user.
394    */
395   unsigned int view_size_est_min;
396
397   /**
398    * @brief The view.
399    */
400   struct View *view;
401
402   /**
403    * Identifier for the main task that runs periodically.
404    */
405   struct GNUNET_SCHEDULER_Task *do_round_task;
406
407   /* === stats === */
408
409   /**
410    * @brief Counts the executed rounds.
411    */
412   uint32_t num_rounds;
413
414   /**
415    * @brief This array accumulates the number of received pushes per round.
416    *
417    * Number at index i represents the number of rounds with i observed pushes.
418    */
419   uint32_t push_recv[256];
420
421   /**
422    * @brief Number of pull replies with this delay measured in rounds.
423    *
424    * Number at index i represents the number of pull replies with a delay of i
425    * rounds.
426    */
427   uint32_t pull_delays[256];
428 };
429
430
431 /***********************************************************************
432  * Globals
433 ***********************************************************************/
434
435 /**
436  * Our configuration.
437  */
438 static const struct GNUNET_CONFIGURATION_Handle *cfg;
439
440 /**
441  * Handle to the statistics service.
442  */
443 struct GNUNET_STATISTICS_Handle *stats;
444
445 /**
446  * Handler to CADET.
447  */
448 struct GNUNET_CADET_Handle *cadet_handle;
449
450 /**
451  * Handle to CORE
452  */
453 struct GNUNET_CORE_Handle *core_handle;
454
455 /**
456  * @brief PeerMap to keep track of connected peers.
457  */
458 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
459
460 /**
461  * Our own identity.
462  */
463 static struct GNUNET_PeerIdentity own_identity;
464
465 /**
466  * Percentage of total peer number in the view
467  * to send random PUSHes to
468  */
469 static float alpha;
470
471 /**
472  * Percentage of total peer number in the view
473  * to send random PULLs to
474  */
475 static float beta;
476
477 /**
478  * Handler to NSE.
479  */
480 static struct GNUNET_NSE_Handle *nse;
481
482 /**
483  * Handler to PEERINFO.
484  */
485 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
486
487 /**
488  * Handle for cancellation of iteration over peers.
489  */
490 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
491
492
493 #ifdef ENABLE_MALICIOUS
494 /**
495  * Type of malicious peer
496  *
497  * 0 Don't act malicious at all - Default
498  * 1 Try to maximise representation
499  * 2 Try to partition the network
500  * 3 Combined attack
501  */
502 static uint32_t mal_type;
503
504 /**
505  * Other malicious peers
506  */
507 static struct GNUNET_PeerIdentity *mal_peers;
508
509 /**
510  * Hashmap of malicious peers used as set.
511  * Used to more efficiently check whether we know that peer.
512  */
513 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
514
515 /**
516  * Number of other malicious peers
517  */
518 static uint32_t num_mal_peers;
519
520
521 /**
522  * If type is 2 this is the DLL of attacked peers
523  */
524 static struct AttackedPeer *att_peers_head;
525 static struct AttackedPeer *att_peers_tail;
526
527 /**
528  * This index is used to point to an attacked peer to
529  * implement the round-robin-ish way to select attacked peers.
530  */
531 static struct AttackedPeer *att_peer_index;
532
533 /**
534  * Hashmap of attacked peers used as set.
535  * Used to more efficiently check whether we know that peer.
536  */
537 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
538
539 /**
540  * Number of attacked peers
541  */
542 static uint32_t num_attacked_peers;
543
544 /**
545  * If type is 1 this is the attacked peer
546  */
547 static struct GNUNET_PeerIdentity attacked_peer;
548
549 /**
550  * The limit of PUSHes we can send in one round.
551  * This is an assumption of the Brahms protocol and either implemented
552  * via proof of work
553  * or
554  * assumend to be the bandwidth limitation.
555  */
556 static uint32_t push_limit = 10000;
557 #endif /* ENABLE_MALICIOUS */
558
559 /**
560  * @brief Main Sub.
561  *
562  * This is run in any case by all peers and connects to all peers without
563  * specifying a shared value.
564  */
565 static struct Sub *msub;
566
567 /**
568  * @brief Maximum number of valid peers to keep.
569  * TODO read from config
570  */
571 static const uint32_t num_valid_peers_max = UINT32_MAX;
572
573 /***********************************************************************
574  * /Globals
575 ***********************************************************************/
576
577
578 static void
579 do_round (void *cls);
580
581 static void
582 do_mal_round (void *cls);
583
584
585 /**
586  * @brief Get the #PeerContext associated with a peer
587  *
588  * @param peer_map The peer map containing the context
589  * @param peer the peer id
590  *
591  * @return the #PeerContext
592  */
593 static struct PeerContext *
594 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
595               const struct GNUNET_PeerIdentity *peer)
596 {
597   struct PeerContext *ctx;
598   int ret;
599
600   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
601   GNUNET_assert (GNUNET_YES == ret);
602   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
603   GNUNET_assert (NULL != ctx);
604   return ctx;
605 }
606
607 /**
608  * @brief Check whether we have information about the given peer.
609  *
610  * FIXME probably deprecated. Make this the new _online.
611  *
612  * @param peer_map The peer map to check for the existence of @a peer
613  * @param peer peer in question
614  *
615  * @return #GNUNET_YES if peer is known
616  *         #GNUNET_NO  if peer is not knwon
617  */
618 static int
619 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
620                   const struct GNUNET_PeerIdentity *peer)
621 {
622   if (NULL != peer_map)
623   {
624     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
625   }
626   else
627   {
628     return GNUNET_NO;
629   }
630 }
631
632
633 /**
634  * @brief Create a new #PeerContext and insert it into the peer map
635  *
636  * @param sub The Sub this context belongs to.
637  * @param peer the peer to create the #PeerContext for
638  *
639  * @return the #PeerContext
640  */
641 static struct PeerContext *
642 create_peer_ctx (struct Sub *sub,
643                  const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *ctx;
646   int ret;
647
648   GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
649
650   ctx = GNUNET_new (struct PeerContext);
651   ctx->peer_id = *peer;
652   ctx->sub = sub;
653   ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
654       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
655   GNUNET_assert (GNUNET_OK == ret);
656   if (sub == msub)
657   {
658     GNUNET_STATISTICS_set (stats,
659                           "# known peers",
660                           GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
661                           GNUNET_NO);
662   }
663   return ctx;
664 }
665
666
667 /**
668  * @brief Create or get a #PeerContext
669  *
670  * @param sub The Sub to which the created context belongs to
671  * @param peer the peer to get the associated context to
672  *
673  * @return the context
674  */
675 static struct PeerContext *
676 create_or_get_peer_ctx (struct Sub *sub,
677                         const struct GNUNET_PeerIdentity *peer)
678 {
679   if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
680   {
681     return create_peer_ctx (sub, peer);
682   }
683   return get_peer_ctx (sub->peer_map, peer);
684 }
685
686
687 /**
688  * @brief Check whether we have a connection to this @a peer
689  *
690  * Also sets the #Peers_ONLINE flag accordingly
691  *
692  * @param peer_ctx Context of the peer of which connectivity is to be checked
693  *
694  * @return #GNUNET_YES if we are connected
695  *         #GNUNET_NO  otherwise
696  */
697 static int
698 check_connected (struct PeerContext *peer_ctx)
699 {
700   /* If we don't know about this peer we don't know whether it's online */
701   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
702                                      &peer_ctx->peer_id))
703   {
704     return GNUNET_NO;
705   }
706   /* Get the context */
707   peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
708   /* If we have no channel to this peer we don't know whether it's online */
709   if ( (NULL == peer_ctx->send_channel_ctx) &&
710        (NULL == peer_ctx->recv_channel_ctx) )
711   {
712     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
713     return GNUNET_NO;
714   }
715   /* Otherwise (if we have a channel, we know that it's online */
716   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
717   return GNUNET_YES;
718 }
719
720
721 /**
722  * @brief The closure to #get_rand_peer_iterator.
723  */
724 struct GetRandPeerIteratorCls
725 {
726   /**
727    * @brief The index of the peer to return.
728    * Will be decreased until 0.
729    * Then current peer is returned.
730    */
731   uint32_t index;
732
733   /**
734    * @brief Pointer to peer to return.
735    */
736   const struct GNUNET_PeerIdentity *peer;
737 };
738
739
740 /**
741  * @brief Iterator function for #get_random_peer_from_peermap.
742  *
743  * Implements #GNUNET_CONTAINER_PeerMapIterator.
744  * Decreases the index until the index is null.
745  * Then returns the current peer.
746  *
747  * @param cls the #GetRandPeerIteratorCls containing index and peer
748  * @param peer current peer
749  * @param value unused
750  *
751  * @return  #GNUNET_YES if we should continue to
752  *          iterate,
753  *          #GNUNET_NO if not.
754  */
755 static int
756 get_rand_peer_iterator (void *cls,
757                         const struct GNUNET_PeerIdentity *peer,
758                         void *value)
759 {
760   struct GetRandPeerIteratorCls *iterator_cls = cls;
761   (void) value;
762
763   if (0 >= iterator_cls->index)
764   {
765     iterator_cls->peer = peer;
766     return GNUNET_NO;
767   }
768   iterator_cls->index--;
769   return GNUNET_YES;
770 }
771
772
773 /**
774  * @brief Get a random peer from @a peer_map
775  *
776  * @param valid_peers Peer map containing valid peers from which to select a
777  * random one
778  *
779  * @return a random peer
780  */
781 static const struct GNUNET_PeerIdentity *
782 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
783 {
784   struct GetRandPeerIteratorCls *iterator_cls;
785   const struct GNUNET_PeerIdentity *ret;
786
787   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
788   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
789       GNUNET_CONTAINER_multipeermap_size (valid_peers));
790   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
791                                                 get_rand_peer_iterator,
792                                                 iterator_cls);
793   ret = iterator_cls->peer;
794   GNUNET_free (iterator_cls);
795   return ret;
796 }
797
798
799 /**
800  * @brief Add a given @a peer to valid peers.
801  *
802  * If valid peers are already #num_valid_peers_max, delete a peer previously.
803  *
804  * @param peer The peer that is added to the valid peers.
805  * @param valid_peers Peer map of valid peers to which to add the @a peer
806  *
807  * @return #GNUNET_YES if no other peer had to be removed
808  *         #GNUNET_NO  otherwise
809  */
810 static int
811 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
812                 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
813 {
814   const struct GNUNET_PeerIdentity *rand_peer;
815   int ret;
816
817   ret = GNUNET_YES;
818   /* Remove random peers until there is space for a new one */
819   while (num_valid_peers_max <=
820          GNUNET_CONTAINER_multipeermap_size (valid_peers))
821   {
822     rand_peer = get_random_peer_from_peermap (valid_peers);
823     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
824     ret = GNUNET_NO;
825   }
826   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
827       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
828   if (valid_peers == msub->valid_peers)
829   {
830     GNUNET_STATISTICS_set (stats,
831                            "# valid peers",
832                            GNUNET_CONTAINER_multipeermap_size (valid_peers),
833                            GNUNET_NO);
834   }
835   return ret;
836 }
837
838 static void
839 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
840
841 /**
842  * @brief Set the peer flag to living and
843  *        call the pending operations on this peer.
844  *
845  * Also adds peer to #valid_peers.
846  *
847  * @param peer_ctx the #PeerContext of the peer to set online
848  */
849 static void
850 set_peer_online (struct PeerContext *peer_ctx)
851 {
852   struct GNUNET_PeerIdentity *peer;
853   unsigned int i;
854
855   peer = &peer_ctx->peer_id;
856   LOG (GNUNET_ERROR_TYPE_DEBUG,
857       "Peer %s is online and valid, calling %i pending operations on it\n",
858       GNUNET_i2s (peer),
859       peer_ctx->num_pending_ops);
860
861   if (NULL != peer_ctx->online_check_pending)
862   {
863     LOG (GNUNET_ERROR_TYPE_DEBUG,
864          "Removing pending online check for peer %s\n",
865          GNUNET_i2s (&peer_ctx->peer_id));
866     // TODO wait until cadet sets mq->cancel_impl
867     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
868     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
869     peer_ctx->online_check_pending = NULL;
870   }
871
872   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
873
874   /* Call pending operations */
875   for (i = 0; i < peer_ctx->num_pending_ops; i++)
876   {
877     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
878   }
879   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
880 }
881
882 static void
883 cleanup_destroyed_channel (void *cls,
884                            const struct GNUNET_CADET_Channel *channel);
885
886 /* Declaration of handlers */
887 static void
888 handle_peer_check (void *cls,
889                    const struct GNUNET_MessageHeader *msg);
890
891 static void
892 handle_peer_push (void *cls,
893                   const struct GNUNET_MessageHeader *msg);
894
895 static void
896 handle_peer_pull_request (void *cls,
897                           const struct GNUNET_MessageHeader *msg);
898
899 static int
900 check_peer_pull_reply (void *cls,
901                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
902
903 static void
904 handle_peer_pull_reply (void *cls,
905                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
906
907 /* End declaration of handlers */
908
909 /**
910  * @brief Allocate memory for a new channel context and insert it into DLL
911  *
912  * @param peer_ctx context of the according peer
913  *
914  * @return The channel context
915  */
916 static struct ChannelCtx *
917 add_channel_ctx (struct PeerContext *peer_ctx)
918 {
919   struct ChannelCtx *channel_ctx;
920   channel_ctx = GNUNET_new (struct ChannelCtx);
921   channel_ctx->peer_ctx = peer_ctx;
922   return channel_ctx;
923 }
924
925
926 /**
927  * @brief Free memory and NULL pointers.
928  *
929  * @param channel_ctx The channel context.
930  */
931 static void
932 remove_channel_ctx (struct ChannelCtx *channel_ctx)
933 {
934   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
935
936   if (NULL != channel_ctx->destruction_task)
937   {
938     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
939     channel_ctx->destruction_task = NULL;
940   }
941
942   GNUNET_free (channel_ctx);
943
944   if (NULL == peer_ctx) return;
945   if (channel_ctx == peer_ctx->send_channel_ctx)
946   {
947     peer_ctx->send_channel_ctx = NULL;
948     peer_ctx->mq = NULL;
949   }
950   else if (channel_ctx == peer_ctx->recv_channel_ctx)
951   {
952     peer_ctx->recv_channel_ctx = NULL;
953   }
954 }
955
956
957 /**
958  * @brief Get the channel of a peer. If not existing, create.
959  *
960  * @param peer_ctx Context of the peer of which to get the channel
961  * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
962  */
963 struct GNUNET_CADET_Channel *
964 get_channel (struct PeerContext *peer_ctx)
965 {
966   /* There exists a copy-paste-clone in run() */
967   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
968     GNUNET_MQ_hd_fixed_size (peer_check,
969                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
970                              struct GNUNET_MessageHeader,
971                              NULL),
972     GNUNET_MQ_hd_fixed_size (peer_push,
973                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
974                              struct GNUNET_MessageHeader,
975                              NULL),
976     GNUNET_MQ_hd_fixed_size (peer_pull_request,
977                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
978                              struct GNUNET_MessageHeader,
979                              NULL),
980     GNUNET_MQ_hd_var_size (peer_pull_reply,
981                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
982                            struct GNUNET_RPS_P2P_PullReplyMessage,
983                            NULL),
984     GNUNET_MQ_handler_end ()
985   };
986
987
988   if (NULL == peer_ctx->send_channel_ctx)
989   {
990     LOG (GNUNET_ERROR_TYPE_DEBUG,
991          "Trying to establish channel to peer %s\n",
992          GNUNET_i2s (&peer_ctx->peer_id));
993     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
994     peer_ctx->send_channel_ctx->channel =
995       GNUNET_CADET_channel_create (cadet_handle,
996                                    peer_ctx->send_channel_ctx, /* context */
997                                    &peer_ctx->peer_id,
998                                    &peer_ctx->sub->hash,
999                                    GNUNET_CADET_OPTION_RELIABLE,
1000                                    NULL, /* WindowSize handler */
1001                                    &cleanup_destroyed_channel, /* Disconnect handler */
1002                                    cadet_handlers);
1003   }
1004   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1005   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1006   return peer_ctx->send_channel_ctx->channel;
1007 }
1008
1009
1010 /**
1011  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1012  *
1013  * If we already have a message queue open to this client,
1014  * simply return it, otherways create one.
1015  *
1016  * @param peer_ctx Context of the peer of whicht to get the mq
1017  * @return the #GNUNET_MQ_Handle
1018  */
1019 static struct GNUNET_MQ_Handle *
1020 get_mq (struct PeerContext *peer_ctx)
1021 {
1022   if (NULL == peer_ctx->mq)
1023   {
1024     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1025   }
1026   return peer_ctx->mq;
1027 }
1028
1029 /**
1030  * @brief Add an envelope to a message passed to mq to list of pending messages
1031  *
1032  * @param peer_ctx Context of the peer for which to insert the envelope
1033  * @param ev envelope to the message
1034  * @param type type of the message to be sent
1035  * @return pointer to pending message
1036  */
1037 static struct PendingMessage *
1038 insert_pending_message (struct PeerContext *peer_ctx,
1039                         struct GNUNET_MQ_Envelope *ev,
1040                         const char *type)
1041 {
1042   struct PendingMessage *pending_msg;
1043
1044   pending_msg = GNUNET_new (struct PendingMessage);
1045   pending_msg->ev = ev;
1046   pending_msg->peer_ctx = peer_ctx;
1047   pending_msg->type = type;
1048   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1049                                peer_ctx->pending_messages_tail,
1050                                pending_msg);
1051   return pending_msg;
1052 }
1053
1054
1055 /**
1056  * @brief Remove a pending message from the respective DLL
1057  *
1058  * @param pending_msg the pending message to remove
1059  * @param cancel whether to cancel the pending message, too
1060  */
1061 static void
1062 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1063 {
1064   struct PeerContext *peer_ctx;
1065   (void) cancel;
1066
1067   peer_ctx = pending_msg->peer_ctx;
1068   GNUNET_assert (NULL != peer_ctx);
1069   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1070                                peer_ctx->pending_messages_tail,
1071                                pending_msg);
1072   // TODO wait for the cadet implementation of message cancellation
1073   //if (GNUNET_YES == cancel)
1074   //{
1075   //  GNUNET_MQ_send_cancel (pending_msg->ev);
1076   //}
1077   GNUNET_free (pending_msg);
1078 }
1079
1080
1081 /**
1082  * @brief This is called in response to the first message we sent as a
1083  * online check.
1084  *
1085  * @param cls #PeerContext of peer with pending online check
1086  */
1087 static void
1088 mq_online_check_successful (void *cls)
1089 {
1090   struct PeerContext *peer_ctx = cls;
1091
1092   if (NULL != peer_ctx->online_check_pending)
1093   {
1094     LOG (GNUNET_ERROR_TYPE_DEBUG,
1095         "Online check for peer %s was successfull\n",
1096         GNUNET_i2s (&peer_ctx->peer_id));
1097     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1098     peer_ctx->online_check_pending = NULL;
1099     set_peer_online (peer_ctx);
1100     (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1101   }
1102 }
1103
1104 /**
1105  * Issue a check whether peer is online
1106  *
1107  * @param peer_ctx the context of the peer
1108  */
1109 static void
1110 check_peer_online (struct PeerContext *peer_ctx)
1111 {
1112   LOG (GNUNET_ERROR_TYPE_DEBUG,
1113        "Get informed about peer %s getting online\n",
1114        GNUNET_i2s (&peer_ctx->peer_id));
1115
1116   struct GNUNET_MQ_Handle *mq;
1117   struct GNUNET_MQ_Envelope *ev;
1118
1119   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1120   peer_ctx->online_check_pending =
1121     insert_pending_message (peer_ctx, ev, "Check online");
1122   mq = get_mq (peer_ctx);
1123   GNUNET_MQ_notify_sent (ev,
1124                          mq_online_check_successful,
1125                          peer_ctx);
1126   GNUNET_MQ_send (mq, ev);
1127   if (peer_ctx->sub == msub)
1128   {
1129     GNUNET_STATISTICS_update (stats,
1130                               "# pending online checks",
1131                               1,
1132                               GNUNET_NO);
1133   }
1134 }
1135
1136
1137 /**
1138  * @brief Check whether function of type #PeerOp was already scheduled
1139  *
1140  * The array with pending operations will probably never grow really big, so
1141  * iterating over it should be ok.
1142  *
1143  * @param peer_ctx Context of the peer to check for the operation
1144  * @param peer_op the operation (#PeerOp) on the peer
1145  *
1146  * @return #GNUNET_YES if this operation is scheduled on that peer
1147  *         #GNUNET_NO  otherwise
1148  */
1149 static int
1150 check_operation_scheduled (const struct PeerContext *peer_ctx,
1151                            const PeerOp peer_op)
1152 {
1153   unsigned int i;
1154
1155   for (i = 0; i < peer_ctx->num_pending_ops; i++)
1156     if (peer_op == peer_ctx->pending_ops[i].op)
1157       return GNUNET_YES;
1158   return GNUNET_NO;
1159 }
1160
1161
1162 /**
1163  * @brief Callback for scheduler to destroy a channel
1164  *
1165  * @param cls Context of the channel
1166  */
1167 static void
1168 destroy_channel (struct ChannelCtx *channel_ctx)
1169 {
1170   struct GNUNET_CADET_Channel *channel;
1171
1172   if (NULL != channel_ctx->destruction_task)
1173   {
1174     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1175     channel_ctx->destruction_task = NULL;
1176   }
1177   GNUNET_assert (channel_ctx->channel != NULL);
1178   channel = channel_ctx->channel;
1179   channel_ctx->channel = NULL;
1180   GNUNET_CADET_channel_destroy (channel);
1181   remove_channel_ctx (channel_ctx);
1182 }
1183
1184
1185 /**
1186  * @brief Destroy a cadet channel.
1187  *
1188  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1189  *
1190  * @param cls
1191  */
1192 static void
1193 destroy_channel_cb (void *cls)
1194 {
1195   struct ChannelCtx *channel_ctx = cls;
1196
1197   channel_ctx->destruction_task = NULL;
1198   destroy_channel (channel_ctx);
1199 }
1200
1201
1202 /**
1203  * @brief Schedule the destruction of a channel for immediately afterwards.
1204  *
1205  * In case a channel is to be destroyed from within the callback to the
1206  * destruction of another channel (send channel), we cannot call
1207  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1208  * construction.
1209  *
1210  * @param channel_ctx channel to be destroyed.
1211  */
1212 static void
1213 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1214 {
1215   GNUNET_assert (NULL ==
1216                  channel_ctx->destruction_task);
1217   GNUNET_assert (NULL !=
1218                  channel_ctx->channel);
1219   channel_ctx->destruction_task =
1220     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1221                               channel_ctx);
1222 }
1223
1224
1225 /**
1226  * @brief Remove peer
1227  *
1228  * - Empties the list with pending operations
1229  * - Empties the list with pending messages
1230  * - Cancels potentially existing online check
1231  * - Schedules closing of send and recv channels
1232  * - Removes peer from peer map
1233  *
1234  * @param peer_ctx Context of the peer to be destroyed
1235  * @return #GNUNET_YES if peer was removed
1236  *         #GNUNET_NO  otherwise
1237  */
1238 static int
1239 destroy_peer (struct PeerContext *peer_ctx)
1240 {
1241   GNUNET_assert (NULL != peer_ctx);
1242   GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1243   if (GNUNET_NO ==
1244       GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1245                                               &peer_ctx->peer_id))
1246   {
1247     return GNUNET_NO;
1248   }
1249   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1250   LOG (GNUNET_ERROR_TYPE_DEBUG,
1251        "Going to remove peer %s\n",
1252        GNUNET_i2s (&peer_ctx->peer_id));
1253   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1254
1255   /* Clear list of pending operations */
1256   // TODO this probably leaks memory
1257   //      ('only' the cls to the function. Not sure what to do with it)
1258   GNUNET_array_grow (peer_ctx->pending_ops,
1259                      peer_ctx->num_pending_ops,
1260                      0);
1261   /* Remove all pending messages */
1262   while (NULL != peer_ctx->pending_messages_head)
1263   {
1264     LOG (GNUNET_ERROR_TYPE_DEBUG,
1265          "Removing unsent %s\n",
1266          peer_ctx->pending_messages_head->type);
1267     /* Cancle pending message, too */
1268     if ( (NULL != peer_ctx->online_check_pending) &&
1269          (0 == memcmp (peer_ctx->pending_messages_head,
1270                      peer_ctx->online_check_pending,
1271                      sizeof (struct PendingMessage))) )
1272       {
1273         peer_ctx->online_check_pending = NULL;
1274         if (peer_ctx->sub == msub)
1275         {
1276           GNUNET_STATISTICS_update (stats,
1277                                     "# pending online checks",
1278                                     -1,
1279                                     GNUNET_NO);
1280         }
1281       }
1282     remove_pending_message (peer_ctx->pending_messages_head,
1283                             GNUNET_YES);
1284   }
1285
1286   /* If we are still waiting for notification whether this peer is online
1287    * cancel the according task */
1288   if (NULL != peer_ctx->online_check_pending)
1289   {
1290     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1291                 "Removing pending online check for peer %s\n",
1292                 GNUNET_i2s (&peer_ctx->peer_id));
1293     // TODO wait until cadet sets mq->cancel_impl
1294     //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1295     remove_pending_message (peer_ctx->online_check_pending,
1296                             GNUNET_YES);
1297     peer_ctx->online_check_pending = NULL;
1298   }
1299
1300   if (NULL != peer_ctx->send_channel_ctx)
1301   {
1302     /* This is possibly called from within channel destruction */
1303     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1304     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1305     peer_ctx->send_channel_ctx = NULL;
1306     peer_ctx->mq = NULL;
1307   }
1308   if (NULL != peer_ctx->recv_channel_ctx)
1309   {
1310     /* This is possibly called from within channel destruction */
1311     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1312     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1313     peer_ctx->recv_channel_ctx = NULL;
1314   }
1315
1316   if (GNUNET_YES !=
1317       GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1318                                                 &peer_ctx->peer_id))
1319   {
1320     LOG (GNUNET_ERROR_TYPE_WARNING,
1321          "removing peer from peer_ctx->sub->peer_map failed\n");
1322   }
1323   if (peer_ctx->sub == msub)
1324   {
1325     GNUNET_STATISTICS_set (stats,
1326                           "# known peers",
1327                           GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1328                           GNUNET_NO);
1329   }
1330   GNUNET_free (peer_ctx);
1331   return GNUNET_YES;
1332 }
1333
1334
1335 /**
1336  * Iterator over hash map entries. Deletes all contexts of peers.
1337  *
1338  * @param cls closure
1339  * @param key current public key
1340  * @param value value in the hash map
1341  * @return #GNUNET_YES if we should continue to iterate,
1342  *         #GNUNET_NO if not.
1343  */
1344 static int
1345 peermap_clear_iterator (void *cls,
1346                         const struct GNUNET_PeerIdentity *key,
1347                         void *value)
1348 {
1349   struct Sub *sub = cls;
1350   (void) value;
1351
1352   destroy_peer (get_peer_ctx (sub->peer_map, key));
1353   return GNUNET_YES;
1354 }
1355
1356
1357 /**
1358  * @brief This is called once a message is sent.
1359  *
1360  * Removes the pending message
1361  *
1362  * @param cls type of the message that was sent
1363  */
1364 static void
1365 mq_notify_sent_cb (void *cls)
1366 {
1367   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1368   LOG (GNUNET_ERROR_TYPE_DEBUG,
1369       "%s was sent.\n",
1370       pending_msg->type);
1371   if (pending_msg->peer_ctx->sub == msub)
1372   {
1373     if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1374       GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1375     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1376       GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1377     if (0 == strncmp ("PUSH", pending_msg->type, 4))
1378       GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1379     if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1380         GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1381           &pending_msg->peer_ctx->peer_id))
1382       GNUNET_STATISTICS_update(stats,
1383                                "# pull requests sent (multi-hop peer)",
1384                                1,
1385                                GNUNET_NO);
1386   }
1387   /* Do not cancle message */
1388   remove_pending_message (pending_msg, GNUNET_NO);
1389 }
1390
1391
1392 /**
1393  * @brief Iterator function for #store_valid_peers.
1394  *
1395  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1396  * Writes single peer to disk.
1397  *
1398  * @param cls the file handle to write to.
1399  * @param peer current peer
1400  * @param value unused
1401  *
1402  * @return  #GNUNET_YES if we should continue to
1403  *          iterate,
1404  *          #GNUNET_NO if not.
1405  */
1406 static int
1407 store_peer_presistently_iterator (void *cls,
1408                                   const struct GNUNET_PeerIdentity *peer,
1409                                   void *value)
1410 {
1411   const struct GNUNET_DISK_FileHandle *fh = cls;
1412   char peer_string[128];
1413   int size;
1414   ssize_t ret;
1415   (void) value;
1416
1417   if (NULL == peer)
1418   {
1419     return GNUNET_YES;
1420   }
1421   size = GNUNET_snprintf (peer_string,
1422                           sizeof (peer_string),
1423                           "%s\n",
1424                           GNUNET_i2s_full (peer));
1425   GNUNET_assert (53 == size);
1426   ret = GNUNET_DISK_file_write (fh,
1427                                 peer_string,
1428                                 size);
1429   GNUNET_assert (size == ret);
1430   return GNUNET_YES;
1431 }
1432
1433
1434 /**
1435  * @brief Store the peers currently in #valid_peers to disk.
1436  *
1437  * @param sub Sub for which to store the valid peers
1438  */
1439 static void
1440 store_valid_peers (const struct Sub *sub)
1441 {
1442   struct GNUNET_DISK_FileHandle *fh;
1443   uint32_t number_written_peers;
1444   int ret;
1445
1446   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1447   {
1448     return;
1449   }
1450
1451   ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1452   if (GNUNET_SYSERR == ret)
1453   {
1454     LOG (GNUNET_ERROR_TYPE_WARNING,
1455         "Not able to create directory for file `%s'\n",
1456         sub->filename_valid_peers);
1457     GNUNET_break (0);
1458   }
1459   else if (GNUNET_NO == ret)
1460   {
1461     LOG (GNUNET_ERROR_TYPE_WARNING,
1462         "Directory for file `%s' exists but is not writable for us\n",
1463         sub->filename_valid_peers);
1464     GNUNET_break (0);
1465   }
1466   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1467                               GNUNET_DISK_OPEN_WRITE |
1468                                   GNUNET_DISK_OPEN_CREATE,
1469                               GNUNET_DISK_PERM_USER_READ |
1470                                   GNUNET_DISK_PERM_USER_WRITE);
1471   if (NULL == fh)
1472   {
1473     LOG (GNUNET_ERROR_TYPE_WARNING,
1474         "Not able to write valid peers to file `%s'\n",
1475         sub->filename_valid_peers);
1476     return;
1477   }
1478   LOG (GNUNET_ERROR_TYPE_DEBUG,
1479       "Writing %u valid peers to disk\n",
1480       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1481   number_written_peers =
1482     GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1483                                            store_peer_presistently_iterator,
1484                                            fh);
1485   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1486   GNUNET_assert (number_written_peers ==
1487       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1488 }
1489
1490
1491 /**
1492  * @brief Convert string representation of peer id to peer id.
1493  *
1494  * Counterpart to #GNUNET_i2s_full.
1495  *
1496  * @param string_repr The string representation of the peer id
1497  *
1498  * @return The peer id
1499  */
1500 static const struct GNUNET_PeerIdentity *
1501 s2i_full (const char *string_repr)
1502 {
1503   struct GNUNET_PeerIdentity *peer;
1504   size_t len;
1505   int ret;
1506
1507   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1508   len = strlen (string_repr);
1509   if (52 > len)
1510   {
1511     LOG (GNUNET_ERROR_TYPE_WARNING,
1512         "Not able to convert string representation of PeerID to PeerID\n"
1513         "Sting representation: %s (len %lu) - too short\n",
1514         string_repr,
1515         len);
1516     GNUNET_break (0);
1517   }
1518   else if (52 < len)
1519   {
1520     len = 52;
1521   }
1522   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1523                                                     len,
1524                                                     &peer->public_key);
1525   if (GNUNET_OK != ret)
1526   {
1527     LOG (GNUNET_ERROR_TYPE_WARNING,
1528         "Not able to convert string representation of PeerID to PeerID\n"
1529         "Sting representation: %s\n",
1530         string_repr);
1531     GNUNET_break (0);
1532   }
1533   return peer;
1534 }
1535
1536
1537 /**
1538  * @brief Restore the peers on disk to #valid_peers.
1539  *
1540  * @param sub Sub for which to restore the valid peers
1541  */
1542 static void
1543 restore_valid_peers (const struct Sub *sub)
1544 {
1545   off_t file_size;
1546   uint32_t num_peers;
1547   struct GNUNET_DISK_FileHandle *fh;
1548   char *buf;
1549   ssize_t size_read;
1550   char *iter_buf;
1551   char *str_repr;
1552   const struct GNUNET_PeerIdentity *peer;
1553
1554   if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1555   {
1556     return;
1557   }
1558
1559   if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1560   {
1561     return;
1562   }
1563   fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1564                               GNUNET_DISK_OPEN_READ,
1565                               GNUNET_DISK_PERM_NONE);
1566   GNUNET_assert (NULL != fh);
1567   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1568   num_peers = file_size / 53;
1569   buf = GNUNET_malloc (file_size);
1570   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1571   GNUNET_assert (size_read == file_size);
1572   LOG (GNUNET_ERROR_TYPE_DEBUG,
1573       "Restoring %" PRIu32 " peers from file `%s'\n",
1574       num_peers,
1575       sub->filename_valid_peers);
1576   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1577   {
1578     str_repr = GNUNET_strndup (iter_buf, 53);
1579     peer = s2i_full (str_repr);
1580     GNUNET_free (str_repr);
1581     add_valid_peer (peer, sub->valid_peers);
1582     LOG (GNUNET_ERROR_TYPE_DEBUG,
1583         "Restored valid peer %s from disk\n",
1584         GNUNET_i2s_full (peer));
1585   }
1586   iter_buf = NULL;
1587   GNUNET_free (buf);
1588   LOG (GNUNET_ERROR_TYPE_DEBUG,
1589       "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1590       num_peers,
1591       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1592   if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1593   {
1594     LOG (GNUNET_ERROR_TYPE_WARNING,
1595         "Number of restored peers does not match file size. Have probably duplicates.\n");
1596   }
1597   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1598   LOG (GNUNET_ERROR_TYPE_DEBUG,
1599       "Restored %u valid peers from disk\n",
1600       GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1601 }
1602
1603
1604 /**
1605  * @brief Delete storage of peers that was created with #initialise_peers ()
1606  *
1607  * @param sub Sub for which the storage is deleted
1608  */
1609 static void
1610 peers_terminate (struct Sub *sub)
1611 {
1612   if (GNUNET_SYSERR ==
1613       GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1614                                              &peermap_clear_iterator,
1615                                              sub))
1616   {
1617     LOG (GNUNET_ERROR_TYPE_WARNING,
1618         "Iteration destroying peers was aborted.\n");
1619   }
1620   GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1621   sub->peer_map = NULL;
1622   store_valid_peers (sub);
1623   GNUNET_free (sub->filename_valid_peers);
1624   sub->filename_valid_peers = NULL;
1625   GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1626   sub->valid_peers = NULL;
1627 }
1628
1629
1630 /**
1631  * Iterator over #valid_peers hash map entries.
1632  *
1633  * @param cls Closure that contains iterator function and closure
1634  * @param peer current peer id
1635  * @param value value in the hash map - unused
1636  * @return #GNUNET_YES if we should continue to
1637  *         iterate,
1638  *         #GNUNET_NO if not.
1639  */
1640 static int
1641 valid_peer_iterator (void *cls,
1642                      const struct GNUNET_PeerIdentity *peer,
1643                      void *value)
1644 {
1645   struct PeersIteratorCls *it_cls = cls;
1646   (void) value;
1647
1648   return it_cls->iterator (it_cls->cls, peer);
1649 }
1650
1651
1652 /**
1653  * @brief Get all currently known, valid peer ids.
1654  *
1655  * @param valid_peers Peer map containing the valid peers in question
1656  * @param iterator function to call on each peer id
1657  * @param it_cls extra argument to @a iterator
1658  * @return the number of key value pairs processed,
1659  *         #GNUNET_SYSERR if it aborted iteration
1660  */
1661 static int
1662 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1663                  PeersIterator iterator,
1664                  void *it_cls)
1665 {
1666   struct PeersIteratorCls *cls;
1667   int ret;
1668
1669   cls = GNUNET_new (struct PeersIteratorCls);
1670   cls->iterator = iterator;
1671   cls->cls = it_cls;
1672   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1673                                                valid_peer_iterator,
1674                                                cls);
1675   GNUNET_free (cls);
1676   return ret;
1677 }
1678
1679
1680 /**
1681  * @brief Add peer to known peers.
1682  *
1683  * This function is called on new peer_ids from 'external' sources
1684  * (client seed, cadet get_peers(), ...)
1685  *
1686  * @param sub Sub with the peer map that the @a peer will be added to
1687  * @param peer the new #GNUNET_PeerIdentity
1688  *
1689  * @return #GNUNET_YES if peer was inserted
1690  *         #GNUNET_NO  otherwise
1691  */
1692 static int
1693 insert_peer (struct Sub *sub,
1694              const struct GNUNET_PeerIdentity *peer)
1695 {
1696   if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1697   {
1698     return GNUNET_NO; /* We already know this peer - nothing to do */
1699   }
1700   (void) create_peer_ctx (sub, peer);
1701   return GNUNET_YES;
1702 }
1703
1704
1705 /**
1706  * @brief Check whether flags on a peer are set.
1707  *
1708  * @param peer_map Peer map that is expected to contain the @a peer
1709  * @param peer the peer to check the flag of
1710  * @param flags the flags to check
1711  *
1712  * @return #GNUNET_SYSERR if peer is not known
1713  *         #GNUNET_YES    if all given flags are set
1714  *         #GNUNET_NO     otherwise
1715  */
1716 static int
1717 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1718                  const struct GNUNET_PeerIdentity *peer,
1719                  enum Peers_PeerFlags flags)
1720 {
1721   struct PeerContext *peer_ctx;
1722
1723   if (GNUNET_NO == check_peer_known (peer_map, peer))
1724   {
1725     return GNUNET_SYSERR;
1726   }
1727   peer_ctx = get_peer_ctx (peer_map, peer);
1728   return check_peer_flag_set (peer_ctx, flags);
1729 }
1730
1731 /**
1732  * @brief Try connecting to a peer to see whether it is online
1733  *
1734  * If not known yet, insert into known peers
1735  *
1736  * @param sub Sub which would contain the @a peer
1737  * @param peer the peer whose online is to be checked
1738  * @return #GNUNET_YES if the check was issued
1739  *         #GNUNET_NO  otherwise
1740  */
1741 static int
1742 issue_peer_online_check (struct Sub *sub,
1743                          const struct GNUNET_PeerIdentity *peer)
1744 {
1745   struct PeerContext *peer_ctx;
1746
1747   (void) insert_peer (sub, peer); // TODO even needed?
1748   peer_ctx = get_peer_ctx (sub->peer_map, peer);
1749   if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1750        (NULL == peer_ctx->online_check_pending) )
1751   {
1752     check_peer_online (peer_ctx);
1753     return GNUNET_YES;
1754   }
1755   return GNUNET_NO;
1756 }
1757
1758
1759 /**
1760  * @brief Check if peer is removable.
1761  *
1762  * Check if
1763  *  - a recv channel exists
1764  *  - there are pending messages
1765  *  - there is no pending pull reply
1766  *
1767  * @param peer_ctx Context of the peer in question
1768  * @return #GNUNET_YES    if peer is removable
1769  *         #GNUNET_NO     if peer is NOT removable
1770  *         #GNUNET_SYSERR if peer is not known
1771  */
1772 static int
1773 check_removable (const struct PeerContext *peer_ctx)
1774 {
1775   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1776                                                            &peer_ctx->peer_id))
1777   {
1778     return GNUNET_SYSERR;
1779   }
1780
1781   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1782        (NULL != peer_ctx->pending_messages_head) ||
1783        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1784   {
1785     return GNUNET_NO;
1786   }
1787   return GNUNET_YES;
1788 }
1789
1790
1791 /**
1792  * @brief Check whether @a peer is actually a peer.
1793  *
1794  * A valid peer is a peer that we know exists eg. we were connected to once.
1795  *
1796  * @param valid_peers Peer map that would contain the @a peer
1797  * @param peer peer in question
1798  *
1799  * @return #GNUNET_YES if peer is valid
1800  *         #GNUNET_NO  if peer is not valid
1801  */
1802 static int
1803 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1804                   const struct GNUNET_PeerIdentity *peer)
1805 {
1806   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1807 }
1808
1809
1810 /**
1811  * @brief Indicate that we want to send to the other peer
1812  *
1813  * This establishes a sending channel
1814  *
1815  * @param peer_ctx Context of the target peer
1816  */
1817 static void
1818 indicate_sending_intention (struct PeerContext *peer_ctx)
1819 {
1820   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1821                                                  &peer_ctx->peer_id));
1822   (void) get_channel (peer_ctx);
1823 }
1824
1825
1826 /**
1827  * @brief Check whether other peer has the intention to send/opened channel
1828  *        towars us
1829  *
1830  * @param peer_ctx Context of the peer in question
1831  *
1832  * @return #GNUNET_YES if peer has the intention to send
1833  *         #GNUNET_NO  otherwise
1834  */
1835 static int
1836 check_peer_send_intention (const struct PeerContext *peer_ctx)
1837 {
1838   if (NULL != peer_ctx->recv_channel_ctx)
1839   {
1840     return GNUNET_YES;
1841   }
1842   return GNUNET_NO;
1843 }
1844
1845
1846 /**
1847  * Handle the channel a peer opens to us.
1848  *
1849  * @param cls The closure - Sub
1850  * @param channel The channel the peer wants to establish
1851  * @param initiator The peer's peer ID
1852  *
1853  * @return initial channel context for the channel
1854  *         (can be NULL -- that's not an error)
1855  */
1856 static void *
1857 handle_inbound_channel (void *cls,
1858                         struct GNUNET_CADET_Channel *channel,
1859                         const struct GNUNET_PeerIdentity *initiator)
1860 {
1861   struct PeerContext *peer_ctx;
1862   struct ChannelCtx *channel_ctx;
1863   struct Sub *sub = cls;
1864
1865   LOG (GNUNET_ERROR_TYPE_DEBUG,
1866       "New channel was established to us (Peer %s).\n",
1867       GNUNET_i2s (initiator));
1868   GNUNET_assert (NULL != channel); /* according to cadet API */
1869   /* Make sure we 'know' about this peer */
1870   peer_ctx = create_or_get_peer_ctx (sub, initiator);
1871   set_peer_online (peer_ctx);
1872   (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1873   channel_ctx = add_channel_ctx (peer_ctx);
1874   channel_ctx->channel = channel;
1875   /* We only accept one incoming channel per peer */
1876   if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1877                                                              initiator)))
1878   {
1879     LOG (GNUNET_ERROR_TYPE_WARNING,
1880         "Already got one receive channel. Destroying old one.\n");
1881     GNUNET_break_op (0);
1882     destroy_channel (peer_ctx->recv_channel_ctx);
1883     peer_ctx->recv_channel_ctx = channel_ctx;
1884     /* return the channel context */
1885     return channel_ctx;
1886   }
1887   peer_ctx->recv_channel_ctx = channel_ctx;
1888   return channel_ctx;
1889 }
1890
1891
1892 /**
1893  * @brief Check whether a sending channel towards the given peer exists
1894  *
1895  * @param peer_ctx Context of the peer in question
1896  *
1897  * @return #GNUNET_YES if a sending channel towards that peer exists
1898  *         #GNUNET_NO  otherwise
1899  */
1900 static int
1901 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1902 {
1903   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1904                                      &peer_ctx->peer_id))
1905   { /* If no such peer exists, there is no channel */
1906     return GNUNET_NO;
1907   }
1908   if (NULL == peer_ctx->send_channel_ctx)
1909   {
1910     return GNUNET_NO;
1911   }
1912   return GNUNET_YES;
1913 }
1914
1915
1916 /**
1917  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1918  *        intention to another peer
1919  *
1920  * @param peer_ctx Context to the peer
1921  * @return #GNUNET_YES if channel was destroyed
1922  *         #GNUNET_NO  otherwise
1923  */
1924 static int
1925 destroy_sending_channel (struct PeerContext *peer_ctx)
1926 {
1927   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1928                                      &peer_ctx->peer_id))
1929   {
1930     return GNUNET_NO;
1931   }
1932   if (NULL != peer_ctx->send_channel_ctx)
1933   {
1934     destroy_channel (peer_ctx->send_channel_ctx);
1935     (void) check_connected (peer_ctx);
1936     return GNUNET_YES;
1937   }
1938   return GNUNET_NO;
1939 }
1940
1941 /**
1942  * @brief Send a message to another peer.
1943  *
1944  * Keeps track about pending messages so they can be properly removed when the
1945  * peer is destroyed.
1946  *
1947  * @param peer_ctx Context of the peer to which the message is to be sent
1948  * @param ev envelope of the message
1949  * @param type type of the message
1950  */
1951 static void
1952 send_message (struct PeerContext *peer_ctx,
1953               struct GNUNET_MQ_Envelope *ev,
1954               const char *type)
1955 {
1956   struct PendingMessage *pending_msg;
1957   struct GNUNET_MQ_Handle *mq;
1958
1959   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1960               "Sending message to %s of type %s\n",
1961               GNUNET_i2s (&peer_ctx->peer_id),
1962               type);
1963   pending_msg = insert_pending_message (peer_ctx, ev, type);
1964   mq = get_mq (peer_ctx);
1965   GNUNET_MQ_notify_sent (ev,
1966                          mq_notify_sent_cb,
1967                          pending_msg);
1968   GNUNET_MQ_send (mq, ev);
1969 }
1970
1971 /**
1972  * @brief Schedule a operation on given peer
1973  *
1974  * Avoids scheduling an operation twice.
1975  *
1976  * @param peer_ctx Context of the peer for which to schedule the operation
1977  * @param peer_op the operation to schedule
1978  * @param cls Closure to @a peer_op
1979  *
1980  * @return #GNUNET_YES if the operation was scheduled
1981  *         #GNUNET_NO  otherwise
1982  */
1983 static int
1984 schedule_operation (struct PeerContext *peer_ctx,
1985                     const PeerOp peer_op,
1986                     void *cls)
1987 {
1988   struct PeerPendingOp pending_op;
1989
1990   GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1991                                                  &peer_ctx->peer_id));
1992
1993   //TODO if ONLINE execute immediately
1994
1995   if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
1996   {
1997     pending_op.op = peer_op;
1998     pending_op.op_cls = cls;
1999     GNUNET_array_append (peer_ctx->pending_ops,
2000                          peer_ctx->num_pending_ops,
2001                          pending_op);
2002     return GNUNET_YES;
2003   }
2004   return GNUNET_NO;
2005 }
2006
2007 /***********************************************************************
2008  * /Old gnunet-service-rps_peers.c
2009 ***********************************************************************/
2010
2011
2012 /***********************************************************************
2013  * Housekeeping with clients
2014 ***********************************************************************/
2015
2016 /**
2017  * Closure used to pass the client and the id to the callback
2018  * that replies to a client's request
2019  */
2020 struct ReplyCls
2021 {
2022   /**
2023    * DLL
2024    */
2025   struct ReplyCls *next;
2026   struct ReplyCls *prev;
2027
2028   /**
2029    * The identifier of the request
2030    */
2031   uint32_t id;
2032
2033   /**
2034    * The handle to the request
2035    */
2036   struct RPS_SamplerRequestHandle *req_handle;
2037
2038   /**
2039    * The client handle to send the reply to
2040    */
2041   struct ClientContext *cli_ctx;
2042 };
2043
2044
2045 /**
2046  * Struct used to store the context of a connected client.
2047  */
2048 struct ClientContext
2049 {
2050   /**
2051    * DLL
2052    */
2053   struct ClientContext *next;
2054   struct ClientContext *prev;
2055
2056   /**
2057    * The message queue to communicate with the client.
2058    */
2059   struct GNUNET_MQ_Handle *mq;
2060
2061   /**
2062    * @brief How many updates this client expects to receive.
2063    */
2064   int64_t view_updates_left;
2065
2066   /**
2067    * @brief Whether this client wants to receive stream updates.
2068    * Either #GNUNET_YES or #GNUNET_NO
2069    */
2070   int8_t stream_update;
2071
2072   /**
2073    * The client handle to send the reply to
2074    */
2075   struct GNUNET_SERVICE_Client *client;
2076
2077   /**
2078    * The #Sub this context belongs to
2079    */
2080   struct Sub *sub;
2081 };
2082
2083 /**
2084  * DLL with all clients currently connected to us
2085  */
2086 struct ClientContext *cli_ctx_head;
2087 struct ClientContext *cli_ctx_tail;
2088
2089 /***********************************************************************
2090  * /Housekeeping with clients
2091 ***********************************************************************/
2092
2093
2094
2095
2096
2097 /***********************************************************************
2098  * Util functions
2099 ***********************************************************************/
2100
2101
2102 /**
2103  * Print peerlist to log.
2104  */
2105 static void
2106 print_peer_list (struct GNUNET_PeerIdentity *list,
2107                  unsigned int len)
2108 {
2109   unsigned int i;
2110
2111   LOG (GNUNET_ERROR_TYPE_DEBUG,
2112        "Printing peer list of length %u at %p:\n",
2113        len,
2114        list);
2115   for (i = 0 ; i < len ; i++)
2116   {
2117     LOG (GNUNET_ERROR_TYPE_DEBUG,
2118          "%u. peer: %s\n",
2119          i, GNUNET_i2s (&list[i]));
2120   }
2121 }
2122
2123
2124 /**
2125  * Remove peer from list.
2126  */
2127 static void
2128 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2129                unsigned int *list_size,
2130                const struct GNUNET_PeerIdentity *peer)
2131 {
2132   unsigned int i;
2133   struct GNUNET_PeerIdentity *tmp;
2134
2135   tmp = *peer_list;
2136
2137   LOG (GNUNET_ERROR_TYPE_DEBUG,
2138        "Removing peer %s from list at %p\n",
2139        GNUNET_i2s (peer),
2140        tmp);
2141
2142   for ( i = 0 ; i < *list_size ; i++ )
2143   {
2144     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2145     {
2146       if (i < *list_size -1)
2147       { /* Not at the last entry -- shift peers left */
2148         memmove (&tmp[i], &tmp[i +1],
2149                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2150       }
2151       /* Remove last entry (should be now useless PeerID) */
2152       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2153     }
2154   }
2155   *peer_list = tmp;
2156 }
2157
2158
2159 /**
2160  * Insert PeerID in #view
2161  *
2162  * Called once we know a peer is online.
2163  * Implements #PeerOp
2164  *
2165  * @return GNUNET_OK if peer was actually inserted
2166  *         GNUNET_NO if peer was not inserted
2167  */
2168 static void
2169 insert_in_view_op (void *cls,
2170                    const struct GNUNET_PeerIdentity *peer);
2171
2172 /**
2173  * Insert PeerID in #view
2174  *
2175  * Called once we know a peer is online.
2176  *
2177  * @param sub Sub in with the view to insert in
2178  * @param peer the peer to insert
2179  *
2180  * @return GNUNET_OK if peer was actually inserted
2181  *         GNUNET_NO if peer was not inserted
2182  */
2183 static int
2184 insert_in_view (struct Sub *sub,
2185                 const struct GNUNET_PeerIdentity *peer)
2186 {
2187   struct PeerContext *peer_ctx;
2188   int online;
2189   int ret;
2190
2191   online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2192   peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2193   if ( (GNUNET_NO == online) ||
2194        (GNUNET_SYSERR == online) ) /* peer is not even known */
2195   {
2196     (void) issue_peer_online_check (sub, peer);
2197     (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2198     return GNUNET_NO;
2199   }
2200   /* Open channel towards peer to keep connection open */
2201   indicate_sending_intention (peer_ctx);
2202   ret = View_put (sub->view, peer);
2203   if (peer_ctx->sub == msub)
2204   {
2205     GNUNET_STATISTICS_set (stats,
2206                            "view size",
2207                            View_size (peer_ctx->sub->view),
2208                            GNUNET_NO);
2209   }
2210   return ret;
2211 }
2212
2213
2214 /**
2215  * @brief Send view to client
2216  *
2217  * @param cli_ctx the context of the client
2218  * @param view_array the peerids of the view as array (can be empty)
2219  * @param view_size the size of the view array (can be 0)
2220  */
2221 static void
2222 send_view (const struct ClientContext *cli_ctx,
2223            const struct GNUNET_PeerIdentity *view_array,
2224            uint64_t view_size)
2225 {
2226   struct GNUNET_MQ_Envelope *ev;
2227   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2228   struct Sub *sub;
2229
2230   if (NULL == view_array)
2231   {
2232     if (NULL == cli_ctx->sub) sub = msub;
2233     else sub = cli_ctx->sub;
2234     view_size = View_size (sub->view);
2235     view_array = View_get_as_array (sub->view);
2236   }
2237
2238   ev = GNUNET_MQ_msg_extra (out_msg,
2239                             view_size * sizeof (struct GNUNET_PeerIdentity),
2240                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2241   out_msg->num_peers = htonl (view_size);
2242
2243   GNUNET_memcpy (&out_msg[1],
2244                  view_array,
2245                  view_size * sizeof (struct GNUNET_PeerIdentity));
2246   GNUNET_MQ_send (cli_ctx->mq, ev);
2247 }
2248
2249
2250 /**
2251  * @brief Send peer from biased stream to client.
2252  *
2253  * TODO merge with send_view, parameterise
2254  *
2255  * @param cli_ctx the context of the client
2256  * @param view_array the peerids of the view as array (can be empty)
2257  * @param view_size the size of the view array (can be 0)
2258  */
2259 static void
2260 send_stream_peers (const struct ClientContext *cli_ctx,
2261                    uint64_t num_peers,
2262                    const struct GNUNET_PeerIdentity *peers)
2263 {
2264   struct GNUNET_MQ_Envelope *ev;
2265   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2266
2267   GNUNET_assert (NULL != peers);
2268
2269   ev = GNUNET_MQ_msg_extra (out_msg,
2270                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2271                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2272   out_msg->num_peers = htonl (num_peers);
2273
2274   GNUNET_memcpy (&out_msg[1],
2275                  peers,
2276                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2277   GNUNET_MQ_send (cli_ctx->mq, ev);
2278 }
2279
2280
2281 /**
2282  * @brief sends updates to clients that are interested
2283  *
2284  * @param sub Sub for which to notify clients
2285  */
2286 static void
2287 clients_notify_view_update (const struct Sub *sub)
2288 {
2289   struct ClientContext *cli_ctx_iter;
2290   uint64_t num_peers;
2291   const struct GNUNET_PeerIdentity *view_array;
2292
2293   num_peers = View_size (sub->view);
2294   view_array = View_get_as_array(sub->view);
2295   /* check size of view is small enough */
2296   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2297   {
2298     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2299                 "View is too big to send\n");
2300     return;
2301   }
2302
2303   for (cli_ctx_iter = cli_ctx_head;
2304        NULL != cli_ctx_iter;
2305        cli_ctx_iter = cli_ctx_iter->next)
2306   {
2307     if (1 < cli_ctx_iter->view_updates_left)
2308     {
2309       /* Client wants to receive limited amount of updates */
2310       cli_ctx_iter->view_updates_left -= 1;
2311     } else if (1 == cli_ctx_iter->view_updates_left)
2312     {
2313       /* Last update of view for client */
2314       cli_ctx_iter->view_updates_left = -1;
2315     } else if (0 > cli_ctx_iter->view_updates_left) {
2316       /* Client is not interested in updates */
2317       continue;
2318     }
2319     /* else _updates_left == 0 - infinite amount of updates */
2320
2321     /* send view */
2322     send_view (cli_ctx_iter, view_array, num_peers);
2323   }
2324 }
2325
2326
2327 /**
2328  * @brief sends updates to clients that are interested
2329  *
2330  * @param num_peers Number of peers to send
2331  * @param peers the array of peers to send
2332  */
2333 static void
2334 clients_notify_stream_peer (const struct Sub *sub,
2335                             uint64_t num_peers,
2336                             const struct GNUNET_PeerIdentity *peers)
2337                             // TODO enum StreamPeerSource)
2338 {
2339   struct ClientContext *cli_ctx_iter;
2340
2341   LOG (GNUNET_ERROR_TYPE_DEBUG,
2342       "Got peer (%s) from biased stream - update all clients\n",
2343       GNUNET_i2s (peers));
2344
2345   for (cli_ctx_iter = cli_ctx_head;
2346        NULL != cli_ctx_iter;
2347        cli_ctx_iter = cli_ctx_iter->next)
2348   {
2349     if (GNUNET_YES == cli_ctx_iter->stream_update &&
2350         (sub == cli_ctx_iter->sub || sub == msub))
2351     {
2352       send_stream_peers (cli_ctx_iter, num_peers, peers);
2353     }
2354   }
2355 }
2356
2357
2358 /**
2359  * Put random peer from sampler into the view as history update.
2360  *
2361  * @param ids Array of Peers to insert into view
2362  * @param num_peers Number of peers to insert
2363  * @param cls Closure - The Sub for which this is to be done
2364  */
2365 static void
2366 hist_update (const struct GNUNET_PeerIdentity *ids,
2367              uint32_t num_peers,
2368              void *cls)
2369 {
2370   unsigned int i;
2371   struct Sub *sub = cls;
2372
2373   for (i = 0; i < num_peers; i++)
2374   {
2375     int inserted;
2376     inserted = insert_in_view (sub, &ids[i]);
2377     if (GNUNET_OK == inserted)
2378     {
2379       clients_notify_stream_peer (sub, 1, &ids[i]);
2380     }
2381     to_file (sub->file_name_view_log,
2382              "+%s\t(hist)",
2383              GNUNET_i2s_full (ids));
2384   }
2385   clients_notify_view_update (sub);
2386 }
2387
2388
2389 /**
2390  * Wrapper around #RPS_sampler_resize()
2391  *
2392  * If we do not have enough sampler elements, double current sampler size
2393  * If we have more than enough sampler elements, halv current sampler size
2394  *
2395  * @param sampler The sampler to resize
2396  * @param new_size New size to which to resize
2397  */
2398 static void
2399 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2400 {
2401   unsigned int sampler_size;
2402
2403   // TODO statistics
2404   // TODO respect the min, max
2405   sampler_size = RPS_sampler_get_size (sampler);
2406   if (sampler_size > new_size * 4)
2407   { /* Shrinking */
2408     RPS_sampler_resize (sampler, sampler_size / 2);
2409   }
2410   else if (sampler_size < new_size)
2411   { /* Growing */
2412     RPS_sampler_resize (sampler, sampler_size * 2);
2413   }
2414   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2415 }
2416
2417
2418 /**
2419  * Add all peers in @a peer_array to @a peer_map used as set.
2420  *
2421  * @param peer_array array containing the peers
2422  * @param num_peers number of peers in @peer_array
2423  * @param peer_map the peermap to use as set
2424  */
2425 static void
2426 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2427                        unsigned int num_peers,
2428                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2429 {
2430   unsigned int i;
2431   if (NULL == peer_map)
2432   {
2433     LOG (GNUNET_ERROR_TYPE_WARNING,
2434          "Trying to add peers to non-existing peermap.\n");
2435     return;
2436   }
2437
2438   for (i = 0; i < num_peers; i++)
2439   {
2440     GNUNET_CONTAINER_multipeermap_put (peer_map,
2441                                        &peer_array[i],
2442                                        NULL,
2443                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2444     if (msub->peer_map == peer_map)
2445     {
2446       GNUNET_STATISTICS_set (stats,
2447                             "# known peers",
2448                             GNUNET_CONTAINER_multipeermap_size (peer_map),
2449                             GNUNET_NO);
2450     }
2451   }
2452 }
2453
2454
2455 /**
2456  * Send a PULL REPLY to @a peer_id
2457  *
2458  * @param peer_ctx Context of the peer to send the reply to
2459  * @param peer_ids the peers to send to @a peer_id
2460  * @param num_peer_ids the number of peers to send to @a peer_id
2461  */
2462 static void
2463 send_pull_reply (struct PeerContext *peer_ctx,
2464                  const struct GNUNET_PeerIdentity *peer_ids,
2465                  unsigned int num_peer_ids)
2466 {
2467   uint32_t send_size;
2468   struct GNUNET_MQ_Envelope *ev;
2469   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2470
2471   /* Compute actual size */
2472   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2473               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2474
2475   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2476     /* Compute number of peers to send
2477      * If too long, simply truncate */
2478     // TODO select random ones via permutation
2479     //      or even better: do good protocol design
2480     send_size =
2481       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2482        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2483        sizeof (struct GNUNET_PeerIdentity);
2484   else
2485     send_size = num_peer_ids;
2486
2487   LOG (GNUNET_ERROR_TYPE_DEBUG,
2488       "Going to send PULL REPLY with %u peers to %s\n",
2489       send_size, GNUNET_i2s (&peer_ctx->peer_id));
2490
2491   ev = GNUNET_MQ_msg_extra (out_msg,
2492                             send_size * sizeof (struct GNUNET_PeerIdentity),
2493                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2494   out_msg->num_peers = htonl (send_size);
2495   GNUNET_memcpy (&out_msg[1], peer_ids,
2496          send_size * sizeof (struct GNUNET_PeerIdentity));
2497
2498   send_message (peer_ctx, ev, "PULL REPLY");
2499   if (peer_ctx->sub == msub)
2500   {
2501     GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2502   }
2503   // TODO check with send intention: as send_channel is used/opened we indicate
2504   // a sending intention without intending it.
2505   // -> clean peer afterwards?
2506   // -> use recv_channel?
2507 }
2508
2509
2510 /**
2511  * Insert PeerID in #pull_map
2512  *
2513  * Called once we know a peer is online.
2514  *
2515  * @param cls Closure - Sub with the pull map to insert into
2516  * @param peer Peer to insert
2517  */
2518 static void
2519 insert_in_pull_map (void *cls,
2520                     const struct GNUNET_PeerIdentity *peer)
2521 {
2522   struct Sub *sub = cls;
2523
2524   CustomPeerMap_put (sub->pull_map, peer);
2525 }
2526
2527
2528 /**
2529  * Insert PeerID in #view
2530  *
2531  * Called once we know a peer is online.
2532  * Implements #PeerOp
2533  *
2534  * @param cls Closure - Sub with view to insert peer into
2535  * @param peer the peer to insert
2536  */
2537 static void
2538 insert_in_view_op (void *cls,
2539                    const struct GNUNET_PeerIdentity *peer)
2540 {
2541   struct Sub *sub = cls;
2542   int inserted;
2543
2544   inserted = insert_in_view (sub, peer);
2545   if (GNUNET_OK == inserted)
2546   {
2547     clients_notify_stream_peer (sub, 1, peer);
2548   }
2549 }
2550
2551
2552 /**
2553  * Update sampler with given PeerID.
2554  * Implements #PeerOp
2555  *
2556  * @param cls Closure - Sub containing the sampler to insert into
2557  * @param peer Peer to insert
2558  */
2559 static void
2560 insert_in_sampler (void *cls,
2561                    const struct GNUNET_PeerIdentity *peer)
2562 {
2563   struct Sub *sub = cls;
2564
2565   LOG (GNUNET_ERROR_TYPE_DEBUG,
2566        "Updating samplers with peer %s from insert_in_sampler()\n",
2567        GNUNET_i2s (peer));
2568   RPS_sampler_update (sub->sampler, peer);
2569   if (0 < RPS_sampler_count_id (sub->sampler, peer))
2570   {
2571     /* Make sure we 'know' about this peer */
2572     (void) issue_peer_online_check (sub, peer);
2573     /* Establish a channel towards that peer to indicate we are going to send
2574      * messages to it */
2575     //indicate_sending_intention (peer);
2576   }
2577   #ifdef TO_FILE
2578   sub->num_observed_peers++;
2579   GNUNET_CONTAINER_multipeermap_put
2580     (sub->observed_unique_peers,
2581      peer,
2582      NULL,
2583      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2584   uint32_t num_observed_unique_peers =
2585     GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2586   to_file (sub->file_name_observed_log,
2587           "%" PRIu32 " %" PRIu32 " %f\n",
2588           sub->num_observed_peers,
2589           num_observed_unique_peers,
2590           1.0*num_observed_unique_peers/sub->num_observed_peers)
2591   #endif /* TO_FILE */
2592 }
2593
2594
2595 /**
2596  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2597  *        If the peer is not known, online check is issued and it is
2598  *        scheduled to be inserted in sampler and view.
2599  *
2600  * "External sources" refer to every source except the gossip.
2601  *
2602  * @param sub Sub for which @a peer was received
2603  * @param peer peer to insert/peer received
2604  */
2605 static void
2606 got_peer (struct Sub *sub,
2607           const struct GNUNET_PeerIdentity *peer)
2608 {
2609   /* If we did not know this peer already, insert it into sampler and view */
2610   if (GNUNET_YES == issue_peer_online_check (sub, peer))
2611   {
2612     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2613                         &insert_in_sampler, sub);
2614     schedule_operation (get_peer_ctx (sub->peer_map, peer),
2615                         &insert_in_view_op, sub);
2616   }
2617   if (sub == msub)
2618   {
2619     GNUNET_STATISTICS_update (stats,
2620                               "# learnd peers",
2621                               1,
2622                               GNUNET_NO);
2623   }
2624 }
2625
2626
2627 /**
2628  * @brief Checks if there is a sending channel and if it is needed
2629  *
2630  * @param peer_ctx Context of the peer to check
2631  * @return GNUNET_YES if sending channel exists and is still needed
2632  *         GNUNET_NO  otherwise
2633  */
2634 static int
2635 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2636 {
2637   /* struct GNUNET_CADET_Channel *channel; */
2638   if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2639                                      &peer_ctx->peer_id))
2640   {
2641     return GNUNET_NO;
2642   }
2643   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2644   {
2645     if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2646                                     &peer_ctx->peer_id)) ||
2647          (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2648                                             &peer_ctx->peer_id)) ||
2649          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2650                                                      &peer_ctx->peer_id)) ||
2651          (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2652                                                      &peer_ctx->peer_id)) ||
2653          (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2654                                          &peer_ctx->peer_id,
2655                                          Peers_PULL_REPLY_PENDING)))
2656     { /* If we want to keep the connection to peer open */
2657       return GNUNET_YES;
2658     }
2659     return GNUNET_NO;
2660   }
2661   return GNUNET_NO;
2662 }
2663
2664
2665 /**
2666  * @brief remove peer from our knowledge, the view, push and pull maps and
2667  * samplers.
2668  *
2669  * @param sub Sub with the data structures the peer is to be removed from
2670  * @param peer the peer to remove
2671  */
2672 static void
2673 remove_peer (struct Sub *sub,
2674              const struct GNUNET_PeerIdentity *peer)
2675 {
2676   (void) View_remove_peer (sub->view, peer);
2677   CustomPeerMap_remove_peer (sub->pull_map, peer);
2678   CustomPeerMap_remove_peer (sub->push_map, peer);
2679   RPS_sampler_reinitialise_by_value (sub->sampler, peer);
2680   destroy_peer (get_peer_ctx (sub->peer_map, peer));
2681 }
2682
2683
2684 /**
2685  * @brief Remove data that is not needed anymore.
2686  *
2687  * If the sending channel is no longer needed it is destroyed.
2688  *
2689  * @param sub Sub in which the current peer is to be cleaned
2690  * @param peer the peer whose data is about to be cleaned
2691  */
2692 static void
2693 clean_peer (struct Sub *sub,
2694             const struct GNUNET_PeerIdentity *peer)
2695 {
2696   if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2697                                                                peer)))
2698   {
2699     LOG (GNUNET_ERROR_TYPE_DEBUG,
2700         "Going to remove send channel to peer %s\n",
2701         GNUNET_i2s (peer));
2702     #ifdef ENABLE_MALICIOUS
2703     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2704       (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2705     #else /* ENABLE_MALICIOUS */
2706     (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
2707     #endif /* ENABLE_MALICIOUS */
2708   }
2709
2710   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map, peer))
2711   {
2712     /* Peer was already removed by callback on destroyed channel */
2713     LOG (GNUNET_ERROR_TYPE_WARNING,
2714         "Peer was removed from our knowledge during cleanup\n");
2715     return;
2716   }
2717
2718   if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2719                                                               peer))) &&
2720        (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2721        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2722        (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2723        (0 == RPS_sampler_count_id (sub->sampler,   peer)) &&
2724        (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
2725   { /* We can safely remove this peer */
2726     LOG (GNUNET_ERROR_TYPE_DEBUG,
2727         "Going to remove peer %s\n",
2728         GNUNET_i2s (peer));
2729     remove_peer (sub, peer);
2730     return;
2731   }
2732 }
2733
2734
2735 /**
2736  * @brief This is called when a channel is destroyed.
2737  *
2738  * Removes peer completely from our knowledge if the send_channel was destroyed
2739  * Otherwise simply delete the recv_channel
2740  * Also check if the knowledge about this peer is still needed.
2741  * If not, remove this peer from our knowledge.
2742  *
2743  * @param cls The closure - Context to the channel
2744  * @param channel The channel being closed
2745  */
2746 static void
2747 cleanup_destroyed_channel (void *cls,
2748                            const struct GNUNET_CADET_Channel *channel)
2749 {
2750   struct ChannelCtx *channel_ctx = cls;
2751   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2752   (void) channel;
2753
2754   channel_ctx->channel = NULL;
2755   remove_channel_ctx (channel_ctx);
2756   if (NULL != peer_ctx &&
2757       peer_ctx->send_channel_ctx == channel_ctx &&
2758       GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2759   {
2760     remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2761   }
2762 }
2763
2764 /***********************************************************************
2765  * /Util functions
2766 ***********************************************************************/
2767
2768
2769
2770 /***********************************************************************
2771  * Sub
2772 ***********************************************************************/
2773
2774 /**
2775  * @brief Create a new Sub
2776  *
2777  * @param hash Hash of value shared among rps instances on other hosts that
2778  *        defines a subgroup to sample from.
2779  * @param sampler_size Size of the sampler
2780  * @param round_interval Interval (in average) between two rounds
2781  *
2782  * @return Sub
2783  */
2784 struct Sub *
2785 new_sub (const struct GNUNET_HashCode *hash,
2786          uint32_t sampler_size,
2787          struct GNUNET_TIME_Relative round_interval)
2788 {
2789   struct Sub *sub;
2790
2791   sub = GNUNET_new (struct Sub);
2792
2793   /* With the hash generated from the secret value this service only connects
2794    * to rps instances that share the value */
2795   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2796     GNUNET_MQ_hd_fixed_size (peer_check,
2797                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2798                              struct GNUNET_MessageHeader,
2799                              NULL),
2800     GNUNET_MQ_hd_fixed_size (peer_push,
2801                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2802                              struct GNUNET_MessageHeader,
2803                              NULL),
2804     GNUNET_MQ_hd_fixed_size (peer_pull_request,
2805                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2806                              struct GNUNET_MessageHeader,
2807                              NULL),
2808     GNUNET_MQ_hd_var_size (peer_pull_reply,
2809                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2810                            struct GNUNET_RPS_P2P_PullReplyMessage,
2811                            NULL),
2812     GNUNET_MQ_handler_end ()
2813   };
2814   sub->hash = *hash;
2815   sub->cadet_port =
2816     GNUNET_CADET_open_port (cadet_handle,
2817                             &sub->hash,
2818                             &handle_inbound_channel, /* Connect handler */
2819                             sub, /* cls */
2820                             NULL, /* WindowSize handler */
2821                             &cleanup_destroyed_channel, /* Disconnect handler */
2822                             cadet_handlers);
2823   if (NULL == sub->cadet_port)
2824   {
2825     LOG (GNUNET_ERROR_TYPE_ERROR,
2826         "Cadet port `%s' is already in use.\n",
2827         GNUNET_APPLICATION_PORT_RPS);
2828     GNUNET_assert (0);
2829   }
2830
2831   /* Set up general data structure to keep track about peers */
2832   sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2833   if (GNUNET_OK !=
2834       GNUNET_CONFIGURATION_get_value_filename (cfg,
2835                                                "rps",
2836                                                "FILENAME_VALID_PEERS",
2837                                                &sub->filename_valid_peers))
2838   {
2839     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2840                                "rps",
2841                                "FILENAME_VALID_PEERS");
2842   }
2843   if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2844   {
2845     char *tmp_filename_valid_peers;
2846     char str_hash[105];
2847     uint32_t len_filename_valid_peers;
2848
2849     (void) GNUNET_snprintf (str_hash, 105, GNUNET_h2s_full (hash));
2850     tmp_filename_valid_peers = GNUNET_strdup (sub->filename_valid_peers);
2851     GNUNET_free (sub->filename_valid_peers);
2852     len_filename_valid_peers = strlen (tmp_filename_valid_peers) + 105; /* Len of full hash + 1 */
2853     sub->filename_valid_peers = GNUNET_malloc (len_filename_valid_peers);
2854     strncat (sub->filename_valid_peers,
2855              tmp_filename_valid_peers,
2856              len_filename_valid_peers);
2857     strncat (sub->filename_valid_peers,
2858              str_hash,
2859              len_filename_valid_peers);
2860     GNUNET_free (tmp_filename_valid_peers);
2861   }
2862   sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2863
2864   /* Set up the sampler */
2865   sub->sampler_size_est_min = sampler_size;
2866   sub->sampler_size_est_need = sampler_size;;
2867   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2868   GNUNET_assert (0 != round_interval.rel_value_us);
2869   sub->round_interval = round_interval;
2870   sub->sampler = RPS_sampler_init (sampler_size,
2871                                   round_interval);
2872
2873   /* Logging of internals */
2874   sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2875   #ifdef TO_FILE
2876   sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2877                                                        "observed");
2878   sub->file_name_push_recv = store_prefix_file_name (&own_identity,
2879                                                      "push_recv");
2880   sub->file_name_pull_delays = store_prefix_file_name (&own_identity,
2881                                                        "pull_delays");
2882   sub->num_observed_peers = 0;
2883   sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2884                                                                     GNUNET_NO);
2885   #endif /* TO_FILE */
2886
2887   /* Set up data structures for gossip */
2888   sub->push_map = CustomPeerMap_create (4);
2889   sub->pull_map = CustomPeerMap_create (4);
2890   sub->view_size_est_min = sampler_size;;
2891   sub->view = View_create (sub->view_size_est_min);
2892   if (sub == msub)
2893   {
2894     GNUNET_STATISTICS_set (stats,
2895                            "view size aim",
2896                            sub->view_size_est_min,
2897                            GNUNET_NO);
2898   }
2899
2900   /* Start executing rounds */
2901   sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2902
2903   return sub;
2904 }
2905
2906
2907 /**
2908  * @brief Destroy Sub.
2909  *
2910  * @param sub Sub to destroy
2911  */
2912 static void
2913 destroy_sub (struct Sub *sub)
2914 {
2915 #ifdef TO_FILE
2916   char push_recv_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2917   char pull_delays_str[1536] = ""; /* 256 * 6 (1 whitespace, 1 comma, up to 4 chars) */
2918 #endif /* TO_FILE */
2919   GNUNET_assert (NULL != sub);
2920   GNUNET_assert (NULL != sub->do_round_task);
2921   GNUNET_SCHEDULER_cancel (sub->do_round_task);
2922   sub->do_round_task = NULL;
2923
2924   /* Disconnect from cadet */
2925   GNUNET_CADET_close_port (sub->cadet_port);
2926
2927   /* Clean up data structures for peers */
2928   RPS_sampler_destroy (sub->sampler);
2929   sub->sampler = NULL;
2930   View_destroy (sub->view);
2931   sub->view = NULL;
2932   CustomPeerMap_destroy (sub->push_map);
2933   sub->push_map = NULL;
2934   CustomPeerMap_destroy (sub->pull_map);
2935   sub->pull_map = NULL;
2936   peers_terminate (sub);
2937
2938   /* Free leftover data structures */
2939   GNUNET_free (sub->file_name_view_log);
2940   sub->file_name_view_log = NULL;
2941 #ifdef TO_FILE
2942   GNUNET_free (sub->file_name_observed_log);
2943   sub->file_name_observed_log = NULL;
2944
2945   /* Write push frequencies to disk */
2946   for (uint32_t i = 0; i < 256; i++)
2947   {
2948     char push_recv_str_tmp[8];
2949     (void) snprintf (push_recv_str_tmp, 8, "%" PRIu32 "\n", sub->push_recv[i]);
2950     LOG (GNUNET_ERROR_TYPE_DEBUG,
2951          "Adding str `%s' to `%s'\n",
2952          push_recv_str_tmp,
2953          push_recv_str);
2954     (void) strncat (push_recv_str,
2955                     push_recv_str_tmp,
2956                     1535 - strnlen (push_recv_str, 1536));
2957   }
2958   (void) strncat (push_recv_str,
2959                   "\n",
2960                   1535 - strnlen (push_recv_str, 1536));
2961   LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing push stats to disk\n");
2962   to_file_w_len (sub->file_name_push_recv, 1535, push_recv_str);
2963   GNUNET_free (sub->file_name_push_recv);
2964   sub->file_name_push_recv = NULL;
2965
2966   /* Write pull delays to disk */
2967   for (uint32_t i = 0; i < 256; i++)
2968   {
2969     char pull_delays_str_tmp[8];
2970     (void) snprintf (pull_delays_str_tmp, 8, "%" PRIu32 "\n", sub->pull_delays[i]);
2971     LOG (GNUNET_ERROR_TYPE_DEBUG,
2972          "Adding str `%s' to `%s'\n",
2973          pull_delays_str_tmp,
2974          pull_delays_str);
2975     (void) strncat (pull_delays_str,
2976                     pull_delays_str_tmp,
2977                     1535 - strnlen (pull_delays_str, 1536));
2978   }
2979   (void) strncat (pull_delays_str,
2980                   "\n",
2981                   1535 - strnlen (pull_delays_str, 1536));
2982   LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing pull delays to disk\n");
2983   to_file_w_len (sub->file_name_pull_delays, 1535, pull_delays_str);
2984   GNUNET_free (sub->file_name_pull_delays);
2985   sub->file_name_pull_delays = NULL;
2986
2987   GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
2988   sub->observed_unique_peers = NULL;
2989 #endif /* TO_FILE */
2990
2991   GNUNET_free (sub);
2992 }
2993
2994
2995 /***********************************************************************
2996  * /Sub
2997 ***********************************************************************/
2998
2999
3000 /***********************************************************************
3001  * Core handlers
3002 ***********************************************************************/
3003
3004 /**
3005  * @brief Callback on initialisation of Core.
3006  *
3007  * @param cls - unused
3008  * @param my_identity - unused
3009  */
3010 void
3011 core_init (void *cls,
3012            const struct GNUNET_PeerIdentity *my_identity)
3013 {
3014   (void) cls;
3015   (void) my_identity;
3016
3017   map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3018 }
3019
3020
3021 /**
3022  * @brief Callback for core.
3023  * Method called whenever a given peer connects.
3024  *
3025  * @param cls closure - unused
3026  * @param peer peer identity this notification is about
3027  * @return closure given to #core_disconnects as peer_cls
3028  */
3029 void *
3030 core_connects (void *cls,
3031                const struct GNUNET_PeerIdentity *peer,
3032                struct GNUNET_MQ_Handle *mq)
3033 {
3034   (void) cls;
3035   (void) mq;
3036
3037   GNUNET_CONTAINER_multipeermap_put (map_single_hop, peer, NULL,
3038       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
3039   return NULL;
3040 }
3041
3042
3043 /**
3044  * @brief Callback for core.
3045  * Method called whenever a peer disconnects.
3046  *
3047  * @param cls closure - unused
3048  * @param peer peer identity this notification is about
3049  * @param peer_cls closure given in #core_connects - unused
3050  */
3051 void
3052 core_disconnects (void *cls,
3053                   const struct GNUNET_PeerIdentity *peer,
3054                   void *peer_cls)
3055 {
3056   (void) cls;
3057   (void) peer_cls;
3058
3059   GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3060 }
3061
3062 /***********************************************************************
3063  * /Core handlers
3064 ***********************************************************************/
3065
3066
3067 /**
3068  * @brief Destroy the context for a (connected) client
3069  *
3070  * @param cli_ctx Context to destroy
3071  */
3072 static void
3073 destroy_cli_ctx (struct ClientContext *cli_ctx)
3074 {
3075   GNUNET_assert (NULL != cli_ctx);
3076   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3077                                cli_ctx_tail,
3078                                cli_ctx);
3079   if (NULL != cli_ctx->sub)
3080   {
3081     destroy_sub (cli_ctx->sub);
3082     cli_ctx->sub = NULL;
3083   }
3084   GNUNET_free (cli_ctx);
3085 }
3086
3087
3088 /**
3089  * @brief Update sizes in sampler and view on estimate update from nse service
3090  *
3091  * @param sub Sub
3092  * @param logestimate the log(Base 2) value of the current network size estimate
3093  * @param std_dev standard deviation for the estimate
3094  */
3095 static void
3096 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3097 {
3098   double estimate;
3099   //double scale; // TODO this might go gloabal/config
3100
3101   LOG (GNUNET_ERROR_TYPE_DEBUG,
3102        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3103        logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3104   //scale = .01;
3105   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3106   // GNUNET_NSE_log_estimate_to_n (logestimate);
3107   estimate = pow (estimate, 1.0 / 3);
3108   // TODO add if std_dev is a number
3109   // estimate += (std_dev * scale);
3110   if (sub->view_size_est_min < ceil (estimate))
3111   {
3112     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3113     sub->sampler_size_est_need = estimate;
3114     sub->view_size_est_need = estimate;
3115   } else
3116   {
3117     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3118     //sub->sampler_size_est_need = sub->view_size_est_min;
3119     sub->view_size_est_need = sub->view_size_est_min;
3120   }
3121   if (sub == msub)
3122   {
3123     GNUNET_STATISTICS_set (stats,
3124                            "view size aim",
3125                            sub->view_size_est_need,
3126                            GNUNET_NO);
3127   }
3128
3129   /* If the NSE has changed adapt the lists accordingly */
3130   resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3131   View_change_len (sub->view, sub->view_size_est_need);
3132 }
3133
3134
3135 /**
3136  * Function called by NSE.
3137  *
3138  * Updates sizes of sampler list and view and adapt those lists
3139  * accordingly.
3140  *
3141  * implements #GNUNET_NSE_Callback
3142  *
3143  * @param cls Closure - unused
3144  * @param timestamp time when the estimate was received from the server (or created by the server)
3145  * @param logestimate the log(Base 2) value of the current network size estimate
3146  * @param std_dev standard deviation for the estimate
3147  */
3148 static void
3149 nse_callback (void *cls,
3150               struct GNUNET_TIME_Absolute timestamp,
3151               double logestimate, double std_dev)
3152 {
3153   (void) cls;
3154   (void) timestamp;
3155   struct ClientContext *cli_ctx_iter;
3156
3157   adapt_sizes (msub, logestimate, std_dev);
3158   for (cli_ctx_iter = cli_ctx_head;
3159       NULL != cli_ctx_iter;
3160       cli_ctx_iter = cli_ctx_iter->next)
3161   {
3162     if (NULL != cli_ctx_iter->sub)
3163     {
3164       adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3165     }
3166   }
3167 }
3168
3169
3170 /**
3171  * @brief This function is called, when the client seeds peers.
3172  * It verifies that @a msg is well-formed.
3173  *
3174  * @param cls the closure (#ClientContext)
3175  * @param msg the message
3176  * @return #GNUNET_OK if @a msg is well-formed
3177  *         #GNUNET_SYSERR otherwise
3178  */
3179 static int
3180 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3181 {
3182   struct ClientContext *cli_ctx = cls;
3183   uint16_t msize = ntohs (msg->header.size);
3184   uint32_t num_peers = ntohl (msg->num_peers);
3185
3186   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3187   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3188        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3189   {
3190     LOG (GNUNET_ERROR_TYPE_ERROR,
3191         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3192         ntohl (msg->num_peers),
3193         (msize / sizeof (struct GNUNET_PeerIdentity)));
3194     GNUNET_break (0);
3195     GNUNET_SERVICE_client_drop (cli_ctx->client);
3196     return GNUNET_SYSERR;
3197   }
3198   return GNUNET_OK;
3199 }
3200
3201
3202 /**
3203  * Handle seed from the client.
3204  *
3205  * @param cls closure
3206  * @param message the actual message
3207  */
3208 static void
3209 handle_client_seed (void *cls,
3210                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3211 {
3212   struct ClientContext *cli_ctx = cls;
3213   struct GNUNET_PeerIdentity *peers;
3214   uint32_t num_peers;
3215   uint32_t i;
3216
3217   num_peers = ntohl (msg->num_peers);
3218   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3219
3220   LOG (GNUNET_ERROR_TYPE_DEBUG,
3221        "Client seeded peers:\n");
3222   print_peer_list (peers, num_peers);
3223
3224   for (i = 0; i < num_peers; i++)
3225   {
3226     LOG (GNUNET_ERROR_TYPE_DEBUG,
3227          "Updating samplers with seed %" PRIu32 ": %s\n",
3228          i,
3229          GNUNET_i2s (&peers[i]));
3230
3231     if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3232     if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3233   }
3234   GNUNET_SERVICE_client_continue (cli_ctx->client);
3235 }
3236
3237
3238 /**
3239  * Handle RPS request from the client.
3240  *
3241  * @param cls Client context
3242  * @param message Message containing the numer of updates the client wants to
3243  * receive
3244  */
3245 static void
3246 handle_client_view_request (void *cls,
3247                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3248 {
3249   struct ClientContext *cli_ctx = cls;
3250   uint64_t num_updates;
3251
3252   num_updates = ntohl (msg->num_updates);
3253
3254   LOG (GNUNET_ERROR_TYPE_DEBUG,
3255        "Client requested %" PRIu64 " updates of view.\n",
3256        num_updates);
3257
3258   GNUNET_assert (NULL != cli_ctx);
3259   cli_ctx->view_updates_left = num_updates;
3260   send_view (cli_ctx, NULL, 0);
3261   GNUNET_SERVICE_client_continue (cli_ctx->client);
3262 }
3263
3264
3265 /**
3266  * @brief Handle the cancellation of the view updates.
3267  *
3268  * @param cls The client context
3269  * @param msg Unused
3270  */
3271 static void
3272 handle_client_view_cancel (void *cls,
3273                            const struct GNUNET_MessageHeader *msg)
3274 {
3275   struct ClientContext *cli_ctx = cls;
3276   (void) msg;
3277
3278   LOG (GNUNET_ERROR_TYPE_DEBUG,
3279        "Client does not want to receive updates of view any more.\n");
3280
3281   GNUNET_assert (NULL != cli_ctx);
3282   cli_ctx->view_updates_left = 0;
3283   GNUNET_SERVICE_client_continue (cli_ctx->client);
3284   if (GNUNET_YES == cli_ctx->stream_update)
3285   {
3286     destroy_cli_ctx (cli_ctx);
3287   }
3288 }
3289
3290
3291 /**
3292  * Handle RPS request for biased stream from the client.
3293  *
3294  * @param cls Client context
3295  * @param message unused
3296  */
3297 static void
3298 handle_client_stream_request (void *cls,
3299                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3300 {
3301   struct ClientContext *cli_ctx = cls;
3302   (void) msg;
3303
3304   LOG (GNUNET_ERROR_TYPE_DEBUG,
3305        "Client requested peers from biased stream.\n");
3306   cli_ctx->stream_update = GNUNET_YES;
3307
3308   GNUNET_assert (NULL != cli_ctx);
3309   GNUNET_SERVICE_client_continue (cli_ctx->client);
3310 }
3311
3312
3313 /**
3314  * @brief Handles the cancellation of the stream of biased peer ids
3315  *
3316  * @param cls The client context
3317  * @param msg unused
3318  */
3319 static void
3320 handle_client_stream_cancel (void *cls,
3321                              const struct GNUNET_MessageHeader *msg)
3322 {
3323   struct ClientContext *cli_ctx = cls;
3324   (void) msg;
3325
3326   LOG (GNUNET_ERROR_TYPE_DEBUG,
3327        "Client canceled receiving peers from biased stream.\n");
3328   cli_ctx->stream_update = GNUNET_NO;
3329
3330   GNUNET_assert (NULL != cli_ctx);
3331   GNUNET_SERVICE_client_continue (cli_ctx->client);
3332 }
3333
3334
3335 /**
3336  * @brief Create and start a Sub.
3337  *
3338  * @param cls Closure - unused
3339  * @param msg Message containing the necessary information
3340  */
3341 static void
3342 handle_client_start_sub (void *cls,
3343                          const struct GNUNET_RPS_CS_SubStartMessage *msg)
3344 {
3345   struct ClientContext *cli_ctx = cls;
3346
3347   LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3348   if (NULL != cli_ctx->sub &&
3349       0 != memcmp (&cli_ctx->sub->hash,
3350                    &msg->hash,
3351                    sizeof (struct GNUNET_HashCode)))
3352   {
3353     LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3354     destroy_sub (cli_ctx->sub);
3355     cli_ctx->sub = NULL;
3356   }
3357   cli_ctx->sub = new_sub (&msg->hash,
3358                          msub->sampler_size_est_min, // TODO make api input?
3359                          GNUNET_TIME_relative_ntoh (msg->round_interval));
3360   GNUNET_SERVICE_client_continue (cli_ctx->client);
3361 }
3362
3363
3364 /**
3365  * @brief Destroy the Sub
3366  *
3367  * @param cls Closure - unused
3368  * @param msg Message containing the hash that identifies the Sub
3369  */
3370 static void
3371 handle_client_stop_sub (void *cls,
3372                         const struct GNUNET_RPS_CS_SubStopMessage *msg)
3373 {
3374   struct ClientContext *cli_ctx = cls;
3375
3376   GNUNET_assert (NULL != cli_ctx->sub);
3377   if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3378   {
3379     LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3380   }
3381   destroy_sub (cli_ctx->sub);
3382   cli_ctx->sub = NULL;
3383   GNUNET_SERVICE_client_continue (cli_ctx->client);
3384 }
3385
3386
3387 /**
3388  * Handle a CHECK_LIVE message from another peer.
3389  *
3390  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3391  * the channel is blocked for all other communication.
3392  *
3393  * @param cls Closure - Context of channel
3394  * @param msg Message - unused
3395  */
3396 static void
3397 handle_peer_check (void *cls,
3398                    const struct GNUNET_MessageHeader *msg)
3399 {
3400   const struct ChannelCtx *channel_ctx = cls;
3401   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3402   (void) msg;
3403
3404   LOG (GNUNET_ERROR_TYPE_DEBUG,
3405       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3406   if (channel_ctx->peer_ctx->sub == msub)
3407   {
3408     GNUNET_STATISTICS_update (stats,
3409                               "# pending online checks",
3410                               -1,
3411                               GNUNET_NO);
3412   }
3413
3414   GNUNET_CADET_receive_done (channel_ctx->channel);
3415 }
3416
3417
3418 /**
3419  * Handle a PUSH message from another peer.
3420  *
3421  * Check the proof of work and store the PeerID
3422  * in the temporary list for pushed PeerIDs.
3423  *
3424  * @param cls Closure - Context of channel
3425  * @param msg Message - unused
3426  */
3427 static void
3428 handle_peer_push (void *cls,
3429                   const struct GNUNET_MessageHeader *msg)
3430 {
3431   const struct ChannelCtx *channel_ctx = cls;
3432   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3433   (void) msg;
3434
3435   // (check the proof of work (?))
3436
3437   LOG (GNUNET_ERROR_TYPE_DEBUG,
3438        "Received PUSH (%s)\n",
3439        GNUNET_i2s (peer));
3440   if (channel_ctx->peer_ctx->sub == msub)
3441   {
3442     GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3443   }
3444
3445   #ifdef ENABLE_MALICIOUS
3446   struct AttackedPeer *tmp_att_peer;
3447
3448   if ( (1 == mal_type) ||
3449        (3 == mal_type) )
3450   { /* Try to maximise representation */
3451     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3452     tmp_att_peer->peer_id = *peer;
3453     if (NULL == att_peer_set)
3454       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3455     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3456                                                              peer))
3457     {
3458       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3459                                    att_peers_tail,
3460                                    tmp_att_peer);
3461       add_peer_array_to_set (peer, 1, att_peer_set);
3462     }
3463     else
3464     {
3465       GNUNET_free (tmp_att_peer);
3466     }
3467   }
3468
3469
3470   else if (2 == mal_type)
3471   {
3472     /* We attack one single well-known peer - simply ignore */
3473   }
3474   #endif /* ENABLE_MALICIOUS */
3475
3476   /* Add the sending peer to the push_map */
3477   CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3478
3479   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3480                                      &channel_ctx->peer_ctx->peer_id));
3481   GNUNET_CADET_receive_done (channel_ctx->channel);
3482 }
3483
3484
3485 /**
3486  * Handle PULL REQUEST request message from another peer.
3487  *
3488  * Reply with the view of PeerIDs.
3489  *
3490  * @param cls Closure - Context of channel
3491  * @param msg Message - unused
3492  */
3493 static void
3494 handle_peer_pull_request (void *cls,
3495                           const struct GNUNET_MessageHeader *msg)
3496 {
3497   const struct ChannelCtx *channel_ctx = cls;
3498   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3499   const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3500   const struct GNUNET_PeerIdentity *view_array;
3501   (void) msg;
3502
3503   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3504   if (peer_ctx->sub == msub)
3505   {
3506     GNUNET_STATISTICS_update(stats,
3507                              "# pull request message received",
3508                              1,
3509                              GNUNET_NO);
3510     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3511                                                              &peer_ctx->peer_id))
3512     {
3513       GNUNET_STATISTICS_update (stats,
3514                                 "# pull request message received (multi-hop peer)",
3515                                 1,
3516                                 GNUNET_NO);
3517     }
3518   }
3519
3520   #ifdef ENABLE_MALICIOUS
3521   if (1 == mal_type
3522       || 3 == mal_type)
3523   { /* Try to maximise representation */
3524     send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3525   }
3526
3527   else if (2 == mal_type)
3528   { /* Try to partition network */
3529     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3530     {
3531       send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3532     }
3533   }
3534   #endif /* ENABLE_MALICIOUS */
3535
3536   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3537                                      &channel_ctx->peer_ctx->peer_id));
3538   GNUNET_CADET_receive_done (channel_ctx->channel);
3539   view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3540   send_pull_reply (peer_ctx,
3541                    view_array,
3542                    View_size (channel_ctx->peer_ctx->sub->view));
3543 }
3544
3545
3546 /**
3547  * Check whether we sent a corresponding request and
3548  * whether this reply is the first one.
3549  *
3550  * @param cls Closure - Context of channel
3551  * @param msg Message containing the replied peers
3552  */
3553 static int
3554 check_peer_pull_reply (void *cls,
3555                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3556 {
3557   struct ChannelCtx *channel_ctx = cls;
3558   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3559
3560   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3561   {
3562     GNUNET_break_op (0);
3563     return GNUNET_SYSERR;
3564   }
3565
3566   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3567       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3568   {
3569     LOG (GNUNET_ERROR_TYPE_ERROR,
3570         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3571         ntohl (msg->num_peers),
3572         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3573             sizeof (struct GNUNET_PeerIdentity));
3574     GNUNET_break_op (0);
3575     return GNUNET_SYSERR;
3576   }
3577
3578   if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3579                                      &sender_ctx->peer_id,
3580                                      Peers_PULL_REPLY_PENDING))
3581   {
3582     LOG (GNUNET_ERROR_TYPE_WARNING,
3583         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3584         GNUNET_i2s (&sender_ctx->peer_id));
3585     if (sender_ctx->sub == msub)
3586     {
3587       GNUNET_STATISTICS_update (stats,
3588                                 "# unrequested pull replies",
3589                                 1,
3590                                 GNUNET_NO);
3591     }
3592     GNUNET_break_op (0);
3593     return GNUNET_SYSERR;
3594   }
3595   return GNUNET_OK;
3596 }
3597
3598
3599 /**
3600  * Handle PULL REPLY message from another peer.
3601  *
3602  * @param cls Closure
3603  * @param msg The message header
3604  */
3605 static void
3606 handle_peer_pull_reply (void *cls,
3607                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3608 {
3609   const struct ChannelCtx *channel_ctx = cls;
3610   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3611   const struct GNUNET_PeerIdentity *peers;
3612   struct Sub *sub = channel_ctx->peer_ctx->sub;
3613   uint32_t i;
3614 #ifdef ENABLE_MALICIOUS
3615   struct AttackedPeer *tmp_att_peer;
3616 #endif /* ENABLE_MALICIOUS */
3617
3618   sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3619   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3620   if (channel_ctx->peer_ctx->sub == msub)
3621   {
3622     GNUNET_STATISTICS_update (stats,
3623                               "# pull reply messages received",
3624                               1,
3625                               GNUNET_NO);
3626     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3627           &channel_ctx->peer_ctx->peer_id))
3628     {
3629       GNUNET_STATISTICS_update (stats,
3630                                 "# pull reply messages received (multi-hop peer)",
3631                                 1,
3632                                 GNUNET_NO);
3633     }
3634   }
3635
3636   #ifdef ENABLE_MALICIOUS
3637   // We shouldn't even receive pull replies as we're not sending
3638   if (2 == mal_type)
3639   {
3640   }
3641   #endif /* ENABLE_MALICIOUS */
3642
3643   /* Do actual logic */
3644   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3645
3646   LOG (GNUNET_ERROR_TYPE_DEBUG,
3647        "PULL REPLY received, got following %u peers:\n",
3648        ntohl (msg->num_peers));
3649
3650   for (i = 0; i < ntohl (msg->num_peers); i++)
3651   {
3652     LOG (GNUNET_ERROR_TYPE_DEBUG,
3653          "%u. %s\n",
3654          i,
3655          GNUNET_i2s (&peers[i]));
3656
3657     #ifdef ENABLE_MALICIOUS
3658     if ((NULL != att_peer_set) &&
3659         (1 == mal_type || 3 == mal_type))
3660     { /* Add attacked peer to local list */
3661       // TODO check if we sent a request and this was the first reply
3662       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3663                                                                &peers[i])
3664           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3665                                                                   &peers[i]))
3666       {
3667         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3668         tmp_att_peer->peer_id = peers[i];
3669         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3670                                      att_peers_tail,
3671                                      tmp_att_peer);
3672         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3673       }
3674       continue;
3675     }
3676     #endif /* ENABLE_MALICIOUS */
3677     /* Make sure we 'know' about this peer */
3678     (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]);
3679
3680     if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3681                                         &peers[i]))
3682     {
3683       CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]);
3684     }
3685     else
3686     {
3687       schedule_operation (channel_ctx->peer_ctx,
3688                           insert_in_pull_map,
3689                           channel_ctx->peer_ctx->sub); /* cls */
3690       (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]);
3691     }
3692   }
3693
3694   UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender),
3695                    Peers_PULL_REPLY_PENDING);
3696   clean_peer (channel_ctx->peer_ctx->sub, sender);
3697
3698   GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3699                                      sender));
3700   GNUNET_CADET_receive_done (channel_ctx->channel);
3701 }
3702
3703
3704 /**
3705  * Compute a random delay.
3706  * A uniformly distributed value between mean + spread and mean - spread.
3707  *
3708  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3709  * It would return a random value between 2 and 6 min.
3710  *
3711  * @param mean the mean time until the next round
3712  * @param spread the inverse amount of deviation from the mean
3713  */
3714 static struct GNUNET_TIME_Relative
3715 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3716                     unsigned int spread)
3717 {
3718   struct GNUNET_TIME_Relative half_interval;
3719   struct GNUNET_TIME_Relative ret;
3720   unsigned int rand_delay;
3721   unsigned int max_rand_delay;
3722
3723   if (0 == spread)
3724   {
3725     LOG (GNUNET_ERROR_TYPE_WARNING,
3726          "Not accepting spread of 0\n");
3727     GNUNET_break (0);
3728     GNUNET_assert (0);
3729   }
3730   GNUNET_assert (0 != mean.rel_value_us);
3731
3732   /* Compute random time value between spread * mean and spread * mean */
3733   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3734
3735   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3736   /**
3737    * Compute random value between (0 and 1) * round_interval
3738    * via multiplying round_interval with a 'fraction' (0 to value)/value
3739    */
3740   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3741   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3742   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3743   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3744
3745   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3746     LOG (GNUNET_ERROR_TYPE_WARNING,
3747          "Returning FOREVER_REL\n");
3748
3749   return ret;
3750 }
3751
3752
3753 /**
3754  * Send single pull request
3755  *
3756  * @param peer_ctx Context to the peer to send request to
3757  */
3758 static void
3759 send_pull_request (struct PeerContext *peer_ctx)
3760 {
3761   struct GNUNET_MQ_Envelope *ev;
3762
3763   GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3764                                                &peer_ctx->peer_id,
3765                                                Peers_PULL_REPLY_PENDING));
3766   SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
3767   peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3768
3769   LOG (GNUNET_ERROR_TYPE_DEBUG,
3770        "Going to send PULL REQUEST to peer %s.\n",
3771        GNUNET_i2s (&peer_ctx->peer_id));
3772
3773   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3774   send_message (peer_ctx, ev, "PULL REQUEST");
3775   if (peer_ctx->sub)
3776   {
3777     GNUNET_STATISTICS_update (stats,
3778                               "# pull request send issued",
3779                               1,
3780                               GNUNET_NO);
3781     if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3782                                                              &peer_ctx->peer_id))
3783     {
3784       GNUNET_STATISTICS_update (stats,
3785                                 "# pull request send issued (multi-hop peer)",
3786                                 1,
3787                                 GNUNET_NO);
3788     }
3789   }
3790 }
3791
3792
3793 /**
3794  * Send single push
3795  *
3796  * @param peer_ctx Context of peer to send push to
3797  */
3798 static void
3799 send_push (struct PeerContext *peer_ctx)
3800 {
3801   struct GNUNET_MQ_Envelope *ev;
3802
3803   LOG (GNUNET_ERROR_TYPE_DEBUG,
3804        "Going to send PUSH to peer %s.\n",
3805        GNUNET_i2s (&peer_ctx->peer_id));
3806
3807   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3808   send_message (peer_ctx, ev, "PUSH");
3809   if (peer_ctx->sub)
3810   {
3811     GNUNET_STATISTICS_update (stats,
3812                               "# push send issued",
3813                               1,
3814                               GNUNET_NO);
3815   }
3816 }
3817
3818
3819 #ifdef ENABLE_MALICIOUS
3820
3821
3822 /**
3823  * @brief This function is called, when the client tells us to act malicious.
3824  * It verifies that @a msg is well-formed.
3825  *
3826  * @param cls the closure (#ClientContext)
3827  * @param msg the message
3828  * @return #GNUNET_OK if @a msg is well-formed
3829  */
3830 static int
3831 check_client_act_malicious (void *cls,
3832                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3833 {
3834   struct ClientContext *cli_ctx = cls;
3835   uint16_t msize = ntohs (msg->header.size);
3836   uint32_t num_peers = ntohl (msg->num_peers);
3837
3838   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3839   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3840        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3841   {
3842     LOG (GNUNET_ERROR_TYPE_ERROR,
3843         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3844         ntohl (msg->num_peers),
3845         (msize / sizeof (struct GNUNET_PeerIdentity)));
3846     GNUNET_break (0);
3847     GNUNET_SERVICE_client_drop (cli_ctx->client);
3848     return GNUNET_SYSERR;
3849   }
3850   return GNUNET_OK;
3851 }
3852
3853 /**
3854  * Turn RPS service to act malicious.
3855  *
3856  * @param cls Closure
3857  * @param client The client that sent the message
3858  * @param msg The message header
3859  */
3860 static void
3861 handle_client_act_malicious (void *cls,
3862                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3863 {
3864   struct ClientContext *cli_ctx = cls;
3865   struct GNUNET_PeerIdentity *peers;
3866   uint32_t num_mal_peers_sent;
3867   uint32_t num_mal_peers_old;
3868   struct Sub *sub = cli_ctx->sub;
3869
3870   if (NULL == sub) sub = msub;
3871   /* Do actual logic */
3872   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3873   mal_type = ntohl (msg->type);
3874   if (NULL == mal_peer_set)
3875     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3876
3877   LOG (GNUNET_ERROR_TYPE_DEBUG,
3878        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3879        mal_type,
3880        ntohl (msg->num_peers));
3881
3882   if (1 == mal_type)
3883   { /* Try to maximise representation */
3884     /* Add other malicious peers to those we already know */
3885
3886     num_mal_peers_sent = ntohl (msg->num_peers);
3887     num_mal_peers_old = num_mal_peers;
3888     GNUNET_array_grow (mal_peers,
3889                        num_mal_peers,
3890                        num_mal_peers + num_mal_peers_sent);
3891     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3892             peers,
3893             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3894
3895     /* Add all mal peers to mal_peer_set */
3896     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3897                            num_mal_peers_sent,
3898                            mal_peer_set);
3899
3900     /* Substitute do_round () with do_mal_round () */
3901     GNUNET_assert (NULL != sub->do_round_task);
3902     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3903     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3904   }
3905
3906   else if ( (2 == mal_type) ||
3907             (3 == mal_type) )
3908   { /* Try to partition the network */
3909     /* Add other malicious peers to those we already know */
3910
3911     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3912     num_mal_peers_old = num_mal_peers;
3913     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3914     GNUNET_array_grow (mal_peers,
3915                        num_mal_peers,
3916                        num_mal_peers + num_mal_peers_sent);
3917     if (NULL != mal_peers &&
3918         0 != num_mal_peers)
3919     {
3920       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3921               peers,
3922               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3923
3924       /* Add all mal peers to mal_peer_set */
3925       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3926                              num_mal_peers_sent,
3927                              mal_peer_set);
3928     }
3929
3930     /* Store the one attacked peer */
3931     GNUNET_memcpy (&attacked_peer,
3932             &msg->attacked_peer,
3933             sizeof (struct GNUNET_PeerIdentity));
3934     /* Set the flag of the attacked peer to valid to avoid problems */
3935     if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3936     {
3937       (void) issue_peer_online_check (sub, &attacked_peer);
3938     }
3939
3940     LOG (GNUNET_ERROR_TYPE_DEBUG,
3941          "Attacked peer is %s\n",
3942          GNUNET_i2s (&attacked_peer));
3943
3944     /* Substitute do_round () with do_mal_round () */
3945     if (NULL != sub->do_round_task)
3946     {
3947       /* Probably in shutdown */
3948       GNUNET_SCHEDULER_cancel (sub->do_round_task);
3949       sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3950     }
3951   }
3952   else if (0 == mal_type)
3953   { /* Stop acting malicious */
3954     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3955
3956     /* Substitute do_mal_round () with do_round () */
3957     GNUNET_SCHEDULER_cancel (sub->do_round_task);
3958     sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
3959   }
3960   else
3961   {
3962     GNUNET_break (0);
3963     GNUNET_SERVICE_client_continue (cli_ctx->client);
3964   }
3965   GNUNET_SERVICE_client_continue (cli_ctx->client);
3966 }
3967
3968
3969 /**
3970  * Send out PUSHes and PULLs maliciously.
3971  *
3972  * This is executed regylary.
3973  *
3974  * @param cls Closure - Sub
3975  */
3976 static void
3977 do_mal_round (void *cls)
3978 {
3979   uint32_t num_pushes;
3980   uint32_t i;
3981   struct GNUNET_TIME_Relative time_next_round;
3982   struct AttackedPeer *tmp_att_peer;
3983   struct Sub *sub = cls;
3984
3985   LOG (GNUNET_ERROR_TYPE_DEBUG,
3986        "Going to execute next round maliciously type %" PRIu32 ".\n",
3987       mal_type);
3988   sub->do_round_task = NULL;
3989   GNUNET_assert (mal_type <= 3);
3990   /* Do malicious actions */
3991   if (1 == mal_type)
3992   { /* Try to maximise representation */
3993
3994     /* The maximum of pushes we're going to send this round */
3995     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3996                                          num_attacked_peers),
3997                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3998
3999     LOG (GNUNET_ERROR_TYPE_DEBUG,
4000          "Going to send %" PRIu32 " pushes\n",
4001          num_pushes);
4002
4003     /* Send PUSHes to attacked peers */
4004     for (i = 0 ; i < num_pushes ; i++)
4005     {
4006       if (att_peers_tail == att_peer_index)
4007         att_peer_index = att_peers_head;
4008       else
4009         att_peer_index = att_peer_index->next;
4010
4011       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4012     }
4013
4014     /* Send PULLs to some peers to learn about additional peers to attack */
4015     tmp_att_peer = att_peer_index;
4016     for (i = 0 ; i < num_pushes * alpha ; i++)
4017     {
4018       if (att_peers_tail == tmp_att_peer)
4019         tmp_att_peer = att_peers_head;
4020       else
4021         att_peer_index = tmp_att_peer->next;
4022
4023       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4024     }
4025   }
4026
4027
4028   else if (2 == mal_type)
4029   { /**
4030      * Try to partition the network
4031      * Send as many pushes to the attacked peer as possible
4032      * That is one push per round as it will ignore more.
4033      */
4034     (void) issue_peer_online_check (sub, &attacked_peer);
4035     if (GNUNET_YES == check_peer_flag (sub->peer_map,
4036                                        &attacked_peer,
4037                                        Peers_ONLINE))
4038       send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4039   }
4040
4041
4042   if (3 == mal_type)
4043   { /* Combined attack */
4044
4045     /* Send PUSH to attacked peers */
4046     if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4047     {
4048       (void) issue_peer_online_check (sub, &attacked_peer);
4049       if (GNUNET_YES == check_peer_flag (sub->peer_map,
4050                                          &attacked_peer,
4051                                          Peers_ONLINE))
4052       {
4053         LOG (GNUNET_ERROR_TYPE_DEBUG,
4054             "Goding to send push to attacked peer (%s)\n",
4055             GNUNET_i2s (&attacked_peer));
4056         send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4057       }
4058     }
4059     (void) issue_peer_online_check (sub, &attacked_peer);
4060
4061     /* The maximum of pushes we're going to send this round */
4062     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4063                                          num_attacked_peers),
4064                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4065
4066     LOG (GNUNET_ERROR_TYPE_DEBUG,
4067          "Going to send %" PRIu32 " pushes\n",
4068          num_pushes);
4069
4070     for (i = 0; i < num_pushes; i++)
4071     {
4072       if (att_peers_tail == att_peer_index)
4073         att_peer_index = att_peers_head;
4074       else
4075         att_peer_index = att_peer_index->next;
4076
4077       send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4078     }
4079
4080     /* Send PULLs to some peers to learn about additional peers to attack */
4081     tmp_att_peer = att_peer_index;
4082     for (i = 0; i < num_pushes * alpha; i++)
4083     {
4084       if (att_peers_tail == tmp_att_peer)
4085         tmp_att_peer = att_peers_head;
4086       else
4087         att_peer_index = tmp_att_peer->next;
4088
4089       send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4090     }
4091   }
4092
4093   /* Schedule next round */
4094   time_next_round = compute_rand_delay (sub->round_interval, 2);
4095
4096   GNUNET_assert (NULL == sub->do_round_task);
4097   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4098                                                     &do_mal_round, sub);
4099   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4100 }
4101 #endif /* ENABLE_MALICIOUS */
4102
4103
4104 /**
4105  * Send out PUSHes and PULLs, possibly update #view, samplers.
4106  *
4107  * This is executed regylary.
4108  *
4109  * @param cls Closure - Sub
4110  */
4111 static void
4112 do_round (void *cls)
4113 {
4114   unsigned int i;
4115   const struct GNUNET_PeerIdentity *view_array;
4116   unsigned int *permut;
4117   unsigned int a_peers; /* Number of peers we send pushes to */
4118   unsigned int b_peers; /* Number of peers we send pull requests to */
4119   uint32_t first_border;
4120   uint32_t second_border;
4121   struct GNUNET_PeerIdentity peer;
4122   struct GNUNET_PeerIdentity *update_peer;
4123   struct Sub *sub = cls;
4124
4125   sub->num_rounds++;
4126   LOG (GNUNET_ERROR_TYPE_DEBUG,
4127        "Going to execute next round.\n");
4128   if (sub == msub)
4129   {
4130     GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4131   }
4132   sub->do_round_task = NULL;
4133   LOG (GNUNET_ERROR_TYPE_DEBUG,
4134        "Printing view:\n");
4135   to_file (sub->file_name_view_log,
4136            "___ new round ___");
4137   view_array = View_get_as_array (sub->view);
4138   for (i = 0; i < View_size (sub->view); i++)
4139   {
4140     LOG (GNUNET_ERROR_TYPE_DEBUG,
4141          "\t%s\n", GNUNET_i2s (&view_array[i]));
4142     to_file (sub->file_name_view_log,
4143              "=%s\t(do round)",
4144              GNUNET_i2s_full (&view_array[i]));
4145   }
4146
4147
4148   /* Send pushes and pull requests */
4149   if (0 < View_size (sub->view))
4150   {
4151     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4152                                            View_size (sub->view));
4153
4154     /* Send PUSHes */
4155     a_peers = ceil (alpha * View_size (sub->view));
4156
4157     LOG (GNUNET_ERROR_TYPE_DEBUG,
4158          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4159          a_peers, alpha, View_size (sub->view));
4160     for (i = 0; i < a_peers; i++)
4161     {
4162       peer = view_array[permut[i]];
4163       // FIXME if this fails schedule/loop this for later
4164       send_push (get_peer_ctx (sub->peer_map, &peer));
4165     }
4166
4167     /* Send PULL requests */
4168     b_peers = ceil (beta * View_size (sub->view));
4169     first_border = a_peers;
4170     second_border = a_peers + b_peers;
4171     if (second_border > View_size (sub->view))
4172     {
4173       first_border = View_size (sub->view) - b_peers;
4174       second_border = View_size (sub->view);
4175     }
4176     LOG (GNUNET_ERROR_TYPE_DEBUG,
4177         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4178         b_peers, beta, View_size (sub->view));
4179     for (i = first_border; i < second_border; i++)
4180     {
4181       peer = view_array[permut[i]];
4182       if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4183                                          &peer,
4184                                          Peers_PULL_REPLY_PENDING))
4185       { // FIXME if this fails schedule/loop this for later
4186         send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4187       }
4188     }
4189
4190     GNUNET_free (permut);
4191     permut = NULL;
4192   }
4193
4194
4195   /* Update view */
4196   /* TODO see how many peers are in push-/pull- list! */
4197
4198   if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4199       (0 < CustomPeerMap_size (sub->push_map)) &&
4200       (0 < CustomPeerMap_size (sub->pull_map)))
4201   { /* If conditions for update are fulfilled, update */
4202     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4203
4204     uint32_t final_size;
4205     uint32_t peers_to_clean_size;
4206     struct GNUNET_PeerIdentity *peers_to_clean;
4207
4208     peers_to_clean = NULL;
4209     peers_to_clean_size = 0;
4210     GNUNET_array_grow (peers_to_clean,
4211                        peers_to_clean_size,
4212                        View_size (sub->view));
4213     GNUNET_memcpy (peers_to_clean,
4214             view_array,
4215             View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4216
4217     /* Seems like recreating is the easiest way of emptying the peermap */
4218     View_clear (sub->view);
4219     to_file (sub->file_name_view_log,
4220              "--- emptied ---");
4221
4222     first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4223                                 CustomPeerMap_size (sub->push_map));
4224     second_border = first_border +
4225                     GNUNET_MIN (floor (beta  * sub->view_size_est_need),
4226                                 CustomPeerMap_size (sub->pull_map));
4227     final_size    = second_border +
4228       ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4229     LOG (GNUNET_ERROR_TYPE_DEBUG,
4230         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4231         first_border,
4232         second_border,
4233         final_size);
4234
4235     /* Update view with peers received through PUSHes */
4236     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4237                                            CustomPeerMap_size (sub->push_map));
4238     for (i = 0; i < first_border; i++)
4239     {
4240       int inserted;
4241       inserted = insert_in_view (sub,
4242                                  CustomPeerMap_get_peer_by_index (sub->push_map,
4243                                                                   permut[i]));
4244       if (GNUNET_OK == inserted)
4245       {
4246         clients_notify_stream_peer (sub,
4247             1,
4248             CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4249       }
4250       to_file (sub->file_name_view_log,
4251                "+%s\t(push list)",
4252                GNUNET_i2s_full (&view_array[i]));
4253       // TODO change the peer_flags accordingly
4254     }
4255     GNUNET_free (permut);
4256     permut = NULL;
4257
4258     /* Update view with peers received through PULLs */
4259     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4260                                            CustomPeerMap_size (sub->pull_map));
4261     for (i = first_border; i < second_border; i++)
4262     {
4263       int inserted;
4264       inserted = insert_in_view (sub,
4265           CustomPeerMap_get_peer_by_index (sub->pull_map,
4266                                            permut[i - first_border]));
4267       if (GNUNET_OK == inserted)
4268       {
4269         clients_notify_stream_peer (sub,
4270             1,
4271             CustomPeerMap_get_peer_by_index (sub->pull_map,
4272                                              permut[i - first_border]));
4273       }
4274       to_file (sub->file_name_view_log,
4275                "+%s\t(pull list)",
4276                GNUNET_i2s_full (&view_array[i]));
4277       // TODO change the peer_flags accordingly
4278     }
4279     GNUNET_free (permut);
4280     permut = NULL;
4281
4282     /* Update view with peers from history */
4283     RPS_sampler_get_n_rand_peers (sub->sampler,
4284                                   final_size - second_border,
4285                                   hist_update,
4286                                   sub);
4287     // TODO change the peer_flags accordingly
4288
4289     for (i = 0; i < View_size (sub->view); i++)
4290       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4291
4292     /* Clean peers that were removed from the view */
4293     for (i = 0; i < peers_to_clean_size; i++)
4294     {
4295       to_file (sub->file_name_view_log,
4296                "-%s",
4297                GNUNET_i2s_full (&peers_to_clean[i]));
4298       clean_peer (sub, &peers_to_clean[i]);
4299     }
4300
4301     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4302     clients_notify_view_update (sub);
4303   } else {
4304     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4305     if (sub == msub)
4306     {
4307       GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4308       if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
4309           !(0 >= CustomPeerMap_size (sub->pull_map)))
4310         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4311       if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
4312           (0 >= CustomPeerMap_size (sub->pull_map)))
4313         GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4314       if (0 >= CustomPeerMap_size (sub->push_map) &&
4315           !(0 >= CustomPeerMap_size (sub->pull_map)))
4316         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4317       if (0 >= CustomPeerMap_size (sub->push_map) &&
4318           (0 >= CustomPeerMap_size (sub->pull_map)))
4319         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4320       if (0 >= CustomPeerMap_size (sub->pull_map) &&
4321           CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
4322           0 >= CustomPeerMap_size (sub->push_map))
4323         GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4324     }
4325   }
4326   // TODO independent of that also get some peers from CADET_get_peers()?
4327   sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4328   if (sub == msub)
4329   {
4330     GNUNET_STATISTICS_set (stats,
4331         "# peers in push map at end of round",
4332         CustomPeerMap_size (sub->push_map),
4333         GNUNET_NO);
4334     GNUNET_STATISTICS_set (stats,
4335         "# peers in pull map at end of round",
4336         CustomPeerMap_size (sub->pull_map),
4337         GNUNET_NO);
4338     GNUNET_STATISTICS_set (stats,
4339         "# peers in view at end of round",
4340         View_size (sub->view),
4341         GNUNET_NO);
4342   }
4343
4344   LOG (GNUNET_ERROR_TYPE_DEBUG,
4345        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4346        CustomPeerMap_size (sub->push_map),
4347        CustomPeerMap_size (sub->pull_map),
4348        alpha,
4349        View_size (sub->view),
4350        alpha * View_size (sub->view));
4351
4352   /* Update samplers */
4353   for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4354   {
4355     update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4356     LOG (GNUNET_ERROR_TYPE_DEBUG,
4357          "Updating with peer %s from push list\n",
4358          GNUNET_i2s (update_peer));
4359     insert_in_sampler (sub, update_peer);
4360     clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4361   }
4362
4363   for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4364   {
4365     LOG (GNUNET_ERROR_TYPE_DEBUG,
4366          "Updating with peer %s from pull list\n",
4367          GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4368     insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4369     /* This cleans only if it is not in the view */
4370     clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4371   }
4372
4373
4374   /* Empty push/pull lists */
4375   CustomPeerMap_clear (sub->push_map);
4376   CustomPeerMap_clear (sub->pull_map);
4377
4378   if (sub == msub)
4379   {
4380     GNUNET_STATISTICS_set (stats,
4381                            "view size",
4382                            View_size(sub->view),
4383                            GNUNET_NO);
4384   }
4385
4386   struct GNUNET_TIME_Relative time_next_round;
4387
4388   time_next_round = compute_rand_delay (sub->round_interval, 2);
4389
4390   /* Schedule next round */
4391   sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4392                                                      &do_round, sub);
4393   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4394 }
4395
4396
4397 /**
4398  * This is called from GNUNET_CADET_get_peers().
4399  *
4400  * It is called on every peer(ID) that cadet somehow has contact with.
4401  * We use those to initialise the sampler.
4402  *
4403  * implements #GNUNET_CADET_PeersCB
4404  *
4405  * @param cls Closure - Sub
4406  * @param peer Peer, or NULL on "EOF".
4407  * @param tunnel Do we have a tunnel towards this peer?
4408  * @param n_paths Number of known paths towards this peer.
4409  * @param best_path How long is the best path?
4410  *                  (0 = unknown, 1 = ourselves, 2 = neighbor)
4411  */
4412 void
4413 init_peer_cb (void *cls,
4414               const struct GNUNET_PeerIdentity *peer,
4415               int tunnel, /* "Do we have a tunnel towards this peer?" */
4416               unsigned int n_paths, /* "Number of known paths towards this peer" */
4417               unsigned int best_path) /* "How long is the best path?
4418                                        * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4419 {
4420   struct Sub *sub = cls;
4421   (void) tunnel;
4422   (void) n_paths;
4423   (void) best_path;
4424
4425   if (NULL != peer)
4426   {
4427     LOG (GNUNET_ERROR_TYPE_DEBUG,
4428          "Got peer_id %s from cadet\n",
4429          GNUNET_i2s (peer));
4430     got_peer (sub, peer);
4431   }
4432 }
4433
4434
4435 /**
4436  * @brief Iterator function over stored, valid peers.
4437  *
4438  * We initialise the sampler with those.
4439  *
4440  * @param cls Closure - Sub
4441  * @param peer the peer id
4442  * @return #GNUNET_YES if we should continue to
4443  *         iterate,
4444  *         #GNUNET_NO if not.
4445  */
4446 static int
4447 valid_peers_iterator (void *cls,
4448                       const struct GNUNET_PeerIdentity *peer)
4449 {
4450   struct Sub *sub = cls;
4451
4452   if (NULL != peer)
4453   {
4454     LOG (GNUNET_ERROR_TYPE_DEBUG,
4455          "Got stored, valid peer %s\n",
4456          GNUNET_i2s (peer));
4457     got_peer (sub, peer);
4458   }
4459   return GNUNET_YES;
4460 }
4461
4462
4463 /**
4464  * Iterator over peers from peerinfo.
4465  *
4466  * @param cls Closure - Sub
4467  * @param peer id of the peer, NULL for last call
4468  * @param hello hello message for the peer (can be NULL)
4469  * @param error message
4470  */
4471 void
4472 process_peerinfo_peers (void *cls,
4473                         const struct GNUNET_PeerIdentity *peer,
4474                         const struct GNUNET_HELLO_Message *hello,
4475                         const char *err_msg)
4476 {
4477   struct Sub *sub = cls;
4478   (void) hello;
4479   (void) err_msg;
4480
4481   if (NULL != peer)
4482   {
4483     LOG (GNUNET_ERROR_TYPE_DEBUG,
4484          "Got peer_id %s from peerinfo\n",
4485          GNUNET_i2s (peer));
4486     got_peer (sub, peer);
4487   }
4488 }
4489
4490
4491 /**
4492  * Task run during shutdown.
4493  *
4494  * @param cls Closure - unused
4495  */
4496 static void
4497 shutdown_task (void *cls)
4498 {
4499   (void) cls;
4500   struct ClientContext *client_ctx;
4501
4502   LOG (GNUNET_ERROR_TYPE_DEBUG,
4503        "RPS service is going down\n");
4504
4505   /* Clean all clients */
4506   for (client_ctx = cli_ctx_head;
4507        NULL != cli_ctx_head;
4508        client_ctx = cli_ctx_head)
4509   {
4510     destroy_cli_ctx (client_ctx);
4511   }
4512   if (NULL != msub)
4513   {
4514     destroy_sub (msub);
4515     msub = NULL;
4516   }
4517
4518   /* Disconnect from other services */
4519   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4520   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4521   peerinfo_handle = NULL;
4522   GNUNET_NSE_disconnect (nse);
4523   if (NULL != map_single_hop)
4524   {
4525     /* core_init was called - core was initialised */
4526     /* disconnect first, so no callback tries to access missing peermap */
4527     GNUNET_CORE_disconnect (core_handle);
4528     core_handle = NULL;
4529     GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4530     map_single_hop = NULL;
4531   }
4532
4533   if (NULL != stats)
4534   {
4535     GNUNET_STATISTICS_destroy (stats,
4536                                GNUNET_NO);
4537     stats = NULL;
4538   }
4539   GNUNET_CADET_disconnect (cadet_handle);
4540   cadet_handle = NULL;
4541 #ifdef ENABLE_MALICIOUS
4542   struct AttackedPeer *tmp_att_peer;
4543   GNUNET_array_grow (mal_peers,
4544                      num_mal_peers,
4545                      0);
4546   if (NULL != mal_peer_set)
4547     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4548   if (NULL != att_peer_set)
4549     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4550   while (NULL != att_peers_head)
4551   {
4552     tmp_att_peer = att_peers_head;
4553     GNUNET_CONTAINER_DLL_remove (att_peers_head,
4554                                  att_peers_tail,
4555                                  tmp_att_peer);
4556     GNUNET_free (tmp_att_peer);
4557   }
4558 #endif /* ENABLE_MALICIOUS */
4559 }
4560
4561
4562 /**
4563  * Handle client connecting to the service.
4564  *
4565  * @param cls unused
4566  * @param client the new client
4567  * @param mq the message queue of @a client
4568  * @return @a client
4569  */
4570 static void *
4571 client_connect_cb (void *cls,
4572                    struct GNUNET_SERVICE_Client *client,
4573                    struct GNUNET_MQ_Handle *mq)
4574 {
4575   struct ClientContext *cli_ctx;
4576   (void) cls;
4577
4578   LOG (GNUNET_ERROR_TYPE_DEBUG,
4579        "Client connected\n");
4580   if (NULL == client)
4581     return client; /* Server was destroyed before a client connected. Shutting down */
4582   cli_ctx = GNUNET_new (struct ClientContext);
4583   cli_ctx->mq = mq;
4584   cli_ctx->view_updates_left = -1;
4585   cli_ctx->stream_update = GNUNET_NO;
4586   cli_ctx->client = client;
4587   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4588                                cli_ctx_tail,
4589                                cli_ctx);
4590   return cli_ctx;
4591 }
4592
4593 /**
4594  * Callback called when a client disconnected from the service
4595  *
4596  * @param cls closure for the service
4597  * @param c the client that disconnected
4598  * @param internal_cls should be equal to @a c
4599  */
4600 static void
4601 client_disconnect_cb (void *cls,
4602                       struct GNUNET_SERVICE_Client *client,
4603                       void *internal_cls)
4604 {
4605   struct ClientContext *cli_ctx = internal_cls;
4606
4607   (void) cls;
4608   GNUNET_assert (client == cli_ctx->client);
4609   if (NULL == client)
4610   {/* shutdown task - destroy all clients */
4611     while (NULL != cli_ctx_head)
4612       destroy_cli_ctx (cli_ctx_head);
4613   }
4614   else
4615   { /* destroy this client */
4616     LOG (GNUNET_ERROR_TYPE_DEBUG,
4617         "Client disconnected. Destroy its context.\n");
4618     destroy_cli_ctx (cli_ctx);
4619   }
4620 }
4621
4622
4623 /**
4624  * Handle random peer sampling clients.
4625  *
4626  * @param cls closure
4627  * @param c configuration to use
4628  * @param service the initialized service
4629  */
4630 static void
4631 run (void *cls,
4632      const struct GNUNET_CONFIGURATION_Handle *c,
4633      struct GNUNET_SERVICE_Handle *service)
4634 {
4635   struct GNUNET_TIME_Relative round_interval;
4636   long long unsigned int sampler_size;
4637   char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4638   struct GNUNET_HashCode hash;
4639
4640   (void) cls;
4641   (void) service;
4642
4643   GNUNET_log_setup ("rps",
4644                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4645                     NULL);
4646   cfg = c;
4647   /* Get own ID */
4648   GNUNET_CRYPTO_get_peer_identity (cfg,
4649                                    &own_identity); // TODO check return value
4650   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4651               "STARTING SERVICE (rps) for peer [%s]\n",
4652               GNUNET_i2s (&own_identity));
4653 #ifdef ENABLE_MALICIOUS
4654   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4655               "Malicious execution compiled in.\n");
4656 #endif /* ENABLE_MALICIOUS */
4657
4658   /* Get time interval from the configuration */
4659   if (GNUNET_OK !=
4660       GNUNET_CONFIGURATION_get_value_time (cfg,
4661                                            "RPS",
4662                                            "ROUNDINTERVAL",
4663                                            &round_interval))
4664   {
4665     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4666                                "RPS", "ROUNDINTERVAL");
4667     GNUNET_SCHEDULER_shutdown ();
4668     return;
4669   }
4670
4671   /* Get initial size of sampler/view from the configuration */
4672   if (GNUNET_OK !=
4673       GNUNET_CONFIGURATION_get_value_number (cfg,
4674                                              "RPS",
4675                                              "MINSIZE",
4676                                              &sampler_size))
4677   {
4678     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4679                                "RPS", "MINSIZE");
4680     GNUNET_SCHEDULER_shutdown ();
4681     return;
4682   }
4683
4684   cadet_handle = GNUNET_CADET_connect (cfg);
4685   GNUNET_assert (NULL != cadet_handle);
4686   core_handle = GNUNET_CORE_connect (cfg,
4687                                      NULL, /* cls */
4688                                      core_init, /* init */
4689                                      core_connects, /* connects */
4690                                      core_disconnects, /* disconnects */
4691                                      NULL); /* handlers */
4692   GNUNET_assert (NULL != core_handle);
4693
4694
4695   alpha = 0.45;
4696   beta  = 0.45;
4697
4698
4699   /* Set up main Sub */
4700   GNUNET_CRYPTO_hash (hash_port_string,
4701                       strlen (hash_port_string),
4702                       &hash);
4703   msub = new_sub (&hash,
4704                  sampler_size, /* Will be overwritten by config */
4705                  round_interval);
4706
4707
4708   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4709
4710   /* connect to NSE */
4711   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4712
4713   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4714   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4715   // TODO send push/pull to each of those peers?
4716   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4717   restore_valid_peers (msub);
4718   get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4719
4720   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4721                                                    GNUNET_NO,
4722                                                    process_peerinfo_peers,
4723                                                    msub);
4724
4725   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4726
4727   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4728   stats = GNUNET_STATISTICS_create ("rps", cfg);
4729 }
4730
4731
4732 /**
4733  * Define "main" method using service macro.
4734  */
4735 GNUNET_SERVICE_MAIN
4736 ("rps",
4737  GNUNET_SERVICE_OPTION_NONE,
4738  &run,
4739  &client_connect_cb,
4740  &client_disconnect_cb,
4741  NULL,
4742  GNUNET_MQ_hd_var_size (client_seed,
4743    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4744    struct GNUNET_RPS_CS_SeedMessage,
4745    NULL),
4746 #ifdef ENABLE_MALICIOUS
4747  GNUNET_MQ_hd_var_size (client_act_malicious,
4748    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4749    struct GNUNET_RPS_CS_ActMaliciousMessage,
4750    NULL),
4751 #endif /* ENABLE_MALICIOUS */
4752  GNUNET_MQ_hd_fixed_size (client_view_request,
4753    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4754    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4755    NULL),
4756  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4757    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4758    struct GNUNET_MessageHeader,
4759    NULL),
4760  GNUNET_MQ_hd_fixed_size (client_stream_request,
4761    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4762    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4763    NULL),
4764  GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4765    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4766    struct GNUNET_MessageHeader,
4767    NULL),
4768  GNUNET_MQ_hd_fixed_size (client_start_sub,
4769    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4770    struct GNUNET_RPS_CS_SubStartMessage,
4771    NULL),
4772  GNUNET_MQ_hd_fixed_size (client_stop_sub,
4773    GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4774    struct GNUNET_RPS_CS_SubStopMessage,
4775    NULL),
4776  GNUNET_MQ_handler_end());
4777
4778 /* end of gnunet-service-rps.c */