restructure rps service: start keeping track of channels
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO store peers somewhere persistent
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /**
57  * Our configuration.
58  */
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62  * Handle to the statistics service.
63  */
64 static struct GNUNET_STATISTICS_Handle *stats;
65
66 /**
67  * Our own identity.
68  */
69 static struct GNUNET_PeerIdentity own_identity;
70
71
72 /**
73  * @brief Port used for cadet.
74  *
75  * Don't compute multiple times through making it global
76  */
77 static struct GNUNET_HashCode port;
78
79 /***********************************************************************
80  * Old gnunet-service-rps_peers.c
81 ***********************************************************************/
82
83 /**
84  * Set a peer flag of given peer context.
85  */
86 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
87
88 /**
89  * Get peer flag of given peer context.
90  */
91 #define check_peer_flag_set(peer_ctx, mask)\
92   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
93
94 /**
95  * Unset flag of given peer context.
96  */
97 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
98
99 /**
100  * Set a channel flag of given channel context.
101  */
102 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
103
104 /**
105  * Get channel flag of given channel context.
106  */
107 #define check_channel_flag_set(channel_flags, mask)\
108   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
109
110 /**
111  * Unset flag of given channel context.
112  */
113 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
114
115
116
117 /**
118  * Pending operation on peer consisting of callback and closure
119  *
120  * When an operation cannot be executed right now this struct is used to store
121  * the callback and closure for later execution.
122  */
123 struct PeerPendingOp
124 {
125   /**
126    * Callback
127    */
128   PeerOp op;
129
130   /**
131    * Closure
132    */
133   void *op_cls;
134 };
135
136 /**
137  * List containing all messages that are yet to be send
138  *
139  * This is used to keep track of all messages that have not been sent yet. When
140  * a peer is to be removed the pending messages can be removed properly.
141  */
142 struct PendingMessage
143 {
144   /**
145    * DLL next, prev
146    */
147   struct PendingMessage *next;
148   struct PendingMessage *prev;
149
150   /**
151    * The envelope to the corresponding message
152    */
153   struct GNUNET_MQ_Envelope *ev;
154
155   /**
156    * The corresponding context
157    */
158   struct PeerContext *peer_ctx;
159
160   /**
161    * The message type
162    */
163   const char *type;
164 };
165
166 /**
167  * Struct used to keep track of other peer's status
168  *
169  * This is stored in a multipeermap.
170  * It contains information such as cadet channels, a message queue for sending,
171  * status about the channels, the pending operations on this peer and some flags
172  * about the status of the peer itself. (live, valid, ...)
173  */
174 struct PeerContext
175 {
176   /**
177    * Message queue open to client
178    */
179   struct GNUNET_MQ_Handle *mq;
180
181   /**
182    * Channel open to client.
183    */
184   struct GNUNET_CADET_Channel *send_channel;
185
186   /**
187    * Flags to the sending channel
188    */
189   uint32_t *send_channel_flags;
190
191   /**
192    * Channel open from client.
193    */
194   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
195
196   /**
197    * Flags to the receiving channel
198    */
199   uint32_t *recv_channel_flags;
200
201   /**
202    * Array of pending operations on this peer.
203    */
204   struct PeerPendingOp *pending_ops;
205
206   /**
207    * Handle to the callback given to cadet_ntfy_tmt_rdy()
208    *
209    * To be canceled on shutdown.
210    */
211   struct PendingMessage *liveliness_check_pending;
212
213   /**
214    * Number of pending operations.
215    */
216   unsigned int num_pending_ops;
217
218   /**
219    * Identity of the peer
220    */
221   struct GNUNET_PeerIdentity peer_id;
222
223   /**
224    * Flags indicating status of peer
225    */
226   uint32_t peer_flags;
227
228   /**
229    * Last time we received something from that peer.
230    */
231   struct GNUNET_TIME_Absolute last_message_recv;
232
233   /**
234    * Last time we received a keepalive message.
235    */
236   struct GNUNET_TIME_Absolute last_keepalive;
237
238   /**
239    * DLL with all messages that are yet to be sent
240    */
241   struct PendingMessage *pending_messages_head;
242   struct PendingMessage *pending_messages_tail;
243
244   /**
245    * This is pobably followed by 'statistical' data (when we first saw
246    * it, how did we get its ID, how many pushes (in a timeinterval),
247    * ...)
248    */
249 };
250
251 /**
252  * @brief Closure to #valid_peer_iterator
253  */
254 struct PeersIteratorCls
255 {
256   /**
257    * Iterator function
258    */
259   PeersIterator iterator;
260
261   /**
262    * Closure to iterator
263    */
264   void *cls;
265 };
266
267 /**
268  * @brief Context for a channel
269  */
270 struct ChannelCtx
271 {
272   /**
273    * @brief Meant to be used in a DLL
274    */
275   struct ChannelCtx *next;
276   struct ChannelCtx *prev;
277
278   /**
279    * @brief The channel itself
280    */
281   struct GNUNET_CADET_Channel *channel;
282
283   /**
284    * @brief The peer context associated with the channel
285    */
286   struct PeerContext *peer_ctx;
287 };
288
289 /**
290  * @brief The DLL of channel contexts
291  */
292 static struct ChannelCtx *channel_ctx_head;
293 static struct ChannelCtx *channel_ctx_tail;
294
295 /**
296  * @brief Hashmap of valid peers.
297  */
298 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
299
300 /**
301  * @brief Maximum number of valid peers to keep.
302  * TODO read from config
303  */
304 static uint32_t num_valid_peers_max = UINT32_MAX;
305
306 /**
307  * @brief Filename of the file that stores the valid peers persistently.
308  */
309 static char *filename_valid_peers;
310
311 /**
312  * Set of all peers to keep track of them.
313  */
314 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
315
316 /**
317  * Cadet handle.
318  */
319 static struct GNUNET_CADET_Handle *cadet_handle;
320
321
322
323 /**
324  * @brief Get the #PeerContext associated with a peer
325  *
326  * @param peer the peer id
327  *
328  * @return the #PeerContext
329  */
330 static struct PeerContext *
331 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
332 {
333   struct PeerContext *ctx;
334   int ret;
335
336   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
337   GNUNET_assert (GNUNET_YES == ret);
338   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
339   GNUNET_assert (NULL != ctx);
340   return ctx;
341 }
342
343 int
344 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
345
346 /**
347  * @brief Create a new #PeerContext and insert it into the peer map
348  *
349  * @param peer the peer to create the #PeerContext for
350  *
351  * @return the #PeerContext
352  */
353 static struct PeerContext *
354 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
355 {
356   struct PeerContext *ctx;
357   int ret;
358
359   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
360
361   ctx = GNUNET_new (struct PeerContext);
362   ctx->peer_id = *peer;
363   ctx->send_channel_flags = GNUNET_new (uint32_t);
364   ctx->recv_channel_flags = GNUNET_new (uint32_t);
365   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
366       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
367   GNUNET_assert (GNUNET_OK == ret);
368   return ctx;
369 }
370
371
372 /**
373  * @brief Create or get a #PeerContext
374  *
375  * @param peer the peer to get the associated context to
376  *
377  * @return the context
378  */
379 static struct PeerContext *
380 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
381 {
382   if (GNUNET_NO == Peers_check_peer_known (peer))
383   {
384     return create_peer_ctx (peer);
385   }
386   return get_peer_ctx (peer);
387 }
388
389 void
390 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
391
392 void
393 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
394
395 /**
396  * @brief Check whether we have a connection to this @a peer
397  *
398  * Also sets the #Peers_ONLINE flag accordingly
399  *
400  * @param peer the peer in question
401  *
402  * @return #GNUNET_YES if we are connected
403  *         #GNUNET_NO  otherwise
404  */
405 int
406 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
407 {
408   const struct PeerContext *peer_ctx;
409
410   /* If we don't know about this peer we don't know whether it's online */
411   if (GNUNET_NO == Peers_check_peer_known (peer))
412   {
413     return GNUNET_NO;
414   }
415   /* Get the context */
416   peer_ctx = get_peer_ctx (peer);
417   /* If we have no channel to this peer we don't know whether it's online */
418   if ( (NULL == peer_ctx->send_channel) &&
419        (NULL == peer_ctx->recv_channel) )
420   {
421     Peers_unset_peer_flag (peer, Peers_ONLINE);
422     return GNUNET_NO;
423   }
424   /* Otherwise (if we have a channel, we know that it's online */
425   Peers_set_peer_flag (peer, Peers_ONLINE);
426   return GNUNET_YES;
427 }
428
429
430 /**
431  * @brief The closure to #get_rand_peer_iterator.
432  */
433 struct GetRandPeerIteratorCls
434 {
435   /**
436    * @brief The index of the peer to return.
437    * Will be decreased until 0.
438    * Then current peer is returned.
439    */
440   uint32_t index;
441
442   /**
443    * @brief Pointer to peer to return.
444    */
445   const struct GNUNET_PeerIdentity *peer;
446 };
447
448
449 /**
450  * @brief Iterator function for #get_random_peer_from_peermap.
451  *
452  * Implements #GNUNET_CONTAINER_PeerMapIterator.
453  * Decreases the index until the index is null.
454  * Then returns the current peer.
455  *
456  * @param cls the #GetRandPeerIteratorCls containing index and peer
457  * @param peer current peer
458  * @param value unused
459  *
460  * @return  #GNUNET_YES if we should continue to
461  *          iterate,
462  *          #GNUNET_NO if not.
463  */
464 static int
465 get_rand_peer_iterator (void *cls,
466                         const struct GNUNET_PeerIdentity *peer,
467                         void *value)
468 {
469   struct GetRandPeerIteratorCls *iterator_cls = cls;
470   if (0 >= iterator_cls->index)
471   {
472     iterator_cls->peer = peer;
473     return GNUNET_NO;
474   }
475   iterator_cls->index--;
476   return GNUNET_YES;
477 }
478
479
480 /**
481  * @brief Get a random peer from @a peer_map
482  *
483  * @param peer_map the peer_map to get the peer from
484  *
485  * @return a random peer
486  */
487 static const struct GNUNET_PeerIdentity *
488 get_random_peer_from_peermap (const struct
489                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
490 {
491   struct GetRandPeerIteratorCls *iterator_cls;
492   const struct GNUNET_PeerIdentity *ret;
493
494   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
495   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
496       GNUNET_CONTAINER_multipeermap_size (peer_map));
497   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
498                                                 get_rand_peer_iterator,
499                                                 iterator_cls);
500   ret = iterator_cls->peer;
501   GNUNET_free (iterator_cls);
502   return ret;
503 }
504
505
506 /**
507  * @brief Add a given @a peer to valid peers.
508  *
509  * If valid peers are already #num_valid_peers_max, delete a peer previously.
510  *
511  * @param peer the peer that is added to the valid peers.
512  *
513  * @return #GNUNET_YES if no other peer had to be removed
514  *         #GNUNET_NO  otherwise
515  */
516 static int
517 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
518 {
519   const struct GNUNET_PeerIdentity *rand_peer;
520   int ret;
521
522   ret = GNUNET_YES;
523   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
524   {
525     rand_peer = get_random_peer_from_peermap (valid_peers);
526     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
527     ret = GNUNET_NO;
528   }
529   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
530       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
531   return ret;
532 }
533
534 static void
535 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
536
537 /**
538  * @brief Set the peer flag to living and
539  *        call the pending operations on this peer.
540  *
541  * Also adds peer to #valid_peers.
542  *
543  * @param peer_ctx the #PeerContext of the peer to set live
544  */
545 static void
546 set_peer_live (struct PeerContext *peer_ctx)
547 {
548   struct GNUNET_PeerIdentity *peer;
549   unsigned int i;
550
551   peer = &peer_ctx->peer_id;
552   LOG (GNUNET_ERROR_TYPE_DEBUG,
553       "Peer %s is live and valid, calling %i pending operations on it\n",
554       GNUNET_i2s (peer),
555       peer_ctx->num_pending_ops);
556
557   if (NULL != peer_ctx->liveliness_check_pending)
558   {
559     LOG (GNUNET_ERROR_TYPE_DEBUG,
560          "Removing pending liveliness check for peer %s\n",
561          GNUNET_i2s (&peer_ctx->peer_id));
562     // TODO wait until cadet sets mq->cancel_impl
563     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
564     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
565     peer_ctx->liveliness_check_pending = NULL;
566   }
567
568   (void) add_valid_peer (peer);
569   set_peer_flag (peer_ctx, Peers_ONLINE);
570
571   /* Call pending operations */
572   for (i = 0; i < peer_ctx->num_pending_ops; i++)
573   {
574     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
575   }
576   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
577 }
578
579 static void
580 cleanup_destroyed_channel (void *cls,
581                            const struct GNUNET_CADET_Channel *channel);
582
583 /* Declaration of handlers */
584 static void
585 handle_peer_check (void *cls,
586                    const struct GNUNET_MessageHeader *msg);
587
588 static void
589 handle_peer_push (void *cls,
590                   const struct GNUNET_MessageHeader *msg);
591
592 static void
593 handle_peer_pull_request (void *cls,
594                           const struct GNUNET_MessageHeader *msg);
595
596 static int
597 check_peer_pull_reply (void *cls,
598                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
599
600 static void
601 handle_peer_pull_reply (void *cls,
602                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
603
604 /* End declaration of handlers */
605
606
607 /**
608  * @brief Get the channel of a peer. If not existing, create.
609  *
610  * @param peer the peer id
611  * @return the #GNUNET_CADET_Channel used to send data to @a peer
612  */
613 struct GNUNET_CADET_Channel *
614 get_channel (const struct GNUNET_PeerIdentity *peer)
615 {
616   struct PeerContext *peer_ctx;
617   struct GNUNET_PeerIdentity *ctx_peer;
618   /* There exists a copy-paste-clone in run() */
619   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
620     GNUNET_MQ_hd_fixed_size (peer_check,
621                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
622                              struct GNUNET_MessageHeader,
623                              NULL),
624     GNUNET_MQ_hd_fixed_size (peer_push,
625                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
626                              struct GNUNET_MessageHeader,
627                              NULL),
628     GNUNET_MQ_hd_fixed_size (peer_pull_request,
629                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
630                              struct GNUNET_MessageHeader,
631                              NULL),
632     GNUNET_MQ_hd_var_size (peer_pull_reply,
633                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
634                            struct GNUNET_RPS_P2P_PullReplyMessage,
635                            NULL),
636     GNUNET_MQ_handler_end ()
637   };
638
639
640   peer_ctx = get_peer_ctx (peer);
641   if (NULL == peer_ctx->send_channel)
642   {
643     LOG (GNUNET_ERROR_TYPE_DEBUG,
644          "Trying to establish channel to peer %s\n",
645          GNUNET_i2s (peer));
646     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
647     *ctx_peer = *peer;
648     peer_ctx->send_channel =
649       GNUNET_CADET_channel_create (cadet_handle,
650                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
651                                    peer,
652                                    &port,
653                                    GNUNET_CADET_OPTION_RELIABLE,
654                                    NULL, /* WindowSize handler */
655                                    cleanup_destroyed_channel, /* Disconnect handler */
656                                    cadet_handlers);
657   }
658   GNUNET_assert (NULL != peer_ctx->send_channel);
659   return peer_ctx->send_channel;
660 }
661
662
663 /**
664  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
665  *
666  * If we already have a message queue open to this client,
667  * simply return it, otherways create one.
668  *
669  * @param peer the peer to get the mq to
670  * @return the #GNUNET_MQ_Handle
671  */
672 static struct GNUNET_MQ_Handle *
673 get_mq (const struct GNUNET_PeerIdentity *peer)
674 {
675   struct PeerContext *peer_ctx;
676
677   peer_ctx = get_peer_ctx (peer);
678
679   if (NULL == peer_ctx->mq)
680   {
681     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
682   }
683   return peer_ctx->mq;
684 }
685
686 /**
687  * @brief Add an envelope to a message passed to mq to list of pending messages
688  *
689  * @param peer peer the message was sent to
690  * @param ev envelope to the message
691  * @param type type of the message to be sent
692  * @return pointer to pending message
693  */
694 static struct PendingMessage *
695 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
696                         struct GNUNET_MQ_Envelope *ev,
697                         const char *type)
698 {
699   struct PendingMessage *pending_msg;
700   struct PeerContext *peer_ctx;
701
702   peer_ctx = get_peer_ctx (peer);
703   pending_msg = GNUNET_new (struct PendingMessage);
704   pending_msg->ev = ev;
705   pending_msg->peer_ctx = peer_ctx;
706   pending_msg->type = type;
707   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
708                                peer_ctx->pending_messages_tail,
709                                pending_msg);
710   return pending_msg;
711 }
712
713
714 /**
715  * @brief Remove a pending message from the respective DLL
716  *
717  * @param pending_msg the pending message to remove
718  * @param cancel cancel the pending message, too
719  */
720 static void
721 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
722 {
723   struct PeerContext *peer_ctx;
724
725   peer_ctx = pending_msg->peer_ctx;
726   GNUNET_assert (NULL != peer_ctx);
727   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
728                                peer_ctx->pending_messages_tail,
729                                pending_msg);
730   // TODO wait for the cadet implementation of message cancellation
731   //if (GNUNET_YES == cancel)
732   //{
733   //  GNUNET_MQ_send_cancel (pending_msg->ev);
734   //}
735   GNUNET_free (pending_msg);
736 }
737
738
739 /**
740  * @brief This is called in response to the first message we sent as a
741  * liveliness check.
742  *
743  * @param cls #PeerContext of peer with pending liveliness check
744  */
745 static void
746 mq_liveliness_check_successful (void *cls)
747 {
748   struct PeerContext *peer_ctx = cls;
749
750   if (NULL != peer_ctx->liveliness_check_pending)
751   {
752     LOG (GNUNET_ERROR_TYPE_DEBUG,
753         "Liveliness check for peer %s was successfull\n",
754         GNUNET_i2s (&peer_ctx->peer_id));
755     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
756     peer_ctx->liveliness_check_pending = NULL;
757     set_peer_live (peer_ctx);
758   }
759 }
760
761 /**
762  * Issue a check whether peer is live
763  *
764  * @param peer_ctx the context of the peer
765  */
766 static void
767 check_peer_live (struct PeerContext *peer_ctx)
768 {
769   LOG (GNUNET_ERROR_TYPE_DEBUG,
770        "Get informed about peer %s getting live\n",
771        GNUNET_i2s (&peer_ctx->peer_id));
772
773   struct GNUNET_MQ_Handle *mq;
774   struct GNUNET_MQ_Envelope *ev;
775
776   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
777   peer_ctx->liveliness_check_pending =
778     insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
779   mq = get_mq (&peer_ctx->peer_id);
780   GNUNET_MQ_notify_sent (ev,
781                          mq_liveliness_check_successful,
782                          peer_ctx);
783   GNUNET_MQ_send (mq, ev);
784 }
785
786
787 /**
788  * @brief Check whether function of type #PeerOp was already scheduled
789  *
790  * The array with pending operations will probably never grow really big, so
791  * iterating over it should be ok.
792  *
793  * @param peer the peer to check
794  * @param peer_op the operation (#PeerOp) on the peer
795  *
796  * @return #GNUNET_YES if this operation is scheduled on that peer
797  *         #GNUNET_NO  otherwise
798  */
799 static int
800 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
801                            const PeerOp peer_op)
802 {
803   const struct PeerContext *peer_ctx;
804   unsigned int i;
805
806   peer_ctx = get_peer_ctx (peer);
807   for (i = 0; i < peer_ctx->num_pending_ops; i++)
808     if (peer_op == peer_ctx->pending_ops[i].op)
809       return GNUNET_YES;
810   return GNUNET_NO;
811 }
812
813 int
814 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
815
816 /**
817  * Iterator over hash map entries. Deletes all contexts of peers.
818  *
819  * @param cls closure
820  * @param key current public key
821  * @param value value in the hash map
822  * @return #GNUNET_YES if we should continue to iterate,
823  *         #GNUNET_NO if not.
824  */
825 static int
826 peermap_clear_iterator (void *cls,
827                         const struct GNUNET_PeerIdentity *key,
828                         void *value)
829 {
830   Peers_remove_peer (key);
831   return GNUNET_YES;
832 }
833
834
835 /**
836  * @brief This is called once a message is sent.
837  *
838  * Removes the pending message
839  *
840  * @param cls type of the message that was sent
841  */
842 static void
843 mq_notify_sent_cb (void *cls)
844 {
845   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
846   LOG (GNUNET_ERROR_TYPE_DEBUG,
847       "%s was sent.\n",
848       pending_msg->type);
849   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
850     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
851   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
852     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
853   if (0 == strncmp ("PUSH", pending_msg->type, 4))
854     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
855   /* Do not cancle message */
856   remove_pending_message (pending_msg, GNUNET_NO);
857 }
858
859
860 /**
861  * @brief Iterator function for #store_valid_peers.
862  *
863  * Implements #GNUNET_CONTAINER_PeerMapIterator.
864  * Writes single peer to disk.
865  *
866  * @param cls the file handle to write to.
867  * @param peer current peer
868  * @param value unused
869  *
870  * @return  #GNUNET_YES if we should continue to
871  *          iterate,
872  *          #GNUNET_NO if not.
873  */
874 static int
875 store_peer_presistently_iterator (void *cls,
876                                   const struct GNUNET_PeerIdentity *peer,
877                                   void *value)
878 {
879   const struct GNUNET_DISK_FileHandle *fh = cls;
880   char peer_string[128];
881   int size;
882   ssize_t ret;
883
884   if (NULL == peer)
885   {
886     return GNUNET_YES;
887   }
888   size = GNUNET_snprintf (peer_string,
889                           sizeof (peer_string),
890                           "%s\n",
891                           GNUNET_i2s_full (peer));
892   GNUNET_assert (53 == size);
893   ret = GNUNET_DISK_file_write (fh,
894                                 peer_string,
895                                 size);
896   GNUNET_assert (size == ret);
897   return GNUNET_YES;
898 }
899
900
901 /**
902  * @brief Store the peers currently in #valid_peers to disk.
903  */
904 static void
905 store_valid_peers ()
906 {
907   struct GNUNET_DISK_FileHandle *fh;
908   uint32_t number_written_peers;
909   int ret;
910
911   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
912   {
913     return;
914   }
915
916   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
917   if (GNUNET_SYSERR == ret)
918   {
919     LOG (GNUNET_ERROR_TYPE_WARNING,
920         "Not able to create directory for file `%s'\n",
921         filename_valid_peers);
922     GNUNET_break (0);
923   }
924   else if (GNUNET_NO == ret)
925   {
926     LOG (GNUNET_ERROR_TYPE_WARNING,
927         "Directory for file `%s' exists but is not writable for us\n",
928         filename_valid_peers);
929     GNUNET_break (0);
930   }
931   fh = GNUNET_DISK_file_open (filename_valid_peers,
932                               GNUNET_DISK_OPEN_WRITE |
933                                   GNUNET_DISK_OPEN_CREATE,
934                               GNUNET_DISK_PERM_USER_READ |
935                                   GNUNET_DISK_PERM_USER_WRITE);
936   if (NULL == fh)
937   {
938     LOG (GNUNET_ERROR_TYPE_WARNING,
939         "Not able to write valid peers to file `%s'\n",
940         filename_valid_peers);
941     return;
942   }
943   LOG (GNUNET_ERROR_TYPE_DEBUG,
944       "Writing %u valid peers to disk\n",
945       GNUNET_CONTAINER_multipeermap_size (valid_peers));
946   number_written_peers =
947     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
948                                            store_peer_presistently_iterator,
949                                            fh);
950   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
951   GNUNET_assert (number_written_peers ==
952       GNUNET_CONTAINER_multipeermap_size (valid_peers));
953 }
954
955
956 /**
957  * @brief Convert string representation of peer id to peer id.
958  *
959  * Counterpart to #GNUNET_i2s_full.
960  *
961  * @param string_repr The string representation of the peer id
962  *
963  * @return The peer id
964  */
965 static const struct GNUNET_PeerIdentity *
966 s2i_full (const char *string_repr)
967 {
968   struct GNUNET_PeerIdentity *peer;
969   size_t len;
970   int ret;
971
972   peer = GNUNET_new (struct GNUNET_PeerIdentity);
973   len = strlen (string_repr);
974   if (52 > len)
975   {
976     LOG (GNUNET_ERROR_TYPE_WARNING,
977         "Not able to convert string representation of PeerID to PeerID\n"
978         "Sting representation: %s (len %lu) - too short\n",
979         string_repr,
980         len);
981     GNUNET_break (0);
982   }
983   else if (52 < len)
984   {
985     len = 52;
986   }
987   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
988                                                     len,
989                                                     &peer->public_key);
990   if (GNUNET_OK != ret)
991   {
992     LOG (GNUNET_ERROR_TYPE_WARNING,
993         "Not able to convert string representation of PeerID to PeerID\n"
994         "Sting representation: %s\n",
995         string_repr);
996     GNUNET_break (0);
997   }
998   return peer;
999 }
1000
1001
1002 /**
1003  * @brief Restore the peers on disk to #valid_peers.
1004  */
1005 static void
1006 restore_valid_peers ()
1007 {
1008   off_t file_size;
1009   uint32_t num_peers;
1010   struct GNUNET_DISK_FileHandle *fh;
1011   char *buf;
1012   ssize_t size_read;
1013   char *iter_buf;
1014   char *str_repr;
1015   const struct GNUNET_PeerIdentity *peer;
1016
1017   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1018   {
1019     return;
1020   }
1021
1022   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1023   {
1024     return;
1025   }
1026   fh = GNUNET_DISK_file_open (filename_valid_peers,
1027                               GNUNET_DISK_OPEN_READ,
1028                               GNUNET_DISK_PERM_NONE);
1029   GNUNET_assert (NULL != fh);
1030   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1031   num_peers = file_size / 53;
1032   buf = GNUNET_malloc (file_size);
1033   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1034   GNUNET_assert (size_read == file_size);
1035   LOG (GNUNET_ERROR_TYPE_DEBUG,
1036       "Restoring %" PRIu32 " peers from file `%s'\n",
1037       num_peers,
1038       filename_valid_peers);
1039   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1040   {
1041     str_repr = GNUNET_strndup (iter_buf, 53);
1042     peer = s2i_full (str_repr);
1043     GNUNET_free (str_repr);
1044     add_valid_peer (peer);
1045     LOG (GNUNET_ERROR_TYPE_DEBUG,
1046         "Restored valid peer %s from disk\n",
1047         GNUNET_i2s_full (peer));
1048   }
1049   iter_buf = NULL;
1050   GNUNET_free (buf);
1051   LOG (GNUNET_ERROR_TYPE_DEBUG,
1052       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1053       num_peers,
1054       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1055   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1056   {
1057     LOG (GNUNET_ERROR_TYPE_WARNING,
1058         "Number of restored peers does not match file size. Have probably duplicates.\n");
1059   }
1060   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1061   LOG (GNUNET_ERROR_TYPE_DEBUG,
1062       "Restored %u valid peers from disk\n",
1063       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1064 }
1065
1066
1067 /**
1068  * @brief Initialise storage of peers
1069  *
1070  * @param fn_valid_peers filename of the file used to store valid peer ids
1071  * @param cadet_h cadet handle
1072  * @param own_id own peer identity
1073  */
1074 void
1075 Peers_initialise (char* fn_valid_peers,
1076                   struct GNUNET_CADET_Handle *cadet_h)
1077 {
1078   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1079   cadet_handle = cadet_h;
1080   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1081   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1082   restore_valid_peers ();
1083 }
1084
1085
1086 /**
1087  * @brief Delete storage of peers that was created with #Peers_initialise ()
1088  */
1089 void
1090 Peers_terminate ()
1091 {
1092   if (GNUNET_SYSERR ==
1093       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1094                                              peermap_clear_iterator,
1095                                              NULL))
1096   {
1097     LOG (GNUNET_ERROR_TYPE_WARNING,
1098         "Iteration destroying peers was aborted.\n");
1099   }
1100   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1101   peer_map = NULL;
1102   store_valid_peers ();
1103   GNUNET_free (filename_valid_peers);
1104   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1105 }
1106
1107
1108 /**
1109  * Iterator over #valid_peers hash map entries.
1110  *
1111  * @param cls closure - unused
1112  * @param peer current peer id
1113  * @param value value in the hash map - unused
1114  * @return #GNUNET_YES if we should continue to
1115  *         iterate,
1116  *         #GNUNET_NO if not.
1117  */
1118 static int
1119 valid_peer_iterator (void *cls,
1120                      const struct GNUNET_PeerIdentity *peer,
1121                      void *value)
1122 {
1123   struct PeersIteratorCls *it_cls = cls;
1124
1125   return it_cls->iterator (it_cls->cls,
1126                            peer);
1127 }
1128
1129
1130 /**
1131  * @brief Get all currently known, valid peer ids.
1132  *
1133  * @param it function to call on each peer id
1134  * @param it_cls extra argument to @a it
1135  * @return the number of key value pairs processed,
1136  *         #GNUNET_SYSERR if it aborted iteration
1137  */
1138 int
1139 Peers_get_valid_peers (PeersIterator iterator,
1140                        void *it_cls)
1141 {
1142   struct PeersIteratorCls *cls;
1143   int ret;
1144
1145   cls = GNUNET_new (struct PeersIteratorCls);
1146   cls->iterator = iterator;
1147   cls->cls = it_cls;
1148   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1149                                                valid_peer_iterator,
1150                                                cls);
1151   GNUNET_free (cls);
1152   return ret;
1153 }
1154
1155
1156 /**
1157  * @brief Add peer to known peers.
1158  *
1159  * This function is called on new peer_ids from 'external' sources
1160  * (client seed, cadet get_peers(), ...)
1161  *
1162  * @param peer the new #GNUNET_PeerIdentity
1163  *
1164  * @return #GNUNET_YES if peer was inserted
1165  *         #GNUNET_NO  otherwise
1166  */
1167 int
1168 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1169 {
1170   if (GNUNET_YES == Peers_check_peer_known (peer))
1171   {
1172     return GNUNET_NO; /* We already know this peer - nothing to do */
1173   }
1174   (void) create_peer_ctx (peer);
1175   return GNUNET_YES;
1176 }
1177
1178 int
1179 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1180
1181 /**
1182  * @brief Try connecting to a peer to see whether it is online
1183  *
1184  * If not known yet, insert into known peers
1185  *
1186  * @param peer the peer whose liveliness is to be checked
1187  * @return #GNUNET_YES if peer had to be inserted
1188  *         #GNUNET_NO  otherwise
1189  */
1190 int
1191 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1192 {
1193   struct PeerContext *peer_ctx;
1194   int ret;
1195
1196   ret = Peers_insert_peer (peer);
1197   peer_ctx = get_peer_ctx (peer);
1198   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1199   {
1200     check_peer_live (peer_ctx);
1201   }
1202   return ret;
1203 }
1204
1205
1206 /**
1207  * @brief Check if peer is removable.
1208  *
1209  * Check if
1210  *  - a recv channel exists
1211  *  - there are pending messages
1212  *  - there is no pending pull reply
1213  *
1214  * @param peer the peer in question
1215  * @return #GNUNET_YES    if peer is removable
1216  *         #GNUNET_NO     if peer is NOT removable
1217  *         #GNUNET_SYSERR if peer is not known
1218  */
1219 int
1220 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1221 {
1222   struct PeerContext *peer_ctx;
1223
1224   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1225   {
1226     return GNUNET_SYSERR;
1227   }
1228
1229   peer_ctx = get_peer_ctx (peer);
1230   if ( (NULL != peer_ctx->recv_channel) ||
1231        (NULL != peer_ctx->pending_messages_head) ||
1232        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1233   {
1234     return GNUNET_NO;
1235   }
1236   return GNUNET_YES;
1237 }
1238
1239 uint32_t *
1240 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1241                         enum Peers_ChannelRole role);
1242
1243 int
1244 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1245
1246 /**
1247  * @brief Remove peer
1248  *
1249  * @param peer the peer to clean
1250  * @return #GNUNET_YES if peer was removed
1251  *         #GNUNET_NO  otherwise
1252  */
1253 int
1254 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1255 {
1256   struct PeerContext *peer_ctx;
1257   uint32_t *channel_flag;
1258
1259   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1260   {
1261     return GNUNET_NO;
1262   }
1263
1264   peer_ctx = get_peer_ctx (peer);
1265   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1266   LOG (GNUNET_ERROR_TYPE_DEBUG,
1267        "Going to remove peer %s\n",
1268        GNUNET_i2s (&peer_ctx->peer_id));
1269   Peers_unset_peer_flag (peer, Peers_ONLINE);
1270
1271   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1272   while (NULL != peer_ctx->pending_messages_head)
1273   {
1274     LOG (GNUNET_ERROR_TYPE_DEBUG,
1275         "Removing unsent %s\n",
1276         peer_ctx->pending_messages_head->type);
1277     /* Cancle pending message, too */
1278     if ( (NULL != peer_ctx->liveliness_check_pending) &&
1279          (0 == memcmp (peer_ctx->pending_messages_head,
1280                      peer_ctx->liveliness_check_pending,
1281                      sizeof (struct PendingMessage))) )
1282       {
1283         peer_ctx->liveliness_check_pending = NULL;
1284       }
1285     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1286   }
1287   /* If we are still waiting for notification whether this peer is live
1288    * cancel the according task */
1289   if (NULL != peer_ctx->liveliness_check_pending)
1290   {
1291     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1292          "Removing pending liveliness check for peer %s\n",
1293          GNUNET_i2s (&peer_ctx->peer_id));
1294     // TODO wait until cadet sets mq->cancel_impl
1295     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1296     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
1297     peer_ctx->liveliness_check_pending = NULL;
1298   }
1299   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1300   if (NULL != peer_ctx->send_channel &&
1301       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1302   {
1303     LOG (GNUNET_ERROR_TYPE_DEBUG,
1304         "Destroying send channel\n");
1305     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1306     peer_ctx->send_channel = NULL;
1307     peer_ctx->mq = NULL;
1308   }
1309   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1310   if (NULL != peer_ctx->recv_channel &&
1311       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1312   {
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314         "Destroying recv channel\n");
1315     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1316     peer_ctx->recv_channel = NULL;
1317   }
1318
1319   GNUNET_free (peer_ctx->send_channel_flags);
1320   GNUNET_free (peer_ctx->recv_channel_flags);
1321
1322   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1323   {
1324     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1325   }
1326   GNUNET_free (peer_ctx);
1327   return GNUNET_YES;
1328 }
1329
1330
1331 /**
1332  * @brief set flags on a given peer.
1333  *
1334  * @param peer the peer to set flags on
1335  * @param flags the flags
1336  */
1337 void
1338 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1339 {
1340   struct PeerContext *peer_ctx;
1341
1342   peer_ctx = get_peer_ctx (peer);
1343   set_peer_flag (peer_ctx, flags);
1344 }
1345
1346
1347 /**
1348  * @brief unset flags on a given peer.
1349  *
1350  * @param peer the peer to unset flags on
1351  * @param flags the flags
1352  */
1353 void
1354 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1355 {
1356   struct PeerContext *peer_ctx;
1357
1358   peer_ctx = get_peer_ctx (peer);
1359   unset_peer_flag (peer_ctx, flags);
1360 }
1361
1362
1363 /**
1364  * @brief Check whether flags on a peer are set.
1365  *
1366  * @param peer the peer to check the flag of
1367  * @param flags the flags to check
1368  *
1369  * @return #GNUNET_SYSERR if peer is not known
1370  *         #GNUNET_YES    if all given flags are set
1371  *         #GNUNET_NO     otherwise
1372  */
1373 int
1374 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1375 {
1376   struct PeerContext *peer_ctx;
1377
1378   if (GNUNET_NO == Peers_check_peer_known (peer))
1379   {
1380     return GNUNET_SYSERR;
1381   }
1382   peer_ctx = get_peer_ctx (peer);
1383   return check_peer_flag_set (peer_ctx, flags);
1384 }
1385
1386
1387 /**
1388  * @brief set flags on a given channel.
1389  *
1390  * @param channel the channel to set flags on
1391  * @param flags the flags
1392  */
1393 void
1394 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1395 {
1396   set_channel_flag (channel_flags, flags);
1397 }
1398
1399
1400 /**
1401  * @brief unset flags on a given channel.
1402  *
1403  * @param channel the channel to unset flags on
1404  * @param flags the flags
1405  */
1406 void
1407 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1408 {
1409   unset_channel_flag (channel_flags, flags);
1410 }
1411
1412
1413 /**
1414  * @brief Check whether flags on a channel are set.
1415  *
1416  * @param channel the channel to check the flag of
1417  * @param flags the flags to check
1418  *
1419  * @return #GNUNET_YES if all given flags are set
1420  *         #GNUNET_NO  otherwise
1421  */
1422 int
1423 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1424 {
1425   return check_channel_flag_set (channel_flags, flags);
1426 }
1427
1428 /**
1429  * @brief Get the flags for the channel in @a role for @a peer.
1430  *
1431  * @param peer Peer to get the channel flags for.
1432  * @param role Role of channel to get flags for
1433  *
1434  * @return The flags.
1435  */
1436 uint32_t *
1437 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1438                         enum Peers_ChannelRole role)
1439 {
1440   const struct PeerContext *peer_ctx;
1441
1442   peer_ctx = get_peer_ctx (peer);
1443   if (Peers_CHANNEL_ROLE_SENDING == role)
1444   {
1445     return peer_ctx->send_channel_flags;
1446   }
1447   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1448   {
1449     return peer_ctx->recv_channel_flags;
1450   }
1451   else
1452   {
1453     GNUNET_assert (0);
1454   }
1455 }
1456
1457 /**
1458  * @brief Check whether we have information about the given peer.
1459  *
1460  * FIXME probably deprecated. Make this the new _online.
1461  *
1462  * @param peer peer in question
1463  *
1464  * @return #GNUNET_YES if peer is known
1465  *         #GNUNET_NO  if peer is not knwon
1466  */
1467 int
1468 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1469 {
1470   if (NULL != peer_map)
1471   {
1472     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1473   } else
1474   {
1475     return GNUNET_NO;
1476   }
1477 }
1478
1479
1480 /**
1481  * @brief Check whether @a peer is actually a peer.
1482  *
1483  * A valid peer is a peer that we know exists eg. we were connected to once.
1484  *
1485  * @param peer peer in question
1486  *
1487  * @return #GNUNET_YES if peer is valid
1488  *         #GNUNET_NO  if peer is not valid
1489  */
1490 int
1491 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1492 {
1493   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1494 }
1495
1496
1497 /**
1498  * @brief Indicate that we want to send to the other peer
1499  *
1500  * This establishes a sending channel
1501  *
1502  * @param peer the peer to establish channel to
1503  */
1504 void
1505 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1506 {
1507   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1508   (void) get_channel (peer);
1509 }
1510
1511
1512 /**
1513  * @brief Check whether other peer has the intention to send/opened channel
1514  *        towars us
1515  *
1516  * @param peer the peer in question
1517  *
1518  * @return #GNUNET_YES if peer has the intention to send
1519  *         #GNUNET_NO  otherwise
1520  */
1521 int
1522 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1523 {
1524   const struct PeerContext *peer_ctx;
1525
1526   peer_ctx = get_peer_ctx (peer);
1527   if (NULL != peer_ctx->recv_channel)
1528   {
1529     return GNUNET_YES;
1530   }
1531   return GNUNET_NO;
1532 }
1533
1534
1535 /**
1536  * Handle the channel a peer opens to us.
1537  *
1538  * @param cls The closure
1539  * @param channel The channel the peer wants to establish
1540  * @param initiator The peer's peer ID
1541  *
1542  * @return initial channel context for the channel
1543  *         (can be NULL -- that's not an error)
1544  */
1545 void *
1546 Peers_handle_inbound_channel (void *cls,
1547                               struct GNUNET_CADET_Channel *channel,
1548                               const struct GNUNET_PeerIdentity *initiator)
1549 {
1550   struct PeerContext *peer_ctx;
1551   struct GNUNET_PeerIdentity *ctx_peer;
1552   struct ChannelCtx *channel_ctx;
1553
1554   LOG (GNUNET_ERROR_TYPE_DEBUG,
1555       "New channel was established to us (Peer %s).\n",
1556       GNUNET_i2s (initiator));
1557   GNUNET_assert (NULL != channel); /* according to cadet API */
1558   /* Make sure we 'know' about this peer */
1559   peer_ctx = create_or_get_peer_ctx (initiator);
1560   set_peer_live (peer_ctx);
1561   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1562   *ctx_peer = *initiator;
1563   channel_ctx = GNUNET_new (struct ChannelCtx);
1564   channel_ctx->peer_ctx = peer_ctx;
1565   channel_ctx->channel = channel;
1566   GNUNET_CONTAINER_DLL_insert (channel_ctx_head, channel_ctx_tail, channel_ctx);
1567   /* We only accept one incoming channel per peer */
1568   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1569   {
1570     LOG (GNUNET_ERROR_TYPE_WARNING,
1571         "Already got one receive channel. Destroying old one.\n");
1572     GNUNET_break_op (0);
1573     set_channel_flag (peer_ctx->recv_channel_flags,
1574                       Peers_CHANNEL_ESTABLISHED_TWICE);
1575     //GNUNET_CADET_channel_destroy (channel);
1576     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1577     peer_ctx->recv_channel = channel;
1578     /* return the channel context */
1579     return channel_ctx;
1580   }
1581   peer_ctx->recv_channel = channel;
1582   return channel_ctx;
1583 }
1584
1585
1586 /**
1587  * @brief Check whether a sending channel towards the given peer exists
1588  *
1589  * @param peer the peer to check for
1590  *
1591  * @return #GNUNET_YES if a sending channel towards that peer exists
1592  *         #GNUNET_NO  otherwise
1593  */
1594 int
1595 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1596 {
1597   struct PeerContext *peer_ctx;
1598
1599   if (GNUNET_NO == Peers_check_peer_known (peer))
1600   { /* If no such peer exists, there is no channel */
1601     return GNUNET_NO;
1602   }
1603   peer_ctx = get_peer_ctx (peer);
1604   if (NULL == peer_ctx->send_channel)
1605   {
1606     return GNUNET_NO;
1607   }
1608   return GNUNET_YES;
1609 }
1610
1611
1612 /**
1613  * @brief check whether the given channel is the sending channel of the given
1614  *        peer
1615  *
1616  * @param peer the peer in question
1617  * @param channel the channel to check for
1618  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1619  *                    #Peers_CHANNEL_ROLE_RECEIVING
1620  *
1621  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1622  *         #GNUNET_NO  otherwise
1623  */
1624 int
1625 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1626                           const struct GNUNET_CADET_Channel *channel,
1627                           enum Peers_ChannelRole role)
1628 {
1629   const struct PeerContext *peer_ctx;
1630
1631   if (GNUNET_NO == Peers_check_peer_known (peer))
1632   {
1633     return GNUNET_NO;
1634   }
1635   peer_ctx = get_peer_ctx (peer);
1636   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1637        (channel == peer_ctx->send_channel) )
1638   {
1639     return GNUNET_YES;
1640   }
1641   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1642        (channel == peer_ctx->recv_channel) )
1643   {
1644     return GNUNET_YES;
1645   }
1646   return GNUNET_NO;
1647 }
1648
1649
1650 /**
1651  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1652  *        intention to another peer
1653  *
1654  * If there is also no channel to receive messages from that peer, remove it
1655  * from the peermap.
1656  * TODO really?
1657  *
1658  * @peer the peer identity of the peer whose sending channel to destroy
1659  * @return #GNUNET_YES if channel was destroyed
1660  *         #GNUNET_NO  otherwise
1661  */
1662 int
1663 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1664 {
1665   struct PeerContext *peer_ctx;
1666
1667   if (GNUNET_NO == Peers_check_peer_known (peer))
1668   {
1669     return GNUNET_NO;
1670   }
1671   peer_ctx = get_peer_ctx (peer);
1672   if (NULL != peer_ctx->send_channel)
1673   {
1674     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1675     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1676     peer_ctx->send_channel = NULL;
1677     peer_ctx->mq = NULL;
1678     (void) Peers_check_connected (peer);
1679     return GNUNET_YES;
1680   }
1681   return GNUNET_NO;
1682 }
1683
1684 /**
1685  * This is called when a channel is destroyed.
1686  *
1687  * @param cls The closure
1688  * @param channel The channel being closed
1689  */
1690 void
1691 Peers_cleanup_destroyed_channel (void *cls,
1692                                  const struct GNUNET_CADET_Channel *channel)
1693 {
1694   struct ChannelCtx *channel_ctx = cls;
1695   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
1696   struct PeerContext *peer_ctx;
1697   uint32_t *channel_flag;
1698
1699   if (GNUNET_NO == Peers_check_peer_known (peer))
1700   {/* We don't want to implicitly create a context that we're about to kill */
1701   LOG (GNUNET_ERROR_TYPE_WARNING,
1702        "channel (%s) without associated context was destroyed\n",
1703        GNUNET_i2s (peer));
1704     return;
1705   }
1706   peer_ctx = get_peer_ctx (peer);
1707
1708   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1709    * flag will be set. In this case simply make sure that the channels are
1710    * cleaned. */
1711   /* FIXME This distinction seems to be redundant */
1712   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1713   {/* We initiatad the destruction of this particular peer */
1714     LOG (GNUNET_ERROR_TYPE_DEBUG,
1715         "Peer is in the process of being destroyed\n");
1716     if (channel == peer_ctx->send_channel)
1717     {
1718       peer_ctx->send_channel = NULL;
1719       peer_ctx->mq = NULL;
1720     }
1721     else if (channel == peer_ctx->recv_channel)
1722     {
1723       peer_ctx->recv_channel = NULL;
1724     }
1725
1726     if (NULL != peer_ctx->send_channel)
1727     {
1728       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1729       channel_flag = Peers_get_channel_flag (&peer_ctx->peer_id, Peers_CHANNEL_ROLE_SENDING);
1730       Peers_set_channel_flag (channel_flag, Peers_CHANNEL_DESTROING);
1731       peer_ctx->send_channel = NULL;
1732       peer_ctx->mq = NULL;
1733     }
1734     if (NULL != peer_ctx->recv_channel)
1735     {
1736       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1737       channel_flag = Peers_get_channel_flag (&peer_ctx->peer_id, Peers_CHANNEL_ROLE_RECEIVING);
1738       Peers_set_channel_flag (channel_flag, Peers_CHANNEL_DESTROING);
1739       peer_ctx->recv_channel = NULL;
1740     }
1741     /* Set the #Peers_ONLINE flag accordingly */
1742     (void) Peers_check_connected (peer);
1743     return;
1744   }
1745
1746   else
1747   { /* We did not initiate the destruction of this peer */
1748     LOG (GNUNET_ERROR_TYPE_DEBUG,
1749         "Peer is NOT in the process of being destroyed\n");
1750     if (channel == peer_ctx->send_channel)
1751     { /* Something (but us) killd the channel - clean up peer */
1752       LOG (GNUNET_ERROR_TYPE_DEBUG,
1753           "send channel (%s) was destroyed - cleaning up\n",
1754           GNUNET_i2s (peer));
1755       peer_ctx->send_channel = NULL;
1756       peer_ctx->mq = NULL;
1757     }
1758     else if (channel == peer_ctx->recv_channel)
1759     { /* Other peer doesn't want to send us messages anymore */
1760       LOG (GNUNET_ERROR_TYPE_DEBUG,
1761            "Peer %s destroyed recv channel - cleaning up channel\n",
1762            GNUNET_i2s (peer));
1763       peer_ctx->recv_channel = NULL;
1764     }
1765     else
1766     {
1767       LOG (GNUNET_ERROR_TYPE_WARNING,
1768            "unknown channel (%s) was destroyed\n",
1769            GNUNET_i2s (peer));
1770     }
1771   }
1772   (void) Peers_check_connected (peer);
1773 }
1774
1775 /**
1776  * @brief Send a message to another peer.
1777  *
1778  * Keeps track about pending messages so they can be properly removed when the
1779  * peer is destroyed.
1780  *
1781  * @param peer receeiver of the message
1782  * @param ev envelope of the message
1783  * @param type type of the message
1784  */
1785 void
1786 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1787                     struct GNUNET_MQ_Envelope *ev,
1788                     const char *type)
1789 {
1790   struct PendingMessage *pending_msg;
1791   struct GNUNET_MQ_Handle *mq;
1792
1793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794               "Sending message to %s of type %s\n",
1795               GNUNET_i2s (peer),
1796               type);
1797   pending_msg = insert_pending_message (peer, ev, type);
1798   mq = get_mq (peer);
1799   GNUNET_MQ_notify_sent (ev,
1800                          mq_notify_sent_cb,
1801                          pending_msg);
1802   GNUNET_MQ_send (mq, ev);
1803 }
1804
1805 /**
1806  * @brief Schedule a operation on given peer
1807  *
1808  * Avoids scheduling an operation twice.
1809  *
1810  * @param peer the peer we want to schedule the operation for once it gets live
1811  *
1812  * @return #GNUNET_YES if the operation was scheduled
1813  *         #GNUNET_NO  otherwise
1814  */
1815 int
1816 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1817                           const PeerOp peer_op)
1818 {
1819   struct PeerPendingOp pending_op;
1820   struct PeerContext *peer_ctx;
1821
1822   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1823
1824   //TODO if LIVE/ONLINE execute immediately
1825
1826   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1827   {
1828     peer_ctx = get_peer_ctx (peer);
1829     pending_op.op = peer_op;
1830     pending_op.op_cls = NULL;
1831     GNUNET_array_append (peer_ctx->pending_ops,
1832                          peer_ctx->num_pending_ops,
1833                          pending_op);
1834     return GNUNET_YES;
1835   }
1836   return GNUNET_NO;
1837 }
1838
1839 /**
1840  * @brief Get the recv_channel of @a peer.
1841  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1842  * messages.
1843  *
1844  * @param peer The peer to get the recv_channel from.
1845  *
1846  * @return The recv_channel.
1847  */
1848 struct GNUNET_CADET_Channel *
1849 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1850 {
1851   struct PeerContext *peer_ctx;
1852
1853   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1854   peer_ctx = get_peer_ctx (peer);
1855   return peer_ctx->recv_channel;
1856 }
1857 /***********************************************************************
1858  * /Old gnunet-service-rps_peers.c
1859 ***********************************************************************/
1860
1861
1862 /***********************************************************************
1863  * Housekeeping with clients
1864 ***********************************************************************/
1865
1866 /**
1867  * Closure used to pass the client and the id to the callback
1868  * that replies to a client's request
1869  */
1870 struct ReplyCls
1871 {
1872   /**
1873    * DLL
1874    */
1875   struct ReplyCls *next;
1876   struct ReplyCls *prev;
1877
1878   /**
1879    * The identifier of the request
1880    */
1881   uint32_t id;
1882
1883   /**
1884    * The handle to the request
1885    */
1886   struct RPS_SamplerRequestHandle *req_handle;
1887
1888   /**
1889    * The client handle to send the reply to
1890    */
1891   struct ClientContext *cli_ctx;
1892 };
1893
1894
1895 /**
1896  * Struct used to store the context of a connected client.
1897  */
1898 struct ClientContext
1899 {
1900   /**
1901    * DLL
1902    */
1903   struct ClientContext *next;
1904   struct ClientContext *prev;
1905
1906   /**
1907    * The message queue to communicate with the client.
1908    */
1909   struct GNUNET_MQ_Handle *mq;
1910
1911   /**
1912    * DLL with handles to single requests from the client
1913    */
1914   struct ReplyCls *rep_cls_head;
1915   struct ReplyCls *rep_cls_tail;
1916
1917   /**
1918    * @brief How many updates this client expects to receive.
1919    */
1920   int64_t view_updates_left;
1921
1922   /**
1923    * The client handle to send the reply to
1924    */
1925   struct GNUNET_SERVICE_Client *client;
1926 };
1927
1928 /**
1929  * DLL with all clients currently connected to us
1930  */
1931 struct ClientContext *cli_ctx_head;
1932 struct ClientContext *cli_ctx_tail;
1933
1934 /***********************************************************************
1935  * /Housekeeping with clients
1936 ***********************************************************************/
1937
1938
1939
1940
1941
1942 /***********************************************************************
1943  * Globals
1944 ***********************************************************************/
1945
1946 /**
1947  * Sampler used for the Brahms protocol itself.
1948  */
1949 static struct RPS_Sampler *prot_sampler;
1950
1951 /**
1952  * Sampler used for the clients.
1953  */
1954 static struct RPS_Sampler *client_sampler;
1955
1956 /**
1957  * Name to log view to
1958  */
1959 static const char *file_name_view_log;
1960
1961 #ifdef TO_FILE
1962 /**
1963  * Name to log number of observed peers to
1964  */
1965 static const char *file_name_observed_log;
1966
1967 /**
1968  * @brief Count the observed peers
1969  */
1970 static uint32_t num_observed_peers;
1971
1972 /**
1973  * @brief Multipeermap (ab-) used to count unique peer_ids
1974  */
1975 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1976 #endif /* TO_FILE */
1977
1978 /**
1979  * The size of sampler we need to be able to satisfy the client's need
1980  * of random peers.
1981  */
1982 static unsigned int sampler_size_client_need;
1983
1984 /**
1985  * The size of sampler we need to be able to satisfy the Brahms protocol's
1986  * need of random peers.
1987  *
1988  * This is one minimum size the sampler grows to.
1989  */
1990 static unsigned int sampler_size_est_need;
1991
1992 /**
1993  * @brief This is the minimum estimate used as sampler size.
1994  *
1995  * It is configured by the user.
1996  */
1997 static unsigned int sampler_size_est_min;
1998
1999 /**
2000  * @brief This is the estimate used as view size.
2001  *
2002  * It is initialised with the minimum
2003  */
2004 static unsigned int view_size_est_need;
2005
2006 /**
2007  * @brief This is the minimum estimate used as view size.
2008  *
2009  * It is configured by the user.
2010  */
2011 static unsigned int view_size_est_min;
2012
2013 /**
2014  * Percentage of total peer number in the view
2015  * to send random PUSHes to
2016  */
2017 static float alpha;
2018
2019 /**
2020  * Percentage of total peer number in the view
2021  * to send random PULLs to
2022  */
2023 static float beta;
2024
2025 /**
2026  * Identifier for the main task that runs periodically.
2027  */
2028 static struct GNUNET_SCHEDULER_Task *do_round_task;
2029
2030 /**
2031  * Time inverval the do_round task runs in.
2032  */
2033 static struct GNUNET_TIME_Relative round_interval;
2034
2035 /**
2036  * List to store peers received through pushes temporary.
2037  */
2038 static struct CustomPeerMap *push_map;
2039
2040 /**
2041  * List to store peers received through pulls temporary.
2042  */
2043 static struct CustomPeerMap *pull_map;
2044
2045 /**
2046  * Handler to NSE.
2047  */
2048 static struct GNUNET_NSE_Handle *nse;
2049
2050 /**
2051  * Handler to CADET.
2052  */
2053 static struct GNUNET_CADET_Handle *cadet_handle;
2054
2055 /**
2056  * @brief Port to communicate to other peers.
2057  */
2058 static struct GNUNET_CADET_Port *cadet_port;
2059
2060 /**
2061  * Handler to PEERINFO.
2062  */
2063 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2064
2065 /**
2066  * Handle for cancellation of iteration over peers.
2067  */
2068 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2069
2070 /**
2071  * Request counter.
2072  *
2073  * Counts how many requets clients already issued.
2074  * Only needed in the beginning to check how many of the 64 deltas
2075  * we already have
2076  */
2077 static unsigned int req_counter;
2078
2079 /**
2080  * Time of the last request we received.
2081  *
2082  * Used to compute the expected request rate.
2083  */
2084 static struct GNUNET_TIME_Absolute last_request;
2085
2086 /**
2087  * Size of #request_deltas.
2088  */
2089 #define REQUEST_DELTAS_SIZE 64
2090 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2091
2092 /**
2093  * Last 64 deltas between requests
2094  */
2095 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2096
2097 /**
2098  * The prediction of the rate of requests
2099  */
2100 static struct GNUNET_TIME_Relative request_rate;
2101
2102
2103 #ifdef ENABLE_MALICIOUS
2104 /**
2105  * Type of malicious peer
2106  *
2107  * 0 Don't act malicious at all - Default
2108  * 1 Try to maximise representation
2109  * 2 Try to partition the network
2110  * 3 Combined attack
2111  */
2112 static uint32_t mal_type;
2113
2114 /**
2115  * Other malicious peers
2116  */
2117 static struct GNUNET_PeerIdentity *mal_peers;
2118
2119 /**
2120  * Hashmap of malicious peers used as set.
2121  * Used to more efficiently check whether we know that peer.
2122  */
2123 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2124
2125 /**
2126  * Number of other malicious peers
2127  */
2128 static uint32_t num_mal_peers;
2129
2130
2131 /**
2132  * If type is 2 This struct is used to store the attacked peers in a DLL
2133  */
2134 struct AttackedPeer
2135 {
2136   /**
2137    * DLL
2138    */
2139   struct AttackedPeer *next;
2140   struct AttackedPeer *prev;
2141
2142   /**
2143    * PeerID
2144    */
2145   struct GNUNET_PeerIdentity peer_id;
2146 };
2147
2148 /**
2149  * If type is 2 this is the DLL of attacked peers
2150  */
2151 static struct AttackedPeer *att_peers_head;
2152 static struct AttackedPeer *att_peers_tail;
2153
2154 /**
2155  * This index is used to point to an attacked peer to
2156  * implement the round-robin-ish way to select attacked peers.
2157  */
2158 static struct AttackedPeer *att_peer_index;
2159
2160 /**
2161  * Hashmap of attacked peers used as set.
2162  * Used to more efficiently check whether we know that peer.
2163  */
2164 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2165
2166 /**
2167  * Number of attacked peers
2168  */
2169 static uint32_t num_attacked_peers;
2170
2171 /**
2172  * If type is 1 this is the attacked peer
2173  */
2174 static struct GNUNET_PeerIdentity attacked_peer;
2175
2176 /**
2177  * The limit of PUSHes we can send in one round.
2178  * This is an assumption of the Brahms protocol and either implemented
2179  * via proof of work
2180  * or
2181  * assumend to be the bandwidth limitation.
2182  */
2183 static uint32_t push_limit = 10000;
2184 #endif /* ENABLE_MALICIOUS */
2185
2186
2187 /***********************************************************************
2188  * /Globals
2189 ***********************************************************************/
2190
2191
2192 /***********************************************************************
2193  * Util functions
2194 ***********************************************************************/
2195
2196
2197 /**
2198  * Print peerlist to log.
2199  */
2200 static void
2201 print_peer_list (struct GNUNET_PeerIdentity *list,
2202                  unsigned int len)
2203 {
2204   unsigned int i;
2205
2206   LOG (GNUNET_ERROR_TYPE_DEBUG,
2207        "Printing peer list of length %u at %p:\n",
2208        len,
2209        list);
2210   for (i = 0 ; i < len ; i++)
2211   {
2212     LOG (GNUNET_ERROR_TYPE_DEBUG,
2213          "%u. peer: %s\n",
2214          i, GNUNET_i2s (&list[i]));
2215   }
2216 }
2217
2218
2219 /**
2220  * Remove peer from list.
2221  */
2222 static void
2223 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2224                unsigned int *list_size,
2225                const struct GNUNET_PeerIdentity *peer)
2226 {
2227   unsigned int i;
2228   struct GNUNET_PeerIdentity *tmp;
2229
2230   tmp = *peer_list;
2231
2232   LOG (GNUNET_ERROR_TYPE_DEBUG,
2233        "Removing peer %s from list at %p\n",
2234        GNUNET_i2s (peer),
2235        tmp);
2236
2237   for ( i = 0 ; i < *list_size ; i++ )
2238   {
2239     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2240     {
2241       if (i < *list_size -1)
2242       { /* Not at the last entry -- shift peers left */
2243         memmove (&tmp[i], &tmp[i +1],
2244                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2245       }
2246       /* Remove last entry (should be now useless PeerID) */
2247       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2248     }
2249   }
2250   *peer_list = tmp;
2251 }
2252
2253
2254 /**
2255  * Sum all time relatives of an array.
2256  */
2257 static struct GNUNET_TIME_Relative
2258 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2259                 uint32_t arr_size)
2260 {
2261   struct GNUNET_TIME_Relative sum;
2262   uint32_t i;
2263
2264   sum = GNUNET_TIME_UNIT_ZERO;
2265   for ( i = 0 ; i < arr_size ; i++ )
2266   {
2267     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2268   }
2269   return sum;
2270 }
2271
2272
2273 /**
2274  * Compute the average of given time relatives.
2275  */
2276 static struct GNUNET_TIME_Relative
2277 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2278                 uint32_t arr_size)
2279 {
2280   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2281                                                       arr_size),
2282                                       arr_size);
2283 }
2284
2285
2286 /**
2287  * Insert PeerID in #view
2288  *
2289  * Called once we know a peer is live.
2290  * Implements #PeerOp
2291  *
2292  * @return GNUNET_OK if peer was actually inserted
2293  *         GNUNET_NO if peer was not inserted
2294  */
2295 static void
2296 insert_in_view_op (void *cls,
2297                 const struct GNUNET_PeerIdentity *peer);
2298
2299 /**
2300  * Insert PeerID in #view
2301  *
2302  * Called once we know a peer is live.
2303  *
2304  * @return GNUNET_OK if peer was actually inserted
2305  *         GNUNET_NO if peer was not inserted
2306  */
2307 static int
2308 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2309 {
2310   int online;
2311
2312   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2313   if ( (GNUNET_NO == online) ||
2314        (GNUNET_SYSERR == online) ) /* peer is not even known */
2315   {
2316     (void) Peers_issue_peer_liveliness_check (peer);
2317     (void) Peers_schedule_operation (peer, insert_in_view_op);
2318     return GNUNET_NO;
2319   }
2320   /* Open channel towards peer to keep connection open */
2321   Peers_indicate_sending_intention (peer);
2322   return View_put (peer);
2323 }
2324
2325 /**
2326  * @brief sends updates to clients that are interested
2327  */
2328 static void
2329 clients_notify_view_update (void);
2330
2331 /**
2332  * Put random peer from sampler into the view as history update.
2333  */
2334 static void
2335 hist_update (void *cls,
2336              struct GNUNET_PeerIdentity *ids,
2337              uint32_t num_peers)
2338 {
2339   unsigned int i;
2340
2341   for (i = 0; i < num_peers; i++)
2342   {
2343     (void) insert_in_view (&ids[i]);
2344     to_file (file_name_view_log,
2345              "+%s\t(hist)",
2346              GNUNET_i2s_full (ids));
2347   }
2348   clients_notify_view_update();
2349 }
2350
2351
2352 /**
2353  * Wrapper around #RPS_sampler_resize()
2354  *
2355  * If we do not have enough sampler elements, double current sampler size
2356  * If we have more than enough sampler elements, halv current sampler size
2357  */
2358 static void
2359 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2360 {
2361   unsigned int sampler_size;
2362
2363   // TODO statistics
2364   // TODO respect the min, max
2365   sampler_size = RPS_sampler_get_size (sampler);
2366   if (sampler_size > new_size * 4)
2367   { /* Shrinking */
2368     RPS_sampler_resize (sampler, sampler_size / 2);
2369   }
2370   else if (sampler_size < new_size)
2371   { /* Growing */
2372     RPS_sampler_resize (sampler, sampler_size * 2);
2373   }
2374   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2375 }
2376
2377
2378 /**
2379  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2380  */
2381 static void
2382 client_resize_wrapper ()
2383 {
2384   uint32_t bigger_size;
2385
2386   // TODO statistics
2387
2388   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2389
2390   // TODO respect the min, max
2391   resize_wrapper (client_sampler, bigger_size);
2392   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2393       bigger_size);
2394 }
2395
2396
2397 /**
2398  * Estimate request rate
2399  *
2400  * Called every time we receive a request from the client.
2401  */
2402 static void
2403 est_request_rate()
2404 {
2405   struct GNUNET_TIME_Relative max_round_duration;
2406
2407   if (request_deltas_size > req_counter)
2408     req_counter++;
2409   if ( 1 < req_counter)
2410   {
2411     /* Shift last request deltas to the right */
2412     memmove (&request_deltas[1],
2413         request_deltas,
2414         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2415
2416     /* Add current delta to beginning */
2417     request_deltas[0] =
2418         GNUNET_TIME_absolute_get_difference (last_request,
2419                                              GNUNET_TIME_absolute_get ());
2420     request_rate = T_relative_avg (request_deltas, req_counter);
2421     request_rate = (request_rate.rel_value_us < 1) ?
2422       GNUNET_TIME_relative_get_unit_ () : request_rate;
2423
2424     /* Compute the duration a round will maximally take */
2425     max_round_duration =
2426         GNUNET_TIME_relative_add (round_interval,
2427                                   GNUNET_TIME_relative_divide (round_interval, 2));
2428
2429     /* Set the estimated size the sampler has to have to
2430      * satisfy the current client request rate */
2431     sampler_size_client_need =
2432         max_round_duration.rel_value_us / request_rate.rel_value_us;
2433
2434     /* Resize the sampler */
2435     client_resize_wrapper ();
2436   }
2437   last_request = GNUNET_TIME_absolute_get ();
2438 }
2439
2440
2441 /**
2442  * Add all peers in @a peer_array to @a peer_map used as set.
2443  *
2444  * @param peer_array array containing the peers
2445  * @param num_peers number of peers in @peer_array
2446  * @param peer_map the peermap to use as set
2447  */
2448 static void
2449 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2450                        unsigned int num_peers,
2451                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2452 {
2453   unsigned int i;
2454   if (NULL == peer_map)
2455   {
2456     LOG (GNUNET_ERROR_TYPE_WARNING,
2457          "Trying to add peers to non-existing peermap.\n");
2458     return;
2459   }
2460
2461   for (i = 0; i < num_peers; i++)
2462   {
2463     GNUNET_CONTAINER_multipeermap_put (peer_map,
2464                                        &peer_array[i],
2465                                        NULL,
2466                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2467   }
2468 }
2469
2470
2471 /**
2472  * Send a PULL REPLY to @a peer_id
2473  *
2474  * @param peer_id the peer to send the reply to.
2475  * @param peer_ids the peers to send to @a peer_id
2476  * @param num_peer_ids the number of peers to send to @a peer_id
2477  */
2478 static void
2479 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2480                  const struct GNUNET_PeerIdentity *peer_ids,
2481                  unsigned int num_peer_ids)
2482 {
2483   uint32_t send_size;
2484   struct GNUNET_MQ_Envelope *ev;
2485   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2486
2487   /* Compute actual size */
2488   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2489               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2490
2491   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2492     /* Compute number of peers to send
2493      * If too long, simply truncate */
2494     // TODO select random ones via permutation
2495     //      or even better: do good protocol design
2496     send_size =
2497       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2498        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2499        sizeof (struct GNUNET_PeerIdentity);
2500   else
2501     send_size = num_peer_ids;
2502
2503   LOG (GNUNET_ERROR_TYPE_DEBUG,
2504       "Going to send PULL REPLY with %u peers to %s\n",
2505       send_size, GNUNET_i2s (peer_id));
2506
2507   ev = GNUNET_MQ_msg_extra (out_msg,
2508                             send_size * sizeof (struct GNUNET_PeerIdentity),
2509                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2510   out_msg->num_peers = htonl (send_size);
2511   GNUNET_memcpy (&out_msg[1], peer_ids,
2512          send_size * sizeof (struct GNUNET_PeerIdentity));
2513
2514   Peers_send_message (peer_id, ev, "PULL REPLY");
2515   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2516 }
2517
2518
2519 /**
2520  * Insert PeerID in #pull_map
2521  *
2522  * Called once we know a peer is live.
2523  */
2524 static void
2525 insert_in_pull_map (void *cls,
2526                     const struct GNUNET_PeerIdentity *peer)
2527 {
2528   CustomPeerMap_put (pull_map, peer);
2529 }
2530
2531
2532 /**
2533  * Insert PeerID in #view
2534  *
2535  * Called once we know a peer is live.
2536  * Implements #PeerOp
2537  */
2538 static void
2539 insert_in_view_op (void *cls,
2540                 const struct GNUNET_PeerIdentity *peer)
2541 {
2542   (void) insert_in_view (peer);
2543 }
2544
2545
2546 /**
2547  * Update sampler with given PeerID.
2548  * Implements #PeerOp
2549  */
2550 static void
2551 insert_in_sampler (void *cls,
2552                    const struct GNUNET_PeerIdentity *peer)
2553 {
2554   LOG (GNUNET_ERROR_TYPE_DEBUG,
2555        "Updating samplers with peer %s from insert_in_sampler()\n",
2556        GNUNET_i2s (peer));
2557   RPS_sampler_update (prot_sampler,   peer);
2558   RPS_sampler_update (client_sampler, peer);
2559   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2560   {
2561     /* Make sure we 'know' about this peer */
2562     (void) Peers_issue_peer_liveliness_check (peer);
2563     /* Establish a channel towards that peer to indicate we are going to send
2564      * messages to it */
2565     //Peers_indicate_sending_intention (peer);
2566   }
2567   #ifdef TO_FILE
2568   num_observed_peers++;
2569   GNUNET_CONTAINER_multipeermap_put
2570     (observed_unique_peers,
2571      peer,
2572      NULL,
2573      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2574   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2575       observed_unique_peers);
2576   to_file (file_name_observed_log,
2577           "%" PRIu32 " %" PRIu32 " %f\n",
2578           num_observed_peers,
2579           num_observed_unique_peers,
2580           1.0*num_observed_unique_peers/num_observed_peers)
2581   #endif /* TO_FILE */
2582 }
2583
2584 /**
2585  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2586  *        If the peer is not known, liveliness check is issued and it is
2587  *        scheduled to be inserted in sampler and view.
2588  *
2589  * "External sources" refer to every source except the gossip.
2590  *
2591  * @param peer peer to insert
2592  */
2593 static void
2594 got_peer (const struct GNUNET_PeerIdentity *peer)
2595 {
2596   /* If we did not know this peer already, insert it into sampler and view */
2597   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2598   {
2599     Peers_schedule_operation (peer, insert_in_sampler);
2600     Peers_schedule_operation (peer, insert_in_view_op);
2601   }
2602 }
2603
2604 /**
2605  * @brief Checks if there is a sending channel and if it is needed
2606  *
2607  * @param peer the peer whose sending channel is checked
2608  * @return GNUNET_YES if sending channel exists and is still needed
2609  *         GNUNET_NO  otherwise
2610  */
2611 static int
2612 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2613 {
2614   /* struct GNUNET_CADET_Channel *channel; */
2615   if (GNUNET_NO == Peers_check_peer_known (peer))
2616   {
2617     return GNUNET_NO;
2618   }
2619   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2620   {
2621     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2622          (GNUNET_YES == View_contains_peer (peer)) ||
2623          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2624          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2625          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2626     { /* If we want to keep the connection to peer open */
2627       return GNUNET_YES;
2628     }
2629     return GNUNET_NO;
2630   }
2631   return GNUNET_NO;
2632 }
2633
2634 /**
2635  * @brief remove peer from our knowledge, the view, push and pull maps and
2636  * samplers.
2637  *
2638  * @param peer the peer to remove
2639  */
2640 static void
2641 remove_peer (const struct GNUNET_PeerIdentity *peer)
2642 {
2643   (void) View_remove_peer (peer);
2644   CustomPeerMap_remove_peer (pull_map, peer);
2645   CustomPeerMap_remove_peer (push_map, peer);
2646   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2647   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2648   Peers_remove_peer (peer);
2649 }
2650
2651
2652 /**
2653  * @brief Remove data that is not needed anymore.
2654  *
2655  * If the sending channel is no longer needed it is destroyed.
2656  *
2657  * @param peer the peer whose data is about to be cleaned
2658  */
2659 static void
2660 clean_peer (const struct GNUNET_PeerIdentity *peer)
2661 {
2662   if (GNUNET_NO == check_sending_channel_needed (peer))
2663   {
2664     LOG (GNUNET_ERROR_TYPE_DEBUG,
2665         "Going to remove send channel to peer %s\n",
2666         GNUNET_i2s (peer));
2667     #ifdef ENABLE_MALICIOUS
2668     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2669       (void) Peers_destroy_sending_channel (peer);
2670     #else /* ENABLE_MALICIOUS */
2671     (void) Peers_destroy_sending_channel (peer);
2672     #endif /* ENABLE_MALICIOUS */
2673   }
2674
2675   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2676        (GNUNET_NO == View_contains_peer (peer)) &&
2677        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2678        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2679        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2680        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2681        (GNUNET_NO != Peers_check_removable (peer)) )
2682   { /* We can safely remove this peer */
2683     LOG (GNUNET_ERROR_TYPE_DEBUG,
2684         "Going to remove peer %s\n",
2685         GNUNET_i2s (peer));
2686     remove_peer (peer);
2687     return;
2688   }
2689 }
2690
2691 /**
2692  * @brief This is called when a channel is destroyed.
2693  *
2694  * Removes peer completely from our knowledge if the send_channel was destroyed
2695  * Otherwise simply delete the recv_channel
2696  * Also check if the knowledge about this peer is still needed.
2697  * If not, remove this peer from our knowledge.
2698  *
2699  * @param cls The closure
2700  * @param channel The channel being closed
2701  * @param channel_ctx The context associated with this channel
2702  */
2703 static void
2704 cleanup_destroyed_channel (void *cls,
2705                            const struct GNUNET_CADET_Channel *channel)
2706 {
2707   struct ChannelCtx *channel_ctx = cls; // FIXME: free this context!
2708   struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2709   uint32_t *channel_flag;
2710   struct PeerContext *peer_ctx;
2711
2712   GNUNET_assert (NULL != peer);
2713
2714   if (GNUNET_NO == Peers_check_peer_known (peer))
2715   { /* We don't know a context to that peer */
2716     LOG (GNUNET_ERROR_TYPE_WARNING,
2717          "channel (%s) without associated context was destroyed\n",
2718          GNUNET_i2s (peer));
2719     GNUNET_free (peer);
2720     return;
2721   }
2722
2723   peer_ctx = get_peer_ctx (peer);
2724   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2725   {
2726     LOG (GNUNET_ERROR_TYPE_DEBUG,
2727         "Callback on destruction of recv-channel was called (%s)\n",
2728         GNUNET_i2s (peer));
2729     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2730   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2731   {
2732     LOG (GNUNET_ERROR_TYPE_DEBUG,
2733         "Callback on destruction of send-channel was called (%s)\n",
2734         GNUNET_i2s (peer));
2735     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2736   } else {
2737     LOG (GNUNET_ERROR_TYPE_ERROR,
2738         "Channel to be destroyed has is neither sending nor receiving role\n");
2739   }
2740
2741   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2742   { /* We are in the middle of removing that peer from our knowledge. In this
2743        case simply make sure that the channels are cleaned. */
2744     Peers_cleanup_destroyed_channel (cls, channel);
2745     to_file (file_name_view_log,
2746              "-%s\t(cleanup channel, ourself)",
2747              GNUNET_i2s_full (peer));
2748     GNUNET_free (peer);
2749     return;
2750   }
2751
2752   if (GNUNET_YES ==
2753       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2754   { /* Channel used for sending was destroyed */
2755     /* Possible causes of channel destruction:
2756      *  - ourselves  -> cleaning send channel -> clean context
2757      *  - other peer -> peer probably went down -> remove
2758      */
2759     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2760     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2761     { /* We are about to clean the sending channel. Clean the respective
2762        * context */
2763       Peers_cleanup_destroyed_channel (cls, channel);
2764       GNUNET_free (peer);
2765       return;
2766     }
2767     else
2768     { /* Other peer destroyed our sending channel that it is supposed to keep
2769        * open. It probably went down. Remove it from our knowledge. */
2770       Peers_cleanup_destroyed_channel (cls, channel);
2771       remove_peer (peer);
2772       GNUNET_free (peer);
2773       return;
2774     }
2775   }
2776   else if (GNUNET_YES ==
2777       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2778   { /* Channel used for receiving was destroyed */
2779     /* Possible causes of channel destruction:
2780      *  - ourselves  -> peer tried to establish channel twice -> clean context
2781      *  - other peer -> peer doesn't want to send us data -> clean
2782      */
2783     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2784     if (GNUNET_YES ==
2785         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2786     { /* Other peer tried to establish a channel to us twice. We do not accept
2787        * that. Clean the context. */
2788       Peers_cleanup_destroyed_channel (cls, channel);
2789       GNUNET_free (peer);
2790       return;
2791     }
2792     else
2793     { /* Other peer doesn't want to send us data anymore. We are free to clean
2794        * it. */
2795       Peers_cleanup_destroyed_channel (cls, channel);
2796       clean_peer (peer);
2797       GNUNET_free (peer);
2798       return;
2799     }
2800   }
2801   else
2802   {
2803     LOG (GNUNET_ERROR_TYPE_WARNING,
2804         "Destroyed channel is neither sending nor receiving channel\n");
2805   }
2806   GNUNET_free (peer);
2807 }
2808
2809 /***********************************************************************
2810  * /Util functions
2811 ***********************************************************************/
2812
2813 static void
2814 destroy_reply_cls (struct ReplyCls *rep_cls)
2815 {
2816   struct ClientContext *cli_ctx;
2817
2818   cli_ctx = rep_cls->cli_ctx;
2819   GNUNET_assert (NULL != cli_ctx);
2820   if (NULL != rep_cls->req_handle)
2821   {
2822     RPS_sampler_request_cancel (rep_cls->req_handle);
2823   }
2824   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2825                                cli_ctx->rep_cls_tail,
2826                                rep_cls);
2827   GNUNET_free (rep_cls);
2828 }
2829
2830
2831 static void
2832 destroy_cli_ctx (struct ClientContext *cli_ctx)
2833 {
2834   GNUNET_assert (NULL != cli_ctx);
2835   if (NULL != cli_ctx->rep_cls_head)
2836   {
2837     LOG (GNUNET_ERROR_TYPE_WARNING,
2838          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2839     while (NULL != cli_ctx->rep_cls_head)
2840       destroy_reply_cls (cli_ctx->rep_cls_head);
2841   }
2842   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2843                                cli_ctx_tail,
2844                                cli_ctx);
2845   GNUNET_free (cli_ctx);
2846 }
2847
2848
2849 /**
2850  * Function called by NSE.
2851  *
2852  * Updates sizes of sampler list and view and adapt those lists
2853  * accordingly.
2854  */
2855 static void
2856 nse_callback (void *cls,
2857               struct GNUNET_TIME_Absolute timestamp,
2858               double logestimate, double std_dev)
2859 {
2860   double estimate;
2861   //double scale; // TODO this might go gloabal/config
2862
2863   LOG (GNUNET_ERROR_TYPE_DEBUG,
2864        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2865        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2866   //scale = .01;
2867   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2868   // GNUNET_NSE_log_estimate_to_n (logestimate);
2869   estimate = pow (estimate, 1.0 / 3);
2870   // TODO add if std_dev is a number
2871   // estimate += (std_dev * scale);
2872   if (view_size_est_min < ceil (estimate))
2873   {
2874     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2875     sampler_size_est_need = estimate;
2876     view_size_est_need = estimate;
2877   } else
2878   {
2879     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2880     //sampler_size_est_need = view_size_est_min;
2881     view_size_est_need = view_size_est_min;
2882   }
2883
2884   /* If the NSE has changed adapt the lists accordingly */
2885   resize_wrapper (prot_sampler, sampler_size_est_need);
2886   client_resize_wrapper ();
2887 }
2888
2889
2890 /**
2891  * Callback called once the requested PeerIDs are ready.
2892  *
2893  * Sends those to the requesting client.
2894  */
2895 static void
2896 client_respond (void *cls,
2897                 struct GNUNET_PeerIdentity *peer_ids,
2898                 uint32_t num_peers)
2899 {
2900   struct ReplyCls *reply_cls = cls;
2901   uint32_t i;
2902   struct GNUNET_MQ_Envelope *ev;
2903   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2904   uint32_t size_needed;
2905   struct ClientContext *cli_ctx;
2906
2907   GNUNET_assert (NULL != reply_cls);
2908   LOG (GNUNET_ERROR_TYPE_DEBUG,
2909        "sampler returned %" PRIu32 " peers:\n",
2910        num_peers);
2911   for (i = 0; i < num_peers; i++)
2912   {
2913     LOG (GNUNET_ERROR_TYPE_DEBUG,
2914          "  %" PRIu32 ": %s\n",
2915          i,
2916          GNUNET_i2s (&peer_ids[i]));
2917   }
2918
2919   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2920                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2921
2922   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2923
2924   ev = GNUNET_MQ_msg_extra (out_msg,
2925                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2926                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2927   out_msg->num_peers = htonl (num_peers);
2928   out_msg->id = htonl (reply_cls->id);
2929
2930   GNUNET_memcpy (&out_msg[1],
2931           peer_ids,
2932           num_peers * sizeof (struct GNUNET_PeerIdentity));
2933
2934   cli_ctx = reply_cls->cli_ctx;
2935   GNUNET_assert (NULL != cli_ctx);
2936   reply_cls->req_handle = NULL;
2937   destroy_reply_cls (reply_cls);
2938   GNUNET_MQ_send (cli_ctx->mq, ev);
2939 }
2940
2941
2942 /**
2943  * Handle RPS request from the client.
2944  *
2945  * @param cls closure
2946  * @param message the actual message
2947  */
2948 static void
2949 handle_client_request (void *cls,
2950                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2951 {
2952   struct ClientContext *cli_ctx = cls;
2953   uint32_t num_peers;
2954   uint32_t size_needed;
2955   struct ReplyCls *reply_cls;
2956   uint32_t i;
2957
2958   num_peers = ntohl (msg->num_peers);
2959   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2960                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2961
2962   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2963   {
2964     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2965                 "Message received from client has size larger than expected\n");
2966     GNUNET_SERVICE_client_drop (cli_ctx->client);
2967     return;
2968   }
2969
2970   for (i = 0 ; i < num_peers ; i++)
2971     est_request_rate();
2972
2973   LOG (GNUNET_ERROR_TYPE_DEBUG,
2974        "Client requested %" PRIu32 " random peer(s).\n",
2975        num_peers);
2976
2977   reply_cls = GNUNET_new (struct ReplyCls);
2978   reply_cls->id = ntohl (msg->id);
2979   reply_cls->cli_ctx = cli_ctx;
2980   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2981                                                         client_respond,
2982                                                         reply_cls,
2983                                                         num_peers);
2984
2985   GNUNET_assert (NULL != cli_ctx);
2986   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2987                                cli_ctx->rep_cls_tail,
2988                                reply_cls);
2989   GNUNET_SERVICE_client_continue (cli_ctx->client);
2990 }
2991
2992
2993 /**
2994  * @brief Handle a message that requests the cancellation of a request
2995  *
2996  * @param cls unused
2997  * @param message the message containing the id of the request
2998  */
2999 static void
3000 handle_client_request_cancel (void *cls,
3001                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
3002 {
3003   struct ClientContext *cli_ctx = cls;
3004   struct ReplyCls *rep_cls;
3005
3006   GNUNET_assert (NULL != cli_ctx);
3007   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
3008   rep_cls = cli_ctx->rep_cls_head;
3009   LOG (GNUNET_ERROR_TYPE_DEBUG,
3010       "Client cancels request with id %" PRIu32 "\n",
3011       ntohl (msg->id));
3012   while ( (NULL != rep_cls->next) &&
3013           (rep_cls->id != ntohl (msg->id)) )
3014     rep_cls = rep_cls->next;
3015   GNUNET_assert (rep_cls->id == ntohl (msg->id));
3016   destroy_reply_cls (rep_cls);
3017   GNUNET_SERVICE_client_continue (cli_ctx->client);
3018 }
3019
3020
3021 /**
3022  * @brief This function is called, when the client seeds peers.
3023  * It verifies that @a msg is well-formed.
3024  *
3025  * @param cls the closure (#ClientContext)
3026  * @param msg the message
3027  * @return #GNUNET_OK if @a msg is well-formed
3028  */
3029 static int
3030 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3031 {
3032   struct ClientContext *cli_ctx = cls;
3033   uint16_t msize = ntohs (msg->header.size);
3034   uint32_t num_peers = ntohl (msg->num_peers);
3035
3036   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3037   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3038        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3039   {
3040     GNUNET_break (0);
3041     GNUNET_SERVICE_client_drop (cli_ctx->client);
3042     return GNUNET_SYSERR;
3043   }
3044   return GNUNET_OK;
3045 }
3046
3047
3048 /**
3049  * Handle seed from the client.
3050  *
3051  * @param cls closure
3052  * @param message the actual message
3053  */
3054 static void
3055 handle_client_seed (void *cls,
3056                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3057 {
3058   struct ClientContext *cli_ctx = cls;
3059   struct GNUNET_PeerIdentity *peers;
3060   uint32_t num_peers;
3061   uint32_t i;
3062
3063   num_peers = ntohl (msg->num_peers);
3064   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3065   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3066   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
3067
3068   LOG (GNUNET_ERROR_TYPE_DEBUG,
3069        "Client seeded peers:\n");
3070   print_peer_list (peers, num_peers);
3071
3072   for (i = 0; i < num_peers; i++)
3073   {
3074     LOG (GNUNET_ERROR_TYPE_DEBUG,
3075          "Updating samplers with seed %" PRIu32 ": %s\n",
3076          i,
3077          GNUNET_i2s (&peers[i]));
3078
3079     got_peer (&peers[i]);
3080   }
3081
3082   ////GNUNET_free (peers);
3083
3084   GNUNET_SERVICE_client_continue (cli_ctx->client);
3085 }
3086
3087 /**
3088  * @brief Send view to client
3089  *
3090  * @param cli_ctx the context of the client
3091  * @param view_array the peerids of the view as array (can be empty)
3092  * @param view_size the size of the view array (can be 0)
3093  */
3094 void
3095 send_view (const struct ClientContext *cli_ctx,
3096            const struct GNUNET_PeerIdentity *view_array,
3097            uint64_t view_size)
3098 {
3099   struct GNUNET_MQ_Envelope *ev;
3100   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3101
3102   if (NULL == view_array)
3103   {
3104     view_size = View_size ();
3105     view_array = View_get_as_array();
3106   }
3107
3108   ev = GNUNET_MQ_msg_extra (out_msg,
3109                             view_size * sizeof (struct GNUNET_PeerIdentity),
3110                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3111   out_msg->num_peers = htonl (view_size);
3112
3113   GNUNET_memcpy (&out_msg[1],
3114           view_array,
3115           view_size * sizeof (struct GNUNET_PeerIdentity));
3116   GNUNET_MQ_send (cli_ctx->mq, ev);
3117 }
3118
3119 /**
3120  * @brief sends updates to clients that are interested
3121  */
3122 static void
3123 clients_notify_view_update (void)
3124 {
3125   struct ClientContext *cli_ctx_iter;
3126   uint64_t num_peers;
3127   const struct GNUNET_PeerIdentity *view_array;
3128
3129   num_peers = View_size ();
3130   view_array = View_get_as_array();
3131   /* check size of view is small enough */
3132   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3133   {
3134     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3135                 "View is too big to send\n");
3136     return;
3137   }
3138
3139   for (cli_ctx_iter = cli_ctx_head;
3140        NULL != cli_ctx_iter;
3141        cli_ctx_iter = cli_ctx_head->next)
3142   {
3143     if (1 < cli_ctx_iter->view_updates_left)
3144     {
3145       /* Client wants to receive limited amount of updates */
3146       cli_ctx_iter->view_updates_left -= 1;
3147     } else if (1 == cli_ctx_iter->view_updates_left)
3148     {
3149       /* Last update of view for client */
3150       cli_ctx_iter->view_updates_left = -1;
3151     } else if (0 > cli_ctx_iter->view_updates_left) {
3152       /* Client is not interested in updates */
3153       continue;
3154     }
3155     /* else _updates_left == 0 - infinite amount of updates */
3156
3157     /* send view */
3158     send_view (cli_ctx_iter, view_array, num_peers);
3159   }
3160 }
3161
3162
3163 /**
3164  * Handle RPS request from the client.
3165  *
3166  * @param cls closure
3167  * @param message the actual message
3168  */
3169 static void
3170 handle_client_view_request (void *cls,
3171                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3172 {
3173   struct ClientContext *cli_ctx = cls;
3174   uint64_t num_updates;
3175
3176   num_updates = ntohl (msg->num_updates);
3177
3178   LOG (GNUNET_ERROR_TYPE_DEBUG,
3179        "Client requested %" PRIu64 " updates of view.\n",
3180        num_updates);
3181
3182   GNUNET_assert (NULL != cli_ctx);
3183   cli_ctx->view_updates_left = num_updates;
3184   send_view (cli_ctx, NULL, 0);
3185   GNUNET_SERVICE_client_continue (cli_ctx->client);
3186 }
3187
3188 /**
3189  * Handle a CHECK_LIVE message from another peer.
3190  *
3191  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3192  * the channel is blocked for all other communication.
3193  *
3194  * @param cls Closure
3195  * @param msg The message header
3196  */
3197 static void
3198 handle_peer_check (void *cls,
3199                    const struct GNUNET_MessageHeader *msg)
3200 {
3201   const struct ChannelCtx *channel_ctx = cls;
3202   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3203   LOG (GNUNET_ERROR_TYPE_DEBUG,
3204       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3205
3206   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3207 }
3208
3209 /**
3210  * Handle a PUSH message from another peer.
3211  *
3212  * Check the proof of work and store the PeerID
3213  * in the temporary list for pushed PeerIDs.
3214  *
3215  * @param cls Closure
3216  * @param msg The message header
3217  */
3218 static void
3219 handle_peer_push (void *cls,
3220                   const struct GNUNET_MessageHeader *msg)
3221 {
3222   const struct ChannelCtx *channel_ctx = cls;
3223   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3224
3225   // (check the proof of work (?))
3226
3227   LOG (GNUNET_ERROR_TYPE_DEBUG,
3228        "Received PUSH (%s)\n",
3229        GNUNET_i2s (peer));
3230   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3231
3232   #ifdef ENABLE_MALICIOUS
3233   struct AttackedPeer *tmp_att_peer;
3234
3235   if ( (1 == mal_type) ||
3236        (3 == mal_type) )
3237   { /* Try to maximise representation */
3238     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3239     tmp_att_peer->peer_id = *peer;
3240     if (NULL == att_peer_set)
3241       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3242     if (GNUNET_NO ==
3243         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3244                                                 peer))
3245     {
3246       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3247                                    att_peers_tail,
3248                                    tmp_att_peer);
3249       add_peer_array_to_set (peer, 1, att_peer_set);
3250     }
3251     else
3252     {
3253       GNUNET_free (tmp_att_peer);
3254     }
3255   }
3256
3257
3258   else if (2 == mal_type)
3259   {
3260     /* We attack one single well-known peer - simply ignore */
3261   }
3262   #endif /* ENABLE_MALICIOUS */
3263
3264   /* Add the sending peer to the push_map */
3265   CustomPeerMap_put (push_map, peer);
3266
3267   GNUNET_break_op (Peers_check_peer_known (peer));
3268   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3269 }
3270
3271
3272 /**
3273  * Handle PULL REQUEST request message from another peer.
3274  *
3275  * Reply with the view of PeerIDs.
3276  *
3277  * @param cls Closure
3278  * @param msg The message header
3279  */
3280 static void
3281 handle_peer_pull_request (void *cls,
3282                           const struct GNUNET_MessageHeader *msg)
3283 {
3284   const struct ChannelCtx *channel_ctx = cls;
3285   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3286   const struct GNUNET_PeerIdentity *view_array;
3287
3288   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3289   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3290
3291   #ifdef ENABLE_MALICIOUS
3292   if (1 == mal_type
3293       || 3 == mal_type)
3294   { /* Try to maximise representation */
3295     send_pull_reply (peer, mal_peers, num_mal_peers);
3296   }
3297
3298   else if (2 == mal_type)
3299   { /* Try to partition network */
3300     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3301     {
3302       send_pull_reply (peer, mal_peers, num_mal_peers);
3303     }
3304   }
3305   #endif /* ENABLE_MALICIOUS */
3306
3307   GNUNET_break_op (Peers_check_peer_known (peer));
3308   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3309   view_array = View_get_as_array ();
3310   send_pull_reply (peer, view_array, View_size ());
3311 }
3312
3313
3314 /**
3315  * Check whether we sent a corresponding request and
3316  * whether this reply is the first one.
3317  *
3318  * @param cls Closure
3319  * @param msg The message header
3320  */
3321 static int
3322 check_peer_pull_reply (void *cls,
3323                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3324 {
3325   struct GNUNET_PeerIdentity *sender = cls;
3326
3327   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3328   {
3329     GNUNET_break_op (0);
3330     return GNUNET_SYSERR;
3331   }
3332
3333   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3334       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3335   {
3336     LOG (GNUNET_ERROR_TYPE_ERROR,
3337         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3338         ntohl (msg->num_peers),
3339         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3340             sizeof (struct GNUNET_PeerIdentity));
3341     GNUNET_break_op (0);
3342     return GNUNET_SYSERR;
3343   }
3344
3345   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3346   {
3347     LOG (GNUNET_ERROR_TYPE_WARNING,
3348         "Received a pull reply from a peer we didn't request one from!\n");
3349     GNUNET_break_op (0);
3350     return GNUNET_SYSERR;
3351   }
3352   return GNUNET_OK;
3353 }
3354
3355 /**
3356  * Handle PULL REPLY message from another peer.
3357  *
3358  * @param cls Closure
3359  * @param msg The message header
3360  */
3361 static void
3362 handle_peer_pull_reply (void *cls,
3363                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3364 {
3365   const struct ChannelCtx *channel_ctx = cls;
3366   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3367   const struct GNUNET_PeerIdentity *peers;
3368   uint32_t i;
3369 #ifdef ENABLE_MALICIOUS
3370   struct AttackedPeer *tmp_att_peer;
3371 #endif /* ENABLE_MALICIOUS */
3372
3373   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3374   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3375
3376   #ifdef ENABLE_MALICIOUS
3377   // We shouldn't even receive pull replies as we're not sending
3378   if (2 == mal_type)
3379   {
3380   }
3381   #endif /* ENABLE_MALICIOUS */
3382
3383   /* Do actual logic */
3384   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3385
3386   LOG (GNUNET_ERROR_TYPE_DEBUG,
3387        "PULL REPLY received, got following %u peers:\n",
3388        ntohl (msg->num_peers));
3389
3390   for (i = 0; i < ntohl (msg->num_peers); i++)
3391   {
3392     LOG (GNUNET_ERROR_TYPE_DEBUG,
3393          "%u. %s\n",
3394          i,
3395          GNUNET_i2s (&peers[i]));
3396
3397     #ifdef ENABLE_MALICIOUS
3398     if ((NULL != att_peer_set) &&
3399         (1 == mal_type || 3 == mal_type))
3400     { /* Add attacked peer to local list */
3401       // TODO check if we sent a request and this was the first reply
3402       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3403                                                                &peers[i])
3404           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3405                                                                   &peers[i]))
3406       {
3407         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3408         tmp_att_peer->peer_id = peers[i];
3409         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3410                                      att_peers_tail,
3411                                      tmp_att_peer);
3412         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3413       }
3414       continue;
3415     }
3416     #endif /* ENABLE_MALICIOUS */
3417     /* Make sure we 'know' about this peer */
3418     (void) Peers_insert_peer (&peers[i]);
3419
3420     if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3421     {
3422       CustomPeerMap_put (pull_map, &peers[i]);
3423     }
3424     else
3425     {
3426       Peers_schedule_operation (&peers[i], insert_in_pull_map);
3427       (void) Peers_issue_peer_liveliness_check (&peers[i]);
3428     }
3429   }
3430
3431   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3432   clean_peer (sender);
3433
3434   GNUNET_break_op (Peers_check_peer_known (sender));
3435   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3436 }
3437
3438
3439 /**
3440  * Compute a random delay.
3441  * A uniformly distributed value between mean + spread and mean - spread.
3442  *
3443  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3444  * It would return a random value between 2 and 6 min.
3445  *
3446  * @param mean the mean
3447  * @param spread the inverse amount of deviation from the mean
3448  */
3449 static struct GNUNET_TIME_Relative
3450 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3451                     unsigned int spread)
3452 {
3453   struct GNUNET_TIME_Relative half_interval;
3454   struct GNUNET_TIME_Relative ret;
3455   unsigned int rand_delay;
3456   unsigned int max_rand_delay;
3457
3458   if (0 == spread)
3459   {
3460     LOG (GNUNET_ERROR_TYPE_WARNING,
3461          "Not accepting spread of 0\n");
3462     GNUNET_break (0);
3463     GNUNET_assert (0);
3464   }
3465   GNUNET_assert (0 != mean.rel_value_us);
3466
3467   /* Compute random time value between spread * mean and spread * mean */
3468   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3469
3470   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3471   /**
3472    * Compute random value between (0 and 1) * round_interval
3473    * via multiplying round_interval with a 'fraction' (0 to value)/value
3474    */
3475   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3476   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3477   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3478   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3479
3480   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3481     LOG (GNUNET_ERROR_TYPE_WARNING,
3482          "Returning FOREVER_REL\n");
3483
3484   return ret;
3485 }
3486
3487
3488 /**
3489  * Send single pull request
3490  *
3491  * @param peer_id the peer to send the pull request to.
3492  */
3493 static void
3494 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3495 {
3496   struct GNUNET_MQ_Envelope *ev;
3497
3498   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3499                                                      Peers_PULL_REPLY_PENDING));
3500   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3501
3502   LOG (GNUNET_ERROR_TYPE_DEBUG,
3503        "Going to send PULL REQUEST to peer %s.\n",
3504        GNUNET_i2s (peer));
3505
3506   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3507   Peers_send_message (peer, ev, "PULL REQUEST");
3508   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3509 }
3510
3511
3512 /**
3513  * Send single push
3514  *
3515  * @param peer_id the peer to send the push to.
3516  */
3517 static void
3518 send_push (const struct GNUNET_PeerIdentity *peer_id)
3519 {
3520   struct GNUNET_MQ_Envelope *ev;
3521
3522   LOG (GNUNET_ERROR_TYPE_DEBUG,
3523        "Going to send PUSH to peer %s.\n",
3524        GNUNET_i2s (peer_id));
3525
3526   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3527   Peers_send_message (peer_id, ev, "PUSH");
3528   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3529 }
3530
3531
3532 static void
3533 do_round (void *cls);
3534
3535 static void
3536 do_mal_round (void *cls);
3537
3538 #ifdef ENABLE_MALICIOUS
3539
3540
3541 /**
3542  * @brief This function is called, when the client tells us to act malicious.
3543  * It verifies that @a msg is well-formed.
3544  *
3545  * @param cls the closure (#ClientContext)
3546  * @param msg the message
3547  * @return #GNUNET_OK if @a msg is well-formed
3548  */
3549 static int
3550 check_client_act_malicious (void *cls,
3551                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3552 {
3553   struct ClientContext *cli_ctx = cls;
3554   uint16_t msize = ntohs (msg->header.size);
3555   uint32_t num_peers = ntohl (msg->num_peers);
3556
3557   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3558   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3559        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3560   {
3561     LOG (GNUNET_ERROR_TYPE_ERROR,
3562         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3563         ntohl (msg->num_peers),
3564         (msize / sizeof (struct GNUNET_PeerIdentity)));
3565     GNUNET_break (0);
3566     GNUNET_SERVICE_client_drop (cli_ctx->client);
3567     return GNUNET_SYSERR;
3568   }
3569   return GNUNET_OK;
3570 }
3571
3572 /**
3573  * Turn RPS service to act malicious.
3574  *
3575  * @param cls Closure
3576  * @param client The client that sent the message
3577  * @param msg The message header
3578  */
3579 static void
3580 handle_client_act_malicious (void *cls,
3581                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3582 {
3583   struct ClientContext *cli_ctx = cls;
3584   struct GNUNET_PeerIdentity *peers;
3585   uint32_t num_mal_peers_sent;
3586   uint32_t num_mal_peers_old;
3587
3588   /* Do actual logic */
3589   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3590   mal_type = ntohl (msg->type);
3591   if (NULL == mal_peer_set)
3592     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3593
3594   LOG (GNUNET_ERROR_TYPE_DEBUG,
3595        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3596        mal_type,
3597        ntohl (msg->num_peers));
3598
3599   if (1 == mal_type)
3600   { /* Try to maximise representation */
3601     /* Add other malicious peers to those we already know */
3602
3603     num_mal_peers_sent = ntohl (msg->num_peers);
3604     num_mal_peers_old = num_mal_peers;
3605     GNUNET_array_grow (mal_peers,
3606                        num_mal_peers,
3607                        num_mal_peers + num_mal_peers_sent);
3608     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3609             peers,
3610             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3611
3612     /* Add all mal peers to mal_peer_set */
3613     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3614                            num_mal_peers_sent,
3615                            mal_peer_set);
3616
3617     /* Substitute do_round () with do_mal_round () */
3618     GNUNET_SCHEDULER_cancel (do_round_task);
3619     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3620   }
3621
3622   else if ( (2 == mal_type) ||
3623             (3 == mal_type) )
3624   { /* Try to partition the network */
3625     /* Add other malicious peers to those we already know */
3626
3627     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3628     num_mal_peers_old = num_mal_peers;
3629     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3630     GNUNET_array_grow (mal_peers,
3631                        num_mal_peers,
3632                        num_mal_peers + num_mal_peers_sent);
3633     if (NULL != mal_peers &&
3634         0 != num_mal_peers)
3635     {
3636       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3637               peers,
3638               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3639
3640       /* Add all mal peers to mal_peer_set */
3641       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3642                              num_mal_peers_sent,
3643                              mal_peer_set);
3644     }
3645
3646     /* Store the one attacked peer */
3647     GNUNET_memcpy (&attacked_peer,
3648             &msg->attacked_peer,
3649             sizeof (struct GNUNET_PeerIdentity));
3650     /* Set the flag of the attacked peer to valid to avoid problems */
3651     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3652     {
3653       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3654     }
3655
3656     LOG (GNUNET_ERROR_TYPE_DEBUG,
3657          "Attacked peer is %s\n",
3658          GNUNET_i2s (&attacked_peer));
3659
3660     /* Substitute do_round () with do_mal_round () */
3661     GNUNET_SCHEDULER_cancel (do_round_task);
3662     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3663   }
3664   else if (0 == mal_type)
3665   { /* Stop acting malicious */
3666     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3667
3668     /* Substitute do_mal_round () with do_round () */
3669     GNUNET_SCHEDULER_cancel (do_round_task);
3670     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3671   }
3672   else
3673   {
3674     GNUNET_break (0);
3675     GNUNET_SERVICE_client_continue (cli_ctx->client);
3676   }
3677   GNUNET_SERVICE_client_continue (cli_ctx->client);
3678 }
3679
3680
3681 /**
3682  * Send out PUSHes and PULLs maliciously.
3683  *
3684  * This is executed regylary.
3685  */
3686 static void
3687 do_mal_round (void *cls)
3688 {
3689   uint32_t num_pushes;
3690   uint32_t i;
3691   struct GNUNET_TIME_Relative time_next_round;
3692   struct AttackedPeer *tmp_att_peer;
3693
3694   LOG (GNUNET_ERROR_TYPE_DEBUG,
3695        "Going to execute next round maliciously type %" PRIu32 ".\n",
3696       mal_type);
3697   do_round_task = NULL;
3698   GNUNET_assert (mal_type <= 3);
3699   /* Do malicious actions */
3700   if (1 == mal_type)
3701   { /* Try to maximise representation */
3702
3703     /* The maximum of pushes we're going to send this round */
3704     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3705                                          num_attacked_peers),
3706                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3707
3708     LOG (GNUNET_ERROR_TYPE_DEBUG,
3709          "Going to send %" PRIu32 " pushes\n",
3710          num_pushes);
3711
3712     /* Send PUSHes to attacked peers */
3713     for (i = 0 ; i < num_pushes ; i++)
3714     {
3715       if (att_peers_tail == att_peer_index)
3716         att_peer_index = att_peers_head;
3717       else
3718         att_peer_index = att_peer_index->next;
3719
3720       send_push (&att_peer_index->peer_id);
3721     }
3722
3723     /* Send PULLs to some peers to learn about additional peers to attack */
3724     tmp_att_peer = att_peer_index;
3725     for (i = 0 ; i < num_pushes * alpha ; i++)
3726     {
3727       if (att_peers_tail == tmp_att_peer)
3728         tmp_att_peer = att_peers_head;
3729       else
3730         att_peer_index = tmp_att_peer->next;
3731
3732       send_pull_request (&tmp_att_peer->peer_id);
3733     }
3734   }
3735
3736
3737   else if (2 == mal_type)
3738   { /**
3739      * Try to partition the network
3740      * Send as many pushes to the attacked peer as possible
3741      * That is one push per round as it will ignore more.
3742      */
3743     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3744     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3745       send_push (&attacked_peer);
3746   }
3747
3748
3749   if (3 == mal_type)
3750   { /* Combined attack */
3751
3752     /* Send PUSH to attacked peers */
3753     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3754     {
3755       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3756       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3757       {
3758         LOG (GNUNET_ERROR_TYPE_DEBUG,
3759             "Goding to send push to attacked peer (%s)\n",
3760             GNUNET_i2s (&attacked_peer));
3761         send_push (&attacked_peer);
3762       }
3763     }
3764     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3765
3766     /* The maximum of pushes we're going to send this round */
3767     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3768                                          num_attacked_peers),
3769                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3770
3771     LOG (GNUNET_ERROR_TYPE_DEBUG,
3772          "Going to send %" PRIu32 " pushes\n",
3773          num_pushes);
3774
3775     for (i = 0; i < num_pushes; i++)
3776     {
3777       if (att_peers_tail == att_peer_index)
3778         att_peer_index = att_peers_head;
3779       else
3780         att_peer_index = att_peer_index->next;
3781
3782       send_push (&att_peer_index->peer_id);
3783     }
3784
3785     /* Send PULLs to some peers to learn about additional peers to attack */
3786     tmp_att_peer = att_peer_index;
3787     for (i = 0; i < num_pushes * alpha; i++)
3788     {
3789       if (att_peers_tail == tmp_att_peer)
3790         tmp_att_peer = att_peers_head;
3791       else
3792         att_peer_index = tmp_att_peer->next;
3793
3794       send_pull_request (&tmp_att_peer->peer_id);
3795     }
3796   }
3797
3798   /* Schedule next round */
3799   time_next_round = compute_rand_delay (round_interval, 2);
3800
3801   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3802   //NULL);
3803   GNUNET_assert (NULL == do_round_task);
3804   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3805                                                 &do_mal_round, NULL);
3806   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3807 }
3808 #endif /* ENABLE_MALICIOUS */
3809
3810 /**
3811  * Send out PUSHes and PULLs, possibly update #view, samplers.
3812  *
3813  * This is executed regylary.
3814  */
3815 static void
3816 do_round (void *cls)
3817 {
3818   uint32_t i;
3819   const struct GNUNET_PeerIdentity *view_array;
3820   unsigned int *permut;
3821   unsigned int a_peers; /* Number of peers we send pushes to */
3822   unsigned int b_peers; /* Number of peers we send pull requests to */
3823   uint32_t first_border;
3824   uint32_t second_border;
3825   struct GNUNET_PeerIdentity peer;
3826   struct GNUNET_PeerIdentity *update_peer;
3827
3828   LOG (GNUNET_ERROR_TYPE_DEBUG,
3829        "Going to execute next round.\n");
3830   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3831   do_round_task = NULL;
3832   LOG (GNUNET_ERROR_TYPE_DEBUG,
3833        "Printing view:\n");
3834   to_file (file_name_view_log,
3835            "___ new round ___");
3836   view_array = View_get_as_array ();
3837   for (i = 0; i < View_size (); i++)
3838   {
3839     LOG (GNUNET_ERROR_TYPE_DEBUG,
3840          "\t%s\n", GNUNET_i2s (&view_array[i]));
3841     to_file (file_name_view_log,
3842              "=%s\t(do round)",
3843              GNUNET_i2s_full (&view_array[i]));
3844   }
3845
3846
3847   /* Send pushes and pull requests */
3848   if (0 < View_size ())
3849   {
3850     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3851                                            View_size ());
3852
3853     /* Send PUSHes */
3854     a_peers = ceil (alpha * View_size ());
3855
3856     LOG (GNUNET_ERROR_TYPE_DEBUG,
3857          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3858          a_peers, alpha, View_size ());
3859     for (i = 0; i < a_peers; i++)
3860     {
3861       peer = view_array[permut[i]];
3862       // FIXME if this fails schedule/loop this for later
3863       send_push (&peer);
3864     }
3865
3866     /* Send PULL requests */
3867     b_peers = ceil (beta * View_size ());
3868     first_border = a_peers;
3869     second_border = a_peers + b_peers;
3870     if (second_border > View_size ())
3871     {
3872       first_border = View_size () - b_peers;
3873       second_border = View_size ();
3874     }
3875     LOG (GNUNET_ERROR_TYPE_DEBUG,
3876         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3877         b_peers, beta, View_size ());
3878     for (i = first_border; i < second_border; i++)
3879     {
3880       peer = view_array[permut[i]];
3881       if ( GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3882       { // FIXME if this fails schedule/loop this for later
3883         send_pull_request (&peer);
3884       }
3885     }
3886
3887     GNUNET_free (permut);
3888     permut = NULL;
3889   }
3890
3891
3892   /* Update view */
3893   /* TODO see how many peers are in push-/pull- list! */
3894
3895   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3896       (0 < CustomPeerMap_size (push_map)) &&
3897       (0 < CustomPeerMap_size (pull_map)))
3898   //if (GNUNET_YES) // disable blocking temporarily
3899   { /* If conditions for update are fulfilled, update */
3900     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3901
3902     uint32_t final_size;
3903     uint32_t peers_to_clean_size;
3904     struct GNUNET_PeerIdentity *peers_to_clean;
3905
3906     peers_to_clean = NULL;
3907     peers_to_clean_size = 0;
3908     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3909     GNUNET_memcpy (peers_to_clean,
3910             view_array,
3911             View_size () * sizeof (struct GNUNET_PeerIdentity));
3912
3913     /* Seems like recreating is the easiest way of emptying the peermap */
3914     View_clear ();
3915     to_file (file_name_view_log,
3916              "--- emptied ---");
3917
3918     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3919                                 CustomPeerMap_size (push_map));
3920     second_border = first_border +
3921                     GNUNET_MIN (floor (beta  * view_size_est_need),
3922                                 CustomPeerMap_size (pull_map));
3923     final_size    = second_border +
3924       ceil ((1 - (alpha + beta)) * view_size_est_need);
3925     LOG (GNUNET_ERROR_TYPE_DEBUG,
3926         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3927         first_border,
3928         second_border,
3929         final_size);
3930
3931     /* Update view with peers received through PUSHes */
3932     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3933                                            CustomPeerMap_size (push_map));
3934     for (i = 0; i < first_border; i++)
3935     {
3936       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3937                                                               permut[i]));
3938       to_file (file_name_view_log,
3939                "+%s\t(push list)",
3940                GNUNET_i2s_full (&view_array[i]));
3941       // TODO change the peer_flags accordingly
3942     }
3943     GNUNET_free (permut);
3944     permut = NULL;
3945
3946     /* Update view with peers received through PULLs */
3947     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3948                                            CustomPeerMap_size (pull_map));
3949     for (i = first_border; i < second_border; i++)
3950     {
3951       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3952             permut[i - first_border]));
3953       to_file (file_name_view_log,
3954                "+%s\t(pull list)",
3955                GNUNET_i2s_full (&view_array[i]));
3956       // TODO change the peer_flags accordingly
3957     }
3958     GNUNET_free (permut);
3959     permut = NULL;
3960
3961     /* Update view with peers from history */
3962     RPS_sampler_get_n_rand_peers (prot_sampler,
3963                                   hist_update,
3964                                   NULL,
3965                                   final_size - second_border);
3966     // TODO change the peer_flags accordingly
3967
3968     for (i = 0; i < View_size (); i++)
3969       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3970
3971     /* Clean peers that were removed from the view */
3972     for (i = 0; i < peers_to_clean_size; i++)
3973     {
3974       to_file (file_name_view_log,
3975                "-%s",
3976                GNUNET_i2s_full (&peers_to_clean[i]));
3977       clean_peer (&peers_to_clean[i]);
3978       //peer_destroy_channel_send (sender);
3979     }
3980
3981     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3982     clients_notify_view_update();
3983   } else {
3984     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3985     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3986     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3987         !(0 >= CustomPeerMap_size (pull_map)))
3988       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3989     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3990         (0 >= CustomPeerMap_size (pull_map)))
3991       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3992     if (0 >= CustomPeerMap_size (push_map) &&
3993         !(0 >= CustomPeerMap_size (pull_map)))
3994       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3995     if (0 >= CustomPeerMap_size (push_map) &&
3996         (0 >= CustomPeerMap_size (pull_map)))
3997       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3998     if (0 >= CustomPeerMap_size (pull_map) &&
3999         CustomPeerMap_size (push_map) > alpha * View_size () &&
4000         0 >= CustomPeerMap_size (push_map))
4001       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4002   }
4003   // TODO independent of that also get some peers from CADET_get_peers()?
4004   GNUNET_STATISTICS_set (stats,
4005       "# peers in push map at end of round",
4006       CustomPeerMap_size (push_map),
4007       GNUNET_NO);
4008   GNUNET_STATISTICS_set (stats,
4009       "# peers in pull map at end of round",
4010       CustomPeerMap_size (pull_map),
4011       GNUNET_NO);
4012   GNUNET_STATISTICS_set (stats,
4013       "# peers in view at end of round",
4014       View_size (),
4015       GNUNET_NO);
4016
4017   LOG (GNUNET_ERROR_TYPE_DEBUG,
4018        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
4019        CustomPeerMap_size (push_map),
4020        CustomPeerMap_size (pull_map),
4021        alpha,
4022        View_size (),
4023        alpha * View_size ());
4024
4025   /* Update samplers */
4026   for (i = 0; i < CustomPeerMap_size (push_map); i++)
4027   {
4028     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
4029     LOG (GNUNET_ERROR_TYPE_DEBUG,
4030          "Updating with peer %s from push list\n",
4031          GNUNET_i2s (update_peer));
4032     insert_in_sampler (NULL, update_peer);
4033     clean_peer (update_peer); /* This cleans only if it is not in the view */
4034     //peer_destroy_channel_send (sender);
4035   }
4036
4037   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
4038   {
4039     LOG (GNUNET_ERROR_TYPE_DEBUG,
4040          "Updating with peer %s from pull list\n",
4041          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
4042     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
4043     /* This cleans only if it is not in the view */
4044     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
4045     //peer_destroy_channel_send (sender);
4046   }
4047
4048
4049   /* Empty push/pull lists */
4050   CustomPeerMap_clear (push_map);
4051   CustomPeerMap_clear (pull_map);
4052
4053   struct GNUNET_TIME_Relative time_next_round;
4054
4055   time_next_round = compute_rand_delay (round_interval, 2);
4056
4057   /* Schedule next round */
4058   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4059                                                 &do_round, NULL);
4060   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4061 }
4062
4063
4064 /**
4065  * This is called from GNUNET_CADET_get_peers().
4066  *
4067  * It is called on every peer(ID) that cadet somehow has contact with.
4068  * We use those to initialise the sampler.
4069  */
4070 void
4071 init_peer_cb (void *cls,
4072               const struct GNUNET_PeerIdentity *peer,
4073               int tunnel, // "Do we have a tunnel towards this peer?"
4074               unsigned int n_paths, // "Number of known paths towards this peer"
4075               unsigned int best_path) // "How long is the best path?
4076                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4077 {
4078   if (NULL != peer)
4079   {
4080     LOG (GNUNET_ERROR_TYPE_DEBUG,
4081          "Got peer_id %s from cadet\n",
4082          GNUNET_i2s (peer));
4083     got_peer (peer);
4084   }
4085 }
4086
4087 /**
4088  * @brief Iterator function over stored, valid peers.
4089  *
4090  * We initialise the sampler with those.
4091  *
4092  * @param cls the closure
4093  * @param peer the peer id
4094  * @return #GNUNET_YES if we should continue to
4095  *         iterate,
4096  *         #GNUNET_NO if not.
4097  */
4098 static int
4099 valid_peers_iterator (void *cls,
4100                       const struct GNUNET_PeerIdentity *peer)
4101 {
4102   if (NULL != peer)
4103   {
4104     LOG (GNUNET_ERROR_TYPE_DEBUG,
4105          "Got stored, valid peer %s\n",
4106          GNUNET_i2s (peer));
4107     got_peer (peer);
4108   }
4109   return GNUNET_YES;
4110 }
4111
4112
4113 /**
4114  * Iterator over peers from peerinfo.
4115  *
4116  * @param cls closure
4117  * @param peer id of the peer, NULL for last call
4118  * @param hello hello message for the peer (can be NULL)
4119  * @param error message
4120  */
4121 void
4122 process_peerinfo_peers (void *cls,
4123                         const struct GNUNET_PeerIdentity *peer,
4124                         const struct GNUNET_HELLO_Message *hello,
4125                         const char *err_msg)
4126 {
4127   if (NULL != peer)
4128   {
4129     LOG (GNUNET_ERROR_TYPE_DEBUG,
4130          "Got peer_id %s from peerinfo\n",
4131          GNUNET_i2s (peer));
4132     got_peer (peer);
4133   }
4134 }
4135
4136
4137 /**
4138  * Task run during shutdown.
4139  *
4140  * @param cls unused
4141  */
4142 static void
4143 shutdown_task (void *cls)
4144 {
4145   struct ClientContext *client_ctx;
4146   struct ReplyCls *reply_cls;
4147
4148   LOG (GNUNET_ERROR_TYPE_DEBUG,
4149        "RPS is going down\n");
4150
4151   /* Clean all clients */
4152   for (client_ctx = cli_ctx_head;
4153        NULL != cli_ctx_head;
4154        client_ctx = cli_ctx_head)
4155   {
4156     /* Clean pending requests to the sampler */
4157     for (reply_cls = client_ctx->rep_cls_head;
4158          NULL != client_ctx->rep_cls_head;
4159          reply_cls = client_ctx->rep_cls_head)
4160     {
4161       RPS_sampler_request_cancel (reply_cls->req_handle);
4162       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4163                                    client_ctx->rep_cls_tail,
4164                                    reply_cls);
4165       GNUNET_free (reply_cls);
4166     }
4167     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4168     GNUNET_free (client_ctx);
4169   }
4170   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4171   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4172
4173   if (NULL != do_round_task)
4174   {
4175     GNUNET_SCHEDULER_cancel (do_round_task);
4176     do_round_task = NULL;
4177   }
4178
4179   Peers_terminate ();
4180
4181   GNUNET_NSE_disconnect (nse);
4182   RPS_sampler_destroy (prot_sampler);
4183   RPS_sampler_destroy (client_sampler);
4184   GNUNET_CADET_close_port (cadet_port);
4185   GNUNET_CADET_disconnect (cadet_handle);
4186   View_destroy ();
4187   CustomPeerMap_destroy (push_map);
4188   CustomPeerMap_destroy (pull_map);
4189   if (NULL != stats)
4190   {
4191     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4192     stats = NULL;
4193   }
4194   #ifdef ENABLE_MALICIOUS
4195   struct AttackedPeer *tmp_att_peer;
4196   /* it is ok to free this const during shutdown: */
4197   GNUNET_free ((char *) file_name_view_log);
4198   #ifdef TO_FILE
4199   GNUNET_free ((char *) file_name_observed_log);
4200   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
4201   #endif /* TO_FILE */
4202   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4203   if (NULL != mal_peer_set)
4204     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4205   if (NULL != att_peer_set)
4206     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4207   while (NULL != att_peers_head)
4208   {
4209     tmp_att_peer = att_peers_head;
4210     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4211     GNUNET_free (tmp_att_peer);
4212   }
4213   #endif /* ENABLE_MALICIOUS */
4214 }
4215
4216
4217 /**
4218  * Handle client connecting to the service.
4219  *
4220  * @param cls NULL
4221  * @param client the new client
4222  * @param mq the message queue of @a client
4223  * @return @a client
4224  */
4225 static void *
4226 client_connect_cb (void *cls,
4227                    struct GNUNET_SERVICE_Client *client,
4228                    struct GNUNET_MQ_Handle *mq)
4229 {
4230   struct ClientContext *cli_ctx;
4231
4232   LOG (GNUNET_ERROR_TYPE_DEBUG,
4233        "Client connected\n");
4234   if (NULL == client)
4235     return client; /* Server was destroyed before a client connected. Shutting down */
4236   cli_ctx = GNUNET_new (struct ClientContext);
4237   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4238   cli_ctx->view_updates_left = -1;
4239   cli_ctx->client = client;
4240   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4241                                cli_ctx_tail,
4242                                cli_ctx);
4243   return cli_ctx;
4244 }
4245
4246 /**
4247  * Callback called when a client disconnected from the service
4248  *
4249  * @param cls closure for the service
4250  * @param c the client that disconnected
4251  * @param internal_cls should be equal to @a c
4252  */
4253 static void
4254 client_disconnect_cb (void *cls,
4255                       struct GNUNET_SERVICE_Client *client,
4256                       void *internal_cls)
4257 {
4258   struct ClientContext *cli_ctx = internal_cls;
4259
4260   GNUNET_assert (client == cli_ctx->client);
4261   if (NULL == client)
4262   {/* shutdown task - destroy all clients */
4263     while (NULL != cli_ctx_head)
4264       destroy_cli_ctx (cli_ctx_head);
4265   }
4266   else
4267   { /* destroy this client */
4268     LOG (GNUNET_ERROR_TYPE_DEBUG,
4269         "Client disconnected. Destroy its context.\n");
4270     destroy_cli_ctx (cli_ctx);
4271   }
4272 }
4273
4274
4275 /**
4276  * Handle random peer sampling clients.
4277  *
4278  * @param cls closure
4279  * @param c configuration to use
4280  * @param service the initialized service
4281  */
4282 static void
4283 run (void *cls,
4284      const struct GNUNET_CONFIGURATION_Handle *c,
4285      struct GNUNET_SERVICE_Handle *service)
4286 {
4287   char* fn_valid_peers;
4288
4289   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4290   cfg = c;
4291
4292
4293   /* Get own ID */
4294   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4295   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4296               "STARTING SERVICE (rps) for peer [%s]\n",
4297               GNUNET_i2s (&own_identity));
4298   #ifdef ENABLE_MALICIOUS
4299   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4300               "Malicious execution compiled in.\n");
4301   #endif /* ENABLE_MALICIOUS */
4302
4303
4304
4305   /* Get time interval from the configuration */
4306   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4307                                                         "ROUNDINTERVAL",
4308                                                         &round_interval))
4309   {
4310     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4311                                "RPS", "ROUNDINTERVAL");
4312     GNUNET_SCHEDULER_shutdown ();
4313     return;
4314   }
4315
4316   /* Get initial size of sampler/view from the configuration */
4317   if (GNUNET_OK !=
4318       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4319         (long long unsigned int *) &sampler_size_est_min))
4320   {
4321     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4322                                "RPS", "MINSIZE");
4323     GNUNET_SCHEDULER_shutdown ();
4324     return;
4325   }
4326   sampler_size_est_need = sampler_size_est_min;
4327   view_size_est_min = sampler_size_est_min;
4328   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4329
4330   if (GNUNET_OK !=
4331       GNUNET_CONFIGURATION_get_value_filename (cfg,
4332                                                "rps",
4333                                                "FILENAME_VALID_PEERS",
4334                                                &fn_valid_peers))
4335   {
4336     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4337                                "rps", "FILENAME_VALID_PEERS");
4338   }
4339
4340
4341   View_create (view_size_est_min);
4342
4343   /* file_name_view_log */
4344   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4345   #ifdef TO_FILE
4346   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4347   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4348   #endif /* TO_FILE */
4349
4350   /* connect to NSE */
4351   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4352
4353
4354   alpha = 0.45;
4355   beta  = 0.45;
4356
4357
4358   /* Initialise cadet */
4359   /* There exists a copy-paste-clone in get_channel() */
4360   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4361     GNUNET_MQ_hd_fixed_size (peer_check,
4362                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4363                              struct GNUNET_MessageHeader,
4364                              NULL),
4365     GNUNET_MQ_hd_fixed_size (peer_push,
4366                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4367                              struct GNUNET_MessageHeader,
4368                              NULL),
4369     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4370                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4371                              struct GNUNET_MessageHeader,
4372                              NULL),
4373     GNUNET_MQ_hd_var_size (peer_pull_reply,
4374                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4375                            struct GNUNET_RPS_P2P_PullReplyMessage,
4376                            NULL),
4377     GNUNET_MQ_handler_end ()
4378   };
4379
4380   cadet_handle = GNUNET_CADET_connect (cfg);
4381   GNUNET_assert (NULL != cadet_handle);
4382   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4383                       strlen (GNUNET_APPLICATION_PORT_RPS),
4384                       &port);
4385   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4386                                        &port,
4387                                        &Peers_handle_inbound_channel, /* Connect handler */
4388                                        NULL, /* cls */
4389                                        NULL, /* WindowSize handler */
4390                                        cleanup_destroyed_channel, /* Disconnect handler */
4391                                        cadet_handlers);
4392
4393
4394   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4395   Peers_initialise (fn_valid_peers, cadet_handle);
4396   GNUNET_free (fn_valid_peers);
4397
4398   /* Initialise sampler */
4399   struct GNUNET_TIME_Relative half_round_interval;
4400   struct GNUNET_TIME_Relative  max_round_interval;
4401
4402   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4403   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4404
4405   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4406   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4407
4408   /* Initialise push and pull maps */
4409   push_map = CustomPeerMap_create (4);
4410   pull_map = CustomPeerMap_create (4);
4411
4412
4413   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4414   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4415   // TODO send push/pull to each of those peers?
4416   // TODO read stored valid peers from last run
4417   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4418   Peers_get_valid_peers (valid_peers_iterator, NULL);
4419
4420   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4421                                                    GNUNET_NO,
4422                                                    process_peerinfo_peers,
4423                                                    NULL);
4424
4425   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4426
4427   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4428   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4429
4430   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4431   stats = GNUNET_STATISTICS_create ("rps", cfg);
4432
4433 }
4434
4435
4436 /**
4437  * Define "main" method using service macro.
4438  */
4439 GNUNET_SERVICE_MAIN
4440 ("rps",
4441  GNUNET_SERVICE_OPTION_NONE,
4442  &run,
4443  &client_connect_cb,
4444  &client_disconnect_cb,
4445  NULL,
4446  GNUNET_MQ_hd_fixed_size (client_request,
4447    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4448    struct GNUNET_RPS_CS_RequestMessage,
4449    NULL),
4450  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4451    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4452    struct GNUNET_RPS_CS_RequestCancelMessage,
4453    NULL),
4454  GNUNET_MQ_hd_var_size (client_seed,
4455    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4456    struct GNUNET_RPS_CS_SeedMessage,
4457    NULL),
4458 #ifdef ENABLE_MALICIOUS
4459  GNUNET_MQ_hd_var_size (client_act_malicious,
4460    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4461    struct GNUNET_RPS_CS_ActMaliciousMessage,
4462    NULL),
4463 #endif /* ENABLE_MALICIOUS */
4464  GNUNET_MQ_hd_fixed_size (client_view_request,
4465    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4466    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4467    NULL),
4468  GNUNET_MQ_handler_end());
4469
4470 /* end of gnunet-service-rps.c */