add some extra GNS-record well-formedness checks if logging is enabled
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "rps.h"
34 #include "rps-test_util.h"
35 #include "gnunet-service-rps_sampler.h"
36 #include "gnunet-service-rps_custommap.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Handle to the statistics service.
65  */
66 static struct GNUNET_STATISTICS_Handle *stats;
67
68 /**
69  * Our own identity.
70  */
71 static struct GNUNET_PeerIdentity own_identity;
72
73
74
75 /***********************************************************************
76  * Old gnunet-service-rps_peers.c
77 ***********************************************************************/
78
79 /**
80  * Set a peer flag of given peer context.
81  */
82 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
83
84 /**
85  * Get peer flag of given peer context.
86  */
87 #define check_peer_flag_set(peer_ctx, mask)\
88   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
89
90 /**
91  * Unset flag of given peer context.
92  */
93 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
94
95 /**
96  * Set a channel flag of given channel context.
97  */
98 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * Struct used to keep track of other peer's status
164  *
165  * This is stored in a multipeermap.
166  * It contains information such as cadet channels, a message queue for sending,
167  * status about the channels, the pending operations on this peer and some flags
168  * about the status of the peer itself. (live, valid, ...)
169  */
170 struct PeerContext
171 {
172   /**
173    * Message queue open to client
174    */
175   struct GNUNET_MQ_Handle *mq;
176
177   /**
178    * Channel open to client.
179    */
180   struct GNUNET_CADET_Channel *send_channel;
181
182   /**
183    * Flags to the sending channel
184    */
185   uint32_t *send_channel_flags;
186
187   /**
188    * Channel open from client.
189    */
190   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
191
192   /**
193    * Flags to the receiving channel
194    */
195   uint32_t *recv_channel_flags;
196
197   /**
198    * Array of pending operations on this peer.
199    */
200   struct PeerPendingOp *pending_ops;
201
202   /**
203    * Handle to the callback given to cadet_ntfy_tmt_rdy()
204    *
205    * To be canceled on shutdown.
206    */
207   struct PendingMessage *liveliness_check_pending;
208
209   /**
210    * Number of pending operations.
211    */
212   unsigned int num_pending_ops;
213
214   /**
215    * Identity of the peer
216    */
217   struct GNUNET_PeerIdentity peer_id;
218
219   /**
220    * Flags indicating status of peer
221    */
222   uint32_t peer_flags;
223
224   /**
225    * Last time we received something from that peer.
226    */
227   struct GNUNET_TIME_Absolute last_message_recv;
228
229   /**
230    * Last time we received a keepalive message.
231    */
232   struct GNUNET_TIME_Absolute last_keepalive;
233
234   /**
235    * DLL with all messages that are yet to be sent
236    */
237   struct PendingMessage *pending_messages_head;
238   struct PendingMessage *pending_messages_tail;
239
240   /**
241    * This is pobably followed by 'statistical' data (when we first saw
242    * him, how did we get his ID, how many pushes (in a timeinterval),
243    * ...)
244    */
245 };
246
247 /**
248  * @brief Closure to #valid_peer_iterator
249  */
250 struct PeersIteratorCls
251 {
252   /**
253    * Iterator function
254    */
255   PeersIterator iterator;
256
257   /**
258    * Closure to iterator
259    */
260   void *cls;
261 };
262
263 /**
264  * @brief Hashmap of valid peers.
265  */
266 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
267
268 /**
269  * @brief Maximum number of valid peers to keep.
270  * TODO read from config
271  */
272 static uint32_t num_valid_peers_max = UINT32_MAX;
273
274 /**
275  * @brief Filename of the file that stores the valid peers persistently.
276  */
277 static char *filename_valid_peers;
278
279 /**
280  * Set of all peers to keep track of them.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
283
284 /**
285  * Cadet handle.
286  */
287 static struct GNUNET_CADET_Handle *cadet_handle;
288
289
290
291 /**
292  * @brief Get the #PeerContext associated with a peer
293  *
294  * @param peer the peer id
295  *
296  * @return the #PeerContext
297  */
298 static struct PeerContext *
299 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
300 {
301   struct PeerContext *ctx;
302   int ret;
303
304   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
305   GNUNET_assert (GNUNET_YES == ret);
306   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
307   GNUNET_assert (NULL != ctx);
308   return ctx;
309 }
310
311 int
312 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
313
314 /**
315  * @brief Create a new #PeerContext and insert it into the peer map
316  *
317  * @param peer the peer to create the #PeerContext for
318  *
319  * @return the #PeerContext
320  */
321 static struct PeerContext *
322 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
323 {
324   struct PeerContext *ctx;
325   int ret;
326
327   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
328
329   ctx = GNUNET_new (struct PeerContext);
330   ctx->peer_id = *peer;
331   ctx->send_channel_flags = GNUNET_new (uint32_t);
332   ctx->recv_channel_flags = GNUNET_new (uint32_t);
333   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
334       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
335   GNUNET_assert (GNUNET_OK == ret);
336   return ctx;
337 }
338
339
340 /**
341  * @brief Create or get a #PeerContext
342  *
343  * @param peer the peer to get the associated context to
344  *
345  * @return the context
346  */
347 static struct PeerContext *
348 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
349 {
350   if (GNUNET_NO == Peers_check_peer_known (peer))
351   {
352     return create_peer_ctx (peer);
353   }
354   return get_peer_ctx (peer);
355 }
356
357 void
358 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
359
360 void
361 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
362
363 /**
364  * @brief Check whether we have a connection to this @a peer
365  *
366  * Also sets the #Peers_ONLINE flag accordingly
367  *
368  * @param peer the peer in question
369  *
370  * @return #GNUNET_YES if we are connected
371  *         #GNUNET_NO  otherwise
372  */
373 int
374 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
375 {
376   const struct PeerContext *peer_ctx;
377
378   /* If we don't know about this peer we don't know whether it's online */
379   if (GNUNET_NO == Peers_check_peer_known (peer))
380   {
381     return GNUNET_NO;
382   }
383   /* Get the context */
384   peer_ctx = get_peer_ctx (peer);
385   /* If we have no channel to this peer we don't know whether it's online */
386   if ( (NULL == peer_ctx->send_channel) &&
387        (NULL == peer_ctx->recv_channel) )
388   {
389     Peers_unset_peer_flag (peer, Peers_ONLINE);
390     return GNUNET_NO;
391   }
392   /* Otherwise (if we have a channel, we know that it's online */
393   Peers_set_peer_flag (peer, Peers_ONLINE);
394   return GNUNET_YES;
395 }
396
397
398 /**
399  * @brief The closure to #get_rand_peer_iterator.
400  */
401 struct GetRandPeerIteratorCls
402 {
403   /**
404    * @brief The index of the peer to return.
405    * Will be decreased until 0.
406    * Then current peer is returned.
407    */
408   uint32_t index;
409
410   /**
411    * @brief Pointer to peer to return.
412    */
413   const struct GNUNET_PeerIdentity *peer;
414 };
415
416
417 /**
418  * @brief Iterator function for #get_random_peer_from_peermap.
419  *
420  * Implements #GNUNET_CONTAINER_PeerMapIterator.
421  * Decreases the index until the index is null.
422  * Then returns the current peer.
423  *
424  * @param cls the #GetRandPeerIteratorCls containing index and peer
425  * @param peer current peer
426  * @param value unused
427  *
428  * @return  #GNUNET_YES if we should continue to
429  *          iterate,
430  *          #GNUNET_NO if not.
431  */
432 static int
433 get_rand_peer_iterator (void *cls,
434                         const struct GNUNET_PeerIdentity *peer,
435                         void *value)
436 {
437   struct GetRandPeerIteratorCls *iterator_cls = cls;
438   if (0 >= iterator_cls->index)
439   {
440     iterator_cls->peer = peer;
441     return GNUNET_NO;
442   }
443   iterator_cls->index--;
444   return GNUNET_YES;
445 }
446
447
448 /**
449  * @brief Get a random peer from @a peer_map
450  *
451  * @param peer_map the peer_map to get the peer from
452  *
453  * @return a random peer
454  */
455 static const struct GNUNET_PeerIdentity *
456 get_random_peer_from_peermap (const struct
457                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
458 {
459   struct GetRandPeerIteratorCls *iterator_cls;
460   const struct GNUNET_PeerIdentity *ret;
461
462   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
463   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
464       GNUNET_CONTAINER_multipeermap_size (peer_map));
465   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
466                                                 get_rand_peer_iterator,
467                                                 iterator_cls);
468   ret = iterator_cls->peer;
469   GNUNET_free (iterator_cls);
470   return ret;
471 }
472
473
474 /**
475  * @brief Add a given @a peer to valid peers.
476  *
477  * If valid peers are already #num_valid_peers_max, delete a peer previously.
478  *
479  * @param peer the peer that is added to the valid peers.
480  *
481  * @return #GNUNET_YES if no other peer had to be removed
482  *         #GNUNET_NO  otherwise
483  */
484 static int
485 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
486 {
487   const struct GNUNET_PeerIdentity *rand_peer;
488   int ret;
489
490   ret = GNUNET_YES;
491   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
492   {
493     rand_peer = get_random_peer_from_peermap (valid_peers);
494     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
495     ret = GNUNET_NO;
496   }
497   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
498       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
499   return ret;
500 }
501
502
503 /**
504  * @brief Set the peer flag to living and
505  *        call the pending operations on this peer.
506  *
507  * Also adds peer to #valid_peers.
508  *
509  * @param peer_ctx the #PeerContext of the peer to set live
510  */
511 static void
512 set_peer_live (struct PeerContext *peer_ctx)
513 {
514   struct GNUNET_PeerIdentity *peer;
515   unsigned int i;
516
517   peer = &peer_ctx->peer_id;
518   LOG (GNUNET_ERROR_TYPE_DEBUG,
519       "Peer %s is live and valid, calling %i pending operations on it\n",
520       GNUNET_i2s (peer),
521       peer_ctx->num_pending_ops);
522
523   if (NULL != peer_ctx->liveliness_check_pending)
524   {
525     LOG (GNUNET_ERROR_TYPE_DEBUG,
526          "Removing pending liveliness check for peer %s\n",
527          GNUNET_i2s (&peer_ctx->peer_id));
528     // TODO wait until cadet sets mq->cancel_impl
529     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
530     GNUNET_free (peer_ctx->liveliness_check_pending);
531     peer_ctx->liveliness_check_pending = NULL;
532   }
533
534   (void) add_valid_peer (peer);
535   set_peer_flag (peer_ctx, Peers_ONLINE);
536
537   /* Call pending operations */
538   for (i = 0; i < peer_ctx->num_pending_ops; i++)
539   {
540     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
541   }
542   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
543 }
544
545 static void
546 cleanup_destroyed_channel (void *cls,
547                            const struct GNUNET_CADET_Channel *channel);
548
549 /* Declaration of handlers */
550 static void
551 handle_peer_check (void *cls,
552                    const struct GNUNET_MessageHeader *msg);
553
554 static void
555 handle_peer_push (void *cls,
556                   const struct GNUNET_MessageHeader *msg);
557
558 static void
559 handle_peer_pull_request (void *cls,
560                           const struct GNUNET_MessageHeader *msg);
561
562 static int
563 check_peer_pull_reply (void *cls,
564                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
565
566 static void
567 handle_peer_pull_reply (void *cls,
568                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
569
570 /* End declaration of handlers */
571
572
573 /**
574  * @brief Get the channel of a peer. If not existing, create.
575  *
576  * @param peer the peer id
577  * @return the #GNUNET_CADET_Channel used to send data to @a peer
578  */
579 struct GNUNET_CADET_Channel *
580 get_channel (const struct GNUNET_PeerIdentity *peer)
581 {
582   struct PeerContext *peer_ctx;
583   struct GNUNET_HashCode port;
584   struct GNUNET_PeerIdentity *ctx_peer;
585   /* There exists a copy-paste-clone in run() */
586   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
587     GNUNET_MQ_hd_fixed_size (peer_check,
588                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
589                              struct GNUNET_MessageHeader,
590                              NULL),
591     GNUNET_MQ_hd_fixed_size (peer_push,
592                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
593                              struct GNUNET_MessageHeader,
594                              NULL),
595     GNUNET_MQ_hd_fixed_size (peer_pull_request,
596                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
597                              struct GNUNET_MessageHeader,
598                              NULL),
599     GNUNET_MQ_hd_var_size (peer_pull_reply,
600                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
601                            struct GNUNET_RPS_P2P_PullReplyMessage,
602                            NULL),
603     GNUNET_MQ_handler_end ()
604   };
605
606
607   peer_ctx = get_peer_ctx (peer);
608   if (NULL == peer_ctx->send_channel)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Trying to establish channel to peer %s\n",
612          GNUNET_i2s (peer));
613     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
614                         strlen (GNUNET_APPLICATION_PORT_RPS),
615                         &port);
616     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
617     *ctx_peer = *peer;
618     peer_ctx->send_channel =
619       GNUNET_CADET_channel_create (cadet_handle,
620                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
621                                    peer,
622                                    &port,
623                                    GNUNET_CADET_OPTION_RELIABLE,
624                                    NULL, /* WindowSize handler */
625                                    cleanup_destroyed_channel, /* Disconnect handler */
626                                    cadet_handlers);
627   }
628   GNUNET_assert (NULL != peer_ctx->send_channel);
629   return peer_ctx->send_channel;
630 }
631
632
633 /**
634  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
635  *
636  * If we already have a message queue open to this client,
637  * simply return it, otherways create one.
638  *
639  * @param peer the peer to get the mq to
640  * @return the #GNUNET_MQ_Handle
641  */
642 static struct GNUNET_MQ_Handle *
643 get_mq (const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *peer_ctx;
646
647   peer_ctx = get_peer_ctx (peer);
648
649   if (NULL == peer_ctx->mq)
650   {
651     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
652   }
653   return peer_ctx->mq;
654 }
655
656
657 /**
658  * @brief This is called in response to the first message we sent as a
659  * liveliness check.
660  *
661  * @param cls #PeerContext of peer with pending liveliness check
662  */
663 static void
664 mq_liveliness_check_successful (void *cls)
665 {
666   struct PeerContext *peer_ctx = cls;
667
668   if (NULL != peer_ctx->liveliness_check_pending)
669   {
670     LOG (GNUNET_ERROR_TYPE_DEBUG,
671         "Liveliness check for peer %s was successfull\n",
672         GNUNET_i2s (&peer_ctx->peer_id));
673     GNUNET_free (peer_ctx->liveliness_check_pending);
674     peer_ctx->liveliness_check_pending = NULL;
675     set_peer_live (peer_ctx);
676   }
677 }
678
679 /**
680  * Issue a check whether peer is live
681  *
682  * @param peer_ctx the context of the peer
683  */
684 static void
685 check_peer_live (struct PeerContext *peer_ctx)
686 {
687   LOG (GNUNET_ERROR_TYPE_DEBUG,
688        "Get informed about peer %s getting live\n",
689        GNUNET_i2s (&peer_ctx->peer_id));
690
691   struct GNUNET_MQ_Handle *mq;
692   struct GNUNET_MQ_Envelope *ev;
693
694   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
695   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
696   peer_ctx->liveliness_check_pending->ev = ev;
697   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
698   peer_ctx->liveliness_check_pending->type = "Check liveliness";
699   mq = get_mq (&peer_ctx->peer_id);
700   GNUNET_MQ_notify_sent (ev,
701                          mq_liveliness_check_successful,
702                          peer_ctx);
703   GNUNET_MQ_send (mq, ev);
704 }
705
706 /**
707  * @brief Add an envelope to a message passed to mq to list of pending messages
708  *
709  * @param peer peer the message was sent to
710  * @param ev envelope to the message
711  * @param type type of the message to be sent
712  * @return pointer to pending message
713  */
714 static struct PendingMessage *
715 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
716                         struct GNUNET_MQ_Envelope *ev,
717                         const char *type)
718 {
719   struct PendingMessage *pending_msg;
720   struct PeerContext *peer_ctx;
721
722   peer_ctx = get_peer_ctx (peer);
723   pending_msg = GNUNET_new (struct PendingMessage);
724   pending_msg->ev = ev;
725   pending_msg->peer_ctx = peer_ctx;
726   pending_msg->type = type;
727   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
728                                peer_ctx->pending_messages_tail,
729                                pending_msg);
730   return pending_msg;
731 }
732
733
734 /**
735  * @brief Remove a pending message from the respective DLL
736  *
737  * @param pending_msg the pending message to remove
738  * @param cancel cancel the pending message, too
739  */
740 static void
741 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
742 {
743   struct PeerContext *peer_ctx;
744
745   peer_ctx = pending_msg->peer_ctx;
746   GNUNET_assert (NULL != peer_ctx);
747   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
748                                peer_ctx->pending_messages_tail,
749                                pending_msg);
750   // TODO wait for the cadet implementation of message cancellation
751   //if (GNUNET_YES == cancel)
752   //{
753   //  GNUNET_MQ_send_cancel (pending_msg->ev);
754   //}
755   GNUNET_free (pending_msg);
756 }
757
758
759 /**
760  * @brief Check whether function of type #PeerOp was already scheduled
761  *
762  * The array with pending operations will probably never grow really big, so
763  * iterating over it should be ok.
764  *
765  * @param peer the peer to check
766  * @param peer_op the operation (#PeerOp) on the peer
767  *
768  * @return #GNUNET_YES if this operation is scheduled on that peer
769  *         #GNUNET_NO  otherwise
770  */
771 static int
772 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
773                            const PeerOp peer_op)
774 {
775   const struct PeerContext *peer_ctx;
776   unsigned int i;
777
778   peer_ctx = get_peer_ctx (peer);
779   for (i = 0; i < peer_ctx->num_pending_ops; i++)
780     if (peer_op == peer_ctx->pending_ops[i].op)
781       return GNUNET_YES;
782   return GNUNET_NO;
783 }
784
785 int
786 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
787
788 /**
789  * Iterator over hash map entries. Deletes all contexts of peers.
790  *
791  * @param cls closure
792  * @param key current public key
793  * @param value value in the hash map
794  * @return #GNUNET_YES if we should continue to iterate,
795  *         #GNUNET_NO if not.
796  */
797 static int
798 peermap_clear_iterator (void *cls,
799                         const struct GNUNET_PeerIdentity *key,
800                         void *value)
801 {
802   Peers_remove_peer (key);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * @brief This is called once a message is sent.
809  *
810  * Removes the pending message
811  *
812  * @param cls type of the message that was sent
813  */
814 static void
815 mq_notify_sent_cb (void *cls)
816 {
817   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819       "%s was sent.\n",
820       pending_msg->type);
821   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
822     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
823   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
824     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
825   if (0 == strncmp ("PUSH", pending_msg->type, 4))
826     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
827   /* Do not cancle message */
828   remove_pending_message (pending_msg, GNUNET_NO);
829 }
830
831
832 /**
833  * @brief Iterator function for #store_valid_peers.
834  *
835  * Implements #GNUNET_CONTAINER_PeerMapIterator.
836  * Writes single peer to disk.
837  *
838  * @param cls the file handle to write to.
839  * @param peer current peer
840  * @param value unused
841  *
842  * @return  #GNUNET_YES if we should continue to
843  *          iterate,
844  *          #GNUNET_NO if not.
845  */
846 static int
847 store_peer_presistently_iterator (void *cls,
848                                   const struct GNUNET_PeerIdentity *peer,
849                                   void *value)
850 {
851   const struct GNUNET_DISK_FileHandle *fh = cls;
852   char peer_string[128];
853   int size;
854   ssize_t ret;
855
856   if (NULL == peer)
857   {
858     return GNUNET_YES;
859   }
860   size = GNUNET_snprintf (peer_string,
861                           sizeof (peer_string),
862                           "%s\n",
863                           GNUNET_i2s_full (peer));
864   GNUNET_assert (53 == size);
865   ret = GNUNET_DISK_file_write (fh,
866                                 peer_string,
867                                 size);
868   GNUNET_assert (size == ret);
869   return GNUNET_YES;
870 }
871
872
873 /**
874  * @brief Store the peers currently in #valid_peers to disk.
875  */
876 static void
877 store_valid_peers ()
878 {
879   struct GNUNET_DISK_FileHandle *fh;
880   uint32_t number_written_peers;
881   int ret;
882
883   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
884   {
885     return;
886   }
887
888   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
889   if (GNUNET_SYSERR == ret)
890   {
891     LOG (GNUNET_ERROR_TYPE_WARNING,
892         "Not able to create directory for file `%s'\n",
893         filename_valid_peers);
894     GNUNET_break (0);
895   }
896   else if (GNUNET_NO == ret)
897   {
898     LOG (GNUNET_ERROR_TYPE_WARNING,
899         "Directory for file `%s' exists but is not writable for us\n",
900         filename_valid_peers);
901     GNUNET_break (0);
902   }
903   fh = GNUNET_DISK_file_open (filename_valid_peers,
904                               GNUNET_DISK_OPEN_WRITE |
905                                   GNUNET_DISK_OPEN_CREATE,
906                               GNUNET_DISK_PERM_USER_READ |
907                                   GNUNET_DISK_PERM_USER_WRITE);
908   if (NULL == fh)
909   {
910     LOG (GNUNET_ERROR_TYPE_WARNING,
911         "Not able to write valid peers to file `%s'\n",
912         filename_valid_peers);
913     return;
914   }
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916       "Writing %u valid peers to disk\n",
917       GNUNET_CONTAINER_multipeermap_size (valid_peers));
918   number_written_peers =
919     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
920                                            store_peer_presistently_iterator,
921                                            fh);
922   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
923   GNUNET_assert (number_written_peers ==
924       GNUNET_CONTAINER_multipeermap_size (valid_peers));
925 }
926
927
928 /**
929  * @brief Convert string representation of peer id to peer id.
930  *
931  * Counterpart to #GNUNET_i2s_full.
932  *
933  * @param string_repr The string representation of the peer id
934  *
935  * @return The peer id
936  */
937 static const struct GNUNET_PeerIdentity *
938 s2i_full (const char *string_repr)
939 {
940   struct GNUNET_PeerIdentity *peer;
941   size_t len;
942   int ret;
943
944   peer = GNUNET_new (struct GNUNET_PeerIdentity);
945   len = strlen (string_repr);
946   if (52 > len)
947   {
948     LOG (GNUNET_ERROR_TYPE_WARNING,
949         "Not able to convert string representation of PeerID to PeerID\n"
950         "Sting representation: %s (len %lu) - too short\n",
951         string_repr,
952         len);
953     GNUNET_break (0);
954   }
955   else if (52 < len)
956   {
957     len = 52;
958   }
959   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
960                                                     len,
961                                                     &peer->public_key);
962   if (GNUNET_OK != ret)
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965         "Not able to convert string representation of PeerID to PeerID\n"
966         "Sting representation: %s\n",
967         string_repr);
968     GNUNET_break (0);
969   }
970   return peer;
971 }
972
973
974 /**
975  * @brief Restore the peers on disk to #valid_peers.
976  */
977 static void
978 restore_valid_peers ()
979 {
980   off_t file_size;
981   uint32_t num_peers;
982   struct GNUNET_DISK_FileHandle *fh;
983   char *buf;
984   ssize_t size_read;
985   char *iter_buf;
986   char *str_repr;
987   const struct GNUNET_PeerIdentity *peer;
988
989   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
990   {
991     return;
992   }
993
994   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
995   {
996     return;
997   }
998   fh = GNUNET_DISK_file_open (filename_valid_peers,
999                               GNUNET_DISK_OPEN_READ,
1000                               GNUNET_DISK_PERM_NONE);
1001   GNUNET_assert (NULL != fh);
1002   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1003   num_peers = file_size / 53;
1004   buf = GNUNET_malloc (file_size);
1005   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1006   GNUNET_assert (size_read == file_size);
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008       "Restoring %" PRIu32 " peers from file `%s'\n",
1009       num_peers,
1010       filename_valid_peers);
1011   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1012   {
1013     str_repr = GNUNET_strndup (iter_buf, 53);
1014     peer = s2i_full (str_repr);
1015     GNUNET_free (str_repr);
1016     add_valid_peer (peer);
1017     LOG (GNUNET_ERROR_TYPE_DEBUG,
1018         "Restored valid peer %s from disk\n",
1019         GNUNET_i2s_full (peer));
1020   }
1021   iter_buf = NULL;
1022   GNUNET_free (buf);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG,
1024       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1025       num_peers,
1026       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1027   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1028   {
1029     LOG (GNUNET_ERROR_TYPE_WARNING,
1030         "Number of restored peers does not match file size. Have probably duplicates.\n");
1031   }
1032   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1033   LOG (GNUNET_ERROR_TYPE_DEBUG,
1034       "Restored %u valid peers from disk\n",
1035       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1036 }
1037
1038
1039 /**
1040  * @brief Initialise storage of peers
1041  *
1042  * @param fn_valid_peers filename of the file used to store valid peer ids
1043  * @param cadet_h cadet handle
1044  * @param own_id own peer identity
1045  */
1046 void
1047 Peers_initialise (char* fn_valid_peers,
1048                   struct GNUNET_CADET_Handle *cadet_h,
1049                   const struct GNUNET_PeerIdentity *own_id)
1050 {
1051   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1052   cadet_handle = cadet_h;
1053   own_identity = *own_id;
1054   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1055   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1056   restore_valid_peers ();
1057 }
1058
1059
1060 /**
1061  * @brief Delete storage of peers that was created with #Peers_initialise ()
1062  */
1063 void
1064 Peers_terminate ()
1065 {
1066   if (GNUNET_SYSERR ==
1067       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1068                                              peermap_clear_iterator,
1069                                              NULL))
1070   {
1071     LOG (GNUNET_ERROR_TYPE_WARNING,
1072         "Iteration destroying peers was aborted.\n");
1073   }
1074   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1075   peer_map = NULL;
1076   store_valid_peers ();
1077   GNUNET_free (filename_valid_peers);
1078   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1079 }
1080
1081
1082 /**
1083  * Iterator over #valid_peers hash map entries.
1084  *
1085  * @param cls closure - unused
1086  * @param peer current peer id
1087  * @param value value in the hash map - unused
1088  * @return #GNUNET_YES if we should continue to
1089  *         iterate,
1090  *         #GNUNET_NO if not.
1091  */
1092 static int
1093 valid_peer_iterator (void *cls,
1094                      const struct GNUNET_PeerIdentity *peer,
1095                      void *value)
1096 {
1097   struct PeersIteratorCls *it_cls = cls;
1098
1099   return it_cls->iterator (it_cls->cls,
1100                            peer);
1101 }
1102
1103
1104 /**
1105  * @brief Get all currently known, valid peer ids.
1106  *
1107  * @param it function to call on each peer id
1108  * @param it_cls extra argument to @a it
1109  * @return the number of key value pairs processed,
1110  *         #GNUNET_SYSERR if it aborted iteration
1111  */
1112 int
1113 Peers_get_valid_peers (PeersIterator iterator,
1114                        void *it_cls)
1115 {
1116   struct PeersIteratorCls *cls;
1117   int ret;
1118
1119   cls = GNUNET_new (struct PeersIteratorCls);
1120   cls->iterator = iterator;
1121   cls->cls = it_cls;
1122   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1123                                                valid_peer_iterator,
1124                                                cls);
1125   GNUNET_free (cls);
1126   return ret;
1127 }
1128
1129
1130 /**
1131  * @brief Add peer to known peers.
1132  *
1133  * This function is called on new peer_ids from 'external' sources
1134  * (client seed, cadet get_peers(), ...)
1135  *
1136  * @param peer the new #GNUNET_PeerIdentity
1137  *
1138  * @return #GNUNET_YES if peer was inserted
1139  *         #GNUNET_NO  otherwise (if peer was already known or
1140  *                     peer was #own_identity)
1141  */
1142 int
1143 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1144 {
1145   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1146        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1147   {
1148     return GNUNET_NO; /* We already know this peer - nothing to do */
1149   }
1150   (void) create_peer_ctx (peer);
1151   return GNUNET_YES;
1152 }
1153
1154 int
1155 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1156
1157 /**
1158  * @brief Try connecting to a peer to see whether it is online
1159  *
1160  * If not known yet, insert into known peers
1161  *
1162  * @param peer the peer whose liveliness is to be checked
1163  * @return #GNUNET_YES if peer had to be inserted
1164  *         #GNUNET_NO  otherwise (if peer was already known or
1165  *                     peer was #own_identity)
1166  */
1167 int
1168 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1169 {
1170   struct PeerContext *peer_ctx;
1171   int ret;
1172
1173   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1174   {
1175     return GNUNET_NO;
1176   }
1177   ret = Peers_insert_peer (peer);
1178   peer_ctx = get_peer_ctx (peer);
1179   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1180   {
1181     check_peer_live (peer_ctx);
1182   }
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * @brief Check if peer is removable.
1189  *
1190  * Check if
1191  *  - a recv channel exists
1192  *  - there are pending messages
1193  *  - there is no pending pull reply
1194  *
1195  * @param peer the peer in question
1196  * @return #GNUNET_YES    if peer is removable
1197  *         #GNUNET_NO     if peer is NOT removable
1198  *         #GNUNET_SYSERR if peer is not known
1199  */
1200 int
1201 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1202 {
1203   struct PeerContext *peer_ctx;
1204
1205   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1206   {
1207     return GNUNET_SYSERR;
1208   }
1209
1210   peer_ctx = get_peer_ctx (peer);
1211   if ( (NULL != peer_ctx->recv_channel) ||
1212        (NULL != peer_ctx->pending_messages_head) ||
1213        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1214   {
1215     return GNUNET_NO;
1216   }
1217   return GNUNET_YES;
1218 }
1219
1220 uint32_t *
1221 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1222                         enum Peers_ChannelRole role);
1223
1224 int
1225 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1226
1227 /**
1228  * @brief Remove peer
1229  *
1230  * @param peer the peer to clean
1231  * @return #GNUNET_YES if peer was removed
1232  *         #GNUNET_NO  otherwise
1233  */
1234 int
1235 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1236 {
1237   struct PeerContext *peer_ctx;
1238   uint32_t *channel_flag;
1239
1240   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1241   {
1242     return GNUNET_NO;
1243   }
1244
1245   peer_ctx = get_peer_ctx (peer);
1246   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Going to remove peer %s\n",
1249        GNUNET_i2s (&peer_ctx->peer_id));
1250   Peers_unset_peer_flag (peer, Peers_ONLINE);
1251
1252   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1253   while (NULL != peer_ctx->pending_messages_head)
1254   {
1255     LOG (GNUNET_ERROR_TYPE_DEBUG,
1256         "Removing unsent %s\n",
1257         peer_ctx->pending_messages_head->type);
1258     /* Cancle pending message, too */
1259     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1260   }
1261   /* If we are still waiting for notification whether this peer is live
1262    * cancel the according task */
1263   if (NULL != peer_ctx->liveliness_check_pending)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266          "Removing pending liveliness check for peer %s\n",
1267          GNUNET_i2s (&peer_ctx->peer_id));
1268     // TODO wait until cadet sets mq->cancel_impl
1269     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1270     GNUNET_free (peer_ctx->liveliness_check_pending);
1271     peer_ctx->liveliness_check_pending = NULL;
1272   }
1273   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1274   if (NULL != peer_ctx->send_channel &&
1275       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1276   {
1277     LOG (GNUNET_ERROR_TYPE_DEBUG,
1278         "Destroying send channel\n");
1279     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1280     peer_ctx->send_channel = NULL;
1281     peer_ctx->mq = NULL;
1282   }
1283   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1284   if (NULL != peer_ctx->recv_channel &&
1285       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288         "Destroying recv channel\n");
1289     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1290     peer_ctx->recv_channel = NULL;
1291   }
1292
1293   GNUNET_free (peer_ctx->send_channel_flags);
1294   GNUNET_free (peer_ctx->recv_channel_flags);
1295
1296   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1297   {
1298     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1299   }
1300   GNUNET_free (peer_ctx);
1301   return GNUNET_YES;
1302 }
1303
1304
1305 /**
1306  * @brief set flags on a given peer.
1307  *
1308  * @param peer the peer to set flags on
1309  * @param flags the flags
1310  */
1311 void
1312 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1313 {
1314   struct PeerContext *peer_ctx;
1315
1316   peer_ctx = get_peer_ctx (peer);
1317   set_peer_flag (peer_ctx, flags);
1318 }
1319
1320
1321 /**
1322  * @brief unset flags on a given peer.
1323  *
1324  * @param peer the peer to unset flags on
1325  * @param flags the flags
1326  */
1327 void
1328 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1329 {
1330   struct PeerContext *peer_ctx;
1331
1332   peer_ctx = get_peer_ctx (peer);
1333   unset_peer_flag (peer_ctx, flags);
1334 }
1335
1336
1337 /**
1338  * @brief Check whether flags on a peer are set.
1339  *
1340  * @param peer the peer to check the flag of
1341  * @param flags the flags to check
1342  *
1343  * @return #GNUNET_SYSERR if peer is not known
1344  *         #GNUNET_YES    if all given flags are set
1345  *         #GNUNET_NO     otherwise
1346  */
1347 int
1348 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1349 {
1350   struct PeerContext *peer_ctx;
1351
1352   if (GNUNET_NO == Peers_check_peer_known (peer))
1353   {
1354     return GNUNET_SYSERR;
1355   }
1356   peer_ctx = get_peer_ctx (peer);
1357   return check_peer_flag_set (peer_ctx, flags);
1358 }
1359
1360
1361 /**
1362  * @brief set flags on a given channel.
1363  *
1364  * @param channel the channel to set flags on
1365  * @param flags the flags
1366  */
1367 void
1368 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1369 {
1370   set_channel_flag (channel_flags, flags);
1371 }
1372
1373
1374 /**
1375  * @brief unset flags on a given channel.
1376  *
1377  * @param channel the channel to unset flags on
1378  * @param flags the flags
1379  */
1380 void
1381 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1382 {
1383   unset_channel_flag (channel_flags, flags);
1384 }
1385
1386
1387 /**
1388  * @brief Check whether flags on a channel are set.
1389  *
1390  * @param channel the channel to check the flag of
1391  * @param flags the flags to check
1392  *
1393  * @return #GNUNET_YES if all given flags are set
1394  *         #GNUNET_NO  otherwise
1395  */
1396 int
1397 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1398 {
1399   return check_channel_flag_set (channel_flags, flags);
1400 }
1401
1402 /**
1403  * @brief Get the flags for the channel in @a role for @a peer.
1404  *
1405  * @param peer Peer to get the channel flags for.
1406  * @param role Role of channel to get flags for
1407  *
1408  * @return The flags.
1409  */
1410 uint32_t *
1411 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1412                         enum Peers_ChannelRole role)
1413 {
1414   const struct PeerContext *peer_ctx;
1415
1416   peer_ctx = get_peer_ctx (peer);
1417   if (Peers_CHANNEL_ROLE_SENDING == role)
1418   {
1419     return peer_ctx->send_channel_flags;
1420   }
1421   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1422   {
1423     return peer_ctx->recv_channel_flags;
1424   }
1425   else
1426   {
1427     GNUNET_assert (0);
1428   }
1429 }
1430
1431 /**
1432  * @brief Check whether we have information about the given peer.
1433  *
1434  * FIXME probably deprecated. Make this the new _online.
1435  *
1436  * @param peer peer in question
1437  *
1438  * @return #GNUNET_YES if peer is known
1439  *         #GNUNET_NO  if peer is not knwon
1440  */
1441 int
1442 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1443 {
1444   if (NULL != peer_map)
1445   {
1446     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1447   } else
1448   {
1449     return GNUNET_NO;
1450   }
1451 }
1452
1453
1454 /**
1455  * @brief Check whether @a peer is actually a peer.
1456  *
1457  * A valid peer is a peer that we know exists eg. we were connected to once.
1458  *
1459  * @param peer peer in question
1460  *
1461  * @return #GNUNET_YES if peer is valid
1462  *         #GNUNET_NO  if peer is not valid
1463  */
1464 int
1465 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1466 {
1467   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1468 }
1469
1470
1471 /**
1472  * @brief Indicate that we want to send to the other peer
1473  *
1474  * This establishes a sending channel
1475  *
1476  * @param peer the peer to establish channel to
1477  */
1478 void
1479 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1480 {
1481   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1482   (void) get_channel (peer);
1483 }
1484
1485
1486 /**
1487  * @brief Check whether other peer has the intention to send/opened channel
1488  *        towars us
1489  *
1490  * @param peer the peer in question
1491  *
1492  * @return #GNUNET_YES if peer has the intention to send
1493  *         #GNUNET_NO  otherwise
1494  */
1495 int
1496 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1497 {
1498   const struct PeerContext *peer_ctx;
1499
1500   peer_ctx = get_peer_ctx (peer);
1501   if (NULL != peer_ctx->recv_channel)
1502   {
1503     return GNUNET_YES;
1504   }
1505   return GNUNET_NO;
1506 }
1507
1508
1509 /**
1510  * Handle the channel a peer opens to us.
1511  *
1512  * @param cls The closure
1513  * @param channel The channel the peer wants to establish
1514  * @param initiator The peer's peer ID
1515  *
1516  * @return initial channel context for the channel
1517  *         (can be NULL -- that's not an error)
1518  */
1519 void *
1520 Peers_handle_inbound_channel (void *cls,
1521                               struct GNUNET_CADET_Channel *channel,
1522                               const struct GNUNET_PeerIdentity *initiator)
1523 {
1524   struct PeerContext *peer_ctx;
1525   struct GNUNET_PeerIdentity *ctx_peer;
1526
1527   LOG (GNUNET_ERROR_TYPE_DEBUG,
1528       "New channel was established to us (Peer %s).\n",
1529       GNUNET_i2s (initiator));
1530   GNUNET_assert (NULL != channel); /* according to cadet API */
1531   /* Make sure we 'know' about this peer */
1532   peer_ctx = create_or_get_peer_ctx (initiator);
1533   set_peer_live (peer_ctx);
1534   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1535   *ctx_peer = *initiator;
1536   /* We only accept one incoming channel per peer */
1537   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1538   {
1539     set_channel_flag (peer_ctx->recv_channel_flags,
1540                       Peers_CHANNEL_ESTABLISHED_TWICE);
1541     //GNUNET_CADET_channel_destroy (channel);
1542     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1543     peer_ctx->recv_channel = channel;
1544     /* return the channel context */
1545     return ctx_peer;
1546   }
1547   peer_ctx->recv_channel = channel;
1548   return ctx_peer;
1549 }
1550
1551
1552 /**
1553  * @brief Check whether a sending channel towards the given peer exists
1554  *
1555  * @param peer the peer to check for
1556  *
1557  * @return #GNUNET_YES if a sending channel towards that peer exists
1558  *         #GNUNET_NO  otherwise
1559  */
1560 int
1561 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1562 {
1563   struct PeerContext *peer_ctx;
1564
1565   if (GNUNET_NO == Peers_check_peer_known (peer))
1566   { /* If no such peer exists, there is no channel */
1567     return GNUNET_NO;
1568   }
1569   peer_ctx = get_peer_ctx (peer);
1570   if (NULL == peer_ctx->send_channel)
1571   {
1572     return GNUNET_NO;
1573   }
1574   return GNUNET_YES;
1575 }
1576
1577
1578 /**
1579  * @brief check whether the given channel is the sending channel of the given
1580  *        peer
1581  *
1582  * @param peer the peer in question
1583  * @param channel the channel to check for
1584  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1585  *                    #Peers_CHANNEL_ROLE_RECEIVING
1586  *
1587  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1588  *         #GNUNET_NO  otherwise
1589  */
1590 int
1591 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1592                           const struct GNUNET_CADET_Channel *channel,
1593                           enum Peers_ChannelRole role)
1594 {
1595   const struct PeerContext *peer_ctx;
1596
1597   if (GNUNET_NO == Peers_check_peer_known (peer))
1598   {
1599     return GNUNET_NO;
1600   }
1601   peer_ctx = get_peer_ctx (peer);
1602   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1603        (channel == peer_ctx->send_channel) )
1604   {
1605     return GNUNET_YES;
1606   }
1607   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1608        (channel == peer_ctx->recv_channel) )
1609   {
1610     return GNUNET_YES;
1611   }
1612   return GNUNET_NO;
1613 }
1614
1615
1616 /**
1617  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618  *        intention to another peer
1619  *
1620  * If there is also no channel to receive messages from that peer, remove it
1621  * from the peermap.
1622  * TODO really?
1623  *
1624  * @peer the peer identity of the peer whose sending channel to destroy
1625  * @return #GNUNET_YES if channel was destroyed
1626  *         #GNUNET_NO  otherwise
1627  */
1628 int
1629 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1630 {
1631   struct PeerContext *peer_ctx;
1632
1633   if (GNUNET_NO == Peers_check_peer_known (peer))
1634   {
1635     return GNUNET_NO;
1636   }
1637   peer_ctx = get_peer_ctx (peer);
1638   if (NULL != peer_ctx->send_channel)
1639   {
1640     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1641     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1642     peer_ctx->send_channel = NULL;
1643     peer_ctx->mq = NULL;
1644     (void) Peers_check_connected (peer);
1645     return GNUNET_YES;
1646   }
1647   return GNUNET_NO;
1648 }
1649
1650 /**
1651  * This is called when a channel is destroyed.
1652  *
1653  * @param cls The closure
1654  * @param channel The channel being closed
1655  */
1656 void
1657 Peers_cleanup_destroyed_channel (void *cls,
1658                                  const struct GNUNET_CADET_Channel *channel)
1659 {
1660   struct GNUNET_PeerIdentity *peer = cls;
1661   struct PeerContext *peer_ctx;
1662
1663   if (GNUNET_NO == Peers_check_peer_known (peer))
1664   {/* We don't want to implicitly create a context that we're about to kill */
1665   LOG (GNUNET_ERROR_TYPE_DEBUG,
1666        "channel (%s) without associated context was destroyed\n",
1667        GNUNET_i2s (peer));
1668     return;
1669   }
1670   peer_ctx = get_peer_ctx (peer);
1671
1672   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1673    * flag will be set. In this case simply make sure that the channels are
1674    * cleaned. */
1675   /* FIXME This distinction seems to be redundant */
1676   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1677   {/* We initiatad the destruction of this particular peer */
1678     LOG (GNUNET_ERROR_TYPE_DEBUG,
1679         "Peer is in the process of being destroyed\n");
1680     if (channel == peer_ctx->send_channel)
1681     {
1682       peer_ctx->send_channel = NULL;
1683       peer_ctx->mq = NULL;
1684     }
1685     else if (channel == peer_ctx->recv_channel)
1686     {
1687       peer_ctx->recv_channel = NULL;
1688     }
1689
1690     if (NULL != peer_ctx->send_channel)
1691     {
1692       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1693       peer_ctx->send_channel = NULL;
1694       peer_ctx->mq = NULL;
1695     }
1696     if (NULL != peer_ctx->recv_channel)
1697     {
1698       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1699       peer_ctx->recv_channel = NULL;
1700     }
1701     /* Set the #Peers_ONLINE flag accordingly */
1702     (void) Peers_check_connected (peer);
1703     return;
1704   }
1705
1706   else
1707   { /* We did not initiate the destruction of this peer */
1708     LOG (GNUNET_ERROR_TYPE_DEBUG,
1709         "Peer is NOT in the process of being destroyed\n");
1710     if (channel == peer_ctx->send_channel)
1711     { /* Something (but us) killd the channel - clean up peer */
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713           "send channel (%s) was destroyed - cleaning up\n",
1714           GNUNET_i2s (peer));
1715       peer_ctx->send_channel = NULL;
1716       peer_ctx->mq = NULL;
1717     }
1718     else if (channel == peer_ctx->recv_channel)
1719     { /* Other peer doesn't want to send us messages anymore */
1720       LOG (GNUNET_ERROR_TYPE_DEBUG,
1721            "Peer %s destroyed recv channel - cleaning up channel\n",
1722            GNUNET_i2s (peer));
1723       peer_ctx->recv_channel = NULL;
1724     }
1725     else
1726     {
1727       LOG (GNUNET_ERROR_TYPE_WARNING,
1728            "unknown channel (%s) was destroyed\n",
1729            GNUNET_i2s (peer));
1730     }
1731   }
1732   (void) Peers_check_connected (peer);
1733 }
1734
1735 /**
1736  * @brief Send a message to another peer.
1737  *
1738  * Keeps track about pending messages so they can be properly removed when the
1739  * peer is destroyed.
1740  *
1741  * @param peer receeiver of the message
1742  * @param ev envelope of the message
1743  * @param type type of the message
1744  */
1745 void
1746 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1747                     struct GNUNET_MQ_Envelope *ev,
1748                     const char *type)
1749 {
1750   struct PendingMessage *pending_msg;
1751   struct GNUNET_MQ_Handle *mq;
1752
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "Sending message to %s of type %s\n",
1755               GNUNET_i2s (peer),
1756               type);
1757   pending_msg = insert_pending_message (peer, ev, type);
1758   mq = get_mq (peer);
1759   GNUNET_MQ_notify_sent (ev,
1760                          mq_notify_sent_cb,
1761                          pending_msg);
1762   GNUNET_MQ_send (mq, ev);
1763 }
1764
1765 /**
1766  * @brief Schedule a operation on given peer
1767  *
1768  * Avoids scheduling an operation twice.
1769  *
1770  * @param peer the peer we want to schedule the operation for once it gets live
1771  *
1772  * @return #GNUNET_YES if the operation was scheduled
1773  *         #GNUNET_NO  otherwise
1774  */
1775 int
1776 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1777                           const PeerOp peer_op)
1778 {
1779   struct PeerPendingOp pending_op;
1780   struct PeerContext *peer_ctx;
1781
1782   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1783   {
1784     return GNUNET_NO;
1785   }
1786   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1787
1788   //TODO if LIVE/ONLINE execute immediately
1789
1790   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1791   {
1792     peer_ctx = get_peer_ctx (peer);
1793     pending_op.op = peer_op;
1794     pending_op.op_cls = NULL;
1795     GNUNET_array_append (peer_ctx->pending_ops,
1796                          peer_ctx->num_pending_ops,
1797                          pending_op);
1798     return GNUNET_YES;
1799   }
1800   return GNUNET_NO;
1801 }
1802
1803 /**
1804  * @brief Get the recv_channel of @a peer.
1805  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1806  * messages.
1807  *
1808  * @param peer The peer to get the recv_channel from.
1809  *
1810  * @return The recv_channel.
1811  */
1812 struct GNUNET_CADET_Channel *
1813 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1814 {
1815   struct PeerContext *peer_ctx;
1816
1817   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1818   peer_ctx = get_peer_ctx (peer);
1819   return peer_ctx->recv_channel;
1820 }
1821 /***********************************************************************
1822  * /Old gnunet-service-rps_peers.c
1823 ***********************************************************************/
1824
1825
1826 /***********************************************************************
1827  * Housekeeping with clients
1828 ***********************************************************************/
1829
1830 /**
1831  * Closure used to pass the client and the id to the callback
1832  * that replies to a client's request
1833  */
1834 struct ReplyCls
1835 {
1836   /**
1837    * DLL
1838    */
1839   struct ReplyCls *next;
1840   struct ReplyCls *prev;
1841
1842   /**
1843    * The identifier of the request
1844    */
1845   uint32_t id;
1846
1847   /**
1848    * The handle to the request
1849    */
1850   struct RPS_SamplerRequestHandle *req_handle;
1851
1852   /**
1853    * The client handle to send the reply to
1854    */
1855   struct ClientContext *cli_ctx;
1856 };
1857
1858
1859 /**
1860  * Struct used to store the context of a connected client.
1861  */
1862 struct ClientContext
1863 {
1864   /**
1865    * DLL
1866    */
1867   struct ClientContext *next;
1868   struct ClientContext *prev;
1869
1870   /**
1871    * The message queue to communicate with the client.
1872    */
1873   struct GNUNET_MQ_Handle *mq;
1874
1875   /**
1876    * DLL with handles to single requests from the client
1877    */
1878   struct ReplyCls *rep_cls_head;
1879   struct ReplyCls *rep_cls_tail;
1880
1881   /**
1882    * @brief How many updates this client expects to receive.
1883    */
1884   int64_t view_updates_left;
1885
1886   /**
1887    * The client handle to send the reply to
1888    */
1889   struct GNUNET_SERVICE_Client *client;
1890 };
1891
1892 /**
1893  * DLL with all clients currently connected to us
1894  */
1895 struct ClientContext *cli_ctx_head;
1896 struct ClientContext *cli_ctx_tail;
1897
1898 /***********************************************************************
1899  * /Housekeeping with clients
1900 ***********************************************************************/
1901
1902
1903
1904
1905
1906 /***********************************************************************
1907  * Globals
1908 ***********************************************************************/
1909
1910 /**
1911  * Sampler used for the Brahms protocol itself.
1912  */
1913 static struct RPS_Sampler *prot_sampler;
1914
1915 /**
1916  * Sampler used for the clients.
1917  */
1918 static struct RPS_Sampler *client_sampler;
1919
1920 /**
1921  * Name to log view to
1922  */
1923 static const char *file_name_view_log;
1924
1925 #ifdef TO_FILE
1926 /**
1927  * Name to log number of observed peers to
1928  */
1929 static const char *file_name_observed_log;
1930
1931 /**
1932  * @brief Count the observed peers
1933  */
1934 static uint32_t num_observed_peers;
1935
1936 /**
1937  * @brief Multipeermap (ab-) used to count unique peer_ids
1938  */
1939 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1940 #endif /* TO_FILE */
1941
1942 /**
1943  * The size of sampler we need to be able to satisfy the client's need
1944  * of random peers.
1945  */
1946 static unsigned int sampler_size_client_need;
1947
1948 /**
1949  * The size of sampler we need to be able to satisfy the Brahms protocol's
1950  * need of random peers.
1951  *
1952  * This is one minimum size the sampler grows to.
1953  */
1954 static unsigned int sampler_size_est_need;
1955
1956 /**
1957  * @brief This is the minimum estimate used as sampler size.
1958  *
1959  * It is configured by the user.
1960  */
1961 static unsigned int sampler_size_est_min;
1962
1963 /**
1964  * @brief This is the estimate used as view size.
1965  *
1966  * It is initialised with the minimum
1967  */
1968 static unsigned int view_size_est_need;
1969
1970 /**
1971  * @brief This is the minimum estimate used as view size.
1972  *
1973  * It is configured by the user.
1974  */
1975 static unsigned int view_size_est_min;
1976
1977 /**
1978  * Percentage of total peer number in the view
1979  * to send random PUSHes to
1980  */
1981 static float alpha;
1982
1983 /**
1984  * Percentage of total peer number in the view
1985  * to send random PULLs to
1986  */
1987 static float beta;
1988
1989 /**
1990  * Identifier for the main task that runs periodically.
1991  */
1992 static struct GNUNET_SCHEDULER_Task *do_round_task;
1993
1994 /**
1995  * Time inverval the do_round task runs in.
1996  */
1997 static struct GNUNET_TIME_Relative round_interval;
1998
1999 /**
2000  * List to store peers received through pushes temporary.
2001  */
2002 static struct CustomPeerMap *push_map;
2003
2004 /**
2005  * List to store peers received through pulls temporary.
2006  */
2007 static struct CustomPeerMap *pull_map;
2008
2009 /**
2010  * Handler to NSE.
2011  */
2012 static struct GNUNET_NSE_Handle *nse;
2013
2014 /**
2015  * Handler to CADET.
2016  */
2017 static struct GNUNET_CADET_Handle *cadet_handle;
2018
2019 /**
2020  * @brief Port to communicate to other peers.
2021  */
2022 static struct GNUNET_CADET_Port *cadet_port;
2023
2024 /**
2025  * Handler to PEERINFO.
2026  */
2027 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2028
2029 /**
2030  * Handle for cancellation of iteration over peers.
2031  */
2032 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2033
2034 /**
2035  * Request counter.
2036  *
2037  * Counts how many requets clients already issued.
2038  * Only needed in the beginning to check how many of the 64 deltas
2039  * we already have
2040  */
2041 static unsigned int req_counter;
2042
2043 /**
2044  * Time of the last request we received.
2045  *
2046  * Used to compute the expected request rate.
2047  */
2048 static struct GNUNET_TIME_Absolute last_request;
2049
2050 /**
2051  * Size of #request_deltas.
2052  */
2053 #define REQUEST_DELTAS_SIZE 64
2054 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2055
2056 /**
2057  * Last 64 deltas between requests
2058  */
2059 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2060
2061 /**
2062  * The prediction of the rate of requests
2063  */
2064 static struct GNUNET_TIME_Relative request_rate;
2065
2066
2067 #ifdef ENABLE_MALICIOUS
2068 /**
2069  * Type of malicious peer
2070  *
2071  * 0 Don't act malicious at all - Default
2072  * 1 Try to maximise representation
2073  * 2 Try to partition the network
2074  * 3 Combined attack
2075  */
2076 static uint32_t mal_type;
2077
2078 /**
2079  * Other malicious peers
2080  */
2081 static struct GNUNET_PeerIdentity *mal_peers;
2082
2083 /**
2084  * Hashmap of malicious peers used as set.
2085  * Used to more efficiently check whether we know that peer.
2086  */
2087 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2088
2089 /**
2090  * Number of other malicious peers
2091  */
2092 static uint32_t num_mal_peers;
2093
2094
2095 /**
2096  * If type is 2 This struct is used to store the attacked peers in a DLL
2097  */
2098 struct AttackedPeer
2099 {
2100   /**
2101    * DLL
2102    */
2103   struct AttackedPeer *next;
2104   struct AttackedPeer *prev;
2105
2106   /**
2107    * PeerID
2108    */
2109   struct GNUNET_PeerIdentity peer_id;
2110 };
2111
2112 /**
2113  * If type is 2 this is the DLL of attacked peers
2114  */
2115 static struct AttackedPeer *att_peers_head;
2116 static struct AttackedPeer *att_peers_tail;
2117
2118 /**
2119  * This index is used to point to an attacked peer to
2120  * implement the round-robin-ish way to select attacked peers.
2121  */
2122 static struct AttackedPeer *att_peer_index;
2123
2124 /**
2125  * Hashmap of attacked peers used as set.
2126  * Used to more efficiently check whether we know that peer.
2127  */
2128 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2129
2130 /**
2131  * Number of attacked peers
2132  */
2133 static uint32_t num_attacked_peers;
2134
2135 /**
2136  * If type is 1 this is the attacked peer
2137  */
2138 static struct GNUNET_PeerIdentity attacked_peer;
2139
2140 /**
2141  * The limit of PUSHes we can send in one round.
2142  * This is an assumption of the Brahms protocol and either implemented
2143  * via proof of work
2144  * or
2145  * assumend to be the bandwidth limitation.
2146  */
2147 static uint32_t push_limit = 10000;
2148 #endif /* ENABLE_MALICIOUS */
2149
2150
2151 /***********************************************************************
2152  * /Globals
2153 ***********************************************************************/
2154
2155
2156 /***********************************************************************
2157  * Util functions
2158 ***********************************************************************/
2159
2160
2161 /**
2162  * Print peerlist to log.
2163  */
2164 static void
2165 print_peer_list (struct GNUNET_PeerIdentity *list,
2166                  unsigned int len)
2167 {
2168   unsigned int i;
2169
2170   LOG (GNUNET_ERROR_TYPE_DEBUG,
2171        "Printing peer list of length %u at %p:\n",
2172        len,
2173        list);
2174   for (i = 0 ; i < len ; i++)
2175   {
2176     LOG (GNUNET_ERROR_TYPE_DEBUG,
2177          "%u. peer: %s\n",
2178          i, GNUNET_i2s (&list[i]));
2179   }
2180 }
2181
2182
2183 /**
2184  * Remove peer from list.
2185  */
2186 static void
2187 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2188                unsigned int *list_size,
2189                const struct GNUNET_PeerIdentity *peer)
2190 {
2191   unsigned int i;
2192   struct GNUNET_PeerIdentity *tmp;
2193
2194   tmp = *peer_list;
2195
2196   LOG (GNUNET_ERROR_TYPE_DEBUG,
2197        "Removing peer %s from list at %p\n",
2198        GNUNET_i2s (peer),
2199        tmp);
2200
2201   for ( i = 0 ; i < *list_size ; i++ )
2202   {
2203     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2204     {
2205       if (i < *list_size -1)
2206       { /* Not at the last entry -- shift peers left */
2207         memmove (&tmp[i], &tmp[i +1],
2208                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2209       }
2210       /* Remove last entry (should be now useless PeerID) */
2211       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2212     }
2213   }
2214   *peer_list = tmp;
2215 }
2216
2217
2218 /**
2219  * Sum all time relatives of an array.
2220  */
2221 static struct GNUNET_TIME_Relative
2222 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2223                 uint32_t arr_size)
2224 {
2225   struct GNUNET_TIME_Relative sum;
2226   uint32_t i;
2227
2228   sum = GNUNET_TIME_UNIT_ZERO;
2229   for ( i = 0 ; i < arr_size ; i++ )
2230   {
2231     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2232   }
2233   return sum;
2234 }
2235
2236
2237 /**
2238  * Compute the average of given time relatives.
2239  */
2240 static struct GNUNET_TIME_Relative
2241 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2242                 uint32_t arr_size)
2243 {
2244   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2245                                                       arr_size),
2246                                       arr_size);
2247 }
2248
2249
2250 /**
2251  * Insert PeerID in #view
2252  *
2253  * Called once we know a peer is live.
2254  * Implements #PeerOp
2255  *
2256  * @return GNUNET_OK if peer was actually inserted
2257  *         GNUNET_NO if peer was not inserted
2258  */
2259 static void
2260 insert_in_view_op (void *cls,
2261                 const struct GNUNET_PeerIdentity *peer);
2262
2263 /**
2264  * Insert PeerID in #view
2265  *
2266  * Called once we know a peer is live.
2267  *
2268  * @return GNUNET_OK if peer was actually inserted
2269  *         GNUNET_NO if peer was not inserted
2270  */
2271 static int
2272 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2273 {
2274   int online;
2275
2276   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2277   if ( (GNUNET_NO == online) ||
2278        (GNUNET_SYSERR == online) ) /* peer is not even known */
2279   {
2280     (void) Peers_issue_peer_liveliness_check (peer);
2281     (void) Peers_schedule_operation (peer, insert_in_view_op);
2282     return GNUNET_NO;
2283   }
2284   /* Open channel towards peer to keep connection open */
2285   Peers_indicate_sending_intention (peer);
2286   return View_put (peer);
2287 }
2288
2289 /**
2290  * @brief sends updates to clients that are interested
2291  */
2292 static void
2293 clients_notify_view_update (void);
2294
2295 /**
2296  * Put random peer from sampler into the view as history update.
2297  */
2298 static void
2299 hist_update (void *cls,
2300              struct GNUNET_PeerIdentity *ids,
2301              uint32_t num_peers)
2302 {
2303   unsigned int i;
2304
2305   for (i = 0; i < num_peers; i++)
2306   {
2307     (void) insert_in_view (&ids[i]);
2308     to_file (file_name_view_log,
2309              "+%s\t(hist)",
2310              GNUNET_i2s_full (ids));
2311   }
2312   clients_notify_view_update();
2313 }
2314
2315
2316 /**
2317  * Wrapper around #RPS_sampler_resize()
2318  *
2319  * If we do not have enough sampler elements, double current sampler size
2320  * If we have more than enough sampler elements, halv current sampler size
2321  */
2322 static void
2323 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2324 {
2325   unsigned int sampler_size;
2326
2327   // TODO statistics
2328   // TODO respect the min, max
2329   sampler_size = RPS_sampler_get_size (sampler);
2330   if (sampler_size > new_size * 4)
2331   { /* Shrinking */
2332     RPS_sampler_resize (sampler, sampler_size / 2);
2333   }
2334   else if (sampler_size < new_size)
2335   { /* Growing */
2336     RPS_sampler_resize (sampler, sampler_size * 2);
2337   }
2338   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2339 }
2340
2341
2342 /**
2343  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2344  */
2345 static void
2346 client_resize_wrapper ()
2347 {
2348   uint32_t bigger_size;
2349
2350   // TODO statistics
2351
2352   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2353
2354   // TODO respect the min, max
2355   resize_wrapper (client_sampler, bigger_size);
2356   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2357       bigger_size);
2358 }
2359
2360
2361 /**
2362  * Estimate request rate
2363  *
2364  * Called every time we receive a request from the client.
2365  */
2366 static void
2367 est_request_rate()
2368 {
2369   struct GNUNET_TIME_Relative max_round_duration;
2370
2371   if (request_deltas_size > req_counter)
2372     req_counter++;
2373   if ( 1 < req_counter)
2374   {
2375     /* Shift last request deltas to the right */
2376     memmove (&request_deltas[1],
2377         request_deltas,
2378         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2379
2380     /* Add current delta to beginning */
2381     request_deltas[0] =
2382         GNUNET_TIME_absolute_get_difference (last_request,
2383                                              GNUNET_TIME_absolute_get ());
2384     request_rate = T_relative_avg (request_deltas, req_counter);
2385     request_rate = (request_rate.rel_value_us < 1) ?
2386       GNUNET_TIME_relative_get_unit_ () : request_rate;
2387
2388     /* Compute the duration a round will maximally take */
2389     max_round_duration =
2390         GNUNET_TIME_relative_add (round_interval,
2391                                   GNUNET_TIME_relative_divide (round_interval, 2));
2392
2393     /* Set the estimated size the sampler has to have to
2394      * satisfy the current client request rate */
2395     sampler_size_client_need =
2396         max_round_duration.rel_value_us / request_rate.rel_value_us;
2397
2398     /* Resize the sampler */
2399     client_resize_wrapper ();
2400   }
2401   last_request = GNUNET_TIME_absolute_get ();
2402 }
2403
2404
2405 /**
2406  * Add all peers in @a peer_array to @a peer_map used as set.
2407  *
2408  * @param peer_array array containing the peers
2409  * @param num_peers number of peers in @peer_array
2410  * @param peer_map the peermap to use as set
2411  */
2412 static void
2413 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2414                        unsigned int num_peers,
2415                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2416 {
2417   unsigned int i;
2418   if (NULL == peer_map)
2419   {
2420     LOG (GNUNET_ERROR_TYPE_WARNING,
2421          "Trying to add peers to non-existing peermap.\n");
2422     return;
2423   }
2424
2425   for (i = 0; i < num_peers; i++)
2426   {
2427     GNUNET_CONTAINER_multipeermap_put (peer_map,
2428                                        &peer_array[i],
2429                                        NULL,
2430                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2431   }
2432 }
2433
2434
2435 /**
2436  * Send a PULL REPLY to @a peer_id
2437  *
2438  * @param peer_id the peer to send the reply to.
2439  * @param peer_ids the peers to send to @a peer_id
2440  * @param num_peer_ids the number of peers to send to @a peer_id
2441  */
2442 static void
2443 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2444                  const struct GNUNET_PeerIdentity *peer_ids,
2445                  unsigned int num_peer_ids)
2446 {
2447   uint32_t send_size;
2448   struct GNUNET_MQ_Envelope *ev;
2449   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2450
2451   /* Compute actual size */
2452   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2453               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2454
2455   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2456     /* Compute number of peers to send
2457      * If too long, simply truncate */
2458     // TODO select random ones via permutation
2459     //      or even better: do good protocol design
2460     send_size =
2461       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2462        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2463        sizeof (struct GNUNET_PeerIdentity);
2464   else
2465     send_size = num_peer_ids;
2466
2467   LOG (GNUNET_ERROR_TYPE_DEBUG,
2468       "Going to send PULL REPLY with %u peers to %s\n",
2469       send_size, GNUNET_i2s (peer_id));
2470
2471   ev = GNUNET_MQ_msg_extra (out_msg,
2472                             send_size * sizeof (struct GNUNET_PeerIdentity),
2473                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2474   out_msg->num_peers = htonl (send_size);
2475   GNUNET_memcpy (&out_msg[1], peer_ids,
2476          send_size * sizeof (struct GNUNET_PeerIdentity));
2477
2478   Peers_send_message (peer_id, ev, "PULL REPLY");
2479   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2480 }
2481
2482
2483 /**
2484  * Insert PeerID in #pull_map
2485  *
2486  * Called once we know a peer is live.
2487  */
2488 static void
2489 insert_in_pull_map (void *cls,
2490                     const struct GNUNET_PeerIdentity *peer)
2491 {
2492   CustomPeerMap_put (pull_map, peer);
2493 }
2494
2495
2496 /**
2497  * Insert PeerID in #view
2498  *
2499  * Called once we know a peer is live.
2500  * Implements #PeerOp
2501  */
2502 static void
2503 insert_in_view_op (void *cls,
2504                 const struct GNUNET_PeerIdentity *peer)
2505 {
2506   (void) insert_in_view (peer);
2507 }
2508
2509
2510 /**
2511  * Update sampler with given PeerID.
2512  * Implements #PeerOp
2513  */
2514 static void
2515 insert_in_sampler (void *cls,
2516                    const struct GNUNET_PeerIdentity *peer)
2517 {
2518   LOG (GNUNET_ERROR_TYPE_DEBUG,
2519        "Updating samplers with peer %s from insert_in_sampler()\n",
2520        GNUNET_i2s (peer));
2521   RPS_sampler_update (prot_sampler,   peer);
2522   RPS_sampler_update (client_sampler, peer);
2523   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2524   {
2525     /* Make sure we 'know' about this peer */
2526     (void) Peers_issue_peer_liveliness_check (peer);
2527     /* Establish a channel towards that peer to indicate we are going to send
2528      * messages to it */
2529     //Peers_indicate_sending_intention (peer);
2530   }
2531   #ifdef TO_FILE
2532   num_observed_peers++;
2533   GNUNET_CONTAINER_multipeermap_put
2534     (observed_unique_peers,
2535      peer,
2536      NULL,
2537      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2538   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2539       observed_unique_peers);
2540   to_file (file_name_observed_log,
2541           "%" PRIu32 " %" PRIu32 " %f\n",
2542           num_observed_peers,
2543           num_observed_unique_peers,
2544           1.0*num_observed_unique_peers/num_observed_peers)
2545   #endif /* TO_FILE */
2546 }
2547
2548 /**
2549  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2550  *        If the peer is not known, liveliness check is issued and it is
2551  *        scheduled to be inserted in sampler and view.
2552  *
2553  * "External sources" refer to every source except the gossip.
2554  *
2555  * @param peer peer to insert
2556  */
2557 static void
2558 got_peer (const struct GNUNET_PeerIdentity *peer)
2559 {
2560   /* If we did not know this peer already, insert it into sampler and view */
2561   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2562   {
2563     Peers_schedule_operation (peer, insert_in_sampler);
2564     Peers_schedule_operation (peer, insert_in_view_op);
2565   }
2566 }
2567
2568 /**
2569  * @brief Checks if there is a sending channel and if it is needed
2570  *
2571  * @param peer the peer whose sending channel is checked
2572  * @return GNUNET_YES if sending channel exists and is still needed
2573  *         GNUNET_NO  otherwise
2574  */
2575 static int
2576 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2577 {
2578   /* struct GNUNET_CADET_Channel *channel; */
2579   if (GNUNET_NO == Peers_check_peer_known (peer))
2580   {
2581     return GNUNET_NO;
2582   }
2583   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2584   {
2585     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2586          (GNUNET_YES == View_contains_peer (peer)) ||
2587          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2588          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2589          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2590     { /* If we want to keep the connection to peer open */
2591       return GNUNET_YES;
2592     }
2593     return GNUNET_NO;
2594   }
2595   return GNUNET_NO;
2596 }
2597
2598 /**
2599  * @brief remove peer from our knowledge, the view, push and pull maps and
2600  * samplers.
2601  *
2602  * @param peer the peer to remove
2603  */
2604 static void
2605 remove_peer (const struct GNUNET_PeerIdentity *peer)
2606 {
2607   (void) View_remove_peer (peer);
2608   CustomPeerMap_remove_peer (pull_map, peer);
2609   CustomPeerMap_remove_peer (push_map, peer);
2610   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2611   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2612   Peers_remove_peer (peer);
2613 }
2614
2615
2616 /**
2617  * @brief Remove data that is not needed anymore.
2618  *
2619  * If the sending channel is no longer needed it is destroyed.
2620  *
2621  * @param peer the peer whose data is about to be cleaned
2622  */
2623 static void
2624 clean_peer (const struct GNUNET_PeerIdentity *peer)
2625 {
2626   if (GNUNET_NO == check_sending_channel_needed (peer))
2627   {
2628     LOG (GNUNET_ERROR_TYPE_DEBUG,
2629         "Going to remove send channel to peer %s\n",
2630         GNUNET_i2s (peer));
2631     #ifdef ENABLE_MALICIOUS
2632     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2633       (void) Peers_destroy_sending_channel (peer);
2634     #else /* ENABLE_MALICIOUS */
2635     (void) Peers_destroy_sending_channel (peer);
2636     #endif /* ENABLE_MALICIOUS */
2637   }
2638
2639   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2640        (GNUNET_NO == View_contains_peer (peer)) &&
2641        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2642        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2643        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2644        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2645        (GNUNET_NO != Peers_check_removable (peer)) )
2646   { /* We can safely remove this peer */
2647     LOG (GNUNET_ERROR_TYPE_DEBUG,
2648         "Going to remove peer %s\n",
2649         GNUNET_i2s (peer));
2650     remove_peer (peer);
2651     return;
2652   }
2653 }
2654
2655 /**
2656  * @brief This is called when a channel is destroyed.
2657  *
2658  * Removes peer completely from our knowledge if the send_channel was destroyed
2659  * Otherwise simply delete the recv_channel
2660  * Also check if the knowledge about this peer is still needed.
2661  * If not, remove this peer from our knowledge.
2662  *
2663  * @param cls The closure
2664  * @param channel The channel being closed
2665  * @param channel_ctx The context associated with this channel
2666  */
2667 static void
2668 cleanup_destroyed_channel (void *cls,
2669                            const struct GNUNET_CADET_Channel *channel)
2670 {
2671   struct GNUNET_PeerIdentity *peer = cls;
2672   uint32_t *channel_flag;
2673   struct PeerContext *peer_ctx;
2674
2675   GNUNET_assert (NULL != peer);
2676
2677   if (GNUNET_NO == Peers_check_peer_known (peer))
2678   { /* We don't know a context to that peer */
2679     LOG (GNUNET_ERROR_TYPE_WARNING,
2680          "channel (%s) without associated context was destroyed\n",
2681          GNUNET_i2s (peer));
2682     GNUNET_free (peer);
2683     return;
2684   }
2685
2686   peer_ctx = get_peer_ctx (peer);
2687   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2688   {
2689     LOG (GNUNET_ERROR_TYPE_DEBUG,
2690         "Callback on destruction of recv-channel was called (%s)\n",
2691         GNUNET_i2s (peer));
2692     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2693   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2694   {
2695     LOG (GNUNET_ERROR_TYPE_DEBUG,
2696         "Callback on destruction of send-channel was called (%s)\n",
2697         GNUNET_i2s (peer));
2698     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2699   } else {
2700     LOG (GNUNET_ERROR_TYPE_ERROR,
2701         "Channel to be destroyed has is neither sending nor receiving role\n");
2702   }
2703
2704   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2705   { /* We are in the middle of removing that peer from our knowledge. In this
2706        case simply make sure that the channels are cleaned. */
2707     Peers_cleanup_destroyed_channel (cls, channel);
2708     to_file (file_name_view_log,
2709              "-%s\t(cleanup channel, ourself)",
2710              GNUNET_i2s_full (peer));
2711     GNUNET_free (peer);
2712     return;
2713   }
2714
2715   if (GNUNET_YES ==
2716       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2717   { /* Channel used for sending was destroyed */
2718     /* Possible causes of channel destruction:
2719      *  - ourselves  -> cleaning send channel -> clean context
2720      *  - other peer -> peer probably went down -> remove
2721      */
2722     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2723     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2724     { /* We are about to clean the sending channel. Clean the respective
2725        * context */
2726       Peers_cleanup_destroyed_channel (cls, channel);
2727       GNUNET_free (peer);
2728       return;
2729     }
2730     else
2731     { /* Other peer destroyed our sending channel that he is supposed to keep
2732        * open. It probably went down. Remove it from our knowledge. */
2733       Peers_cleanup_destroyed_channel (cls, channel);
2734       remove_peer (peer);
2735       GNUNET_free (peer);
2736       return;
2737     }
2738   }
2739   else if (GNUNET_YES ==
2740       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2741   { /* Channel used for receiving was destroyed */
2742     /* Possible causes of channel destruction:
2743      *  - ourselves  -> peer tried to establish channel twice -> clean context
2744      *  - other peer -> peer doesn't want to send us data -> clean
2745      */
2746     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2747     if (GNUNET_YES ==
2748         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2749     { /* Other peer tried to establish a channel to us twice. We do not accept
2750        * that. Clean the context. */
2751       Peers_cleanup_destroyed_channel (cls, channel);
2752       GNUNET_free (peer);
2753       return;
2754     }
2755     else
2756     { /* Other peer doesn't want to send us data anymore. We are free to clean
2757        * it. */
2758       Peers_cleanup_destroyed_channel (cls, channel);
2759       clean_peer (peer);
2760       GNUNET_free (peer);
2761       return;
2762     }
2763   }
2764   else
2765   {
2766     LOG (GNUNET_ERROR_TYPE_WARNING,
2767         "Destroyed channel is neither sending nor receiving channel\n");
2768   }
2769   GNUNET_free (peer);
2770 }
2771
2772 /***********************************************************************
2773  * /Util functions
2774 ***********************************************************************/
2775
2776 static void
2777 destroy_reply_cls (struct ReplyCls *rep_cls)
2778 {
2779   struct ClientContext *cli_ctx;
2780
2781   cli_ctx = rep_cls->cli_ctx;
2782   GNUNET_assert (NULL != cli_ctx);
2783   if (NULL != rep_cls->req_handle)
2784   {
2785     RPS_sampler_request_cancel (rep_cls->req_handle);
2786   }
2787   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2788                                cli_ctx->rep_cls_tail,
2789                                rep_cls);
2790   GNUNET_free (rep_cls);
2791 }
2792
2793
2794 static void
2795 destroy_cli_ctx (struct ClientContext *cli_ctx)
2796 {
2797   GNUNET_assert (NULL != cli_ctx);
2798   if (NULL != cli_ctx->rep_cls_head)
2799   {
2800     LOG (GNUNET_ERROR_TYPE_WARNING,
2801          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2802     while (NULL != cli_ctx->rep_cls_head)
2803       destroy_reply_cls (cli_ctx->rep_cls_head);
2804   }
2805   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2806                                cli_ctx_tail,
2807                                cli_ctx);
2808   GNUNET_free (cli_ctx);
2809 }
2810
2811
2812 /**
2813  * Function called by NSE.
2814  *
2815  * Updates sizes of sampler list and view and adapt those lists
2816  * accordingly.
2817  */
2818 static void
2819 nse_callback (void *cls,
2820               struct GNUNET_TIME_Absolute timestamp,
2821               double logestimate, double std_dev)
2822 {
2823   double estimate;
2824   //double scale; // TODO this might go gloabal/config
2825
2826   LOG (GNUNET_ERROR_TYPE_DEBUG,
2827        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2828        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2829   //scale = .01;
2830   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2831   // GNUNET_NSE_log_estimate_to_n (logestimate);
2832   estimate = pow (estimate, 1.0 / 3);
2833   // TODO add if std_dev is a number
2834   // estimate += (std_dev * scale);
2835   if (view_size_est_min < ceil (estimate))
2836   {
2837     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2838     sampler_size_est_need = estimate;
2839     view_size_est_need = estimate;
2840   } else
2841   {
2842     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2843     //sampler_size_est_need = view_size_est_min;
2844     view_size_est_need = view_size_est_min;
2845   }
2846
2847   /* If the NSE has changed adapt the lists accordingly */
2848   resize_wrapper (prot_sampler, sampler_size_est_need);
2849   client_resize_wrapper ();
2850 }
2851
2852
2853 /**
2854  * Callback called once the requested PeerIDs are ready.
2855  *
2856  * Sends those to the requesting client.
2857  */
2858 static void
2859 client_respond (void *cls,
2860                 struct GNUNET_PeerIdentity *peer_ids,
2861                 uint32_t num_peers)
2862 {
2863   struct ReplyCls *reply_cls = cls;
2864   uint32_t i;
2865   struct GNUNET_MQ_Envelope *ev;
2866   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2867   uint32_t size_needed;
2868   struct ClientContext *cli_ctx;
2869
2870   GNUNET_assert (NULL != reply_cls);
2871   LOG (GNUNET_ERROR_TYPE_DEBUG,
2872        "sampler returned %" PRIu32 " peers:\n",
2873        num_peers);
2874   for (i = 0; i < num_peers; i++)
2875   {
2876     LOG (GNUNET_ERROR_TYPE_DEBUG,
2877          "  %" PRIu32 ": %s\n",
2878          i,
2879          GNUNET_i2s (&peer_ids[i]));
2880   }
2881
2882   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2883                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2884
2885   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2886
2887   ev = GNUNET_MQ_msg_extra (out_msg,
2888                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2889                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2890   out_msg->num_peers = htonl (num_peers);
2891   out_msg->id = htonl (reply_cls->id);
2892
2893   GNUNET_memcpy (&out_msg[1],
2894           peer_ids,
2895           num_peers * sizeof (struct GNUNET_PeerIdentity));
2896   GNUNET_free (peer_ids);
2897
2898   cli_ctx = reply_cls->cli_ctx;
2899   GNUNET_assert (NULL != cli_ctx);
2900   reply_cls->req_handle = NULL;
2901   destroy_reply_cls (reply_cls);
2902   GNUNET_MQ_send (cli_ctx->mq, ev);
2903 }
2904
2905
2906 /**
2907  * Handle RPS request from the client.
2908  *
2909  * @param cls closure
2910  * @param message the actual message
2911  */
2912 static void
2913 handle_client_request (void *cls,
2914                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2915 {
2916   struct ClientContext *cli_ctx = cls;
2917   uint32_t num_peers;
2918   uint32_t size_needed;
2919   struct ReplyCls *reply_cls;
2920   uint32_t i;
2921
2922   num_peers = ntohl (msg->num_peers);
2923   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2924                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2925
2926   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2927   {
2928     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2929                 "Message received from client has size larger than expected\n");
2930     GNUNET_SERVICE_client_drop (cli_ctx->client);
2931     return;
2932   }
2933
2934   for (i = 0 ; i < num_peers ; i++)
2935     est_request_rate();
2936
2937   LOG (GNUNET_ERROR_TYPE_DEBUG,
2938        "Client requested %" PRIu32 " random peer(s).\n",
2939        num_peers);
2940
2941   reply_cls = GNUNET_new (struct ReplyCls);
2942   reply_cls->id = ntohl (msg->id);
2943   reply_cls->cli_ctx = cli_ctx;
2944   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2945                                                         client_respond,
2946                                                         reply_cls,
2947                                                         num_peers);
2948
2949   GNUNET_assert (NULL != cli_ctx);
2950   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2951                                cli_ctx->rep_cls_tail,
2952                                reply_cls);
2953   GNUNET_SERVICE_client_continue (cli_ctx->client);
2954 }
2955
2956
2957 /**
2958  * @brief Handle a message that requests the cancellation of a request
2959  *
2960  * @param cls unused
2961  * @param message the message containing the id of the request
2962  */
2963 static void
2964 handle_client_request_cancel (void *cls,
2965                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2966 {
2967   struct ClientContext *cli_ctx = cls;
2968   struct ReplyCls *rep_cls;
2969
2970   GNUNET_assert (NULL != cli_ctx);
2971   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2972   rep_cls = cli_ctx->rep_cls_head;
2973   LOG (GNUNET_ERROR_TYPE_DEBUG,
2974       "Client cancels request with id %" PRIu32 "\n",
2975       ntohl (msg->id));
2976   while ( (NULL != rep_cls->next) &&
2977           (rep_cls->id != ntohl (msg->id)) )
2978     rep_cls = rep_cls->next;
2979   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2980   destroy_reply_cls (rep_cls);
2981   GNUNET_SERVICE_client_continue (cli_ctx->client);
2982 }
2983
2984
2985 /**
2986  * @brief This function is called, when the client seeds peers.
2987  * It verifies that @a msg is well-formed.
2988  *
2989  * @param cls the closure (#ClientContext)
2990  * @param msg the message
2991  * @return #GNUNET_OK if @a msg is well-formed
2992  */
2993 static int
2994 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2995 {
2996   struct ClientContext *cli_ctx = cls;
2997   uint16_t msize = ntohs (msg->header.size);
2998   uint32_t num_peers = ntohl (msg->num_peers);
2999
3000   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3001   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3002        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3003   {
3004     GNUNET_break (0);
3005     GNUNET_SERVICE_client_drop (cli_ctx->client);
3006     return GNUNET_SYSERR;
3007   }
3008   return GNUNET_OK;
3009 }
3010
3011
3012 /**
3013  * Handle seed from the client.
3014  *
3015  * @param cls closure
3016  * @param message the actual message
3017  */
3018 static void
3019 handle_client_seed (void *cls,
3020                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3021 {
3022   struct ClientContext *cli_ctx = cls;
3023   struct GNUNET_PeerIdentity *peers;
3024   uint32_t num_peers;
3025   uint32_t i;
3026
3027   num_peers = ntohl (msg->num_peers);
3028   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3029   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3030   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
3031
3032   LOG (GNUNET_ERROR_TYPE_DEBUG,
3033        "Client seeded peers:\n");
3034   print_peer_list (peers, num_peers);
3035
3036   for (i = 0; i < num_peers; i++)
3037   {
3038     LOG (GNUNET_ERROR_TYPE_DEBUG,
3039          "Updating samplers with seed %" PRIu32 ": %s\n",
3040          i,
3041          GNUNET_i2s (&peers[i]));
3042
3043     got_peer (&peers[i]);
3044   }
3045
3046   ////GNUNET_free (peers);
3047
3048   GNUNET_SERVICE_client_continue (cli_ctx->client);
3049 }
3050
3051 /**
3052  * @brief Send view to client
3053  *
3054  * @param cli_ctx the context of the client
3055  * @param view_array the peerids of the view as array (can be empty)
3056  * @param view_size the size of the view array (can be 0)
3057  */
3058 void
3059 send_view (const struct ClientContext *cli_ctx,
3060            const struct GNUNET_PeerIdentity *view_array,
3061            uint64_t view_size)
3062 {
3063   struct GNUNET_MQ_Envelope *ev;
3064   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3065
3066   if (NULL == view_array)
3067   {
3068     view_size = View_size ();
3069     view_array = View_get_as_array();
3070   }
3071
3072   ev = GNUNET_MQ_msg_extra (out_msg,
3073                             view_size * sizeof (struct GNUNET_PeerIdentity),
3074                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3075   out_msg->num_peers = htonl (view_size);
3076
3077   GNUNET_memcpy (&out_msg[1],
3078           view_array,
3079           view_size * sizeof (struct GNUNET_PeerIdentity));
3080   GNUNET_MQ_send (cli_ctx->mq, ev);
3081 }
3082
3083 /**
3084  * @brief sends updates to clients that are interested
3085  */
3086 static void
3087 clients_notify_view_update (void)
3088 {
3089   struct ClientContext *cli_ctx_iter;
3090   uint64_t num_peers;
3091   const struct GNUNET_PeerIdentity *view_array;
3092
3093   num_peers = View_size ();
3094   view_array = View_get_as_array();
3095   /* check size of view is small enough */
3096   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3097   {
3098     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3099                 "View is too big to send\n");
3100     return;
3101   }
3102
3103   for (cli_ctx_iter = cli_ctx_head;
3104        NULL != cli_ctx_iter;
3105        cli_ctx_iter = cli_ctx_head->next)
3106   {
3107     if (1 < cli_ctx_iter->view_updates_left)
3108     {
3109       /* Client wants to receive limited amount of updates */
3110       cli_ctx_iter->view_updates_left -= 1;
3111     } else if (1 == cli_ctx_iter->view_updates_left)
3112     {
3113       /* Last update of view for client */
3114       cli_ctx_iter->view_updates_left = -1;
3115     } else if (0 > cli_ctx_iter->view_updates_left) {
3116       /* Client is not interested in updates */
3117       continue;
3118     }
3119     /* else _updates_left == 0 - infinite amount of updates */
3120
3121     /* send view */
3122     send_view (cli_ctx_iter, view_array, num_peers);
3123   }
3124 }
3125
3126
3127 /**
3128  * Handle RPS request from the client.
3129  *
3130  * @param cls closure
3131  * @param message the actual message
3132  */
3133 static void
3134 handle_client_view_request (void *cls,
3135                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3136 {
3137   struct ClientContext *cli_ctx = cls;
3138   uint64_t num_updates;
3139
3140   num_updates = ntohl (msg->num_updates);
3141
3142   LOG (GNUNET_ERROR_TYPE_DEBUG,
3143        "Client requested %" PRIu64 " updates of view.\n",
3144        num_updates);
3145
3146   GNUNET_assert (NULL != cli_ctx);
3147   cli_ctx->view_updates_left = num_updates;
3148   send_view (cli_ctx, NULL, 0);
3149   GNUNET_SERVICE_client_continue (cli_ctx->client);
3150 }
3151
3152 /**
3153  * Handle a CHECK_LIVE message from another peer.
3154  *
3155  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3156  * the channel is blocked for all other communication.
3157  *
3158  * @param cls Closure
3159  * @param msg The message header
3160  */
3161 static void
3162 handle_peer_check (void *cls,
3163                    const struct GNUNET_MessageHeader *msg)
3164 {
3165   const struct GNUNET_PeerIdentity *peer = cls;
3166   LOG (GNUNET_ERROR_TYPE_DEBUG,
3167       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3168
3169   GNUNET_break_op (Peers_check_peer_known (peer));
3170   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3171 }
3172
3173 /**
3174  * Handle a PUSH message from another peer.
3175  *
3176  * Check the proof of work and store the PeerID
3177  * in the temporary list for pushed PeerIDs.
3178  *
3179  * @param cls Closure
3180  * @param msg The message header
3181  */
3182 static void
3183 handle_peer_push (void *cls,
3184                   const struct GNUNET_MessageHeader *msg)
3185 {
3186   const struct GNUNET_PeerIdentity *peer = cls;
3187
3188   // (check the proof of work (?))
3189
3190   LOG (GNUNET_ERROR_TYPE_DEBUG,
3191        "Received PUSH (%s)\n",
3192        GNUNET_i2s (peer));
3193   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3194
3195   #ifdef ENABLE_MALICIOUS
3196   struct AttackedPeer *tmp_att_peer;
3197
3198   if ( (1 == mal_type) ||
3199        (3 == mal_type) )
3200   { /* Try to maximise representation */
3201     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3202     tmp_att_peer->peer_id = *peer;
3203     if (NULL == att_peer_set)
3204       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3205     if (GNUNET_NO ==
3206         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3207                                                 peer))
3208     {
3209       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3210                                    att_peers_tail,
3211                                    tmp_att_peer);
3212       add_peer_array_to_set (peer, 1, att_peer_set);
3213     }
3214   }
3215
3216
3217   else if (2 == mal_type)
3218   {
3219     /* We attack one single well-known peer - simply ignore */
3220   }
3221   #endif /* ENABLE_MALICIOUS */
3222
3223   /* Add the sending peer to the push_map */
3224   CustomPeerMap_put (push_map, peer);
3225
3226   GNUNET_break_op (Peers_check_peer_known (peer));
3227   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3228 }
3229
3230
3231 /**
3232  * Handle PULL REQUEST request message from another peer.
3233  *
3234  * Reply with the view of PeerIDs.
3235  *
3236  * @param cls Closure
3237  * @param msg The message header
3238  */
3239 static void
3240 handle_peer_pull_request (void *cls,
3241                           const struct GNUNET_MessageHeader *msg)
3242 {
3243   struct GNUNET_PeerIdentity *peer = cls;
3244   const struct GNUNET_PeerIdentity *view_array;
3245
3246   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3247   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3248
3249   #ifdef ENABLE_MALICIOUS
3250   if (1 == mal_type
3251       || 3 == mal_type)
3252   { /* Try to maximise representation */
3253     send_pull_reply (peer, mal_peers, num_mal_peers);
3254   }
3255
3256   else if (2 == mal_type)
3257   { /* Try to partition network */
3258     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3259     {
3260       send_pull_reply (peer, mal_peers, num_mal_peers);
3261     }
3262   }
3263   #endif /* ENABLE_MALICIOUS */
3264
3265   GNUNET_break_op (Peers_check_peer_known (peer));
3266   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3267   view_array = View_get_as_array ();
3268   send_pull_reply (peer, view_array, View_size ());
3269 }
3270
3271
3272 /**
3273  * Check whether we sent a corresponding request and
3274  * whether this reply is the first one.
3275  *
3276  * @param cls Closure
3277  * @param msg The message header
3278  */
3279 static int
3280 check_peer_pull_reply (void *cls,
3281                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3282 {
3283   struct GNUNET_PeerIdentity *sender = cls;
3284
3285   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3286   {
3287     GNUNET_break_op (0);
3288     return GNUNET_SYSERR;
3289   }
3290
3291   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3292       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3293   {
3294     LOG (GNUNET_ERROR_TYPE_ERROR,
3295         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3296         ntohl (msg->num_peers),
3297         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3298             sizeof (struct GNUNET_PeerIdentity));
3299     GNUNET_break_op (0);
3300     return GNUNET_SYSERR;
3301   }
3302
3303   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3304   {
3305     LOG (GNUNET_ERROR_TYPE_WARNING,
3306         "Received a pull reply from a peer we didn't request one from!\n");
3307     GNUNET_break_op (0);
3308     return GNUNET_SYSERR;
3309   }
3310   return GNUNET_OK;
3311 }
3312
3313 /**
3314  * Handle PULL REPLY message from another peer.
3315  *
3316  * @param cls Closure
3317  * @param msg The message header
3318  */
3319 static void
3320 handle_peer_pull_reply (void *cls,
3321                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3322 {
3323   const struct GNUNET_PeerIdentity *peers;
3324   struct GNUNET_PeerIdentity *sender = cls;
3325   uint32_t i;
3326 #ifdef ENABLE_MALICIOUS
3327   struct AttackedPeer *tmp_att_peer;
3328 #endif /* ENABLE_MALICIOUS */
3329
3330   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3331   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3332
3333   #ifdef ENABLE_MALICIOUS
3334   // We shouldn't even receive pull replies as we're not sending
3335   if (2 == mal_type)
3336   {
3337   }
3338   #endif /* ENABLE_MALICIOUS */
3339
3340   /* Do actual logic */
3341   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3342
3343   LOG (GNUNET_ERROR_TYPE_DEBUG,
3344        "PULL REPLY received, got following %u peers:\n",
3345        ntohl (msg->num_peers));
3346
3347   for (i = 0; i < ntohl (msg->num_peers); i++)
3348   {
3349     LOG (GNUNET_ERROR_TYPE_DEBUG,
3350          "%u. %s\n",
3351          i,
3352          GNUNET_i2s (&peers[i]));
3353
3354     #ifdef ENABLE_MALICIOUS
3355     if ((NULL != att_peer_set) &&
3356         (1 == mal_type || 3 == mal_type))
3357     { /* Add attacked peer to local list */
3358       // TODO check if we sent a request and this was the first reply
3359       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3360                                                                &peers[i])
3361           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3362                                                                   &peers[i])
3363           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3364                                                    &own_identity))
3365       {
3366         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3367         tmp_att_peer->peer_id = peers[i];
3368         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3369                                      att_peers_tail,
3370                                      tmp_att_peer);
3371         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3372       }
3373       continue;
3374     }
3375     #endif /* ENABLE_MALICIOUS */
3376     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3377                                               &peers[i]))
3378     {
3379       /* Make sure we 'know' about this peer */
3380       (void) Peers_insert_peer (&peers[i]);
3381
3382       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3383       {
3384         CustomPeerMap_put (pull_map, &peers[i]);
3385       }
3386       else
3387       {
3388         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3389         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3390       }
3391     }
3392   }
3393
3394   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3395   clean_peer (sender);
3396
3397   GNUNET_break_op (Peers_check_peer_known (sender));
3398   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3399 }
3400
3401
3402 /**
3403  * Compute a random delay.
3404  * A uniformly distributed value between mean + spread and mean - spread.
3405  *
3406  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3407  * It would return a random value between 2 and 6 min.
3408  *
3409  * @param mean the mean
3410  * @param spread the inverse amount of deviation from the mean
3411  */
3412 static struct GNUNET_TIME_Relative
3413 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3414                     unsigned int spread)
3415 {
3416   struct GNUNET_TIME_Relative half_interval;
3417   struct GNUNET_TIME_Relative ret;
3418   unsigned int rand_delay;
3419   unsigned int max_rand_delay;
3420
3421   if (0 == spread)
3422   {
3423     LOG (GNUNET_ERROR_TYPE_WARNING,
3424          "Not accepting spread of 0\n");
3425     GNUNET_break (0);
3426     GNUNET_assert (0);
3427   }
3428   GNUNET_assert (0 != mean.rel_value_us);
3429
3430   /* Compute random time value between spread * mean and spread * mean */
3431   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3432
3433   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3434   /**
3435    * Compute random value between (0 and 1) * round_interval
3436    * via multiplying round_interval with a 'fraction' (0 to value)/value
3437    */
3438   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3439   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3440   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3441   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3442
3443   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3444     LOG (GNUNET_ERROR_TYPE_WARNING,
3445          "Returning FOREVER_REL\n");
3446
3447   return ret;
3448 }
3449
3450
3451 /**
3452  * Send single pull request
3453  *
3454  * @param peer_id the peer to send the pull request to.
3455  */
3456 static void
3457 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3458 {
3459   struct GNUNET_MQ_Envelope *ev;
3460
3461   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3462                                                      Peers_PULL_REPLY_PENDING));
3463   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3464
3465   LOG (GNUNET_ERROR_TYPE_DEBUG,
3466        "Going to send PULL REQUEST to peer %s.\n",
3467        GNUNET_i2s (peer));
3468
3469   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3470   Peers_send_message (peer, ev, "PULL REQUEST");
3471   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3472 }
3473
3474
3475 /**
3476  * Send single push
3477  *
3478  * @param peer_id the peer to send the push to.
3479  */
3480 static void
3481 send_push (const struct GNUNET_PeerIdentity *peer_id)
3482 {
3483   struct GNUNET_MQ_Envelope *ev;
3484
3485   LOG (GNUNET_ERROR_TYPE_DEBUG,
3486        "Going to send PUSH to peer %s.\n",
3487        GNUNET_i2s (peer_id));
3488
3489   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3490   Peers_send_message (peer_id, ev, "PUSH");
3491   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3492 }
3493
3494
3495 static void
3496 do_round (void *cls);
3497
3498 static void
3499 do_mal_round (void *cls);
3500
3501 #ifdef ENABLE_MALICIOUS
3502
3503
3504 /**
3505  * @brief This function is called, when the client tells us to act malicious.
3506  * It verifies that @a msg is well-formed.
3507  *
3508  * @param cls the closure (#ClientContext)
3509  * @param msg the message
3510  * @return #GNUNET_OK if @a msg is well-formed
3511  */
3512 static int
3513 check_client_act_malicious (void *cls,
3514                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3515 {
3516   struct ClientContext *cli_ctx = cls;
3517   uint16_t msize = ntohs (msg->header.size);
3518   uint32_t num_peers = ntohl (msg->num_peers);
3519
3520   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3521   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3522        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3523   {
3524     LOG (GNUNET_ERROR_TYPE_ERROR,
3525         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3526         ntohl (msg->num_peers),
3527         (msize / sizeof (struct GNUNET_PeerIdentity)));
3528     GNUNET_break (0);
3529     GNUNET_SERVICE_client_drop (cli_ctx->client);
3530     return GNUNET_SYSERR;
3531   }
3532   return GNUNET_OK;
3533 }
3534
3535 /**
3536  * Turn RPS service to act malicious.
3537  *
3538  * @param cls Closure
3539  * @param client The client that sent the message
3540  * @param msg The message header
3541  */
3542 static void
3543 handle_client_act_malicious (void *cls,
3544                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3545 {
3546   struct ClientContext *cli_ctx = cls;
3547   struct GNUNET_PeerIdentity *peers;
3548   uint32_t num_mal_peers_sent;
3549   uint32_t num_mal_peers_old;
3550
3551   /* Do actual logic */
3552   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3553   mal_type = ntohl (msg->type);
3554   if (NULL == mal_peer_set)
3555     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3556
3557   LOG (GNUNET_ERROR_TYPE_DEBUG,
3558        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3559        mal_type,
3560        ntohl (msg->num_peers));
3561
3562   if (1 == mal_type)
3563   { /* Try to maximise representation */
3564     /* Add other malicious peers to those we already know */
3565
3566     num_mal_peers_sent = ntohl (msg->num_peers);
3567     num_mal_peers_old = num_mal_peers;
3568     GNUNET_array_grow (mal_peers,
3569                        num_mal_peers,
3570                        num_mal_peers + num_mal_peers_sent);
3571     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3572             peers,
3573             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3574
3575     /* Add all mal peers to mal_peer_set */
3576     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3577                            num_mal_peers_sent,
3578                            mal_peer_set);
3579
3580     /* Substitute do_round () with do_mal_round () */
3581     GNUNET_SCHEDULER_cancel (do_round_task);
3582     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3583   }
3584
3585   else if ( (2 == mal_type) ||
3586             (3 == mal_type) )
3587   { /* Try to partition the network */
3588     /* Add other malicious peers to those we already know */
3589
3590     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3591     num_mal_peers_old = num_mal_peers;
3592     GNUNET_array_grow (mal_peers,
3593                        num_mal_peers,
3594                        num_mal_peers + num_mal_peers_sent);
3595     if (NULL != mal_peers &&
3596         0 != num_mal_peers)
3597     {
3598       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3599               peers,
3600               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3601
3602       /* Add all mal peers to mal_peer_set */
3603       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3604                              num_mal_peers_sent,
3605                              mal_peer_set);
3606     }
3607
3608     /* Store the one attacked peer */
3609     GNUNET_memcpy (&attacked_peer,
3610             &msg->attacked_peer,
3611             sizeof (struct GNUNET_PeerIdentity));
3612     /* Set the flag of the attacked peer to valid to avoid problems */
3613     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3614     {
3615       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3616     }
3617
3618     LOG (GNUNET_ERROR_TYPE_DEBUG,
3619          "Attacked peer is %s\n",
3620          GNUNET_i2s (&attacked_peer));
3621
3622     /* Substitute do_round () with do_mal_round () */
3623     GNUNET_SCHEDULER_cancel (do_round_task);
3624     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3625   }
3626   else if (0 == mal_type)
3627   { /* Stop acting malicious */
3628     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3629
3630     /* Substitute do_mal_round () with do_round () */
3631     GNUNET_SCHEDULER_cancel (do_round_task);
3632     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3633   }
3634   else
3635   {
3636     GNUNET_break (0);
3637     GNUNET_SERVICE_client_continue (cli_ctx->client);
3638   }
3639   GNUNET_SERVICE_client_continue (cli_ctx->client);
3640 }
3641
3642
3643 /**
3644  * Send out PUSHes and PULLs maliciously.
3645  *
3646  * This is executed regylary.
3647  */
3648 static void
3649 do_mal_round (void *cls)
3650 {
3651   uint32_t num_pushes;
3652   uint32_t i;
3653   struct GNUNET_TIME_Relative time_next_round;
3654   struct AttackedPeer *tmp_att_peer;
3655
3656   LOG (GNUNET_ERROR_TYPE_DEBUG,
3657        "Going to execute next round maliciously type %" PRIu32 ".\n",
3658       mal_type);
3659   do_round_task = NULL;
3660   GNUNET_assert (mal_type <= 3);
3661   /* Do malicious actions */
3662   if (1 == mal_type)
3663   { /* Try to maximise representation */
3664
3665     /* The maximum of pushes we're going to send this round */
3666     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3667                                          num_attacked_peers),
3668                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3669
3670     LOG (GNUNET_ERROR_TYPE_DEBUG,
3671          "Going to send %" PRIu32 " pushes\n",
3672          num_pushes);
3673
3674     /* Send PUSHes to attacked peers */
3675     for (i = 0 ; i < num_pushes ; i++)
3676     {
3677       if (att_peers_tail == att_peer_index)
3678         att_peer_index = att_peers_head;
3679       else
3680         att_peer_index = att_peer_index->next;
3681
3682       send_push (&att_peer_index->peer_id);
3683     }
3684
3685     /* Send PULLs to some peers to learn about additional peers to attack */
3686     tmp_att_peer = att_peer_index;
3687     for (i = 0 ; i < num_pushes * alpha ; i++)
3688     {
3689       if (att_peers_tail == tmp_att_peer)
3690         tmp_att_peer = att_peers_head;
3691       else
3692         att_peer_index = tmp_att_peer->next;
3693
3694       send_pull_request (&tmp_att_peer->peer_id);
3695     }
3696   }
3697
3698
3699   else if (2 == mal_type)
3700   { /**
3701      * Try to partition the network
3702      * Send as many pushes to the attacked peer as possible
3703      * That is one push per round as it will ignore more.
3704      */
3705     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3706     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3707       send_push (&attacked_peer);
3708   }
3709
3710
3711   if (3 == mal_type)
3712   { /* Combined attack */
3713
3714     /* Send PUSH to attacked peers */
3715     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3716     {
3717       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3718       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3719       {
3720         LOG (GNUNET_ERROR_TYPE_DEBUG,
3721             "Goding to send push to attacked peer (%s)\n",
3722             GNUNET_i2s (&attacked_peer));
3723         send_push (&attacked_peer);
3724       }
3725     }
3726     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3727
3728     /* The maximum of pushes we're going to send this round */
3729     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3730                                          num_attacked_peers),
3731                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3732
3733     LOG (GNUNET_ERROR_TYPE_DEBUG,
3734          "Going to send %" PRIu32 " pushes\n",
3735          num_pushes);
3736
3737     for (i = 0; i < num_pushes; i++)
3738     {
3739       if (att_peers_tail == att_peer_index)
3740         att_peer_index = att_peers_head;
3741       else
3742         att_peer_index = att_peer_index->next;
3743
3744       send_push (&att_peer_index->peer_id);
3745     }
3746
3747     /* Send PULLs to some peers to learn about additional peers to attack */
3748     tmp_att_peer = att_peer_index;
3749     for (i = 0; i < num_pushes * alpha; i++)
3750     {
3751       if (att_peers_tail == tmp_att_peer)
3752         tmp_att_peer = att_peers_head;
3753       else
3754         att_peer_index = tmp_att_peer->next;
3755
3756       send_pull_request (&tmp_att_peer->peer_id);
3757     }
3758   }
3759
3760   /* Schedule next round */
3761   time_next_round = compute_rand_delay (round_interval, 2);
3762
3763   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3764   //NULL);
3765   GNUNET_assert (NULL == do_round_task);
3766   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3767                                                 &do_mal_round, NULL);
3768   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3769 }
3770 #endif /* ENABLE_MALICIOUS */
3771
3772 /**
3773  * Send out PUSHes and PULLs, possibly update #view, samplers.
3774  *
3775  * This is executed regylary.
3776  */
3777 static void
3778 do_round (void *cls)
3779 {
3780   uint32_t i;
3781   const struct GNUNET_PeerIdentity *view_array;
3782   unsigned int *permut;
3783   unsigned int a_peers; /* Number of peers we send pushes to */
3784   unsigned int b_peers; /* Number of peers we send pull requests to */
3785   uint32_t first_border;
3786   uint32_t second_border;
3787   struct GNUNET_PeerIdentity peer;
3788   struct GNUNET_PeerIdentity *update_peer;
3789
3790   LOG (GNUNET_ERROR_TYPE_DEBUG,
3791        "Going to execute next round.\n");
3792   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3793   do_round_task = NULL;
3794   LOG (GNUNET_ERROR_TYPE_DEBUG,
3795        "Printing view:\n");
3796   to_file (file_name_view_log,
3797            "___ new round ___");
3798   view_array = View_get_as_array ();
3799   for (i = 0; i < View_size (); i++)
3800   {
3801     LOG (GNUNET_ERROR_TYPE_DEBUG,
3802          "\t%s\n", GNUNET_i2s (&view_array[i]));
3803     to_file (file_name_view_log,
3804              "=%s\t(do round)",
3805              GNUNET_i2s_full (&view_array[i]));
3806   }
3807
3808
3809   /* Send pushes and pull requests */
3810   if (0 < View_size ())
3811   {
3812     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3813                                            View_size ());
3814
3815     /* Send PUSHes */
3816     a_peers = ceil (alpha * View_size ());
3817
3818     LOG (GNUNET_ERROR_TYPE_DEBUG,
3819          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3820          a_peers, alpha, View_size ());
3821     for (i = 0; i < a_peers; i++)
3822     {
3823       peer = view_array[permut[i]];
3824       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3825       { // FIXME if this fails schedule/loop this for later
3826         send_push (&peer);
3827       }
3828     }
3829
3830     /* Send PULL requests */
3831     b_peers = ceil (beta * View_size ());
3832     first_border = a_peers;
3833     second_border = a_peers + b_peers;
3834     if (second_border > View_size ())
3835     {
3836       first_border = View_size () - b_peers;
3837       second_border = View_size ();
3838     }
3839     LOG (GNUNET_ERROR_TYPE_DEBUG,
3840         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3841         b_peers, beta, View_size ());
3842     for (i = first_border; i < second_border; i++)
3843     {
3844       peer = view_array[permut[i]];
3845       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3846           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3847       { // FIXME if this fails schedule/loop this for later
3848         send_pull_request (&peer);
3849       }
3850     }
3851
3852     GNUNET_free (permut);
3853     permut = NULL;
3854   }
3855
3856
3857   /* Update view */
3858   /* TODO see how many peers are in push-/pull- list! */
3859
3860   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3861       (0 < CustomPeerMap_size (push_map)) &&
3862       (0 < CustomPeerMap_size (pull_map)))
3863   //if (GNUNET_YES) // disable blocking temporarily
3864   { /* If conditions for update are fulfilled, update */
3865     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3866
3867     uint32_t final_size;
3868     uint32_t peers_to_clean_size;
3869     struct GNUNET_PeerIdentity *peers_to_clean;
3870
3871     peers_to_clean = NULL;
3872     peers_to_clean_size = 0;
3873     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3874     GNUNET_memcpy (peers_to_clean,
3875             view_array,
3876             View_size () * sizeof (struct GNUNET_PeerIdentity));
3877
3878     /* Seems like recreating is the easiest way of emptying the peermap */
3879     View_clear ();
3880     to_file (file_name_view_log,
3881              "--- emptied ---");
3882
3883     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3884                                 CustomPeerMap_size (push_map));
3885     second_border = first_border +
3886                     GNUNET_MIN (floor (beta  * view_size_est_need),
3887                                 CustomPeerMap_size (pull_map));
3888     final_size    = second_border +
3889       ceil ((1 - (alpha + beta)) * view_size_est_need);
3890     LOG (GNUNET_ERROR_TYPE_DEBUG,
3891         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3892         first_border,
3893         second_border,
3894         final_size);
3895
3896     /* Update view with peers received through PUSHes */
3897     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3898                                            CustomPeerMap_size (push_map));
3899     for (i = 0; i < first_border; i++)
3900     {
3901       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3902                                                               permut[i]));
3903       to_file (file_name_view_log,
3904                "+%s\t(push list)",
3905                GNUNET_i2s_full (&view_array[i]));
3906       // TODO change the peer_flags accordingly
3907     }
3908     GNUNET_free (permut);
3909     permut = NULL;
3910
3911     /* Update view with peers received through PULLs */
3912     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3913                                            CustomPeerMap_size (pull_map));
3914     for (i = first_border; i < second_border; i++)
3915     {
3916       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3917             permut[i - first_border]));
3918       to_file (file_name_view_log,
3919                "+%s\t(pull list)",
3920                GNUNET_i2s_full (&view_array[i]));
3921       // TODO change the peer_flags accordingly
3922     }
3923     GNUNET_free (permut);
3924     permut = NULL;
3925
3926     /* Update view with peers from history */
3927     RPS_sampler_get_n_rand_peers (prot_sampler,
3928                                   hist_update,
3929                                   NULL,
3930                                   final_size - second_border);
3931     // TODO change the peer_flags accordingly
3932
3933     for (i = 0; i < View_size (); i++)
3934       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3935
3936     /* Clean peers that were removed from the view */
3937     for (i = 0; i < peers_to_clean_size; i++)
3938     {
3939       to_file (file_name_view_log,
3940                "-%s",
3941                GNUNET_i2s_full (&peers_to_clean[i]));
3942       clean_peer (&peers_to_clean[i]);
3943       //peer_destroy_channel_send (sender);
3944     }
3945
3946     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3947     clients_notify_view_update();
3948   } else {
3949     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3950     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3951     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3952         !(0 >= CustomPeerMap_size (pull_map)))
3953       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3954     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3955         (0 >= CustomPeerMap_size (pull_map)))
3956       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3957     if (0 >= CustomPeerMap_size (push_map) &&
3958         !(0 >= CustomPeerMap_size (pull_map)))
3959       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3960     if (0 >= CustomPeerMap_size (push_map) &&
3961         (0 >= CustomPeerMap_size (pull_map)))
3962       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3963     if (0 >= CustomPeerMap_size (pull_map) &&
3964         CustomPeerMap_size (push_map) > alpha * View_size () &&
3965         0 >= CustomPeerMap_size (push_map))
3966       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3967   }
3968   // TODO independent of that also get some peers from CADET_get_peers()?
3969   GNUNET_STATISTICS_set (stats,
3970       "# peers in push map at end of round",
3971       CustomPeerMap_size (push_map),
3972       GNUNET_NO);
3973   GNUNET_STATISTICS_set (stats,
3974       "# peers in pull map at end of round",
3975       CustomPeerMap_size (pull_map),
3976       GNUNET_NO);
3977   GNUNET_STATISTICS_set (stats,
3978       "# peers in view at end of round",
3979       View_size (),
3980       GNUNET_NO);
3981
3982   LOG (GNUNET_ERROR_TYPE_DEBUG,
3983        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3984        CustomPeerMap_size (push_map),
3985        CustomPeerMap_size (pull_map),
3986        alpha,
3987        View_size (),
3988        alpha * View_size ());
3989
3990   /* Update samplers */
3991   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3992   {
3993     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3994     LOG (GNUNET_ERROR_TYPE_DEBUG,
3995          "Updating with peer %s from push list\n",
3996          GNUNET_i2s (update_peer));
3997     insert_in_sampler (NULL, update_peer);
3998     clean_peer (update_peer); /* This cleans only if it is not in the view */
3999     //peer_destroy_channel_send (sender);
4000   }
4001
4002   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
4003   {
4004     LOG (GNUNET_ERROR_TYPE_DEBUG,
4005          "Updating with peer %s from pull list\n",
4006          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
4007     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
4008     /* This cleans only if it is not in the view */
4009     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
4010     //peer_destroy_channel_send (sender);
4011   }
4012
4013
4014   /* Empty push/pull lists */
4015   CustomPeerMap_clear (push_map);
4016   CustomPeerMap_clear (pull_map);
4017
4018   struct GNUNET_TIME_Relative time_next_round;
4019
4020   time_next_round = compute_rand_delay (round_interval, 2);
4021
4022   /* Schedule next round */
4023   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4024                                                 &do_round, NULL);
4025   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4026 }
4027
4028
4029 /**
4030  * This is called from GNUNET_CADET_get_peers().
4031  *
4032  * It is called on every peer(ID) that cadet somehow has contact with.
4033  * We use those to initialise the sampler.
4034  */
4035 void
4036 init_peer_cb (void *cls,
4037               const struct GNUNET_PeerIdentity *peer,
4038               int tunnel, // "Do we have a tunnel towards this peer?"
4039               unsigned int n_paths, // "Number of known paths towards this peer"
4040               unsigned int best_path) // "How long is the best path?
4041                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4042 {
4043   if (NULL != peer)
4044   {
4045     LOG (GNUNET_ERROR_TYPE_DEBUG,
4046          "Got peer_id %s from cadet\n",
4047          GNUNET_i2s (peer));
4048     got_peer (peer);
4049   }
4050 }
4051
4052 /**
4053  * @brief Iterator function over stored, valid peers.
4054  *
4055  * We initialise the sampler with those.
4056  *
4057  * @param cls the closure
4058  * @param peer the peer id
4059  * @return #GNUNET_YES if we should continue to
4060  *         iterate,
4061  *         #GNUNET_NO if not.
4062  */
4063 static int
4064 valid_peers_iterator (void *cls,
4065                       const struct GNUNET_PeerIdentity *peer)
4066 {
4067   if (NULL != peer)
4068   {
4069     LOG (GNUNET_ERROR_TYPE_DEBUG,
4070          "Got stored, valid peer %s\n",
4071          GNUNET_i2s (peer));
4072     got_peer (peer);
4073   }
4074   return GNUNET_YES;
4075 }
4076
4077
4078 /**
4079  * Iterator over peers from peerinfo.
4080  *
4081  * @param cls closure
4082  * @param peer id of the peer, NULL for last call
4083  * @param hello hello message for the peer (can be NULL)
4084  * @param error message
4085  */
4086 void
4087 process_peerinfo_peers (void *cls,
4088                         const struct GNUNET_PeerIdentity *peer,
4089                         const struct GNUNET_HELLO_Message *hello,
4090                         const char *err_msg)
4091 {
4092   if (NULL != peer)
4093   {
4094     LOG (GNUNET_ERROR_TYPE_DEBUG,
4095          "Got peer_id %s from peerinfo\n",
4096          GNUNET_i2s (peer));
4097     got_peer (peer);
4098   }
4099 }
4100
4101
4102 /**
4103  * Task run during shutdown.
4104  *
4105  * @param cls unused
4106  */
4107 static void
4108 shutdown_task (void *cls)
4109 {
4110   struct ClientContext *client_ctx;
4111   struct ReplyCls *reply_cls;
4112
4113   LOG (GNUNET_ERROR_TYPE_DEBUG,
4114        "RPS is going down\n");
4115
4116   /* Clean all clients */
4117   for (client_ctx = cli_ctx_head;
4118        NULL != cli_ctx_head;
4119        client_ctx = cli_ctx_head)
4120   {
4121     /* Clean pending requests to the sampler */
4122     for (reply_cls = client_ctx->rep_cls_head;
4123          NULL != client_ctx->rep_cls_head;
4124          reply_cls = client_ctx->rep_cls_head)
4125     {
4126       RPS_sampler_request_cancel (reply_cls->req_handle);
4127       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4128                                    client_ctx->rep_cls_tail,
4129                                    reply_cls);
4130       GNUNET_free (reply_cls);
4131     }
4132     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4133     GNUNET_free (client_ctx);
4134   }
4135   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4136   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4137
4138   if (NULL != do_round_task)
4139   {
4140     GNUNET_SCHEDULER_cancel (do_round_task);
4141     do_round_task = NULL;
4142   }
4143
4144   Peers_terminate ();
4145
4146   GNUNET_NSE_disconnect (nse);
4147   RPS_sampler_destroy (prot_sampler);
4148   RPS_sampler_destroy (client_sampler);
4149   GNUNET_CADET_close_port (cadet_port);
4150   GNUNET_CADET_disconnect (cadet_handle);
4151   View_destroy ();
4152   CustomPeerMap_destroy (push_map);
4153   CustomPeerMap_destroy (pull_map);
4154   if (NULL != stats)
4155   {
4156     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4157     stats = NULL;
4158   }
4159   #ifdef ENABLE_MALICIOUS
4160   struct AttackedPeer *tmp_att_peer;
4161   /* it is ok to free this const during shutdown: */
4162   GNUNET_free ((char *) file_name_view_log);
4163   #ifdef TO_FILE
4164   GNUNET_free ((char *) file_name_observed_log);
4165   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
4166   #endif /* TO_FILE */
4167   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4168   if (NULL != mal_peer_set)
4169     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4170   if (NULL != att_peer_set)
4171     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4172   while (NULL != att_peers_head)
4173   {
4174     tmp_att_peer = att_peers_head;
4175     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4176   }
4177   #endif /* ENABLE_MALICIOUS */
4178 }
4179
4180
4181 /**
4182  * Handle client connecting to the service.
4183  *
4184  * @param cls NULL
4185  * @param client the new client
4186  * @param mq the message queue of @a client
4187  * @return @a client
4188  */
4189 static void *
4190 client_connect_cb (void *cls,
4191                    struct GNUNET_SERVICE_Client *client,
4192                    struct GNUNET_MQ_Handle *mq)
4193 {
4194   struct ClientContext *cli_ctx;
4195
4196   LOG (GNUNET_ERROR_TYPE_DEBUG,
4197        "Client connected\n");
4198   if (NULL == client)
4199     return client; /* Server was destroyed before a client connected. Shutting down */
4200   cli_ctx = GNUNET_new (struct ClientContext);
4201   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4202   cli_ctx->view_updates_left = -1;
4203   cli_ctx->client = client;
4204   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4205                                cli_ctx_tail,
4206                                cli_ctx);
4207   return cli_ctx;
4208 }
4209
4210 /**
4211  * Callback called when a client disconnected from the service
4212  *
4213  * @param cls closure for the service
4214  * @param c the client that disconnected
4215  * @param internal_cls should be equal to @a c
4216  */
4217 static void
4218 client_disconnect_cb (void *cls,
4219                       struct GNUNET_SERVICE_Client *client,
4220                       void *internal_cls)
4221 {
4222   struct ClientContext *cli_ctx = internal_cls;
4223
4224   GNUNET_assert (client == cli_ctx->client);
4225   if (NULL == client)
4226   {/* shutdown task - destroy all clients */
4227     while (NULL != cli_ctx_head)
4228       destroy_cli_ctx (cli_ctx_head);
4229   }
4230   else
4231   { /* destroy this client */
4232     LOG (GNUNET_ERROR_TYPE_DEBUG,
4233         "Client disconnected. Destroy its context.\n");
4234     destroy_cli_ctx (cli_ctx);
4235   }
4236 }
4237
4238
4239 /**
4240  * Handle random peer sampling clients.
4241  *
4242  * @param cls closure
4243  * @param c configuration to use
4244  * @param service the initialized service
4245  */
4246 static void
4247 run (void *cls,
4248      const struct GNUNET_CONFIGURATION_Handle *c,
4249      struct GNUNET_SERVICE_Handle *service)
4250 {
4251   char* fn_valid_peers;
4252   struct GNUNET_HashCode port;
4253
4254   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4255   cfg = c;
4256
4257
4258   /* Get own ID */
4259   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4260   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4261               "STARTING SERVICE (rps) for peer [%s]\n",
4262               GNUNET_i2s (&own_identity));
4263   #ifdef ENABLE_MALICIOUS
4264   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4265               "Malicious execution compiled in.\n");
4266   #endif /* ENABLE_MALICIOUS */
4267
4268
4269
4270   /* Get time interval from the configuration */
4271   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4272                                                         "ROUNDINTERVAL",
4273                                                         &round_interval))
4274   {
4275     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4276                                "RPS", "ROUNDINTERVAL");
4277     GNUNET_SCHEDULER_shutdown ();
4278     return;
4279   }
4280
4281   /* Get initial size of sampler/view from the configuration */
4282   if (GNUNET_OK !=
4283       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4284         (long long unsigned int *) &sampler_size_est_min))
4285   {
4286     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4287                                "RPS", "MINSIZE");
4288     GNUNET_SCHEDULER_shutdown ();
4289     return;
4290   }
4291   sampler_size_est_need = sampler_size_est_min;
4292   view_size_est_min = sampler_size_est_min;
4293   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4294
4295   if (GNUNET_OK !=
4296       GNUNET_CONFIGURATION_get_value_filename (cfg,
4297                                                "rps",
4298                                                "FILENAME_VALID_PEERS",
4299                                                &fn_valid_peers))
4300   {
4301     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4302                                "rps", "FILENAME_VALID_PEERS");
4303   }
4304
4305
4306   View_create (view_size_est_min);
4307
4308   /* file_name_view_log */
4309   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4310   #ifdef TO_FILE
4311   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4312   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4313   #endif /* TO_FILE */
4314
4315   /* connect to NSE */
4316   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4317
4318
4319   alpha = 0.45;
4320   beta  = 0.45;
4321
4322
4323   /* Initialise cadet */
4324   /* There exists a copy-paste-clone in get_channel() */
4325   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4326     GNUNET_MQ_hd_fixed_size (peer_check,
4327                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4328                              struct GNUNET_MessageHeader,
4329                              NULL),
4330     GNUNET_MQ_hd_fixed_size (peer_push,
4331                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4332                              struct GNUNET_MessageHeader,
4333                              NULL),
4334     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4335                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4336                              struct GNUNET_MessageHeader,
4337                              NULL),
4338     GNUNET_MQ_hd_var_size (peer_pull_reply,
4339                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4340                            struct GNUNET_RPS_P2P_PullReplyMessage,
4341                            NULL),
4342     GNUNET_MQ_handler_end ()
4343   };
4344
4345   cadet_handle = GNUNET_CADET_connect (cfg);
4346   GNUNET_assert (NULL != cadet_handle);
4347   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4348                       strlen (GNUNET_APPLICATION_PORT_RPS),
4349                       &port);
4350   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4351                                        &port,
4352                                        &Peers_handle_inbound_channel, /* Connect handler */
4353                                        NULL, /* cls */
4354                                        NULL, /* WindowSize handler */
4355                                        cleanup_destroyed_channel, /* Disconnect handler */
4356                                        cadet_handlers);
4357
4358
4359   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4360   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4361   GNUNET_free (fn_valid_peers);
4362
4363   /* Initialise sampler */
4364   struct GNUNET_TIME_Relative half_round_interval;
4365   struct GNUNET_TIME_Relative  max_round_interval;
4366
4367   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4368   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4369
4370   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4371   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4372
4373   /* Initialise push and pull maps */
4374   push_map = CustomPeerMap_create (4);
4375   pull_map = CustomPeerMap_create (4);
4376
4377
4378   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4379   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4380   // TODO send push/pull to each of those peers?
4381   // TODO read stored valid peers from last run
4382   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4383   Peers_get_valid_peers (valid_peers_iterator, NULL);
4384
4385   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4386                                                    GNUNET_NO,
4387                                                    process_peerinfo_peers,
4388                                                    NULL);
4389
4390   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4391
4392   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4393   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4394
4395   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4396   stats = GNUNET_STATISTICS_create ("rps", cfg);
4397
4398 }
4399
4400
4401 /**
4402  * Define "main" method using service macro.
4403  */
4404 GNUNET_SERVICE_MAIN
4405 ("rps",
4406  GNUNET_SERVICE_OPTION_NONE,
4407  &run,
4408  &client_connect_cb,
4409  &client_disconnect_cb,
4410  NULL,
4411  GNUNET_MQ_hd_fixed_size (client_request,
4412    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4413    struct GNUNET_RPS_CS_RequestMessage,
4414    NULL),
4415  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4416    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4417    struct GNUNET_RPS_CS_RequestCancelMessage,
4418    NULL),
4419  GNUNET_MQ_hd_var_size (client_seed,
4420    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4421    struct GNUNET_RPS_CS_SeedMessage,
4422    NULL),
4423 #ifdef ENABLE_MALICIOUS
4424  GNUNET_MQ_hd_var_size (client_act_malicious,
4425    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4426    struct GNUNET_RPS_CS_ActMaliciousMessage,
4427    NULL),
4428 #endif /* ENABLE_MALICIOUS */
4429  GNUNET_MQ_hd_fixed_size (client_view_request,
4430    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4431    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4432    NULL),
4433  GNUNET_MQ_handler_end());
4434
4435 /* end of gnunet-service-rps.c */