rps service: configure minimum view instead of initval
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "rps.h"
34 #include "rps-test_util.h"
35 #include "gnunet-service-rps_sampler.h"
36 #include "gnunet-service-rps_custommap.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Handle to the statistics service.
65  */
66 static struct GNUNET_STATISTICS_Handle *stats;
67
68 /**
69  * Our own identity.
70  */
71 static struct GNUNET_PeerIdentity own_identity;
72
73
74
75 /***********************************************************************
76  * Old gnunet-service-rps_peers.c
77 ***********************************************************************/
78
79 /**
80  * Set a peer flag of given peer context.
81  */
82 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
83
84 /**
85  * Get peer flag of given peer context.
86  */
87 #define check_peer_flag_set(peer_ctx, mask)\
88   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
89
90 /**
91  * Unset flag of given peer context.
92  */
93 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
94
95 /**
96  * Set a channel flag of given channel context.
97  */
98 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * Struct used to keep track of other peer's status
164  *
165  * This is stored in a multipeermap.
166  * It contains information such as cadet channels, a message queue for sending,
167  * status about the channels, the pending operations on this peer and some flags
168  * about the status of the peer itself. (live, valid, ...)
169  */
170 struct PeerContext
171 {
172   /**
173    * Message queue open to client
174    */
175   struct GNUNET_MQ_Handle *mq;
176
177   /**
178    * Channel open to client.
179    */
180   struct GNUNET_CADET_Channel *send_channel;
181
182   /**
183    * Flags to the sending channel
184    */
185   uint32_t *send_channel_flags;
186
187   /**
188    * Channel open from client.
189    */
190   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
191
192   /**
193    * Flags to the receiving channel
194    */
195   uint32_t *recv_channel_flags;
196
197   /**
198    * Array of pending operations on this peer.
199    */
200   struct PeerPendingOp *pending_ops;
201
202   /**
203    * Handle to the callback given to cadet_ntfy_tmt_rdy()
204    *
205    * To be canceled on shutdown.
206    */
207   struct PendingMessage *liveliness_check_pending;
208
209   /**
210    * Number of pending operations.
211    */
212   unsigned int num_pending_ops;
213
214   /**
215    * Identity of the peer
216    */
217   struct GNUNET_PeerIdentity peer_id;
218
219   /**
220    * Flags indicating status of peer
221    */
222   uint32_t peer_flags;
223
224   /**
225    * Last time we received something from that peer.
226    */
227   struct GNUNET_TIME_Absolute last_message_recv;
228
229   /**
230    * Last time we received a keepalive message.
231    */
232   struct GNUNET_TIME_Absolute last_keepalive;
233
234   /**
235    * DLL with all messages that are yet to be sent
236    */
237   struct PendingMessage *pending_messages_head;
238   struct PendingMessage *pending_messages_tail;
239
240   /**
241    * This is pobably followed by 'statistical' data (when we first saw
242    * him, how did we get his ID, how many pushes (in a timeinterval),
243    * ...)
244    */
245 };
246
247 /**
248  * @brief Closure to #valid_peer_iterator
249  */
250 struct PeersIteratorCls
251 {
252   /**
253    * Iterator function
254    */
255   PeersIterator iterator;
256
257   /**
258    * Closure to iterator
259    */
260   void *cls;
261 };
262
263 /**
264  * @brief Hashmap of valid peers.
265  */
266 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
267
268 /**
269  * @brief Maximum number of valid peers to keep.
270  * TODO read from config
271  */
272 static uint32_t num_valid_peers_max = UINT32_MAX;
273
274 /**
275  * @brief Filename of the file that stores the valid peers persistently.
276  */
277 static char *filename_valid_peers;
278
279 /**
280  * Set of all peers to keep track of them.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
283
284 /**
285  * Cadet handle.
286  */
287 static struct GNUNET_CADET_Handle *cadet_handle;
288
289
290
291 /**
292  * @brief Get the #PeerContext associated with a peer
293  *
294  * @param peer the peer id
295  *
296  * @return the #PeerContext
297  */
298 static struct PeerContext *
299 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
300 {
301   struct PeerContext *ctx;
302   int ret;
303
304   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
305   GNUNET_assert (GNUNET_YES == ret);
306   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
307   GNUNET_assert (NULL != ctx);
308   return ctx;
309 }
310
311 int
312 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
313
314 /**
315  * @brief Create a new #PeerContext and insert it into the peer map
316  *
317  * @param peer the peer to create the #PeerContext for
318  *
319  * @return the #PeerContext
320  */
321 static struct PeerContext *
322 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
323 {
324   struct PeerContext *ctx;
325   int ret;
326
327   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
328
329   ctx = GNUNET_new (struct PeerContext);
330   ctx->peer_id = *peer;
331   ctx->send_channel_flags = GNUNET_new (uint32_t);
332   ctx->recv_channel_flags = GNUNET_new (uint32_t);
333   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
334       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
335   GNUNET_assert (GNUNET_OK == ret);
336   return ctx;
337 }
338
339
340 /**
341  * @brief Create or get a #PeerContext
342  *
343  * @param peer the peer to get the associated context to
344  *
345  * @return the context
346  */
347 static struct PeerContext *
348 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
349 {
350   if (GNUNET_NO == Peers_check_peer_known (peer))
351   {
352     return create_peer_ctx (peer);
353   }
354   return get_peer_ctx (peer);
355 }
356
357 void
358 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
359
360 void
361 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
362
363 /**
364  * @brief Check whether we have a connection to this @a peer
365  *
366  * Also sets the #Peers_ONLINE flag accordingly
367  *
368  * @param peer the peer in question
369  *
370  * @return #GNUNET_YES if we are connected
371  *         #GNUNET_NO  otherwise
372  */
373 int
374 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
375 {
376   const struct PeerContext *peer_ctx;
377
378   /* If we don't know about this peer we don't know whether it's online */
379   if (GNUNET_NO == Peers_check_peer_known (peer))
380   {
381     return GNUNET_NO;
382   }
383   /* Get the context */
384   peer_ctx = get_peer_ctx (peer);
385   /* If we have no channel to this peer we don't know whether it's online */
386   if ( (NULL == peer_ctx->send_channel) &&
387        (NULL == peer_ctx->recv_channel) )
388   {
389     Peers_unset_peer_flag (peer, Peers_ONLINE);
390     return GNUNET_NO;
391   }
392   /* Otherwise (if we have a channel, we know that it's online */
393   Peers_set_peer_flag (peer, Peers_ONLINE);
394   return GNUNET_YES;
395 }
396
397
398 /**
399  * @brief The closure to #get_rand_peer_iterator.
400  */
401 struct GetRandPeerIteratorCls
402 {
403   /**
404    * @brief The index of the peer to return.
405    * Will be decreased until 0.
406    * Then current peer is returned.
407    */
408   uint32_t index;
409
410   /**
411    * @brief Pointer to peer to return.
412    */
413   const struct GNUNET_PeerIdentity *peer;
414 };
415
416
417 /**
418  * @brief Iterator function for #get_random_peer_from_peermap.
419  *
420  * Implements #GNUNET_CONTAINER_PeerMapIterator.
421  * Decreases the index until the index is null.
422  * Then returns the current peer.
423  *
424  * @param cls the #GetRandPeerIteratorCls containing index and peer
425  * @param peer current peer
426  * @param value unused
427  *
428  * @return  #GNUNET_YES if we should continue to
429  *          iterate,
430  *          #GNUNET_NO if not.
431  */
432 static int
433 get_rand_peer_iterator (void *cls,
434                         const struct GNUNET_PeerIdentity *peer,
435                         void *value)
436 {
437   struct GetRandPeerIteratorCls *iterator_cls = cls;
438   if (0 >= iterator_cls->index)
439   {
440     iterator_cls->peer = peer;
441     return GNUNET_NO;
442   }
443   iterator_cls->index--;
444   return GNUNET_YES;
445 }
446
447
448 /**
449  * @brief Get a random peer from @a peer_map
450  *
451  * @param peer_map the peer_map to get the peer from
452  *
453  * @return a random peer
454  */
455 static const struct GNUNET_PeerIdentity *
456 get_random_peer_from_peermap (const struct
457                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
458 {
459   struct GetRandPeerIteratorCls *iterator_cls;
460   const struct GNUNET_PeerIdentity *ret;
461
462   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
463   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
464       GNUNET_CONTAINER_multipeermap_size (peer_map));
465   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
466                                                 get_rand_peer_iterator,
467                                                 iterator_cls);
468   ret = iterator_cls->peer;
469   GNUNET_free (iterator_cls);
470   return ret;
471 }
472
473
474 /**
475  * @brief Add a given @a peer to valid peers.
476  *
477  * If valid peers are already #num_valid_peers_max, delete a peer previously.
478  *
479  * @param peer the peer that is added to the valid peers.
480  *
481  * @return #GNUNET_YES if no other peer had to be removed
482  *         #GNUNET_NO  otherwise
483  */
484 static int
485 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
486 {
487   const struct GNUNET_PeerIdentity *rand_peer;
488   int ret;
489
490   ret = GNUNET_YES;
491   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
492   {
493     rand_peer = get_random_peer_from_peermap (valid_peers);
494     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
495     ret = GNUNET_NO;
496   }
497   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
498       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
499   return ret;
500 }
501
502
503 /**
504  * @brief Set the peer flag to living and
505  *        call the pending operations on this peer.
506  *
507  * Also adds peer to #valid_peers.
508  *
509  * @param peer_ctx the #PeerContext of the peer to set live
510  */
511 static void
512 set_peer_live (struct PeerContext *peer_ctx)
513 {
514   struct GNUNET_PeerIdentity *peer;
515   unsigned int i;
516
517   peer = &peer_ctx->peer_id;
518   LOG (GNUNET_ERROR_TYPE_DEBUG,
519       "Peer %s is live and valid, calling %i pending operations on it\n",
520       GNUNET_i2s (peer),
521       peer_ctx->num_pending_ops);
522
523   if (NULL != peer_ctx->liveliness_check_pending)
524   {
525     LOG (GNUNET_ERROR_TYPE_DEBUG,
526          "Removing pending liveliness check for peer %s\n",
527          GNUNET_i2s (&peer_ctx->peer_id));
528     // TODO wait until cadet sets mq->cancel_impl
529     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
530     GNUNET_free (peer_ctx->liveliness_check_pending);
531     peer_ctx->liveliness_check_pending = NULL;
532   }
533
534   (void) add_valid_peer (peer);
535   set_peer_flag (peer_ctx, Peers_ONLINE);
536
537   /* Call pending operations */
538   for (i = 0; i < peer_ctx->num_pending_ops; i++)
539   {
540     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
541   }
542   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
543 }
544
545 static void
546 cleanup_destroyed_channel (void *cls,
547                            const struct GNUNET_CADET_Channel *channel);
548
549 /* Declaration of handlers */
550 static void
551 handle_peer_check (void *cls,
552                    const struct GNUNET_MessageHeader *msg);
553
554 static void
555 handle_peer_push (void *cls,
556                   const struct GNUNET_MessageHeader *msg);
557
558 static void
559 handle_peer_pull_request (void *cls,
560                           const struct GNUNET_MessageHeader *msg);
561
562 static int
563 check_peer_pull_reply (void *cls,
564                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
565
566 static void
567 handle_peer_pull_reply (void *cls,
568                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
569
570 /* End declaration of handlers */
571
572
573 /**
574  * @brief Get the channel of a peer. If not existing, create.
575  *
576  * @param peer the peer id
577  * @return the #GNUNET_CADET_Channel used to send data to @a peer
578  */
579 struct GNUNET_CADET_Channel *
580 get_channel (const struct GNUNET_PeerIdentity *peer)
581 {
582   struct PeerContext *peer_ctx;
583   struct GNUNET_HashCode port;
584   struct GNUNET_PeerIdentity *ctx_peer;
585   /* There exists a copy-paste-clone in run() */
586   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
587     GNUNET_MQ_hd_fixed_size (peer_check,
588                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
589                              struct GNUNET_MessageHeader,
590                              NULL),
591     GNUNET_MQ_hd_fixed_size (peer_push,
592                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
593                              struct GNUNET_MessageHeader,
594                              NULL),
595     GNUNET_MQ_hd_fixed_size (peer_pull_request,
596                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
597                              struct GNUNET_MessageHeader,
598                              NULL),
599     GNUNET_MQ_hd_var_size (peer_pull_reply,
600                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
601                            struct GNUNET_RPS_P2P_PullReplyMessage,
602                            NULL),
603     GNUNET_MQ_handler_end ()
604   };
605
606
607   peer_ctx = get_peer_ctx (peer);
608   if (NULL == peer_ctx->send_channel)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Trying to establish channel to peer %s\n",
612          GNUNET_i2s (peer));
613     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
614                         strlen (GNUNET_APPLICATION_PORT_RPS),
615                         &port);
616     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
617     *ctx_peer = *peer;
618     peer_ctx->send_channel =
619       GNUNET_CADET_channel_create (cadet_handle,
620                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
621                                    peer,
622                                    &port,
623                                    GNUNET_CADET_OPTION_RELIABLE,
624                                    NULL, /* WindowSize handler */
625                                    cleanup_destroyed_channel, /* Disconnect handler */
626                                    cadet_handlers);
627   }
628   GNUNET_assert (NULL != peer_ctx->send_channel);
629   return peer_ctx->send_channel;
630 }
631
632
633 /**
634  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
635  *
636  * If we already have a message queue open to this client,
637  * simply return it, otherways create one.
638  *
639  * @param peer the peer to get the mq to
640  * @return the #GNUNET_MQ_Handle
641  */
642 static struct GNUNET_MQ_Handle *
643 get_mq (const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *peer_ctx;
646
647   peer_ctx = get_peer_ctx (peer);
648
649   if (NULL == peer_ctx->mq)
650   {
651     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
652   }
653   return peer_ctx->mq;
654 }
655
656
657 /**
658  * @brief This is called in response to the first message we sent as a
659  * liveliness check.
660  *
661  * @param cls #PeerContext of peer with pending liveliness check
662  */
663 static void
664 mq_liveliness_check_successful (void *cls)
665 {
666   struct PeerContext *peer_ctx = cls;
667
668   if (NULL != peer_ctx->liveliness_check_pending)
669   {
670     LOG (GNUNET_ERROR_TYPE_DEBUG,
671         "Liveliness check for peer %s was successfull\n",
672         GNUNET_i2s (&peer_ctx->peer_id));
673     GNUNET_free (peer_ctx->liveliness_check_pending);
674     peer_ctx->liveliness_check_pending = NULL;
675     set_peer_live (peer_ctx);
676   }
677 }
678
679 /**
680  * Issue a check whether peer is live
681  *
682  * @param peer_ctx the context of the peer
683  */
684 static void
685 check_peer_live (struct PeerContext *peer_ctx)
686 {
687   LOG (GNUNET_ERROR_TYPE_DEBUG,
688        "Get informed about peer %s getting live\n",
689        GNUNET_i2s (&peer_ctx->peer_id));
690
691   struct GNUNET_MQ_Handle *mq;
692   struct GNUNET_MQ_Envelope *ev;
693
694   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
695   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
696   peer_ctx->liveliness_check_pending->ev = ev;
697   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
698   peer_ctx->liveliness_check_pending->type = "Check liveliness";
699   mq = get_mq (&peer_ctx->peer_id);
700   GNUNET_MQ_notify_sent (ev,
701                          mq_liveliness_check_successful,
702                          peer_ctx);
703   GNUNET_MQ_send (mq, ev);
704 }
705
706 /**
707  * @brief Add an envelope to a message passed to mq to list of pending messages
708  *
709  * @param peer peer the message was sent to
710  * @param ev envelope to the message
711  * @param type type of the message to be sent
712  * @return pointer to pending message
713  */
714 static struct PendingMessage *
715 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
716                         struct GNUNET_MQ_Envelope *ev,
717                         const char *type)
718 {
719   struct PendingMessage *pending_msg;
720   struct PeerContext *peer_ctx;
721
722   peer_ctx = get_peer_ctx (peer);
723   pending_msg = GNUNET_new (struct PendingMessage);
724   pending_msg->ev = ev;
725   pending_msg->peer_ctx = peer_ctx;
726   pending_msg->type = type;
727   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
728                                peer_ctx->pending_messages_tail,
729                                pending_msg);
730   return pending_msg;
731 }
732
733
734 /**
735  * @brief Remove a pending message from the respective DLL
736  *
737  * @param pending_msg the pending message to remove
738  * @param cancel cancel the pending message, too
739  */
740 static void
741 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
742 {
743   struct PeerContext *peer_ctx;
744
745   peer_ctx = pending_msg->peer_ctx;
746   GNUNET_assert (NULL != peer_ctx);
747   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
748                                peer_ctx->pending_messages_tail,
749                                pending_msg);
750   // TODO wait for the cadet implementation of message cancellation
751   //if (GNUNET_YES == cancel)
752   //{
753   //  GNUNET_MQ_send_cancel (pending_msg->ev);
754   //}
755   GNUNET_free (pending_msg);
756 }
757
758
759 /**
760  * @brief Check whether function of type #PeerOp was already scheduled
761  *
762  * The array with pending operations will probably never grow really big, so
763  * iterating over it should be ok.
764  *
765  * @param peer the peer to check
766  * @param peer_op the operation (#PeerOp) on the peer
767  *
768  * @return #GNUNET_YES if this operation is scheduled on that peer
769  *         #GNUNET_NO  otherwise
770  */
771 static int
772 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
773                            const PeerOp peer_op)
774 {
775   const struct PeerContext *peer_ctx;
776   unsigned int i;
777
778   peer_ctx = get_peer_ctx (peer);
779   for (i = 0; i < peer_ctx->num_pending_ops; i++)
780     if (peer_op == peer_ctx->pending_ops[i].op)
781       return GNUNET_YES;
782   return GNUNET_NO;
783 }
784
785 int
786 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
787
788 /**
789  * Iterator over hash map entries. Deletes all contexts of peers.
790  *
791  * @param cls closure
792  * @param key current public key
793  * @param value value in the hash map
794  * @return #GNUNET_YES if we should continue to iterate,
795  *         #GNUNET_NO if not.
796  */
797 static int
798 peermap_clear_iterator (void *cls,
799                         const struct GNUNET_PeerIdentity *key,
800                         void *value)
801 {
802   Peers_remove_peer (key);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * @brief This is called once a message is sent.
809  *
810  * Removes the pending message
811  *
812  * @param cls type of the message that was sent
813  */
814 static void
815 mq_notify_sent_cb (void *cls)
816 {
817   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819       "%s was sent.\n",
820       pending_msg->type);
821   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
822     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
823   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
824     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
825   if (0 == strncmp ("PUSH", pending_msg->type, 4))
826     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
827   /* Do not cancle message */
828   remove_pending_message (pending_msg, GNUNET_NO);
829 }
830
831
832 /**
833  * @brief Iterator function for #store_valid_peers.
834  *
835  * Implements #GNUNET_CONTAINER_PeerMapIterator.
836  * Writes single peer to disk.
837  *
838  * @param cls the file handle to write to.
839  * @param peer current peer
840  * @param value unused
841  *
842  * @return  #GNUNET_YES if we should continue to
843  *          iterate,
844  *          #GNUNET_NO if not.
845  */
846 static int
847 store_peer_presistently_iterator (void *cls,
848                                   const struct GNUNET_PeerIdentity *peer,
849                                   void *value)
850 {
851   const struct GNUNET_DISK_FileHandle *fh = cls;
852   char peer_string[128];
853   int size;
854   ssize_t ret;
855
856   if (NULL == peer)
857   {
858     return GNUNET_YES;
859   }
860   size = GNUNET_snprintf (peer_string,
861                           sizeof (peer_string),
862                           "%s\n",
863                           GNUNET_i2s_full (peer));
864   GNUNET_assert (53 == size);
865   ret = GNUNET_DISK_file_write (fh,
866                                 peer_string,
867                                 size);
868   GNUNET_assert (size == ret);
869   return GNUNET_YES;
870 }
871
872
873 /**
874  * @brief Store the peers currently in #valid_peers to disk.
875  */
876 static void
877 store_valid_peers ()
878 {
879   struct GNUNET_DISK_FileHandle *fh;
880   uint32_t number_written_peers;
881   int ret;
882
883   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
884   {
885     return;
886   }
887
888   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
889   if (GNUNET_SYSERR == ret)
890   {
891     LOG (GNUNET_ERROR_TYPE_WARNING,
892         "Not able to create directory for file `%s'\n",
893         filename_valid_peers);
894     GNUNET_break (0);
895   }
896   else if (GNUNET_NO == ret)
897   {
898     LOG (GNUNET_ERROR_TYPE_WARNING,
899         "Directory for file `%s' exists but is not writable for us\n",
900         filename_valid_peers);
901     GNUNET_break (0);
902   }
903   fh = GNUNET_DISK_file_open (filename_valid_peers,
904                               GNUNET_DISK_OPEN_WRITE |
905                                   GNUNET_DISK_OPEN_CREATE,
906                               GNUNET_DISK_PERM_USER_READ |
907                                   GNUNET_DISK_PERM_USER_WRITE);
908   if (NULL == fh)
909   {
910     LOG (GNUNET_ERROR_TYPE_WARNING,
911         "Not able to write valid peers to file `%s'\n",
912         filename_valid_peers);
913     return;
914   }
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916       "Writing %u valid peers to disk\n",
917       GNUNET_CONTAINER_multipeermap_size (valid_peers));
918   number_written_peers =
919     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
920                                            store_peer_presistently_iterator,
921                                            fh);
922   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
923   GNUNET_assert (number_written_peers ==
924       GNUNET_CONTAINER_multipeermap_size (valid_peers));
925 }
926
927
928 /**
929  * @brief Convert string representation of peer id to peer id.
930  *
931  * Counterpart to #GNUNET_i2s_full.
932  *
933  * @param string_repr The string representation of the peer id
934  *
935  * @return The peer id
936  */
937 static const struct GNUNET_PeerIdentity *
938 s2i_full (const char *string_repr)
939 {
940   struct GNUNET_PeerIdentity *peer;
941   size_t len;
942   int ret;
943
944   peer = GNUNET_new (struct GNUNET_PeerIdentity);
945   len = strlen (string_repr);
946   if (52 > len)
947   {
948     LOG (GNUNET_ERROR_TYPE_WARNING,
949         "Not able to convert string representation of PeerID to PeerID\n"
950         "Sting representation: %s (len %lu) - too short\n",
951         string_repr,
952         len);
953     GNUNET_break (0);
954   }
955   else if (52 < len)
956   {
957     len = 52;
958   }
959   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
960                                                     len,
961                                                     &peer->public_key);
962   if (GNUNET_OK != ret)
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965         "Not able to convert string representation of PeerID to PeerID\n"
966         "Sting representation: %s\n",
967         string_repr);
968     GNUNET_break (0);
969   }
970   return peer;
971 }
972
973
974 /**
975  * @brief Restore the peers on disk to #valid_peers.
976  */
977 static void
978 restore_valid_peers ()
979 {
980   off_t file_size;
981   uint32_t num_peers;
982   struct GNUNET_DISK_FileHandle *fh;
983   char *buf;
984   ssize_t size_read;
985   char *iter_buf;
986   char *str_repr;
987   const struct GNUNET_PeerIdentity *peer;
988
989   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
990   {
991     return;
992   }
993
994   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
995   {
996     return;
997   }
998   fh = GNUNET_DISK_file_open (filename_valid_peers,
999                               GNUNET_DISK_OPEN_READ,
1000                               GNUNET_DISK_PERM_NONE);
1001   GNUNET_assert (NULL != fh);
1002   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1003   num_peers = file_size / 53;
1004   buf = GNUNET_malloc (file_size);
1005   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1006   GNUNET_assert (size_read == file_size);
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008       "Restoring %" PRIu32 " peers from file `%s'\n",
1009       num_peers,
1010       filename_valid_peers);
1011   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1012   {
1013     str_repr = GNUNET_strndup (iter_buf, 53);
1014     peer = s2i_full (str_repr);
1015     GNUNET_free (str_repr);
1016     add_valid_peer (peer);
1017     LOG (GNUNET_ERROR_TYPE_DEBUG,
1018         "Restored valid peer %s from disk\n",
1019         GNUNET_i2s_full (peer));
1020   }
1021   iter_buf = NULL;
1022   GNUNET_free (buf);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG,
1024       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1025       num_peers,
1026       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1027   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1028   {
1029     LOG (GNUNET_ERROR_TYPE_WARNING,
1030         "Number of restored peers does not match file size. Have probably duplicates.\n");
1031   }
1032   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1033   LOG (GNUNET_ERROR_TYPE_DEBUG,
1034       "Restored %u valid peers from disk\n",
1035       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1036 }
1037
1038
1039 /**
1040  * @brief Initialise storage of peers
1041  *
1042  * @param fn_valid_peers filename of the file used to store valid peer ids
1043  * @param cadet_h cadet handle
1044  * @param own_id own peer identity
1045  */
1046 void
1047 Peers_initialise (char* fn_valid_peers,
1048                   struct GNUNET_CADET_Handle *cadet_h,
1049                   const struct GNUNET_PeerIdentity *own_id)
1050 {
1051   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1052   cadet_handle = cadet_h;
1053   own_identity = *own_id;
1054   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1055   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1056   restore_valid_peers ();
1057 }
1058
1059
1060 /**
1061  * @brief Delete storage of peers that was created with #Peers_initialise ()
1062  */
1063 void
1064 Peers_terminate ()
1065 {
1066   if (GNUNET_SYSERR ==
1067       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1068                                              peermap_clear_iterator,
1069                                              NULL))
1070   {
1071     LOG (GNUNET_ERROR_TYPE_WARNING,
1072         "Iteration destroying peers was aborted.\n");
1073   }
1074   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1075   peer_map = NULL;
1076   store_valid_peers ();
1077   GNUNET_free (filename_valid_peers);
1078   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1079 }
1080
1081
1082 /**
1083  * Iterator over #valid_peers hash map entries.
1084  *
1085  * @param cls closure - unused
1086  * @param peer current peer id
1087  * @param value value in the hash map - unused
1088  * @return #GNUNET_YES if we should continue to
1089  *         iterate,
1090  *         #GNUNET_NO if not.
1091  */
1092 static int
1093 valid_peer_iterator (void *cls,
1094                      const struct GNUNET_PeerIdentity *peer,
1095                      void *value)
1096 {
1097   struct PeersIteratorCls *it_cls = cls;
1098
1099   return it_cls->iterator (it_cls->cls,
1100                            peer);
1101 }
1102
1103
1104 /**
1105  * @brief Get all currently known, valid peer ids.
1106  *
1107  * @param it function to call on each peer id
1108  * @param it_cls extra argument to @a it
1109  * @return the number of key value pairs processed,
1110  *         #GNUNET_SYSERR if it aborted iteration
1111  */
1112 int
1113 Peers_get_valid_peers (PeersIterator iterator,
1114                        void *it_cls)
1115 {
1116   struct PeersIteratorCls *cls;
1117   int ret;
1118
1119   cls = GNUNET_new (struct PeersIteratorCls);
1120   cls->iterator = iterator;
1121   cls->cls = it_cls;
1122   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1123                                                valid_peer_iterator,
1124                                                cls);
1125   GNUNET_free (cls);
1126   return ret;
1127 }
1128
1129
1130 /**
1131  * @brief Add peer to known peers.
1132  *
1133  * This function is called on new peer_ids from 'external' sources
1134  * (client seed, cadet get_peers(), ...)
1135  *
1136  * @param peer the new #GNUNET_PeerIdentity
1137  *
1138  * @return #GNUNET_YES if peer was inserted
1139  *         #GNUNET_NO  otherwise (if peer was already known or
1140  *                     peer was #own_identity)
1141  */
1142 int
1143 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1144 {
1145   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1146        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1147   {
1148     return GNUNET_NO; /* We already know this peer - nothing to do */
1149   }
1150   (void) create_peer_ctx (peer);
1151   return GNUNET_YES;
1152 }
1153
1154 int
1155 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1156
1157 /**
1158  * @brief Try connecting to a peer to see whether it is online
1159  *
1160  * If not known yet, insert into known peers
1161  *
1162  * @param peer the peer whose liveliness is to be checked
1163  * @return #GNUNET_YES if peer had to be inserted
1164  *         #GNUNET_NO  otherwise (if peer was already known or
1165  *                     peer was #own_identity)
1166  */
1167 int
1168 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1169 {
1170   struct PeerContext *peer_ctx;
1171   int ret;
1172
1173   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1174   {
1175     return GNUNET_NO;
1176   }
1177   ret = Peers_insert_peer (peer);
1178   peer_ctx = get_peer_ctx (peer);
1179   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1180   {
1181     check_peer_live (peer_ctx);
1182   }
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * @brief Check if peer is removable.
1189  *
1190  * Check if
1191  *  - a recv channel exists
1192  *  - there are pending messages
1193  *  - there is no pending pull reply
1194  *
1195  * @param peer the peer in question
1196  * @return #GNUNET_YES    if peer is removable
1197  *         #GNUNET_NO     if peer is NOT removable
1198  *         #GNUNET_SYSERR if peer is not known
1199  */
1200 int
1201 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1202 {
1203   struct PeerContext *peer_ctx;
1204
1205   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1206   {
1207     return GNUNET_SYSERR;
1208   }
1209
1210   peer_ctx = get_peer_ctx (peer);
1211   if ( (NULL != peer_ctx->recv_channel) ||
1212        (NULL != peer_ctx->pending_messages_head) ||
1213        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1214   {
1215     return GNUNET_NO;
1216   }
1217   return GNUNET_YES;
1218 }
1219
1220 uint32_t *
1221 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1222                         enum Peers_ChannelRole role);
1223
1224 int
1225 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1226
1227 /**
1228  * @brief Remove peer
1229  *
1230  * @param peer the peer to clean
1231  * @return #GNUNET_YES if peer was removed
1232  *         #GNUNET_NO  otherwise
1233  */
1234 int
1235 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1236 {
1237   struct PeerContext *peer_ctx;
1238   uint32_t *channel_flag;
1239
1240   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1241   {
1242     return GNUNET_NO;
1243   }
1244
1245   peer_ctx = get_peer_ctx (peer);
1246   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Going to remove peer %s\n",
1249        GNUNET_i2s (&peer_ctx->peer_id));
1250   Peers_unset_peer_flag (peer, Peers_ONLINE);
1251
1252   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1253   while (NULL != peer_ctx->pending_messages_head)
1254   {
1255     LOG (GNUNET_ERROR_TYPE_DEBUG,
1256         "Removing unsent %s\n",
1257         peer_ctx->pending_messages_head->type);
1258     /* Cancle pending message, too */
1259     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1260   }
1261   /* If we are still waiting for notification whether this peer is live
1262    * cancel the according task */
1263   if (NULL != peer_ctx->liveliness_check_pending)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266          "Removing pending liveliness check for peer %s\n",
1267          GNUNET_i2s (&peer_ctx->peer_id));
1268     // TODO wait until cadet sets mq->cancel_impl
1269     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1270     GNUNET_free (peer_ctx->liveliness_check_pending);
1271     peer_ctx->liveliness_check_pending = NULL;
1272   }
1273   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1274   if (NULL != peer_ctx->send_channel &&
1275       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1276   {
1277     LOG (GNUNET_ERROR_TYPE_DEBUG,
1278         "Destroying send channel\n");
1279     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1280     peer_ctx->send_channel = NULL;
1281     peer_ctx->mq = NULL;
1282   }
1283   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1284   if (NULL != peer_ctx->recv_channel &&
1285       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288         "Destroying recv channel\n");
1289     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1290     peer_ctx->recv_channel = NULL;
1291   }
1292
1293   GNUNET_free (peer_ctx->send_channel_flags);
1294   GNUNET_free (peer_ctx->recv_channel_flags);
1295
1296   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1297   {
1298     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1299   }
1300   GNUNET_free (peer_ctx);
1301   return GNUNET_YES;
1302 }
1303
1304
1305 /**
1306  * @brief set flags on a given peer.
1307  *
1308  * @param peer the peer to set flags on
1309  * @param flags the flags
1310  */
1311 void
1312 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1313 {
1314   struct PeerContext *peer_ctx;
1315
1316   peer_ctx = get_peer_ctx (peer);
1317   set_peer_flag (peer_ctx, flags);
1318 }
1319
1320
1321 /**
1322  * @brief unset flags on a given peer.
1323  *
1324  * @param peer the peer to unset flags on
1325  * @param flags the flags
1326  */
1327 void
1328 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1329 {
1330   struct PeerContext *peer_ctx;
1331
1332   peer_ctx = get_peer_ctx (peer);
1333   unset_peer_flag (peer_ctx, flags);
1334 }
1335
1336
1337 /**
1338  * @brief Check whether flags on a peer are set.
1339  *
1340  * @param peer the peer to check the flag of
1341  * @param flags the flags to check
1342  *
1343  * @return #GNUNET_SYSERR if peer is not known
1344  *         #GNUNET_YES    if all given flags are set
1345  *         #GNUNET_NO     otherwise
1346  */
1347 int
1348 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1349 {
1350   struct PeerContext *peer_ctx;
1351
1352   if (GNUNET_NO == Peers_check_peer_known (peer))
1353   {
1354     return GNUNET_SYSERR;
1355   }
1356   peer_ctx = get_peer_ctx (peer);
1357   return check_peer_flag_set (peer_ctx, flags);
1358 }
1359
1360
1361 /**
1362  * @brief set flags on a given channel.
1363  *
1364  * @param channel the channel to set flags on
1365  * @param flags the flags
1366  */
1367 void
1368 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1369 {
1370   set_channel_flag (channel_flags, flags);
1371 }
1372
1373
1374 /**
1375  * @brief unset flags on a given channel.
1376  *
1377  * @param channel the channel to unset flags on
1378  * @param flags the flags
1379  */
1380 void
1381 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1382 {
1383   unset_channel_flag (channel_flags, flags);
1384 }
1385
1386
1387 /**
1388  * @brief Check whether flags on a channel are set.
1389  *
1390  * @param channel the channel to check the flag of
1391  * @param flags the flags to check
1392  *
1393  * @return #GNUNET_YES if all given flags are set
1394  *         #GNUNET_NO  otherwise
1395  */
1396 int
1397 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1398 {
1399   return check_channel_flag_set (channel_flags, flags);
1400 }
1401
1402 /**
1403  * @brief Get the flags for the channel in @a role for @a peer.
1404  *
1405  * @param peer Peer to get the channel flags for.
1406  * @param role Role of channel to get flags for
1407  *
1408  * @return The flags.
1409  */
1410 uint32_t *
1411 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1412                         enum Peers_ChannelRole role)
1413 {
1414   const struct PeerContext *peer_ctx;
1415
1416   peer_ctx = get_peer_ctx (peer);
1417   if (Peers_CHANNEL_ROLE_SENDING == role)
1418   {
1419     return peer_ctx->send_channel_flags;
1420   }
1421   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1422   {
1423     return peer_ctx->recv_channel_flags;
1424   }
1425   else
1426   {
1427     GNUNET_assert (0);
1428   }
1429 }
1430
1431 /**
1432  * @brief Check whether we have information about the given peer.
1433  *
1434  * FIXME probably deprecated. Make this the new _online.
1435  *
1436  * @param peer peer in question
1437  *
1438  * @return #GNUNET_YES if peer is known
1439  *         #GNUNET_NO  if peer is not knwon
1440  */
1441 int
1442 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1443 {
1444   if (NULL != peer_map)
1445   {
1446     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1447   } else
1448   {
1449     return GNUNET_NO;
1450   }
1451 }
1452
1453
1454 /**
1455  * @brief Check whether @a peer is actually a peer.
1456  *
1457  * A valid peer is a peer that we know exists eg. we were connected to once.
1458  *
1459  * @param peer peer in question
1460  *
1461  * @return #GNUNET_YES if peer is valid
1462  *         #GNUNET_NO  if peer is not valid
1463  */
1464 int
1465 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1466 {
1467   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1468 }
1469
1470
1471 /**
1472  * @brief Indicate that we want to send to the other peer
1473  *
1474  * This establishes a sending channel
1475  *
1476  * @param peer the peer to establish channel to
1477  */
1478 void
1479 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1480 {
1481   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1482   (void) get_channel (peer);
1483 }
1484
1485
1486 /**
1487  * @brief Check whether other peer has the intention to send/opened channel
1488  *        towars us
1489  *
1490  * @param peer the peer in question
1491  *
1492  * @return #GNUNET_YES if peer has the intention to send
1493  *         #GNUNET_NO  otherwise
1494  */
1495 int
1496 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1497 {
1498   const struct PeerContext *peer_ctx;
1499
1500   peer_ctx = get_peer_ctx (peer);
1501   if (NULL != peer_ctx->recv_channel)
1502   {
1503     return GNUNET_YES;
1504   }
1505   return GNUNET_NO;
1506 }
1507
1508
1509 /**
1510  * Handle the channel a peer opens to us.
1511  *
1512  * @param cls The closure
1513  * @param channel The channel the peer wants to establish
1514  * @param initiator The peer's peer ID
1515  *
1516  * @return initial channel context for the channel
1517  *         (can be NULL -- that's not an error)
1518  */
1519 void *
1520 Peers_handle_inbound_channel (void *cls,
1521                               struct GNUNET_CADET_Channel *channel,
1522                               const struct GNUNET_PeerIdentity *initiator)
1523 {
1524   struct PeerContext *peer_ctx;
1525   struct GNUNET_PeerIdentity *ctx_peer;
1526
1527   LOG (GNUNET_ERROR_TYPE_DEBUG,
1528       "New channel was established to us (Peer %s).\n",
1529       GNUNET_i2s (initiator));
1530   GNUNET_assert (NULL != channel); /* according to cadet API */
1531   /* Make sure we 'know' about this peer */
1532   peer_ctx = create_or_get_peer_ctx (initiator);
1533   set_peer_live (peer_ctx);
1534   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1535   *ctx_peer = *initiator;
1536   /* We only accept one incoming channel per peer */
1537   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1538   {
1539     set_channel_flag (peer_ctx->recv_channel_flags,
1540                       Peers_CHANNEL_ESTABLISHED_TWICE);
1541     //GNUNET_CADET_channel_destroy (channel);
1542     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1543     peer_ctx->recv_channel = channel;
1544     /* return the channel context */
1545     return ctx_peer;
1546   }
1547   peer_ctx->recv_channel = channel;
1548   return ctx_peer;
1549 }
1550
1551
1552 /**
1553  * @brief Check whether a sending channel towards the given peer exists
1554  *
1555  * @param peer the peer to check for
1556  *
1557  * @return #GNUNET_YES if a sending channel towards that peer exists
1558  *         #GNUNET_NO  otherwise
1559  */
1560 int
1561 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1562 {
1563   struct PeerContext *peer_ctx;
1564
1565   if (GNUNET_NO == Peers_check_peer_known (peer))
1566   { /* If no such peer exists, there is no channel */
1567     return GNUNET_NO;
1568   }
1569   peer_ctx = get_peer_ctx (peer);
1570   if (NULL == peer_ctx->send_channel)
1571   {
1572     return GNUNET_NO;
1573   }
1574   return GNUNET_YES;
1575 }
1576
1577
1578 /**
1579  * @brief check whether the given channel is the sending channel of the given
1580  *        peer
1581  *
1582  * @param peer the peer in question
1583  * @param channel the channel to check for
1584  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1585  *                    #Peers_CHANNEL_ROLE_RECEIVING
1586  *
1587  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1588  *         #GNUNET_NO  otherwise
1589  */
1590 int
1591 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1592                           const struct GNUNET_CADET_Channel *channel,
1593                           enum Peers_ChannelRole role)
1594 {
1595   const struct PeerContext *peer_ctx;
1596
1597   if (GNUNET_NO == Peers_check_peer_known (peer))
1598   {
1599     return GNUNET_NO;
1600   }
1601   peer_ctx = get_peer_ctx (peer);
1602   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1603        (channel == peer_ctx->send_channel) )
1604   {
1605     return GNUNET_YES;
1606   }
1607   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1608        (channel == peer_ctx->recv_channel) )
1609   {
1610     return GNUNET_YES;
1611   }
1612   return GNUNET_NO;
1613 }
1614
1615
1616 /**
1617  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618  *        intention to another peer
1619  *
1620  * If there is also no channel to receive messages from that peer, remove it
1621  * from the peermap.
1622  * TODO really?
1623  *
1624  * @peer the peer identity of the peer whose sending channel to destroy
1625  * @return #GNUNET_YES if channel was destroyed
1626  *         #GNUNET_NO  otherwise
1627  */
1628 int
1629 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1630 {
1631   struct PeerContext *peer_ctx;
1632
1633   if (GNUNET_NO == Peers_check_peer_known (peer))
1634   {
1635     return GNUNET_NO;
1636   }
1637   peer_ctx = get_peer_ctx (peer);
1638   if (NULL != peer_ctx->send_channel)
1639   {
1640     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1641     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1642     peer_ctx->send_channel = NULL;
1643     peer_ctx->mq = NULL;
1644     (void) Peers_check_connected (peer);
1645     return GNUNET_YES;
1646   }
1647   return GNUNET_NO;
1648 }
1649
1650 /**
1651  * This is called when a channel is destroyed.
1652  *
1653  * @param cls The closure
1654  * @param channel The channel being closed
1655  */
1656 void
1657 Peers_cleanup_destroyed_channel (void *cls,
1658                                  const struct GNUNET_CADET_Channel *channel)
1659 {
1660   struct GNUNET_PeerIdentity *peer = cls;
1661   struct PeerContext *peer_ctx;
1662
1663   if (GNUNET_NO == Peers_check_peer_known (peer))
1664   {/* We don't want to implicitly create a context that we're about to kill */
1665   LOG (GNUNET_ERROR_TYPE_DEBUG,
1666        "channel (%s) without associated context was destroyed\n",
1667        GNUNET_i2s (peer));
1668     return;
1669   }
1670   peer_ctx = get_peer_ctx (peer);
1671
1672   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1673    * flag will be set. In this case simply make sure that the channels are
1674    * cleaned. */
1675   /* FIXME This distinction seems to be redundant */
1676   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1677   {/* We initiatad the destruction of this particular peer */
1678     LOG (GNUNET_ERROR_TYPE_DEBUG,
1679         "Peer is in the process of being destroyed\n");
1680     if (channel == peer_ctx->send_channel)
1681     {
1682       peer_ctx->send_channel = NULL;
1683       peer_ctx->mq = NULL;
1684     }
1685     else if (channel == peer_ctx->recv_channel)
1686     {
1687       peer_ctx->recv_channel = NULL;
1688     }
1689
1690     if (NULL != peer_ctx->send_channel)
1691     {
1692       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1693       peer_ctx->send_channel = NULL;
1694       peer_ctx->mq = NULL;
1695     }
1696     if (NULL != peer_ctx->recv_channel)
1697     {
1698       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1699       peer_ctx->recv_channel = NULL;
1700     }
1701     /* Set the #Peers_ONLINE flag accordingly */
1702     (void) Peers_check_connected (peer);
1703     return;
1704   }
1705
1706   else
1707   { /* We did not initiate the destruction of this peer */
1708     LOG (GNUNET_ERROR_TYPE_DEBUG,
1709         "Peer is NOT in the process of being destroyed\n");
1710     if (channel == peer_ctx->send_channel)
1711     { /* Something (but us) killd the channel - clean up peer */
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713           "send channel (%s) was destroyed - cleaning up\n",
1714           GNUNET_i2s (peer));
1715       peer_ctx->send_channel = NULL;
1716       peer_ctx->mq = NULL;
1717     }
1718     else if (channel == peer_ctx->recv_channel)
1719     { /* Other peer doesn't want to send us messages anymore */
1720       LOG (GNUNET_ERROR_TYPE_DEBUG,
1721            "Peer %s destroyed recv channel - cleaning up channel\n",
1722            GNUNET_i2s (peer));
1723       peer_ctx->recv_channel = NULL;
1724     }
1725     else
1726     {
1727       LOG (GNUNET_ERROR_TYPE_WARNING,
1728            "unknown channel (%s) was destroyed\n",
1729            GNUNET_i2s (peer));
1730     }
1731   }
1732   (void) Peers_check_connected (peer);
1733 }
1734
1735 /**
1736  * @brief Send a message to another peer.
1737  *
1738  * Keeps track about pending messages so they can be properly removed when the
1739  * peer is destroyed.
1740  *
1741  * @param peer receeiver of the message
1742  * @param ev envelope of the message
1743  * @param type type of the message
1744  */
1745 void
1746 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1747                     struct GNUNET_MQ_Envelope *ev,
1748                     const char *type)
1749 {
1750   struct PendingMessage *pending_msg;
1751   struct GNUNET_MQ_Handle *mq;
1752
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "Sending message to %s of type %s\n",
1755               GNUNET_i2s (peer),
1756               type);
1757   pending_msg = insert_pending_message (peer, ev, type);
1758   mq = get_mq (peer);
1759   GNUNET_MQ_notify_sent (ev,
1760                          mq_notify_sent_cb,
1761                          pending_msg);
1762   GNUNET_MQ_send (mq, ev);
1763 }
1764
1765 /**
1766  * @brief Schedule a operation on given peer
1767  *
1768  * Avoids scheduling an operation twice.
1769  *
1770  * @param peer the peer we want to schedule the operation for once it gets live
1771  *
1772  * @return #GNUNET_YES if the operation was scheduled
1773  *         #GNUNET_NO  otherwise
1774  */
1775 int
1776 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1777                           const PeerOp peer_op)
1778 {
1779   struct PeerPendingOp pending_op;
1780   struct PeerContext *peer_ctx;
1781
1782   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1783   {
1784     return GNUNET_NO;
1785   }
1786   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1787
1788   //TODO if LIVE/ONLINE execute immediately
1789
1790   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1791   {
1792     peer_ctx = get_peer_ctx (peer);
1793     pending_op.op = peer_op;
1794     pending_op.op_cls = NULL;
1795     GNUNET_array_append (peer_ctx->pending_ops,
1796                          peer_ctx->num_pending_ops,
1797                          pending_op);
1798     return GNUNET_YES;
1799   }
1800   return GNUNET_NO;
1801 }
1802
1803 /**
1804  * @brief Get the recv_channel of @a peer.
1805  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1806  * messages.
1807  *
1808  * @param peer The peer to get the recv_channel from.
1809  *
1810  * @return The recv_channel.
1811  */
1812 struct GNUNET_CADET_Channel *
1813 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1814 {
1815   struct PeerContext *peer_ctx;
1816
1817   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1818   peer_ctx = get_peer_ctx (peer);
1819   return peer_ctx->recv_channel;
1820 }
1821 /***********************************************************************
1822  * /Old gnunet-service-rps_peers.c
1823 ***********************************************************************/
1824
1825
1826 /***********************************************************************
1827  * Housekeeping with clients
1828 ***********************************************************************/
1829
1830 /**
1831  * Closure used to pass the client and the id to the callback
1832  * that replies to a client's request
1833  */
1834 struct ReplyCls
1835 {
1836   /**
1837    * DLL
1838    */
1839   struct ReplyCls *next;
1840   struct ReplyCls *prev;
1841
1842   /**
1843    * The identifier of the request
1844    */
1845   uint32_t id;
1846
1847   /**
1848    * The handle to the request
1849    */
1850   struct RPS_SamplerRequestHandle *req_handle;
1851
1852   /**
1853    * The client handle to send the reply to
1854    */
1855   struct ClientContext *cli_ctx;
1856 };
1857
1858
1859 /**
1860  * Struct used to store the context of a connected client.
1861  */
1862 struct ClientContext
1863 {
1864   /**
1865    * DLL
1866    */
1867   struct ClientContext *next;
1868   struct ClientContext *prev;
1869
1870   /**
1871    * The message queue to communicate with the client.
1872    */
1873   struct GNUNET_MQ_Handle *mq;
1874
1875   /**
1876    * DLL with handles to single requests from the client
1877    */
1878   struct ReplyCls *rep_cls_head;
1879   struct ReplyCls *rep_cls_tail;
1880
1881   /**
1882    * @brief How many updates this client expects to receive.
1883    */
1884   int64_t view_updates_left;
1885
1886   /**
1887    * The client handle to send the reply to
1888    */
1889   struct GNUNET_SERVICE_Client *client;
1890 };
1891
1892 /**
1893  * DLL with all clients currently connected to us
1894  */
1895 struct ClientContext *cli_ctx_head;
1896 struct ClientContext *cli_ctx_tail;
1897
1898 /***********************************************************************
1899  * /Housekeeping with clients
1900 ***********************************************************************/
1901
1902
1903
1904
1905
1906 /***********************************************************************
1907  * Globals
1908 ***********************************************************************/
1909
1910 /**
1911  * Sampler used for the Brahms protocol itself.
1912  */
1913 static struct RPS_Sampler *prot_sampler;
1914
1915 /**
1916  * Sampler used for the clients.
1917  */
1918 static struct RPS_Sampler *client_sampler;
1919
1920 /**
1921  * Name to log view to
1922  */
1923 static char *file_name_view_log;
1924
1925 /**
1926  * The size of sampler we need to be able to satisfy the client's need
1927  * of random peers.
1928  */
1929 static unsigned int sampler_size_client_need;
1930
1931 /**
1932  * The size of sampler we need to be able to satisfy the Brahms protocol's
1933  * need of random peers.
1934  *
1935  * This is one minimum size the sampler grows to.
1936  */
1937 static unsigned int sampler_size_est_need;
1938
1939 /**
1940  * @brief This is the minimum estimate used as view size.
1941  *
1942  * It is configured by the user.
1943  */
1944 static unsigned int sampler_size_est_min;
1945
1946 /**
1947  * Percentage of total peer number in the view
1948  * to send random PUSHes to
1949  */
1950 static float alpha;
1951
1952 /**
1953  * Percentage of total peer number in the view
1954  * to send random PULLs to
1955  */
1956 static float beta;
1957
1958 /**
1959  * Identifier for the main task that runs periodically.
1960  */
1961 static struct GNUNET_SCHEDULER_Task *do_round_task;
1962
1963 /**
1964  * Time inverval the do_round task runs in.
1965  */
1966 static struct GNUNET_TIME_Relative round_interval;
1967
1968 /**
1969  * List to store peers received through pushes temporary.
1970  */
1971 static struct CustomPeerMap *push_map;
1972
1973 /**
1974  * List to store peers received through pulls temporary.
1975  */
1976 static struct CustomPeerMap *pull_map;
1977
1978 /**
1979  * Handler to NSE.
1980  */
1981 static struct GNUNET_NSE_Handle *nse;
1982
1983 /**
1984  * Handler to CADET.
1985  */
1986 static struct GNUNET_CADET_Handle *cadet_handle;
1987
1988 /**
1989  * @brief Port to communicate to other peers.
1990  */
1991 static struct GNUNET_CADET_Port *cadet_port;
1992
1993 /**
1994  * Handler to PEERINFO.
1995  */
1996 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1997
1998 /**
1999  * Handle for cancellation of iteration over peers.
2000  */
2001 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2002
2003 /**
2004  * Request counter.
2005  *
2006  * Counts how many requets clients already issued.
2007  * Only needed in the beginning to check how many of the 64 deltas
2008  * we already have
2009  */
2010 static unsigned int req_counter;
2011
2012 /**
2013  * Time of the last request we received.
2014  *
2015  * Used to compute the expected request rate.
2016  */
2017 static struct GNUNET_TIME_Absolute last_request;
2018
2019 /**
2020  * Size of #request_deltas.
2021  */
2022 #define REQUEST_DELTAS_SIZE 64
2023 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2024
2025 /**
2026  * Last 64 deltas between requests
2027  */
2028 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2029
2030 /**
2031  * The prediction of the rate of requests
2032  */
2033 static struct GNUNET_TIME_Relative request_rate;
2034
2035
2036 #ifdef ENABLE_MALICIOUS
2037 /**
2038  * Type of malicious peer
2039  *
2040  * 0 Don't act malicious at all - Default
2041  * 1 Try to maximise representation
2042  * 2 Try to partition the network
2043  * 3 Combined attack
2044  */
2045 static uint32_t mal_type;
2046
2047 /**
2048  * Other malicious peers
2049  */
2050 static struct GNUNET_PeerIdentity *mal_peers;
2051
2052 /**
2053  * Hashmap of malicious peers used as set.
2054  * Used to more efficiently check whether we know that peer.
2055  */
2056 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2057
2058 /**
2059  * Number of other malicious peers
2060  */
2061 static uint32_t num_mal_peers;
2062
2063
2064 /**
2065  * If type is 2 This struct is used to store the attacked peers in a DLL
2066  */
2067 struct AttackedPeer
2068 {
2069   /**
2070    * DLL
2071    */
2072   struct AttackedPeer *next;
2073   struct AttackedPeer *prev;
2074
2075   /**
2076    * PeerID
2077    */
2078   struct GNUNET_PeerIdentity peer_id;
2079 };
2080
2081 /**
2082  * If type is 2 this is the DLL of attacked peers
2083  */
2084 static struct AttackedPeer *att_peers_head;
2085 static struct AttackedPeer *att_peers_tail;
2086
2087 /**
2088  * This index is used to point to an attacked peer to
2089  * implement the round-robin-ish way to select attacked peers.
2090  */
2091 static struct AttackedPeer *att_peer_index;
2092
2093 /**
2094  * Hashmap of attacked peers used as set.
2095  * Used to more efficiently check whether we know that peer.
2096  */
2097 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2098
2099 /**
2100  * Number of attacked peers
2101  */
2102 static uint32_t num_attacked_peers;
2103
2104 /**
2105  * If type is 1 this is the attacked peer
2106  */
2107 static struct GNUNET_PeerIdentity attacked_peer;
2108
2109 /**
2110  * The limit of PUSHes we can send in one round.
2111  * This is an assumption of the Brahms protocol and either implemented
2112  * via proof of work
2113  * or
2114  * assumend to be the bandwidth limitation.
2115  */
2116 static uint32_t push_limit = 10000;
2117 #endif /* ENABLE_MALICIOUS */
2118
2119
2120 /***********************************************************************
2121  * /Globals
2122 ***********************************************************************/
2123
2124
2125 /***********************************************************************
2126  * Util functions
2127 ***********************************************************************/
2128
2129
2130 /**
2131  * Print peerlist to log.
2132  */
2133 static void
2134 print_peer_list (struct GNUNET_PeerIdentity *list,
2135                  unsigned int len)
2136 {
2137   unsigned int i;
2138
2139   LOG (GNUNET_ERROR_TYPE_DEBUG,
2140        "Printing peer list of length %u at %p:\n",
2141        len,
2142        list);
2143   for (i = 0 ; i < len ; i++)
2144   {
2145     LOG (GNUNET_ERROR_TYPE_DEBUG,
2146          "%u. peer: %s\n",
2147          i, GNUNET_i2s (&list[i]));
2148   }
2149 }
2150
2151
2152 /**
2153  * Remove peer from list.
2154  */
2155 static void
2156 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2157                unsigned int *list_size,
2158                const struct GNUNET_PeerIdentity *peer)
2159 {
2160   unsigned int i;
2161   struct GNUNET_PeerIdentity *tmp;
2162
2163   tmp = *peer_list;
2164
2165   LOG (GNUNET_ERROR_TYPE_DEBUG,
2166        "Removing peer %s from list at %p\n",
2167        GNUNET_i2s (peer),
2168        tmp);
2169
2170   for ( i = 0 ; i < *list_size ; i++ )
2171   {
2172     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2173     {
2174       if (i < *list_size -1)
2175       { /* Not at the last entry -- shift peers left */
2176         memmove (&tmp[i], &tmp[i +1],
2177                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2178       }
2179       /* Remove last entry (should be now useless PeerID) */
2180       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2181     }
2182   }
2183   *peer_list = tmp;
2184 }
2185
2186
2187 /**
2188  * Sum all time relatives of an array.
2189  */
2190 static struct GNUNET_TIME_Relative
2191 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2192                 uint32_t arr_size)
2193 {
2194   struct GNUNET_TIME_Relative sum;
2195   uint32_t i;
2196
2197   sum = GNUNET_TIME_UNIT_ZERO;
2198   for ( i = 0 ; i < arr_size ; i++ )
2199   {
2200     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2201   }
2202   return sum;
2203 }
2204
2205
2206 /**
2207  * Compute the average of given time relatives.
2208  */
2209 static struct GNUNET_TIME_Relative
2210 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2211                 uint32_t arr_size)
2212 {
2213   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2214                                                       arr_size),
2215                                       arr_size);
2216 }
2217
2218
2219 /**
2220  * Insert PeerID in #view
2221  *
2222  * Called once we know a peer is live.
2223  * Implements #PeerOp
2224  *
2225  * @return GNUNET_OK if peer was actually inserted
2226  *         GNUNET_NO if peer was not inserted
2227  */
2228 static void
2229 insert_in_view_op (void *cls,
2230                 const struct GNUNET_PeerIdentity *peer);
2231
2232 /**
2233  * Insert PeerID in #view
2234  *
2235  * Called once we know a peer is live.
2236  *
2237  * @return GNUNET_OK if peer was actually inserted
2238  *         GNUNET_NO if peer was not inserted
2239  */
2240 static int
2241 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2242 {
2243   int online;
2244
2245   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2246   if ( (GNUNET_NO == online) ||
2247        (GNUNET_SYSERR == online) ) /* peer is not even known */
2248   {
2249     (void) Peers_issue_peer_liveliness_check (peer);
2250     (void) Peers_schedule_operation (peer, insert_in_view_op);
2251     return GNUNET_NO;
2252   }
2253   /* Open channel towards peer to keep connection open */
2254   Peers_indicate_sending_intention (peer);
2255   return View_put (peer);
2256 }
2257
2258 /**
2259  * Put random peer from sampler into the view as history update.
2260  */
2261 static void
2262 hist_update (void *cls,
2263              struct GNUNET_PeerIdentity *ids,
2264              uint32_t num_peers)
2265 {
2266   unsigned int i;
2267
2268   for (i = 0; i < num_peers; i++)
2269   {
2270     (void) insert_in_view (&ids[i]);
2271     to_file (file_name_view_log,
2272              "+%s\t(hist)",
2273              GNUNET_i2s_full (ids));
2274   }
2275 }
2276
2277
2278 /**
2279  * Wrapper around #RPS_sampler_resize()
2280  *
2281  * If we do not have enough sampler elements, double current sampler size
2282  * If we have more than enough sampler elements, halv current sampler size
2283  */
2284 static void
2285 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2286 {
2287   unsigned int sampler_size;
2288
2289   // TODO statistics
2290   // TODO respect the min, max
2291   sampler_size = RPS_sampler_get_size (sampler);
2292   if (sampler_size > new_size * 4)
2293   { /* Shrinking */
2294     RPS_sampler_resize (sampler, sampler_size / 2);
2295   }
2296   else if (sampler_size < new_size)
2297   { /* Growing */
2298     RPS_sampler_resize (sampler, sampler_size * 2);
2299   }
2300   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2301 }
2302
2303
2304 /**
2305  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2306  */
2307 static void
2308 client_resize_wrapper ()
2309 {
2310   uint32_t bigger_size;
2311
2312   // TODO statistics
2313
2314   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2315
2316   // TODO respect the min, max
2317   resize_wrapper (client_sampler, bigger_size);
2318   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2319       bigger_size);
2320 }
2321
2322
2323 /**
2324  * Estimate request rate
2325  *
2326  * Called every time we receive a request from the client.
2327  */
2328 static void
2329 est_request_rate()
2330 {
2331   struct GNUNET_TIME_Relative max_round_duration;
2332
2333   if (request_deltas_size > req_counter)
2334     req_counter++;
2335   if ( 1 < req_counter)
2336   {
2337     /* Shift last request deltas to the right */
2338     memmove (&request_deltas[1],
2339         request_deltas,
2340         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2341
2342     /* Add current delta to beginning */
2343     request_deltas[0] =
2344         GNUNET_TIME_absolute_get_difference (last_request,
2345                                              GNUNET_TIME_absolute_get ());
2346     request_rate = T_relative_avg (request_deltas, req_counter);
2347     request_rate = (request_rate.rel_value_us < 1) ?
2348       GNUNET_TIME_relative_get_unit_ () : request_rate;
2349
2350     /* Compute the duration a round will maximally take */
2351     max_round_duration =
2352         GNUNET_TIME_relative_add (round_interval,
2353                                   GNUNET_TIME_relative_divide (round_interval, 2));
2354
2355     /* Set the estimated size the sampler has to have to
2356      * satisfy the current client request rate */
2357     sampler_size_client_need =
2358         max_round_duration.rel_value_us / request_rate.rel_value_us;
2359
2360     /* Resize the sampler */
2361     client_resize_wrapper ();
2362   }
2363   last_request = GNUNET_TIME_absolute_get ();
2364 }
2365
2366
2367 /**
2368  * Add all peers in @a peer_array to @a peer_map used as set.
2369  *
2370  * @param peer_array array containing the peers
2371  * @param num_peers number of peers in @peer_array
2372  * @param peer_map the peermap to use as set
2373  */
2374 static void
2375 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2376                        unsigned int num_peers,
2377                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2378 {
2379   unsigned int i;
2380   if (NULL == peer_map)
2381   {
2382     LOG (GNUNET_ERROR_TYPE_WARNING,
2383          "Trying to add peers to non-existing peermap.\n");
2384     return;
2385   }
2386
2387   for (i = 0; i < num_peers; i++)
2388   {
2389     GNUNET_CONTAINER_multipeermap_put (peer_map,
2390                                        &peer_array[i],
2391                                        NULL,
2392                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2393   }
2394 }
2395
2396
2397 /**
2398  * Send a PULL REPLY to @a peer_id
2399  *
2400  * @param peer_id the peer to send the reply to.
2401  * @param peer_ids the peers to send to @a peer_id
2402  * @param num_peer_ids the number of peers to send to @a peer_id
2403  */
2404 static void
2405 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2406                  const struct GNUNET_PeerIdentity *peer_ids,
2407                  unsigned int num_peer_ids)
2408 {
2409   uint32_t send_size;
2410   struct GNUNET_MQ_Envelope *ev;
2411   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2412
2413   /* Compute actual size */
2414   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2415               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2416
2417   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2418     /* Compute number of peers to send
2419      * If too long, simply truncate */
2420     // TODO select random ones via permutation
2421     //      or even better: do good protocol design
2422     send_size =
2423       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2424        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2425        sizeof (struct GNUNET_PeerIdentity);
2426   else
2427     send_size = num_peer_ids;
2428
2429   LOG (GNUNET_ERROR_TYPE_DEBUG,
2430       "Going to send PULL REPLY with %u peers to %s\n",
2431       send_size, GNUNET_i2s (peer_id));
2432
2433   ev = GNUNET_MQ_msg_extra (out_msg,
2434                             send_size * sizeof (struct GNUNET_PeerIdentity),
2435                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2436   out_msg->num_peers = htonl (send_size);
2437   GNUNET_memcpy (&out_msg[1], peer_ids,
2438          send_size * sizeof (struct GNUNET_PeerIdentity));
2439
2440   Peers_send_message (peer_id, ev, "PULL REPLY");
2441   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2442 }
2443
2444
2445 /**
2446  * Insert PeerID in #pull_map
2447  *
2448  * Called once we know a peer is live.
2449  */
2450 static void
2451 insert_in_pull_map (void *cls,
2452                     const struct GNUNET_PeerIdentity *peer)
2453 {
2454   CustomPeerMap_put (pull_map, peer);
2455 }
2456
2457
2458 /**
2459  * Insert PeerID in #view
2460  *
2461  * Called once we know a peer is live.
2462  * Implements #PeerOp
2463  */
2464 static void
2465 insert_in_view_op (void *cls,
2466                 const struct GNUNET_PeerIdentity *peer)
2467 {
2468   (void) insert_in_view (peer);
2469 }
2470
2471
2472 /**
2473  * Update sampler with given PeerID.
2474  * Implements #PeerOp
2475  */
2476 static void
2477 insert_in_sampler (void *cls,
2478                    const struct GNUNET_PeerIdentity *peer)
2479 {
2480   LOG (GNUNET_ERROR_TYPE_DEBUG,
2481        "Updating samplers with peer %s from insert_in_sampler()\n",
2482        GNUNET_i2s (peer));
2483   RPS_sampler_update (prot_sampler,   peer);
2484   RPS_sampler_update (client_sampler, peer);
2485   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2486   {
2487     /* Make sure we 'know' about this peer */
2488     (void) Peers_issue_peer_liveliness_check (peer);
2489     /* Establish a channel towards that peer to indicate we are going to send
2490      * messages to it */
2491     //Peers_indicate_sending_intention (peer);
2492   }
2493 }
2494
2495 /**
2496  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2497  *        If the peer is not known, liveliness check is issued and it is
2498  *        scheduled to be inserted in sampler and view.
2499  *
2500  * "External sources" refer to every source except the gossip.
2501  *
2502  * @param peer peer to insert
2503  */
2504 static void
2505 got_peer (const struct GNUNET_PeerIdentity *peer)
2506 {
2507   /* If we did not know this peer already, insert it into sampler and view */
2508   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2509   {
2510     Peers_schedule_operation (peer, insert_in_sampler);
2511     Peers_schedule_operation (peer, insert_in_view_op);
2512   }
2513 }
2514
2515 /**
2516  * @brief Checks if there is a sending channel and if it is needed
2517  *
2518  * @param peer the peer whose sending channel is checked
2519  * @return GNUNET_YES if sending channel exists and is still needed
2520  *         GNUNET_NO  otherwise
2521  */
2522 static int
2523 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2524 {
2525   /* struct GNUNET_CADET_Channel *channel; */
2526   if (GNUNET_NO == Peers_check_peer_known (peer))
2527   {
2528     return GNUNET_NO;
2529   }
2530   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2531   {
2532     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2533          (GNUNET_YES == View_contains_peer (peer)) ||
2534          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2535          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2536          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2537     { /* If we want to keep the connection to peer open */
2538       return GNUNET_YES;
2539     }
2540     return GNUNET_NO;
2541   }
2542   return GNUNET_NO;
2543 }
2544
2545 /**
2546  * @brief remove peer from our knowledge, the view, push and pull maps and
2547  * samplers.
2548  *
2549  * @param peer the peer to remove
2550  */
2551 static void
2552 remove_peer (const struct GNUNET_PeerIdentity *peer)
2553 {
2554   (void) View_remove_peer (peer);
2555   CustomPeerMap_remove_peer (pull_map, peer);
2556   CustomPeerMap_remove_peer (push_map, peer);
2557   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2558   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2559   Peers_remove_peer (peer);
2560 }
2561
2562
2563 /**
2564  * @brief Remove data that is not needed anymore.
2565  *
2566  * If the sending channel is no longer needed it is destroyed.
2567  *
2568  * @param peer the peer whose data is about to be cleaned
2569  */
2570 static void
2571 clean_peer (const struct GNUNET_PeerIdentity *peer)
2572 {
2573   if (GNUNET_NO == check_sending_channel_needed (peer))
2574   {
2575     LOG (GNUNET_ERROR_TYPE_DEBUG,
2576         "Going to remove send channel to peer %s\n",
2577         GNUNET_i2s (peer));
2578     #ifdef ENABLE_MALICIOUS
2579     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2580       (void) Peers_destroy_sending_channel (peer);
2581     #else /* ENABLE_MALICIOUS */
2582     (void) Peers_destroy_sending_channel (peer);
2583     #endif /* ENABLE_MALICIOUS */
2584   }
2585
2586   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2587        (GNUNET_NO == View_contains_peer (peer)) &&
2588        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2589        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2590        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2591        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2592        (GNUNET_NO != Peers_check_removable (peer)) )
2593   { /* We can safely remove this peer */
2594     LOG (GNUNET_ERROR_TYPE_DEBUG,
2595         "Going to remove peer %s\n",
2596         GNUNET_i2s (peer));
2597     remove_peer (peer);
2598     return;
2599   }
2600 }
2601
2602 /**
2603  * @brief This is called when a channel is destroyed.
2604  *
2605  * Removes peer completely from our knowledge if the send_channel was destroyed
2606  * Otherwise simply delete the recv_channel
2607  * Also check if the knowledge about this peer is still needed.
2608  * If not, remove this peer from our knowledge.
2609  *
2610  * @param cls The closure
2611  * @param channel The channel being closed
2612  * @param channel_ctx The context associated with this channel
2613  */
2614 static void
2615 cleanup_destroyed_channel (void *cls,
2616                            const struct GNUNET_CADET_Channel *channel)
2617 {
2618   struct GNUNET_PeerIdentity *peer = cls;
2619   uint32_t *channel_flag;
2620   struct PeerContext *peer_ctx;
2621
2622   GNUNET_assert (NULL != peer);
2623
2624   if (GNUNET_NO == Peers_check_peer_known (peer))
2625   { /* We don't know a context to that peer */
2626     LOG (GNUNET_ERROR_TYPE_WARNING,
2627          "channel (%s) without associated context was destroyed\n",
2628          GNUNET_i2s (peer));
2629     GNUNET_free (peer);
2630     return;
2631   }
2632
2633   peer_ctx = get_peer_ctx (peer);
2634   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2635   {
2636     LOG (GNUNET_ERROR_TYPE_DEBUG,
2637         "Callback on destruction of recv-channel was called (%s)\n",
2638         GNUNET_i2s (peer));
2639     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2640   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2641   {
2642     LOG (GNUNET_ERROR_TYPE_DEBUG,
2643         "Callback on destruction of send-channel was called (%s)\n",
2644         GNUNET_i2s (peer));
2645     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2646   } else {
2647     LOG (GNUNET_ERROR_TYPE_ERROR,
2648         "Channel to be destroyed has is neither sending nor receiving role\n");
2649   }
2650
2651   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2652   { /* We are in the middle of removing that peer from our knowledge. In this
2653        case simply make sure that the channels are cleaned. */
2654     Peers_cleanup_destroyed_channel (cls, channel);
2655     to_file (file_name_view_log,
2656              "-%s\t(cleanup channel, ourself)",
2657              GNUNET_i2s_full (peer));
2658     GNUNET_free (peer);
2659     return;
2660   }
2661
2662   if (GNUNET_YES ==
2663       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2664   { /* Channel used for sending was destroyed */
2665     /* Possible causes of channel destruction:
2666      *  - ourselves  -> cleaning send channel -> clean context
2667      *  - other peer -> peer probably went down -> remove
2668      */
2669     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2670     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2671     { /* We are about to clean the sending channel. Clean the respective
2672        * context */
2673       Peers_cleanup_destroyed_channel (cls, channel);
2674       GNUNET_free (peer);
2675       return;
2676     }
2677     else
2678     { /* Other peer destroyed our sending channel that he is supposed to keep
2679        * open. It probably went down. Remove it from our knowledge. */
2680       Peers_cleanup_destroyed_channel (cls, channel);
2681       remove_peer (peer);
2682       GNUNET_free (peer);
2683       return;
2684     }
2685   }
2686   else if (GNUNET_YES ==
2687       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2688   { /* Channel used for receiving was destroyed */
2689     /* Possible causes of channel destruction:
2690      *  - ourselves  -> peer tried to establish channel twice -> clean context
2691      *  - other peer -> peer doesn't want to send us data -> clean
2692      */
2693     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2694     if (GNUNET_YES ==
2695         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2696     { /* Other peer tried to establish a channel to us twice. We do not accept
2697        * that. Clean the context. */
2698       Peers_cleanup_destroyed_channel (cls, channel);
2699       GNUNET_free (peer);
2700       return;
2701     }
2702     else
2703     { /* Other peer doesn't want to send us data anymore. We are free to clean
2704        * it. */
2705       Peers_cleanup_destroyed_channel (cls, channel);
2706       clean_peer (peer);
2707       GNUNET_free (peer);
2708       return;
2709     }
2710   }
2711   else
2712   {
2713     LOG (GNUNET_ERROR_TYPE_WARNING,
2714         "Destroyed channel is neither sending nor receiving channel\n");
2715   }
2716   GNUNET_free (peer);
2717 }
2718
2719 /***********************************************************************
2720  * /Util functions
2721 ***********************************************************************/
2722
2723 static void
2724 destroy_reply_cls (struct ReplyCls *rep_cls)
2725 {
2726   struct ClientContext *cli_ctx;
2727
2728   cli_ctx = rep_cls->cli_ctx;
2729   GNUNET_assert (NULL != cli_ctx);
2730   if (NULL != rep_cls->req_handle)
2731   {
2732     RPS_sampler_request_cancel (rep_cls->req_handle);
2733   }
2734   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2735                                cli_ctx->rep_cls_tail,
2736                                rep_cls);
2737   GNUNET_free (rep_cls);
2738 }
2739
2740
2741 static void
2742 destroy_cli_ctx (struct ClientContext *cli_ctx)
2743 {
2744   GNUNET_assert (NULL != cli_ctx);
2745   if (NULL != cli_ctx->rep_cls_head)
2746   {
2747     LOG (GNUNET_ERROR_TYPE_WARNING,
2748          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2749     while (NULL != cli_ctx->rep_cls_head)
2750       destroy_reply_cls (cli_ctx->rep_cls_head);
2751   }
2752   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2753                                cli_ctx_tail,
2754                                cli_ctx);
2755   GNUNET_free (cli_ctx);
2756 }
2757
2758
2759 /**
2760  * Function called by NSE.
2761  *
2762  * Updates sizes of sampler list and view and adapt those lists
2763  * accordingly.
2764  */
2765 static void
2766 nse_callback (void *cls,
2767               struct GNUNET_TIME_Absolute timestamp,
2768               double logestimate, double std_dev)
2769 {
2770   double estimate;
2771   //double scale; // TODO this might go gloabal/config
2772
2773   LOG (GNUNET_ERROR_TYPE_DEBUG,
2774        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2775        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2776   //scale = .01;
2777   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2778   // GNUNET_NSE_log_estimate_to_n (logestimate);
2779   estimate = pow (estimate, 1.0 / 3);
2780   // TODO add if std_dev is a number
2781   // estimate += (std_dev * scale);
2782   if (sampler_size_est_min < ceil (estimate))
2783   {
2784     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2785     sampler_size_est_need = estimate;
2786   } else
2787   {
2788     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2789     sampler_size_est_need = sampler_size_est_min;
2790   }
2791
2792   /* If the NSE has changed adapt the lists accordingly */
2793   resize_wrapper (prot_sampler, sampler_size_est_need);
2794   client_resize_wrapper ();
2795 }
2796
2797
2798 /**
2799  * Callback called once the requested PeerIDs are ready.
2800  *
2801  * Sends those to the requesting client.
2802  */
2803 static void
2804 client_respond (void *cls,
2805                 struct GNUNET_PeerIdentity *peer_ids,
2806                 uint32_t num_peers)
2807 {
2808   struct ReplyCls *reply_cls = cls;
2809   uint32_t i;
2810   struct GNUNET_MQ_Envelope *ev;
2811   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2812   uint32_t size_needed;
2813   struct ClientContext *cli_ctx;
2814
2815   GNUNET_assert (NULL != reply_cls);
2816   LOG (GNUNET_ERROR_TYPE_DEBUG,
2817        "sampler returned %" PRIu32 " peers:\n",
2818        num_peers);
2819   for (i = 0; i < num_peers; i++)
2820   {
2821     LOG (GNUNET_ERROR_TYPE_DEBUG,
2822          "  %" PRIu32 ": %s\n",
2823          i,
2824          GNUNET_i2s (&peer_ids[i]));
2825   }
2826
2827   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2828                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2829
2830   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2831
2832   ev = GNUNET_MQ_msg_extra (out_msg,
2833                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2834                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2835   out_msg->num_peers = htonl (num_peers);
2836   out_msg->id = htonl (reply_cls->id);
2837
2838   GNUNET_memcpy (&out_msg[1],
2839           peer_ids,
2840           num_peers * sizeof (struct GNUNET_PeerIdentity));
2841   GNUNET_free (peer_ids);
2842
2843   cli_ctx = reply_cls->cli_ctx;
2844   GNUNET_assert (NULL != cli_ctx);
2845   reply_cls->req_handle = NULL;
2846   destroy_reply_cls (reply_cls);
2847   GNUNET_MQ_send (cli_ctx->mq, ev);
2848 }
2849
2850
2851 /**
2852  * Handle RPS request from the client.
2853  *
2854  * @param cls closure
2855  * @param message the actual message
2856  */
2857 static void
2858 handle_client_request (void *cls,
2859                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2860 {
2861   struct ClientContext *cli_ctx = cls;
2862   uint32_t num_peers;
2863   uint32_t size_needed;
2864   struct ReplyCls *reply_cls;
2865   uint32_t i;
2866
2867   num_peers = ntohl (msg->num_peers);
2868   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2869                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2870
2871   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2872   {
2873     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2874                 "Message received from client has size larger than expected\n");
2875     GNUNET_SERVICE_client_drop (cli_ctx->client);
2876     return;
2877   }
2878
2879   for (i = 0 ; i < num_peers ; i++)
2880     est_request_rate();
2881
2882   LOG (GNUNET_ERROR_TYPE_DEBUG,
2883        "Client requested %" PRIu32 " random peer(s).\n",
2884        num_peers);
2885
2886   reply_cls = GNUNET_new (struct ReplyCls);
2887   reply_cls->id = ntohl (msg->id);
2888   reply_cls->cli_ctx = cli_ctx;
2889   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2890                                                         client_respond,
2891                                                         reply_cls,
2892                                                         num_peers);
2893
2894   GNUNET_assert (NULL != cli_ctx);
2895   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2896                                cli_ctx->rep_cls_tail,
2897                                reply_cls);
2898   GNUNET_SERVICE_client_continue (cli_ctx->client);
2899 }
2900
2901
2902 /**
2903  * @brief Handle a message that requests the cancellation of a request
2904  *
2905  * @param cls unused
2906  * @param message the message containing the id of the request
2907  */
2908 static void
2909 handle_client_request_cancel (void *cls,
2910                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2911 {
2912   struct ClientContext *cli_ctx = cls;
2913   struct ReplyCls *rep_cls;
2914
2915   GNUNET_assert (NULL != cli_ctx);
2916   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2917   rep_cls = cli_ctx->rep_cls_head;
2918   LOG (GNUNET_ERROR_TYPE_DEBUG,
2919       "Client cancels request with id %" PRIu32 "\n",
2920       ntohl (msg->id));
2921   while ( (NULL != rep_cls->next) &&
2922           (rep_cls->id != ntohl (msg->id)) )
2923     rep_cls = rep_cls->next;
2924   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2925   destroy_reply_cls (rep_cls);
2926   GNUNET_SERVICE_client_continue (cli_ctx->client);
2927 }
2928
2929
2930 /**
2931  * @brief This function is called, when the client seeds peers.
2932  * It verifies that @a msg is well-formed.
2933  *
2934  * @param cls the closure (#ClientContext)
2935  * @param msg the message
2936  * @return #GNUNET_OK if @a msg is well-formed
2937  */
2938 static int
2939 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2940 {
2941   struct ClientContext *cli_ctx = cls;
2942   uint16_t msize = ntohs (msg->header.size);
2943   uint32_t num_peers = ntohl (msg->num_peers);
2944
2945   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2946   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2947        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2948   {
2949     GNUNET_break (0);
2950     GNUNET_SERVICE_client_drop (cli_ctx->client);
2951     return GNUNET_SYSERR;
2952   }
2953   return GNUNET_OK;
2954 }
2955
2956
2957 /**
2958  * Handle seed from the client.
2959  *
2960  * @param cls closure
2961  * @param message the actual message
2962  */
2963 static void
2964 handle_client_seed (void *cls,
2965                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2966 {
2967   struct ClientContext *cli_ctx = cls;
2968   struct GNUNET_PeerIdentity *peers;
2969   uint32_t num_peers;
2970   uint32_t i;
2971
2972   num_peers = ntohl (msg->num_peers);
2973   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2974   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
2975   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
2976
2977   LOG (GNUNET_ERROR_TYPE_DEBUG,
2978        "Client seeded peers:\n");
2979   print_peer_list (peers, num_peers);
2980
2981   for (i = 0; i < num_peers; i++)
2982   {
2983     LOG (GNUNET_ERROR_TYPE_DEBUG,
2984          "Updating samplers with seed %" PRIu32 ": %s\n",
2985          i,
2986          GNUNET_i2s (&peers[i]));
2987
2988     got_peer (&peers[i]);
2989   }
2990
2991   ////GNUNET_free (peers);
2992
2993   GNUNET_SERVICE_client_continue (cli_ctx->client);
2994 }
2995
2996 /**
2997  * @brief Send view to client
2998  *
2999  * @param cli_ctx the context of the client
3000  * @param view_array the peerids of the view as array (can be empty)
3001  * @param view_size the size of the view array (can be 0)
3002  */
3003 void
3004 send_view (const struct ClientContext *cli_ctx,
3005            const struct GNUNET_PeerIdentity *view_array,
3006            uint64_t view_size)
3007 {
3008   struct GNUNET_MQ_Envelope *ev;
3009   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3010
3011   if (NULL == view_array)
3012   {
3013     view_size = View_size ();
3014     view_array = View_get_as_array();
3015   }
3016
3017   ev = GNUNET_MQ_msg_extra (out_msg,
3018                             view_size * sizeof (struct GNUNET_PeerIdentity),
3019                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3020   out_msg->num_peers = htonl (view_size);
3021
3022   GNUNET_memcpy (&out_msg[1],
3023           view_array,
3024           view_size * sizeof (struct GNUNET_PeerIdentity));
3025   GNUNET_MQ_send (cli_ctx->mq, ev);
3026 }
3027
3028 /**
3029  * @brief sends updates to clients that are interested
3030  */
3031 static void
3032 clients_notify_view_update (void)
3033 {
3034   struct ClientContext *cli_ctx_iter;
3035   uint64_t num_peers;
3036   const struct GNUNET_PeerIdentity *view_array;
3037
3038   num_peers = View_size ();
3039   view_array = View_get_as_array();
3040   /* check size of view is small enough */
3041   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3042   {
3043     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3044                 "View is too big to send\n");
3045     return;
3046   }
3047
3048   for (cli_ctx_iter = cli_ctx_head;
3049        NULL != cli_ctx_iter;
3050        cli_ctx_iter = cli_ctx_head->next)
3051   {
3052     if (1 < cli_ctx_iter->view_updates_left)
3053     {
3054       /* Client wants to receive limited amount of updates */
3055       cli_ctx_iter->view_updates_left -= 1;
3056     } else if (1 == cli_ctx_iter->view_updates_left)
3057     {
3058       /* Last update of view for client */
3059       cli_ctx_iter->view_updates_left = -1;
3060     } else if (0 > cli_ctx_iter->view_updates_left) {
3061       /* Client is not interested in updates */
3062       continue;
3063     }
3064     /* else _updates_left == 0 - infinite amount of updates */
3065
3066     /* send view */
3067     send_view (cli_ctx_iter, view_array, num_peers);
3068   }
3069 }
3070
3071
3072 /**
3073  * Handle RPS request from the client.
3074  *
3075  * @param cls closure
3076  * @param message the actual message
3077  */
3078 static void
3079 handle_client_view_request (void *cls,
3080                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3081 {
3082   struct ClientContext *cli_ctx = cls;
3083   uint64_t num_updates;
3084
3085   num_updates = ntohl (msg->num_updates);
3086
3087   LOG (GNUNET_ERROR_TYPE_DEBUG,
3088        "Client requested %" PRIu64 " updates of view.\n",
3089        num_updates);
3090
3091   GNUNET_assert (NULL != cli_ctx);
3092   cli_ctx->view_updates_left = num_updates;
3093   send_view (cli_ctx, NULL, 0);
3094   GNUNET_SERVICE_client_continue (cli_ctx->client);
3095 }
3096
3097 /**
3098  * Handle a CHECK_LIVE message from another peer.
3099  *
3100  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3101  * the channel is blocked for all other communication.
3102  *
3103  * @param cls Closure
3104  * @param msg The message header
3105  */
3106 static void
3107 handle_peer_check (void *cls,
3108                    const struct GNUNET_MessageHeader *msg)
3109 {
3110   const struct GNUNET_PeerIdentity *peer = cls;
3111   LOG (GNUNET_ERROR_TYPE_DEBUG,
3112       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3113
3114   GNUNET_break_op (Peers_check_peer_known (peer));
3115   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3116 }
3117
3118 /**
3119  * Handle a PUSH message from another peer.
3120  *
3121  * Check the proof of work and store the PeerID
3122  * in the temporary list for pushed PeerIDs.
3123  *
3124  * @param cls Closure
3125  * @param msg The message header
3126  */
3127 static void
3128 handle_peer_push (void *cls,
3129                   const struct GNUNET_MessageHeader *msg)
3130 {
3131   const struct GNUNET_PeerIdentity *peer = cls;
3132
3133   // (check the proof of work (?))
3134
3135   LOG (GNUNET_ERROR_TYPE_DEBUG,
3136        "Received PUSH (%s)\n",
3137        GNUNET_i2s (peer));
3138   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3139
3140   #ifdef ENABLE_MALICIOUS
3141   struct AttackedPeer *tmp_att_peer;
3142
3143   if ( (1 == mal_type) ||
3144        (3 == mal_type) )
3145   { /* Try to maximise representation */
3146     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3147     tmp_att_peer->peer_id = *peer;
3148     if (NULL == att_peer_set)
3149       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3150     if (GNUNET_NO ==
3151         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3152                                                 peer))
3153     {
3154       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3155                                    att_peers_tail,
3156                                    tmp_att_peer);
3157       add_peer_array_to_set (peer, 1, att_peer_set);
3158     }
3159   }
3160
3161
3162   else if (2 == mal_type)
3163   {
3164     /* We attack one single well-known peer - simply ignore */
3165   }
3166   #endif /* ENABLE_MALICIOUS */
3167
3168   /* Add the sending peer to the push_map */
3169   CustomPeerMap_put (push_map, peer);
3170
3171   GNUNET_break_op (Peers_check_peer_known (peer));
3172   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3173 }
3174
3175
3176 /**
3177  * Handle PULL REQUEST request message from another peer.
3178  *
3179  * Reply with the view of PeerIDs.
3180  *
3181  * @param cls Closure
3182  * @param msg The message header
3183  */
3184 static void
3185 handle_peer_pull_request (void *cls,
3186                           const struct GNUNET_MessageHeader *msg)
3187 {
3188   struct GNUNET_PeerIdentity *peer = cls;
3189   const struct GNUNET_PeerIdentity *view_array;
3190
3191   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3192   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3193
3194   #ifdef ENABLE_MALICIOUS
3195   if (1 == mal_type
3196       || 3 == mal_type)
3197   { /* Try to maximise representation */
3198     send_pull_reply (peer, mal_peers, num_mal_peers);
3199   }
3200
3201   else if (2 == mal_type)
3202   { /* Try to partition network */
3203     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3204     {
3205       send_pull_reply (peer, mal_peers, num_mal_peers);
3206     }
3207   }
3208   #endif /* ENABLE_MALICIOUS */
3209
3210   GNUNET_break_op (Peers_check_peer_known (peer));
3211   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3212   view_array = View_get_as_array ();
3213   send_pull_reply (peer, view_array, View_size ());
3214 }
3215
3216
3217 /**
3218  * Check whether we sent a corresponding request and
3219  * whether this reply is the first one.
3220  *
3221  * @param cls Closure
3222  * @param msg The message header
3223  */
3224 static int
3225 check_peer_pull_reply (void *cls,
3226                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3227 {
3228   struct GNUNET_PeerIdentity *sender = cls;
3229
3230   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3231   {
3232     GNUNET_break_op (0);
3233     return GNUNET_SYSERR;
3234   }
3235
3236   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3237       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3238   {
3239     LOG (GNUNET_ERROR_TYPE_ERROR,
3240         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3241         ntohl (msg->num_peers),
3242         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3243             sizeof (struct GNUNET_PeerIdentity));
3244     GNUNET_break_op (0);
3245     return GNUNET_SYSERR;
3246   }
3247
3248   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3249   {
3250     LOG (GNUNET_ERROR_TYPE_WARNING,
3251         "Received a pull reply from a peer we didn't request one from!\n");
3252     GNUNET_break_op (0);
3253     return GNUNET_SYSERR;
3254   }
3255   return GNUNET_OK;
3256 }
3257
3258 /**
3259  * Handle PULL REPLY message from another peer.
3260  *
3261  * @param cls Closure
3262  * @param msg The message header
3263  */
3264 static void
3265 handle_peer_pull_reply (void *cls,
3266                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3267 {
3268   const struct GNUNET_PeerIdentity *peers;
3269   struct GNUNET_PeerIdentity *sender = cls;
3270   uint32_t i;
3271 #ifdef ENABLE_MALICIOUS
3272   struct AttackedPeer *tmp_att_peer;
3273 #endif /* ENABLE_MALICIOUS */
3274
3275   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3276   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3277
3278   #ifdef ENABLE_MALICIOUS
3279   // We shouldn't even receive pull replies as we're not sending
3280   if (2 == mal_type)
3281   {
3282   }
3283   #endif /* ENABLE_MALICIOUS */
3284
3285   /* Do actual logic */
3286   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3287
3288   LOG (GNUNET_ERROR_TYPE_DEBUG,
3289        "PULL REPLY received, got following %u peers:\n",
3290        ntohl (msg->num_peers));
3291
3292   for (i = 0; i < ntohl (msg->num_peers); i++)
3293   {
3294     LOG (GNUNET_ERROR_TYPE_DEBUG,
3295          "%u. %s\n",
3296          i,
3297          GNUNET_i2s (&peers[i]));
3298
3299     #ifdef ENABLE_MALICIOUS
3300     if ((NULL != att_peer_set) &&
3301         (1 == mal_type || 3 == mal_type))
3302     { /* Add attacked peer to local list */
3303       // TODO check if we sent a request and this was the first reply
3304       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3305                                                                &peers[i])
3306           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3307                                                                   &peers[i])
3308           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3309                                                    &own_identity))
3310       {
3311         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3312         tmp_att_peer->peer_id = peers[i];
3313         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3314                                      att_peers_tail,
3315                                      tmp_att_peer);
3316         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3317       }
3318       continue;
3319     }
3320     #endif /* ENABLE_MALICIOUS */
3321     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3322                                               &peers[i]))
3323     {
3324       /* Make sure we 'know' about this peer */
3325       (void) Peers_insert_peer (&peers[i]);
3326
3327       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3328       {
3329         CustomPeerMap_put (pull_map, &peers[i]);
3330       }
3331       else
3332       {
3333         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3334         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3335       }
3336     }
3337   }
3338
3339   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3340   clean_peer (sender);
3341
3342   GNUNET_break_op (Peers_check_peer_known (sender));
3343   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3344 }
3345
3346
3347 /**
3348  * Compute a random delay.
3349  * A uniformly distributed value between mean + spread and mean - spread.
3350  *
3351  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3352  * It would return a random value between 2 and 6 min.
3353  *
3354  * @param mean the mean
3355  * @param spread the inverse amount of deviation from the mean
3356  */
3357 static struct GNUNET_TIME_Relative
3358 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3359                     unsigned int spread)
3360 {
3361   struct GNUNET_TIME_Relative half_interval;
3362   struct GNUNET_TIME_Relative ret;
3363   unsigned int rand_delay;
3364   unsigned int max_rand_delay;
3365
3366   if (0 == spread)
3367   {
3368     LOG (GNUNET_ERROR_TYPE_WARNING,
3369          "Not accepting spread of 0\n");
3370     GNUNET_break (0);
3371     GNUNET_assert (0);
3372   }
3373   GNUNET_assert (0 != mean.rel_value_us);
3374
3375   /* Compute random time value between spread * mean and spread * mean */
3376   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3377
3378   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3379   /**
3380    * Compute random value between (0 and 1) * round_interval
3381    * via multiplying round_interval with a 'fraction' (0 to value)/value
3382    */
3383   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3384   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3385   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3386   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3387
3388   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3389     LOG (GNUNET_ERROR_TYPE_WARNING,
3390          "Returning FOREVER_REL\n");
3391
3392   return ret;
3393 }
3394
3395
3396 /**
3397  * Send single pull request
3398  *
3399  * @param peer_id the peer to send the pull request to.
3400  */
3401 static void
3402 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3403 {
3404   struct GNUNET_MQ_Envelope *ev;
3405
3406   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3407                                                      Peers_PULL_REPLY_PENDING));
3408   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3409
3410   LOG (GNUNET_ERROR_TYPE_DEBUG,
3411        "Going to send PULL REQUEST to peer %s.\n",
3412        GNUNET_i2s (peer));
3413
3414   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3415   Peers_send_message (peer, ev, "PULL REQUEST");
3416   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3417 }
3418
3419
3420 /**
3421  * Send single push
3422  *
3423  * @param peer_id the peer to send the push to.
3424  */
3425 static void
3426 send_push (const struct GNUNET_PeerIdentity *peer_id)
3427 {
3428   struct GNUNET_MQ_Envelope *ev;
3429
3430   LOG (GNUNET_ERROR_TYPE_DEBUG,
3431        "Going to send PUSH to peer %s.\n",
3432        GNUNET_i2s (peer_id));
3433
3434   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3435   Peers_send_message (peer_id, ev, "PUSH");
3436   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3437 }
3438
3439
3440 static void
3441 do_round (void *cls);
3442
3443 static void
3444 do_mal_round (void *cls);
3445
3446 #ifdef ENABLE_MALICIOUS
3447
3448
3449 /**
3450  * @brief This function is called, when the client tells us to act malicious.
3451  * It verifies that @a msg is well-formed.
3452  *
3453  * @param cls the closure (#ClientContext)
3454  * @param msg the message
3455  * @return #GNUNET_OK if @a msg is well-formed
3456  */
3457 static int
3458 check_client_act_malicious (void *cls,
3459                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3460 {
3461   struct ClientContext *cli_ctx = cls;
3462   uint16_t msize = ntohs (msg->header.size);
3463   uint32_t num_peers = ntohl (msg->num_peers);
3464
3465   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3466   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3467        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3468   {
3469     LOG (GNUNET_ERROR_TYPE_ERROR,
3470         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3471         ntohl (msg->num_peers),
3472         (msize / sizeof (struct GNUNET_PeerIdentity)));
3473     GNUNET_break (0);
3474     GNUNET_SERVICE_client_drop (cli_ctx->client);
3475     return GNUNET_SYSERR;
3476   }
3477   return GNUNET_OK;
3478 }
3479
3480 /**
3481  * Turn RPS service to act malicious.
3482  *
3483  * @param cls Closure
3484  * @param client The client that sent the message
3485  * @param msg The message header
3486  */
3487 static void
3488 handle_client_act_malicious (void *cls,
3489                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3490 {
3491   struct ClientContext *cli_ctx = cls;
3492   struct GNUNET_PeerIdentity *peers;
3493   uint32_t num_mal_peers_sent;
3494   uint32_t num_mal_peers_old;
3495
3496   /* Do actual logic */
3497   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3498   mal_type = ntohl (msg->type);
3499   if (NULL == mal_peer_set)
3500     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3501
3502   LOG (GNUNET_ERROR_TYPE_DEBUG,
3503        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3504        mal_type,
3505        ntohl (msg->num_peers));
3506
3507   if (1 == mal_type)
3508   { /* Try to maximise representation */
3509     /* Add other malicious peers to those we already know */
3510
3511     num_mal_peers_sent = ntohl (msg->num_peers);
3512     num_mal_peers_old = num_mal_peers;
3513     GNUNET_array_grow (mal_peers,
3514                        num_mal_peers,
3515                        num_mal_peers + num_mal_peers_sent);
3516     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3517             peers,
3518             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3519
3520     /* Add all mal peers to mal_peer_set */
3521     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3522                            num_mal_peers_sent,
3523                            mal_peer_set);
3524
3525     /* Substitute do_round () with do_mal_round () */
3526     GNUNET_SCHEDULER_cancel (do_round_task);
3527     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3528   }
3529
3530   else if ( (2 == mal_type) ||
3531             (3 == mal_type) )
3532   { /* Try to partition the network */
3533     /* Add other malicious peers to those we already know */
3534
3535     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3536     num_mal_peers_old = num_mal_peers;
3537     GNUNET_array_grow (mal_peers,
3538                        num_mal_peers,
3539                        num_mal_peers + num_mal_peers_sent);
3540     if (NULL != mal_peers &&
3541         0 != num_mal_peers)
3542     {
3543       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3544               peers,
3545               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3546
3547       /* Add all mal peers to mal_peer_set */
3548       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3549                              num_mal_peers_sent,
3550                              mal_peer_set);
3551     }
3552
3553     /* Store the one attacked peer */
3554     GNUNET_memcpy (&attacked_peer,
3555             &msg->attacked_peer,
3556             sizeof (struct GNUNET_PeerIdentity));
3557     /* Set the flag of the attacked peer to valid to avoid problems */
3558     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3559     {
3560       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3561     }
3562
3563     LOG (GNUNET_ERROR_TYPE_DEBUG,
3564          "Attacked peer is %s\n",
3565          GNUNET_i2s (&attacked_peer));
3566
3567     /* Substitute do_round () with do_mal_round () */
3568     GNUNET_SCHEDULER_cancel (do_round_task);
3569     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3570   }
3571   else if (0 == mal_type)
3572   { /* Stop acting malicious */
3573     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3574
3575     /* Substitute do_mal_round () with do_round () */
3576     GNUNET_SCHEDULER_cancel (do_round_task);
3577     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3578   }
3579   else
3580   {
3581     GNUNET_break (0);
3582     GNUNET_SERVICE_client_continue (cli_ctx->client);
3583   }
3584   GNUNET_SERVICE_client_continue (cli_ctx->client);
3585 }
3586
3587
3588 /**
3589  * Send out PUSHes and PULLs maliciously.
3590  *
3591  * This is executed regylary.
3592  */
3593 static void
3594 do_mal_round (void *cls)
3595 {
3596   uint32_t num_pushes;
3597   uint32_t i;
3598   struct GNUNET_TIME_Relative time_next_round;
3599   struct AttackedPeer *tmp_att_peer;
3600
3601   LOG (GNUNET_ERROR_TYPE_DEBUG,
3602        "Going to execute next round maliciously type %" PRIu32 ".\n",
3603       mal_type);
3604   do_round_task = NULL;
3605   GNUNET_assert (mal_type <= 3);
3606   /* Do malicious actions */
3607   if (1 == mal_type)
3608   { /* Try to maximise representation */
3609
3610     /* The maximum of pushes we're going to send this round */
3611     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3612                                          num_attacked_peers),
3613                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3614
3615     LOG (GNUNET_ERROR_TYPE_DEBUG,
3616          "Going to send %" PRIu32 " pushes\n",
3617          num_pushes);
3618
3619     /* Send PUSHes to attacked peers */
3620     for (i = 0 ; i < num_pushes ; i++)
3621     {
3622       if (att_peers_tail == att_peer_index)
3623         att_peer_index = att_peers_head;
3624       else
3625         att_peer_index = att_peer_index->next;
3626
3627       send_push (&att_peer_index->peer_id);
3628     }
3629
3630     /* Send PULLs to some peers to learn about additional peers to attack */
3631     tmp_att_peer = att_peer_index;
3632     for (i = 0 ; i < num_pushes * alpha ; i++)
3633     {
3634       if (att_peers_tail == tmp_att_peer)
3635         tmp_att_peer = att_peers_head;
3636       else
3637         att_peer_index = tmp_att_peer->next;
3638
3639       send_pull_request (&tmp_att_peer->peer_id);
3640     }
3641   }
3642
3643
3644   else if (2 == mal_type)
3645   { /**
3646      * Try to partition the network
3647      * Send as many pushes to the attacked peer as possible
3648      * That is one push per round as it will ignore more.
3649      */
3650     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3651     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3652       send_push (&attacked_peer);
3653   }
3654
3655
3656   if (3 == mal_type)
3657   { /* Combined attack */
3658
3659     /* Send PUSH to attacked peers */
3660     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3661     {
3662       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3663       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3664       {
3665         LOG (GNUNET_ERROR_TYPE_DEBUG,
3666             "Goding to send push to attacked peer (%s)\n",
3667             GNUNET_i2s (&attacked_peer));
3668         send_push (&attacked_peer);
3669       }
3670     }
3671     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3672
3673     /* The maximum of pushes we're going to send this round */
3674     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3675                                          num_attacked_peers),
3676                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3677
3678     LOG (GNUNET_ERROR_TYPE_DEBUG,
3679          "Going to send %" PRIu32 " pushes\n",
3680          num_pushes);
3681
3682     for (i = 0; i < num_pushes; i++)
3683     {
3684       if (att_peers_tail == att_peer_index)
3685         att_peer_index = att_peers_head;
3686       else
3687         att_peer_index = att_peer_index->next;
3688
3689       send_push (&att_peer_index->peer_id);
3690     }
3691
3692     /* Send PULLs to some peers to learn about additional peers to attack */
3693     tmp_att_peer = att_peer_index;
3694     for (i = 0; i < num_pushes * alpha; i++)
3695     {
3696       if (att_peers_tail == tmp_att_peer)
3697         tmp_att_peer = att_peers_head;
3698       else
3699         att_peer_index = tmp_att_peer->next;
3700
3701       send_pull_request (&tmp_att_peer->peer_id);
3702     }
3703   }
3704
3705   /* Schedule next round */
3706   time_next_round = compute_rand_delay (round_interval, 2);
3707
3708   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3709   //NULL);
3710   GNUNET_assert (NULL == do_round_task);
3711   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3712                                                 &do_mal_round, NULL);
3713   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3714 }
3715 #endif /* ENABLE_MALICIOUS */
3716
3717 /**
3718  * Send out PUSHes and PULLs, possibly update #view, samplers.
3719  *
3720  * This is executed regylary.
3721  */
3722 static void
3723 do_round (void *cls)
3724 {
3725   uint32_t i;
3726   const struct GNUNET_PeerIdentity *view_array;
3727   unsigned int *permut;
3728   unsigned int a_peers; /* Number of peers we send pushes to */
3729   unsigned int b_peers; /* Number of peers we send pull requests to */
3730   uint32_t first_border;
3731   uint32_t second_border;
3732   struct GNUNET_PeerIdentity peer;
3733   struct GNUNET_PeerIdentity *update_peer;
3734
3735   LOG (GNUNET_ERROR_TYPE_DEBUG,
3736        "Going to execute next round.\n");
3737   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3738   do_round_task = NULL;
3739   LOG (GNUNET_ERROR_TYPE_DEBUG,
3740        "Printing view:\n");
3741   to_file (file_name_view_log,
3742            "___ new round ___");
3743   view_array = View_get_as_array ();
3744   for (i = 0; i < View_size (); i++)
3745   {
3746     LOG (GNUNET_ERROR_TYPE_DEBUG,
3747          "\t%s\n", GNUNET_i2s (&view_array[i]));
3748     to_file (file_name_view_log,
3749              "=%s\t(do round)",
3750              GNUNET_i2s_full (&view_array[i]));
3751   }
3752
3753
3754   /* Send pushes and pull requests */
3755   if (0 < View_size ())
3756   {
3757     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3758                                            View_size ());
3759
3760     /* Send PUSHes */
3761     a_peers = ceil (alpha * View_size ());
3762
3763     LOG (GNUNET_ERROR_TYPE_DEBUG,
3764          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3765          a_peers, alpha, View_size ());
3766     for (i = 0; i < a_peers; i++)
3767     {
3768       peer = view_array[permut[i]];
3769       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3770       { // FIXME if this fails schedule/loop this for later
3771         send_push (&peer);
3772       }
3773     }
3774
3775     /* Send PULL requests */
3776     b_peers = ceil (beta * View_size ());
3777     first_border = a_peers;
3778     second_border = a_peers + b_peers;
3779     if (second_border > View_size ())
3780     {
3781       first_border = View_size () - b_peers;
3782       second_border = View_size ();
3783     }
3784     LOG (GNUNET_ERROR_TYPE_DEBUG,
3785         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3786         b_peers, beta, View_size ());
3787     for (i = first_border; i < second_border; i++)
3788     {
3789       peer = view_array[permut[i]];
3790       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3791           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3792       { // FIXME if this fails schedule/loop this for later
3793         send_pull_request (&peer);
3794       }
3795     }
3796
3797     GNUNET_free (permut);
3798     permut = NULL;
3799   }
3800
3801
3802   /* Update view */
3803   /* TODO see how many peers are in push-/pull- list! */
3804
3805   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
3806       (0 < CustomPeerMap_size (push_map)) &&
3807       (0 < CustomPeerMap_size (pull_map)))
3808   { /* If conditions for update are fulfilled, update */
3809     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3810
3811     uint32_t final_size;
3812     uint32_t peers_to_clean_size;
3813     struct GNUNET_PeerIdentity *peers_to_clean;
3814
3815     peers_to_clean = NULL;
3816     peers_to_clean_size = 0;
3817     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3818     GNUNET_memcpy (peers_to_clean,
3819             view_array,
3820             View_size () * sizeof (struct GNUNET_PeerIdentity));
3821
3822     /* Seems like recreating is the easiest way of emptying the peermap */
3823     View_clear ();
3824     to_file (file_name_view_log,
3825              "--- emptied ---");
3826
3827     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
3828                                 CustomPeerMap_size (push_map));
3829     second_border = first_border +
3830                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
3831                                 CustomPeerMap_size (pull_map));
3832     final_size    = second_border +
3833       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
3834
3835     /* Update view with peers received through PUSHes */
3836     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3837                                            CustomPeerMap_size (push_map));
3838     for (i = 0; i < first_border; i++)
3839     {
3840       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3841                                                               permut[i]));
3842       to_file (file_name_view_log,
3843                "+%s\t(push list)",
3844                GNUNET_i2s_full (&view_array[i]));
3845       // TODO change the peer_flags accordingly
3846     }
3847     GNUNET_free (permut);
3848     permut = NULL;
3849
3850     /* Update view with peers received through PULLs */
3851     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3852                                            CustomPeerMap_size (pull_map));
3853     for (i = first_border; i < second_border; i++)
3854     {
3855       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3856             permut[i - first_border]));
3857       to_file (file_name_view_log,
3858                "+%s\t(pull list)",
3859                GNUNET_i2s_full (&view_array[i]));
3860       // TODO change the peer_flags accordingly
3861     }
3862     GNUNET_free (permut);
3863     permut = NULL;
3864
3865     /* Update view with peers from history */
3866     RPS_sampler_get_n_rand_peers (prot_sampler,
3867                                   hist_update,
3868                                   NULL,
3869                                   final_size - second_border);
3870     // TODO change the peer_flags accordingly
3871
3872     for (i = 0; i < View_size (); i++)
3873       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3874
3875     /* Clean peers that were removed from the view */
3876     for (i = 0; i < peers_to_clean_size; i++)
3877     {
3878       to_file (file_name_view_log,
3879                "-%s",
3880                GNUNET_i2s_full (&peers_to_clean[i]));
3881       clean_peer (&peers_to_clean[i]);
3882       //peer_destroy_channel_send (sender);
3883     }
3884
3885     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3886     clients_notify_view_update();
3887   } else {
3888     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3889     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3890     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3891         !(0 >= CustomPeerMap_size (pull_map)))
3892       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3893     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3894         (0 >= CustomPeerMap_size (pull_map)))
3895       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3896     if (0 >= CustomPeerMap_size (push_map) &&
3897         !(0 >= CustomPeerMap_size (pull_map)))
3898       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3899     if (0 >= CustomPeerMap_size (push_map) &&
3900         (0 >= CustomPeerMap_size (pull_map)))
3901       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3902     if (0 >= CustomPeerMap_size (pull_map) &&
3903         CustomPeerMap_size (push_map) > alpha * View_size () &&
3904         0 >= CustomPeerMap_size (push_map))
3905       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3906   }
3907   // TODO independent of that also get some peers from CADET_get_peers()?
3908   GNUNET_STATISTICS_set (stats,
3909       "# peers in push map at end of round",
3910       CustomPeerMap_size (push_map),
3911       GNUNET_NO);
3912   GNUNET_STATISTICS_set (stats,
3913       "# peers in pull map at end of round",
3914       CustomPeerMap_size (pull_map),
3915       GNUNET_NO);
3916   GNUNET_STATISTICS_set (stats,
3917       "# peers in view at end of round",
3918       View_size (),
3919       GNUNET_NO);
3920
3921   LOG (GNUNET_ERROR_TYPE_DEBUG,
3922        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3923        CustomPeerMap_size (push_map),
3924        CustomPeerMap_size (pull_map),
3925        alpha,
3926        View_size (),
3927        alpha * View_size ());
3928
3929   /* Update samplers */
3930   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3931   {
3932     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3933     LOG (GNUNET_ERROR_TYPE_DEBUG,
3934          "Updating with peer %s from push list\n",
3935          GNUNET_i2s (update_peer));
3936     insert_in_sampler (NULL, update_peer);
3937     clean_peer (update_peer); /* This cleans only if it is not in the view */
3938     //peer_destroy_channel_send (sender);
3939   }
3940
3941   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3942   {
3943     LOG (GNUNET_ERROR_TYPE_DEBUG,
3944          "Updating with peer %s from pull list\n",
3945          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3946     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3947     /* This cleans only if it is not in the view */
3948     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3949     //peer_destroy_channel_send (sender);
3950   }
3951
3952
3953   /* Empty push/pull lists */
3954   CustomPeerMap_clear (push_map);
3955   CustomPeerMap_clear (pull_map);
3956
3957   struct GNUNET_TIME_Relative time_next_round;
3958
3959   time_next_round = compute_rand_delay (round_interval, 2);
3960
3961   /* Schedule next round */
3962   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3963                                                 &do_round, NULL);
3964   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3965 }
3966
3967
3968 /**
3969  * This is called from GNUNET_CADET_get_peers().
3970  *
3971  * It is called on every peer(ID) that cadet somehow has contact with.
3972  * We use those to initialise the sampler.
3973  */
3974 void
3975 init_peer_cb (void *cls,
3976               const struct GNUNET_PeerIdentity *peer,
3977               int tunnel, // "Do we have a tunnel towards this peer?"
3978               unsigned int n_paths, // "Number of known paths towards this peer"
3979               unsigned int best_path) // "How long is the best path?
3980                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3981 {
3982   if (NULL != peer)
3983   {
3984     LOG (GNUNET_ERROR_TYPE_DEBUG,
3985          "Got peer_id %s from cadet\n",
3986          GNUNET_i2s (peer));
3987     got_peer (peer);
3988   }
3989 }
3990
3991 /**
3992  * @brief Iterator function over stored, valid peers.
3993  *
3994  * We initialise the sampler with those.
3995  *
3996  * @param cls the closure
3997  * @param peer the peer id
3998  * @return #GNUNET_YES if we should continue to
3999  *         iterate,
4000  *         #GNUNET_NO if not.
4001  */
4002 static int
4003 valid_peers_iterator (void *cls,
4004                       const struct GNUNET_PeerIdentity *peer)
4005 {
4006   if (NULL != peer)
4007   {
4008     LOG (GNUNET_ERROR_TYPE_DEBUG,
4009          "Got stored, valid peer %s\n",
4010          GNUNET_i2s (peer));
4011     got_peer (peer);
4012   }
4013   return GNUNET_YES;
4014 }
4015
4016
4017 /**
4018  * Iterator over peers from peerinfo.
4019  *
4020  * @param cls closure
4021  * @param peer id of the peer, NULL for last call
4022  * @param hello hello message for the peer (can be NULL)
4023  * @param error message
4024  */
4025 void
4026 process_peerinfo_peers (void *cls,
4027                         const struct GNUNET_PeerIdentity *peer,
4028                         const struct GNUNET_HELLO_Message *hello,
4029                         const char *err_msg)
4030 {
4031   if (NULL != peer)
4032   {
4033     LOG (GNUNET_ERROR_TYPE_DEBUG,
4034          "Got peer_id %s from peerinfo\n",
4035          GNUNET_i2s (peer));
4036     got_peer (peer);
4037   }
4038 }
4039
4040
4041 /**
4042  * Task run during shutdown.
4043  *
4044  * @param cls unused
4045  */
4046 static void
4047 shutdown_task (void *cls)
4048 {
4049   struct ClientContext *client_ctx;
4050   struct ReplyCls *reply_cls;
4051
4052   LOG (GNUNET_ERROR_TYPE_DEBUG,
4053        "RPS is going down\n");
4054
4055   /* Clean all clients */
4056   for (client_ctx = cli_ctx_head;
4057        NULL != cli_ctx_head;
4058        client_ctx = cli_ctx_head)
4059   {
4060     /* Clean pending requests to the sampler */
4061     for (reply_cls = client_ctx->rep_cls_head;
4062          NULL != client_ctx->rep_cls_head;
4063          reply_cls = client_ctx->rep_cls_head)
4064     {
4065       RPS_sampler_request_cancel (reply_cls->req_handle);
4066       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4067                                    client_ctx->rep_cls_tail,
4068                                    reply_cls);
4069       GNUNET_free (reply_cls);
4070     }
4071     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4072     GNUNET_free (client_ctx);
4073   }
4074   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4075   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4076
4077   if (NULL != do_round_task)
4078   {
4079     GNUNET_SCHEDULER_cancel (do_round_task);
4080     do_round_task = NULL;
4081   }
4082
4083   Peers_terminate ();
4084
4085   GNUNET_NSE_disconnect (nse);
4086   RPS_sampler_destroy (prot_sampler);
4087   RPS_sampler_destroy (client_sampler);
4088   GNUNET_CADET_close_port (cadet_port);
4089   GNUNET_CADET_disconnect (cadet_handle);
4090   View_destroy ();
4091   CustomPeerMap_destroy (push_map);
4092   CustomPeerMap_destroy (pull_map);
4093   if (NULL != stats)
4094   {
4095     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4096     stats = NULL;
4097   }
4098   #ifdef ENABLE_MALICIOUS
4099   struct AttackedPeer *tmp_att_peer;
4100   GNUNET_free (file_name_view_log);
4101   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4102   if (NULL != mal_peer_set)
4103     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4104   if (NULL != att_peer_set)
4105     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4106   while (NULL != att_peers_head)
4107   {
4108     tmp_att_peer = att_peers_head;
4109     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4110   }
4111   #endif /* ENABLE_MALICIOUS */
4112 }
4113
4114
4115 /**
4116  * Handle client connecting to the service.
4117  *
4118  * @param cls NULL
4119  * @param client the new client
4120  * @param mq the message queue of @a client
4121  * @return @a client
4122  */
4123 static void *
4124 client_connect_cb (void *cls,
4125                    struct GNUNET_SERVICE_Client *client,
4126                    struct GNUNET_MQ_Handle *mq)
4127 {
4128   struct ClientContext *cli_ctx;
4129
4130   LOG (GNUNET_ERROR_TYPE_DEBUG,
4131        "Client connected\n");
4132   if (NULL == client)
4133     return client; /* Server was destroyed before a client connected. Shutting down */
4134   cli_ctx = GNUNET_new (struct ClientContext);
4135   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4136   cli_ctx->view_updates_left = -1;
4137   cli_ctx->client = client;
4138   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4139                                cli_ctx_tail,
4140                                cli_ctx);
4141   return cli_ctx;
4142 }
4143
4144 /**
4145  * Callback called when a client disconnected from the service
4146  *
4147  * @param cls closure for the service
4148  * @param c the client that disconnected
4149  * @param internal_cls should be equal to @a c
4150  */
4151 static void
4152 client_disconnect_cb (void *cls,
4153                       struct GNUNET_SERVICE_Client *client,
4154                       void *internal_cls)
4155 {
4156   struct ClientContext *cli_ctx = internal_cls;
4157
4158   GNUNET_assert (client == cli_ctx->client);
4159   if (NULL == client)
4160   {/* shutdown task - destroy all clients */
4161     while (NULL != cli_ctx_head)
4162       destroy_cli_ctx (cli_ctx_head);
4163   }
4164   else
4165   { /* destroy this client */
4166     LOG (GNUNET_ERROR_TYPE_DEBUG,
4167         "Client disconnected. Destroy its context.\n");
4168     destroy_cli_ctx (cli_ctx);
4169   }
4170 }
4171
4172
4173 /**
4174  * Handle random peer sampling clients.
4175  *
4176  * @param cls closure
4177  * @param c configuration to use
4178  * @param service the initialized service
4179  */
4180 static void
4181 run (void *cls,
4182      const struct GNUNET_CONFIGURATION_Handle *c,
4183      struct GNUNET_SERVICE_Handle *service)
4184 {
4185   int size;
4186   int out_size;
4187   char* fn_valid_peers;
4188   struct GNUNET_HashCode port;
4189
4190   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4191   cfg = c;
4192
4193
4194   /* Get own ID */
4195   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4196   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4197               "STARTING SERVICE (rps) for peer [%s]\n",
4198               GNUNET_i2s (&own_identity));
4199   #ifdef ENABLE_MALICIOUS
4200   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4201               "Malicious execution compiled in.\n");
4202   #endif /* ENABLE_MALICIOUS */
4203
4204
4205
4206   /* Get time interval from the configuration */
4207   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4208                                                         "ROUNDINTERVAL",
4209                                                         &round_interval))
4210   {
4211     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4212                                "RPS", "ROUNDINTERVAL");
4213     GNUNET_SCHEDULER_shutdown ();
4214     return;
4215   }
4216
4217   /* Get initial size of sampler/view from the configuration */
4218   if (GNUNET_OK !=
4219       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4220         (long long unsigned int *) &sampler_size_est_min))
4221   {
4222     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4223                                "RPS", "MINSIZE");
4224     GNUNET_SCHEDULER_shutdown ();
4225     return;
4226   }
4227   sampler_size_est_need = sampler_size_est_min;
4228   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4229
4230   if (GNUNET_OK !=
4231       GNUNET_CONFIGURATION_get_value_filename (cfg,
4232                                                "rps",
4233                                                "FILENAME_VALID_PEERS",
4234                                                &fn_valid_peers))
4235   {
4236     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4237                                "rps", "FILENAME_VALID_PEERS");
4238   }
4239
4240
4241   View_create (4);
4242
4243   /* file_name_view_log */
4244   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
4245   {
4246     LOG (GNUNET_ERROR_TYPE_WARNING,
4247          "Failed to create directory /tmp/rps/\n");
4248   }
4249
4250   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
4251   file_name_view_log = GNUNET_malloc (size);
4252   out_size = GNUNET_snprintf (file_name_view_log,
4253                               size,
4254                               "/tmp/rps/view-%s",
4255                               GNUNET_i2s_full (&own_identity));
4256   if (size < out_size ||
4257       0 > out_size)
4258   {
4259     LOG (GNUNET_ERROR_TYPE_WARNING,
4260          "Failed to write string to buffer (size: %i, out_size: %i)\n",
4261          size,
4262          out_size);
4263   }
4264
4265
4266   /* connect to NSE */
4267   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4268
4269
4270   alpha = 0.45;
4271   beta  = 0.45;
4272
4273
4274   /* Initialise cadet */
4275   /* There exists a copy-paste-clone in get_channel() */
4276   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4277     GNUNET_MQ_hd_fixed_size (peer_check,
4278                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4279                              struct GNUNET_MessageHeader,
4280                              NULL),
4281     GNUNET_MQ_hd_fixed_size (peer_push,
4282                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4283                              struct GNUNET_MessageHeader,
4284                              NULL),
4285     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4286                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4287                              struct GNUNET_MessageHeader,
4288                              NULL),
4289     GNUNET_MQ_hd_var_size (peer_pull_reply,
4290                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4291                            struct GNUNET_RPS_P2P_PullReplyMessage,
4292                            NULL),
4293     GNUNET_MQ_handler_end ()
4294   };
4295
4296   cadet_handle = GNUNET_CADET_connect (cfg);
4297   GNUNET_assert (NULL != cadet_handle);
4298   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4299                       strlen (GNUNET_APPLICATION_PORT_RPS),
4300                       &port);
4301   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4302                                        &port,
4303                                        &Peers_handle_inbound_channel, /* Connect handler */
4304                                        NULL, /* cls */
4305                                        NULL, /* WindowSize handler */
4306                                        cleanup_destroyed_channel, /* Disconnect handler */
4307                                        cadet_handlers);
4308
4309
4310   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4311   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4312   GNUNET_free (fn_valid_peers);
4313
4314   /* Initialise sampler */
4315   struct GNUNET_TIME_Relative half_round_interval;
4316   struct GNUNET_TIME_Relative  max_round_interval;
4317
4318   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4319   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4320
4321   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4322   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4323
4324   /* Initialise push and pull maps */
4325   push_map = CustomPeerMap_create (4);
4326   pull_map = CustomPeerMap_create (4);
4327
4328
4329   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4330   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4331   // TODO send push/pull to each of those peers?
4332   // TODO read stored valid peers from last run
4333   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4334   Peers_get_valid_peers (valid_peers_iterator, NULL);
4335
4336   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4337                                                    GNUNET_NO,
4338                                                    process_peerinfo_peers,
4339                                                    NULL);
4340
4341   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4342
4343   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4344   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4345
4346   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4347   stats = GNUNET_STATISTICS_create ("rps", cfg);
4348
4349 }
4350
4351
4352 /**
4353  * Define "main" method using service macro.
4354  */
4355 GNUNET_SERVICE_MAIN
4356 ("rps",
4357  GNUNET_SERVICE_OPTION_NONE,
4358  &run,
4359  &client_connect_cb,
4360  &client_disconnect_cb,
4361  NULL,
4362  GNUNET_MQ_hd_fixed_size (client_request,
4363    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4364    struct GNUNET_RPS_CS_RequestMessage,
4365    NULL),
4366  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4367    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4368    struct GNUNET_RPS_CS_RequestCancelMessage,
4369    NULL),
4370  GNUNET_MQ_hd_var_size (client_seed,
4371    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4372    struct GNUNET_RPS_CS_SeedMessage,
4373    NULL),
4374 #ifdef ENABLE_MALICIOUS
4375  GNUNET_MQ_hd_var_size (client_act_malicious,
4376    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4377    struct GNUNET_RPS_CS_ActMaliciousMessage,
4378    NULL),
4379 #endif /* ENABLE_MALICIOUS */
4380  GNUNET_MQ_hd_fixed_size (client_view_request,
4381    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4382    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4383    NULL),
4384  GNUNET_MQ_handler_end());
4385
4386 /* end of gnunet-service-rps.c */