Schedule destruction of channel during destruction of other channel
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO store peers somewhere persistent
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /**
57  * Our configuration.
58  */
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62  * Handle to the statistics service.
63  */
64 static struct GNUNET_STATISTICS_Handle *stats;
65
66 /**
67  * Our own identity.
68  */
69 static struct GNUNET_PeerIdentity own_identity;
70
71 static int in_shutdown = GNUNET_NO;
72
73 /**
74  * @brief Port used for cadet.
75  *
76  * Don't compute multiple times through making it global
77  */
78 static struct GNUNET_HashCode port;
79
80 /***********************************************************************
81  * Old gnunet-service-rps_peers.c
82 ***********************************************************************/
83
84 /**
85  * Set a peer flag of given peer context.
86  */
87 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
88
89 /**
90  * Get peer flag of given peer context.
91  */
92 #define check_peer_flag_set(peer_ctx, mask)\
93   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
94
95 /**
96  * Unset flag of given peer context.
97  */
98 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * @brief Context for a channel
164  */
165 struct ChannelCtx;
166
167 /**
168  * Struct used to keep track of other peer's status
169  *
170  * This is stored in a multipeermap.
171  * It contains information such as cadet channels, a message queue for sending,
172  * status about the channels, the pending operations on this peer and some flags
173  * about the status of the peer itself. (live, valid, ...)
174  */
175 struct PeerContext
176 {
177   /**
178    * Message queue open to client
179    */
180   struct GNUNET_MQ_Handle *mq;
181
182   /**
183    * Channel open to client.
184    */
185   struct ChannelCtx *send_channel_ctx;
186
187   /**
188    * Channel open from client.
189    */
190   struct ChannelCtx *recv_channel_ctx;
191
192   /**
193    * Array of pending operations on this peer.
194    */
195   struct PeerPendingOp *pending_ops;
196
197   /**
198    * Handle to the callback given to cadet_ntfy_tmt_rdy()
199    *
200    * To be canceled on shutdown.
201    */
202   struct PendingMessage *liveliness_check_pending;
203
204   /**
205    * Number of pending operations.
206    */
207   unsigned int num_pending_ops;
208
209   /**
210    * Identity of the peer
211    */
212   struct GNUNET_PeerIdentity peer_id;
213
214   /**
215    * Flags indicating status of peer
216    */
217   uint32_t peer_flags;
218
219   /**
220    * Last time we received something from that peer.
221    */
222   struct GNUNET_TIME_Absolute last_message_recv;
223
224   /**
225    * Last time we received a keepalive message.
226    */
227   struct GNUNET_TIME_Absolute last_keepalive;
228
229   /**
230    * DLL with all messages that are yet to be sent
231    */
232   struct PendingMessage *pending_messages_head;
233   struct PendingMessage *pending_messages_tail;
234
235   /**
236    * This is pobably followed by 'statistical' data (when we first saw
237    * it, how did we get its ID, how many pushes (in a timeinterval),
238    * ...)
239    */
240 };
241
242 /**
243  * @brief Closure to #valid_peer_iterator
244  */
245 struct PeersIteratorCls
246 {
247   /**
248    * Iterator function
249    */
250   PeersIterator iterator;
251
252   /**
253    * Closure to iterator
254    */
255   void *cls;
256 };
257
258 /**
259  * @brief Context for a channel
260  */
261 struct ChannelCtx
262 {
263   /**
264    * @brief Meant to be used in a DLL
265    */
266   struct ChannelCtx *next;
267   struct ChannelCtx *prev;
268
269   /**
270    * @brief The channel itself
271    */
272   struct GNUNET_CADET_Channel *channel;
273
274   /**
275    * @brief The peer context associated with the channel
276    */
277   struct PeerContext *peer_ctx;
278
279   /**
280    * @brief When channel destruction needs to be delayed (because it is called
281    * from within the cadet routine of another channel destruction) this task
282    * refers to the respective _SCHEDULER_Task.
283    */
284   struct GNUNET_SCHEDULER_Task *destruction_task;
285 };
286
287 /**
288  * @brief Hashmap of valid peers.
289  */
290 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
291
292 /**
293  * @brief Maximum number of valid peers to keep.
294  * TODO read from config
295  */
296 static uint32_t num_valid_peers_max = UINT32_MAX;
297
298 /**
299  * @brief Filename of the file that stores the valid peers persistently.
300  */
301 static char *filename_valid_peers;
302
303 /**
304  * Set of all peers to keep track of them.
305  */
306 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
307
308 /**
309  * Cadet handle.
310  */
311 static struct GNUNET_CADET_Handle *cadet_handle;
312
313
314 /**
315  * @brief Get the #PeerContext associated with a peer
316  *
317  * @param peer the peer id
318  *
319  * @return the #PeerContext
320  */
321 static struct PeerContext *
322 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
323 {
324   struct PeerContext *ctx;
325   int ret;
326
327   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
328   GNUNET_assert (GNUNET_YES == ret);
329   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
330   GNUNET_assert (NULL != ctx);
331   return ctx;
332 }
333
334 /**
335  * @brief Check whether we have information about the given peer.
336  *
337  * FIXME probably deprecated. Make this the new _online.
338  *
339  * @param peer peer in question
340  *
341  * @return #GNUNET_YES if peer is known
342  *         #GNUNET_NO  if peer is not knwon
343  */
344 static int
345 check_peer_known (const struct GNUNET_PeerIdentity *peer)
346 {
347   if (NULL != peer_map)
348   {
349     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
350   } else
351   {
352     return GNUNET_NO;
353   }
354 }
355
356
357 /**
358  * @brief Create a new #PeerContext and insert it into the peer map
359  *
360  * @param peer the peer to create the #PeerContext for
361  *
362  * @return the #PeerContext
363  */
364 static struct PeerContext *
365 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
366 {
367   struct PeerContext *ctx;
368   int ret;
369
370   GNUNET_assert (GNUNET_NO == check_peer_known (peer));
371
372   ctx = GNUNET_new (struct PeerContext);
373   ctx->peer_id = *peer;
374   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
375       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
376   GNUNET_assert (GNUNET_OK == ret);
377   return ctx;
378 }
379
380
381 /**
382  * @brief Create or get a #PeerContext
383  *
384  * @param peer the peer to get the associated context to
385  *
386  * @return the context
387  */
388 static struct PeerContext *
389 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
390 {
391   if (GNUNET_NO == check_peer_known (peer))
392   {
393     return create_peer_ctx (peer);
394   }
395   return get_peer_ctx (peer);
396 }
397
398
399 /**
400  * @brief Check whether we have a connection to this @a peer
401  *
402  * Also sets the #Peers_ONLINE flag accordingly
403  *
404  * @param peer the peer in question
405  *
406  * @return #GNUNET_YES if we are connected
407  *         #GNUNET_NO  otherwise
408  */
409 static int
410 check_connected (const struct GNUNET_PeerIdentity *peer)
411 {
412   struct PeerContext *peer_ctx;
413
414   /* If we don't know about this peer we don't know whether it's online */
415   if (GNUNET_NO == check_peer_known (peer))
416   {
417     return GNUNET_NO;
418   }
419   /* Get the context */
420   peer_ctx = get_peer_ctx (peer);
421   /* If we have no channel to this peer we don't know whether it's online */
422   if ( (NULL == peer_ctx->send_channel_ctx) &&
423        (NULL == peer_ctx->recv_channel_ctx) )
424   {
425     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
426     return GNUNET_NO;
427   }
428   /* Otherwise (if we have a channel, we know that it's online */
429   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
430   return GNUNET_YES;
431 }
432
433
434 /**
435  * @brief The closure to #get_rand_peer_iterator.
436  */
437 struct GetRandPeerIteratorCls
438 {
439   /**
440    * @brief The index of the peer to return.
441    * Will be decreased until 0.
442    * Then current peer is returned.
443    */
444   uint32_t index;
445
446   /**
447    * @brief Pointer to peer to return.
448    */
449   const struct GNUNET_PeerIdentity *peer;
450 };
451
452
453 /**
454  * @brief Iterator function for #get_random_peer_from_peermap.
455  *
456  * Implements #GNUNET_CONTAINER_PeerMapIterator.
457  * Decreases the index until the index is null.
458  * Then returns the current peer.
459  *
460  * @param cls the #GetRandPeerIteratorCls containing index and peer
461  * @param peer current peer
462  * @param value unused
463  *
464  * @return  #GNUNET_YES if we should continue to
465  *          iterate,
466  *          #GNUNET_NO if not.
467  */
468 static int
469 get_rand_peer_iterator (void *cls,
470                         const struct GNUNET_PeerIdentity *peer,
471                         void *value)
472 {
473   struct GetRandPeerIteratorCls *iterator_cls = cls;
474   if (0 >= iterator_cls->index)
475   {
476     iterator_cls->peer = peer;
477     return GNUNET_NO;
478   }
479   iterator_cls->index--;
480   return GNUNET_YES;
481 }
482
483
484 /**
485  * @brief Get a random peer from @a peer_map
486  *
487  * @param peer_map the peer_map to get the peer from
488  *
489  * @return a random peer
490  */
491 static const struct GNUNET_PeerIdentity *
492 get_random_peer_from_peermap (const struct
493                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
494 {
495   struct GetRandPeerIteratorCls *iterator_cls;
496   const struct GNUNET_PeerIdentity *ret;
497
498   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
499   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
500       GNUNET_CONTAINER_multipeermap_size (peer_map));
501   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
502                                                 get_rand_peer_iterator,
503                                                 iterator_cls);
504   ret = iterator_cls->peer;
505   GNUNET_free (iterator_cls);
506   return ret;
507 }
508
509
510 /**
511  * @brief Add a given @a peer to valid peers.
512  *
513  * If valid peers are already #num_valid_peers_max, delete a peer previously.
514  *
515  * @param peer the peer that is added to the valid peers.
516  *
517  * @return #GNUNET_YES if no other peer had to be removed
518  *         #GNUNET_NO  otherwise
519  */
520 static int
521 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
522 {
523   const struct GNUNET_PeerIdentity *rand_peer;
524   int ret;
525
526   ret = GNUNET_YES;
527   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
528   {
529     rand_peer = get_random_peer_from_peermap (valid_peers);
530     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
531     ret = GNUNET_NO;
532   }
533   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
534       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
535   return ret;
536 }
537
538 static void
539 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
540
541 /**
542  * @brief Set the peer flag to living and
543  *        call the pending operations on this peer.
544  *
545  * Also adds peer to #valid_peers.
546  *
547  * @param peer_ctx the #PeerContext of the peer to set live
548  */
549 static void
550 set_peer_live (struct PeerContext *peer_ctx)
551 {
552   struct GNUNET_PeerIdentity *peer;
553   unsigned int i;
554
555   peer = &peer_ctx->peer_id;
556   LOG (GNUNET_ERROR_TYPE_DEBUG,
557       "Peer %s is live and valid, calling %i pending operations on it\n",
558       GNUNET_i2s (peer),
559       peer_ctx->num_pending_ops);
560
561   if (NULL != peer_ctx->liveliness_check_pending)
562   {
563     LOG (GNUNET_ERROR_TYPE_DEBUG,
564          "Removing pending liveliness check for peer %s\n",
565          GNUNET_i2s (&peer_ctx->peer_id));
566     // TODO wait until cadet sets mq->cancel_impl
567     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
568     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
569     peer_ctx->liveliness_check_pending = NULL;
570   }
571
572   (void) add_valid_peer (peer);
573   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
574
575   /* Call pending operations */
576   for (i = 0; i < peer_ctx->num_pending_ops; i++)
577   {
578     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
579   }
580   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
581 }
582
583 static void
584 cleanup_destroyed_channel (void *cls,
585                            const struct GNUNET_CADET_Channel *channel);
586
587 /* Declaration of handlers */
588 static void
589 handle_peer_check (void *cls,
590                    const struct GNUNET_MessageHeader *msg);
591
592 static void
593 handle_peer_push (void *cls,
594                   const struct GNUNET_MessageHeader *msg);
595
596 static void
597 handle_peer_pull_request (void *cls,
598                           const struct GNUNET_MessageHeader *msg);
599
600 static int
601 check_peer_pull_reply (void *cls,
602                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
603
604 static void
605 handle_peer_pull_reply (void *cls,
606                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
607
608 /* End declaration of handlers */
609
610 /**
611  * @brief Allocate memory for a new channel context and insert it into DLL
612  *
613  * @param peer_ctx context of the according peer
614  *
615  * @return The channel context
616  */
617 static struct ChannelCtx *
618 add_channel_ctx (struct PeerContext *peer_ctx)
619 {
620   struct ChannelCtx *channel_ctx;
621   channel_ctx = GNUNET_new (struct ChannelCtx);
622   channel_ctx->peer_ctx = peer_ctx;
623   return channel_ctx;
624 }
625
626
627 /**
628  * @brief Remove the channel context from the DLL and free the memory.
629  *
630  * @param channel_ctx The channel context.
631  */
632 static void
633 remove_channel_ctx (struct ChannelCtx *channel_ctx)
634 {
635   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
636
637   if (channel_ctx == peer_ctx->send_channel_ctx)
638   {
639     GNUNET_free (channel_ctx);
640     peer_ctx->send_channel_ctx = NULL;
641     peer_ctx->mq = NULL;
642   }
643   else if (channel_ctx == peer_ctx->recv_channel_ctx)
644   {
645     GNUNET_free (channel_ctx);
646     peer_ctx->recv_channel_ctx = NULL;
647   }
648   else
649   {
650     GNUNET_assert (0);
651   }
652 }
653
654
655 /**
656  * @brief Get the channel of a peer. If not existing, create.
657  *
658  * @param peer the peer id
659  * @return the #GNUNET_CADET_Channel used to send data to @a peer
660  */
661 struct GNUNET_CADET_Channel *
662 get_channel (const struct GNUNET_PeerIdentity *peer)
663 {
664   struct PeerContext *peer_ctx;
665   struct GNUNET_PeerIdentity *ctx_peer;
666   /* There exists a copy-paste-clone in run() */
667   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
668     GNUNET_MQ_hd_fixed_size (peer_check,
669                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
670                              struct GNUNET_MessageHeader,
671                              NULL),
672     GNUNET_MQ_hd_fixed_size (peer_push,
673                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
674                              struct GNUNET_MessageHeader,
675                              NULL),
676     GNUNET_MQ_hd_fixed_size (peer_pull_request,
677                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
678                              struct GNUNET_MessageHeader,
679                              NULL),
680     GNUNET_MQ_hd_var_size (peer_pull_reply,
681                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
682                            struct GNUNET_RPS_P2P_PullReplyMessage,
683                            NULL),
684     GNUNET_MQ_handler_end ()
685   };
686
687
688   peer_ctx = get_peer_ctx (peer);
689   if (NULL == peer_ctx->send_channel_ctx)
690   {
691     LOG (GNUNET_ERROR_TYPE_DEBUG,
692          "Trying to establish channel to peer %s\n",
693          GNUNET_i2s (peer));
694     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
695     *ctx_peer = *peer;
696     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
697     peer_ctx->send_channel_ctx->channel =
698       GNUNET_CADET_channel_create (cadet_handle,
699                                    peer_ctx->send_channel_ctx, /* context */
700                                    peer,
701                                    &port,
702                                    GNUNET_CADET_OPTION_RELIABLE,
703                                    NULL, /* WindowSize handler */
704                                    cleanup_destroyed_channel, /* Disconnect handler */
705                                    cadet_handlers);
706   }
707   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
708   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
709   return peer_ctx->send_channel_ctx->channel;
710 }
711
712
713 /**
714  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
715  *
716  * If we already have a message queue open to this client,
717  * simply return it, otherways create one.
718  *
719  * @param peer the peer to get the mq to
720  * @return the #GNUNET_MQ_Handle
721  */
722 static struct GNUNET_MQ_Handle *
723 get_mq (const struct GNUNET_PeerIdentity *peer)
724 {
725   struct PeerContext *peer_ctx;
726
727   peer_ctx = get_peer_ctx (peer);
728
729   if (NULL == peer_ctx->mq)
730   {
731     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
732   }
733   return peer_ctx->mq;
734 }
735
736 /**
737  * @brief Add an envelope to a message passed to mq to list of pending messages
738  *
739  * @param peer peer the message was sent to
740  * @param ev envelope to the message
741  * @param type type of the message to be sent
742  * @return pointer to pending message
743  */
744 static struct PendingMessage *
745 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
746                         struct GNUNET_MQ_Envelope *ev,
747                         const char *type)
748 {
749   struct PendingMessage *pending_msg;
750   struct PeerContext *peer_ctx;
751
752   peer_ctx = get_peer_ctx (peer);
753   pending_msg = GNUNET_new (struct PendingMessage);
754   pending_msg->ev = ev;
755   pending_msg->peer_ctx = peer_ctx;
756   pending_msg->type = type;
757   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
758                                peer_ctx->pending_messages_tail,
759                                pending_msg);
760   return pending_msg;
761 }
762
763
764 /**
765  * @brief Remove a pending message from the respective DLL
766  *
767  * @param pending_msg the pending message to remove
768  * @param cancel cancel the pending message, too
769  */
770 static void
771 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
772 {
773   struct PeerContext *peer_ctx;
774
775   peer_ctx = pending_msg->peer_ctx;
776   GNUNET_assert (NULL != peer_ctx);
777   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
778                                peer_ctx->pending_messages_tail,
779                                pending_msg);
780   // TODO wait for the cadet implementation of message cancellation
781   //if (GNUNET_YES == cancel)
782   //{
783   //  GNUNET_MQ_send_cancel (pending_msg->ev);
784   //}
785   GNUNET_free (pending_msg);
786 }
787
788
789 /**
790  * @brief This is called in response to the first message we sent as a
791  * liveliness check.
792  *
793  * @param cls #PeerContext of peer with pending liveliness check
794  */
795 static void
796 mq_liveliness_check_successful (void *cls)
797 {
798   struct PeerContext *peer_ctx = cls;
799
800   if (NULL != peer_ctx->liveliness_check_pending)
801   {
802     LOG (GNUNET_ERROR_TYPE_DEBUG,
803         "Liveliness check for peer %s was successfull\n",
804         GNUNET_i2s (&peer_ctx->peer_id));
805     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
806     peer_ctx->liveliness_check_pending = NULL;
807     set_peer_live (peer_ctx);
808   }
809 }
810
811 /**
812  * Issue a check whether peer is live
813  *
814  * @param peer_ctx the context of the peer
815  */
816 static void
817 check_peer_live (struct PeerContext *peer_ctx)
818 {
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820        "Get informed about peer %s getting live\n",
821        GNUNET_i2s (&peer_ctx->peer_id));
822
823   struct GNUNET_MQ_Handle *mq;
824   struct GNUNET_MQ_Envelope *ev;
825
826   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
827   peer_ctx->liveliness_check_pending =
828     insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
829   mq = get_mq (&peer_ctx->peer_id);
830   GNUNET_MQ_notify_sent (ev,
831                          mq_liveliness_check_successful,
832                          peer_ctx);
833   GNUNET_MQ_send (mq, ev);
834 }
835
836
837 /**
838  * @brief Check whether function of type #PeerOp was already scheduled
839  *
840  * The array with pending operations will probably never grow really big, so
841  * iterating over it should be ok.
842  *
843  * @param peer the peer to check
844  * @param peer_op the operation (#PeerOp) on the peer
845  *
846  * @return #GNUNET_YES if this operation is scheduled on that peer
847  *         #GNUNET_NO  otherwise
848  */
849 static int
850 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
851                            const PeerOp peer_op)
852 {
853   const struct PeerContext *peer_ctx;
854   unsigned int i;
855
856   peer_ctx = get_peer_ctx (peer);
857   for (i = 0; i < peer_ctx->num_pending_ops; i++)
858     if (peer_op == peer_ctx->pending_ops[i].op)
859       return GNUNET_YES;
860   return GNUNET_NO;
861 }
862
863 /**
864  * @brief Callback for scheduler to destroy a channel
865  *
866  * @param cls Context of the channel
867  */
868 static void
869 destroy_channel (struct ChannelCtx *channel_ctx)
870 {
871   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
872
873   if (NULL != channel_ctx->destruction_task)
874   {
875     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
876     channel_ctx->destruction_task = NULL;
877   }
878   GNUNET_CADET_channel_destroy (channel_ctx->channel);
879   channel_ctx->channel = NULL;
880   remove_channel_ctx (channel_ctx);
881 }
882
883 /**
884  * @brief Destroy a cadet channel.
885  *
886  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
887  *
888  * @param cls
889  */
890 static void
891 destroy_channel_cb (void *cls)
892 {
893   struct ChannelCtx *channel_ctx = cls;
894   channel_ctx->destruction_task = NULL;
895   destroy_channel (channel_ctx);
896 }
897
898 /**
899  * @brief Schedule the destruction of a channel for immediately afterwards.
900  *
901  * In case a channel is to be destroyed from within the callback to the
902  * destruction of another channel (send channel), we cannot call
903  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
904  * construction.
905  *
906  * @param channel_ctx channel to be destroyed.
907  */
908 static void
909 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
910 {
911   channel_ctx->destruction_task =
912     GNUNET_SCHEDULER_add_now (destroy_channel_cb, channel_ctx);
913 }
914
915 /**
916  * @brief Remove peer
917  *
918  * @param peer the peer to clean
919  * @return #GNUNET_YES if peer was removed
920  *         #GNUNET_NO  otherwise
921  */
922 static int
923 destroy_peer (struct PeerContext *peer_ctx)
924 {
925   GNUNET_assert (NULL != peer_ctx);
926   GNUNET_assert (NULL != peer_map);
927   if (GNUNET_NO ==
928       GNUNET_CONTAINER_multipeermap_contains (peer_map,
929                                               &peer_ctx->peer_id))
930   {
931     return GNUNET_NO;
932   }
933   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
934   LOG (GNUNET_ERROR_TYPE_DEBUG,
935        "Going to remove peer %s\n",
936        GNUNET_i2s (&peer_ctx->peer_id));
937   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
938
939   /* Clear list of pending operations */
940   // TODO this probably leaks memory
941   //      ('only' the cls to the function. Not sure what to do with it)
942   GNUNET_array_grow (peer_ctx->pending_ops,
943                      peer_ctx->num_pending_ops,
944                      0);
945   /* Remove all pending messages */
946   while (NULL != peer_ctx->pending_messages_head)
947   {
948     LOG (GNUNET_ERROR_TYPE_DEBUG,
949          "Removing unsent %s\n",
950          peer_ctx->pending_messages_head->type);
951     /* Cancle pending message, too */
952     if ( (NULL != peer_ctx->liveliness_check_pending) &&
953          (0 == memcmp (peer_ctx->pending_messages_head,
954                      peer_ctx->liveliness_check_pending,
955                      sizeof (struct PendingMessage))) )
956       {
957         // TODO this may leak memory
958         peer_ctx->liveliness_check_pending = NULL;
959       }
960     remove_pending_message (peer_ctx->pending_messages_head,
961                             GNUNET_YES);
962   }
963
964   /* If we are still waiting for notification whether this peer is live
965    * cancel the according task */
966   if (NULL != peer_ctx->liveliness_check_pending)
967   {
968     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969                 "Removing pending liveliness check for peer %s\n",
970                 GNUNET_i2s (&peer_ctx->peer_id));
971     // TODO wait until cadet sets mq->cancel_impl
972     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
973     remove_pending_message (peer_ctx->liveliness_check_pending,
974                             GNUNET_YES);
975     peer_ctx->liveliness_check_pending = NULL;
976   }
977
978   if (NULL != peer_ctx->send_channel_ctx)
979   {
980     /* This is possibly called from within channel destruction */
981     schedule_channel_destruction (peer_ctx->send_channel_ctx);
982   }
983   if (NULL != peer_ctx->recv_channel_ctx)
984   {
985     /* This is possibly called from within channel destruction */
986     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
987   }
988
989   if (GNUNET_YES !=
990       GNUNET_CONTAINER_multipeermap_remove_all (peer_map,
991                                                 &peer_ctx->peer_id))
992   {
993     LOG (GNUNET_ERROR_TYPE_WARNING,
994          "removing peer from peer_map failed\n");
995   }
996   GNUNET_free (peer_ctx);
997   return GNUNET_YES;
998 }
999
1000
1001 /**
1002  * Iterator over hash map entries. Deletes all contexts of peers.
1003  *
1004  * @param cls closure
1005  * @param key current public key
1006  * @param value value in the hash map
1007  * @return #GNUNET_YES if we should continue to iterate,
1008  *         #GNUNET_NO if not.
1009  */
1010 static int
1011 peermap_clear_iterator (void *cls,
1012                         const struct GNUNET_PeerIdentity *key,
1013                         void *value)
1014 {
1015   destroy_peer (get_peer_ctx (key));
1016   return GNUNET_YES;
1017 }
1018
1019
1020 /**
1021  * @brief This is called once a message is sent.
1022  *
1023  * Removes the pending message
1024  *
1025  * @param cls type of the message that was sent
1026  */
1027 static void
1028 mq_notify_sent_cb (void *cls)
1029 {
1030   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032       "%s was sent.\n",
1033       pending_msg->type);
1034   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1035     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1036   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1037     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1038   if (0 == strncmp ("PUSH", pending_msg->type, 4))
1039     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1040   /* Do not cancle message */
1041   remove_pending_message (pending_msg, GNUNET_NO);
1042 }
1043
1044
1045 /**
1046  * @brief Iterator function for #store_valid_peers.
1047  *
1048  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1049  * Writes single peer to disk.
1050  *
1051  * @param cls the file handle to write to.
1052  * @param peer current peer
1053  * @param value unused
1054  *
1055  * @return  #GNUNET_YES if we should continue to
1056  *          iterate,
1057  *          #GNUNET_NO if not.
1058  */
1059 static int
1060 store_peer_presistently_iterator (void *cls,
1061                                   const struct GNUNET_PeerIdentity *peer,
1062                                   void *value)
1063 {
1064   const struct GNUNET_DISK_FileHandle *fh = cls;
1065   char peer_string[128];
1066   int size;
1067   ssize_t ret;
1068
1069   if (NULL == peer)
1070   {
1071     return GNUNET_YES;
1072   }
1073   size = GNUNET_snprintf (peer_string,
1074                           sizeof (peer_string),
1075                           "%s\n",
1076                           GNUNET_i2s_full (peer));
1077   GNUNET_assert (53 == size);
1078   ret = GNUNET_DISK_file_write (fh,
1079                                 peer_string,
1080                                 size);
1081   GNUNET_assert (size == ret);
1082   return GNUNET_YES;
1083 }
1084
1085
1086 /**
1087  * @brief Store the peers currently in #valid_peers to disk.
1088  */
1089 static void
1090 store_valid_peers ()
1091 {
1092   struct GNUNET_DISK_FileHandle *fh;
1093   uint32_t number_written_peers;
1094   int ret;
1095
1096   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1097   {
1098     return;
1099   }
1100
1101   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
1102   if (GNUNET_SYSERR == ret)
1103   {
1104     LOG (GNUNET_ERROR_TYPE_WARNING,
1105         "Not able to create directory for file `%s'\n",
1106         filename_valid_peers);
1107     GNUNET_break (0);
1108   }
1109   else if (GNUNET_NO == ret)
1110   {
1111     LOG (GNUNET_ERROR_TYPE_WARNING,
1112         "Directory for file `%s' exists but is not writable for us\n",
1113         filename_valid_peers);
1114     GNUNET_break (0);
1115   }
1116   fh = GNUNET_DISK_file_open (filename_valid_peers,
1117                               GNUNET_DISK_OPEN_WRITE |
1118                                   GNUNET_DISK_OPEN_CREATE,
1119                               GNUNET_DISK_PERM_USER_READ |
1120                                   GNUNET_DISK_PERM_USER_WRITE);
1121   if (NULL == fh)
1122   {
1123     LOG (GNUNET_ERROR_TYPE_WARNING,
1124         "Not able to write valid peers to file `%s'\n",
1125         filename_valid_peers);
1126     return;
1127   }
1128   LOG (GNUNET_ERROR_TYPE_DEBUG,
1129       "Writing %u valid peers to disk\n",
1130       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1131   number_written_peers =
1132     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1133                                            store_peer_presistently_iterator,
1134                                            fh);
1135   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1136   GNUNET_assert (number_written_peers ==
1137       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1138 }
1139
1140
1141 /**
1142  * @brief Convert string representation of peer id to peer id.
1143  *
1144  * Counterpart to #GNUNET_i2s_full.
1145  *
1146  * @param string_repr The string representation of the peer id
1147  *
1148  * @return The peer id
1149  */
1150 static const struct GNUNET_PeerIdentity *
1151 s2i_full (const char *string_repr)
1152 {
1153   struct GNUNET_PeerIdentity *peer;
1154   size_t len;
1155   int ret;
1156
1157   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1158   len = strlen (string_repr);
1159   if (52 > len)
1160   {
1161     LOG (GNUNET_ERROR_TYPE_WARNING,
1162         "Not able to convert string representation of PeerID to PeerID\n"
1163         "Sting representation: %s (len %lu) - too short\n",
1164         string_repr,
1165         len);
1166     GNUNET_break (0);
1167   }
1168   else if (52 < len)
1169   {
1170     len = 52;
1171   }
1172   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1173                                                     len,
1174                                                     &peer->public_key);
1175   if (GNUNET_OK != ret)
1176   {
1177     LOG (GNUNET_ERROR_TYPE_WARNING,
1178         "Not able to convert string representation of PeerID to PeerID\n"
1179         "Sting representation: %s\n",
1180         string_repr);
1181     GNUNET_break (0);
1182   }
1183   return peer;
1184 }
1185
1186
1187 /**
1188  * @brief Restore the peers on disk to #valid_peers.
1189  */
1190 static void
1191 restore_valid_peers ()
1192 {
1193   off_t file_size;
1194   uint32_t num_peers;
1195   struct GNUNET_DISK_FileHandle *fh;
1196   char *buf;
1197   ssize_t size_read;
1198   char *iter_buf;
1199   char *str_repr;
1200   const struct GNUNET_PeerIdentity *peer;
1201
1202   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1203   {
1204     return;
1205   }
1206
1207   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1208   {
1209     return;
1210   }
1211   fh = GNUNET_DISK_file_open (filename_valid_peers,
1212                               GNUNET_DISK_OPEN_READ,
1213                               GNUNET_DISK_PERM_NONE);
1214   GNUNET_assert (NULL != fh);
1215   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1216   num_peers = file_size / 53;
1217   buf = GNUNET_malloc (file_size);
1218   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1219   GNUNET_assert (size_read == file_size);
1220   LOG (GNUNET_ERROR_TYPE_DEBUG,
1221       "Restoring %" PRIu32 " peers from file `%s'\n",
1222       num_peers,
1223       filename_valid_peers);
1224   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1225   {
1226     str_repr = GNUNET_strndup (iter_buf, 53);
1227     peer = s2i_full (str_repr);
1228     GNUNET_free (str_repr);
1229     add_valid_peer (peer);
1230     LOG (GNUNET_ERROR_TYPE_DEBUG,
1231         "Restored valid peer %s from disk\n",
1232         GNUNET_i2s_full (peer));
1233   }
1234   iter_buf = NULL;
1235   GNUNET_free (buf);
1236   LOG (GNUNET_ERROR_TYPE_DEBUG,
1237       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1238       num_peers,
1239       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1240   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1241   {
1242     LOG (GNUNET_ERROR_TYPE_WARNING,
1243         "Number of restored peers does not match file size. Have probably duplicates.\n");
1244   }
1245   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1246   LOG (GNUNET_ERROR_TYPE_DEBUG,
1247       "Restored %u valid peers from disk\n",
1248       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1249 }
1250
1251
1252 /**
1253  * @brief Initialise storage of peers
1254  *
1255  * @param fn_valid_peers filename of the file used to store valid peer ids
1256  * @param cadet_h cadet handle
1257  * @param own_id own peer identity
1258  */
1259 static void
1260 initialise_peers (char* fn_valid_peers,
1261                   struct GNUNET_CADET_Handle *cadet_h)
1262 {
1263   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1264   cadet_handle = cadet_h;
1265   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1266   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1267   restore_valid_peers ();
1268 }
1269
1270
1271 /**
1272  * @brief Delete storage of peers that was created with #initialise_peers ()
1273  */
1274 static void
1275 peers_terminate ()
1276 {
1277   if (GNUNET_SYSERR ==
1278       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1279                                              &peermap_clear_iterator,
1280                                              NULL))
1281   {
1282     LOG (GNUNET_ERROR_TYPE_WARNING,
1283         "Iteration destroying peers was aborted.\n");
1284   }
1285   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1286   peer_map = NULL;
1287   store_valid_peers ();
1288   GNUNET_free (filename_valid_peers);
1289   filename_valid_peers = NULL;
1290   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1291   valid_peers = NULL;
1292 }
1293
1294
1295 /**
1296  * Iterator over #valid_peers hash map entries.
1297  *
1298  * @param cls closure - unused
1299  * @param peer current peer id
1300  * @param value value in the hash map - unused
1301  * @return #GNUNET_YES if we should continue to
1302  *         iterate,
1303  *         #GNUNET_NO if not.
1304  */
1305 static int
1306 valid_peer_iterator (void *cls,
1307                      const struct GNUNET_PeerIdentity *peer,
1308                      void *value)
1309 {
1310   struct PeersIteratorCls *it_cls = cls;
1311
1312   return it_cls->iterator (it_cls->cls,
1313                            peer);
1314 }
1315
1316
1317 /**
1318  * @brief Get all currently known, valid peer ids.
1319  *
1320  * @param it function to call on each peer id
1321  * @param it_cls extra argument to @a it
1322  * @return the number of key value pairs processed,
1323  *         #GNUNET_SYSERR if it aborted iteration
1324  */
1325 static int
1326 get_valid_peers (PeersIterator iterator,
1327                  void *it_cls)
1328 {
1329   struct PeersIteratorCls *cls;
1330   int ret;
1331
1332   cls = GNUNET_new (struct PeersIteratorCls);
1333   cls->iterator = iterator;
1334   cls->cls = it_cls;
1335   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1336                                                valid_peer_iterator,
1337                                                cls);
1338   GNUNET_free (cls);
1339   return ret;
1340 }
1341
1342
1343 /**
1344  * @brief Add peer to known peers.
1345  *
1346  * This function is called on new peer_ids from 'external' sources
1347  * (client seed, cadet get_peers(), ...)
1348  *
1349  * @param peer the new #GNUNET_PeerIdentity
1350  *
1351  * @return #GNUNET_YES if peer was inserted
1352  *         #GNUNET_NO  otherwise
1353  */
1354 static int
1355 insert_peer (const struct GNUNET_PeerIdentity *peer)
1356 {
1357   if (GNUNET_YES == check_peer_known (peer))
1358   {
1359     return GNUNET_NO; /* We already know this peer - nothing to do */
1360   }
1361   (void) create_peer_ctx (peer);
1362   return GNUNET_YES;
1363 }
1364
1365
1366 /**
1367  * @brief Check whether flags on a peer are set.
1368  *
1369  * @param peer the peer to check the flag of
1370  * @param flags the flags to check
1371  *
1372  * @return #GNUNET_SYSERR if peer is not known
1373  *         #GNUNET_YES    if all given flags are set
1374  *         #GNUNET_NO     otherwise
1375  */
1376 static int
1377 check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1378                  enum Peers_PeerFlags flags)
1379 {
1380   struct PeerContext *peer_ctx;
1381
1382   if (GNUNET_NO == check_peer_known (peer))
1383   {
1384     return GNUNET_SYSERR;
1385   }
1386   peer_ctx = get_peer_ctx (peer);
1387   return check_peer_flag_set (peer_ctx, flags);
1388 }
1389
1390 /**
1391  * @brief Try connecting to a peer to see whether it is online
1392  *
1393  * If not known yet, insert into known peers
1394  *
1395  * @param peer the peer whose liveliness is to be checked
1396  * @return #GNUNET_YES if peer had to be inserted
1397  *         #GNUNET_NO  otherwise
1398  */
1399 static int
1400 issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1401 {
1402   struct PeerContext *peer_ctx;
1403   int ret;
1404
1405   ret = insert_peer (peer);
1406   peer_ctx = get_peer_ctx (peer);
1407   if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) &&
1408        (NULL == peer_ctx->liveliness_check_pending) )
1409   {
1410     check_peer_live (peer_ctx);
1411   }
1412   return ret;
1413 }
1414
1415
1416 /**
1417  * @brief Check if peer is removable.
1418  *
1419  * Check if
1420  *  - a recv channel exists
1421  *  - there are pending messages
1422  *  - there is no pending pull reply
1423  *
1424  * @param peer the peer in question
1425  * @return #GNUNET_YES    if peer is removable
1426  *         #GNUNET_NO     if peer is NOT removable
1427  *         #GNUNET_SYSERR if peer is not known
1428  */
1429 static int
1430 check_removable (const struct GNUNET_PeerIdentity *peer)
1431 {
1432   struct PeerContext *peer_ctx;
1433
1434   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1435   {
1436     return GNUNET_SYSERR;
1437   }
1438
1439   peer_ctx = get_peer_ctx (peer);
1440   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1441        (NULL != peer_ctx->pending_messages_head) ||
1442        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1443   {
1444     return GNUNET_NO;
1445   }
1446   return GNUNET_YES;
1447 }
1448
1449
1450 /**
1451  * @brief Check whether @a peer is actually a peer.
1452  *
1453  * A valid peer is a peer that we know exists eg. we were connected to once.
1454  *
1455  * @param peer peer in question
1456  *
1457  * @return #GNUNET_YES if peer is valid
1458  *         #GNUNET_NO  if peer is not valid
1459  */
1460 static int
1461 check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1462 {
1463   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1464 }
1465
1466
1467 /**
1468  * @brief Indicate that we want to send to the other peer
1469  *
1470  * This establishes a sending channel
1471  *
1472  * @param peer the peer to establish channel to
1473  */
1474 static void
1475 indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1476 {
1477   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1478   (void) get_channel (peer);
1479 }
1480
1481
1482 /**
1483  * @brief Check whether other peer has the intention to send/opened channel
1484  *        towars us
1485  *
1486  * @param peer the peer in question
1487  *
1488  * @return #GNUNET_YES if peer has the intention to send
1489  *         #GNUNET_NO  otherwise
1490  */
1491 static int
1492 check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1493 {
1494   const struct PeerContext *peer_ctx;
1495
1496   peer_ctx = get_peer_ctx (peer);
1497   if (NULL != peer_ctx->recv_channel_ctx)
1498   {
1499     return GNUNET_YES;
1500   }
1501   return GNUNET_NO;
1502 }
1503
1504
1505 /**
1506  * Handle the channel a peer opens to us.
1507  *
1508  * @param cls The closure
1509  * @param channel The channel the peer wants to establish
1510  * @param initiator The peer's peer ID
1511  *
1512  * @return initial channel context for the channel
1513  *         (can be NULL -- that's not an error)
1514  */
1515 static void *
1516 handle_inbound_channel (void *cls,
1517                         struct GNUNET_CADET_Channel *channel,
1518                         const struct GNUNET_PeerIdentity *initiator)
1519 {
1520   struct PeerContext *peer_ctx;
1521   struct GNUNET_PeerIdentity *ctx_peer;
1522   struct ChannelCtx *channel_ctx;
1523
1524   LOG (GNUNET_ERROR_TYPE_DEBUG,
1525       "New channel was established to us (Peer %s).\n",
1526       GNUNET_i2s (initiator));
1527   GNUNET_assert (NULL != channel); /* according to cadet API */
1528   /* Make sure we 'know' about this peer */
1529   peer_ctx = create_or_get_peer_ctx (initiator);
1530   set_peer_live (peer_ctx);
1531   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1532   *ctx_peer = *initiator;
1533   channel_ctx = add_channel_ctx (peer_ctx);
1534   channel_ctx->channel = channel;
1535   /* We only accept one incoming channel per peer */
1536   if (GNUNET_YES == check_peer_send_intention (initiator))
1537   {
1538     LOG (GNUNET_ERROR_TYPE_WARNING,
1539         "Already got one receive channel. Destroying old one.\n");
1540     GNUNET_break_op (0);
1541     destroy_channel (peer_ctx->recv_channel_ctx);
1542     peer_ctx->recv_channel_ctx = channel_ctx;
1543     /* return the channel context */
1544     return channel_ctx;
1545   }
1546   peer_ctx->recv_channel_ctx = channel_ctx;
1547   return channel_ctx;
1548 }
1549
1550
1551 /**
1552  * @brief Check whether a sending channel towards the given peer exists
1553  *
1554  * @param peer the peer to check for
1555  *
1556  * @return #GNUNET_YES if a sending channel towards that peer exists
1557  *         #GNUNET_NO  otherwise
1558  */
1559 static int
1560 check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1561 {
1562   struct PeerContext *peer_ctx;
1563
1564   if (GNUNET_NO == check_peer_known (peer))
1565   { /* If no such peer exists, there is no channel */
1566     return GNUNET_NO;
1567   }
1568   peer_ctx = get_peer_ctx (peer);
1569   if (NULL == peer_ctx->send_channel_ctx)
1570   {
1571     return GNUNET_NO;
1572   }
1573   return GNUNET_YES;
1574 }
1575
1576
1577 /**
1578  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1579  *        intention to another peer
1580  *
1581  * If there is also no channel to receive messages from that peer, remove it
1582  * from the peermap.
1583  * TODO really?
1584  *
1585  * @peer the peer identity of the peer whose sending channel to destroy
1586  * @return #GNUNET_YES if channel was destroyed
1587  *         #GNUNET_NO  otherwise
1588  */
1589 static int
1590 destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1591 {
1592   struct PeerContext *peer_ctx;
1593
1594   if (GNUNET_NO == check_peer_known (peer))
1595   {
1596     return GNUNET_NO;
1597   }
1598   peer_ctx = get_peer_ctx (peer);
1599   if (NULL != peer_ctx->send_channel_ctx)
1600   {
1601     destroy_channel (peer_ctx->send_channel_ctx);
1602     (void) check_connected (peer);
1603     return GNUNET_YES;
1604   }
1605   return GNUNET_NO;
1606 }
1607
1608 /**
1609  * @brief Send a message to another peer.
1610  *
1611  * Keeps track about pending messages so they can be properly removed when the
1612  * peer is destroyed.
1613  *
1614  * @param peer receeiver of the message
1615  * @param ev envelope of the message
1616  * @param type type of the message
1617  */
1618 static void
1619 send_message (const struct GNUNET_PeerIdentity *peer,
1620               struct GNUNET_MQ_Envelope *ev,
1621               const char *type)
1622 {
1623   struct PendingMessage *pending_msg;
1624   struct GNUNET_MQ_Handle *mq;
1625
1626   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1627               "Sending message to %s of type %s\n",
1628               GNUNET_i2s (peer),
1629               type);
1630   pending_msg = insert_pending_message (peer, ev, type);
1631   mq = get_mq (peer);
1632   GNUNET_MQ_notify_sent (ev,
1633                          mq_notify_sent_cb,
1634                          pending_msg);
1635   GNUNET_MQ_send (mq, ev);
1636 }
1637
1638 /**
1639  * @brief Schedule a operation on given peer
1640  *
1641  * Avoids scheduling an operation twice.
1642  *
1643  * @param peer the peer we want to schedule the operation for once it gets live
1644  *
1645  * @return #GNUNET_YES if the operation was scheduled
1646  *         #GNUNET_NO  otherwise
1647  */
1648 static int
1649 schedule_operation (const struct GNUNET_PeerIdentity *peer,
1650                           const PeerOp peer_op)
1651 {
1652   struct PeerPendingOp pending_op;
1653   struct PeerContext *peer_ctx;
1654
1655   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1656
1657   //TODO if LIVE/ONLINE execute immediately
1658
1659   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1660   {
1661     peer_ctx = get_peer_ctx (peer);
1662     pending_op.op = peer_op;
1663     pending_op.op_cls = NULL;
1664     GNUNET_array_append (peer_ctx->pending_ops,
1665                          peer_ctx->num_pending_ops,
1666                          pending_op);
1667     return GNUNET_YES;
1668   }
1669   return GNUNET_NO;
1670 }
1671
1672 /***********************************************************************
1673  * /Old gnunet-service-rps_peers.c
1674 ***********************************************************************/
1675
1676
1677 /***********************************************************************
1678  * Housekeeping with clients
1679 ***********************************************************************/
1680
1681 /**
1682  * Closure used to pass the client and the id to the callback
1683  * that replies to a client's request
1684  */
1685 struct ReplyCls
1686 {
1687   /**
1688    * DLL
1689    */
1690   struct ReplyCls *next;
1691   struct ReplyCls *prev;
1692
1693   /**
1694    * The identifier of the request
1695    */
1696   uint32_t id;
1697
1698   /**
1699    * The handle to the request
1700    */
1701   struct RPS_SamplerRequestHandle *req_handle;
1702
1703   /**
1704    * The client handle to send the reply to
1705    */
1706   struct ClientContext *cli_ctx;
1707 };
1708
1709
1710 /**
1711  * Struct used to store the context of a connected client.
1712  */
1713 struct ClientContext
1714 {
1715   /**
1716    * DLL
1717    */
1718   struct ClientContext *next;
1719   struct ClientContext *prev;
1720
1721   /**
1722    * The message queue to communicate with the client.
1723    */
1724   struct GNUNET_MQ_Handle *mq;
1725
1726   /**
1727    * DLL with handles to single requests from the client
1728    */
1729   struct ReplyCls *rep_cls_head;
1730   struct ReplyCls *rep_cls_tail;
1731
1732   /**
1733    * @brief How many updates this client expects to receive.
1734    */
1735   int64_t view_updates_left;
1736
1737   /**
1738    * The client handle to send the reply to
1739    */
1740   struct GNUNET_SERVICE_Client *client;
1741 };
1742
1743 /**
1744  * DLL with all clients currently connected to us
1745  */
1746 struct ClientContext *cli_ctx_head;
1747 struct ClientContext *cli_ctx_tail;
1748
1749 /***********************************************************************
1750  * /Housekeeping with clients
1751 ***********************************************************************/
1752
1753
1754
1755
1756
1757 /***********************************************************************
1758  * Globals
1759 ***********************************************************************/
1760
1761 /**
1762  * Sampler used for the Brahms protocol itself.
1763  */
1764 static struct RPS_Sampler *prot_sampler;
1765
1766 /**
1767  * Sampler used for the clients.
1768  */
1769 static struct RPS_Sampler *client_sampler;
1770
1771 /**
1772  * Name to log view to
1773  */
1774 static const char *file_name_view_log;
1775
1776 #ifdef TO_FILE
1777 /**
1778  * Name to log number of observed peers to
1779  */
1780 static const char *file_name_observed_log;
1781
1782 /**
1783  * @brief Count the observed peers
1784  */
1785 static uint32_t num_observed_peers;
1786
1787 /**
1788  * @brief Multipeermap (ab-) used to count unique peer_ids
1789  */
1790 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1791 #endif /* TO_FILE */
1792
1793 /**
1794  * The size of sampler we need to be able to satisfy the client's need
1795  * of random peers.
1796  */
1797 static unsigned int sampler_size_client_need;
1798
1799 /**
1800  * The size of sampler we need to be able to satisfy the Brahms protocol's
1801  * need of random peers.
1802  *
1803  * This is one minimum size the sampler grows to.
1804  */
1805 static unsigned int sampler_size_est_need;
1806
1807 /**
1808  * @brief This is the minimum estimate used as sampler size.
1809  *
1810  * It is configured by the user.
1811  */
1812 static unsigned int sampler_size_est_min;
1813
1814 /**
1815  * @brief This is the estimate used as view size.
1816  *
1817  * It is initialised with the minimum
1818  */
1819 static unsigned int view_size_est_need;
1820
1821 /**
1822  * @brief This is the minimum estimate used as view size.
1823  *
1824  * It is configured by the user.
1825  */
1826 static unsigned int view_size_est_min;
1827
1828 /**
1829  * Percentage of total peer number in the view
1830  * to send random PUSHes to
1831  */
1832 static float alpha;
1833
1834 /**
1835  * Percentage of total peer number in the view
1836  * to send random PULLs to
1837  */
1838 static float beta;
1839
1840 /**
1841  * Identifier for the main task that runs periodically.
1842  */
1843 static struct GNUNET_SCHEDULER_Task *do_round_task;
1844
1845 /**
1846  * Time inverval the do_round task runs in.
1847  */
1848 static struct GNUNET_TIME_Relative round_interval;
1849
1850 /**
1851  * List to store peers received through pushes temporary.
1852  */
1853 static struct CustomPeerMap *push_map;
1854
1855 /**
1856  * List to store peers received through pulls temporary.
1857  */
1858 static struct CustomPeerMap *pull_map;
1859
1860 /**
1861  * Handler to NSE.
1862  */
1863 static struct GNUNET_NSE_Handle *nse;
1864
1865 /**
1866  * Handler to CADET.
1867  */
1868 static struct GNUNET_CADET_Handle *cadet_handle;
1869
1870 /**
1871  * @brief Port to communicate to other peers.
1872  */
1873 static struct GNUNET_CADET_Port *cadet_port;
1874
1875 /**
1876  * Handler to PEERINFO.
1877  */
1878 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1879
1880 /**
1881  * Handle for cancellation of iteration over peers.
1882  */
1883 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1884
1885 /**
1886  * Request counter.
1887  *
1888  * Counts how many requets clients already issued.
1889  * Only needed in the beginning to check how many of the 64 deltas
1890  * we already have
1891  */
1892 static unsigned int req_counter;
1893
1894 /**
1895  * Time of the last request we received.
1896  *
1897  * Used to compute the expected request rate.
1898  */
1899 static struct GNUNET_TIME_Absolute last_request;
1900
1901 /**
1902  * Size of #request_deltas.
1903  */
1904 #define REQUEST_DELTAS_SIZE 64
1905 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
1906
1907 /**
1908  * Last 64 deltas between requests
1909  */
1910 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
1911
1912 /**
1913  * The prediction of the rate of requests
1914  */
1915 static struct GNUNET_TIME_Relative request_rate;
1916
1917
1918 #ifdef ENABLE_MALICIOUS
1919 /**
1920  * Type of malicious peer
1921  *
1922  * 0 Don't act malicious at all - Default
1923  * 1 Try to maximise representation
1924  * 2 Try to partition the network
1925  * 3 Combined attack
1926  */
1927 static uint32_t mal_type;
1928
1929 /**
1930  * Other malicious peers
1931  */
1932 static struct GNUNET_PeerIdentity *mal_peers;
1933
1934 /**
1935  * Hashmap of malicious peers used as set.
1936  * Used to more efficiently check whether we know that peer.
1937  */
1938 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
1939
1940 /**
1941  * Number of other malicious peers
1942  */
1943 static uint32_t num_mal_peers;
1944
1945
1946 /**
1947  * If type is 2 This struct is used to store the attacked peers in a DLL
1948  */
1949 struct AttackedPeer
1950 {
1951   /**
1952    * DLL
1953    */
1954   struct AttackedPeer *next;
1955   struct AttackedPeer *prev;
1956
1957   /**
1958    * PeerID
1959    */
1960   struct GNUNET_PeerIdentity peer_id;
1961 };
1962
1963 /**
1964  * If type is 2 this is the DLL of attacked peers
1965  */
1966 static struct AttackedPeer *att_peers_head;
1967 static struct AttackedPeer *att_peers_tail;
1968
1969 /**
1970  * This index is used to point to an attacked peer to
1971  * implement the round-robin-ish way to select attacked peers.
1972  */
1973 static struct AttackedPeer *att_peer_index;
1974
1975 /**
1976  * Hashmap of attacked peers used as set.
1977  * Used to more efficiently check whether we know that peer.
1978  */
1979 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
1980
1981 /**
1982  * Number of attacked peers
1983  */
1984 static uint32_t num_attacked_peers;
1985
1986 /**
1987  * If type is 1 this is the attacked peer
1988  */
1989 static struct GNUNET_PeerIdentity attacked_peer;
1990
1991 /**
1992  * The limit of PUSHes we can send in one round.
1993  * This is an assumption of the Brahms protocol and either implemented
1994  * via proof of work
1995  * or
1996  * assumend to be the bandwidth limitation.
1997  */
1998 static uint32_t push_limit = 10000;
1999 #endif /* ENABLE_MALICIOUS */
2000
2001
2002 /***********************************************************************
2003  * /Globals
2004 ***********************************************************************/
2005
2006
2007 /***********************************************************************
2008  * Util functions
2009 ***********************************************************************/
2010
2011
2012 /**
2013  * Print peerlist to log.
2014  */
2015 static void
2016 print_peer_list (struct GNUNET_PeerIdentity *list,
2017                  unsigned int len)
2018 {
2019   unsigned int i;
2020
2021   LOG (GNUNET_ERROR_TYPE_DEBUG,
2022        "Printing peer list of length %u at %p:\n",
2023        len,
2024        list);
2025   for (i = 0 ; i < len ; i++)
2026   {
2027     LOG (GNUNET_ERROR_TYPE_DEBUG,
2028          "%u. peer: %s\n",
2029          i, GNUNET_i2s (&list[i]));
2030   }
2031 }
2032
2033
2034 /**
2035  * Remove peer from list.
2036  */
2037 static void
2038 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2039                unsigned int *list_size,
2040                const struct GNUNET_PeerIdentity *peer)
2041 {
2042   unsigned int i;
2043   struct GNUNET_PeerIdentity *tmp;
2044
2045   tmp = *peer_list;
2046
2047   LOG (GNUNET_ERROR_TYPE_DEBUG,
2048        "Removing peer %s from list at %p\n",
2049        GNUNET_i2s (peer),
2050        tmp);
2051
2052   for ( i = 0 ; i < *list_size ; i++ )
2053   {
2054     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2055     {
2056       if (i < *list_size -1)
2057       { /* Not at the last entry -- shift peers left */
2058         memmove (&tmp[i], &tmp[i +1],
2059                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2060       }
2061       /* Remove last entry (should be now useless PeerID) */
2062       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2063     }
2064   }
2065   *peer_list = tmp;
2066 }
2067
2068
2069 /**
2070  * Sum all time relatives of an array.
2071  */
2072 static struct GNUNET_TIME_Relative
2073 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2074                 uint32_t arr_size)
2075 {
2076   struct GNUNET_TIME_Relative sum;
2077   uint32_t i;
2078
2079   sum = GNUNET_TIME_UNIT_ZERO;
2080   for ( i = 0 ; i < arr_size ; i++ )
2081   {
2082     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2083   }
2084   return sum;
2085 }
2086
2087
2088 /**
2089  * Compute the average of given time relatives.
2090  */
2091 static struct GNUNET_TIME_Relative
2092 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2093                 uint32_t arr_size)
2094 {
2095   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2096                                                       arr_size),
2097                                       arr_size);
2098 }
2099
2100
2101 /**
2102  * Insert PeerID in #view
2103  *
2104  * Called once we know a peer is live.
2105  * Implements #PeerOp
2106  *
2107  * @return GNUNET_OK if peer was actually inserted
2108  *         GNUNET_NO if peer was not inserted
2109  */
2110 static void
2111 insert_in_view_op (void *cls,
2112                 const struct GNUNET_PeerIdentity *peer);
2113
2114 /**
2115  * Insert PeerID in #view
2116  *
2117  * Called once we know a peer is live.
2118  *
2119  * @return GNUNET_OK if peer was actually inserted
2120  *         GNUNET_NO if peer was not inserted
2121  */
2122 static int
2123 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2124 {
2125   int online;
2126
2127   online = check_peer_flag (peer, Peers_ONLINE);
2128   if ( (GNUNET_NO == online) ||
2129        (GNUNET_SYSERR == online) ) /* peer is not even known */
2130   {
2131     (void) issue_peer_liveliness_check (peer);
2132     (void) schedule_operation (peer, insert_in_view_op);
2133     return GNUNET_NO;
2134   }
2135   /* Open channel towards peer to keep connection open */
2136   indicate_sending_intention (peer);
2137   return View_put (peer);
2138 }
2139
2140 /**
2141  * @brief sends updates to clients that are interested
2142  */
2143 static void
2144 clients_notify_view_update (void);
2145
2146 /**
2147  * Put random peer from sampler into the view as history update.
2148  */
2149 static void
2150 hist_update (void *cls,
2151              struct GNUNET_PeerIdentity *ids,
2152              uint32_t num_peers)
2153 {
2154   unsigned int i;
2155
2156   for (i = 0; i < num_peers; i++)
2157   {
2158     (void) insert_in_view (&ids[i]);
2159     to_file (file_name_view_log,
2160              "+%s\t(hist)",
2161              GNUNET_i2s_full (ids));
2162   }
2163   clients_notify_view_update();
2164 }
2165
2166
2167 /**
2168  * Wrapper around #RPS_sampler_resize()
2169  *
2170  * If we do not have enough sampler elements, double current sampler size
2171  * If we have more than enough sampler elements, halv current sampler size
2172  */
2173 static void
2174 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2175 {
2176   unsigned int sampler_size;
2177
2178   // TODO statistics
2179   // TODO respect the min, max
2180   sampler_size = RPS_sampler_get_size (sampler);
2181   if (sampler_size > new_size * 4)
2182   { /* Shrinking */
2183     RPS_sampler_resize (sampler, sampler_size / 2);
2184   }
2185   else if (sampler_size < new_size)
2186   { /* Growing */
2187     RPS_sampler_resize (sampler, sampler_size * 2);
2188   }
2189   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2190 }
2191
2192
2193 /**
2194  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2195  */
2196 static void
2197 client_resize_wrapper ()
2198 {
2199   uint32_t bigger_size;
2200
2201   // TODO statistics
2202
2203   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2204
2205   // TODO respect the min, max
2206   resize_wrapper (client_sampler, bigger_size);
2207   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2208       bigger_size);
2209 }
2210
2211
2212 /**
2213  * Estimate request rate
2214  *
2215  * Called every time we receive a request from the client.
2216  */
2217 static void
2218 est_request_rate()
2219 {
2220   struct GNUNET_TIME_Relative max_round_duration;
2221
2222   if (request_deltas_size > req_counter)
2223     req_counter++;
2224   if ( 1 < req_counter)
2225   {
2226     /* Shift last request deltas to the right */
2227     memmove (&request_deltas[1],
2228         request_deltas,
2229         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2230
2231     /* Add current delta to beginning */
2232     request_deltas[0] =
2233         GNUNET_TIME_absolute_get_difference (last_request,
2234                                              GNUNET_TIME_absolute_get ());
2235     request_rate = T_relative_avg (request_deltas, req_counter);
2236     request_rate = (request_rate.rel_value_us < 1) ?
2237       GNUNET_TIME_relative_get_unit_ () : request_rate;
2238
2239     /* Compute the duration a round will maximally take */
2240     max_round_duration =
2241         GNUNET_TIME_relative_add (round_interval,
2242                                   GNUNET_TIME_relative_divide (round_interval, 2));
2243
2244     /* Set the estimated size the sampler has to have to
2245      * satisfy the current client request rate */
2246     sampler_size_client_need =
2247         max_round_duration.rel_value_us / request_rate.rel_value_us;
2248
2249     /* Resize the sampler */
2250     client_resize_wrapper ();
2251   }
2252   last_request = GNUNET_TIME_absolute_get ();
2253 }
2254
2255
2256 /**
2257  * Add all peers in @a peer_array to @a peer_map used as set.
2258  *
2259  * @param peer_array array containing the peers
2260  * @param num_peers number of peers in @peer_array
2261  * @param peer_map the peermap to use as set
2262  */
2263 static void
2264 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2265                        unsigned int num_peers,
2266                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2267 {
2268   unsigned int i;
2269   if (NULL == peer_map)
2270   {
2271     LOG (GNUNET_ERROR_TYPE_WARNING,
2272          "Trying to add peers to non-existing peermap.\n");
2273     return;
2274   }
2275
2276   for (i = 0; i < num_peers; i++)
2277   {
2278     GNUNET_CONTAINER_multipeermap_put (peer_map,
2279                                        &peer_array[i],
2280                                        NULL,
2281                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2282   }
2283 }
2284
2285
2286 /**
2287  * Send a PULL REPLY to @a peer_id
2288  *
2289  * @param peer_id the peer to send the reply to.
2290  * @param peer_ids the peers to send to @a peer_id
2291  * @param num_peer_ids the number of peers to send to @a peer_id
2292  */
2293 static void
2294 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2295                  const struct GNUNET_PeerIdentity *peer_ids,
2296                  unsigned int num_peer_ids)
2297 {
2298   uint32_t send_size;
2299   struct GNUNET_MQ_Envelope *ev;
2300   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2301
2302   /* Compute actual size */
2303   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2304               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2305
2306   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2307     /* Compute number of peers to send
2308      * If too long, simply truncate */
2309     // TODO select random ones via permutation
2310     //      or even better: do good protocol design
2311     send_size =
2312       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2313        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2314        sizeof (struct GNUNET_PeerIdentity);
2315   else
2316     send_size = num_peer_ids;
2317
2318   LOG (GNUNET_ERROR_TYPE_DEBUG,
2319       "Going to send PULL REPLY with %u peers to %s\n",
2320       send_size, GNUNET_i2s (peer_id));
2321
2322   ev = GNUNET_MQ_msg_extra (out_msg,
2323                             send_size * sizeof (struct GNUNET_PeerIdentity),
2324                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2325   out_msg->num_peers = htonl (send_size);
2326   GNUNET_memcpy (&out_msg[1], peer_ids,
2327          send_size * sizeof (struct GNUNET_PeerIdentity));
2328
2329   send_message (peer_id, ev, "PULL REPLY");
2330   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2331   // TODO check with send intention: as send_channel is used/opened we indicate
2332   // a sending intention without intending it.
2333   // -> clean peer afterwards?
2334 }
2335
2336
2337 /**
2338  * Insert PeerID in #pull_map
2339  *
2340  * Called once we know a peer is live.
2341  */
2342 static void
2343 insert_in_pull_map (void *cls,
2344                     const struct GNUNET_PeerIdentity *peer)
2345 {
2346   CustomPeerMap_put (pull_map, peer);
2347 }
2348
2349
2350 /**
2351  * Insert PeerID in #view
2352  *
2353  * Called once we know a peer is live.
2354  * Implements #PeerOp
2355  */
2356 static void
2357 insert_in_view_op (void *cls,
2358                 const struct GNUNET_PeerIdentity *peer)
2359 {
2360   (void) insert_in_view (peer);
2361 }
2362
2363
2364 /**
2365  * Update sampler with given PeerID.
2366  * Implements #PeerOp
2367  */
2368 static void
2369 insert_in_sampler (void *cls,
2370                    const struct GNUNET_PeerIdentity *peer)
2371 {
2372   LOG (GNUNET_ERROR_TYPE_DEBUG,
2373        "Updating samplers with peer %s from insert_in_sampler()\n",
2374        GNUNET_i2s (peer));
2375   RPS_sampler_update (prot_sampler,   peer);
2376   RPS_sampler_update (client_sampler, peer);
2377   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2378   {
2379     /* Make sure we 'know' about this peer */
2380     (void) issue_peer_liveliness_check (peer);
2381     /* Establish a channel towards that peer to indicate we are going to send
2382      * messages to it */
2383     //indicate_sending_intention (peer);
2384   }
2385   #ifdef TO_FILE
2386   num_observed_peers++;
2387   GNUNET_CONTAINER_multipeermap_put
2388     (observed_unique_peers,
2389      peer,
2390      NULL,
2391      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2392   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2393       observed_unique_peers);
2394   to_file (file_name_observed_log,
2395           "%" PRIu32 " %" PRIu32 " %f\n",
2396           num_observed_peers,
2397           num_observed_unique_peers,
2398           1.0*num_observed_unique_peers/num_observed_peers)
2399   #endif /* TO_FILE */
2400 }
2401
2402 /**
2403  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2404  *        If the peer is not known, liveliness check is issued and it is
2405  *        scheduled to be inserted in sampler and view.
2406  *
2407  * "External sources" refer to every source except the gossip.
2408  *
2409  * @param peer peer to insert
2410  */
2411 static void
2412 got_peer (const struct GNUNET_PeerIdentity *peer)
2413 {
2414   /* If we did not know this peer already, insert it into sampler and view */
2415   if (GNUNET_YES == issue_peer_liveliness_check (peer))
2416   {
2417     schedule_operation (peer, insert_in_sampler);
2418     schedule_operation (peer, insert_in_view_op);
2419   }
2420 }
2421
2422 /**
2423  * @brief Checks if there is a sending channel and if it is needed
2424  *
2425  * @param peer the peer whose sending channel is checked
2426  * @return GNUNET_YES if sending channel exists and is still needed
2427  *         GNUNET_NO  otherwise
2428  */
2429 static int
2430 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2431 {
2432   /* struct GNUNET_CADET_Channel *channel; */
2433   if (GNUNET_NO == check_peer_known (peer))
2434   {
2435     return GNUNET_NO;
2436   }
2437   if (GNUNET_YES == check_sending_channel_exists (peer))
2438   {
2439     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2440          (GNUNET_YES == View_contains_peer (peer)) ||
2441          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2442          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2443          (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2444     { /* If we want to keep the connection to peer open */
2445       return GNUNET_YES;
2446     }
2447     return GNUNET_NO;
2448   }
2449   return GNUNET_NO;
2450 }
2451
2452 /**
2453  * @brief remove peer from our knowledge, the view, push and pull maps and
2454  * samplers.
2455  *
2456  * @param peer the peer to remove
2457  */
2458 static void
2459 remove_peer (const struct GNUNET_PeerIdentity *peer)
2460 {
2461   (void) View_remove_peer (peer);
2462   CustomPeerMap_remove_peer (pull_map, peer);
2463   CustomPeerMap_remove_peer (push_map, peer);
2464   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2465   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2466   destroy_peer (get_peer_ctx (peer));
2467 }
2468
2469
2470 /**
2471  * @brief Remove data that is not needed anymore.
2472  *
2473  * If the sending channel is no longer needed it is destroyed.
2474  *
2475  * @param peer the peer whose data is about to be cleaned
2476  */
2477 static void
2478 clean_peer (const struct GNUNET_PeerIdentity *peer)
2479 {
2480   if (GNUNET_NO == check_sending_channel_needed (peer))
2481   {
2482     LOG (GNUNET_ERROR_TYPE_DEBUG,
2483         "Going to remove send channel to peer %s\n",
2484         GNUNET_i2s (peer));
2485     #ifdef ENABLE_MALICIOUS
2486     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2487       (void) destroy_sending_channel (peer);
2488     #else /* ENABLE_MALICIOUS */
2489     (void) destroy_sending_channel (peer);
2490     #endif /* ENABLE_MALICIOUS */
2491   }
2492
2493   if ( (GNUNET_NO == check_peer_send_intention (peer)) &&
2494        (GNUNET_NO == View_contains_peer (peer)) &&
2495        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2496        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2497        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2498        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2499        (GNUNET_NO != check_removable (peer)) )
2500   { /* We can safely remove this peer */
2501     LOG (GNUNET_ERROR_TYPE_DEBUG,
2502         "Going to remove peer %s\n",
2503         GNUNET_i2s (peer));
2504     remove_peer (peer);
2505     return;
2506   }
2507 }
2508
2509
2510 /**
2511  * @brief This is called when a channel is destroyed.
2512  *
2513  * Removes peer completely from our knowledge if the send_channel was destroyed
2514  * Otherwise simply delete the recv_channel
2515  * Also check if the knowledge about this peer is still needed.
2516  * If not, remove this peer from our knowledge.
2517  *
2518  * @param cls The closure
2519  * @param channel The channel being closed
2520  * @param channel_ctx The context associated with this channel
2521  */
2522 static void
2523 cleanup_destroyed_channel (void *cls,
2524                            const struct GNUNET_CADET_Channel *channel)
2525 {
2526   struct ChannelCtx *channel_ctx = cls;
2527   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2528
2529   // What should be done here:
2530   //  * cleanup everything related to the channel
2531   //    * memory
2532   //  * remove peer if necessary
2533
2534   if (peer_ctx->recv_channel_ctx == channel_ctx)
2535   {
2536     remove_channel_ctx (channel_ctx);
2537   }
2538   else if (peer_ctx->send_channel_ctx == channel_ctx)
2539   {
2540     remove_channel_ctx (channel_ctx);
2541     remove_peer (&peer_ctx->peer_id);
2542   }
2543 }
2544
2545 /***********************************************************************
2546  * /Util functions
2547 ***********************************************************************/
2548
2549 static void
2550 destroy_reply_cls (struct ReplyCls *rep_cls)
2551 {
2552   struct ClientContext *cli_ctx;
2553
2554   cli_ctx = rep_cls->cli_ctx;
2555   GNUNET_assert (NULL != cli_ctx);
2556   if (NULL != rep_cls->req_handle)
2557   {
2558     RPS_sampler_request_cancel (rep_cls->req_handle);
2559   }
2560   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2561                                cli_ctx->rep_cls_tail,
2562                                rep_cls);
2563   GNUNET_free (rep_cls);
2564 }
2565
2566
2567 static void
2568 destroy_cli_ctx (struct ClientContext *cli_ctx)
2569 {
2570   GNUNET_assert (NULL != cli_ctx);
2571   if (NULL != cli_ctx->rep_cls_head)
2572   {
2573     LOG (GNUNET_ERROR_TYPE_WARNING,
2574          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2575     while (NULL != cli_ctx->rep_cls_head)
2576       destroy_reply_cls (cli_ctx->rep_cls_head);
2577   }
2578   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2579                                cli_ctx_tail,
2580                                cli_ctx);
2581   GNUNET_free (cli_ctx);
2582 }
2583
2584
2585 /**
2586  * Function called by NSE.
2587  *
2588  * Updates sizes of sampler list and view and adapt those lists
2589  * accordingly.
2590  */
2591 static void
2592 nse_callback (void *cls,
2593               struct GNUNET_TIME_Absolute timestamp,
2594               double logestimate, double std_dev)
2595 {
2596   double estimate;
2597   //double scale; // TODO this might go gloabal/config
2598
2599   LOG (GNUNET_ERROR_TYPE_DEBUG,
2600        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2601        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2602   //scale = .01;
2603   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2604   // GNUNET_NSE_log_estimate_to_n (logestimate);
2605   estimate = pow (estimate, 1.0 / 3);
2606   // TODO add if std_dev is a number
2607   // estimate += (std_dev * scale);
2608   if (view_size_est_min < ceil (estimate))
2609   {
2610     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2611     sampler_size_est_need = estimate;
2612     view_size_est_need = estimate;
2613   } else
2614   {
2615     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2616     //sampler_size_est_need = view_size_est_min;
2617     view_size_est_need = view_size_est_min;
2618   }
2619
2620   /* If the NSE has changed adapt the lists accordingly */
2621   resize_wrapper (prot_sampler, sampler_size_est_need);
2622   client_resize_wrapper ();
2623 }
2624
2625
2626 /**
2627  * Callback called once the requested PeerIDs are ready.
2628  *
2629  * Sends those to the requesting client.
2630  */
2631 static void
2632 client_respond (void *cls,
2633                 struct GNUNET_PeerIdentity *peer_ids,
2634                 uint32_t num_peers)
2635 {
2636   struct ReplyCls *reply_cls = cls;
2637   uint32_t i;
2638   struct GNUNET_MQ_Envelope *ev;
2639   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2640   uint32_t size_needed;
2641   struct ClientContext *cli_ctx;
2642
2643   GNUNET_assert (NULL != reply_cls);
2644   LOG (GNUNET_ERROR_TYPE_DEBUG,
2645        "sampler returned %" PRIu32 " peers:\n",
2646        num_peers);
2647   for (i = 0; i < num_peers; i++)
2648   {
2649     LOG (GNUNET_ERROR_TYPE_DEBUG,
2650          "  %" PRIu32 ": %s\n",
2651          i,
2652          GNUNET_i2s (&peer_ids[i]));
2653   }
2654
2655   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2656                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2657
2658   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2659
2660   ev = GNUNET_MQ_msg_extra (out_msg,
2661                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2662                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2663   out_msg->num_peers = htonl (num_peers);
2664   out_msg->id = htonl (reply_cls->id);
2665
2666   GNUNET_memcpy (&out_msg[1],
2667           peer_ids,
2668           num_peers * sizeof (struct GNUNET_PeerIdentity));
2669
2670   cli_ctx = reply_cls->cli_ctx;
2671   GNUNET_assert (NULL != cli_ctx);
2672   reply_cls->req_handle = NULL;
2673   destroy_reply_cls (reply_cls);
2674   GNUNET_MQ_send (cli_ctx->mq, ev);
2675 }
2676
2677
2678 /**
2679  * Handle RPS request from the client.
2680  *
2681  * @param cls closure
2682  * @param message the actual message
2683  */
2684 static void
2685 handle_client_request (void *cls,
2686                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2687 {
2688   struct ClientContext *cli_ctx = cls;
2689   uint32_t num_peers;
2690   uint32_t size_needed;
2691   struct ReplyCls *reply_cls;
2692   uint32_t i;
2693
2694   num_peers = ntohl (msg->num_peers);
2695   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2696                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2697
2698   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2699   {
2700     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2701                 "Message received from client has size larger than expected\n");
2702     GNUNET_SERVICE_client_drop (cli_ctx->client);
2703     return;
2704   }
2705
2706   for (i = 0 ; i < num_peers ; i++)
2707     est_request_rate();
2708
2709   LOG (GNUNET_ERROR_TYPE_DEBUG,
2710        "Client requested %" PRIu32 " random peer(s).\n",
2711        num_peers);
2712
2713   reply_cls = GNUNET_new (struct ReplyCls);
2714   reply_cls->id = ntohl (msg->id);
2715   reply_cls->cli_ctx = cli_ctx;
2716   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2717                                                         client_respond,
2718                                                         reply_cls,
2719                                                         num_peers);
2720
2721   GNUNET_assert (NULL != cli_ctx);
2722   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2723                                cli_ctx->rep_cls_tail,
2724                                reply_cls);
2725   GNUNET_SERVICE_client_continue (cli_ctx->client);
2726 }
2727
2728
2729 /**
2730  * @brief Handle a message that requests the cancellation of a request
2731  *
2732  * @param cls unused
2733  * @param message the message containing the id of the request
2734  */
2735 static void
2736 handle_client_request_cancel (void *cls,
2737                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2738 {
2739   struct ClientContext *cli_ctx = cls;
2740   struct ReplyCls *rep_cls;
2741
2742   GNUNET_assert (NULL != cli_ctx);
2743   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2744   rep_cls = cli_ctx->rep_cls_head;
2745   LOG (GNUNET_ERROR_TYPE_DEBUG,
2746       "Client cancels request with id %" PRIu32 "\n",
2747       ntohl (msg->id));
2748   while ( (NULL != rep_cls->next) &&
2749           (rep_cls->id != ntohl (msg->id)) )
2750     rep_cls = rep_cls->next;
2751   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2752   destroy_reply_cls (rep_cls);
2753   GNUNET_SERVICE_client_continue (cli_ctx->client);
2754 }
2755
2756
2757 /**
2758  * @brief This function is called, when the client seeds peers.
2759  * It verifies that @a msg is well-formed.
2760  *
2761  * @param cls the closure (#ClientContext)
2762  * @param msg the message
2763  * @return #GNUNET_OK if @a msg is well-formed
2764  */
2765 static int
2766 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2767 {
2768   struct ClientContext *cli_ctx = cls;
2769   uint16_t msize = ntohs (msg->header.size);
2770   uint32_t num_peers = ntohl (msg->num_peers);
2771
2772   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2773   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2774        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2775   {
2776     GNUNET_break (0);
2777     GNUNET_SERVICE_client_drop (cli_ctx->client);
2778     return GNUNET_SYSERR;
2779   }
2780   return GNUNET_OK;
2781 }
2782
2783
2784 /**
2785  * Handle seed from the client.
2786  *
2787  * @param cls closure
2788  * @param message the actual message
2789  */
2790 static void
2791 handle_client_seed (void *cls,
2792                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2793 {
2794   struct ClientContext *cli_ctx = cls;
2795   struct GNUNET_PeerIdentity *peers;
2796   uint32_t num_peers;
2797   uint32_t i;
2798
2799   num_peers = ntohl (msg->num_peers);
2800   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2801
2802   LOG (GNUNET_ERROR_TYPE_DEBUG,
2803        "Client seeded peers:\n");
2804   print_peer_list (peers, num_peers);
2805
2806   for (i = 0; i < num_peers; i++)
2807   {
2808     LOG (GNUNET_ERROR_TYPE_DEBUG,
2809          "Updating samplers with seed %" PRIu32 ": %s\n",
2810          i,
2811          GNUNET_i2s (&peers[i]));
2812
2813     got_peer (&peers[i]);
2814   }
2815   GNUNET_SERVICE_client_continue (cli_ctx->client);
2816 }
2817
2818 /**
2819  * @brief Send view to client
2820  *
2821  * @param cli_ctx the context of the client
2822  * @param view_array the peerids of the view as array (can be empty)
2823  * @param view_size the size of the view array (can be 0)
2824  */
2825 void
2826 send_view (const struct ClientContext *cli_ctx,
2827            const struct GNUNET_PeerIdentity *view_array,
2828            uint64_t view_size)
2829 {
2830   struct GNUNET_MQ_Envelope *ev;
2831   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2832
2833   if (NULL == view_array)
2834   {
2835     view_size = View_size ();
2836     view_array = View_get_as_array();
2837   }
2838
2839   ev = GNUNET_MQ_msg_extra (out_msg,
2840                             view_size * sizeof (struct GNUNET_PeerIdentity),
2841                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2842   out_msg->num_peers = htonl (view_size);
2843
2844   GNUNET_memcpy (&out_msg[1],
2845           view_array,
2846           view_size * sizeof (struct GNUNET_PeerIdentity));
2847   GNUNET_MQ_send (cli_ctx->mq, ev);
2848 }
2849
2850 /**
2851  * @brief sends updates to clients that are interested
2852  */
2853 static void
2854 clients_notify_view_update (void)
2855 {
2856   struct ClientContext *cli_ctx_iter;
2857   uint64_t num_peers;
2858   const struct GNUNET_PeerIdentity *view_array;
2859
2860   num_peers = View_size ();
2861   view_array = View_get_as_array();
2862   /* check size of view is small enough */
2863   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2864   {
2865     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2866                 "View is too big to send\n");
2867     return;
2868   }
2869
2870   for (cli_ctx_iter = cli_ctx_head;
2871        NULL != cli_ctx_iter;
2872        cli_ctx_iter = cli_ctx_head->next)
2873   {
2874     if (1 < cli_ctx_iter->view_updates_left)
2875     {
2876       /* Client wants to receive limited amount of updates */
2877       cli_ctx_iter->view_updates_left -= 1;
2878     } else if (1 == cli_ctx_iter->view_updates_left)
2879     {
2880       /* Last update of view for client */
2881       cli_ctx_iter->view_updates_left = -1;
2882     } else if (0 > cli_ctx_iter->view_updates_left) {
2883       /* Client is not interested in updates */
2884       continue;
2885     }
2886     /* else _updates_left == 0 - infinite amount of updates */
2887
2888     /* send view */
2889     send_view (cli_ctx_iter, view_array, num_peers);
2890   }
2891 }
2892
2893
2894 /**
2895  * Handle RPS request from the client.
2896  *
2897  * @param cls closure
2898  * @param message the actual message
2899  */
2900 static void
2901 handle_client_view_request (void *cls,
2902                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2903 {
2904   struct ClientContext *cli_ctx = cls;
2905   uint64_t num_updates;
2906
2907   num_updates = ntohl (msg->num_updates);
2908
2909   LOG (GNUNET_ERROR_TYPE_DEBUG,
2910        "Client requested %" PRIu64 " updates of view.\n",
2911        num_updates);
2912
2913   GNUNET_assert (NULL != cli_ctx);
2914   cli_ctx->view_updates_left = num_updates;
2915   send_view (cli_ctx, NULL, 0);
2916   GNUNET_SERVICE_client_continue (cli_ctx->client);
2917 }
2918
2919 /**
2920  * Handle a CHECK_LIVE message from another peer.
2921  *
2922  * This does nothing. But without calling #GNUNET_CADET_receive_done()
2923  * the channel is blocked for all other communication.
2924  *
2925  * @param cls Closure
2926  * @param msg The message header
2927  */
2928 static void
2929 handle_peer_check (void *cls,
2930                    const struct GNUNET_MessageHeader *msg)
2931 {
2932   const struct ChannelCtx *channel_ctx = cls;
2933   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2934   LOG (GNUNET_ERROR_TYPE_DEBUG,
2935       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2936
2937   GNUNET_CADET_receive_done (channel_ctx->channel);
2938 }
2939
2940 /**
2941  * Handle a PUSH message from another peer.
2942  *
2943  * Check the proof of work and store the PeerID
2944  * in the temporary list for pushed PeerIDs.
2945  *
2946  * @param cls Closure
2947  * @param msg The message header
2948  */
2949 static void
2950 handle_peer_push (void *cls,
2951                   const struct GNUNET_MessageHeader *msg)
2952 {
2953   const struct ChannelCtx *channel_ctx = cls;
2954   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2955
2956   // (check the proof of work (?))
2957
2958   LOG (GNUNET_ERROR_TYPE_DEBUG,
2959        "Received PUSH (%s)\n",
2960        GNUNET_i2s (peer));
2961   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
2962
2963   #ifdef ENABLE_MALICIOUS
2964   struct AttackedPeer *tmp_att_peer;
2965
2966   if ( (1 == mal_type) ||
2967        (3 == mal_type) )
2968   { /* Try to maximise representation */
2969     tmp_att_peer = GNUNET_new (struct AttackedPeer);
2970     tmp_att_peer->peer_id = *peer;
2971     if (NULL == att_peer_set)
2972       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2973     if (GNUNET_NO ==
2974         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2975                                                 peer))
2976     {
2977       GNUNET_CONTAINER_DLL_insert (att_peers_head,
2978                                    att_peers_tail,
2979                                    tmp_att_peer);
2980       add_peer_array_to_set (peer, 1, att_peer_set);
2981     }
2982     else
2983     {
2984       GNUNET_free (tmp_att_peer);
2985     }
2986   }
2987
2988
2989   else if (2 == mal_type)
2990   {
2991     /* We attack one single well-known peer - simply ignore */
2992   }
2993   #endif /* ENABLE_MALICIOUS */
2994
2995   /* Add the sending peer to the push_map */
2996   CustomPeerMap_put (push_map, peer);
2997
2998   GNUNET_break_op (check_peer_known (peer));
2999   GNUNET_CADET_receive_done (channel_ctx->channel);
3000 }
3001
3002
3003 /**
3004  * Handle PULL REQUEST request message from another peer.
3005  *
3006  * Reply with the view of PeerIDs.
3007  *
3008  * @param cls Closure
3009  * @param msg The message header
3010  */
3011 static void
3012 handle_peer_pull_request (void *cls,
3013                           const struct GNUNET_MessageHeader *msg)
3014 {
3015   const struct ChannelCtx *channel_ctx = cls;
3016   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3017   const struct GNUNET_PeerIdentity *view_array;
3018
3019   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3020   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3021
3022   #ifdef ENABLE_MALICIOUS
3023   if (1 == mal_type
3024       || 3 == mal_type)
3025   { /* Try to maximise representation */
3026     send_pull_reply (peer, mal_peers, num_mal_peers);
3027   }
3028
3029   else if (2 == mal_type)
3030   { /* Try to partition network */
3031     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3032     {
3033       send_pull_reply (peer, mal_peers, num_mal_peers);
3034     }
3035   }
3036   #endif /* ENABLE_MALICIOUS */
3037
3038   GNUNET_break_op (check_peer_known (peer));
3039   GNUNET_CADET_receive_done (channel_ctx->channel);
3040   view_array = View_get_as_array ();
3041   send_pull_reply (peer, view_array, View_size ());
3042 }
3043
3044
3045 /**
3046  * Check whether we sent a corresponding request and
3047  * whether this reply is the first one.
3048  *
3049  * @param cls Closure
3050  * @param msg The message header
3051  */
3052 static int
3053 check_peer_pull_reply (void *cls,
3054                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3055 {
3056   struct GNUNET_PeerIdentity *sender = cls;
3057
3058   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3059   {
3060     GNUNET_break_op (0);
3061     return GNUNET_SYSERR;
3062   }
3063
3064   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3065       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3066   {
3067     LOG (GNUNET_ERROR_TYPE_ERROR,
3068         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3069         ntohl (msg->num_peers),
3070         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3071             sizeof (struct GNUNET_PeerIdentity));
3072     GNUNET_break_op (0);
3073     return GNUNET_SYSERR;
3074   }
3075
3076   if (GNUNET_YES != check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3077   {
3078     LOG (GNUNET_ERROR_TYPE_WARNING,
3079         "Received a pull reply from a peer (%s) we didn't request one from!\n",
3080         GNUNET_i2s (sender));
3081     GNUNET_break_op (0);
3082     return GNUNET_SYSERR;
3083   }
3084   return GNUNET_OK;
3085 }
3086
3087 /**
3088  * Handle PULL REPLY message from another peer.
3089  *
3090  * @param cls Closure
3091  * @param msg The message header
3092  */
3093 static void
3094 handle_peer_pull_reply (void *cls,
3095                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3096 {
3097   const struct ChannelCtx *channel_ctx = cls;
3098   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3099   const struct GNUNET_PeerIdentity *peers;
3100   uint32_t i;
3101 #ifdef ENABLE_MALICIOUS
3102   struct AttackedPeer *tmp_att_peer;
3103 #endif /* ENABLE_MALICIOUS */
3104
3105   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3106   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3107
3108   #ifdef ENABLE_MALICIOUS
3109   // We shouldn't even receive pull replies as we're not sending
3110   if (2 == mal_type)
3111   {
3112   }
3113   #endif /* ENABLE_MALICIOUS */
3114
3115   /* Do actual logic */
3116   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3117
3118   LOG (GNUNET_ERROR_TYPE_DEBUG,
3119        "PULL REPLY received, got following %u peers:\n",
3120        ntohl (msg->num_peers));
3121
3122   for (i = 0; i < ntohl (msg->num_peers); i++)
3123   {
3124     LOG (GNUNET_ERROR_TYPE_DEBUG,
3125          "%u. %s\n",
3126          i,
3127          GNUNET_i2s (&peers[i]));
3128
3129     #ifdef ENABLE_MALICIOUS
3130     if ((NULL != att_peer_set) &&
3131         (1 == mal_type || 3 == mal_type))
3132     { /* Add attacked peer to local list */
3133       // TODO check if we sent a request and this was the first reply
3134       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3135                                                                &peers[i])
3136           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3137                                                                   &peers[i]))
3138       {
3139         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3140         tmp_att_peer->peer_id = peers[i];
3141         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3142                                      att_peers_tail,
3143                                      tmp_att_peer);
3144         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3145       }
3146       continue;
3147     }
3148     #endif /* ENABLE_MALICIOUS */
3149     /* Make sure we 'know' about this peer */
3150     (void) insert_peer (&peers[i]);
3151
3152     if (GNUNET_YES == check_peer_valid (&peers[i]))
3153     {
3154       CustomPeerMap_put (pull_map, &peers[i]);
3155     }
3156     else
3157     {
3158       schedule_operation (&peers[i], insert_in_pull_map);
3159       (void) issue_peer_liveliness_check (&peers[i]);
3160     }
3161   }
3162
3163   UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING);
3164   clean_peer (sender);
3165
3166   GNUNET_break_op (check_peer_known (sender));
3167   GNUNET_CADET_receive_done (channel_ctx->channel);
3168 }
3169
3170
3171 /**
3172  * Compute a random delay.
3173  * A uniformly distributed value between mean + spread and mean - spread.
3174  *
3175  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3176  * It would return a random value between 2 and 6 min.
3177  *
3178  * @param mean the mean
3179  * @param spread the inverse amount of deviation from the mean
3180  */
3181 static struct GNUNET_TIME_Relative
3182 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3183                     unsigned int spread)
3184 {
3185   struct GNUNET_TIME_Relative half_interval;
3186   struct GNUNET_TIME_Relative ret;
3187   unsigned int rand_delay;
3188   unsigned int max_rand_delay;
3189
3190   if (0 == spread)
3191   {
3192     LOG (GNUNET_ERROR_TYPE_WARNING,
3193          "Not accepting spread of 0\n");
3194     GNUNET_break (0);
3195     GNUNET_assert (0);
3196   }
3197   GNUNET_assert (0 != mean.rel_value_us);
3198
3199   /* Compute random time value between spread * mean and spread * mean */
3200   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3201
3202   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3203   /**
3204    * Compute random value between (0 and 1) * round_interval
3205    * via multiplying round_interval with a 'fraction' (0 to value)/value
3206    */
3207   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3208   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3209   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3210   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3211
3212   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3213     LOG (GNUNET_ERROR_TYPE_WARNING,
3214          "Returning FOREVER_REL\n");
3215
3216   return ret;
3217 }
3218
3219
3220 /**
3221  * Send single pull request
3222  *
3223  * @param peer_id the peer to send the pull request to.
3224  */
3225 static void
3226 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3227 {
3228   struct GNUNET_MQ_Envelope *ev;
3229
3230   GNUNET_assert (GNUNET_NO == check_peer_flag (peer,
3231                                                      Peers_PULL_REPLY_PENDING));
3232   SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING);
3233
3234   LOG (GNUNET_ERROR_TYPE_DEBUG,
3235        "Going to send PULL REQUEST to peer %s.\n",
3236        GNUNET_i2s (peer));
3237
3238   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3239   send_message (peer, ev, "PULL REQUEST");
3240   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3241 }
3242
3243
3244 /**
3245  * Send single push
3246  *
3247  * @param peer_id the peer to send the push to.
3248  */
3249 static void
3250 send_push (const struct GNUNET_PeerIdentity *peer_id)
3251 {
3252   struct GNUNET_MQ_Envelope *ev;
3253
3254   LOG (GNUNET_ERROR_TYPE_DEBUG,
3255        "Going to send PUSH to peer %s.\n",
3256        GNUNET_i2s (peer_id));
3257
3258   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3259   send_message (peer_id, ev, "PUSH");
3260   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3261 }
3262
3263
3264 static void
3265 do_round (void *cls);
3266
3267 static void
3268 do_mal_round (void *cls);
3269
3270 #ifdef ENABLE_MALICIOUS
3271
3272
3273 /**
3274  * @brief This function is called, when the client tells us to act malicious.
3275  * It verifies that @a msg is well-formed.
3276  *
3277  * @param cls the closure (#ClientContext)
3278  * @param msg the message
3279  * @return #GNUNET_OK if @a msg is well-formed
3280  */
3281 static int
3282 check_client_act_malicious (void *cls,
3283                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3284 {
3285   struct ClientContext *cli_ctx = cls;
3286   uint16_t msize = ntohs (msg->header.size);
3287   uint32_t num_peers = ntohl (msg->num_peers);
3288
3289   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3290   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3291        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3292   {
3293     LOG (GNUNET_ERROR_TYPE_ERROR,
3294         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3295         ntohl (msg->num_peers),
3296         (msize / sizeof (struct GNUNET_PeerIdentity)));
3297     GNUNET_break (0);
3298     GNUNET_SERVICE_client_drop (cli_ctx->client);
3299     return GNUNET_SYSERR;
3300   }
3301   return GNUNET_OK;
3302 }
3303
3304 /**
3305  * Turn RPS service to act malicious.
3306  *
3307  * @param cls Closure
3308  * @param client The client that sent the message
3309  * @param msg The message header
3310  */
3311 static void
3312 handle_client_act_malicious (void *cls,
3313                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3314 {
3315   struct ClientContext *cli_ctx = cls;
3316   struct GNUNET_PeerIdentity *peers;
3317   uint32_t num_mal_peers_sent;
3318   uint32_t num_mal_peers_old;
3319
3320   /* Do actual logic */
3321   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3322   mal_type = ntohl (msg->type);
3323   if (NULL == mal_peer_set)
3324     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3325
3326   LOG (GNUNET_ERROR_TYPE_DEBUG,
3327        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3328        mal_type,
3329        ntohl (msg->num_peers));
3330
3331   if (1 == mal_type)
3332   { /* Try to maximise representation */
3333     /* Add other malicious peers to those we already know */
3334
3335     num_mal_peers_sent = ntohl (msg->num_peers);
3336     num_mal_peers_old = num_mal_peers;
3337     GNUNET_array_grow (mal_peers,
3338                        num_mal_peers,
3339                        num_mal_peers + num_mal_peers_sent);
3340     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3341             peers,
3342             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3343
3344     /* Add all mal peers to mal_peer_set */
3345     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3346                            num_mal_peers_sent,
3347                            mal_peer_set);
3348
3349     /* Substitute do_round () with do_mal_round () */
3350     GNUNET_SCHEDULER_cancel (do_round_task);
3351     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3352   }
3353
3354   else if ( (2 == mal_type) ||
3355             (3 == mal_type) )
3356   { /* Try to partition the network */
3357     /* Add other malicious peers to those we already know */
3358
3359     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3360     num_mal_peers_old = num_mal_peers;
3361     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3362     GNUNET_array_grow (mal_peers,
3363                        num_mal_peers,
3364                        num_mal_peers + num_mal_peers_sent);
3365     if (NULL != mal_peers &&
3366         0 != num_mal_peers)
3367     {
3368       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3369               peers,
3370               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3371
3372       /* Add all mal peers to mal_peer_set */
3373       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3374                              num_mal_peers_sent,
3375                              mal_peer_set);
3376     }
3377
3378     /* Store the one attacked peer */
3379     GNUNET_memcpy (&attacked_peer,
3380             &msg->attacked_peer,
3381             sizeof (struct GNUNET_PeerIdentity));
3382     /* Set the flag of the attacked peer to valid to avoid problems */
3383     if (GNUNET_NO == check_peer_known (&attacked_peer))
3384     {
3385       (void) issue_peer_liveliness_check (&attacked_peer);
3386     }
3387
3388     LOG (GNUNET_ERROR_TYPE_DEBUG,
3389          "Attacked peer is %s\n",
3390          GNUNET_i2s (&attacked_peer));
3391
3392     /* Substitute do_round () with do_mal_round () */
3393     GNUNET_SCHEDULER_cancel (do_round_task);
3394     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3395   }
3396   else if (0 == mal_type)
3397   { /* Stop acting malicious */
3398     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3399
3400     /* Substitute do_mal_round () with do_round () */
3401     GNUNET_SCHEDULER_cancel (do_round_task);
3402     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3403   }
3404   else
3405   {
3406     GNUNET_break (0);
3407     GNUNET_SERVICE_client_continue (cli_ctx->client);
3408   }
3409   GNUNET_SERVICE_client_continue (cli_ctx->client);
3410 }
3411
3412
3413 /**
3414  * Send out PUSHes and PULLs maliciously.
3415  *
3416  * This is executed regylary.
3417  */
3418 static void
3419 do_mal_round (void *cls)
3420 {
3421   uint32_t num_pushes;
3422   uint32_t i;
3423   struct GNUNET_TIME_Relative time_next_round;
3424   struct AttackedPeer *tmp_att_peer;
3425
3426   LOG (GNUNET_ERROR_TYPE_DEBUG,
3427        "Going to execute next round maliciously type %" PRIu32 ".\n",
3428       mal_type);
3429   do_round_task = NULL;
3430   GNUNET_assert (mal_type <= 3);
3431   /* Do malicious actions */
3432   if (1 == mal_type)
3433   { /* Try to maximise representation */
3434
3435     /* The maximum of pushes we're going to send this round */
3436     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3437                                          num_attacked_peers),
3438                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3439
3440     LOG (GNUNET_ERROR_TYPE_DEBUG,
3441          "Going to send %" PRIu32 " pushes\n",
3442          num_pushes);
3443
3444     /* Send PUSHes to attacked peers */
3445     for (i = 0 ; i < num_pushes ; i++)
3446     {
3447       if (att_peers_tail == att_peer_index)
3448         att_peer_index = att_peers_head;
3449       else
3450         att_peer_index = att_peer_index->next;
3451
3452       send_push (&att_peer_index->peer_id);
3453     }
3454
3455     /* Send PULLs to some peers to learn about additional peers to attack */
3456     tmp_att_peer = att_peer_index;
3457     for (i = 0 ; i < num_pushes * alpha ; i++)
3458     {
3459       if (att_peers_tail == tmp_att_peer)
3460         tmp_att_peer = att_peers_head;
3461       else
3462         att_peer_index = tmp_att_peer->next;
3463
3464       send_pull_request (&tmp_att_peer->peer_id);
3465     }
3466   }
3467
3468
3469   else if (2 == mal_type)
3470   { /**
3471      * Try to partition the network
3472      * Send as many pushes to the attacked peer as possible
3473      * That is one push per round as it will ignore more.
3474      */
3475     (void) issue_peer_liveliness_check (&attacked_peer);
3476     if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3477       send_push (&attacked_peer);
3478   }
3479
3480
3481   if (3 == mal_type)
3482   { /* Combined attack */
3483
3484     /* Send PUSH to attacked peers */
3485     if (GNUNET_YES == check_peer_known (&attacked_peer))
3486     {
3487       (void) issue_peer_liveliness_check (&attacked_peer);
3488       if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3489       {
3490         LOG (GNUNET_ERROR_TYPE_DEBUG,
3491             "Goding to send push to attacked peer (%s)\n",
3492             GNUNET_i2s (&attacked_peer));
3493         send_push (&attacked_peer);
3494       }
3495     }
3496     (void) issue_peer_liveliness_check (&attacked_peer);
3497
3498     /* The maximum of pushes we're going to send this round */
3499     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3500                                          num_attacked_peers),
3501                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3502
3503     LOG (GNUNET_ERROR_TYPE_DEBUG,
3504          "Going to send %" PRIu32 " pushes\n",
3505          num_pushes);
3506
3507     for (i = 0; i < num_pushes; i++)
3508     {
3509       if (att_peers_tail == att_peer_index)
3510         att_peer_index = att_peers_head;
3511       else
3512         att_peer_index = att_peer_index->next;
3513
3514       send_push (&att_peer_index->peer_id);
3515     }
3516
3517     /* Send PULLs to some peers to learn about additional peers to attack */
3518     tmp_att_peer = att_peer_index;
3519     for (i = 0; i < num_pushes * alpha; i++)
3520     {
3521       if (att_peers_tail == tmp_att_peer)
3522         tmp_att_peer = att_peers_head;
3523       else
3524         att_peer_index = tmp_att_peer->next;
3525
3526       send_pull_request (&tmp_att_peer->peer_id);
3527     }
3528   }
3529
3530   /* Schedule next round */
3531   time_next_round = compute_rand_delay (round_interval, 2);
3532
3533   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3534   //NULL);
3535   GNUNET_assert (NULL == do_round_task);
3536   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3537                                                 &do_mal_round, NULL);
3538   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3539 }
3540 #endif /* ENABLE_MALICIOUS */
3541
3542 /**
3543  * Send out PUSHes and PULLs, possibly update #view, samplers.
3544  *
3545  * This is executed regylary.
3546  */
3547 static void
3548 do_round (void *cls)
3549 {
3550   uint32_t i;
3551   const struct GNUNET_PeerIdentity *view_array;
3552   unsigned int *permut;
3553   unsigned int a_peers; /* Number of peers we send pushes to */
3554   unsigned int b_peers; /* Number of peers we send pull requests to */
3555   uint32_t first_border;
3556   uint32_t second_border;
3557   struct GNUNET_PeerIdentity peer;
3558   struct GNUNET_PeerIdentity *update_peer;
3559
3560   LOG (GNUNET_ERROR_TYPE_DEBUG,
3561        "Going to execute next round.\n");
3562   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3563   do_round_task = NULL;
3564   LOG (GNUNET_ERROR_TYPE_DEBUG,
3565        "Printing view:\n");
3566   to_file (file_name_view_log,
3567            "___ new round ___");
3568   view_array = View_get_as_array ();
3569   for (i = 0; i < View_size (); i++)
3570   {
3571     LOG (GNUNET_ERROR_TYPE_DEBUG,
3572          "\t%s\n", GNUNET_i2s (&view_array[i]));
3573     to_file (file_name_view_log,
3574              "=%s\t(do round)",
3575              GNUNET_i2s_full (&view_array[i]));
3576   }
3577
3578
3579   /* Send pushes and pull requests */
3580   if (0 < View_size ())
3581   {
3582     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3583                                            View_size ());
3584
3585     /* Send PUSHes */
3586     a_peers = ceil (alpha * View_size ());
3587
3588     LOG (GNUNET_ERROR_TYPE_DEBUG,
3589          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3590          a_peers, alpha, View_size ());
3591     for (i = 0; i < a_peers; i++)
3592     {
3593       peer = view_array[permut[i]];
3594       // FIXME if this fails schedule/loop this for later
3595       send_push (&peer);
3596     }
3597
3598     /* Send PULL requests */
3599     b_peers = ceil (beta * View_size ());
3600     first_border = a_peers;
3601     second_border = a_peers + b_peers;
3602     if (second_border > View_size ())
3603     {
3604       first_border = View_size () - b_peers;
3605       second_border = View_size ();
3606     }
3607     LOG (GNUNET_ERROR_TYPE_DEBUG,
3608         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3609         b_peers, beta, View_size ());
3610     for (i = first_border; i < second_border; i++)
3611     {
3612       peer = view_array[permut[i]];
3613       if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3614       { // FIXME if this fails schedule/loop this for later
3615         send_pull_request (&peer);
3616       }
3617     }
3618
3619     GNUNET_free (permut);
3620     permut = NULL;
3621   }
3622
3623
3624   /* Update view */
3625   /* TODO see how many peers are in push-/pull- list! */
3626
3627   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3628       (0 < CustomPeerMap_size (push_map)) &&
3629       (0 < CustomPeerMap_size (pull_map)))
3630   //if (GNUNET_YES) // disable blocking temporarily
3631   { /* If conditions for update are fulfilled, update */
3632     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3633
3634     uint32_t final_size;
3635     uint32_t peers_to_clean_size;
3636     struct GNUNET_PeerIdentity *peers_to_clean;
3637
3638     peers_to_clean = NULL;
3639     peers_to_clean_size = 0;
3640     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3641     GNUNET_memcpy (peers_to_clean,
3642             view_array,
3643             View_size () * sizeof (struct GNUNET_PeerIdentity));
3644
3645     /* Seems like recreating is the easiest way of emptying the peermap */
3646     View_clear ();
3647     to_file (file_name_view_log,
3648              "--- emptied ---");
3649
3650     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3651                                 CustomPeerMap_size (push_map));
3652     second_border = first_border +
3653                     GNUNET_MIN (floor (beta  * view_size_est_need),
3654                                 CustomPeerMap_size (pull_map));
3655     final_size    = second_border +
3656       ceil ((1 - (alpha + beta)) * view_size_est_need);
3657     LOG (GNUNET_ERROR_TYPE_DEBUG,
3658         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3659         first_border,
3660         second_border,
3661         final_size);
3662
3663     /* Update view with peers received through PUSHes */
3664     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3665                                            CustomPeerMap_size (push_map));
3666     for (i = 0; i < first_border; i++)
3667     {
3668       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3669                                                               permut[i]));
3670       to_file (file_name_view_log,
3671                "+%s\t(push list)",
3672                GNUNET_i2s_full (&view_array[i]));
3673       // TODO change the peer_flags accordingly
3674     }
3675     GNUNET_free (permut);
3676     permut = NULL;
3677
3678     /* Update view with peers received through PULLs */
3679     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3680                                            CustomPeerMap_size (pull_map));
3681     for (i = first_border; i < second_border; i++)
3682     {
3683       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3684             permut[i - first_border]));
3685       to_file (file_name_view_log,
3686                "+%s\t(pull list)",
3687                GNUNET_i2s_full (&view_array[i]));
3688       // TODO change the peer_flags accordingly
3689     }
3690     GNUNET_free (permut);
3691     permut = NULL;
3692
3693     /* Update view with peers from history */
3694     RPS_sampler_get_n_rand_peers (prot_sampler,
3695                                   hist_update,
3696                                   NULL,
3697                                   final_size - second_border);
3698     // TODO change the peer_flags accordingly
3699
3700     for (i = 0; i < View_size (); i++)
3701       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3702
3703     /* Clean peers that were removed from the view */
3704     for (i = 0; i < peers_to_clean_size; i++)
3705     {
3706       to_file (file_name_view_log,
3707                "-%s",
3708                GNUNET_i2s_full (&peers_to_clean[i]));
3709       clean_peer (&peers_to_clean[i]);
3710     }
3711
3712     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3713     clients_notify_view_update();
3714   } else {
3715     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3716     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3717     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3718         !(0 >= CustomPeerMap_size (pull_map)))
3719       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3720     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3721         (0 >= CustomPeerMap_size (pull_map)))
3722       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3723     if (0 >= CustomPeerMap_size (push_map) &&
3724         !(0 >= CustomPeerMap_size (pull_map)))
3725       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3726     if (0 >= CustomPeerMap_size (push_map) &&
3727         (0 >= CustomPeerMap_size (pull_map)))
3728       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3729     if (0 >= CustomPeerMap_size (pull_map) &&
3730         CustomPeerMap_size (push_map) > alpha * View_size () &&
3731         0 >= CustomPeerMap_size (push_map))
3732       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3733   }
3734   // TODO independent of that also get some peers from CADET_get_peers()?
3735   GNUNET_STATISTICS_set (stats,
3736       "# peers in push map at end of round",
3737       CustomPeerMap_size (push_map),
3738       GNUNET_NO);
3739   GNUNET_STATISTICS_set (stats,
3740       "# peers in pull map at end of round",
3741       CustomPeerMap_size (pull_map),
3742       GNUNET_NO);
3743   GNUNET_STATISTICS_set (stats,
3744       "# peers in view at end of round",
3745       View_size (),
3746       GNUNET_NO);
3747
3748   LOG (GNUNET_ERROR_TYPE_DEBUG,
3749        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3750        CustomPeerMap_size (push_map),
3751        CustomPeerMap_size (pull_map),
3752        alpha,
3753        View_size (),
3754        alpha * View_size ());
3755
3756   /* Update samplers */
3757   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3758   {
3759     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3760     LOG (GNUNET_ERROR_TYPE_DEBUG,
3761          "Updating with peer %s from push list\n",
3762          GNUNET_i2s (update_peer));
3763     insert_in_sampler (NULL, update_peer);
3764     clean_peer (update_peer); /* This cleans only if it is not in the view */
3765   }
3766
3767   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3768   {
3769     LOG (GNUNET_ERROR_TYPE_DEBUG,
3770          "Updating with peer %s from pull list\n",
3771          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3772     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3773     /* This cleans only if it is not in the view */
3774     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3775   }
3776
3777
3778   /* Empty push/pull lists */
3779   CustomPeerMap_clear (push_map);
3780   CustomPeerMap_clear (pull_map);
3781
3782   struct GNUNET_TIME_Relative time_next_round;
3783
3784   time_next_round = compute_rand_delay (round_interval, 2);
3785
3786   /* Schedule next round */
3787   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3788                                                 &do_round, NULL);
3789   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3790 }
3791
3792
3793 /**
3794  * This is called from GNUNET_CADET_get_peers().
3795  *
3796  * It is called on every peer(ID) that cadet somehow has contact with.
3797  * We use those to initialise the sampler.
3798  */
3799 void
3800 init_peer_cb (void *cls,
3801               const struct GNUNET_PeerIdentity *peer,
3802               int tunnel, // "Do we have a tunnel towards this peer?"
3803               unsigned int n_paths, // "Number of known paths towards this peer"
3804               unsigned int best_path) // "How long is the best path?
3805                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3806 {
3807   if (NULL != peer)
3808   {
3809     LOG (GNUNET_ERROR_TYPE_DEBUG,
3810          "Got peer_id %s from cadet\n",
3811          GNUNET_i2s (peer));
3812     got_peer (peer);
3813   }
3814 }
3815
3816 /**
3817  * @brief Iterator function over stored, valid peers.
3818  *
3819  * We initialise the sampler with those.
3820  *
3821  * @param cls the closure
3822  * @param peer the peer id
3823  * @return #GNUNET_YES if we should continue to
3824  *         iterate,
3825  *         #GNUNET_NO if not.
3826  */
3827 static int
3828 valid_peers_iterator (void *cls,
3829                       const struct GNUNET_PeerIdentity *peer)
3830 {
3831   if (NULL != peer)
3832   {
3833     LOG (GNUNET_ERROR_TYPE_DEBUG,
3834          "Got stored, valid peer %s\n",
3835          GNUNET_i2s (peer));
3836     got_peer (peer);
3837   }
3838   return GNUNET_YES;
3839 }
3840
3841
3842 /**
3843  * Iterator over peers from peerinfo.
3844  *
3845  * @param cls closure
3846  * @param peer id of the peer, NULL for last call
3847  * @param hello hello message for the peer (can be NULL)
3848  * @param error message
3849  */
3850 void
3851 process_peerinfo_peers (void *cls,
3852                         const struct GNUNET_PeerIdentity *peer,
3853                         const struct GNUNET_HELLO_Message *hello,
3854                         const char *err_msg)
3855 {
3856   if (NULL != peer)
3857   {
3858     LOG (GNUNET_ERROR_TYPE_DEBUG,
3859          "Got peer_id %s from peerinfo\n",
3860          GNUNET_i2s (peer));
3861     got_peer (peer);
3862   }
3863 }
3864
3865
3866 /**
3867  * Task run during shutdown.
3868  *
3869  * @param cls unused
3870  */
3871 static void
3872 shutdown_task (void *cls)
3873 {
3874   struct ClientContext *client_ctx;
3875   struct ReplyCls *reply_cls;
3876
3877   in_shutdown = GNUNET_YES;
3878
3879   LOG (GNUNET_ERROR_TYPE_DEBUG,
3880        "RPS is going down\n");
3881
3882   /* Clean all clients */
3883   for (client_ctx = cli_ctx_head;
3884        NULL != cli_ctx_head;
3885        client_ctx = cli_ctx_head)
3886   {
3887     /* Clean pending requests to the sampler */
3888     for (reply_cls = client_ctx->rep_cls_head;
3889          NULL != client_ctx->rep_cls_head;
3890          reply_cls = client_ctx->rep_cls_head)
3891     {
3892       RPS_sampler_request_cancel (reply_cls->req_handle);
3893       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
3894                                    client_ctx->rep_cls_tail,
3895                                    reply_cls);
3896       GNUNET_free (reply_cls);
3897     }
3898     GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3899                                  cli_ctx_tail,
3900                                  client_ctx);
3901     GNUNET_free (client_ctx);
3902   }
3903   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3904   GNUNET_PEERINFO_disconnect (peerinfo_handle);
3905   peerinfo_handle = NULL;
3906   if (NULL != do_round_task)
3907   {
3908     GNUNET_SCHEDULER_cancel (do_round_task);
3909     do_round_task = NULL;
3910   }
3911
3912   peers_terminate ();
3913
3914   GNUNET_NSE_disconnect (nse);
3915   RPS_sampler_destroy (prot_sampler);
3916   RPS_sampler_destroy (client_sampler);
3917   GNUNET_CADET_close_port (cadet_port);
3918   GNUNET_CADET_disconnect (cadet_handle);
3919   cadet_handle = NULL;
3920   View_destroy ();
3921   CustomPeerMap_destroy (push_map);
3922   CustomPeerMap_destroy (pull_map);
3923   if (NULL != stats)
3924   {
3925     GNUNET_STATISTICS_destroy (stats,
3926                                GNUNET_NO);
3927     stats = NULL;
3928   }
3929 #ifdef ENABLE_MALICIOUS
3930   struct AttackedPeer *tmp_att_peer;
3931   /* it is ok to free this const during shutdown: */
3932   GNUNET_free ((char *) file_name_view_log);
3933 #ifdef TO_FILE
3934   GNUNET_free ((char *) file_name_observed_log);
3935   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
3936 #endif /* TO_FILE */
3937   GNUNET_array_grow (mal_peers,
3938                      num_mal_peers,
3939                      0);
3940   if (NULL != mal_peer_set)
3941     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3942   if (NULL != att_peer_set)
3943     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
3944   while (NULL != att_peers_head)
3945   {
3946     tmp_att_peer = att_peers_head;
3947     GNUNET_CONTAINER_DLL_remove (att_peers_head,
3948                                  att_peers_tail,
3949                                  tmp_att_peer);
3950     GNUNET_free (tmp_att_peer);
3951   }
3952 #endif /* ENABLE_MALICIOUS */
3953 }
3954
3955
3956 /**
3957  * Handle client connecting to the service.
3958  *
3959  * @param cls NULL
3960  * @param client the new client
3961  * @param mq the message queue of @a client
3962  * @return @a client
3963  */
3964 static void *
3965 client_connect_cb (void *cls,
3966                    struct GNUNET_SERVICE_Client *client,
3967                    struct GNUNET_MQ_Handle *mq)
3968 {
3969   struct ClientContext *cli_ctx;
3970
3971   LOG (GNUNET_ERROR_TYPE_DEBUG,
3972        "Client connected\n");
3973   if (NULL == client)
3974     return client; /* Server was destroyed before a client connected. Shutting down */
3975   cli_ctx = GNUNET_new (struct ClientContext);
3976   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
3977   cli_ctx->view_updates_left = -1;
3978   cli_ctx->client = client;
3979   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
3980                                cli_ctx_tail,
3981                                cli_ctx);
3982   return cli_ctx;
3983 }
3984
3985 /**
3986  * Callback called when a client disconnected from the service
3987  *
3988  * @param cls closure for the service
3989  * @param c the client that disconnected
3990  * @param internal_cls should be equal to @a c
3991  */
3992 static void
3993 client_disconnect_cb (void *cls,
3994                       struct GNUNET_SERVICE_Client *client,
3995                       void *internal_cls)
3996 {
3997   struct ClientContext *cli_ctx = internal_cls;
3998
3999   (void) cls;
4000   GNUNET_assert (client == cli_ctx->client);
4001   if (NULL == client)
4002   {/* shutdown task - destroy all clients */
4003     while (NULL != cli_ctx_head)
4004       destroy_cli_ctx (cli_ctx_head);
4005   }
4006   else
4007   { /* destroy this client */
4008     LOG (GNUNET_ERROR_TYPE_DEBUG,
4009         "Client disconnected. Destroy its context.\n");
4010     destroy_cli_ctx (cli_ctx);
4011   }
4012 }
4013
4014
4015 /**
4016  * Handle random peer sampling clients.
4017  *
4018  * @param cls closure
4019  * @param c configuration to use
4020  * @param service the initialized service
4021  */
4022 static void
4023 run (void *cls,
4024      const struct GNUNET_CONFIGURATION_Handle *c,
4025      struct GNUNET_SERVICE_Handle *service)
4026 {
4027   char *fn_valid_peers;
4028
4029   (void) cls;
4030   (void) service;
4031   GNUNET_log_setup ("rps",
4032                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4033                     NULL);
4034   cfg = c;
4035   /* Get own ID */
4036   GNUNET_CRYPTO_get_peer_identity (cfg,
4037                                    &own_identity); // TODO check return value
4038   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4039               "STARTING SERVICE (rps) for peer [%s]\n",
4040               GNUNET_i2s (&own_identity));
4041 #ifdef ENABLE_MALICIOUS
4042   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4043               "Malicious execution compiled in.\n");
4044 #endif /* ENABLE_MALICIOUS */
4045
4046   /* Get time interval from the configuration */
4047   if (GNUNET_OK !=
4048       GNUNET_CONFIGURATION_get_value_time (cfg,
4049                                            "RPS",
4050                                            "ROUNDINTERVAL",
4051                                            &round_interval))
4052   {
4053     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4054                                "RPS", "ROUNDINTERVAL");
4055     GNUNET_SCHEDULER_shutdown ();
4056     return;
4057   }
4058
4059   /* Get initial size of sampler/view from the configuration */
4060   if (GNUNET_OK !=
4061       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4062         (long long unsigned int *) &sampler_size_est_min))
4063   {
4064     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4065                                "RPS", "MINSIZE");
4066     GNUNET_SCHEDULER_shutdown ();
4067     return;
4068   }
4069   sampler_size_est_need = sampler_size_est_min;
4070   view_size_est_min = sampler_size_est_min;
4071   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4072
4073   if (GNUNET_OK !=
4074       GNUNET_CONFIGURATION_get_value_filename (cfg,
4075                                                "rps",
4076                                                "FILENAME_VALID_PEERS",
4077                                                &fn_valid_peers))
4078   {
4079     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4080                                "rps", "FILENAME_VALID_PEERS");
4081   }
4082
4083
4084   View_create (view_size_est_min);
4085
4086   /* file_name_view_log */
4087   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4088   #ifdef TO_FILE
4089   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4090   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4091   #endif /* TO_FILE */
4092
4093   /* connect to NSE */
4094   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4095
4096
4097   alpha = 0.45;
4098   beta  = 0.45;
4099
4100
4101   /* Initialise cadet */
4102   /* There exists a copy-paste-clone in get_channel() */
4103   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4104     GNUNET_MQ_hd_fixed_size (peer_check,
4105                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4106                              struct GNUNET_MessageHeader,
4107                              NULL),
4108     GNUNET_MQ_hd_fixed_size (peer_push,
4109                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4110                              struct GNUNET_MessageHeader,
4111                              NULL),
4112     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4113                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4114                              struct GNUNET_MessageHeader,
4115                              NULL),
4116     GNUNET_MQ_hd_var_size (peer_pull_reply,
4117                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4118                            struct GNUNET_RPS_P2P_PullReplyMessage,
4119                            NULL),
4120     GNUNET_MQ_handler_end ()
4121   };
4122
4123   cadet_handle = GNUNET_CADET_connect (cfg);
4124   GNUNET_assert (NULL != cadet_handle);
4125   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4126                       strlen (GNUNET_APPLICATION_PORT_RPS),
4127                       &port);
4128   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4129                                        &port,
4130                                        &handle_inbound_channel, /* Connect handler */
4131                                        NULL, /* cls */
4132                                        NULL, /* WindowSize handler */
4133                                        &cleanup_destroyed_channel, /* Disconnect handler */
4134                                        cadet_handlers);
4135   if (NULL == cadet_port)
4136   {
4137     LOG (GNUNET_ERROR_TYPE_ERROR,
4138         "Cadet port `%s' is already in use.\n",
4139         GNUNET_APPLICATION_PORT_RPS);
4140     GNUNET_assert (0);
4141   }
4142
4143
4144   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4145   initialise_peers (fn_valid_peers, cadet_handle);
4146   GNUNET_free (fn_valid_peers);
4147
4148   /* Initialise sampler */
4149   struct GNUNET_TIME_Relative half_round_interval;
4150   struct GNUNET_TIME_Relative  max_round_interval;
4151
4152   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4153   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4154
4155   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4156   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4157
4158   /* Initialise push and pull maps */
4159   push_map = CustomPeerMap_create (4);
4160   pull_map = CustomPeerMap_create (4);
4161
4162
4163   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4164   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4165   // TODO send push/pull to each of those peers?
4166   // TODO read stored valid peers from last run
4167   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4168   get_valid_peers (valid_peers_iterator, NULL);
4169
4170   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4171                                                    GNUNET_NO,
4172                                                    process_peerinfo_peers,
4173                                                    NULL);
4174
4175   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4176
4177   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4178   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4179
4180   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4181   stats = GNUNET_STATISTICS_create ("rps", cfg);
4182
4183 }
4184
4185
4186 /**
4187  * Define "main" method using service macro.
4188  */
4189 GNUNET_SERVICE_MAIN
4190 ("rps",
4191  GNUNET_SERVICE_OPTION_NONE,
4192  &run,
4193  &client_connect_cb,
4194  &client_disconnect_cb,
4195  NULL,
4196  GNUNET_MQ_hd_fixed_size (client_request,
4197    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4198    struct GNUNET_RPS_CS_RequestMessage,
4199    NULL),
4200  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4201    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4202    struct GNUNET_RPS_CS_RequestCancelMessage,
4203    NULL),
4204  GNUNET_MQ_hd_var_size (client_seed,
4205    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4206    struct GNUNET_RPS_CS_SeedMessage,
4207    NULL),
4208 #ifdef ENABLE_MALICIOUS
4209  GNUNET_MQ_hd_var_size (client_act_malicious,
4210    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4211    struct GNUNET_RPS_CS_ActMaliciousMessage,
4212    NULL),
4213 #endif /* ENABLE_MALICIOUS */
4214  GNUNET_MQ_hd_fixed_size (client_view_request,
4215    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4216    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4217    NULL),
4218  GNUNET_MQ_handler_end());
4219
4220 /* end of gnunet-service-rps.c */