paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO store peers somewhere persistent
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /**
57  * Our configuration.
58  */
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62  * Handle to the statistics service.
63  */
64 static struct GNUNET_STATISTICS_Handle *stats;
65
66 /**
67  * Our own identity.
68  */
69 static struct GNUNET_PeerIdentity own_identity;
70
71
72
73 /***********************************************************************
74  * Old gnunet-service-rps_peers.c
75 ***********************************************************************/
76
77 /**
78  * Set a peer flag of given peer context.
79  */
80 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
81
82 /**
83  * Get peer flag of given peer context.
84  */
85 #define check_peer_flag_set(peer_ctx, mask)\
86   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
87
88 /**
89  * Unset flag of given peer context.
90  */
91 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
92
93 /**
94  * Set a channel flag of given channel context.
95  */
96 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
97
98 /**
99  * Get channel flag of given channel context.
100  */
101 #define check_channel_flag_set(channel_flags, mask)\
102   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
103
104 /**
105  * Unset flag of given channel context.
106  */
107 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
108
109
110
111 /**
112  * Pending operation on peer consisting of callback and closure
113  *
114  * When an operation cannot be executed right now this struct is used to store
115  * the callback and closure for later execution.
116  */
117 struct PeerPendingOp
118 {
119   /**
120    * Callback
121    */
122   PeerOp op;
123
124   /**
125    * Closure
126    */
127   void *op_cls;
128 };
129
130 /**
131  * List containing all messages that are yet to be send
132  *
133  * This is used to keep track of all messages that have not been sent yet. When
134  * a peer is to be removed the pending messages can be removed properly.
135  */
136 struct PendingMessage
137 {
138   /**
139    * DLL next, prev
140    */
141   struct PendingMessage *next;
142   struct PendingMessage *prev;
143
144   /**
145    * The envelope to the corresponding message
146    */
147   struct GNUNET_MQ_Envelope *ev;
148
149   /**
150    * The corresponding context
151    */
152   struct PeerContext *peer_ctx;
153
154   /**
155    * The message type
156    */
157   const char *type;
158 };
159
160 /**
161  * Struct used to keep track of other peer's status
162  *
163  * This is stored in a multipeermap.
164  * It contains information such as cadet channels, a message queue for sending,
165  * status about the channels, the pending operations on this peer and some flags
166  * about the status of the peer itself. (live, valid, ...)
167  */
168 struct PeerContext
169 {
170   /**
171    * Message queue open to client
172    */
173   struct GNUNET_MQ_Handle *mq;
174
175   /**
176    * Channel open to client.
177    */
178   struct GNUNET_CADET_Channel *send_channel;
179
180   /**
181    * Flags to the sending channel
182    */
183   uint32_t *send_channel_flags;
184
185   /**
186    * Channel open from client.
187    */
188   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
189
190   /**
191    * Flags to the receiving channel
192    */
193   uint32_t *recv_channel_flags;
194
195   /**
196    * Array of pending operations on this peer.
197    */
198   struct PeerPendingOp *pending_ops;
199
200   /**
201    * Handle to the callback given to cadet_ntfy_tmt_rdy()
202    *
203    * To be canceled on shutdown.
204    */
205   struct PendingMessage *liveliness_check_pending;
206
207   /**
208    * Number of pending operations.
209    */
210   unsigned int num_pending_ops;
211
212   /**
213    * Identity of the peer
214    */
215   struct GNUNET_PeerIdentity peer_id;
216
217   /**
218    * Flags indicating status of peer
219    */
220   uint32_t peer_flags;
221
222   /**
223    * Last time we received something from that peer.
224    */
225   struct GNUNET_TIME_Absolute last_message_recv;
226
227   /**
228    * Last time we received a keepalive message.
229    */
230   struct GNUNET_TIME_Absolute last_keepalive;
231
232   /**
233    * DLL with all messages that are yet to be sent
234    */
235   struct PendingMessage *pending_messages_head;
236   struct PendingMessage *pending_messages_tail;
237
238   /**
239    * This is pobably followed by 'statistical' data (when we first saw
240    * him, how did we get his ID, how many pushes (in a timeinterval),
241    * ...)
242    */
243 };
244
245 /**
246  * @brief Closure to #valid_peer_iterator
247  */
248 struct PeersIteratorCls
249 {
250   /**
251    * Iterator function
252    */
253   PeersIterator iterator;
254
255   /**
256    * Closure to iterator
257    */
258   void *cls;
259 };
260
261 /**
262  * @brief Hashmap of valid peers.
263  */
264 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
265
266 /**
267  * @brief Maximum number of valid peers to keep.
268  * TODO read from config
269  */
270 static uint32_t num_valid_peers_max = UINT32_MAX;
271
272 /**
273  * @brief Filename of the file that stores the valid peers persistently.
274  */
275 static char *filename_valid_peers;
276
277 /**
278  * Set of all peers to keep track of them.
279  */
280 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
281
282 /**
283  * Cadet handle.
284  */
285 static struct GNUNET_CADET_Handle *cadet_handle;
286
287
288
289 /**
290  * @brief Get the #PeerContext associated with a peer
291  *
292  * @param peer the peer id
293  *
294  * @return the #PeerContext
295  */
296 static struct PeerContext *
297 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
298 {
299   struct PeerContext *ctx;
300   int ret;
301
302   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
303   GNUNET_assert (GNUNET_YES == ret);
304   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
305   GNUNET_assert (NULL != ctx);
306   return ctx;
307 }
308
309 int
310 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
311
312 /**
313  * @brief Create a new #PeerContext and insert it into the peer map
314  *
315  * @param peer the peer to create the #PeerContext for
316  *
317  * @return the #PeerContext
318  */
319 static struct PeerContext *
320 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
321 {
322   struct PeerContext *ctx;
323   int ret;
324
325   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
326
327   ctx = GNUNET_new (struct PeerContext);
328   ctx->peer_id = *peer;
329   ctx->send_channel_flags = GNUNET_new (uint32_t);
330   ctx->recv_channel_flags = GNUNET_new (uint32_t);
331   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
332       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
333   GNUNET_assert (GNUNET_OK == ret);
334   return ctx;
335 }
336
337
338 /**
339  * @brief Create or get a #PeerContext
340  *
341  * @param peer the peer to get the associated context to
342  *
343  * @return the context
344  */
345 static struct PeerContext *
346 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
347 {
348   if (GNUNET_NO == Peers_check_peer_known (peer))
349   {
350     return create_peer_ctx (peer);
351   }
352   return get_peer_ctx (peer);
353 }
354
355 void
356 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
357
358 void
359 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
360
361 /**
362  * @brief Check whether we have a connection to this @a peer
363  *
364  * Also sets the #Peers_ONLINE flag accordingly
365  *
366  * @param peer the peer in question
367  *
368  * @return #GNUNET_YES if we are connected
369  *         #GNUNET_NO  otherwise
370  */
371 int
372 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
373 {
374   const struct PeerContext *peer_ctx;
375
376   /* If we don't know about this peer we don't know whether it's online */
377   if (GNUNET_NO == Peers_check_peer_known (peer))
378   {
379     return GNUNET_NO;
380   }
381   /* Get the context */
382   peer_ctx = get_peer_ctx (peer);
383   /* If we have no channel to this peer we don't know whether it's online */
384   if ( (NULL == peer_ctx->send_channel) &&
385        (NULL == peer_ctx->recv_channel) )
386   {
387     Peers_unset_peer_flag (peer, Peers_ONLINE);
388     return GNUNET_NO;
389   }
390   /* Otherwise (if we have a channel, we know that it's online */
391   Peers_set_peer_flag (peer, Peers_ONLINE);
392   return GNUNET_YES;
393 }
394
395
396 /**
397  * @brief The closure to #get_rand_peer_iterator.
398  */
399 struct GetRandPeerIteratorCls
400 {
401   /**
402    * @brief The index of the peer to return.
403    * Will be decreased until 0.
404    * Then current peer is returned.
405    */
406   uint32_t index;
407
408   /**
409    * @brief Pointer to peer to return.
410    */
411   const struct GNUNET_PeerIdentity *peer;
412 };
413
414
415 /**
416  * @brief Iterator function for #get_random_peer_from_peermap.
417  *
418  * Implements #GNUNET_CONTAINER_PeerMapIterator.
419  * Decreases the index until the index is null.
420  * Then returns the current peer.
421  *
422  * @param cls the #GetRandPeerIteratorCls containing index and peer
423  * @param peer current peer
424  * @param value unused
425  *
426  * @return  #GNUNET_YES if we should continue to
427  *          iterate,
428  *          #GNUNET_NO if not.
429  */
430 static int
431 get_rand_peer_iterator (void *cls,
432                         const struct GNUNET_PeerIdentity *peer,
433                         void *value)
434 {
435   struct GetRandPeerIteratorCls *iterator_cls = cls;
436   if (0 >= iterator_cls->index)
437   {
438     iterator_cls->peer = peer;
439     return GNUNET_NO;
440   }
441   iterator_cls->index--;
442   return GNUNET_YES;
443 }
444
445
446 /**
447  * @brief Get a random peer from @a peer_map
448  *
449  * @param peer_map the peer_map to get the peer from
450  *
451  * @return a random peer
452  */
453 static const struct GNUNET_PeerIdentity *
454 get_random_peer_from_peermap (const struct
455                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
456 {
457   struct GetRandPeerIteratorCls *iterator_cls;
458   const struct GNUNET_PeerIdentity *ret;
459
460   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
461   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
462       GNUNET_CONTAINER_multipeermap_size (peer_map));
463   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
464                                                 get_rand_peer_iterator,
465                                                 iterator_cls);
466   ret = iterator_cls->peer;
467   GNUNET_free (iterator_cls);
468   return ret;
469 }
470
471
472 /**
473  * @brief Add a given @a peer to valid peers.
474  *
475  * If valid peers are already #num_valid_peers_max, delete a peer previously.
476  *
477  * @param peer the peer that is added to the valid peers.
478  *
479  * @return #GNUNET_YES if no other peer had to be removed
480  *         #GNUNET_NO  otherwise
481  */
482 static int
483 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
484 {
485   const struct GNUNET_PeerIdentity *rand_peer;
486   int ret;
487
488   ret = GNUNET_YES;
489   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
490   {
491     rand_peer = get_random_peer_from_peermap (valid_peers);
492     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
493     ret = GNUNET_NO;
494   }
495   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
496       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
497   return ret;
498 }
499
500
501 /**
502  * @brief Set the peer flag to living and
503  *        call the pending operations on this peer.
504  *
505  * Also adds peer to #valid_peers.
506  *
507  * @param peer_ctx the #PeerContext of the peer to set live
508  */
509 static void
510 set_peer_live (struct PeerContext *peer_ctx)
511 {
512   struct GNUNET_PeerIdentity *peer;
513   unsigned int i;
514
515   peer = &peer_ctx->peer_id;
516   LOG (GNUNET_ERROR_TYPE_DEBUG,
517       "Peer %s is live and valid, calling %i pending operations on it\n",
518       GNUNET_i2s (peer),
519       peer_ctx->num_pending_ops);
520
521   if (NULL != peer_ctx->liveliness_check_pending)
522   {
523     LOG (GNUNET_ERROR_TYPE_DEBUG,
524          "Removing pending liveliness check for peer %s\n",
525          GNUNET_i2s (&peer_ctx->peer_id));
526     // TODO wait until cadet sets mq->cancel_impl
527     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
528     GNUNET_free (peer_ctx->liveliness_check_pending);
529     peer_ctx->liveliness_check_pending = NULL;
530   }
531
532   (void) add_valid_peer (peer);
533   set_peer_flag (peer_ctx, Peers_ONLINE);
534
535   /* Call pending operations */
536   for (i = 0; i < peer_ctx->num_pending_ops; i++)
537   {
538     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
539   }
540   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
541 }
542
543 static void
544 cleanup_destroyed_channel (void *cls,
545                            const struct GNUNET_CADET_Channel *channel);
546
547 /* Declaration of handlers */
548 static void
549 handle_peer_check (void *cls,
550                    const struct GNUNET_MessageHeader *msg);
551
552 static void
553 handle_peer_push (void *cls,
554                   const struct GNUNET_MessageHeader *msg);
555
556 static void
557 handle_peer_pull_request (void *cls,
558                           const struct GNUNET_MessageHeader *msg);
559
560 static int
561 check_peer_pull_reply (void *cls,
562                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
563
564 static void
565 handle_peer_pull_reply (void *cls,
566                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
567
568 /* End declaration of handlers */
569
570
571 /**
572  * @brief Get the channel of a peer. If not existing, create.
573  *
574  * @param peer the peer id
575  * @return the #GNUNET_CADET_Channel used to send data to @a peer
576  */
577 struct GNUNET_CADET_Channel *
578 get_channel (const struct GNUNET_PeerIdentity *peer)
579 {
580   struct PeerContext *peer_ctx;
581   struct GNUNET_HashCode port;
582   struct GNUNET_PeerIdentity *ctx_peer;
583   /* There exists a copy-paste-clone in run() */
584   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
585     GNUNET_MQ_hd_fixed_size (peer_check,
586                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
587                              struct GNUNET_MessageHeader,
588                              NULL),
589     GNUNET_MQ_hd_fixed_size (peer_push,
590                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
591                              struct GNUNET_MessageHeader,
592                              NULL),
593     GNUNET_MQ_hd_fixed_size (peer_pull_request,
594                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
595                              struct GNUNET_MessageHeader,
596                              NULL),
597     GNUNET_MQ_hd_var_size (peer_pull_reply,
598                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
599                            struct GNUNET_RPS_P2P_PullReplyMessage,
600                            NULL),
601     GNUNET_MQ_handler_end ()
602   };
603
604
605   peer_ctx = get_peer_ctx (peer);
606   if (NULL == peer_ctx->send_channel)
607   {
608     LOG (GNUNET_ERROR_TYPE_DEBUG,
609          "Trying to establish channel to peer %s\n",
610          GNUNET_i2s (peer));
611     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
612                         strlen (GNUNET_APPLICATION_PORT_RPS),
613                         &port);
614     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
615     *ctx_peer = *peer;
616     peer_ctx->send_channel =
617       GNUNET_CADET_channel_create (cadet_handle,
618                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
619                                    peer,
620                                    &port,
621                                    GNUNET_CADET_OPTION_RELIABLE,
622                                    NULL, /* WindowSize handler */
623                                    cleanup_destroyed_channel, /* Disconnect handler */
624                                    cadet_handlers);
625   }
626   GNUNET_assert (NULL != peer_ctx->send_channel);
627   return peer_ctx->send_channel;
628 }
629
630
631 /**
632  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
633  *
634  * If we already have a message queue open to this client,
635  * simply return it, otherways create one.
636  *
637  * @param peer the peer to get the mq to
638  * @return the #GNUNET_MQ_Handle
639  */
640 static struct GNUNET_MQ_Handle *
641 get_mq (const struct GNUNET_PeerIdentity *peer)
642 {
643   struct PeerContext *peer_ctx;
644
645   peer_ctx = get_peer_ctx (peer);
646
647   if (NULL == peer_ctx->mq)
648   {
649     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
650   }
651   return peer_ctx->mq;
652 }
653
654
655 /**
656  * @brief This is called in response to the first message we sent as a
657  * liveliness check.
658  *
659  * @param cls #PeerContext of peer with pending liveliness check
660  */
661 static void
662 mq_liveliness_check_successful (void *cls)
663 {
664   struct PeerContext *peer_ctx = cls;
665
666   if (NULL != peer_ctx->liveliness_check_pending)
667   {
668     LOG (GNUNET_ERROR_TYPE_DEBUG,
669         "Liveliness check for peer %s was successfull\n",
670         GNUNET_i2s (&peer_ctx->peer_id));
671     GNUNET_free (peer_ctx->liveliness_check_pending);
672     peer_ctx->liveliness_check_pending = NULL;
673     set_peer_live (peer_ctx);
674   }
675 }
676
677 /**
678  * Issue a check whether peer is live
679  *
680  * @param peer_ctx the context of the peer
681  */
682 static void
683 check_peer_live (struct PeerContext *peer_ctx)
684 {
685   LOG (GNUNET_ERROR_TYPE_DEBUG,
686        "Get informed about peer %s getting live\n",
687        GNUNET_i2s (&peer_ctx->peer_id));
688
689   struct GNUNET_MQ_Handle *mq;
690   struct GNUNET_MQ_Envelope *ev;
691
692   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
693   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
694   peer_ctx->liveliness_check_pending->ev = ev;
695   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
696   peer_ctx->liveliness_check_pending->type = "Check liveliness";
697   mq = get_mq (&peer_ctx->peer_id);
698   GNUNET_MQ_notify_sent (ev,
699                          mq_liveliness_check_successful,
700                          peer_ctx);
701   GNUNET_MQ_send (mq, ev);
702 }
703
704 /**
705  * @brief Add an envelope to a message passed to mq to list of pending messages
706  *
707  * @param peer peer the message was sent to
708  * @param ev envelope to the message
709  * @param type type of the message to be sent
710  * @return pointer to pending message
711  */
712 static struct PendingMessage *
713 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
714                         struct GNUNET_MQ_Envelope *ev,
715                         const char *type)
716 {
717   struct PendingMessage *pending_msg;
718   struct PeerContext *peer_ctx;
719
720   peer_ctx = get_peer_ctx (peer);
721   pending_msg = GNUNET_new (struct PendingMessage);
722   pending_msg->ev = ev;
723   pending_msg->peer_ctx = peer_ctx;
724   pending_msg->type = type;
725   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
726                                peer_ctx->pending_messages_tail,
727                                pending_msg);
728   return pending_msg;
729 }
730
731
732 /**
733  * @brief Remove a pending message from the respective DLL
734  *
735  * @param pending_msg the pending message to remove
736  * @param cancel cancel the pending message, too
737  */
738 static void
739 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
740 {
741   struct PeerContext *peer_ctx;
742
743   peer_ctx = pending_msg->peer_ctx;
744   GNUNET_assert (NULL != peer_ctx);
745   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
746                                peer_ctx->pending_messages_tail,
747                                pending_msg);
748   // TODO wait for the cadet implementation of message cancellation
749   //if (GNUNET_YES == cancel)
750   //{
751   //  GNUNET_MQ_send_cancel (pending_msg->ev);
752   //}
753   GNUNET_free (pending_msg);
754 }
755
756
757 /**
758  * @brief Check whether function of type #PeerOp was already scheduled
759  *
760  * The array with pending operations will probably never grow really big, so
761  * iterating over it should be ok.
762  *
763  * @param peer the peer to check
764  * @param peer_op the operation (#PeerOp) on the peer
765  *
766  * @return #GNUNET_YES if this operation is scheduled on that peer
767  *         #GNUNET_NO  otherwise
768  */
769 static int
770 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
771                            const PeerOp peer_op)
772 {
773   const struct PeerContext *peer_ctx;
774   unsigned int i;
775
776   peer_ctx = get_peer_ctx (peer);
777   for (i = 0; i < peer_ctx->num_pending_ops; i++)
778     if (peer_op == peer_ctx->pending_ops[i].op)
779       return GNUNET_YES;
780   return GNUNET_NO;
781 }
782
783 int
784 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
785
786 /**
787  * Iterator over hash map entries. Deletes all contexts of peers.
788  *
789  * @param cls closure
790  * @param key current public key
791  * @param value value in the hash map
792  * @return #GNUNET_YES if we should continue to iterate,
793  *         #GNUNET_NO if not.
794  */
795 static int
796 peermap_clear_iterator (void *cls,
797                         const struct GNUNET_PeerIdentity *key,
798                         void *value)
799 {
800   Peers_remove_peer (key);
801   return GNUNET_YES;
802 }
803
804
805 /**
806  * @brief This is called once a message is sent.
807  *
808  * Removes the pending message
809  *
810  * @param cls type of the message that was sent
811  */
812 static void
813 mq_notify_sent_cb (void *cls)
814 {
815   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817       "%s was sent.\n",
818       pending_msg->type);
819   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
820     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
821   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
822     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
823   if (0 == strncmp ("PUSH", pending_msg->type, 4))
824     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
825   /* Do not cancle message */
826   remove_pending_message (pending_msg, GNUNET_NO);
827 }
828
829
830 /**
831  * @brief Iterator function for #store_valid_peers.
832  *
833  * Implements #GNUNET_CONTAINER_PeerMapIterator.
834  * Writes single peer to disk.
835  *
836  * @param cls the file handle to write to.
837  * @param peer current peer
838  * @param value unused
839  *
840  * @return  #GNUNET_YES if we should continue to
841  *          iterate,
842  *          #GNUNET_NO if not.
843  */
844 static int
845 store_peer_presistently_iterator (void *cls,
846                                   const struct GNUNET_PeerIdentity *peer,
847                                   void *value)
848 {
849   const struct GNUNET_DISK_FileHandle *fh = cls;
850   char peer_string[128];
851   int size;
852   ssize_t ret;
853
854   if (NULL == peer)
855   {
856     return GNUNET_YES;
857   }
858   size = GNUNET_snprintf (peer_string,
859                           sizeof (peer_string),
860                           "%s\n",
861                           GNUNET_i2s_full (peer));
862   GNUNET_assert (53 == size);
863   ret = GNUNET_DISK_file_write (fh,
864                                 peer_string,
865                                 size);
866   GNUNET_assert (size == ret);
867   return GNUNET_YES;
868 }
869
870
871 /**
872  * @brief Store the peers currently in #valid_peers to disk.
873  */
874 static void
875 store_valid_peers ()
876 {
877   struct GNUNET_DISK_FileHandle *fh;
878   uint32_t number_written_peers;
879   int ret;
880
881   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
882   {
883     return;
884   }
885
886   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
887   if (GNUNET_SYSERR == ret)
888   {
889     LOG (GNUNET_ERROR_TYPE_WARNING,
890         "Not able to create directory for file `%s'\n",
891         filename_valid_peers);
892     GNUNET_break (0);
893   }
894   else if (GNUNET_NO == ret)
895   {
896     LOG (GNUNET_ERROR_TYPE_WARNING,
897         "Directory for file `%s' exists but is not writable for us\n",
898         filename_valid_peers);
899     GNUNET_break (0);
900   }
901   fh = GNUNET_DISK_file_open (filename_valid_peers,
902                               GNUNET_DISK_OPEN_WRITE |
903                                   GNUNET_DISK_OPEN_CREATE,
904                               GNUNET_DISK_PERM_USER_READ |
905                                   GNUNET_DISK_PERM_USER_WRITE);
906   if (NULL == fh)
907   {
908     LOG (GNUNET_ERROR_TYPE_WARNING,
909         "Not able to write valid peers to file `%s'\n",
910         filename_valid_peers);
911     return;
912   }
913   LOG (GNUNET_ERROR_TYPE_DEBUG,
914       "Writing %u valid peers to disk\n",
915       GNUNET_CONTAINER_multipeermap_size (valid_peers));
916   number_written_peers =
917     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
918                                            store_peer_presistently_iterator,
919                                            fh);
920   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
921   GNUNET_assert (number_written_peers ==
922       GNUNET_CONTAINER_multipeermap_size (valid_peers));
923 }
924
925
926 /**
927  * @brief Convert string representation of peer id to peer id.
928  *
929  * Counterpart to #GNUNET_i2s_full.
930  *
931  * @param string_repr The string representation of the peer id
932  *
933  * @return The peer id
934  */
935 static const struct GNUNET_PeerIdentity *
936 s2i_full (const char *string_repr)
937 {
938   struct GNUNET_PeerIdentity *peer;
939   size_t len;
940   int ret;
941
942   peer = GNUNET_new (struct GNUNET_PeerIdentity);
943   len = strlen (string_repr);
944   if (52 > len)
945   {
946     LOG (GNUNET_ERROR_TYPE_WARNING,
947         "Not able to convert string representation of PeerID to PeerID\n"
948         "Sting representation: %s (len %lu) - too short\n",
949         string_repr,
950         len);
951     GNUNET_break (0);
952   }
953   else if (52 < len)
954   {
955     len = 52;
956   }
957   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
958                                                     len,
959                                                     &peer->public_key);
960   if (GNUNET_OK != ret)
961   {
962     LOG (GNUNET_ERROR_TYPE_WARNING,
963         "Not able to convert string representation of PeerID to PeerID\n"
964         "Sting representation: %s\n",
965         string_repr);
966     GNUNET_break (0);
967   }
968   return peer;
969 }
970
971
972 /**
973  * @brief Restore the peers on disk to #valid_peers.
974  */
975 static void
976 restore_valid_peers ()
977 {
978   off_t file_size;
979   uint32_t num_peers;
980   struct GNUNET_DISK_FileHandle *fh;
981   char *buf;
982   ssize_t size_read;
983   char *iter_buf;
984   char *str_repr;
985   const struct GNUNET_PeerIdentity *peer;
986
987   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
988   {
989     return;
990   }
991
992   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
993   {
994     return;
995   }
996   fh = GNUNET_DISK_file_open (filename_valid_peers,
997                               GNUNET_DISK_OPEN_READ,
998                               GNUNET_DISK_PERM_NONE);
999   GNUNET_assert (NULL != fh);
1000   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1001   num_peers = file_size / 53;
1002   buf = GNUNET_malloc (file_size);
1003   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1004   GNUNET_assert (size_read == file_size);
1005   LOG (GNUNET_ERROR_TYPE_DEBUG,
1006       "Restoring %" PRIu32 " peers from file `%s'\n",
1007       num_peers,
1008       filename_valid_peers);
1009   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1010   {
1011     str_repr = GNUNET_strndup (iter_buf, 53);
1012     peer = s2i_full (str_repr);
1013     GNUNET_free (str_repr);
1014     add_valid_peer (peer);
1015     LOG (GNUNET_ERROR_TYPE_DEBUG,
1016         "Restored valid peer %s from disk\n",
1017         GNUNET_i2s_full (peer));
1018   }
1019   iter_buf = NULL;
1020   GNUNET_free (buf);
1021   LOG (GNUNET_ERROR_TYPE_DEBUG,
1022       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1023       num_peers,
1024       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1025   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1026   {
1027     LOG (GNUNET_ERROR_TYPE_WARNING,
1028         "Number of restored peers does not match file size. Have probably duplicates.\n");
1029   }
1030   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1031   LOG (GNUNET_ERROR_TYPE_DEBUG,
1032       "Restored %u valid peers from disk\n",
1033       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1034 }
1035
1036
1037 /**
1038  * @brief Initialise storage of peers
1039  *
1040  * @param fn_valid_peers filename of the file used to store valid peer ids
1041  * @param cadet_h cadet handle
1042  * @param own_id own peer identity
1043  */
1044 void
1045 Peers_initialise (char* fn_valid_peers,
1046                   struct GNUNET_CADET_Handle *cadet_h,
1047                   const struct GNUNET_PeerIdentity *own_id)
1048 {
1049   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1050   cadet_handle = cadet_h;
1051   own_identity = *own_id;
1052   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1053   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1054   restore_valid_peers ();
1055 }
1056
1057
1058 /**
1059  * @brief Delete storage of peers that was created with #Peers_initialise ()
1060  */
1061 void
1062 Peers_terminate ()
1063 {
1064   if (GNUNET_SYSERR ==
1065       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1066                                              peermap_clear_iterator,
1067                                              NULL))
1068   {
1069     LOG (GNUNET_ERROR_TYPE_WARNING,
1070         "Iteration destroying peers was aborted.\n");
1071   }
1072   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1073   peer_map = NULL;
1074   store_valid_peers ();
1075   GNUNET_free (filename_valid_peers);
1076   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1077 }
1078
1079
1080 /**
1081  * Iterator over #valid_peers hash map entries.
1082  *
1083  * @param cls closure - unused
1084  * @param peer current peer id
1085  * @param value value in the hash map - unused
1086  * @return #GNUNET_YES if we should continue to
1087  *         iterate,
1088  *         #GNUNET_NO if not.
1089  */
1090 static int
1091 valid_peer_iterator (void *cls,
1092                      const struct GNUNET_PeerIdentity *peer,
1093                      void *value)
1094 {
1095   struct PeersIteratorCls *it_cls = cls;
1096
1097   return it_cls->iterator (it_cls->cls,
1098                            peer);
1099 }
1100
1101
1102 /**
1103  * @brief Get all currently known, valid peer ids.
1104  *
1105  * @param it function to call on each peer id
1106  * @param it_cls extra argument to @a it
1107  * @return the number of key value pairs processed,
1108  *         #GNUNET_SYSERR if it aborted iteration
1109  */
1110 int
1111 Peers_get_valid_peers (PeersIterator iterator,
1112                        void *it_cls)
1113 {
1114   struct PeersIteratorCls *cls;
1115   int ret;
1116
1117   cls = GNUNET_new (struct PeersIteratorCls);
1118   cls->iterator = iterator;
1119   cls->cls = it_cls;
1120   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1121                                                valid_peer_iterator,
1122                                                cls);
1123   GNUNET_free (cls);
1124   return ret;
1125 }
1126
1127
1128 /**
1129  * @brief Add peer to known peers.
1130  *
1131  * This function is called on new peer_ids from 'external' sources
1132  * (client seed, cadet get_peers(), ...)
1133  *
1134  * @param peer the new #GNUNET_PeerIdentity
1135  *
1136  * @return #GNUNET_YES if peer was inserted
1137  *         #GNUNET_NO  otherwise (if peer was already known or
1138  *                     peer was #own_identity)
1139  */
1140 int
1141 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1142 {
1143   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1144        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1145   {
1146     return GNUNET_NO; /* We already know this peer - nothing to do */
1147   }
1148   (void) create_peer_ctx (peer);
1149   return GNUNET_YES;
1150 }
1151
1152 int
1153 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1154
1155 /**
1156  * @brief Try connecting to a peer to see whether it is online
1157  *
1158  * If not known yet, insert into known peers
1159  *
1160  * @param peer the peer whose liveliness is to be checked
1161  * @return #GNUNET_YES if peer had to be inserted
1162  *         #GNUNET_NO  otherwise (if peer was already known or
1163  *                     peer was #own_identity)
1164  */
1165 int
1166 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1167 {
1168   struct PeerContext *peer_ctx;
1169   int ret;
1170
1171   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1172   {
1173     return GNUNET_NO;
1174   }
1175   ret = Peers_insert_peer (peer);
1176   peer_ctx = get_peer_ctx (peer);
1177   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1178   {
1179     check_peer_live (peer_ctx);
1180   }
1181   return ret;
1182 }
1183
1184
1185 /**
1186  * @brief Check if peer is removable.
1187  *
1188  * Check if
1189  *  - a recv channel exists
1190  *  - there are pending messages
1191  *  - there is no pending pull reply
1192  *
1193  * @param peer the peer in question
1194  * @return #GNUNET_YES    if peer is removable
1195  *         #GNUNET_NO     if peer is NOT removable
1196  *         #GNUNET_SYSERR if peer is not known
1197  */
1198 int
1199 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1200 {
1201   struct PeerContext *peer_ctx;
1202
1203   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1204   {
1205     return GNUNET_SYSERR;
1206   }
1207
1208   peer_ctx = get_peer_ctx (peer);
1209   if ( (NULL != peer_ctx->recv_channel) ||
1210        (NULL != peer_ctx->pending_messages_head) ||
1211        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1212   {
1213     return GNUNET_NO;
1214   }
1215   return GNUNET_YES;
1216 }
1217
1218 uint32_t *
1219 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1220                         enum Peers_ChannelRole role);
1221
1222 int
1223 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1224
1225 /**
1226  * @brief Remove peer
1227  *
1228  * @param peer the peer to clean
1229  * @return #GNUNET_YES if peer was removed
1230  *         #GNUNET_NO  otherwise
1231  */
1232 int
1233 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1234 {
1235   struct PeerContext *peer_ctx;
1236   uint32_t *channel_flag;
1237
1238   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1239   {
1240     return GNUNET_NO;
1241   }
1242
1243   peer_ctx = get_peer_ctx (peer);
1244   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1245   LOG (GNUNET_ERROR_TYPE_DEBUG,
1246        "Going to remove peer %s\n",
1247        GNUNET_i2s (&peer_ctx->peer_id));
1248   Peers_unset_peer_flag (peer, Peers_ONLINE);
1249
1250   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1251   while (NULL != peer_ctx->pending_messages_head)
1252   {
1253     LOG (GNUNET_ERROR_TYPE_DEBUG,
1254         "Removing unsent %s\n",
1255         peer_ctx->pending_messages_head->type);
1256     /* Cancle pending message, too */
1257     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1258   }
1259   /* If we are still waiting for notification whether this peer is live
1260    * cancel the according task */
1261   if (NULL != peer_ctx->liveliness_check_pending)
1262   {
1263     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264          "Removing pending liveliness check for peer %s\n",
1265          GNUNET_i2s (&peer_ctx->peer_id));
1266     // TODO wait until cadet sets mq->cancel_impl
1267     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1268     GNUNET_free (peer_ctx->liveliness_check_pending);
1269     peer_ctx->liveliness_check_pending = NULL;
1270   }
1271   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1272   if (NULL != peer_ctx->send_channel &&
1273       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1274   {
1275     LOG (GNUNET_ERROR_TYPE_DEBUG,
1276         "Destroying send channel\n");
1277     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1278     peer_ctx->send_channel = NULL;
1279     peer_ctx->mq = NULL;
1280   }
1281   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1282   if (NULL != peer_ctx->recv_channel &&
1283       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1284   {
1285     LOG (GNUNET_ERROR_TYPE_DEBUG,
1286         "Destroying recv channel\n");
1287     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1288     peer_ctx->recv_channel = NULL;
1289   }
1290
1291   GNUNET_free (peer_ctx->send_channel_flags);
1292   GNUNET_free (peer_ctx->recv_channel_flags);
1293
1294   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1295   {
1296     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1297   }
1298   GNUNET_free (peer_ctx);
1299   return GNUNET_YES;
1300 }
1301
1302
1303 /**
1304  * @brief set flags on a given peer.
1305  *
1306  * @param peer the peer to set flags on
1307  * @param flags the flags
1308  */
1309 void
1310 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1311 {
1312   struct PeerContext *peer_ctx;
1313
1314   peer_ctx = get_peer_ctx (peer);
1315   set_peer_flag (peer_ctx, flags);
1316 }
1317
1318
1319 /**
1320  * @brief unset flags on a given peer.
1321  *
1322  * @param peer the peer to unset flags on
1323  * @param flags the flags
1324  */
1325 void
1326 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1327 {
1328   struct PeerContext *peer_ctx;
1329
1330   peer_ctx = get_peer_ctx (peer);
1331   unset_peer_flag (peer_ctx, flags);
1332 }
1333
1334
1335 /**
1336  * @brief Check whether flags on a peer are set.
1337  *
1338  * @param peer the peer to check the flag of
1339  * @param flags the flags to check
1340  *
1341  * @return #GNUNET_SYSERR if peer is not known
1342  *         #GNUNET_YES    if all given flags are set
1343  *         #GNUNET_NO     otherwise
1344  */
1345 int
1346 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1347 {
1348   struct PeerContext *peer_ctx;
1349
1350   if (GNUNET_NO == Peers_check_peer_known (peer))
1351   {
1352     return GNUNET_SYSERR;
1353   }
1354   peer_ctx = get_peer_ctx (peer);
1355   return check_peer_flag_set (peer_ctx, flags);
1356 }
1357
1358
1359 /**
1360  * @brief set flags on a given channel.
1361  *
1362  * @param channel the channel to set flags on
1363  * @param flags the flags
1364  */
1365 void
1366 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1367 {
1368   set_channel_flag (channel_flags, flags);
1369 }
1370
1371
1372 /**
1373  * @brief unset flags on a given channel.
1374  *
1375  * @param channel the channel to unset flags on
1376  * @param flags the flags
1377  */
1378 void
1379 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1380 {
1381   unset_channel_flag (channel_flags, flags);
1382 }
1383
1384
1385 /**
1386  * @brief Check whether flags on a channel are set.
1387  *
1388  * @param channel the channel to check the flag of
1389  * @param flags the flags to check
1390  *
1391  * @return #GNUNET_YES if all given flags are set
1392  *         #GNUNET_NO  otherwise
1393  */
1394 int
1395 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1396 {
1397   return check_channel_flag_set (channel_flags, flags);
1398 }
1399
1400 /**
1401  * @brief Get the flags for the channel in @a role for @a peer.
1402  *
1403  * @param peer Peer to get the channel flags for.
1404  * @param role Role of channel to get flags for
1405  *
1406  * @return The flags.
1407  */
1408 uint32_t *
1409 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1410                         enum Peers_ChannelRole role)
1411 {
1412   const struct PeerContext *peer_ctx;
1413
1414   peer_ctx = get_peer_ctx (peer);
1415   if (Peers_CHANNEL_ROLE_SENDING == role)
1416   {
1417     return peer_ctx->send_channel_flags;
1418   }
1419   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1420   {
1421     return peer_ctx->recv_channel_flags;
1422   }
1423   else
1424   {
1425     GNUNET_assert (0);
1426   }
1427 }
1428
1429 /**
1430  * @brief Check whether we have information about the given peer.
1431  *
1432  * FIXME probably deprecated. Make this the new _online.
1433  *
1434  * @param peer peer in question
1435  *
1436  * @return #GNUNET_YES if peer is known
1437  *         #GNUNET_NO  if peer is not knwon
1438  */
1439 int
1440 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1441 {
1442   if (NULL != peer_map)
1443   {
1444     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1445   } else
1446   {
1447     return GNUNET_NO;
1448   }
1449 }
1450
1451
1452 /**
1453  * @brief Check whether @a peer is actually a peer.
1454  *
1455  * A valid peer is a peer that we know exists eg. we were connected to once.
1456  *
1457  * @param peer peer in question
1458  *
1459  * @return #GNUNET_YES if peer is valid
1460  *         #GNUNET_NO  if peer is not valid
1461  */
1462 int
1463 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1464 {
1465   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1466 }
1467
1468
1469 /**
1470  * @brief Indicate that we want to send to the other peer
1471  *
1472  * This establishes a sending channel
1473  *
1474  * @param peer the peer to establish channel to
1475  */
1476 void
1477 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1478 {
1479   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1480   (void) get_channel (peer);
1481 }
1482
1483
1484 /**
1485  * @brief Check whether other peer has the intention to send/opened channel
1486  *        towars us
1487  *
1488  * @param peer the peer in question
1489  *
1490  * @return #GNUNET_YES if peer has the intention to send
1491  *         #GNUNET_NO  otherwise
1492  */
1493 int
1494 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1495 {
1496   const struct PeerContext *peer_ctx;
1497
1498   peer_ctx = get_peer_ctx (peer);
1499   if (NULL != peer_ctx->recv_channel)
1500   {
1501     return GNUNET_YES;
1502   }
1503   return GNUNET_NO;
1504 }
1505
1506
1507 /**
1508  * Handle the channel a peer opens to us.
1509  *
1510  * @param cls The closure
1511  * @param channel The channel the peer wants to establish
1512  * @param initiator The peer's peer ID
1513  *
1514  * @return initial channel context for the channel
1515  *         (can be NULL -- that's not an error)
1516  */
1517 void *
1518 Peers_handle_inbound_channel (void *cls,
1519                               struct GNUNET_CADET_Channel *channel,
1520                               const struct GNUNET_PeerIdentity *initiator)
1521 {
1522   struct PeerContext *peer_ctx;
1523   struct GNUNET_PeerIdentity *ctx_peer;
1524
1525   LOG (GNUNET_ERROR_TYPE_DEBUG,
1526       "New channel was established to us (Peer %s).\n",
1527       GNUNET_i2s (initiator));
1528   GNUNET_assert (NULL != channel); /* according to cadet API */
1529   /* Make sure we 'know' about this peer */
1530   peer_ctx = create_or_get_peer_ctx (initiator);
1531   set_peer_live (peer_ctx);
1532   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1533   *ctx_peer = *initiator;
1534   /* We only accept one incoming channel per peer */
1535   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1536   {
1537     set_channel_flag (peer_ctx->recv_channel_flags,
1538                       Peers_CHANNEL_ESTABLISHED_TWICE);
1539     //GNUNET_CADET_channel_destroy (channel);
1540     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1541     peer_ctx->recv_channel = channel;
1542     /* return the channel context */
1543     return ctx_peer;
1544   }
1545   peer_ctx->recv_channel = channel;
1546   return ctx_peer;
1547 }
1548
1549
1550 /**
1551  * @brief Check whether a sending channel towards the given peer exists
1552  *
1553  * @param peer the peer to check for
1554  *
1555  * @return #GNUNET_YES if a sending channel towards that peer exists
1556  *         #GNUNET_NO  otherwise
1557  */
1558 int
1559 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1560 {
1561   struct PeerContext *peer_ctx;
1562
1563   if (GNUNET_NO == Peers_check_peer_known (peer))
1564   { /* If no such peer exists, there is no channel */
1565     return GNUNET_NO;
1566   }
1567   peer_ctx = get_peer_ctx (peer);
1568   if (NULL == peer_ctx->send_channel)
1569   {
1570     return GNUNET_NO;
1571   }
1572   return GNUNET_YES;
1573 }
1574
1575
1576 /**
1577  * @brief check whether the given channel is the sending channel of the given
1578  *        peer
1579  *
1580  * @param peer the peer in question
1581  * @param channel the channel to check for
1582  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1583  *                    #Peers_CHANNEL_ROLE_RECEIVING
1584  *
1585  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1586  *         #GNUNET_NO  otherwise
1587  */
1588 int
1589 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1590                           const struct GNUNET_CADET_Channel *channel,
1591                           enum Peers_ChannelRole role)
1592 {
1593   const struct PeerContext *peer_ctx;
1594
1595   if (GNUNET_NO == Peers_check_peer_known (peer))
1596   {
1597     return GNUNET_NO;
1598   }
1599   peer_ctx = get_peer_ctx (peer);
1600   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1601        (channel == peer_ctx->send_channel) )
1602   {
1603     return GNUNET_YES;
1604   }
1605   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1606        (channel == peer_ctx->recv_channel) )
1607   {
1608     return GNUNET_YES;
1609   }
1610   return GNUNET_NO;
1611 }
1612
1613
1614 /**
1615  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1616  *        intention to another peer
1617  *
1618  * If there is also no channel to receive messages from that peer, remove it
1619  * from the peermap.
1620  * TODO really?
1621  *
1622  * @peer the peer identity of the peer whose sending channel to destroy
1623  * @return #GNUNET_YES if channel was destroyed
1624  *         #GNUNET_NO  otherwise
1625  */
1626 int
1627 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1628 {
1629   struct PeerContext *peer_ctx;
1630
1631   if (GNUNET_NO == Peers_check_peer_known (peer))
1632   {
1633     return GNUNET_NO;
1634   }
1635   peer_ctx = get_peer_ctx (peer);
1636   if (NULL != peer_ctx->send_channel)
1637   {
1638     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1639     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1640     peer_ctx->send_channel = NULL;
1641     peer_ctx->mq = NULL;
1642     (void) Peers_check_connected (peer);
1643     return GNUNET_YES;
1644   }
1645   return GNUNET_NO;
1646 }
1647
1648 /**
1649  * This is called when a channel is destroyed.
1650  *
1651  * @param cls The closure
1652  * @param channel The channel being closed
1653  */
1654 void
1655 Peers_cleanup_destroyed_channel (void *cls,
1656                                  const struct GNUNET_CADET_Channel *channel)
1657 {
1658   struct GNUNET_PeerIdentity *peer = cls;
1659   struct PeerContext *peer_ctx;
1660
1661   if (GNUNET_NO == Peers_check_peer_known (peer))
1662   {/* We don't want to implicitly create a context that we're about to kill */
1663   LOG (GNUNET_ERROR_TYPE_DEBUG,
1664        "channel (%s) without associated context was destroyed\n",
1665        GNUNET_i2s (peer));
1666     return;
1667   }
1668   peer_ctx = get_peer_ctx (peer);
1669
1670   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1671    * flag will be set. In this case simply make sure that the channels are
1672    * cleaned. */
1673   /* FIXME This distinction seems to be redundant */
1674   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1675   {/* We initiatad the destruction of this particular peer */
1676     LOG (GNUNET_ERROR_TYPE_DEBUG,
1677         "Peer is in the process of being destroyed\n");
1678     if (channel == peer_ctx->send_channel)
1679     {
1680       peer_ctx->send_channel = NULL;
1681       peer_ctx->mq = NULL;
1682     }
1683     else if (channel == peer_ctx->recv_channel)
1684     {
1685       peer_ctx->recv_channel = NULL;
1686     }
1687
1688     if (NULL != peer_ctx->send_channel)
1689     {
1690       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1691       peer_ctx->send_channel = NULL;
1692       peer_ctx->mq = NULL;
1693     }
1694     if (NULL != peer_ctx->recv_channel)
1695     {
1696       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1697       peer_ctx->recv_channel = NULL;
1698     }
1699     /* Set the #Peers_ONLINE flag accordingly */
1700     (void) Peers_check_connected (peer);
1701     return;
1702   }
1703
1704   else
1705   { /* We did not initiate the destruction of this peer */
1706     LOG (GNUNET_ERROR_TYPE_DEBUG,
1707         "Peer is NOT in the process of being destroyed\n");
1708     if (channel == peer_ctx->send_channel)
1709     { /* Something (but us) killd the channel - clean up peer */
1710       LOG (GNUNET_ERROR_TYPE_DEBUG,
1711           "send channel (%s) was destroyed - cleaning up\n",
1712           GNUNET_i2s (peer));
1713       peer_ctx->send_channel = NULL;
1714       peer_ctx->mq = NULL;
1715     }
1716     else if (channel == peer_ctx->recv_channel)
1717     { /* Other peer doesn't want to send us messages anymore */
1718       LOG (GNUNET_ERROR_TYPE_DEBUG,
1719            "Peer %s destroyed recv channel - cleaning up channel\n",
1720            GNUNET_i2s (peer));
1721       peer_ctx->recv_channel = NULL;
1722     }
1723     else
1724     {
1725       LOG (GNUNET_ERROR_TYPE_WARNING,
1726            "unknown channel (%s) was destroyed\n",
1727            GNUNET_i2s (peer));
1728     }
1729   }
1730   (void) Peers_check_connected (peer);
1731 }
1732
1733 /**
1734  * @brief Send a message to another peer.
1735  *
1736  * Keeps track about pending messages so they can be properly removed when the
1737  * peer is destroyed.
1738  *
1739  * @param peer receeiver of the message
1740  * @param ev envelope of the message
1741  * @param type type of the message
1742  */
1743 void
1744 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1745                     struct GNUNET_MQ_Envelope *ev,
1746                     const char *type)
1747 {
1748   struct PendingMessage *pending_msg;
1749   struct GNUNET_MQ_Handle *mq;
1750
1751   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1752               "Sending message to %s of type %s\n",
1753               GNUNET_i2s (peer),
1754               type);
1755   pending_msg = insert_pending_message (peer, ev, type);
1756   mq = get_mq (peer);
1757   GNUNET_MQ_notify_sent (ev,
1758                          mq_notify_sent_cb,
1759                          pending_msg);
1760   GNUNET_MQ_send (mq, ev);
1761 }
1762
1763 /**
1764  * @brief Schedule a operation on given peer
1765  *
1766  * Avoids scheduling an operation twice.
1767  *
1768  * @param peer the peer we want to schedule the operation for once it gets live
1769  *
1770  * @return #GNUNET_YES if the operation was scheduled
1771  *         #GNUNET_NO  otherwise
1772  */
1773 int
1774 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1775                           const PeerOp peer_op)
1776 {
1777   struct PeerPendingOp pending_op;
1778   struct PeerContext *peer_ctx;
1779
1780   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1781   {
1782     return GNUNET_NO;
1783   }
1784   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1785
1786   //TODO if LIVE/ONLINE execute immediately
1787
1788   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1789   {
1790     peer_ctx = get_peer_ctx (peer);
1791     pending_op.op = peer_op;
1792     pending_op.op_cls = NULL;
1793     GNUNET_array_append (peer_ctx->pending_ops,
1794                          peer_ctx->num_pending_ops,
1795                          pending_op);
1796     return GNUNET_YES;
1797   }
1798   return GNUNET_NO;
1799 }
1800
1801 /**
1802  * @brief Get the recv_channel of @a peer.
1803  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1804  * messages.
1805  *
1806  * @param peer The peer to get the recv_channel from.
1807  *
1808  * @return The recv_channel.
1809  */
1810 struct GNUNET_CADET_Channel *
1811 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1812 {
1813   struct PeerContext *peer_ctx;
1814
1815   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1816   peer_ctx = get_peer_ctx (peer);
1817   return peer_ctx->recv_channel;
1818 }
1819 /***********************************************************************
1820  * /Old gnunet-service-rps_peers.c
1821 ***********************************************************************/
1822
1823
1824 /***********************************************************************
1825  * Housekeeping with clients
1826 ***********************************************************************/
1827
1828 /**
1829  * Closure used to pass the client and the id to the callback
1830  * that replies to a client's request
1831  */
1832 struct ReplyCls
1833 {
1834   /**
1835    * DLL
1836    */
1837   struct ReplyCls *next;
1838   struct ReplyCls *prev;
1839
1840   /**
1841    * The identifier of the request
1842    */
1843   uint32_t id;
1844
1845   /**
1846    * The handle to the request
1847    */
1848   struct RPS_SamplerRequestHandle *req_handle;
1849
1850   /**
1851    * The client handle to send the reply to
1852    */
1853   struct ClientContext *cli_ctx;
1854 };
1855
1856
1857 /**
1858  * Struct used to store the context of a connected client.
1859  */
1860 struct ClientContext
1861 {
1862   /**
1863    * DLL
1864    */
1865   struct ClientContext *next;
1866   struct ClientContext *prev;
1867
1868   /**
1869    * The message queue to communicate with the client.
1870    */
1871   struct GNUNET_MQ_Handle *mq;
1872
1873   /**
1874    * DLL with handles to single requests from the client
1875    */
1876   struct ReplyCls *rep_cls_head;
1877   struct ReplyCls *rep_cls_tail;
1878
1879   /**
1880    * @brief How many updates this client expects to receive.
1881    */
1882   int64_t view_updates_left;
1883
1884   /**
1885    * The client handle to send the reply to
1886    */
1887   struct GNUNET_SERVICE_Client *client;
1888 };
1889
1890 /**
1891  * DLL with all clients currently connected to us
1892  */
1893 struct ClientContext *cli_ctx_head;
1894 struct ClientContext *cli_ctx_tail;
1895
1896 /***********************************************************************
1897  * /Housekeeping with clients
1898 ***********************************************************************/
1899
1900
1901
1902
1903
1904 /***********************************************************************
1905  * Globals
1906 ***********************************************************************/
1907
1908 /**
1909  * Sampler used for the Brahms protocol itself.
1910  */
1911 static struct RPS_Sampler *prot_sampler;
1912
1913 /**
1914  * Sampler used for the clients.
1915  */
1916 static struct RPS_Sampler *client_sampler;
1917
1918 /**
1919  * Name to log view to
1920  */
1921 static const char *file_name_view_log;
1922
1923 #ifdef TO_FILE
1924 /**
1925  * Name to log number of observed peers to
1926  */
1927 static const char *file_name_observed_log;
1928
1929 /**
1930  * @brief Count the observed peers
1931  */
1932 static uint32_t num_observed_peers;
1933
1934 /**
1935  * @brief Multipeermap (ab-) used to count unique peer_ids
1936  */
1937 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1938 #endif /* TO_FILE */
1939
1940 /**
1941  * The size of sampler we need to be able to satisfy the client's need
1942  * of random peers.
1943  */
1944 static unsigned int sampler_size_client_need;
1945
1946 /**
1947  * The size of sampler we need to be able to satisfy the Brahms protocol's
1948  * need of random peers.
1949  *
1950  * This is one minimum size the sampler grows to.
1951  */
1952 static unsigned int sampler_size_est_need;
1953
1954 /**
1955  * @brief This is the minimum estimate used as sampler size.
1956  *
1957  * It is configured by the user.
1958  */
1959 static unsigned int sampler_size_est_min;
1960
1961 /**
1962  * @brief This is the estimate used as view size.
1963  *
1964  * It is initialised with the minimum
1965  */
1966 static unsigned int view_size_est_need;
1967
1968 /**
1969  * @brief This is the minimum estimate used as view size.
1970  *
1971  * It is configured by the user.
1972  */
1973 static unsigned int view_size_est_min;
1974
1975 /**
1976  * Percentage of total peer number in the view
1977  * to send random PUSHes to
1978  */
1979 static float alpha;
1980
1981 /**
1982  * Percentage of total peer number in the view
1983  * to send random PULLs to
1984  */
1985 static float beta;
1986
1987 /**
1988  * Identifier for the main task that runs periodically.
1989  */
1990 static struct GNUNET_SCHEDULER_Task *do_round_task;
1991
1992 /**
1993  * Time inverval the do_round task runs in.
1994  */
1995 static struct GNUNET_TIME_Relative round_interval;
1996
1997 /**
1998  * List to store peers received through pushes temporary.
1999  */
2000 static struct CustomPeerMap *push_map;
2001
2002 /**
2003  * List to store peers received through pulls temporary.
2004  */
2005 static struct CustomPeerMap *pull_map;
2006
2007 /**
2008  * Handler to NSE.
2009  */
2010 static struct GNUNET_NSE_Handle *nse;
2011
2012 /**
2013  * Handler to CADET.
2014  */
2015 static struct GNUNET_CADET_Handle *cadet_handle;
2016
2017 /**
2018  * @brief Port to communicate to other peers.
2019  */
2020 static struct GNUNET_CADET_Port *cadet_port;
2021
2022 /**
2023  * Handler to PEERINFO.
2024  */
2025 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2026
2027 /**
2028  * Handle for cancellation of iteration over peers.
2029  */
2030 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2031
2032 /**
2033  * Request counter.
2034  *
2035  * Counts how many requets clients already issued.
2036  * Only needed in the beginning to check how many of the 64 deltas
2037  * we already have
2038  */
2039 static unsigned int req_counter;
2040
2041 /**
2042  * Time of the last request we received.
2043  *
2044  * Used to compute the expected request rate.
2045  */
2046 static struct GNUNET_TIME_Absolute last_request;
2047
2048 /**
2049  * Size of #request_deltas.
2050  */
2051 #define REQUEST_DELTAS_SIZE 64
2052 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2053
2054 /**
2055  * Last 64 deltas between requests
2056  */
2057 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2058
2059 /**
2060  * The prediction of the rate of requests
2061  */
2062 static struct GNUNET_TIME_Relative request_rate;
2063
2064
2065 #ifdef ENABLE_MALICIOUS
2066 /**
2067  * Type of malicious peer
2068  *
2069  * 0 Don't act malicious at all - Default
2070  * 1 Try to maximise representation
2071  * 2 Try to partition the network
2072  * 3 Combined attack
2073  */
2074 static uint32_t mal_type;
2075
2076 /**
2077  * Other malicious peers
2078  */
2079 static struct GNUNET_PeerIdentity *mal_peers;
2080
2081 /**
2082  * Hashmap of malicious peers used as set.
2083  * Used to more efficiently check whether we know that peer.
2084  */
2085 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2086
2087 /**
2088  * Number of other malicious peers
2089  */
2090 static uint32_t num_mal_peers;
2091
2092
2093 /**
2094  * If type is 2 This struct is used to store the attacked peers in a DLL
2095  */
2096 struct AttackedPeer
2097 {
2098   /**
2099    * DLL
2100    */
2101   struct AttackedPeer *next;
2102   struct AttackedPeer *prev;
2103
2104   /**
2105    * PeerID
2106    */
2107   struct GNUNET_PeerIdentity peer_id;
2108 };
2109
2110 /**
2111  * If type is 2 this is the DLL of attacked peers
2112  */
2113 static struct AttackedPeer *att_peers_head;
2114 static struct AttackedPeer *att_peers_tail;
2115
2116 /**
2117  * This index is used to point to an attacked peer to
2118  * implement the round-robin-ish way to select attacked peers.
2119  */
2120 static struct AttackedPeer *att_peer_index;
2121
2122 /**
2123  * Hashmap of attacked peers used as set.
2124  * Used to more efficiently check whether we know that peer.
2125  */
2126 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2127
2128 /**
2129  * Number of attacked peers
2130  */
2131 static uint32_t num_attacked_peers;
2132
2133 /**
2134  * If type is 1 this is the attacked peer
2135  */
2136 static struct GNUNET_PeerIdentity attacked_peer;
2137
2138 /**
2139  * The limit of PUSHes we can send in one round.
2140  * This is an assumption of the Brahms protocol and either implemented
2141  * via proof of work
2142  * or
2143  * assumend to be the bandwidth limitation.
2144  */
2145 static uint32_t push_limit = 10000;
2146 #endif /* ENABLE_MALICIOUS */
2147
2148
2149 /***********************************************************************
2150  * /Globals
2151 ***********************************************************************/
2152
2153
2154 /***********************************************************************
2155  * Util functions
2156 ***********************************************************************/
2157
2158
2159 /**
2160  * Print peerlist to log.
2161  */
2162 static void
2163 print_peer_list (struct GNUNET_PeerIdentity *list,
2164                  unsigned int len)
2165 {
2166   unsigned int i;
2167
2168   LOG (GNUNET_ERROR_TYPE_DEBUG,
2169        "Printing peer list of length %u at %p:\n",
2170        len,
2171        list);
2172   for (i = 0 ; i < len ; i++)
2173   {
2174     LOG (GNUNET_ERROR_TYPE_DEBUG,
2175          "%u. peer: %s\n",
2176          i, GNUNET_i2s (&list[i]));
2177   }
2178 }
2179
2180
2181 /**
2182  * Remove peer from list.
2183  */
2184 static void
2185 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2186                unsigned int *list_size,
2187                const struct GNUNET_PeerIdentity *peer)
2188 {
2189   unsigned int i;
2190   struct GNUNET_PeerIdentity *tmp;
2191
2192   tmp = *peer_list;
2193
2194   LOG (GNUNET_ERROR_TYPE_DEBUG,
2195        "Removing peer %s from list at %p\n",
2196        GNUNET_i2s (peer),
2197        tmp);
2198
2199   for ( i = 0 ; i < *list_size ; i++ )
2200   {
2201     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2202     {
2203       if (i < *list_size -1)
2204       { /* Not at the last entry -- shift peers left */
2205         memmove (&tmp[i], &tmp[i +1],
2206                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2207       }
2208       /* Remove last entry (should be now useless PeerID) */
2209       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2210     }
2211   }
2212   *peer_list = tmp;
2213 }
2214
2215
2216 /**
2217  * Sum all time relatives of an array.
2218  */
2219 static struct GNUNET_TIME_Relative
2220 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2221                 uint32_t arr_size)
2222 {
2223   struct GNUNET_TIME_Relative sum;
2224   uint32_t i;
2225
2226   sum = GNUNET_TIME_UNIT_ZERO;
2227   for ( i = 0 ; i < arr_size ; i++ )
2228   {
2229     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2230   }
2231   return sum;
2232 }
2233
2234
2235 /**
2236  * Compute the average of given time relatives.
2237  */
2238 static struct GNUNET_TIME_Relative
2239 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2240                 uint32_t arr_size)
2241 {
2242   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2243                                                       arr_size),
2244                                       arr_size);
2245 }
2246
2247
2248 /**
2249  * Insert PeerID in #view
2250  *
2251  * Called once we know a peer is live.
2252  * Implements #PeerOp
2253  *
2254  * @return GNUNET_OK if peer was actually inserted
2255  *         GNUNET_NO if peer was not inserted
2256  */
2257 static void
2258 insert_in_view_op (void *cls,
2259                 const struct GNUNET_PeerIdentity *peer);
2260
2261 /**
2262  * Insert PeerID in #view
2263  *
2264  * Called once we know a peer is live.
2265  *
2266  * @return GNUNET_OK if peer was actually inserted
2267  *         GNUNET_NO if peer was not inserted
2268  */
2269 static int
2270 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2271 {
2272   int online;
2273
2274   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2275   if ( (GNUNET_NO == online) ||
2276        (GNUNET_SYSERR == online) ) /* peer is not even known */
2277   {
2278     (void) Peers_issue_peer_liveliness_check (peer);
2279     (void) Peers_schedule_operation (peer, insert_in_view_op);
2280     return GNUNET_NO;
2281   }
2282   /* Open channel towards peer to keep connection open */
2283   Peers_indicate_sending_intention (peer);
2284   return View_put (peer);
2285 }
2286
2287 /**
2288  * @brief sends updates to clients that are interested
2289  */
2290 static void
2291 clients_notify_view_update (void);
2292
2293 /**
2294  * Put random peer from sampler into the view as history update.
2295  */
2296 static void
2297 hist_update (void *cls,
2298              struct GNUNET_PeerIdentity *ids,
2299              uint32_t num_peers)
2300 {
2301   unsigned int i;
2302
2303   for (i = 0; i < num_peers; i++)
2304   {
2305     (void) insert_in_view (&ids[i]);
2306     to_file (file_name_view_log,
2307              "+%s\t(hist)",
2308              GNUNET_i2s_full (ids));
2309   }
2310   clients_notify_view_update();
2311 }
2312
2313
2314 /**
2315  * Wrapper around #RPS_sampler_resize()
2316  *
2317  * If we do not have enough sampler elements, double current sampler size
2318  * If we have more than enough sampler elements, halv current sampler size
2319  */
2320 static void
2321 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2322 {
2323   unsigned int sampler_size;
2324
2325   // TODO statistics
2326   // TODO respect the min, max
2327   sampler_size = RPS_sampler_get_size (sampler);
2328   if (sampler_size > new_size * 4)
2329   { /* Shrinking */
2330     RPS_sampler_resize (sampler, sampler_size / 2);
2331   }
2332   else if (sampler_size < new_size)
2333   { /* Growing */
2334     RPS_sampler_resize (sampler, sampler_size * 2);
2335   }
2336   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2337 }
2338
2339
2340 /**
2341  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2342  */
2343 static void
2344 client_resize_wrapper ()
2345 {
2346   uint32_t bigger_size;
2347
2348   // TODO statistics
2349
2350   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2351
2352   // TODO respect the min, max
2353   resize_wrapper (client_sampler, bigger_size);
2354   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2355       bigger_size);
2356 }
2357
2358
2359 /**
2360  * Estimate request rate
2361  *
2362  * Called every time we receive a request from the client.
2363  */
2364 static void
2365 est_request_rate()
2366 {
2367   struct GNUNET_TIME_Relative max_round_duration;
2368
2369   if (request_deltas_size > req_counter)
2370     req_counter++;
2371   if ( 1 < req_counter)
2372   {
2373     /* Shift last request deltas to the right */
2374     memmove (&request_deltas[1],
2375         request_deltas,
2376         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2377
2378     /* Add current delta to beginning */
2379     request_deltas[0] =
2380         GNUNET_TIME_absolute_get_difference (last_request,
2381                                              GNUNET_TIME_absolute_get ());
2382     request_rate = T_relative_avg (request_deltas, req_counter);
2383     request_rate = (request_rate.rel_value_us < 1) ?
2384       GNUNET_TIME_relative_get_unit_ () : request_rate;
2385
2386     /* Compute the duration a round will maximally take */
2387     max_round_duration =
2388         GNUNET_TIME_relative_add (round_interval,
2389                                   GNUNET_TIME_relative_divide (round_interval, 2));
2390
2391     /* Set the estimated size the sampler has to have to
2392      * satisfy the current client request rate */
2393     sampler_size_client_need =
2394         max_round_duration.rel_value_us / request_rate.rel_value_us;
2395
2396     /* Resize the sampler */
2397     client_resize_wrapper ();
2398   }
2399   last_request = GNUNET_TIME_absolute_get ();
2400 }
2401
2402
2403 /**
2404  * Add all peers in @a peer_array to @a peer_map used as set.
2405  *
2406  * @param peer_array array containing the peers
2407  * @param num_peers number of peers in @peer_array
2408  * @param peer_map the peermap to use as set
2409  */
2410 static void
2411 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2412                        unsigned int num_peers,
2413                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2414 {
2415   unsigned int i;
2416   if (NULL == peer_map)
2417   {
2418     LOG (GNUNET_ERROR_TYPE_WARNING,
2419          "Trying to add peers to non-existing peermap.\n");
2420     return;
2421   }
2422
2423   for (i = 0; i < num_peers; i++)
2424   {
2425     GNUNET_CONTAINER_multipeermap_put (peer_map,
2426                                        &peer_array[i],
2427                                        NULL,
2428                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2429   }
2430 }
2431
2432
2433 /**
2434  * Send a PULL REPLY to @a peer_id
2435  *
2436  * @param peer_id the peer to send the reply to.
2437  * @param peer_ids the peers to send to @a peer_id
2438  * @param num_peer_ids the number of peers to send to @a peer_id
2439  */
2440 static void
2441 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2442                  const struct GNUNET_PeerIdentity *peer_ids,
2443                  unsigned int num_peer_ids)
2444 {
2445   uint32_t send_size;
2446   struct GNUNET_MQ_Envelope *ev;
2447   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2448
2449   /* Compute actual size */
2450   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2451               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2452
2453   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2454     /* Compute number of peers to send
2455      * If too long, simply truncate */
2456     // TODO select random ones via permutation
2457     //      or even better: do good protocol design
2458     send_size =
2459       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2460        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2461        sizeof (struct GNUNET_PeerIdentity);
2462   else
2463     send_size = num_peer_ids;
2464
2465   LOG (GNUNET_ERROR_TYPE_DEBUG,
2466       "Going to send PULL REPLY with %u peers to %s\n",
2467       send_size, GNUNET_i2s (peer_id));
2468
2469   ev = GNUNET_MQ_msg_extra (out_msg,
2470                             send_size * sizeof (struct GNUNET_PeerIdentity),
2471                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2472   out_msg->num_peers = htonl (send_size);
2473   GNUNET_memcpy (&out_msg[1], peer_ids,
2474          send_size * sizeof (struct GNUNET_PeerIdentity));
2475
2476   Peers_send_message (peer_id, ev, "PULL REPLY");
2477   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2478 }
2479
2480
2481 /**
2482  * Insert PeerID in #pull_map
2483  *
2484  * Called once we know a peer is live.
2485  */
2486 static void
2487 insert_in_pull_map (void *cls,
2488                     const struct GNUNET_PeerIdentity *peer)
2489 {
2490   CustomPeerMap_put (pull_map, peer);
2491 }
2492
2493
2494 /**
2495  * Insert PeerID in #view
2496  *
2497  * Called once we know a peer is live.
2498  * Implements #PeerOp
2499  */
2500 static void
2501 insert_in_view_op (void *cls,
2502                 const struct GNUNET_PeerIdentity *peer)
2503 {
2504   (void) insert_in_view (peer);
2505 }
2506
2507
2508 /**
2509  * Update sampler with given PeerID.
2510  * Implements #PeerOp
2511  */
2512 static void
2513 insert_in_sampler (void *cls,
2514                    const struct GNUNET_PeerIdentity *peer)
2515 {
2516   LOG (GNUNET_ERROR_TYPE_DEBUG,
2517        "Updating samplers with peer %s from insert_in_sampler()\n",
2518        GNUNET_i2s (peer));
2519   RPS_sampler_update (prot_sampler,   peer);
2520   RPS_sampler_update (client_sampler, peer);
2521   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2522   {
2523     /* Make sure we 'know' about this peer */
2524     (void) Peers_issue_peer_liveliness_check (peer);
2525     /* Establish a channel towards that peer to indicate we are going to send
2526      * messages to it */
2527     //Peers_indicate_sending_intention (peer);
2528   }
2529   #ifdef TO_FILE
2530   num_observed_peers++;
2531   GNUNET_CONTAINER_multipeermap_put
2532     (observed_unique_peers,
2533      peer,
2534      NULL,
2535      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2536   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2537       observed_unique_peers);
2538   to_file (file_name_observed_log,
2539           "%" PRIu32 " %" PRIu32 " %f\n",
2540           num_observed_peers,
2541           num_observed_unique_peers,
2542           1.0*num_observed_unique_peers/num_observed_peers)
2543   #endif /* TO_FILE */
2544 }
2545
2546 /**
2547  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2548  *        If the peer is not known, liveliness check is issued and it is
2549  *        scheduled to be inserted in sampler and view.
2550  *
2551  * "External sources" refer to every source except the gossip.
2552  *
2553  * @param peer peer to insert
2554  */
2555 static void
2556 got_peer (const struct GNUNET_PeerIdentity *peer)
2557 {
2558   /* If we did not know this peer already, insert it into sampler and view */
2559   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2560   {
2561     Peers_schedule_operation (peer, insert_in_sampler);
2562     Peers_schedule_operation (peer, insert_in_view_op);
2563   }
2564 }
2565
2566 /**
2567  * @brief Checks if there is a sending channel and if it is needed
2568  *
2569  * @param peer the peer whose sending channel is checked
2570  * @return GNUNET_YES if sending channel exists and is still needed
2571  *         GNUNET_NO  otherwise
2572  */
2573 static int
2574 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2575 {
2576   /* struct GNUNET_CADET_Channel *channel; */
2577   if (GNUNET_NO == Peers_check_peer_known (peer))
2578   {
2579     return GNUNET_NO;
2580   }
2581   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2582   {
2583     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2584          (GNUNET_YES == View_contains_peer (peer)) ||
2585          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2586          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2587          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2588     { /* If we want to keep the connection to peer open */
2589       return GNUNET_YES;
2590     }
2591     return GNUNET_NO;
2592   }
2593   return GNUNET_NO;
2594 }
2595
2596 /**
2597  * @brief remove peer from our knowledge, the view, push and pull maps and
2598  * samplers.
2599  *
2600  * @param peer the peer to remove
2601  */
2602 static void
2603 remove_peer (const struct GNUNET_PeerIdentity *peer)
2604 {
2605   (void) View_remove_peer (peer);
2606   CustomPeerMap_remove_peer (pull_map, peer);
2607   CustomPeerMap_remove_peer (push_map, peer);
2608   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2609   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2610   Peers_remove_peer (peer);
2611 }
2612
2613
2614 /**
2615  * @brief Remove data that is not needed anymore.
2616  *
2617  * If the sending channel is no longer needed it is destroyed.
2618  *
2619  * @param peer the peer whose data is about to be cleaned
2620  */
2621 static void
2622 clean_peer (const struct GNUNET_PeerIdentity *peer)
2623 {
2624   if (GNUNET_NO == check_sending_channel_needed (peer))
2625   {
2626     LOG (GNUNET_ERROR_TYPE_DEBUG,
2627         "Going to remove send channel to peer %s\n",
2628         GNUNET_i2s (peer));
2629     #ifdef ENABLE_MALICIOUS
2630     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2631       (void) Peers_destroy_sending_channel (peer);
2632     #else /* ENABLE_MALICIOUS */
2633     (void) Peers_destroy_sending_channel (peer);
2634     #endif /* ENABLE_MALICIOUS */
2635   }
2636
2637   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2638        (GNUNET_NO == View_contains_peer (peer)) &&
2639        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2640        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2641        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2642        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2643        (GNUNET_NO != Peers_check_removable (peer)) )
2644   { /* We can safely remove this peer */
2645     LOG (GNUNET_ERROR_TYPE_DEBUG,
2646         "Going to remove peer %s\n",
2647         GNUNET_i2s (peer));
2648     remove_peer (peer);
2649     return;
2650   }
2651 }
2652
2653 /**
2654  * @brief This is called when a channel is destroyed.
2655  *
2656  * Removes peer completely from our knowledge if the send_channel was destroyed
2657  * Otherwise simply delete the recv_channel
2658  * Also check if the knowledge about this peer is still needed.
2659  * If not, remove this peer from our knowledge.
2660  *
2661  * @param cls The closure
2662  * @param channel The channel being closed
2663  * @param channel_ctx The context associated with this channel
2664  */
2665 static void
2666 cleanup_destroyed_channel (void *cls,
2667                            const struct GNUNET_CADET_Channel *channel)
2668 {
2669   struct GNUNET_PeerIdentity *peer = cls;
2670   uint32_t *channel_flag;
2671   struct PeerContext *peer_ctx;
2672
2673   GNUNET_assert (NULL != peer);
2674
2675   if (GNUNET_NO == Peers_check_peer_known (peer))
2676   { /* We don't know a context to that peer */
2677     LOG (GNUNET_ERROR_TYPE_WARNING,
2678          "channel (%s) without associated context was destroyed\n",
2679          GNUNET_i2s (peer));
2680     GNUNET_free (peer);
2681     return;
2682   }
2683
2684   peer_ctx = get_peer_ctx (peer);
2685   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2686   {
2687     LOG (GNUNET_ERROR_TYPE_DEBUG,
2688         "Callback on destruction of recv-channel was called (%s)\n",
2689         GNUNET_i2s (peer));
2690     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2691   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2692   {
2693     LOG (GNUNET_ERROR_TYPE_DEBUG,
2694         "Callback on destruction of send-channel was called (%s)\n",
2695         GNUNET_i2s (peer));
2696     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2697   } else {
2698     LOG (GNUNET_ERROR_TYPE_ERROR,
2699         "Channel to be destroyed has is neither sending nor receiving role\n");
2700   }
2701
2702   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2703   { /* We are in the middle of removing that peer from our knowledge. In this
2704        case simply make sure that the channels are cleaned. */
2705     Peers_cleanup_destroyed_channel (cls, channel);
2706     to_file (file_name_view_log,
2707              "-%s\t(cleanup channel, ourself)",
2708              GNUNET_i2s_full (peer));
2709     GNUNET_free (peer);
2710     return;
2711   }
2712
2713   if (GNUNET_YES ==
2714       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2715   { /* Channel used for sending was destroyed */
2716     /* Possible causes of channel destruction:
2717      *  - ourselves  -> cleaning send channel -> clean context
2718      *  - other peer -> peer probably went down -> remove
2719      */
2720     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2721     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2722     { /* We are about to clean the sending channel. Clean the respective
2723        * context */
2724       Peers_cleanup_destroyed_channel (cls, channel);
2725       GNUNET_free (peer);
2726       return;
2727     }
2728     else
2729     { /* Other peer destroyed our sending channel that he is supposed to keep
2730        * open. It probably went down. Remove it from our knowledge. */
2731       Peers_cleanup_destroyed_channel (cls, channel);
2732       remove_peer (peer);
2733       GNUNET_free (peer);
2734       return;
2735     }
2736   }
2737   else if (GNUNET_YES ==
2738       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2739   { /* Channel used for receiving was destroyed */
2740     /* Possible causes of channel destruction:
2741      *  - ourselves  -> peer tried to establish channel twice -> clean context
2742      *  - other peer -> peer doesn't want to send us data -> clean
2743      */
2744     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2745     if (GNUNET_YES ==
2746         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2747     { /* Other peer tried to establish a channel to us twice. We do not accept
2748        * that. Clean the context. */
2749       Peers_cleanup_destroyed_channel (cls, channel);
2750       GNUNET_free (peer);
2751       return;
2752     }
2753     else
2754     { /* Other peer doesn't want to send us data anymore. We are free to clean
2755        * it. */
2756       Peers_cleanup_destroyed_channel (cls, channel);
2757       clean_peer (peer);
2758       GNUNET_free (peer);
2759       return;
2760     }
2761   }
2762   else
2763   {
2764     LOG (GNUNET_ERROR_TYPE_WARNING,
2765         "Destroyed channel is neither sending nor receiving channel\n");
2766   }
2767   GNUNET_free (peer);
2768 }
2769
2770 /***********************************************************************
2771  * /Util functions
2772 ***********************************************************************/
2773
2774 static void
2775 destroy_reply_cls (struct ReplyCls *rep_cls)
2776 {
2777   struct ClientContext *cli_ctx;
2778
2779   cli_ctx = rep_cls->cli_ctx;
2780   GNUNET_assert (NULL != cli_ctx);
2781   if (NULL != rep_cls->req_handle)
2782   {
2783     RPS_sampler_request_cancel (rep_cls->req_handle);
2784   }
2785   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2786                                cli_ctx->rep_cls_tail,
2787                                rep_cls);
2788   GNUNET_free (rep_cls);
2789 }
2790
2791
2792 static void
2793 destroy_cli_ctx (struct ClientContext *cli_ctx)
2794 {
2795   GNUNET_assert (NULL != cli_ctx);
2796   if (NULL != cli_ctx->rep_cls_head)
2797   {
2798     LOG (GNUNET_ERROR_TYPE_WARNING,
2799          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2800     while (NULL != cli_ctx->rep_cls_head)
2801       destroy_reply_cls (cli_ctx->rep_cls_head);
2802   }
2803   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2804                                cli_ctx_tail,
2805                                cli_ctx);
2806   GNUNET_free (cli_ctx);
2807 }
2808
2809
2810 /**
2811  * Function called by NSE.
2812  *
2813  * Updates sizes of sampler list and view and adapt those lists
2814  * accordingly.
2815  */
2816 static void
2817 nse_callback (void *cls,
2818               struct GNUNET_TIME_Absolute timestamp,
2819               double logestimate, double std_dev)
2820 {
2821   double estimate;
2822   //double scale; // TODO this might go gloabal/config
2823
2824   LOG (GNUNET_ERROR_TYPE_DEBUG,
2825        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2826        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2827   //scale = .01;
2828   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2829   // GNUNET_NSE_log_estimate_to_n (logestimate);
2830   estimate = pow (estimate, 1.0 / 3);
2831   // TODO add if std_dev is a number
2832   // estimate += (std_dev * scale);
2833   if (view_size_est_min < ceil (estimate))
2834   {
2835     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2836     sampler_size_est_need = estimate;
2837     view_size_est_need = estimate;
2838   } else
2839   {
2840     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2841     //sampler_size_est_need = view_size_est_min;
2842     view_size_est_need = view_size_est_min;
2843   }
2844
2845   /* If the NSE has changed adapt the lists accordingly */
2846   resize_wrapper (prot_sampler, sampler_size_est_need);
2847   client_resize_wrapper ();
2848 }
2849
2850
2851 /**
2852  * Callback called once the requested PeerIDs are ready.
2853  *
2854  * Sends those to the requesting client.
2855  */
2856 static void
2857 client_respond (void *cls,
2858                 struct GNUNET_PeerIdentity *peer_ids,
2859                 uint32_t num_peers)
2860 {
2861   struct ReplyCls *reply_cls = cls;
2862   uint32_t i;
2863   struct GNUNET_MQ_Envelope *ev;
2864   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2865   uint32_t size_needed;
2866   struct ClientContext *cli_ctx;
2867
2868   GNUNET_assert (NULL != reply_cls);
2869   LOG (GNUNET_ERROR_TYPE_DEBUG,
2870        "sampler returned %" PRIu32 " peers:\n",
2871        num_peers);
2872   for (i = 0; i < num_peers; i++)
2873   {
2874     LOG (GNUNET_ERROR_TYPE_DEBUG,
2875          "  %" PRIu32 ": %s\n",
2876          i,
2877          GNUNET_i2s (&peer_ids[i]));
2878   }
2879
2880   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2881                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2882
2883   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2884
2885   ev = GNUNET_MQ_msg_extra (out_msg,
2886                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2887                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2888   out_msg->num_peers = htonl (num_peers);
2889   out_msg->id = htonl (reply_cls->id);
2890
2891   GNUNET_memcpy (&out_msg[1],
2892           peer_ids,
2893           num_peers * sizeof (struct GNUNET_PeerIdentity));
2894   GNUNET_free (peer_ids);
2895
2896   cli_ctx = reply_cls->cli_ctx;
2897   GNUNET_assert (NULL != cli_ctx);
2898   reply_cls->req_handle = NULL;
2899   destroy_reply_cls (reply_cls);
2900   GNUNET_MQ_send (cli_ctx->mq, ev);
2901 }
2902
2903
2904 /**
2905  * Handle RPS request from the client.
2906  *
2907  * @param cls closure
2908  * @param message the actual message
2909  */
2910 static void
2911 handle_client_request (void *cls,
2912                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2913 {
2914   struct ClientContext *cli_ctx = cls;
2915   uint32_t num_peers;
2916   uint32_t size_needed;
2917   struct ReplyCls *reply_cls;
2918   uint32_t i;
2919
2920   num_peers = ntohl (msg->num_peers);
2921   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2922                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2923
2924   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2925   {
2926     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2927                 "Message received from client has size larger than expected\n");
2928     GNUNET_SERVICE_client_drop (cli_ctx->client);
2929     return;
2930   }
2931
2932   for (i = 0 ; i < num_peers ; i++)
2933     est_request_rate();
2934
2935   LOG (GNUNET_ERROR_TYPE_DEBUG,
2936        "Client requested %" PRIu32 " random peer(s).\n",
2937        num_peers);
2938
2939   reply_cls = GNUNET_new (struct ReplyCls);
2940   reply_cls->id = ntohl (msg->id);
2941   reply_cls->cli_ctx = cli_ctx;
2942   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2943                                                         client_respond,
2944                                                         reply_cls,
2945                                                         num_peers);
2946
2947   GNUNET_assert (NULL != cli_ctx);
2948   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2949                                cli_ctx->rep_cls_tail,
2950                                reply_cls);
2951   GNUNET_SERVICE_client_continue (cli_ctx->client);
2952 }
2953
2954
2955 /**
2956  * @brief Handle a message that requests the cancellation of a request
2957  *
2958  * @param cls unused
2959  * @param message the message containing the id of the request
2960  */
2961 static void
2962 handle_client_request_cancel (void *cls,
2963                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2964 {
2965   struct ClientContext *cli_ctx = cls;
2966   struct ReplyCls *rep_cls;
2967
2968   GNUNET_assert (NULL != cli_ctx);
2969   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2970   rep_cls = cli_ctx->rep_cls_head;
2971   LOG (GNUNET_ERROR_TYPE_DEBUG,
2972       "Client cancels request with id %" PRIu32 "\n",
2973       ntohl (msg->id));
2974   while ( (NULL != rep_cls->next) &&
2975           (rep_cls->id != ntohl (msg->id)) )
2976     rep_cls = rep_cls->next;
2977   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2978   destroy_reply_cls (rep_cls);
2979   GNUNET_SERVICE_client_continue (cli_ctx->client);
2980 }
2981
2982
2983 /**
2984  * @brief This function is called, when the client seeds peers.
2985  * It verifies that @a msg is well-formed.
2986  *
2987  * @param cls the closure (#ClientContext)
2988  * @param msg the message
2989  * @return #GNUNET_OK if @a msg is well-formed
2990  */
2991 static int
2992 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2993 {
2994   struct ClientContext *cli_ctx = cls;
2995   uint16_t msize = ntohs (msg->header.size);
2996   uint32_t num_peers = ntohl (msg->num_peers);
2997
2998   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2999   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3000        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3001   {
3002     GNUNET_break (0);
3003     GNUNET_SERVICE_client_drop (cli_ctx->client);
3004     return GNUNET_SYSERR;
3005   }
3006   return GNUNET_OK;
3007 }
3008
3009
3010 /**
3011  * Handle seed from the client.
3012  *
3013  * @param cls closure
3014  * @param message the actual message
3015  */
3016 static void
3017 handle_client_seed (void *cls,
3018                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3019 {
3020   struct ClientContext *cli_ctx = cls;
3021   struct GNUNET_PeerIdentity *peers;
3022   uint32_t num_peers;
3023   uint32_t i;
3024
3025   num_peers = ntohl (msg->num_peers);
3026   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3027   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3028   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
3029
3030   LOG (GNUNET_ERROR_TYPE_DEBUG,
3031        "Client seeded peers:\n");
3032   print_peer_list (peers, num_peers);
3033
3034   for (i = 0; i < num_peers; i++)
3035   {
3036     LOG (GNUNET_ERROR_TYPE_DEBUG,
3037          "Updating samplers with seed %" PRIu32 ": %s\n",
3038          i,
3039          GNUNET_i2s (&peers[i]));
3040
3041     got_peer (&peers[i]);
3042   }
3043
3044   ////GNUNET_free (peers);
3045
3046   GNUNET_SERVICE_client_continue (cli_ctx->client);
3047 }
3048
3049 /**
3050  * @brief Send view to client
3051  *
3052  * @param cli_ctx the context of the client
3053  * @param view_array the peerids of the view as array (can be empty)
3054  * @param view_size the size of the view array (can be 0)
3055  */
3056 void
3057 send_view (const struct ClientContext *cli_ctx,
3058            const struct GNUNET_PeerIdentity *view_array,
3059            uint64_t view_size)
3060 {
3061   struct GNUNET_MQ_Envelope *ev;
3062   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3063
3064   if (NULL == view_array)
3065   {
3066     view_size = View_size ();
3067     view_array = View_get_as_array();
3068   }
3069
3070   ev = GNUNET_MQ_msg_extra (out_msg,
3071                             view_size * sizeof (struct GNUNET_PeerIdentity),
3072                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3073   out_msg->num_peers = htonl (view_size);
3074
3075   GNUNET_memcpy (&out_msg[1],
3076           view_array,
3077           view_size * sizeof (struct GNUNET_PeerIdentity));
3078   GNUNET_MQ_send (cli_ctx->mq, ev);
3079 }
3080
3081 /**
3082  * @brief sends updates to clients that are interested
3083  */
3084 static void
3085 clients_notify_view_update (void)
3086 {
3087   struct ClientContext *cli_ctx_iter;
3088   uint64_t num_peers;
3089   const struct GNUNET_PeerIdentity *view_array;
3090
3091   num_peers = View_size ();
3092   view_array = View_get_as_array();
3093   /* check size of view is small enough */
3094   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3095   {
3096     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3097                 "View is too big to send\n");
3098     return;
3099   }
3100
3101   for (cli_ctx_iter = cli_ctx_head;
3102        NULL != cli_ctx_iter;
3103        cli_ctx_iter = cli_ctx_head->next)
3104   {
3105     if (1 < cli_ctx_iter->view_updates_left)
3106     {
3107       /* Client wants to receive limited amount of updates */
3108       cli_ctx_iter->view_updates_left -= 1;
3109     } else if (1 == cli_ctx_iter->view_updates_left)
3110     {
3111       /* Last update of view for client */
3112       cli_ctx_iter->view_updates_left = -1;
3113     } else if (0 > cli_ctx_iter->view_updates_left) {
3114       /* Client is not interested in updates */
3115       continue;
3116     }
3117     /* else _updates_left == 0 - infinite amount of updates */
3118
3119     /* send view */
3120     send_view (cli_ctx_iter, view_array, num_peers);
3121   }
3122 }
3123
3124
3125 /**
3126  * Handle RPS request from the client.
3127  *
3128  * @param cls closure
3129  * @param message the actual message
3130  */
3131 static void
3132 handle_client_view_request (void *cls,
3133                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3134 {
3135   struct ClientContext *cli_ctx = cls;
3136   uint64_t num_updates;
3137
3138   num_updates = ntohl (msg->num_updates);
3139
3140   LOG (GNUNET_ERROR_TYPE_DEBUG,
3141        "Client requested %" PRIu64 " updates of view.\n",
3142        num_updates);
3143
3144   GNUNET_assert (NULL != cli_ctx);
3145   cli_ctx->view_updates_left = num_updates;
3146   send_view (cli_ctx, NULL, 0);
3147   GNUNET_SERVICE_client_continue (cli_ctx->client);
3148 }
3149
3150 /**
3151  * Handle a CHECK_LIVE message from another peer.
3152  *
3153  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3154  * the channel is blocked for all other communication.
3155  *
3156  * @param cls Closure
3157  * @param msg The message header
3158  */
3159 static void
3160 handle_peer_check (void *cls,
3161                    const struct GNUNET_MessageHeader *msg)
3162 {
3163   const struct GNUNET_PeerIdentity *peer = cls;
3164   LOG (GNUNET_ERROR_TYPE_DEBUG,
3165       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3166
3167   GNUNET_break_op (Peers_check_peer_known (peer));
3168   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3169 }
3170
3171 /**
3172  * Handle a PUSH message from another peer.
3173  *
3174  * Check the proof of work and store the PeerID
3175  * in the temporary list for pushed PeerIDs.
3176  *
3177  * @param cls Closure
3178  * @param msg The message header
3179  */
3180 static void
3181 handle_peer_push (void *cls,
3182                   const struct GNUNET_MessageHeader *msg)
3183 {
3184   const struct GNUNET_PeerIdentity *peer = cls;
3185
3186   // (check the proof of work (?))
3187
3188   LOG (GNUNET_ERROR_TYPE_DEBUG,
3189        "Received PUSH (%s)\n",
3190        GNUNET_i2s (peer));
3191   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3192
3193   #ifdef ENABLE_MALICIOUS
3194   struct AttackedPeer *tmp_att_peer;
3195
3196   if ( (1 == mal_type) ||
3197        (3 == mal_type) )
3198   { /* Try to maximise representation */
3199     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3200     tmp_att_peer->peer_id = *peer;
3201     if (NULL == att_peer_set)
3202       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3203     if (GNUNET_NO ==
3204         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3205                                                 peer))
3206     {
3207       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3208                                    att_peers_tail,
3209                                    tmp_att_peer);
3210       add_peer_array_to_set (peer, 1, att_peer_set);
3211     }
3212   }
3213
3214
3215   else if (2 == mal_type)
3216   {
3217     /* We attack one single well-known peer - simply ignore */
3218   }
3219   #endif /* ENABLE_MALICIOUS */
3220
3221   /* Add the sending peer to the push_map */
3222   CustomPeerMap_put (push_map, peer);
3223
3224   GNUNET_break_op (Peers_check_peer_known (peer));
3225   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3226 }
3227
3228
3229 /**
3230  * Handle PULL REQUEST request message from another peer.
3231  *
3232  * Reply with the view of PeerIDs.
3233  *
3234  * @param cls Closure
3235  * @param msg The message header
3236  */
3237 static void
3238 handle_peer_pull_request (void *cls,
3239                           const struct GNUNET_MessageHeader *msg)
3240 {
3241   struct GNUNET_PeerIdentity *peer = cls;
3242   const struct GNUNET_PeerIdentity *view_array;
3243
3244   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3245   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3246
3247   #ifdef ENABLE_MALICIOUS
3248   if (1 == mal_type
3249       || 3 == mal_type)
3250   { /* Try to maximise representation */
3251     send_pull_reply (peer, mal_peers, num_mal_peers);
3252   }
3253
3254   else if (2 == mal_type)
3255   { /* Try to partition network */
3256     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3257     {
3258       send_pull_reply (peer, mal_peers, num_mal_peers);
3259     }
3260   }
3261   #endif /* ENABLE_MALICIOUS */
3262
3263   GNUNET_break_op (Peers_check_peer_known (peer));
3264   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3265   view_array = View_get_as_array ();
3266   send_pull_reply (peer, view_array, View_size ());
3267 }
3268
3269
3270 /**
3271  * Check whether we sent a corresponding request and
3272  * whether this reply is the first one.
3273  *
3274  * @param cls Closure
3275  * @param msg The message header
3276  */
3277 static int
3278 check_peer_pull_reply (void *cls,
3279                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3280 {
3281   struct GNUNET_PeerIdentity *sender = cls;
3282
3283   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3284   {
3285     GNUNET_break_op (0);
3286     return GNUNET_SYSERR;
3287   }
3288
3289   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3290       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3291   {
3292     LOG (GNUNET_ERROR_TYPE_ERROR,
3293         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3294         ntohl (msg->num_peers),
3295         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3296             sizeof (struct GNUNET_PeerIdentity));
3297     GNUNET_break_op (0);
3298     return GNUNET_SYSERR;
3299   }
3300
3301   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3302   {
3303     LOG (GNUNET_ERROR_TYPE_WARNING,
3304         "Received a pull reply from a peer we didn't request one from!\n");
3305     GNUNET_break_op (0);
3306     return GNUNET_SYSERR;
3307   }
3308   return GNUNET_OK;
3309 }
3310
3311 /**
3312  * Handle PULL REPLY message from another peer.
3313  *
3314  * @param cls Closure
3315  * @param msg The message header
3316  */
3317 static void
3318 handle_peer_pull_reply (void *cls,
3319                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3320 {
3321   const struct GNUNET_PeerIdentity *peers;
3322   struct GNUNET_PeerIdentity *sender = cls;
3323   uint32_t i;
3324 #ifdef ENABLE_MALICIOUS
3325   struct AttackedPeer *tmp_att_peer;
3326 #endif /* ENABLE_MALICIOUS */
3327
3328   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3329   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3330
3331   #ifdef ENABLE_MALICIOUS
3332   // We shouldn't even receive pull replies as we're not sending
3333   if (2 == mal_type)
3334   {
3335   }
3336   #endif /* ENABLE_MALICIOUS */
3337
3338   /* Do actual logic */
3339   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3340
3341   LOG (GNUNET_ERROR_TYPE_DEBUG,
3342        "PULL REPLY received, got following %u peers:\n",
3343        ntohl (msg->num_peers));
3344
3345   for (i = 0; i < ntohl (msg->num_peers); i++)
3346   {
3347     LOG (GNUNET_ERROR_TYPE_DEBUG,
3348          "%u. %s\n",
3349          i,
3350          GNUNET_i2s (&peers[i]));
3351
3352     #ifdef ENABLE_MALICIOUS
3353     if ((NULL != att_peer_set) &&
3354         (1 == mal_type || 3 == mal_type))
3355     { /* Add attacked peer to local list */
3356       // TODO check if we sent a request and this was the first reply
3357       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3358                                                                &peers[i])
3359           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3360                                                                   &peers[i])
3361           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3362                                                    &own_identity))
3363       {
3364         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3365         tmp_att_peer->peer_id = peers[i];
3366         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3367                                      att_peers_tail,
3368                                      tmp_att_peer);
3369         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3370       }
3371       continue;
3372     }
3373     #endif /* ENABLE_MALICIOUS */
3374     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3375                                               &peers[i]))
3376     {
3377       /* Make sure we 'know' about this peer */
3378       (void) Peers_insert_peer (&peers[i]);
3379
3380       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3381       {
3382         CustomPeerMap_put (pull_map, &peers[i]);
3383       }
3384       else
3385       {
3386         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3387         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3388       }
3389     }
3390   }
3391
3392   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3393   clean_peer (sender);
3394
3395   GNUNET_break_op (Peers_check_peer_known (sender));
3396   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3397 }
3398
3399
3400 /**
3401  * Compute a random delay.
3402  * A uniformly distributed value between mean + spread and mean - spread.
3403  *
3404  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3405  * It would return a random value between 2 and 6 min.
3406  *
3407  * @param mean the mean
3408  * @param spread the inverse amount of deviation from the mean
3409  */
3410 static struct GNUNET_TIME_Relative
3411 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3412                     unsigned int spread)
3413 {
3414   struct GNUNET_TIME_Relative half_interval;
3415   struct GNUNET_TIME_Relative ret;
3416   unsigned int rand_delay;
3417   unsigned int max_rand_delay;
3418
3419   if (0 == spread)
3420   {
3421     LOG (GNUNET_ERROR_TYPE_WARNING,
3422          "Not accepting spread of 0\n");
3423     GNUNET_break (0);
3424     GNUNET_assert (0);
3425   }
3426   GNUNET_assert (0 != mean.rel_value_us);
3427
3428   /* Compute random time value between spread * mean and spread * mean */
3429   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3430
3431   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3432   /**
3433    * Compute random value between (0 and 1) * round_interval
3434    * via multiplying round_interval with a 'fraction' (0 to value)/value
3435    */
3436   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3437   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3438   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3439   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3440
3441   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3442     LOG (GNUNET_ERROR_TYPE_WARNING,
3443          "Returning FOREVER_REL\n");
3444
3445   return ret;
3446 }
3447
3448
3449 /**
3450  * Send single pull request
3451  *
3452  * @param peer_id the peer to send the pull request to.
3453  */
3454 static void
3455 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3456 {
3457   struct GNUNET_MQ_Envelope *ev;
3458
3459   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3460                                                      Peers_PULL_REPLY_PENDING));
3461   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3462
3463   LOG (GNUNET_ERROR_TYPE_DEBUG,
3464        "Going to send PULL REQUEST to peer %s.\n",
3465        GNUNET_i2s (peer));
3466
3467   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3468   Peers_send_message (peer, ev, "PULL REQUEST");
3469   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3470 }
3471
3472
3473 /**
3474  * Send single push
3475  *
3476  * @param peer_id the peer to send the push to.
3477  */
3478 static void
3479 send_push (const struct GNUNET_PeerIdentity *peer_id)
3480 {
3481   struct GNUNET_MQ_Envelope *ev;
3482
3483   LOG (GNUNET_ERROR_TYPE_DEBUG,
3484        "Going to send PUSH to peer %s.\n",
3485        GNUNET_i2s (peer_id));
3486
3487   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3488   Peers_send_message (peer_id, ev, "PUSH");
3489   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3490 }
3491
3492
3493 static void
3494 do_round (void *cls);
3495
3496 static void
3497 do_mal_round (void *cls);
3498
3499 #ifdef ENABLE_MALICIOUS
3500
3501
3502 /**
3503  * @brief This function is called, when the client tells us to act malicious.
3504  * It verifies that @a msg is well-formed.
3505  *
3506  * @param cls the closure (#ClientContext)
3507  * @param msg the message
3508  * @return #GNUNET_OK if @a msg is well-formed
3509  */
3510 static int
3511 check_client_act_malicious (void *cls,
3512                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3513 {
3514   struct ClientContext *cli_ctx = cls;
3515   uint16_t msize = ntohs (msg->header.size);
3516   uint32_t num_peers = ntohl (msg->num_peers);
3517
3518   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3519   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3520        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3521   {
3522     LOG (GNUNET_ERROR_TYPE_ERROR,
3523         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3524         ntohl (msg->num_peers),
3525         (msize / sizeof (struct GNUNET_PeerIdentity)));
3526     GNUNET_break (0);
3527     GNUNET_SERVICE_client_drop (cli_ctx->client);
3528     return GNUNET_SYSERR;
3529   }
3530   return GNUNET_OK;
3531 }
3532
3533 /**
3534  * Turn RPS service to act malicious.
3535  *
3536  * @param cls Closure
3537  * @param client The client that sent the message
3538  * @param msg The message header
3539  */
3540 static void
3541 handle_client_act_malicious (void *cls,
3542                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3543 {
3544   struct ClientContext *cli_ctx = cls;
3545   struct GNUNET_PeerIdentity *peers;
3546   uint32_t num_mal_peers_sent;
3547   uint32_t num_mal_peers_old;
3548
3549   /* Do actual logic */
3550   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3551   mal_type = ntohl (msg->type);
3552   if (NULL == mal_peer_set)
3553     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3554
3555   LOG (GNUNET_ERROR_TYPE_DEBUG,
3556        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3557        mal_type,
3558        ntohl (msg->num_peers));
3559
3560   if (1 == mal_type)
3561   { /* Try to maximise representation */
3562     /* Add other malicious peers to those we already know */
3563
3564     num_mal_peers_sent = ntohl (msg->num_peers);
3565     num_mal_peers_old = num_mal_peers;
3566     GNUNET_array_grow (mal_peers,
3567                        num_mal_peers,
3568                        num_mal_peers + num_mal_peers_sent);
3569     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3570             peers,
3571             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3572
3573     /* Add all mal peers to mal_peer_set */
3574     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3575                            num_mal_peers_sent,
3576                            mal_peer_set);
3577
3578     /* Substitute do_round () with do_mal_round () */
3579     GNUNET_SCHEDULER_cancel (do_round_task);
3580     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3581   }
3582
3583   else if ( (2 == mal_type) ||
3584             (3 == mal_type) )
3585   { /* Try to partition the network */
3586     /* Add other malicious peers to those we already know */
3587
3588     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3589     num_mal_peers_old = num_mal_peers;
3590     GNUNET_array_grow (mal_peers,
3591                        num_mal_peers,
3592                        num_mal_peers + num_mal_peers_sent);
3593     if (NULL != mal_peers &&
3594         0 != num_mal_peers)
3595     {
3596       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3597               peers,
3598               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3599
3600       /* Add all mal peers to mal_peer_set */
3601       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3602                              num_mal_peers_sent,
3603                              mal_peer_set);
3604     }
3605
3606     /* Store the one attacked peer */
3607     GNUNET_memcpy (&attacked_peer,
3608             &msg->attacked_peer,
3609             sizeof (struct GNUNET_PeerIdentity));
3610     /* Set the flag of the attacked peer to valid to avoid problems */
3611     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3612     {
3613       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3614     }
3615
3616     LOG (GNUNET_ERROR_TYPE_DEBUG,
3617          "Attacked peer is %s\n",
3618          GNUNET_i2s (&attacked_peer));
3619
3620     /* Substitute do_round () with do_mal_round () */
3621     GNUNET_SCHEDULER_cancel (do_round_task);
3622     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3623   }
3624   else if (0 == mal_type)
3625   { /* Stop acting malicious */
3626     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3627
3628     /* Substitute do_mal_round () with do_round () */
3629     GNUNET_SCHEDULER_cancel (do_round_task);
3630     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3631   }
3632   else
3633   {
3634     GNUNET_break (0);
3635     GNUNET_SERVICE_client_continue (cli_ctx->client);
3636   }
3637   GNUNET_SERVICE_client_continue (cli_ctx->client);
3638 }
3639
3640
3641 /**
3642  * Send out PUSHes and PULLs maliciously.
3643  *
3644  * This is executed regylary.
3645  */
3646 static void
3647 do_mal_round (void *cls)
3648 {
3649   uint32_t num_pushes;
3650   uint32_t i;
3651   struct GNUNET_TIME_Relative time_next_round;
3652   struct AttackedPeer *tmp_att_peer;
3653
3654   LOG (GNUNET_ERROR_TYPE_DEBUG,
3655        "Going to execute next round maliciously type %" PRIu32 ".\n",
3656       mal_type);
3657   do_round_task = NULL;
3658   GNUNET_assert (mal_type <= 3);
3659   /* Do malicious actions */
3660   if (1 == mal_type)
3661   { /* Try to maximise representation */
3662
3663     /* The maximum of pushes we're going to send this round */
3664     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3665                                          num_attacked_peers),
3666                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3667
3668     LOG (GNUNET_ERROR_TYPE_DEBUG,
3669          "Going to send %" PRIu32 " pushes\n",
3670          num_pushes);
3671
3672     /* Send PUSHes to attacked peers */
3673     for (i = 0 ; i < num_pushes ; i++)
3674     {
3675       if (att_peers_tail == att_peer_index)
3676         att_peer_index = att_peers_head;
3677       else
3678         att_peer_index = att_peer_index->next;
3679
3680       send_push (&att_peer_index->peer_id);
3681     }
3682
3683     /* Send PULLs to some peers to learn about additional peers to attack */
3684     tmp_att_peer = att_peer_index;
3685     for (i = 0 ; i < num_pushes * alpha ; i++)
3686     {
3687       if (att_peers_tail == tmp_att_peer)
3688         tmp_att_peer = att_peers_head;
3689       else
3690         att_peer_index = tmp_att_peer->next;
3691
3692       send_pull_request (&tmp_att_peer->peer_id);
3693     }
3694   }
3695
3696
3697   else if (2 == mal_type)
3698   { /**
3699      * Try to partition the network
3700      * Send as many pushes to the attacked peer as possible
3701      * That is one push per round as it will ignore more.
3702      */
3703     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3704     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3705       send_push (&attacked_peer);
3706   }
3707
3708
3709   if (3 == mal_type)
3710   { /* Combined attack */
3711
3712     /* Send PUSH to attacked peers */
3713     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3714     {
3715       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3716       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3717       {
3718         LOG (GNUNET_ERROR_TYPE_DEBUG,
3719             "Goding to send push to attacked peer (%s)\n",
3720             GNUNET_i2s (&attacked_peer));
3721         send_push (&attacked_peer);
3722       }
3723     }
3724     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3725
3726     /* The maximum of pushes we're going to send this round */
3727     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3728                                          num_attacked_peers),
3729                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3730
3731     LOG (GNUNET_ERROR_TYPE_DEBUG,
3732          "Going to send %" PRIu32 " pushes\n",
3733          num_pushes);
3734
3735     for (i = 0; i < num_pushes; i++)
3736     {
3737       if (att_peers_tail == att_peer_index)
3738         att_peer_index = att_peers_head;
3739       else
3740         att_peer_index = att_peer_index->next;
3741
3742       send_push (&att_peer_index->peer_id);
3743     }
3744
3745     /* Send PULLs to some peers to learn about additional peers to attack */
3746     tmp_att_peer = att_peer_index;
3747     for (i = 0; i < num_pushes * alpha; i++)
3748     {
3749       if (att_peers_tail == tmp_att_peer)
3750         tmp_att_peer = att_peers_head;
3751       else
3752         att_peer_index = tmp_att_peer->next;
3753
3754       send_pull_request (&tmp_att_peer->peer_id);
3755     }
3756   }
3757
3758   /* Schedule next round */
3759   time_next_round = compute_rand_delay (round_interval, 2);
3760
3761   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3762   //NULL);
3763   GNUNET_assert (NULL == do_round_task);
3764   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3765                                                 &do_mal_round, NULL);
3766   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3767 }
3768 #endif /* ENABLE_MALICIOUS */
3769
3770 /**
3771  * Send out PUSHes and PULLs, possibly update #view, samplers.
3772  *
3773  * This is executed regylary.
3774  */
3775 static void
3776 do_round (void *cls)
3777 {
3778   uint32_t i;
3779   const struct GNUNET_PeerIdentity *view_array;
3780   unsigned int *permut;
3781   unsigned int a_peers; /* Number of peers we send pushes to */
3782   unsigned int b_peers; /* Number of peers we send pull requests to */
3783   uint32_t first_border;
3784   uint32_t second_border;
3785   struct GNUNET_PeerIdentity peer;
3786   struct GNUNET_PeerIdentity *update_peer;
3787
3788   LOG (GNUNET_ERROR_TYPE_DEBUG,
3789        "Going to execute next round.\n");
3790   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3791   do_round_task = NULL;
3792   LOG (GNUNET_ERROR_TYPE_DEBUG,
3793        "Printing view:\n");
3794   to_file (file_name_view_log,
3795            "___ new round ___");
3796   view_array = View_get_as_array ();
3797   for (i = 0; i < View_size (); i++)
3798   {
3799     LOG (GNUNET_ERROR_TYPE_DEBUG,
3800          "\t%s\n", GNUNET_i2s (&view_array[i]));
3801     to_file (file_name_view_log,
3802              "=%s\t(do round)",
3803              GNUNET_i2s_full (&view_array[i]));
3804   }
3805
3806
3807   /* Send pushes and pull requests */
3808   if (0 < View_size ())
3809   {
3810     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3811                                            View_size ());
3812
3813     /* Send PUSHes */
3814     a_peers = ceil (alpha * View_size ());
3815
3816     LOG (GNUNET_ERROR_TYPE_DEBUG,
3817          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3818          a_peers, alpha, View_size ());
3819     for (i = 0; i < a_peers; i++)
3820     {
3821       peer = view_array[permut[i]];
3822       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3823       { // FIXME if this fails schedule/loop this for later
3824         send_push (&peer);
3825       }
3826     }
3827
3828     /* Send PULL requests */
3829     b_peers = ceil (beta * View_size ());
3830     first_border = a_peers;
3831     second_border = a_peers + b_peers;
3832     if (second_border > View_size ())
3833     {
3834       first_border = View_size () - b_peers;
3835       second_border = View_size ();
3836     }
3837     LOG (GNUNET_ERROR_TYPE_DEBUG,
3838         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3839         b_peers, beta, View_size ());
3840     for (i = first_border; i < second_border; i++)
3841     {
3842       peer = view_array[permut[i]];
3843       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3844           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3845       { // FIXME if this fails schedule/loop this for later
3846         send_pull_request (&peer);
3847       }
3848     }
3849
3850     GNUNET_free (permut);
3851     permut = NULL;
3852   }
3853
3854
3855   /* Update view */
3856   /* TODO see how many peers are in push-/pull- list! */
3857
3858   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3859       (0 < CustomPeerMap_size (push_map)) &&
3860       (0 < CustomPeerMap_size (pull_map)))
3861   //if (GNUNET_YES) // disable blocking temporarily
3862   { /* If conditions for update are fulfilled, update */
3863     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3864
3865     uint32_t final_size;
3866     uint32_t peers_to_clean_size;
3867     struct GNUNET_PeerIdentity *peers_to_clean;
3868
3869     peers_to_clean = NULL;
3870     peers_to_clean_size = 0;
3871     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3872     GNUNET_memcpy (peers_to_clean,
3873             view_array,
3874             View_size () * sizeof (struct GNUNET_PeerIdentity));
3875
3876     /* Seems like recreating is the easiest way of emptying the peermap */
3877     View_clear ();
3878     to_file (file_name_view_log,
3879              "--- emptied ---");
3880
3881     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3882                                 CustomPeerMap_size (push_map));
3883     second_border = first_border +
3884                     GNUNET_MIN (floor (beta  * view_size_est_need),
3885                                 CustomPeerMap_size (pull_map));
3886     final_size    = second_border +
3887       ceil ((1 - (alpha + beta)) * view_size_est_need);
3888     LOG (GNUNET_ERROR_TYPE_DEBUG,
3889         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3890         first_border,
3891         second_border,
3892         final_size);
3893
3894     /* Update view with peers received through PUSHes */
3895     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3896                                            CustomPeerMap_size (push_map));
3897     for (i = 0; i < first_border; i++)
3898     {
3899       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3900                                                               permut[i]));
3901       to_file (file_name_view_log,
3902                "+%s\t(push list)",
3903                GNUNET_i2s_full (&view_array[i]));
3904       // TODO change the peer_flags accordingly
3905     }
3906     GNUNET_free (permut);
3907     permut = NULL;
3908
3909     /* Update view with peers received through PULLs */
3910     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3911                                            CustomPeerMap_size (pull_map));
3912     for (i = first_border; i < second_border; i++)
3913     {
3914       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3915             permut[i - first_border]));
3916       to_file (file_name_view_log,
3917                "+%s\t(pull list)",
3918                GNUNET_i2s_full (&view_array[i]));
3919       // TODO change the peer_flags accordingly
3920     }
3921     GNUNET_free (permut);
3922     permut = NULL;
3923
3924     /* Update view with peers from history */
3925     RPS_sampler_get_n_rand_peers (prot_sampler,
3926                                   hist_update,
3927                                   NULL,
3928                                   final_size - second_border);
3929     // TODO change the peer_flags accordingly
3930
3931     for (i = 0; i < View_size (); i++)
3932       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3933
3934     /* Clean peers that were removed from the view */
3935     for (i = 0; i < peers_to_clean_size; i++)
3936     {
3937       to_file (file_name_view_log,
3938                "-%s",
3939                GNUNET_i2s_full (&peers_to_clean[i]));
3940       clean_peer (&peers_to_clean[i]);
3941       //peer_destroy_channel_send (sender);
3942     }
3943
3944     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3945     clients_notify_view_update();
3946   } else {
3947     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3948     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3949     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3950         !(0 >= CustomPeerMap_size (pull_map)))
3951       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3952     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3953         (0 >= CustomPeerMap_size (pull_map)))
3954       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3955     if (0 >= CustomPeerMap_size (push_map) &&
3956         !(0 >= CustomPeerMap_size (pull_map)))
3957       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3958     if (0 >= CustomPeerMap_size (push_map) &&
3959         (0 >= CustomPeerMap_size (pull_map)))
3960       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3961     if (0 >= CustomPeerMap_size (pull_map) &&
3962         CustomPeerMap_size (push_map) > alpha * View_size () &&
3963         0 >= CustomPeerMap_size (push_map))
3964       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3965   }
3966   // TODO independent of that also get some peers from CADET_get_peers()?
3967   GNUNET_STATISTICS_set (stats,
3968       "# peers in push map at end of round",
3969       CustomPeerMap_size (push_map),
3970       GNUNET_NO);
3971   GNUNET_STATISTICS_set (stats,
3972       "# peers in pull map at end of round",
3973       CustomPeerMap_size (pull_map),
3974       GNUNET_NO);
3975   GNUNET_STATISTICS_set (stats,
3976       "# peers in view at end of round",
3977       View_size (),
3978       GNUNET_NO);
3979
3980   LOG (GNUNET_ERROR_TYPE_DEBUG,
3981        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3982        CustomPeerMap_size (push_map),
3983        CustomPeerMap_size (pull_map),
3984        alpha,
3985        View_size (),
3986        alpha * View_size ());
3987
3988   /* Update samplers */
3989   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3990   {
3991     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3992     LOG (GNUNET_ERROR_TYPE_DEBUG,
3993          "Updating with peer %s from push list\n",
3994          GNUNET_i2s (update_peer));
3995     insert_in_sampler (NULL, update_peer);
3996     clean_peer (update_peer); /* This cleans only if it is not in the view */
3997     //peer_destroy_channel_send (sender);
3998   }
3999
4000   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
4001   {
4002     LOG (GNUNET_ERROR_TYPE_DEBUG,
4003          "Updating with peer %s from pull list\n",
4004          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
4005     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
4006     /* This cleans only if it is not in the view */
4007     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
4008     //peer_destroy_channel_send (sender);
4009   }
4010
4011
4012   /* Empty push/pull lists */
4013   CustomPeerMap_clear (push_map);
4014   CustomPeerMap_clear (pull_map);
4015
4016   struct GNUNET_TIME_Relative time_next_round;
4017
4018   time_next_round = compute_rand_delay (round_interval, 2);
4019
4020   /* Schedule next round */
4021   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4022                                                 &do_round, NULL);
4023   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4024 }
4025
4026
4027 /**
4028  * This is called from GNUNET_CADET_get_peers().
4029  *
4030  * It is called on every peer(ID) that cadet somehow has contact with.
4031  * We use those to initialise the sampler.
4032  */
4033 void
4034 init_peer_cb (void *cls,
4035               const struct GNUNET_PeerIdentity *peer,
4036               int tunnel, // "Do we have a tunnel towards this peer?"
4037               unsigned int n_paths, // "Number of known paths towards this peer"
4038               unsigned int best_path) // "How long is the best path?
4039                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4040 {
4041   if (NULL != peer)
4042   {
4043     LOG (GNUNET_ERROR_TYPE_DEBUG,
4044          "Got peer_id %s from cadet\n",
4045          GNUNET_i2s (peer));
4046     got_peer (peer);
4047   }
4048 }
4049
4050 /**
4051  * @brief Iterator function over stored, valid peers.
4052  *
4053  * We initialise the sampler with those.
4054  *
4055  * @param cls the closure
4056  * @param peer the peer id
4057  * @return #GNUNET_YES if we should continue to
4058  *         iterate,
4059  *         #GNUNET_NO if not.
4060  */
4061 static int
4062 valid_peers_iterator (void *cls,
4063                       const struct GNUNET_PeerIdentity *peer)
4064 {
4065   if (NULL != peer)
4066   {
4067     LOG (GNUNET_ERROR_TYPE_DEBUG,
4068          "Got stored, valid peer %s\n",
4069          GNUNET_i2s (peer));
4070     got_peer (peer);
4071   }
4072   return GNUNET_YES;
4073 }
4074
4075
4076 /**
4077  * Iterator over peers from peerinfo.
4078  *
4079  * @param cls closure
4080  * @param peer id of the peer, NULL for last call
4081  * @param hello hello message for the peer (can be NULL)
4082  * @param error message
4083  */
4084 void
4085 process_peerinfo_peers (void *cls,
4086                         const struct GNUNET_PeerIdentity *peer,
4087                         const struct GNUNET_HELLO_Message *hello,
4088                         const char *err_msg)
4089 {
4090   if (NULL != peer)
4091   {
4092     LOG (GNUNET_ERROR_TYPE_DEBUG,
4093          "Got peer_id %s from peerinfo\n",
4094          GNUNET_i2s (peer));
4095     got_peer (peer);
4096   }
4097 }
4098
4099
4100 /**
4101  * Task run during shutdown.
4102  *
4103  * @param cls unused
4104  */
4105 static void
4106 shutdown_task (void *cls)
4107 {
4108   struct ClientContext *client_ctx;
4109   struct ReplyCls *reply_cls;
4110
4111   LOG (GNUNET_ERROR_TYPE_DEBUG,
4112        "RPS is going down\n");
4113
4114   /* Clean all clients */
4115   for (client_ctx = cli_ctx_head;
4116        NULL != cli_ctx_head;
4117        client_ctx = cli_ctx_head)
4118   {
4119     /* Clean pending requests to the sampler */
4120     for (reply_cls = client_ctx->rep_cls_head;
4121          NULL != client_ctx->rep_cls_head;
4122          reply_cls = client_ctx->rep_cls_head)
4123     {
4124       RPS_sampler_request_cancel (reply_cls->req_handle);
4125       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4126                                    client_ctx->rep_cls_tail,
4127                                    reply_cls);
4128       GNUNET_free (reply_cls);
4129     }
4130     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4131     GNUNET_free (client_ctx);
4132   }
4133   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4134   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4135
4136   if (NULL != do_round_task)
4137   {
4138     GNUNET_SCHEDULER_cancel (do_round_task);
4139     do_round_task = NULL;
4140   }
4141
4142   Peers_terminate ();
4143
4144   GNUNET_NSE_disconnect (nse);
4145   RPS_sampler_destroy (prot_sampler);
4146   RPS_sampler_destroy (client_sampler);
4147   GNUNET_CADET_close_port (cadet_port);
4148   GNUNET_CADET_disconnect (cadet_handle);
4149   View_destroy ();
4150   CustomPeerMap_destroy (push_map);
4151   CustomPeerMap_destroy (pull_map);
4152   if (NULL != stats)
4153   {
4154     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4155     stats = NULL;
4156   }
4157   #ifdef ENABLE_MALICIOUS
4158   struct AttackedPeer *tmp_att_peer;
4159   /* it is ok to free this const during shutdown: */
4160   GNUNET_free ((char *) file_name_view_log);
4161   #ifdef TO_FILE
4162   GNUNET_free ((char *) file_name_observed_log);
4163   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
4164   #endif /* TO_FILE */
4165   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4166   if (NULL != mal_peer_set)
4167     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4168   if (NULL != att_peer_set)
4169     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4170   while (NULL != att_peers_head)
4171   {
4172     tmp_att_peer = att_peers_head;
4173     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4174   }
4175   #endif /* ENABLE_MALICIOUS */
4176 }
4177
4178
4179 /**
4180  * Handle client connecting to the service.
4181  *
4182  * @param cls NULL
4183  * @param client the new client
4184  * @param mq the message queue of @a client
4185  * @return @a client
4186  */
4187 static void *
4188 client_connect_cb (void *cls,
4189                    struct GNUNET_SERVICE_Client *client,
4190                    struct GNUNET_MQ_Handle *mq)
4191 {
4192   struct ClientContext *cli_ctx;
4193
4194   LOG (GNUNET_ERROR_TYPE_DEBUG,
4195        "Client connected\n");
4196   if (NULL == client)
4197     return client; /* Server was destroyed before a client connected. Shutting down */
4198   cli_ctx = GNUNET_new (struct ClientContext);
4199   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4200   cli_ctx->view_updates_left = -1;
4201   cli_ctx->client = client;
4202   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4203                                cli_ctx_tail,
4204                                cli_ctx);
4205   return cli_ctx;
4206 }
4207
4208 /**
4209  * Callback called when a client disconnected from the service
4210  *
4211  * @param cls closure for the service
4212  * @param c the client that disconnected
4213  * @param internal_cls should be equal to @a c
4214  */
4215 static void
4216 client_disconnect_cb (void *cls,
4217                       struct GNUNET_SERVICE_Client *client,
4218                       void *internal_cls)
4219 {
4220   struct ClientContext *cli_ctx = internal_cls;
4221
4222   GNUNET_assert (client == cli_ctx->client);
4223   if (NULL == client)
4224   {/* shutdown task - destroy all clients */
4225     while (NULL != cli_ctx_head)
4226       destroy_cli_ctx (cli_ctx_head);
4227   }
4228   else
4229   { /* destroy this client */
4230     LOG (GNUNET_ERROR_TYPE_DEBUG,
4231         "Client disconnected. Destroy its context.\n");
4232     destroy_cli_ctx (cli_ctx);
4233   }
4234 }
4235
4236
4237 /**
4238  * Handle random peer sampling clients.
4239  *
4240  * @param cls closure
4241  * @param c configuration to use
4242  * @param service the initialized service
4243  */
4244 static void
4245 run (void *cls,
4246      const struct GNUNET_CONFIGURATION_Handle *c,
4247      struct GNUNET_SERVICE_Handle *service)
4248 {
4249   char* fn_valid_peers;
4250   struct GNUNET_HashCode port;
4251
4252   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4253   cfg = c;
4254
4255
4256   /* Get own ID */
4257   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4258   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4259               "STARTING SERVICE (rps) for peer [%s]\n",
4260               GNUNET_i2s (&own_identity));
4261   #ifdef ENABLE_MALICIOUS
4262   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4263               "Malicious execution compiled in.\n");
4264   #endif /* ENABLE_MALICIOUS */
4265
4266
4267
4268   /* Get time interval from the configuration */
4269   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4270                                                         "ROUNDINTERVAL",
4271                                                         &round_interval))
4272   {
4273     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4274                                "RPS", "ROUNDINTERVAL");
4275     GNUNET_SCHEDULER_shutdown ();
4276     return;
4277   }
4278
4279   /* Get initial size of sampler/view from the configuration */
4280   if (GNUNET_OK !=
4281       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4282         (long long unsigned int *) &sampler_size_est_min))
4283   {
4284     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4285                                "RPS", "MINSIZE");
4286     GNUNET_SCHEDULER_shutdown ();
4287     return;
4288   }
4289   sampler_size_est_need = sampler_size_est_min;
4290   view_size_est_min = sampler_size_est_min;
4291   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4292
4293   if (GNUNET_OK !=
4294       GNUNET_CONFIGURATION_get_value_filename (cfg,
4295                                                "rps",
4296                                                "FILENAME_VALID_PEERS",
4297                                                &fn_valid_peers))
4298   {
4299     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4300                                "rps", "FILENAME_VALID_PEERS");
4301   }
4302
4303
4304   View_create (view_size_est_min);
4305
4306   /* file_name_view_log */
4307   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4308   #ifdef TO_FILE
4309   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4310   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4311   #endif /* TO_FILE */
4312
4313   /* connect to NSE */
4314   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4315
4316
4317   alpha = 0.45;
4318   beta  = 0.45;
4319
4320
4321   /* Initialise cadet */
4322   /* There exists a copy-paste-clone in get_channel() */
4323   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4324     GNUNET_MQ_hd_fixed_size (peer_check,
4325                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4326                              struct GNUNET_MessageHeader,
4327                              NULL),
4328     GNUNET_MQ_hd_fixed_size (peer_push,
4329                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4330                              struct GNUNET_MessageHeader,
4331                              NULL),
4332     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4333                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4334                              struct GNUNET_MessageHeader,
4335                              NULL),
4336     GNUNET_MQ_hd_var_size (peer_pull_reply,
4337                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4338                            struct GNUNET_RPS_P2P_PullReplyMessage,
4339                            NULL),
4340     GNUNET_MQ_handler_end ()
4341   };
4342
4343   cadet_handle = GNUNET_CADET_connect (cfg);
4344   GNUNET_assert (NULL != cadet_handle);
4345   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4346                       strlen (GNUNET_APPLICATION_PORT_RPS),
4347                       &port);
4348   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4349                                        &port,
4350                                        &Peers_handle_inbound_channel, /* Connect handler */
4351                                        NULL, /* cls */
4352                                        NULL, /* WindowSize handler */
4353                                        cleanup_destroyed_channel, /* Disconnect handler */
4354                                        cadet_handlers);
4355
4356
4357   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4358   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4359   GNUNET_free (fn_valid_peers);
4360
4361   /* Initialise sampler */
4362   struct GNUNET_TIME_Relative half_round_interval;
4363   struct GNUNET_TIME_Relative  max_round_interval;
4364
4365   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4366   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4367
4368   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4369   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4370
4371   /* Initialise push and pull maps */
4372   push_map = CustomPeerMap_create (4);
4373   pull_map = CustomPeerMap_create (4);
4374
4375
4376   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4377   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4378   // TODO send push/pull to each of those peers?
4379   // TODO read stored valid peers from last run
4380   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4381   Peers_get_valid_peers (valid_peers_iterator, NULL);
4382
4383   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4384                                                    GNUNET_NO,
4385                                                    process_peerinfo_peers,
4386                                                    NULL);
4387
4388   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4389
4390   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4391   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4392
4393   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4394   stats = GNUNET_STATISTICS_create ("rps", cfg);
4395
4396 }
4397
4398
4399 /**
4400  * Define "main" method using service macro.
4401  */
4402 GNUNET_SERVICE_MAIN
4403 ("rps",
4404  GNUNET_SERVICE_OPTION_NONE,
4405  &run,
4406  &client_connect_cb,
4407  &client_disconnect_cb,
4408  NULL,
4409  GNUNET_MQ_hd_fixed_size (client_request,
4410    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4411    struct GNUNET_RPS_CS_RequestMessage,
4412    NULL),
4413  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4414    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4415    struct GNUNET_RPS_CS_RequestCancelMessage,
4416    NULL),
4417  GNUNET_MQ_hd_var_size (client_seed,
4418    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4419    struct GNUNET_RPS_CS_SeedMessage,
4420    NULL),
4421 #ifdef ENABLE_MALICIOUS
4422  GNUNET_MQ_hd_var_size (client_act_malicious,
4423    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4424    struct GNUNET_RPS_CS_ActMaliciousMessage,
4425    NULL),
4426 #endif /* ENABLE_MALICIOUS */
4427  GNUNET_MQ_hd_fixed_size (client_view_request,
4428    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4429    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4430    NULL),
4431  GNUNET_MQ_handler_end());
4432
4433 /* end of gnunet-service-rps.c */