first batch of license fixes (boring)
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file rps/gnunet-service-rps.c
18  * @brief rps service implementation
19  * @author Julius Bünger
20  */
21 #include "platform.h"
22 #include "gnunet_applications.h"
23 #include "gnunet_util_lib.h"
24 #include "gnunet_cadet_service.h"
25 #include "gnunet_peerinfo_service.h"
26 #include "gnunet_nse_service.h"
27 #include "gnunet_statistics_service.h"
28 #include "rps.h"
29 #include "rps-test_util.h"
30 #include "gnunet-service-rps_sampler.h"
31 #include "gnunet-service-rps_custommap.h"
32 #include "gnunet-service-rps_view.h"
33
34 #include <math.h>
35 #include <inttypes.h>
36
37 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
38
39 // TODO modify @brief in every file
40
41 // TODO check for overflows
42
43 // TODO align message structs
44
45 // TODO connect to friends
46
47 // TODO store peers somewhere persistent
48
49 // TODO blacklist? (-> mal peer detection on top of brahms)
50
51 // hist_size_init, hist_size_max
52
53 /**
54  * Our configuration.
55  */
56 static const struct GNUNET_CONFIGURATION_Handle *cfg;
57
58 /**
59  * Handle to the statistics service.
60  */
61 static struct GNUNET_STATISTICS_Handle *stats;
62
63 /**
64  * Our own identity.
65  */
66 static struct GNUNET_PeerIdentity own_identity;
67
68
69
70 /***********************************************************************
71  * Old gnunet-service-rps_peers.c
72 ***********************************************************************/
73
74 /**
75  * Set a peer flag of given peer context.
76  */
77 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
78
79 /**
80  * Get peer flag of given peer context.
81  */
82 #define check_peer_flag_set(peer_ctx, mask)\
83   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
84
85 /**
86  * Unset flag of given peer context.
87  */
88 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
89
90 /**
91  * Set a channel flag of given channel context.
92  */
93 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
94
95 /**
96  * Get channel flag of given channel context.
97  */
98 #define check_channel_flag_set(channel_flags, mask)\
99   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
100
101 /**
102  * Unset flag of given channel context.
103  */
104 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
105
106
107
108 /**
109  * Pending operation on peer consisting of callback and closure
110  *
111  * When an operation cannot be executed right now this struct is used to store
112  * the callback and closure for later execution.
113  */
114 struct PeerPendingOp
115 {
116   /**
117    * Callback
118    */
119   PeerOp op;
120
121   /**
122    * Closure
123    */
124   void *op_cls;
125 };
126
127 /**
128  * List containing all messages that are yet to be send
129  *
130  * This is used to keep track of all messages that have not been sent yet. When
131  * a peer is to be removed the pending messages can be removed properly.
132  */
133 struct PendingMessage
134 {
135   /**
136    * DLL next, prev
137    */
138   struct PendingMessage *next;
139   struct PendingMessage *prev;
140
141   /**
142    * The envelope to the corresponding message
143    */
144   struct GNUNET_MQ_Envelope *ev;
145
146   /**
147    * The corresponding context
148    */
149   struct PeerContext *peer_ctx;
150
151   /**
152    * The message type
153    */
154   const char *type;
155 };
156
157 /**
158  * Struct used to keep track of other peer's status
159  *
160  * This is stored in a multipeermap.
161  * It contains information such as cadet channels, a message queue for sending,
162  * status about the channels, the pending operations on this peer and some flags
163  * about the status of the peer itself. (live, valid, ...)
164  */
165 struct PeerContext
166 {
167   /**
168    * Message queue open to client
169    */
170   struct GNUNET_MQ_Handle *mq;
171
172   /**
173    * Channel open to client.
174    */
175   struct GNUNET_CADET_Channel *send_channel;
176
177   /**
178    * Flags to the sending channel
179    */
180   uint32_t *send_channel_flags;
181
182   /**
183    * Channel open from client.
184    */
185   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
186
187   /**
188    * Flags to the receiving channel
189    */
190   uint32_t *recv_channel_flags;
191
192   /**
193    * Array of pending operations on this peer.
194    */
195   struct PeerPendingOp *pending_ops;
196
197   /**
198    * Handle to the callback given to cadet_ntfy_tmt_rdy()
199    *
200    * To be canceled on shutdown.
201    */
202   struct PendingMessage *liveliness_check_pending;
203
204   /**
205    * Number of pending operations.
206    */
207   unsigned int num_pending_ops;
208
209   /**
210    * Identity of the peer
211    */
212   struct GNUNET_PeerIdentity peer_id;
213
214   /**
215    * Flags indicating status of peer
216    */
217   uint32_t peer_flags;
218
219   /**
220    * Last time we received something from that peer.
221    */
222   struct GNUNET_TIME_Absolute last_message_recv;
223
224   /**
225    * Last time we received a keepalive message.
226    */
227   struct GNUNET_TIME_Absolute last_keepalive;
228
229   /**
230    * DLL with all messages that are yet to be sent
231    */
232   struct PendingMessage *pending_messages_head;
233   struct PendingMessage *pending_messages_tail;
234
235   /**
236    * This is pobably followed by 'statistical' data (when we first saw
237    * him, how did we get his ID, how many pushes (in a timeinterval),
238    * ...)
239    */
240 };
241
242 /**
243  * @brief Closure to #valid_peer_iterator
244  */
245 struct PeersIteratorCls
246 {
247   /**
248    * Iterator function
249    */
250   PeersIterator iterator;
251
252   /**
253    * Closure to iterator
254    */
255   void *cls;
256 };
257
258 /**
259  * @brief Hashmap of valid peers.
260  */
261 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
262
263 /**
264  * @brief Maximum number of valid peers to keep.
265  * TODO read from config
266  */
267 static uint32_t num_valid_peers_max = UINT32_MAX;
268
269 /**
270  * @brief Filename of the file that stores the valid peers persistently.
271  */
272 static char *filename_valid_peers;
273
274 /**
275  * Set of all peers to keep track of them.
276  */
277 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
278
279 /**
280  * Cadet handle.
281  */
282 static struct GNUNET_CADET_Handle *cadet_handle;
283
284
285
286 /**
287  * @brief Get the #PeerContext associated with a peer
288  *
289  * @param peer the peer id
290  *
291  * @return the #PeerContext
292  */
293 static struct PeerContext *
294 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
295 {
296   struct PeerContext *ctx;
297   int ret;
298
299   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
300   GNUNET_assert (GNUNET_YES == ret);
301   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
302   GNUNET_assert (NULL != ctx);
303   return ctx;
304 }
305
306 int
307 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
308
309 /**
310  * @brief Create a new #PeerContext and insert it into the peer map
311  *
312  * @param peer the peer to create the #PeerContext for
313  *
314  * @return the #PeerContext
315  */
316 static struct PeerContext *
317 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
318 {
319   struct PeerContext *ctx;
320   int ret;
321
322   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
323
324   ctx = GNUNET_new (struct PeerContext);
325   ctx->peer_id = *peer;
326   ctx->send_channel_flags = GNUNET_new (uint32_t);
327   ctx->recv_channel_flags = GNUNET_new (uint32_t);
328   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
329       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
330   GNUNET_assert (GNUNET_OK == ret);
331   return ctx;
332 }
333
334
335 /**
336  * @brief Create or get a #PeerContext
337  *
338  * @param peer the peer to get the associated context to
339  *
340  * @return the context
341  */
342 static struct PeerContext *
343 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
344 {
345   if (GNUNET_NO == Peers_check_peer_known (peer))
346   {
347     return create_peer_ctx (peer);
348   }
349   return get_peer_ctx (peer);
350 }
351
352 void
353 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
354
355 void
356 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
357
358 /**
359  * @brief Check whether we have a connection to this @a peer
360  *
361  * Also sets the #Peers_ONLINE flag accordingly
362  *
363  * @param peer the peer in question
364  *
365  * @return #GNUNET_YES if we are connected
366  *         #GNUNET_NO  otherwise
367  */
368 int
369 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
370 {
371   const struct PeerContext *peer_ctx;
372
373   /* If we don't know about this peer we don't know whether it's online */
374   if (GNUNET_NO == Peers_check_peer_known (peer))
375   {
376     return GNUNET_NO;
377   }
378   /* Get the context */
379   peer_ctx = get_peer_ctx (peer);
380   /* If we have no channel to this peer we don't know whether it's online */
381   if ( (NULL == peer_ctx->send_channel) &&
382        (NULL == peer_ctx->recv_channel) )
383   {
384     Peers_unset_peer_flag (peer, Peers_ONLINE);
385     return GNUNET_NO;
386   }
387   /* Otherwise (if we have a channel, we know that it's online */
388   Peers_set_peer_flag (peer, Peers_ONLINE);
389   return GNUNET_YES;
390 }
391
392
393 /**
394  * @brief The closure to #get_rand_peer_iterator.
395  */
396 struct GetRandPeerIteratorCls
397 {
398   /**
399    * @brief The index of the peer to return.
400    * Will be decreased until 0.
401    * Then current peer is returned.
402    */
403   uint32_t index;
404
405   /**
406    * @brief Pointer to peer to return.
407    */
408   const struct GNUNET_PeerIdentity *peer;
409 };
410
411
412 /**
413  * @brief Iterator function for #get_random_peer_from_peermap.
414  *
415  * Implements #GNUNET_CONTAINER_PeerMapIterator.
416  * Decreases the index until the index is null.
417  * Then returns the current peer.
418  *
419  * @param cls the #GetRandPeerIteratorCls containing index and peer
420  * @param peer current peer
421  * @param value unused
422  *
423  * @return  #GNUNET_YES if we should continue to
424  *          iterate,
425  *          #GNUNET_NO if not.
426  */
427 static int
428 get_rand_peer_iterator (void *cls,
429                         const struct GNUNET_PeerIdentity *peer,
430                         void *value)
431 {
432   struct GetRandPeerIteratorCls *iterator_cls = cls;
433   if (0 >= iterator_cls->index)
434   {
435     iterator_cls->peer = peer;
436     return GNUNET_NO;
437   }
438   iterator_cls->index--;
439   return GNUNET_YES;
440 }
441
442
443 /**
444  * @brief Get a random peer from @a peer_map
445  *
446  * @param peer_map the peer_map to get the peer from
447  *
448  * @return a random peer
449  */
450 static const struct GNUNET_PeerIdentity *
451 get_random_peer_from_peermap (const struct
452                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
453 {
454   struct GetRandPeerIteratorCls *iterator_cls;
455   const struct GNUNET_PeerIdentity *ret;
456
457   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
458   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
459       GNUNET_CONTAINER_multipeermap_size (peer_map));
460   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
461                                                 get_rand_peer_iterator,
462                                                 iterator_cls);
463   ret = iterator_cls->peer;
464   GNUNET_free (iterator_cls);
465   return ret;
466 }
467
468
469 /**
470  * @brief Add a given @a peer to valid peers.
471  *
472  * If valid peers are already #num_valid_peers_max, delete a peer previously.
473  *
474  * @param peer the peer that is added to the valid peers.
475  *
476  * @return #GNUNET_YES if no other peer had to be removed
477  *         #GNUNET_NO  otherwise
478  */
479 static int
480 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
481 {
482   const struct GNUNET_PeerIdentity *rand_peer;
483   int ret;
484
485   ret = GNUNET_YES;
486   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
487   {
488     rand_peer = get_random_peer_from_peermap (valid_peers);
489     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
490     ret = GNUNET_NO;
491   }
492   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
493       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
494   return ret;
495 }
496
497
498 /**
499  * @brief Set the peer flag to living and
500  *        call the pending operations on this peer.
501  *
502  * Also adds peer to #valid_peers.
503  *
504  * @param peer_ctx the #PeerContext of the peer to set live
505  */
506 static void
507 set_peer_live (struct PeerContext *peer_ctx)
508 {
509   struct GNUNET_PeerIdentity *peer;
510   unsigned int i;
511
512   peer = &peer_ctx->peer_id;
513   LOG (GNUNET_ERROR_TYPE_DEBUG,
514       "Peer %s is live and valid, calling %i pending operations on it\n",
515       GNUNET_i2s (peer),
516       peer_ctx->num_pending_ops);
517
518   if (NULL != peer_ctx->liveliness_check_pending)
519   {
520     LOG (GNUNET_ERROR_TYPE_DEBUG,
521          "Removing pending liveliness check for peer %s\n",
522          GNUNET_i2s (&peer_ctx->peer_id));
523     // TODO wait until cadet sets mq->cancel_impl
524     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
525     GNUNET_free (peer_ctx->liveliness_check_pending);
526     peer_ctx->liveliness_check_pending = NULL;
527   }
528
529   (void) add_valid_peer (peer);
530   set_peer_flag (peer_ctx, Peers_ONLINE);
531
532   /* Call pending operations */
533   for (i = 0; i < peer_ctx->num_pending_ops; i++)
534   {
535     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
536   }
537   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
538 }
539
540 static void
541 cleanup_destroyed_channel (void *cls,
542                            const struct GNUNET_CADET_Channel *channel);
543
544 /* Declaration of handlers */
545 static void
546 handle_peer_check (void *cls,
547                    const struct GNUNET_MessageHeader *msg);
548
549 static void
550 handle_peer_push (void *cls,
551                   const struct GNUNET_MessageHeader *msg);
552
553 static void
554 handle_peer_pull_request (void *cls,
555                           const struct GNUNET_MessageHeader *msg);
556
557 static int
558 check_peer_pull_reply (void *cls,
559                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
560
561 static void
562 handle_peer_pull_reply (void *cls,
563                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
564
565 /* End declaration of handlers */
566
567
568 /**
569  * @brief Get the channel of a peer. If not existing, create.
570  *
571  * @param peer the peer id
572  * @return the #GNUNET_CADET_Channel used to send data to @a peer
573  */
574 struct GNUNET_CADET_Channel *
575 get_channel (const struct GNUNET_PeerIdentity *peer)
576 {
577   struct PeerContext *peer_ctx;
578   struct GNUNET_HashCode port;
579   struct GNUNET_PeerIdentity *ctx_peer;
580   /* There exists a copy-paste-clone in run() */
581   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
582     GNUNET_MQ_hd_fixed_size (peer_check,
583                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
584                              struct GNUNET_MessageHeader,
585                              NULL),
586     GNUNET_MQ_hd_fixed_size (peer_push,
587                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
588                              struct GNUNET_MessageHeader,
589                              NULL),
590     GNUNET_MQ_hd_fixed_size (peer_pull_request,
591                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
592                              struct GNUNET_MessageHeader,
593                              NULL),
594     GNUNET_MQ_hd_var_size (peer_pull_reply,
595                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
596                            struct GNUNET_RPS_P2P_PullReplyMessage,
597                            NULL),
598     GNUNET_MQ_handler_end ()
599   };
600
601
602   peer_ctx = get_peer_ctx (peer);
603   if (NULL == peer_ctx->send_channel)
604   {
605     LOG (GNUNET_ERROR_TYPE_DEBUG,
606          "Trying to establish channel to peer %s\n",
607          GNUNET_i2s (peer));
608     GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
609                         strlen (GNUNET_APPLICATION_PORT_RPS),
610                         &port);
611     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
612     *ctx_peer = *peer;
613     peer_ctx->send_channel =
614       GNUNET_CADET_channel_create (cadet_handle,
615                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
616                                    peer,
617                                    &port,
618                                    GNUNET_CADET_OPTION_RELIABLE,
619                                    NULL, /* WindowSize handler */
620                                    cleanup_destroyed_channel, /* Disconnect handler */
621                                    cadet_handlers);
622   }
623   GNUNET_assert (NULL != peer_ctx->send_channel);
624   return peer_ctx->send_channel;
625 }
626
627
628 /**
629  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
630  *
631  * If we already have a message queue open to this client,
632  * simply return it, otherways create one.
633  *
634  * @param peer the peer to get the mq to
635  * @return the #GNUNET_MQ_Handle
636  */
637 static struct GNUNET_MQ_Handle *
638 get_mq (const struct GNUNET_PeerIdentity *peer)
639 {
640   struct PeerContext *peer_ctx;
641
642   peer_ctx = get_peer_ctx (peer);
643
644   if (NULL == peer_ctx->mq)
645   {
646     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
647   }
648   return peer_ctx->mq;
649 }
650
651
652 /**
653  * @brief This is called in response to the first message we sent as a
654  * liveliness check.
655  *
656  * @param cls #PeerContext of peer with pending liveliness check
657  */
658 static void
659 mq_liveliness_check_successful (void *cls)
660 {
661   struct PeerContext *peer_ctx = cls;
662
663   if (NULL != peer_ctx->liveliness_check_pending)
664   {
665     LOG (GNUNET_ERROR_TYPE_DEBUG,
666         "Liveliness check for peer %s was successfull\n",
667         GNUNET_i2s (&peer_ctx->peer_id));
668     GNUNET_free (peer_ctx->liveliness_check_pending);
669     peer_ctx->liveliness_check_pending = NULL;
670     set_peer_live (peer_ctx);
671   }
672 }
673
674 /**
675  * Issue a check whether peer is live
676  *
677  * @param peer_ctx the context of the peer
678  */
679 static void
680 check_peer_live (struct PeerContext *peer_ctx)
681 {
682   LOG (GNUNET_ERROR_TYPE_DEBUG,
683        "Get informed about peer %s getting live\n",
684        GNUNET_i2s (&peer_ctx->peer_id));
685
686   struct GNUNET_MQ_Handle *mq;
687   struct GNUNET_MQ_Envelope *ev;
688
689   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
690   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
691   peer_ctx->liveliness_check_pending->ev = ev;
692   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
693   peer_ctx->liveliness_check_pending->type = "Check liveliness";
694   mq = get_mq (&peer_ctx->peer_id);
695   GNUNET_MQ_notify_sent (ev,
696                          mq_liveliness_check_successful,
697                          peer_ctx);
698   GNUNET_MQ_send (mq, ev);
699 }
700
701 /**
702  * @brief Add an envelope to a message passed to mq to list of pending messages
703  *
704  * @param peer peer the message was sent to
705  * @param ev envelope to the message
706  * @param type type of the message to be sent
707  * @return pointer to pending message
708  */
709 static struct PendingMessage *
710 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
711                         struct GNUNET_MQ_Envelope *ev,
712                         const char *type)
713 {
714   struct PendingMessage *pending_msg;
715   struct PeerContext *peer_ctx;
716
717   peer_ctx = get_peer_ctx (peer);
718   pending_msg = GNUNET_new (struct PendingMessage);
719   pending_msg->ev = ev;
720   pending_msg->peer_ctx = peer_ctx;
721   pending_msg->type = type;
722   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
723                                peer_ctx->pending_messages_tail,
724                                pending_msg);
725   return pending_msg;
726 }
727
728
729 /**
730  * @brief Remove a pending message from the respective DLL
731  *
732  * @param pending_msg the pending message to remove
733  * @param cancel cancel the pending message, too
734  */
735 static void
736 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
737 {
738   struct PeerContext *peer_ctx;
739
740   peer_ctx = pending_msg->peer_ctx;
741   GNUNET_assert (NULL != peer_ctx);
742   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
743                                peer_ctx->pending_messages_tail,
744                                pending_msg);
745   // TODO wait for the cadet implementation of message cancellation
746   //if (GNUNET_YES == cancel)
747   //{
748   //  GNUNET_MQ_send_cancel (pending_msg->ev);
749   //}
750   GNUNET_free (pending_msg);
751 }
752
753
754 /**
755  * @brief Check whether function of type #PeerOp was already scheduled
756  *
757  * The array with pending operations will probably never grow really big, so
758  * iterating over it should be ok.
759  *
760  * @param peer the peer to check
761  * @param peer_op the operation (#PeerOp) on the peer
762  *
763  * @return #GNUNET_YES if this operation is scheduled on that peer
764  *         #GNUNET_NO  otherwise
765  */
766 static int
767 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
768                            const PeerOp peer_op)
769 {
770   const struct PeerContext *peer_ctx;
771   unsigned int i;
772
773   peer_ctx = get_peer_ctx (peer);
774   for (i = 0; i < peer_ctx->num_pending_ops; i++)
775     if (peer_op == peer_ctx->pending_ops[i].op)
776       return GNUNET_YES;
777   return GNUNET_NO;
778 }
779
780 int
781 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
782
783 /**
784  * Iterator over hash map entries. Deletes all contexts of peers.
785  *
786  * @param cls closure
787  * @param key current public key
788  * @param value value in the hash map
789  * @return #GNUNET_YES if we should continue to iterate,
790  *         #GNUNET_NO if not.
791  */
792 static int
793 peermap_clear_iterator (void *cls,
794                         const struct GNUNET_PeerIdentity *key,
795                         void *value)
796 {
797   Peers_remove_peer (key);
798   return GNUNET_YES;
799 }
800
801
802 /**
803  * @brief This is called once a message is sent.
804  *
805  * Removes the pending message
806  *
807  * @param cls type of the message that was sent
808  */
809 static void
810 mq_notify_sent_cb (void *cls)
811 {
812   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814       "%s was sent.\n",
815       pending_msg->type);
816   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
817     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
818   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
819     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
820   if (0 == strncmp ("PUSH", pending_msg->type, 4))
821     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
822   /* Do not cancle message */
823   remove_pending_message (pending_msg, GNUNET_NO);
824 }
825
826
827 /**
828  * @brief Iterator function for #store_valid_peers.
829  *
830  * Implements #GNUNET_CONTAINER_PeerMapIterator.
831  * Writes single peer to disk.
832  *
833  * @param cls the file handle to write to.
834  * @param peer current peer
835  * @param value unused
836  *
837  * @return  #GNUNET_YES if we should continue to
838  *          iterate,
839  *          #GNUNET_NO if not.
840  */
841 static int
842 store_peer_presistently_iterator (void *cls,
843                                   const struct GNUNET_PeerIdentity *peer,
844                                   void *value)
845 {
846   const struct GNUNET_DISK_FileHandle *fh = cls;
847   char peer_string[128];
848   int size;
849   ssize_t ret;
850
851   if (NULL == peer)
852   {
853     return GNUNET_YES;
854   }
855   size = GNUNET_snprintf (peer_string,
856                           sizeof (peer_string),
857                           "%s\n",
858                           GNUNET_i2s_full (peer));
859   GNUNET_assert (53 == size);
860   ret = GNUNET_DISK_file_write (fh,
861                                 peer_string,
862                                 size);
863   GNUNET_assert (size == ret);
864   return GNUNET_YES;
865 }
866
867
868 /**
869  * @brief Store the peers currently in #valid_peers to disk.
870  */
871 static void
872 store_valid_peers ()
873 {
874   struct GNUNET_DISK_FileHandle *fh;
875   uint32_t number_written_peers;
876   int ret;
877
878   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
879   {
880     return;
881   }
882
883   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
884   if (GNUNET_SYSERR == ret)
885   {
886     LOG (GNUNET_ERROR_TYPE_WARNING,
887         "Not able to create directory for file `%s'\n",
888         filename_valid_peers);
889     GNUNET_break (0);
890   }
891   else if (GNUNET_NO == ret)
892   {
893     LOG (GNUNET_ERROR_TYPE_WARNING,
894         "Directory for file `%s' exists but is not writable for us\n",
895         filename_valid_peers);
896     GNUNET_break (0);
897   }
898   fh = GNUNET_DISK_file_open (filename_valid_peers,
899                               GNUNET_DISK_OPEN_WRITE |
900                                   GNUNET_DISK_OPEN_CREATE,
901                               GNUNET_DISK_PERM_USER_READ |
902                                   GNUNET_DISK_PERM_USER_WRITE);
903   if (NULL == fh)
904   {
905     LOG (GNUNET_ERROR_TYPE_WARNING,
906         "Not able to write valid peers to file `%s'\n",
907         filename_valid_peers);
908     return;
909   }
910   LOG (GNUNET_ERROR_TYPE_DEBUG,
911       "Writing %u valid peers to disk\n",
912       GNUNET_CONTAINER_multipeermap_size (valid_peers));
913   number_written_peers =
914     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
915                                            store_peer_presistently_iterator,
916                                            fh);
917   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
918   GNUNET_assert (number_written_peers ==
919       GNUNET_CONTAINER_multipeermap_size (valid_peers));
920 }
921
922
923 /**
924  * @brief Convert string representation of peer id to peer id.
925  *
926  * Counterpart to #GNUNET_i2s_full.
927  *
928  * @param string_repr The string representation of the peer id
929  *
930  * @return The peer id
931  */
932 static const struct GNUNET_PeerIdentity *
933 s2i_full (const char *string_repr)
934 {
935   struct GNUNET_PeerIdentity *peer;
936   size_t len;
937   int ret;
938
939   peer = GNUNET_new (struct GNUNET_PeerIdentity);
940   len = strlen (string_repr);
941   if (52 > len)
942   {
943     LOG (GNUNET_ERROR_TYPE_WARNING,
944         "Not able to convert string representation of PeerID to PeerID\n"
945         "Sting representation: %s (len %lu) - too short\n",
946         string_repr,
947         len);
948     GNUNET_break (0);
949   }
950   else if (52 < len)
951   {
952     len = 52;
953   }
954   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
955                                                     len,
956                                                     &peer->public_key);
957   if (GNUNET_OK != ret)
958   {
959     LOG (GNUNET_ERROR_TYPE_WARNING,
960         "Not able to convert string representation of PeerID to PeerID\n"
961         "Sting representation: %s\n",
962         string_repr);
963     GNUNET_break (0);
964   }
965   return peer;
966 }
967
968
969 /**
970  * @brief Restore the peers on disk to #valid_peers.
971  */
972 static void
973 restore_valid_peers ()
974 {
975   off_t file_size;
976   uint32_t num_peers;
977   struct GNUNET_DISK_FileHandle *fh;
978   char *buf;
979   ssize_t size_read;
980   char *iter_buf;
981   char *str_repr;
982   const struct GNUNET_PeerIdentity *peer;
983
984   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
985   {
986     return;
987   }
988
989   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
990   {
991     return;
992   }
993   fh = GNUNET_DISK_file_open (filename_valid_peers,
994                               GNUNET_DISK_OPEN_READ,
995                               GNUNET_DISK_PERM_NONE);
996   GNUNET_assert (NULL != fh);
997   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
998   num_peers = file_size / 53;
999   buf = GNUNET_malloc (file_size);
1000   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1001   GNUNET_assert (size_read == file_size);
1002   LOG (GNUNET_ERROR_TYPE_DEBUG,
1003       "Restoring %" PRIu32 " peers from file `%s'\n",
1004       num_peers,
1005       filename_valid_peers);
1006   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1007   {
1008     str_repr = GNUNET_strndup (iter_buf, 53);
1009     peer = s2i_full (str_repr);
1010     GNUNET_free (str_repr);
1011     add_valid_peer (peer);
1012     LOG (GNUNET_ERROR_TYPE_DEBUG,
1013         "Restored valid peer %s from disk\n",
1014         GNUNET_i2s_full (peer));
1015   }
1016   iter_buf = NULL;
1017   GNUNET_free (buf);
1018   LOG (GNUNET_ERROR_TYPE_DEBUG,
1019       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1020       num_peers,
1021       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1022   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1023   {
1024     LOG (GNUNET_ERROR_TYPE_WARNING,
1025         "Number of restored peers does not match file size. Have probably duplicates.\n");
1026   }
1027   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1028   LOG (GNUNET_ERROR_TYPE_DEBUG,
1029       "Restored %u valid peers from disk\n",
1030       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1031 }
1032
1033
1034 /**
1035  * @brief Initialise storage of peers
1036  *
1037  * @param fn_valid_peers filename of the file used to store valid peer ids
1038  * @param cadet_h cadet handle
1039  * @param own_id own peer identity
1040  */
1041 void
1042 Peers_initialise (char* fn_valid_peers,
1043                   struct GNUNET_CADET_Handle *cadet_h,
1044                   const struct GNUNET_PeerIdentity *own_id)
1045 {
1046   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1047   cadet_handle = cadet_h;
1048   own_identity = *own_id;
1049   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1050   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1051   restore_valid_peers ();
1052 }
1053
1054
1055 /**
1056  * @brief Delete storage of peers that was created with #Peers_initialise ()
1057  */
1058 void
1059 Peers_terminate ()
1060 {
1061   if (GNUNET_SYSERR ==
1062       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1063                                              peermap_clear_iterator,
1064                                              NULL))
1065   {
1066     LOG (GNUNET_ERROR_TYPE_WARNING,
1067         "Iteration destroying peers was aborted.\n");
1068   }
1069   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1070   peer_map = NULL;
1071   store_valid_peers ();
1072   GNUNET_free (filename_valid_peers);
1073   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1074 }
1075
1076
1077 /**
1078  * Iterator over #valid_peers hash map entries.
1079  *
1080  * @param cls closure - unused
1081  * @param peer current peer id
1082  * @param value value in the hash map - unused
1083  * @return #GNUNET_YES if we should continue to
1084  *         iterate,
1085  *         #GNUNET_NO if not.
1086  */
1087 static int
1088 valid_peer_iterator (void *cls,
1089                      const struct GNUNET_PeerIdentity *peer,
1090                      void *value)
1091 {
1092   struct PeersIteratorCls *it_cls = cls;
1093
1094   return it_cls->iterator (it_cls->cls,
1095                            peer);
1096 }
1097
1098
1099 /**
1100  * @brief Get all currently known, valid peer ids.
1101  *
1102  * @param it function to call on each peer id
1103  * @param it_cls extra argument to @a it
1104  * @return the number of key value pairs processed,
1105  *         #GNUNET_SYSERR if it aborted iteration
1106  */
1107 int
1108 Peers_get_valid_peers (PeersIterator iterator,
1109                        void *it_cls)
1110 {
1111   struct PeersIteratorCls *cls;
1112   int ret;
1113
1114   cls = GNUNET_new (struct PeersIteratorCls);
1115   cls->iterator = iterator;
1116   cls->cls = it_cls;
1117   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1118                                                valid_peer_iterator,
1119                                                cls);
1120   GNUNET_free (cls);
1121   return ret;
1122 }
1123
1124
1125 /**
1126  * @brief Add peer to known peers.
1127  *
1128  * This function is called on new peer_ids from 'external' sources
1129  * (client seed, cadet get_peers(), ...)
1130  *
1131  * @param peer the new #GNUNET_PeerIdentity
1132  *
1133  * @return #GNUNET_YES if peer was inserted
1134  *         #GNUNET_NO  otherwise (if peer was already known or
1135  *                     peer was #own_identity)
1136  */
1137 int
1138 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1139 {
1140   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1141        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1142   {
1143     return GNUNET_NO; /* We already know this peer - nothing to do */
1144   }
1145   (void) create_peer_ctx (peer);
1146   return GNUNET_YES;
1147 }
1148
1149 int
1150 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1151
1152 /**
1153  * @brief Try connecting to a peer to see whether it is online
1154  *
1155  * If not known yet, insert into known peers
1156  *
1157  * @param peer the peer whose liveliness is to be checked
1158  * @return #GNUNET_YES if peer had to be inserted
1159  *         #GNUNET_NO  otherwise (if peer was already known or
1160  *                     peer was #own_identity)
1161  */
1162 int
1163 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1164 {
1165   struct PeerContext *peer_ctx;
1166   int ret;
1167
1168   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1169   {
1170     return GNUNET_NO;
1171   }
1172   ret = Peers_insert_peer (peer);
1173   peer_ctx = get_peer_ctx (peer);
1174   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1175   {
1176     check_peer_live (peer_ctx);
1177   }
1178   return ret;
1179 }
1180
1181
1182 /**
1183  * @brief Check if peer is removable.
1184  *
1185  * Check if
1186  *  - a recv channel exists
1187  *  - there are pending messages
1188  *  - there is no pending pull reply
1189  *
1190  * @param peer the peer in question
1191  * @return #GNUNET_YES    if peer is removable
1192  *         #GNUNET_NO     if peer is NOT removable
1193  *         #GNUNET_SYSERR if peer is not known
1194  */
1195 int
1196 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1197 {
1198   struct PeerContext *peer_ctx;
1199
1200   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1201   {
1202     return GNUNET_SYSERR;
1203   }
1204
1205   peer_ctx = get_peer_ctx (peer);
1206   if ( (NULL != peer_ctx->recv_channel) ||
1207        (NULL != peer_ctx->pending_messages_head) ||
1208        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1209   {
1210     return GNUNET_NO;
1211   }
1212   return GNUNET_YES;
1213 }
1214
1215 uint32_t *
1216 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1217                         enum Peers_ChannelRole role);
1218
1219 int
1220 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1221
1222 /**
1223  * @brief Remove peer
1224  *
1225  * @param peer the peer to clean
1226  * @return #GNUNET_YES if peer was removed
1227  *         #GNUNET_NO  otherwise
1228  */
1229 int
1230 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1231 {
1232   struct PeerContext *peer_ctx;
1233   uint32_t *channel_flag;
1234
1235   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1236   {
1237     return GNUNET_NO;
1238   }
1239
1240   peer_ctx = get_peer_ctx (peer);
1241   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1242   LOG (GNUNET_ERROR_TYPE_DEBUG,
1243        "Going to remove peer %s\n",
1244        GNUNET_i2s (&peer_ctx->peer_id));
1245   Peers_unset_peer_flag (peer, Peers_ONLINE);
1246
1247   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1248   while (NULL != peer_ctx->pending_messages_head)
1249   {
1250     LOG (GNUNET_ERROR_TYPE_DEBUG,
1251         "Removing unsent %s\n",
1252         peer_ctx->pending_messages_head->type);
1253     /* Cancle pending message, too */
1254     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1255   }
1256   /* If we are still waiting for notification whether this peer is live
1257    * cancel the according task */
1258   if (NULL != peer_ctx->liveliness_check_pending)
1259   {
1260     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261          "Removing pending liveliness check for peer %s\n",
1262          GNUNET_i2s (&peer_ctx->peer_id));
1263     // TODO wait until cadet sets mq->cancel_impl
1264     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1265     GNUNET_free (peer_ctx->liveliness_check_pending);
1266     peer_ctx->liveliness_check_pending = NULL;
1267   }
1268   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1269   if (NULL != peer_ctx->send_channel &&
1270       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1271   {
1272     LOG (GNUNET_ERROR_TYPE_DEBUG,
1273         "Destroying send channel\n");
1274     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1275     peer_ctx->send_channel = NULL;
1276     peer_ctx->mq = NULL;
1277   }
1278   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1279   if (NULL != peer_ctx->recv_channel &&
1280       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1281   {
1282     LOG (GNUNET_ERROR_TYPE_DEBUG,
1283         "Destroying recv channel\n");
1284     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1285     peer_ctx->recv_channel = NULL;
1286   }
1287
1288   GNUNET_free (peer_ctx->send_channel_flags);
1289   GNUNET_free (peer_ctx->recv_channel_flags);
1290
1291   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1292   {
1293     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1294   }
1295   GNUNET_free (peer_ctx);
1296   return GNUNET_YES;
1297 }
1298
1299
1300 /**
1301  * @brief set flags on a given peer.
1302  *
1303  * @param peer the peer to set flags on
1304  * @param flags the flags
1305  */
1306 void
1307 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1308 {
1309   struct PeerContext *peer_ctx;
1310
1311   peer_ctx = get_peer_ctx (peer);
1312   set_peer_flag (peer_ctx, flags);
1313 }
1314
1315
1316 /**
1317  * @brief unset flags on a given peer.
1318  *
1319  * @param peer the peer to unset flags on
1320  * @param flags the flags
1321  */
1322 void
1323 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1324 {
1325   struct PeerContext *peer_ctx;
1326
1327   peer_ctx = get_peer_ctx (peer);
1328   unset_peer_flag (peer_ctx, flags);
1329 }
1330
1331
1332 /**
1333  * @brief Check whether flags on a peer are set.
1334  *
1335  * @param peer the peer to check the flag of
1336  * @param flags the flags to check
1337  *
1338  * @return #GNUNET_SYSERR if peer is not known
1339  *         #GNUNET_YES    if all given flags are set
1340  *         #GNUNET_NO     otherwise
1341  */
1342 int
1343 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1344 {
1345   struct PeerContext *peer_ctx;
1346
1347   if (GNUNET_NO == Peers_check_peer_known (peer))
1348   {
1349     return GNUNET_SYSERR;
1350   }
1351   peer_ctx = get_peer_ctx (peer);
1352   return check_peer_flag_set (peer_ctx, flags);
1353 }
1354
1355
1356 /**
1357  * @brief set flags on a given channel.
1358  *
1359  * @param channel the channel to set flags on
1360  * @param flags the flags
1361  */
1362 void
1363 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1364 {
1365   set_channel_flag (channel_flags, flags);
1366 }
1367
1368
1369 /**
1370  * @brief unset flags on a given channel.
1371  *
1372  * @param channel the channel to unset flags on
1373  * @param flags the flags
1374  */
1375 void
1376 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1377 {
1378   unset_channel_flag (channel_flags, flags);
1379 }
1380
1381
1382 /**
1383  * @brief Check whether flags on a channel are set.
1384  *
1385  * @param channel the channel to check the flag of
1386  * @param flags the flags to check
1387  *
1388  * @return #GNUNET_YES if all given flags are set
1389  *         #GNUNET_NO  otherwise
1390  */
1391 int
1392 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1393 {
1394   return check_channel_flag_set (channel_flags, flags);
1395 }
1396
1397 /**
1398  * @brief Get the flags for the channel in @a role for @a peer.
1399  *
1400  * @param peer Peer to get the channel flags for.
1401  * @param role Role of channel to get flags for
1402  *
1403  * @return The flags.
1404  */
1405 uint32_t *
1406 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1407                         enum Peers_ChannelRole role)
1408 {
1409   const struct PeerContext *peer_ctx;
1410
1411   peer_ctx = get_peer_ctx (peer);
1412   if (Peers_CHANNEL_ROLE_SENDING == role)
1413   {
1414     return peer_ctx->send_channel_flags;
1415   }
1416   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1417   {
1418     return peer_ctx->recv_channel_flags;
1419   }
1420   else
1421   {
1422     GNUNET_assert (0);
1423   }
1424 }
1425
1426 /**
1427  * @brief Check whether we have information about the given peer.
1428  *
1429  * FIXME probably deprecated. Make this the new _online.
1430  *
1431  * @param peer peer in question
1432  *
1433  * @return #GNUNET_YES if peer is known
1434  *         #GNUNET_NO  if peer is not knwon
1435  */
1436 int
1437 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1438 {
1439   if (NULL != peer_map)
1440   {
1441     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1442   } else
1443   {
1444     return GNUNET_NO;
1445   }
1446 }
1447
1448
1449 /**
1450  * @brief Check whether @a peer is actually a peer.
1451  *
1452  * A valid peer is a peer that we know exists eg. we were connected to once.
1453  *
1454  * @param peer peer in question
1455  *
1456  * @return #GNUNET_YES if peer is valid
1457  *         #GNUNET_NO  if peer is not valid
1458  */
1459 int
1460 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1461 {
1462   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1463 }
1464
1465
1466 /**
1467  * @brief Indicate that we want to send to the other peer
1468  *
1469  * This establishes a sending channel
1470  *
1471  * @param peer the peer to establish channel to
1472  */
1473 void
1474 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1475 {
1476   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1477   (void) get_channel (peer);
1478 }
1479
1480
1481 /**
1482  * @brief Check whether other peer has the intention to send/opened channel
1483  *        towars us
1484  *
1485  * @param peer the peer in question
1486  *
1487  * @return #GNUNET_YES if peer has the intention to send
1488  *         #GNUNET_NO  otherwise
1489  */
1490 int
1491 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1492 {
1493   const struct PeerContext *peer_ctx;
1494
1495   peer_ctx = get_peer_ctx (peer);
1496   if (NULL != peer_ctx->recv_channel)
1497   {
1498     return GNUNET_YES;
1499   }
1500   return GNUNET_NO;
1501 }
1502
1503
1504 /**
1505  * Handle the channel a peer opens to us.
1506  *
1507  * @param cls The closure
1508  * @param channel The channel the peer wants to establish
1509  * @param initiator The peer's peer ID
1510  *
1511  * @return initial channel context for the channel
1512  *         (can be NULL -- that's not an error)
1513  */
1514 void *
1515 Peers_handle_inbound_channel (void *cls,
1516                               struct GNUNET_CADET_Channel *channel,
1517                               const struct GNUNET_PeerIdentity *initiator)
1518 {
1519   struct PeerContext *peer_ctx;
1520   struct GNUNET_PeerIdentity *ctx_peer;
1521
1522   LOG (GNUNET_ERROR_TYPE_DEBUG,
1523       "New channel was established to us (Peer %s).\n",
1524       GNUNET_i2s (initiator));
1525   GNUNET_assert (NULL != channel); /* according to cadet API */
1526   /* Make sure we 'know' about this peer */
1527   peer_ctx = create_or_get_peer_ctx (initiator);
1528   set_peer_live (peer_ctx);
1529   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1530   *ctx_peer = *initiator;
1531   /* We only accept one incoming channel per peer */
1532   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1533   {
1534     set_channel_flag (peer_ctx->recv_channel_flags,
1535                       Peers_CHANNEL_ESTABLISHED_TWICE);
1536     //GNUNET_CADET_channel_destroy (channel);
1537     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1538     peer_ctx->recv_channel = channel;
1539     /* return the channel context */
1540     return ctx_peer;
1541   }
1542   peer_ctx->recv_channel = channel;
1543   return ctx_peer;
1544 }
1545
1546
1547 /**
1548  * @brief Check whether a sending channel towards the given peer exists
1549  *
1550  * @param peer the peer to check for
1551  *
1552  * @return #GNUNET_YES if a sending channel towards that peer exists
1553  *         #GNUNET_NO  otherwise
1554  */
1555 int
1556 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1557 {
1558   struct PeerContext *peer_ctx;
1559
1560   if (GNUNET_NO == Peers_check_peer_known (peer))
1561   { /* If no such peer exists, there is no channel */
1562     return GNUNET_NO;
1563   }
1564   peer_ctx = get_peer_ctx (peer);
1565   if (NULL == peer_ctx->send_channel)
1566   {
1567     return GNUNET_NO;
1568   }
1569   return GNUNET_YES;
1570 }
1571
1572
1573 /**
1574  * @brief check whether the given channel is the sending channel of the given
1575  *        peer
1576  *
1577  * @param peer the peer in question
1578  * @param channel the channel to check for
1579  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1580  *                    #Peers_CHANNEL_ROLE_RECEIVING
1581  *
1582  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1583  *         #GNUNET_NO  otherwise
1584  */
1585 int
1586 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1587                           const struct GNUNET_CADET_Channel *channel,
1588                           enum Peers_ChannelRole role)
1589 {
1590   const struct PeerContext *peer_ctx;
1591
1592   if (GNUNET_NO == Peers_check_peer_known (peer))
1593   {
1594     return GNUNET_NO;
1595   }
1596   peer_ctx = get_peer_ctx (peer);
1597   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1598        (channel == peer_ctx->send_channel) )
1599   {
1600     return GNUNET_YES;
1601   }
1602   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1603        (channel == peer_ctx->recv_channel) )
1604   {
1605     return GNUNET_YES;
1606   }
1607   return GNUNET_NO;
1608 }
1609
1610
1611 /**
1612  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1613  *        intention to another peer
1614  *
1615  * If there is also no channel to receive messages from that peer, remove it
1616  * from the peermap.
1617  * TODO really?
1618  *
1619  * @peer the peer identity of the peer whose sending channel to destroy
1620  * @return #GNUNET_YES if channel was destroyed
1621  *         #GNUNET_NO  otherwise
1622  */
1623 int
1624 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1625 {
1626   struct PeerContext *peer_ctx;
1627
1628   if (GNUNET_NO == Peers_check_peer_known (peer))
1629   {
1630     return GNUNET_NO;
1631   }
1632   peer_ctx = get_peer_ctx (peer);
1633   if (NULL != peer_ctx->send_channel)
1634   {
1635     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1636     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1637     peer_ctx->send_channel = NULL;
1638     peer_ctx->mq = NULL;
1639     (void) Peers_check_connected (peer);
1640     return GNUNET_YES;
1641   }
1642   return GNUNET_NO;
1643 }
1644
1645 /**
1646  * This is called when a channel is destroyed.
1647  *
1648  * @param cls The closure
1649  * @param channel The channel being closed
1650  */
1651 void
1652 Peers_cleanup_destroyed_channel (void *cls,
1653                                  const struct GNUNET_CADET_Channel *channel)
1654 {
1655   struct GNUNET_PeerIdentity *peer = cls;
1656   struct PeerContext *peer_ctx;
1657
1658   if (GNUNET_NO == Peers_check_peer_known (peer))
1659   {/* We don't want to implicitly create a context that we're about to kill */
1660   LOG (GNUNET_ERROR_TYPE_DEBUG,
1661        "channel (%s) without associated context was destroyed\n",
1662        GNUNET_i2s (peer));
1663     return;
1664   }
1665   peer_ctx = get_peer_ctx (peer);
1666
1667   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1668    * flag will be set. In this case simply make sure that the channels are
1669    * cleaned. */
1670   /* FIXME This distinction seems to be redundant */
1671   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1672   {/* We initiatad the destruction of this particular peer */
1673     LOG (GNUNET_ERROR_TYPE_DEBUG,
1674         "Peer is in the process of being destroyed\n");
1675     if (channel == peer_ctx->send_channel)
1676     {
1677       peer_ctx->send_channel = NULL;
1678       peer_ctx->mq = NULL;
1679     }
1680     else if (channel == peer_ctx->recv_channel)
1681     {
1682       peer_ctx->recv_channel = NULL;
1683     }
1684
1685     if (NULL != peer_ctx->send_channel)
1686     {
1687       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1688       peer_ctx->send_channel = NULL;
1689       peer_ctx->mq = NULL;
1690     }
1691     if (NULL != peer_ctx->recv_channel)
1692     {
1693       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1694       peer_ctx->recv_channel = NULL;
1695     }
1696     /* Set the #Peers_ONLINE flag accordingly */
1697     (void) Peers_check_connected (peer);
1698     return;
1699   }
1700
1701   else
1702   { /* We did not initiate the destruction of this peer */
1703     LOG (GNUNET_ERROR_TYPE_DEBUG,
1704         "Peer is NOT in the process of being destroyed\n");
1705     if (channel == peer_ctx->send_channel)
1706     { /* Something (but us) killd the channel - clean up peer */
1707       LOG (GNUNET_ERROR_TYPE_DEBUG,
1708           "send channel (%s) was destroyed - cleaning up\n",
1709           GNUNET_i2s (peer));
1710       peer_ctx->send_channel = NULL;
1711       peer_ctx->mq = NULL;
1712     }
1713     else if (channel == peer_ctx->recv_channel)
1714     { /* Other peer doesn't want to send us messages anymore */
1715       LOG (GNUNET_ERROR_TYPE_DEBUG,
1716            "Peer %s destroyed recv channel - cleaning up channel\n",
1717            GNUNET_i2s (peer));
1718       peer_ctx->recv_channel = NULL;
1719     }
1720     else
1721     {
1722       LOG (GNUNET_ERROR_TYPE_WARNING,
1723            "unknown channel (%s) was destroyed\n",
1724            GNUNET_i2s (peer));
1725     }
1726   }
1727   (void) Peers_check_connected (peer);
1728 }
1729
1730 /**
1731  * @brief Send a message to another peer.
1732  *
1733  * Keeps track about pending messages so they can be properly removed when the
1734  * peer is destroyed.
1735  *
1736  * @param peer receeiver of the message
1737  * @param ev envelope of the message
1738  * @param type type of the message
1739  */
1740 void
1741 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1742                     struct GNUNET_MQ_Envelope *ev,
1743                     const char *type)
1744 {
1745   struct PendingMessage *pending_msg;
1746   struct GNUNET_MQ_Handle *mq;
1747
1748   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1749               "Sending message to %s of type %s\n",
1750               GNUNET_i2s (peer),
1751               type);
1752   pending_msg = insert_pending_message (peer, ev, type);
1753   mq = get_mq (peer);
1754   GNUNET_MQ_notify_sent (ev,
1755                          mq_notify_sent_cb,
1756                          pending_msg);
1757   GNUNET_MQ_send (mq, ev);
1758 }
1759
1760 /**
1761  * @brief Schedule a operation on given peer
1762  *
1763  * Avoids scheduling an operation twice.
1764  *
1765  * @param peer the peer we want to schedule the operation for once it gets live
1766  *
1767  * @return #GNUNET_YES if the operation was scheduled
1768  *         #GNUNET_NO  otherwise
1769  */
1770 int
1771 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1772                           const PeerOp peer_op)
1773 {
1774   struct PeerPendingOp pending_op;
1775   struct PeerContext *peer_ctx;
1776
1777   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1778   {
1779     return GNUNET_NO;
1780   }
1781   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1782
1783   //TODO if LIVE/ONLINE execute immediately
1784
1785   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1786   {
1787     peer_ctx = get_peer_ctx (peer);
1788     pending_op.op = peer_op;
1789     pending_op.op_cls = NULL;
1790     GNUNET_array_append (peer_ctx->pending_ops,
1791                          peer_ctx->num_pending_ops,
1792                          pending_op);
1793     return GNUNET_YES;
1794   }
1795   return GNUNET_NO;
1796 }
1797
1798 /**
1799  * @brief Get the recv_channel of @a peer.
1800  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1801  * messages.
1802  *
1803  * @param peer The peer to get the recv_channel from.
1804  *
1805  * @return The recv_channel.
1806  */
1807 struct GNUNET_CADET_Channel *
1808 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1809 {
1810   struct PeerContext *peer_ctx;
1811
1812   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1813   peer_ctx = get_peer_ctx (peer);
1814   return peer_ctx->recv_channel;
1815 }
1816 /***********************************************************************
1817  * /Old gnunet-service-rps_peers.c
1818 ***********************************************************************/
1819
1820
1821 /***********************************************************************
1822  * Housekeeping with clients
1823 ***********************************************************************/
1824
1825 /**
1826  * Closure used to pass the client and the id to the callback
1827  * that replies to a client's request
1828  */
1829 struct ReplyCls
1830 {
1831   /**
1832    * DLL
1833    */
1834   struct ReplyCls *next;
1835   struct ReplyCls *prev;
1836
1837   /**
1838    * The identifier of the request
1839    */
1840   uint32_t id;
1841
1842   /**
1843    * The handle to the request
1844    */
1845   struct RPS_SamplerRequestHandle *req_handle;
1846
1847   /**
1848    * The client handle to send the reply to
1849    */
1850   struct ClientContext *cli_ctx;
1851 };
1852
1853
1854 /**
1855  * Struct used to store the context of a connected client.
1856  */
1857 struct ClientContext
1858 {
1859   /**
1860    * DLL
1861    */
1862   struct ClientContext *next;
1863   struct ClientContext *prev;
1864
1865   /**
1866    * The message queue to communicate with the client.
1867    */
1868   struct GNUNET_MQ_Handle *mq;
1869
1870   /**
1871    * DLL with handles to single requests from the client
1872    */
1873   struct ReplyCls *rep_cls_head;
1874   struct ReplyCls *rep_cls_tail;
1875
1876   /**
1877    * @brief How many updates this client expects to receive.
1878    */
1879   int64_t view_updates_left;
1880
1881   /**
1882    * The client handle to send the reply to
1883    */
1884   struct GNUNET_SERVICE_Client *client;
1885 };
1886
1887 /**
1888  * DLL with all clients currently connected to us
1889  */
1890 struct ClientContext *cli_ctx_head;
1891 struct ClientContext *cli_ctx_tail;
1892
1893 /***********************************************************************
1894  * /Housekeeping with clients
1895 ***********************************************************************/
1896
1897
1898
1899
1900
1901 /***********************************************************************
1902  * Globals
1903 ***********************************************************************/
1904
1905 /**
1906  * Sampler used for the Brahms protocol itself.
1907  */
1908 static struct RPS_Sampler *prot_sampler;
1909
1910 /**
1911  * Sampler used for the clients.
1912  */
1913 static struct RPS_Sampler *client_sampler;
1914
1915 /**
1916  * Name to log view to
1917  */
1918 static const char *file_name_view_log;
1919
1920 #ifdef TO_FILE
1921 /**
1922  * Name to log number of observed peers to
1923  */
1924 static const char *file_name_observed_log;
1925
1926 /**
1927  * @brief Count the observed peers
1928  */
1929 static uint32_t num_observed_peers;
1930
1931 /**
1932  * @brief Multipeermap (ab-) used to count unique peer_ids
1933  */
1934 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1935 #endif /* TO_FILE */
1936
1937 /**
1938  * The size of sampler we need to be able to satisfy the client's need
1939  * of random peers.
1940  */
1941 static unsigned int sampler_size_client_need;
1942
1943 /**
1944  * The size of sampler we need to be able to satisfy the Brahms protocol's
1945  * need of random peers.
1946  *
1947  * This is one minimum size the sampler grows to.
1948  */
1949 static unsigned int sampler_size_est_need;
1950
1951 /**
1952  * @brief This is the minimum estimate used as sampler size.
1953  *
1954  * It is configured by the user.
1955  */
1956 static unsigned int sampler_size_est_min;
1957
1958 /**
1959  * @brief This is the estimate used as view size.
1960  *
1961  * It is initialised with the minimum
1962  */
1963 static unsigned int view_size_est_need;
1964
1965 /**
1966  * @brief This is the minimum estimate used as view size.
1967  *
1968  * It is configured by the user.
1969  */
1970 static unsigned int view_size_est_min;
1971
1972 /**
1973  * Percentage of total peer number in the view
1974  * to send random PUSHes to
1975  */
1976 static float alpha;
1977
1978 /**
1979  * Percentage of total peer number in the view
1980  * to send random PULLs to
1981  */
1982 static float beta;
1983
1984 /**
1985  * Identifier for the main task that runs periodically.
1986  */
1987 static struct GNUNET_SCHEDULER_Task *do_round_task;
1988
1989 /**
1990  * Time inverval the do_round task runs in.
1991  */
1992 static struct GNUNET_TIME_Relative round_interval;
1993
1994 /**
1995  * List to store peers received through pushes temporary.
1996  */
1997 static struct CustomPeerMap *push_map;
1998
1999 /**
2000  * List to store peers received through pulls temporary.
2001  */
2002 static struct CustomPeerMap *pull_map;
2003
2004 /**
2005  * Handler to NSE.
2006  */
2007 static struct GNUNET_NSE_Handle *nse;
2008
2009 /**
2010  * Handler to CADET.
2011  */
2012 static struct GNUNET_CADET_Handle *cadet_handle;
2013
2014 /**
2015  * @brief Port to communicate to other peers.
2016  */
2017 static struct GNUNET_CADET_Port *cadet_port;
2018
2019 /**
2020  * Handler to PEERINFO.
2021  */
2022 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2023
2024 /**
2025  * Handle for cancellation of iteration over peers.
2026  */
2027 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2028
2029 /**
2030  * Request counter.
2031  *
2032  * Counts how many requets clients already issued.
2033  * Only needed in the beginning to check how many of the 64 deltas
2034  * we already have
2035  */
2036 static unsigned int req_counter;
2037
2038 /**
2039  * Time of the last request we received.
2040  *
2041  * Used to compute the expected request rate.
2042  */
2043 static struct GNUNET_TIME_Absolute last_request;
2044
2045 /**
2046  * Size of #request_deltas.
2047  */
2048 #define REQUEST_DELTAS_SIZE 64
2049 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2050
2051 /**
2052  * Last 64 deltas between requests
2053  */
2054 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2055
2056 /**
2057  * The prediction of the rate of requests
2058  */
2059 static struct GNUNET_TIME_Relative request_rate;
2060
2061
2062 #ifdef ENABLE_MALICIOUS
2063 /**
2064  * Type of malicious peer
2065  *
2066  * 0 Don't act malicious at all - Default
2067  * 1 Try to maximise representation
2068  * 2 Try to partition the network
2069  * 3 Combined attack
2070  */
2071 static uint32_t mal_type;
2072
2073 /**
2074  * Other malicious peers
2075  */
2076 static struct GNUNET_PeerIdentity *mal_peers;
2077
2078 /**
2079  * Hashmap of malicious peers used as set.
2080  * Used to more efficiently check whether we know that peer.
2081  */
2082 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2083
2084 /**
2085  * Number of other malicious peers
2086  */
2087 static uint32_t num_mal_peers;
2088
2089
2090 /**
2091  * If type is 2 This struct is used to store the attacked peers in a DLL
2092  */
2093 struct AttackedPeer
2094 {
2095   /**
2096    * DLL
2097    */
2098   struct AttackedPeer *next;
2099   struct AttackedPeer *prev;
2100
2101   /**
2102    * PeerID
2103    */
2104   struct GNUNET_PeerIdentity peer_id;
2105 };
2106
2107 /**
2108  * If type is 2 this is the DLL of attacked peers
2109  */
2110 static struct AttackedPeer *att_peers_head;
2111 static struct AttackedPeer *att_peers_tail;
2112
2113 /**
2114  * This index is used to point to an attacked peer to
2115  * implement the round-robin-ish way to select attacked peers.
2116  */
2117 static struct AttackedPeer *att_peer_index;
2118
2119 /**
2120  * Hashmap of attacked peers used as set.
2121  * Used to more efficiently check whether we know that peer.
2122  */
2123 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2124
2125 /**
2126  * Number of attacked peers
2127  */
2128 static uint32_t num_attacked_peers;
2129
2130 /**
2131  * If type is 1 this is the attacked peer
2132  */
2133 static struct GNUNET_PeerIdentity attacked_peer;
2134
2135 /**
2136  * The limit of PUSHes we can send in one round.
2137  * This is an assumption of the Brahms protocol and either implemented
2138  * via proof of work
2139  * or
2140  * assumend to be the bandwidth limitation.
2141  */
2142 static uint32_t push_limit = 10000;
2143 #endif /* ENABLE_MALICIOUS */
2144
2145
2146 /***********************************************************************
2147  * /Globals
2148 ***********************************************************************/
2149
2150
2151 /***********************************************************************
2152  * Util functions
2153 ***********************************************************************/
2154
2155
2156 /**
2157  * Print peerlist to log.
2158  */
2159 static void
2160 print_peer_list (struct GNUNET_PeerIdentity *list,
2161                  unsigned int len)
2162 {
2163   unsigned int i;
2164
2165   LOG (GNUNET_ERROR_TYPE_DEBUG,
2166        "Printing peer list of length %u at %p:\n",
2167        len,
2168        list);
2169   for (i = 0 ; i < len ; i++)
2170   {
2171     LOG (GNUNET_ERROR_TYPE_DEBUG,
2172          "%u. peer: %s\n",
2173          i, GNUNET_i2s (&list[i]));
2174   }
2175 }
2176
2177
2178 /**
2179  * Remove peer from list.
2180  */
2181 static void
2182 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2183                unsigned int *list_size,
2184                const struct GNUNET_PeerIdentity *peer)
2185 {
2186   unsigned int i;
2187   struct GNUNET_PeerIdentity *tmp;
2188
2189   tmp = *peer_list;
2190
2191   LOG (GNUNET_ERROR_TYPE_DEBUG,
2192        "Removing peer %s from list at %p\n",
2193        GNUNET_i2s (peer),
2194        tmp);
2195
2196   for ( i = 0 ; i < *list_size ; i++ )
2197   {
2198     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2199     {
2200       if (i < *list_size -1)
2201       { /* Not at the last entry -- shift peers left */
2202         memmove (&tmp[i], &tmp[i +1],
2203                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2204       }
2205       /* Remove last entry (should be now useless PeerID) */
2206       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2207     }
2208   }
2209   *peer_list = tmp;
2210 }
2211
2212
2213 /**
2214  * Sum all time relatives of an array.
2215  */
2216 static struct GNUNET_TIME_Relative
2217 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2218                 uint32_t arr_size)
2219 {
2220   struct GNUNET_TIME_Relative sum;
2221   uint32_t i;
2222
2223   sum = GNUNET_TIME_UNIT_ZERO;
2224   for ( i = 0 ; i < arr_size ; i++ )
2225   {
2226     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2227   }
2228   return sum;
2229 }
2230
2231
2232 /**
2233  * Compute the average of given time relatives.
2234  */
2235 static struct GNUNET_TIME_Relative
2236 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2237                 uint32_t arr_size)
2238 {
2239   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2240                                                       arr_size),
2241                                       arr_size);
2242 }
2243
2244
2245 /**
2246  * Insert PeerID in #view
2247  *
2248  * Called once we know a peer is live.
2249  * Implements #PeerOp
2250  *
2251  * @return GNUNET_OK if peer was actually inserted
2252  *         GNUNET_NO if peer was not inserted
2253  */
2254 static void
2255 insert_in_view_op (void *cls,
2256                 const struct GNUNET_PeerIdentity *peer);
2257
2258 /**
2259  * Insert PeerID in #view
2260  *
2261  * Called once we know a peer is live.
2262  *
2263  * @return GNUNET_OK if peer was actually inserted
2264  *         GNUNET_NO if peer was not inserted
2265  */
2266 static int
2267 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2268 {
2269   int online;
2270
2271   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2272   if ( (GNUNET_NO == online) ||
2273        (GNUNET_SYSERR == online) ) /* peer is not even known */
2274   {
2275     (void) Peers_issue_peer_liveliness_check (peer);
2276     (void) Peers_schedule_operation (peer, insert_in_view_op);
2277     return GNUNET_NO;
2278   }
2279   /* Open channel towards peer to keep connection open */
2280   Peers_indicate_sending_intention (peer);
2281   return View_put (peer);
2282 }
2283
2284 /**
2285  * @brief sends updates to clients that are interested
2286  */
2287 static void
2288 clients_notify_view_update (void);
2289
2290 /**
2291  * Put random peer from sampler into the view as history update.
2292  */
2293 static void
2294 hist_update (void *cls,
2295              struct GNUNET_PeerIdentity *ids,
2296              uint32_t num_peers)
2297 {
2298   unsigned int i;
2299
2300   for (i = 0; i < num_peers; i++)
2301   {
2302     (void) insert_in_view (&ids[i]);
2303     to_file (file_name_view_log,
2304              "+%s\t(hist)",
2305              GNUNET_i2s_full (ids));
2306   }
2307   clients_notify_view_update();
2308 }
2309
2310
2311 /**
2312  * Wrapper around #RPS_sampler_resize()
2313  *
2314  * If we do not have enough sampler elements, double current sampler size
2315  * If we have more than enough sampler elements, halv current sampler size
2316  */
2317 static void
2318 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2319 {
2320   unsigned int sampler_size;
2321
2322   // TODO statistics
2323   // TODO respect the min, max
2324   sampler_size = RPS_sampler_get_size (sampler);
2325   if (sampler_size > new_size * 4)
2326   { /* Shrinking */
2327     RPS_sampler_resize (sampler, sampler_size / 2);
2328   }
2329   else if (sampler_size < new_size)
2330   { /* Growing */
2331     RPS_sampler_resize (sampler, sampler_size * 2);
2332   }
2333   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2334 }
2335
2336
2337 /**
2338  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2339  */
2340 static void
2341 client_resize_wrapper ()
2342 {
2343   uint32_t bigger_size;
2344
2345   // TODO statistics
2346
2347   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2348
2349   // TODO respect the min, max
2350   resize_wrapper (client_sampler, bigger_size);
2351   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2352       bigger_size);
2353 }
2354
2355
2356 /**
2357  * Estimate request rate
2358  *
2359  * Called every time we receive a request from the client.
2360  */
2361 static void
2362 est_request_rate()
2363 {
2364   struct GNUNET_TIME_Relative max_round_duration;
2365
2366   if (request_deltas_size > req_counter)
2367     req_counter++;
2368   if ( 1 < req_counter)
2369   {
2370     /* Shift last request deltas to the right */
2371     memmove (&request_deltas[1],
2372         request_deltas,
2373         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2374
2375     /* Add current delta to beginning */
2376     request_deltas[0] =
2377         GNUNET_TIME_absolute_get_difference (last_request,
2378                                              GNUNET_TIME_absolute_get ());
2379     request_rate = T_relative_avg (request_deltas, req_counter);
2380     request_rate = (request_rate.rel_value_us < 1) ?
2381       GNUNET_TIME_relative_get_unit_ () : request_rate;
2382
2383     /* Compute the duration a round will maximally take */
2384     max_round_duration =
2385         GNUNET_TIME_relative_add (round_interval,
2386                                   GNUNET_TIME_relative_divide (round_interval, 2));
2387
2388     /* Set the estimated size the sampler has to have to
2389      * satisfy the current client request rate */
2390     sampler_size_client_need =
2391         max_round_duration.rel_value_us / request_rate.rel_value_us;
2392
2393     /* Resize the sampler */
2394     client_resize_wrapper ();
2395   }
2396   last_request = GNUNET_TIME_absolute_get ();
2397 }
2398
2399
2400 /**
2401  * Add all peers in @a peer_array to @a peer_map used as set.
2402  *
2403  * @param peer_array array containing the peers
2404  * @param num_peers number of peers in @peer_array
2405  * @param peer_map the peermap to use as set
2406  */
2407 static void
2408 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2409                        unsigned int num_peers,
2410                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2411 {
2412   unsigned int i;
2413   if (NULL == peer_map)
2414   {
2415     LOG (GNUNET_ERROR_TYPE_WARNING,
2416          "Trying to add peers to non-existing peermap.\n");
2417     return;
2418   }
2419
2420   for (i = 0; i < num_peers; i++)
2421   {
2422     GNUNET_CONTAINER_multipeermap_put (peer_map,
2423                                        &peer_array[i],
2424                                        NULL,
2425                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2426   }
2427 }
2428
2429
2430 /**
2431  * Send a PULL REPLY to @a peer_id
2432  *
2433  * @param peer_id the peer to send the reply to.
2434  * @param peer_ids the peers to send to @a peer_id
2435  * @param num_peer_ids the number of peers to send to @a peer_id
2436  */
2437 static void
2438 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2439                  const struct GNUNET_PeerIdentity *peer_ids,
2440                  unsigned int num_peer_ids)
2441 {
2442   uint32_t send_size;
2443   struct GNUNET_MQ_Envelope *ev;
2444   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2445
2446   /* Compute actual size */
2447   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2448               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2449
2450   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2451     /* Compute number of peers to send
2452      * If too long, simply truncate */
2453     // TODO select random ones via permutation
2454     //      or even better: do good protocol design
2455     send_size =
2456       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2457        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2458        sizeof (struct GNUNET_PeerIdentity);
2459   else
2460     send_size = num_peer_ids;
2461
2462   LOG (GNUNET_ERROR_TYPE_DEBUG,
2463       "Going to send PULL REPLY with %u peers to %s\n",
2464       send_size, GNUNET_i2s (peer_id));
2465
2466   ev = GNUNET_MQ_msg_extra (out_msg,
2467                             send_size * sizeof (struct GNUNET_PeerIdentity),
2468                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2469   out_msg->num_peers = htonl (send_size);
2470   GNUNET_memcpy (&out_msg[1], peer_ids,
2471          send_size * sizeof (struct GNUNET_PeerIdentity));
2472
2473   Peers_send_message (peer_id, ev, "PULL REPLY");
2474   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2475 }
2476
2477
2478 /**
2479  * Insert PeerID in #pull_map
2480  *
2481  * Called once we know a peer is live.
2482  */
2483 static void
2484 insert_in_pull_map (void *cls,
2485                     const struct GNUNET_PeerIdentity *peer)
2486 {
2487   CustomPeerMap_put (pull_map, peer);
2488 }
2489
2490
2491 /**
2492  * Insert PeerID in #view
2493  *
2494  * Called once we know a peer is live.
2495  * Implements #PeerOp
2496  */
2497 static void
2498 insert_in_view_op (void *cls,
2499                 const struct GNUNET_PeerIdentity *peer)
2500 {
2501   (void) insert_in_view (peer);
2502 }
2503
2504
2505 /**
2506  * Update sampler with given PeerID.
2507  * Implements #PeerOp
2508  */
2509 static void
2510 insert_in_sampler (void *cls,
2511                    const struct GNUNET_PeerIdentity *peer)
2512 {
2513   LOG (GNUNET_ERROR_TYPE_DEBUG,
2514        "Updating samplers with peer %s from insert_in_sampler()\n",
2515        GNUNET_i2s (peer));
2516   RPS_sampler_update (prot_sampler,   peer);
2517   RPS_sampler_update (client_sampler, peer);
2518   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2519   {
2520     /* Make sure we 'know' about this peer */
2521     (void) Peers_issue_peer_liveliness_check (peer);
2522     /* Establish a channel towards that peer to indicate we are going to send
2523      * messages to it */
2524     //Peers_indicate_sending_intention (peer);
2525   }
2526   #ifdef TO_FILE
2527   num_observed_peers++;
2528   GNUNET_CONTAINER_multipeermap_put
2529     (observed_unique_peers,
2530      peer,
2531      NULL,
2532      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2533   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2534       observed_unique_peers);
2535   to_file (file_name_observed_log,
2536           "%" PRIu32 " %" PRIu32 " %f\n",
2537           num_observed_peers,
2538           num_observed_unique_peers,
2539           1.0*num_observed_unique_peers/num_observed_peers)
2540   #endif /* TO_FILE */
2541 }
2542
2543 /**
2544  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2545  *        If the peer is not known, liveliness check is issued and it is
2546  *        scheduled to be inserted in sampler and view.
2547  *
2548  * "External sources" refer to every source except the gossip.
2549  *
2550  * @param peer peer to insert
2551  */
2552 static void
2553 got_peer (const struct GNUNET_PeerIdentity *peer)
2554 {
2555   /* If we did not know this peer already, insert it into sampler and view */
2556   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2557   {
2558     Peers_schedule_operation (peer, insert_in_sampler);
2559     Peers_schedule_operation (peer, insert_in_view_op);
2560   }
2561 }
2562
2563 /**
2564  * @brief Checks if there is a sending channel and if it is needed
2565  *
2566  * @param peer the peer whose sending channel is checked
2567  * @return GNUNET_YES if sending channel exists and is still needed
2568  *         GNUNET_NO  otherwise
2569  */
2570 static int
2571 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2572 {
2573   /* struct GNUNET_CADET_Channel *channel; */
2574   if (GNUNET_NO == Peers_check_peer_known (peer))
2575   {
2576     return GNUNET_NO;
2577   }
2578   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2579   {
2580     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2581          (GNUNET_YES == View_contains_peer (peer)) ||
2582          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2583          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2584          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2585     { /* If we want to keep the connection to peer open */
2586       return GNUNET_YES;
2587     }
2588     return GNUNET_NO;
2589   }
2590   return GNUNET_NO;
2591 }
2592
2593 /**
2594  * @brief remove peer from our knowledge, the view, push and pull maps and
2595  * samplers.
2596  *
2597  * @param peer the peer to remove
2598  */
2599 static void
2600 remove_peer (const struct GNUNET_PeerIdentity *peer)
2601 {
2602   (void) View_remove_peer (peer);
2603   CustomPeerMap_remove_peer (pull_map, peer);
2604   CustomPeerMap_remove_peer (push_map, peer);
2605   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2606   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2607   Peers_remove_peer (peer);
2608 }
2609
2610
2611 /**
2612  * @brief Remove data that is not needed anymore.
2613  *
2614  * If the sending channel is no longer needed it is destroyed.
2615  *
2616  * @param peer the peer whose data is about to be cleaned
2617  */
2618 static void
2619 clean_peer (const struct GNUNET_PeerIdentity *peer)
2620 {
2621   if (GNUNET_NO == check_sending_channel_needed (peer))
2622   {
2623     LOG (GNUNET_ERROR_TYPE_DEBUG,
2624         "Going to remove send channel to peer %s\n",
2625         GNUNET_i2s (peer));
2626     #ifdef ENABLE_MALICIOUS
2627     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2628       (void) Peers_destroy_sending_channel (peer);
2629     #else /* ENABLE_MALICIOUS */
2630     (void) Peers_destroy_sending_channel (peer);
2631     #endif /* ENABLE_MALICIOUS */
2632   }
2633
2634   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2635        (GNUNET_NO == View_contains_peer (peer)) &&
2636        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2637        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2638        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2639        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2640        (GNUNET_NO != Peers_check_removable (peer)) )
2641   { /* We can safely remove this peer */
2642     LOG (GNUNET_ERROR_TYPE_DEBUG,
2643         "Going to remove peer %s\n",
2644         GNUNET_i2s (peer));
2645     remove_peer (peer);
2646     return;
2647   }
2648 }
2649
2650 /**
2651  * @brief This is called when a channel is destroyed.
2652  *
2653  * Removes peer completely from our knowledge if the send_channel was destroyed
2654  * Otherwise simply delete the recv_channel
2655  * Also check if the knowledge about this peer is still needed.
2656  * If not, remove this peer from our knowledge.
2657  *
2658  * @param cls The closure
2659  * @param channel The channel being closed
2660  * @param channel_ctx The context associated with this channel
2661  */
2662 static void
2663 cleanup_destroyed_channel (void *cls,
2664                            const struct GNUNET_CADET_Channel *channel)
2665 {
2666   struct GNUNET_PeerIdentity *peer = cls;
2667   uint32_t *channel_flag;
2668   struct PeerContext *peer_ctx;
2669
2670   GNUNET_assert (NULL != peer);
2671
2672   if (GNUNET_NO == Peers_check_peer_known (peer))
2673   { /* We don't know a context to that peer */
2674     LOG (GNUNET_ERROR_TYPE_WARNING,
2675          "channel (%s) without associated context was destroyed\n",
2676          GNUNET_i2s (peer));
2677     GNUNET_free (peer);
2678     return;
2679   }
2680
2681   peer_ctx = get_peer_ctx (peer);
2682   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2683   {
2684     LOG (GNUNET_ERROR_TYPE_DEBUG,
2685         "Callback on destruction of recv-channel was called (%s)\n",
2686         GNUNET_i2s (peer));
2687     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2688   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2689   {
2690     LOG (GNUNET_ERROR_TYPE_DEBUG,
2691         "Callback on destruction of send-channel was called (%s)\n",
2692         GNUNET_i2s (peer));
2693     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2694   } else {
2695     LOG (GNUNET_ERROR_TYPE_ERROR,
2696         "Channel to be destroyed has is neither sending nor receiving role\n");
2697   }
2698
2699   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2700   { /* We are in the middle of removing that peer from our knowledge. In this
2701        case simply make sure that the channels are cleaned. */
2702     Peers_cleanup_destroyed_channel (cls, channel);
2703     to_file (file_name_view_log,
2704              "-%s\t(cleanup channel, ourself)",
2705              GNUNET_i2s_full (peer));
2706     GNUNET_free (peer);
2707     return;
2708   }
2709
2710   if (GNUNET_YES ==
2711       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2712   { /* Channel used for sending was destroyed */
2713     /* Possible causes of channel destruction:
2714      *  - ourselves  -> cleaning send channel -> clean context
2715      *  - other peer -> peer probably went down -> remove
2716      */
2717     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2718     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2719     { /* We are about to clean the sending channel. Clean the respective
2720        * context */
2721       Peers_cleanup_destroyed_channel (cls, channel);
2722       GNUNET_free (peer);
2723       return;
2724     }
2725     else
2726     { /* Other peer destroyed our sending channel that he is supposed to keep
2727        * open. It probably went down. Remove it from our knowledge. */
2728       Peers_cleanup_destroyed_channel (cls, channel);
2729       remove_peer (peer);
2730       GNUNET_free (peer);
2731       return;
2732     }
2733   }
2734   else if (GNUNET_YES ==
2735       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2736   { /* Channel used for receiving was destroyed */
2737     /* Possible causes of channel destruction:
2738      *  - ourselves  -> peer tried to establish channel twice -> clean context
2739      *  - other peer -> peer doesn't want to send us data -> clean
2740      */
2741     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2742     if (GNUNET_YES ==
2743         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2744     { /* Other peer tried to establish a channel to us twice. We do not accept
2745        * that. Clean the context. */
2746       Peers_cleanup_destroyed_channel (cls, channel);
2747       GNUNET_free (peer);
2748       return;
2749     }
2750     else
2751     { /* Other peer doesn't want to send us data anymore. We are free to clean
2752        * it. */
2753       Peers_cleanup_destroyed_channel (cls, channel);
2754       clean_peer (peer);
2755       GNUNET_free (peer);
2756       return;
2757     }
2758   }
2759   else
2760   {
2761     LOG (GNUNET_ERROR_TYPE_WARNING,
2762         "Destroyed channel is neither sending nor receiving channel\n");
2763   }
2764   GNUNET_free (peer);
2765 }
2766
2767 /***********************************************************************
2768  * /Util functions
2769 ***********************************************************************/
2770
2771 static void
2772 destroy_reply_cls (struct ReplyCls *rep_cls)
2773 {
2774   struct ClientContext *cli_ctx;
2775
2776   cli_ctx = rep_cls->cli_ctx;
2777   GNUNET_assert (NULL != cli_ctx);
2778   if (NULL != rep_cls->req_handle)
2779   {
2780     RPS_sampler_request_cancel (rep_cls->req_handle);
2781   }
2782   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2783                                cli_ctx->rep_cls_tail,
2784                                rep_cls);
2785   GNUNET_free (rep_cls);
2786 }
2787
2788
2789 static void
2790 destroy_cli_ctx (struct ClientContext *cli_ctx)
2791 {
2792   GNUNET_assert (NULL != cli_ctx);
2793   if (NULL != cli_ctx->rep_cls_head)
2794   {
2795     LOG (GNUNET_ERROR_TYPE_WARNING,
2796          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2797     while (NULL != cli_ctx->rep_cls_head)
2798       destroy_reply_cls (cli_ctx->rep_cls_head);
2799   }
2800   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2801                                cli_ctx_tail,
2802                                cli_ctx);
2803   GNUNET_free (cli_ctx);
2804 }
2805
2806
2807 /**
2808  * Function called by NSE.
2809  *
2810  * Updates sizes of sampler list and view and adapt those lists
2811  * accordingly.
2812  */
2813 static void
2814 nse_callback (void *cls,
2815               struct GNUNET_TIME_Absolute timestamp,
2816               double logestimate, double std_dev)
2817 {
2818   double estimate;
2819   //double scale; // TODO this might go gloabal/config
2820
2821   LOG (GNUNET_ERROR_TYPE_DEBUG,
2822        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2823        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2824   //scale = .01;
2825   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2826   // GNUNET_NSE_log_estimate_to_n (logestimate);
2827   estimate = pow (estimate, 1.0 / 3);
2828   // TODO add if std_dev is a number
2829   // estimate += (std_dev * scale);
2830   if (view_size_est_min < ceil (estimate))
2831   {
2832     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2833     sampler_size_est_need = estimate;
2834     view_size_est_need = estimate;
2835   } else
2836   {
2837     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2838     //sampler_size_est_need = view_size_est_min;
2839     view_size_est_need = view_size_est_min;
2840   }
2841
2842   /* If the NSE has changed adapt the lists accordingly */
2843   resize_wrapper (prot_sampler, sampler_size_est_need);
2844   client_resize_wrapper ();
2845 }
2846
2847
2848 /**
2849  * Callback called once the requested PeerIDs are ready.
2850  *
2851  * Sends those to the requesting client.
2852  */
2853 static void
2854 client_respond (void *cls,
2855                 struct GNUNET_PeerIdentity *peer_ids,
2856                 uint32_t num_peers)
2857 {
2858   struct ReplyCls *reply_cls = cls;
2859   uint32_t i;
2860   struct GNUNET_MQ_Envelope *ev;
2861   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2862   uint32_t size_needed;
2863   struct ClientContext *cli_ctx;
2864
2865   GNUNET_assert (NULL != reply_cls);
2866   LOG (GNUNET_ERROR_TYPE_DEBUG,
2867        "sampler returned %" PRIu32 " peers:\n",
2868        num_peers);
2869   for (i = 0; i < num_peers; i++)
2870   {
2871     LOG (GNUNET_ERROR_TYPE_DEBUG,
2872          "  %" PRIu32 ": %s\n",
2873          i,
2874          GNUNET_i2s (&peer_ids[i]));
2875   }
2876
2877   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2878                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2879
2880   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2881
2882   ev = GNUNET_MQ_msg_extra (out_msg,
2883                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2884                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2885   out_msg->num_peers = htonl (num_peers);
2886   out_msg->id = htonl (reply_cls->id);
2887
2888   GNUNET_memcpy (&out_msg[1],
2889           peer_ids,
2890           num_peers * sizeof (struct GNUNET_PeerIdentity));
2891   GNUNET_free (peer_ids);
2892
2893   cli_ctx = reply_cls->cli_ctx;
2894   GNUNET_assert (NULL != cli_ctx);
2895   reply_cls->req_handle = NULL;
2896   destroy_reply_cls (reply_cls);
2897   GNUNET_MQ_send (cli_ctx->mq, ev);
2898 }
2899
2900
2901 /**
2902  * Handle RPS request from the client.
2903  *
2904  * @param cls closure
2905  * @param message the actual message
2906  */
2907 static void
2908 handle_client_request (void *cls,
2909                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2910 {
2911   struct ClientContext *cli_ctx = cls;
2912   uint32_t num_peers;
2913   uint32_t size_needed;
2914   struct ReplyCls *reply_cls;
2915   uint32_t i;
2916
2917   num_peers = ntohl (msg->num_peers);
2918   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2919                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2920
2921   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2922   {
2923     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2924                 "Message received from client has size larger than expected\n");
2925     GNUNET_SERVICE_client_drop (cli_ctx->client);
2926     return;
2927   }
2928
2929   for (i = 0 ; i < num_peers ; i++)
2930     est_request_rate();
2931
2932   LOG (GNUNET_ERROR_TYPE_DEBUG,
2933        "Client requested %" PRIu32 " random peer(s).\n",
2934        num_peers);
2935
2936   reply_cls = GNUNET_new (struct ReplyCls);
2937   reply_cls->id = ntohl (msg->id);
2938   reply_cls->cli_ctx = cli_ctx;
2939   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2940                                                         client_respond,
2941                                                         reply_cls,
2942                                                         num_peers);
2943
2944   GNUNET_assert (NULL != cli_ctx);
2945   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2946                                cli_ctx->rep_cls_tail,
2947                                reply_cls);
2948   GNUNET_SERVICE_client_continue (cli_ctx->client);
2949 }
2950
2951
2952 /**
2953  * @brief Handle a message that requests the cancellation of a request
2954  *
2955  * @param cls unused
2956  * @param message the message containing the id of the request
2957  */
2958 static void
2959 handle_client_request_cancel (void *cls,
2960                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2961 {
2962   struct ClientContext *cli_ctx = cls;
2963   struct ReplyCls *rep_cls;
2964
2965   GNUNET_assert (NULL != cli_ctx);
2966   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2967   rep_cls = cli_ctx->rep_cls_head;
2968   LOG (GNUNET_ERROR_TYPE_DEBUG,
2969       "Client cancels request with id %" PRIu32 "\n",
2970       ntohl (msg->id));
2971   while ( (NULL != rep_cls->next) &&
2972           (rep_cls->id != ntohl (msg->id)) )
2973     rep_cls = rep_cls->next;
2974   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2975   destroy_reply_cls (rep_cls);
2976   GNUNET_SERVICE_client_continue (cli_ctx->client);
2977 }
2978
2979
2980 /**
2981  * @brief This function is called, when the client seeds peers.
2982  * It verifies that @a msg is well-formed.
2983  *
2984  * @param cls the closure (#ClientContext)
2985  * @param msg the message
2986  * @return #GNUNET_OK if @a msg is well-formed
2987  */
2988 static int
2989 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2990 {
2991   struct ClientContext *cli_ctx = cls;
2992   uint16_t msize = ntohs (msg->header.size);
2993   uint32_t num_peers = ntohl (msg->num_peers);
2994
2995   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2996   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2997        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2998   {
2999     GNUNET_break (0);
3000     GNUNET_SERVICE_client_drop (cli_ctx->client);
3001     return GNUNET_SYSERR;
3002   }
3003   return GNUNET_OK;
3004 }
3005
3006
3007 /**
3008  * Handle seed from the client.
3009  *
3010  * @param cls closure
3011  * @param message the actual message
3012  */
3013 static void
3014 handle_client_seed (void *cls,
3015                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3016 {
3017   struct ClientContext *cli_ctx = cls;
3018   struct GNUNET_PeerIdentity *peers;
3019   uint32_t num_peers;
3020   uint32_t i;
3021
3022   num_peers = ntohl (msg->num_peers);
3023   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3024   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3025   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
3026
3027   LOG (GNUNET_ERROR_TYPE_DEBUG,
3028        "Client seeded peers:\n");
3029   print_peer_list (peers, num_peers);
3030
3031   for (i = 0; i < num_peers; i++)
3032   {
3033     LOG (GNUNET_ERROR_TYPE_DEBUG,
3034          "Updating samplers with seed %" PRIu32 ": %s\n",
3035          i,
3036          GNUNET_i2s (&peers[i]));
3037
3038     got_peer (&peers[i]);
3039   }
3040
3041   ////GNUNET_free (peers);
3042
3043   GNUNET_SERVICE_client_continue (cli_ctx->client);
3044 }
3045
3046 /**
3047  * @brief Send view to client
3048  *
3049  * @param cli_ctx the context of the client
3050  * @param view_array the peerids of the view as array (can be empty)
3051  * @param view_size the size of the view array (can be 0)
3052  */
3053 void
3054 send_view (const struct ClientContext *cli_ctx,
3055            const struct GNUNET_PeerIdentity *view_array,
3056            uint64_t view_size)
3057 {
3058   struct GNUNET_MQ_Envelope *ev;
3059   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3060
3061   if (NULL == view_array)
3062   {
3063     view_size = View_size ();
3064     view_array = View_get_as_array();
3065   }
3066
3067   ev = GNUNET_MQ_msg_extra (out_msg,
3068                             view_size * sizeof (struct GNUNET_PeerIdentity),
3069                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3070   out_msg->num_peers = htonl (view_size);
3071
3072   GNUNET_memcpy (&out_msg[1],
3073           view_array,
3074           view_size * sizeof (struct GNUNET_PeerIdentity));
3075   GNUNET_MQ_send (cli_ctx->mq, ev);
3076 }
3077
3078 /**
3079  * @brief sends updates to clients that are interested
3080  */
3081 static void
3082 clients_notify_view_update (void)
3083 {
3084   struct ClientContext *cli_ctx_iter;
3085   uint64_t num_peers;
3086   const struct GNUNET_PeerIdentity *view_array;
3087
3088   num_peers = View_size ();
3089   view_array = View_get_as_array();
3090   /* check size of view is small enough */
3091   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3092   {
3093     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3094                 "View is too big to send\n");
3095     return;
3096   }
3097
3098   for (cli_ctx_iter = cli_ctx_head;
3099        NULL != cli_ctx_iter;
3100        cli_ctx_iter = cli_ctx_head->next)
3101   {
3102     if (1 < cli_ctx_iter->view_updates_left)
3103     {
3104       /* Client wants to receive limited amount of updates */
3105       cli_ctx_iter->view_updates_left -= 1;
3106     } else if (1 == cli_ctx_iter->view_updates_left)
3107     {
3108       /* Last update of view for client */
3109       cli_ctx_iter->view_updates_left = -1;
3110     } else if (0 > cli_ctx_iter->view_updates_left) {
3111       /* Client is not interested in updates */
3112       continue;
3113     }
3114     /* else _updates_left == 0 - infinite amount of updates */
3115
3116     /* send view */
3117     send_view (cli_ctx_iter, view_array, num_peers);
3118   }
3119 }
3120
3121
3122 /**
3123  * Handle RPS request from the client.
3124  *
3125  * @param cls closure
3126  * @param message the actual message
3127  */
3128 static void
3129 handle_client_view_request (void *cls,
3130                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3131 {
3132   struct ClientContext *cli_ctx = cls;
3133   uint64_t num_updates;
3134
3135   num_updates = ntohl (msg->num_updates);
3136
3137   LOG (GNUNET_ERROR_TYPE_DEBUG,
3138        "Client requested %" PRIu64 " updates of view.\n",
3139        num_updates);
3140
3141   GNUNET_assert (NULL != cli_ctx);
3142   cli_ctx->view_updates_left = num_updates;
3143   send_view (cli_ctx, NULL, 0);
3144   GNUNET_SERVICE_client_continue (cli_ctx->client);
3145 }
3146
3147 /**
3148  * Handle a CHECK_LIVE message from another peer.
3149  *
3150  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3151  * the channel is blocked for all other communication.
3152  *
3153  * @param cls Closure
3154  * @param msg The message header
3155  */
3156 static void
3157 handle_peer_check (void *cls,
3158                    const struct GNUNET_MessageHeader *msg)
3159 {
3160   const struct GNUNET_PeerIdentity *peer = cls;
3161   LOG (GNUNET_ERROR_TYPE_DEBUG,
3162       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3163
3164   GNUNET_break_op (Peers_check_peer_known (peer));
3165   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3166 }
3167
3168 /**
3169  * Handle a PUSH message from another peer.
3170  *
3171  * Check the proof of work and store the PeerID
3172  * in the temporary list for pushed PeerIDs.
3173  *
3174  * @param cls Closure
3175  * @param msg The message header
3176  */
3177 static void
3178 handle_peer_push (void *cls,
3179                   const struct GNUNET_MessageHeader *msg)
3180 {
3181   const struct GNUNET_PeerIdentity *peer = cls;
3182
3183   // (check the proof of work (?))
3184
3185   LOG (GNUNET_ERROR_TYPE_DEBUG,
3186        "Received PUSH (%s)\n",
3187        GNUNET_i2s (peer));
3188   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3189
3190   #ifdef ENABLE_MALICIOUS
3191   struct AttackedPeer *tmp_att_peer;
3192
3193   if ( (1 == mal_type) ||
3194        (3 == mal_type) )
3195   { /* Try to maximise representation */
3196     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3197     tmp_att_peer->peer_id = *peer;
3198     if (NULL == att_peer_set)
3199       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3200     if (GNUNET_NO ==
3201         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3202                                                 peer))
3203     {
3204       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3205                                    att_peers_tail,
3206                                    tmp_att_peer);
3207       add_peer_array_to_set (peer, 1, att_peer_set);
3208     }
3209   }
3210
3211
3212   else if (2 == mal_type)
3213   {
3214     /* We attack one single well-known peer - simply ignore */
3215   }
3216   #endif /* ENABLE_MALICIOUS */
3217
3218   /* Add the sending peer to the push_map */
3219   CustomPeerMap_put (push_map, peer);
3220
3221   GNUNET_break_op (Peers_check_peer_known (peer));
3222   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3223 }
3224
3225
3226 /**
3227  * Handle PULL REQUEST request message from another peer.
3228  *
3229  * Reply with the view of PeerIDs.
3230  *
3231  * @param cls Closure
3232  * @param msg The message header
3233  */
3234 static void
3235 handle_peer_pull_request (void *cls,
3236                           const struct GNUNET_MessageHeader *msg)
3237 {
3238   struct GNUNET_PeerIdentity *peer = cls;
3239   const struct GNUNET_PeerIdentity *view_array;
3240
3241   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3242   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3243
3244   #ifdef ENABLE_MALICIOUS
3245   if (1 == mal_type
3246       || 3 == mal_type)
3247   { /* Try to maximise representation */
3248     send_pull_reply (peer, mal_peers, num_mal_peers);
3249   }
3250
3251   else if (2 == mal_type)
3252   { /* Try to partition network */
3253     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3254     {
3255       send_pull_reply (peer, mal_peers, num_mal_peers);
3256     }
3257   }
3258   #endif /* ENABLE_MALICIOUS */
3259
3260   GNUNET_break_op (Peers_check_peer_known (peer));
3261   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3262   view_array = View_get_as_array ();
3263   send_pull_reply (peer, view_array, View_size ());
3264 }
3265
3266
3267 /**
3268  * Check whether we sent a corresponding request and
3269  * whether this reply is the first one.
3270  *
3271  * @param cls Closure
3272  * @param msg The message header
3273  */
3274 static int
3275 check_peer_pull_reply (void *cls,
3276                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3277 {
3278   struct GNUNET_PeerIdentity *sender = cls;
3279
3280   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3281   {
3282     GNUNET_break_op (0);
3283     return GNUNET_SYSERR;
3284   }
3285
3286   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3287       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3288   {
3289     LOG (GNUNET_ERROR_TYPE_ERROR,
3290         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3291         ntohl (msg->num_peers),
3292         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3293             sizeof (struct GNUNET_PeerIdentity));
3294     GNUNET_break_op (0);
3295     return GNUNET_SYSERR;
3296   }
3297
3298   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3299   {
3300     LOG (GNUNET_ERROR_TYPE_WARNING,
3301         "Received a pull reply from a peer we didn't request one from!\n");
3302     GNUNET_break_op (0);
3303     return GNUNET_SYSERR;
3304   }
3305   return GNUNET_OK;
3306 }
3307
3308 /**
3309  * Handle PULL REPLY message from another peer.
3310  *
3311  * @param cls Closure
3312  * @param msg The message header
3313  */
3314 static void
3315 handle_peer_pull_reply (void *cls,
3316                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3317 {
3318   const struct GNUNET_PeerIdentity *peers;
3319   struct GNUNET_PeerIdentity *sender = cls;
3320   uint32_t i;
3321 #ifdef ENABLE_MALICIOUS
3322   struct AttackedPeer *tmp_att_peer;
3323 #endif /* ENABLE_MALICIOUS */
3324
3325   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3326   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3327
3328   #ifdef ENABLE_MALICIOUS
3329   // We shouldn't even receive pull replies as we're not sending
3330   if (2 == mal_type)
3331   {
3332   }
3333   #endif /* ENABLE_MALICIOUS */
3334
3335   /* Do actual logic */
3336   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3337
3338   LOG (GNUNET_ERROR_TYPE_DEBUG,
3339        "PULL REPLY received, got following %u peers:\n",
3340        ntohl (msg->num_peers));
3341
3342   for (i = 0; i < ntohl (msg->num_peers); i++)
3343   {
3344     LOG (GNUNET_ERROR_TYPE_DEBUG,
3345          "%u. %s\n",
3346          i,
3347          GNUNET_i2s (&peers[i]));
3348
3349     #ifdef ENABLE_MALICIOUS
3350     if ((NULL != att_peer_set) &&
3351         (1 == mal_type || 3 == mal_type))
3352     { /* Add attacked peer to local list */
3353       // TODO check if we sent a request and this was the first reply
3354       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3355                                                                &peers[i])
3356           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3357                                                                   &peers[i])
3358           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3359                                                    &own_identity))
3360       {
3361         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3362         tmp_att_peer->peer_id = peers[i];
3363         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3364                                      att_peers_tail,
3365                                      tmp_att_peer);
3366         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3367       }
3368       continue;
3369     }
3370     #endif /* ENABLE_MALICIOUS */
3371     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3372                                               &peers[i]))
3373     {
3374       /* Make sure we 'know' about this peer */
3375       (void) Peers_insert_peer (&peers[i]);
3376
3377       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3378       {
3379         CustomPeerMap_put (pull_map, &peers[i]);
3380       }
3381       else
3382       {
3383         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3384         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3385       }
3386     }
3387   }
3388
3389   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3390   clean_peer (sender);
3391
3392   GNUNET_break_op (Peers_check_peer_known (sender));
3393   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3394 }
3395
3396
3397 /**
3398  * Compute a random delay.
3399  * A uniformly distributed value between mean + spread and mean - spread.
3400  *
3401  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3402  * It would return a random value between 2 and 6 min.
3403  *
3404  * @param mean the mean
3405  * @param spread the inverse amount of deviation from the mean
3406  */
3407 static struct GNUNET_TIME_Relative
3408 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3409                     unsigned int spread)
3410 {
3411   struct GNUNET_TIME_Relative half_interval;
3412   struct GNUNET_TIME_Relative ret;
3413   unsigned int rand_delay;
3414   unsigned int max_rand_delay;
3415
3416   if (0 == spread)
3417   {
3418     LOG (GNUNET_ERROR_TYPE_WARNING,
3419          "Not accepting spread of 0\n");
3420     GNUNET_break (0);
3421     GNUNET_assert (0);
3422   }
3423   GNUNET_assert (0 != mean.rel_value_us);
3424
3425   /* Compute random time value between spread * mean and spread * mean */
3426   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3427
3428   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3429   /**
3430    * Compute random value between (0 and 1) * round_interval
3431    * via multiplying round_interval with a 'fraction' (0 to value)/value
3432    */
3433   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3434   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3435   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3436   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3437
3438   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3439     LOG (GNUNET_ERROR_TYPE_WARNING,
3440          "Returning FOREVER_REL\n");
3441
3442   return ret;
3443 }
3444
3445
3446 /**
3447  * Send single pull request
3448  *
3449  * @param peer_id the peer to send the pull request to.
3450  */
3451 static void
3452 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3453 {
3454   struct GNUNET_MQ_Envelope *ev;
3455
3456   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3457                                                      Peers_PULL_REPLY_PENDING));
3458   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3459
3460   LOG (GNUNET_ERROR_TYPE_DEBUG,
3461        "Going to send PULL REQUEST to peer %s.\n",
3462        GNUNET_i2s (peer));
3463
3464   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3465   Peers_send_message (peer, ev, "PULL REQUEST");
3466   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3467 }
3468
3469
3470 /**
3471  * Send single push
3472  *
3473  * @param peer_id the peer to send the push to.
3474  */
3475 static void
3476 send_push (const struct GNUNET_PeerIdentity *peer_id)
3477 {
3478   struct GNUNET_MQ_Envelope *ev;
3479
3480   LOG (GNUNET_ERROR_TYPE_DEBUG,
3481        "Going to send PUSH to peer %s.\n",
3482        GNUNET_i2s (peer_id));
3483
3484   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3485   Peers_send_message (peer_id, ev, "PUSH");
3486   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3487 }
3488
3489
3490 static void
3491 do_round (void *cls);
3492
3493 static void
3494 do_mal_round (void *cls);
3495
3496 #ifdef ENABLE_MALICIOUS
3497
3498
3499 /**
3500  * @brief This function is called, when the client tells us to act malicious.
3501  * It verifies that @a msg is well-formed.
3502  *
3503  * @param cls the closure (#ClientContext)
3504  * @param msg the message
3505  * @return #GNUNET_OK if @a msg is well-formed
3506  */
3507 static int
3508 check_client_act_malicious (void *cls,
3509                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3510 {
3511   struct ClientContext *cli_ctx = cls;
3512   uint16_t msize = ntohs (msg->header.size);
3513   uint32_t num_peers = ntohl (msg->num_peers);
3514
3515   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3516   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3517        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3518   {
3519     LOG (GNUNET_ERROR_TYPE_ERROR,
3520         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3521         ntohl (msg->num_peers),
3522         (msize / sizeof (struct GNUNET_PeerIdentity)));
3523     GNUNET_break (0);
3524     GNUNET_SERVICE_client_drop (cli_ctx->client);
3525     return GNUNET_SYSERR;
3526   }
3527   return GNUNET_OK;
3528 }
3529
3530 /**
3531  * Turn RPS service to act malicious.
3532  *
3533  * @param cls Closure
3534  * @param client The client that sent the message
3535  * @param msg The message header
3536  */
3537 static void
3538 handle_client_act_malicious (void *cls,
3539                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3540 {
3541   struct ClientContext *cli_ctx = cls;
3542   struct GNUNET_PeerIdentity *peers;
3543   uint32_t num_mal_peers_sent;
3544   uint32_t num_mal_peers_old;
3545
3546   /* Do actual logic */
3547   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3548   mal_type = ntohl (msg->type);
3549   if (NULL == mal_peer_set)
3550     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3551
3552   LOG (GNUNET_ERROR_TYPE_DEBUG,
3553        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3554        mal_type,
3555        ntohl (msg->num_peers));
3556
3557   if (1 == mal_type)
3558   { /* Try to maximise representation */
3559     /* Add other malicious peers to those we already know */
3560
3561     num_mal_peers_sent = ntohl (msg->num_peers);
3562     num_mal_peers_old = num_mal_peers;
3563     GNUNET_array_grow (mal_peers,
3564                        num_mal_peers,
3565                        num_mal_peers + num_mal_peers_sent);
3566     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3567             peers,
3568             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3569
3570     /* Add all mal peers to mal_peer_set */
3571     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3572                            num_mal_peers_sent,
3573                            mal_peer_set);
3574
3575     /* Substitute do_round () with do_mal_round () */
3576     GNUNET_SCHEDULER_cancel (do_round_task);
3577     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3578   }
3579
3580   else if ( (2 == mal_type) ||
3581             (3 == mal_type) )
3582   { /* Try to partition the network */
3583     /* Add other malicious peers to those we already know */
3584
3585     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3586     num_mal_peers_old = num_mal_peers;
3587     GNUNET_array_grow (mal_peers,
3588                        num_mal_peers,
3589                        num_mal_peers + num_mal_peers_sent);
3590     if (NULL != mal_peers &&
3591         0 != num_mal_peers)
3592     {
3593       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3594               peers,
3595               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3596
3597       /* Add all mal peers to mal_peer_set */
3598       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3599                              num_mal_peers_sent,
3600                              mal_peer_set);
3601     }
3602
3603     /* Store the one attacked peer */
3604     GNUNET_memcpy (&attacked_peer,
3605             &msg->attacked_peer,
3606             sizeof (struct GNUNET_PeerIdentity));
3607     /* Set the flag of the attacked peer to valid to avoid problems */
3608     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3609     {
3610       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3611     }
3612
3613     LOG (GNUNET_ERROR_TYPE_DEBUG,
3614          "Attacked peer is %s\n",
3615          GNUNET_i2s (&attacked_peer));
3616
3617     /* Substitute do_round () with do_mal_round () */
3618     GNUNET_SCHEDULER_cancel (do_round_task);
3619     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3620   }
3621   else if (0 == mal_type)
3622   { /* Stop acting malicious */
3623     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3624
3625     /* Substitute do_mal_round () with do_round () */
3626     GNUNET_SCHEDULER_cancel (do_round_task);
3627     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3628   }
3629   else
3630   {
3631     GNUNET_break (0);
3632     GNUNET_SERVICE_client_continue (cli_ctx->client);
3633   }
3634   GNUNET_SERVICE_client_continue (cli_ctx->client);
3635 }
3636
3637
3638 /**
3639  * Send out PUSHes and PULLs maliciously.
3640  *
3641  * This is executed regylary.
3642  */
3643 static void
3644 do_mal_round (void *cls)
3645 {
3646   uint32_t num_pushes;
3647   uint32_t i;
3648   struct GNUNET_TIME_Relative time_next_round;
3649   struct AttackedPeer *tmp_att_peer;
3650
3651   LOG (GNUNET_ERROR_TYPE_DEBUG,
3652        "Going to execute next round maliciously type %" PRIu32 ".\n",
3653       mal_type);
3654   do_round_task = NULL;
3655   GNUNET_assert (mal_type <= 3);
3656   /* Do malicious actions */
3657   if (1 == mal_type)
3658   { /* Try to maximise representation */
3659
3660     /* The maximum of pushes we're going to send this round */
3661     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3662                                          num_attacked_peers),
3663                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3664
3665     LOG (GNUNET_ERROR_TYPE_DEBUG,
3666          "Going to send %" PRIu32 " pushes\n",
3667          num_pushes);
3668
3669     /* Send PUSHes to attacked peers */
3670     for (i = 0 ; i < num_pushes ; i++)
3671     {
3672       if (att_peers_tail == att_peer_index)
3673         att_peer_index = att_peers_head;
3674       else
3675         att_peer_index = att_peer_index->next;
3676
3677       send_push (&att_peer_index->peer_id);
3678     }
3679
3680     /* Send PULLs to some peers to learn about additional peers to attack */
3681     tmp_att_peer = att_peer_index;
3682     for (i = 0 ; i < num_pushes * alpha ; i++)
3683     {
3684       if (att_peers_tail == tmp_att_peer)
3685         tmp_att_peer = att_peers_head;
3686       else
3687         att_peer_index = tmp_att_peer->next;
3688
3689       send_pull_request (&tmp_att_peer->peer_id);
3690     }
3691   }
3692
3693
3694   else if (2 == mal_type)
3695   { /**
3696      * Try to partition the network
3697      * Send as many pushes to the attacked peer as possible
3698      * That is one push per round as it will ignore more.
3699      */
3700     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3701     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3702       send_push (&attacked_peer);
3703   }
3704
3705
3706   if (3 == mal_type)
3707   { /* Combined attack */
3708
3709     /* Send PUSH to attacked peers */
3710     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3711     {
3712       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3713       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3714       {
3715         LOG (GNUNET_ERROR_TYPE_DEBUG,
3716             "Goding to send push to attacked peer (%s)\n",
3717             GNUNET_i2s (&attacked_peer));
3718         send_push (&attacked_peer);
3719       }
3720     }
3721     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3722
3723     /* The maximum of pushes we're going to send this round */
3724     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3725                                          num_attacked_peers),
3726                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3727
3728     LOG (GNUNET_ERROR_TYPE_DEBUG,
3729          "Going to send %" PRIu32 " pushes\n",
3730          num_pushes);
3731
3732     for (i = 0; i < num_pushes; i++)
3733     {
3734       if (att_peers_tail == att_peer_index)
3735         att_peer_index = att_peers_head;
3736       else
3737         att_peer_index = att_peer_index->next;
3738
3739       send_push (&att_peer_index->peer_id);
3740     }
3741
3742     /* Send PULLs to some peers to learn about additional peers to attack */
3743     tmp_att_peer = att_peer_index;
3744     for (i = 0; i < num_pushes * alpha; i++)
3745     {
3746       if (att_peers_tail == tmp_att_peer)
3747         tmp_att_peer = att_peers_head;
3748       else
3749         att_peer_index = tmp_att_peer->next;
3750
3751       send_pull_request (&tmp_att_peer->peer_id);
3752     }
3753   }
3754
3755   /* Schedule next round */
3756   time_next_round = compute_rand_delay (round_interval, 2);
3757
3758   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3759   //NULL);
3760   GNUNET_assert (NULL == do_round_task);
3761   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3762                                                 &do_mal_round, NULL);
3763   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3764 }
3765 #endif /* ENABLE_MALICIOUS */
3766
3767 /**
3768  * Send out PUSHes and PULLs, possibly update #view, samplers.
3769  *
3770  * This is executed regylary.
3771  */
3772 static void
3773 do_round (void *cls)
3774 {
3775   uint32_t i;
3776   const struct GNUNET_PeerIdentity *view_array;
3777   unsigned int *permut;
3778   unsigned int a_peers; /* Number of peers we send pushes to */
3779   unsigned int b_peers; /* Number of peers we send pull requests to */
3780   uint32_t first_border;
3781   uint32_t second_border;
3782   struct GNUNET_PeerIdentity peer;
3783   struct GNUNET_PeerIdentity *update_peer;
3784
3785   LOG (GNUNET_ERROR_TYPE_DEBUG,
3786        "Going to execute next round.\n");
3787   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3788   do_round_task = NULL;
3789   LOG (GNUNET_ERROR_TYPE_DEBUG,
3790        "Printing view:\n");
3791   to_file (file_name_view_log,
3792            "___ new round ___");
3793   view_array = View_get_as_array ();
3794   for (i = 0; i < View_size (); i++)
3795   {
3796     LOG (GNUNET_ERROR_TYPE_DEBUG,
3797          "\t%s\n", GNUNET_i2s (&view_array[i]));
3798     to_file (file_name_view_log,
3799              "=%s\t(do round)",
3800              GNUNET_i2s_full (&view_array[i]));
3801   }
3802
3803
3804   /* Send pushes and pull requests */
3805   if (0 < View_size ())
3806   {
3807     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3808                                            View_size ());
3809
3810     /* Send PUSHes */
3811     a_peers = ceil (alpha * View_size ());
3812
3813     LOG (GNUNET_ERROR_TYPE_DEBUG,
3814          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3815          a_peers, alpha, View_size ());
3816     for (i = 0; i < a_peers; i++)
3817     {
3818       peer = view_array[permut[i]];
3819       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3820       { // FIXME if this fails schedule/loop this for later
3821         send_push (&peer);
3822       }
3823     }
3824
3825     /* Send PULL requests */
3826     b_peers = ceil (beta * View_size ());
3827     first_border = a_peers;
3828     second_border = a_peers + b_peers;
3829     if (second_border > View_size ())
3830     {
3831       first_border = View_size () - b_peers;
3832       second_border = View_size ();
3833     }
3834     LOG (GNUNET_ERROR_TYPE_DEBUG,
3835         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3836         b_peers, beta, View_size ());
3837     for (i = first_border; i < second_border; i++)
3838     {
3839       peer = view_array[permut[i]];
3840       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3841           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3842       { // FIXME if this fails schedule/loop this for later
3843         send_pull_request (&peer);
3844       }
3845     }
3846
3847     GNUNET_free (permut);
3848     permut = NULL;
3849   }
3850
3851
3852   /* Update view */
3853   /* TODO see how many peers are in push-/pull- list! */
3854
3855   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3856       (0 < CustomPeerMap_size (push_map)) &&
3857       (0 < CustomPeerMap_size (pull_map)))
3858   //if (GNUNET_YES) // disable blocking temporarily
3859   { /* If conditions for update are fulfilled, update */
3860     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3861
3862     uint32_t final_size;
3863     uint32_t peers_to_clean_size;
3864     struct GNUNET_PeerIdentity *peers_to_clean;
3865
3866     peers_to_clean = NULL;
3867     peers_to_clean_size = 0;
3868     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3869     GNUNET_memcpy (peers_to_clean,
3870             view_array,
3871             View_size () * sizeof (struct GNUNET_PeerIdentity));
3872
3873     /* Seems like recreating is the easiest way of emptying the peermap */
3874     View_clear ();
3875     to_file (file_name_view_log,
3876              "--- emptied ---");
3877
3878     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3879                                 CustomPeerMap_size (push_map));
3880     second_border = first_border +
3881                     GNUNET_MIN (floor (beta  * view_size_est_need),
3882                                 CustomPeerMap_size (pull_map));
3883     final_size    = second_border +
3884       ceil ((1 - (alpha + beta)) * view_size_est_need);
3885     LOG (GNUNET_ERROR_TYPE_DEBUG,
3886         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3887         first_border,
3888         second_border,
3889         final_size);
3890
3891     /* Update view with peers received through PUSHes */
3892     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3893                                            CustomPeerMap_size (push_map));
3894     for (i = 0; i < first_border; i++)
3895     {
3896       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3897                                                               permut[i]));
3898       to_file (file_name_view_log,
3899                "+%s\t(push list)",
3900                GNUNET_i2s_full (&view_array[i]));
3901       // TODO change the peer_flags accordingly
3902     }
3903     GNUNET_free (permut);
3904     permut = NULL;
3905
3906     /* Update view with peers received through PULLs */
3907     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3908                                            CustomPeerMap_size (pull_map));
3909     for (i = first_border; i < second_border; i++)
3910     {
3911       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3912             permut[i - first_border]));
3913       to_file (file_name_view_log,
3914                "+%s\t(pull list)",
3915                GNUNET_i2s_full (&view_array[i]));
3916       // TODO change the peer_flags accordingly
3917     }
3918     GNUNET_free (permut);
3919     permut = NULL;
3920
3921     /* Update view with peers from history */
3922     RPS_sampler_get_n_rand_peers (prot_sampler,
3923                                   hist_update,
3924                                   NULL,
3925                                   final_size - second_border);
3926     // TODO change the peer_flags accordingly
3927
3928     for (i = 0; i < View_size (); i++)
3929       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3930
3931     /* Clean peers that were removed from the view */
3932     for (i = 0; i < peers_to_clean_size; i++)
3933     {
3934       to_file (file_name_view_log,
3935                "-%s",
3936                GNUNET_i2s_full (&peers_to_clean[i]));
3937       clean_peer (&peers_to_clean[i]);
3938       //peer_destroy_channel_send (sender);
3939     }
3940
3941     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3942     clients_notify_view_update();
3943   } else {
3944     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3945     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3946     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3947         !(0 >= CustomPeerMap_size (pull_map)))
3948       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3949     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3950         (0 >= CustomPeerMap_size (pull_map)))
3951       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3952     if (0 >= CustomPeerMap_size (push_map) &&
3953         !(0 >= CustomPeerMap_size (pull_map)))
3954       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3955     if (0 >= CustomPeerMap_size (push_map) &&
3956         (0 >= CustomPeerMap_size (pull_map)))
3957       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3958     if (0 >= CustomPeerMap_size (pull_map) &&
3959         CustomPeerMap_size (push_map) > alpha * View_size () &&
3960         0 >= CustomPeerMap_size (push_map))
3961       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3962   }
3963   // TODO independent of that also get some peers from CADET_get_peers()?
3964   GNUNET_STATISTICS_set (stats,
3965       "# peers in push map at end of round",
3966       CustomPeerMap_size (push_map),
3967       GNUNET_NO);
3968   GNUNET_STATISTICS_set (stats,
3969       "# peers in pull map at end of round",
3970       CustomPeerMap_size (pull_map),
3971       GNUNET_NO);
3972   GNUNET_STATISTICS_set (stats,
3973       "# peers in view at end of round",
3974       View_size (),
3975       GNUNET_NO);
3976
3977   LOG (GNUNET_ERROR_TYPE_DEBUG,
3978        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3979        CustomPeerMap_size (push_map),
3980        CustomPeerMap_size (pull_map),
3981        alpha,
3982        View_size (),
3983        alpha * View_size ());
3984
3985   /* Update samplers */
3986   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3987   {
3988     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3989     LOG (GNUNET_ERROR_TYPE_DEBUG,
3990          "Updating with peer %s from push list\n",
3991          GNUNET_i2s (update_peer));
3992     insert_in_sampler (NULL, update_peer);
3993     clean_peer (update_peer); /* This cleans only if it is not in the view */
3994     //peer_destroy_channel_send (sender);
3995   }
3996
3997   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3998   {
3999     LOG (GNUNET_ERROR_TYPE_DEBUG,
4000          "Updating with peer %s from pull list\n",
4001          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
4002     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
4003     /* This cleans only if it is not in the view */
4004     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
4005     //peer_destroy_channel_send (sender);
4006   }
4007
4008
4009   /* Empty push/pull lists */
4010   CustomPeerMap_clear (push_map);
4011   CustomPeerMap_clear (pull_map);
4012
4013   struct GNUNET_TIME_Relative time_next_round;
4014
4015   time_next_round = compute_rand_delay (round_interval, 2);
4016
4017   /* Schedule next round */
4018   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4019                                                 &do_round, NULL);
4020   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4021 }
4022
4023
4024 /**
4025  * This is called from GNUNET_CADET_get_peers().
4026  *
4027  * It is called on every peer(ID) that cadet somehow has contact with.
4028  * We use those to initialise the sampler.
4029  */
4030 void
4031 init_peer_cb (void *cls,
4032               const struct GNUNET_PeerIdentity *peer,
4033               int tunnel, // "Do we have a tunnel towards this peer?"
4034               unsigned int n_paths, // "Number of known paths towards this peer"
4035               unsigned int best_path) // "How long is the best path?
4036                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4037 {
4038   if (NULL != peer)
4039   {
4040     LOG (GNUNET_ERROR_TYPE_DEBUG,
4041          "Got peer_id %s from cadet\n",
4042          GNUNET_i2s (peer));
4043     got_peer (peer);
4044   }
4045 }
4046
4047 /**
4048  * @brief Iterator function over stored, valid peers.
4049  *
4050  * We initialise the sampler with those.
4051  *
4052  * @param cls the closure
4053  * @param peer the peer id
4054  * @return #GNUNET_YES if we should continue to
4055  *         iterate,
4056  *         #GNUNET_NO if not.
4057  */
4058 static int
4059 valid_peers_iterator (void *cls,
4060                       const struct GNUNET_PeerIdentity *peer)
4061 {
4062   if (NULL != peer)
4063   {
4064     LOG (GNUNET_ERROR_TYPE_DEBUG,
4065          "Got stored, valid peer %s\n",
4066          GNUNET_i2s (peer));
4067     got_peer (peer);
4068   }
4069   return GNUNET_YES;
4070 }
4071
4072
4073 /**
4074  * Iterator over peers from peerinfo.
4075  *
4076  * @param cls closure
4077  * @param peer id of the peer, NULL for last call
4078  * @param hello hello message for the peer (can be NULL)
4079  * @param error message
4080  */
4081 void
4082 process_peerinfo_peers (void *cls,
4083                         const struct GNUNET_PeerIdentity *peer,
4084                         const struct GNUNET_HELLO_Message *hello,
4085                         const char *err_msg)
4086 {
4087   if (NULL != peer)
4088   {
4089     LOG (GNUNET_ERROR_TYPE_DEBUG,
4090          "Got peer_id %s from peerinfo\n",
4091          GNUNET_i2s (peer));
4092     got_peer (peer);
4093   }
4094 }
4095
4096
4097 /**
4098  * Task run during shutdown.
4099  *
4100  * @param cls unused
4101  */
4102 static void
4103 shutdown_task (void *cls)
4104 {
4105   struct ClientContext *client_ctx;
4106   struct ReplyCls *reply_cls;
4107
4108   LOG (GNUNET_ERROR_TYPE_DEBUG,
4109        "RPS is going down\n");
4110
4111   /* Clean all clients */
4112   for (client_ctx = cli_ctx_head;
4113        NULL != cli_ctx_head;
4114        client_ctx = cli_ctx_head)
4115   {
4116     /* Clean pending requests to the sampler */
4117     for (reply_cls = client_ctx->rep_cls_head;
4118          NULL != client_ctx->rep_cls_head;
4119          reply_cls = client_ctx->rep_cls_head)
4120     {
4121       RPS_sampler_request_cancel (reply_cls->req_handle);
4122       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4123                                    client_ctx->rep_cls_tail,
4124                                    reply_cls);
4125       GNUNET_free (reply_cls);
4126     }
4127     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4128     GNUNET_free (client_ctx);
4129   }
4130   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4131   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4132
4133   if (NULL != do_round_task)
4134   {
4135     GNUNET_SCHEDULER_cancel (do_round_task);
4136     do_round_task = NULL;
4137   }
4138
4139   Peers_terminate ();
4140
4141   GNUNET_NSE_disconnect (nse);
4142   RPS_sampler_destroy (prot_sampler);
4143   RPS_sampler_destroy (client_sampler);
4144   GNUNET_CADET_close_port (cadet_port);
4145   GNUNET_CADET_disconnect (cadet_handle);
4146   View_destroy ();
4147   CustomPeerMap_destroy (push_map);
4148   CustomPeerMap_destroy (pull_map);
4149   if (NULL != stats)
4150   {
4151     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4152     stats = NULL;
4153   }
4154   #ifdef ENABLE_MALICIOUS
4155   struct AttackedPeer *tmp_att_peer;
4156   /* it is ok to free this const during shutdown: */
4157   GNUNET_free ((char *) file_name_view_log);
4158   #ifdef TO_FILE
4159   GNUNET_free ((char *) file_name_observed_log);
4160   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
4161   #endif /* TO_FILE */
4162   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4163   if (NULL != mal_peer_set)
4164     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4165   if (NULL != att_peer_set)
4166     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4167   while (NULL != att_peers_head)
4168   {
4169     tmp_att_peer = att_peers_head;
4170     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4171   }
4172   #endif /* ENABLE_MALICIOUS */
4173 }
4174
4175
4176 /**
4177  * Handle client connecting to the service.
4178  *
4179  * @param cls NULL
4180  * @param client the new client
4181  * @param mq the message queue of @a client
4182  * @return @a client
4183  */
4184 static void *
4185 client_connect_cb (void *cls,
4186                    struct GNUNET_SERVICE_Client *client,
4187                    struct GNUNET_MQ_Handle *mq)
4188 {
4189   struct ClientContext *cli_ctx;
4190
4191   LOG (GNUNET_ERROR_TYPE_DEBUG,
4192        "Client connected\n");
4193   if (NULL == client)
4194     return client; /* Server was destroyed before a client connected. Shutting down */
4195   cli_ctx = GNUNET_new (struct ClientContext);
4196   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4197   cli_ctx->view_updates_left = -1;
4198   cli_ctx->client = client;
4199   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4200                                cli_ctx_tail,
4201                                cli_ctx);
4202   return cli_ctx;
4203 }
4204
4205 /**
4206  * Callback called when a client disconnected from the service
4207  *
4208  * @param cls closure for the service
4209  * @param c the client that disconnected
4210  * @param internal_cls should be equal to @a c
4211  */
4212 static void
4213 client_disconnect_cb (void *cls,
4214                       struct GNUNET_SERVICE_Client *client,
4215                       void *internal_cls)
4216 {
4217   struct ClientContext *cli_ctx = internal_cls;
4218
4219   GNUNET_assert (client == cli_ctx->client);
4220   if (NULL == client)
4221   {/* shutdown task - destroy all clients */
4222     while (NULL != cli_ctx_head)
4223       destroy_cli_ctx (cli_ctx_head);
4224   }
4225   else
4226   { /* destroy this client */
4227     LOG (GNUNET_ERROR_TYPE_DEBUG,
4228         "Client disconnected. Destroy its context.\n");
4229     destroy_cli_ctx (cli_ctx);
4230   }
4231 }
4232
4233
4234 /**
4235  * Handle random peer sampling clients.
4236  *
4237  * @param cls closure
4238  * @param c configuration to use
4239  * @param service the initialized service
4240  */
4241 static void
4242 run (void *cls,
4243      const struct GNUNET_CONFIGURATION_Handle *c,
4244      struct GNUNET_SERVICE_Handle *service)
4245 {
4246   char* fn_valid_peers;
4247   struct GNUNET_HashCode port;
4248
4249   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4250   cfg = c;
4251
4252
4253   /* Get own ID */
4254   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4255   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4256               "STARTING SERVICE (rps) for peer [%s]\n",
4257               GNUNET_i2s (&own_identity));
4258   #ifdef ENABLE_MALICIOUS
4259   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4260               "Malicious execution compiled in.\n");
4261   #endif /* ENABLE_MALICIOUS */
4262
4263
4264
4265   /* Get time interval from the configuration */
4266   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4267                                                         "ROUNDINTERVAL",
4268                                                         &round_interval))
4269   {
4270     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4271                                "RPS", "ROUNDINTERVAL");
4272     GNUNET_SCHEDULER_shutdown ();
4273     return;
4274   }
4275
4276   /* Get initial size of sampler/view from the configuration */
4277   if (GNUNET_OK !=
4278       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4279         (long long unsigned int *) &sampler_size_est_min))
4280   {
4281     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4282                                "RPS", "MINSIZE");
4283     GNUNET_SCHEDULER_shutdown ();
4284     return;
4285   }
4286   sampler_size_est_need = sampler_size_est_min;
4287   view_size_est_min = sampler_size_est_min;
4288   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4289
4290   if (GNUNET_OK !=
4291       GNUNET_CONFIGURATION_get_value_filename (cfg,
4292                                                "rps",
4293                                                "FILENAME_VALID_PEERS",
4294                                                &fn_valid_peers))
4295   {
4296     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4297                                "rps", "FILENAME_VALID_PEERS");
4298   }
4299
4300
4301   View_create (view_size_est_min);
4302
4303   /* file_name_view_log */
4304   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4305   #ifdef TO_FILE
4306   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4307   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4308   #endif /* TO_FILE */
4309
4310   /* connect to NSE */
4311   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4312
4313
4314   alpha = 0.45;
4315   beta  = 0.45;
4316
4317
4318   /* Initialise cadet */
4319   /* There exists a copy-paste-clone in get_channel() */
4320   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4321     GNUNET_MQ_hd_fixed_size (peer_check,
4322                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4323                              struct GNUNET_MessageHeader,
4324                              NULL),
4325     GNUNET_MQ_hd_fixed_size (peer_push,
4326                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4327                              struct GNUNET_MessageHeader,
4328                              NULL),
4329     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4330                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4331                              struct GNUNET_MessageHeader,
4332                              NULL),
4333     GNUNET_MQ_hd_var_size (peer_pull_reply,
4334                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4335                            struct GNUNET_RPS_P2P_PullReplyMessage,
4336                            NULL),
4337     GNUNET_MQ_handler_end ()
4338   };
4339
4340   cadet_handle = GNUNET_CADET_connect (cfg);
4341   GNUNET_assert (NULL != cadet_handle);
4342   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4343                       strlen (GNUNET_APPLICATION_PORT_RPS),
4344                       &port);
4345   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4346                                        &port,
4347                                        &Peers_handle_inbound_channel, /* Connect handler */
4348                                        NULL, /* cls */
4349                                        NULL, /* WindowSize handler */
4350                                        cleanup_destroyed_channel, /* Disconnect handler */
4351                                        cadet_handlers);
4352
4353
4354   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4355   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4356   GNUNET_free (fn_valid_peers);
4357
4358   /* Initialise sampler */
4359   struct GNUNET_TIME_Relative half_round_interval;
4360   struct GNUNET_TIME_Relative  max_round_interval;
4361
4362   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4363   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4364
4365   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4366   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4367
4368   /* Initialise push and pull maps */
4369   push_map = CustomPeerMap_create (4);
4370   pull_map = CustomPeerMap_create (4);
4371
4372
4373   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4374   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4375   // TODO send push/pull to each of those peers?
4376   // TODO read stored valid peers from last run
4377   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4378   Peers_get_valid_peers (valid_peers_iterator, NULL);
4379
4380   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4381                                                    GNUNET_NO,
4382                                                    process_peerinfo_peers,
4383                                                    NULL);
4384
4385   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4386
4387   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4388   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4389
4390   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4391   stats = GNUNET_STATISTICS_create ("rps", cfg);
4392
4393 }
4394
4395
4396 /**
4397  * Define "main" method using service macro.
4398  */
4399 GNUNET_SERVICE_MAIN
4400 ("rps",
4401  GNUNET_SERVICE_OPTION_NONE,
4402  &run,
4403  &client_connect_cb,
4404  &client_disconnect_cb,
4405  NULL,
4406  GNUNET_MQ_hd_fixed_size (client_request,
4407    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4408    struct GNUNET_RPS_CS_RequestMessage,
4409    NULL),
4410  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4411    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4412    struct GNUNET_RPS_CS_RequestCancelMessage,
4413    NULL),
4414  GNUNET_MQ_hd_var_size (client_seed,
4415    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4416    struct GNUNET_RPS_CS_SeedMessage,
4417    NULL),
4418 #ifdef ENABLE_MALICIOUS
4419  GNUNET_MQ_hd_var_size (client_act_malicious,
4420    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4421    struct GNUNET_RPS_CS_ActMaliciousMessage,
4422    NULL),
4423 #endif /* ENABLE_MALICIOUS */
4424  GNUNET_MQ_hd_fixed_size (client_view_request,
4425    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4426    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4427    NULL),
4428  GNUNET_MQ_handler_end());
4429
4430 /* end of gnunet-service-rps.c */