rps service fix: send the updated view to client
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file rps/gnunet-service-rps.c
23  * @brief rps service implementation
24  * @author Julius Bünger
25  */
26 #include "platform.h"
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_peerinfo_service.h"
31 #include "gnunet_nse_service.h"
32 #include "gnunet_statistics_service.h"
33 #include "rps.h"
34 #include "rps-test_util.h"
35 #include "gnunet-service-rps_sampler.h"
36 #include "gnunet-service-rps_custommap.h"
37 #include "gnunet-service-rps_view.h"
38
39 #include <math.h>
40 #include <inttypes.h>
41
42 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
43
44 // TODO modify @brief in every file
45
46 // TODO check for overflows
47
48 // TODO align message structs
49
50 // TODO connect to friends
51
52 // TODO store peers somewhere persistent
53
54 // TODO blacklist? (-> mal peer detection on top of brahms)
55
56 // hist_size_init, hist_size_max
57
58 /**
59  * Our configuration.
60  */
61 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62
63 /**
64  * Handle to the statistics service.
65  */
66 static struct GNUNET_STATISTICS_Handle *stats;
67
68 /**
69  * Our own identity.
70  */
71 static struct GNUNET_PeerIdentity own_identity;
72
73
74
75 /***********************************************************************
76  * Old gnunet-service-rps_peers.c
77 ***********************************************************************/
78
79 /**
80  * Set a peer flag of given peer context.
81  */
82 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
83
84 /**
85  * Get peer flag of given peer context.
86  */
87 #define check_peer_flag_set(peer_ctx, mask)\
88   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
89
90 /**
91  * Unset flag of given peer context.
92  */
93 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
94
95 /**
96  * Set a channel flag of given channel context.
97  */
98 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
99
100 /**
101  * Get channel flag of given channel context.
102  */
103 #define check_channel_flag_set(channel_flags, mask)\
104   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105
106 /**
107  * Unset flag of given channel context.
108  */
109 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
110
111
112
113 /**
114  * Pending operation on peer consisting of callback and closure
115  *
116  * When an operation cannot be executed right now this struct is used to store
117  * the callback and closure for later execution.
118  */
119 struct PeerPendingOp
120 {
121   /**
122    * Callback
123    */
124   PeerOp op;
125
126   /**
127    * Closure
128    */
129   void *op_cls;
130 };
131
132 /**
133  * List containing all messages that are yet to be send
134  *
135  * This is used to keep track of all messages that have not been sent yet. When
136  * a peer is to be removed the pending messages can be removed properly.
137  */
138 struct PendingMessage
139 {
140   /**
141    * DLL next, prev
142    */
143   struct PendingMessage *next;
144   struct PendingMessage *prev;
145
146   /**
147    * The envelope to the corresponding message
148    */
149   struct GNUNET_MQ_Envelope *ev;
150
151   /**
152    * The corresponding context
153    */
154   struct PeerContext *peer_ctx;
155
156   /**
157    * The message type
158    */
159   const char *type;
160 };
161
162 /**
163  * Struct used to keep track of other peer's status
164  *
165  * This is stored in a multipeermap.
166  * It contains information such as cadet channels, a message queue for sending,
167  * status about the channels, the pending operations on this peer and some flags
168  * about the status of the peer itself. (live, valid, ...)
169  */
170 struct PeerContext
171 {
172   /**
173    * Message queue open to client
174    */
175   struct GNUNET_MQ_Handle *mq;
176
177   /**
178    * Channel open to client.
179    */
180   struct GNUNET_CADET_Channel *send_channel;
181
182   /**
183    * Flags to the sending channel
184    */
185   uint32_t *send_channel_flags;
186
187   /**
188    * Channel open from client.
189    */
190   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
191
192   /**
193    * Flags to the receiving channel
194    */
195   uint32_t *recv_channel_flags;
196
197   /**
198    * Array of pending operations on this peer.
199    */
200   struct PeerPendingOp *pending_ops;
201
202   /**
203    * Handle to the callback given to cadet_ntfy_tmt_rdy()
204    *
205    * To be canceled on shutdown.
206    */
207   struct PendingMessage *liveliness_check_pending;
208
209   /**
210    * Number of pending operations.
211    */
212   unsigned int num_pending_ops;
213
214   /**
215    * Identity of the peer
216    */
217   struct GNUNET_PeerIdentity peer_id;
218
219   /**
220    * Flags indicating status of peer
221    */
222   uint32_t peer_flags;
223
224   /**
225    * Last time we received something from that peer.
226    */
227   struct GNUNET_TIME_Absolute last_message_recv;
228
229   /**
230    * Last time we received a keepalive message.
231    */
232   struct GNUNET_TIME_Absolute last_keepalive;
233
234   /**
235    * DLL with all messages that are yet to be sent
236    */
237   struct PendingMessage *pending_messages_head;
238   struct PendingMessage *pending_messages_tail;
239
240   /**
241    * This is pobably followed by 'statistical' data (when we first saw
242    * him, how did we get his ID, how many pushes (in a timeinterval),
243    * ...)
244    */
245 };
246
247 /**
248  * @brief Closure to #valid_peer_iterator
249  */
250 struct PeersIteratorCls
251 {
252   /**
253    * Iterator function
254    */
255   PeersIterator iterator;
256
257   /**
258    * Closure to iterator
259    */
260   void *cls;
261 };
262
263 /**
264  * @brief Hashmap of valid peers.
265  */
266 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
267
268 /**
269  * @brief Maximum number of valid peers to keep.
270  * TODO read from config
271  */
272 static uint32_t num_valid_peers_max = UINT32_MAX;
273
274 /**
275  * @brief Filename of the file that stores the valid peers persistently.
276  */
277 static char *filename_valid_peers;
278
279 /**
280  * Set of all peers to keep track of them.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
283
284 /**
285  * Cadet handle.
286  */
287 static struct GNUNET_CADET_Handle *cadet_handle;
288
289
290
291 /**
292  * @brief Get the #PeerContext associated with a peer
293  *
294  * @param peer the peer id
295  *
296  * @return the #PeerContext
297  */
298 static struct PeerContext *
299 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
300 {
301   struct PeerContext *ctx;
302   int ret;
303
304   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
305   GNUNET_assert (GNUNET_YES == ret);
306   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
307   GNUNET_assert (NULL != ctx);
308   return ctx;
309 }
310
311 int
312 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
313
314 /**
315  * @brief Create a new #PeerContext and insert it into the peer map
316  *
317  * @param peer the peer to create the #PeerContext for
318  *
319  * @return the #PeerContext
320  */
321 static struct PeerContext *
322 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
323 {
324   struct PeerContext *ctx;
325   int ret;
326
327   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
328
329   ctx = GNUNET_new (struct PeerContext);
330   ctx->peer_id = *peer;
331   ctx->send_channel_flags = GNUNET_new (uint32_t);
332   ctx->recv_channel_flags = GNUNET_new (uint32_t);
333   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
334       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
335   GNUNET_assert (GNUNET_OK == ret);
336   return ctx;
337 }
338
339
340 /**
341  * @brief Create or get a #PeerContext
342  *
343  * @param peer the peer to get the associated context to
344  *
345  * @return the context
346  */
347 static struct PeerContext *
348 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
349 {
350   if (GNUNET_NO == Peers_check_peer_known (peer))
351   {
352     return create_peer_ctx (peer);
353   }
354   return get_peer_ctx (peer);
355 }
356
357 void
358 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
359
360 void
361 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
362
363 /**
364  * @brief Check whether we have a connection to this @a peer
365  *
366  * Also sets the #Peers_ONLINE flag accordingly
367  *
368  * @param peer the peer in question
369  *
370  * @return #GNUNET_YES if we are connected
371  *         #GNUNET_NO  otherwise
372  */
373 int
374 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
375 {
376   const struct PeerContext *peer_ctx;
377
378   /* If we don't know about this peer we don't know whether it's online */
379   if (GNUNET_NO == Peers_check_peer_known (peer))
380   {
381     return GNUNET_NO;
382   }
383   /* Get the context */
384   peer_ctx = get_peer_ctx (peer);
385   /* If we have no channel to this peer we don't know whether it's online */
386   if ( (NULL == peer_ctx->send_channel) &&
387        (NULL == peer_ctx->recv_channel) )
388   {
389     Peers_unset_peer_flag (peer, Peers_ONLINE);
390     return GNUNET_NO;
391   }
392   /* Otherwise (if we have a channel, we know that it's online */
393   Peers_set_peer_flag (peer, Peers_ONLINE);
394   return GNUNET_YES;
395 }
396
397
398 /**
399  * @brief The closure to #get_rand_peer_iterator.
400  */
401 struct GetRandPeerIteratorCls
402 {
403   /**
404    * @brief The index of the peer to return.
405    * Will be decreased until 0.
406    * Then current peer is returned.
407    */
408   uint32_t index;
409
410   /**
411    * @brief Pointer to peer to return.
412    */
413   const struct GNUNET_PeerIdentity *peer;
414 };
415
416
417 /**
418  * @brief Iterator function for #get_random_peer_from_peermap.
419  *
420  * Implements #GNUNET_CONTAINER_PeerMapIterator.
421  * Decreases the index until the index is null.
422  * Then returns the current peer.
423  *
424  * @param cls the #GetRandPeerIteratorCls containing index and peer
425  * @param peer current peer
426  * @param value unused
427  *
428  * @return  #GNUNET_YES if we should continue to
429  *          iterate,
430  *          #GNUNET_NO if not.
431  */
432 static int
433 get_rand_peer_iterator (void *cls,
434                         const struct GNUNET_PeerIdentity *peer,
435                         void *value)
436 {
437   struct GetRandPeerIteratorCls *iterator_cls = cls;
438   if (0 >= iterator_cls->index)
439   {
440     iterator_cls->peer = peer;
441     return GNUNET_NO;
442   }
443   iterator_cls->index--;
444   return GNUNET_YES;
445 }
446
447
448 /**
449  * @brief Get a random peer from @a peer_map
450  *
451  * @param peer_map the peer_map to get the peer from
452  *
453  * @return a random peer
454  */
455 static const struct GNUNET_PeerIdentity *
456 get_random_peer_from_peermap (const struct
457                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
458 {
459   struct GetRandPeerIteratorCls *iterator_cls;
460   const struct GNUNET_PeerIdentity *ret;
461
462   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
463   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
464       GNUNET_CONTAINER_multipeermap_size (peer_map));
465   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
466                                                 get_rand_peer_iterator,
467                                                 iterator_cls);
468   ret = iterator_cls->peer;
469   GNUNET_free (iterator_cls);
470   return ret;
471 }
472
473
474 /**
475  * @brief Add a given @a peer to valid peers.
476  *
477  * If valid peers are already #num_valid_peers_max, delete a peer previously.
478  *
479  * @param peer the peer that is added to the valid peers.
480  *
481  * @return #GNUNET_YES if no other peer had to be removed
482  *         #GNUNET_NO  otherwise
483  */
484 static int
485 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
486 {
487   const struct GNUNET_PeerIdentity *rand_peer;
488   int ret;
489
490   ret = GNUNET_YES;
491   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
492   {
493     rand_peer = get_random_peer_from_peermap (valid_peers);
494     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
495     ret = GNUNET_NO;
496   }
497   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
498       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
499   return ret;
500 }
501
502
503 /**
504  * @brief Set the peer flag to living and
505  *        call the pending operations on this peer.
506  *
507  * Also adds peer to #valid_peers.
508  *
509  * @param peer_ctx the #PeerContext of the peer to set live
510  */
511 static void
512 set_peer_live (struct PeerContext *peer_ctx)
513 {
514   struct GNUNET_PeerIdentity *peer;
515   unsigned int i;
516
517   peer = &peer_ctx->peer_id;
518   LOG (GNUNET_ERROR_TYPE_DEBUG,
519       "Peer %s is live and valid, calling %i pending operations on it\n",
520       GNUNET_i2s (peer),
521       peer_ctx->num_pending_ops);
522
523   if (NULL != peer_ctx->liveliness_check_pending)
524   {
525     LOG (GNUNET_ERROR_TYPE_DEBUG,
526          "Removing pending liveliness check for peer %s\n",
527          GNUNET_i2s (&peer_ctx->peer_id));
528     // TODO wait until cadet sets mq->cancel_impl
529     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
530     GNUNET_free (peer_ctx->liveliness_check_pending);
531     peer_ctx->liveliness_check_pending = NULL;
532   }
533
534   (void) add_valid_peer (peer);
535   set_peer_flag (peer_ctx, Peers_ONLINE);
536
537   /* Call pending operations */
538   for (i = 0; i < peer_ctx->num_pending_ops; i++)
539   {
540     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
541   }
542   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
543 }
544
545 static void
546 cleanup_destroyed_channel (void *cls,
547                            const struct GNUNET_CADET_Channel *channel);
548
549 /* Declaration of handlers */
550 static void
551 handle_peer_check (void *cls,
552                    const struct GNUNET_MessageHeader *msg);
553
554 static void
555 handle_peer_push (void *cls,
556                   const struct GNUNET_MessageHeader *msg);
557
558 static void
559 handle_peer_pull_request (void *cls,
560                           const struct GNUNET_MessageHeader *msg);
561
562 static int
563 check_peer_pull_reply (void *cls,
564                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
565
566 static void
567 handle_peer_pull_reply (void *cls,
568                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
569
570 /* End declaration of handlers */
571
572
573 /**
574  * @brief Get the channel of a peer. If not existing, create.
575  *
576  * @param peer the peer id
577  * @return the #GNUNET_CADET_Channel used to send data to @a peer
578  */
579 struct GNUNET_CADET_Channel *
580 get_channel (const struct GNUNET_PeerIdentity *peer)
581 {
582   struct PeerContext *peer_ctx;
583   struct GNUNET_HashCode port;
584   struct GNUNET_PeerIdentity *ctx_peer;
585   /* There exists a copy-paste-clone in run() */
586   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
587     GNUNET_MQ_hd_fixed_size (peer_check,
588                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
589                              struct GNUNET_MessageHeader,
590                              NULL),
591     GNUNET_MQ_hd_fixed_size (peer_push,
592                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
593                              struct GNUNET_MessageHeader,
594                              NULL),
595     GNUNET_MQ_hd_fixed_size (peer_pull_request,
596                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
597                              struct GNUNET_MessageHeader,
598                              NULL),
599     GNUNET_MQ_hd_var_size (peer_pull_reply,
600                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
601                            struct GNUNET_RPS_P2P_PullReplyMessage,
602                            NULL),
603     GNUNET_MQ_handler_end ()
604   };
605
606
607   peer_ctx = get_peer_ctx (peer);
608   if (NULL == peer_ctx->send_channel)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG,
611          "Trying to establish channel to peer %s\n",
612          GNUNET_i2s (peer));
613     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
614                         strlen (GNUNET_APPLICATION_PORT_RPS),
615                         &port);
616     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
617     *ctx_peer = *peer;
618     peer_ctx->send_channel =
619       GNUNET_CADET_channel_create (cadet_handle,
620                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
621                                    peer,
622                                    &port,
623                                    GNUNET_CADET_OPTION_RELIABLE,
624                                    NULL, /* WindowSize handler */
625                                    cleanup_destroyed_channel, /* Disconnect handler */
626                                    cadet_handlers);
627   }
628   GNUNET_assert (NULL != peer_ctx->send_channel);
629   return peer_ctx->send_channel;
630 }
631
632
633 /**
634  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
635  *
636  * If we already have a message queue open to this client,
637  * simply return it, otherways create one.
638  *
639  * @param peer the peer to get the mq to
640  * @return the #GNUNET_MQ_Handle
641  */
642 static struct GNUNET_MQ_Handle *
643 get_mq (const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *peer_ctx;
646
647   peer_ctx = get_peer_ctx (peer);
648
649   if (NULL == peer_ctx->mq)
650   {
651     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
652   }
653   return peer_ctx->mq;
654 }
655
656
657 /**
658  * @brief This is called in response to the first message we sent as a
659  * liveliness check.
660  *
661  * @param cls #PeerContext of peer with pending liveliness check
662  */
663 static void
664 mq_liveliness_check_successful (void *cls)
665 {
666   struct PeerContext *peer_ctx = cls;
667
668   if (NULL != peer_ctx->liveliness_check_pending)
669   {
670     LOG (GNUNET_ERROR_TYPE_DEBUG,
671         "Liveliness check for peer %s was successfull\n",
672         GNUNET_i2s (&peer_ctx->peer_id));
673     GNUNET_free (peer_ctx->liveliness_check_pending);
674     peer_ctx->liveliness_check_pending = NULL;
675     set_peer_live (peer_ctx);
676   }
677 }
678
679 /**
680  * Issue a check whether peer is live
681  *
682  * @param peer_ctx the context of the peer
683  */
684 static void
685 check_peer_live (struct PeerContext *peer_ctx)
686 {
687   LOG (GNUNET_ERROR_TYPE_DEBUG,
688        "Get informed about peer %s getting live\n",
689        GNUNET_i2s (&peer_ctx->peer_id));
690
691   struct GNUNET_MQ_Handle *mq;
692   struct GNUNET_MQ_Envelope *ev;
693
694   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
695   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
696   peer_ctx->liveliness_check_pending->ev = ev;
697   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
698   peer_ctx->liveliness_check_pending->type = "Check liveliness";
699   mq = get_mq (&peer_ctx->peer_id);
700   GNUNET_MQ_notify_sent (ev,
701                          mq_liveliness_check_successful,
702                          peer_ctx);
703   GNUNET_MQ_send (mq, ev);
704 }
705
706 /**
707  * @brief Add an envelope to a message passed to mq to list of pending messages
708  *
709  * @param peer peer the message was sent to
710  * @param ev envelope to the message
711  * @param type type of the message to be sent
712  * @return pointer to pending message
713  */
714 static struct PendingMessage *
715 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
716                         struct GNUNET_MQ_Envelope *ev,
717                         const char *type)
718 {
719   struct PendingMessage *pending_msg;
720   struct PeerContext *peer_ctx;
721
722   peer_ctx = get_peer_ctx (peer);
723   pending_msg = GNUNET_new (struct PendingMessage);
724   pending_msg->ev = ev;
725   pending_msg->peer_ctx = peer_ctx;
726   pending_msg->type = type;
727   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
728                                peer_ctx->pending_messages_tail,
729                                pending_msg);
730   return pending_msg;
731 }
732
733
734 /**
735  * @brief Remove a pending message from the respective DLL
736  *
737  * @param pending_msg the pending message to remove
738  * @param cancel cancel the pending message, too
739  */
740 static void
741 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
742 {
743   struct PeerContext *peer_ctx;
744
745   peer_ctx = pending_msg->peer_ctx;
746   GNUNET_assert (NULL != peer_ctx);
747   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
748                                peer_ctx->pending_messages_tail,
749                                pending_msg);
750   // TODO wait for the cadet implementation of message cancellation
751   //if (GNUNET_YES == cancel)
752   //{
753   //  GNUNET_MQ_send_cancel (pending_msg->ev);
754   //}
755   GNUNET_free (pending_msg);
756 }
757
758
759 /**
760  * @brief Check whether function of type #PeerOp was already scheduled
761  *
762  * The array with pending operations will probably never grow really big, so
763  * iterating over it should be ok.
764  *
765  * @param peer the peer to check
766  * @param peer_op the operation (#PeerOp) on the peer
767  *
768  * @return #GNUNET_YES if this operation is scheduled on that peer
769  *         #GNUNET_NO  otherwise
770  */
771 static int
772 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
773                            const PeerOp peer_op)
774 {
775   const struct PeerContext *peer_ctx;
776   unsigned int i;
777
778   peer_ctx = get_peer_ctx (peer);
779   for (i = 0; i < peer_ctx->num_pending_ops; i++)
780     if (peer_op == peer_ctx->pending_ops[i].op)
781       return GNUNET_YES;
782   return GNUNET_NO;
783 }
784
785 int
786 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
787
788 /**
789  * Iterator over hash map entries. Deletes all contexts of peers.
790  *
791  * @param cls closure
792  * @param key current public key
793  * @param value value in the hash map
794  * @return #GNUNET_YES if we should continue to iterate,
795  *         #GNUNET_NO if not.
796  */
797 static int
798 peermap_clear_iterator (void *cls,
799                         const struct GNUNET_PeerIdentity *key,
800                         void *value)
801 {
802   Peers_remove_peer (key);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * @brief This is called once a message is sent.
809  *
810  * Removes the pending message
811  *
812  * @param cls type of the message that was sent
813  */
814 static void
815 mq_notify_sent_cb (void *cls)
816 {
817   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819       "%s was sent.\n",
820       pending_msg->type);
821   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
822     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
823   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
824     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
825   if (0 == strncmp ("PUSH", pending_msg->type, 4))
826     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
827   /* Do not cancle message */
828   remove_pending_message (pending_msg, GNUNET_NO);
829 }
830
831
832 /**
833  * @brief Iterator function for #store_valid_peers.
834  *
835  * Implements #GNUNET_CONTAINER_PeerMapIterator.
836  * Writes single peer to disk.
837  *
838  * @param cls the file handle to write to.
839  * @param peer current peer
840  * @param value unused
841  *
842  * @return  #GNUNET_YES if we should continue to
843  *          iterate,
844  *          #GNUNET_NO if not.
845  */
846 static int
847 store_peer_presistently_iterator (void *cls,
848                                   const struct GNUNET_PeerIdentity *peer,
849                                   void *value)
850 {
851   const struct GNUNET_DISK_FileHandle *fh = cls;
852   char peer_string[128];
853   int size;
854   ssize_t ret;
855
856   if (NULL == peer)
857   {
858     return GNUNET_YES;
859   }
860   size = GNUNET_snprintf (peer_string,
861                           sizeof (peer_string),
862                           "%s\n",
863                           GNUNET_i2s_full (peer));
864   GNUNET_assert (53 == size);
865   ret = GNUNET_DISK_file_write (fh,
866                                 peer_string,
867                                 size);
868   GNUNET_assert (size == ret);
869   return GNUNET_YES;
870 }
871
872
873 /**
874  * @brief Store the peers currently in #valid_peers to disk.
875  */
876 static void
877 store_valid_peers ()
878 {
879   struct GNUNET_DISK_FileHandle *fh;
880   uint32_t number_written_peers;
881   int ret;
882
883   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
884   {
885     return;
886   }
887
888   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
889   if (GNUNET_SYSERR == ret)
890   {
891     LOG (GNUNET_ERROR_TYPE_WARNING,
892         "Not able to create directory for file `%s'\n",
893         filename_valid_peers);
894     GNUNET_break (0);
895   }
896   else if (GNUNET_NO == ret)
897   {
898     LOG (GNUNET_ERROR_TYPE_WARNING,
899         "Directory for file `%s' exists but is not writable for us\n",
900         filename_valid_peers);
901     GNUNET_break (0);
902   }
903   fh = GNUNET_DISK_file_open (filename_valid_peers,
904                               GNUNET_DISK_OPEN_WRITE |
905                                   GNUNET_DISK_OPEN_CREATE,
906                               GNUNET_DISK_PERM_USER_READ |
907                                   GNUNET_DISK_PERM_USER_WRITE);
908   if (NULL == fh)
909   {
910     LOG (GNUNET_ERROR_TYPE_WARNING,
911         "Not able to write valid peers to file `%s'\n",
912         filename_valid_peers);
913     return;
914   }
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916       "Writing %u valid peers to disk\n",
917       GNUNET_CONTAINER_multipeermap_size (valid_peers));
918   number_written_peers =
919     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
920                                            store_peer_presistently_iterator,
921                                            fh);
922   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
923   GNUNET_assert (number_written_peers ==
924       GNUNET_CONTAINER_multipeermap_size (valid_peers));
925 }
926
927
928 /**
929  * @brief Convert string representation of peer id to peer id.
930  *
931  * Counterpart to #GNUNET_i2s_full.
932  *
933  * @param string_repr The string representation of the peer id
934  *
935  * @return The peer id
936  */
937 static const struct GNUNET_PeerIdentity *
938 s2i_full (const char *string_repr)
939 {
940   struct GNUNET_PeerIdentity *peer;
941   size_t len;
942   int ret;
943
944   peer = GNUNET_new (struct GNUNET_PeerIdentity);
945   len = strlen (string_repr);
946   if (52 > len)
947   {
948     LOG (GNUNET_ERROR_TYPE_WARNING,
949         "Not able to convert string representation of PeerID to PeerID\n"
950         "Sting representation: %s (len %lu) - too short\n",
951         string_repr,
952         len);
953     GNUNET_break (0);
954   }
955   else if (52 < len)
956   {
957     len = 52;
958   }
959   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
960                                                     len,
961                                                     &peer->public_key);
962   if (GNUNET_OK != ret)
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965         "Not able to convert string representation of PeerID to PeerID\n"
966         "Sting representation: %s\n",
967         string_repr);
968     GNUNET_break (0);
969   }
970   return peer;
971 }
972
973
974 /**
975  * @brief Restore the peers on disk to #valid_peers.
976  */
977 static void
978 restore_valid_peers ()
979 {
980   off_t file_size;
981   uint32_t num_peers;
982   struct GNUNET_DISK_FileHandle *fh;
983   char *buf;
984   ssize_t size_read;
985   char *iter_buf;
986   char *str_repr;
987   const struct GNUNET_PeerIdentity *peer;
988
989   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
990   {
991     return;
992   }
993
994   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
995   {
996     return;
997   }
998   fh = GNUNET_DISK_file_open (filename_valid_peers,
999                               GNUNET_DISK_OPEN_READ,
1000                               GNUNET_DISK_PERM_NONE);
1001   GNUNET_assert (NULL != fh);
1002   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1003   num_peers = file_size / 53;
1004   buf = GNUNET_malloc (file_size);
1005   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1006   GNUNET_assert (size_read == file_size);
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008       "Restoring %" PRIu32 " peers from file `%s'\n",
1009       num_peers,
1010       filename_valid_peers);
1011   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1012   {
1013     str_repr = GNUNET_strndup (iter_buf, 53);
1014     peer = s2i_full (str_repr);
1015     GNUNET_free (str_repr);
1016     add_valid_peer (peer);
1017     LOG (GNUNET_ERROR_TYPE_DEBUG,
1018         "Restored valid peer %s from disk\n",
1019         GNUNET_i2s_full (peer));
1020   }
1021   iter_buf = NULL;
1022   GNUNET_free (buf);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG,
1024       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1025       num_peers,
1026       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1027   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1028   {
1029     LOG (GNUNET_ERROR_TYPE_WARNING,
1030         "Number of restored peers does not match file size. Have probably duplicates.\n");
1031   }
1032   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1033   LOG (GNUNET_ERROR_TYPE_DEBUG,
1034       "Restored %u valid peers from disk\n",
1035       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1036 }
1037
1038
1039 /**
1040  * @brief Initialise storage of peers
1041  *
1042  * @param fn_valid_peers filename of the file used to store valid peer ids
1043  * @param cadet_h cadet handle
1044  * @param own_id own peer identity
1045  */
1046 void
1047 Peers_initialise (char* fn_valid_peers,
1048                   struct GNUNET_CADET_Handle *cadet_h,
1049                   const struct GNUNET_PeerIdentity *own_id)
1050 {
1051   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1052   cadet_handle = cadet_h;
1053   own_identity = *own_id;
1054   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1055   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1056   restore_valid_peers ();
1057 }
1058
1059
1060 /**
1061  * @brief Delete storage of peers that was created with #Peers_initialise ()
1062  */
1063 void
1064 Peers_terminate ()
1065 {
1066   if (GNUNET_SYSERR ==
1067       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1068                                              peermap_clear_iterator,
1069                                              NULL))
1070   {
1071     LOG (GNUNET_ERROR_TYPE_WARNING,
1072         "Iteration destroying peers was aborted.\n");
1073   }
1074   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1075   peer_map = NULL;
1076   store_valid_peers ();
1077   GNUNET_free (filename_valid_peers);
1078   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1079 }
1080
1081
1082 /**
1083  * Iterator over #valid_peers hash map entries.
1084  *
1085  * @param cls closure - unused
1086  * @param peer current peer id
1087  * @param value value in the hash map - unused
1088  * @return #GNUNET_YES if we should continue to
1089  *         iterate,
1090  *         #GNUNET_NO if not.
1091  */
1092 static int
1093 valid_peer_iterator (void *cls,
1094                      const struct GNUNET_PeerIdentity *peer,
1095                      void *value)
1096 {
1097   struct PeersIteratorCls *it_cls = cls;
1098
1099   return it_cls->iterator (it_cls->cls,
1100                            peer);
1101 }
1102
1103
1104 /**
1105  * @brief Get all currently known, valid peer ids.
1106  *
1107  * @param it function to call on each peer id
1108  * @param it_cls extra argument to @a it
1109  * @return the number of key value pairs processed,
1110  *         #GNUNET_SYSERR if it aborted iteration
1111  */
1112 int
1113 Peers_get_valid_peers (PeersIterator iterator,
1114                        void *it_cls)
1115 {
1116   struct PeersIteratorCls *cls;
1117   int ret;
1118
1119   cls = GNUNET_new (struct PeersIteratorCls);
1120   cls->iterator = iterator;
1121   cls->cls = it_cls;
1122   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1123                                                valid_peer_iterator,
1124                                                cls);
1125   GNUNET_free (cls);
1126   return ret;
1127 }
1128
1129
1130 /**
1131  * @brief Add peer to known peers.
1132  *
1133  * This function is called on new peer_ids from 'external' sources
1134  * (client seed, cadet get_peers(), ...)
1135  *
1136  * @param peer the new #GNUNET_PeerIdentity
1137  *
1138  * @return #GNUNET_YES if peer was inserted
1139  *         #GNUNET_NO  otherwise (if peer was already known or
1140  *                     peer was #own_identity)
1141  */
1142 int
1143 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1144 {
1145   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1146        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1147   {
1148     return GNUNET_NO; /* We already know this peer - nothing to do */
1149   }
1150   (void) create_peer_ctx (peer);
1151   return GNUNET_YES;
1152 }
1153
1154 int
1155 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1156
1157 /**
1158  * @brief Try connecting to a peer to see whether it is online
1159  *
1160  * If not known yet, insert into known peers
1161  *
1162  * @param peer the peer whose liveliness is to be checked
1163  * @return #GNUNET_YES if peer had to be inserted
1164  *         #GNUNET_NO  otherwise (if peer was already known or
1165  *                     peer was #own_identity)
1166  */
1167 int
1168 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1169 {
1170   struct PeerContext *peer_ctx;
1171   int ret;
1172
1173   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1174   {
1175     return GNUNET_NO;
1176   }
1177   ret = Peers_insert_peer (peer);
1178   peer_ctx = get_peer_ctx (peer);
1179   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1180   {
1181     check_peer_live (peer_ctx);
1182   }
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * @brief Check if peer is removable.
1189  *
1190  * Check if
1191  *  - a recv channel exists
1192  *  - there are pending messages
1193  *  - there is no pending pull reply
1194  *
1195  * @param peer the peer in question
1196  * @return #GNUNET_YES    if peer is removable
1197  *         #GNUNET_NO     if peer is NOT removable
1198  *         #GNUNET_SYSERR if peer is not known
1199  */
1200 int
1201 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1202 {
1203   struct PeerContext *peer_ctx;
1204
1205   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1206   {
1207     return GNUNET_SYSERR;
1208   }
1209
1210   peer_ctx = get_peer_ctx (peer);
1211   if ( (NULL != peer_ctx->recv_channel) ||
1212        (NULL != peer_ctx->pending_messages_head) ||
1213        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1214   {
1215     return GNUNET_NO;
1216   }
1217   return GNUNET_YES;
1218 }
1219
1220 uint32_t *
1221 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1222                         enum Peers_ChannelRole role);
1223
1224 int
1225 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1226
1227 /**
1228  * @brief Remove peer
1229  *
1230  * @param peer the peer to clean
1231  * @return #GNUNET_YES if peer was removed
1232  *         #GNUNET_NO  otherwise
1233  */
1234 int
1235 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1236 {
1237   struct PeerContext *peer_ctx;
1238   uint32_t *channel_flag;
1239
1240   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1241   {
1242     return GNUNET_NO;
1243   }
1244
1245   peer_ctx = get_peer_ctx (peer);
1246   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Going to remove peer %s\n",
1249        GNUNET_i2s (&peer_ctx->peer_id));
1250   Peers_unset_peer_flag (peer, Peers_ONLINE);
1251
1252   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1253   while (NULL != peer_ctx->pending_messages_head)
1254   {
1255     LOG (GNUNET_ERROR_TYPE_DEBUG,
1256         "Removing unsent %s\n",
1257         peer_ctx->pending_messages_head->type);
1258     /* Cancle pending message, too */
1259     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1260   }
1261   /* If we are still waiting for notification whether this peer is live
1262    * cancel the according task */
1263   if (NULL != peer_ctx->liveliness_check_pending)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266          "Removing pending liveliness check for peer %s\n",
1267          GNUNET_i2s (&peer_ctx->peer_id));
1268     // TODO wait until cadet sets mq->cancel_impl
1269     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1270     GNUNET_free (peer_ctx->liveliness_check_pending);
1271     peer_ctx->liveliness_check_pending = NULL;
1272   }
1273   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1274   if (NULL != peer_ctx->send_channel &&
1275       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1276   {
1277     LOG (GNUNET_ERROR_TYPE_DEBUG,
1278         "Destroying send channel\n");
1279     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1280     peer_ctx->send_channel = NULL;
1281     peer_ctx->mq = NULL;
1282   }
1283   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1284   if (NULL != peer_ctx->recv_channel &&
1285       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288         "Destroying recv channel\n");
1289     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1290     peer_ctx->recv_channel = NULL;
1291   }
1292
1293   GNUNET_free (peer_ctx->send_channel_flags);
1294   GNUNET_free (peer_ctx->recv_channel_flags);
1295
1296   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1297   {
1298     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1299   }
1300   GNUNET_free (peer_ctx);
1301   return GNUNET_YES;
1302 }
1303
1304
1305 /**
1306  * @brief set flags on a given peer.
1307  *
1308  * @param peer the peer to set flags on
1309  * @param flags the flags
1310  */
1311 void
1312 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1313 {
1314   struct PeerContext *peer_ctx;
1315
1316   peer_ctx = get_peer_ctx (peer);
1317   set_peer_flag (peer_ctx, flags);
1318 }
1319
1320
1321 /**
1322  * @brief unset flags on a given peer.
1323  *
1324  * @param peer the peer to unset flags on
1325  * @param flags the flags
1326  */
1327 void
1328 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1329 {
1330   struct PeerContext *peer_ctx;
1331
1332   peer_ctx = get_peer_ctx (peer);
1333   unset_peer_flag (peer_ctx, flags);
1334 }
1335
1336
1337 /**
1338  * @brief Check whether flags on a peer are set.
1339  *
1340  * @param peer the peer to check the flag of
1341  * @param flags the flags to check
1342  *
1343  * @return #GNUNET_SYSERR if peer is not known
1344  *         #GNUNET_YES    if all given flags are set
1345  *         #GNUNET_NO     otherwise
1346  */
1347 int
1348 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1349 {
1350   struct PeerContext *peer_ctx;
1351
1352   if (GNUNET_NO == Peers_check_peer_known (peer))
1353   {
1354     return GNUNET_SYSERR;
1355   }
1356   peer_ctx = get_peer_ctx (peer);
1357   return check_peer_flag_set (peer_ctx, flags);
1358 }
1359
1360
1361 /**
1362  * @brief set flags on a given channel.
1363  *
1364  * @param channel the channel to set flags on
1365  * @param flags the flags
1366  */
1367 void
1368 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1369 {
1370   set_channel_flag (channel_flags, flags);
1371 }
1372
1373
1374 /**
1375  * @brief unset flags on a given channel.
1376  *
1377  * @param channel the channel to unset flags on
1378  * @param flags the flags
1379  */
1380 void
1381 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1382 {
1383   unset_channel_flag (channel_flags, flags);
1384 }
1385
1386
1387 /**
1388  * @brief Check whether flags on a channel are set.
1389  *
1390  * @param channel the channel to check the flag of
1391  * @param flags the flags to check
1392  *
1393  * @return #GNUNET_YES if all given flags are set
1394  *         #GNUNET_NO  otherwise
1395  */
1396 int
1397 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1398 {
1399   return check_channel_flag_set (channel_flags, flags);
1400 }
1401
1402 /**
1403  * @brief Get the flags for the channel in @a role for @a peer.
1404  *
1405  * @param peer Peer to get the channel flags for.
1406  * @param role Role of channel to get flags for
1407  *
1408  * @return The flags.
1409  */
1410 uint32_t *
1411 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1412                         enum Peers_ChannelRole role)
1413 {
1414   const struct PeerContext *peer_ctx;
1415
1416   peer_ctx = get_peer_ctx (peer);
1417   if (Peers_CHANNEL_ROLE_SENDING == role)
1418   {
1419     return peer_ctx->send_channel_flags;
1420   }
1421   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1422   {
1423     return peer_ctx->recv_channel_flags;
1424   }
1425   else
1426   {
1427     GNUNET_assert (0);
1428   }
1429 }
1430
1431 /**
1432  * @brief Check whether we have information about the given peer.
1433  *
1434  * FIXME probably deprecated. Make this the new _online.
1435  *
1436  * @param peer peer in question
1437  *
1438  * @return #GNUNET_YES if peer is known
1439  *         #GNUNET_NO  if peer is not knwon
1440  */
1441 int
1442 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1443 {
1444   if (NULL != peer_map)
1445   {
1446     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1447   } else
1448   {
1449     return GNUNET_NO;
1450   }
1451 }
1452
1453
1454 /**
1455  * @brief Check whether @a peer is actually a peer.
1456  *
1457  * A valid peer is a peer that we know exists eg. we were connected to once.
1458  *
1459  * @param peer peer in question
1460  *
1461  * @return #GNUNET_YES if peer is valid
1462  *         #GNUNET_NO  if peer is not valid
1463  */
1464 int
1465 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1466 {
1467   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1468 }
1469
1470
1471 /**
1472  * @brief Indicate that we want to send to the other peer
1473  *
1474  * This establishes a sending channel
1475  *
1476  * @param peer the peer to establish channel to
1477  */
1478 void
1479 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1480 {
1481   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1482   (void) get_channel (peer);
1483 }
1484
1485
1486 /**
1487  * @brief Check whether other peer has the intention to send/opened channel
1488  *        towars us
1489  *
1490  * @param peer the peer in question
1491  *
1492  * @return #GNUNET_YES if peer has the intention to send
1493  *         #GNUNET_NO  otherwise
1494  */
1495 int
1496 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1497 {
1498   const struct PeerContext *peer_ctx;
1499
1500   peer_ctx = get_peer_ctx (peer);
1501   if (NULL != peer_ctx->recv_channel)
1502   {
1503     return GNUNET_YES;
1504   }
1505   return GNUNET_NO;
1506 }
1507
1508
1509 /**
1510  * Handle the channel a peer opens to us.
1511  *
1512  * @param cls The closure
1513  * @param channel The channel the peer wants to establish
1514  * @param initiator The peer's peer ID
1515  *
1516  * @return initial channel context for the channel
1517  *         (can be NULL -- that's not an error)
1518  */
1519 void *
1520 Peers_handle_inbound_channel (void *cls,
1521                               struct GNUNET_CADET_Channel *channel,
1522                               const struct GNUNET_PeerIdentity *initiator)
1523 {
1524   struct PeerContext *peer_ctx;
1525   struct GNUNET_PeerIdentity *ctx_peer;
1526
1527   LOG (GNUNET_ERROR_TYPE_DEBUG,
1528       "New channel was established to us (Peer %s).\n",
1529       GNUNET_i2s (initiator));
1530   GNUNET_assert (NULL != channel); /* according to cadet API */
1531   /* Make sure we 'know' about this peer */
1532   peer_ctx = create_or_get_peer_ctx (initiator);
1533   set_peer_live (peer_ctx);
1534   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1535   *ctx_peer = *initiator;
1536   /* We only accept one incoming channel per peer */
1537   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1538   {
1539     set_channel_flag (peer_ctx->recv_channel_flags,
1540                       Peers_CHANNEL_ESTABLISHED_TWICE);
1541     //GNUNET_CADET_channel_destroy (channel);
1542     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1543     peer_ctx->recv_channel = channel;
1544     /* return the channel context */
1545     return ctx_peer;
1546   }
1547   peer_ctx->recv_channel = channel;
1548   return ctx_peer;
1549 }
1550
1551
1552 /**
1553  * @brief Check whether a sending channel towards the given peer exists
1554  *
1555  * @param peer the peer to check for
1556  *
1557  * @return #GNUNET_YES if a sending channel towards that peer exists
1558  *         #GNUNET_NO  otherwise
1559  */
1560 int
1561 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1562 {
1563   struct PeerContext *peer_ctx;
1564
1565   if (GNUNET_NO == Peers_check_peer_known (peer))
1566   { /* If no such peer exists, there is no channel */
1567     return GNUNET_NO;
1568   }
1569   peer_ctx = get_peer_ctx (peer);
1570   if (NULL == peer_ctx->send_channel)
1571   {
1572     return GNUNET_NO;
1573   }
1574   return GNUNET_YES;
1575 }
1576
1577
1578 /**
1579  * @brief check whether the given channel is the sending channel of the given
1580  *        peer
1581  *
1582  * @param peer the peer in question
1583  * @param channel the channel to check for
1584  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1585  *                    #Peers_CHANNEL_ROLE_RECEIVING
1586  *
1587  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1588  *         #GNUNET_NO  otherwise
1589  */
1590 int
1591 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1592                           const struct GNUNET_CADET_Channel *channel,
1593                           enum Peers_ChannelRole role)
1594 {
1595   const struct PeerContext *peer_ctx;
1596
1597   if (GNUNET_NO == Peers_check_peer_known (peer))
1598   {
1599     return GNUNET_NO;
1600   }
1601   peer_ctx = get_peer_ctx (peer);
1602   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1603        (channel == peer_ctx->send_channel) )
1604   {
1605     return GNUNET_YES;
1606   }
1607   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1608        (channel == peer_ctx->recv_channel) )
1609   {
1610     return GNUNET_YES;
1611   }
1612   return GNUNET_NO;
1613 }
1614
1615
1616 /**
1617  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618  *        intention to another peer
1619  *
1620  * If there is also no channel to receive messages from that peer, remove it
1621  * from the peermap.
1622  * TODO really?
1623  *
1624  * @peer the peer identity of the peer whose sending channel to destroy
1625  * @return #GNUNET_YES if channel was destroyed
1626  *         #GNUNET_NO  otherwise
1627  */
1628 int
1629 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1630 {
1631   struct PeerContext *peer_ctx;
1632
1633   if (GNUNET_NO == Peers_check_peer_known (peer))
1634   {
1635     return GNUNET_NO;
1636   }
1637   peer_ctx = get_peer_ctx (peer);
1638   if (NULL != peer_ctx->send_channel)
1639   {
1640     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1641     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1642     peer_ctx->send_channel = NULL;
1643     peer_ctx->mq = NULL;
1644     (void) Peers_check_connected (peer);
1645     return GNUNET_YES;
1646   }
1647   return GNUNET_NO;
1648 }
1649
1650 /**
1651  * This is called when a channel is destroyed.
1652  *
1653  * @param cls The closure
1654  * @param channel The channel being closed
1655  */
1656 void
1657 Peers_cleanup_destroyed_channel (void *cls,
1658                                  const struct GNUNET_CADET_Channel *channel)
1659 {
1660   struct GNUNET_PeerIdentity *peer = cls;
1661   struct PeerContext *peer_ctx;
1662
1663   if (GNUNET_NO == Peers_check_peer_known (peer))
1664   {/* We don't want to implicitly create a context that we're about to kill */
1665   LOG (GNUNET_ERROR_TYPE_DEBUG,
1666        "channel (%s) without associated context was destroyed\n",
1667        GNUNET_i2s (peer));
1668     return;
1669   }
1670   peer_ctx = get_peer_ctx (peer);
1671
1672   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1673    * flag will be set. In this case simply make sure that the channels are
1674    * cleaned. */
1675   /* FIXME This distinction seems to be redundant */
1676   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1677   {/* We initiatad the destruction of this particular peer */
1678     LOG (GNUNET_ERROR_TYPE_DEBUG,
1679         "Peer is in the process of being destroyed\n");
1680     if (channel == peer_ctx->send_channel)
1681     {
1682       peer_ctx->send_channel = NULL;
1683       peer_ctx->mq = NULL;
1684     }
1685     else if (channel == peer_ctx->recv_channel)
1686     {
1687       peer_ctx->recv_channel = NULL;
1688     }
1689
1690     if (NULL != peer_ctx->send_channel)
1691     {
1692       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1693       peer_ctx->send_channel = NULL;
1694       peer_ctx->mq = NULL;
1695     }
1696     if (NULL != peer_ctx->recv_channel)
1697     {
1698       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1699       peer_ctx->recv_channel = NULL;
1700     }
1701     /* Set the #Peers_ONLINE flag accordingly */
1702     (void) Peers_check_connected (peer);
1703     return;
1704   }
1705
1706   else
1707   { /* We did not initiate the destruction of this peer */
1708     LOG (GNUNET_ERROR_TYPE_DEBUG,
1709         "Peer is NOT in the process of being destroyed\n");
1710     if (channel == peer_ctx->send_channel)
1711     { /* Something (but us) killd the channel - clean up peer */
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713           "send channel (%s) was destroyed - cleaning up\n",
1714           GNUNET_i2s (peer));
1715       peer_ctx->send_channel = NULL;
1716       peer_ctx->mq = NULL;
1717     }
1718     else if (channel == peer_ctx->recv_channel)
1719     { /* Other peer doesn't want to send us messages anymore */
1720       LOG (GNUNET_ERROR_TYPE_DEBUG,
1721            "Peer %s destroyed recv channel - cleaning up channel\n",
1722            GNUNET_i2s (peer));
1723       peer_ctx->recv_channel = NULL;
1724     }
1725     else
1726     {
1727       LOG (GNUNET_ERROR_TYPE_WARNING,
1728            "unknown channel (%s) was destroyed\n",
1729            GNUNET_i2s (peer));
1730     }
1731   }
1732   (void) Peers_check_connected (peer);
1733 }
1734
1735 /**
1736  * @brief Send a message to another peer.
1737  *
1738  * Keeps track about pending messages so they can be properly removed when the
1739  * peer is destroyed.
1740  *
1741  * @param peer receeiver of the message
1742  * @param ev envelope of the message
1743  * @param type type of the message
1744  */
1745 void
1746 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1747                     struct GNUNET_MQ_Envelope *ev,
1748                     const char *type)
1749 {
1750   struct PendingMessage *pending_msg;
1751   struct GNUNET_MQ_Handle *mq;
1752
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "Sending message to %s of type %s\n",
1755               GNUNET_i2s (peer),
1756               type);
1757   pending_msg = insert_pending_message (peer, ev, type);
1758   mq = get_mq (peer);
1759   GNUNET_MQ_notify_sent (ev,
1760                          mq_notify_sent_cb,
1761                          pending_msg);
1762   GNUNET_MQ_send (mq, ev);
1763 }
1764
1765 /**
1766  * @brief Schedule a operation on given peer
1767  *
1768  * Avoids scheduling an operation twice.
1769  *
1770  * @param peer the peer we want to schedule the operation for once it gets live
1771  *
1772  * @return #GNUNET_YES if the operation was scheduled
1773  *         #GNUNET_NO  otherwise
1774  */
1775 int
1776 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1777                           const PeerOp peer_op)
1778 {
1779   struct PeerPendingOp pending_op;
1780   struct PeerContext *peer_ctx;
1781
1782   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1783   {
1784     return GNUNET_NO;
1785   }
1786   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1787
1788   //TODO if LIVE/ONLINE execute immediately
1789
1790   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1791   {
1792     peer_ctx = get_peer_ctx (peer);
1793     pending_op.op = peer_op;
1794     pending_op.op_cls = NULL;
1795     GNUNET_array_append (peer_ctx->pending_ops,
1796                          peer_ctx->num_pending_ops,
1797                          pending_op);
1798     return GNUNET_YES;
1799   }
1800   return GNUNET_NO;
1801 }
1802
1803 /**
1804  * @brief Get the recv_channel of @a peer.
1805  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1806  * messages.
1807  *
1808  * @param peer The peer to get the recv_channel from.
1809  *
1810  * @return The recv_channel.
1811  */
1812 struct GNUNET_CADET_Channel *
1813 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1814 {
1815   struct PeerContext *peer_ctx;
1816
1817   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1818   peer_ctx = get_peer_ctx (peer);
1819   return peer_ctx->recv_channel;
1820 }
1821 /***********************************************************************
1822  * /Old gnunet-service-rps_peers.c
1823 ***********************************************************************/
1824
1825
1826 /***********************************************************************
1827  * Housekeeping with clients
1828 ***********************************************************************/
1829
1830 /**
1831  * Closure used to pass the client and the id to the callback
1832  * that replies to a client's request
1833  */
1834 struct ReplyCls
1835 {
1836   /**
1837    * DLL
1838    */
1839   struct ReplyCls *next;
1840   struct ReplyCls *prev;
1841
1842   /**
1843    * The identifier of the request
1844    */
1845   uint32_t id;
1846
1847   /**
1848    * The handle to the request
1849    */
1850   struct RPS_SamplerRequestHandle *req_handle;
1851
1852   /**
1853    * The client handle to send the reply to
1854    */
1855   struct ClientContext *cli_ctx;
1856 };
1857
1858
1859 /**
1860  * Struct used to store the context of a connected client.
1861  */
1862 struct ClientContext
1863 {
1864   /**
1865    * DLL
1866    */
1867   struct ClientContext *next;
1868   struct ClientContext *prev;
1869
1870   /**
1871    * The message queue to communicate with the client.
1872    */
1873   struct GNUNET_MQ_Handle *mq;
1874
1875   /**
1876    * DLL with handles to single requests from the client
1877    */
1878   struct ReplyCls *rep_cls_head;
1879   struct ReplyCls *rep_cls_tail;
1880
1881   /**
1882    * @brief How many updates this client expects to receive.
1883    */
1884   int64_t view_updates_left;
1885
1886   /**
1887    * The client handle to send the reply to
1888    */
1889   struct GNUNET_SERVICE_Client *client;
1890 };
1891
1892 /**
1893  * DLL with all clients currently connected to us
1894  */
1895 struct ClientContext *cli_ctx_head;
1896 struct ClientContext *cli_ctx_tail;
1897
1898 /***********************************************************************
1899  * /Housekeeping with clients
1900 ***********************************************************************/
1901
1902
1903
1904
1905
1906 /***********************************************************************
1907  * Globals
1908 ***********************************************************************/
1909
1910 /**
1911  * Sampler used for the Brahms protocol itself.
1912  */
1913 static struct RPS_Sampler *prot_sampler;
1914
1915 /**
1916  * Sampler used for the clients.
1917  */
1918 static struct RPS_Sampler *client_sampler;
1919
1920 /**
1921  * Name to log view to
1922  */
1923 static char *file_name_view_log;
1924
1925 /**
1926  * The size of sampler we need to be able to satisfy the client's need
1927  * of random peers.
1928  */
1929 static unsigned int sampler_size_client_need;
1930
1931 /**
1932  * The size of sampler we need to be able to satisfy the Brahms protocol's
1933  * need of random peers.
1934  *
1935  * This is one minimum size the sampler grows to.
1936  */
1937 static unsigned int sampler_size_est_need;
1938
1939 /**
1940  * @brief This is the minimum estimate used as sampler size.
1941  *
1942  * It is configured by the user.
1943  */
1944 static unsigned int sampler_size_est_min;
1945
1946 /**
1947  * @brief This is the estimate used as view size.
1948  *
1949  * It is initialised with the minimum
1950  */
1951 static unsigned int view_size_est_need;
1952
1953 /**
1954  * @brief This is the minimum estimate used as view size.
1955  *
1956  * It is configured by the user.
1957  */
1958 static unsigned int view_size_est_min;
1959
1960 /**
1961  * Percentage of total peer number in the view
1962  * to send random PUSHes to
1963  */
1964 static float alpha;
1965
1966 /**
1967  * Percentage of total peer number in the view
1968  * to send random PULLs to
1969  */
1970 static float beta;
1971
1972 /**
1973  * Identifier for the main task that runs periodically.
1974  */
1975 static struct GNUNET_SCHEDULER_Task *do_round_task;
1976
1977 /**
1978  * Time inverval the do_round task runs in.
1979  */
1980 static struct GNUNET_TIME_Relative round_interval;
1981
1982 /**
1983  * List to store peers received through pushes temporary.
1984  */
1985 static struct CustomPeerMap *push_map;
1986
1987 /**
1988  * List to store peers received through pulls temporary.
1989  */
1990 static struct CustomPeerMap *pull_map;
1991
1992 /**
1993  * Handler to NSE.
1994  */
1995 static struct GNUNET_NSE_Handle *nse;
1996
1997 /**
1998  * Handler to CADET.
1999  */
2000 static struct GNUNET_CADET_Handle *cadet_handle;
2001
2002 /**
2003  * @brief Port to communicate to other peers.
2004  */
2005 static struct GNUNET_CADET_Port *cadet_port;
2006
2007 /**
2008  * Handler to PEERINFO.
2009  */
2010 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2011
2012 /**
2013  * Handle for cancellation of iteration over peers.
2014  */
2015 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2016
2017 /**
2018  * Request counter.
2019  *
2020  * Counts how many requets clients already issued.
2021  * Only needed in the beginning to check how many of the 64 deltas
2022  * we already have
2023  */
2024 static unsigned int req_counter;
2025
2026 /**
2027  * Time of the last request we received.
2028  *
2029  * Used to compute the expected request rate.
2030  */
2031 static struct GNUNET_TIME_Absolute last_request;
2032
2033 /**
2034  * Size of #request_deltas.
2035  */
2036 #define REQUEST_DELTAS_SIZE 64
2037 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2038
2039 /**
2040  * Last 64 deltas between requests
2041  */
2042 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2043
2044 /**
2045  * The prediction of the rate of requests
2046  */
2047 static struct GNUNET_TIME_Relative request_rate;
2048
2049
2050 #ifdef ENABLE_MALICIOUS
2051 /**
2052  * Type of malicious peer
2053  *
2054  * 0 Don't act malicious at all - Default
2055  * 1 Try to maximise representation
2056  * 2 Try to partition the network
2057  * 3 Combined attack
2058  */
2059 static uint32_t mal_type;
2060
2061 /**
2062  * Other malicious peers
2063  */
2064 static struct GNUNET_PeerIdentity *mal_peers;
2065
2066 /**
2067  * Hashmap of malicious peers used as set.
2068  * Used to more efficiently check whether we know that peer.
2069  */
2070 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2071
2072 /**
2073  * Number of other malicious peers
2074  */
2075 static uint32_t num_mal_peers;
2076
2077
2078 /**
2079  * If type is 2 This struct is used to store the attacked peers in a DLL
2080  */
2081 struct AttackedPeer
2082 {
2083   /**
2084    * DLL
2085    */
2086   struct AttackedPeer *next;
2087   struct AttackedPeer *prev;
2088
2089   /**
2090    * PeerID
2091    */
2092   struct GNUNET_PeerIdentity peer_id;
2093 };
2094
2095 /**
2096  * If type is 2 this is the DLL of attacked peers
2097  */
2098 static struct AttackedPeer *att_peers_head;
2099 static struct AttackedPeer *att_peers_tail;
2100
2101 /**
2102  * This index is used to point to an attacked peer to
2103  * implement the round-robin-ish way to select attacked peers.
2104  */
2105 static struct AttackedPeer *att_peer_index;
2106
2107 /**
2108  * Hashmap of attacked peers used as set.
2109  * Used to more efficiently check whether we know that peer.
2110  */
2111 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2112
2113 /**
2114  * Number of attacked peers
2115  */
2116 static uint32_t num_attacked_peers;
2117
2118 /**
2119  * If type is 1 this is the attacked peer
2120  */
2121 static struct GNUNET_PeerIdentity attacked_peer;
2122
2123 /**
2124  * The limit of PUSHes we can send in one round.
2125  * This is an assumption of the Brahms protocol and either implemented
2126  * via proof of work
2127  * or
2128  * assumend to be the bandwidth limitation.
2129  */
2130 static uint32_t push_limit = 10000;
2131 #endif /* ENABLE_MALICIOUS */
2132
2133
2134 /***********************************************************************
2135  * /Globals
2136 ***********************************************************************/
2137
2138
2139 /***********************************************************************
2140  * Util functions
2141 ***********************************************************************/
2142
2143
2144 /**
2145  * Print peerlist to log.
2146  */
2147 static void
2148 print_peer_list (struct GNUNET_PeerIdentity *list,
2149                  unsigned int len)
2150 {
2151   unsigned int i;
2152
2153   LOG (GNUNET_ERROR_TYPE_DEBUG,
2154        "Printing peer list of length %u at %p:\n",
2155        len,
2156        list);
2157   for (i = 0 ; i < len ; i++)
2158   {
2159     LOG (GNUNET_ERROR_TYPE_DEBUG,
2160          "%u. peer: %s\n",
2161          i, GNUNET_i2s (&list[i]));
2162   }
2163 }
2164
2165
2166 /**
2167  * Remove peer from list.
2168  */
2169 static void
2170 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2171                unsigned int *list_size,
2172                const struct GNUNET_PeerIdentity *peer)
2173 {
2174   unsigned int i;
2175   struct GNUNET_PeerIdentity *tmp;
2176
2177   tmp = *peer_list;
2178
2179   LOG (GNUNET_ERROR_TYPE_DEBUG,
2180        "Removing peer %s from list at %p\n",
2181        GNUNET_i2s (peer),
2182        tmp);
2183
2184   for ( i = 0 ; i < *list_size ; i++ )
2185   {
2186     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2187     {
2188       if (i < *list_size -1)
2189       { /* Not at the last entry -- shift peers left */
2190         memmove (&tmp[i], &tmp[i +1],
2191                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2192       }
2193       /* Remove last entry (should be now useless PeerID) */
2194       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2195     }
2196   }
2197   *peer_list = tmp;
2198 }
2199
2200
2201 /**
2202  * Sum all time relatives of an array.
2203  */
2204 static struct GNUNET_TIME_Relative
2205 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2206                 uint32_t arr_size)
2207 {
2208   struct GNUNET_TIME_Relative sum;
2209   uint32_t i;
2210
2211   sum = GNUNET_TIME_UNIT_ZERO;
2212   for ( i = 0 ; i < arr_size ; i++ )
2213   {
2214     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2215   }
2216   return sum;
2217 }
2218
2219
2220 /**
2221  * Compute the average of given time relatives.
2222  */
2223 static struct GNUNET_TIME_Relative
2224 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2225                 uint32_t arr_size)
2226 {
2227   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2228                                                       arr_size),
2229                                       arr_size);
2230 }
2231
2232
2233 /**
2234  * Insert PeerID in #view
2235  *
2236  * Called once we know a peer is live.
2237  * Implements #PeerOp
2238  *
2239  * @return GNUNET_OK if peer was actually inserted
2240  *         GNUNET_NO if peer was not inserted
2241  */
2242 static void
2243 insert_in_view_op (void *cls,
2244                 const struct GNUNET_PeerIdentity *peer);
2245
2246 /**
2247  * Insert PeerID in #view
2248  *
2249  * Called once we know a peer is live.
2250  *
2251  * @return GNUNET_OK if peer was actually inserted
2252  *         GNUNET_NO if peer was not inserted
2253  */
2254 static int
2255 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2256 {
2257   int online;
2258
2259   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2260   if ( (GNUNET_NO == online) ||
2261        (GNUNET_SYSERR == online) ) /* peer is not even known */
2262   {
2263     (void) Peers_issue_peer_liveliness_check (peer);
2264     (void) Peers_schedule_operation (peer, insert_in_view_op);
2265     return GNUNET_NO;
2266   }
2267   /* Open channel towards peer to keep connection open */
2268   Peers_indicate_sending_intention (peer);
2269   return View_put (peer);
2270 }
2271
2272 /**
2273  * @brief sends updates to clients that are interested
2274  */
2275 static void
2276 clients_notify_view_update (void);
2277
2278 /**
2279  * Put random peer from sampler into the view as history update.
2280  */
2281 static void
2282 hist_update (void *cls,
2283              struct GNUNET_PeerIdentity *ids,
2284              uint32_t num_peers)
2285 {
2286   unsigned int i;
2287
2288   for (i = 0; i < num_peers; i++)
2289   {
2290     (void) insert_in_view (&ids[i]);
2291     to_file (file_name_view_log,
2292              "+%s\t(hist)",
2293              GNUNET_i2s_full (ids));
2294   }
2295   clients_notify_view_update();
2296 }
2297
2298
2299 /**
2300  * Wrapper around #RPS_sampler_resize()
2301  *
2302  * If we do not have enough sampler elements, double current sampler size
2303  * If we have more than enough sampler elements, halv current sampler size
2304  */
2305 static void
2306 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2307 {
2308   unsigned int sampler_size;
2309
2310   // TODO statistics
2311   // TODO respect the min, max
2312   sampler_size = RPS_sampler_get_size (sampler);
2313   if (sampler_size > new_size * 4)
2314   { /* Shrinking */
2315     RPS_sampler_resize (sampler, sampler_size / 2);
2316   }
2317   else if (sampler_size < new_size)
2318   { /* Growing */
2319     RPS_sampler_resize (sampler, sampler_size * 2);
2320   }
2321   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2322 }
2323
2324
2325 /**
2326  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2327  */
2328 static void
2329 client_resize_wrapper ()
2330 {
2331   uint32_t bigger_size;
2332
2333   // TODO statistics
2334
2335   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2336
2337   // TODO respect the min, max
2338   resize_wrapper (client_sampler, bigger_size);
2339   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2340       bigger_size);
2341 }
2342
2343
2344 /**
2345  * Estimate request rate
2346  *
2347  * Called every time we receive a request from the client.
2348  */
2349 static void
2350 est_request_rate()
2351 {
2352   struct GNUNET_TIME_Relative max_round_duration;
2353
2354   if (request_deltas_size > req_counter)
2355     req_counter++;
2356   if ( 1 < req_counter)
2357   {
2358     /* Shift last request deltas to the right */
2359     memmove (&request_deltas[1],
2360         request_deltas,
2361         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2362
2363     /* Add current delta to beginning */
2364     request_deltas[0] =
2365         GNUNET_TIME_absolute_get_difference (last_request,
2366                                              GNUNET_TIME_absolute_get ());
2367     request_rate = T_relative_avg (request_deltas, req_counter);
2368     request_rate = (request_rate.rel_value_us < 1) ?
2369       GNUNET_TIME_relative_get_unit_ () : request_rate;
2370
2371     /* Compute the duration a round will maximally take */
2372     max_round_duration =
2373         GNUNET_TIME_relative_add (round_interval,
2374                                   GNUNET_TIME_relative_divide (round_interval, 2));
2375
2376     /* Set the estimated size the sampler has to have to
2377      * satisfy the current client request rate */
2378     sampler_size_client_need =
2379         max_round_duration.rel_value_us / request_rate.rel_value_us;
2380
2381     /* Resize the sampler */
2382     client_resize_wrapper ();
2383   }
2384   last_request = GNUNET_TIME_absolute_get ();
2385 }
2386
2387
2388 /**
2389  * Add all peers in @a peer_array to @a peer_map used as set.
2390  *
2391  * @param peer_array array containing the peers
2392  * @param num_peers number of peers in @peer_array
2393  * @param peer_map the peermap to use as set
2394  */
2395 static void
2396 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2397                        unsigned int num_peers,
2398                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2399 {
2400   unsigned int i;
2401   if (NULL == peer_map)
2402   {
2403     LOG (GNUNET_ERROR_TYPE_WARNING,
2404          "Trying to add peers to non-existing peermap.\n");
2405     return;
2406   }
2407
2408   for (i = 0; i < num_peers; i++)
2409   {
2410     GNUNET_CONTAINER_multipeermap_put (peer_map,
2411                                        &peer_array[i],
2412                                        NULL,
2413                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2414   }
2415 }
2416
2417
2418 /**
2419  * Send a PULL REPLY to @a peer_id
2420  *
2421  * @param peer_id the peer to send the reply to.
2422  * @param peer_ids the peers to send to @a peer_id
2423  * @param num_peer_ids the number of peers to send to @a peer_id
2424  */
2425 static void
2426 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2427                  const struct GNUNET_PeerIdentity *peer_ids,
2428                  unsigned int num_peer_ids)
2429 {
2430   uint32_t send_size;
2431   struct GNUNET_MQ_Envelope *ev;
2432   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2433
2434   /* Compute actual size */
2435   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2436               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2437
2438   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2439     /* Compute number of peers to send
2440      * If too long, simply truncate */
2441     // TODO select random ones via permutation
2442     //      or even better: do good protocol design
2443     send_size =
2444       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2445        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2446        sizeof (struct GNUNET_PeerIdentity);
2447   else
2448     send_size = num_peer_ids;
2449
2450   LOG (GNUNET_ERROR_TYPE_DEBUG,
2451       "Going to send PULL REPLY with %u peers to %s\n",
2452       send_size, GNUNET_i2s (peer_id));
2453
2454   ev = GNUNET_MQ_msg_extra (out_msg,
2455                             send_size * sizeof (struct GNUNET_PeerIdentity),
2456                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2457   out_msg->num_peers = htonl (send_size);
2458   GNUNET_memcpy (&out_msg[1], peer_ids,
2459          send_size * sizeof (struct GNUNET_PeerIdentity));
2460
2461   Peers_send_message (peer_id, ev, "PULL REPLY");
2462   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2463 }
2464
2465
2466 /**
2467  * Insert PeerID in #pull_map
2468  *
2469  * Called once we know a peer is live.
2470  */
2471 static void
2472 insert_in_pull_map (void *cls,
2473                     const struct GNUNET_PeerIdentity *peer)
2474 {
2475   CustomPeerMap_put (pull_map, peer);
2476 }
2477
2478
2479 /**
2480  * Insert PeerID in #view
2481  *
2482  * Called once we know a peer is live.
2483  * Implements #PeerOp
2484  */
2485 static void
2486 insert_in_view_op (void *cls,
2487                 const struct GNUNET_PeerIdentity *peer)
2488 {
2489   (void) insert_in_view (peer);
2490 }
2491
2492
2493 /**
2494  * Update sampler with given PeerID.
2495  * Implements #PeerOp
2496  */
2497 static void
2498 insert_in_sampler (void *cls,
2499                    const struct GNUNET_PeerIdentity *peer)
2500 {
2501   LOG (GNUNET_ERROR_TYPE_DEBUG,
2502        "Updating samplers with peer %s from insert_in_sampler()\n",
2503        GNUNET_i2s (peer));
2504   RPS_sampler_update (prot_sampler,   peer);
2505   RPS_sampler_update (client_sampler, peer);
2506   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2507   {
2508     /* Make sure we 'know' about this peer */
2509     (void) Peers_issue_peer_liveliness_check (peer);
2510     /* Establish a channel towards that peer to indicate we are going to send
2511      * messages to it */
2512     //Peers_indicate_sending_intention (peer);
2513   }
2514 }
2515
2516 /**
2517  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2518  *        If the peer is not known, liveliness check is issued and it is
2519  *        scheduled to be inserted in sampler and view.
2520  *
2521  * "External sources" refer to every source except the gossip.
2522  *
2523  * @param peer peer to insert
2524  */
2525 static void
2526 got_peer (const struct GNUNET_PeerIdentity *peer)
2527 {
2528   /* If we did not know this peer already, insert it into sampler and view */
2529   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2530   {
2531     Peers_schedule_operation (peer, insert_in_sampler);
2532     Peers_schedule_operation (peer, insert_in_view_op);
2533   }
2534 }
2535
2536 /**
2537  * @brief Checks if there is a sending channel and if it is needed
2538  *
2539  * @param peer the peer whose sending channel is checked
2540  * @return GNUNET_YES if sending channel exists and is still needed
2541  *         GNUNET_NO  otherwise
2542  */
2543 static int
2544 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2545 {
2546   /* struct GNUNET_CADET_Channel *channel; */
2547   if (GNUNET_NO == Peers_check_peer_known (peer))
2548   {
2549     return GNUNET_NO;
2550   }
2551   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2552   {
2553     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2554          (GNUNET_YES == View_contains_peer (peer)) ||
2555          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2556          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2557          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2558     { /* If we want to keep the connection to peer open */
2559       return GNUNET_YES;
2560     }
2561     return GNUNET_NO;
2562   }
2563   return GNUNET_NO;
2564 }
2565
2566 /**
2567  * @brief remove peer from our knowledge, the view, push and pull maps and
2568  * samplers.
2569  *
2570  * @param peer the peer to remove
2571  */
2572 static void
2573 remove_peer (const struct GNUNET_PeerIdentity *peer)
2574 {
2575   (void) View_remove_peer (peer);
2576   CustomPeerMap_remove_peer (pull_map, peer);
2577   CustomPeerMap_remove_peer (push_map, peer);
2578   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2579   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2580   Peers_remove_peer (peer);
2581 }
2582
2583
2584 /**
2585  * @brief Remove data that is not needed anymore.
2586  *
2587  * If the sending channel is no longer needed it is destroyed.
2588  *
2589  * @param peer the peer whose data is about to be cleaned
2590  */
2591 static void
2592 clean_peer (const struct GNUNET_PeerIdentity *peer)
2593 {
2594   if (GNUNET_NO == check_sending_channel_needed (peer))
2595   {
2596     LOG (GNUNET_ERROR_TYPE_DEBUG,
2597         "Going to remove send channel to peer %s\n",
2598         GNUNET_i2s (peer));
2599     #ifdef ENABLE_MALICIOUS
2600     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2601       (void) Peers_destroy_sending_channel (peer);
2602     #else /* ENABLE_MALICIOUS */
2603     (void) Peers_destroy_sending_channel (peer);
2604     #endif /* ENABLE_MALICIOUS */
2605   }
2606
2607   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2608        (GNUNET_NO == View_contains_peer (peer)) &&
2609        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2610        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2611        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2612        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2613        (GNUNET_NO != Peers_check_removable (peer)) )
2614   { /* We can safely remove this peer */
2615     LOG (GNUNET_ERROR_TYPE_DEBUG,
2616         "Going to remove peer %s\n",
2617         GNUNET_i2s (peer));
2618     remove_peer (peer);
2619     return;
2620   }
2621 }
2622
2623 /**
2624  * @brief This is called when a channel is destroyed.
2625  *
2626  * Removes peer completely from our knowledge if the send_channel was destroyed
2627  * Otherwise simply delete the recv_channel
2628  * Also check if the knowledge about this peer is still needed.
2629  * If not, remove this peer from our knowledge.
2630  *
2631  * @param cls The closure
2632  * @param channel The channel being closed
2633  * @param channel_ctx The context associated with this channel
2634  */
2635 static void
2636 cleanup_destroyed_channel (void *cls,
2637                            const struct GNUNET_CADET_Channel *channel)
2638 {
2639   struct GNUNET_PeerIdentity *peer = cls;
2640   uint32_t *channel_flag;
2641   struct PeerContext *peer_ctx;
2642
2643   GNUNET_assert (NULL != peer);
2644
2645   if (GNUNET_NO == Peers_check_peer_known (peer))
2646   { /* We don't know a context to that peer */
2647     LOG (GNUNET_ERROR_TYPE_WARNING,
2648          "channel (%s) without associated context was destroyed\n",
2649          GNUNET_i2s (peer));
2650     GNUNET_free (peer);
2651     return;
2652   }
2653
2654   peer_ctx = get_peer_ctx (peer);
2655   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2656   {
2657     LOG (GNUNET_ERROR_TYPE_DEBUG,
2658         "Callback on destruction of recv-channel was called (%s)\n",
2659         GNUNET_i2s (peer));
2660     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2661   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2662   {
2663     LOG (GNUNET_ERROR_TYPE_DEBUG,
2664         "Callback on destruction of send-channel was called (%s)\n",
2665         GNUNET_i2s (peer));
2666     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2667   } else {
2668     LOG (GNUNET_ERROR_TYPE_ERROR,
2669         "Channel to be destroyed has is neither sending nor receiving role\n");
2670   }
2671
2672   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2673   { /* We are in the middle of removing that peer from our knowledge. In this
2674        case simply make sure that the channels are cleaned. */
2675     Peers_cleanup_destroyed_channel (cls, channel);
2676     to_file (file_name_view_log,
2677              "-%s\t(cleanup channel, ourself)",
2678              GNUNET_i2s_full (peer));
2679     GNUNET_free (peer);
2680     return;
2681   }
2682
2683   if (GNUNET_YES ==
2684       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2685   { /* Channel used for sending was destroyed */
2686     /* Possible causes of channel destruction:
2687      *  - ourselves  -> cleaning send channel -> clean context
2688      *  - other peer -> peer probably went down -> remove
2689      */
2690     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2691     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2692     { /* We are about to clean the sending channel. Clean the respective
2693        * context */
2694       Peers_cleanup_destroyed_channel (cls, channel);
2695       GNUNET_free (peer);
2696       return;
2697     }
2698     else
2699     { /* Other peer destroyed our sending channel that he is supposed to keep
2700        * open. It probably went down. Remove it from our knowledge. */
2701       Peers_cleanup_destroyed_channel (cls, channel);
2702       remove_peer (peer);
2703       GNUNET_free (peer);
2704       return;
2705     }
2706   }
2707   else if (GNUNET_YES ==
2708       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2709   { /* Channel used for receiving was destroyed */
2710     /* Possible causes of channel destruction:
2711      *  - ourselves  -> peer tried to establish channel twice -> clean context
2712      *  - other peer -> peer doesn't want to send us data -> clean
2713      */
2714     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2715     if (GNUNET_YES ==
2716         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2717     { /* Other peer tried to establish a channel to us twice. We do not accept
2718        * that. Clean the context. */
2719       Peers_cleanup_destroyed_channel (cls, channel);
2720       GNUNET_free (peer);
2721       return;
2722     }
2723     else
2724     { /* Other peer doesn't want to send us data anymore. We are free to clean
2725        * it. */
2726       Peers_cleanup_destroyed_channel (cls, channel);
2727       clean_peer (peer);
2728       GNUNET_free (peer);
2729       return;
2730     }
2731   }
2732   else
2733   {
2734     LOG (GNUNET_ERROR_TYPE_WARNING,
2735         "Destroyed channel is neither sending nor receiving channel\n");
2736   }
2737   GNUNET_free (peer);
2738 }
2739
2740 /***********************************************************************
2741  * /Util functions
2742 ***********************************************************************/
2743
2744 static void
2745 destroy_reply_cls (struct ReplyCls *rep_cls)
2746 {
2747   struct ClientContext *cli_ctx;
2748
2749   cli_ctx = rep_cls->cli_ctx;
2750   GNUNET_assert (NULL != cli_ctx);
2751   if (NULL != rep_cls->req_handle)
2752   {
2753     RPS_sampler_request_cancel (rep_cls->req_handle);
2754   }
2755   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2756                                cli_ctx->rep_cls_tail,
2757                                rep_cls);
2758   GNUNET_free (rep_cls);
2759 }
2760
2761
2762 static void
2763 destroy_cli_ctx (struct ClientContext *cli_ctx)
2764 {
2765   GNUNET_assert (NULL != cli_ctx);
2766   if (NULL != cli_ctx->rep_cls_head)
2767   {
2768     LOG (GNUNET_ERROR_TYPE_WARNING,
2769          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2770     while (NULL != cli_ctx->rep_cls_head)
2771       destroy_reply_cls (cli_ctx->rep_cls_head);
2772   }
2773   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2774                                cli_ctx_tail,
2775                                cli_ctx);
2776   GNUNET_free (cli_ctx);
2777 }
2778
2779
2780 /**
2781  * Function called by NSE.
2782  *
2783  * Updates sizes of sampler list and view and adapt those lists
2784  * accordingly.
2785  */
2786 static void
2787 nse_callback (void *cls,
2788               struct GNUNET_TIME_Absolute timestamp,
2789               double logestimate, double std_dev)
2790 {
2791   double estimate;
2792   //double scale; // TODO this might go gloabal/config
2793
2794   LOG (GNUNET_ERROR_TYPE_DEBUG,
2795        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2796        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2797   //scale = .01;
2798   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2799   // GNUNET_NSE_log_estimate_to_n (logestimate);
2800   estimate = pow (estimate, 1.0 / 3);
2801   // TODO add if std_dev is a number
2802   // estimate += (std_dev * scale);
2803   if (view_size_est_min < ceil (estimate))
2804   {
2805     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2806     sampler_size_est_need = estimate;
2807     view_size_est_need = estimate;
2808   } else
2809   {
2810     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2811     //sampler_size_est_need = view_size_est_min;
2812     view_size_est_need = view_size_est_min;
2813   }
2814
2815   /* If the NSE has changed adapt the lists accordingly */
2816   resize_wrapper (prot_sampler, sampler_size_est_need);
2817   client_resize_wrapper ();
2818 }
2819
2820
2821 /**
2822  * Callback called once the requested PeerIDs are ready.
2823  *
2824  * Sends those to the requesting client.
2825  */
2826 static void
2827 client_respond (void *cls,
2828                 struct GNUNET_PeerIdentity *peer_ids,
2829                 uint32_t num_peers)
2830 {
2831   struct ReplyCls *reply_cls = cls;
2832   uint32_t i;
2833   struct GNUNET_MQ_Envelope *ev;
2834   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2835   uint32_t size_needed;
2836   struct ClientContext *cli_ctx;
2837
2838   GNUNET_assert (NULL != reply_cls);
2839   LOG (GNUNET_ERROR_TYPE_DEBUG,
2840        "sampler returned %" PRIu32 " peers:\n",
2841        num_peers);
2842   for (i = 0; i < num_peers; i++)
2843   {
2844     LOG (GNUNET_ERROR_TYPE_DEBUG,
2845          "  %" PRIu32 ": %s\n",
2846          i,
2847          GNUNET_i2s (&peer_ids[i]));
2848   }
2849
2850   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2851                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2852
2853   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2854
2855   ev = GNUNET_MQ_msg_extra (out_msg,
2856                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2857                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2858   out_msg->num_peers = htonl (num_peers);
2859   out_msg->id = htonl (reply_cls->id);
2860
2861   GNUNET_memcpy (&out_msg[1],
2862           peer_ids,
2863           num_peers * sizeof (struct GNUNET_PeerIdentity));
2864   GNUNET_free (peer_ids);
2865
2866   cli_ctx = reply_cls->cli_ctx;
2867   GNUNET_assert (NULL != cli_ctx);
2868   reply_cls->req_handle = NULL;
2869   destroy_reply_cls (reply_cls);
2870   GNUNET_MQ_send (cli_ctx->mq, ev);
2871 }
2872
2873
2874 /**
2875  * Handle RPS request from the client.
2876  *
2877  * @param cls closure
2878  * @param message the actual message
2879  */
2880 static void
2881 handle_client_request (void *cls,
2882                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2883 {
2884   struct ClientContext *cli_ctx = cls;
2885   uint32_t num_peers;
2886   uint32_t size_needed;
2887   struct ReplyCls *reply_cls;
2888   uint32_t i;
2889
2890   num_peers = ntohl (msg->num_peers);
2891   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2892                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2893
2894   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2895   {
2896     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2897                 "Message received from client has size larger than expected\n");
2898     GNUNET_SERVICE_client_drop (cli_ctx->client);
2899     return;
2900   }
2901
2902   for (i = 0 ; i < num_peers ; i++)
2903     est_request_rate();
2904
2905   LOG (GNUNET_ERROR_TYPE_DEBUG,
2906        "Client requested %" PRIu32 " random peer(s).\n",
2907        num_peers);
2908
2909   reply_cls = GNUNET_new (struct ReplyCls);
2910   reply_cls->id = ntohl (msg->id);
2911   reply_cls->cli_ctx = cli_ctx;
2912   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2913                                                         client_respond,
2914                                                         reply_cls,
2915                                                         num_peers);
2916
2917   GNUNET_assert (NULL != cli_ctx);
2918   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2919                                cli_ctx->rep_cls_tail,
2920                                reply_cls);
2921   GNUNET_SERVICE_client_continue (cli_ctx->client);
2922 }
2923
2924
2925 /**
2926  * @brief Handle a message that requests the cancellation of a request
2927  *
2928  * @param cls unused
2929  * @param message the message containing the id of the request
2930  */
2931 static void
2932 handle_client_request_cancel (void *cls,
2933                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2934 {
2935   struct ClientContext *cli_ctx = cls;
2936   struct ReplyCls *rep_cls;
2937
2938   GNUNET_assert (NULL != cli_ctx);
2939   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2940   rep_cls = cli_ctx->rep_cls_head;
2941   LOG (GNUNET_ERROR_TYPE_DEBUG,
2942       "Client cancels request with id %" PRIu32 "\n",
2943       ntohl (msg->id));
2944   while ( (NULL != rep_cls->next) &&
2945           (rep_cls->id != ntohl (msg->id)) )
2946     rep_cls = rep_cls->next;
2947   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2948   destroy_reply_cls (rep_cls);
2949   GNUNET_SERVICE_client_continue (cli_ctx->client);
2950 }
2951
2952
2953 /**
2954  * @brief This function is called, when the client seeds peers.
2955  * It verifies that @a msg is well-formed.
2956  *
2957  * @param cls the closure (#ClientContext)
2958  * @param msg the message
2959  * @return #GNUNET_OK if @a msg is well-formed
2960  */
2961 static int
2962 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2963 {
2964   struct ClientContext *cli_ctx = cls;
2965   uint16_t msize = ntohs (msg->header.size);
2966   uint32_t num_peers = ntohl (msg->num_peers);
2967
2968   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2969   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2970        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2971   {
2972     GNUNET_break (0);
2973     GNUNET_SERVICE_client_drop (cli_ctx->client);
2974     return GNUNET_SYSERR;
2975   }
2976   return GNUNET_OK;
2977 }
2978
2979
2980 /**
2981  * Handle seed from the client.
2982  *
2983  * @param cls closure
2984  * @param message the actual message
2985  */
2986 static void
2987 handle_client_seed (void *cls,
2988                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2989 {
2990   struct ClientContext *cli_ctx = cls;
2991   struct GNUNET_PeerIdentity *peers;
2992   uint32_t num_peers;
2993   uint32_t i;
2994
2995   num_peers = ntohl (msg->num_peers);
2996   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2997   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
2998   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
2999
3000   LOG (GNUNET_ERROR_TYPE_DEBUG,
3001        "Client seeded peers:\n");
3002   print_peer_list (peers, num_peers);
3003
3004   for (i = 0; i < num_peers; i++)
3005   {
3006     LOG (GNUNET_ERROR_TYPE_DEBUG,
3007          "Updating samplers with seed %" PRIu32 ": %s\n",
3008          i,
3009          GNUNET_i2s (&peers[i]));
3010
3011     got_peer (&peers[i]);
3012   }
3013
3014   ////GNUNET_free (peers);
3015
3016   GNUNET_SERVICE_client_continue (cli_ctx->client);
3017 }
3018
3019 /**
3020  * @brief Send view to client
3021  *
3022  * @param cli_ctx the context of the client
3023  * @param view_array the peerids of the view as array (can be empty)
3024  * @param view_size the size of the view array (can be 0)
3025  */
3026 void
3027 send_view (const struct ClientContext *cli_ctx,
3028            const struct GNUNET_PeerIdentity *view_array,
3029            uint64_t view_size)
3030 {
3031   struct GNUNET_MQ_Envelope *ev;
3032   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3033
3034   if (NULL == view_array)
3035   {
3036     view_size = View_size ();
3037     view_array = View_get_as_array();
3038   }
3039
3040   ev = GNUNET_MQ_msg_extra (out_msg,
3041                             view_size * sizeof (struct GNUNET_PeerIdentity),
3042                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3043   out_msg->num_peers = htonl (view_size);
3044
3045   GNUNET_memcpy (&out_msg[1],
3046           view_array,
3047           view_size * sizeof (struct GNUNET_PeerIdentity));
3048   GNUNET_MQ_send (cli_ctx->mq, ev);
3049 }
3050
3051 /**
3052  * @brief sends updates to clients that are interested
3053  */
3054 static void
3055 clients_notify_view_update (void)
3056 {
3057   struct ClientContext *cli_ctx_iter;
3058   uint64_t num_peers;
3059   const struct GNUNET_PeerIdentity *view_array;
3060
3061   num_peers = View_size ();
3062   view_array = View_get_as_array();
3063   /* check size of view is small enough */
3064   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3065   {
3066     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3067                 "View is too big to send\n");
3068     return;
3069   }
3070
3071   for (cli_ctx_iter = cli_ctx_head;
3072        NULL != cli_ctx_iter;
3073        cli_ctx_iter = cli_ctx_head->next)
3074   {
3075     if (1 < cli_ctx_iter->view_updates_left)
3076     {
3077       /* Client wants to receive limited amount of updates */
3078       cli_ctx_iter->view_updates_left -= 1;
3079     } else if (1 == cli_ctx_iter->view_updates_left)
3080     {
3081       /* Last update of view for client */
3082       cli_ctx_iter->view_updates_left = -1;
3083     } else if (0 > cli_ctx_iter->view_updates_left) {
3084       /* Client is not interested in updates */
3085       continue;
3086     }
3087     /* else _updates_left == 0 - infinite amount of updates */
3088
3089     /* send view */
3090     send_view (cli_ctx_iter, view_array, num_peers);
3091   }
3092 }
3093
3094
3095 /**
3096  * Handle RPS request from the client.
3097  *
3098  * @param cls closure
3099  * @param message the actual message
3100  */
3101 static void
3102 handle_client_view_request (void *cls,
3103                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3104 {
3105   struct ClientContext *cli_ctx = cls;
3106   uint64_t num_updates;
3107
3108   num_updates = ntohl (msg->num_updates);
3109
3110   LOG (GNUNET_ERROR_TYPE_DEBUG,
3111        "Client requested %" PRIu64 " updates of view.\n",
3112        num_updates);
3113
3114   GNUNET_assert (NULL != cli_ctx);
3115   cli_ctx->view_updates_left = num_updates;
3116   send_view (cli_ctx, NULL, 0);
3117   GNUNET_SERVICE_client_continue (cli_ctx->client);
3118 }
3119
3120 /**
3121  * Handle a CHECK_LIVE message from another peer.
3122  *
3123  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3124  * the channel is blocked for all other communication.
3125  *
3126  * @param cls Closure
3127  * @param msg The message header
3128  */
3129 static void
3130 handle_peer_check (void *cls,
3131                    const struct GNUNET_MessageHeader *msg)
3132 {
3133   const struct GNUNET_PeerIdentity *peer = cls;
3134   LOG (GNUNET_ERROR_TYPE_DEBUG,
3135       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3136
3137   GNUNET_break_op (Peers_check_peer_known (peer));
3138   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3139 }
3140
3141 /**
3142  * Handle a PUSH message from another peer.
3143  *
3144  * Check the proof of work and store the PeerID
3145  * in the temporary list for pushed PeerIDs.
3146  *
3147  * @param cls Closure
3148  * @param msg The message header
3149  */
3150 static void
3151 handle_peer_push (void *cls,
3152                   const struct GNUNET_MessageHeader *msg)
3153 {
3154   const struct GNUNET_PeerIdentity *peer = cls;
3155
3156   // (check the proof of work (?))
3157
3158   LOG (GNUNET_ERROR_TYPE_DEBUG,
3159        "Received PUSH (%s)\n",
3160        GNUNET_i2s (peer));
3161   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3162
3163   #ifdef ENABLE_MALICIOUS
3164   struct AttackedPeer *tmp_att_peer;
3165
3166   if ( (1 == mal_type) ||
3167        (3 == mal_type) )
3168   { /* Try to maximise representation */
3169     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3170     tmp_att_peer->peer_id = *peer;
3171     if (NULL == att_peer_set)
3172       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3173     if (GNUNET_NO ==
3174         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3175                                                 peer))
3176     {
3177       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3178                                    att_peers_tail,
3179                                    tmp_att_peer);
3180       add_peer_array_to_set (peer, 1, att_peer_set);
3181     }
3182   }
3183
3184
3185   else if (2 == mal_type)
3186   {
3187     /* We attack one single well-known peer - simply ignore */
3188   }
3189   #endif /* ENABLE_MALICIOUS */
3190
3191   /* Add the sending peer to the push_map */
3192   CustomPeerMap_put (push_map, peer);
3193
3194   GNUNET_break_op (Peers_check_peer_known (peer));
3195   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3196 }
3197
3198
3199 /**
3200  * Handle PULL REQUEST request message from another peer.
3201  *
3202  * Reply with the view of PeerIDs.
3203  *
3204  * @param cls Closure
3205  * @param msg The message header
3206  */
3207 static void
3208 handle_peer_pull_request (void *cls,
3209                           const struct GNUNET_MessageHeader *msg)
3210 {
3211   struct GNUNET_PeerIdentity *peer = cls;
3212   const struct GNUNET_PeerIdentity *view_array;
3213
3214   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3215   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3216
3217   #ifdef ENABLE_MALICIOUS
3218   if (1 == mal_type
3219       || 3 == mal_type)
3220   { /* Try to maximise representation */
3221     send_pull_reply (peer, mal_peers, num_mal_peers);
3222   }
3223
3224   else if (2 == mal_type)
3225   { /* Try to partition network */
3226     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3227     {
3228       send_pull_reply (peer, mal_peers, num_mal_peers);
3229     }
3230   }
3231   #endif /* ENABLE_MALICIOUS */
3232
3233   GNUNET_break_op (Peers_check_peer_known (peer));
3234   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3235   view_array = View_get_as_array ();
3236   send_pull_reply (peer, view_array, View_size ());
3237 }
3238
3239
3240 /**
3241  * Check whether we sent a corresponding request and
3242  * whether this reply is the first one.
3243  *
3244  * @param cls Closure
3245  * @param msg The message header
3246  */
3247 static int
3248 check_peer_pull_reply (void *cls,
3249                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3250 {
3251   struct GNUNET_PeerIdentity *sender = cls;
3252
3253   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3254   {
3255     GNUNET_break_op (0);
3256     return GNUNET_SYSERR;
3257   }
3258
3259   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3260       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3261   {
3262     LOG (GNUNET_ERROR_TYPE_ERROR,
3263         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3264         ntohl (msg->num_peers),
3265         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3266             sizeof (struct GNUNET_PeerIdentity));
3267     GNUNET_break_op (0);
3268     return GNUNET_SYSERR;
3269   }
3270
3271   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3272   {
3273     LOG (GNUNET_ERROR_TYPE_WARNING,
3274         "Received a pull reply from a peer we didn't request one from!\n");
3275     GNUNET_break_op (0);
3276     return GNUNET_SYSERR;
3277   }
3278   return GNUNET_OK;
3279 }
3280
3281 /**
3282  * Handle PULL REPLY message from another peer.
3283  *
3284  * @param cls Closure
3285  * @param msg The message header
3286  */
3287 static void
3288 handle_peer_pull_reply (void *cls,
3289                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3290 {
3291   const struct GNUNET_PeerIdentity *peers;
3292   struct GNUNET_PeerIdentity *sender = cls;
3293   uint32_t i;
3294 #ifdef ENABLE_MALICIOUS
3295   struct AttackedPeer *tmp_att_peer;
3296 #endif /* ENABLE_MALICIOUS */
3297
3298   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3299   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3300
3301   #ifdef ENABLE_MALICIOUS
3302   // We shouldn't even receive pull replies as we're not sending
3303   if (2 == mal_type)
3304   {
3305   }
3306   #endif /* ENABLE_MALICIOUS */
3307
3308   /* Do actual logic */
3309   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3310
3311   LOG (GNUNET_ERROR_TYPE_DEBUG,
3312        "PULL REPLY received, got following %u peers:\n",
3313        ntohl (msg->num_peers));
3314
3315   for (i = 0; i < ntohl (msg->num_peers); i++)
3316   {
3317     LOG (GNUNET_ERROR_TYPE_DEBUG,
3318          "%u. %s\n",
3319          i,
3320          GNUNET_i2s (&peers[i]));
3321
3322     #ifdef ENABLE_MALICIOUS
3323     if ((NULL != att_peer_set) &&
3324         (1 == mal_type || 3 == mal_type))
3325     { /* Add attacked peer to local list */
3326       // TODO check if we sent a request and this was the first reply
3327       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3328                                                                &peers[i])
3329           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3330                                                                   &peers[i])
3331           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3332                                                    &own_identity))
3333       {
3334         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3335         tmp_att_peer->peer_id = peers[i];
3336         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3337                                      att_peers_tail,
3338                                      tmp_att_peer);
3339         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3340       }
3341       continue;
3342     }
3343     #endif /* ENABLE_MALICIOUS */
3344     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3345                                               &peers[i]))
3346     {
3347       /* Make sure we 'know' about this peer */
3348       (void) Peers_insert_peer (&peers[i]);
3349
3350       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3351       {
3352         CustomPeerMap_put (pull_map, &peers[i]);
3353       }
3354       else
3355       {
3356         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3357         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3358       }
3359     }
3360   }
3361
3362   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3363   clean_peer (sender);
3364
3365   GNUNET_break_op (Peers_check_peer_known (sender));
3366   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3367 }
3368
3369
3370 /**
3371  * Compute a random delay.
3372  * A uniformly distributed value between mean + spread and mean - spread.
3373  *
3374  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3375  * It would return a random value between 2 and 6 min.
3376  *
3377  * @param mean the mean
3378  * @param spread the inverse amount of deviation from the mean
3379  */
3380 static struct GNUNET_TIME_Relative
3381 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3382                     unsigned int spread)
3383 {
3384   struct GNUNET_TIME_Relative half_interval;
3385   struct GNUNET_TIME_Relative ret;
3386   unsigned int rand_delay;
3387   unsigned int max_rand_delay;
3388
3389   if (0 == spread)
3390   {
3391     LOG (GNUNET_ERROR_TYPE_WARNING,
3392          "Not accepting spread of 0\n");
3393     GNUNET_break (0);
3394     GNUNET_assert (0);
3395   }
3396   GNUNET_assert (0 != mean.rel_value_us);
3397
3398   /* Compute random time value between spread * mean and spread * mean */
3399   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3400
3401   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3402   /**
3403    * Compute random value between (0 and 1) * round_interval
3404    * via multiplying round_interval with a 'fraction' (0 to value)/value
3405    */
3406   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3407   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3408   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3409   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3410
3411   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3412     LOG (GNUNET_ERROR_TYPE_WARNING,
3413          "Returning FOREVER_REL\n");
3414
3415   return ret;
3416 }
3417
3418
3419 /**
3420  * Send single pull request
3421  *
3422  * @param peer_id the peer to send the pull request to.
3423  */
3424 static void
3425 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3426 {
3427   struct GNUNET_MQ_Envelope *ev;
3428
3429   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3430                                                      Peers_PULL_REPLY_PENDING));
3431   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3432
3433   LOG (GNUNET_ERROR_TYPE_DEBUG,
3434        "Going to send PULL REQUEST to peer %s.\n",
3435        GNUNET_i2s (peer));
3436
3437   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3438   Peers_send_message (peer, ev, "PULL REQUEST");
3439   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3440 }
3441
3442
3443 /**
3444  * Send single push
3445  *
3446  * @param peer_id the peer to send the push to.
3447  */
3448 static void
3449 send_push (const struct GNUNET_PeerIdentity *peer_id)
3450 {
3451   struct GNUNET_MQ_Envelope *ev;
3452
3453   LOG (GNUNET_ERROR_TYPE_DEBUG,
3454        "Going to send PUSH to peer %s.\n",
3455        GNUNET_i2s (peer_id));
3456
3457   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3458   Peers_send_message (peer_id, ev, "PUSH");
3459   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3460 }
3461
3462
3463 static void
3464 do_round (void *cls);
3465
3466 static void
3467 do_mal_round (void *cls);
3468
3469 #ifdef ENABLE_MALICIOUS
3470
3471
3472 /**
3473  * @brief This function is called, when the client tells us to act malicious.
3474  * It verifies that @a msg is well-formed.
3475  *
3476  * @param cls the closure (#ClientContext)
3477  * @param msg the message
3478  * @return #GNUNET_OK if @a msg is well-formed
3479  */
3480 static int
3481 check_client_act_malicious (void *cls,
3482                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3483 {
3484   struct ClientContext *cli_ctx = cls;
3485   uint16_t msize = ntohs (msg->header.size);
3486   uint32_t num_peers = ntohl (msg->num_peers);
3487
3488   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3489   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3490        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3491   {
3492     LOG (GNUNET_ERROR_TYPE_ERROR,
3493         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3494         ntohl (msg->num_peers),
3495         (msize / sizeof (struct GNUNET_PeerIdentity)));
3496     GNUNET_break (0);
3497     GNUNET_SERVICE_client_drop (cli_ctx->client);
3498     return GNUNET_SYSERR;
3499   }
3500   return GNUNET_OK;
3501 }
3502
3503 /**
3504  * Turn RPS service to act malicious.
3505  *
3506  * @param cls Closure
3507  * @param client The client that sent the message
3508  * @param msg The message header
3509  */
3510 static void
3511 handle_client_act_malicious (void *cls,
3512                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3513 {
3514   struct ClientContext *cli_ctx = cls;
3515   struct GNUNET_PeerIdentity *peers;
3516   uint32_t num_mal_peers_sent;
3517   uint32_t num_mal_peers_old;
3518
3519   /* Do actual logic */
3520   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3521   mal_type = ntohl (msg->type);
3522   if (NULL == mal_peer_set)
3523     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3524
3525   LOG (GNUNET_ERROR_TYPE_DEBUG,
3526        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3527        mal_type,
3528        ntohl (msg->num_peers));
3529
3530   if (1 == mal_type)
3531   { /* Try to maximise representation */
3532     /* Add other malicious peers to those we already know */
3533
3534     num_mal_peers_sent = ntohl (msg->num_peers);
3535     num_mal_peers_old = num_mal_peers;
3536     GNUNET_array_grow (mal_peers,
3537                        num_mal_peers,
3538                        num_mal_peers + num_mal_peers_sent);
3539     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3540             peers,
3541             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3542
3543     /* Add all mal peers to mal_peer_set */
3544     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3545                            num_mal_peers_sent,
3546                            mal_peer_set);
3547
3548     /* Substitute do_round () with do_mal_round () */
3549     GNUNET_SCHEDULER_cancel (do_round_task);
3550     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3551   }
3552
3553   else if ( (2 == mal_type) ||
3554             (3 == mal_type) )
3555   { /* Try to partition the network */
3556     /* Add other malicious peers to those we already know */
3557
3558     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3559     num_mal_peers_old = num_mal_peers;
3560     GNUNET_array_grow (mal_peers,
3561                        num_mal_peers,
3562                        num_mal_peers + num_mal_peers_sent);
3563     if (NULL != mal_peers &&
3564         0 != num_mal_peers)
3565     {
3566       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3567               peers,
3568               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3569
3570       /* Add all mal peers to mal_peer_set */
3571       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3572                              num_mal_peers_sent,
3573                              mal_peer_set);
3574     }
3575
3576     /* Store the one attacked peer */
3577     GNUNET_memcpy (&attacked_peer,
3578             &msg->attacked_peer,
3579             sizeof (struct GNUNET_PeerIdentity));
3580     /* Set the flag of the attacked peer to valid to avoid problems */
3581     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3582     {
3583       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3584     }
3585
3586     LOG (GNUNET_ERROR_TYPE_DEBUG,
3587          "Attacked peer is %s\n",
3588          GNUNET_i2s (&attacked_peer));
3589
3590     /* Substitute do_round () with do_mal_round () */
3591     GNUNET_SCHEDULER_cancel (do_round_task);
3592     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3593   }
3594   else if (0 == mal_type)
3595   { /* Stop acting malicious */
3596     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3597
3598     /* Substitute do_mal_round () with do_round () */
3599     GNUNET_SCHEDULER_cancel (do_round_task);
3600     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3601   }
3602   else
3603   {
3604     GNUNET_break (0);
3605     GNUNET_SERVICE_client_continue (cli_ctx->client);
3606   }
3607   GNUNET_SERVICE_client_continue (cli_ctx->client);
3608 }
3609
3610
3611 /**
3612  * Send out PUSHes and PULLs maliciously.
3613  *
3614  * This is executed regylary.
3615  */
3616 static void
3617 do_mal_round (void *cls)
3618 {
3619   uint32_t num_pushes;
3620   uint32_t i;
3621   struct GNUNET_TIME_Relative time_next_round;
3622   struct AttackedPeer *tmp_att_peer;
3623
3624   LOG (GNUNET_ERROR_TYPE_DEBUG,
3625        "Going to execute next round maliciously type %" PRIu32 ".\n",
3626       mal_type);
3627   do_round_task = NULL;
3628   GNUNET_assert (mal_type <= 3);
3629   /* Do malicious actions */
3630   if (1 == mal_type)
3631   { /* Try to maximise representation */
3632
3633     /* The maximum of pushes we're going to send this round */
3634     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3635                                          num_attacked_peers),
3636                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3637
3638     LOG (GNUNET_ERROR_TYPE_DEBUG,
3639          "Going to send %" PRIu32 " pushes\n",
3640          num_pushes);
3641
3642     /* Send PUSHes to attacked peers */
3643     for (i = 0 ; i < num_pushes ; i++)
3644     {
3645       if (att_peers_tail == att_peer_index)
3646         att_peer_index = att_peers_head;
3647       else
3648         att_peer_index = att_peer_index->next;
3649
3650       send_push (&att_peer_index->peer_id);
3651     }
3652
3653     /* Send PULLs to some peers to learn about additional peers to attack */
3654     tmp_att_peer = att_peer_index;
3655     for (i = 0 ; i < num_pushes * alpha ; i++)
3656     {
3657       if (att_peers_tail == tmp_att_peer)
3658         tmp_att_peer = att_peers_head;
3659       else
3660         att_peer_index = tmp_att_peer->next;
3661
3662       send_pull_request (&tmp_att_peer->peer_id);
3663     }
3664   }
3665
3666
3667   else if (2 == mal_type)
3668   { /**
3669      * Try to partition the network
3670      * Send as many pushes to the attacked peer as possible
3671      * That is one push per round as it will ignore more.
3672      */
3673     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3674     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3675       send_push (&attacked_peer);
3676   }
3677
3678
3679   if (3 == mal_type)
3680   { /* Combined attack */
3681
3682     /* Send PUSH to attacked peers */
3683     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3684     {
3685       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3686       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3687       {
3688         LOG (GNUNET_ERROR_TYPE_DEBUG,
3689             "Goding to send push to attacked peer (%s)\n",
3690             GNUNET_i2s (&attacked_peer));
3691         send_push (&attacked_peer);
3692       }
3693     }
3694     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3695
3696     /* The maximum of pushes we're going to send this round */
3697     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3698                                          num_attacked_peers),
3699                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3700
3701     LOG (GNUNET_ERROR_TYPE_DEBUG,
3702          "Going to send %" PRIu32 " pushes\n",
3703          num_pushes);
3704
3705     for (i = 0; i < num_pushes; i++)
3706     {
3707       if (att_peers_tail == att_peer_index)
3708         att_peer_index = att_peers_head;
3709       else
3710         att_peer_index = att_peer_index->next;
3711
3712       send_push (&att_peer_index->peer_id);
3713     }
3714
3715     /* Send PULLs to some peers to learn about additional peers to attack */
3716     tmp_att_peer = att_peer_index;
3717     for (i = 0; i < num_pushes * alpha; i++)
3718     {
3719       if (att_peers_tail == tmp_att_peer)
3720         tmp_att_peer = att_peers_head;
3721       else
3722         att_peer_index = tmp_att_peer->next;
3723
3724       send_pull_request (&tmp_att_peer->peer_id);
3725     }
3726   }
3727
3728   /* Schedule next round */
3729   time_next_round = compute_rand_delay (round_interval, 2);
3730
3731   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3732   //NULL);
3733   GNUNET_assert (NULL == do_round_task);
3734   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3735                                                 &do_mal_round, NULL);
3736   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3737 }
3738 #endif /* ENABLE_MALICIOUS */
3739
3740 /**
3741  * Send out PUSHes and PULLs, possibly update #view, samplers.
3742  *
3743  * This is executed regylary.
3744  */
3745 static void
3746 do_round (void *cls)
3747 {
3748   uint32_t i;
3749   const struct GNUNET_PeerIdentity *view_array;
3750   unsigned int *permut;
3751   unsigned int a_peers; /* Number of peers we send pushes to */
3752   unsigned int b_peers; /* Number of peers we send pull requests to */
3753   uint32_t first_border;
3754   uint32_t second_border;
3755   struct GNUNET_PeerIdentity peer;
3756   struct GNUNET_PeerIdentity *update_peer;
3757
3758   LOG (GNUNET_ERROR_TYPE_DEBUG,
3759        "Going to execute next round.\n");
3760   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3761   do_round_task = NULL;
3762   LOG (GNUNET_ERROR_TYPE_DEBUG,
3763        "Printing view:\n");
3764   to_file (file_name_view_log,
3765            "___ new round ___");
3766   view_array = View_get_as_array ();
3767   for (i = 0; i < View_size (); i++)
3768   {
3769     LOG (GNUNET_ERROR_TYPE_DEBUG,
3770          "\t%s\n", GNUNET_i2s (&view_array[i]));
3771     to_file (file_name_view_log,
3772              "=%s\t(do round)",
3773              GNUNET_i2s_full (&view_array[i]));
3774   }
3775
3776
3777   /* Send pushes and pull requests */
3778   if (0 < View_size ())
3779   {
3780     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3781                                            View_size ());
3782
3783     /* Send PUSHes */
3784     a_peers = ceil (alpha * View_size ());
3785
3786     LOG (GNUNET_ERROR_TYPE_DEBUG,
3787          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3788          a_peers, alpha, View_size ());
3789     for (i = 0; i < a_peers; i++)
3790     {
3791       peer = view_array[permut[i]];
3792       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3793       { // FIXME if this fails schedule/loop this for later
3794         send_push (&peer);
3795       }
3796     }
3797
3798     /* Send PULL requests */
3799     b_peers = ceil (beta * View_size ());
3800     first_border = a_peers;
3801     second_border = a_peers + b_peers;
3802     if (second_border > View_size ())
3803     {
3804       first_border = View_size () - b_peers;
3805       second_border = View_size ();
3806     }
3807     LOG (GNUNET_ERROR_TYPE_DEBUG,
3808         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3809         b_peers, beta, View_size ());
3810     for (i = first_border; i < second_border; i++)
3811     {
3812       peer = view_array[permut[i]];
3813       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3814           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3815       { // FIXME if this fails schedule/loop this for later
3816         send_pull_request (&peer);
3817       }
3818     }
3819
3820     GNUNET_free (permut);
3821     permut = NULL;
3822   }
3823
3824
3825   /* Update view */
3826   /* TODO see how many peers are in push-/pull- list! */
3827
3828   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3829       (0 < CustomPeerMap_size (push_map)) &&
3830       (0 < CustomPeerMap_size (pull_map)))
3831   //if (GNUNET_YES) // disable blocking temporarily
3832   { /* If conditions for update are fulfilled, update */
3833     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3834
3835     uint32_t final_size;
3836     uint32_t peers_to_clean_size;
3837     struct GNUNET_PeerIdentity *peers_to_clean;
3838
3839     peers_to_clean = NULL;
3840     peers_to_clean_size = 0;
3841     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3842     GNUNET_memcpy (peers_to_clean,
3843             view_array,
3844             View_size () * sizeof (struct GNUNET_PeerIdentity));
3845
3846     /* Seems like recreating is the easiest way of emptying the peermap */
3847     View_clear ();
3848     to_file (file_name_view_log,
3849              "--- emptied ---");
3850
3851     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3852                                 CustomPeerMap_size (push_map));
3853     second_border = first_border +
3854                     GNUNET_MIN (floor (beta  * view_size_est_need),
3855                                 CustomPeerMap_size (pull_map));
3856     final_size    = second_border +
3857       ceil ((1 - (alpha + beta)) * view_size_est_need);
3858     LOG (GNUNET_ERROR_TYPE_DEBUG,
3859         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3860         first_border,
3861         second_border,
3862         final_size);
3863
3864     /* Update view with peers received through PUSHes */
3865     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3866                                            CustomPeerMap_size (push_map));
3867     for (i = 0; i < first_border; i++)
3868     {
3869       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3870                                                               permut[i]));
3871       to_file (file_name_view_log,
3872                "+%s\t(push list)",
3873                GNUNET_i2s_full (&view_array[i]));
3874       // TODO change the peer_flags accordingly
3875     }
3876     GNUNET_free (permut);
3877     permut = NULL;
3878
3879     /* Update view with peers received through PULLs */
3880     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3881                                            CustomPeerMap_size (pull_map));
3882     for (i = first_border; i < second_border; i++)
3883     {
3884       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3885             permut[i - first_border]));
3886       to_file (file_name_view_log,
3887                "+%s\t(pull list)",
3888                GNUNET_i2s_full (&view_array[i]));
3889       // TODO change the peer_flags accordingly
3890     }
3891     GNUNET_free (permut);
3892     permut = NULL;
3893
3894     /* Update view with peers from history */
3895     RPS_sampler_get_n_rand_peers (prot_sampler,
3896                                   hist_update,
3897                                   NULL,
3898                                   final_size - second_border);
3899     // TODO change the peer_flags accordingly
3900
3901     for (i = 0; i < View_size (); i++)
3902       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3903
3904     /* Clean peers that were removed from the view */
3905     for (i = 0; i < peers_to_clean_size; i++)
3906     {
3907       to_file (file_name_view_log,
3908                "-%s",
3909                GNUNET_i2s_full (&peers_to_clean[i]));
3910       clean_peer (&peers_to_clean[i]);
3911       //peer_destroy_channel_send (sender);
3912     }
3913
3914     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3915     clients_notify_view_update();
3916   } else {
3917     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3918     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3919     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3920         !(0 >= CustomPeerMap_size (pull_map)))
3921       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3922     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3923         (0 >= CustomPeerMap_size (pull_map)))
3924       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3925     if (0 >= CustomPeerMap_size (push_map) &&
3926         !(0 >= CustomPeerMap_size (pull_map)))
3927       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3928     if (0 >= CustomPeerMap_size (push_map) &&
3929         (0 >= CustomPeerMap_size (pull_map)))
3930       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3931     if (0 >= CustomPeerMap_size (pull_map) &&
3932         CustomPeerMap_size (push_map) > alpha * View_size () &&
3933         0 >= CustomPeerMap_size (push_map))
3934       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3935   }
3936   // TODO independent of that also get some peers from CADET_get_peers()?
3937   GNUNET_STATISTICS_set (stats,
3938       "# peers in push map at end of round",
3939       CustomPeerMap_size (push_map),
3940       GNUNET_NO);
3941   GNUNET_STATISTICS_set (stats,
3942       "# peers in pull map at end of round",
3943       CustomPeerMap_size (pull_map),
3944       GNUNET_NO);
3945   GNUNET_STATISTICS_set (stats,
3946       "# peers in view at end of round",
3947       View_size (),
3948       GNUNET_NO);
3949
3950   LOG (GNUNET_ERROR_TYPE_DEBUG,
3951        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3952        CustomPeerMap_size (push_map),
3953        CustomPeerMap_size (pull_map),
3954        alpha,
3955        View_size (),
3956        alpha * View_size ());
3957
3958   /* Update samplers */
3959   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3960   {
3961     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3962     LOG (GNUNET_ERROR_TYPE_DEBUG,
3963          "Updating with peer %s from push list\n",
3964          GNUNET_i2s (update_peer));
3965     insert_in_sampler (NULL, update_peer);
3966     clean_peer (update_peer); /* This cleans only if it is not in the view */
3967     //peer_destroy_channel_send (sender);
3968   }
3969
3970   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3971   {
3972     LOG (GNUNET_ERROR_TYPE_DEBUG,
3973          "Updating with peer %s from pull list\n",
3974          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3975     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3976     /* This cleans only if it is not in the view */
3977     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3978     //peer_destroy_channel_send (sender);
3979   }
3980
3981
3982   /* Empty push/pull lists */
3983   CustomPeerMap_clear (push_map);
3984   CustomPeerMap_clear (pull_map);
3985
3986   struct GNUNET_TIME_Relative time_next_round;
3987
3988   time_next_round = compute_rand_delay (round_interval, 2);
3989
3990   /* Schedule next round */
3991   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3992                                                 &do_round, NULL);
3993   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3994 }
3995
3996
3997 /**
3998  * This is called from GNUNET_CADET_get_peers().
3999  *
4000  * It is called on every peer(ID) that cadet somehow has contact with.
4001  * We use those to initialise the sampler.
4002  */
4003 void
4004 init_peer_cb (void *cls,
4005               const struct GNUNET_PeerIdentity *peer,
4006               int tunnel, // "Do we have a tunnel towards this peer?"
4007               unsigned int n_paths, // "Number of known paths towards this peer"
4008               unsigned int best_path) // "How long is the best path?
4009                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4010 {
4011   if (NULL != peer)
4012   {
4013     LOG (GNUNET_ERROR_TYPE_DEBUG,
4014          "Got peer_id %s from cadet\n",
4015          GNUNET_i2s (peer));
4016     got_peer (peer);
4017   }
4018 }
4019
4020 /**
4021  * @brief Iterator function over stored, valid peers.
4022  *
4023  * We initialise the sampler with those.
4024  *
4025  * @param cls the closure
4026  * @param peer the peer id
4027  * @return #GNUNET_YES if we should continue to
4028  *         iterate,
4029  *         #GNUNET_NO if not.
4030  */
4031 static int
4032 valid_peers_iterator (void *cls,
4033                       const struct GNUNET_PeerIdentity *peer)
4034 {
4035   if (NULL != peer)
4036   {
4037     LOG (GNUNET_ERROR_TYPE_DEBUG,
4038          "Got stored, valid peer %s\n",
4039          GNUNET_i2s (peer));
4040     got_peer (peer);
4041   }
4042   return GNUNET_YES;
4043 }
4044
4045
4046 /**
4047  * Iterator over peers from peerinfo.
4048  *
4049  * @param cls closure
4050  * @param peer id of the peer, NULL for last call
4051  * @param hello hello message for the peer (can be NULL)
4052  * @param error message
4053  */
4054 void
4055 process_peerinfo_peers (void *cls,
4056                         const struct GNUNET_PeerIdentity *peer,
4057                         const struct GNUNET_HELLO_Message *hello,
4058                         const char *err_msg)
4059 {
4060   if (NULL != peer)
4061   {
4062     LOG (GNUNET_ERROR_TYPE_DEBUG,
4063          "Got peer_id %s from peerinfo\n",
4064          GNUNET_i2s (peer));
4065     got_peer (peer);
4066   }
4067 }
4068
4069
4070 /**
4071  * Task run during shutdown.
4072  *
4073  * @param cls unused
4074  */
4075 static void
4076 shutdown_task (void *cls)
4077 {
4078   struct ClientContext *client_ctx;
4079   struct ReplyCls *reply_cls;
4080
4081   LOG (GNUNET_ERROR_TYPE_DEBUG,
4082        "RPS is going down\n");
4083
4084   /* Clean all clients */
4085   for (client_ctx = cli_ctx_head;
4086        NULL != cli_ctx_head;
4087        client_ctx = cli_ctx_head)
4088   {
4089     /* Clean pending requests to the sampler */
4090     for (reply_cls = client_ctx->rep_cls_head;
4091          NULL != client_ctx->rep_cls_head;
4092          reply_cls = client_ctx->rep_cls_head)
4093     {
4094       RPS_sampler_request_cancel (reply_cls->req_handle);
4095       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4096                                    client_ctx->rep_cls_tail,
4097                                    reply_cls);
4098       GNUNET_free (reply_cls);
4099     }
4100     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4101     GNUNET_free (client_ctx);
4102   }
4103   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4104   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4105
4106   if (NULL != do_round_task)
4107   {
4108     GNUNET_SCHEDULER_cancel (do_round_task);
4109     do_round_task = NULL;
4110   }
4111
4112   Peers_terminate ();
4113
4114   GNUNET_NSE_disconnect (nse);
4115   RPS_sampler_destroy (prot_sampler);
4116   RPS_sampler_destroy (client_sampler);
4117   GNUNET_CADET_close_port (cadet_port);
4118   GNUNET_CADET_disconnect (cadet_handle);
4119   View_destroy ();
4120   CustomPeerMap_destroy (push_map);
4121   CustomPeerMap_destroy (pull_map);
4122   if (NULL != stats)
4123   {
4124     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4125     stats = NULL;
4126   }
4127   #ifdef ENABLE_MALICIOUS
4128   struct AttackedPeer *tmp_att_peer;
4129   GNUNET_free (file_name_view_log);
4130   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4131   if (NULL != mal_peer_set)
4132     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4133   if (NULL != att_peer_set)
4134     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4135   while (NULL != att_peers_head)
4136   {
4137     tmp_att_peer = att_peers_head;
4138     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4139   }
4140   #endif /* ENABLE_MALICIOUS */
4141 }
4142
4143
4144 /**
4145  * Handle client connecting to the service.
4146  *
4147  * @param cls NULL
4148  * @param client the new client
4149  * @param mq the message queue of @a client
4150  * @return @a client
4151  */
4152 static void *
4153 client_connect_cb (void *cls,
4154                    struct GNUNET_SERVICE_Client *client,
4155                    struct GNUNET_MQ_Handle *mq)
4156 {
4157   struct ClientContext *cli_ctx;
4158
4159   LOG (GNUNET_ERROR_TYPE_DEBUG,
4160        "Client connected\n");
4161   if (NULL == client)
4162     return client; /* Server was destroyed before a client connected. Shutting down */
4163   cli_ctx = GNUNET_new (struct ClientContext);
4164   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4165   cli_ctx->view_updates_left = -1;
4166   cli_ctx->client = client;
4167   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4168                                cli_ctx_tail,
4169                                cli_ctx);
4170   return cli_ctx;
4171 }
4172
4173 /**
4174  * Callback called when a client disconnected from the service
4175  *
4176  * @param cls closure for the service
4177  * @param c the client that disconnected
4178  * @param internal_cls should be equal to @a c
4179  */
4180 static void
4181 client_disconnect_cb (void *cls,
4182                       struct GNUNET_SERVICE_Client *client,
4183                       void *internal_cls)
4184 {
4185   struct ClientContext *cli_ctx = internal_cls;
4186
4187   GNUNET_assert (client == cli_ctx->client);
4188   if (NULL == client)
4189   {/* shutdown task - destroy all clients */
4190     while (NULL != cli_ctx_head)
4191       destroy_cli_ctx (cli_ctx_head);
4192   }
4193   else
4194   { /* destroy this client */
4195     LOG (GNUNET_ERROR_TYPE_DEBUG,
4196         "Client disconnected. Destroy its context.\n");
4197     destroy_cli_ctx (cli_ctx);
4198   }
4199 }
4200
4201
4202 /**
4203  * Handle random peer sampling clients.
4204  *
4205  * @param cls closure
4206  * @param c configuration to use
4207  * @param service the initialized service
4208  */
4209 static void
4210 run (void *cls,
4211      const struct GNUNET_CONFIGURATION_Handle *c,
4212      struct GNUNET_SERVICE_Handle *service)
4213 {
4214   int size;
4215   int out_size;
4216   char* fn_valid_peers;
4217   struct GNUNET_HashCode port;
4218
4219   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4220   cfg = c;
4221
4222
4223   /* Get own ID */
4224   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4225   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4226               "STARTING SERVICE (rps) for peer [%s]\n",
4227               GNUNET_i2s (&own_identity));
4228   #ifdef ENABLE_MALICIOUS
4229   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4230               "Malicious execution compiled in.\n");
4231   #endif /* ENABLE_MALICIOUS */
4232
4233
4234
4235   /* Get time interval from the configuration */
4236   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4237                                                         "ROUNDINTERVAL",
4238                                                         &round_interval))
4239   {
4240     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4241                                "RPS", "ROUNDINTERVAL");
4242     GNUNET_SCHEDULER_shutdown ();
4243     return;
4244   }
4245
4246   /* Get initial size of sampler/view from the configuration */
4247   if (GNUNET_OK !=
4248       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4249         (long long unsigned int *) &sampler_size_est_min))
4250   {
4251     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4252                                "RPS", "MINSIZE");
4253     GNUNET_SCHEDULER_shutdown ();
4254     return;
4255   }
4256   sampler_size_est_need = sampler_size_est_min;
4257   view_size_est_min = sampler_size_est_min;
4258   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4259
4260   if (GNUNET_OK !=
4261       GNUNET_CONFIGURATION_get_value_filename (cfg,
4262                                                "rps",
4263                                                "FILENAME_VALID_PEERS",
4264                                                &fn_valid_peers))
4265   {
4266     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4267                                "rps", "FILENAME_VALID_PEERS");
4268   }
4269
4270
4271   View_create (4);
4272
4273   /* file_name_view_log */
4274   if (GNUNET_OK != GNUNET_DISK_directory_create ("/tmp/rps/"))
4275   {
4276     LOG (GNUNET_ERROR_TYPE_WARNING,
4277          "Failed to create directory /tmp/rps/\n");
4278   }
4279
4280   size = (14 + strlen (GNUNET_i2s_full (&own_identity)) + 1) * sizeof (char);
4281   file_name_view_log = GNUNET_malloc (size);
4282   out_size = GNUNET_snprintf (file_name_view_log,
4283                               size,
4284                               "/tmp/rps/view-%s",
4285                               GNUNET_i2s_full (&own_identity));
4286   if (size < out_size ||
4287       0 > out_size)
4288   {
4289     LOG (GNUNET_ERROR_TYPE_WARNING,
4290          "Failed to write string to buffer (size: %i, out_size: %i)\n",
4291          size,
4292          out_size);
4293   }
4294
4295
4296   /* connect to NSE */
4297   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4298
4299
4300   alpha = 0.45;
4301   beta  = 0.45;
4302
4303
4304   /* Initialise cadet */
4305   /* There exists a copy-paste-clone in get_channel() */
4306   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4307     GNUNET_MQ_hd_fixed_size (peer_check,
4308                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4309                              struct GNUNET_MessageHeader,
4310                              NULL),
4311     GNUNET_MQ_hd_fixed_size (peer_push,
4312                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4313                              struct GNUNET_MessageHeader,
4314                              NULL),
4315     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4316                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4317                              struct GNUNET_MessageHeader,
4318                              NULL),
4319     GNUNET_MQ_hd_var_size (peer_pull_reply,
4320                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4321                            struct GNUNET_RPS_P2P_PullReplyMessage,
4322                            NULL),
4323     GNUNET_MQ_handler_end ()
4324   };
4325
4326   cadet_handle = GNUNET_CADET_connect (cfg);
4327   GNUNET_assert (NULL != cadet_handle);
4328   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4329                       strlen (GNUNET_APPLICATION_PORT_RPS),
4330                       &port);
4331   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4332                                        &port,
4333                                        &Peers_handle_inbound_channel, /* Connect handler */
4334                                        NULL, /* cls */
4335                                        NULL, /* WindowSize handler */
4336                                        cleanup_destroyed_channel, /* Disconnect handler */
4337                                        cadet_handlers);
4338
4339
4340   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4341   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4342   GNUNET_free (fn_valid_peers);
4343
4344   /* Initialise sampler */
4345   struct GNUNET_TIME_Relative half_round_interval;
4346   struct GNUNET_TIME_Relative  max_round_interval;
4347
4348   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4349   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4350
4351   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4352   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4353
4354   /* Initialise push and pull maps */
4355   push_map = CustomPeerMap_create (4);
4356   pull_map = CustomPeerMap_create (4);
4357
4358
4359   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4360   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4361   // TODO send push/pull to each of those peers?
4362   // TODO read stored valid peers from last run
4363   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4364   Peers_get_valid_peers (valid_peers_iterator, NULL);
4365
4366   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4367                                                    GNUNET_NO,
4368                                                    process_peerinfo_peers,
4369                                                    NULL);
4370
4371   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4372
4373   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4374   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4375
4376   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4377   stats = GNUNET_STATISTICS_create ("rps", cfg);
4378
4379 }
4380
4381
4382 /**
4383  * Define "main" method using service macro.
4384  */
4385 GNUNET_SERVICE_MAIN
4386 ("rps",
4387  GNUNET_SERVICE_OPTION_NONE,
4388  &run,
4389  &client_connect_cb,
4390  &client_disconnect_cb,
4391  NULL,
4392  GNUNET_MQ_hd_fixed_size (client_request,
4393    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4394    struct GNUNET_RPS_CS_RequestMessage,
4395    NULL),
4396  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4397    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4398    struct GNUNET_RPS_CS_RequestCancelMessage,
4399    NULL),
4400  GNUNET_MQ_hd_var_size (client_seed,
4401    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4402    struct GNUNET_RPS_CS_SeedMessage,
4403    NULL),
4404 #ifdef ENABLE_MALICIOUS
4405  GNUNET_MQ_hd_var_size (client_act_malicious,
4406    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4407    struct GNUNET_RPS_CS_ActMaliciousMessage,
4408    NULL),
4409 #endif /* ENABLE_MALICIOUS */
4410  GNUNET_MQ_hd_fixed_size (client_view_request,
4411    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4412    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4413    NULL),
4414  GNUNET_MQ_handler_end());
4415
4416 /* end of gnunet-service-rps.c */