Destroy channel and their datastructures properly
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO store peers somewhere persistent
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /**
57  * Our configuration.
58  */
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62  * Handle to the statistics service.
63  */
64 static struct GNUNET_STATISTICS_Handle *stats;
65
66 /**
67  * Our own identity.
68  */
69 static struct GNUNET_PeerIdentity own_identity;
70
71 static int in_shutdown = GNUNET_NO;
72
73 /**
74  * @brief Port used for cadet.
75  *
76  * Don't compute multiple times through making it global
77  */
78 static struct GNUNET_HashCode port;
79
80 /***********************************************************************
81  * Old gnunet-service-rps_peers.c
82 ***********************************************************************/
83
84 /**
85  * Set a peer flag of given peer context.
86  */
87 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
88
89 /**
90  * Get peer flag of given peer context.
91  */
92 #define check_peer_flag_set(peer_ctx, mask)\
93   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
94
95 /**
96  * Unset flag of given peer context.
97  */
98 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * @brief Context for a channel
164  */
165 struct ChannelCtx;
166
167 /**
168  * Struct used to keep track of other peer's status
169  *
170  * This is stored in a multipeermap.
171  * It contains information such as cadet channels, a message queue for sending,
172  * status about the channels, the pending operations on this peer and some flags
173  * about the status of the peer itself. (live, valid, ...)
174  */
175 struct PeerContext
176 {
177   /**
178    * Message queue open to client
179    */
180   struct GNUNET_MQ_Handle *mq;
181
182   /**
183    * Channel open to client.
184    */
185   struct ChannelCtx *send_channel_ctx;
186
187   /**
188    * Channel open from client.
189    */
190   struct ChannelCtx *recv_channel_ctx;
191
192   /**
193    * Array of pending operations on this peer.
194    */
195   struct PeerPendingOp *pending_ops;
196
197   /**
198    * Handle to the callback given to cadet_ntfy_tmt_rdy()
199    *
200    * To be canceled on shutdown.
201    */
202   struct PendingMessage *liveliness_check_pending;
203
204   /**
205    * Number of pending operations.
206    */
207   unsigned int num_pending_ops;
208
209   /**
210    * Identity of the peer
211    */
212   struct GNUNET_PeerIdentity peer_id;
213
214   /**
215    * Flags indicating status of peer
216    */
217   uint32_t peer_flags;
218
219   /**
220    * Last time we received something from that peer.
221    */
222   struct GNUNET_TIME_Absolute last_message_recv;
223
224   /**
225    * Last time we received a keepalive message.
226    */
227   struct GNUNET_TIME_Absolute last_keepalive;
228
229   /**
230    * DLL with all messages that are yet to be sent
231    */
232   struct PendingMessage *pending_messages_head;
233   struct PendingMessage *pending_messages_tail;
234
235   /**
236    * This is pobably followed by 'statistical' data (when we first saw
237    * it, how did we get its ID, how many pushes (in a timeinterval),
238    * ...)
239    */
240 };
241
242 /**
243  * @brief Closure to #valid_peer_iterator
244  */
245 struct PeersIteratorCls
246 {
247   /**
248    * Iterator function
249    */
250   PeersIterator iterator;
251
252   /**
253    * Closure to iterator
254    */
255   void *cls;
256 };
257
258 /**
259  * @brief Context for a channel
260  */
261 struct ChannelCtx
262 {
263   /**
264    * @brief The channel itself
265    */
266   struct GNUNET_CADET_Channel *channel;
267
268   /**
269    * @brief The peer context associated with the channel
270    */
271   struct PeerContext *peer_ctx;
272
273   /**
274    * @brief When channel destruction needs to be delayed (because it is called
275    * from within the cadet routine of another channel destruction) this task
276    * refers to the respective _SCHEDULER_Task.
277    */
278   struct GNUNET_SCHEDULER_Task *destruction_task;
279 };
280
281 /**
282  * @brief Hashmap of valid peers.
283  */
284 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
285
286 /**
287  * @brief Maximum number of valid peers to keep.
288  * TODO read from config
289  */
290 static uint32_t num_valid_peers_max = UINT32_MAX;
291
292 /**
293  * @brief Filename of the file that stores the valid peers persistently.
294  */
295 static char *filename_valid_peers;
296
297 /**
298  * Set of all peers to keep track of them.
299  */
300 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
301
302 /**
303  * Cadet handle.
304  */
305 static struct GNUNET_CADET_Handle *cadet_handle;
306
307
308 /**
309  * @brief Get the #PeerContext associated with a peer
310  *
311  * @param peer the peer id
312  *
313  * @return the #PeerContext
314  */
315 static struct PeerContext *
316 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
317 {
318   struct PeerContext *ctx;
319   int ret;
320
321   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
322   GNUNET_assert (GNUNET_YES == ret);
323   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
324   GNUNET_assert (NULL != ctx);
325   return ctx;
326 }
327
328 /**
329  * @brief Check whether we have information about the given peer.
330  *
331  * FIXME probably deprecated. Make this the new _online.
332  *
333  * @param peer peer in question
334  *
335  * @return #GNUNET_YES if peer is known
336  *         #GNUNET_NO  if peer is not knwon
337  */
338 static int
339 check_peer_known (const struct GNUNET_PeerIdentity *peer)
340 {
341   if (NULL != peer_map)
342   {
343     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
344   } else
345   {
346     return GNUNET_NO;
347   }
348 }
349
350
351 /**
352  * @brief Create a new #PeerContext and insert it into the peer map
353  *
354  * @param peer the peer to create the #PeerContext for
355  *
356  * @return the #PeerContext
357  */
358 static struct PeerContext *
359 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
360 {
361   struct PeerContext *ctx;
362   int ret;
363
364   GNUNET_assert (GNUNET_NO == check_peer_known (peer));
365
366   ctx = GNUNET_new (struct PeerContext);
367   ctx->peer_id = *peer;
368   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
369       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
370   GNUNET_assert (GNUNET_OK == ret);
371   return ctx;
372 }
373
374
375 /**
376  * @brief Create or get a #PeerContext
377  *
378  * @param peer the peer to get the associated context to
379  *
380  * @return the context
381  */
382 static struct PeerContext *
383 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
384 {
385   if (GNUNET_NO == check_peer_known (peer))
386   {
387     return create_peer_ctx (peer);
388   }
389   return get_peer_ctx (peer);
390 }
391
392
393 /**
394  * @brief Check whether we have a connection to this @a peer
395  *
396  * Also sets the #Peers_ONLINE flag accordingly
397  *
398  * @param peer the peer in question
399  *
400  * @return #GNUNET_YES if we are connected
401  *         #GNUNET_NO  otherwise
402  */
403 static int
404 check_connected (const struct GNUNET_PeerIdentity *peer)
405 {
406   struct PeerContext *peer_ctx;
407
408   /* If we don't know about this peer we don't know whether it's online */
409   if (GNUNET_NO == check_peer_known (peer))
410   {
411     return GNUNET_NO;
412   }
413   /* Get the context */
414   peer_ctx = get_peer_ctx (peer);
415   /* If we have no channel to this peer we don't know whether it's online */
416   if ( (NULL == peer_ctx->send_channel_ctx) &&
417        (NULL == peer_ctx->recv_channel_ctx) )
418   {
419     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
420     return GNUNET_NO;
421   }
422   /* Otherwise (if we have a channel, we know that it's online */
423   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
424   return GNUNET_YES;
425 }
426
427
428 /**
429  * @brief The closure to #get_rand_peer_iterator.
430  */
431 struct GetRandPeerIteratorCls
432 {
433   /**
434    * @brief The index of the peer to return.
435    * Will be decreased until 0.
436    * Then current peer is returned.
437    */
438   uint32_t index;
439
440   /**
441    * @brief Pointer to peer to return.
442    */
443   const struct GNUNET_PeerIdentity *peer;
444 };
445
446
447 /**
448  * @brief Iterator function for #get_random_peer_from_peermap.
449  *
450  * Implements #GNUNET_CONTAINER_PeerMapIterator.
451  * Decreases the index until the index is null.
452  * Then returns the current peer.
453  *
454  * @param cls the #GetRandPeerIteratorCls containing index and peer
455  * @param peer current peer
456  * @param value unused
457  *
458  * @return  #GNUNET_YES if we should continue to
459  *          iterate,
460  *          #GNUNET_NO if not.
461  */
462 static int
463 get_rand_peer_iterator (void *cls,
464                         const struct GNUNET_PeerIdentity *peer,
465                         void *value)
466 {
467   struct GetRandPeerIteratorCls *iterator_cls = cls;
468   if (0 >= iterator_cls->index)
469   {
470     iterator_cls->peer = peer;
471     return GNUNET_NO;
472   }
473   iterator_cls->index--;
474   return GNUNET_YES;
475 }
476
477
478 /**
479  * @brief Get a random peer from @a peer_map
480  *
481  * @param peer_map the peer_map to get the peer from
482  *
483  * @return a random peer
484  */
485 static const struct GNUNET_PeerIdentity *
486 get_random_peer_from_peermap (const struct
487                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
488 {
489   struct GetRandPeerIteratorCls *iterator_cls;
490   const struct GNUNET_PeerIdentity *ret;
491
492   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
493   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
494       GNUNET_CONTAINER_multipeermap_size (peer_map));
495   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
496                                                 get_rand_peer_iterator,
497                                                 iterator_cls);
498   ret = iterator_cls->peer;
499   GNUNET_free (iterator_cls);
500   return ret;
501 }
502
503
504 /**
505  * @brief Add a given @a peer to valid peers.
506  *
507  * If valid peers are already #num_valid_peers_max, delete a peer previously.
508  *
509  * @param peer the peer that is added to the valid peers.
510  *
511  * @return #GNUNET_YES if no other peer had to be removed
512  *         #GNUNET_NO  otherwise
513  */
514 static int
515 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
516 {
517   const struct GNUNET_PeerIdentity *rand_peer;
518   int ret;
519
520   ret = GNUNET_YES;
521   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
522   {
523     rand_peer = get_random_peer_from_peermap (valid_peers);
524     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
525     ret = GNUNET_NO;
526   }
527   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
528       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
529   return ret;
530 }
531
532 static void
533 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
534
535 /**
536  * @brief Set the peer flag to living and
537  *        call the pending operations on this peer.
538  *
539  * Also adds peer to #valid_peers.
540  *
541  * @param peer_ctx the #PeerContext of the peer to set live
542  */
543 static void
544 set_peer_live (struct PeerContext *peer_ctx)
545 {
546   struct GNUNET_PeerIdentity *peer;
547   unsigned int i;
548
549   peer = &peer_ctx->peer_id;
550   LOG (GNUNET_ERROR_TYPE_DEBUG,
551       "Peer %s is live and valid, calling %i pending operations on it\n",
552       GNUNET_i2s (peer),
553       peer_ctx->num_pending_ops);
554
555   if (NULL != peer_ctx->liveliness_check_pending)
556   {
557     LOG (GNUNET_ERROR_TYPE_DEBUG,
558          "Removing pending liveliness check for peer %s\n",
559          GNUNET_i2s (&peer_ctx->peer_id));
560     // TODO wait until cadet sets mq->cancel_impl
561     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
562     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
563     peer_ctx->liveliness_check_pending = NULL;
564   }
565
566   (void) add_valid_peer (peer);
567   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
568
569   /* Call pending operations */
570   for (i = 0; i < peer_ctx->num_pending_ops; i++)
571   {
572     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
573   }
574   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
575 }
576
577 static void
578 cleanup_destroyed_channel (void *cls,
579                            const struct GNUNET_CADET_Channel *channel);
580
581 /* Declaration of handlers */
582 static void
583 handle_peer_check (void *cls,
584                    const struct GNUNET_MessageHeader *msg);
585
586 static void
587 handle_peer_push (void *cls,
588                   const struct GNUNET_MessageHeader *msg);
589
590 static void
591 handle_peer_pull_request (void *cls,
592                           const struct GNUNET_MessageHeader *msg);
593
594 static int
595 check_peer_pull_reply (void *cls,
596                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
597
598 static void
599 handle_peer_pull_reply (void *cls,
600                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
601
602 /* End declaration of handlers */
603
604 /**
605  * @brief Allocate memory for a new channel context and insert it into DLL
606  *
607  * @param peer_ctx context of the according peer
608  *
609  * @return The channel context
610  */
611 static struct ChannelCtx *
612 add_channel_ctx (struct PeerContext *peer_ctx)
613 {
614   struct ChannelCtx *channel_ctx;
615   channel_ctx = GNUNET_new (struct ChannelCtx);
616   channel_ctx->peer_ctx = peer_ctx;
617   return channel_ctx;
618 }
619
620
621 /**
622  * @brief Free memory and NULL pointers.
623  *
624  * @param channel_ctx The channel context.
625  */
626 static void
627 remove_channel_ctx (struct ChannelCtx *channel_ctx)
628 {
629   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
630
631   if (NULL != channel_ctx->destruction_task)
632   {
633     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
634     channel_ctx->destruction_task = NULL;
635   }
636
637   GNUNET_free (channel_ctx);
638
639   if (NULL == peer_ctx) return;
640   if (channel_ctx == peer_ctx->send_channel_ctx)
641   {
642     peer_ctx->send_channel_ctx = NULL;
643     peer_ctx->mq = NULL;
644   }
645   else if (channel_ctx == peer_ctx->recv_channel_ctx)
646   {
647     peer_ctx->recv_channel_ctx = NULL;
648   }
649 }
650
651
652 /**
653  * @brief Get the channel of a peer. If not existing, create.
654  *
655  * @param peer the peer id
656  * @return the #GNUNET_CADET_Channel used to send data to @a peer
657  */
658 struct GNUNET_CADET_Channel *
659 get_channel (const struct GNUNET_PeerIdentity *peer)
660 {
661   struct PeerContext *peer_ctx;
662   struct GNUNET_PeerIdentity *ctx_peer;
663   /* There exists a copy-paste-clone in run() */
664   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
665     GNUNET_MQ_hd_fixed_size (peer_check,
666                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
667                              struct GNUNET_MessageHeader,
668                              NULL),
669     GNUNET_MQ_hd_fixed_size (peer_push,
670                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
671                              struct GNUNET_MessageHeader,
672                              NULL),
673     GNUNET_MQ_hd_fixed_size (peer_pull_request,
674                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
675                              struct GNUNET_MessageHeader,
676                              NULL),
677     GNUNET_MQ_hd_var_size (peer_pull_reply,
678                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
679                            struct GNUNET_RPS_P2P_PullReplyMessage,
680                            NULL),
681     GNUNET_MQ_handler_end ()
682   };
683
684
685   peer_ctx = get_peer_ctx (peer);
686   if (NULL == peer_ctx->send_channel_ctx)
687   {
688     LOG (GNUNET_ERROR_TYPE_DEBUG,
689          "Trying to establish channel to peer %s\n",
690          GNUNET_i2s (peer));
691     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
692     *ctx_peer = *peer;
693     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
694     peer_ctx->send_channel_ctx->channel =
695       GNUNET_CADET_channel_create (cadet_handle,
696                                    peer_ctx->send_channel_ctx, /* context */
697                                    peer,
698                                    &port,
699                                    GNUNET_CADET_OPTION_RELIABLE,
700                                    NULL, /* WindowSize handler */
701                                    &cleanup_destroyed_channel, /* Disconnect handler */
702                                    cadet_handlers);
703   }
704   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
705   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
706   return peer_ctx->send_channel_ctx->channel;
707 }
708
709
710 /**
711  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
712  *
713  * If we already have a message queue open to this client,
714  * simply return it, otherways create one.
715  *
716  * @param peer the peer to get the mq to
717  * @return the #GNUNET_MQ_Handle
718  */
719 static struct GNUNET_MQ_Handle *
720 get_mq (const struct GNUNET_PeerIdentity *peer)
721 {
722   struct PeerContext *peer_ctx;
723
724   peer_ctx = get_peer_ctx (peer);
725
726   if (NULL == peer_ctx->mq)
727   {
728     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
729   }
730   return peer_ctx->mq;
731 }
732
733 /**
734  * @brief Add an envelope to a message passed to mq to list of pending messages
735  *
736  * @param peer peer the message was sent to
737  * @param ev envelope to the message
738  * @param type type of the message to be sent
739  * @return pointer to pending message
740  */
741 static struct PendingMessage *
742 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
743                         struct GNUNET_MQ_Envelope *ev,
744                         const char *type)
745 {
746   struct PendingMessage *pending_msg;
747   struct PeerContext *peer_ctx;
748
749   peer_ctx = get_peer_ctx (peer);
750   pending_msg = GNUNET_new (struct PendingMessage);
751   pending_msg->ev = ev;
752   pending_msg->peer_ctx = peer_ctx;
753   pending_msg->type = type;
754   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
755                                peer_ctx->pending_messages_tail,
756                                pending_msg);
757   return pending_msg;
758 }
759
760
761 /**
762  * @brief Remove a pending message from the respective DLL
763  *
764  * @param pending_msg the pending message to remove
765  * @param cancel cancel the pending message, too
766  */
767 static void
768 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
769 {
770   struct PeerContext *peer_ctx;
771
772   peer_ctx = pending_msg->peer_ctx;
773   GNUNET_assert (NULL != peer_ctx);
774   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
775                                peer_ctx->pending_messages_tail,
776                                pending_msg);
777   // TODO wait for the cadet implementation of message cancellation
778   //if (GNUNET_YES == cancel)
779   //{
780   //  GNUNET_MQ_send_cancel (pending_msg->ev);
781   //}
782   GNUNET_free (pending_msg);
783 }
784
785
786 /**
787  * @brief This is called in response to the first message we sent as a
788  * liveliness check.
789  *
790  * @param cls #PeerContext of peer with pending liveliness check
791  */
792 static void
793 mq_liveliness_check_successful (void *cls)
794 {
795   struct PeerContext *peer_ctx = cls;
796
797   if (NULL != peer_ctx->liveliness_check_pending)
798   {
799     LOG (GNUNET_ERROR_TYPE_DEBUG,
800         "Liveliness check for peer %s was successfull\n",
801         GNUNET_i2s (&peer_ctx->peer_id));
802     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
803     peer_ctx->liveliness_check_pending = NULL;
804     set_peer_live (peer_ctx);
805   }
806 }
807
808 /**
809  * Issue a check whether peer is live
810  *
811  * @param peer_ctx the context of the peer
812  */
813 static void
814 check_peer_live (struct PeerContext *peer_ctx)
815 {
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Get informed about peer %s getting live\n",
818        GNUNET_i2s (&peer_ctx->peer_id));
819
820   struct GNUNET_MQ_Handle *mq;
821   struct GNUNET_MQ_Envelope *ev;
822
823   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
824   peer_ctx->liveliness_check_pending =
825     insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
826   mq = get_mq (&peer_ctx->peer_id);
827   GNUNET_MQ_notify_sent (ev,
828                          mq_liveliness_check_successful,
829                          peer_ctx);
830   GNUNET_MQ_send (mq, ev);
831 }
832
833
834 /**
835  * @brief Check whether function of type #PeerOp was already scheduled
836  *
837  * The array with pending operations will probably never grow really big, so
838  * iterating over it should be ok.
839  *
840  * @param peer the peer to check
841  * @param peer_op the operation (#PeerOp) on the peer
842  *
843  * @return #GNUNET_YES if this operation is scheduled on that peer
844  *         #GNUNET_NO  otherwise
845  */
846 static int
847 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
848                            const PeerOp peer_op)
849 {
850   const struct PeerContext *peer_ctx;
851   unsigned int i;
852
853   peer_ctx = get_peer_ctx (peer);
854   for (i = 0; i < peer_ctx->num_pending_ops; i++)
855     if (peer_op == peer_ctx->pending_ops[i].op)
856       return GNUNET_YES;
857   return GNUNET_NO;
858 }
859
860
861 /**
862  * @brief Callback for scheduler to destroy a channel
863  *
864  * @param cls Context of the channel
865  */
866 static void
867 destroy_channel (struct ChannelCtx *channel_ctx)
868 {
869   struct GNUNET_CADET_Channel *channel;
870
871   if (NULL != channel_ctx->destruction_task)
872   {
873     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
874     channel_ctx->destruction_task = NULL;
875   }
876   GNUNET_assert (channel_ctx->channel != NULL);
877   channel = channel_ctx->channel;
878   channel_ctx->channel = NULL;
879   GNUNET_CADET_channel_destroy (channel);
880   remove_channel_ctx (channel_ctx);
881 }
882
883
884 /**
885  * @brief Destroy a cadet channel.
886  *
887  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
888  *
889  * @param cls
890  */
891 static void
892 destroy_channel_cb (void *cls)
893 {
894   struct ChannelCtx *channel_ctx = cls;
895
896   channel_ctx->destruction_task = NULL;
897   destroy_channel (channel_ctx);
898 }
899
900
901 /**
902  * @brief Schedule the destruction of a channel for immediately afterwards.
903  *
904  * In case a channel is to be destroyed from within the callback to the
905  * destruction of another channel (send channel), we cannot call
906  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
907  * construction.
908  *
909  * @param channel_ctx channel to be destroyed.
910  */
911 static void
912 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
913 {
914   GNUNET_assert (NULL ==
915                  channel_ctx->destruction_task);
916   GNUNET_assert (NULL !=
917                  channel_ctx->channel);
918   channel_ctx->destruction_task =
919     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
920                               channel_ctx);
921 }
922
923
924 /**
925  * @brief Remove peer
926  *
927  * @param peer the peer to clean
928  * @return #GNUNET_YES if peer was removed
929  *         #GNUNET_NO  otherwise
930  */
931 static int
932 destroy_peer (struct PeerContext *peer_ctx)
933 {
934   GNUNET_assert (NULL != peer_ctx);
935   GNUNET_assert (NULL != peer_map);
936   if (GNUNET_NO ==
937       GNUNET_CONTAINER_multipeermap_contains (peer_map,
938                                               &peer_ctx->peer_id))
939   {
940     return GNUNET_NO;
941   }
942   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
943   LOG (GNUNET_ERROR_TYPE_DEBUG,
944        "Going to remove peer %s\n",
945        GNUNET_i2s (&peer_ctx->peer_id));
946   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
947
948   /* Clear list of pending operations */
949   // TODO this probably leaks memory
950   //      ('only' the cls to the function. Not sure what to do with it)
951   GNUNET_array_grow (peer_ctx->pending_ops,
952                      peer_ctx->num_pending_ops,
953                      0);
954   /* Remove all pending messages */
955   while (NULL != peer_ctx->pending_messages_head)
956   {
957     LOG (GNUNET_ERROR_TYPE_DEBUG,
958          "Removing unsent %s\n",
959          peer_ctx->pending_messages_head->type);
960     /* Cancle pending message, too */
961     if ( (NULL != peer_ctx->liveliness_check_pending) &&
962          (0 == memcmp (peer_ctx->pending_messages_head,
963                      peer_ctx->liveliness_check_pending,
964                      sizeof (struct PendingMessage))) )
965       {
966         // TODO this may leak memory
967         peer_ctx->liveliness_check_pending = NULL;
968       }
969     remove_pending_message (peer_ctx->pending_messages_head,
970                             GNUNET_YES);
971   }
972
973   /* If we are still waiting for notification whether this peer is live
974    * cancel the according task */
975   if (NULL != peer_ctx->liveliness_check_pending)
976   {
977     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978                 "Removing pending liveliness check for peer %s\n",
979                 GNUNET_i2s (&peer_ctx->peer_id));
980     // TODO wait until cadet sets mq->cancel_impl
981     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
982     remove_pending_message (peer_ctx->liveliness_check_pending,
983                             GNUNET_YES);
984     peer_ctx->liveliness_check_pending = NULL;
985   }
986
987   if (NULL != peer_ctx->send_channel_ctx)
988   {
989     /* This is possibly called from within channel destruction */
990     peer_ctx->send_channel_ctx->peer_ctx = NULL;
991     schedule_channel_destruction (peer_ctx->send_channel_ctx);
992     peer_ctx->send_channel_ctx = NULL;
993     peer_ctx->mq = NULL;
994   }
995   if (NULL != peer_ctx->recv_channel_ctx)
996   {
997     /* This is possibly called from within channel destruction */
998     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
999     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1000     peer_ctx->recv_channel_ctx = NULL;
1001   }
1002
1003   if (GNUNET_YES !=
1004       GNUNET_CONTAINER_multipeermap_remove_all (peer_map,
1005                                                 &peer_ctx->peer_id))
1006   {
1007     LOG (GNUNET_ERROR_TYPE_WARNING,
1008          "removing peer from peer_map failed\n");
1009   }
1010   GNUNET_free (peer_ctx);
1011   return GNUNET_YES;
1012 }
1013
1014
1015 /**
1016  * Iterator over hash map entries. Deletes all contexts of peers.
1017  *
1018  * @param cls closure
1019  * @param key current public key
1020  * @param value value in the hash map
1021  * @return #GNUNET_YES if we should continue to iterate,
1022  *         #GNUNET_NO if not.
1023  */
1024 static int
1025 peermap_clear_iterator (void *cls,
1026                         const struct GNUNET_PeerIdentity *key,
1027                         void *value)
1028 {
1029   destroy_peer (get_peer_ctx (key));
1030   return GNUNET_YES;
1031 }
1032
1033
1034 /**
1035  * @brief This is called once a message is sent.
1036  *
1037  * Removes the pending message
1038  *
1039  * @param cls type of the message that was sent
1040  */
1041 static void
1042 mq_notify_sent_cb (void *cls)
1043 {
1044   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1045   LOG (GNUNET_ERROR_TYPE_DEBUG,
1046       "%s was sent.\n",
1047       pending_msg->type);
1048   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1049     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1050   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1051     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1052   if (0 == strncmp ("PUSH", pending_msg->type, 4))
1053     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1054   /* Do not cancle message */
1055   remove_pending_message (pending_msg, GNUNET_NO);
1056 }
1057
1058
1059 /**
1060  * @brief Iterator function for #store_valid_peers.
1061  *
1062  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1063  * Writes single peer to disk.
1064  *
1065  * @param cls the file handle to write to.
1066  * @param peer current peer
1067  * @param value unused
1068  *
1069  * @return  #GNUNET_YES if we should continue to
1070  *          iterate,
1071  *          #GNUNET_NO if not.
1072  */
1073 static int
1074 store_peer_presistently_iterator (void *cls,
1075                                   const struct GNUNET_PeerIdentity *peer,
1076                                   void *value)
1077 {
1078   const struct GNUNET_DISK_FileHandle *fh = cls;
1079   char peer_string[128];
1080   int size;
1081   ssize_t ret;
1082
1083   if (NULL == peer)
1084   {
1085     return GNUNET_YES;
1086   }
1087   size = GNUNET_snprintf (peer_string,
1088                           sizeof (peer_string),
1089                           "%s\n",
1090                           GNUNET_i2s_full (peer));
1091   GNUNET_assert (53 == size);
1092   ret = GNUNET_DISK_file_write (fh,
1093                                 peer_string,
1094                                 size);
1095   GNUNET_assert (size == ret);
1096   return GNUNET_YES;
1097 }
1098
1099
1100 /**
1101  * @brief Store the peers currently in #valid_peers to disk.
1102  */
1103 static void
1104 store_valid_peers ()
1105 {
1106   struct GNUNET_DISK_FileHandle *fh;
1107   uint32_t number_written_peers;
1108   int ret;
1109
1110   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1111   {
1112     return;
1113   }
1114
1115   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
1116   if (GNUNET_SYSERR == ret)
1117   {
1118     LOG (GNUNET_ERROR_TYPE_WARNING,
1119         "Not able to create directory for file `%s'\n",
1120         filename_valid_peers);
1121     GNUNET_break (0);
1122   }
1123   else if (GNUNET_NO == ret)
1124   {
1125     LOG (GNUNET_ERROR_TYPE_WARNING,
1126         "Directory for file `%s' exists but is not writable for us\n",
1127         filename_valid_peers);
1128     GNUNET_break (0);
1129   }
1130   fh = GNUNET_DISK_file_open (filename_valid_peers,
1131                               GNUNET_DISK_OPEN_WRITE |
1132                                   GNUNET_DISK_OPEN_CREATE,
1133                               GNUNET_DISK_PERM_USER_READ |
1134                                   GNUNET_DISK_PERM_USER_WRITE);
1135   if (NULL == fh)
1136   {
1137     LOG (GNUNET_ERROR_TYPE_WARNING,
1138         "Not able to write valid peers to file `%s'\n",
1139         filename_valid_peers);
1140     return;
1141   }
1142   LOG (GNUNET_ERROR_TYPE_DEBUG,
1143       "Writing %u valid peers to disk\n",
1144       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1145   number_written_peers =
1146     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1147                                            store_peer_presistently_iterator,
1148                                            fh);
1149   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1150   GNUNET_assert (number_written_peers ==
1151       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1152 }
1153
1154
1155 /**
1156  * @brief Convert string representation of peer id to peer id.
1157  *
1158  * Counterpart to #GNUNET_i2s_full.
1159  *
1160  * @param string_repr The string representation of the peer id
1161  *
1162  * @return The peer id
1163  */
1164 static const struct GNUNET_PeerIdentity *
1165 s2i_full (const char *string_repr)
1166 {
1167   struct GNUNET_PeerIdentity *peer;
1168   size_t len;
1169   int ret;
1170
1171   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1172   len = strlen (string_repr);
1173   if (52 > len)
1174   {
1175     LOG (GNUNET_ERROR_TYPE_WARNING,
1176         "Not able to convert string representation of PeerID to PeerID\n"
1177         "Sting representation: %s (len %lu) - too short\n",
1178         string_repr,
1179         len);
1180     GNUNET_break (0);
1181   }
1182   else if (52 < len)
1183   {
1184     len = 52;
1185   }
1186   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1187                                                     len,
1188                                                     &peer->public_key);
1189   if (GNUNET_OK != ret)
1190   {
1191     LOG (GNUNET_ERROR_TYPE_WARNING,
1192         "Not able to convert string representation of PeerID to PeerID\n"
1193         "Sting representation: %s\n",
1194         string_repr);
1195     GNUNET_break (0);
1196   }
1197   return peer;
1198 }
1199
1200
1201 /**
1202  * @brief Restore the peers on disk to #valid_peers.
1203  */
1204 static void
1205 restore_valid_peers ()
1206 {
1207   off_t file_size;
1208   uint32_t num_peers;
1209   struct GNUNET_DISK_FileHandle *fh;
1210   char *buf;
1211   ssize_t size_read;
1212   char *iter_buf;
1213   char *str_repr;
1214   const struct GNUNET_PeerIdentity *peer;
1215
1216   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1217   {
1218     return;
1219   }
1220
1221   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1222   {
1223     return;
1224   }
1225   fh = GNUNET_DISK_file_open (filename_valid_peers,
1226                               GNUNET_DISK_OPEN_READ,
1227                               GNUNET_DISK_PERM_NONE);
1228   GNUNET_assert (NULL != fh);
1229   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1230   num_peers = file_size / 53;
1231   buf = GNUNET_malloc (file_size);
1232   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1233   GNUNET_assert (size_read == file_size);
1234   LOG (GNUNET_ERROR_TYPE_DEBUG,
1235       "Restoring %" PRIu32 " peers from file `%s'\n",
1236       num_peers,
1237       filename_valid_peers);
1238   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1239   {
1240     str_repr = GNUNET_strndup (iter_buf, 53);
1241     peer = s2i_full (str_repr);
1242     GNUNET_free (str_repr);
1243     add_valid_peer (peer);
1244     LOG (GNUNET_ERROR_TYPE_DEBUG,
1245         "Restored valid peer %s from disk\n",
1246         GNUNET_i2s_full (peer));
1247   }
1248   iter_buf = NULL;
1249   GNUNET_free (buf);
1250   LOG (GNUNET_ERROR_TYPE_DEBUG,
1251       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1252       num_peers,
1253       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1254   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1255   {
1256     LOG (GNUNET_ERROR_TYPE_WARNING,
1257         "Number of restored peers does not match file size. Have probably duplicates.\n");
1258   }
1259   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1260   LOG (GNUNET_ERROR_TYPE_DEBUG,
1261       "Restored %u valid peers from disk\n",
1262       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1263 }
1264
1265
1266 /**
1267  * @brief Initialise storage of peers
1268  *
1269  * @param fn_valid_peers filename of the file used to store valid peer ids
1270  * @param cadet_h cadet handle
1271  * @param own_id own peer identity
1272  */
1273 static void
1274 initialise_peers (char* fn_valid_peers,
1275                   struct GNUNET_CADET_Handle *cadet_h)
1276 {
1277   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1278   cadet_handle = cadet_h;
1279   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1280   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1281   restore_valid_peers ();
1282 }
1283
1284
1285 /**
1286  * @brief Delete storage of peers that was created with #initialise_peers ()
1287  */
1288 static void
1289 peers_terminate ()
1290 {
1291   if (GNUNET_SYSERR ==
1292       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1293                                              &peermap_clear_iterator,
1294                                              NULL))
1295   {
1296     LOG (GNUNET_ERROR_TYPE_WARNING,
1297         "Iteration destroying peers was aborted.\n");
1298   }
1299   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1300   peer_map = NULL;
1301   store_valid_peers ();
1302   GNUNET_free (filename_valid_peers);
1303   filename_valid_peers = NULL;
1304   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1305   valid_peers = NULL;
1306 }
1307
1308
1309 /**
1310  * Iterator over #valid_peers hash map entries.
1311  *
1312  * @param cls closure - unused
1313  * @param peer current peer id
1314  * @param value value in the hash map - unused
1315  * @return #GNUNET_YES if we should continue to
1316  *         iterate,
1317  *         #GNUNET_NO if not.
1318  */
1319 static int
1320 valid_peer_iterator (void *cls,
1321                      const struct GNUNET_PeerIdentity *peer,
1322                      void *value)
1323 {
1324   struct PeersIteratorCls *it_cls = cls;
1325
1326   return it_cls->iterator (it_cls->cls,
1327                            peer);
1328 }
1329
1330
1331 /**
1332  * @brief Get all currently known, valid peer ids.
1333  *
1334  * @param it function to call on each peer id
1335  * @param it_cls extra argument to @a it
1336  * @return the number of key value pairs processed,
1337  *         #GNUNET_SYSERR if it aborted iteration
1338  */
1339 static int
1340 get_valid_peers (PeersIterator iterator,
1341                  void *it_cls)
1342 {
1343   struct PeersIteratorCls *cls;
1344   int ret;
1345
1346   cls = GNUNET_new (struct PeersIteratorCls);
1347   cls->iterator = iterator;
1348   cls->cls = it_cls;
1349   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1350                                                valid_peer_iterator,
1351                                                cls);
1352   GNUNET_free (cls);
1353   return ret;
1354 }
1355
1356
1357 /**
1358  * @brief Add peer to known peers.
1359  *
1360  * This function is called on new peer_ids from 'external' sources
1361  * (client seed, cadet get_peers(), ...)
1362  *
1363  * @param peer the new #GNUNET_PeerIdentity
1364  *
1365  * @return #GNUNET_YES if peer was inserted
1366  *         #GNUNET_NO  otherwise
1367  */
1368 static int
1369 insert_peer (const struct GNUNET_PeerIdentity *peer)
1370 {
1371   if (GNUNET_YES == check_peer_known (peer))
1372   {
1373     return GNUNET_NO; /* We already know this peer - nothing to do */
1374   }
1375   (void) create_peer_ctx (peer);
1376   return GNUNET_YES;
1377 }
1378
1379
1380 /**
1381  * @brief Check whether flags on a peer are set.
1382  *
1383  * @param peer the peer to check the flag of
1384  * @param flags the flags to check
1385  *
1386  * @return #GNUNET_SYSERR if peer is not known
1387  *         #GNUNET_YES    if all given flags are set
1388  *         #GNUNET_NO     otherwise
1389  */
1390 static int
1391 check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1392                  enum Peers_PeerFlags flags)
1393 {
1394   struct PeerContext *peer_ctx;
1395
1396   if (GNUNET_NO == check_peer_known (peer))
1397   {
1398     return GNUNET_SYSERR;
1399   }
1400   peer_ctx = get_peer_ctx (peer);
1401   return check_peer_flag_set (peer_ctx, flags);
1402 }
1403
1404 /**
1405  * @brief Try connecting to a peer to see whether it is online
1406  *
1407  * If not known yet, insert into known peers
1408  *
1409  * @param peer the peer whose liveliness is to be checked
1410  * @return #GNUNET_YES if peer had to be inserted
1411  *         #GNUNET_NO  otherwise
1412  */
1413 static int
1414 issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1415 {
1416   struct PeerContext *peer_ctx;
1417   int ret;
1418
1419   ret = insert_peer (peer);
1420   peer_ctx = get_peer_ctx (peer);
1421   if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) &&
1422        (NULL == peer_ctx->liveliness_check_pending) )
1423   {
1424     check_peer_live (peer_ctx);
1425   }
1426   return ret;
1427 }
1428
1429
1430 /**
1431  * @brief Check if peer is removable.
1432  *
1433  * Check if
1434  *  - a recv channel exists
1435  *  - there are pending messages
1436  *  - there is no pending pull reply
1437  *
1438  * @param peer the peer in question
1439  * @return #GNUNET_YES    if peer is removable
1440  *         #GNUNET_NO     if peer is NOT removable
1441  *         #GNUNET_SYSERR if peer is not known
1442  */
1443 static int
1444 check_removable (const struct GNUNET_PeerIdentity *peer)
1445 {
1446   struct PeerContext *peer_ctx;
1447
1448   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1449   {
1450     return GNUNET_SYSERR;
1451   }
1452
1453   peer_ctx = get_peer_ctx (peer);
1454   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1455        (NULL != peer_ctx->pending_messages_head) ||
1456        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1457   {
1458     return GNUNET_NO;
1459   }
1460   return GNUNET_YES;
1461 }
1462
1463
1464 /**
1465  * @brief Check whether @a peer is actually a peer.
1466  *
1467  * A valid peer is a peer that we know exists eg. we were connected to once.
1468  *
1469  * @param peer peer in question
1470  *
1471  * @return #GNUNET_YES if peer is valid
1472  *         #GNUNET_NO  if peer is not valid
1473  */
1474 static int
1475 check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1476 {
1477   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1478 }
1479
1480
1481 /**
1482  * @brief Indicate that we want to send to the other peer
1483  *
1484  * This establishes a sending channel
1485  *
1486  * @param peer the peer to establish channel to
1487  */
1488 static void
1489 indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1490 {
1491   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1492   (void) get_channel (peer);
1493 }
1494
1495
1496 /**
1497  * @brief Check whether other peer has the intention to send/opened channel
1498  *        towars us
1499  *
1500  * @param peer the peer in question
1501  *
1502  * @return #GNUNET_YES if peer has the intention to send
1503  *         #GNUNET_NO  otherwise
1504  */
1505 static int
1506 check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1507 {
1508   const struct PeerContext *peer_ctx;
1509
1510   peer_ctx = get_peer_ctx (peer);
1511   if (NULL != peer_ctx->recv_channel_ctx)
1512   {
1513     return GNUNET_YES;
1514   }
1515   return GNUNET_NO;
1516 }
1517
1518
1519 /**
1520  * Handle the channel a peer opens to us.
1521  *
1522  * @param cls The closure
1523  * @param channel The channel the peer wants to establish
1524  * @param initiator The peer's peer ID
1525  *
1526  * @return initial channel context for the channel
1527  *         (can be NULL -- that's not an error)
1528  */
1529 static void *
1530 handle_inbound_channel (void *cls,
1531                         struct GNUNET_CADET_Channel *channel,
1532                         const struct GNUNET_PeerIdentity *initiator)
1533 {
1534   struct PeerContext *peer_ctx;
1535   struct GNUNET_PeerIdentity *ctx_peer;
1536   struct ChannelCtx *channel_ctx;
1537
1538   LOG (GNUNET_ERROR_TYPE_DEBUG,
1539       "New channel was established to us (Peer %s).\n",
1540       GNUNET_i2s (initiator));
1541   GNUNET_assert (NULL != channel); /* according to cadet API */
1542   /* Make sure we 'know' about this peer */
1543   peer_ctx = create_or_get_peer_ctx (initiator);
1544   set_peer_live (peer_ctx);
1545   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1546   *ctx_peer = *initiator;
1547   channel_ctx = add_channel_ctx (peer_ctx);
1548   channel_ctx->channel = channel;
1549   /* We only accept one incoming channel per peer */
1550   if (GNUNET_YES == check_peer_send_intention (initiator))
1551   {
1552     LOG (GNUNET_ERROR_TYPE_WARNING,
1553         "Already got one receive channel. Destroying old one.\n");
1554     GNUNET_break_op (0);
1555     destroy_channel (peer_ctx->recv_channel_ctx);
1556     peer_ctx->recv_channel_ctx = channel_ctx;
1557     /* return the channel context */
1558     return channel_ctx;
1559   }
1560   peer_ctx->recv_channel_ctx = channel_ctx;
1561   return channel_ctx;
1562 }
1563
1564
1565 /**
1566  * @brief Check whether a sending channel towards the given peer exists
1567  *
1568  * @param peer the peer to check for
1569  *
1570  * @return #GNUNET_YES if a sending channel towards that peer exists
1571  *         #GNUNET_NO  otherwise
1572  */
1573 static int
1574 check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1575 {
1576   struct PeerContext *peer_ctx;
1577
1578   if (GNUNET_NO == check_peer_known (peer))
1579   { /* If no such peer exists, there is no channel */
1580     return GNUNET_NO;
1581   }
1582   peer_ctx = get_peer_ctx (peer);
1583   if (NULL == peer_ctx->send_channel_ctx)
1584   {
1585     return GNUNET_NO;
1586   }
1587   return GNUNET_YES;
1588 }
1589
1590
1591 /**
1592  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1593  *        intention to another peer
1594  *
1595  * If there is also no channel to receive messages from that peer, remove it
1596  * from the peermap.
1597  * TODO really?
1598  *
1599  * @peer the peer identity of the peer whose sending channel to destroy
1600  * @return #GNUNET_YES if channel was destroyed
1601  *         #GNUNET_NO  otherwise
1602  */
1603 static int
1604 destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1605 {
1606   struct PeerContext *peer_ctx;
1607
1608   if (GNUNET_NO == check_peer_known (peer))
1609   {
1610     return GNUNET_NO;
1611   }
1612   peer_ctx = get_peer_ctx (peer);
1613   if (NULL != peer_ctx->send_channel_ctx)
1614   {
1615     destroy_channel (peer_ctx->send_channel_ctx);
1616     (void) check_connected (peer);
1617     return GNUNET_YES;
1618   }
1619   return GNUNET_NO;
1620 }
1621
1622 /**
1623  * @brief Send a message to another peer.
1624  *
1625  * Keeps track about pending messages so they can be properly removed when the
1626  * peer is destroyed.
1627  *
1628  * @param peer receeiver of the message
1629  * @param ev envelope of the message
1630  * @param type type of the message
1631  */
1632 static void
1633 send_message (const struct GNUNET_PeerIdentity *peer,
1634               struct GNUNET_MQ_Envelope *ev,
1635               const char *type)
1636 {
1637   struct PendingMessage *pending_msg;
1638   struct GNUNET_MQ_Handle *mq;
1639
1640   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1641               "Sending message to %s of type %s\n",
1642               GNUNET_i2s (peer),
1643               type);
1644   pending_msg = insert_pending_message (peer, ev, type);
1645   mq = get_mq (peer);
1646   GNUNET_MQ_notify_sent (ev,
1647                          mq_notify_sent_cb,
1648                          pending_msg);
1649   GNUNET_MQ_send (mq, ev);
1650 }
1651
1652 /**
1653  * @brief Schedule a operation on given peer
1654  *
1655  * Avoids scheduling an operation twice.
1656  *
1657  * @param peer the peer we want to schedule the operation for once it gets live
1658  *
1659  * @return #GNUNET_YES if the operation was scheduled
1660  *         #GNUNET_NO  otherwise
1661  */
1662 static int
1663 schedule_operation (const struct GNUNET_PeerIdentity *peer,
1664                           const PeerOp peer_op)
1665 {
1666   struct PeerPendingOp pending_op;
1667   struct PeerContext *peer_ctx;
1668
1669   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1670
1671   //TODO if LIVE/ONLINE execute immediately
1672
1673   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1674   {
1675     peer_ctx = get_peer_ctx (peer);
1676     pending_op.op = peer_op;
1677     pending_op.op_cls = NULL;
1678     GNUNET_array_append (peer_ctx->pending_ops,
1679                          peer_ctx->num_pending_ops,
1680                          pending_op);
1681     return GNUNET_YES;
1682   }
1683   return GNUNET_NO;
1684 }
1685
1686 /***********************************************************************
1687  * /Old gnunet-service-rps_peers.c
1688 ***********************************************************************/
1689
1690
1691 /***********************************************************************
1692  * Housekeeping with clients
1693 ***********************************************************************/
1694
1695 /**
1696  * Closure used to pass the client and the id to the callback
1697  * that replies to a client's request
1698  */
1699 struct ReplyCls
1700 {
1701   /**
1702    * DLL
1703    */
1704   struct ReplyCls *next;
1705   struct ReplyCls *prev;
1706
1707   /**
1708    * The identifier of the request
1709    */
1710   uint32_t id;
1711
1712   /**
1713    * The handle to the request
1714    */
1715   struct RPS_SamplerRequestHandle *req_handle;
1716
1717   /**
1718    * The client handle to send the reply to
1719    */
1720   struct ClientContext *cli_ctx;
1721 };
1722
1723
1724 /**
1725  * Struct used to store the context of a connected client.
1726  */
1727 struct ClientContext
1728 {
1729   /**
1730    * DLL
1731    */
1732   struct ClientContext *next;
1733   struct ClientContext *prev;
1734
1735   /**
1736    * The message queue to communicate with the client.
1737    */
1738   struct GNUNET_MQ_Handle *mq;
1739
1740   /**
1741    * DLL with handles to single requests from the client
1742    */
1743   struct ReplyCls *rep_cls_head;
1744   struct ReplyCls *rep_cls_tail;
1745
1746   /**
1747    * @brief How many updates this client expects to receive.
1748    */
1749   int64_t view_updates_left;
1750
1751   /**
1752    * The client handle to send the reply to
1753    */
1754   struct GNUNET_SERVICE_Client *client;
1755 };
1756
1757 /**
1758  * DLL with all clients currently connected to us
1759  */
1760 struct ClientContext *cli_ctx_head;
1761 struct ClientContext *cli_ctx_tail;
1762
1763 /***********************************************************************
1764  * /Housekeeping with clients
1765 ***********************************************************************/
1766
1767
1768
1769
1770
1771 /***********************************************************************
1772  * Globals
1773 ***********************************************************************/
1774
1775 /**
1776  * Sampler used for the Brahms protocol itself.
1777  */
1778 static struct RPS_Sampler *prot_sampler;
1779
1780 /**
1781  * Sampler used for the clients.
1782  */
1783 static struct RPS_Sampler *client_sampler;
1784
1785 /**
1786  * Name to log view to
1787  */
1788 static const char *file_name_view_log;
1789
1790 #ifdef TO_FILE
1791 /**
1792  * Name to log number of observed peers to
1793  */
1794 static const char *file_name_observed_log;
1795
1796 /**
1797  * @brief Count the observed peers
1798  */
1799 static uint32_t num_observed_peers;
1800
1801 /**
1802  * @brief Multipeermap (ab-) used to count unique peer_ids
1803  */
1804 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1805 #endif /* TO_FILE */
1806
1807 /**
1808  * The size of sampler we need to be able to satisfy the client's need
1809  * of random peers.
1810  */
1811 static unsigned int sampler_size_client_need;
1812
1813 /**
1814  * The size of sampler we need to be able to satisfy the Brahms protocol's
1815  * need of random peers.
1816  *
1817  * This is one minimum size the sampler grows to.
1818  */
1819 static unsigned int sampler_size_est_need;
1820
1821 /**
1822  * @brief This is the minimum estimate used as sampler size.
1823  *
1824  * It is configured by the user.
1825  */
1826 static unsigned int sampler_size_est_min;
1827
1828 /**
1829  * @brief This is the estimate used as view size.
1830  *
1831  * It is initialised with the minimum
1832  */
1833 static unsigned int view_size_est_need;
1834
1835 /**
1836  * @brief This is the minimum estimate used as view size.
1837  *
1838  * It is configured by the user.
1839  */
1840 static unsigned int view_size_est_min;
1841
1842 /**
1843  * Percentage of total peer number in the view
1844  * to send random PUSHes to
1845  */
1846 static float alpha;
1847
1848 /**
1849  * Percentage of total peer number in the view
1850  * to send random PULLs to
1851  */
1852 static float beta;
1853
1854 /**
1855  * Identifier for the main task that runs periodically.
1856  */
1857 static struct GNUNET_SCHEDULER_Task *do_round_task;
1858
1859 /**
1860  * Time inverval the do_round task runs in.
1861  */
1862 static struct GNUNET_TIME_Relative round_interval;
1863
1864 /**
1865  * List to store peers received through pushes temporary.
1866  */
1867 static struct CustomPeerMap *push_map;
1868
1869 /**
1870  * List to store peers received through pulls temporary.
1871  */
1872 static struct CustomPeerMap *pull_map;
1873
1874 /**
1875  * Handler to NSE.
1876  */
1877 static struct GNUNET_NSE_Handle *nse;
1878
1879 /**
1880  * Handler to CADET.
1881  */
1882 static struct GNUNET_CADET_Handle *cadet_handle;
1883
1884 /**
1885  * @brief Port to communicate to other peers.
1886  */
1887 static struct GNUNET_CADET_Port *cadet_port;
1888
1889 /**
1890  * Handler to PEERINFO.
1891  */
1892 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1893
1894 /**
1895  * Handle for cancellation of iteration over peers.
1896  */
1897 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1898
1899 /**
1900  * Request counter.
1901  *
1902  * Counts how many requets clients already issued.
1903  * Only needed in the beginning to check how many of the 64 deltas
1904  * we already have
1905  */
1906 static unsigned int req_counter;
1907
1908 /**
1909  * Time of the last request we received.
1910  *
1911  * Used to compute the expected request rate.
1912  */
1913 static struct GNUNET_TIME_Absolute last_request;
1914
1915 /**
1916  * Size of #request_deltas.
1917  */
1918 #define REQUEST_DELTAS_SIZE 64
1919 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
1920
1921 /**
1922  * Last 64 deltas between requests
1923  */
1924 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
1925
1926 /**
1927  * The prediction of the rate of requests
1928  */
1929 static struct GNUNET_TIME_Relative request_rate;
1930
1931
1932 #ifdef ENABLE_MALICIOUS
1933 /**
1934  * Type of malicious peer
1935  *
1936  * 0 Don't act malicious at all - Default
1937  * 1 Try to maximise representation
1938  * 2 Try to partition the network
1939  * 3 Combined attack
1940  */
1941 static uint32_t mal_type;
1942
1943 /**
1944  * Other malicious peers
1945  */
1946 static struct GNUNET_PeerIdentity *mal_peers;
1947
1948 /**
1949  * Hashmap of malicious peers used as set.
1950  * Used to more efficiently check whether we know that peer.
1951  */
1952 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
1953
1954 /**
1955  * Number of other malicious peers
1956  */
1957 static uint32_t num_mal_peers;
1958
1959
1960 /**
1961  * If type is 2 This struct is used to store the attacked peers in a DLL
1962  */
1963 struct AttackedPeer
1964 {
1965   /**
1966    * DLL
1967    */
1968   struct AttackedPeer *next;
1969   struct AttackedPeer *prev;
1970
1971   /**
1972    * PeerID
1973    */
1974   struct GNUNET_PeerIdentity peer_id;
1975 };
1976
1977 /**
1978  * If type is 2 this is the DLL of attacked peers
1979  */
1980 static struct AttackedPeer *att_peers_head;
1981 static struct AttackedPeer *att_peers_tail;
1982
1983 /**
1984  * This index is used to point to an attacked peer to
1985  * implement the round-robin-ish way to select attacked peers.
1986  */
1987 static struct AttackedPeer *att_peer_index;
1988
1989 /**
1990  * Hashmap of attacked peers used as set.
1991  * Used to more efficiently check whether we know that peer.
1992  */
1993 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
1994
1995 /**
1996  * Number of attacked peers
1997  */
1998 static uint32_t num_attacked_peers;
1999
2000 /**
2001  * If type is 1 this is the attacked peer
2002  */
2003 static struct GNUNET_PeerIdentity attacked_peer;
2004
2005 /**
2006  * The limit of PUSHes we can send in one round.
2007  * This is an assumption of the Brahms protocol and either implemented
2008  * via proof of work
2009  * or
2010  * assumend to be the bandwidth limitation.
2011  */
2012 static uint32_t push_limit = 10000;
2013 #endif /* ENABLE_MALICIOUS */
2014
2015
2016 /***********************************************************************
2017  * /Globals
2018 ***********************************************************************/
2019
2020
2021 /***********************************************************************
2022  * Util functions
2023 ***********************************************************************/
2024
2025
2026 /**
2027  * Print peerlist to log.
2028  */
2029 static void
2030 print_peer_list (struct GNUNET_PeerIdentity *list,
2031                  unsigned int len)
2032 {
2033   unsigned int i;
2034
2035   LOG (GNUNET_ERROR_TYPE_DEBUG,
2036        "Printing peer list of length %u at %p:\n",
2037        len,
2038        list);
2039   for (i = 0 ; i < len ; i++)
2040   {
2041     LOG (GNUNET_ERROR_TYPE_DEBUG,
2042          "%u. peer: %s\n",
2043          i, GNUNET_i2s (&list[i]));
2044   }
2045 }
2046
2047
2048 /**
2049  * Remove peer from list.
2050  */
2051 static void
2052 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2053                unsigned int *list_size,
2054                const struct GNUNET_PeerIdentity *peer)
2055 {
2056   unsigned int i;
2057   struct GNUNET_PeerIdentity *tmp;
2058
2059   tmp = *peer_list;
2060
2061   LOG (GNUNET_ERROR_TYPE_DEBUG,
2062        "Removing peer %s from list at %p\n",
2063        GNUNET_i2s (peer),
2064        tmp);
2065
2066   for ( i = 0 ; i < *list_size ; i++ )
2067   {
2068     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2069     {
2070       if (i < *list_size -1)
2071       { /* Not at the last entry -- shift peers left */
2072         memmove (&tmp[i], &tmp[i +1],
2073                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2074       }
2075       /* Remove last entry (should be now useless PeerID) */
2076       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2077     }
2078   }
2079   *peer_list = tmp;
2080 }
2081
2082
2083 /**
2084  * Sum all time relatives of an array.
2085  */
2086 static struct GNUNET_TIME_Relative
2087 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2088                 uint32_t arr_size)
2089 {
2090   struct GNUNET_TIME_Relative sum;
2091   uint32_t i;
2092
2093   sum = GNUNET_TIME_UNIT_ZERO;
2094   for ( i = 0 ; i < arr_size ; i++ )
2095   {
2096     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2097   }
2098   return sum;
2099 }
2100
2101
2102 /**
2103  * Compute the average of given time relatives.
2104  */
2105 static struct GNUNET_TIME_Relative
2106 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2107                 uint32_t arr_size)
2108 {
2109   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2110                                                       arr_size),
2111                                       arr_size);
2112 }
2113
2114
2115 /**
2116  * Insert PeerID in #view
2117  *
2118  * Called once we know a peer is live.
2119  * Implements #PeerOp
2120  *
2121  * @return GNUNET_OK if peer was actually inserted
2122  *         GNUNET_NO if peer was not inserted
2123  */
2124 static void
2125 insert_in_view_op (void *cls,
2126                 const struct GNUNET_PeerIdentity *peer);
2127
2128 /**
2129  * Insert PeerID in #view
2130  *
2131  * Called once we know a peer is live.
2132  *
2133  * @return GNUNET_OK if peer was actually inserted
2134  *         GNUNET_NO if peer was not inserted
2135  */
2136 static int
2137 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2138 {
2139   int online;
2140
2141   online = check_peer_flag (peer, Peers_ONLINE);
2142   if ( (GNUNET_NO == online) ||
2143        (GNUNET_SYSERR == online) ) /* peer is not even known */
2144   {
2145     (void) issue_peer_liveliness_check (peer);
2146     (void) schedule_operation (peer, insert_in_view_op);
2147     return GNUNET_NO;
2148   }
2149   /* Open channel towards peer to keep connection open */
2150   indicate_sending_intention (peer);
2151   return View_put (peer);
2152 }
2153
2154 /**
2155  * @brief sends updates to clients that are interested
2156  */
2157 static void
2158 clients_notify_view_update (void);
2159
2160 /**
2161  * Put random peer from sampler into the view as history update.
2162  */
2163 static void
2164 hist_update (void *cls,
2165              struct GNUNET_PeerIdentity *ids,
2166              uint32_t num_peers)
2167 {
2168   unsigned int i;
2169
2170   for (i = 0; i < num_peers; i++)
2171   {
2172     (void) insert_in_view (&ids[i]);
2173     to_file (file_name_view_log,
2174              "+%s\t(hist)",
2175              GNUNET_i2s_full (ids));
2176   }
2177   clients_notify_view_update();
2178 }
2179
2180
2181 /**
2182  * Wrapper around #RPS_sampler_resize()
2183  *
2184  * If we do not have enough sampler elements, double current sampler size
2185  * If we have more than enough sampler elements, halv current sampler size
2186  */
2187 static void
2188 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2189 {
2190   unsigned int sampler_size;
2191
2192   // TODO statistics
2193   // TODO respect the min, max
2194   sampler_size = RPS_sampler_get_size (sampler);
2195   if (sampler_size > new_size * 4)
2196   { /* Shrinking */
2197     RPS_sampler_resize (sampler, sampler_size / 2);
2198   }
2199   else if (sampler_size < new_size)
2200   { /* Growing */
2201     RPS_sampler_resize (sampler, sampler_size * 2);
2202   }
2203   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2204 }
2205
2206
2207 /**
2208  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2209  */
2210 static void
2211 client_resize_wrapper ()
2212 {
2213   uint32_t bigger_size;
2214
2215   // TODO statistics
2216
2217   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2218
2219   // TODO respect the min, max
2220   resize_wrapper (client_sampler, bigger_size);
2221   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2222       bigger_size);
2223 }
2224
2225
2226 /**
2227  * Estimate request rate
2228  *
2229  * Called every time we receive a request from the client.
2230  */
2231 static void
2232 est_request_rate()
2233 {
2234   struct GNUNET_TIME_Relative max_round_duration;
2235
2236   if (request_deltas_size > req_counter)
2237     req_counter++;
2238   if ( 1 < req_counter)
2239   {
2240     /* Shift last request deltas to the right */
2241     memmove (&request_deltas[1],
2242         request_deltas,
2243         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2244
2245     /* Add current delta to beginning */
2246     request_deltas[0] =
2247         GNUNET_TIME_absolute_get_difference (last_request,
2248                                              GNUNET_TIME_absolute_get ());
2249     request_rate = T_relative_avg (request_deltas, req_counter);
2250     request_rate = (request_rate.rel_value_us < 1) ?
2251       GNUNET_TIME_relative_get_unit_ () : request_rate;
2252
2253     /* Compute the duration a round will maximally take */
2254     max_round_duration =
2255         GNUNET_TIME_relative_add (round_interval,
2256                                   GNUNET_TIME_relative_divide (round_interval, 2));
2257
2258     /* Set the estimated size the sampler has to have to
2259      * satisfy the current client request rate */
2260     sampler_size_client_need =
2261         max_round_duration.rel_value_us / request_rate.rel_value_us;
2262
2263     /* Resize the sampler */
2264     client_resize_wrapper ();
2265   }
2266   last_request = GNUNET_TIME_absolute_get ();
2267 }
2268
2269
2270 /**
2271  * Add all peers in @a peer_array to @a peer_map used as set.
2272  *
2273  * @param peer_array array containing the peers
2274  * @param num_peers number of peers in @peer_array
2275  * @param peer_map the peermap to use as set
2276  */
2277 static void
2278 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2279                        unsigned int num_peers,
2280                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2281 {
2282   unsigned int i;
2283   if (NULL == peer_map)
2284   {
2285     LOG (GNUNET_ERROR_TYPE_WARNING,
2286          "Trying to add peers to non-existing peermap.\n");
2287     return;
2288   }
2289
2290   for (i = 0; i < num_peers; i++)
2291   {
2292     GNUNET_CONTAINER_multipeermap_put (peer_map,
2293                                        &peer_array[i],
2294                                        NULL,
2295                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2296   }
2297 }
2298
2299
2300 /**
2301  * Send a PULL REPLY to @a peer_id
2302  *
2303  * @param peer_id the peer to send the reply to.
2304  * @param peer_ids the peers to send to @a peer_id
2305  * @param num_peer_ids the number of peers to send to @a peer_id
2306  */
2307 static void
2308 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2309                  const struct GNUNET_PeerIdentity *peer_ids,
2310                  unsigned int num_peer_ids)
2311 {
2312   uint32_t send_size;
2313   struct GNUNET_MQ_Envelope *ev;
2314   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2315
2316   /* Compute actual size */
2317   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2318               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2319
2320   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2321     /* Compute number of peers to send
2322      * If too long, simply truncate */
2323     // TODO select random ones via permutation
2324     //      or even better: do good protocol design
2325     send_size =
2326       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2327        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2328        sizeof (struct GNUNET_PeerIdentity);
2329   else
2330     send_size = num_peer_ids;
2331
2332   LOG (GNUNET_ERROR_TYPE_DEBUG,
2333       "Going to send PULL REPLY with %u peers to %s\n",
2334       send_size, GNUNET_i2s (peer_id));
2335
2336   ev = GNUNET_MQ_msg_extra (out_msg,
2337                             send_size * sizeof (struct GNUNET_PeerIdentity),
2338                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2339   out_msg->num_peers = htonl (send_size);
2340   GNUNET_memcpy (&out_msg[1], peer_ids,
2341          send_size * sizeof (struct GNUNET_PeerIdentity));
2342
2343   send_message (peer_id, ev, "PULL REPLY");
2344   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2345   // TODO check with send intention: as send_channel is used/opened we indicate
2346   // a sending intention without intending it.
2347   // -> clean peer afterwards?
2348 }
2349
2350
2351 /**
2352  * Insert PeerID in #pull_map
2353  *
2354  * Called once we know a peer is live.
2355  */
2356 static void
2357 insert_in_pull_map (void *cls,
2358                     const struct GNUNET_PeerIdentity *peer)
2359 {
2360   CustomPeerMap_put (pull_map, peer);
2361 }
2362
2363
2364 /**
2365  * Insert PeerID in #view
2366  *
2367  * Called once we know a peer is live.
2368  * Implements #PeerOp
2369  */
2370 static void
2371 insert_in_view_op (void *cls,
2372                 const struct GNUNET_PeerIdentity *peer)
2373 {
2374   (void) insert_in_view (peer);
2375 }
2376
2377
2378 /**
2379  * Update sampler with given PeerID.
2380  * Implements #PeerOp
2381  */
2382 static void
2383 insert_in_sampler (void *cls,
2384                    const struct GNUNET_PeerIdentity *peer)
2385 {
2386   LOG (GNUNET_ERROR_TYPE_DEBUG,
2387        "Updating samplers with peer %s from insert_in_sampler()\n",
2388        GNUNET_i2s (peer));
2389   RPS_sampler_update (prot_sampler,   peer);
2390   RPS_sampler_update (client_sampler, peer);
2391   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2392   {
2393     /* Make sure we 'know' about this peer */
2394     (void) issue_peer_liveliness_check (peer);
2395     /* Establish a channel towards that peer to indicate we are going to send
2396      * messages to it */
2397     //indicate_sending_intention (peer);
2398   }
2399   #ifdef TO_FILE
2400   num_observed_peers++;
2401   GNUNET_CONTAINER_multipeermap_put
2402     (observed_unique_peers,
2403      peer,
2404      NULL,
2405      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2406   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2407       observed_unique_peers);
2408   to_file (file_name_observed_log,
2409           "%" PRIu32 " %" PRIu32 " %f\n",
2410           num_observed_peers,
2411           num_observed_unique_peers,
2412           1.0*num_observed_unique_peers/num_observed_peers)
2413   #endif /* TO_FILE */
2414 }
2415
2416 /**
2417  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2418  *        If the peer is not known, liveliness check is issued and it is
2419  *        scheduled to be inserted in sampler and view.
2420  *
2421  * "External sources" refer to every source except the gossip.
2422  *
2423  * @param peer peer to insert
2424  */
2425 static void
2426 got_peer (const struct GNUNET_PeerIdentity *peer)
2427 {
2428   /* If we did not know this peer already, insert it into sampler and view */
2429   if (GNUNET_YES == issue_peer_liveliness_check (peer))
2430   {
2431     schedule_operation (peer, insert_in_sampler);
2432     schedule_operation (peer, insert_in_view_op);
2433   }
2434 }
2435
2436 /**
2437  * @brief Checks if there is a sending channel and if it is needed
2438  *
2439  * @param peer the peer whose sending channel is checked
2440  * @return GNUNET_YES if sending channel exists and is still needed
2441  *         GNUNET_NO  otherwise
2442  */
2443 static int
2444 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2445 {
2446   /* struct GNUNET_CADET_Channel *channel; */
2447   if (GNUNET_NO == check_peer_known (peer))
2448   {
2449     return GNUNET_NO;
2450   }
2451   if (GNUNET_YES == check_sending_channel_exists (peer))
2452   {
2453     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2454          (GNUNET_YES == View_contains_peer (peer)) ||
2455          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2456          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2457          (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2458     { /* If we want to keep the connection to peer open */
2459       return GNUNET_YES;
2460     }
2461     return GNUNET_NO;
2462   }
2463   return GNUNET_NO;
2464 }
2465
2466 /**
2467  * @brief remove peer from our knowledge, the view, push and pull maps and
2468  * samplers.
2469  *
2470  * @param peer the peer to remove
2471  */
2472 static void
2473 remove_peer (const struct GNUNET_PeerIdentity *peer)
2474 {
2475   (void) View_remove_peer (peer);
2476   CustomPeerMap_remove_peer (pull_map, peer);
2477   CustomPeerMap_remove_peer (push_map, peer);
2478   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2479   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2480   destroy_peer (get_peer_ctx (peer));
2481 }
2482
2483
2484 /**
2485  * @brief Remove data that is not needed anymore.
2486  *
2487  * If the sending channel is no longer needed it is destroyed.
2488  *
2489  * @param peer the peer whose data is about to be cleaned
2490  */
2491 static void
2492 clean_peer (const struct GNUNET_PeerIdentity *peer)
2493 {
2494   if (GNUNET_NO == check_sending_channel_needed (peer))
2495   {
2496     LOG (GNUNET_ERROR_TYPE_DEBUG,
2497         "Going to remove send channel to peer %s\n",
2498         GNUNET_i2s (peer));
2499     #ifdef ENABLE_MALICIOUS
2500     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2501       (void) destroy_sending_channel (peer);
2502     #else /* ENABLE_MALICIOUS */
2503     (void) destroy_sending_channel (peer);
2504     #endif /* ENABLE_MALICIOUS */
2505   }
2506
2507   if ( (GNUNET_NO == check_peer_send_intention (peer)) &&
2508        (GNUNET_NO == View_contains_peer (peer)) &&
2509        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2510        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2511        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2512        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2513        (GNUNET_NO != check_removable (peer)) )
2514   { /* We can safely remove this peer */
2515     LOG (GNUNET_ERROR_TYPE_DEBUG,
2516         "Going to remove peer %s\n",
2517         GNUNET_i2s (peer));
2518     remove_peer (peer);
2519     return;
2520   }
2521 }
2522
2523
2524 /**
2525  * @brief This is called when a channel is destroyed.
2526  *
2527  * Removes peer completely from our knowledge if the send_channel was destroyed
2528  * Otherwise simply delete the recv_channel
2529  * Also check if the knowledge about this peer is still needed.
2530  * If not, remove this peer from our knowledge.
2531  *
2532  * @param cls The closure
2533  * @param channel The channel being closed
2534  * @param channel_ctx The context associated with this channel
2535  */
2536 static void
2537 cleanup_destroyed_channel (void *cls,
2538                            const struct GNUNET_CADET_Channel *channel)
2539 {
2540   struct ChannelCtx *channel_ctx = cls;
2541   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2542
2543   channel_ctx->channel = NULL;
2544   remove_channel_ctx (channel_ctx);
2545   if (NULL != peer_ctx &&
2546       peer_ctx->send_channel_ctx == channel_ctx)
2547   {
2548     remove_peer (&peer_ctx->peer_id);
2549   }
2550 }
2551
2552 /***********************************************************************
2553  * /Util functions
2554 ***********************************************************************/
2555
2556 static void
2557 destroy_reply_cls (struct ReplyCls *rep_cls)
2558 {
2559   struct ClientContext *cli_ctx;
2560
2561   cli_ctx = rep_cls->cli_ctx;
2562   GNUNET_assert (NULL != cli_ctx);
2563   if (NULL != rep_cls->req_handle)
2564   {
2565     RPS_sampler_request_cancel (rep_cls->req_handle);
2566   }
2567   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2568                                cli_ctx->rep_cls_tail,
2569                                rep_cls);
2570   GNUNET_free (rep_cls);
2571 }
2572
2573
2574 static void
2575 destroy_cli_ctx (struct ClientContext *cli_ctx)
2576 {
2577   GNUNET_assert (NULL != cli_ctx);
2578   if (NULL != cli_ctx->rep_cls_head)
2579   {
2580     LOG (GNUNET_ERROR_TYPE_WARNING,
2581          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2582     while (NULL != cli_ctx->rep_cls_head)
2583       destroy_reply_cls (cli_ctx->rep_cls_head);
2584   }
2585   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2586                                cli_ctx_tail,
2587                                cli_ctx);
2588   GNUNET_free (cli_ctx);
2589 }
2590
2591
2592 /**
2593  * Function called by NSE.
2594  *
2595  * Updates sizes of sampler list and view and adapt those lists
2596  * accordingly.
2597  */
2598 static void
2599 nse_callback (void *cls,
2600               struct GNUNET_TIME_Absolute timestamp,
2601               double logestimate, double std_dev)
2602 {
2603   double estimate;
2604   //double scale; // TODO this might go gloabal/config
2605
2606   LOG (GNUNET_ERROR_TYPE_DEBUG,
2607        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2608        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2609   //scale = .01;
2610   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2611   // GNUNET_NSE_log_estimate_to_n (logestimate);
2612   estimate = pow (estimate, 1.0 / 3);
2613   // TODO add if std_dev is a number
2614   // estimate += (std_dev * scale);
2615   if (view_size_est_min < ceil (estimate))
2616   {
2617     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2618     sampler_size_est_need = estimate;
2619     view_size_est_need = estimate;
2620   } else
2621   {
2622     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2623     //sampler_size_est_need = view_size_est_min;
2624     view_size_est_need = view_size_est_min;
2625   }
2626
2627   /* If the NSE has changed adapt the lists accordingly */
2628   resize_wrapper (prot_sampler, sampler_size_est_need);
2629   client_resize_wrapper ();
2630 }
2631
2632
2633 /**
2634  * Callback called once the requested PeerIDs are ready.
2635  *
2636  * Sends those to the requesting client.
2637  */
2638 static void
2639 client_respond (void *cls,
2640                 struct GNUNET_PeerIdentity *peer_ids,
2641                 uint32_t num_peers)
2642 {
2643   struct ReplyCls *reply_cls = cls;
2644   uint32_t i;
2645   struct GNUNET_MQ_Envelope *ev;
2646   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2647   uint32_t size_needed;
2648   struct ClientContext *cli_ctx;
2649
2650   GNUNET_assert (NULL != reply_cls);
2651   LOG (GNUNET_ERROR_TYPE_DEBUG,
2652        "sampler returned %" PRIu32 " peers:\n",
2653        num_peers);
2654   for (i = 0; i < num_peers; i++)
2655   {
2656     LOG (GNUNET_ERROR_TYPE_DEBUG,
2657          "  %" PRIu32 ": %s\n",
2658          i,
2659          GNUNET_i2s (&peer_ids[i]));
2660   }
2661
2662   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2663                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2664
2665   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2666
2667   ev = GNUNET_MQ_msg_extra (out_msg,
2668                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2669                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2670   out_msg->num_peers = htonl (num_peers);
2671   out_msg->id = htonl (reply_cls->id);
2672
2673   GNUNET_memcpy (&out_msg[1],
2674           peer_ids,
2675           num_peers * sizeof (struct GNUNET_PeerIdentity));
2676
2677   cli_ctx = reply_cls->cli_ctx;
2678   GNUNET_assert (NULL != cli_ctx);
2679   reply_cls->req_handle = NULL;
2680   destroy_reply_cls (reply_cls);
2681   GNUNET_MQ_send (cli_ctx->mq, ev);
2682 }
2683
2684
2685 /**
2686  * Handle RPS request from the client.
2687  *
2688  * @param cls closure
2689  * @param message the actual message
2690  */
2691 static void
2692 handle_client_request (void *cls,
2693                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2694 {
2695   struct ClientContext *cli_ctx = cls;
2696   uint32_t num_peers;
2697   uint32_t size_needed;
2698   struct ReplyCls *reply_cls;
2699   uint32_t i;
2700
2701   num_peers = ntohl (msg->num_peers);
2702   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2703                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2704
2705   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2706   {
2707     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2708                 "Message received from client has size larger than expected\n");
2709     GNUNET_SERVICE_client_drop (cli_ctx->client);
2710     return;
2711   }
2712
2713   for (i = 0 ; i < num_peers ; i++)
2714     est_request_rate();
2715
2716   LOG (GNUNET_ERROR_TYPE_DEBUG,
2717        "Client requested %" PRIu32 " random peer(s).\n",
2718        num_peers);
2719
2720   reply_cls = GNUNET_new (struct ReplyCls);
2721   reply_cls->id = ntohl (msg->id);
2722   reply_cls->cli_ctx = cli_ctx;
2723   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2724                                                         client_respond,
2725                                                         reply_cls,
2726                                                         num_peers);
2727
2728   GNUNET_assert (NULL != cli_ctx);
2729   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2730                                cli_ctx->rep_cls_tail,
2731                                reply_cls);
2732   GNUNET_SERVICE_client_continue (cli_ctx->client);
2733 }
2734
2735
2736 /**
2737  * @brief Handle a message that requests the cancellation of a request
2738  *
2739  * @param cls unused
2740  * @param message the message containing the id of the request
2741  */
2742 static void
2743 handle_client_request_cancel (void *cls,
2744                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2745 {
2746   struct ClientContext *cli_ctx = cls;
2747   struct ReplyCls *rep_cls;
2748
2749   GNUNET_assert (NULL != cli_ctx);
2750   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2751   rep_cls = cli_ctx->rep_cls_head;
2752   LOG (GNUNET_ERROR_TYPE_DEBUG,
2753       "Client cancels request with id %" PRIu32 "\n",
2754       ntohl (msg->id));
2755   while ( (NULL != rep_cls->next) &&
2756           (rep_cls->id != ntohl (msg->id)) )
2757     rep_cls = rep_cls->next;
2758   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2759   destroy_reply_cls (rep_cls);
2760   GNUNET_SERVICE_client_continue (cli_ctx->client);
2761 }
2762
2763
2764 /**
2765  * @brief This function is called, when the client seeds peers.
2766  * It verifies that @a msg is well-formed.
2767  *
2768  * @param cls the closure (#ClientContext)
2769  * @param msg the message
2770  * @return #GNUNET_OK if @a msg is well-formed
2771  */
2772 static int
2773 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2774 {
2775   struct ClientContext *cli_ctx = cls;
2776   uint16_t msize = ntohs (msg->header.size);
2777   uint32_t num_peers = ntohl (msg->num_peers);
2778
2779   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2780   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2781        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2782   {
2783     GNUNET_break (0);
2784     GNUNET_SERVICE_client_drop (cli_ctx->client);
2785     return GNUNET_SYSERR;
2786   }
2787   return GNUNET_OK;
2788 }
2789
2790
2791 /**
2792  * Handle seed from the client.
2793  *
2794  * @param cls closure
2795  * @param message the actual message
2796  */
2797 static void
2798 handle_client_seed (void *cls,
2799                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2800 {
2801   struct ClientContext *cli_ctx = cls;
2802   struct GNUNET_PeerIdentity *peers;
2803   uint32_t num_peers;
2804   uint32_t i;
2805
2806   num_peers = ntohl (msg->num_peers);
2807   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2808
2809   LOG (GNUNET_ERROR_TYPE_DEBUG,
2810        "Client seeded peers:\n");
2811   print_peer_list (peers, num_peers);
2812
2813   for (i = 0; i < num_peers; i++)
2814   {
2815     LOG (GNUNET_ERROR_TYPE_DEBUG,
2816          "Updating samplers with seed %" PRIu32 ": %s\n",
2817          i,
2818          GNUNET_i2s (&peers[i]));
2819
2820     got_peer (&peers[i]);
2821   }
2822   GNUNET_SERVICE_client_continue (cli_ctx->client);
2823 }
2824
2825 /**
2826  * @brief Send view to client
2827  *
2828  * @param cli_ctx the context of the client
2829  * @param view_array the peerids of the view as array (can be empty)
2830  * @param view_size the size of the view array (can be 0)
2831  */
2832 void
2833 send_view (const struct ClientContext *cli_ctx,
2834            const struct GNUNET_PeerIdentity *view_array,
2835            uint64_t view_size)
2836 {
2837   struct GNUNET_MQ_Envelope *ev;
2838   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2839
2840   if (NULL == view_array)
2841   {
2842     view_size = View_size ();
2843     view_array = View_get_as_array();
2844   }
2845
2846   ev = GNUNET_MQ_msg_extra (out_msg,
2847                             view_size * sizeof (struct GNUNET_PeerIdentity),
2848                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2849   out_msg->num_peers = htonl (view_size);
2850
2851   GNUNET_memcpy (&out_msg[1],
2852           view_array,
2853           view_size * sizeof (struct GNUNET_PeerIdentity));
2854   GNUNET_MQ_send (cli_ctx->mq, ev);
2855 }
2856
2857 /**
2858  * @brief sends updates to clients that are interested
2859  */
2860 static void
2861 clients_notify_view_update (void)
2862 {
2863   struct ClientContext *cli_ctx_iter;
2864   uint64_t num_peers;
2865   const struct GNUNET_PeerIdentity *view_array;
2866
2867   num_peers = View_size ();
2868   view_array = View_get_as_array();
2869   /* check size of view is small enough */
2870   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2871   {
2872     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2873                 "View is too big to send\n");
2874     return;
2875   }
2876
2877   for (cli_ctx_iter = cli_ctx_head;
2878        NULL != cli_ctx_iter;
2879        cli_ctx_iter = cli_ctx_head->next)
2880   {
2881     if (1 < cli_ctx_iter->view_updates_left)
2882     {
2883       /* Client wants to receive limited amount of updates */
2884       cli_ctx_iter->view_updates_left -= 1;
2885     } else if (1 == cli_ctx_iter->view_updates_left)
2886     {
2887       /* Last update of view for client */
2888       cli_ctx_iter->view_updates_left = -1;
2889     } else if (0 > cli_ctx_iter->view_updates_left) {
2890       /* Client is not interested in updates */
2891       continue;
2892     }
2893     /* else _updates_left == 0 - infinite amount of updates */
2894
2895     /* send view */
2896     send_view (cli_ctx_iter, view_array, num_peers);
2897   }
2898 }
2899
2900
2901 /**
2902  * Handle RPS request from the client.
2903  *
2904  * @param cls closure
2905  * @param message the actual message
2906  */
2907 static void
2908 handle_client_view_request (void *cls,
2909                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2910 {
2911   struct ClientContext *cli_ctx = cls;
2912   uint64_t num_updates;
2913
2914   num_updates = ntohl (msg->num_updates);
2915
2916   LOG (GNUNET_ERROR_TYPE_DEBUG,
2917        "Client requested %" PRIu64 " updates of view.\n",
2918        num_updates);
2919
2920   GNUNET_assert (NULL != cli_ctx);
2921   cli_ctx->view_updates_left = num_updates;
2922   send_view (cli_ctx, NULL, 0);
2923   GNUNET_SERVICE_client_continue (cli_ctx->client);
2924 }
2925
2926 /**
2927  * Handle a CHECK_LIVE message from another peer.
2928  *
2929  * This does nothing. But without calling #GNUNET_CADET_receive_done()
2930  * the channel is blocked for all other communication.
2931  *
2932  * @param cls Closure
2933  * @param msg The message header
2934  */
2935 static void
2936 handle_peer_check (void *cls,
2937                    const struct GNUNET_MessageHeader *msg)
2938 {
2939   const struct ChannelCtx *channel_ctx = cls;
2940   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2941   LOG (GNUNET_ERROR_TYPE_DEBUG,
2942       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2943
2944   GNUNET_CADET_receive_done (channel_ctx->channel);
2945 }
2946
2947 /**
2948  * Handle a PUSH message from another peer.
2949  *
2950  * Check the proof of work and store the PeerID
2951  * in the temporary list for pushed PeerIDs.
2952  *
2953  * @param cls Closure
2954  * @param msg The message header
2955  */
2956 static void
2957 handle_peer_push (void *cls,
2958                   const struct GNUNET_MessageHeader *msg)
2959 {
2960   const struct ChannelCtx *channel_ctx = cls;
2961   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2962
2963   // (check the proof of work (?))
2964
2965   LOG (GNUNET_ERROR_TYPE_DEBUG,
2966        "Received PUSH (%s)\n",
2967        GNUNET_i2s (peer));
2968   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
2969
2970   #ifdef ENABLE_MALICIOUS
2971   struct AttackedPeer *tmp_att_peer;
2972
2973   if ( (1 == mal_type) ||
2974        (3 == mal_type) )
2975   { /* Try to maximise representation */
2976     tmp_att_peer = GNUNET_new (struct AttackedPeer);
2977     tmp_att_peer->peer_id = *peer;
2978     if (NULL == att_peer_set)
2979       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2980     if (GNUNET_NO ==
2981         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2982                                                 peer))
2983     {
2984       GNUNET_CONTAINER_DLL_insert (att_peers_head,
2985                                    att_peers_tail,
2986                                    tmp_att_peer);
2987       add_peer_array_to_set (peer, 1, att_peer_set);
2988     }
2989     else
2990     {
2991       GNUNET_free (tmp_att_peer);
2992     }
2993   }
2994
2995
2996   else if (2 == mal_type)
2997   {
2998     /* We attack one single well-known peer - simply ignore */
2999   }
3000   #endif /* ENABLE_MALICIOUS */
3001
3002   /* Add the sending peer to the push_map */
3003   CustomPeerMap_put (push_map, peer);
3004
3005   GNUNET_break_op (check_peer_known (peer));
3006   GNUNET_CADET_receive_done (channel_ctx->channel);
3007 }
3008
3009
3010 /**
3011  * Handle PULL REQUEST request message from another peer.
3012  *
3013  * Reply with the view of PeerIDs.
3014  *
3015  * @param cls Closure
3016  * @param msg The message header
3017  */
3018 static void
3019 handle_peer_pull_request (void *cls,
3020                           const struct GNUNET_MessageHeader *msg)
3021 {
3022   const struct ChannelCtx *channel_ctx = cls;
3023   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3024   const struct GNUNET_PeerIdentity *view_array;
3025
3026   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3027   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3028
3029   #ifdef ENABLE_MALICIOUS
3030   if (1 == mal_type
3031       || 3 == mal_type)
3032   { /* Try to maximise representation */
3033     send_pull_reply (peer, mal_peers, num_mal_peers);
3034   }
3035
3036   else if (2 == mal_type)
3037   { /* Try to partition network */
3038     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3039     {
3040       send_pull_reply (peer, mal_peers, num_mal_peers);
3041     }
3042   }
3043   #endif /* ENABLE_MALICIOUS */
3044
3045   GNUNET_break_op (check_peer_known (peer));
3046   GNUNET_CADET_receive_done (channel_ctx->channel);
3047   view_array = View_get_as_array ();
3048   send_pull_reply (peer, view_array, View_size ());
3049 }
3050
3051
3052 /**
3053  * Check whether we sent a corresponding request and
3054  * whether this reply is the first one.
3055  *
3056  * @param cls Closure
3057  * @param msg The message header
3058  */
3059 static int
3060 check_peer_pull_reply (void *cls,
3061                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3062 {
3063   struct GNUNET_PeerIdentity *sender = cls;
3064
3065   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3066   {
3067     GNUNET_break_op (0);
3068     return GNUNET_SYSERR;
3069   }
3070
3071   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3072       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3073   {
3074     LOG (GNUNET_ERROR_TYPE_ERROR,
3075         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3076         ntohl (msg->num_peers),
3077         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3078             sizeof (struct GNUNET_PeerIdentity));
3079     GNUNET_break_op (0);
3080     return GNUNET_SYSERR;
3081   }
3082
3083   if (GNUNET_YES != check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3084   {
3085     LOG (GNUNET_ERROR_TYPE_WARNING,
3086         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3087         GNUNET_i2s (sender));
3088     GNUNET_break_op (0);
3089     return GNUNET_SYSERR;
3090   }
3091   return GNUNET_OK;
3092 }
3093
3094 /**
3095  * Handle PULL REPLY message from another peer.
3096  *
3097  * @param cls Closure
3098  * @param msg The message header
3099  */
3100 static void
3101 handle_peer_pull_reply (void *cls,
3102                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3103 {
3104   const struct ChannelCtx *channel_ctx = cls;
3105   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3106   const struct GNUNET_PeerIdentity *peers;
3107   uint32_t i;
3108 #ifdef ENABLE_MALICIOUS
3109   struct AttackedPeer *tmp_att_peer;
3110 #endif /* ENABLE_MALICIOUS */
3111
3112   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3113   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3114
3115   #ifdef ENABLE_MALICIOUS
3116   // We shouldn't even receive pull replies as we're not sending
3117   if (2 == mal_type)
3118   {
3119   }
3120   #endif /* ENABLE_MALICIOUS */
3121
3122   /* Do actual logic */
3123   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3124
3125   LOG (GNUNET_ERROR_TYPE_DEBUG,
3126        "PULL REPLY received, got following %u peers:\n",
3127        ntohl (msg->num_peers));
3128
3129   for (i = 0; i < ntohl (msg->num_peers); i++)
3130   {
3131     LOG (GNUNET_ERROR_TYPE_DEBUG,
3132          "%u. %s\n",
3133          i,
3134          GNUNET_i2s (&peers[i]));
3135
3136     #ifdef ENABLE_MALICIOUS
3137     if ((NULL != att_peer_set) &&
3138         (1 == mal_type || 3 == mal_type))
3139     { /* Add attacked peer to local list */
3140       // TODO check if we sent a request and this was the first reply
3141       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3142                                                                &peers[i])
3143           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3144                                                                   &peers[i]))
3145       {
3146         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3147         tmp_att_peer->peer_id = peers[i];
3148         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3149                                      att_peers_tail,
3150                                      tmp_att_peer);
3151         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3152       }
3153       continue;
3154     }
3155     #endif /* ENABLE_MALICIOUS */
3156     /* Make sure we 'know' about this peer */
3157     (void) insert_peer (&peers[i]);
3158
3159     if (GNUNET_YES == check_peer_valid (&peers[i]))
3160     {
3161       CustomPeerMap_put (pull_map, &peers[i]);
3162     }
3163     else
3164     {
3165       schedule_operation (&peers[i], insert_in_pull_map);
3166       (void) issue_peer_liveliness_check (&peers[i]);
3167     }
3168   }
3169
3170   UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING);
3171   clean_peer (sender);
3172
3173   GNUNET_break_op (check_peer_known (sender));
3174   GNUNET_CADET_receive_done (channel_ctx->channel);
3175 }
3176
3177
3178 /**
3179  * Compute a random delay.
3180  * A uniformly distributed value between mean + spread and mean - spread.
3181  *
3182  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3183  * It would return a random value between 2 and 6 min.
3184  *
3185  * @param mean the mean
3186  * @param spread the inverse amount of deviation from the mean
3187  */
3188 static struct GNUNET_TIME_Relative
3189 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3190                     unsigned int spread)
3191 {
3192   struct GNUNET_TIME_Relative half_interval;
3193   struct GNUNET_TIME_Relative ret;
3194   unsigned int rand_delay;
3195   unsigned int max_rand_delay;
3196
3197   if (0 == spread)
3198   {
3199     LOG (GNUNET_ERROR_TYPE_WARNING,
3200          "Not accepting spread of 0\n");
3201     GNUNET_break (0);
3202     GNUNET_assert (0);
3203   }
3204   GNUNET_assert (0 != mean.rel_value_us);
3205
3206   /* Compute random time value between spread * mean and spread * mean */
3207   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3208
3209   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3210   /**
3211    * Compute random value between (0 and 1) * round_interval
3212    * via multiplying round_interval with a 'fraction' (0 to value)/value
3213    */
3214   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3215   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3216   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3217   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3218
3219   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3220     LOG (GNUNET_ERROR_TYPE_WARNING,
3221          "Returning FOREVER_REL\n");
3222
3223   return ret;
3224 }
3225
3226
3227 /**
3228  * Send single pull request
3229  *
3230  * @param peer_id the peer to send the pull request to.
3231  */
3232 static void
3233 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3234 {
3235   struct GNUNET_MQ_Envelope *ev;
3236
3237   GNUNET_assert (GNUNET_NO == check_peer_flag (peer,
3238                                                      Peers_PULL_REPLY_PENDING));
3239   SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING);
3240
3241   LOG (GNUNET_ERROR_TYPE_DEBUG,
3242        "Going to send PULL REQUEST to peer %s.\n",
3243        GNUNET_i2s (peer));
3244
3245   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3246   send_message (peer, ev, "PULL REQUEST");
3247   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3248 }
3249
3250
3251 /**
3252  * Send single push
3253  *
3254  * @param peer_id the peer to send the push to.
3255  */
3256 static void
3257 send_push (const struct GNUNET_PeerIdentity *peer_id)
3258 {
3259   struct GNUNET_MQ_Envelope *ev;
3260
3261   LOG (GNUNET_ERROR_TYPE_DEBUG,
3262        "Going to send PUSH to peer %s.\n",
3263        GNUNET_i2s (peer_id));
3264
3265   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3266   send_message (peer_id, ev, "PUSH");
3267   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3268 }
3269
3270
3271 static void
3272 do_round (void *cls);
3273
3274 static void
3275 do_mal_round (void *cls);
3276
3277 #ifdef ENABLE_MALICIOUS
3278
3279
3280 /**
3281  * @brief This function is called, when the client tells us to act malicious.
3282  * It verifies that @a msg is well-formed.
3283  *
3284  * @param cls the closure (#ClientContext)
3285  * @param msg the message
3286  * @return #GNUNET_OK if @a msg is well-formed
3287  */
3288 static int
3289 check_client_act_malicious (void *cls,
3290                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3291 {
3292   struct ClientContext *cli_ctx = cls;
3293   uint16_t msize = ntohs (msg->header.size);
3294   uint32_t num_peers = ntohl (msg->num_peers);
3295
3296   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3297   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3298        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3299   {
3300     LOG (GNUNET_ERROR_TYPE_ERROR,
3301         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3302         ntohl (msg->num_peers),
3303         (msize / sizeof (struct GNUNET_PeerIdentity)));
3304     GNUNET_break (0);
3305     GNUNET_SERVICE_client_drop (cli_ctx->client);
3306     return GNUNET_SYSERR;
3307   }
3308   return GNUNET_OK;
3309 }
3310
3311 /**
3312  * Turn RPS service to act malicious.
3313  *
3314  * @param cls Closure
3315  * @param client The client that sent the message
3316  * @param msg The message header
3317  */
3318 static void
3319 handle_client_act_malicious (void *cls,
3320                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3321 {
3322   struct ClientContext *cli_ctx = cls;
3323   struct GNUNET_PeerIdentity *peers;
3324   uint32_t num_mal_peers_sent;
3325   uint32_t num_mal_peers_old;
3326
3327   /* Do actual logic */
3328   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3329   mal_type = ntohl (msg->type);
3330   if (NULL == mal_peer_set)
3331     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3332
3333   LOG (GNUNET_ERROR_TYPE_DEBUG,
3334        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3335        mal_type,
3336        ntohl (msg->num_peers));
3337
3338   if (1 == mal_type)
3339   { /* Try to maximise representation */
3340     /* Add other malicious peers to those we already know */
3341
3342     num_mal_peers_sent = ntohl (msg->num_peers);
3343     num_mal_peers_old = num_mal_peers;
3344     GNUNET_array_grow (mal_peers,
3345                        num_mal_peers,
3346                        num_mal_peers + num_mal_peers_sent);
3347     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3348             peers,
3349             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3350
3351     /* Add all mal peers to mal_peer_set */
3352     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3353                            num_mal_peers_sent,
3354                            mal_peer_set);
3355
3356     /* Substitute do_round () with do_mal_round () */
3357     GNUNET_SCHEDULER_cancel (do_round_task);
3358     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3359   }
3360
3361   else if ( (2 == mal_type) ||
3362             (3 == mal_type) )
3363   { /* Try to partition the network */
3364     /* Add other malicious peers to those we already know */
3365
3366     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3367     num_mal_peers_old = num_mal_peers;
3368     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3369     GNUNET_array_grow (mal_peers,
3370                        num_mal_peers,
3371                        num_mal_peers + num_mal_peers_sent);
3372     if (NULL != mal_peers &&
3373         0 != num_mal_peers)
3374     {
3375       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3376               peers,
3377               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3378
3379       /* Add all mal peers to mal_peer_set */
3380       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3381                              num_mal_peers_sent,
3382                              mal_peer_set);
3383     }
3384
3385     /* Store the one attacked peer */
3386     GNUNET_memcpy (&attacked_peer,
3387             &msg->attacked_peer,
3388             sizeof (struct GNUNET_PeerIdentity));
3389     /* Set the flag of the attacked peer to valid to avoid problems */
3390     if (GNUNET_NO == check_peer_known (&attacked_peer))
3391     {
3392       (void) issue_peer_liveliness_check (&attacked_peer);
3393     }
3394
3395     LOG (GNUNET_ERROR_TYPE_DEBUG,
3396          "Attacked peer is %s\n",
3397          GNUNET_i2s (&attacked_peer));
3398
3399     /* Substitute do_round () with do_mal_round () */
3400     GNUNET_SCHEDULER_cancel (do_round_task);
3401     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3402   }
3403   else if (0 == mal_type)
3404   { /* Stop acting malicious */
3405     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3406
3407     /* Substitute do_mal_round () with do_round () */
3408     GNUNET_SCHEDULER_cancel (do_round_task);
3409     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3410   }
3411   else
3412   {
3413     GNUNET_break (0);
3414     GNUNET_SERVICE_client_continue (cli_ctx->client);
3415   }
3416   GNUNET_SERVICE_client_continue (cli_ctx->client);
3417 }
3418
3419
3420 /**
3421  * Send out PUSHes and PULLs maliciously.
3422  *
3423  * This is executed regylary.
3424  */
3425 static void
3426 do_mal_round (void *cls)
3427 {
3428   uint32_t num_pushes;
3429   uint32_t i;
3430   struct GNUNET_TIME_Relative time_next_round;
3431   struct AttackedPeer *tmp_att_peer;
3432
3433   LOG (GNUNET_ERROR_TYPE_DEBUG,
3434        "Going to execute next round maliciously type %" PRIu32 ".\n",
3435       mal_type);
3436   do_round_task = NULL;
3437   GNUNET_assert (mal_type <= 3);
3438   /* Do malicious actions */
3439   if (1 == mal_type)
3440   { /* Try to maximise representation */
3441
3442     /* The maximum of pushes we're going to send this round */
3443     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3444                                          num_attacked_peers),
3445                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3446
3447     LOG (GNUNET_ERROR_TYPE_DEBUG,
3448          "Going to send %" PRIu32 " pushes\n",
3449          num_pushes);
3450
3451     /* Send PUSHes to attacked peers */
3452     for (i = 0 ; i < num_pushes ; i++)
3453     {
3454       if (att_peers_tail == att_peer_index)
3455         att_peer_index = att_peers_head;
3456       else
3457         att_peer_index = att_peer_index->next;
3458
3459       send_push (&att_peer_index->peer_id);
3460     }
3461
3462     /* Send PULLs to some peers to learn about additional peers to attack */
3463     tmp_att_peer = att_peer_index;
3464     for (i = 0 ; i < num_pushes * alpha ; i++)
3465     {
3466       if (att_peers_tail == tmp_att_peer)
3467         tmp_att_peer = att_peers_head;
3468       else
3469         att_peer_index = tmp_att_peer->next;
3470
3471       send_pull_request (&tmp_att_peer->peer_id);
3472     }
3473   }
3474
3475
3476   else if (2 == mal_type)
3477   { /**
3478      * Try to partition the network
3479      * Send as many pushes to the attacked peer as possible
3480      * That is one push per round as it will ignore more.
3481      */
3482     (void) issue_peer_liveliness_check (&attacked_peer);
3483     if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3484       send_push (&attacked_peer);
3485   }
3486
3487
3488   if (3 == mal_type)
3489   { /* Combined attack */
3490
3491     /* Send PUSH to attacked peers */
3492     if (GNUNET_YES == check_peer_known (&attacked_peer))
3493     {
3494       (void) issue_peer_liveliness_check (&attacked_peer);
3495       if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3496       {
3497         LOG (GNUNET_ERROR_TYPE_DEBUG,
3498             "Goding to send push to attacked peer (%s)\n",
3499             GNUNET_i2s (&attacked_peer));
3500         send_push (&attacked_peer);
3501       }
3502     }
3503     (void) issue_peer_liveliness_check (&attacked_peer);
3504
3505     /* The maximum of pushes we're going to send this round */
3506     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3507                                          num_attacked_peers),
3508                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3509
3510     LOG (GNUNET_ERROR_TYPE_DEBUG,
3511          "Going to send %" PRIu32 " pushes\n",
3512          num_pushes);
3513
3514     for (i = 0; i < num_pushes; i++)
3515     {
3516       if (att_peers_tail == att_peer_index)
3517         att_peer_index = att_peers_head;
3518       else
3519         att_peer_index = att_peer_index->next;
3520
3521       send_push (&att_peer_index->peer_id);
3522     }
3523
3524     /* Send PULLs to some peers to learn about additional peers to attack */
3525     tmp_att_peer = att_peer_index;
3526     for (i = 0; i < num_pushes * alpha; i++)
3527     {
3528       if (att_peers_tail == tmp_att_peer)
3529         tmp_att_peer = att_peers_head;
3530       else
3531         att_peer_index = tmp_att_peer->next;
3532
3533       send_pull_request (&tmp_att_peer->peer_id);
3534     }
3535   }
3536
3537   /* Schedule next round */
3538   time_next_round = compute_rand_delay (round_interval, 2);
3539
3540   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3541   //NULL);
3542   GNUNET_assert (NULL == do_round_task);
3543   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3544                                                 &do_mal_round, NULL);
3545   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3546 }
3547 #endif /* ENABLE_MALICIOUS */
3548
3549 /**
3550  * Send out PUSHes and PULLs, possibly update #view, samplers.
3551  *
3552  * This is executed regylary.
3553  */
3554 static void
3555 do_round (void *cls)
3556 {
3557   uint32_t i;
3558   const struct GNUNET_PeerIdentity *view_array;
3559   unsigned int *permut;
3560   unsigned int a_peers; /* Number of peers we send pushes to */
3561   unsigned int b_peers; /* Number of peers we send pull requests to */
3562   uint32_t first_border;
3563   uint32_t second_border;
3564   struct GNUNET_PeerIdentity peer;
3565   struct GNUNET_PeerIdentity *update_peer;
3566
3567   LOG (GNUNET_ERROR_TYPE_DEBUG,
3568        "Going to execute next round.\n");
3569   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3570   do_round_task = NULL;
3571   LOG (GNUNET_ERROR_TYPE_DEBUG,
3572        "Printing view:\n");
3573   to_file (file_name_view_log,
3574            "___ new round ___");
3575   view_array = View_get_as_array ();
3576   for (i = 0; i < View_size (); i++)
3577   {
3578     LOG (GNUNET_ERROR_TYPE_DEBUG,
3579          "\t%s\n", GNUNET_i2s (&view_array[i]));
3580     to_file (file_name_view_log,
3581              "=%s\t(do round)",
3582              GNUNET_i2s_full (&view_array[i]));
3583   }
3584
3585
3586   /* Send pushes and pull requests */
3587   if (0 < View_size ())
3588   {
3589     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3590                                            View_size ());
3591
3592     /* Send PUSHes */
3593     a_peers = ceil (alpha * View_size ());
3594
3595     LOG (GNUNET_ERROR_TYPE_DEBUG,
3596          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3597          a_peers, alpha, View_size ());
3598     for (i = 0; i < a_peers; i++)
3599     {
3600       peer = view_array[permut[i]];
3601       // FIXME if this fails schedule/loop this for later
3602       send_push (&peer);
3603     }
3604
3605     /* Send PULL requests */
3606     b_peers = ceil (beta * View_size ());
3607     first_border = a_peers;
3608     second_border = a_peers + b_peers;
3609     if (second_border > View_size ())
3610     {
3611       first_border = View_size () - b_peers;
3612       second_border = View_size ();
3613     }
3614     LOG (GNUNET_ERROR_TYPE_DEBUG,
3615         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3616         b_peers, beta, View_size ());
3617     for (i = first_border; i < second_border; i++)
3618     {
3619       peer = view_array[permut[i]];
3620       if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3621       { // FIXME if this fails schedule/loop this for later
3622         send_pull_request (&peer);
3623       }
3624     }
3625
3626     GNUNET_free (permut);
3627     permut = NULL;
3628   }
3629
3630
3631   /* Update view */
3632   /* TODO see how many peers are in push-/pull- list! */
3633
3634   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3635       (0 < CustomPeerMap_size (push_map)) &&
3636       (0 < CustomPeerMap_size (pull_map)))
3637   //if (GNUNET_YES) // disable blocking temporarily
3638   { /* If conditions for update are fulfilled, update */
3639     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3640
3641     uint32_t final_size;
3642     uint32_t peers_to_clean_size;
3643     struct GNUNET_PeerIdentity *peers_to_clean;
3644
3645     peers_to_clean = NULL;
3646     peers_to_clean_size = 0;
3647     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3648     GNUNET_memcpy (peers_to_clean,
3649             view_array,
3650             View_size () * sizeof (struct GNUNET_PeerIdentity));
3651
3652     /* Seems like recreating is the easiest way of emptying the peermap */
3653     View_clear ();
3654     to_file (file_name_view_log,
3655              "--- emptied ---");
3656
3657     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3658                                 CustomPeerMap_size (push_map));
3659     second_border = first_border +
3660                     GNUNET_MIN (floor (beta  * view_size_est_need),
3661                                 CustomPeerMap_size (pull_map));
3662     final_size    = second_border +
3663       ceil ((1 - (alpha + beta)) * view_size_est_need);
3664     LOG (GNUNET_ERROR_TYPE_DEBUG,
3665         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3666         first_border,
3667         second_border,
3668         final_size);
3669
3670     /* Update view with peers received through PUSHes */
3671     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3672                                            CustomPeerMap_size (push_map));
3673     for (i = 0; i < first_border; i++)
3674     {
3675       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3676                                                               permut[i]));
3677       to_file (file_name_view_log,
3678                "+%s\t(push list)",
3679                GNUNET_i2s_full (&view_array[i]));
3680       // TODO change the peer_flags accordingly
3681     }
3682     GNUNET_free (permut);
3683     permut = NULL;
3684
3685     /* Update view with peers received through PULLs */
3686     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3687                                            CustomPeerMap_size (pull_map));
3688     for (i = first_border; i < second_border; i++)
3689     {
3690       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3691             permut[i - first_border]));
3692       to_file (file_name_view_log,
3693                "+%s\t(pull list)",
3694                GNUNET_i2s_full (&view_array[i]));
3695       // TODO change the peer_flags accordingly
3696     }
3697     GNUNET_free (permut);
3698     permut = NULL;
3699
3700     /* Update view with peers from history */
3701     RPS_sampler_get_n_rand_peers (prot_sampler,
3702                                   hist_update,
3703                                   NULL,
3704                                   final_size - second_border);
3705     // TODO change the peer_flags accordingly
3706
3707     for (i = 0; i < View_size (); i++)
3708       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3709
3710     /* Clean peers that were removed from the view */
3711     for (i = 0; i < peers_to_clean_size; i++)
3712     {
3713       to_file (file_name_view_log,
3714                "-%s",
3715                GNUNET_i2s_full (&peers_to_clean[i]));
3716       clean_peer (&peers_to_clean[i]);
3717     }
3718
3719     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3720     clients_notify_view_update();
3721   } else {
3722     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3723     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3724     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3725         !(0 >= CustomPeerMap_size (pull_map)))
3726       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3727     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3728         (0 >= CustomPeerMap_size (pull_map)))
3729       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3730     if (0 >= CustomPeerMap_size (push_map) &&
3731         !(0 >= CustomPeerMap_size (pull_map)))
3732       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3733     if (0 >= CustomPeerMap_size (push_map) &&
3734         (0 >= CustomPeerMap_size (pull_map)))
3735       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3736     if (0 >= CustomPeerMap_size (pull_map) &&
3737         CustomPeerMap_size (push_map) > alpha * View_size () &&
3738         0 >= CustomPeerMap_size (push_map))
3739       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3740   }
3741   // TODO independent of that also get some peers from CADET_get_peers()?
3742   GNUNET_STATISTICS_set (stats,
3743       "# peers in push map at end of round",
3744       CustomPeerMap_size (push_map),
3745       GNUNET_NO);
3746   GNUNET_STATISTICS_set (stats,
3747       "# peers in pull map at end of round",
3748       CustomPeerMap_size (pull_map),
3749       GNUNET_NO);
3750   GNUNET_STATISTICS_set (stats,
3751       "# peers in view at end of round",
3752       View_size (),
3753       GNUNET_NO);
3754
3755   LOG (GNUNET_ERROR_TYPE_DEBUG,
3756        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3757        CustomPeerMap_size (push_map),
3758        CustomPeerMap_size (pull_map),
3759        alpha,
3760        View_size (),
3761        alpha * View_size ());
3762
3763   /* Update samplers */
3764   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3765   {
3766     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3767     LOG (GNUNET_ERROR_TYPE_DEBUG,
3768          "Updating with peer %s from push list\n",
3769          GNUNET_i2s (update_peer));
3770     insert_in_sampler (NULL, update_peer);
3771     clean_peer (update_peer); /* This cleans only if it is not in the view */
3772   }
3773
3774   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3775   {
3776     LOG (GNUNET_ERROR_TYPE_DEBUG,
3777          "Updating with peer %s from pull list\n",
3778          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3779     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3780     /* This cleans only if it is not in the view */
3781     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3782   }
3783
3784
3785   /* Empty push/pull lists */
3786   CustomPeerMap_clear (push_map);
3787   CustomPeerMap_clear (pull_map);
3788
3789   struct GNUNET_TIME_Relative time_next_round;
3790
3791   time_next_round = compute_rand_delay (round_interval, 2);
3792
3793   /* Schedule next round */
3794   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3795                                                 &do_round, NULL);
3796   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3797 }
3798
3799
3800 /**
3801  * This is called from GNUNET_CADET_get_peers().
3802  *
3803  * It is called on every peer(ID) that cadet somehow has contact with.
3804  * We use those to initialise the sampler.
3805  */
3806 void
3807 init_peer_cb (void *cls,
3808               const struct GNUNET_PeerIdentity *peer,
3809               int tunnel, // "Do we have a tunnel towards this peer?"
3810               unsigned int n_paths, // "Number of known paths towards this peer"
3811               unsigned int best_path) // "How long is the best path?
3812                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3813 {
3814   if (NULL != peer)
3815   {
3816     LOG (GNUNET_ERROR_TYPE_DEBUG,
3817          "Got peer_id %s from cadet\n",
3818          GNUNET_i2s (peer));
3819     got_peer (peer);
3820   }
3821 }
3822
3823 /**
3824  * @brief Iterator function over stored, valid peers.
3825  *
3826  * We initialise the sampler with those.
3827  *
3828  * @param cls the closure
3829  * @param peer the peer id
3830  * @return #GNUNET_YES if we should continue to
3831  *         iterate,
3832  *         #GNUNET_NO if not.
3833  */
3834 static int
3835 valid_peers_iterator (void *cls,
3836                       const struct GNUNET_PeerIdentity *peer)
3837 {
3838   if (NULL != peer)
3839   {
3840     LOG (GNUNET_ERROR_TYPE_DEBUG,
3841          "Got stored, valid peer %s\n",
3842          GNUNET_i2s (peer));
3843     got_peer (peer);
3844   }
3845   return GNUNET_YES;
3846 }
3847
3848
3849 /**
3850  * Iterator over peers from peerinfo.
3851  *
3852  * @param cls closure
3853  * @param peer id of the peer, NULL for last call
3854  * @param hello hello message for the peer (can be NULL)
3855  * @param error message
3856  */
3857 void
3858 process_peerinfo_peers (void *cls,
3859                         const struct GNUNET_PeerIdentity *peer,
3860                         const struct GNUNET_HELLO_Message *hello,
3861                         const char *err_msg)
3862 {
3863   if (NULL != peer)
3864   {
3865     LOG (GNUNET_ERROR_TYPE_DEBUG,
3866          "Got peer_id %s from peerinfo\n",
3867          GNUNET_i2s (peer));
3868     got_peer (peer);
3869   }
3870 }
3871
3872
3873 /**
3874  * Task run during shutdown.
3875  *
3876  * @param cls unused
3877  */
3878 static void
3879 shutdown_task (void *cls)
3880 {
3881   struct ClientContext *client_ctx;
3882   struct ReplyCls *reply_cls;
3883
3884   in_shutdown = GNUNET_YES;
3885
3886   LOG (GNUNET_ERROR_TYPE_DEBUG,
3887        "RPS is going down\n");
3888
3889   /* Clean all clients */
3890   for (client_ctx = cli_ctx_head;
3891        NULL != cli_ctx_head;
3892        client_ctx = cli_ctx_head)
3893   {
3894     /* Clean pending requests to the sampler */
3895     for (reply_cls = client_ctx->rep_cls_head;
3896          NULL != client_ctx->rep_cls_head;
3897          reply_cls = client_ctx->rep_cls_head)
3898     {
3899       RPS_sampler_request_cancel (reply_cls->req_handle);
3900       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
3901                                    client_ctx->rep_cls_tail,
3902                                    reply_cls);
3903       GNUNET_free (reply_cls);
3904     }
3905     GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3906                                  cli_ctx_tail,
3907                                  client_ctx);
3908     GNUNET_free (client_ctx);
3909   }
3910   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3911   GNUNET_PEERINFO_disconnect (peerinfo_handle);
3912   peerinfo_handle = NULL;
3913   if (NULL != do_round_task)
3914   {
3915     GNUNET_SCHEDULER_cancel (do_round_task);
3916     do_round_task = NULL;
3917   }
3918
3919   peers_terminate ();
3920
3921   GNUNET_NSE_disconnect (nse);
3922   RPS_sampler_destroy (prot_sampler);
3923   RPS_sampler_destroy (client_sampler);
3924   GNUNET_CADET_close_port (cadet_port);
3925   GNUNET_CADET_disconnect (cadet_handle);
3926   cadet_handle = NULL;
3927   View_destroy ();
3928   CustomPeerMap_destroy (push_map);
3929   CustomPeerMap_destroy (pull_map);
3930   if (NULL != stats)
3931   {
3932     GNUNET_STATISTICS_destroy (stats,
3933                                GNUNET_NO);
3934     stats = NULL;
3935   }
3936 #ifdef ENABLE_MALICIOUS
3937   struct AttackedPeer *tmp_att_peer;
3938   /* it is ok to free this const during shutdown: */
3939   GNUNET_free ((char *) file_name_view_log);
3940 #ifdef TO_FILE
3941   GNUNET_free ((char *) file_name_observed_log);
3942   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
3943 #endif /* TO_FILE */
3944   GNUNET_array_grow (mal_peers,
3945                      num_mal_peers,
3946                      0);
3947   if (NULL != mal_peer_set)
3948     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3949   if (NULL != att_peer_set)
3950     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
3951   while (NULL != att_peers_head)
3952   {
3953     tmp_att_peer = att_peers_head;
3954     GNUNET_CONTAINER_DLL_remove (att_peers_head,
3955                                  att_peers_tail,
3956                                  tmp_att_peer);
3957     GNUNET_free (tmp_att_peer);
3958   }
3959 #endif /* ENABLE_MALICIOUS */
3960 }
3961
3962
3963 /**
3964  * Handle client connecting to the service.
3965  *
3966  * @param cls NULL
3967  * @param client the new client
3968  * @param mq the message queue of @a client
3969  * @return @a client
3970  */
3971 static void *
3972 client_connect_cb (void *cls,
3973                    struct GNUNET_SERVICE_Client *client,
3974                    struct GNUNET_MQ_Handle *mq)
3975 {
3976   struct ClientContext *cli_ctx;
3977
3978   LOG (GNUNET_ERROR_TYPE_DEBUG,
3979        "Client connected\n");
3980   if (NULL == client)
3981     return client; /* Server was destroyed before a client connected. Shutting down */
3982   cli_ctx = GNUNET_new (struct ClientContext);
3983   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
3984   cli_ctx->view_updates_left = -1;
3985   cli_ctx->client = client;
3986   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
3987                                cli_ctx_tail,
3988                                cli_ctx);
3989   return cli_ctx;
3990 }
3991
3992 /**
3993  * Callback called when a client disconnected from the service
3994  *
3995  * @param cls closure for the service
3996  * @param c the client that disconnected
3997  * @param internal_cls should be equal to @a c
3998  */
3999 static void
4000 client_disconnect_cb (void *cls,
4001                       struct GNUNET_SERVICE_Client *client,
4002                       void *internal_cls)
4003 {
4004   struct ClientContext *cli_ctx = internal_cls;
4005
4006   (void) cls;
4007   GNUNET_assert (client == cli_ctx->client);
4008   if (NULL == client)
4009   {/* shutdown task - destroy all clients */
4010     while (NULL != cli_ctx_head)
4011       destroy_cli_ctx (cli_ctx_head);
4012   }
4013   else
4014   { /* destroy this client */
4015     LOG (GNUNET_ERROR_TYPE_DEBUG,
4016         "Client disconnected. Destroy its context.\n");
4017     destroy_cli_ctx (cli_ctx);
4018   }
4019 }
4020
4021
4022 /**
4023  * Handle random peer sampling clients.
4024  *
4025  * @param cls closure
4026  * @param c configuration to use
4027  * @param service the initialized service
4028  */
4029 static void
4030 run (void *cls,
4031      const struct GNUNET_CONFIGURATION_Handle *c,
4032      struct GNUNET_SERVICE_Handle *service)
4033 {
4034   char *fn_valid_peers;
4035
4036   (void) cls;
4037   (void) service;
4038   GNUNET_log_setup ("rps",
4039                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4040                     NULL);
4041   cfg = c;
4042   /* Get own ID */
4043   GNUNET_CRYPTO_get_peer_identity (cfg,
4044                                    &own_identity); // TODO check return value
4045   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4046               "STARTING SERVICE (rps) for peer [%s]\n",
4047               GNUNET_i2s (&own_identity));
4048 #ifdef ENABLE_MALICIOUS
4049   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4050               "Malicious execution compiled in.\n");
4051 #endif /* ENABLE_MALICIOUS */
4052
4053   /* Get time interval from the configuration */
4054   if (GNUNET_OK !=
4055       GNUNET_CONFIGURATION_get_value_time (cfg,
4056                                            "RPS",
4057                                            "ROUNDINTERVAL",
4058                                            &round_interval))
4059   {
4060     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4061                                "RPS", "ROUNDINTERVAL");
4062     GNUNET_SCHEDULER_shutdown ();
4063     return;
4064   }
4065
4066   /* Get initial size of sampler/view from the configuration */
4067   if (GNUNET_OK !=
4068       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4069         (long long unsigned int *) &sampler_size_est_min))
4070   {
4071     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4072                                "RPS", "MINSIZE");
4073     GNUNET_SCHEDULER_shutdown ();
4074     return;
4075   }
4076   sampler_size_est_need = sampler_size_est_min;
4077   view_size_est_min = sampler_size_est_min;
4078   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4079
4080   if (GNUNET_OK !=
4081       GNUNET_CONFIGURATION_get_value_filename (cfg,
4082                                                "rps",
4083                                                "FILENAME_VALID_PEERS",
4084                                                &fn_valid_peers))
4085   {
4086     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4087                                "rps", "FILENAME_VALID_PEERS");
4088   }
4089
4090
4091   View_create (view_size_est_min);
4092
4093   /* file_name_view_log */
4094   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4095   #ifdef TO_FILE
4096   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4097   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4098   #endif /* TO_FILE */
4099
4100   /* connect to NSE */
4101   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4102
4103
4104   alpha = 0.45;
4105   beta  = 0.45;
4106
4107
4108   /* Initialise cadet */
4109   /* There exists a copy-paste-clone in get_channel() */
4110   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4111     GNUNET_MQ_hd_fixed_size (peer_check,
4112                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4113                              struct GNUNET_MessageHeader,
4114                              NULL),
4115     GNUNET_MQ_hd_fixed_size (peer_push,
4116                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4117                              struct GNUNET_MessageHeader,
4118                              NULL),
4119     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4120                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4121                              struct GNUNET_MessageHeader,
4122                              NULL),
4123     GNUNET_MQ_hd_var_size (peer_pull_reply,
4124                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4125                            struct GNUNET_RPS_P2P_PullReplyMessage,
4126                            NULL),
4127     GNUNET_MQ_handler_end ()
4128   };
4129
4130   cadet_handle = GNUNET_CADET_connect (cfg);
4131   GNUNET_assert (NULL != cadet_handle);
4132   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4133                       strlen (GNUNET_APPLICATION_PORT_RPS),
4134                       &port);
4135   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4136                                        &port,
4137                                        &handle_inbound_channel, /* Connect handler */
4138                                        NULL, /* cls */
4139                                        NULL, /* WindowSize handler */
4140                                        &cleanup_destroyed_channel, /* Disconnect handler */
4141                                        cadet_handlers);
4142   if (NULL == cadet_port)
4143   {
4144     LOG (GNUNET_ERROR_TYPE_ERROR,
4145         "Cadet port `%s' is already in use.\n",
4146         GNUNET_APPLICATION_PORT_RPS);
4147     GNUNET_assert (0);
4148   }
4149
4150
4151   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4152   initialise_peers (fn_valid_peers, cadet_handle);
4153   GNUNET_free (fn_valid_peers);
4154
4155   /* Initialise sampler */
4156   struct GNUNET_TIME_Relative half_round_interval;
4157   struct GNUNET_TIME_Relative  max_round_interval;
4158
4159   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4160   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4161
4162   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4163   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4164
4165   /* Initialise push and pull maps */
4166   push_map = CustomPeerMap_create (4);
4167   pull_map = CustomPeerMap_create (4);
4168
4169
4170   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4171   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4172   // TODO send push/pull to each of those peers?
4173   // TODO read stored valid peers from last run
4174   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4175   get_valid_peers (valid_peers_iterator, NULL);
4176
4177   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4178                                                    GNUNET_NO,
4179                                                    process_peerinfo_peers,
4180                                                    NULL);
4181
4182   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4183
4184   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4185   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4186
4187   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4188   stats = GNUNET_STATISTICS_create ("rps", cfg);
4189
4190 }
4191
4192
4193 /**
4194  * Define "main" method using service macro.
4195  */
4196 GNUNET_SERVICE_MAIN
4197 ("rps",
4198  GNUNET_SERVICE_OPTION_NONE,
4199  &run,
4200  &client_connect_cb,
4201  &client_disconnect_cb,
4202  NULL,
4203  GNUNET_MQ_hd_fixed_size (client_request,
4204    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4205    struct GNUNET_RPS_CS_RequestMessage,
4206    NULL),
4207  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4208    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4209    struct GNUNET_RPS_CS_RequestCancelMessage,
4210    NULL),
4211  GNUNET_MQ_hd_var_size (client_seed,
4212    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4213    struct GNUNET_RPS_CS_SeedMessage,
4214    NULL),
4215 #ifdef ENABLE_MALICIOUS
4216  GNUNET_MQ_hd_var_size (client_act_malicious,
4217    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4218    struct GNUNET_RPS_CS_ActMaliciousMessage,
4219    NULL),
4220 #endif /* ENABLE_MALICIOUS */
4221  GNUNET_MQ_hd_fixed_size (client_view_request,
4222    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4223    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4224    NULL),
4225  GNUNET_MQ_handler_end());
4226
4227 /* end of gnunet-service-rps.c */