Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO store peers somewhere persistent
51
52 // TODO blacklist? (-> mal peer detection on top of brahms)
53
54 // hist_size_init, hist_size_max
55
56 /**
57  * Our configuration.
58  */
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60
61 /**
62  * Handle to the statistics service.
63  */
64 static struct GNUNET_STATISTICS_Handle *stats;
65
66 /**
67  * Our own identity.
68  */
69 static struct GNUNET_PeerIdentity own_identity;
70
71
72 /**
73  * @brief Port used for cadet.
74  *
75  * Don't compute multiple times through making it global
76  */
77 static struct GNUNET_HashCode port;
78
79 /***********************************************************************
80  * Old gnunet-service-rps_peers.c
81 ***********************************************************************/
82
83 /**
84  * Set a peer flag of given peer context.
85  */
86 #define set_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
87
88 /**
89  * Get peer flag of given peer context.
90  */
91 #define check_peer_flag_set(peer_ctx, mask)\
92   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
93
94 /**
95  * Unset flag of given peer context.
96  */
97 #define unset_peer_flag(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
98
99 /**
100  * Set a channel flag of given channel context.
101  */
102 #define set_channel_flag(channel_flags, mask) ((*channel_flags) |= (mask))
103
104 /**
105  * Get channel flag of given channel context.
106  */
107 #define check_channel_flag_set(channel_flags, mask)\
108   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
109
110 /**
111  * Unset flag of given channel context.
112  */
113 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
114
115
116
117 /**
118  * Pending operation on peer consisting of callback and closure
119  *
120  * When an operation cannot be executed right now this struct is used to store
121  * the callback and closure for later execution.
122  */
123 struct PeerPendingOp
124 {
125   /**
126    * Callback
127    */
128   PeerOp op;
129
130   /**
131    * Closure
132    */
133   void *op_cls;
134 };
135
136 /**
137  * List containing all messages that are yet to be send
138  *
139  * This is used to keep track of all messages that have not been sent yet. When
140  * a peer is to be removed the pending messages can be removed properly.
141  */
142 struct PendingMessage
143 {
144   /**
145    * DLL next, prev
146    */
147   struct PendingMessage *next;
148   struct PendingMessage *prev;
149
150   /**
151    * The envelope to the corresponding message
152    */
153   struct GNUNET_MQ_Envelope *ev;
154
155   /**
156    * The corresponding context
157    */
158   struct PeerContext *peer_ctx;
159
160   /**
161    * The message type
162    */
163   const char *type;
164 };
165
166 /**
167  * Struct used to keep track of other peer's status
168  *
169  * This is stored in a multipeermap.
170  * It contains information such as cadet channels, a message queue for sending,
171  * status about the channels, the pending operations on this peer and some flags
172  * about the status of the peer itself. (live, valid, ...)
173  */
174 struct PeerContext
175 {
176   /**
177    * Message queue open to client
178    */
179   struct GNUNET_MQ_Handle *mq;
180
181   /**
182    * Channel open to client.
183    */
184   struct GNUNET_CADET_Channel *send_channel;
185
186   /**
187    * Flags to the sending channel
188    */
189   uint32_t *send_channel_flags;
190
191   /**
192    * Channel open from client.
193    */
194   struct GNUNET_CADET_Channel *recv_channel; // unneeded?
195
196   /**
197    * Flags to the receiving channel
198    */
199   uint32_t *recv_channel_flags;
200
201   /**
202    * Array of pending operations on this peer.
203    */
204   struct PeerPendingOp *pending_ops;
205
206   /**
207    * Handle to the callback given to cadet_ntfy_tmt_rdy()
208    *
209    * To be canceled on shutdown.
210    */
211   struct PendingMessage *liveliness_check_pending;
212
213   /**
214    * Number of pending operations.
215    */
216   unsigned int num_pending_ops;
217
218   /**
219    * Identity of the peer
220    */
221   struct GNUNET_PeerIdentity peer_id;
222
223   /**
224    * Flags indicating status of peer
225    */
226   uint32_t peer_flags;
227
228   /**
229    * Last time we received something from that peer.
230    */
231   struct GNUNET_TIME_Absolute last_message_recv;
232
233   /**
234    * Last time we received a keepalive message.
235    */
236   struct GNUNET_TIME_Absolute last_keepalive;
237
238   /**
239    * DLL with all messages that are yet to be sent
240    */
241   struct PendingMessage *pending_messages_head;
242   struct PendingMessage *pending_messages_tail;
243
244   /**
245    * This is pobably followed by 'statistical' data (when we first saw
246    * him, how did we get his ID, how many pushes (in a timeinterval),
247    * ...)
248    */
249 };
250
251 /**
252  * @brief Closure to #valid_peer_iterator
253  */
254 struct PeersIteratorCls
255 {
256   /**
257    * Iterator function
258    */
259   PeersIterator iterator;
260
261   /**
262    * Closure to iterator
263    */
264   void *cls;
265 };
266
267 /**
268  * @brief Hashmap of valid peers.
269  */
270 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
271
272 /**
273  * @brief Maximum number of valid peers to keep.
274  * TODO read from config
275  */
276 static uint32_t num_valid_peers_max = UINT32_MAX;
277
278 /**
279  * @brief Filename of the file that stores the valid peers persistently.
280  */
281 static char *filename_valid_peers;
282
283 /**
284  * Set of all peers to keep track of them.
285  */
286 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
287
288 /**
289  * Cadet handle.
290  */
291 static struct GNUNET_CADET_Handle *cadet_handle;
292
293
294
295 /**
296  * @brief Get the #PeerContext associated with a peer
297  *
298  * @param peer the peer id
299  *
300  * @return the #PeerContext
301  */
302 static struct PeerContext *
303 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
304 {
305   struct PeerContext *ctx;
306   int ret;
307
308   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
309   GNUNET_assert (GNUNET_YES == ret);
310   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
311   GNUNET_assert (NULL != ctx);
312   return ctx;
313 }
314
315 int
316 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer);
317
318 /**
319  * @brief Create a new #PeerContext and insert it into the peer map
320  *
321  * @param peer the peer to create the #PeerContext for
322  *
323  * @return the #PeerContext
324  */
325 static struct PeerContext *
326 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
327 {
328   struct PeerContext *ctx;
329   int ret;
330
331   GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer));
332
333   ctx = GNUNET_new (struct PeerContext);
334   ctx->peer_id = *peer;
335   ctx->send_channel_flags = GNUNET_new (uint32_t);
336   ctx->recv_channel_flags = GNUNET_new (uint32_t);
337   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
338       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
339   GNUNET_assert (GNUNET_OK == ret);
340   return ctx;
341 }
342
343
344 /**
345  * @brief Create or get a #PeerContext
346  *
347  * @param peer the peer to get the associated context to
348  *
349  * @return the context
350  */
351 static struct PeerContext *
352 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
353 {
354   if (GNUNET_NO == Peers_check_peer_known (peer))
355   {
356     return create_peer_ctx (peer);
357   }
358   return get_peer_ctx (peer);
359 }
360
361 void
362 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
363
364 void
365 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
366
367 /**
368  * @brief Check whether we have a connection to this @a peer
369  *
370  * Also sets the #Peers_ONLINE flag accordingly
371  *
372  * @param peer the peer in question
373  *
374  * @return #GNUNET_YES if we are connected
375  *         #GNUNET_NO  otherwise
376  */
377 int
378 Peers_check_connected (const struct GNUNET_PeerIdentity *peer)
379 {
380   const struct PeerContext *peer_ctx;
381
382   /* If we don't know about this peer we don't know whether it's online */
383   if (GNUNET_NO == Peers_check_peer_known (peer))
384   {
385     return GNUNET_NO;
386   }
387   /* Get the context */
388   peer_ctx = get_peer_ctx (peer);
389   /* If we have no channel to this peer we don't know whether it's online */
390   if ( (NULL == peer_ctx->send_channel) &&
391        (NULL == peer_ctx->recv_channel) )
392   {
393     Peers_unset_peer_flag (peer, Peers_ONLINE);
394     return GNUNET_NO;
395   }
396   /* Otherwise (if we have a channel, we know that it's online */
397   Peers_set_peer_flag (peer, Peers_ONLINE);
398   return GNUNET_YES;
399 }
400
401
402 /**
403  * @brief The closure to #get_rand_peer_iterator.
404  */
405 struct GetRandPeerIteratorCls
406 {
407   /**
408    * @brief The index of the peer to return.
409    * Will be decreased until 0.
410    * Then current peer is returned.
411    */
412   uint32_t index;
413
414   /**
415    * @brief Pointer to peer to return.
416    */
417   const struct GNUNET_PeerIdentity *peer;
418 };
419
420
421 /**
422  * @brief Iterator function for #get_random_peer_from_peermap.
423  *
424  * Implements #GNUNET_CONTAINER_PeerMapIterator.
425  * Decreases the index until the index is null.
426  * Then returns the current peer.
427  *
428  * @param cls the #GetRandPeerIteratorCls containing index and peer
429  * @param peer current peer
430  * @param value unused
431  *
432  * @return  #GNUNET_YES if we should continue to
433  *          iterate,
434  *          #GNUNET_NO if not.
435  */
436 static int
437 get_rand_peer_iterator (void *cls,
438                         const struct GNUNET_PeerIdentity *peer,
439                         void *value)
440 {
441   struct GetRandPeerIteratorCls *iterator_cls = cls;
442   if (0 >= iterator_cls->index)
443   {
444     iterator_cls->peer = peer;
445     return GNUNET_NO;
446   }
447   iterator_cls->index--;
448   return GNUNET_YES;
449 }
450
451
452 /**
453  * @brief Get a random peer from @a peer_map
454  *
455  * @param peer_map the peer_map to get the peer from
456  *
457  * @return a random peer
458  */
459 static const struct GNUNET_PeerIdentity *
460 get_random_peer_from_peermap (const struct
461                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
462 {
463   struct GetRandPeerIteratorCls *iterator_cls;
464   const struct GNUNET_PeerIdentity *ret;
465
466   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
467   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
468       GNUNET_CONTAINER_multipeermap_size (peer_map));
469   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
470                                                 get_rand_peer_iterator,
471                                                 iterator_cls);
472   ret = iterator_cls->peer;
473   GNUNET_free (iterator_cls);
474   return ret;
475 }
476
477
478 /**
479  * @brief Add a given @a peer to valid peers.
480  *
481  * If valid peers are already #num_valid_peers_max, delete a peer previously.
482  *
483  * @param peer the peer that is added to the valid peers.
484  *
485  * @return #GNUNET_YES if no other peer had to be removed
486  *         #GNUNET_NO  otherwise
487  */
488 static int
489 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
490 {
491   const struct GNUNET_PeerIdentity *rand_peer;
492   int ret;
493
494   ret = GNUNET_YES;
495   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
496   {
497     rand_peer = get_random_peer_from_peermap (valid_peers);
498     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
499     ret = GNUNET_NO;
500   }
501   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
502       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
503   return ret;
504 }
505
506
507 /**
508  * @brief Set the peer flag to living and
509  *        call the pending operations on this peer.
510  *
511  * Also adds peer to #valid_peers.
512  *
513  * @param peer_ctx the #PeerContext of the peer to set live
514  */
515 static void
516 set_peer_live (struct PeerContext *peer_ctx)
517 {
518   struct GNUNET_PeerIdentity *peer;
519   unsigned int i;
520
521   peer = &peer_ctx->peer_id;
522   LOG (GNUNET_ERROR_TYPE_DEBUG,
523       "Peer %s is live and valid, calling %i pending operations on it\n",
524       GNUNET_i2s (peer),
525       peer_ctx->num_pending_ops);
526
527   if (NULL != peer_ctx->liveliness_check_pending)
528   {
529     LOG (GNUNET_ERROR_TYPE_DEBUG,
530          "Removing pending liveliness check for peer %s\n",
531          GNUNET_i2s (&peer_ctx->peer_id));
532     // TODO wait until cadet sets mq->cancel_impl
533     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
534     GNUNET_free (peer_ctx->liveliness_check_pending);
535     peer_ctx->liveliness_check_pending = NULL;
536   }
537
538   (void) add_valid_peer (peer);
539   set_peer_flag (peer_ctx, Peers_ONLINE);
540
541   /* Call pending operations */
542   for (i = 0; i < peer_ctx->num_pending_ops; i++)
543   {
544     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
545   }
546   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
547 }
548
549 static void
550 cleanup_destroyed_channel (void *cls,
551                            const struct GNUNET_CADET_Channel *channel);
552
553 /* Declaration of handlers */
554 static void
555 handle_peer_check (void *cls,
556                    const struct GNUNET_MessageHeader *msg);
557
558 static void
559 handle_peer_push (void *cls,
560                   const struct GNUNET_MessageHeader *msg);
561
562 static void
563 handle_peer_pull_request (void *cls,
564                           const struct GNUNET_MessageHeader *msg);
565
566 static int
567 check_peer_pull_reply (void *cls,
568                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
569
570 static void
571 handle_peer_pull_reply (void *cls,
572                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
573
574 /* End declaration of handlers */
575
576
577 /**
578  * @brief Get the channel of a peer. If not existing, create.
579  *
580  * @param peer the peer id
581  * @return the #GNUNET_CADET_Channel used to send data to @a peer
582  */
583 struct GNUNET_CADET_Channel *
584 get_channel (const struct GNUNET_PeerIdentity *peer)
585 {
586   struct PeerContext *peer_ctx;
587   struct GNUNET_PeerIdentity *ctx_peer;
588   /* There exists a copy-paste-clone in run() */
589   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
590     GNUNET_MQ_hd_fixed_size (peer_check,
591                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
592                              struct GNUNET_MessageHeader,
593                              NULL),
594     GNUNET_MQ_hd_fixed_size (peer_push,
595                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
596                              struct GNUNET_MessageHeader,
597                              NULL),
598     GNUNET_MQ_hd_fixed_size (peer_pull_request,
599                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
600                              struct GNUNET_MessageHeader,
601                              NULL),
602     GNUNET_MQ_hd_var_size (peer_pull_reply,
603                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
604                            struct GNUNET_RPS_P2P_PullReplyMessage,
605                            NULL),
606     GNUNET_MQ_handler_end ()
607   };
608
609
610   peer_ctx = get_peer_ctx (peer);
611   if (NULL == peer_ctx->send_channel)
612   {
613     LOG (GNUNET_ERROR_TYPE_DEBUG,
614          "Trying to establish channel to peer %s\n",
615          GNUNET_i2s (peer));
616     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
617     *ctx_peer = *peer;
618     peer_ctx->send_channel =
619       GNUNET_CADET_channel_create (cadet_handle,
620                                    (struct GNUNET_PeerIdentity *) ctx_peer, /* context */
621                                    peer,
622                                    &port,
623                                    GNUNET_CADET_OPTION_RELIABLE,
624                                    NULL, /* WindowSize handler */
625                                    cleanup_destroyed_channel, /* Disconnect handler */
626                                    cadet_handlers);
627   }
628   GNUNET_assert (NULL != peer_ctx->send_channel);
629   return peer_ctx->send_channel;
630 }
631
632
633 /**
634  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
635  *
636  * If we already have a message queue open to this client,
637  * simply return it, otherways create one.
638  *
639  * @param peer the peer to get the mq to
640  * @return the #GNUNET_MQ_Handle
641  */
642 static struct GNUNET_MQ_Handle *
643 get_mq (const struct GNUNET_PeerIdentity *peer)
644 {
645   struct PeerContext *peer_ctx;
646
647   peer_ctx = get_peer_ctx (peer);
648
649   if (NULL == peer_ctx->mq)
650   {
651     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
652   }
653   return peer_ctx->mq;
654 }
655
656
657 /**
658  * @brief This is called in response to the first message we sent as a
659  * liveliness check.
660  *
661  * @param cls #PeerContext of peer with pending liveliness check
662  */
663 static void
664 mq_liveliness_check_successful (void *cls)
665 {
666   struct PeerContext *peer_ctx = cls;
667
668   if (NULL != peer_ctx->liveliness_check_pending)
669   {
670     LOG (GNUNET_ERROR_TYPE_DEBUG,
671         "Liveliness check for peer %s was successfull\n",
672         GNUNET_i2s (&peer_ctx->peer_id));
673     GNUNET_free (peer_ctx->liveliness_check_pending);
674     peer_ctx->liveliness_check_pending = NULL;
675     set_peer_live (peer_ctx);
676   }
677 }
678
679 /**
680  * Issue a check whether peer is live
681  *
682  * @param peer_ctx the context of the peer
683  */
684 static void
685 check_peer_live (struct PeerContext *peer_ctx)
686 {
687   LOG (GNUNET_ERROR_TYPE_DEBUG,
688        "Get informed about peer %s getting live\n",
689        GNUNET_i2s (&peer_ctx->peer_id));
690
691   struct GNUNET_MQ_Handle *mq;
692   struct GNUNET_MQ_Envelope *ev;
693
694   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
695   peer_ctx->liveliness_check_pending = GNUNET_new (struct PendingMessage);
696   peer_ctx->liveliness_check_pending->ev = ev;
697   peer_ctx->liveliness_check_pending->peer_ctx = peer_ctx;
698   peer_ctx->liveliness_check_pending->type = "Check liveliness";
699   mq = get_mq (&peer_ctx->peer_id);
700   GNUNET_MQ_notify_sent (ev,
701                          mq_liveliness_check_successful,
702                          peer_ctx);
703   GNUNET_MQ_send (mq, ev);
704 }
705
706 /**
707  * @brief Add an envelope to a message passed to mq to list of pending messages
708  *
709  * @param peer peer the message was sent to
710  * @param ev envelope to the message
711  * @param type type of the message to be sent
712  * @return pointer to pending message
713  */
714 static struct PendingMessage *
715 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
716                         struct GNUNET_MQ_Envelope *ev,
717                         const char *type)
718 {
719   struct PendingMessage *pending_msg;
720   struct PeerContext *peer_ctx;
721
722   peer_ctx = get_peer_ctx (peer);
723   pending_msg = GNUNET_new (struct PendingMessage);
724   pending_msg->ev = ev;
725   pending_msg->peer_ctx = peer_ctx;
726   pending_msg->type = type;
727   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
728                                peer_ctx->pending_messages_tail,
729                                pending_msg);
730   return pending_msg;
731 }
732
733
734 /**
735  * @brief Remove a pending message from the respective DLL
736  *
737  * @param pending_msg the pending message to remove
738  * @param cancel cancel the pending message, too
739  */
740 static void
741 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
742 {
743   struct PeerContext *peer_ctx;
744
745   peer_ctx = pending_msg->peer_ctx;
746   GNUNET_assert (NULL != peer_ctx);
747   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
748                                peer_ctx->pending_messages_tail,
749                                pending_msg);
750   // TODO wait for the cadet implementation of message cancellation
751   //if (GNUNET_YES == cancel)
752   //{
753   //  GNUNET_MQ_send_cancel (pending_msg->ev);
754   //}
755   GNUNET_free (pending_msg);
756 }
757
758
759 /**
760  * @brief Check whether function of type #PeerOp was already scheduled
761  *
762  * The array with pending operations will probably never grow really big, so
763  * iterating over it should be ok.
764  *
765  * @param peer the peer to check
766  * @param peer_op the operation (#PeerOp) on the peer
767  *
768  * @return #GNUNET_YES if this operation is scheduled on that peer
769  *         #GNUNET_NO  otherwise
770  */
771 static int
772 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
773                            const PeerOp peer_op)
774 {
775   const struct PeerContext *peer_ctx;
776   unsigned int i;
777
778   peer_ctx = get_peer_ctx (peer);
779   for (i = 0; i < peer_ctx->num_pending_ops; i++)
780     if (peer_op == peer_ctx->pending_ops[i].op)
781       return GNUNET_YES;
782   return GNUNET_NO;
783 }
784
785 int
786 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer);
787
788 /**
789  * Iterator over hash map entries. Deletes all contexts of peers.
790  *
791  * @param cls closure
792  * @param key current public key
793  * @param value value in the hash map
794  * @return #GNUNET_YES if we should continue to iterate,
795  *         #GNUNET_NO if not.
796  */
797 static int
798 peermap_clear_iterator (void *cls,
799                         const struct GNUNET_PeerIdentity *key,
800                         void *value)
801 {
802   Peers_remove_peer (key);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * @brief This is called once a message is sent.
809  *
810  * Removes the pending message
811  *
812  * @param cls type of the message that was sent
813  */
814 static void
815 mq_notify_sent_cb (void *cls)
816 {
817   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819       "%s was sent.\n",
820       pending_msg->type);
821   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
822     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
823   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
824     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
825   if (0 == strncmp ("PUSH", pending_msg->type, 4))
826     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
827   /* Do not cancle message */
828   remove_pending_message (pending_msg, GNUNET_NO);
829 }
830
831
832 /**
833  * @brief Iterator function for #store_valid_peers.
834  *
835  * Implements #GNUNET_CONTAINER_PeerMapIterator.
836  * Writes single peer to disk.
837  *
838  * @param cls the file handle to write to.
839  * @param peer current peer
840  * @param value unused
841  *
842  * @return  #GNUNET_YES if we should continue to
843  *          iterate,
844  *          #GNUNET_NO if not.
845  */
846 static int
847 store_peer_presistently_iterator (void *cls,
848                                   const struct GNUNET_PeerIdentity *peer,
849                                   void *value)
850 {
851   const struct GNUNET_DISK_FileHandle *fh = cls;
852   char peer_string[128];
853   int size;
854   ssize_t ret;
855
856   if (NULL == peer)
857   {
858     return GNUNET_YES;
859   }
860   size = GNUNET_snprintf (peer_string,
861                           sizeof (peer_string),
862                           "%s\n",
863                           GNUNET_i2s_full (peer));
864   GNUNET_assert (53 == size);
865   ret = GNUNET_DISK_file_write (fh,
866                                 peer_string,
867                                 size);
868   GNUNET_assert (size == ret);
869   return GNUNET_YES;
870 }
871
872
873 /**
874  * @brief Store the peers currently in #valid_peers to disk.
875  */
876 static void
877 store_valid_peers ()
878 {
879   struct GNUNET_DISK_FileHandle *fh;
880   uint32_t number_written_peers;
881   int ret;
882
883   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
884   {
885     return;
886   }
887
888   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
889   if (GNUNET_SYSERR == ret)
890   {
891     LOG (GNUNET_ERROR_TYPE_WARNING,
892         "Not able to create directory for file `%s'\n",
893         filename_valid_peers);
894     GNUNET_break (0);
895   }
896   else if (GNUNET_NO == ret)
897   {
898     LOG (GNUNET_ERROR_TYPE_WARNING,
899         "Directory for file `%s' exists but is not writable for us\n",
900         filename_valid_peers);
901     GNUNET_break (0);
902   }
903   fh = GNUNET_DISK_file_open (filename_valid_peers,
904                               GNUNET_DISK_OPEN_WRITE |
905                                   GNUNET_DISK_OPEN_CREATE,
906                               GNUNET_DISK_PERM_USER_READ |
907                                   GNUNET_DISK_PERM_USER_WRITE);
908   if (NULL == fh)
909   {
910     LOG (GNUNET_ERROR_TYPE_WARNING,
911         "Not able to write valid peers to file `%s'\n",
912         filename_valid_peers);
913     return;
914   }
915   LOG (GNUNET_ERROR_TYPE_DEBUG,
916       "Writing %u valid peers to disk\n",
917       GNUNET_CONTAINER_multipeermap_size (valid_peers));
918   number_written_peers =
919     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
920                                            store_peer_presistently_iterator,
921                                            fh);
922   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
923   GNUNET_assert (number_written_peers ==
924       GNUNET_CONTAINER_multipeermap_size (valid_peers));
925 }
926
927
928 /**
929  * @brief Convert string representation of peer id to peer id.
930  *
931  * Counterpart to #GNUNET_i2s_full.
932  *
933  * @param string_repr The string representation of the peer id
934  *
935  * @return The peer id
936  */
937 static const struct GNUNET_PeerIdentity *
938 s2i_full (const char *string_repr)
939 {
940   struct GNUNET_PeerIdentity *peer;
941   size_t len;
942   int ret;
943
944   peer = GNUNET_new (struct GNUNET_PeerIdentity);
945   len = strlen (string_repr);
946   if (52 > len)
947   {
948     LOG (GNUNET_ERROR_TYPE_WARNING,
949         "Not able to convert string representation of PeerID to PeerID\n"
950         "Sting representation: %s (len %lu) - too short\n",
951         string_repr,
952         len);
953     GNUNET_break (0);
954   }
955   else if (52 < len)
956   {
957     len = 52;
958   }
959   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
960                                                     len,
961                                                     &peer->public_key);
962   if (GNUNET_OK != ret)
963   {
964     LOG (GNUNET_ERROR_TYPE_WARNING,
965         "Not able to convert string representation of PeerID to PeerID\n"
966         "Sting representation: %s\n",
967         string_repr);
968     GNUNET_break (0);
969   }
970   return peer;
971 }
972
973
974 /**
975  * @brief Restore the peers on disk to #valid_peers.
976  */
977 static void
978 restore_valid_peers ()
979 {
980   off_t file_size;
981   uint32_t num_peers;
982   struct GNUNET_DISK_FileHandle *fh;
983   char *buf;
984   ssize_t size_read;
985   char *iter_buf;
986   char *str_repr;
987   const struct GNUNET_PeerIdentity *peer;
988
989   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
990   {
991     return;
992   }
993
994   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
995   {
996     return;
997   }
998   fh = GNUNET_DISK_file_open (filename_valid_peers,
999                               GNUNET_DISK_OPEN_READ,
1000                               GNUNET_DISK_PERM_NONE);
1001   GNUNET_assert (NULL != fh);
1002   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1003   num_peers = file_size / 53;
1004   buf = GNUNET_malloc (file_size);
1005   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1006   GNUNET_assert (size_read == file_size);
1007   LOG (GNUNET_ERROR_TYPE_DEBUG,
1008       "Restoring %" PRIu32 " peers from file `%s'\n",
1009       num_peers,
1010       filename_valid_peers);
1011   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1012   {
1013     str_repr = GNUNET_strndup (iter_buf, 53);
1014     peer = s2i_full (str_repr);
1015     GNUNET_free (str_repr);
1016     add_valid_peer (peer);
1017     LOG (GNUNET_ERROR_TYPE_DEBUG,
1018         "Restored valid peer %s from disk\n",
1019         GNUNET_i2s_full (peer));
1020   }
1021   iter_buf = NULL;
1022   GNUNET_free (buf);
1023   LOG (GNUNET_ERROR_TYPE_DEBUG,
1024       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1025       num_peers,
1026       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1027   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1028   {
1029     LOG (GNUNET_ERROR_TYPE_WARNING,
1030         "Number of restored peers does not match file size. Have probably duplicates.\n");
1031   }
1032   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1033   LOG (GNUNET_ERROR_TYPE_DEBUG,
1034       "Restored %u valid peers from disk\n",
1035       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1036 }
1037
1038
1039 /**
1040  * @brief Initialise storage of peers
1041  *
1042  * @param fn_valid_peers filename of the file used to store valid peer ids
1043  * @param cadet_h cadet handle
1044  * @param own_id own peer identity
1045  */
1046 void
1047 Peers_initialise (char* fn_valid_peers,
1048                   struct GNUNET_CADET_Handle *cadet_h,
1049                   const struct GNUNET_PeerIdentity *own_id)
1050 {
1051   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1052   cadet_handle = cadet_h;
1053   own_identity = *own_id;
1054   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1055   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1056   restore_valid_peers ();
1057 }
1058
1059
1060 /**
1061  * @brief Delete storage of peers that was created with #Peers_initialise ()
1062  */
1063 void
1064 Peers_terminate ()
1065 {
1066   if (GNUNET_SYSERR ==
1067       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1068                                              peermap_clear_iterator,
1069                                              NULL))
1070   {
1071     LOG (GNUNET_ERROR_TYPE_WARNING,
1072         "Iteration destroying peers was aborted.\n");
1073   }
1074   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1075   peer_map = NULL;
1076   store_valid_peers ();
1077   GNUNET_free (filename_valid_peers);
1078   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1079 }
1080
1081
1082 /**
1083  * Iterator over #valid_peers hash map entries.
1084  *
1085  * @param cls closure - unused
1086  * @param peer current peer id
1087  * @param value value in the hash map - unused
1088  * @return #GNUNET_YES if we should continue to
1089  *         iterate,
1090  *         #GNUNET_NO if not.
1091  */
1092 static int
1093 valid_peer_iterator (void *cls,
1094                      const struct GNUNET_PeerIdentity *peer,
1095                      void *value)
1096 {
1097   struct PeersIteratorCls *it_cls = cls;
1098
1099   return it_cls->iterator (it_cls->cls,
1100                            peer);
1101 }
1102
1103
1104 /**
1105  * @brief Get all currently known, valid peer ids.
1106  *
1107  * @param it function to call on each peer id
1108  * @param it_cls extra argument to @a it
1109  * @return the number of key value pairs processed,
1110  *         #GNUNET_SYSERR if it aborted iteration
1111  */
1112 int
1113 Peers_get_valid_peers (PeersIterator iterator,
1114                        void *it_cls)
1115 {
1116   struct PeersIteratorCls *cls;
1117   int ret;
1118
1119   cls = GNUNET_new (struct PeersIteratorCls);
1120   cls->iterator = iterator;
1121   cls->cls = it_cls;
1122   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1123                                                valid_peer_iterator,
1124                                                cls);
1125   GNUNET_free (cls);
1126   return ret;
1127 }
1128
1129
1130 /**
1131  * @brief Add peer to known peers.
1132  *
1133  * This function is called on new peer_ids from 'external' sources
1134  * (client seed, cadet get_peers(), ...)
1135  *
1136  * @param peer the new #GNUNET_PeerIdentity
1137  *
1138  * @return #GNUNET_YES if peer was inserted
1139  *         #GNUNET_NO  otherwise (if peer was already known or
1140  *                     peer was #own_identity)
1141  */
1142 int
1143 Peers_insert_peer (const struct GNUNET_PeerIdentity *peer)
1144 {
1145   if ( (GNUNET_YES == Peers_check_peer_known (peer)) ||
1146        (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity)) )
1147   {
1148     return GNUNET_NO; /* We already know this peer - nothing to do */
1149   }
1150   (void) create_peer_ctx (peer);
1151   return GNUNET_YES;
1152 }
1153
1154 int
1155 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags);
1156
1157 /**
1158  * @brief Try connecting to a peer to see whether it is online
1159  *
1160  * If not known yet, insert into known peers
1161  *
1162  * @param peer the peer whose liveliness is to be checked
1163  * @return #GNUNET_YES if peer had to be inserted
1164  *         #GNUNET_NO  otherwise (if peer was already known or
1165  *                     peer was #own_identity)
1166  */
1167 int
1168 Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1169 {
1170   struct PeerContext *peer_ctx;
1171   int ret;
1172
1173   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1174   {
1175     return GNUNET_NO;
1176   }
1177   ret = Peers_insert_peer (peer);
1178   peer_ctx = get_peer_ctx (peer);
1179   if (GNUNET_NO == Peers_check_peer_flag (peer, Peers_ONLINE))
1180   {
1181     check_peer_live (peer_ctx);
1182   }
1183   return ret;
1184 }
1185
1186
1187 /**
1188  * @brief Check if peer is removable.
1189  *
1190  * Check if
1191  *  - a recv channel exists
1192  *  - there are pending messages
1193  *  - there is no pending pull reply
1194  *
1195  * @param peer the peer in question
1196  * @return #GNUNET_YES    if peer is removable
1197  *         #GNUNET_NO     if peer is NOT removable
1198  *         #GNUNET_SYSERR if peer is not known
1199  */
1200 int
1201 Peers_check_removable (const struct GNUNET_PeerIdentity *peer)
1202 {
1203   struct PeerContext *peer_ctx;
1204
1205   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1206   {
1207     return GNUNET_SYSERR;
1208   }
1209
1210   peer_ctx = get_peer_ctx (peer);
1211   if ( (NULL != peer_ctx->recv_channel) ||
1212        (NULL != peer_ctx->pending_messages_head) ||
1213        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1214   {
1215     return GNUNET_NO;
1216   }
1217   return GNUNET_YES;
1218 }
1219
1220 uint32_t *
1221 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1222                         enum Peers_ChannelRole role);
1223
1224 int
1225 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags);
1226
1227 /**
1228  * @brief Remove peer
1229  *
1230  * @param peer the peer to clean
1231  * @return #GNUNET_YES if peer was removed
1232  *         #GNUNET_NO  otherwise
1233  */
1234 int
1235 Peers_remove_peer (const struct GNUNET_PeerIdentity *peer)
1236 {
1237   struct PeerContext *peer_ctx;
1238   uint32_t *channel_flag;
1239
1240   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1241   {
1242     return GNUNET_NO;
1243   }
1244
1245   peer_ctx = get_peer_ctx (peer);
1246   set_peer_flag (peer_ctx, Peers_TO_DESTROY);
1247   LOG (GNUNET_ERROR_TYPE_DEBUG,
1248        "Going to remove peer %s\n",
1249        GNUNET_i2s (&peer_ctx->peer_id));
1250   Peers_unset_peer_flag (peer, Peers_ONLINE);
1251
1252   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
1253   while (NULL != peer_ctx->pending_messages_head)
1254   {
1255     LOG (GNUNET_ERROR_TYPE_DEBUG,
1256         "Removing unsent %s\n",
1257         peer_ctx->pending_messages_head->type);
1258     /* Cancle pending message, too */
1259     remove_pending_message (peer_ctx->pending_messages_head, GNUNET_YES);
1260   }
1261   /* If we are still waiting for notification whether this peer is live
1262    * cancel the according task */
1263   if (NULL != peer_ctx->liveliness_check_pending)
1264   {
1265     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1266          "Removing pending liveliness check for peer %s\n",
1267          GNUNET_i2s (&peer_ctx->peer_id));
1268     // TODO wait until cadet sets mq->cancel_impl
1269     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
1270     GNUNET_free (peer_ctx->liveliness_check_pending);
1271     peer_ctx->liveliness_check_pending = NULL;
1272   }
1273   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
1274   if (NULL != peer_ctx->send_channel &&
1275       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1276   {
1277     LOG (GNUNET_ERROR_TYPE_DEBUG,
1278         "Destroying send channel\n");
1279     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1280     peer_ctx->send_channel = NULL;
1281     peer_ctx->mq = NULL;
1282   }
1283   channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
1284   if (NULL != peer_ctx->recv_channel &&
1285       GNUNET_YES != Peers_check_channel_flag (channel_flag, Peers_CHANNEL_DESTROING))
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288         "Destroying recv channel\n");
1289     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1290     peer_ctx->recv_channel = NULL;
1291   }
1292
1293   GNUNET_free (peer_ctx->send_channel_flags);
1294   GNUNET_free (peer_ctx->recv_channel_flags);
1295
1296   if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id))
1297   {
1298     LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n");
1299   }
1300   GNUNET_free (peer_ctx);
1301   return GNUNET_YES;
1302 }
1303
1304
1305 /**
1306  * @brief set flags on a given peer.
1307  *
1308  * @param peer the peer to set flags on
1309  * @param flags the flags
1310  */
1311 void
1312 Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1313 {
1314   struct PeerContext *peer_ctx;
1315
1316   peer_ctx = get_peer_ctx (peer);
1317   set_peer_flag (peer_ctx, flags);
1318 }
1319
1320
1321 /**
1322  * @brief unset flags on a given peer.
1323  *
1324  * @param peer the peer to unset flags on
1325  * @param flags the flags
1326  */
1327 void
1328 Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1329 {
1330   struct PeerContext *peer_ctx;
1331
1332   peer_ctx = get_peer_ctx (peer);
1333   unset_peer_flag (peer_ctx, flags);
1334 }
1335
1336
1337 /**
1338  * @brief Check whether flags on a peer are set.
1339  *
1340  * @param peer the peer to check the flag of
1341  * @param flags the flags to check
1342  *
1343  * @return #GNUNET_SYSERR if peer is not known
1344  *         #GNUNET_YES    if all given flags are set
1345  *         #GNUNET_NO     otherwise
1346  */
1347 int
1348 Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags)
1349 {
1350   struct PeerContext *peer_ctx;
1351
1352   if (GNUNET_NO == Peers_check_peer_known (peer))
1353   {
1354     return GNUNET_SYSERR;
1355   }
1356   peer_ctx = get_peer_ctx (peer);
1357   return check_peer_flag_set (peer_ctx, flags);
1358 }
1359
1360
1361 /**
1362  * @brief set flags on a given channel.
1363  *
1364  * @param channel the channel to set flags on
1365  * @param flags the flags
1366  */
1367 void
1368 Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1369 {
1370   set_channel_flag (channel_flags, flags);
1371 }
1372
1373
1374 /**
1375  * @brief unset flags on a given channel.
1376  *
1377  * @param channel the channel to unset flags on
1378  * @param flags the flags
1379  */
1380 void
1381 Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1382 {
1383   unset_channel_flag (channel_flags, flags);
1384 }
1385
1386
1387 /**
1388  * @brief Check whether flags on a channel are set.
1389  *
1390  * @param channel the channel to check the flag of
1391  * @param flags the flags to check
1392  *
1393  * @return #GNUNET_YES if all given flags are set
1394  *         #GNUNET_NO  otherwise
1395  */
1396 int
1397 Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags)
1398 {
1399   return check_channel_flag_set (channel_flags, flags);
1400 }
1401
1402 /**
1403  * @brief Get the flags for the channel in @a role for @a peer.
1404  *
1405  * @param peer Peer to get the channel flags for.
1406  * @param role Role of channel to get flags for
1407  *
1408  * @return The flags.
1409  */
1410 uint32_t *
1411 Peers_get_channel_flag (const struct GNUNET_PeerIdentity *peer,
1412                         enum Peers_ChannelRole role)
1413 {
1414   const struct PeerContext *peer_ctx;
1415
1416   peer_ctx = get_peer_ctx (peer);
1417   if (Peers_CHANNEL_ROLE_SENDING == role)
1418   {
1419     return peer_ctx->send_channel_flags;
1420   }
1421   else if (Peers_CHANNEL_ROLE_RECEIVING == role)
1422   {
1423     return peer_ctx->recv_channel_flags;
1424   }
1425   else
1426   {
1427     GNUNET_assert (0);
1428   }
1429 }
1430
1431 /**
1432  * @brief Check whether we have information about the given peer.
1433  *
1434  * FIXME probably deprecated. Make this the new _online.
1435  *
1436  * @param peer peer in question
1437  *
1438  * @return #GNUNET_YES if peer is known
1439  *         #GNUNET_NO  if peer is not knwon
1440  */
1441 int
1442 Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer)
1443 {
1444   if (NULL != peer_map)
1445   {
1446     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
1447   } else
1448   {
1449     return GNUNET_NO;
1450   }
1451 }
1452
1453
1454 /**
1455  * @brief Check whether @a peer is actually a peer.
1456  *
1457  * A valid peer is a peer that we know exists eg. we were connected to once.
1458  *
1459  * @param peer peer in question
1460  *
1461  * @return #GNUNET_YES if peer is valid
1462  *         #GNUNET_NO  if peer is not valid
1463  */
1464 int
1465 Peers_check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1466 {
1467   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1468 }
1469
1470
1471 /**
1472  * @brief Indicate that we want to send to the other peer
1473  *
1474  * This establishes a sending channel
1475  *
1476  * @param peer the peer to establish channel to
1477  */
1478 void
1479 Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1480 {
1481   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1482   (void) get_channel (peer);
1483 }
1484
1485
1486 /**
1487  * @brief Check whether other peer has the intention to send/opened channel
1488  *        towars us
1489  *
1490  * @param peer the peer in question
1491  *
1492  * @return #GNUNET_YES if peer has the intention to send
1493  *         #GNUNET_NO  otherwise
1494  */
1495 int
1496 Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1497 {
1498   const struct PeerContext *peer_ctx;
1499
1500   peer_ctx = get_peer_ctx (peer);
1501   if (NULL != peer_ctx->recv_channel)
1502   {
1503     return GNUNET_YES;
1504   }
1505   return GNUNET_NO;
1506 }
1507
1508
1509 /**
1510  * Handle the channel a peer opens to us.
1511  *
1512  * @param cls The closure
1513  * @param channel The channel the peer wants to establish
1514  * @param initiator The peer's peer ID
1515  *
1516  * @return initial channel context for the channel
1517  *         (can be NULL -- that's not an error)
1518  */
1519 void *
1520 Peers_handle_inbound_channel (void *cls,
1521                               struct GNUNET_CADET_Channel *channel,
1522                               const struct GNUNET_PeerIdentity *initiator)
1523 {
1524   struct PeerContext *peer_ctx;
1525   struct GNUNET_PeerIdentity *ctx_peer;
1526
1527   LOG (GNUNET_ERROR_TYPE_DEBUG,
1528       "New channel was established to us (Peer %s).\n",
1529       GNUNET_i2s (initiator));
1530   GNUNET_assert (NULL != channel); /* according to cadet API */
1531   /* Make sure we 'know' about this peer */
1532   peer_ctx = create_or_get_peer_ctx (initiator);
1533   set_peer_live (peer_ctx);
1534   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1535   *ctx_peer = *initiator;
1536   /* We only accept one incoming channel per peer */
1537   if (GNUNET_YES == Peers_check_peer_send_intention (initiator))
1538   {
1539     set_channel_flag (peer_ctx->recv_channel_flags,
1540                       Peers_CHANNEL_ESTABLISHED_TWICE);
1541     //GNUNET_CADET_channel_destroy (channel);
1542     GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1543     peer_ctx->recv_channel = channel;
1544     /* return the channel context */
1545     return ctx_peer;
1546   }
1547   peer_ctx->recv_channel = channel;
1548   return ctx_peer;
1549 }
1550
1551
1552 /**
1553  * @brief Check whether a sending channel towards the given peer exists
1554  *
1555  * @param peer the peer to check for
1556  *
1557  * @return #GNUNET_YES if a sending channel towards that peer exists
1558  *         #GNUNET_NO  otherwise
1559  */
1560 int
1561 Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1562 {
1563   struct PeerContext *peer_ctx;
1564
1565   if (GNUNET_NO == Peers_check_peer_known (peer))
1566   { /* If no such peer exists, there is no channel */
1567     return GNUNET_NO;
1568   }
1569   peer_ctx = get_peer_ctx (peer);
1570   if (NULL == peer_ctx->send_channel)
1571   {
1572     return GNUNET_NO;
1573   }
1574   return GNUNET_YES;
1575 }
1576
1577
1578 /**
1579  * @brief check whether the given channel is the sending channel of the given
1580  *        peer
1581  *
1582  * @param peer the peer in question
1583  * @param channel the channel to check for
1584  * @param role either #Peers_CHANNEL_ROLE_SENDING, or
1585  *                    #Peers_CHANNEL_ROLE_RECEIVING
1586  *
1587  * @return #GNUNET_YES if the given chennel is the sending channel of the peer
1588  *         #GNUNET_NO  otherwise
1589  */
1590 int
1591 Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer,
1592                           const struct GNUNET_CADET_Channel *channel,
1593                           enum Peers_ChannelRole role)
1594 {
1595   const struct PeerContext *peer_ctx;
1596
1597   if (GNUNET_NO == Peers_check_peer_known (peer))
1598   {
1599     return GNUNET_NO;
1600   }
1601   peer_ctx = get_peer_ctx (peer);
1602   if ( (Peers_CHANNEL_ROLE_SENDING == role) &&
1603        (channel == peer_ctx->send_channel) )
1604   {
1605     return GNUNET_YES;
1606   }
1607   if ( (Peers_CHANNEL_ROLE_RECEIVING == role) &&
1608        (channel == peer_ctx->recv_channel) )
1609   {
1610     return GNUNET_YES;
1611   }
1612   return GNUNET_NO;
1613 }
1614
1615
1616 /**
1617  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618  *        intention to another peer
1619  *
1620  * If there is also no channel to receive messages from that peer, remove it
1621  * from the peermap.
1622  * TODO really?
1623  *
1624  * @peer the peer identity of the peer whose sending channel to destroy
1625  * @return #GNUNET_YES if channel was destroyed
1626  *         #GNUNET_NO  otherwise
1627  */
1628 int
1629 Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1630 {
1631   struct PeerContext *peer_ctx;
1632
1633   if (GNUNET_NO == Peers_check_peer_known (peer))
1634   {
1635     return GNUNET_NO;
1636   }
1637   peer_ctx = get_peer_ctx (peer);
1638   if (NULL != peer_ctx->send_channel)
1639   {
1640     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN);
1641     GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1642     peer_ctx->send_channel = NULL;
1643     peer_ctx->mq = NULL;
1644     (void) Peers_check_connected (peer);
1645     return GNUNET_YES;
1646   }
1647   return GNUNET_NO;
1648 }
1649
1650 /**
1651  * This is called when a channel is destroyed.
1652  *
1653  * @param cls The closure
1654  * @param channel The channel being closed
1655  */
1656 void
1657 Peers_cleanup_destroyed_channel (void *cls,
1658                                  const struct GNUNET_CADET_Channel *channel)
1659 {
1660   struct GNUNET_PeerIdentity *peer = cls;
1661   struct PeerContext *peer_ctx;
1662
1663   if (GNUNET_NO == Peers_check_peer_known (peer))
1664   {/* We don't want to implicitly create a context that we're about to kill */
1665   LOG (GNUNET_ERROR_TYPE_DEBUG,
1666        "channel (%s) without associated context was destroyed\n",
1667        GNUNET_i2s (peer));
1668     return;
1669   }
1670   peer_ctx = get_peer_ctx (peer);
1671
1672   /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY
1673    * flag will be set. In this case simply make sure that the channels are
1674    * cleaned. */
1675   /* FIXME This distinction seems to be redundant */
1676   if (Peers_check_peer_flag (peer, Peers_TO_DESTROY))
1677   {/* We initiatad the destruction of this particular peer */
1678     LOG (GNUNET_ERROR_TYPE_DEBUG,
1679         "Peer is in the process of being destroyed\n");
1680     if (channel == peer_ctx->send_channel)
1681     {
1682       peer_ctx->send_channel = NULL;
1683       peer_ctx->mq = NULL;
1684     }
1685     else if (channel == peer_ctx->recv_channel)
1686     {
1687       peer_ctx->recv_channel = NULL;
1688     }
1689
1690     if (NULL != peer_ctx->send_channel)
1691     {
1692       GNUNET_CADET_channel_destroy (peer_ctx->send_channel);
1693       peer_ctx->send_channel = NULL;
1694       peer_ctx->mq = NULL;
1695     }
1696     if (NULL != peer_ctx->recv_channel)
1697     {
1698       GNUNET_CADET_channel_destroy (peer_ctx->recv_channel);
1699       peer_ctx->recv_channel = NULL;
1700     }
1701     /* Set the #Peers_ONLINE flag accordingly */
1702     (void) Peers_check_connected (peer);
1703     return;
1704   }
1705
1706   else
1707   { /* We did not initiate the destruction of this peer */
1708     LOG (GNUNET_ERROR_TYPE_DEBUG,
1709         "Peer is NOT in the process of being destroyed\n");
1710     if (channel == peer_ctx->send_channel)
1711     { /* Something (but us) killd the channel - clean up peer */
1712       LOG (GNUNET_ERROR_TYPE_DEBUG,
1713           "send channel (%s) was destroyed - cleaning up\n",
1714           GNUNET_i2s (peer));
1715       peer_ctx->send_channel = NULL;
1716       peer_ctx->mq = NULL;
1717     }
1718     else if (channel == peer_ctx->recv_channel)
1719     { /* Other peer doesn't want to send us messages anymore */
1720       LOG (GNUNET_ERROR_TYPE_DEBUG,
1721            "Peer %s destroyed recv channel - cleaning up channel\n",
1722            GNUNET_i2s (peer));
1723       peer_ctx->recv_channel = NULL;
1724     }
1725     else
1726     {
1727       LOG (GNUNET_ERROR_TYPE_WARNING,
1728            "unknown channel (%s) was destroyed\n",
1729            GNUNET_i2s (peer));
1730     }
1731   }
1732   (void) Peers_check_connected (peer);
1733 }
1734
1735 /**
1736  * @brief Send a message to another peer.
1737  *
1738  * Keeps track about pending messages so they can be properly removed when the
1739  * peer is destroyed.
1740  *
1741  * @param peer receeiver of the message
1742  * @param ev envelope of the message
1743  * @param type type of the message
1744  */
1745 void
1746 Peers_send_message (const struct GNUNET_PeerIdentity *peer,
1747                     struct GNUNET_MQ_Envelope *ev,
1748                     const char *type)
1749 {
1750   struct PendingMessage *pending_msg;
1751   struct GNUNET_MQ_Handle *mq;
1752
1753   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1754               "Sending message to %s of type %s\n",
1755               GNUNET_i2s (peer),
1756               type);
1757   pending_msg = insert_pending_message (peer, ev, type);
1758   mq = get_mq (peer);
1759   GNUNET_MQ_notify_sent (ev,
1760                          mq_notify_sent_cb,
1761                          pending_msg);
1762   GNUNET_MQ_send (mq, ev);
1763 }
1764
1765 /**
1766  * @brief Schedule a operation on given peer
1767  *
1768  * Avoids scheduling an operation twice.
1769  *
1770  * @param peer the peer we want to schedule the operation for once it gets live
1771  *
1772  * @return #GNUNET_YES if the operation was scheduled
1773  *         #GNUNET_NO  otherwise
1774  */
1775 int
1776 Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer,
1777                           const PeerOp peer_op)
1778 {
1779   struct PeerPendingOp pending_op;
1780   struct PeerContext *peer_ctx;
1781
1782   if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, &own_identity))
1783   {
1784     return GNUNET_NO;
1785   }
1786   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1787
1788   //TODO if LIVE/ONLINE execute immediately
1789
1790   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1791   {
1792     peer_ctx = get_peer_ctx (peer);
1793     pending_op.op = peer_op;
1794     pending_op.op_cls = NULL;
1795     GNUNET_array_append (peer_ctx->pending_ops,
1796                          peer_ctx->num_pending_ops,
1797                          pending_op);
1798     return GNUNET_YES;
1799   }
1800   return GNUNET_NO;
1801 }
1802
1803 /**
1804  * @brief Get the recv_channel of @a peer.
1805  * Needed to correctly handle (call #GNUNET_CADET_receive_done()) incoming
1806  * messages.
1807  *
1808  * @param peer The peer to get the recv_channel from.
1809  *
1810  * @return The recv_channel.
1811  */
1812 struct GNUNET_CADET_Channel *
1813 Peers_get_recv_channel (const struct GNUNET_PeerIdentity *peer)
1814 {
1815   struct PeerContext *peer_ctx;
1816
1817   GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer));
1818   peer_ctx = get_peer_ctx (peer);
1819   return peer_ctx->recv_channel;
1820 }
1821 /***********************************************************************
1822  * /Old gnunet-service-rps_peers.c
1823 ***********************************************************************/
1824
1825
1826 /***********************************************************************
1827  * Housekeeping with clients
1828 ***********************************************************************/
1829
1830 /**
1831  * Closure used to pass the client and the id to the callback
1832  * that replies to a client's request
1833  */
1834 struct ReplyCls
1835 {
1836   /**
1837    * DLL
1838    */
1839   struct ReplyCls *next;
1840   struct ReplyCls *prev;
1841
1842   /**
1843    * The identifier of the request
1844    */
1845   uint32_t id;
1846
1847   /**
1848    * The handle to the request
1849    */
1850   struct RPS_SamplerRequestHandle *req_handle;
1851
1852   /**
1853    * The client handle to send the reply to
1854    */
1855   struct ClientContext *cli_ctx;
1856 };
1857
1858
1859 /**
1860  * Struct used to store the context of a connected client.
1861  */
1862 struct ClientContext
1863 {
1864   /**
1865    * DLL
1866    */
1867   struct ClientContext *next;
1868   struct ClientContext *prev;
1869
1870   /**
1871    * The message queue to communicate with the client.
1872    */
1873   struct GNUNET_MQ_Handle *mq;
1874
1875   /**
1876    * DLL with handles to single requests from the client
1877    */
1878   struct ReplyCls *rep_cls_head;
1879   struct ReplyCls *rep_cls_tail;
1880
1881   /**
1882    * @brief How many updates this client expects to receive.
1883    */
1884   int64_t view_updates_left;
1885
1886   /**
1887    * The client handle to send the reply to
1888    */
1889   struct GNUNET_SERVICE_Client *client;
1890 };
1891
1892 /**
1893  * DLL with all clients currently connected to us
1894  */
1895 struct ClientContext *cli_ctx_head;
1896 struct ClientContext *cli_ctx_tail;
1897
1898 /***********************************************************************
1899  * /Housekeeping with clients
1900 ***********************************************************************/
1901
1902
1903
1904
1905
1906 /***********************************************************************
1907  * Globals
1908 ***********************************************************************/
1909
1910 /**
1911  * Sampler used for the Brahms protocol itself.
1912  */
1913 static struct RPS_Sampler *prot_sampler;
1914
1915 /**
1916  * Sampler used for the clients.
1917  */
1918 static struct RPS_Sampler *client_sampler;
1919
1920 /**
1921  * Name to log view to
1922  */
1923 static const char *file_name_view_log;
1924
1925 #ifdef TO_FILE
1926 /**
1927  * Name to log number of observed peers to
1928  */
1929 static const char *file_name_observed_log;
1930
1931 /**
1932  * @brief Count the observed peers
1933  */
1934 static uint32_t num_observed_peers;
1935
1936 /**
1937  * @brief Multipeermap (ab-) used to count unique peer_ids
1938  */
1939 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1940 #endif /* TO_FILE */
1941
1942 /**
1943  * The size of sampler we need to be able to satisfy the client's need
1944  * of random peers.
1945  */
1946 static unsigned int sampler_size_client_need;
1947
1948 /**
1949  * The size of sampler we need to be able to satisfy the Brahms protocol's
1950  * need of random peers.
1951  *
1952  * This is one minimum size the sampler grows to.
1953  */
1954 static unsigned int sampler_size_est_need;
1955
1956 /**
1957  * @brief This is the minimum estimate used as sampler size.
1958  *
1959  * It is configured by the user.
1960  */
1961 static unsigned int sampler_size_est_min;
1962
1963 /**
1964  * @brief This is the estimate used as view size.
1965  *
1966  * It is initialised with the minimum
1967  */
1968 static unsigned int view_size_est_need;
1969
1970 /**
1971  * @brief This is the minimum estimate used as view size.
1972  *
1973  * It is configured by the user.
1974  */
1975 static unsigned int view_size_est_min;
1976
1977 /**
1978  * Percentage of total peer number in the view
1979  * to send random PUSHes to
1980  */
1981 static float alpha;
1982
1983 /**
1984  * Percentage of total peer number in the view
1985  * to send random PULLs to
1986  */
1987 static float beta;
1988
1989 /**
1990  * Identifier for the main task that runs periodically.
1991  */
1992 static struct GNUNET_SCHEDULER_Task *do_round_task;
1993
1994 /**
1995  * Time inverval the do_round task runs in.
1996  */
1997 static struct GNUNET_TIME_Relative round_interval;
1998
1999 /**
2000  * List to store peers received through pushes temporary.
2001  */
2002 static struct CustomPeerMap *push_map;
2003
2004 /**
2005  * List to store peers received through pulls temporary.
2006  */
2007 static struct CustomPeerMap *pull_map;
2008
2009 /**
2010  * Handler to NSE.
2011  */
2012 static struct GNUNET_NSE_Handle *nse;
2013
2014 /**
2015  * Handler to CADET.
2016  */
2017 static struct GNUNET_CADET_Handle *cadet_handle;
2018
2019 /**
2020  * @brief Port to communicate to other peers.
2021  */
2022 static struct GNUNET_CADET_Port *cadet_port;
2023
2024 /**
2025  * Handler to PEERINFO.
2026  */
2027 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
2028
2029 /**
2030  * Handle for cancellation of iteration over peers.
2031  */
2032 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
2033
2034 /**
2035  * Request counter.
2036  *
2037  * Counts how many requets clients already issued.
2038  * Only needed in the beginning to check how many of the 64 deltas
2039  * we already have
2040  */
2041 static unsigned int req_counter;
2042
2043 /**
2044  * Time of the last request we received.
2045  *
2046  * Used to compute the expected request rate.
2047  */
2048 static struct GNUNET_TIME_Absolute last_request;
2049
2050 /**
2051  * Size of #request_deltas.
2052  */
2053 #define REQUEST_DELTAS_SIZE 64
2054 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
2055
2056 /**
2057  * Last 64 deltas between requests
2058  */
2059 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
2060
2061 /**
2062  * The prediction of the rate of requests
2063  */
2064 static struct GNUNET_TIME_Relative request_rate;
2065
2066
2067 #ifdef ENABLE_MALICIOUS
2068 /**
2069  * Type of malicious peer
2070  *
2071  * 0 Don't act malicious at all - Default
2072  * 1 Try to maximise representation
2073  * 2 Try to partition the network
2074  * 3 Combined attack
2075  */
2076 static uint32_t mal_type;
2077
2078 /**
2079  * Other malicious peers
2080  */
2081 static struct GNUNET_PeerIdentity *mal_peers;
2082
2083 /**
2084  * Hashmap of malicious peers used as set.
2085  * Used to more efficiently check whether we know that peer.
2086  */
2087 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
2088
2089 /**
2090  * Number of other malicious peers
2091  */
2092 static uint32_t num_mal_peers;
2093
2094
2095 /**
2096  * If type is 2 This struct is used to store the attacked peers in a DLL
2097  */
2098 struct AttackedPeer
2099 {
2100   /**
2101    * DLL
2102    */
2103   struct AttackedPeer *next;
2104   struct AttackedPeer *prev;
2105
2106   /**
2107    * PeerID
2108    */
2109   struct GNUNET_PeerIdentity peer_id;
2110 };
2111
2112 /**
2113  * If type is 2 this is the DLL of attacked peers
2114  */
2115 static struct AttackedPeer *att_peers_head;
2116 static struct AttackedPeer *att_peers_tail;
2117
2118 /**
2119  * This index is used to point to an attacked peer to
2120  * implement the round-robin-ish way to select attacked peers.
2121  */
2122 static struct AttackedPeer *att_peer_index;
2123
2124 /**
2125  * Hashmap of attacked peers used as set.
2126  * Used to more efficiently check whether we know that peer.
2127  */
2128 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
2129
2130 /**
2131  * Number of attacked peers
2132  */
2133 static uint32_t num_attacked_peers;
2134
2135 /**
2136  * If type is 1 this is the attacked peer
2137  */
2138 static struct GNUNET_PeerIdentity attacked_peer;
2139
2140 /**
2141  * The limit of PUSHes we can send in one round.
2142  * This is an assumption of the Brahms protocol and either implemented
2143  * via proof of work
2144  * or
2145  * assumend to be the bandwidth limitation.
2146  */
2147 static uint32_t push_limit = 10000;
2148 #endif /* ENABLE_MALICIOUS */
2149
2150
2151 /***********************************************************************
2152  * /Globals
2153 ***********************************************************************/
2154
2155
2156 /***********************************************************************
2157  * Util functions
2158 ***********************************************************************/
2159
2160
2161 /**
2162  * Print peerlist to log.
2163  */
2164 static void
2165 print_peer_list (struct GNUNET_PeerIdentity *list,
2166                  unsigned int len)
2167 {
2168   unsigned int i;
2169
2170   LOG (GNUNET_ERROR_TYPE_DEBUG,
2171        "Printing peer list of length %u at %p:\n",
2172        len,
2173        list);
2174   for (i = 0 ; i < len ; i++)
2175   {
2176     LOG (GNUNET_ERROR_TYPE_DEBUG,
2177          "%u. peer: %s\n",
2178          i, GNUNET_i2s (&list[i]));
2179   }
2180 }
2181
2182
2183 /**
2184  * Remove peer from list.
2185  */
2186 static void
2187 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2188                unsigned int *list_size,
2189                const struct GNUNET_PeerIdentity *peer)
2190 {
2191   unsigned int i;
2192   struct GNUNET_PeerIdentity *tmp;
2193
2194   tmp = *peer_list;
2195
2196   LOG (GNUNET_ERROR_TYPE_DEBUG,
2197        "Removing peer %s from list at %p\n",
2198        GNUNET_i2s (peer),
2199        tmp);
2200
2201   for ( i = 0 ; i < *list_size ; i++ )
2202   {
2203     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2204     {
2205       if (i < *list_size -1)
2206       { /* Not at the last entry -- shift peers left */
2207         memmove (&tmp[i], &tmp[i +1],
2208                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2209       }
2210       /* Remove last entry (should be now useless PeerID) */
2211       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2212     }
2213   }
2214   *peer_list = tmp;
2215 }
2216
2217
2218 /**
2219  * Sum all time relatives of an array.
2220  */
2221 static struct GNUNET_TIME_Relative
2222 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2223                 uint32_t arr_size)
2224 {
2225   struct GNUNET_TIME_Relative sum;
2226   uint32_t i;
2227
2228   sum = GNUNET_TIME_UNIT_ZERO;
2229   for ( i = 0 ; i < arr_size ; i++ )
2230   {
2231     sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2232   }
2233   return sum;
2234 }
2235
2236
2237 /**
2238  * Compute the average of given time relatives.
2239  */
2240 static struct GNUNET_TIME_Relative
2241 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2242                 uint32_t arr_size)
2243 {
2244   return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2245                                                       arr_size),
2246                                       arr_size);
2247 }
2248
2249
2250 /**
2251  * Insert PeerID in #view
2252  *
2253  * Called once we know a peer is live.
2254  * Implements #PeerOp
2255  *
2256  * @return GNUNET_OK if peer was actually inserted
2257  *         GNUNET_NO if peer was not inserted
2258  */
2259 static void
2260 insert_in_view_op (void *cls,
2261                 const struct GNUNET_PeerIdentity *peer);
2262
2263 /**
2264  * Insert PeerID in #view
2265  *
2266  * Called once we know a peer is live.
2267  *
2268  * @return GNUNET_OK if peer was actually inserted
2269  *         GNUNET_NO if peer was not inserted
2270  */
2271 static int
2272 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2273 {
2274   int online;
2275
2276   online = Peers_check_peer_flag (peer, Peers_ONLINE);
2277   if ( (GNUNET_NO == online) ||
2278        (GNUNET_SYSERR == online) ) /* peer is not even known */
2279   {
2280     (void) Peers_issue_peer_liveliness_check (peer);
2281     (void) Peers_schedule_operation (peer, insert_in_view_op);
2282     return GNUNET_NO;
2283   }
2284   /* Open channel towards peer to keep connection open */
2285   Peers_indicate_sending_intention (peer);
2286   return View_put (peer);
2287 }
2288
2289 /**
2290  * @brief sends updates to clients that are interested
2291  */
2292 static void
2293 clients_notify_view_update (void);
2294
2295 /**
2296  * Put random peer from sampler into the view as history update.
2297  */
2298 static void
2299 hist_update (void *cls,
2300              struct GNUNET_PeerIdentity *ids,
2301              uint32_t num_peers)
2302 {
2303   unsigned int i;
2304
2305   for (i = 0; i < num_peers; i++)
2306   {
2307     (void) insert_in_view (&ids[i]);
2308     to_file (file_name_view_log,
2309              "+%s\t(hist)",
2310              GNUNET_i2s_full (ids));
2311   }
2312   clients_notify_view_update();
2313 }
2314
2315
2316 /**
2317  * Wrapper around #RPS_sampler_resize()
2318  *
2319  * If we do not have enough sampler elements, double current sampler size
2320  * If we have more than enough sampler elements, halv current sampler size
2321  */
2322 static void
2323 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2324 {
2325   unsigned int sampler_size;
2326
2327   // TODO statistics
2328   // TODO respect the min, max
2329   sampler_size = RPS_sampler_get_size (sampler);
2330   if (sampler_size > new_size * 4)
2331   { /* Shrinking */
2332     RPS_sampler_resize (sampler, sampler_size / 2);
2333   }
2334   else if (sampler_size < new_size)
2335   { /* Growing */
2336     RPS_sampler_resize (sampler, sampler_size * 2);
2337   }
2338   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2339 }
2340
2341
2342 /**
2343  * Wrapper around #RPS_sampler_resize() resizing the client sampler
2344  */
2345 static void
2346 client_resize_wrapper ()
2347 {
2348   uint32_t bigger_size;
2349
2350   // TODO statistics
2351
2352   bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2353
2354   // TODO respect the min, max
2355   resize_wrapper (client_sampler, bigger_size);
2356   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2357       bigger_size);
2358 }
2359
2360
2361 /**
2362  * Estimate request rate
2363  *
2364  * Called every time we receive a request from the client.
2365  */
2366 static void
2367 est_request_rate()
2368 {
2369   struct GNUNET_TIME_Relative max_round_duration;
2370
2371   if (request_deltas_size > req_counter)
2372     req_counter++;
2373   if ( 1 < req_counter)
2374   {
2375     /* Shift last request deltas to the right */
2376     memmove (&request_deltas[1],
2377         request_deltas,
2378         (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2379
2380     /* Add current delta to beginning */
2381     request_deltas[0] =
2382         GNUNET_TIME_absolute_get_difference (last_request,
2383                                              GNUNET_TIME_absolute_get ());
2384     request_rate = T_relative_avg (request_deltas, req_counter);
2385     request_rate = (request_rate.rel_value_us < 1) ?
2386       GNUNET_TIME_relative_get_unit_ () : request_rate;
2387
2388     /* Compute the duration a round will maximally take */
2389     max_round_duration =
2390         GNUNET_TIME_relative_add (round_interval,
2391                                   GNUNET_TIME_relative_divide (round_interval, 2));
2392
2393     /* Set the estimated size the sampler has to have to
2394      * satisfy the current client request rate */
2395     sampler_size_client_need =
2396         max_round_duration.rel_value_us / request_rate.rel_value_us;
2397
2398     /* Resize the sampler */
2399     client_resize_wrapper ();
2400   }
2401   last_request = GNUNET_TIME_absolute_get ();
2402 }
2403
2404
2405 /**
2406  * Add all peers in @a peer_array to @a peer_map used as set.
2407  *
2408  * @param peer_array array containing the peers
2409  * @param num_peers number of peers in @peer_array
2410  * @param peer_map the peermap to use as set
2411  */
2412 static void
2413 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2414                        unsigned int num_peers,
2415                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2416 {
2417   unsigned int i;
2418   if (NULL == peer_map)
2419   {
2420     LOG (GNUNET_ERROR_TYPE_WARNING,
2421          "Trying to add peers to non-existing peermap.\n");
2422     return;
2423   }
2424
2425   for (i = 0; i < num_peers; i++)
2426   {
2427     GNUNET_CONTAINER_multipeermap_put (peer_map,
2428                                        &peer_array[i],
2429                                        NULL,
2430                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2431   }
2432 }
2433
2434
2435 /**
2436  * Send a PULL REPLY to @a peer_id
2437  *
2438  * @param peer_id the peer to send the reply to.
2439  * @param peer_ids the peers to send to @a peer_id
2440  * @param num_peer_ids the number of peers to send to @a peer_id
2441  */
2442 static void
2443 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2444                  const struct GNUNET_PeerIdentity *peer_ids,
2445                  unsigned int num_peer_ids)
2446 {
2447   uint32_t send_size;
2448   struct GNUNET_MQ_Envelope *ev;
2449   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2450
2451   /* Compute actual size */
2452   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2453               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2454
2455   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2456     /* Compute number of peers to send
2457      * If too long, simply truncate */
2458     // TODO select random ones via permutation
2459     //      or even better: do good protocol design
2460     send_size =
2461       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2462        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2463        sizeof (struct GNUNET_PeerIdentity);
2464   else
2465     send_size = num_peer_ids;
2466
2467   LOG (GNUNET_ERROR_TYPE_DEBUG,
2468       "Going to send PULL REPLY with %u peers to %s\n",
2469       send_size, GNUNET_i2s (peer_id));
2470
2471   ev = GNUNET_MQ_msg_extra (out_msg,
2472                             send_size * sizeof (struct GNUNET_PeerIdentity),
2473                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2474   out_msg->num_peers = htonl (send_size);
2475   GNUNET_memcpy (&out_msg[1], peer_ids,
2476          send_size * sizeof (struct GNUNET_PeerIdentity));
2477
2478   Peers_send_message (peer_id, ev, "PULL REPLY");
2479   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2480 }
2481
2482
2483 /**
2484  * Insert PeerID in #pull_map
2485  *
2486  * Called once we know a peer is live.
2487  */
2488 static void
2489 insert_in_pull_map (void *cls,
2490                     const struct GNUNET_PeerIdentity *peer)
2491 {
2492   CustomPeerMap_put (pull_map, peer);
2493 }
2494
2495
2496 /**
2497  * Insert PeerID in #view
2498  *
2499  * Called once we know a peer is live.
2500  * Implements #PeerOp
2501  */
2502 static void
2503 insert_in_view_op (void *cls,
2504                 const struct GNUNET_PeerIdentity *peer)
2505 {
2506   (void) insert_in_view (peer);
2507 }
2508
2509
2510 /**
2511  * Update sampler with given PeerID.
2512  * Implements #PeerOp
2513  */
2514 static void
2515 insert_in_sampler (void *cls,
2516                    const struct GNUNET_PeerIdentity *peer)
2517 {
2518   LOG (GNUNET_ERROR_TYPE_DEBUG,
2519        "Updating samplers with peer %s from insert_in_sampler()\n",
2520        GNUNET_i2s (peer));
2521   RPS_sampler_update (prot_sampler,   peer);
2522   RPS_sampler_update (client_sampler, peer);
2523   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2524   {
2525     /* Make sure we 'know' about this peer */
2526     (void) Peers_issue_peer_liveliness_check (peer);
2527     /* Establish a channel towards that peer to indicate we are going to send
2528      * messages to it */
2529     //Peers_indicate_sending_intention (peer);
2530   }
2531   #ifdef TO_FILE
2532   num_observed_peers++;
2533   GNUNET_CONTAINER_multipeermap_put
2534     (observed_unique_peers,
2535      peer,
2536      NULL,
2537      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2538   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2539       observed_unique_peers);
2540   to_file (file_name_observed_log,
2541           "%" PRIu32 " %" PRIu32 " %f\n",
2542           num_observed_peers,
2543           num_observed_unique_peers,
2544           1.0*num_observed_unique_peers/num_observed_peers)
2545   #endif /* TO_FILE */
2546 }
2547
2548 /**
2549  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2550  *        If the peer is not known, liveliness check is issued and it is
2551  *        scheduled to be inserted in sampler and view.
2552  *
2553  * "External sources" refer to every source except the gossip.
2554  *
2555  * @param peer peer to insert
2556  */
2557 static void
2558 got_peer (const struct GNUNET_PeerIdentity *peer)
2559 {
2560   /* If we did not know this peer already, insert it into sampler and view */
2561   if (GNUNET_YES == Peers_issue_peer_liveliness_check (peer))
2562   {
2563     Peers_schedule_operation (peer, insert_in_sampler);
2564     Peers_schedule_operation (peer, insert_in_view_op);
2565   }
2566 }
2567
2568 /**
2569  * @brief Checks if there is a sending channel and if it is needed
2570  *
2571  * @param peer the peer whose sending channel is checked
2572  * @return GNUNET_YES if sending channel exists and is still needed
2573  *         GNUNET_NO  otherwise
2574  */
2575 static int
2576 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2577 {
2578   /* struct GNUNET_CADET_Channel *channel; */
2579   if (GNUNET_NO == Peers_check_peer_known (peer))
2580   {
2581     return GNUNET_NO;
2582   }
2583   if (GNUNET_YES == Peers_check_sending_channel_exists (peer))
2584   {
2585     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2586          (GNUNET_YES == View_contains_peer (peer)) ||
2587          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2588          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2589          (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2590     { /* If we want to keep the connection to peer open */
2591       return GNUNET_YES;
2592     }
2593     return GNUNET_NO;
2594   }
2595   return GNUNET_NO;
2596 }
2597
2598 /**
2599  * @brief remove peer from our knowledge, the view, push and pull maps and
2600  * samplers.
2601  *
2602  * @param peer the peer to remove
2603  */
2604 static void
2605 remove_peer (const struct GNUNET_PeerIdentity *peer)
2606 {
2607   (void) View_remove_peer (peer);
2608   CustomPeerMap_remove_peer (pull_map, peer);
2609   CustomPeerMap_remove_peer (push_map, peer);
2610   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2611   RPS_sampler_reinitialise_by_value (client_sampler, peer);
2612   Peers_remove_peer (peer);
2613 }
2614
2615
2616 /**
2617  * @brief Remove data that is not needed anymore.
2618  *
2619  * If the sending channel is no longer needed it is destroyed.
2620  *
2621  * @param peer the peer whose data is about to be cleaned
2622  */
2623 static void
2624 clean_peer (const struct GNUNET_PeerIdentity *peer)
2625 {
2626   if (GNUNET_NO == check_sending_channel_needed (peer))
2627   {
2628     LOG (GNUNET_ERROR_TYPE_DEBUG,
2629         "Going to remove send channel to peer %s\n",
2630         GNUNET_i2s (peer));
2631     #ifdef ENABLE_MALICIOUS
2632     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2633       (void) Peers_destroy_sending_channel (peer);
2634     #else /* ENABLE_MALICIOUS */
2635     (void) Peers_destroy_sending_channel (peer);
2636     #endif /* ENABLE_MALICIOUS */
2637   }
2638
2639   if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) &&
2640        (GNUNET_NO == View_contains_peer (peer)) &&
2641        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2642        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2643        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2644        (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2645        (GNUNET_NO != Peers_check_removable (peer)) )
2646   { /* We can safely remove this peer */
2647     LOG (GNUNET_ERROR_TYPE_DEBUG,
2648         "Going to remove peer %s\n",
2649         GNUNET_i2s (peer));
2650     remove_peer (peer);
2651     return;
2652   }
2653 }
2654
2655 /**
2656  * @brief This is called when a channel is destroyed.
2657  *
2658  * Removes peer completely from our knowledge if the send_channel was destroyed
2659  * Otherwise simply delete the recv_channel
2660  * Also check if the knowledge about this peer is still needed.
2661  * If not, remove this peer from our knowledge.
2662  *
2663  * @param cls The closure
2664  * @param channel The channel being closed
2665  * @param channel_ctx The context associated with this channel
2666  */
2667 static void
2668 cleanup_destroyed_channel (void *cls,
2669                            const struct GNUNET_CADET_Channel *channel)
2670 {
2671   struct GNUNET_PeerIdentity *peer = cls;
2672   uint32_t *channel_flag;
2673   struct PeerContext *peer_ctx;
2674
2675   GNUNET_assert (NULL != peer);
2676
2677   if (GNUNET_NO == Peers_check_peer_known (peer))
2678   { /* We don't know a context to that peer */
2679     LOG (GNUNET_ERROR_TYPE_WARNING,
2680          "channel (%s) without associated context was destroyed\n",
2681          GNUNET_i2s (peer));
2682     GNUNET_free (peer);
2683     return;
2684   }
2685
2686   peer_ctx = get_peer_ctx (peer);
2687   if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2688   {
2689     LOG (GNUNET_ERROR_TYPE_DEBUG,
2690         "Callback on destruction of recv-channel was called (%s)\n",
2691         GNUNET_i2s (peer));
2692     set_channel_flag (peer_ctx->recv_channel_flags, Peers_CHANNEL_DESTROING);
2693   } else if (GNUNET_YES == Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2694   {
2695     LOG (GNUNET_ERROR_TYPE_DEBUG,
2696         "Callback on destruction of send-channel was called (%s)\n",
2697         GNUNET_i2s (peer));
2698     set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_DESTROING);
2699   } else {
2700     LOG (GNUNET_ERROR_TYPE_ERROR,
2701         "Channel to be destroyed has is neither sending nor receiving role\n");
2702   }
2703
2704   if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY))
2705   { /* We are in the middle of removing that peer from our knowledge. In this
2706        case simply make sure that the channels are cleaned. */
2707     Peers_cleanup_destroyed_channel (cls, channel);
2708     to_file (file_name_view_log,
2709              "-%s\t(cleanup channel, ourself)",
2710              GNUNET_i2s_full (peer));
2711     GNUNET_free (peer);
2712     return;
2713   }
2714
2715   if (GNUNET_YES ==
2716       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING))
2717   { /* Channel used for sending was destroyed */
2718     /* Possible causes of channel destruction:
2719      *  - ourselves  -> cleaning send channel -> clean context
2720      *  - other peer -> peer probably went down -> remove
2721      */
2722     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_SENDING);
2723     if (GNUNET_YES == Peers_check_channel_flag (channel_flag, Peers_CHANNEL_CLEAN))
2724     { /* We are about to clean the sending channel. Clean the respective
2725        * context */
2726       Peers_cleanup_destroyed_channel (cls, channel);
2727       GNUNET_free (peer);
2728       return;
2729     }
2730     else
2731     { /* Other peer destroyed our sending channel that he is supposed to keep
2732        * open. It probably went down. Remove it from our knowledge. */
2733       Peers_cleanup_destroyed_channel (cls, channel);
2734       remove_peer (peer);
2735       GNUNET_free (peer);
2736       return;
2737     }
2738   }
2739   else if (GNUNET_YES ==
2740       Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING))
2741   { /* Channel used for receiving was destroyed */
2742     /* Possible causes of channel destruction:
2743      *  - ourselves  -> peer tried to establish channel twice -> clean context
2744      *  - other peer -> peer doesn't want to send us data -> clean
2745      */
2746     channel_flag = Peers_get_channel_flag (peer, Peers_CHANNEL_ROLE_RECEIVING);
2747     if (GNUNET_YES ==
2748         Peers_check_channel_flag (channel_flag, Peers_CHANNEL_ESTABLISHED_TWICE))
2749     { /* Other peer tried to establish a channel to us twice. We do not accept
2750        * that. Clean the context. */
2751       Peers_cleanup_destroyed_channel (cls, channel);
2752       GNUNET_free (peer);
2753       return;
2754     }
2755     else
2756     { /* Other peer doesn't want to send us data anymore. We are free to clean
2757        * it. */
2758       Peers_cleanup_destroyed_channel (cls, channel);
2759       clean_peer (peer);
2760       GNUNET_free (peer);
2761       return;
2762     }
2763   }
2764   else
2765   {
2766     LOG (GNUNET_ERROR_TYPE_WARNING,
2767         "Destroyed channel is neither sending nor receiving channel\n");
2768   }
2769   GNUNET_free (peer);
2770 }
2771
2772 /***********************************************************************
2773  * /Util functions
2774 ***********************************************************************/
2775
2776 static void
2777 destroy_reply_cls (struct ReplyCls *rep_cls)
2778 {
2779   struct ClientContext *cli_ctx;
2780
2781   cli_ctx = rep_cls->cli_ctx;
2782   GNUNET_assert (NULL != cli_ctx);
2783   if (NULL != rep_cls->req_handle)
2784   {
2785     RPS_sampler_request_cancel (rep_cls->req_handle);
2786   }
2787   GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2788                                cli_ctx->rep_cls_tail,
2789                                rep_cls);
2790   GNUNET_free (rep_cls);
2791 }
2792
2793
2794 static void
2795 destroy_cli_ctx (struct ClientContext *cli_ctx)
2796 {
2797   GNUNET_assert (NULL != cli_ctx);
2798   if (NULL != cli_ctx->rep_cls_head)
2799   {
2800     LOG (GNUNET_ERROR_TYPE_WARNING,
2801          "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2802     while (NULL != cli_ctx->rep_cls_head)
2803       destroy_reply_cls (cli_ctx->rep_cls_head);
2804   }
2805   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2806                                cli_ctx_tail,
2807                                cli_ctx);
2808   GNUNET_free (cli_ctx);
2809 }
2810
2811
2812 /**
2813  * Function called by NSE.
2814  *
2815  * Updates sizes of sampler list and view and adapt those lists
2816  * accordingly.
2817  */
2818 static void
2819 nse_callback (void *cls,
2820               struct GNUNET_TIME_Absolute timestamp,
2821               double logestimate, double std_dev)
2822 {
2823   double estimate;
2824   //double scale; // TODO this might go gloabal/config
2825
2826   LOG (GNUNET_ERROR_TYPE_DEBUG,
2827        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2828        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2829   //scale = .01;
2830   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2831   // GNUNET_NSE_log_estimate_to_n (logestimate);
2832   estimate = pow (estimate, 1.0 / 3);
2833   // TODO add if std_dev is a number
2834   // estimate += (std_dev * scale);
2835   if (view_size_est_min < ceil (estimate))
2836   {
2837     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2838     sampler_size_est_need = estimate;
2839     view_size_est_need = estimate;
2840   } else
2841   {
2842     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2843     //sampler_size_est_need = view_size_est_min;
2844     view_size_est_need = view_size_est_min;
2845   }
2846
2847   /* If the NSE has changed adapt the lists accordingly */
2848   resize_wrapper (prot_sampler, sampler_size_est_need);
2849   client_resize_wrapper ();
2850 }
2851
2852
2853 /**
2854  * Callback called once the requested PeerIDs are ready.
2855  *
2856  * Sends those to the requesting client.
2857  */
2858 static void
2859 client_respond (void *cls,
2860                 struct GNUNET_PeerIdentity *peer_ids,
2861                 uint32_t num_peers)
2862 {
2863   struct ReplyCls *reply_cls = cls;
2864   uint32_t i;
2865   struct GNUNET_MQ_Envelope *ev;
2866   struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2867   uint32_t size_needed;
2868   struct ClientContext *cli_ctx;
2869
2870   GNUNET_assert (NULL != reply_cls);
2871   LOG (GNUNET_ERROR_TYPE_DEBUG,
2872        "sampler returned %" PRIu32 " peers:\n",
2873        num_peers);
2874   for (i = 0; i < num_peers; i++)
2875   {
2876     LOG (GNUNET_ERROR_TYPE_DEBUG,
2877          "  %" PRIu32 ": %s\n",
2878          i,
2879          GNUNET_i2s (&peer_ids[i]));
2880   }
2881
2882   size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2883                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2884
2885   GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2886
2887   ev = GNUNET_MQ_msg_extra (out_msg,
2888                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2889                             GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2890   out_msg->num_peers = htonl (num_peers);
2891   out_msg->id = htonl (reply_cls->id);
2892
2893   GNUNET_memcpy (&out_msg[1],
2894           peer_ids,
2895           num_peers * sizeof (struct GNUNET_PeerIdentity));
2896   GNUNET_free (peer_ids);
2897
2898   cli_ctx = reply_cls->cli_ctx;
2899   GNUNET_assert (NULL != cli_ctx);
2900   reply_cls->req_handle = NULL;
2901   destroy_reply_cls (reply_cls);
2902   GNUNET_MQ_send (cli_ctx->mq, ev);
2903 }
2904
2905
2906 /**
2907  * Handle RPS request from the client.
2908  *
2909  * @param cls closure
2910  * @param message the actual message
2911  */
2912 static void
2913 handle_client_request (void *cls,
2914                        const struct GNUNET_RPS_CS_RequestMessage *msg)
2915 {
2916   struct ClientContext *cli_ctx = cls;
2917   uint32_t num_peers;
2918   uint32_t size_needed;
2919   struct ReplyCls *reply_cls;
2920   uint32_t i;
2921
2922   num_peers = ntohl (msg->num_peers);
2923   size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2924                 num_peers * sizeof (struct GNUNET_PeerIdentity);
2925
2926   if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2927   {
2928     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2929                 "Message received from client has size larger than expected\n");
2930     GNUNET_SERVICE_client_drop (cli_ctx->client);
2931     return;
2932   }
2933
2934   for (i = 0 ; i < num_peers ; i++)
2935     est_request_rate();
2936
2937   LOG (GNUNET_ERROR_TYPE_DEBUG,
2938        "Client requested %" PRIu32 " random peer(s).\n",
2939        num_peers);
2940
2941   reply_cls = GNUNET_new (struct ReplyCls);
2942   reply_cls->id = ntohl (msg->id);
2943   reply_cls->cli_ctx = cli_ctx;
2944   reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2945                                                         client_respond,
2946                                                         reply_cls,
2947                                                         num_peers);
2948
2949   GNUNET_assert (NULL != cli_ctx);
2950   GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2951                                cli_ctx->rep_cls_tail,
2952                                reply_cls);
2953   GNUNET_SERVICE_client_continue (cli_ctx->client);
2954 }
2955
2956
2957 /**
2958  * @brief Handle a message that requests the cancellation of a request
2959  *
2960  * @param cls unused
2961  * @param message the message containing the id of the request
2962  */
2963 static void
2964 handle_client_request_cancel (void *cls,
2965                               const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2966 {
2967   struct ClientContext *cli_ctx = cls;
2968   struct ReplyCls *rep_cls;
2969
2970   GNUNET_assert (NULL != cli_ctx);
2971   GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2972   rep_cls = cli_ctx->rep_cls_head;
2973   LOG (GNUNET_ERROR_TYPE_DEBUG,
2974       "Client cancels request with id %" PRIu32 "\n",
2975       ntohl (msg->id));
2976   while ( (NULL != rep_cls->next) &&
2977           (rep_cls->id != ntohl (msg->id)) )
2978     rep_cls = rep_cls->next;
2979   GNUNET_assert (rep_cls->id == ntohl (msg->id));
2980   destroy_reply_cls (rep_cls);
2981   GNUNET_SERVICE_client_continue (cli_ctx->client);
2982 }
2983
2984
2985 /**
2986  * @brief This function is called, when the client seeds peers.
2987  * It verifies that @a msg is well-formed.
2988  *
2989  * @param cls the closure (#ClientContext)
2990  * @param msg the message
2991  * @return #GNUNET_OK if @a msg is well-formed
2992  */
2993 static int
2994 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2995 {
2996   struct ClientContext *cli_ctx = cls;
2997   uint16_t msize = ntohs (msg->header.size);
2998   uint32_t num_peers = ntohl (msg->num_peers);
2999
3000   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3001   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3002        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3003   {
3004     GNUNET_break (0);
3005     GNUNET_SERVICE_client_drop (cli_ctx->client);
3006     return GNUNET_SYSERR;
3007   }
3008   return GNUNET_OK;
3009 }
3010
3011
3012 /**
3013  * Handle seed from the client.
3014  *
3015  * @param cls closure
3016  * @param message the actual message
3017  */
3018 static void
3019 handle_client_seed (void *cls,
3020                     const struct GNUNET_RPS_CS_SeedMessage *msg)
3021 {
3022   struct ClientContext *cli_ctx = cls;
3023   struct GNUNET_PeerIdentity *peers;
3024   uint32_t num_peers;
3025   uint32_t i;
3026
3027   num_peers = ntohl (msg->num_peers);
3028   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3029   //peers = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity);
3030   //GNUNET_memcpy (peers, &msg[1], num_peers * sizeof (struct GNUNET_PeerIdentity));
3031
3032   LOG (GNUNET_ERROR_TYPE_DEBUG,
3033        "Client seeded peers:\n");
3034   print_peer_list (peers, num_peers);
3035
3036   for (i = 0; i < num_peers; i++)
3037   {
3038     LOG (GNUNET_ERROR_TYPE_DEBUG,
3039          "Updating samplers with seed %" PRIu32 ": %s\n",
3040          i,
3041          GNUNET_i2s (&peers[i]));
3042
3043     got_peer (&peers[i]);
3044   }
3045
3046   ////GNUNET_free (peers);
3047
3048   GNUNET_SERVICE_client_continue (cli_ctx->client);
3049 }
3050
3051 /**
3052  * @brief Send view to client
3053  *
3054  * @param cli_ctx the context of the client
3055  * @param view_array the peerids of the view as array (can be empty)
3056  * @param view_size the size of the view array (can be 0)
3057  */
3058 void
3059 send_view (const struct ClientContext *cli_ctx,
3060            const struct GNUNET_PeerIdentity *view_array,
3061            uint64_t view_size)
3062 {
3063   struct GNUNET_MQ_Envelope *ev;
3064   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
3065
3066   if (NULL == view_array)
3067   {
3068     view_size = View_size ();
3069     view_array = View_get_as_array();
3070   }
3071
3072   ev = GNUNET_MQ_msg_extra (out_msg,
3073                             view_size * sizeof (struct GNUNET_PeerIdentity),
3074                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
3075   out_msg->num_peers = htonl (view_size);
3076
3077   GNUNET_memcpy (&out_msg[1],
3078           view_array,
3079           view_size * sizeof (struct GNUNET_PeerIdentity));
3080   GNUNET_MQ_send (cli_ctx->mq, ev);
3081 }
3082
3083 /**
3084  * @brief sends updates to clients that are interested
3085  */
3086 static void
3087 clients_notify_view_update (void)
3088 {
3089   struct ClientContext *cli_ctx_iter;
3090   uint64_t num_peers;
3091   const struct GNUNET_PeerIdentity *view_array;
3092
3093   num_peers = View_size ();
3094   view_array = View_get_as_array();
3095   /* check size of view is small enough */
3096   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
3097   {
3098     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3099                 "View is too big to send\n");
3100     return;
3101   }
3102
3103   for (cli_ctx_iter = cli_ctx_head;
3104        NULL != cli_ctx_iter;
3105        cli_ctx_iter = cli_ctx_head->next)
3106   {
3107     if (1 < cli_ctx_iter->view_updates_left)
3108     {
3109       /* Client wants to receive limited amount of updates */
3110       cli_ctx_iter->view_updates_left -= 1;
3111     } else if (1 == cli_ctx_iter->view_updates_left)
3112     {
3113       /* Last update of view for client */
3114       cli_ctx_iter->view_updates_left = -1;
3115     } else if (0 > cli_ctx_iter->view_updates_left) {
3116       /* Client is not interested in updates */
3117       continue;
3118     }
3119     /* else _updates_left == 0 - infinite amount of updates */
3120
3121     /* send view */
3122     send_view (cli_ctx_iter, view_array, num_peers);
3123   }
3124 }
3125
3126
3127 /**
3128  * Handle RPS request from the client.
3129  *
3130  * @param cls closure
3131  * @param message the actual message
3132  */
3133 static void
3134 handle_client_view_request (void *cls,
3135                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3136 {
3137   struct ClientContext *cli_ctx = cls;
3138   uint64_t num_updates;
3139
3140   num_updates = ntohl (msg->num_updates);
3141
3142   LOG (GNUNET_ERROR_TYPE_DEBUG,
3143        "Client requested %" PRIu64 " updates of view.\n",
3144        num_updates);
3145
3146   GNUNET_assert (NULL != cli_ctx);
3147   cli_ctx->view_updates_left = num_updates;
3148   send_view (cli_ctx, NULL, 0);
3149   GNUNET_SERVICE_client_continue (cli_ctx->client);
3150 }
3151
3152 /**
3153  * Handle a CHECK_LIVE message from another peer.
3154  *
3155  * This does nothing. But without calling #GNUNET_CADET_receive_done()
3156  * the channel is blocked for all other communication.
3157  *
3158  * @param cls Closure
3159  * @param msg The message header
3160  */
3161 static void
3162 handle_peer_check (void *cls,
3163                    const struct GNUNET_MessageHeader *msg)
3164 {
3165   const struct GNUNET_PeerIdentity *peer = cls;
3166   LOG (GNUNET_ERROR_TYPE_DEBUG,
3167       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3168
3169   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3170 }
3171
3172 /**
3173  * Handle a PUSH message from another peer.
3174  *
3175  * Check the proof of work and store the PeerID
3176  * in the temporary list for pushed PeerIDs.
3177  *
3178  * @param cls Closure
3179  * @param msg The message header
3180  */
3181 static void
3182 handle_peer_push (void *cls,
3183                   const struct GNUNET_MessageHeader *msg)
3184 {
3185   const struct GNUNET_PeerIdentity *peer = cls;
3186
3187   // (check the proof of work (?))
3188
3189   LOG (GNUNET_ERROR_TYPE_DEBUG,
3190        "Received PUSH (%s)\n",
3191        GNUNET_i2s (peer));
3192   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3193
3194   #ifdef ENABLE_MALICIOUS
3195   struct AttackedPeer *tmp_att_peer;
3196
3197   if ( (1 == mal_type) ||
3198        (3 == mal_type) )
3199   { /* Try to maximise representation */
3200     tmp_att_peer = GNUNET_new (struct AttackedPeer);
3201     tmp_att_peer->peer_id = *peer;
3202     if (NULL == att_peer_set)
3203       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3204     if (GNUNET_NO ==
3205         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3206                                                 peer))
3207     {
3208       GNUNET_CONTAINER_DLL_insert (att_peers_head,
3209                                    att_peers_tail,
3210                                    tmp_att_peer);
3211       add_peer_array_to_set (peer, 1, att_peer_set);
3212     }
3213   }
3214
3215
3216   else if (2 == mal_type)
3217   {
3218     /* We attack one single well-known peer - simply ignore */
3219   }
3220   #endif /* ENABLE_MALICIOUS */
3221
3222   /* Add the sending peer to the push_map */
3223   CustomPeerMap_put (push_map, peer);
3224
3225   GNUNET_break_op (Peers_check_peer_known (peer));
3226   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3227 }
3228
3229
3230 /**
3231  * Handle PULL REQUEST request message from another peer.
3232  *
3233  * Reply with the view of PeerIDs.
3234  *
3235  * @param cls Closure
3236  * @param msg The message header
3237  */
3238 static void
3239 handle_peer_pull_request (void *cls,
3240                           const struct GNUNET_MessageHeader *msg)
3241 {
3242   struct GNUNET_PeerIdentity *peer = cls;
3243   const struct GNUNET_PeerIdentity *view_array;
3244
3245   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3246   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3247
3248   #ifdef ENABLE_MALICIOUS
3249   if (1 == mal_type
3250       || 3 == mal_type)
3251   { /* Try to maximise representation */
3252     send_pull_reply (peer, mal_peers, num_mal_peers);
3253   }
3254
3255   else if (2 == mal_type)
3256   { /* Try to partition network */
3257     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3258     {
3259       send_pull_reply (peer, mal_peers, num_mal_peers);
3260     }
3261   }
3262   #endif /* ENABLE_MALICIOUS */
3263
3264   GNUNET_break_op (Peers_check_peer_known (peer));
3265   GNUNET_CADET_receive_done (Peers_get_recv_channel (peer));
3266   view_array = View_get_as_array ();
3267   send_pull_reply (peer, view_array, View_size ());
3268 }
3269
3270
3271 /**
3272  * Check whether we sent a corresponding request and
3273  * whether this reply is the first one.
3274  *
3275  * @param cls Closure
3276  * @param msg The message header
3277  */
3278 static int
3279 check_peer_pull_reply (void *cls,
3280                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3281 {
3282   struct GNUNET_PeerIdentity *sender = cls;
3283
3284   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3285   {
3286     GNUNET_break_op (0);
3287     return GNUNET_SYSERR;
3288   }
3289
3290   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3291       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3292   {
3293     LOG (GNUNET_ERROR_TYPE_ERROR,
3294         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3295         ntohl (msg->num_peers),
3296         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3297             sizeof (struct GNUNET_PeerIdentity));
3298     GNUNET_break_op (0);
3299     return GNUNET_SYSERR;
3300   }
3301
3302   if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3303   {
3304     LOG (GNUNET_ERROR_TYPE_WARNING,
3305         "Received a pull reply from a peer we didn't request one from!\n");
3306     GNUNET_break_op (0);
3307     return GNUNET_SYSERR;
3308   }
3309   return GNUNET_OK;
3310 }
3311
3312 /**
3313  * Handle PULL REPLY message from another peer.
3314  *
3315  * @param cls Closure
3316  * @param msg The message header
3317  */
3318 static void
3319 handle_peer_pull_reply (void *cls,
3320                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3321 {
3322   const struct GNUNET_PeerIdentity *peers;
3323   struct GNUNET_PeerIdentity *sender = cls;
3324   uint32_t i;
3325 #ifdef ENABLE_MALICIOUS
3326   struct AttackedPeer *tmp_att_peer;
3327 #endif /* ENABLE_MALICIOUS */
3328
3329   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3330   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3331
3332   #ifdef ENABLE_MALICIOUS
3333   // We shouldn't even receive pull replies as we're not sending
3334   if (2 == mal_type)
3335   {
3336   }
3337   #endif /* ENABLE_MALICIOUS */
3338
3339   /* Do actual logic */
3340   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3341
3342   LOG (GNUNET_ERROR_TYPE_DEBUG,
3343        "PULL REPLY received, got following %u peers:\n",
3344        ntohl (msg->num_peers));
3345
3346   for (i = 0; i < ntohl (msg->num_peers); i++)
3347   {
3348     LOG (GNUNET_ERROR_TYPE_DEBUG,
3349          "%u. %s\n",
3350          i,
3351          GNUNET_i2s (&peers[i]));
3352
3353     #ifdef ENABLE_MALICIOUS
3354     if ((NULL != att_peer_set) &&
3355         (1 == mal_type || 3 == mal_type))
3356     { /* Add attacked peer to local list */
3357       // TODO check if we sent a request and this was the first reply
3358       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3359                                                                &peers[i])
3360           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3361                                                                   &peers[i])
3362           && 0 != GNUNET_CRYPTO_cmp_peer_identity (&peers[i],
3363                                                    &own_identity))
3364       {
3365         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3366         tmp_att_peer->peer_id = peers[i];
3367         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3368                                      att_peers_tail,
3369                                      tmp_att_peer);
3370         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3371       }
3372       continue;
3373     }
3374     #endif /* ENABLE_MALICIOUS */
3375     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity,
3376                                               &peers[i]))
3377     {
3378       /* Make sure we 'know' about this peer */
3379       (void) Peers_insert_peer (&peers[i]);
3380
3381       if (GNUNET_YES == Peers_check_peer_valid (&peers[i]))
3382       {
3383         CustomPeerMap_put (pull_map, &peers[i]);
3384       }
3385       else
3386       {
3387         Peers_schedule_operation (&peers[i], insert_in_pull_map);
3388         (void) Peers_issue_peer_liveliness_check (&peers[i]);
3389       }
3390     }
3391   }
3392
3393   Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING);
3394   clean_peer (sender);
3395
3396   GNUNET_break_op (Peers_check_peer_known (sender));
3397   GNUNET_CADET_receive_done (Peers_get_recv_channel (sender));
3398 }
3399
3400
3401 /**
3402  * Compute a random delay.
3403  * A uniformly distributed value between mean + spread and mean - spread.
3404  *
3405  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3406  * It would return a random value between 2 and 6 min.
3407  *
3408  * @param mean the mean
3409  * @param spread the inverse amount of deviation from the mean
3410  */
3411 static struct GNUNET_TIME_Relative
3412 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3413                     unsigned int spread)
3414 {
3415   struct GNUNET_TIME_Relative half_interval;
3416   struct GNUNET_TIME_Relative ret;
3417   unsigned int rand_delay;
3418   unsigned int max_rand_delay;
3419
3420   if (0 == spread)
3421   {
3422     LOG (GNUNET_ERROR_TYPE_WARNING,
3423          "Not accepting spread of 0\n");
3424     GNUNET_break (0);
3425     GNUNET_assert (0);
3426   }
3427   GNUNET_assert (0 != mean.rel_value_us);
3428
3429   /* Compute random time value between spread * mean and spread * mean */
3430   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3431
3432   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3433   /**
3434    * Compute random value between (0 and 1) * round_interval
3435    * via multiplying round_interval with a 'fraction' (0 to value)/value
3436    */
3437   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3438   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3439   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3440   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3441
3442   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3443     LOG (GNUNET_ERROR_TYPE_WARNING,
3444          "Returning FOREVER_REL\n");
3445
3446   return ret;
3447 }
3448
3449
3450 /**
3451  * Send single pull request
3452  *
3453  * @param peer_id the peer to send the pull request to.
3454  */
3455 static void
3456 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3457 {
3458   struct GNUNET_MQ_Envelope *ev;
3459
3460   GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer,
3461                                                      Peers_PULL_REPLY_PENDING));
3462   Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING);
3463
3464   LOG (GNUNET_ERROR_TYPE_DEBUG,
3465        "Going to send PULL REQUEST to peer %s.\n",
3466        GNUNET_i2s (peer));
3467
3468   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3469   Peers_send_message (peer, ev, "PULL REQUEST");
3470   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3471 }
3472
3473
3474 /**
3475  * Send single push
3476  *
3477  * @param peer_id the peer to send the push to.
3478  */
3479 static void
3480 send_push (const struct GNUNET_PeerIdentity *peer_id)
3481 {
3482   struct GNUNET_MQ_Envelope *ev;
3483
3484   LOG (GNUNET_ERROR_TYPE_DEBUG,
3485        "Going to send PUSH to peer %s.\n",
3486        GNUNET_i2s (peer_id));
3487
3488   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3489   Peers_send_message (peer_id, ev, "PUSH");
3490   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3491 }
3492
3493
3494 static void
3495 do_round (void *cls);
3496
3497 static void
3498 do_mal_round (void *cls);
3499
3500 #ifdef ENABLE_MALICIOUS
3501
3502
3503 /**
3504  * @brief This function is called, when the client tells us to act malicious.
3505  * It verifies that @a msg is well-formed.
3506  *
3507  * @param cls the closure (#ClientContext)
3508  * @param msg the message
3509  * @return #GNUNET_OK if @a msg is well-formed
3510  */
3511 static int
3512 check_client_act_malicious (void *cls,
3513                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3514 {
3515   struct ClientContext *cli_ctx = cls;
3516   uint16_t msize = ntohs (msg->header.size);
3517   uint32_t num_peers = ntohl (msg->num_peers);
3518
3519   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3520   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3521        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3522   {
3523     LOG (GNUNET_ERROR_TYPE_ERROR,
3524         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3525         ntohl (msg->num_peers),
3526         (msize / sizeof (struct GNUNET_PeerIdentity)));
3527     GNUNET_break (0);
3528     GNUNET_SERVICE_client_drop (cli_ctx->client);
3529     return GNUNET_SYSERR;
3530   }
3531   return GNUNET_OK;
3532 }
3533
3534 /**
3535  * Turn RPS service to act malicious.
3536  *
3537  * @param cls Closure
3538  * @param client The client that sent the message
3539  * @param msg The message header
3540  */
3541 static void
3542 handle_client_act_malicious (void *cls,
3543                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3544 {
3545   struct ClientContext *cli_ctx = cls;
3546   struct GNUNET_PeerIdentity *peers;
3547   uint32_t num_mal_peers_sent;
3548   uint32_t num_mal_peers_old;
3549
3550   /* Do actual logic */
3551   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3552   mal_type = ntohl (msg->type);
3553   if (NULL == mal_peer_set)
3554     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3555
3556   LOG (GNUNET_ERROR_TYPE_DEBUG,
3557        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3558        mal_type,
3559        ntohl (msg->num_peers));
3560
3561   if (1 == mal_type)
3562   { /* Try to maximise representation */
3563     /* Add other malicious peers to those we already know */
3564
3565     num_mal_peers_sent = ntohl (msg->num_peers);
3566     num_mal_peers_old = num_mal_peers;
3567     GNUNET_array_grow (mal_peers,
3568                        num_mal_peers,
3569                        num_mal_peers + num_mal_peers_sent);
3570     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3571             peers,
3572             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3573
3574     /* Add all mal peers to mal_peer_set */
3575     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3576                            num_mal_peers_sent,
3577                            mal_peer_set);
3578
3579     /* Substitute do_round () with do_mal_round () */
3580     GNUNET_SCHEDULER_cancel (do_round_task);
3581     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3582   }
3583
3584   else if ( (2 == mal_type) ||
3585             (3 == mal_type) )
3586   { /* Try to partition the network */
3587     /* Add other malicious peers to those we already know */
3588
3589     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3590     num_mal_peers_old = num_mal_peers;
3591     GNUNET_array_grow (mal_peers,
3592                        num_mal_peers,
3593                        num_mal_peers + num_mal_peers_sent);
3594     if (NULL != mal_peers &&
3595         0 != num_mal_peers)
3596     {
3597       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3598               peers,
3599               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3600
3601       /* Add all mal peers to mal_peer_set */
3602       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3603                              num_mal_peers_sent,
3604                              mal_peer_set);
3605     }
3606
3607     /* Store the one attacked peer */
3608     GNUNET_memcpy (&attacked_peer,
3609             &msg->attacked_peer,
3610             sizeof (struct GNUNET_PeerIdentity));
3611     /* Set the flag of the attacked peer to valid to avoid problems */
3612     if (GNUNET_NO == Peers_check_peer_known (&attacked_peer))
3613     {
3614       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3615     }
3616
3617     LOG (GNUNET_ERROR_TYPE_DEBUG,
3618          "Attacked peer is %s\n",
3619          GNUNET_i2s (&attacked_peer));
3620
3621     /* Substitute do_round () with do_mal_round () */
3622     GNUNET_SCHEDULER_cancel (do_round_task);
3623     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3624   }
3625   else if (0 == mal_type)
3626   { /* Stop acting malicious */
3627     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3628
3629     /* Substitute do_mal_round () with do_round () */
3630     GNUNET_SCHEDULER_cancel (do_round_task);
3631     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3632   }
3633   else
3634   {
3635     GNUNET_break (0);
3636     GNUNET_SERVICE_client_continue (cli_ctx->client);
3637   }
3638   GNUNET_SERVICE_client_continue (cli_ctx->client);
3639 }
3640
3641
3642 /**
3643  * Send out PUSHes and PULLs maliciously.
3644  *
3645  * This is executed regylary.
3646  */
3647 static void
3648 do_mal_round (void *cls)
3649 {
3650   uint32_t num_pushes;
3651   uint32_t i;
3652   struct GNUNET_TIME_Relative time_next_round;
3653   struct AttackedPeer *tmp_att_peer;
3654
3655   LOG (GNUNET_ERROR_TYPE_DEBUG,
3656        "Going to execute next round maliciously type %" PRIu32 ".\n",
3657       mal_type);
3658   do_round_task = NULL;
3659   GNUNET_assert (mal_type <= 3);
3660   /* Do malicious actions */
3661   if (1 == mal_type)
3662   { /* Try to maximise representation */
3663
3664     /* The maximum of pushes we're going to send this round */
3665     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3666                                          num_attacked_peers),
3667                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3668
3669     LOG (GNUNET_ERROR_TYPE_DEBUG,
3670          "Going to send %" PRIu32 " pushes\n",
3671          num_pushes);
3672
3673     /* Send PUSHes to attacked peers */
3674     for (i = 0 ; i < num_pushes ; i++)
3675     {
3676       if (att_peers_tail == att_peer_index)
3677         att_peer_index = att_peers_head;
3678       else
3679         att_peer_index = att_peer_index->next;
3680
3681       send_push (&att_peer_index->peer_id);
3682     }
3683
3684     /* Send PULLs to some peers to learn about additional peers to attack */
3685     tmp_att_peer = att_peer_index;
3686     for (i = 0 ; i < num_pushes * alpha ; i++)
3687     {
3688       if (att_peers_tail == tmp_att_peer)
3689         tmp_att_peer = att_peers_head;
3690       else
3691         att_peer_index = tmp_att_peer->next;
3692
3693       send_pull_request (&tmp_att_peer->peer_id);
3694     }
3695   }
3696
3697
3698   else if (2 == mal_type)
3699   { /**
3700      * Try to partition the network
3701      * Send as many pushes to the attacked peer as possible
3702      * That is one push per round as it will ignore more.
3703      */
3704     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3705     if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3706       send_push (&attacked_peer);
3707   }
3708
3709
3710   if (3 == mal_type)
3711   { /* Combined attack */
3712
3713     /* Send PUSH to attacked peers */
3714     if (GNUNET_YES == Peers_check_peer_known (&attacked_peer))
3715     {
3716       (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3717       if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_ONLINE))
3718       {
3719         LOG (GNUNET_ERROR_TYPE_DEBUG,
3720             "Goding to send push to attacked peer (%s)\n",
3721             GNUNET_i2s (&attacked_peer));
3722         send_push (&attacked_peer);
3723       }
3724     }
3725     (void) Peers_issue_peer_liveliness_check (&attacked_peer);
3726
3727     /* The maximum of pushes we're going to send this round */
3728     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3729                                          num_attacked_peers),
3730                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3731
3732     LOG (GNUNET_ERROR_TYPE_DEBUG,
3733          "Going to send %" PRIu32 " pushes\n",
3734          num_pushes);
3735
3736     for (i = 0; i < num_pushes; i++)
3737     {
3738       if (att_peers_tail == att_peer_index)
3739         att_peer_index = att_peers_head;
3740       else
3741         att_peer_index = att_peer_index->next;
3742
3743       send_push (&att_peer_index->peer_id);
3744     }
3745
3746     /* Send PULLs to some peers to learn about additional peers to attack */
3747     tmp_att_peer = att_peer_index;
3748     for (i = 0; i < num_pushes * alpha; i++)
3749     {
3750       if (att_peers_tail == tmp_att_peer)
3751         tmp_att_peer = att_peers_head;
3752       else
3753         att_peer_index = tmp_att_peer->next;
3754
3755       send_pull_request (&tmp_att_peer->peer_id);
3756     }
3757   }
3758
3759   /* Schedule next round */
3760   time_next_round = compute_rand_delay (round_interval, 2);
3761
3762   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3763   //NULL);
3764   GNUNET_assert (NULL == do_round_task);
3765   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3766                                                 &do_mal_round, NULL);
3767   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3768 }
3769 #endif /* ENABLE_MALICIOUS */
3770
3771 /**
3772  * Send out PUSHes and PULLs, possibly update #view, samplers.
3773  *
3774  * This is executed regylary.
3775  */
3776 static void
3777 do_round (void *cls)
3778 {
3779   uint32_t i;
3780   const struct GNUNET_PeerIdentity *view_array;
3781   unsigned int *permut;
3782   unsigned int a_peers; /* Number of peers we send pushes to */
3783   unsigned int b_peers; /* Number of peers we send pull requests to */
3784   uint32_t first_border;
3785   uint32_t second_border;
3786   struct GNUNET_PeerIdentity peer;
3787   struct GNUNET_PeerIdentity *update_peer;
3788
3789   LOG (GNUNET_ERROR_TYPE_DEBUG,
3790        "Going to execute next round.\n");
3791   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3792   do_round_task = NULL;
3793   LOG (GNUNET_ERROR_TYPE_DEBUG,
3794        "Printing view:\n");
3795   to_file (file_name_view_log,
3796            "___ new round ___");
3797   view_array = View_get_as_array ();
3798   for (i = 0; i < View_size (); i++)
3799   {
3800     LOG (GNUNET_ERROR_TYPE_DEBUG,
3801          "\t%s\n", GNUNET_i2s (&view_array[i]));
3802     to_file (file_name_view_log,
3803              "=%s\t(do round)",
3804              GNUNET_i2s_full (&view_array[i]));
3805   }
3806
3807
3808   /* Send pushes and pull requests */
3809   if (0 < View_size ())
3810   {
3811     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3812                                            View_size ());
3813
3814     /* Send PUSHes */
3815     a_peers = ceil (alpha * View_size ());
3816
3817     LOG (GNUNET_ERROR_TYPE_DEBUG,
3818          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3819          a_peers, alpha, View_size ());
3820     for (i = 0; i < a_peers; i++)
3821     {
3822       peer = view_array[permut[i]];
3823       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer)) // TODO
3824       { // FIXME if this fails schedule/loop this for later
3825         send_push (&peer);
3826       }
3827     }
3828
3829     /* Send PULL requests */
3830     b_peers = ceil (beta * View_size ());
3831     first_border = a_peers;
3832     second_border = a_peers + b_peers;
3833     if (second_border > View_size ())
3834     {
3835       first_border = View_size () - b_peers;
3836       second_border = View_size ();
3837     }
3838     LOG (GNUNET_ERROR_TYPE_DEBUG,
3839         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3840         b_peers, beta, View_size ());
3841     for (i = first_border; i < second_border; i++)
3842     {
3843       peer = view_array[permut[i]];
3844       if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) &&
3845           GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO
3846       { // FIXME if this fails schedule/loop this for later
3847         send_pull_request (&peer);
3848       }
3849     }
3850
3851     GNUNET_free (permut);
3852     permut = NULL;
3853   }
3854
3855
3856   /* Update view */
3857   /* TODO see how many peers are in push-/pull- list! */
3858
3859   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3860       (0 < CustomPeerMap_size (push_map)) &&
3861       (0 < CustomPeerMap_size (pull_map)))
3862   //if (GNUNET_YES) // disable blocking temporarily
3863   { /* If conditions for update are fulfilled, update */
3864     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3865
3866     uint32_t final_size;
3867     uint32_t peers_to_clean_size;
3868     struct GNUNET_PeerIdentity *peers_to_clean;
3869
3870     peers_to_clean = NULL;
3871     peers_to_clean_size = 0;
3872     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3873     GNUNET_memcpy (peers_to_clean,
3874             view_array,
3875             View_size () * sizeof (struct GNUNET_PeerIdentity));
3876
3877     /* Seems like recreating is the easiest way of emptying the peermap */
3878     View_clear ();
3879     to_file (file_name_view_log,
3880              "--- emptied ---");
3881
3882     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3883                                 CustomPeerMap_size (push_map));
3884     second_border = first_border +
3885                     GNUNET_MIN (floor (beta  * view_size_est_need),
3886                                 CustomPeerMap_size (pull_map));
3887     final_size    = second_border +
3888       ceil ((1 - (alpha + beta)) * view_size_est_need);
3889     LOG (GNUNET_ERROR_TYPE_DEBUG,
3890         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3891         first_border,
3892         second_border,
3893         final_size);
3894
3895     /* Update view with peers received through PUSHes */
3896     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3897                                            CustomPeerMap_size (push_map));
3898     for (i = 0; i < first_border; i++)
3899     {
3900       (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3901                                                               permut[i]));
3902       to_file (file_name_view_log,
3903                "+%s\t(push list)",
3904                GNUNET_i2s_full (&view_array[i]));
3905       // TODO change the peer_flags accordingly
3906     }
3907     GNUNET_free (permut);
3908     permut = NULL;
3909
3910     /* Update view with peers received through PULLs */
3911     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3912                                            CustomPeerMap_size (pull_map));
3913     for (i = first_border; i < second_border; i++)
3914     {
3915       (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3916             permut[i - first_border]));
3917       to_file (file_name_view_log,
3918                "+%s\t(pull list)",
3919                GNUNET_i2s_full (&view_array[i]));
3920       // TODO change the peer_flags accordingly
3921     }
3922     GNUNET_free (permut);
3923     permut = NULL;
3924
3925     /* Update view with peers from history */
3926     RPS_sampler_get_n_rand_peers (prot_sampler,
3927                                   hist_update,
3928                                   NULL,
3929                                   final_size - second_border);
3930     // TODO change the peer_flags accordingly
3931
3932     for (i = 0; i < View_size (); i++)
3933       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3934
3935     /* Clean peers that were removed from the view */
3936     for (i = 0; i < peers_to_clean_size; i++)
3937     {
3938       to_file (file_name_view_log,
3939                "-%s",
3940                GNUNET_i2s_full (&peers_to_clean[i]));
3941       clean_peer (&peers_to_clean[i]);
3942       //peer_destroy_channel_send (sender);
3943     }
3944
3945     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3946     clients_notify_view_update();
3947   } else {
3948     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3949     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3950     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3951         !(0 >= CustomPeerMap_size (pull_map)))
3952       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3953     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3954         (0 >= CustomPeerMap_size (pull_map)))
3955       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3956     if (0 >= CustomPeerMap_size (push_map) &&
3957         !(0 >= CustomPeerMap_size (pull_map)))
3958       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3959     if (0 >= CustomPeerMap_size (push_map) &&
3960         (0 >= CustomPeerMap_size (pull_map)))
3961       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3962     if (0 >= CustomPeerMap_size (pull_map) &&
3963         CustomPeerMap_size (push_map) > alpha * View_size () &&
3964         0 >= CustomPeerMap_size (push_map))
3965       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3966   }
3967   // TODO independent of that also get some peers from CADET_get_peers()?
3968   GNUNET_STATISTICS_set (stats,
3969       "# peers in push map at end of round",
3970       CustomPeerMap_size (push_map),
3971       GNUNET_NO);
3972   GNUNET_STATISTICS_set (stats,
3973       "# peers in pull map at end of round",
3974       CustomPeerMap_size (pull_map),
3975       GNUNET_NO);
3976   GNUNET_STATISTICS_set (stats,
3977       "# peers in view at end of round",
3978       View_size (),
3979       GNUNET_NO);
3980
3981   LOG (GNUNET_ERROR_TYPE_DEBUG,
3982        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3983        CustomPeerMap_size (push_map),
3984        CustomPeerMap_size (pull_map),
3985        alpha,
3986        View_size (),
3987        alpha * View_size ());
3988
3989   /* Update samplers */
3990   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3991   {
3992     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3993     LOG (GNUNET_ERROR_TYPE_DEBUG,
3994          "Updating with peer %s from push list\n",
3995          GNUNET_i2s (update_peer));
3996     insert_in_sampler (NULL, update_peer);
3997     clean_peer (update_peer); /* This cleans only if it is not in the view */
3998     //peer_destroy_channel_send (sender);
3999   }
4000
4001   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
4002   {
4003     LOG (GNUNET_ERROR_TYPE_DEBUG,
4004          "Updating with peer %s from pull list\n",
4005          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
4006     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
4007     /* This cleans only if it is not in the view */
4008     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
4009     //peer_destroy_channel_send (sender);
4010   }
4011
4012
4013   /* Empty push/pull lists */
4014   CustomPeerMap_clear (push_map);
4015   CustomPeerMap_clear (pull_map);
4016
4017   struct GNUNET_TIME_Relative time_next_round;
4018
4019   time_next_round = compute_rand_delay (round_interval, 2);
4020
4021   /* Schedule next round */
4022   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4023                                                 &do_round, NULL);
4024   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4025 }
4026
4027
4028 /**
4029  * This is called from GNUNET_CADET_get_peers().
4030  *
4031  * It is called on every peer(ID) that cadet somehow has contact with.
4032  * We use those to initialise the sampler.
4033  */
4034 void
4035 init_peer_cb (void *cls,
4036               const struct GNUNET_PeerIdentity *peer,
4037               int tunnel, // "Do we have a tunnel towards this peer?"
4038               unsigned int n_paths, // "Number of known paths towards this peer"
4039               unsigned int best_path) // "How long is the best path?
4040                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
4041 {
4042   if (NULL != peer)
4043   {
4044     LOG (GNUNET_ERROR_TYPE_DEBUG,
4045          "Got peer_id %s from cadet\n",
4046          GNUNET_i2s (peer));
4047     got_peer (peer);
4048   }
4049 }
4050
4051 /**
4052  * @brief Iterator function over stored, valid peers.
4053  *
4054  * We initialise the sampler with those.
4055  *
4056  * @param cls the closure
4057  * @param peer the peer id
4058  * @return #GNUNET_YES if we should continue to
4059  *         iterate,
4060  *         #GNUNET_NO if not.
4061  */
4062 static int
4063 valid_peers_iterator (void *cls,
4064                       const struct GNUNET_PeerIdentity *peer)
4065 {
4066   if (NULL != peer)
4067   {
4068     LOG (GNUNET_ERROR_TYPE_DEBUG,
4069          "Got stored, valid peer %s\n",
4070          GNUNET_i2s (peer));
4071     got_peer (peer);
4072   }
4073   return GNUNET_YES;
4074 }
4075
4076
4077 /**
4078  * Iterator over peers from peerinfo.
4079  *
4080  * @param cls closure
4081  * @param peer id of the peer, NULL for last call
4082  * @param hello hello message for the peer (can be NULL)
4083  * @param error message
4084  */
4085 void
4086 process_peerinfo_peers (void *cls,
4087                         const struct GNUNET_PeerIdentity *peer,
4088                         const struct GNUNET_HELLO_Message *hello,
4089                         const char *err_msg)
4090 {
4091   if (NULL != peer)
4092   {
4093     LOG (GNUNET_ERROR_TYPE_DEBUG,
4094          "Got peer_id %s from peerinfo\n",
4095          GNUNET_i2s (peer));
4096     got_peer (peer);
4097   }
4098 }
4099
4100
4101 /**
4102  * Task run during shutdown.
4103  *
4104  * @param cls unused
4105  */
4106 static void
4107 shutdown_task (void *cls)
4108 {
4109   struct ClientContext *client_ctx;
4110   struct ReplyCls *reply_cls;
4111
4112   LOG (GNUNET_ERROR_TYPE_DEBUG,
4113        "RPS is going down\n");
4114
4115   /* Clean all clients */
4116   for (client_ctx = cli_ctx_head;
4117        NULL != cli_ctx_head;
4118        client_ctx = cli_ctx_head)
4119   {
4120     /* Clean pending requests to the sampler */
4121     for (reply_cls = client_ctx->rep_cls_head;
4122          NULL != client_ctx->rep_cls_head;
4123          reply_cls = client_ctx->rep_cls_head)
4124     {
4125       RPS_sampler_request_cancel (reply_cls->req_handle);
4126       GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
4127                                    client_ctx->rep_cls_tail,
4128                                    reply_cls);
4129       GNUNET_free (reply_cls);
4130     }
4131     GNUNET_CONTAINER_DLL_remove (cli_ctx_head, cli_ctx_tail, client_ctx);
4132     GNUNET_free (client_ctx);
4133   }
4134   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4135   GNUNET_PEERINFO_disconnect (peerinfo_handle);
4136
4137   if (NULL != do_round_task)
4138   {
4139     GNUNET_SCHEDULER_cancel (do_round_task);
4140     do_round_task = NULL;
4141   }
4142
4143   Peers_terminate ();
4144
4145   GNUNET_NSE_disconnect (nse);
4146   RPS_sampler_destroy (prot_sampler);
4147   RPS_sampler_destroy (client_sampler);
4148   GNUNET_CADET_close_port (cadet_port);
4149   GNUNET_CADET_disconnect (cadet_handle);
4150   View_destroy ();
4151   CustomPeerMap_destroy (push_map);
4152   CustomPeerMap_destroy (pull_map);
4153   if (NULL != stats)
4154   {
4155     GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
4156     stats = NULL;
4157   }
4158   #ifdef ENABLE_MALICIOUS
4159   struct AttackedPeer *tmp_att_peer;
4160   /* it is ok to free this const during shutdown: */
4161   GNUNET_free ((char *) file_name_view_log);
4162   #ifdef TO_FILE
4163   GNUNET_free ((char *) file_name_observed_log);
4164   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
4165   #endif /* TO_FILE */
4166   GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4167   if (NULL != mal_peer_set)
4168     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4169   if (NULL != att_peer_set)
4170     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4171   while (NULL != att_peers_head)
4172   {
4173     tmp_att_peer = att_peers_head;
4174     GNUNET_CONTAINER_DLL_remove (att_peers_head, att_peers_tail, tmp_att_peer);
4175   }
4176   #endif /* ENABLE_MALICIOUS */
4177 }
4178
4179
4180 /**
4181  * Handle client connecting to the service.
4182  *
4183  * @param cls NULL
4184  * @param client the new client
4185  * @param mq the message queue of @a client
4186  * @return @a client
4187  */
4188 static void *
4189 client_connect_cb (void *cls,
4190                    struct GNUNET_SERVICE_Client *client,
4191                    struct GNUNET_MQ_Handle *mq)
4192 {
4193   struct ClientContext *cli_ctx;
4194
4195   LOG (GNUNET_ERROR_TYPE_DEBUG,
4196        "Client connected\n");
4197   if (NULL == client)
4198     return client; /* Server was destroyed before a client connected. Shutting down */
4199   cli_ctx = GNUNET_new (struct ClientContext);
4200   cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
4201   cli_ctx->view_updates_left = -1;
4202   cli_ctx->client = client;
4203   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4204                                cli_ctx_tail,
4205                                cli_ctx);
4206   return cli_ctx;
4207 }
4208
4209 /**
4210  * Callback called when a client disconnected from the service
4211  *
4212  * @param cls closure for the service
4213  * @param c the client that disconnected
4214  * @param internal_cls should be equal to @a c
4215  */
4216 static void
4217 client_disconnect_cb (void *cls,
4218                       struct GNUNET_SERVICE_Client *client,
4219                       void *internal_cls)
4220 {
4221   struct ClientContext *cli_ctx = internal_cls;
4222
4223   GNUNET_assert (client == cli_ctx->client);
4224   if (NULL == client)
4225   {/* shutdown task - destroy all clients */
4226     while (NULL != cli_ctx_head)
4227       destroy_cli_ctx (cli_ctx_head);
4228   }
4229   else
4230   { /* destroy this client */
4231     LOG (GNUNET_ERROR_TYPE_DEBUG,
4232         "Client disconnected. Destroy its context.\n");
4233     destroy_cli_ctx (cli_ctx);
4234   }
4235 }
4236
4237
4238 /**
4239  * Handle random peer sampling clients.
4240  *
4241  * @param cls closure
4242  * @param c configuration to use
4243  * @param service the initialized service
4244  */
4245 static void
4246 run (void *cls,
4247      const struct GNUNET_CONFIGURATION_Handle *c,
4248      struct GNUNET_SERVICE_Handle *service)
4249 {
4250   char* fn_valid_peers;
4251
4252   GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL);
4253   cfg = c;
4254
4255
4256   /* Get own ID */
4257   GNUNET_CRYPTO_get_peer_identity (cfg, &own_identity); // TODO check return value
4258   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4259               "STARTING SERVICE (rps) for peer [%s]\n",
4260               GNUNET_i2s (&own_identity));
4261   #ifdef ENABLE_MALICIOUS
4262   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4263               "Malicious execution compiled in.\n");
4264   #endif /* ENABLE_MALICIOUS */
4265
4266
4267
4268   /* Get time interval from the configuration */
4269   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
4270                                                         "ROUNDINTERVAL",
4271                                                         &round_interval))
4272   {
4273     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4274                                "RPS", "ROUNDINTERVAL");
4275     GNUNET_SCHEDULER_shutdown ();
4276     return;
4277   }
4278
4279   /* Get initial size of sampler/view from the configuration */
4280   if (GNUNET_OK !=
4281       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4282         (long long unsigned int *) &sampler_size_est_min))
4283   {
4284     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4285                                "RPS", "MINSIZE");
4286     GNUNET_SCHEDULER_shutdown ();
4287     return;
4288   }
4289   sampler_size_est_need = sampler_size_est_min;
4290   view_size_est_min = sampler_size_est_min;
4291   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4292
4293   if (GNUNET_OK !=
4294       GNUNET_CONFIGURATION_get_value_filename (cfg,
4295                                                "rps",
4296                                                "FILENAME_VALID_PEERS",
4297                                                &fn_valid_peers))
4298   {
4299     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4300                                "rps", "FILENAME_VALID_PEERS");
4301   }
4302
4303
4304   View_create (view_size_est_min);
4305
4306   /* file_name_view_log */
4307   file_name_view_log = store_prefix_file_name (&own_identity, "view");
4308   #ifdef TO_FILE
4309   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4310   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4311   #endif /* TO_FILE */
4312
4313   /* connect to NSE */
4314   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4315
4316
4317   alpha = 0.45;
4318   beta  = 0.45;
4319
4320
4321   /* Initialise cadet */
4322   /* There exists a copy-paste-clone in get_channel() */
4323   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4324     GNUNET_MQ_hd_fixed_size (peer_check,
4325                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4326                              struct GNUNET_MessageHeader,
4327                              NULL),
4328     GNUNET_MQ_hd_fixed_size (peer_push,
4329                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4330                              struct GNUNET_MessageHeader,
4331                              NULL),
4332     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4333                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4334                              struct GNUNET_MessageHeader,
4335                              NULL),
4336     GNUNET_MQ_hd_var_size (peer_pull_reply,
4337                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4338                            struct GNUNET_RPS_P2P_PullReplyMessage,
4339                            NULL),
4340     GNUNET_MQ_handler_end ()
4341   };
4342
4343   cadet_handle = GNUNET_CADET_connect (cfg);
4344   GNUNET_assert (NULL != cadet_handle);
4345   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4346                       strlen (GNUNET_APPLICATION_PORT_RPS),
4347                       &port);
4348   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4349                                        &port,
4350                                        &Peers_handle_inbound_channel, /* Connect handler */
4351                                        NULL, /* cls */
4352                                        NULL, /* WindowSize handler */
4353                                        cleanup_destroyed_channel, /* Disconnect handler */
4354                                        cadet_handlers);
4355
4356
4357   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4358   Peers_initialise (fn_valid_peers, cadet_handle, &own_identity);
4359   GNUNET_free (fn_valid_peers);
4360
4361   /* Initialise sampler */
4362   struct GNUNET_TIME_Relative half_round_interval;
4363   struct GNUNET_TIME_Relative  max_round_interval;
4364
4365   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4366   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4367
4368   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4369   client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4370
4371   /* Initialise push and pull maps */
4372   push_map = CustomPeerMap_create (4);
4373   pull_map = CustomPeerMap_create (4);
4374
4375
4376   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4377   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4378   // TODO send push/pull to each of those peers?
4379   // TODO read stored valid peers from last run
4380   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4381   Peers_get_valid_peers (valid_peers_iterator, NULL);
4382
4383   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4384                                                    GNUNET_NO,
4385                                                    process_peerinfo_peers,
4386                                                    NULL);
4387
4388   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4389
4390   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4391   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4392
4393   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4394   stats = GNUNET_STATISTICS_create ("rps", cfg);
4395
4396 }
4397
4398
4399 /**
4400  * Define "main" method using service macro.
4401  */
4402 GNUNET_SERVICE_MAIN
4403 ("rps",
4404  GNUNET_SERVICE_OPTION_NONE,
4405  &run,
4406  &client_connect_cb,
4407  &client_disconnect_cb,
4408  NULL,
4409  GNUNET_MQ_hd_fixed_size (client_request,
4410    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4411    struct GNUNET_RPS_CS_RequestMessage,
4412    NULL),
4413  GNUNET_MQ_hd_fixed_size (client_request_cancel,
4414    GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4415    struct GNUNET_RPS_CS_RequestCancelMessage,
4416    NULL),
4417  GNUNET_MQ_hd_var_size (client_seed,
4418    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4419    struct GNUNET_RPS_CS_SeedMessage,
4420    NULL),
4421 #ifdef ENABLE_MALICIOUS
4422  GNUNET_MQ_hd_var_size (client_act_malicious,
4423    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4424    struct GNUNET_RPS_CS_ActMaliciousMessage,
4425    NULL),
4426 #endif /* ENABLE_MALICIOUS */
4427  GNUNET_MQ_hd_fixed_size (client_view_request,
4428    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4429    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4430    NULL),
4431  GNUNET_MQ_handler_end());
4432
4433 /* end of gnunet-service-rps.c */