rps service: reorder functions in a meaningful way
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "rps.h"
34 #include "rps-test_util.h"
35 #include "gnunet-service-rps_sampler.h"
36 #include "gnunet-service-rps_custommap.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Handle to the statistics service.
65  */
66 static struct GNUNET_STATISTICS_Handle *stats;
67
68 /**
69  * Our own identity.
70  */
71 static struct GNUNET_PeerIdentity own_identity;
72
73
74
75 /***********************************************************************
76  * Old gnunet-service-rps_peers.c
77 ***********************************************************************/
78
79 /**
80  * Set a peer flag of given peer context.
81  */
82 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
83
84 /**
85  * Get peer flag of given peer context.
86  */
87 #define check_peer_flag_set(peer_ctx, mask)\
88   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
89
90 /**
91  * Unset flag of given peer context.
92  */
93 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
94
95 /**
96  * Set a channel flag of given channel context.
97  */
98 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * Struct used to keep track of other peer's status
164  *
165  * This is stored in a multipeermap.
166  * It contains information such as cadet channels, a message queue for sending,
167  * status about the channels, the pending operations on this peer and some flags
168  * about the status of the peer itself. (live, valid, ...)
169  */
170 struct PeerContext
171 {
172   /**
173    * Message queue open to client
174    */
175   struct GNUNET_MQ_Handle *mq;
176
177   /**
178    * Channel open to client.
179    */
180   struct GNUNET_CADET_Channel *send_channel;
181
182   /**
183    * Flags to the sending channel
184    */
185   uint32_t *send_channel_flags;
186
187   /**
188    * Channel open from client.
189    */
190   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
191
192   /**
193    * Flags to the receiving channel
194    */
195   uint32_t *recv_channel_flags;
196
197   /**
198    * Array of pending operations on this peer.
199    */
200   struct PeerPendingOp *pending_ops;
201
202   /**
203    * Handle to the callback given to cadet_ntfy_tmt_rdy()
204    *
205    * To be canceled on shutdown.
206    */
207   struct PendingMessage *liveliness_check_pending;
208
209   /**
210    * Number of pending operations.
211    */
212   unsigned int num_pending_ops;
213
214   /**
215    * Identity of the peer
216    */
217   struct GNUNET_PeerIdentity peer_id;
218
219   /**
220    * Flags indicating status of peer
221    */
222   uint32_t peer_flags;
223
224   /**
225    * Last time we received something from that peer.
226    */
227   struct GNUNET_TIME_Absolute last_message_recv;
228
229   /**
230    * Last time we received a keepalive message.
231    */
232   struct GNUNET_TIME_Absolute last_keepalive;
233
234   /**
235    * DLL with all messages that are yet to be sent
236    */
237   struct PendingMessage *pending_messages_head;
238   struct PendingMessage *pending_messages_tail;
239
240   /**
241    * This is pobably followed by 'statistical' data (when we first saw
242    * him, how did we get his ID, how many pushes (in a timeinterval),
243    * ...)
244    */
245 };
246
247 /**
248  * @brief Closure to #valid_peer_iterator
249  */
250 struct PeersIteratorCls
251 {
252   /**
253    * Iterator function
254    */
255   PeersIterator iterator;
256
257   /**
258    * Closure to iterator
259    */
260   void *cls;
261 };
262
263 /**
264  * @brief Hashmap of valid peers.
265  */
266 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
267
268 /**
269  * @brief Maximum number of valid peers to keep.
270  * TODO read from config
271  */
272 static uint32_t num_valid_peers_max = UINT32_MAX;
273
274 /**
275  * @brief Filename of the file that stores the valid peers persistently.
276  */
277 static char *filename_valid_peers;
278
279 /**
280  * Set of all peers to keep track of them.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
283
284 /**
285  * Cadet handle.
286  */
287 static struct GNUNET_CADET_Handle *cadet_handle;
288
289
290
291 /**
292  * @brief Get the #PeerContext associated with a peer
293  *
294  * @param peer the peer id
295  *
296  * @return the #PeerContext
297  */
298 static struct PeerContext *
299 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
300 {
301   struct PeerContext *ctx;
302   int ret;
303
304   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
305   GNUNET_assert (GNUNET_YES == ret);
306   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
307   GNUNET_assert (NULL != ctx);
308   return ctx;
309 }
310
311 int
312 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
313
314 /**
315  * @brief Create a new #PeerContext and insert it into the peer map
316  *
317  * @param peer the peer to create the #PeerContext for
318  *
319  * @return the #PeerContext
320  */
321 static struct PeerContext *
322 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
323 {
324   struct PeerContext *ctx;
325   int ret;
326
327   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
328
329   ctx = GNUNET_new (struct PeerContext);
330   ctx->peer_id = *peer;
331   ctx->send_channel_flags = GNUNET_new (uint32_t);
332   ctx->recv_channel_flags = GNUNET_new (uint32_t);
333   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
334       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
335   GNUNET_assert (GNUNET_OK == ret);
336   return ctx;
337 }
338
339
340 /**
341  * @brief Create or get a #PeerContext
342  *
343  * @param peer the peer to get the associated context to
344  *
345  * @return the context
346  */
347 static struct PeerContext *
348 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
349 {
350   if (GNUNET_NO == Peers_check_peer_known (peer))
351   {
352     return create_peer_ctx (peer);
353   }
354   return get_peer_ctx (peer);
355 }
356
357 void
358 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
359
360 void
361 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
362
363 /**
364  * @brief Check whether we have a connection to this @a peer
365  *
366  * Also sets the #Peers_ONLINE flag accordingly
367  *
368  * @param peer the peer in question
369  *
370  * @return #GNUNET_YES if we are connected
371  *         #GNUNET_NO  otherwise
372  */
373 int
374 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
375 {
376   const struct PeerContext *peer_ctx;
377
378   /* If we don't know about this peer we don't know whether it's online */
379   if (GNUNET_NO == Peers_check_peer_known (peer))
380   {
381     return GNUNET_NO;
382   }
383   /* Get the context */
384   peer_ctx = get_peer_ctx (peer);
385   /* If we have no channel to this peer we don't know whether it's online */
386   if ( (NULL == peer_ctx->send_channel) &&
387        (NULL == peer_ctx->recv_channel) )
388   {
389     Peers_unset_peer_flag (peer, Peers_ONLINE);
390     return GNUNET_NO;
391   }
392   /* Otherwise (if we have a channel, we know that it's online */
393   Peers_set_peer_flag (peer, Peers_ONLINE);
394   return GNUNET_YES;
395 }
396
397
398 /**
399  * @brief The closure to #get_rand_peer_iterator.
400  */
401 struct GetRandPeerIteratorCls
402 {
403   /**
404    * @brief The index of the peer to return.
405    * Will be decreased until 0.
406    * Then current peer is returned.
407    */
408   uint32_t index;
409
410   /**
411    * @brief Pointer to peer to return.
412    */
413   const struct GNUNET_PeerIdentity *peer;
414 };
415
416
417 /**
418  * @brief Iterator function for #get_random_peer_from_peermap.
419  *
420  * Implements #GNUNET_CONTAINER_PeerMapIterator.
421  * Decreases the index until the index is null.
422  * Then returns the current peer.
423  *
424  * @param cls the #GetRandPeerIteratorCls containing index and peer
425  * @param peer current peer
426  * @param value unused
427  *
428  * @return  #GNUNET_YES if we should continue to
429  *          iterate,
430  *          #GNUNET_NO if not.
431  */
432 static int
433 get_rand_peer_iterator (void *cls,
434                         const struct GNUNET_PeerIdentity *peer,
435                         void *value)
436 {
437   struct GetRandPeerIteratorCls *iterator_cls = cls;
438   if (0 >= iterator_cls->index)
439   {
440     iterator_cls->peer = peer;
441     return GNUNET_NO;
442   }
443   iterator_cls->index--;
444   return GNUNET_YES;
445 }
446
447
448 /**
449  * @brief Get a random peer from @a peer_map
450  *
451  * @param peer_map the peer_map to get the peer from
452  *
453  * @return a random peer
454  */
455 static const struct GNUNET_PeerIdentity *
456 get_random_peer_from_peermap (const struct
457                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
458 {
459   struct GetRandPeerIteratorCls *iterator_cls;
460   const struct GNUNET_PeerIdentity *ret;
461
462   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
463   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
464       GNUNET_CONTAINER_multipeermap_size (peer_map));
465   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
466                                                 get_rand_peer_iterator,
467                                                 iterator_cls);
468   ret = iterator_cls->peer;
469   GNUNET_free (iterator_cls);
470   return ret;
471 }
472
473
474 /**
475  * @brief Add a given @a peer to valid peers.
476  *
477  * If valid peers are already #num_valid_peers_max, delete a peer previously.
478  *
479  * @param peer the peer that is added to the valid peers.
480  *
481  * @return #GNUNET_YES if no other peer had to be removed
482  *         #GNUNET_NO  otherwise
483  */
484 static int
485 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
486 {
487   const struct GNUNET_PeerIdentity *rand_peer;
488   int ret;
489
490   ret = GNUNET_YES;
491   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
492   {
493     rand_peer = get_random_peer_from_peermap (valid_peers);
494     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
495     ret = GNUNET_NO;
496   }
497   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
498       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
499   return ret;
500 }
501
502
503 /**
504  * @brief Set the peer flag to living and
505  *        call the pending operations on this peer.
506  *
507  * Also adds peer to #valid_peers.
508  *
509  * @param peer_ctx the #PeerContext of the peer to set live
510  */
511 static void
512 set_peer_live (struct PeerContext *peer_ctx)
513 {
514   struct GNUNET_PeerIdentity *peer;
515   unsigned int i;
516
517   peer = &peer_ctx->peer_id;
518   LOG (GNUNET_ERROR_TYPE_DEBUG,
519       "Peer %s is live and valid, calling %i pending operations on it\n",
520       GNUNET_i2s (peer),
521       peer_ctx->num_pending_ops);
522
523   if (NULL != peer_ctx->liveliness_check_pending)
524   {
525     LOG (GNUNET_ERROR_TYPE_DEBUG,
526          "Removing pending liveliness check for peer %s\n",
527          GNUNET_i2s (&peer_ctx->peer_id));
528     // TODO wait until cadet sets mq->cancel_impl
529     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
530     GNUNET_free (peer_ctx->liveliness_check_pending);
531     peer_ctx->liveliness_check_pending = NULL;
532   }
533
534   (void) add_valid_peer (peer);
535   set_peer_flag (peer_ctx, Peers_ONLINE);
536
537   /* Call pending operations */
538   for (i = 0; i < peer_ctx->num_pending_ops; i++)
539   {
540     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
541   }
542   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
543 }
544
545 static void
546 cleanup_destroyed_channel (void *cls,
547                            const struct GNUNET_CADET_Channel *channel);
548
549 /* Declaration of handlers */
550 static void
551 handle_peer_check (void *cls,
552                    const struct GNUNET_MessageHeader *msg);
553
554 static void
555 handle_peer_push (void *cls,
556                   const struct GNUNET_MessageHeader *msg);
557
558 static void
559 handle_peer_pull_request (void *cls,
560                           const struct GNUNET_MessageHeader *msg);
561
562 static int
563 check_peer_pull_reply (void *cls,
564                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
565
566 static void
567 handle_peer_pull_reply (void *cls,
568                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
569
570 /* End declaration of handlers */
571
572
573 /**
574  * @brief Get the channel of a peer. If not existing, create.
575  *
576  * @param peer the peer id
577  * @return the #GNUNET_CADET_Channel used to send data to @a peer
578  */
579 struct GNUNET_CADET_Channel *
580 get_channel (const struct GNUNET_PeerIdentity *peer)
581 {
582   struct PeerContext *peer_ctx;
583   struct GNUNET_HashCode port;
584   struct GNUNET_PeerIdentity *ctx_peer;
585   /* There exists a copy-paste-clone in run() */
586   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
587     GNUNET_MQ_hd_fixed_size (peer_check,
588                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
589                              struct GNUNET_MessageHeader,
590                              NULL),
591     GNUNET_MQ_hd_fixed_size (peer_push,
592                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
593                              struct GNUNET_MessageHeader,
594                              NULL),
595     GNUNET_MQ_hd_fixed_size (peer_pull_request,
596                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
597                              struct GNUNET_MessageHeader,
598                              NULL),
599     GNUNET_MQ_hd_var_size (peer_pull_reply,
600                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
601                            struct GNUNET_RPS_P2P_PullReplyMessage,
602                            NULL),
603     GNUNET_MQ_handler_end ()
604   };
605
606
607   peer_ctx = get_peer_ctx (peer);
608   if (NULL == peer_ctx->send_channel)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Trying to establish channel to peer %s\n",
612          GNUNET_i2s (peer));
613     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
614                         strlen (GNUNET_APPLICATION_PORT_RPS),
615                         &port);
616     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
617     *ctx_peer = *peer;
618     peer_ctx->send_channel =
619       GNUNET_CADET_channel_create (cadet_handle,
620                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
621                                    peer,
622                                    &port,
623                                    GNUNET_CADET_OPTION_RELIABLE,
624                                    NULL, /* WindowSize handler */
625                                    cleanup_destroyed_channel, /* Disconnect handler */
626                                    cadet_handlers);
627   }
628   GNUNET_assert (NULL != peer_ctx->send_channel);
629   return peer_ctx->send_channel;
630 }
631
632
633 /**
634  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
635  *
636  * If we already have a message queue open to this client,
637  * simply return it, otherways create one.
638  *
639  * @param peer the peer to get the mq to
640  * @return the #GNUNET_MQ_Handle
641  */
642 static struct GNUNET_MQ_Handle *
643 get_mq (const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *peer_ctx;
646
647   peer_ctx = get_peer_ctx (peer);
648
649   if (NULL == peer_ctx->mq)
650   {
651     (void) get_channel (peer);
652     peer_ctx->mq = GNUNET_CADET_get_mq (peer_ctx->send_channel);
653   }
654   return peer_ctx->mq;
655 }
656
657
658 /**
659  * @brief This is called in response to the first message we sent as a
660  * liveliness check.
661  *
662  * @param cls #PeerContext of peer with pending liveliness check
663  */
664 static void
665 mq_liveliness_check_successful (void *cls)
666 {
667   struct PeerContext *peer_ctx = cls;
668
669   if (NULL != peer_ctx->liveliness_check_pending)
670   {
671     LOG (GNUNET_ERROR_TYPE_DEBUG,
672         "Liveliness check for peer %s was successfull\n",
673         GNUNET_i2s (&peer_ctx->peer_id));
674     GNUNET_free (peer_ctx->liveliness_check_pending);
675     peer_ctx->liveliness_check_pending = NULL;
676     set_peer_live (peer_ctx);
677   }
678 }
679
680 /**
681  * Issue a check whether peer is live
682  *
683  * @param peer_ctx the context of the peer
684  */
685 static void
686 check_peer_live (struct PeerContext *peer_ctx)
687 {
688   LOG (GNUNET_ERROR_TYPE_DEBUG,
689        "Get informed about peer %s getting live\n",
690        GNUNET_i2s (&peer_ctx->peer_id));
691
692   struct GNUNET_MQ_Handle *mq;
693   struct GNUNET_MQ_Envelope *ev;
694
695   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
696   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
697   peer_ctx->liveliness_check_pending->ev = ev;
698   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
699   peer_ctx->liveliness_check_pending->type = "Check liveliness";
700   mq = get_mq (&peer_ctx->peer_id);
701   GNUNET_MQ_notify_sent (ev,
702                          mq_liveliness_check_successful,
703                          peer_ctx);
704   GNUNET_MQ_send (mq, ev);
705 }
706
707 /**
708  * @brief Add an envelope to a message passed to mq to list of pending messages
709  *
710  * @param peer peer the message was sent to
711  * @param ev envelope to the message
712  * @param type type of the message to be sent
713  * @return pointer to pending message
714  */
715 static struct PendingMessage *
716 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
717                         struct GNUNET_MQ_Envelope *ev,
718                         const char *type)
719 {
720   struct PendingMessage *pending_msg;
721   struct PeerContext *peer_ctx;
722
723   peer_ctx = get_peer_ctx (peer);
724   pending_msg = GNUNET_new (struct PendingMessage);
725   pending_msg->ev = ev;
726   pending_msg->peer_ctx = peer_ctx;
727   pending_msg->type = type;
728   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
729                                peer_ctx->pending_messages_tail,
730                                pending_msg);
731   return pending_msg;
732 }
733
734
735 /**
736  * @brief Remove a pending message from the respective DLL
737  *
738  * @param pending_msg the pending message to remove
739  * @param cancel cancel the pending message, too
740  */
741 static void
742 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
743 {
744   struct PeerContext *peer_ctx;
745
746   peer_ctx = pending_msg->peer_ctx;
747   GNUNET_assert (NULL != peer_ctx);
748   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
749                                peer_ctx->pending_messages_tail,
750                                pending_msg);
751   // TODO wait for the cadet implementation of message cancellation
752   //if (GNUNET_YES == cancel)
753   //{
754   //  GNUNET_MQ_send_cancel (pending_msg->ev);
755   //}
756   GNUNET_free (pending_msg);
757 }
758
759
760 /**
761  * @brief Check whether function of type #PeerOp was already scheduled
762  *
763  * The array with pending operations will probably never grow really big, so
764  * iterating over it should be ok.
765  *
766  * @param peer the peer to check
767  * @param peer_op the operation (#PeerOp) on the peer
768  *
769  * @return #GNUNET_YES if this operation is scheduled on that peer
770  *         #GNUNET_NO  otherwise
771  */
772 static int
773 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
774                            const PeerOp peer_op)
775 {
776   const struct PeerContext *peer_ctx;
777   unsigned int i;
778
779   peer_ctx = get_peer_ctx (peer);
780   for (i = 0; i < peer_ctx->num_pending_ops; i++)
781     if (peer_op == peer_ctx->pending_ops[i].op)
782       return GNUNET_YES;
783   return GNUNET_NO;
784 }
785
786 int
787 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
788
789 /**
790  * Iterator over hash map entries. Deletes all contexts of peers.
791  *
792  * @param cls closure
793  * @param key current public key
794  * @param value value in the hash map
795  * @return #GNUNET_YES if we should continue to iterate,
796  *         #GNUNET_NO if not.
797  */
798 static int
799 peermap_clear_iterator (void *cls,
800                         const struct GNUNET_PeerIdentity *key,
801                         void *value)
802 {
803   Peers_remove_peer (key);
804   return GNUNET_YES;
805 }
806
807
808 /**
809  * @brief This is called once a message is sent.
810  *
811  * Removes the pending message
812  *
813  * @param cls type of the message that was sent
814  */
815 static void
816 mq_notify_sent_cb (void *cls)
817 {
818   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
819   LOG (GNUNET_ERROR_TYPE_DEBUG,
820       "%s was sent.\n",
821       pending_msg->type);
822   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
823     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
824   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
825     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
826   if (0 == strncmp ("PUSH", pending_msg->type, 4))
827     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
828   /* Do not cancle message */
829   remove_pending_message (pending_msg, GNUNET_NO);
830 }
831
832
833 /**
834  * @brief Iterator function for #store_valid_peers.
835  *
836  * Implements #GNUNET_CONTAINER_PeerMapIterator.
837  * Writes single peer to disk.
838  *
839  * @param cls the file handle to write to.
840  * @param peer current peer
841  * @param value unused
842  *
843  * @return  #GNUNET_YES if we should continue to
844  *          iterate,
845  *          #GNUNET_NO if not.
846  */
847 static int
848 store_peer_presistently_iterator (void *cls,
849                                   const struct GNUNET_PeerIdentity *peer,
850                                   void *value)
851 {
852   const struct GNUNET_DISK_FileHandle *fh = cls;
853   char peer_string[128];
854   int size;
855   ssize_t ret;
856
857   if (NULL == peer)
858   {
859     return GNUNET_YES;
860   }
861   size = GNUNET_snprintf (peer_string,
862                           sizeof (peer_string),
863                           "%s\n",
864                           GNUNET_i2s_full (peer));
865   GNUNET_assert (53 == size);
866   ret = GNUNET_DISK_file_write (fh,
867                                 peer_string,
868                                 size);
869   GNUNET_assert (size == ret);
870   return GNUNET_YES;
871 }
872
873
874 /**
875  * @brief Store the peers currently in #valid_peers to disk.
876  */
877 static void
878 store_valid_peers ()
879 {
880   struct GNUNET_DISK_FileHandle *fh;
881   uint32_t number_written_peers;
882   int ret;
883
884   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
885   {
886     return;
887   }
888
889   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
890   if (GNUNET_SYSERR == ret)
891   {
892     LOG (GNUNET_ERROR_TYPE_WARNING,
893         "Not able to create directory for file `%s'\n",
894         filename_valid_peers);
895     GNUNET_break (0);
896   }
897   else if (GNUNET_NO == ret)
898   {
899     LOG (GNUNET_ERROR_TYPE_WARNING,
900         "Directory for file `%s' exists but is not writable for us\n",
901         filename_valid_peers);
902     GNUNET_break (0);
903   }
904   fh = GNUNET_DISK_file_open (filename_valid_peers,
905                               GNUNET_DISK_OPEN_WRITE |
906                                   GNUNET_DISK_OPEN_CREATE,
907                               GNUNET_DISK_PERM_USER_READ |
908                                   GNUNET_DISK_PERM_USER_WRITE);
909   if (NULL == fh)
910   {
911     LOG (GNUNET_ERROR_TYPE_WARNING,
912         "Not able to write valid peers to file `%s'\n",
913         filename_valid_peers);
914     return;
915   }
916   LOG (GNUNET_ERROR_TYPE_DEBUG,
917       "Writing %u valid peers to disk\n",
918       GNUNET_CONTAINER_multipeermap_size (valid_peers));
919   number_written_peers =
920     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
921                                            store_peer_presistently_iterator,
922                                            fh);
923   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
924   GNUNET_assert (number_written_peers ==
925       GNUNET_CONTAINER_multipeermap_size (valid_peers));
926 }
927
928
929 /**
930  * @brief Convert string representation of peer id to peer id.
931  *
932  * Counterpart to #GNUNET_i2s_full.
933  *
934  * @param string_repr The string representation of the peer id
935  *
936  * @return The peer id
937  */
938 static const struct GNUNET_PeerIdentity *
939 s2i_full (const char *string_repr)
940 {
941   struct GNUNET_PeerIdentity *peer;
942   size_t len;
943   int ret;
944
945   peer = GNUNET_new (struct GNUNET_PeerIdentity);
946   len = strlen (string_repr);
947   if (52 > len)
948   {
949     LOG (GNUNET_ERROR_TYPE_WARNING,
950         "Not able to convert string representation of PeerID to PeerID\n"
951         "Sting representation: %s (len %lu) - too short\n",
952         string_repr,
953         len);
954     GNUNET_break (0);
955   }
956   else if (52 < len)
957   {
958     len = 52;
959   }
960   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
961                                                     len,
962                                                     &peer->public_key);
963   if (GNUNET_OK != ret)
964   {
965     LOG (GNUNET_ERROR_TYPE_WARNING,
966         "Not able to convert string representation of PeerID to PeerID\n"
967         "Sting representation: %s\n",
968         string_repr);
969     GNUNET_break (0);
970   }
971   return peer;
972 }
973
974
975 /**
976  * @brief Restore the peers on disk to #valid_peers.
977  */
978 static void
979 restore_valid_peers ()
980 {
981   off_t file_size;
982   uint32_t num_peers;
983   struct GNUNET_DISK_FileHandle *fh;
984   char *buf;
985   ssize_t size_read;
986   char *iter_buf;
987   char *str_repr;
988   const struct GNUNET_PeerIdentity *peer;
989
990   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
991   {
992     return;
993   }
994
995   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
996   {
997     return;
998   }
999   fh = GNUNET_DISK_file_open (filename_valid_peers,
1000                               GNUNET_DISK_OPEN_READ,
1001                               GNUNET_DISK_PERM_NONE);
1002   GNUNET_assert (NULL != fh);
1003   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1004   num_peers = file_size / 53;
1005   buf = GNUNET_malloc (file_size);
1006   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1007   GNUNET_assert (size_read == file_size);
1008   LOG (GNUNET_ERROR_TYPE_DEBUG,
1009       "Restoring %" PRIu32 " peers from file `%s'\n",
1010       num_peers,
1011       filename_valid_peers);
1012   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1013   {
1014     str_repr = GNUNET_strndup (iter_buf, 53);
1015     peer = s2i_full (str_repr);
1016     GNUNET_free (str_repr);
1017     add_valid_peer (peer);
1018     LOG (GNUNET_ERROR_TYPE_DEBUG,
1019         "Restored valid peer %s from disk\n",
1020         GNUNET_i2s_full (peer));
1021   }
1022   iter_buf = NULL;
1023   GNUNET_free (buf);
1024   LOG (GNUNET_ERROR_TYPE_DEBUG,
1025       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1026       num_peers,
1027       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1028   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1029   {
1030     LOG (GNUNET_ERROR_TYPE_WARNING,
1031         "Number of restored peers does not match file size. Have probably duplicates.\n");
1032   }
1033   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1034   LOG (GNUNET_ERROR_TYPE_DEBUG,
1035       "Restored %u valid peers from disk\n",
1036       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1037 }
1038
1039
1040 /**
1041  * @brief Initialise storage of peers
1042  *
1043  * @param fn_valid_peers filename of the file used to store valid peer ids
1044  * @param cadet_h cadet handle
1045  * @param disconnect_handler Disconnect handler
1046  * @param own_id own peer identity
1047  */
1048 void
1049 Peers_initialise (char* fn_valid_peers,
1050                   struct GNUNET_CADET_Handle *cadet_h,
1051                   GNUNET_CADET_DisconnectEventHandler disconnect_handler,
1052                   const struct GNUNET_PeerIdentity *own_id)
1053 {
1054   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1055   cadet_handle = cadet_h;
1056   own_identity = *own_id;
1057   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1058   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1059   restore_valid_peers ();
1060 }
1061
1062
1063 /**
1064  * @brief Delete storage of peers that was created with #Peers_initialise ()
1065  */
1066 void
1067 Peers_terminate ()
1068 {
1069   if (GNUNET_SYSERR ==
1070       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1071                                              peermap_clear_iterator,
1072                                              NULL))
1073   {
1074     LOG (GNUNET_ERROR_TYPE_WARNING,
1075         "Iteration destroying peers was aborted.\n");
1076   }
1077   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1078   peer_map = NULL;
1079   store_valid_peers ();
1080   GNUNET_free (filename_valid_peers);
1081   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1082 }
1083
1084
1085 /**
1086  * Iterator over #valid_peers hash map entries.
1087  *
1088  * @param cls closure - unused
1089  * @param peer current peer id
1090  * @param value value in the hash map - unused
1091  * @return #GNUNET_YES if we should continue to
1092  *         iterate,
1093  *         #GNUNET_NO if not.
1094  */
1095 static int
1096 valid_peer_iterator (void *cls,
1097                      const struct GNUNET_PeerIdentity *peer,
1098                      void *value)
1099 {
1100   struct PeersIteratorCls *it_cls = cls;
1101
1102   return it_cls->iterator (it_cls->cls,
1103                            peer);
1104 }
1105
1106
1107 /**
1108  * @brief Get all currently known, valid peer ids.
1109  *
1110  * @param it function to call on each peer id
1111  * @param it_cls extra argument to @a it
1112  * @return the number of key value pairs processed,
1113  *         #GNUNET_SYSERR if it aborted iteration
1114  */
1115 int
1116 Peers_get_valid_peers (PeersIterator iterator,
1117                        void *it_cls)
1118 {
1119   struct PeersIteratorCls *cls;
1120   int ret;
1121
1122   cls = GNUNET_new (struct PeersIteratorCls);
1123   cls->iterator = iterator;
1124   cls->cls = it_cls;
1125   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1126                                                valid_peer_iterator,
1127                                                cls);
1128   GNUNET_free (cls);
1129   return ret;
1130 }
1131
1132
1133 /**
1134  * @brief Add peer to known peers.
1135  *
1136  * This function is called on new peer_ids from 'external' sources
1137  * (client seed, cadet get_peers(), ...)
1138  *
1139  * @param peer the new #GNUNET_PeerIdentity
1140  *
1141  * @return #GNUNET_YES if peer was inserted
1142  *         #GNUNET_NO  otherwise (if peer was already known or
1143  *                     peer was #own_identity)
1144  */
1145 int
1146 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1147 {
1148   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1149        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1150   {
1151     return GNUNET_NO; /* We already know this peer - nothing to do */
1152   }
1153   (void) create_peer_ctx (peer);
1154   return GNUNET_YES;
1155 }
1156
1157 int
1158 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1159
1160 /**
1161  * @brief Try connecting to a peer to see whether it is online
1162  *
1163  * If not known yet, insert into known peers
1164  *
1165  * @param peer the peer whose liveliness is to be checked
1166  * @return #GNUNET_YES if peer had to be inserted
1167  *         #GNUNET_NO  otherwise (if peer was already known or
1168  *                     peer was #own_identity)
1169  */
1170 int
1171 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1172 {
1173   struct PeerContext *peer_ctx;
1174   int ret;
1175
1176   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1177   {
1178     return GNUNET_NO;
1179   }
1180   ret = Peers_insert_peer (peer);
1181   peer_ctx = get_peer_ctx (peer);
1182   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1183   {
1184     check_peer_live (peer_ctx);
1185   }
1186   return ret;
1187 }
1188
1189
1190 /**
1191  * @brief Check if peer is removable.
1192  *
1193  * Check if
1194  *  - a recv channel exists
1195  *  - there are pending messages
1196  *  - there is no pending pull reply
1197  *
1198  * @param peer the peer in question
1199  * @return #GNUNET_YES    if peer is removable
1200  *         #GNUNET_NO     if peer is NOT removable
1201  *         #GNUNET_SYSERR if peer is not known
1202  */
1203 int
1204 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1205 {
1206   struct PeerContext *peer_ctx;
1207
1208   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1209   {
1210     return GNUNET_SYSERR;
1211   }
1212
1213   peer_ctx = get_peer_ctx (peer);
1214   if ( (NULL != peer_ctx->recv_channel) ||
1215        (NULL != peer_ctx->pending_messages_head) ||
1216        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1217   {
1218     return GNUNET_NO;
1219   }
1220   return GNUNET_YES;
1221 }
1222
1223 uint32_t *
1224 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1225                         enum Peers_ChannelRole role);
1226
1227 int
1228 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1229
1230 /**
1231  * @brief Remove peer
1232  *
1233  * @param peer the peer to clean
1234  * @return #GNUNET_YES if peer was removed
1235  *         #GNUNET_NO  otherwise
1236  */
1237 int
1238 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1239 {
1240   struct PeerContext *peer_ctx;
1241   uint32_t *channel_flag;
1242
1243   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1244   {
1245     return GNUNET_NO;
1246   }
1247
1248   peer_ctx = get_peer_ctx (peer);
1249   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1250   LOG (GNUNET_ERROR_TYPE_DEBUG,
1251        "Going to remove peer %s\n",
1252        GNUNET_i2s (&peer_ctx->peer_id));
1253   Peers_unset_peer_flag (peer, Peers_ONLINE);
1254
1255   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1256   while (NULL != peer_ctx->pending_messages_head)
1257   {
1258     LOG (GNUNET_ERROR_TYPE_DEBUG,
1259         "Removing unsent %s\n",
1260         peer_ctx->pending_messages_head->type);
1261     /* Cancle pending message, too */
1262     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1263   }
1264   /* If we are still waiting for notification whether this peer is live
1265    * cancel the according task */
1266   if (NULL != peer_ctx->liveliness_check_pending)
1267   {
1268     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1269          "Removing pending liveliness check for peer %s\n",
1270          GNUNET_i2s (&peer_ctx->peer_id));
1271     // TODO wait until cadet sets mq->cancel_impl
1272     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1273     GNUNET_free (peer_ctx->liveliness_check_pending);
1274     peer_ctx->liveliness_check_pending = NULL;
1275   }
1276   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1277   if (NULL != peer_ctx->send_channel &&
1278       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1279   {
1280     LOG (GNUNET_ERROR_TYPE_DEBUG,
1281         "Destroying send channel\n");
1282     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1283     peer_ctx->send_channel = NULL;
1284   }
1285   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1286   if (NULL != peer_ctx->recv_channel &&
1287       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1288   {
1289     LOG (GNUNET_ERROR_TYPE_DEBUG,
1290         "Destroying recv channel\n");
1291     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1292     peer_ctx->recv_channel = NULL;
1293   }
1294
1295   GNUNET_free (peer_ctx->send_channel_flags);
1296   GNUNET_free (peer_ctx->recv_channel_flags);
1297
1298   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1299   {
1300     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1301   }
1302   GNUNET_free (peer_ctx);
1303   return GNUNET_YES;
1304 }
1305
1306
1307 /**
1308  * @brief set flags on a given peer.
1309  *
1310  * @param peer the peer to set flags on
1311  * @param flags the flags
1312  */
1313 void
1314 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1315 {
1316   struct PeerContext *peer_ctx;
1317
1318   peer_ctx = get_peer_ctx (peer);
1319   set_peer_flag (peer_ctx, flags);
1320 }
1321
1322
1323 /**
1324  * @brief unset flags on a given peer.
1325  *
1326  * @param peer the peer to unset flags on
1327  * @param flags the flags
1328  */
1329 void
1330 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1331 {
1332   struct PeerContext *peer_ctx;
1333
1334   peer_ctx = get_peer_ctx (peer);
1335   unset_peer_flag (peer_ctx, flags);
1336 }
1337
1338
1339 /**
1340  * @brief Check whether flags on a peer are set.
1341  *
1342  * @param peer the peer to check the flag of
1343  * @param flags the flags to check
1344  *
1345  * @return #GNUNET_SYSERR if peer is not known
1346  *         #GNUNET_YES    if all given flags are set
1347  *         #GNUNET_NO     otherwise
1348  */
1349 int
1350 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1351 {
1352   struct PeerContext *peer_ctx;
1353
1354   if (GNUNET_NO == Peers_check_peer_known (peer))
1355   {
1356     return GNUNET_SYSERR;
1357   }
1358   peer_ctx = get_peer_ctx (peer);
1359   return check_peer_flag_set (peer_ctx, flags);
1360 }
1361
1362
1363 /**
1364  * @brief set flags on a given channel.
1365  *
1366  * @param channel the channel to set flags on
1367  * @param flags the flags
1368  */
1369 void
1370 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1371 {
1372   set_channel_flag (channel_flags, flags);
1373 }
1374
1375
1376 /**
1377  * @brief unset flags on a given channel.
1378  *
1379  * @param channel the channel to unset flags on
1380  * @param flags the flags
1381  */
1382 void
1383 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1384 {
1385   unset_channel_flag (channel_flags, flags);
1386 }
1387
1388
1389 /**
1390  * @brief Check whether flags on a channel are set.
1391  *
1392  * @param channel the channel to check the flag of
1393  * @param flags the flags to check
1394  *
1395  * @return #GNUNET_YES if all given flags are set
1396  *         #GNUNET_NO  otherwise
1397  */
1398 int
1399 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1400 {
1401   return check_channel_flag_set (channel_flags, flags);
1402 }
1403
1404 /**
1405  * @brief Get the flags for the channel in @a role for @a peer.
1406  *
1407  * @param peer Peer to get the channel flags for.
1408  * @param role Role of channel to get flags for
1409  *
1410  * @return The flags.
1411  */
1412 uint32_t *
1413 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1414                         enum Peers_ChannelRole role)
1415 {
1416   const struct PeerContext *peer_ctx;
1417
1418   peer_ctx = get_peer_ctx (peer);
1419   if (Peers_CHANNEL_ROLE_SENDING == role)
1420   {
1421     return peer_ctx->send_channel_flags;
1422   }
1423   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1424   {
1425     return peer_ctx->recv_channel_flags;
1426   }
1427   else
1428   {
1429     GNUNET_assert (0);
1430   }
1431 }
1432
1433 /**
1434  * @brief Check whether we have information about the given peer.
1435  *
1436  * FIXME probably deprecated. Make this the new _online.
1437  *
1438  * @param peer peer in question
1439  *
1440  * @return #GNUNET_YES if peer is known
1441  *         #GNUNET_NO  if peer is not knwon
1442  */
1443 int
1444 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1445 {
1446   if (NULL != peer_map)
1447   {
1448     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1449   } else
1450   {
1451     return GNUNET_NO;
1452   }
1453 }
1454
1455
1456 /**
1457  * @brief Check whether @a peer is actually a peer.
1458  *
1459  * A valid peer is a peer that we know exists eg. we were connected to once.
1460  *
1461  * @param peer peer in question
1462  *
1463  * @return #GNUNET_YES if peer is valid
1464  *         #GNUNET_NO  if peer is not valid
1465  */
1466 int
1467 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1468 {
1469   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1470 }
1471
1472
1473 /**
1474  * @brief Indicate that we want to send to the other peer
1475  *
1476  * This establishes a sending channel
1477  *
1478  * @param peer the peer to establish channel to
1479  */
1480 void
1481 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1482 {
1483   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1484   (void) get_channel (peer);
1485 }
1486
1487
1488 /**
1489  * @brief Check whether other peer has the intention to send/opened channel
1490  *        towars us
1491  *
1492  * @param peer the peer in question
1493  *
1494  * @return #GNUNET_YES if peer has the intention to send
1495  *         #GNUNET_NO  otherwise
1496  */
1497 int
1498 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1499 {
1500   const struct PeerContext *peer_ctx;
1501
1502   peer_ctx = get_peer_ctx (peer);
1503   if (NULL != peer_ctx->recv_channel)
1504   {
1505     return GNUNET_YES;
1506   }
1507   return GNUNET_NO;
1508 }
1509
1510
1511 /**
1512  * Handle the channel a peer opens to us.
1513  *
1514  * @param cls The closure
1515  * @param channel The channel the peer wants to establish
1516  * @param initiator The peer's peer ID
1517  *
1518  * @return initial channel context for the channel
1519  *         (can be NULL -- that's not an error)
1520  */
1521 void *
1522 Peers_handle_inbound_channel (void *cls,
1523                               struct GNUNET_CADET_Channel *channel,
1524                               const struct GNUNET_PeerIdentity *initiator)
1525 {
1526   struct PeerContext *peer_ctx;
1527   struct GNUNET_PeerIdentity *ctx_peer;
1528
1529   LOG (GNUNET_ERROR_TYPE_DEBUG,
1530       "New channel was established to us (Peer %s).\n",
1531       GNUNET_i2s (initiator));
1532   GNUNET_assert (NULL != channel); /* according to cadet API */
1533   /* Make sure we 'know' about this peer */
1534   peer_ctx = create_or_get_peer_ctx (initiator);
1535   set_peer_live (peer_ctx);
1536   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1537   *ctx_peer = *initiator;
1538   /* We only accept one incoming channel per peer */
1539   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1540   {
1541     set_channel_flag (peer_ctx->recv_channel_flags,
1542                       Peers_CHANNEL_ESTABLISHED_TWICE);
1543     //GNUNET_CADET_channel_destroy (channel);
1544     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1545     peer_ctx->recv_channel = channel;
1546     /* return the channel context */
1547     return ctx_peer;
1548   }
1549   peer_ctx->recv_channel = channel;
1550   return ctx_peer;
1551 }
1552
1553
1554 /**
1555  * @brief Check whether a sending channel towards the given peer exists
1556  *
1557  * @param peer the peer to check for
1558  *
1559  * @return #GNUNET_YES if a sending channel towards that peer exists
1560  *         #GNUNET_NO  otherwise
1561  */
1562 int
1563 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1564 {
1565   struct PeerContext *peer_ctx;
1566
1567   if (GNUNET_NO == Peers_check_peer_known (peer))
1568   { /* If no such peer exists, there is no channel */
1569     return GNUNET_NO;
1570   }
1571   peer_ctx = get_peer_ctx (peer);
1572   if (NULL == peer_ctx->send_channel)
1573   {
1574     return GNUNET_NO;
1575   }
1576   return GNUNET_YES;
1577 }
1578
1579
1580 /**
1581  * @brief check whether the given channel is the sending channel of the given
1582  *        peer
1583  *
1584  * @param peer the peer in question
1585  * @param channel the channel to check for
1586  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1587  *                    #Peers_CHANNEL_ROLE_RECEIVING
1588  *
1589  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1590  *         #GNUNET_NO  otherwise
1591  */
1592 int
1593 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1594                           const struct GNUNET_CADET_Channel *channel,
1595                           enum Peers_ChannelRole role)
1596 {
1597   const struct PeerContext *peer_ctx;
1598
1599   if (GNUNET_NO == Peers_check_peer_known (peer))
1600   {
1601     return GNUNET_NO;
1602   }
1603   peer_ctx = get_peer_ctx (peer);
1604   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1605        (channel == peer_ctx->send_channel) )
1606   {
1607     return GNUNET_YES;
1608   }
1609   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1610        (channel == peer_ctx->recv_channel) )
1611   {
1612     return GNUNET_YES;
1613   }
1614   return GNUNET_NO;
1615 }
1616
1617
1618 /**
1619  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1620  *        intention to another peer
1621  *
1622  * If there is also no channel to receive messages from that peer, remove it
1623  * from the peermap.
1624  * TODO really?
1625  *
1626  * @peer the peer identity of the peer whose sending channel to destroy
1627  * @return #GNUNET_YES if channel was destroyed
1628  *         #GNUNET_NO  otherwise
1629  */
1630 int
1631 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1632 {
1633   struct PeerContext *peer_ctx;
1634
1635   if (GNUNET_NO == Peers_check_peer_known (peer))
1636   {
1637     return GNUNET_NO;
1638   }
1639   peer_ctx = get_peer_ctx (peer);
1640   if (NULL != peer_ctx->send_channel)
1641   {
1642     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1643     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1644     peer_ctx->send_channel = NULL;
1645     peer_ctx->mq = NULL;
1646     (void) Peers_check_connected (peer);
1647     return GNUNET_YES;
1648   }
1649   return GNUNET_NO;
1650 }
1651
1652 /**
1653  * This is called when a channel is destroyed.
1654  *
1655  * @param cls The closure
1656  * @param channel The channel being closed
1657  */
1658 void
1659 Peers_cleanup_destroyed_channel (void *cls,
1660                                  const struct GNUNET_CADET_Channel *channel)
1661 {
1662   struct GNUNET_PeerIdentity *peer = cls;
1663   struct PeerContext *peer_ctx;
1664
1665   if (GNUNET_NO == Peers_check_peer_known (peer))
1666   {/* We don't want to implicitly create a context that we're about to kill */
1667   LOG (GNUNET_ERROR_TYPE_DEBUG,
1668        "channel (%s) without associated context was destroyed\n",
1669        GNUNET_i2s (peer));
1670     return;
1671   }
1672   peer_ctx = get_peer_ctx (peer);
1673
1674   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1675    * flag will be set. In this case simply make sure that the channels are
1676    * cleaned. */
1677   /* FIXME This distinction seems to be redundant */
1678   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1679   {/* We initiatad the destruction of this particular peer */
1680     if (channel == peer_ctx->send_channel)
1681       peer_ctx->send_channel = NULL;
1682     else if (channel == peer_ctx->recv_channel)
1683       peer_ctx->recv_channel = NULL;
1684
1685     if (NULL != peer_ctx->send_channel)
1686     {
1687       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1688       peer_ctx->send_channel = NULL;
1689     }
1690     if (NULL != peer_ctx->recv_channel)
1691     {
1692       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1693       peer_ctx->recv_channel = NULL;
1694     }
1695     /* Set the #Peers_ONLINE flag accordingly */
1696     (void) Peers_check_connected (peer);
1697     return;
1698   }
1699
1700   else
1701   { /* We did not initiate the destruction of this peer */
1702     if (channel == peer_ctx->send_channel)
1703     { /* Something (but us) killd the channel - clean up peer */
1704       LOG (GNUNET_ERROR_TYPE_DEBUG,
1705           "send channel (%s) was destroyed - cleaning up\n",
1706           GNUNET_i2s (peer));
1707       peer_ctx->send_channel = NULL;
1708     }
1709     else if (channel == peer_ctx->recv_channel)
1710     { /* Other peer doesn't want to send us messages anymore */
1711       LOG (GNUNET_ERROR_TYPE_DEBUG,
1712            "Peer %s destroyed recv channel - cleaning up channel\n",
1713            GNUNET_i2s (peer));
1714       peer_ctx->recv_channel = NULL;
1715     }
1716     else
1717     {
1718       LOG (GNUNET_ERROR_TYPE_WARNING,
1719            "unknown channel (%s) was destroyed\n",
1720            GNUNET_i2s (peer));
1721     }
1722   }
1723   (void) Peers_check_connected (peer);
1724 }
1725
1726 /**
1727  * @brief Send a message to another peer.
1728  *
1729  * Keeps track about pending messages so they can be properly removed when the
1730  * peer is destroyed.
1731  *
1732  * @param peer receeiver of the message
1733  * @param ev envelope of the message
1734  * @param type type of the message
1735  */
1736 void
1737 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1738                     struct GNUNET_MQ_Envelope *ev,
1739                     const char *type)
1740 {
1741   struct PendingMessage *pending_msg;
1742   struct GNUNET_MQ_Handle *mq;
1743
1744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1745               "Sending message to %s of type %s\n",
1746               GNUNET_i2s (peer),
1747               type);
1748   pending_msg = insert_pending_message (peer, ev, type);
1749   mq = get_mq (peer);
1750   GNUNET_MQ_notify_sent (ev,
1751                          mq_notify_sent_cb,
1752                          pending_msg);
1753   GNUNET_MQ_send (mq, ev);
1754 }
1755
1756 /**
1757  * @brief Schedule a operation on given peer
1758  *
1759  * Avoids scheduling an operation twice.
1760  *
1761  * @param peer the peer we want to schedule the operation for once it gets live
1762  *
1763  * @return #GNUNET_YES if the operation was scheduled
1764  *         #GNUNET_NO  otherwise
1765  */
1766 int
1767 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1768                           const PeerOp peer_op)
1769 {
1770   struct PeerPendingOp pending_op;
1771   struct PeerContext *peer_ctx;
1772
1773   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1774   {
1775     return GNUNET_NO;
1776   }
1777   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1778
1779   //TODO if LIVE/ONLINE execute immediately
1780
1781   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1782   {
1783     peer_ctx = get_peer_ctx (peer);
1784     pending_op.op = peer_op;
1785     pending_op.op_cls = NULL;
1786     GNUNET_array_append (peer_ctx->pending_ops,
1787                          peer_ctx->num_pending_ops,
1788                          pending_op);
1789     return GNUNET_YES;
1790   }
1791   return GNUNET_NO;
1792 }
1793
1794 /**
1795  * @brief Get the recv_channel of @a peer.
1796  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1797  * messages.
1798  *
1799  * @param peer The peer to get the recv_channel from.
1800  *
1801  * @return The recv_channel.
1802  */
1803 struct GNUNET_CADET_Channel *
1804 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1805 {
1806   struct PeerContext *peer_ctx;
1807
1808   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1809   peer_ctx = get_peer_ctx (peer);
1810   return peer_ctx->recv_channel;
1811 }
1812 /***********************************************************************
1813  * /Old gnunet-service-rps_peers.c
1814 ***********************************************************************/
1815
1816
1817 /***********************************************************************
1818  * Housekeeping with clients
1819 ***********************************************************************/
1820
1821 /**
1822  * Closure used to pass the client and the id to the callback
1823  * that replies to a client's request
1824  */
1825 struct ReplyCls
1826 {
1827   /**
1828    * DLL
1829    */
1830   struct ReplyCls *next;
1831   struct ReplyCls *prev;
1832
1833   /**
1834    * The identifier of the request
1835    */
1836   uint32_t id;
1837
1838   /**
1839    * The handle to the request
1840    */
1841   struct RPS_SamplerRequestHandle *req_handle;
1842
1843   /**
1844    * The client handle to send the reply to
1845    */
1846   struct ClientContext *cli_ctx;
1847 };
1848
1849
1850 /**
1851  * Struct used to store the context of a connected client.
1852  */
1853 struct ClientContext
1854 {
1855   /**
1856    * DLL
1857    */
1858   struct ClientContext *next;
1859   struct ClientContext *prev;
1860
1861   /**
1862    * The message queue to communicate with the client.
1863    */
1864   struct GNUNET_MQ_Handle *mq;
1865
1866   /**
1867    * DLL with handles to single requests from the client
1868    */
1869   struct ReplyCls *rep_cls_head;
1870   struct ReplyCls *rep_cls_tail;
1871
1872   /**
1873    * @brief How many updates this client expects to receive.
1874    */
1875   int64_t view_updates_left;
1876
1877   /**
1878    * The client handle to send the reply to
1879    */
1880   struct GNUNET_SERVICE_Client *client;
1881 };
1882
1883 /**
1884  * DLL with all clients currently connected to us
1885  */
1886 struct ClientContext *cli_ctx_head;
1887 struct ClientContext *cli_ctx_tail;
1888
1889 /***********************************************************************
1890  * /Housekeeping with clients
1891 ***********************************************************************/
1892
1893
1894
1895
1896
1897 /***********************************************************************
1898  * Globals
1899 ***********************************************************************/
1900
1901 /**
1902  * Sampler used for the Brahms protocol itself.
1903  */
1904 static struct RPS_Sampler *prot_sampler;
1905
1906 /**
1907  * Sampler used for the clients.
1908  */
1909 static struct RPS_Sampler *client_sampler;
1910
1911 /**
1912  * Name to log view to
1913  */
1914 static char *file_name_view_log;
1915
1916 /**
1917  * The size of sampler we need to be able to satisfy the client's need
1918  * of random peers.
1919  */
1920 static unsigned int sampler_size_client_need;
1921
1922 /**
1923  * The size of sampler we need to be able to satisfy the Brahms protocol's
1924  * need of random peers.
1925  *
1926  * This is one minimum size the sampler grows to.
1927  */
1928 static unsigned int sampler_size_est_need;
1929
1930 /**
1931  * Percentage of total peer number in the view
1932  * to send random PUSHes to
1933  */
1934 static float alpha;
1935
1936 /**
1937  * Percentage of total peer number in the view
1938  * to send random PULLs to
1939  */
1940 static float beta;
1941
1942 /**
1943  * Identifier for the main task that runs periodically.
1944  */
1945 static struct GNUNET_SCHEDULER_Task *do_round_task;
1946
1947 /**
1948  * Time inverval the do_round task runs in.
1949  */
1950 static struct GNUNET_TIME_Relative round_interval;
1951
1952 /**
1953  * List to store peers received through pushes temporary.
1954  */
1955 static struct CustomPeerMap *push_map;
1956
1957 /**
1958  * List to store peers received through pulls temporary.
1959  */
1960 static struct CustomPeerMap *pull_map;
1961
1962 /**
1963  * Handler to NSE.
1964  */
1965 static struct GNUNET_NSE_Handle *nse;
1966
1967 /**
1968  * Handler to CADET.
1969  */
1970 static struct GNUNET_CADET_Handle *cadet_handle;
1971
1972 /**
1973  * @brief Port to communicate to other peers.
1974  */
1975 static struct GNUNET_CADET_Port *cadet_port;
1976
1977 /**
1978  * Handler to PEERINFO.
1979  */
1980 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1981
1982 /**
1983  * Handle for cancellation of iteration over peers.
1984  */
1985 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1986
1987 /**
1988  * Request counter.
1989  *
1990  * Counts how many requets clients already issued.
1991  * Only needed in the beginning to check how many of the 64 deltas
1992  * we already have
1993  */
1994 static unsigned int req_counter;
1995
1996 /**
1997  * Time of the last request we received.
1998  *
1999  * Used to compute the expected request rate.
2000  */
2001 static struct GNUNET_TIME_Absolute last_request;
2002
2003 /**
2004  * Size of #request_deltas.
2005  */
2006 #define REQUEST_DELTAS_SIZE 64
2007 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2008
2009 /**
2010  * Last 64 deltas between requests
2011  */
2012 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2013
2014 /**
2015  * The prediction of the rate of requests
2016  */
2017 static struct GNUNET_TIME_Relative request_rate;
2018
2019
2020 #ifdef ENABLE_MALICIOUS
2021 /**
2022  * Type of malicious peer
2023  *
2024  * 0 Don't act malicious at all - Default
2025  * 1 Try to maximise representation
2026  * 2 Try to partition the network
2027  * 3 Combined attack
2028  */
2029 static uint32_t mal_type;
2030
2031 /**
2032  * Other malicious peers
2033  */
2034 static struct GNUNET_PeerIdentity *mal_peers;
2035
2036 /**
2037  * Hashmap of malicious peers used as set.
2038  * Used to more efficiently check whether we know that peer.
2039  */
2040 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2041
2042 /**
2043  * Number of other malicious peers
2044  */
2045 static uint32_t num_mal_peers;
2046
2047
2048 /**
2049  * If type is 2 This struct is used to store the attacked peers in a DLL
2050  */
2051 struct AttackedPeer
2052 {
2053   /**
2054    * DLL
2055    */
2056   struct AttackedPeer *next;
2057   struct AttackedPeer *prev;
2058
2059   /**
2060    * PeerID
2061    */
2062   struct GNUNET_PeerIdentity peer_id;
2063 };
2064
2065 /**
2066  * If type is 2 this is the DLL of attacked peers
2067  */
2068 static struct AttackedPeer *att_peers_head;
2069 static struct AttackedPeer *att_peers_tail;
2070
2071 /**
2072  * This index is used to point to an attacked peer to
2073  * implement the round-robin-ish way to select attacked peers.
2074  */
2075 static struct AttackedPeer *att_peer_index;
2076
2077 /**
2078  * Hashmap of attacked peers used as set.
2079  * Used to more efficiently check whether we know that peer.
2080  */
2081 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2082
2083 /**
2084  * Number of attacked peers
2085  */
2086 static uint32_t num_attacked_peers;
2087
2088 /**
2089  * If type is 1 this is the attacked peer
2090  */
2091 static struct GNUNET_PeerIdentity attacked_peer;
2092
2093 /**
2094  * The limit of PUSHes we can send in one round.
2095  * This is an assumption of the Brahms protocol and either implemented
2096  * via proof of work
2097  * or
2098  * assumend to be the bandwidth limitation.
2099  */
2100 static uint32_t push_limit = 10000;
2101 #endif /* ENABLE_MALICIOUS */
2102
2103
2104 /***********************************************************************
2105  * /Globals
2106 ***********************************************************************/
2107
2108
2109 /***********************************************************************
2110  * Util functions
2111 ***********************************************************************/
2112
2113
2114 /**
2115  * Print peerlist to log.
2116  */
2117 static void
2118 print_peer_list (struct GNUNET_PeerIdentity *list,
2119                  unsigned int len)
2120 {
2121   unsigned int i;
2122
2123   LOG (GNUNET_ERROR_TYPE_DEBUG,
2124        "Printing peer list of length %u at %p:\n",
2125        len,
2126        list);
2127   for (i = 0 ; i < len ; i++)
2128   {
2129     LOG (GNUNET_ERROR_TYPE_DEBUG,
2130          "%u. peer: %s\n",
2131          i, GNUNET_i2s (&list[i]));
2132   }
2133 }
2134
2135
2136 /**
2137  * Remove peer from list.
2138  */
2139 static void
2140 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2141                unsigned int *list_size,
2142                const struct GNUNET_PeerIdentity *peer)
2143 {
2144   unsigned int i;
2145   struct GNUNET_PeerIdentity *tmp;
2146
2147   tmp = *peer_list;
2148
2149   LOG (GNUNET_ERROR_TYPE_DEBUG,
2150        "Removing peer %s from list at %p\n",
2151        GNUNET_i2s (peer),
2152        tmp);
2153
2154   for ( i = 0 ; i < *list_size ; i++ )
2155   {
2156     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2157     {
2158       if (i < *list_size -1)
2159       { /* Not at the last entry -- shift peers left */
2160         memmove (&tmp[i], &tmp[i +1],
2161                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2162       }
2163       /* Remove last entry (should be now useless PeerID) */
2164       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2165     }
2166   }
2167   *peer_list = tmp;
2168 }
2169
2170
2171 /**
2172  * Sum all time relatives of an array.
2173  */
2174 static struct GNUNET_TIME_Relative
2175 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2176                 uint32_t arr_size)
2177 {
2178   struct GNUNET_TIME_Relative sum;
2179   uint32_t i;
2180
2181   sum = GNUNET_TIME_UNIT_ZERO;
2182   for ( i = 0 ; i < arr_size ; i++ )
2183   {
2184     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2185   }
2186   return sum;
2187 }
2188
2189
2190 /**
2191  * Compute the average of given time relatives.
2192  */
2193 static struct GNUNET_TIME_Relative
2194 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2195                 uint32_t arr_size)
2196 {
2197   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2198                                                       arr_size),
2199                                       arr_size);
2200 }
2201
2202
2203 /**
2204  * Insert PeerID in #view
2205  *
2206  * Called once we know a peer is live.
2207  * Implements #PeerOp
2208  *
2209  * @return GNUNET_OK if peer was actually inserted
2210  *         GNUNET_NO if peer was not inserted
2211  */
2212 static void
2213 insert_in_view_op (void *cls,
2214                 const struct GNUNET_PeerIdentity *peer);
2215
2216 /**
2217  * Insert PeerID in #view
2218  *
2219  * Called once we know a peer is live.
2220  *
2221  * @return GNUNET_OK if peer was actually inserted
2222  *         GNUNET_NO if peer was not inserted
2223  */
2224 static int
2225 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2226 {
2227   int online;
2228
2229   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2230   if ( (GNUNET_NO == online) ||
2231        (GNUNET_SYSERR == online) ) /* peer is not even known */
2232   {
2233     (void) Peers_issue_peer_liveliness_check (peer);
2234     (void) Peers_schedule_operation (peer, insert_in_view_op);
2235     return GNUNET_NO;
2236   }
2237   /* Open channel towards peer to keep connection open */
2238   Peers_indicate_sending_intention (peer);
2239   return View_put (peer);
2240 }
2241
2242 /**
2243  * Put random peer from sampler into the view as history update.
2244  */
2245 static void
2246 hist_update (void *cls,
2247              struct GNUNET_PeerIdentity *ids,
2248              uint32_t num_peers)
2249 {
2250   unsigned int i;
2251
2252   for (i = 0; i < num_peers; i++)
2253   {
2254     (void) insert_in_view (&ids[i]);
2255     to_file (file_name_view_log,
2256              "+%s\t(hist)",
2257              GNUNET_i2s_full (ids));
2258   }
2259 }
2260
2261
2262 /**
2263  * Wrapper around #RPS_sampler_resize()
2264  *
2265  * If we do not have enough sampler elements, double current sampler size
2266  * If we have more than enough sampler elements, halv current sampler size
2267  */
2268 static void
2269 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2270 {
2271   unsigned int sampler_size;
2272
2273   // TODO statistics
2274   // TODO respect the min, max
2275   sampler_size = RPS_sampler_get_size (sampler);
2276   if (sampler_size > new_size * 4)
2277   { /* Shrinking */
2278     RPS_sampler_resize (sampler, sampler_size / 2);
2279   }
2280   else if (sampler_size < new_size)
2281   { /* Growing */
2282     RPS_sampler_resize (sampler, sampler_size * 2);
2283   }
2284   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2285 }
2286
2287
2288 /**
2289  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2290  */
2291 static void
2292 client_resize_wrapper ()
2293 {
2294   uint32_t bigger_size;
2295
2296   // TODO statistics
2297
2298   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2299
2300   // TODO respect the min, max
2301   resize_wrapper (client_sampler, bigger_size);
2302   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2303       bigger_size);
2304 }
2305
2306
2307 /**
2308  * Estimate request rate
2309  *
2310  * Called every time we receive a request from the client.
2311  */
2312 static void
2313 est_request_rate()
2314 {
2315   struct GNUNET_TIME_Relative max_round_duration;
2316
2317   if (request_deltas_size > req_counter)
2318     req_counter++;
2319   if ( 1 < req_counter)
2320   {
2321     /* Shift last request deltas to the right */
2322     memmove (&request_deltas[1],
2323         request_deltas,
2324         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2325
2326     /* Add current delta to beginning */
2327     request_deltas[0] =
2328         GNUNET_TIME_absolute_get_difference (last_request,
2329                                              GNUNET_TIME_absolute_get ());
2330     request_rate = T_relative_avg (request_deltas, req_counter);
2331     request_rate = (request_rate.rel_value_us < 1) ?
2332       GNUNET_TIME_relative_get_unit_ () : request_rate;
2333
2334     /* Compute the duration a round will maximally take */
2335     max_round_duration =
2336         GNUNET_TIME_relative_add (round_interval,
2337                                   GNUNET_TIME_relative_divide (round_interval, 2));
2338
2339     /* Set the estimated size the sampler has to have to
2340      * satisfy the current client request rate */
2341     sampler_size_client_need =
2342         max_round_duration.rel_value_us / request_rate.rel_value_us;
2343
2344     /* Resize the sampler */
2345     client_resize_wrapper ();
2346   }
2347   last_request = GNUNET_TIME_absolute_get ();
2348 }
2349
2350
2351 /**
2352  * Add all peers in @a peer_array to @a peer_map used as set.
2353  *
2354  * @param peer_array array containing the peers
2355  * @param num_peers number of peers in @peer_array
2356  * @param peer_map the peermap to use as set
2357  */
2358 static void
2359 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2360                        unsigned int num_peers,
2361                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2362 {
2363   unsigned int i;
2364   if (NULL == peer_map)
2365   {
2366     LOG (GNUNET_ERROR_TYPE_WARNING,
2367          "Trying to add peers to non-existing peermap.\n");
2368     return;
2369   }
2370
2371   for (i = 0; i < num_peers; i++)
2372   {
2373     GNUNET_CONTAINER_multipeermap_put (peer_map,
2374                                        &peer_array[i],
2375                                        NULL,
2376                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2377   }
2378 }
2379
2380
2381 /**
2382  * Send a PULL REPLY to @a peer_id
2383  *
2384  * @param peer_id the peer to send the reply to.
2385  * @param peer_ids the peers to send to @a peer_id
2386  * @param num_peer_ids the number of peers to send to @a peer_id
2387  */
2388 static void
2389 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2390                  const struct GNUNET_PeerIdentity *peer_ids,
2391                  unsigned int num_peer_ids)
2392 {
2393   uint32_t send_size;
2394   struct GNUNET_MQ_Envelope *ev;
2395   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2396
2397   /* Compute actual size */
2398   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2399               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2400
2401   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2402     /* Compute number of peers to send
2403      * If too long, simply truncate */
2404     // TODO select random ones via permutation
2405     //      or even better: do good protocol design
2406     send_size =
2407       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2408        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2409        sizeof (struct GNUNET_PeerIdentity);
2410   else
2411     send_size = num_peer_ids;
2412
2413   LOG (GNUNET_ERROR_TYPE_DEBUG,
2414       "Going to send PULL REPLY with %u peers to %s\n",
2415       send_size, GNUNET_i2s (peer_id));
2416
2417   ev = GNUNET_MQ_msg_extra (out_msg,
2418                             send_size * sizeof (struct GNUNET_PeerIdentity),
2419                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2420   out_msg->num_peers = htonl (send_size);
2421   GNUNET_memcpy (&out_msg[1], peer_ids,
2422          send_size * sizeof (struct GNUNET_PeerIdentity));
2423
2424   Peers_send_message (peer_id, ev, "PULL REPLY");
2425   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2426 }
2427
2428
2429 /**
2430  * Insert PeerID in #pull_map
2431  *
2432  * Called once we know a peer is live.
2433  */
2434 static void
2435 insert_in_pull_map (void *cls,
2436                     const struct GNUNET_PeerIdentity *peer)
2437 {
2438   CustomPeerMap_put (pull_map, peer);
2439 }
2440
2441
2442 /**
2443  * Insert PeerID in #view
2444  *
2445  * Called once we know a peer is live.
2446  * Implements #PeerOp
2447  */
2448 static void
2449 insert_in_view_op (void *cls,
2450                 const struct GNUNET_PeerIdentity *peer)
2451 {
2452   (void) insert_in_view (peer);
2453 }
2454
2455
2456 /**
2457  * Update sampler with given PeerID.
2458  * Implements #PeerOp
2459  */
2460 static void
2461 insert_in_sampler (void *cls,
2462                    const struct GNUNET_PeerIdentity *peer)
2463 {
2464   LOG (GNUNET_ERROR_TYPE_DEBUG,
2465        "Updating samplers with peer %s from insert_in_sampler()\n",
2466        GNUNET_i2s (peer));
2467   RPS_sampler_update (prot_sampler,   peer);
2468   RPS_sampler_update (client_sampler, peer);
2469   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2470   {
2471     /* Make sure we 'know' about this peer */
2472     (void) Peers_issue_peer_liveliness_check (peer);
2473     /* Establish a channel towards that peer to indicate we are going to send
2474      * messages to it */
2475     //Peers_indicate_sending_intention (peer);
2476   }
2477 }
2478
2479 /**
2480  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2481  *        If the peer is not known, liveliness check is issued and it is
2482  *        scheduled to be inserted in sampler and view.
2483  *
2484  * "External sources" refer to every source except the gossip.
2485  *
2486  * @param peer peer to insert
2487  */
2488 static void
2489 got_peer (const struct GNUNET_PeerIdentity *peer)
2490 {
2491   /* If we did not know this peer already, insert it into sampler and view */
2492   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2493   {
2494     Peers_schedule_operation (peer, insert_in_sampler);
2495     Peers_schedule_operation (peer, insert_in_view_op);
2496   }
2497 }
2498
2499 /**
2500  * @brief Checks if there is a sending channel and if it is needed
2501  *
2502  * @param peer the peer whose sending channel is checked
2503  * @return GNUNET_YES if sending channel exists and is still needed
2504  *         GNUNET_NO  otherwise
2505  */
2506 static int
2507 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2508 {
2509   /* struct GNUNET_CADET_Channel *channel; */
2510   if (GNUNET_NO == Peers_check_peer_known (peer))
2511   {
2512     return GNUNET_NO;
2513   }
2514   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2515   {
2516     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2517          (GNUNET_YES == View_contains_peer (peer)) ||
2518          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2519          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2520          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2521     { /* If we want to keep the connection to peer open */
2522       return GNUNET_YES;
2523     }
2524     return GNUNET_NO;
2525   }
2526   return GNUNET_NO;
2527 }
2528
2529 /**
2530  * @brief remove peer from our knowledge, the view, push and pull maps and
2531  * samplers.
2532  *
2533  * @param peer the peer to remove
2534  */
2535 static void
2536 remove_peer (const struct GNUNET_PeerIdentity *peer)
2537 {
2538   (void) View_remove_peer (peer);
2539   CustomPeerMap_remove_peer (pull_map, peer);
2540   CustomPeerMap_remove_peer (push_map, peer);
2541   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2542   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2543   Peers_remove_peer (peer);
2544 }
2545
2546
2547 /**
2548  * @brief Remove data that is not needed anymore.
2549  *
2550  * If the sending channel is no longer needed it is destroyed.
2551  *
2552  * @param peer the peer whose data is about to be cleaned
2553  */
2554 static void
2555 clean_peer (const struct GNUNET_PeerIdentity *peer)
2556 {
2557   if (GNUNET_NO == check_sending_channel_needed (peer))
2558   {
2559     LOG (GNUNET_ERROR_TYPE_DEBUG,
2560         "Going to remove send channel to peer %s\n",
2561         GNUNET_i2s (peer));
2562     #ifdef ENABLE_MALICIOUS
2563     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2564       (void) Peers_destroy_sending_channel (peer);
2565     #else /* ENABLE_MALICIOUS */
2566     (void) Peers_destroy_sending_channel (peer);
2567     #endif /* ENABLE_MALICIOUS */
2568   }
2569
2570   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2571        (GNUNET_NO == View_contains_peer (peer)) &&
2572        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2573        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2574        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2575        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2576        (GNUNET_NO != Peers_check_removable (peer)) )
2577   { /* We can safely remove this peer */
2578     LOG (GNUNET_ERROR_TYPE_DEBUG,
2579         "Going to remove peer %s\n",
2580         GNUNET_i2s (peer));
2581     remove_peer (peer);
2582     return;
2583   }
2584 }
2585
2586 /**
2587  * @brief This is called when a channel is destroyed.
2588  *
2589  * Removes peer completely from our knowledge if the send_channel was destroyed
2590  * Otherwise simply delete the recv_channel
2591  * Also check if the knowledge about this peer is still needed.
2592  * If not, remove this peer from our knowledge.
2593  *
2594  * @param cls The closure
2595  * @param channel The channel being closed
2596  * @param channel_ctx The context associated with this channel
2597  */
2598 static void
2599 cleanup_destroyed_channel (void *cls,
2600                            const struct GNUNET_CADET_Channel *channel)
2601 {
2602   struct GNUNET_PeerIdentity *peer = cls;
2603   uint32_t *channel_flag;
2604   struct PeerContext *peer_ctx;
2605
2606   GNUNET_assert (NULL != peer);
2607
2608   if (GNUNET_NO == Peers_check_peer_known (peer))
2609   { /* We don't know a context to that peer */
2610     LOG (GNUNET_ERROR_TYPE_WARNING,
2611          "channel (%s) without associated context was destroyed\n",
2612          GNUNET_i2s (peer));
2613     GNUNET_free (peer);
2614     return;
2615   }
2616
2617   peer_ctx = get_peer_ctx (peer);
2618   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2619   {
2620     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2621   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2622   {
2623     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2624   }
2625
2626   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2627   { /* We are in the middle of removing that peer from our knowledge. In this
2628        case simply make sure that the channels are cleaned. */
2629     Peers_cleanup_destroyed_channel (cls, channel);
2630     to_file (file_name_view_log,
2631              "-%s\t(cleanup channel, ourself)",
2632              GNUNET_i2s_full (peer));
2633     GNUNET_free (peer);
2634     return;
2635   }
2636
2637   if (GNUNET_YES ==
2638       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2639   { /* Channel used for sending was destroyed */
2640     /* Possible causes of channel destruction:
2641      *  - ourselves  -> cleaning send channel -> clean context
2642      *  - other peer -> peer probably went down -> remove
2643      */
2644     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2645     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2646     { /* We are about to clean the sending channel. Clean the respective
2647        * context */
2648       Peers_cleanup_destroyed_channel (cls, channel);
2649       GNUNET_free (peer);
2650       return;
2651     }
2652     else
2653     { /* Other peer destroyed our sending channel that he is supposed to keep
2654        * open. It probably went down. Remove it from our knowledge. */
2655       Peers_cleanup_destroyed_channel (cls, channel);
2656       remove_peer (peer);
2657       GNUNET_free (peer);
2658       return;
2659     }
2660   }
2661   else if (GNUNET_YES ==
2662       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2663   { /* Channel used for receiving was destroyed */
2664     /* Possible causes of channel destruction:
2665      *  - ourselves  -> peer tried to establish channel twice -> clean context
2666      *  - other peer -> peer doesn't want to send us data -> clean
2667      */
2668     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2669     if (GNUNET_YES ==
2670         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2671     { /* Other peer tried to establish a channel to us twice. We do not accept
2672        * that. Clean the context. */
2673       Peers_cleanup_destroyed_channel (cls, channel);
2674       GNUNET_free (peer);
2675       return;
2676     }
2677     else
2678     { /* Other peer doesn't want to send us data anymore. We are free to clean
2679        * it. */
2680       Peers_cleanup_destroyed_channel (cls, channel);
2681       clean_peer (peer);
2682       GNUNET_free (peer);
2683       return;
2684     }
2685   }
2686   else
2687   {
2688     LOG (GNUNET_ERROR_TYPE_WARNING,
2689         "Destroyed channel is neither sending nor receiving channel\n");
2690   }
2691   GNUNET_free (peer);
2692 }
2693
2694 /***********************************************************************
2695  * /Util functions
2696 ***********************************************************************/
2697
2698 static void
2699 destroy_reply_cls (struct ReplyCls *rep_cls)
2700 {
2701   struct ClientContext *cli_ctx;
2702
2703   cli_ctx = rep_cls->cli_ctx;
2704   GNUNET_assert (NULL != cli_ctx);
2705   if (NULL != rep_cls->req_handle)
2706   {
2707     RPS_sampler_request_cancel (rep_cls->req_handle);
2708   }
2709   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2710                                cli_ctx->rep_cls_tail,
2711                                rep_cls);
2712   GNUNET_free (rep_cls);
2713 }
2714
2715
2716 static void
2717 destroy_cli_ctx (struct ClientContext *cli_ctx)
2718 {
2719   GNUNET_assert (NULL != cli_ctx);
2720   if (NULL != cli_ctx->rep_cls_head)
2721   {
2722     LOG (GNUNET_ERROR_TYPE_WARNING,
2723          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2724     while (NULL != cli_ctx->rep_cls_head)
2725       destroy_reply_cls (cli_ctx->rep_cls_head);
2726   }
2727   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2728                                cli_ctx_tail,
2729                                cli_ctx);
2730   GNUNET_free (cli_ctx);
2731 }
2732
2733
2734 /**
2735  * Function called by NSE.
2736  *
2737  * Updates sizes of sampler list and view and adapt those lists
2738  * accordingly.
2739  */
2740 static void
2741 nse_callback (void *cls,
2742               struct GNUNET_TIME_Absolute timestamp,
2743               double logestimate, double std_dev)
2744 {
2745   double estimate;
2746   //double scale; // TODO this might go gloabal/config
2747
2748   LOG (GNUNET_ERROR_TYPE_DEBUG,
2749        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2750        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2751   //scale = .01;
2752   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2753   // GNUNET_NSE_log_estimate_to_n (logestimate);
2754   estimate = pow (estimate, 1.0 / 3);
2755   // TODO add if std_dev is a number
2756   // estimate += (std_dev * scale);
2757   if (2 < ceil (estimate))
2758   {
2759     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2760     sampler_size_est_need = estimate;
2761   } else
2762     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2763
2764   /* If the NSE has changed adapt the lists accordingly */
2765   resize_wrapper (prot_sampler, sampler_size_est_need);
2766   client_resize_wrapper ();
2767 }
2768
2769
2770 /**
2771  * Callback called once the requested PeerIDs are ready.
2772  *
2773  * Sends those to the requesting client.
2774  */
2775 static void
2776 client_respond (void *cls,
2777                 struct GNUNET_PeerIdentity *peer_ids,
2778                 uint32_t num_peers)
2779 {
2780   struct ReplyCls *reply_cls = cls;
2781   uint32_t i;
2782   struct GNUNET_MQ_Envelope *ev;
2783   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2784   uint32_t size_needed;
2785   struct ClientContext *cli_ctx;
2786
2787   GNUNET_assert (NULL != reply_cls);
2788   LOG (GNUNET_ERROR_TYPE_DEBUG,
2789        "sampler returned %" PRIu32 " peers:\n",
2790        num_peers);
2791   for (i = 0; i < num_peers; i++)
2792   {
2793     LOG (GNUNET_ERROR_TYPE_DEBUG,
2794          "  %" PRIu32 ": %s\n",
2795          i,
2796          GNUNET_i2s (&peer_ids[i]));
2797   }
2798
2799   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2800                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2801
2802   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2803
2804   ev = GNUNET_MQ_msg_extra (out_msg,
2805                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2806                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2807   out_msg->num_peers = htonl (num_peers);
2808   out_msg->id = htonl (reply_cls->id);
2809
2810   GNUNET_memcpy (&out_msg[1],
2811           peer_ids,
2812           num_peers * sizeof (struct GNUNET_PeerIdentity));
2813   GNUNET_free (peer_ids);
2814
2815   cli_ctx = reply_cls->cli_ctx;
2816   GNUNET_assert (NULL != cli_ctx);
2817   reply_cls->req_handle = NULL;
2818   destroy_reply_cls (reply_cls);
2819   GNUNET_MQ_send (cli_ctx->mq, ev);
2820 }
2821
2822
2823 /**
2824  * Handle RPS request from the client.
2825  *
2826  * @param cls closure
2827  * @param message the actual message
2828  */
2829 static void
2830 handle_client_request (void *cls,
2831                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2832 {
2833   struct ClientContext *cli_ctx = cls;
2834   uint32_t num_peers;
2835   uint32_t size_needed;
2836   struct ReplyCls *reply_cls;
2837   uint32_t i;
2838
2839   num_peers = ntohl (msg->num_peers);
2840   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2841                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2842
2843   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2844   {
2845     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2846                 "Message received from client has size larger than expected\n");
2847     GNUNET_SERVICE_client_drop (cli_ctx->client);
2848     return;
2849   }
2850
2851   for (i = 0 ; i < num_peers ; i++)
2852     est_request_rate();
2853
2854   LOG (GNUNET_ERROR_TYPE_DEBUG,
2855        "Client requested %" PRIu32 " random peer(s).\n",
2856        num_peers);
2857
2858   reply_cls = GNUNET_new (struct ReplyCls);
2859   reply_cls->id = ntohl (msg->id);
2860   reply_cls->cli_ctx = cli_ctx;
2861   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2862                                                         client_respond,
2863                                                         reply_cls,
2864                                                         num_peers);
2865
2866   GNUNET_assert (NULL != cli_ctx);
2867   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2868                                cli_ctx->rep_cls_tail,
2869                                reply_cls);
2870   GNUNET_SERVICE_client_continue (cli_ctx->client);
2871 }
2872
2873
2874 /**
2875  * @brief Handle a message that requests the cancellation of a request
2876  *
2877  * @param cls unused
2878  * @param message the message containing the id of the request
2879  */
2880 static void
2881 handle_client_request_cancel (void *cls,
2882                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2883 {
2884   struct ClientContext *cli_ctx = cls;
2885   struct ReplyCls *rep_cls;
2886
2887   GNUNET_assert (NULL != cli_ctx);
2888   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2889   rep_cls = cli_ctx->rep_cls_head;
2890   LOG (GNUNET_ERROR_TYPE_DEBUG,
2891       "Client cancels request with id %" PRIu32 "\n",
2892       ntohl (msg->id));
2893   while ( (NULL != rep_cls->next) &&
2894           (rep_cls->id != ntohl (msg->id)) )
2895     rep_cls = rep_cls->next;
2896   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2897   destroy_reply_cls (rep_cls);
2898   GNUNET_SERVICE_client_continue (cli_ctx->client);
2899 }
2900
2901
2902 /**
2903  * @brief This function is called, when the client seeds peers.
2904  * It verifies that @a msg is well-formed.
2905  *
2906  * @param cls the closure (#ClientContext)
2907  * @param msg the message
2908  * @return #GNUNET_OK if @a msg is well-formed
2909  */
2910 static int
2911 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2912 {
2913   struct ClientContext *cli_ctx = cls;
2914   uint16_t msize = ntohs (msg->header.size);
2915   uint32_t num_peers = ntohl (msg->num_peers);
2916
2917   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2918   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2919        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2920   {
2921     GNUNET_break (0);
2922     GNUNET_SERVICE_client_drop (cli_ctx->client);
2923     return GNUNET_SYSERR;
2924   }
2925   return GNUNET_OK;
2926 }
2927
2928
2929 /**
2930  * Handle seed from the client.
2931  *
2932  * @param cls closure
2933  * @param message the actual message
2934  */
2935 static void
2936 handle_client_seed (void *cls,
2937                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2938 {
2939   struct ClientContext *cli_ctx = cls;
2940   struct GNUNET_PeerIdentity *peers;
2941   uint32_t num_peers;
2942   uint32_t i;
2943
2944   num_peers = ntohl (msg->num_peers);
2945   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2946   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
2947   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
2948
2949   LOG (GNUNET_ERROR_TYPE_DEBUG,
2950        "Client seeded peers:\n");
2951   print_peer_list (peers, num_peers);
2952
2953   for (i = 0; i < num_peers; i++)
2954   {
2955     LOG (GNUNET_ERROR_TYPE_DEBUG,
2956          "Updating samplers with seed %" PRIu32 ": %s\n",
2957          i,
2958          GNUNET_i2s (&peers[i]));
2959
2960     got_peer (&peers[i]);
2961   }
2962
2963   ////GNUNET_free (peers);
2964
2965   GNUNET_SERVICE_client_continue (cli_ctx->client);
2966 }
2967
2968 /**
2969  * @brief Send view to client
2970  *
2971  * @param cli_ctx the context of the client
2972  * @param view_array the peerids of the view as array (can be empty)
2973  * @param view_size the size of the view array (can be 0)
2974  */
2975 void
2976 send_view (const struct ClientContext *cli_ctx,
2977            const struct GNUNET_PeerIdentity *view_array,
2978            uint64_t view_size)
2979 {
2980   struct GNUNET_MQ_Envelope *ev;
2981   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2982
2983   if (NULL == view_array)
2984   {
2985     view_size = View_size ();
2986     view_array = View_get_as_array();
2987   }
2988
2989   ev = GNUNET_MQ_msg_extra (out_msg,
2990                             view_size * sizeof (struct GNUNET_PeerIdentity),
2991                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2992   out_msg->num_peers = htonl (view_size);
2993
2994   GNUNET_memcpy (&out_msg[1],
2995           view_array,
2996           view_size * sizeof (struct GNUNET_PeerIdentity));
2997   GNUNET_MQ_send (cli_ctx->mq, ev);
2998 }
2999
3000 /**
3001  * @brief sends updates to clients that are interested
3002  */
3003 static void
3004 clients_notify_view_update (void)
3005 {
3006   struct ClientContext *cli_ctx_iter;
3007   uint64_t num_peers;
3008   const struct GNUNET_PeerIdentity *view_array;
3009
3010   num_peers = View_size ();
3011   view_array = View_get_as_array();
3012   /* check size of view is small enough */
3013   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3014   {
3015     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3016                 "View is too big to send\n");
3017     return;
3018   }
3019
3020   for (cli_ctx_iter = cli_ctx_head;
3021        NULL != cli_ctx_iter;
3022        cli_ctx_iter = cli_ctx_head->next)
3023   {
3024     if (1 < cli_ctx_iter->view_updates_left)
3025     {
3026       /* Client wants to receive limited amount of updates */
3027       cli_ctx_iter->view_updates_left -= 1;
3028     } else if (1 == cli_ctx_iter->view_updates_left)
3029     {
3030       /* Last update of view for client */
3031       cli_ctx_iter->view_updates_left = -1;
3032     } else if (0 > cli_ctx_iter->view_updates_left) {
3033       /* Client is not interested in updates */
3034       continue;
3035     }
3036     /* else _updates_left == 0 - infinite amount of updates */
3037
3038     /* send view */
3039     send_view (cli_ctx_iter, view_array, num_peers);
3040   }
3041 }
3042
3043
3044 /**
3045  * Handle RPS request from the client.
3046  *
3047  * @param cls closure
3048  * @param message the actual message
3049  */
3050 static void
3051 handle_client_view_request (void *cls,
3052                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3053 {
3054   struct ClientContext *cli_ctx = cls;
3055   uint64_t num_updates;
3056
3057   num_updates = ntohl (msg->num_updates);
3058
3059   LOG (GNUNET_ERROR_TYPE_DEBUG,
3060        "Client requested %" PRIu64 " updates of view.\n",
3061        num_updates);
3062
3063   GNUNET_assert (NULL != cli_ctx);
3064   cli_ctx->view_updates_left = num_updates;
3065   send_view (cli_ctx, NULL, 0);
3066   GNUNET_SERVICE_client_continue (cli_ctx->client);
3067 }
3068
3069 /**
3070  * Handle a CHECK_LIVE message from another peer.
3071  *
3072  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3073  * the channel is blocked for all other communication.
3074  *
3075  * @param cls Closure
3076  * @param msg The message header
3077  */
3078 static void
3079 handle_peer_check (void *cls,
3080                    const struct GNUNET_MessageHeader *msg)
3081 {
3082   const struct GNUNET_PeerIdentity *peer = cls;
3083
3084   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3085 }
3086
3087 /**
3088  * Handle a PUSH message from another peer.
3089  *
3090  * Check the proof of work and store the PeerID
3091  * in the temporary list for pushed PeerIDs.
3092  *
3093  * @param cls Closure
3094  * @param msg The message header
3095  */
3096 static void
3097 handle_peer_push (void *cls,
3098                   const struct GNUNET_MessageHeader *msg)
3099 {
3100   const struct GNUNET_PeerIdentity *peer = cls;
3101
3102   // (check the proof of work (?))
3103
3104   LOG (GNUNET_ERROR_TYPE_DEBUG,
3105        "Received PUSH (%s)\n",
3106        GNUNET_i2s (peer));
3107   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3108
3109   #ifdef ENABLE_MALICIOUS
3110   struct AttackedPeer *tmp_att_peer;
3111
3112   if ( (1 == mal_type) ||
3113        (3 == mal_type) )
3114   { /* Try to maximise representation */
3115     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3116     tmp_att_peer->peer_id = *peer;
3117     if (NULL == att_peer_set)
3118       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3119     if (GNUNET_NO ==
3120         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3121                                                 peer))
3122     {
3123       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3124                                    att_peers_tail,
3125                                    tmp_att_peer);
3126       add_peer_array_to_set (peer, 1, att_peer_set);
3127     }
3128   }
3129
3130
3131   else if (2 == mal_type)
3132   {
3133     /* We attack one single well-known peer - simply ignore */
3134   }
3135   #endif /* ENABLE_MALICIOUS */
3136
3137   /* Add the sending peer to the push_map */
3138   CustomPeerMap_put (push_map, peer);
3139
3140   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3141 }
3142
3143
3144 /**
3145  * Handle PULL REQUEST request message from another peer.
3146  *
3147  * Reply with the view of PeerIDs.
3148  *
3149  * @param cls Closure
3150  * @param msg The message header
3151  */
3152 static void
3153 handle_peer_pull_request (void *cls,
3154                           const struct GNUNET_MessageHeader *msg)
3155 {
3156   struct GNUNET_PeerIdentity *peer = cls;
3157   const struct GNUNET_PeerIdentity *view_array;
3158
3159   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3160   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3161
3162   #ifdef ENABLE_MALICIOUS
3163   if (1 == mal_type
3164       || 3 == mal_type)
3165   { /* Try to maximise representation */
3166     send_pull_reply (peer, mal_peers, num_mal_peers);
3167   }
3168
3169   else if (2 == mal_type)
3170   { /* Try to partition network */
3171     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3172     {
3173       send_pull_reply (peer, mal_peers, num_mal_peers);
3174     }
3175   }
3176   #endif /* ENABLE_MALICIOUS */
3177
3178   GNUNET_break_op (Peers_check_peer_known (peer));
3179   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3180   view_array = View_get_as_array ();
3181   send_pull_reply (peer, view_array, View_size ());
3182 }
3183
3184
3185 /**
3186  * Check whether we sent a corresponding request and
3187  * whether this reply is the first one.
3188  *
3189  * @param cls Closure
3190  * @param msg The message header
3191  */
3192 static int
3193 check_peer_pull_reply (void *cls,
3194                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3195 {
3196   struct GNUNET_PeerIdentity *sender = cls;
3197
3198   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3199   {
3200     GNUNET_break_op (0);
3201     return GNUNET_SYSERR;
3202   }
3203
3204   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3205       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3206   {
3207     LOG (GNUNET_ERROR_TYPE_ERROR,
3208         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3209         ntohl (msg->num_peers),
3210         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3211             sizeof (struct GNUNET_PeerIdentity));
3212     GNUNET_break_op (0);
3213     return GNUNET_SYSERR;
3214   }
3215
3216   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3217   {
3218     LOG (GNUNET_ERROR_TYPE_WARNING,
3219         "Received a pull reply from a peer we didn't request one from!\n");
3220     GNUNET_break_op (0);
3221     return GNUNET_SYSERR;
3222   }
3223   return GNUNET_OK;
3224 }
3225
3226 /**
3227  * Handle PULL REPLY message from another peer.
3228  *
3229  * @param cls Closure
3230  * @param msg The message header
3231  */
3232 static void
3233 handle_peer_pull_reply (void *cls,
3234                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3235 {
3236   const struct GNUNET_PeerIdentity *peers;
3237   struct GNUNET_PeerIdentity *sender = cls;
3238   uint32_t i;
3239 #ifdef ENABLE_MALICIOUS
3240   struct AttackedPeer *tmp_att_peer;
3241 #endif /* ENABLE_MALICIOUS */
3242
3243   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3244   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3245
3246   #ifdef ENABLE_MALICIOUS
3247   // We shouldn't even receive pull replies as we're not sending
3248   if (2 == mal_type)
3249   {
3250   }
3251   #endif /* ENABLE_MALICIOUS */
3252
3253   /* Do actual logic */
3254   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3255
3256   LOG (GNUNET_ERROR_TYPE_DEBUG,
3257        "PULL REPLY received, got following %u peers:\n",
3258        ntohl (msg->num_peers));
3259
3260   for (i = 0; i < ntohl (msg->num_peers); i++)
3261   {
3262     LOG (GNUNET_ERROR_TYPE_DEBUG,
3263          "%u. %s\n",
3264          i,
3265          GNUNET_i2s (&peers[i]));
3266
3267     #ifdef ENABLE_MALICIOUS
3268     if ((NULL != att_peer_set) &&
3269         (1 == mal_type || 3 == mal_type))
3270     { /* Add attacked peer to local list */
3271       // TODO check if we sent a request and this was the first reply
3272       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3273                                                                &peers[i])
3274           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3275                                                                   &peers[i])
3276           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3277                                                    &own_identity))
3278       {
3279         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3280         tmp_att_peer->peer_id = peers[i];
3281         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3282                                      att_peers_tail,
3283                                      tmp_att_peer);
3284         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3285       }
3286       continue;
3287     }
3288     #endif /* ENABLE_MALICIOUS */
3289     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3290                                               &peers[i]))
3291     {
3292       /* Make sure we 'know' about this peer */
3293       (void) Peers_insert_peer (&peers[i]);
3294
3295       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3296       {
3297         CustomPeerMap_put (pull_map, &peers[i]);
3298       }
3299       else
3300       {
3301         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3302         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3303       }
3304     }
3305   }
3306
3307   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3308   clean_peer (sender);
3309
3310   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3311 }
3312
3313
3314 /**
3315  * Compute a random delay.
3316  * A uniformly distributed value between mean + spread and mean - spread.
3317  *
3318  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3319  * It would return a random value between 2 and 6 min.
3320  *
3321  * @param mean the mean
3322  * @param spread the inverse amount of deviation from the mean
3323  */
3324 static struct GNUNET_TIME_Relative
3325 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3326                     unsigned int spread)
3327 {
3328   struct GNUNET_TIME_Relative half_interval;
3329   struct GNUNET_TIME_Relative ret;
3330   unsigned int rand_delay;
3331   unsigned int max_rand_delay;
3332
3333   if (0 == spread)
3334   {
3335     LOG (GNUNET_ERROR_TYPE_WARNING,
3336          "Not accepting spread of 0\n");
3337     GNUNET_break (0);
3338     GNUNET_assert (0);
3339   }
3340   GNUNET_assert (0 != mean.rel_value_us);
3341
3342   /* Compute random time value between spread * mean and spread * mean */
3343   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3344
3345   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3346   /**
3347    * Compute random value between (0 and 1) * round_interval
3348    * via multiplying round_interval with a 'fraction' (0 to value)/value
3349    */
3350   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3351   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3352   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3353   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3354
3355   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3356     LOG (GNUNET_ERROR_TYPE_WARNING,
3357          "Returning FOREVER_REL\n");
3358
3359   return ret;
3360 }
3361
3362
3363 /**
3364  * Send single pull request
3365  *
3366  * @param peer_id the peer to send the pull request to.
3367  */
3368 static void
3369 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3370 {
3371   struct GNUNET_MQ_Envelope *ev;
3372
3373   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3374                                                      Peers_PULL_REPLY_PENDING));
3375   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3376
3377   LOG (GNUNET_ERROR_TYPE_DEBUG,
3378        "Going to send PULL REQUEST to peer %s.\n",
3379        GNUNET_i2s (peer));
3380
3381   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3382   Peers_send_message (peer, ev, "PULL REQUEST");
3383   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3384 }
3385
3386
3387 /**
3388  * Send single push
3389  *
3390  * @param peer_id the peer to send the push to.
3391  */
3392 static void
3393 send_push (const struct GNUNET_PeerIdentity *peer_id)
3394 {
3395   struct GNUNET_MQ_Envelope *ev;
3396
3397   LOG (GNUNET_ERROR_TYPE_DEBUG,
3398        "Going to send PUSH to peer %s.\n",
3399        GNUNET_i2s (peer_id));
3400
3401   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3402   Peers_send_message (peer_id, ev, "PUSH");
3403   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3404 }
3405
3406
3407 static void
3408 do_round (void *cls);
3409
3410 static void
3411 do_mal_round (void *cls);
3412
3413 #ifdef ENABLE_MALICIOUS
3414
3415
3416 /**
3417  * @brief This function is called, when the client tells us to act malicious.
3418  * It verifies that @a msg is well-formed.
3419  *
3420  * @param cls the closure (#ClientContext)
3421  * @param msg the message
3422  * @return #GNUNET_OK if @a msg is well-formed
3423  */
3424 static int
3425 check_client_act_malicious (void *cls,
3426                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3427 {
3428   struct ClientContext *cli_ctx = cls;
3429   uint16_t msize = ntohs (msg->header.size);
3430   uint32_t num_peers = ntohl (msg->num_peers);
3431
3432   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3433   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3434        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3435   {
3436     LOG (GNUNET_ERROR_TYPE_ERROR,
3437         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3438         ntohl (msg->num_peers),
3439         (msize / sizeof (struct GNUNET_PeerIdentity)));
3440     GNUNET_break (0);
3441     GNUNET_SERVICE_client_drop (cli_ctx->client);
3442     return GNUNET_SYSERR;
3443   }
3444   return GNUNET_OK;
3445 }
3446
3447 /**
3448  * Turn RPS service to act malicious.
3449  *
3450  * @param cls Closure
3451  * @param client The client that sent the message
3452  * @param msg The message header
3453  */
3454 static void
3455 handle_client_act_malicious (void *cls,
3456                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3457 {
3458   struct ClientContext *cli_ctx = cls;
3459   struct GNUNET_PeerIdentity *peers;
3460   uint32_t num_mal_peers_sent;
3461   uint32_t num_mal_peers_old;
3462
3463   /* Do actual logic */
3464   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3465   mal_type = ntohl (msg->type);
3466   if (NULL == mal_peer_set)
3467     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3468
3469   LOG (GNUNET_ERROR_TYPE_DEBUG,
3470        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3471        mal_type,
3472        ntohl (msg->num_peers));
3473
3474   if (1 == mal_type)
3475   { /* Try to maximise representation */
3476     /* Add other malicious peers to those we already know */
3477
3478     num_mal_peers_sent = ntohl (msg->num_peers);
3479     num_mal_peers_old = num_mal_peers;
3480     GNUNET_array_grow (mal_peers,
3481                        num_mal_peers,
3482                        num_mal_peers + num_mal_peers_sent);
3483     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3484             peers,
3485             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3486
3487     /* Add all mal peers to mal_peer_set */
3488     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3489                            num_mal_peers_sent,
3490                            mal_peer_set);
3491
3492     /* Substitute do_round () with do_mal_round () */
3493     GNUNET_SCHEDULER_cancel (do_round_task);
3494     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3495   }
3496
3497   else if ( (2 == mal_type) ||
3498             (3 == mal_type) )
3499   { /* Try to partition the network */
3500     /* Add other malicious peers to those we already know */
3501
3502     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3503     num_mal_peers_old = num_mal_peers;
3504     GNUNET_array_grow (mal_peers,
3505                        num_mal_peers,
3506                        num_mal_peers + num_mal_peers_sent);
3507     if (NULL != mal_peers &&
3508         0 != num_mal_peers)
3509     {
3510       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3511               peers,
3512               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3513
3514       /* Add all mal peers to mal_peer_set */
3515       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3516                              num_mal_peers_sent,
3517                              mal_peer_set);
3518     }
3519
3520     /* Store the one attacked peer */
3521     GNUNET_memcpy (&attacked_peer,
3522             &msg->attacked_peer,
3523             sizeof (struct GNUNET_PeerIdentity));
3524     /* Set the flag of the attacked peer to valid to avoid problems */
3525     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3526     {
3527       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3528     }
3529
3530     LOG (GNUNET_ERROR_TYPE_DEBUG,
3531          "Attacked peer is %s\n",
3532          GNUNET_i2s (&attacked_peer));
3533
3534     /* Substitute do_round () with do_mal_round () */
3535     GNUNET_SCHEDULER_cancel (do_round_task);
3536     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3537   }
3538   else if (0 == mal_type)
3539   { /* Stop acting malicious */
3540     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3541
3542     /* Substitute do_mal_round () with do_round () */
3543     GNUNET_SCHEDULER_cancel (do_round_task);
3544     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3545   }
3546   else
3547   {
3548     GNUNET_break (0);
3549     GNUNET_SERVICE_client_continue (cli_ctx->client);
3550   }
3551   GNUNET_SERVICE_client_continue (cli_ctx->client);
3552 }
3553
3554
3555 /**
3556  * Send out PUSHes and PULLs maliciously.
3557  *
3558  * This is executed regylary.
3559  */
3560 static void
3561 do_mal_round (void *cls)
3562 {
3563   uint32_t num_pushes;
3564   uint32_t i;
3565   struct GNUNET_TIME_Relative time_next_round;
3566   struct AttackedPeer *tmp_att_peer;
3567
3568   LOG (GNUNET_ERROR_TYPE_DEBUG,
3569        "Going to execute next round maliciously type %" PRIu32 ".\n",
3570       mal_type);
3571   do_round_task = NULL;
3572   GNUNET_assert (mal_type <= 3);
3573   /* Do malicious actions */
3574   if (1 == mal_type)
3575   { /* Try to maximise representation */
3576
3577     /* The maximum of pushes we're going to send this round */
3578     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3579                                          num_attacked_peers),
3580                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3581
3582     LOG (GNUNET_ERROR_TYPE_DEBUG,
3583          "Going to send %" PRIu32 " pushes\n",
3584          num_pushes);
3585
3586     /* Send PUSHes to attacked peers */
3587     for (i = 0 ; i < num_pushes ; i++)
3588     {
3589       if (att_peers_tail == att_peer_index)
3590         att_peer_index = att_peers_head;
3591       else
3592         att_peer_index = att_peer_index->next;
3593
3594       send_push (&att_peer_index->peer_id);
3595     }
3596
3597     /* Send PULLs to some peers to learn about additional peers to attack */
3598     tmp_att_peer = att_peer_index;
3599     for (i = 0 ; i < num_pushes * alpha ; i++)
3600     {
3601       if (att_peers_tail == tmp_att_peer)
3602         tmp_att_peer = att_peers_head;
3603       else
3604         att_peer_index = tmp_att_peer->next;
3605
3606       send_pull_request (&tmp_att_peer->peer_id);
3607     }
3608   }
3609
3610
3611   else if (2 == mal_type)
3612   { /**
3613      * Try to partition the network
3614      * Send as many pushes to the attacked peer as possible
3615      * That is one push per round as it will ignore more.
3616      */
3617     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3618     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3619       send_push (&attacked_peer);
3620   }
3621
3622
3623   if (3 == mal_type)
3624   { /* Combined attack */
3625
3626     /* Send PUSH to attacked peers */
3627     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3628     {
3629       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3630       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3631       {
3632         LOG (GNUNET_ERROR_TYPE_DEBUG,
3633             "Goding to send push to attacked peer (%s)\n",
3634             GNUNET_i2s (&attacked_peer));
3635         send_push (&attacked_peer);
3636       }
3637     }
3638     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3639
3640     /* The maximum of pushes we're going to send this round */
3641     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3642                                          num_attacked_peers),
3643                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3644
3645     LOG (GNUNET_ERROR_TYPE_DEBUG,
3646          "Going to send %" PRIu32 " pushes\n",
3647          num_pushes);
3648
3649     for (i = 0; i < num_pushes; i++)
3650     {
3651       if (att_peers_tail == att_peer_index)
3652         att_peer_index = att_peers_head;
3653       else
3654         att_peer_index = att_peer_index->next;
3655
3656       send_push (&att_peer_index->peer_id);
3657     }
3658
3659     /* Send PULLs to some peers to learn about additional peers to attack */
3660     tmp_att_peer = att_peer_index;
3661     for (i = 0; i < num_pushes * alpha; i++)
3662     {
3663       if (att_peers_tail == tmp_att_peer)
3664         tmp_att_peer = att_peers_head;
3665       else
3666         att_peer_index = tmp_att_peer->next;
3667
3668       send_pull_request (&tmp_att_peer->peer_id);
3669     }
3670   }
3671
3672   /* Schedule next round */
3673   time_next_round = compute_rand_delay (round_interval, 2);
3674
3675   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3676   //NULL);
3677   GNUNET_assert (NULL == do_round_task);
3678   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3679                                                 &do_mal_round, NULL);
3680   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3681 }
3682 #endif /* ENABLE_MALICIOUS */
3683
3684 /**
3685  * Send out PUSHes and PULLs, possibly update #view, samplers.
3686  *
3687  * This is executed regylary.
3688  */
3689 static void
3690 do_round (void *cls)
3691 {
3692   uint32_t i;
3693   const struct GNUNET_PeerIdentity *view_array;
3694   unsigned int *permut;
3695   unsigned int a_peers; /* Number of peers we send pushes to */
3696   unsigned int b_peers; /* Number of peers we send pull requests to */
3697   uint32_t first_border;
3698   uint32_t second_border;
3699   struct GNUNET_PeerIdentity peer;
3700   struct GNUNET_PeerIdentity *update_peer;
3701
3702   LOG (GNUNET_ERROR_TYPE_DEBUG,
3703        "Going to execute next round.\n");
3704   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3705   do_round_task = NULL;
3706   LOG (GNUNET_ERROR_TYPE_DEBUG,
3707        "Printing view:\n");
3708   to_file (file_name_view_log,
3709            "___ new round ___");
3710   view_array = View_get_as_array ();
3711   for (i = 0; i < View_size (); i++)
3712   {
3713     LOG (GNUNET_ERROR_TYPE_DEBUG,
3714          "\t%s\n", GNUNET_i2s (&view_array[i]));
3715     to_file (file_name_view_log,
3716              "=%s\t(do round)",
3717              GNUNET_i2s_full (&view_array[i]));
3718   }
3719
3720
3721   /* Send pushes and pull requests */
3722   if (0 < View_size ())
3723   {
3724     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3725                                            View_size ());
3726
3727     /* Send PUSHes */
3728     a_peers = ceil (alpha * View_size ());
3729
3730     LOG (GNUNET_ERROR_TYPE_DEBUG,
3731          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3732          a_peers, alpha, View_size ());
3733     for (i = 0; i < a_peers; i++)
3734     {
3735       peer = view_array[permut[i]];
3736       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3737       { // FIXME if this fails schedule/loop this for later
3738         send_push (&peer);
3739       }
3740     }
3741
3742     /* Send PULL requests */
3743     b_peers = ceil (beta * View_size ());
3744     first_border = a_peers;
3745     second_border = a_peers + b_peers;
3746     if (second_border > View_size ())
3747     {
3748       first_border = View_size () - b_peers;
3749       second_border = View_size ();
3750     }
3751     LOG (GNUNET_ERROR_TYPE_DEBUG,
3752         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3753         b_peers, beta, View_size ());
3754     for (i = first_border; i < second_border; i++)
3755     {
3756       peer = view_array[permut[i]];
3757       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3758           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3759       { // FIXME if this fails schedule/loop this for later
3760         send_pull_request (&peer);
3761       }
3762     }
3763
3764     GNUNET_free (permut);
3765     permut = NULL;
3766   }
3767
3768
3769   /* Update view */
3770   /* TODO see how many peers are in push-/pull- list! */
3771
3772   if ((CustomPeerMap_size (push_map) <= alpha * View_size ()) &&
3773       (0 < CustomPeerMap_size (push_map)) &&
3774       (0 < CustomPeerMap_size (pull_map)))
3775   { /* If conditions for update are fulfilled, update */
3776     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3777
3778     uint32_t final_size;
3779     uint32_t peers_to_clean_size;
3780     struct GNUNET_PeerIdentity *peers_to_clean;
3781
3782     peers_to_clean = NULL;
3783     peers_to_clean_size = 0;
3784     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3785     GNUNET_memcpy (peers_to_clean,
3786             view_array,
3787             View_size () * sizeof (struct GNUNET_PeerIdentity));
3788
3789     /* Seems like recreating is the easiest way of emptying the peermap */
3790     View_clear ();
3791     to_file (file_name_view_log,
3792              "--- emptied ---");
3793
3794     first_border  = GNUNET_MIN (ceil (alpha * sampler_size_est_need),
3795                                 CustomPeerMap_size (push_map));
3796     second_border = first_border +
3797                     GNUNET_MIN (floor (beta  * sampler_size_est_need),
3798                                 CustomPeerMap_size (pull_map));
3799     final_size    = second_border +
3800       ceil ((1 - (alpha + beta)) * sampler_size_est_need);
3801
3802     /* Update view with peers received through PUSHes */
3803     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3804                                            CustomPeerMap_size (push_map));
3805     for (i = 0; i < first_border; i++)
3806     {
3807       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3808                                                               permut[i]));
3809       to_file (file_name_view_log,
3810                "+%s\t(push list)",
3811                GNUNET_i2s_full (&view_array[i]));
3812       // TODO change the peer_flags accordingly
3813     }
3814     GNUNET_free (permut);
3815     permut = NULL;
3816
3817     /* Update view with peers received through PULLs */
3818     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3819                                            CustomPeerMap_size (pull_map));
3820     for (i = first_border; i < second_border; i++)
3821     {
3822       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3823             permut[i - first_border]));
3824       to_file (file_name_view_log,
3825                "+%s\t(pull list)",
3826                GNUNET_i2s_full (&view_array[i]));
3827       // TODO change the peer_flags accordingly
3828     }
3829     GNUNET_free (permut);
3830     permut = NULL;
3831
3832     /* Update view with peers from history */
3833     RPS_sampler_get_n_rand_peers (prot_sampler,
3834                                   hist_update,
3835                                   NULL,
3836                                   final_size - second_border);
3837     // TODO change the peer_flags accordingly
3838
3839     for (i = 0; i < View_size (); i++)
3840       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3841
3842     /* Clean peers that were removed from the view */
3843     for (i = 0; i < peers_to_clean_size; i++)
3844     {
3845       to_file (file_name_view_log,
3846                "-%s",
3847                GNUNET_i2s_full (&peers_to_clean[i]));
3848       clean_peer (&peers_to_clean[i]);
3849       //peer_destroy_channel_send (sender);
3850     }
3851
3852     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3853     clients_notify_view_update();
3854   } else {
3855     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3856     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3857     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3858         !(0 >= CustomPeerMap_size (pull_map)))
3859       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3860     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3861         (0 >= CustomPeerMap_size (pull_map)))
3862       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3863     if (0 >= CustomPeerMap_size (push_map) &&
3864         !(0 >= CustomPeerMap_size (pull_map)))
3865       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3866     if (0 >= CustomPeerMap_size (push_map) &&
3867         (0 >= CustomPeerMap_size (pull_map)))
3868       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3869     if (0 >= CustomPeerMap_size (pull_map) &&
3870         CustomPeerMap_size (push_map) > alpha * View_size () &&
3871         0 >= CustomPeerMap_size (push_map))
3872       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3873   }
3874   // TODO independent of that also get some peers from CADET_get_peers()?
3875   GNUNET_STATISTICS_set (stats,
3876       "# peers in push map at end of round",
3877       CustomPeerMap_size (push_map),
3878       GNUNET_NO);
3879   GNUNET_STATISTICS_set (stats,
3880       "# peers in pull map at end of round",
3881       CustomPeerMap_size (pull_map),
3882       GNUNET_NO);
3883   GNUNET_STATISTICS_set (stats,
3884       "# peers in view at end of round",
3885       View_size (),
3886       GNUNET_NO);
3887
3888   LOG (GNUNET_ERROR_TYPE_DEBUG,
3889        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3890        CustomPeerMap_size (push_map),
3891        CustomPeerMap_size (pull_map),
3892        alpha,
3893        View_size (),
3894        alpha * View_size ());
3895
3896   /* Update samplers */
3897   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3898   {
3899     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3900     LOG (GNUNET_ERROR_TYPE_DEBUG,
3901          "Updating with peer %s from push list\n",
3902          GNUNET_i2s (update_peer));
3903     insert_in_sampler (NULL, update_peer);
3904     clean_peer (update_peer); /* This cleans only if it is not in the view */
3905     //peer_destroy_channel_send (sender);
3906   }
3907
3908   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3909   {
3910     LOG (GNUNET_ERROR_TYPE_DEBUG,
3911          "Updating with peer %s from pull list\n",
3912          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3913     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3914     /* This cleans only if it is not in the view */
3915     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3916     //peer_destroy_channel_send (sender);
3917   }
3918
3919
3920   /* Empty push/pull lists */
3921   CustomPeerMap_clear (push_map);
3922   CustomPeerMap_clear (pull_map);
3923
3924   struct GNUNET_TIME_Relative time_next_round;
3925
3926   time_next_round = compute_rand_delay (round_interval, 2);
3927
3928   /* Schedule next round */
3929   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3930                                                 &do_round, NULL);
3931   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3932 }
3933
3934
3935 /**
3936  * This is called from GNUNET_CADET_get_peers().
3937  *
3938  * It is called on every peer(ID) that cadet somehow has contact with.
3939  * We use those to initialise the sampler.
3940  */
3941 void
3942 init_peer_cb (void *cls,
3943               const struct GNUNET_PeerIdentity *peer,
3944               int tunnel, // "Do we have a tunnel towards this peer?"
3945               unsigned int n_paths, // "Number of known paths towards this peer"
3946               unsigned int best_path) // "How long is the best path?
3947                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3948 {
3949   if (NULL != peer)
3950   {
3951     LOG (GNUNET_ERROR_TYPE_DEBUG,
3952          "Got peer_id %s from cadet\n",
3953          GNUNET_i2s (peer));
3954     got_peer (peer);
3955   }
3956 }
3957
3958 /**
3959  * @brief Iterator function over stored, valid peers.
3960  *
3961  * We initialise the sampler with those.
3962  *
3963  * @param cls the closure
3964  * @param peer the peer id
3965  * @return #GNUNET_YES if we should continue to
3966  *         iterate,
3967  *         #GNUNET_NO if not.
3968  */
3969 static int
3970 valid_peers_iterator (void *cls,
3971                       const struct GNUNET_PeerIdentity *peer)
3972 {
3973   if (NULL != peer)
3974   {
3975     LOG (GNUNET_ERROR_TYPE_DEBUG,
3976          "Got stored, valid peer %s\n",
3977          GNUNET_i2s (peer));
3978     got_peer (peer);
3979   }
3980   return GNUNET_YES;
3981 }
3982
3983
3984 /**
3985  * Iterator over peers from peerinfo.
3986  *
3987  * @param cls closure
3988  * @param peer id of the peer, NULL for last call
3989  * @param hello hello message for the peer (can be NULL)
3990  * @param error message
3991  */
3992 void
3993 process_peerinfo_peers (void *cls,
3994                         const struct GNUNET_PeerIdentity *peer,
3995                         const struct GNUNET_HELLO_Message *hello,
3996                         const char *err_msg)
3997 {
3998   if (NULL != peer)
3999   {
4000     LOG (GNUNET_ERROR_TYPE_DEBUG,
4001          "Got peer_id %s from peerinfo\n",
4002          GNUNET_i2s (peer));
4003     got_peer (peer);
4004   }
4005 }
4006
4007
4008 /**
4009  * Task run during shutdown.
4010  *
4011  * @param cls unused
4012  */
4013 static void
4014 shutdown_task (void *cls)
4015 {
4016   struct ClientContext *client_ctx;
4017   struct ReplyCls *reply_cls;
4018
4019   LOG (GNUNET_ERROR_TYPE_DEBUG,
4020        "RPS is going down\n");
4021
4022   /* Clean all clients */
4023   for (client_ctx = cli_ctx_head;
4024        NULL != cli_ctx_head;
4025        client_ctx = cli_ctx_head)
4026   {
4027     /* Clean pending requests to the sampler */
4028     for (reply_cls = client_ctx->rep_cls_head;
4029          NULL != client_ctx->rep_cls_head;
4030          reply_cls = client_ctx->rep_cls_head)
4031     {
4032       RPS_sampler_request_cancel (reply_cls->req_handle);
4033       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4034                                    client_ctx->rep_cls_tail,
4035                                    reply_cls);
4036       GNUNET_free (reply_cls);
4037     }
4038     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4039     GNUNET_free (client_ctx);
4040   }
4041   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4042   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4043
4044   if (NULL != do_round_task)
4045   {
4046     GNUNET_SCHEDULER_cancel (do_round_task);
4047     do_round_task = NULL;
4048   }
4049
4050   Peers_terminate ();
4051
4052   GNUNET_NSE_disconnect (nse);
4053   RPS_sampler_destroy (prot_sampler);
4054   RPS_sampler_destroy (client_sampler);
4055   GNUNET_CADET_close_port (cadet_port);
4056   GNUNET_CADET_disconnect (cadet_handle);
4057   View_destroy ();
4058   CustomPeerMap_destroy (push_map);
4059   CustomPeerMap_destroy (pull_map);
4060   if (NULL != stats)
4061   {
4062     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4063     stats = NULL;
4064   }
4065   #ifdef ENABLE_MALICIOUS
4066   struct AttackedPeer *tmp_att_peer;
4067   GNUNET_free (file_name_view_log);
4068   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4069   if (NULL != mal_peer_set)
4070     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4071   if (NULL != att_peer_set)
4072     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4073   while (NULL != att_peers_head)
4074   {
4075     tmp_att_peer = att_peers_head;
4076     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4077   }
4078   #endif /* ENABLE_MALICIOUS */
4079 }
4080
4081
4082 /**
4083  * Handle client connecting to the service.
4084  *
4085  * @param cls NULL
4086  * @param client the new client
4087  * @param mq the message queue of @a client
4088  * @return @a client
4089  */
4090 static void *
4091 client_connect_cb (void *cls,
4092                    struct GNUNET_SERVICE_Client *client,
4093                    struct GNUNET_MQ_Handle *mq)
4094 {
4095   struct ClientContext *cli_ctx;
4096
4097   LOG (GNUNET_ERROR_TYPE_DEBUG,
4098        "Client connected\n");
4099   if (NULL == client)
4100     return client; /* Server was destroyed before a client connected. Shutting down */
4101   cli_ctx = GNUNET_new (struct ClientContext);
4102   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4103   cli_ctx->view_updates_left = -1;
4104   cli_ctx->client = client;
4105   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4106                                cli_ctx_tail,
4107                                cli_ctx);
4108   return cli_ctx;
4109 }
4110
4111 /**
4112  * Callback called when a client disconnected from the service
4113  *
4114  * @param cls closure for the service
4115  * @param c the client that disconnected
4116  * @param internal_cls should be equal to @a c
4117  */
4118 static void
4119 client_disconnect_cb (void *cls,
4120                       struct GNUNET_SERVICE_Client *client,
4121                       void *internal_cls)
4122 {
4123   struct ClientContext *cli_ctx = internal_cls;
4124
4125   GNUNET_assert (client == cli_ctx->client);
4126   if (NULL == client)
4127   {/* shutdown task - destroy all clients */
4128     while (NULL != cli_ctx_head)
4129       destroy_cli_ctx (cli_ctx_head);
4130   }
4131   else
4132   { /* destroy this client */
4133     LOG (GNUNET_ERROR_TYPE_DEBUG,
4134         "Client disconnected. Destroy its context.\n");
4135     destroy_cli_ctx (cli_ctx);
4136   }
4137 }
4138
4139
4140 /**
4141  * Handle random peer sampling clients.
4142  *
4143  * @param cls closure
4144  * @param c configuration to use
4145  * @param service the initialized service
4146  */
4147 static void
4148 run (void *cls,
4149      const struct GNUNET_CONFIGURATION_Handle *c,
4150      struct GNUNET_SERVICE_Handle *service)
4151 {
4152   int size;
4153   int out_size;
4154   char* fn_valid_peers;
4155   struct GNUNET_HashCode port;
4156
4157   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4158   cfg = c;
4159
4160
4161   /* Get own ID */
4162   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4163   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4164               "STARTING SERVICE (rps) for peer [%s]\n",
4165               GNUNET_i2s (&own_identity));
4166   #ifdef ENABLE_MALICIOUS
4167   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4168               "Malicious execution compiled in.\n");
4169   #endif /* ENABLE_MALICIOUS */
4170
4171
4172
4173   /* Get time interval from the configuration */
4174   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4175                                                         "ROUNDINTERVAL",
4176                                                         &round_interval))
4177   {
4178     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4179                                "RPS", "ROUNDINTERVAL");
4180     GNUNET_SCHEDULER_shutdown ();
4181     return;
4182   }
4183
4184   /* Get initial size of sampler/view from the configuration */
4185   if (GNUNET_OK !=
4186       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "INITSIZE",
4187         (long long unsigned int *) &sampler_size_est_need))
4188   {
4189     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4190                                "RPS", "INITSIZE");
4191     GNUNET_SCHEDULER_shutdown ();
4192     return;
4193   }
4194   LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %u\n", sampler_size_est_need);
4195
4196   if (GNUNET_OK !=
4197       GNUNET_CONFIGURATION_get_value_filename (cfg,
4198                                                "rps",
4199                                                "FILENAME_VALID_PEERS",
4200                                                &fn_valid_peers))
4201   {
4202     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4203                                "rps", "FILENAME_VALID_PEERS");
4204   }
4205
4206
4207   View_create (4);
4208
4209   /* file_name_view_log */
4210   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
4211   {
4212     LOG (GNUNET_ERROR_TYPE_WARNING,
4213          "Failed to create directory /tmp/rps/\n");
4214   }
4215
4216   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
4217   file_name_view_log = GNUNET_malloc (size);
4218   out_size = GNUNET_snprintf (file_name_view_log,
4219                               size,
4220                               "/tmp/rps/view-%s",
4221                               GNUNET_i2s_full (&own_identity));
4222   if (size < out_size ||
4223       0 > out_size)
4224   {
4225     LOG (GNUNET_ERROR_TYPE_WARNING,
4226          "Failed to write string to buffer (size: %i, out_size: %i)\n",
4227          size,
4228          out_size);
4229   }
4230
4231
4232   /* connect to NSE */
4233   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4234
4235
4236   alpha = 0.45;
4237   beta  = 0.45;
4238
4239
4240   /* Initialise cadet */
4241   /* There exists a copy-paste-clone in get_channel() */
4242   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4243     GNUNET_MQ_hd_fixed_size (peer_check,
4244                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4245                              struct GNUNET_MessageHeader,
4246                              NULL),
4247     GNUNET_MQ_hd_fixed_size (peer_push,
4248                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4249                              struct GNUNET_MessageHeader,
4250                              NULL),
4251     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4252                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4253                              struct GNUNET_MessageHeader,
4254                              NULL),
4255     GNUNET_MQ_hd_var_size (peer_pull_reply,
4256                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4257                            struct GNUNET_RPS_P2P_PullReplyMessage,
4258                            NULL),
4259     GNUNET_MQ_handler_end ()
4260   };
4261
4262   cadet_handle = GNUNET_CADET_connect (cfg);
4263   GNUNET_assert (NULL != cadet_handle);
4264   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4265                       strlen (GNUNET_APPLICATION_PORT_RPS),
4266                       &port);
4267   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4268                                        &port,
4269                                        &Peers_handle_inbound_channel, /* Connect handler */
4270                                        NULL, /* cls */
4271                                        NULL, /* WindowSize handler */
4272                                        cleanup_destroyed_channel, /* Disconnect handler */
4273                                        cadet_handlers);
4274
4275
4276   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4277   Peers_initialise (fn_valid_peers, cadet_handle, cleanup_destroyed_channel,
4278                     &own_identity);
4279   GNUNET_free (fn_valid_peers);
4280
4281   /* Initialise sampler */
4282   struct GNUNET_TIME_Relative half_round_interval;
4283   struct GNUNET_TIME_Relative  max_round_interval;
4284
4285   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4286   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4287
4288   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4289   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4290
4291   /* Initialise push and pull maps */
4292   push_map = CustomPeerMap_create (4);
4293   pull_map = CustomPeerMap_create (4);
4294
4295
4296   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4297   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4298   // TODO send push/pull to each of those peers?
4299   // TODO read stored valid peers from last run
4300   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4301   Peers_get_valid_peers (valid_peers_iterator, NULL);
4302
4303   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4304                                                    GNUNET_NO,
4305                                                    process_peerinfo_peers,
4306                                                    NULL);
4307
4308   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4309
4310   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4311   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4312
4313   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4314   stats = GNUNET_STATISTICS_create ("rps", cfg);
4315
4316 }
4317
4318
4319 /**
4320  * Define "main" method using service macro.
4321  */
4322 GNUNET_SERVICE_MAIN
4323 ("rps",
4324  GNUNET_SERVICE_OPTION_NONE,
4325  &run,
4326  &client_connect_cb,
4327  &client_disconnect_cb,
4328  NULL,
4329  GNUNET_MQ_hd_fixed_size (client_request,
4330    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4331    struct GNUNET_RPS_CS_RequestMessage,
4332    NULL),
4333  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4334    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4335    struct GNUNET_RPS_CS_RequestCancelMessage,
4336    NULL),
4337  GNUNET_MQ_hd_var_size (client_seed,
4338    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4339    struct GNUNET_RPS_CS_SeedMessage,
4340    NULL),
4341 #ifdef ENABLE_MALICIOUS
4342  GNUNET_MQ_hd_var_size (client_act_malicious,
4343    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4344    struct GNUNET_RPS_CS_ActMaliciousMessage,
4345    NULL),
4346 #endif /* ENABLE_MALICIOUS */
4347  GNUNET_MQ_hd_fixed_size (client_view_request,
4348    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4349    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4350    NULL),
4351  GNUNET_MQ_handler_end());
4352
4353 /* end of gnunet-service-rps.c */