Fix details and style
[oweals/gnunet.git] / src / rps / gnunet-service-rps.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2013-2015 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file rps/gnunet-service-rps.c
21  * @brief rps service implementation
22  * @author Julius Bünger
23  */
24 #include "platform.h"
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
31 #include "rps.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
36
37 #include <math.h>
38 #include <inttypes.h>
39
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
41
42 // TODO modify @brief in every file
43
44 // TODO check for overflows
45
46 // TODO align message structs
47
48 // TODO connect to friends
49
50 // TODO blacklist? (-> mal peer detection on top of brahms)
51
52 // hist_size_init, hist_size_max
53
54 /**
55  * Our configuration.
56  */
57 static const struct GNUNET_CONFIGURATION_Handle *cfg;
58
59 /**
60  * Handle to the statistics service.
61  */
62 struct GNUNET_STATISTICS_Handle *stats;
63
64 /**
65  * Our own identity.
66  */
67 static struct GNUNET_PeerIdentity own_identity;
68
69 static int in_shutdown = GNUNET_NO;
70
71 /**
72  * @brief Port used for cadet.
73  *
74  * Don't compute multiple times through making it global
75  */
76 static struct GNUNET_HashCode port;
77
78 /***********************************************************************
79  * Old gnunet-service-rps_peers.c
80 ***********************************************************************/
81
82 /**
83  * Set a peer flag of given peer context.
84  */
85 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
86
87 /**
88  * Get peer flag of given peer context.
89  */
90 #define check_peer_flag_set(peer_ctx, mask)\
91   ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
92
93 /**
94  * Unset flag of given peer context.
95  */
96 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
97
98 /**
99  * Get channel flag of given channel context.
100  */
101 #define check_channel_flag_set(channel_flags, mask)\
102   ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
103
104 /**
105  * Unset flag of given channel context.
106  */
107 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
108
109
110
111 /**
112  * Pending operation on peer consisting of callback and closure
113  *
114  * When an operation cannot be executed right now this struct is used to store
115  * the callback and closure for later execution.
116  */
117 struct PeerPendingOp
118 {
119   /**
120    * Callback
121    */
122   PeerOp op;
123
124   /**
125    * Closure
126    */
127   void *op_cls;
128 };
129
130 /**
131  * List containing all messages that are yet to be send
132  *
133  * This is used to keep track of all messages that have not been sent yet. When
134  * a peer is to be removed the pending messages can be removed properly.
135  */
136 struct PendingMessage
137 {
138   /**
139    * DLL next, prev
140    */
141   struct PendingMessage *next;
142   struct PendingMessage *prev;
143
144   /**
145    * The envelope to the corresponding message
146    */
147   struct GNUNET_MQ_Envelope *ev;
148
149   /**
150    * The corresponding context
151    */
152   struct PeerContext *peer_ctx;
153
154   /**
155    * The message type
156    */
157   const char *type;
158 };
159
160 /**
161  * @brief Context for a channel
162  */
163 struct ChannelCtx;
164
165 /**
166  * Struct used to keep track of other peer's status
167  *
168  * This is stored in a multipeermap.
169  * It contains information such as cadet channels, a message queue for sending,
170  * status about the channels, the pending operations on this peer and some flags
171  * about the status of the peer itself. (live, valid, ...)
172  */
173 struct PeerContext
174 {
175   /**
176    * Message queue open to client
177    */
178   struct GNUNET_MQ_Handle *mq;
179
180   /**
181    * Channel open to client.
182    */
183   struct ChannelCtx *send_channel_ctx;
184
185   /**
186    * Channel open from client.
187    */
188   struct ChannelCtx *recv_channel_ctx;
189
190   /**
191    * Array of pending operations on this peer.
192    */
193   struct PeerPendingOp *pending_ops;
194
195   /**
196    * Handle to the callback given to cadet_ntfy_tmt_rdy()
197    *
198    * To be canceled on shutdown.
199    */
200   struct PendingMessage *liveliness_check_pending;
201
202   /**
203    * Number of pending operations.
204    */
205   unsigned int num_pending_ops;
206
207   /**
208    * Identity of the peer
209    */
210   struct GNUNET_PeerIdentity peer_id;
211
212   /**
213    * Flags indicating status of peer
214    */
215   uint32_t peer_flags;
216
217   /**
218    * Last time we received something from that peer.
219    */
220   struct GNUNET_TIME_Absolute last_message_recv;
221
222   /**
223    * Last time we received a keepalive message.
224    */
225   struct GNUNET_TIME_Absolute last_keepalive;
226
227   /**
228    * DLL with all messages that are yet to be sent
229    */
230   struct PendingMessage *pending_messages_head;
231   struct PendingMessage *pending_messages_tail;
232
233   /**
234    * This is pobably followed by 'statistical' data (when we first saw
235    * it, how did we get its ID, how many pushes (in a timeinterval),
236    * ...)
237    */
238 };
239
240 /**
241  * @brief Closure to #valid_peer_iterator
242  */
243 struct PeersIteratorCls
244 {
245   /**
246    * Iterator function
247    */
248   PeersIterator iterator;
249
250   /**
251    * Closure to iterator
252    */
253   void *cls;
254 };
255
256 /**
257  * @brief Context for a channel
258  */
259 struct ChannelCtx
260 {
261   /**
262    * @brief The channel itself
263    */
264   struct GNUNET_CADET_Channel *channel;
265
266   /**
267    * @brief The peer context associated with the channel
268    */
269   struct PeerContext *peer_ctx;
270
271   /**
272    * @brief When channel destruction needs to be delayed (because it is called
273    * from within the cadet routine of another channel destruction) this task
274    * refers to the respective _SCHEDULER_Task.
275    */
276   struct GNUNET_SCHEDULER_Task *destruction_task;
277 };
278
279 /**
280  * @brief Hashmap of valid peers.
281  */
282 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
283
284 /**
285  * @brief Maximum number of valid peers to keep.
286  * TODO read from config
287  */
288 static uint32_t num_valid_peers_max = UINT32_MAX;
289
290 /**
291  * @brief Filename of the file that stores the valid peers persistently.
292  */
293 static char *filename_valid_peers;
294
295 /**
296  * Set of all peers to keep track of them.
297  */
298 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
299
300 /**
301  * Cadet handle.
302  */
303 static struct GNUNET_CADET_Handle *cadet_handle;
304
305
306 /**
307  * @brief Get the #PeerContext associated with a peer
308  *
309  * @param peer the peer id
310  *
311  * @return the #PeerContext
312  */
313 static struct PeerContext *
314 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
315 {
316   struct PeerContext *ctx;
317   int ret;
318
319   ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
320   GNUNET_assert (GNUNET_YES == ret);
321   ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
322   GNUNET_assert (NULL != ctx);
323   return ctx;
324 }
325
326 /**
327  * @brief Check whether we have information about the given peer.
328  *
329  * FIXME probably deprecated. Make this the new _online.
330  *
331  * @param peer peer in question
332  *
333  * @return #GNUNET_YES if peer is known
334  *         #GNUNET_NO  if peer is not knwon
335  */
336 static int
337 check_peer_known (const struct GNUNET_PeerIdentity *peer)
338 {
339   if (NULL != peer_map)
340   {
341     return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
342   } else
343   {
344     return GNUNET_NO;
345   }
346 }
347
348
349 /**
350  * @brief Create a new #PeerContext and insert it into the peer map
351  *
352  * @param peer the peer to create the #PeerContext for
353  *
354  * @return the #PeerContext
355  */
356 static struct PeerContext *
357 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
358 {
359   struct PeerContext *ctx;
360   int ret;
361
362   GNUNET_assert (GNUNET_NO == check_peer_known (peer));
363
364   ctx = GNUNET_new (struct PeerContext);
365   ctx->peer_id = *peer;
366   ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
367       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
368   GNUNET_assert (GNUNET_OK == ret);
369   GNUNET_STATISTICS_set (stats,
370                         "# known peers",
371                         GNUNET_CONTAINER_multipeermap_size (peer_map),
372                         GNUNET_NO);
373   return ctx;
374 }
375
376
377 /**
378  * @brief Create or get a #PeerContext
379  *
380  * @param peer the peer to get the associated context to
381  *
382  * @return the context
383  */
384 static struct PeerContext *
385 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
386 {
387   if (GNUNET_NO == check_peer_known (peer))
388   {
389     return create_peer_ctx (peer);
390   }
391   return get_peer_ctx (peer);
392 }
393
394
395 /**
396  * @brief Check whether we have a connection to this @a peer
397  *
398  * Also sets the #Peers_ONLINE flag accordingly
399  *
400  * @param peer the peer in question
401  *
402  * @return #GNUNET_YES if we are connected
403  *         #GNUNET_NO  otherwise
404  */
405 static int
406 check_connected (const struct GNUNET_PeerIdentity *peer)
407 {
408   struct PeerContext *peer_ctx;
409
410   /* If we don't know about this peer we don't know whether it's online */
411   if (GNUNET_NO == check_peer_known (peer))
412   {
413     return GNUNET_NO;
414   }
415   /* Get the context */
416   peer_ctx = get_peer_ctx (peer);
417   /* If we have no channel to this peer we don't know whether it's online */
418   if ( (NULL == peer_ctx->send_channel_ctx) &&
419        (NULL == peer_ctx->recv_channel_ctx) )
420   {
421     UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
422     return GNUNET_NO;
423   }
424   /* Otherwise (if we have a channel, we know that it's online */
425   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
426   return GNUNET_YES;
427 }
428
429
430 /**
431  * @brief The closure to #get_rand_peer_iterator.
432  */
433 struct GetRandPeerIteratorCls
434 {
435   /**
436    * @brief The index of the peer to return.
437    * Will be decreased until 0.
438    * Then current peer is returned.
439    */
440   uint32_t index;
441
442   /**
443    * @brief Pointer to peer to return.
444    */
445   const struct GNUNET_PeerIdentity *peer;
446 };
447
448
449 /**
450  * @brief Iterator function for #get_random_peer_from_peermap.
451  *
452  * Implements #GNUNET_CONTAINER_PeerMapIterator.
453  * Decreases the index until the index is null.
454  * Then returns the current peer.
455  *
456  * @param cls the #GetRandPeerIteratorCls containing index and peer
457  * @param peer current peer
458  * @param value unused
459  *
460  * @return  #GNUNET_YES if we should continue to
461  *          iterate,
462  *          #GNUNET_NO if not.
463  */
464 static int
465 get_rand_peer_iterator (void *cls,
466                         const struct GNUNET_PeerIdentity *peer,
467                         void *value)
468 {
469   struct GetRandPeerIteratorCls *iterator_cls = cls;
470   (void) value;
471
472   if (0 >= iterator_cls->index)
473   {
474     iterator_cls->peer = peer;
475     return GNUNET_NO;
476   }
477   iterator_cls->index--;
478   return GNUNET_YES;
479 }
480
481
482 /**
483  * @brief Get a random peer from @a peer_map
484  *
485  * @param peer_map the peer_map to get the peer from
486  *
487  * @return a random peer
488  */
489 static const struct GNUNET_PeerIdentity *
490 get_random_peer_from_peermap (const struct
491                               GNUNET_CONTAINER_MultiPeerMap *peer_map)
492 {
493   struct GetRandPeerIteratorCls *iterator_cls;
494   const struct GNUNET_PeerIdentity *ret;
495
496   iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
497   iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
498       GNUNET_CONTAINER_multipeermap_size (peer_map));
499   (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
500                                                 get_rand_peer_iterator,
501                                                 iterator_cls);
502   ret = iterator_cls->peer;
503   GNUNET_free (iterator_cls);
504   return ret;
505 }
506
507
508 /**
509  * @brief Add a given @a peer to valid peers.
510  *
511  * If valid peers are already #num_valid_peers_max, delete a peer previously.
512  *
513  * @param peer the peer that is added to the valid peers.
514  *
515  * @return #GNUNET_YES if no other peer had to be removed
516  *         #GNUNET_NO  otherwise
517  */
518 static int
519 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
520 {
521   const struct GNUNET_PeerIdentity *rand_peer;
522   int ret;
523
524   ret = GNUNET_YES;
525   while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
526   {
527     rand_peer = get_random_peer_from_peermap (valid_peers);
528     GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
529     ret = GNUNET_NO;
530   }
531   (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
532       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
533   GNUNET_STATISTICS_set (stats,
534                          "# valid peers",
535                          GNUNET_CONTAINER_multipeermap_size (valid_peers),
536                          GNUNET_NO);
537   return ret;
538 }
539
540 static void
541 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
542
543 /**
544  * @brief Set the peer flag to living and
545  *        call the pending operations on this peer.
546  *
547  * Also adds peer to #valid_peers.
548  *
549  * @param peer_ctx the #PeerContext of the peer to set live
550  */
551 static void
552 set_peer_live (struct PeerContext *peer_ctx)
553 {
554   struct GNUNET_PeerIdentity *peer;
555   unsigned int i;
556
557   peer = &peer_ctx->peer_id;
558   LOG (GNUNET_ERROR_TYPE_DEBUG,
559       "Peer %s is live and valid, calling %i pending operations on it\n",
560       GNUNET_i2s (peer),
561       peer_ctx->num_pending_ops);
562
563   if (NULL != peer_ctx->liveliness_check_pending)
564   {
565     LOG (GNUNET_ERROR_TYPE_DEBUG,
566          "Removing pending liveliness check for peer %s\n",
567          GNUNET_i2s (&peer_ctx->peer_id));
568     // TODO wait until cadet sets mq->cancel_impl
569     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
570     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
571     peer_ctx->liveliness_check_pending = NULL;
572   }
573
574   (void) add_valid_peer (peer);
575   SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
576
577   /* Call pending operations */
578   for (i = 0; i < peer_ctx->num_pending_ops; i++)
579   {
580     peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
581   }
582   GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
583 }
584
585 static void
586 cleanup_destroyed_channel (void *cls,
587                            const struct GNUNET_CADET_Channel *channel);
588
589 /* Declaration of handlers */
590 static void
591 handle_peer_check (void *cls,
592                    const struct GNUNET_MessageHeader *msg);
593
594 static void
595 handle_peer_push (void *cls,
596                   const struct GNUNET_MessageHeader *msg);
597
598 static void
599 handle_peer_pull_request (void *cls,
600                           const struct GNUNET_MessageHeader *msg);
601
602 static int
603 check_peer_pull_reply (void *cls,
604                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
605
606 static void
607 handle_peer_pull_reply (void *cls,
608                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
609
610 /* End declaration of handlers */
611
612 /**
613  * @brief Allocate memory for a new channel context and insert it into DLL
614  *
615  * @param peer_ctx context of the according peer
616  *
617  * @return The channel context
618  */
619 static struct ChannelCtx *
620 add_channel_ctx (struct PeerContext *peer_ctx)
621 {
622   struct ChannelCtx *channel_ctx;
623   channel_ctx = GNUNET_new (struct ChannelCtx);
624   channel_ctx->peer_ctx = peer_ctx;
625   return channel_ctx;
626 }
627
628
629 /**
630  * @brief Free memory and NULL pointers.
631  *
632  * @param channel_ctx The channel context.
633  */
634 static void
635 remove_channel_ctx (struct ChannelCtx *channel_ctx)
636 {
637   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
638
639   if (NULL != channel_ctx->destruction_task)
640   {
641     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
642     channel_ctx->destruction_task = NULL;
643   }
644
645   GNUNET_free (channel_ctx);
646
647   if (NULL == peer_ctx) return;
648   if (channel_ctx == peer_ctx->send_channel_ctx)
649   {
650     peer_ctx->send_channel_ctx = NULL;
651     peer_ctx->mq = NULL;
652   }
653   else if (channel_ctx == peer_ctx->recv_channel_ctx)
654   {
655     peer_ctx->recv_channel_ctx = NULL;
656   }
657 }
658
659
660 /**
661  * @brief Get the channel of a peer. If not existing, create.
662  *
663  * @param peer the peer id
664  * @return the #GNUNET_CADET_Channel used to send data to @a peer
665  */
666 struct GNUNET_CADET_Channel *
667 get_channel (const struct GNUNET_PeerIdentity *peer)
668 {
669   struct PeerContext *peer_ctx;
670   struct GNUNET_PeerIdentity *ctx_peer;
671   /* There exists a copy-paste-clone in run() */
672   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
673     GNUNET_MQ_hd_fixed_size (peer_check,
674                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
675                              struct GNUNET_MessageHeader,
676                              NULL),
677     GNUNET_MQ_hd_fixed_size (peer_push,
678                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
679                              struct GNUNET_MessageHeader,
680                              NULL),
681     GNUNET_MQ_hd_fixed_size (peer_pull_request,
682                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
683                              struct GNUNET_MessageHeader,
684                              NULL),
685     GNUNET_MQ_hd_var_size (peer_pull_reply,
686                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
687                            struct GNUNET_RPS_P2P_PullReplyMessage,
688                            NULL),
689     GNUNET_MQ_handler_end ()
690   };
691
692
693   peer_ctx = get_peer_ctx (peer);
694   if (NULL == peer_ctx->send_channel_ctx)
695   {
696     LOG (GNUNET_ERROR_TYPE_DEBUG,
697          "Trying to establish channel to peer %s\n",
698          GNUNET_i2s (peer));
699     ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
700     *ctx_peer = *peer;
701     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
702     peer_ctx->send_channel_ctx->channel =
703       GNUNET_CADET_channel_create (cadet_handle,
704                                    peer_ctx->send_channel_ctx, /* context */
705                                    peer,
706                                    &port,
707                                    GNUNET_CADET_OPTION_RELIABLE,
708                                    NULL, /* WindowSize handler */
709                                    &cleanup_destroyed_channel, /* Disconnect handler */
710                                    cadet_handlers);
711   }
712   GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
713   GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
714   return peer_ctx->send_channel_ctx->channel;
715 }
716
717
718 /**
719  * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
720  *
721  * If we already have a message queue open to this client,
722  * simply return it, otherways create one.
723  *
724  * @param peer the peer to get the mq to
725  * @return the #GNUNET_MQ_Handle
726  */
727 static struct GNUNET_MQ_Handle *
728 get_mq (const struct GNUNET_PeerIdentity *peer)
729 {
730   struct PeerContext *peer_ctx;
731
732   peer_ctx = get_peer_ctx (peer);
733
734   if (NULL == peer_ctx->mq)
735   {
736     peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
737   }
738   return peer_ctx->mq;
739 }
740
741 /**
742  * @brief Add an envelope to a message passed to mq to list of pending messages
743  *
744  * @param peer peer the message was sent to
745  * @param ev envelope to the message
746  * @param type type of the message to be sent
747  * @return pointer to pending message
748  */
749 static struct PendingMessage *
750 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
751                         struct GNUNET_MQ_Envelope *ev,
752                         const char *type)
753 {
754   struct PendingMessage *pending_msg;
755   struct PeerContext *peer_ctx;
756
757   peer_ctx = get_peer_ctx (peer);
758   pending_msg = GNUNET_new (struct PendingMessage);
759   pending_msg->ev = ev;
760   pending_msg->peer_ctx = peer_ctx;
761   pending_msg->type = type;
762   GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
763                                peer_ctx->pending_messages_tail,
764                                pending_msg);
765   return pending_msg;
766 }
767
768
769 /**
770  * @brief Remove a pending message from the respective DLL
771  *
772  * @param pending_msg the pending message to remove
773  * @param cancel whether to cancel the pending message, too
774  */
775 static void
776 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
777 {
778   struct PeerContext *peer_ctx;
779   (void) cancel;
780
781   peer_ctx = pending_msg->peer_ctx;
782   GNUNET_assert (NULL != peer_ctx);
783   GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
784                                peer_ctx->pending_messages_tail,
785                                pending_msg);
786   // TODO wait for the cadet implementation of message cancellation
787   //if (GNUNET_YES == cancel)
788   //{
789   //  GNUNET_MQ_send_cancel (pending_msg->ev);
790   //}
791   GNUNET_free (pending_msg);
792 }
793
794
795 /**
796  * @brief This is called in response to the first message we sent as a
797  * liveliness check.
798  *
799  * @param cls #PeerContext of peer with pending liveliness check
800  */
801 static void
802 mq_liveliness_check_successful (void *cls)
803 {
804   struct PeerContext *peer_ctx = cls;
805
806   if (NULL != peer_ctx->liveliness_check_pending)
807   {
808     LOG (GNUNET_ERROR_TYPE_DEBUG,
809         "Liveliness check for peer %s was successfull\n",
810         GNUNET_i2s (&peer_ctx->peer_id));
811     remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
812     peer_ctx->liveliness_check_pending = NULL;
813     set_peer_live (peer_ctx);
814   }
815 }
816
817 /**
818  * Issue a check whether peer is live
819  *
820  * @param peer_ctx the context of the peer
821  */
822 static void
823 check_peer_live (struct PeerContext *peer_ctx)
824 {
825   LOG (GNUNET_ERROR_TYPE_DEBUG,
826        "Get informed about peer %s getting live\n",
827        GNUNET_i2s (&peer_ctx->peer_id));
828
829   struct GNUNET_MQ_Handle *mq;
830   struct GNUNET_MQ_Envelope *ev;
831
832   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
833   peer_ctx->liveliness_check_pending =
834     insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
835   mq = get_mq (&peer_ctx->peer_id);
836   GNUNET_MQ_notify_sent (ev,
837                          mq_liveliness_check_successful,
838                          peer_ctx);
839   GNUNET_MQ_send (mq, ev);
840   GNUNET_STATISTICS_update (stats,
841                             "# pending liveliness checks",
842                             1,
843                             GNUNET_NO);
844 }
845
846
847 /**
848  * @brief Check whether function of type #PeerOp was already scheduled
849  *
850  * The array with pending operations will probably never grow really big, so
851  * iterating over it should be ok.
852  *
853  * @param peer the peer to check
854  * @param peer_op the operation (#PeerOp) on the peer
855  *
856  * @return #GNUNET_YES if this operation is scheduled on that peer
857  *         #GNUNET_NO  otherwise
858  */
859 static int
860 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
861                            const PeerOp peer_op)
862 {
863   const struct PeerContext *peer_ctx;
864   unsigned int i;
865
866   peer_ctx = get_peer_ctx (peer);
867   for (i = 0; i < peer_ctx->num_pending_ops; i++)
868     if (peer_op == peer_ctx->pending_ops[i].op)
869       return GNUNET_YES;
870   return GNUNET_NO;
871 }
872
873
874 /**
875  * @brief Callback for scheduler to destroy a channel
876  *
877  * @param cls Context of the channel
878  */
879 static void
880 destroy_channel (struct ChannelCtx *channel_ctx)
881 {
882   struct GNUNET_CADET_Channel *channel;
883
884   if (NULL != channel_ctx->destruction_task)
885   {
886     GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
887     channel_ctx->destruction_task = NULL;
888   }
889   GNUNET_assert (channel_ctx->channel != NULL);
890   channel = channel_ctx->channel;
891   channel_ctx->channel = NULL;
892   GNUNET_CADET_channel_destroy (channel);
893   remove_channel_ctx (channel_ctx);
894 }
895
896
897 /**
898  * @brief Destroy a cadet channel.
899  *
900  * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
901  *
902  * @param cls
903  */
904 static void
905 destroy_channel_cb (void *cls)
906 {
907   struct ChannelCtx *channel_ctx = cls;
908
909   channel_ctx->destruction_task = NULL;
910   destroy_channel (channel_ctx);
911 }
912
913
914 /**
915  * @brief Schedule the destruction of a channel for immediately afterwards.
916  *
917  * In case a channel is to be destroyed from within the callback to the
918  * destruction of another channel (send channel), we cannot call
919  * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
920  * construction.
921  *
922  * @param channel_ctx channel to be destroyed.
923  */
924 static void
925 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
926 {
927   GNUNET_assert (NULL ==
928                  channel_ctx->destruction_task);
929   GNUNET_assert (NULL !=
930                  channel_ctx->channel);
931   channel_ctx->destruction_task =
932     GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
933                               channel_ctx);
934 }
935
936
937 /**
938  * @brief Remove peer
939  *
940  * @param peer the peer to clean
941  * @return #GNUNET_YES if peer was removed
942  *         #GNUNET_NO  otherwise
943  */
944 static int
945 destroy_peer (struct PeerContext *peer_ctx)
946 {
947   GNUNET_assert (NULL != peer_ctx);
948   GNUNET_assert (NULL != peer_map);
949   if (GNUNET_NO ==
950       GNUNET_CONTAINER_multipeermap_contains (peer_map,
951                                               &peer_ctx->peer_id))
952   {
953     return GNUNET_NO;
954   }
955   SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        "Going to remove peer %s\n",
958        GNUNET_i2s (&peer_ctx->peer_id));
959   UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
960
961   /* Clear list of pending operations */
962   // TODO this probably leaks memory
963   //      ('only' the cls to the function. Not sure what to do with it)
964   GNUNET_array_grow (peer_ctx->pending_ops,
965                      peer_ctx->num_pending_ops,
966                      0);
967   /* Remove all pending messages */
968   while (NULL != peer_ctx->pending_messages_head)
969   {
970     LOG (GNUNET_ERROR_TYPE_DEBUG,
971          "Removing unsent %s\n",
972          peer_ctx->pending_messages_head->type);
973     /* Cancle pending message, too */
974     if ( (NULL != peer_ctx->liveliness_check_pending) &&
975          (0 == memcmp (peer_ctx->pending_messages_head,
976                      peer_ctx->liveliness_check_pending,
977                      sizeof (struct PendingMessage))) )
978       {
979         peer_ctx->liveliness_check_pending = NULL;
980         GNUNET_STATISTICS_update (stats,
981                                   "# pending liveliness checks",
982                                   -1,
983                                   GNUNET_NO);
984       }
985     remove_pending_message (peer_ctx->pending_messages_head,
986                             GNUNET_YES);
987   }
988
989   /* If we are still waiting for notification whether this peer is live
990    * cancel the according task */
991   if (NULL != peer_ctx->liveliness_check_pending)
992   {
993     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994                 "Removing pending liveliness check for peer %s\n",
995                 GNUNET_i2s (&peer_ctx->peer_id));
996     // TODO wait until cadet sets mq->cancel_impl
997     //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
998     remove_pending_message (peer_ctx->liveliness_check_pending,
999                             GNUNET_YES);
1000     peer_ctx->liveliness_check_pending = NULL;
1001   }
1002
1003   if (NULL != peer_ctx->send_channel_ctx)
1004   {
1005     /* This is possibly called from within channel destruction */
1006     peer_ctx->send_channel_ctx->peer_ctx = NULL;
1007     schedule_channel_destruction (peer_ctx->send_channel_ctx);
1008     peer_ctx->send_channel_ctx = NULL;
1009     peer_ctx->mq = NULL;
1010   }
1011   if (NULL != peer_ctx->recv_channel_ctx)
1012   {
1013     /* This is possibly called from within channel destruction */
1014     peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1015     schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1016     peer_ctx->recv_channel_ctx = NULL;
1017   }
1018
1019   if (GNUNET_YES !=
1020       GNUNET_CONTAINER_multipeermap_remove_all (peer_map,
1021                                                 &peer_ctx->peer_id))
1022   {
1023     LOG (GNUNET_ERROR_TYPE_WARNING,
1024          "removing peer from peer_map failed\n");
1025   }
1026   GNUNET_STATISTICS_set (stats,
1027                         "# known peers",
1028                         GNUNET_CONTAINER_multipeermap_size (peer_map),
1029                         GNUNET_NO);
1030   GNUNET_free (peer_ctx);
1031   return GNUNET_YES;
1032 }
1033
1034
1035 /**
1036  * Iterator over hash map entries. Deletes all contexts of peers.
1037  *
1038  * @param cls closure
1039  * @param key current public key
1040  * @param value value in the hash map
1041  * @return #GNUNET_YES if we should continue to iterate,
1042  *         #GNUNET_NO if not.
1043  */
1044 static int
1045 peermap_clear_iterator (void *cls,
1046                         const struct GNUNET_PeerIdentity *key,
1047                         void *value)
1048 {
1049   (void) cls;
1050   (void) value;
1051   destroy_peer (get_peer_ctx (key));
1052   return GNUNET_YES;
1053 }
1054
1055
1056 /**
1057  * @brief This is called once a message is sent.
1058  *
1059  * Removes the pending message
1060  *
1061  * @param cls type of the message that was sent
1062  */
1063 static void
1064 mq_notify_sent_cb (void *cls)
1065 {
1066   struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1067   LOG (GNUNET_ERROR_TYPE_DEBUG,
1068       "%s was sent.\n",
1069       pending_msg->type);
1070   if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1071     GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1072   if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1073     GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1074   if (0 == strncmp ("PUSH", pending_msg->type, 4))
1075     GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1076   /* Do not cancle message */
1077   remove_pending_message (pending_msg, GNUNET_NO);
1078 }
1079
1080
1081 /**
1082  * @brief Iterator function for #store_valid_peers.
1083  *
1084  * Implements #GNUNET_CONTAINER_PeerMapIterator.
1085  * Writes single peer to disk.
1086  *
1087  * @param cls the file handle to write to.
1088  * @param peer current peer
1089  * @param value unused
1090  *
1091  * @return  #GNUNET_YES if we should continue to
1092  *          iterate,
1093  *          #GNUNET_NO if not.
1094  */
1095 static int
1096 store_peer_presistently_iterator (void *cls,
1097                                   const struct GNUNET_PeerIdentity *peer,
1098                                   void *value)
1099 {
1100   const struct GNUNET_DISK_FileHandle *fh = cls;
1101   char peer_string[128];
1102   int size;
1103   ssize_t ret;
1104   (void) value;
1105
1106   if (NULL == peer)
1107   {
1108     return GNUNET_YES;
1109   }
1110   size = GNUNET_snprintf (peer_string,
1111                           sizeof (peer_string),
1112                           "%s\n",
1113                           GNUNET_i2s_full (peer));
1114   GNUNET_assert (53 == size);
1115   ret = GNUNET_DISK_file_write (fh,
1116                                 peer_string,
1117                                 size);
1118   GNUNET_assert (size == ret);
1119   return GNUNET_YES;
1120 }
1121
1122
1123 /**
1124  * @brief Store the peers currently in #valid_peers to disk.
1125  */
1126 static void
1127 store_valid_peers ()
1128 {
1129   struct GNUNET_DISK_FileHandle *fh;
1130   uint32_t number_written_peers;
1131   int ret;
1132
1133   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1134   {
1135     return;
1136   }
1137
1138   ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
1139   if (GNUNET_SYSERR == ret)
1140   {
1141     LOG (GNUNET_ERROR_TYPE_WARNING,
1142         "Not able to create directory for file `%s'\n",
1143         filename_valid_peers);
1144     GNUNET_break (0);
1145   }
1146   else if (GNUNET_NO == ret)
1147   {
1148     LOG (GNUNET_ERROR_TYPE_WARNING,
1149         "Directory for file `%s' exists but is not writable for us\n",
1150         filename_valid_peers);
1151     GNUNET_break (0);
1152   }
1153   fh = GNUNET_DISK_file_open (filename_valid_peers,
1154                               GNUNET_DISK_OPEN_WRITE |
1155                                   GNUNET_DISK_OPEN_CREATE,
1156                               GNUNET_DISK_PERM_USER_READ |
1157                                   GNUNET_DISK_PERM_USER_WRITE);
1158   if (NULL == fh)
1159   {
1160     LOG (GNUNET_ERROR_TYPE_WARNING,
1161         "Not able to write valid peers to file `%s'\n",
1162         filename_valid_peers);
1163     return;
1164   }
1165   LOG (GNUNET_ERROR_TYPE_DEBUG,
1166       "Writing %u valid peers to disk\n",
1167       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1168   number_written_peers =
1169     GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1170                                            store_peer_presistently_iterator,
1171                                            fh);
1172   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1173   GNUNET_assert (number_written_peers ==
1174       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1175 }
1176
1177
1178 /**
1179  * @brief Convert string representation of peer id to peer id.
1180  *
1181  * Counterpart to #GNUNET_i2s_full.
1182  *
1183  * @param string_repr The string representation of the peer id
1184  *
1185  * @return The peer id
1186  */
1187 static const struct GNUNET_PeerIdentity *
1188 s2i_full (const char *string_repr)
1189 {
1190   struct GNUNET_PeerIdentity *peer;
1191   size_t len;
1192   int ret;
1193
1194   peer = GNUNET_new (struct GNUNET_PeerIdentity);
1195   len = strlen (string_repr);
1196   if (52 > len)
1197   {
1198     LOG (GNUNET_ERROR_TYPE_WARNING,
1199         "Not able to convert string representation of PeerID to PeerID\n"
1200         "Sting representation: %s (len %lu) - too short\n",
1201         string_repr,
1202         len);
1203     GNUNET_break (0);
1204   }
1205   else if (52 < len)
1206   {
1207     len = 52;
1208   }
1209   ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1210                                                     len,
1211                                                     &peer->public_key);
1212   if (GNUNET_OK != ret)
1213   {
1214     LOG (GNUNET_ERROR_TYPE_WARNING,
1215         "Not able to convert string representation of PeerID to PeerID\n"
1216         "Sting representation: %s\n",
1217         string_repr);
1218     GNUNET_break (0);
1219   }
1220   return peer;
1221 }
1222
1223
1224 /**
1225  * @brief Restore the peers on disk to #valid_peers.
1226  */
1227 static void
1228 restore_valid_peers ()
1229 {
1230   off_t file_size;
1231   uint32_t num_peers;
1232   struct GNUNET_DISK_FileHandle *fh;
1233   char *buf;
1234   ssize_t size_read;
1235   char *iter_buf;
1236   char *str_repr;
1237   const struct GNUNET_PeerIdentity *peer;
1238
1239   if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1240   {
1241     return;
1242   }
1243
1244   if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1245   {
1246     return;
1247   }
1248   fh = GNUNET_DISK_file_open (filename_valid_peers,
1249                               GNUNET_DISK_OPEN_READ,
1250                               GNUNET_DISK_PERM_NONE);
1251   GNUNET_assert (NULL != fh);
1252   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1253   num_peers = file_size / 53;
1254   buf = GNUNET_malloc (file_size);
1255   size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1256   GNUNET_assert (size_read == file_size);
1257   LOG (GNUNET_ERROR_TYPE_DEBUG,
1258       "Restoring %" PRIu32 " peers from file `%s'\n",
1259       num_peers,
1260       filename_valid_peers);
1261   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1262   {
1263     str_repr = GNUNET_strndup (iter_buf, 53);
1264     peer = s2i_full (str_repr);
1265     GNUNET_free (str_repr);
1266     add_valid_peer (peer);
1267     LOG (GNUNET_ERROR_TYPE_DEBUG,
1268         "Restored valid peer %s from disk\n",
1269         GNUNET_i2s_full (peer));
1270   }
1271   iter_buf = NULL;
1272   GNUNET_free (buf);
1273   LOG (GNUNET_ERROR_TYPE_DEBUG,
1274       "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1275       num_peers,
1276       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1277   if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1278   {
1279     LOG (GNUNET_ERROR_TYPE_WARNING,
1280         "Number of restored peers does not match file size. Have probably duplicates.\n");
1281   }
1282   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1283   LOG (GNUNET_ERROR_TYPE_DEBUG,
1284       "Restored %u valid peers from disk\n",
1285       GNUNET_CONTAINER_multipeermap_size (valid_peers));
1286 }
1287
1288
1289 /**
1290  * @brief Initialise storage of peers
1291  *
1292  * @param fn_valid_peers filename of the file used to store valid peer ids
1293  * @param cadet_h cadet handle
1294  * @param own_id own peer identity
1295  */
1296 static void
1297 initialise_peers (char* fn_valid_peers,
1298                   struct GNUNET_CADET_Handle *cadet_h)
1299 {
1300   filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1301   cadet_handle = cadet_h;
1302   peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1303   valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1304   restore_valid_peers ();
1305 }
1306
1307
1308 /**
1309  * @brief Delete storage of peers that was created with #initialise_peers ()
1310  */
1311 static void
1312 peers_terminate ()
1313 {
1314   if (GNUNET_SYSERR ==
1315       GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1316                                              &peermap_clear_iterator,
1317                                              NULL))
1318   {
1319     LOG (GNUNET_ERROR_TYPE_WARNING,
1320         "Iteration destroying peers was aborted.\n");
1321   }
1322   GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1323   peer_map = NULL;
1324   store_valid_peers ();
1325   GNUNET_free (filename_valid_peers);
1326   filename_valid_peers = NULL;
1327   GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1328   valid_peers = NULL;
1329 }
1330
1331
1332 /**
1333  * Iterator over #valid_peers hash map entries.
1334  *
1335  * @param cls closure - unused
1336  * @param peer current peer id
1337  * @param value value in the hash map - unused
1338  * @return #GNUNET_YES if we should continue to
1339  *         iterate,
1340  *         #GNUNET_NO if not.
1341  */
1342 static int
1343 valid_peer_iterator (void *cls,
1344                      const struct GNUNET_PeerIdentity *peer,
1345                      void *value)
1346 {
1347   struct PeersIteratorCls *it_cls = cls;
1348   (void) value;
1349
1350   return it_cls->iterator (it_cls->cls,
1351                            peer);
1352 }
1353
1354
1355 /**
1356  * @brief Get all currently known, valid peer ids.
1357  *
1358  * @param it function to call on each peer id
1359  * @param it_cls extra argument to @a it
1360  * @return the number of key value pairs processed,
1361  *         #GNUNET_SYSERR if it aborted iteration
1362  */
1363 static int
1364 get_valid_peers (PeersIterator iterator,
1365                  void *it_cls)
1366 {
1367   struct PeersIteratorCls *cls;
1368   int ret;
1369
1370   cls = GNUNET_new (struct PeersIteratorCls);
1371   cls->iterator = iterator;
1372   cls->cls = it_cls;
1373   ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1374                                                valid_peer_iterator,
1375                                                cls);
1376   GNUNET_free (cls);
1377   return ret;
1378 }
1379
1380
1381 /**
1382  * @brief Add peer to known peers.
1383  *
1384  * This function is called on new peer_ids from 'external' sources
1385  * (client seed, cadet get_peers(), ...)
1386  *
1387  * @param peer the new #GNUNET_PeerIdentity
1388  *
1389  * @return #GNUNET_YES if peer was inserted
1390  *         #GNUNET_NO  otherwise
1391  */
1392 static int
1393 insert_peer (const struct GNUNET_PeerIdentity *peer)
1394 {
1395   if (GNUNET_YES == check_peer_known (peer))
1396   {
1397     return GNUNET_NO; /* We already know this peer - nothing to do */
1398   }
1399   (void) create_peer_ctx (peer);
1400   return GNUNET_YES;
1401 }
1402
1403
1404 /**
1405  * @brief Check whether flags on a peer are set.
1406  *
1407  * @param peer the peer to check the flag of
1408  * @param flags the flags to check
1409  *
1410  * @return #GNUNET_SYSERR if peer is not known
1411  *         #GNUNET_YES    if all given flags are set
1412  *         #GNUNET_NO     otherwise
1413  */
1414 static int
1415 check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1416                  enum Peers_PeerFlags flags)
1417 {
1418   struct PeerContext *peer_ctx;
1419
1420   if (GNUNET_NO == check_peer_known (peer))
1421   {
1422     return GNUNET_SYSERR;
1423   }
1424   peer_ctx = get_peer_ctx (peer);
1425   return check_peer_flag_set (peer_ctx, flags);
1426 }
1427
1428 /**
1429  * @brief Try connecting to a peer to see whether it is online
1430  *
1431  * If not known yet, insert into known peers
1432  *
1433  * @param peer the peer whose liveliness is to be checked
1434  * @return #GNUNET_YES if the check was issued
1435  *         #GNUNET_NO  otherwise
1436  */
1437 static int
1438 issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1439 {
1440   struct PeerContext *peer_ctx;
1441
1442   (void) insert_peer (peer);
1443   peer_ctx = get_peer_ctx (peer);
1444   if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) &&
1445        (NULL == peer_ctx->liveliness_check_pending) )
1446   {
1447     check_peer_live (peer_ctx);
1448     return GNUNET_YES;
1449   }
1450   return GNUNET_NO;
1451 }
1452
1453
1454 /**
1455  * @brief Check if peer is removable.
1456  *
1457  * Check if
1458  *  - a recv channel exists
1459  *  - there are pending messages
1460  *  - there is no pending pull reply
1461  *
1462  * @param peer the peer in question
1463  * @return #GNUNET_YES    if peer is removable
1464  *         #GNUNET_NO     if peer is NOT removable
1465  *         #GNUNET_SYSERR if peer is not known
1466  */
1467 static int
1468 check_removable (const struct GNUNET_PeerIdentity *peer)
1469 {
1470   struct PeerContext *peer_ctx;
1471
1472   if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1473   {
1474     return GNUNET_SYSERR;
1475   }
1476
1477   peer_ctx = get_peer_ctx (peer);
1478   if ( (NULL != peer_ctx->recv_channel_ctx) ||
1479        (NULL != peer_ctx->pending_messages_head) ||
1480        (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1481   {
1482     return GNUNET_NO;
1483   }
1484   return GNUNET_YES;
1485 }
1486
1487
1488 /**
1489  * @brief Check whether @a peer is actually a peer.
1490  *
1491  * A valid peer is a peer that we know exists eg. we were connected to once.
1492  *
1493  * @param peer peer in question
1494  *
1495  * @return #GNUNET_YES if peer is valid
1496  *         #GNUNET_NO  if peer is not valid
1497  */
1498 static int
1499 check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1500 {
1501   return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1502 }
1503
1504
1505 /**
1506  * @brief Indicate that we want to send to the other peer
1507  *
1508  * This establishes a sending channel
1509  *
1510  * @param peer the peer to establish channel to
1511  */
1512 static void
1513 indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1514 {
1515   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1516   (void) get_channel (peer);
1517 }
1518
1519
1520 /**
1521  * @brief Check whether other peer has the intention to send/opened channel
1522  *        towars us
1523  *
1524  * @param peer the peer in question
1525  *
1526  * @return #GNUNET_YES if peer has the intention to send
1527  *         #GNUNET_NO  otherwise
1528  */
1529 static int
1530 check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1531 {
1532   const struct PeerContext *peer_ctx;
1533
1534   peer_ctx = get_peer_ctx (peer);
1535   if (NULL != peer_ctx->recv_channel_ctx)
1536   {
1537     return GNUNET_YES;
1538   }
1539   return GNUNET_NO;
1540 }
1541
1542
1543 /**
1544  * Handle the channel a peer opens to us.
1545  *
1546  * @param cls The closure
1547  * @param channel The channel the peer wants to establish
1548  * @param initiator The peer's peer ID
1549  *
1550  * @return initial channel context for the channel
1551  *         (can be NULL -- that's not an error)
1552  */
1553 static void *
1554 handle_inbound_channel (void *cls,
1555                         struct GNUNET_CADET_Channel *channel,
1556                         const struct GNUNET_PeerIdentity *initiator)
1557 {
1558   struct PeerContext *peer_ctx;
1559   struct GNUNET_PeerIdentity *ctx_peer;
1560   struct ChannelCtx *channel_ctx;
1561   (void) cls;
1562
1563   LOG (GNUNET_ERROR_TYPE_DEBUG,
1564       "New channel was established to us (Peer %s).\n",
1565       GNUNET_i2s (initiator));
1566   GNUNET_assert (NULL != channel); /* according to cadet API */
1567   /* Make sure we 'know' about this peer */
1568   peer_ctx = create_or_get_peer_ctx (initiator);
1569   set_peer_live (peer_ctx);
1570   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1571   *ctx_peer = *initiator;
1572   channel_ctx = add_channel_ctx (peer_ctx);
1573   channel_ctx->channel = channel;
1574   /* We only accept one incoming channel per peer */
1575   if (GNUNET_YES == check_peer_send_intention (initiator))
1576   {
1577     LOG (GNUNET_ERROR_TYPE_WARNING,
1578         "Already got one receive channel. Destroying old one.\n");
1579     GNUNET_break_op (0);
1580     destroy_channel (peer_ctx->recv_channel_ctx);
1581     peer_ctx->recv_channel_ctx = channel_ctx;
1582     /* return the channel context */
1583     return channel_ctx;
1584   }
1585   peer_ctx->recv_channel_ctx = channel_ctx;
1586   return channel_ctx;
1587 }
1588
1589
1590 /**
1591  * @brief Check whether a sending channel towards the given peer exists
1592  *
1593  * @param peer the peer to check for
1594  *
1595  * @return #GNUNET_YES if a sending channel towards that peer exists
1596  *         #GNUNET_NO  otherwise
1597  */
1598 static int
1599 check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1600 {
1601   struct PeerContext *peer_ctx;
1602
1603   if (GNUNET_NO == check_peer_known (peer))
1604   { /* If no such peer exists, there is no channel */
1605     return GNUNET_NO;
1606   }
1607   peer_ctx = get_peer_ctx (peer);
1608   if (NULL == peer_ctx->send_channel_ctx)
1609   {
1610     return GNUNET_NO;
1611   }
1612   return GNUNET_YES;
1613 }
1614
1615
1616 /**
1617  * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618  *        intention to another peer
1619  *
1620  * @peer the peer identity of the peer whose sending channel to destroy
1621  * @return #GNUNET_YES if channel was destroyed
1622  *         #GNUNET_NO  otherwise
1623  */
1624 static int
1625 destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1626 {
1627   struct PeerContext *peer_ctx;
1628
1629   if (GNUNET_NO == check_peer_known (peer))
1630   {
1631     return GNUNET_NO;
1632   }
1633   peer_ctx = get_peer_ctx (peer);
1634   if (NULL != peer_ctx->send_channel_ctx)
1635   {
1636     destroy_channel (peer_ctx->send_channel_ctx);
1637     (void) check_connected (peer);
1638     return GNUNET_YES;
1639   }
1640   return GNUNET_NO;
1641 }
1642
1643 /**
1644  * @brief Send a message to another peer.
1645  *
1646  * Keeps track about pending messages so they can be properly removed when the
1647  * peer is destroyed.
1648  *
1649  * @param peer receeiver of the message
1650  * @param ev envelope of the message
1651  * @param type type of the message
1652  */
1653 static void
1654 send_message (const struct GNUNET_PeerIdentity *peer,
1655               struct GNUNET_MQ_Envelope *ev,
1656               const char *type)
1657 {
1658   struct PendingMessage *pending_msg;
1659   struct GNUNET_MQ_Handle *mq;
1660
1661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662               "Sending message to %s of type %s\n",
1663               GNUNET_i2s (peer),
1664               type);
1665   pending_msg = insert_pending_message (peer, ev, type);
1666   mq = get_mq (peer);
1667   GNUNET_MQ_notify_sent (ev,
1668                          mq_notify_sent_cb,
1669                          pending_msg);
1670   GNUNET_MQ_send (mq, ev);
1671 }
1672
1673 /**
1674  * @brief Schedule a operation on given peer
1675  *
1676  * Avoids scheduling an operation twice.
1677  *
1678  * @param peer the peer we want to schedule the operation for once it gets live
1679  *
1680  * @return #GNUNET_YES if the operation was scheduled
1681  *         #GNUNET_NO  otherwise
1682  */
1683 static int
1684 schedule_operation (const struct GNUNET_PeerIdentity *peer,
1685                     const PeerOp peer_op)
1686 {
1687   struct PeerPendingOp pending_op;
1688   struct PeerContext *peer_ctx;
1689
1690   GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1691
1692   //TODO if LIVE/ONLINE execute immediately
1693
1694   if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1695   {
1696     peer_ctx = get_peer_ctx (peer);
1697     pending_op.op = peer_op;
1698     pending_op.op_cls = NULL;
1699     GNUNET_array_append (peer_ctx->pending_ops,
1700                          peer_ctx->num_pending_ops,
1701                          pending_op);
1702     return GNUNET_YES;
1703   }
1704   return GNUNET_NO;
1705 }
1706
1707 /***********************************************************************
1708  * /Old gnunet-service-rps_peers.c
1709 ***********************************************************************/
1710
1711
1712 /***********************************************************************
1713  * Housekeeping with clients
1714 ***********************************************************************/
1715
1716 /**
1717  * Closure used to pass the client and the id to the callback
1718  * that replies to a client's request
1719  */
1720 struct ReplyCls
1721 {
1722   /**
1723    * DLL
1724    */
1725   struct ReplyCls *next;
1726   struct ReplyCls *prev;
1727
1728   /**
1729    * The identifier of the request
1730    */
1731   uint32_t id;
1732
1733   /**
1734    * The handle to the request
1735    */
1736   struct RPS_SamplerRequestHandle *req_handle;
1737
1738   /**
1739    * The client handle to send the reply to
1740    */
1741   struct ClientContext *cli_ctx;
1742 };
1743
1744
1745 /**
1746  * Struct used to store the context of a connected client.
1747  */
1748 struct ClientContext
1749 {
1750   /**
1751    * DLL
1752    */
1753   struct ClientContext *next;
1754   struct ClientContext *prev;
1755
1756   /**
1757    * The message queue to communicate with the client.
1758    */
1759   struct GNUNET_MQ_Handle *mq;
1760
1761   /**
1762    * @brief How many updates this client expects to receive.
1763    */
1764   int64_t view_updates_left;
1765
1766   /**
1767    * @brief Whether this client wants to receive stream updates.
1768    * Either #GNUNET_YES or #GNUNET_NO
1769    */
1770   int8_t stream_update;
1771
1772   /**
1773    * The client handle to send the reply to
1774    */
1775   struct GNUNET_SERVICE_Client *client;
1776 };
1777
1778 /**
1779  * DLL with all clients currently connected to us
1780  */
1781 struct ClientContext *cli_ctx_head;
1782 struct ClientContext *cli_ctx_tail;
1783
1784 /***********************************************************************
1785  * /Housekeeping with clients
1786 ***********************************************************************/
1787
1788
1789
1790
1791
1792 /***********************************************************************
1793  * Globals
1794 ***********************************************************************/
1795
1796 /**
1797  * Sampler used for the Brahms protocol itself.
1798  */
1799 static struct RPS_Sampler *prot_sampler;
1800
1801 /**
1802  * Name to log view to
1803  */
1804 static const char *file_name_view_log;
1805
1806 #ifdef TO_FILE
1807 /**
1808  * Name to log number of observed peers to
1809  */
1810 static const char *file_name_observed_log;
1811
1812 /**
1813  * @brief Count the observed peers
1814  */
1815 static uint32_t num_observed_peers;
1816
1817 /**
1818  * @brief Multipeermap (ab-) used to count unique peer_ids
1819  */
1820 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1821 #endif /* TO_FILE */
1822
1823 /**
1824  * The size of sampler we need to be able to satisfy the Brahms protocol's
1825  * need of random peers.
1826  *
1827  * This is one minimum size the sampler grows to.
1828  */
1829 static unsigned int sampler_size_est_need;
1830
1831 /**
1832  * @brief This is the minimum estimate used as sampler size.
1833  *
1834  * It is configured by the user.
1835  */
1836 static unsigned int sampler_size_est_min;
1837
1838 /**
1839  * @brief This is the estimate used as view size.
1840  *
1841  * It is initialised with the minimum
1842  */
1843 static unsigned int view_size_est_need;
1844
1845 /**
1846  * @brief This is the minimum estimate used as view size.
1847  *
1848  * It is configured by the user.
1849  */
1850 static unsigned int view_size_est_min;
1851
1852 /**
1853  * Percentage of total peer number in the view
1854  * to send random PUSHes to
1855  */
1856 static float alpha;
1857
1858 /**
1859  * Percentage of total peer number in the view
1860  * to send random PULLs to
1861  */
1862 static float beta;
1863
1864 /**
1865  * Identifier for the main task that runs periodically.
1866  */
1867 static struct GNUNET_SCHEDULER_Task *do_round_task;
1868
1869 /**
1870  * Time inverval the do_round task runs in.
1871  */
1872 static struct GNUNET_TIME_Relative round_interval;
1873
1874 /**
1875  * List to store peers received through pushes temporary.
1876  */
1877 static struct CustomPeerMap *push_map;
1878
1879 /**
1880  * List to store peers received through pulls temporary.
1881  */
1882 static struct CustomPeerMap *pull_map;
1883
1884 /**
1885  * Handler to NSE.
1886  */
1887 static struct GNUNET_NSE_Handle *nse;
1888
1889 /**
1890  * Handler to CADET.
1891  */
1892 static struct GNUNET_CADET_Handle *cadet_handle;
1893
1894 /**
1895  * @brief Port to communicate to other peers.
1896  */
1897 static struct GNUNET_CADET_Port *cadet_port;
1898
1899 /**
1900  * Handler to PEERINFO.
1901  */
1902 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1903
1904 /**
1905  * Handle for cancellation of iteration over peers.
1906  */
1907 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1908
1909
1910 #ifdef ENABLE_MALICIOUS
1911 /**
1912  * Type of malicious peer
1913  *
1914  * 0 Don't act malicious at all - Default
1915  * 1 Try to maximise representation
1916  * 2 Try to partition the network
1917  * 3 Combined attack
1918  */
1919 static uint32_t mal_type;
1920
1921 /**
1922  * Other malicious peers
1923  */
1924 static struct GNUNET_PeerIdentity *mal_peers;
1925
1926 /**
1927  * Hashmap of malicious peers used as set.
1928  * Used to more efficiently check whether we know that peer.
1929  */
1930 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
1931
1932 /**
1933  * Number of other malicious peers
1934  */
1935 static uint32_t num_mal_peers;
1936
1937
1938 /**
1939  * If type is 2 This struct is used to store the attacked peers in a DLL
1940  */
1941 struct AttackedPeer
1942 {
1943   /**
1944    * DLL
1945    */
1946   struct AttackedPeer *next;
1947   struct AttackedPeer *prev;
1948
1949   /**
1950    * PeerID
1951    */
1952   struct GNUNET_PeerIdentity peer_id;
1953 };
1954
1955 /**
1956  * If type is 2 this is the DLL of attacked peers
1957  */
1958 static struct AttackedPeer *att_peers_head;
1959 static struct AttackedPeer *att_peers_tail;
1960
1961 /**
1962  * This index is used to point to an attacked peer to
1963  * implement the round-robin-ish way to select attacked peers.
1964  */
1965 static struct AttackedPeer *att_peer_index;
1966
1967 /**
1968  * Hashmap of attacked peers used as set.
1969  * Used to more efficiently check whether we know that peer.
1970  */
1971 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
1972
1973 /**
1974  * Number of attacked peers
1975  */
1976 static uint32_t num_attacked_peers;
1977
1978 /**
1979  * If type is 1 this is the attacked peer
1980  */
1981 static struct GNUNET_PeerIdentity attacked_peer;
1982
1983 /**
1984  * The limit of PUSHes we can send in one round.
1985  * This is an assumption of the Brahms protocol and either implemented
1986  * via proof of work
1987  * or
1988  * assumend to be the bandwidth limitation.
1989  */
1990 static uint32_t push_limit = 10000;
1991 #endif /* ENABLE_MALICIOUS */
1992
1993
1994 /***********************************************************************
1995  * /Globals
1996 ***********************************************************************/
1997
1998
1999 /***********************************************************************
2000  * Util functions
2001 ***********************************************************************/
2002
2003
2004 /**
2005  * Print peerlist to log.
2006  */
2007 static void
2008 print_peer_list (struct GNUNET_PeerIdentity *list,
2009                  unsigned int len)
2010 {
2011   unsigned int i;
2012
2013   LOG (GNUNET_ERROR_TYPE_DEBUG,
2014        "Printing peer list of length %u at %p:\n",
2015        len,
2016        list);
2017   for (i = 0 ; i < len ; i++)
2018   {
2019     LOG (GNUNET_ERROR_TYPE_DEBUG,
2020          "%u. peer: %s\n",
2021          i, GNUNET_i2s (&list[i]));
2022   }
2023 }
2024
2025
2026 /**
2027  * Remove peer from list.
2028  */
2029 static void
2030 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2031                unsigned int *list_size,
2032                const struct GNUNET_PeerIdentity *peer)
2033 {
2034   unsigned int i;
2035   struct GNUNET_PeerIdentity *tmp;
2036
2037   tmp = *peer_list;
2038
2039   LOG (GNUNET_ERROR_TYPE_DEBUG,
2040        "Removing peer %s from list at %p\n",
2041        GNUNET_i2s (peer),
2042        tmp);
2043
2044   for ( i = 0 ; i < *list_size ; i++ )
2045   {
2046     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2047     {
2048       if (i < *list_size -1)
2049       { /* Not at the last entry -- shift peers left */
2050         memmove (&tmp[i], &tmp[i +1],
2051                 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2052       }
2053       /* Remove last entry (should be now useless PeerID) */
2054       GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2055     }
2056   }
2057   *peer_list = tmp;
2058 }
2059
2060
2061 /**
2062  * Insert PeerID in #view
2063  *
2064  * Called once we know a peer is live.
2065  * Implements #PeerOp
2066  *
2067  * @return GNUNET_OK if peer was actually inserted
2068  *         GNUNET_NO if peer was not inserted
2069  */
2070 static void
2071 insert_in_view_op (void *cls,
2072                 const struct GNUNET_PeerIdentity *peer);
2073
2074 /**
2075  * Insert PeerID in #view
2076  *
2077  * Called once we know a peer is live.
2078  *
2079  * @return GNUNET_OK if peer was actually inserted
2080  *         GNUNET_NO if peer was not inserted
2081  */
2082 static int
2083 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2084 {
2085   int online;
2086   int ret;
2087
2088   online = check_peer_flag (peer, Peers_ONLINE);
2089   if ( (GNUNET_NO == online) ||
2090        (GNUNET_SYSERR == online) ) /* peer is not even known */
2091   {
2092     (void) issue_peer_liveliness_check (peer);
2093     (void) schedule_operation (peer, insert_in_view_op);
2094     return GNUNET_NO;
2095   }
2096   /* Open channel towards peer to keep connection open */
2097   indicate_sending_intention (peer);
2098   ret = View_put (peer);
2099   GNUNET_STATISTICS_set (stats, "view size", View_size(), GNUNET_NO);
2100   return ret;
2101 }
2102
2103
2104 /**
2105  * @brief Send view to client
2106  *
2107  * @param cli_ctx the context of the client
2108  * @param view_array the peerids of the view as array (can be empty)
2109  * @param view_size the size of the view array (can be 0)
2110  */
2111 void
2112 send_view (const struct ClientContext *cli_ctx,
2113            const struct GNUNET_PeerIdentity *view_array,
2114            uint64_t view_size)
2115 {
2116   struct GNUNET_MQ_Envelope *ev;
2117   struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2118
2119   if (NULL == view_array)
2120   {
2121     view_size = View_size ();
2122     view_array = View_get_as_array();
2123   }
2124
2125   ev = GNUNET_MQ_msg_extra (out_msg,
2126                             view_size * sizeof (struct GNUNET_PeerIdentity),
2127                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2128   out_msg->num_peers = htonl (view_size);
2129
2130   GNUNET_memcpy (&out_msg[1],
2131                  view_array,
2132                  view_size * sizeof (struct GNUNET_PeerIdentity));
2133   GNUNET_MQ_send (cli_ctx->mq, ev);
2134 }
2135
2136
2137 /**
2138  * @brief Send peer from biased stream to client.
2139  *
2140  * TODO merge with send_view, parameterise
2141  *
2142  * @param cli_ctx the context of the client
2143  * @param view_array the peerids of the view as array (can be empty)
2144  * @param view_size the size of the view array (can be 0)
2145  */
2146 void
2147 send_stream_peers (const struct ClientContext *cli_ctx,
2148                    uint64_t num_peers,
2149                    const struct GNUNET_PeerIdentity *peers)
2150 {
2151   struct GNUNET_MQ_Envelope *ev;
2152   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2153
2154   GNUNET_assert (NULL != peers);
2155
2156   ev = GNUNET_MQ_msg_extra (out_msg,
2157                             num_peers * sizeof (struct GNUNET_PeerIdentity),
2158                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2159   out_msg->num_peers = htonl (num_peers);
2160
2161   GNUNET_memcpy (&out_msg[1],
2162                  peers,
2163                  num_peers * sizeof (struct GNUNET_PeerIdentity));
2164   GNUNET_MQ_send (cli_ctx->mq, ev);
2165 }
2166
2167
2168 /**
2169  * @brief sends updates to clients that are interested
2170  */
2171 static void
2172 clients_notify_view_update (void)
2173 {
2174   struct ClientContext *cli_ctx_iter;
2175   uint64_t num_peers;
2176   const struct GNUNET_PeerIdentity *view_array;
2177
2178   num_peers = View_size ();
2179   view_array = View_get_as_array();
2180   /* check size of view is small enough */
2181   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2182   {
2183     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2184                 "View is too big to send\n");
2185     return;
2186   }
2187
2188   for (cli_ctx_iter = cli_ctx_head;
2189        NULL != cli_ctx_iter;
2190        cli_ctx_iter = cli_ctx_iter->next)
2191   {
2192     if (1 < cli_ctx_iter->view_updates_left)
2193     {
2194       /* Client wants to receive limited amount of updates */
2195       cli_ctx_iter->view_updates_left -= 1;
2196     } else if (1 == cli_ctx_iter->view_updates_left)
2197     {
2198       /* Last update of view for client */
2199       cli_ctx_iter->view_updates_left = -1;
2200     } else if (0 > cli_ctx_iter->view_updates_left) {
2201       /* Client is not interested in updates */
2202       continue;
2203     }
2204     /* else _updates_left == 0 - infinite amount of updates */
2205
2206     /* send view */
2207     send_view (cli_ctx_iter, view_array, num_peers);
2208   }
2209 }
2210
2211
2212 /**
2213  * @brief sends updates to clients that are interested
2214  */
2215 static void
2216 clients_notify_stream_peer (uint64_t num_peers,
2217                             const struct GNUNET_PeerIdentity *peers)
2218                             // TODO enum StreamPeerSource)
2219 {
2220   struct ClientContext *cli_ctx_iter;
2221
2222   LOG (GNUNET_ERROR_TYPE_DEBUG,
2223       "Got peer (%s) from biased stream - update all clients\n",
2224       GNUNET_i2s (peers));
2225
2226   for (cli_ctx_iter = cli_ctx_head;
2227        NULL != cli_ctx_iter;
2228        cli_ctx_iter = cli_ctx_iter->next)
2229   {
2230     if (GNUNET_YES == cli_ctx_iter->stream_update)
2231     {
2232       send_stream_peers (cli_ctx_iter, num_peers, peers);
2233     }
2234   }
2235 }
2236
2237 /**
2238  * Put random peer from sampler into the view as history update.
2239  */
2240 static void
2241 hist_update (const struct GNUNET_PeerIdentity *ids,
2242              uint32_t num_peers,
2243              void *cls)
2244 {
2245   unsigned int i;
2246   (void) cls;
2247
2248   for (i = 0; i < num_peers; i++)
2249   {
2250     int inserted;
2251     inserted = insert_in_view (&ids[i]);
2252     if (GNUNET_OK == inserted)
2253     {
2254       clients_notify_stream_peer (1, &ids[i]);
2255     }
2256     to_file (file_name_view_log,
2257              "+%s\t(hist)",
2258              GNUNET_i2s_full (ids));
2259   }
2260   clients_notify_view_update();
2261 }
2262
2263
2264 /**
2265  * Wrapper around #RPS_sampler_resize()
2266  *
2267  * If we do not have enough sampler elements, double current sampler size
2268  * If we have more than enough sampler elements, halv current sampler size
2269  */
2270 static void
2271 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2272 {
2273   unsigned int sampler_size;
2274
2275   // TODO statistics
2276   // TODO respect the min, max
2277   sampler_size = RPS_sampler_get_size (sampler);
2278   if (sampler_size > new_size * 4)
2279   { /* Shrinking */
2280     RPS_sampler_resize (sampler, sampler_size / 2);
2281   }
2282   else if (sampler_size < new_size)
2283   { /* Growing */
2284     RPS_sampler_resize (sampler, sampler_size * 2);
2285   }
2286   LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2287 }
2288
2289
2290 /**
2291  * Add all peers in @a peer_array to @a peer_map used as set.
2292  *
2293  * @param peer_array array containing the peers
2294  * @param num_peers number of peers in @peer_array
2295  * @param peer_map the peermap to use as set
2296  */
2297 static void
2298 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2299                        unsigned int num_peers,
2300                        struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2301 {
2302   unsigned int i;
2303   if (NULL == peer_map)
2304   {
2305     LOG (GNUNET_ERROR_TYPE_WARNING,
2306          "Trying to add peers to non-existing peermap.\n");
2307     return;
2308   }
2309
2310   for (i = 0; i < num_peers; i++)
2311   {
2312     GNUNET_CONTAINER_multipeermap_put (peer_map,
2313                                        &peer_array[i],
2314                                        NULL,
2315                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2316     GNUNET_STATISTICS_set (stats,
2317                           "# known peers",
2318                           GNUNET_CONTAINER_multipeermap_size (peer_map),
2319                           GNUNET_NO);
2320   }
2321 }
2322
2323
2324 /**
2325  * Send a PULL REPLY to @a peer_id
2326  *
2327  * @param peer_id the peer to send the reply to.
2328  * @param peer_ids the peers to send to @a peer_id
2329  * @param num_peer_ids the number of peers to send to @a peer_id
2330  */
2331 static void
2332 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2333                  const struct GNUNET_PeerIdentity *peer_ids,
2334                  unsigned int num_peer_ids)
2335 {
2336   uint32_t send_size;
2337   struct GNUNET_MQ_Envelope *ev;
2338   struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2339
2340   /* Compute actual size */
2341   send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2342               num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2343
2344   if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2345     /* Compute number of peers to send
2346      * If too long, simply truncate */
2347     // TODO select random ones via permutation
2348     //      or even better: do good protocol design
2349     send_size =
2350       (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2351        sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2352        sizeof (struct GNUNET_PeerIdentity);
2353   else
2354     send_size = num_peer_ids;
2355
2356   LOG (GNUNET_ERROR_TYPE_DEBUG,
2357       "Going to send PULL REPLY with %u peers to %s\n",
2358       send_size, GNUNET_i2s (peer_id));
2359
2360   ev = GNUNET_MQ_msg_extra (out_msg,
2361                             send_size * sizeof (struct GNUNET_PeerIdentity),
2362                             GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2363   out_msg->num_peers = htonl (send_size);
2364   GNUNET_memcpy (&out_msg[1], peer_ids,
2365          send_size * sizeof (struct GNUNET_PeerIdentity));
2366
2367   send_message (peer_id, ev, "PULL REPLY");
2368   GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2369   // TODO check with send intention: as send_channel is used/opened we indicate
2370   // a sending intention without intending it.
2371   // -> clean peer afterwards?
2372   // -> use recv_channel?
2373 }
2374
2375
2376 /**
2377  * Insert PeerID in #pull_map
2378  *
2379  * Called once we know a peer is live.
2380  */
2381 static void
2382 insert_in_pull_map (void *cls,
2383                     const struct GNUNET_PeerIdentity *peer)
2384 {
2385   (void) cls;
2386   CustomPeerMap_put (pull_map, peer);
2387 }
2388
2389
2390 /**
2391  * Insert PeerID in #view
2392  *
2393  * Called once we know a peer is live.
2394  * Implements #PeerOp
2395  */
2396 static void
2397 insert_in_view_op (void *cls,
2398                    const struct GNUNET_PeerIdentity *peer)
2399 {
2400   (void) cls;
2401   int inserted;
2402
2403   inserted = insert_in_view (peer);
2404   if (GNUNET_OK == inserted)
2405   {
2406     clients_notify_stream_peer (1, peer);
2407   }
2408 }
2409
2410
2411 /**
2412  * Update sampler with given PeerID.
2413  * Implements #PeerOp
2414  */
2415 static void
2416 insert_in_sampler (void *cls,
2417                    const struct GNUNET_PeerIdentity *peer)
2418 {
2419   (void) cls;
2420   LOG (GNUNET_ERROR_TYPE_DEBUG,
2421        "Updating samplers with peer %s from insert_in_sampler()\n",
2422        GNUNET_i2s (peer));
2423   RPS_sampler_update (prot_sampler,   peer);
2424   if (0 < RPS_sampler_count_id (prot_sampler, peer))
2425   {
2426     /* Make sure we 'know' about this peer */
2427     (void) issue_peer_liveliness_check (peer);
2428     /* Establish a channel towards that peer to indicate we are going to send
2429      * messages to it */
2430     //indicate_sending_intention (peer);
2431   }
2432   #ifdef TO_FILE
2433   num_observed_peers++;
2434   GNUNET_CONTAINER_multipeermap_put
2435     (observed_unique_peers,
2436      peer,
2437      NULL,
2438      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2439   uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2440       observed_unique_peers);
2441   to_file (file_name_observed_log,
2442           "%" PRIu32 " %" PRIu32 " %f\n",
2443           num_observed_peers,
2444           num_observed_unique_peers,
2445           1.0*num_observed_unique_peers/num_observed_peers)
2446   #endif /* TO_FILE */
2447 }
2448
2449 /**
2450  * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2451  *        If the peer is not known, liveliness check is issued and it is
2452  *        scheduled to be inserted in sampler and view.
2453  *
2454  * "External sources" refer to every source except the gossip.
2455  *
2456  * @param peer peer to insert
2457  */
2458 static void
2459 got_peer (const struct GNUNET_PeerIdentity *peer)
2460 {
2461   /* If we did not know this peer already, insert it into sampler and view */
2462   if (GNUNET_YES == issue_peer_liveliness_check (peer))
2463   {
2464     schedule_operation (peer, insert_in_sampler);
2465     schedule_operation (peer, insert_in_view_op);
2466   }
2467   GNUNET_STATISTICS_update (stats,
2468                             "# learnd peers",
2469                             1,
2470                             GNUNET_NO);
2471 }
2472
2473 /**
2474  * @brief Checks if there is a sending channel and if it is needed
2475  *
2476  * @param peer the peer whose sending channel is checked
2477  * @return GNUNET_YES if sending channel exists and is still needed
2478  *         GNUNET_NO  otherwise
2479  */
2480 static int
2481 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2482 {
2483   /* struct GNUNET_CADET_Channel *channel; */
2484   if (GNUNET_NO == check_peer_known (peer))
2485   {
2486     return GNUNET_NO;
2487   }
2488   if (GNUNET_YES == check_sending_channel_exists (peer))
2489   {
2490     if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2491          (GNUNET_YES == View_contains_peer (peer)) ||
2492          (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2493          (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2494          (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2495     { /* If we want to keep the connection to peer open */
2496       return GNUNET_YES;
2497     }
2498     return GNUNET_NO;
2499   }
2500   return GNUNET_NO;
2501 }
2502
2503 /**
2504  * @brief remove peer from our knowledge, the view, push and pull maps and
2505  * samplers.
2506  *
2507  * @param peer the peer to remove
2508  */
2509 static void
2510 remove_peer (const struct GNUNET_PeerIdentity *peer)
2511 {
2512   (void) View_remove_peer (peer);
2513   CustomPeerMap_remove_peer (pull_map, peer);
2514   CustomPeerMap_remove_peer (push_map, peer);
2515   RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2516   destroy_peer (get_peer_ctx (peer));
2517 }
2518
2519
2520 /**
2521  * @brief Remove data that is not needed anymore.
2522  *
2523  * If the sending channel is no longer needed it is destroyed.
2524  *
2525  * @param peer the peer whose data is about to be cleaned
2526  */
2527 static void
2528 clean_peer (const struct GNUNET_PeerIdentity *peer)
2529 {
2530   if (GNUNET_NO == check_sending_channel_needed (peer))
2531   {
2532     LOG (GNUNET_ERROR_TYPE_DEBUG,
2533         "Going to remove send channel to peer %s\n",
2534         GNUNET_i2s (peer));
2535     #ifdef ENABLE_MALICIOUS
2536     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2537       (void) destroy_sending_channel (peer);
2538     #else /* ENABLE_MALICIOUS */
2539     (void) destroy_sending_channel (peer);
2540     #endif /* ENABLE_MALICIOUS */
2541   }
2542
2543   if ( (GNUNET_NO == check_peer_send_intention (peer)) &&
2544        (GNUNET_NO == View_contains_peer (peer)) &&
2545        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2546        (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2547        (0 == RPS_sampler_count_id (prot_sampler,   peer)) &&
2548        (GNUNET_NO != check_removable (peer)) )
2549   { /* We can safely remove this peer */
2550     LOG (GNUNET_ERROR_TYPE_DEBUG,
2551         "Going to remove peer %s\n",
2552         GNUNET_i2s (peer));
2553     remove_peer (peer);
2554     return;
2555   }
2556 }
2557
2558
2559 /**
2560  * @brief This is called when a channel is destroyed.
2561  *
2562  * Removes peer completely from our knowledge if the send_channel was destroyed
2563  * Otherwise simply delete the recv_channel
2564  * Also check if the knowledge about this peer is still needed.
2565  * If not, remove this peer from our knowledge.
2566  *
2567  * @param cls The closure
2568  * @param channel The channel being closed
2569  * @param channel_ctx The context associated with this channel
2570  */
2571 static void
2572 cleanup_destroyed_channel (void *cls,
2573                            const struct GNUNET_CADET_Channel *channel)
2574 {
2575   struct ChannelCtx *channel_ctx = cls;
2576   struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2577   (void) cls;
2578   (void) channel;
2579
2580   channel_ctx->channel = NULL;
2581   remove_channel_ctx (channel_ctx);
2582   if (NULL != peer_ctx &&
2583       peer_ctx->send_channel_ctx == channel_ctx)
2584   {
2585     remove_peer (&peer_ctx->peer_id);
2586   }
2587 }
2588
2589 /***********************************************************************
2590  * /Util functions
2591 ***********************************************************************/
2592
2593 static void
2594 destroy_cli_ctx (struct ClientContext *cli_ctx)
2595 {
2596   GNUNET_assert (NULL != cli_ctx);
2597   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2598                                cli_ctx_tail,
2599                                cli_ctx);
2600   GNUNET_free (cli_ctx);
2601 }
2602
2603
2604 /**
2605  * Function called by NSE.
2606  *
2607  * Updates sizes of sampler list and view and adapt those lists
2608  * accordingly.
2609  */
2610 static void
2611 nse_callback (void *cls,
2612               struct GNUNET_TIME_Absolute timestamp,
2613               double logestimate, double std_dev)
2614 {
2615   double estimate;
2616   //double scale; // TODO this might go gloabal/config
2617   (void) cls;
2618   (void) timestamp;
2619
2620   LOG (GNUNET_ERROR_TYPE_DEBUG,
2621        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2622        logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2623   //scale = .01;
2624   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2625   // GNUNET_NSE_log_estimate_to_n (logestimate);
2626   estimate = pow (estimate, 1.0 / 3);
2627   // TODO add if std_dev is a number
2628   // estimate += (std_dev * scale);
2629   if (view_size_est_min < ceil (estimate))
2630   {
2631     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2632     sampler_size_est_need = estimate;
2633     view_size_est_need = estimate;
2634   } else
2635   {
2636     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2637     //sampler_size_est_need = view_size_est_min;
2638     view_size_est_need = view_size_est_min;
2639   }
2640   GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_need, GNUNET_NO);
2641
2642   /* If the NSE has changed adapt the lists accordingly */
2643   resize_wrapper (prot_sampler, sampler_size_est_need);
2644   View_change_len (view_size_est_need);
2645 }
2646
2647
2648 /**
2649  * @brief This function is called, when the client seeds peers.
2650  * It verifies that @a msg is well-formed.
2651  *
2652  * @param cls the closure (#ClientContext)
2653  * @param msg the message
2654  * @return #GNUNET_OK if @a msg is well-formed
2655  */
2656 static int
2657 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2658 {
2659   struct ClientContext *cli_ctx = cls;
2660   uint16_t msize = ntohs (msg->header.size);
2661   uint32_t num_peers = ntohl (msg->num_peers);
2662
2663   msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2664   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2665        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2666   {
2667     GNUNET_break (0);
2668     GNUNET_SERVICE_client_drop (cli_ctx->client);
2669     return GNUNET_SYSERR;
2670   }
2671   return GNUNET_OK;
2672 }
2673
2674
2675 /**
2676  * Handle seed from the client.
2677  *
2678  * @param cls closure
2679  * @param message the actual message
2680  */
2681 static void
2682 handle_client_seed (void *cls,
2683                     const struct GNUNET_RPS_CS_SeedMessage *msg)
2684 {
2685   struct ClientContext *cli_ctx = cls;
2686   struct GNUNET_PeerIdentity *peers;
2687   uint32_t num_peers;
2688   uint32_t i;
2689
2690   num_peers = ntohl (msg->num_peers);
2691   peers = (struct GNUNET_PeerIdentity *) &msg[1];
2692
2693   LOG (GNUNET_ERROR_TYPE_DEBUG,
2694        "Client seeded peers:\n");
2695   print_peer_list (peers, num_peers);
2696
2697   for (i = 0; i < num_peers; i++)
2698   {
2699     LOG (GNUNET_ERROR_TYPE_DEBUG,
2700          "Updating samplers with seed %" PRIu32 ": %s\n",
2701          i,
2702          GNUNET_i2s (&peers[i]));
2703
2704     got_peer (&peers[i]);
2705   }
2706   GNUNET_SERVICE_client_continue (cli_ctx->client);
2707 }
2708
2709
2710 /**
2711  * Handle RPS request from the client.
2712  *
2713  * @param cls Client context
2714  * @param message unused
2715  */
2716 static void
2717 handle_client_view_request (void *cls,
2718                             const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2719 {
2720   struct ClientContext *cli_ctx = cls;
2721   uint64_t num_updates;
2722
2723   num_updates = ntohl (msg->num_updates);
2724
2725   LOG (GNUNET_ERROR_TYPE_DEBUG,
2726        "Client requested %" PRIu64 " updates of view.\n",
2727        num_updates);
2728
2729   GNUNET_assert (NULL != cli_ctx);
2730   cli_ctx->view_updates_left = num_updates;
2731   send_view (cli_ctx, NULL, 0);
2732   GNUNET_SERVICE_client_continue (cli_ctx->client);
2733 }
2734
2735
2736 /**
2737  * @brief Handle the cancellation of the view updates.
2738  *
2739  * @param cls The client context
2740  * @param msg Unused
2741  */
2742 static void
2743 handle_client_view_cancel (void *cls,
2744                            const struct GNUNET_MessageHeader *msg)
2745 {
2746   struct ClientContext *cli_ctx = cls;
2747   (void) msg;
2748
2749   LOG (GNUNET_ERROR_TYPE_DEBUG,
2750        "Client does not want to receive updates of view any more.\n");
2751
2752   GNUNET_assert (NULL != cli_ctx);
2753   cli_ctx->view_updates_left = 0;
2754   GNUNET_SERVICE_client_continue (cli_ctx->client);
2755   if (GNUNET_YES == cli_ctx->stream_update)
2756   {
2757     destroy_cli_ctx (cli_ctx);
2758   }
2759 }
2760
2761
2762 /**
2763  * Handle RPS request for biased stream from the client.
2764  *
2765  * @param cls Client context
2766  * @param message unused
2767  */
2768 static void
2769 handle_client_stream_request (void *cls,
2770                               const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
2771 {
2772   struct ClientContext *cli_ctx = cls;
2773   (void) msg;
2774
2775   LOG (GNUNET_ERROR_TYPE_DEBUG,
2776        "Client requested peers from biased stream.\n");
2777   cli_ctx->stream_update = GNUNET_YES;
2778
2779   GNUNET_assert (NULL != cli_ctx);
2780   GNUNET_SERVICE_client_continue (cli_ctx->client);
2781 }
2782
2783
2784 /**
2785  * Handle a CHECK_LIVE message from another peer.
2786  *
2787  * This does nothing. But without calling #GNUNET_CADET_receive_done()
2788  * the channel is blocked for all other communication.
2789  *
2790  * @param cls Closure
2791  * @param msg The message header
2792  */
2793 static void
2794 handle_peer_check (void *cls,
2795                    const struct GNUNET_MessageHeader *msg)
2796 {
2797   const struct ChannelCtx *channel_ctx = cls;
2798   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2799   (void) msg;
2800
2801   LOG (GNUNET_ERROR_TYPE_DEBUG,
2802       "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2803   GNUNET_STATISTICS_update (stats,
2804                             "# pending liveliness checks",
2805                             -1,
2806                             GNUNET_NO);
2807
2808   GNUNET_CADET_receive_done (channel_ctx->channel);
2809 }
2810
2811 /**
2812  * Handle a PUSH message from another peer.
2813  *
2814  * Check the proof of work and store the PeerID
2815  * in the temporary list for pushed PeerIDs.
2816  *
2817  * @param cls Closure
2818  * @param msg The message header
2819  */
2820 static void
2821 handle_peer_push (void *cls,
2822                   const struct GNUNET_MessageHeader *msg)
2823 {
2824   const struct ChannelCtx *channel_ctx = cls;
2825   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2826   (void) msg;
2827
2828   // (check the proof of work (?))
2829
2830   LOG (GNUNET_ERROR_TYPE_DEBUG,
2831        "Received PUSH (%s)\n",
2832        GNUNET_i2s (peer));
2833   GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
2834
2835   #ifdef ENABLE_MALICIOUS
2836   struct AttackedPeer *tmp_att_peer;
2837
2838   if ( (1 == mal_type) ||
2839        (3 == mal_type) )
2840   { /* Try to maximise representation */
2841     tmp_att_peer = GNUNET_new (struct AttackedPeer);
2842     tmp_att_peer->peer_id = *peer;
2843     if (NULL == att_peer_set)
2844       att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2845     if (GNUNET_NO ==
2846         GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2847                                                 peer))
2848     {
2849       GNUNET_CONTAINER_DLL_insert (att_peers_head,
2850                                    att_peers_tail,
2851                                    tmp_att_peer);
2852       add_peer_array_to_set (peer, 1, att_peer_set);
2853     }
2854     else
2855     {
2856       GNUNET_free (tmp_att_peer);
2857     }
2858   }
2859
2860
2861   else if (2 == mal_type)
2862   {
2863     /* We attack one single well-known peer - simply ignore */
2864   }
2865   #endif /* ENABLE_MALICIOUS */
2866
2867   /* Add the sending peer to the push_map */
2868   CustomPeerMap_put (push_map, peer);
2869
2870   GNUNET_break_op (check_peer_known (peer));
2871   GNUNET_CADET_receive_done (channel_ctx->channel);
2872 }
2873
2874
2875 /**
2876  * Handle PULL REQUEST request message from another peer.
2877  *
2878  * Reply with the view of PeerIDs.
2879  *
2880  * @param cls Closure
2881  * @param msg The message header
2882  */
2883 static void
2884 handle_peer_pull_request (void *cls,
2885                           const struct GNUNET_MessageHeader *msg)
2886 {
2887   const struct ChannelCtx *channel_ctx = cls;
2888   const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2889   const struct GNUNET_PeerIdentity *view_array;
2890   (void) msg;
2891
2892   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
2893   GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
2894
2895   #ifdef ENABLE_MALICIOUS
2896   if (1 == mal_type
2897       || 3 == mal_type)
2898   { /* Try to maximise representation */
2899     send_pull_reply (peer, mal_peers, num_mal_peers);
2900   }
2901
2902   else if (2 == mal_type)
2903   { /* Try to partition network */
2904     if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2905     {
2906       send_pull_reply (peer, mal_peers, num_mal_peers);
2907     }
2908   }
2909   #endif /* ENABLE_MALICIOUS */
2910
2911   GNUNET_break_op (check_peer_known (peer));
2912   GNUNET_CADET_receive_done (channel_ctx->channel);
2913   view_array = View_get_as_array ();
2914   send_pull_reply (peer, view_array, View_size ());
2915 }
2916
2917
2918 /**
2919  * Check whether we sent a corresponding request and
2920  * whether this reply is the first one.
2921  *
2922  * @param cls Closure
2923  * @param msg The message header
2924  */
2925 static int
2926 check_peer_pull_reply (void *cls,
2927                        const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
2928 {
2929   struct ChannelCtx *channel_ctx = cls;
2930   struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
2931
2932   if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
2933   {
2934     GNUNET_break_op (0);
2935     return GNUNET_SYSERR;
2936   }
2937
2938   if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2939       sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
2940   {
2941     LOG (GNUNET_ERROR_TYPE_ERROR,
2942         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
2943         ntohl (msg->num_peers),
2944         (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2945             sizeof (struct GNUNET_PeerIdentity));
2946     GNUNET_break_op (0);
2947     return GNUNET_SYSERR;
2948   }
2949
2950   if (GNUNET_YES != check_peer_flag (&sender_ctx->peer_id,
2951                                      Peers_PULL_REPLY_PENDING))
2952   {
2953     LOG (GNUNET_ERROR_TYPE_WARNING,
2954         "Received a pull reply from a peer (%s) we didn't request one from!\n",
2955         GNUNET_i2s (&sender_ctx->peer_id));
2956     GNUNET_STATISTICS_update (stats,
2957                               "# unrequested pull replies",
2958                               1,
2959                               GNUNET_NO);
2960     GNUNET_break_op (0);
2961     return GNUNET_SYSERR;
2962   }
2963   return GNUNET_OK;
2964 }
2965
2966 /**
2967  * Handle PULL REPLY message from another peer.
2968  *
2969  * @param cls Closure
2970  * @param msg The message header
2971  */
2972 static void
2973 handle_peer_pull_reply (void *cls,
2974                         const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
2975 {
2976   const struct ChannelCtx *channel_ctx = cls;
2977   const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
2978   const struct GNUNET_PeerIdentity *peers;
2979   uint32_t i;
2980 #ifdef ENABLE_MALICIOUS
2981   struct AttackedPeer *tmp_att_peer;
2982 #endif /* ENABLE_MALICIOUS */
2983
2984   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
2985   GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
2986
2987   #ifdef ENABLE_MALICIOUS
2988   // We shouldn't even receive pull replies as we're not sending
2989   if (2 == mal_type)
2990   {
2991   }
2992   #endif /* ENABLE_MALICIOUS */
2993
2994   /* Do actual logic */
2995   peers = (const struct GNUNET_PeerIdentity *) &msg[1];
2996
2997   LOG (GNUNET_ERROR_TYPE_DEBUG,
2998        "PULL REPLY received, got following %u peers:\n",
2999        ntohl (msg->num_peers));
3000
3001   for (i = 0; i < ntohl (msg->num_peers); i++)
3002   {
3003     LOG (GNUNET_ERROR_TYPE_DEBUG,
3004          "%u. %s\n",
3005          i,
3006          GNUNET_i2s (&peers[i]));
3007
3008     #ifdef ENABLE_MALICIOUS
3009     if ((NULL != att_peer_set) &&
3010         (1 == mal_type || 3 == mal_type))
3011     { /* Add attacked peer to local list */
3012       // TODO check if we sent a request and this was the first reply
3013       if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3014                                                                &peers[i])
3015           && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3016                                                                   &peers[i]))
3017       {
3018         tmp_att_peer = GNUNET_new (struct AttackedPeer);
3019         tmp_att_peer->peer_id = peers[i];
3020         GNUNET_CONTAINER_DLL_insert (att_peers_head,
3021                                      att_peers_tail,
3022                                      tmp_att_peer);
3023         add_peer_array_to_set (&peers[i], 1, att_peer_set);
3024       }
3025       continue;
3026     }
3027     #endif /* ENABLE_MALICIOUS */
3028     /* Make sure we 'know' about this peer */
3029     (void) insert_peer (&peers[i]);
3030
3031     if (GNUNET_YES == check_peer_valid (&peers[i]))
3032     {
3033       CustomPeerMap_put (pull_map, &peers[i]);
3034     }
3035     else
3036     {
3037       schedule_operation (&peers[i], insert_in_pull_map);
3038       (void) issue_peer_liveliness_check (&peers[i]);
3039     }
3040   }
3041
3042   UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING);
3043   clean_peer (sender);
3044
3045   GNUNET_break_op (check_peer_known (sender));
3046   GNUNET_CADET_receive_done (channel_ctx->channel);
3047 }
3048
3049
3050 /**
3051  * Compute a random delay.
3052  * A uniformly distributed value between mean + spread and mean - spread.
3053  *
3054  * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3055  * It would return a random value between 2 and 6 min.
3056  *
3057  * @param mean the mean
3058  * @param spread the inverse amount of deviation from the mean
3059  */
3060 static struct GNUNET_TIME_Relative
3061 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3062                     unsigned int spread)
3063 {
3064   struct GNUNET_TIME_Relative half_interval;
3065   struct GNUNET_TIME_Relative ret;
3066   unsigned int rand_delay;
3067   unsigned int max_rand_delay;
3068
3069   if (0 == spread)
3070   {
3071     LOG (GNUNET_ERROR_TYPE_WARNING,
3072          "Not accepting spread of 0\n");
3073     GNUNET_break (0);
3074     GNUNET_assert (0);
3075   }
3076   GNUNET_assert (0 != mean.rel_value_us);
3077
3078   /* Compute random time value between spread * mean and spread * mean */
3079   half_interval = GNUNET_TIME_relative_divide (mean, spread);
3080
3081   max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3082   /**
3083    * Compute random value between (0 and 1) * round_interval
3084    * via multiplying round_interval with a 'fraction' (0 to value)/value
3085    */
3086   rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3087   ret = GNUNET_TIME_relative_saturating_multiply (mean,  rand_delay);
3088   ret = GNUNET_TIME_relative_divide   (ret, max_rand_delay);
3089   ret = GNUNET_TIME_relative_add      (ret, half_interval);
3090
3091   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3092     LOG (GNUNET_ERROR_TYPE_WARNING,
3093          "Returning FOREVER_REL\n");
3094
3095   return ret;
3096 }
3097
3098
3099 /**
3100  * Send single pull request
3101  *
3102  * @param peer_id the peer to send the pull request to.
3103  */
3104 static void
3105 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3106 {
3107   struct GNUNET_MQ_Envelope *ev;
3108
3109   GNUNET_assert (GNUNET_NO == check_peer_flag (peer,
3110                                                      Peers_PULL_REPLY_PENDING));
3111   SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING);
3112
3113   LOG (GNUNET_ERROR_TYPE_DEBUG,
3114        "Going to send PULL REQUEST to peer %s.\n",
3115        GNUNET_i2s (peer));
3116
3117   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3118   send_message (peer, ev, "PULL REQUEST");
3119   GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3120 }
3121
3122
3123 /**
3124  * Send single push
3125  *
3126  * @param peer_id the peer to send the push to.
3127  */
3128 static void
3129 send_push (const struct GNUNET_PeerIdentity *peer_id)
3130 {
3131   struct GNUNET_MQ_Envelope *ev;
3132
3133   LOG (GNUNET_ERROR_TYPE_DEBUG,
3134        "Going to send PUSH to peer %s.\n",
3135        GNUNET_i2s (peer_id));
3136
3137   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3138   send_message (peer_id, ev, "PUSH");
3139   GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3140 }
3141
3142
3143 static void
3144 do_round (void *cls);
3145
3146 static void
3147 do_mal_round (void *cls);
3148
3149 #ifdef ENABLE_MALICIOUS
3150
3151
3152 /**
3153  * @brief This function is called, when the client tells us to act malicious.
3154  * It verifies that @a msg is well-formed.
3155  *
3156  * @param cls the closure (#ClientContext)
3157  * @param msg the message
3158  * @return #GNUNET_OK if @a msg is well-formed
3159  */
3160 static int
3161 check_client_act_malicious (void *cls,
3162                             const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3163 {
3164   struct ClientContext *cli_ctx = cls;
3165   uint16_t msize = ntohs (msg->header.size);
3166   uint32_t num_peers = ntohl (msg->num_peers);
3167
3168   msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3169   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3170        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3171   {
3172     LOG (GNUNET_ERROR_TYPE_ERROR,
3173         "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3174         ntohl (msg->num_peers),
3175         (msize / sizeof (struct GNUNET_PeerIdentity)));
3176     GNUNET_break (0);
3177     GNUNET_SERVICE_client_drop (cli_ctx->client);
3178     return GNUNET_SYSERR;
3179   }
3180   return GNUNET_OK;
3181 }
3182
3183 /**
3184  * Turn RPS service to act malicious.
3185  *
3186  * @param cls Closure
3187  * @param client The client that sent the message
3188  * @param msg The message header
3189  */
3190 static void
3191 handle_client_act_malicious (void *cls,
3192                              const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3193 {
3194   struct ClientContext *cli_ctx = cls;
3195   struct GNUNET_PeerIdentity *peers;
3196   uint32_t num_mal_peers_sent;
3197   uint32_t num_mal_peers_old;
3198
3199   /* Do actual logic */
3200   peers = (struct GNUNET_PeerIdentity *) &msg[1];
3201   mal_type = ntohl (msg->type);
3202   if (NULL == mal_peer_set)
3203     mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3204
3205   LOG (GNUNET_ERROR_TYPE_DEBUG,
3206        "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3207        mal_type,
3208        ntohl (msg->num_peers));
3209
3210   if (1 == mal_type)
3211   { /* Try to maximise representation */
3212     /* Add other malicious peers to those we already know */
3213
3214     num_mal_peers_sent = ntohl (msg->num_peers);
3215     num_mal_peers_old = num_mal_peers;
3216     GNUNET_array_grow (mal_peers,
3217                        num_mal_peers,
3218                        num_mal_peers + num_mal_peers_sent);
3219     GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3220             peers,
3221             num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3222
3223     /* Add all mal peers to mal_peer_set */
3224     add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3225                            num_mal_peers_sent,
3226                            mal_peer_set);
3227
3228     /* Substitute do_round () with do_mal_round () */
3229     GNUNET_SCHEDULER_cancel (do_round_task);
3230     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3231   }
3232
3233   else if ( (2 == mal_type) ||
3234             (3 == mal_type) )
3235   { /* Try to partition the network */
3236     /* Add other malicious peers to those we already know */
3237
3238     num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3239     num_mal_peers_old = num_mal_peers;
3240     GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3241     GNUNET_array_grow (mal_peers,
3242                        num_mal_peers,
3243                        num_mal_peers + num_mal_peers_sent);
3244     if (NULL != mal_peers &&
3245         0 != num_mal_peers)
3246     {
3247       GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3248               peers,
3249               num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3250
3251       /* Add all mal peers to mal_peer_set */
3252       add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3253                              num_mal_peers_sent,
3254                              mal_peer_set);
3255     }
3256
3257     /* Store the one attacked peer */
3258     GNUNET_memcpy (&attacked_peer,
3259             &msg->attacked_peer,
3260             sizeof (struct GNUNET_PeerIdentity));
3261     /* Set the flag of the attacked peer to valid to avoid problems */
3262     if (GNUNET_NO == check_peer_known (&attacked_peer))
3263     {
3264       (void) issue_peer_liveliness_check (&attacked_peer);
3265     }
3266
3267     LOG (GNUNET_ERROR_TYPE_DEBUG,
3268          "Attacked peer is %s\n",
3269          GNUNET_i2s (&attacked_peer));
3270
3271     /* Substitute do_round () with do_mal_round () */
3272     GNUNET_SCHEDULER_cancel (do_round_task);
3273     do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3274   }
3275   else if (0 == mal_type)
3276   { /* Stop acting malicious */
3277     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3278
3279     /* Substitute do_mal_round () with do_round () */
3280     GNUNET_SCHEDULER_cancel (do_round_task);
3281     do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3282   }
3283   else
3284   {
3285     GNUNET_break (0);
3286     GNUNET_SERVICE_client_continue (cli_ctx->client);
3287   }
3288   GNUNET_SERVICE_client_continue (cli_ctx->client);
3289 }
3290
3291
3292 /**
3293  * Send out PUSHes and PULLs maliciously.
3294  *
3295  * This is executed regylary.
3296  */
3297 static void
3298 do_mal_round (void *cls)
3299 {
3300   uint32_t num_pushes;
3301   uint32_t i;
3302   struct GNUNET_TIME_Relative time_next_round;
3303   struct AttackedPeer *tmp_att_peer;
3304   (void) cls;
3305
3306   LOG (GNUNET_ERROR_TYPE_DEBUG,
3307        "Going to execute next round maliciously type %" PRIu32 ".\n",
3308       mal_type);
3309   do_round_task = NULL;
3310   GNUNET_assert (mal_type <= 3);
3311   /* Do malicious actions */
3312   if (1 == mal_type)
3313   { /* Try to maximise representation */
3314
3315     /* The maximum of pushes we're going to send this round */
3316     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3317                                          num_attacked_peers),
3318                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3319
3320     LOG (GNUNET_ERROR_TYPE_DEBUG,
3321          "Going to send %" PRIu32 " pushes\n",
3322          num_pushes);
3323
3324     /* Send PUSHes to attacked peers */
3325     for (i = 0 ; i < num_pushes ; i++)
3326     {
3327       if (att_peers_tail == att_peer_index)
3328         att_peer_index = att_peers_head;
3329       else
3330         att_peer_index = att_peer_index->next;
3331
3332       send_push (&att_peer_index->peer_id);
3333     }
3334
3335     /* Send PULLs to some peers to learn about additional peers to attack */
3336     tmp_att_peer = att_peer_index;
3337     for (i = 0 ; i < num_pushes * alpha ; i++)
3338     {
3339       if (att_peers_tail == tmp_att_peer)
3340         tmp_att_peer = att_peers_head;
3341       else
3342         att_peer_index = tmp_att_peer->next;
3343
3344       send_pull_request (&tmp_att_peer->peer_id);
3345     }
3346   }
3347
3348
3349   else if (2 == mal_type)
3350   { /**
3351      * Try to partition the network
3352      * Send as many pushes to the attacked peer as possible
3353      * That is one push per round as it will ignore more.
3354      */
3355     (void) issue_peer_liveliness_check (&attacked_peer);
3356     if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3357       send_push (&attacked_peer);
3358   }
3359
3360
3361   if (3 == mal_type)
3362   { /* Combined attack */
3363
3364     /* Send PUSH to attacked peers */
3365     if (GNUNET_YES == check_peer_known (&attacked_peer))
3366     {
3367       (void) issue_peer_liveliness_check (&attacked_peer);
3368       if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3369       {
3370         LOG (GNUNET_ERROR_TYPE_DEBUG,
3371             "Goding to send push to attacked peer (%s)\n",
3372             GNUNET_i2s (&attacked_peer));
3373         send_push (&attacked_peer);
3374       }
3375     }
3376     (void) issue_peer_liveliness_check (&attacked_peer);
3377
3378     /* The maximum of pushes we're going to send this round */
3379     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3380                                          num_attacked_peers),
3381                              GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3382
3383     LOG (GNUNET_ERROR_TYPE_DEBUG,
3384          "Going to send %" PRIu32 " pushes\n",
3385          num_pushes);
3386
3387     for (i = 0; i < num_pushes; i++)
3388     {
3389       if (att_peers_tail == att_peer_index)
3390         att_peer_index = att_peers_head;
3391       else
3392         att_peer_index = att_peer_index->next;
3393
3394       send_push (&att_peer_index->peer_id);
3395     }
3396
3397     /* Send PULLs to some peers to learn about additional peers to attack */
3398     tmp_att_peer = att_peer_index;
3399     for (i = 0; i < num_pushes * alpha; i++)
3400     {
3401       if (att_peers_tail == tmp_att_peer)
3402         tmp_att_peer = att_peers_head;
3403       else
3404         att_peer_index = tmp_att_peer->next;
3405
3406       send_pull_request (&tmp_att_peer->peer_id);
3407     }
3408   }
3409
3410   /* Schedule next round */
3411   time_next_round = compute_rand_delay (round_interval, 2);
3412
3413   //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3414   //NULL);
3415   GNUNET_assert (NULL == do_round_task);
3416   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3417                                                 &do_mal_round, NULL);
3418   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3419 }
3420 #endif /* ENABLE_MALICIOUS */
3421
3422 /**
3423  * Send out PUSHes and PULLs, possibly update #view, samplers.
3424  *
3425  * This is executed regylary.
3426  */
3427 static void
3428 do_round (void *cls)
3429 {
3430   unsigned int i;
3431   const struct GNUNET_PeerIdentity *view_array;
3432   unsigned int *permut;
3433   unsigned int a_peers; /* Number of peers we send pushes to */
3434   unsigned int b_peers; /* Number of peers we send pull requests to */
3435   uint32_t first_border;
3436   uint32_t second_border;
3437   struct GNUNET_PeerIdentity peer;
3438   struct GNUNET_PeerIdentity *update_peer;
3439   (void) cls;
3440
3441   LOG (GNUNET_ERROR_TYPE_DEBUG,
3442        "Going to execute next round.\n");
3443   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3444   do_round_task = NULL;
3445   LOG (GNUNET_ERROR_TYPE_DEBUG,
3446        "Printing view:\n");
3447   to_file (file_name_view_log,
3448            "___ new round ___");
3449   view_array = View_get_as_array ();
3450   for (i = 0; i < View_size (); i++)
3451   {
3452     LOG (GNUNET_ERROR_TYPE_DEBUG,
3453          "\t%s\n", GNUNET_i2s (&view_array[i]));
3454     to_file (file_name_view_log,
3455              "=%s\t(do round)",
3456              GNUNET_i2s_full (&view_array[i]));
3457   }
3458
3459
3460   /* Send pushes and pull requests */
3461   if (0 < View_size ())
3462   {
3463     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3464                                            View_size ());
3465
3466     /* Send PUSHes */
3467     a_peers = ceil (alpha * View_size ());
3468
3469     LOG (GNUNET_ERROR_TYPE_DEBUG,
3470          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3471          a_peers, alpha, View_size ());
3472     for (i = 0; i < a_peers; i++)
3473     {
3474       peer = view_array[permut[i]];
3475       // FIXME if this fails schedule/loop this for later
3476       send_push (&peer);
3477     }
3478
3479     /* Send PULL requests */
3480     b_peers = ceil (beta * View_size ());
3481     first_border = a_peers;
3482     second_border = a_peers + b_peers;
3483     if (second_border > View_size ())
3484     {
3485       first_border = View_size () - b_peers;
3486       second_border = View_size ();
3487     }
3488     LOG (GNUNET_ERROR_TYPE_DEBUG,
3489         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3490         b_peers, beta, View_size ());
3491     for (i = first_border; i < second_border; i++)
3492     {
3493       peer = view_array[permut[i]];
3494       if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3495       { // FIXME if this fails schedule/loop this for later
3496         send_pull_request (&peer);
3497       }
3498     }
3499
3500     GNUNET_free (permut);
3501     permut = NULL;
3502   }
3503
3504
3505   /* Update view */
3506   /* TODO see how many peers are in push-/pull- list! */
3507
3508   if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3509       (0 < CustomPeerMap_size (push_map)) &&
3510       (0 < CustomPeerMap_size (pull_map)))
3511   //if (GNUNET_YES) // disable blocking temporarily
3512   { /* If conditions for update are fulfilled, update */
3513     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3514
3515     uint32_t final_size;
3516     uint32_t peers_to_clean_size;
3517     struct GNUNET_PeerIdentity *peers_to_clean;
3518
3519     peers_to_clean = NULL;
3520     peers_to_clean_size = 0;
3521     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3522     GNUNET_memcpy (peers_to_clean,
3523             view_array,
3524             View_size () * sizeof (struct GNUNET_PeerIdentity));
3525
3526     /* Seems like recreating is the easiest way of emptying the peermap */
3527     View_clear ();
3528     to_file (file_name_view_log,
3529              "--- emptied ---");
3530
3531     first_border  = GNUNET_MIN (ceil (alpha * view_size_est_need),
3532                                 CustomPeerMap_size (push_map));
3533     second_border = first_border +
3534                     GNUNET_MIN (floor (beta  * view_size_est_need),
3535                                 CustomPeerMap_size (pull_map));
3536     final_size    = second_border +
3537       ceil ((1 - (alpha + beta)) * view_size_est_need);
3538     LOG (GNUNET_ERROR_TYPE_DEBUG,
3539         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3540         first_border,
3541         second_border,
3542         final_size);
3543
3544     /* Update view with peers received through PUSHes */
3545     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3546                                            CustomPeerMap_size (push_map));
3547     for (i = 0; i < first_border; i++)
3548     {
3549       int inserted;
3550       inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3551                                                                   permut[i]));
3552       if (GNUNET_OK == inserted)
3553       {
3554         clients_notify_stream_peer (1,
3555             CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3556       }
3557       to_file (file_name_view_log,
3558                "+%s\t(push list)",
3559                GNUNET_i2s_full (&view_array[i]));
3560       // TODO change the peer_flags accordingly
3561     }
3562     GNUNET_free (permut);
3563     permut = NULL;
3564
3565     /* Update view with peers received through PULLs */
3566     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3567                                            CustomPeerMap_size (pull_map));
3568     for (i = first_border; i < second_border; i++)
3569     {
3570       int inserted;
3571       inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3572             permut[i - first_border]));
3573       if (GNUNET_OK == inserted)
3574       {
3575         clients_notify_stream_peer (1,
3576             CustomPeerMap_get_peer_by_index (pull_map,
3577                                              permut[i - first_border]));
3578       }
3579       to_file (file_name_view_log,
3580                "+%s\t(pull list)",
3581                GNUNET_i2s_full (&view_array[i]));
3582       // TODO change the peer_flags accordingly
3583     }
3584     GNUNET_free (permut);
3585     permut = NULL;
3586
3587     /* Update view with peers from history */
3588     RPS_sampler_get_n_rand_peers (prot_sampler,
3589                                   final_size - second_border,
3590                                   hist_update,
3591                                   NULL);
3592     // TODO change the peer_flags accordingly
3593
3594     for (i = 0; i < View_size (); i++)
3595       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3596
3597     /* Clean peers that were removed from the view */
3598     for (i = 0; i < peers_to_clean_size; i++)
3599     {
3600       to_file (file_name_view_log,
3601                "-%s",
3602                GNUNET_i2s_full (&peers_to_clean[i]));
3603       clean_peer (&peers_to_clean[i]);
3604     }
3605
3606     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3607     clients_notify_view_update();
3608   } else {
3609     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3610     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3611     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3612         !(0 >= CustomPeerMap_size (pull_map)))
3613       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3614     if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3615         (0 >= CustomPeerMap_size (pull_map)))
3616       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3617     if (0 >= CustomPeerMap_size (push_map) &&
3618         !(0 >= CustomPeerMap_size (pull_map)))
3619       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3620     if (0 >= CustomPeerMap_size (push_map) &&
3621         (0 >= CustomPeerMap_size (pull_map)))
3622       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3623     if (0 >= CustomPeerMap_size (pull_map) &&
3624         CustomPeerMap_size (push_map) > alpha * View_size () &&
3625         0 >= CustomPeerMap_size (push_map))
3626       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3627   }
3628   // TODO independent of that also get some peers from CADET_get_peers()?
3629   GNUNET_STATISTICS_set (stats,
3630       "# peers in push map at end of round",
3631       CustomPeerMap_size (push_map),
3632       GNUNET_NO);
3633   GNUNET_STATISTICS_set (stats,
3634       "# peers in pull map at end of round",
3635       CustomPeerMap_size (pull_map),
3636       GNUNET_NO);
3637   GNUNET_STATISTICS_set (stats,
3638       "# peers in view at end of round",
3639       View_size (),
3640       GNUNET_NO);
3641
3642   LOG (GNUNET_ERROR_TYPE_DEBUG,
3643        "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3644        CustomPeerMap_size (push_map),
3645        CustomPeerMap_size (pull_map),
3646        alpha,
3647        View_size (),
3648        alpha * View_size ());
3649
3650   /* Update samplers */
3651   for (i = 0; i < CustomPeerMap_size (push_map); i++)
3652   {
3653     update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3654     LOG (GNUNET_ERROR_TYPE_DEBUG,
3655          "Updating with peer %s from push list\n",
3656          GNUNET_i2s (update_peer));
3657     insert_in_sampler (NULL, update_peer);
3658     clean_peer (update_peer); /* This cleans only if it is not in the view */
3659   }
3660
3661   for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3662   {
3663     LOG (GNUNET_ERROR_TYPE_DEBUG,
3664          "Updating with peer %s from pull list\n",
3665          GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3666     insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3667     /* This cleans only if it is not in the view */
3668     clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3669   }
3670
3671
3672   /* Empty push/pull lists */
3673   CustomPeerMap_clear (push_map);
3674   CustomPeerMap_clear (pull_map);
3675
3676   GNUNET_STATISTICS_set (stats,
3677                          "view size",
3678                          View_size(),
3679                          GNUNET_NO);
3680
3681   struct GNUNET_TIME_Relative time_next_round;
3682
3683   time_next_round = compute_rand_delay (round_interval, 2);
3684
3685   /* Schedule next round */
3686   do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3687                                                 &do_round, NULL);
3688   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3689 }
3690
3691
3692 /**
3693  * This is called from GNUNET_CADET_get_peers().
3694  *
3695  * It is called on every peer(ID) that cadet somehow has contact with.
3696  * We use those to initialise the sampler.
3697  */
3698 void
3699 init_peer_cb (void *cls,
3700               const struct GNUNET_PeerIdentity *peer,
3701               int tunnel, // "Do we have a tunnel towards this peer?"
3702               unsigned int n_paths, // "Number of known paths towards this peer"
3703               unsigned int best_path) // "How long is the best path?
3704                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3705 {
3706   (void) cls;
3707   (void) tunnel;
3708   (void) n_paths;
3709   (void) best_path;
3710
3711   if (NULL != peer)
3712   {
3713     LOG (GNUNET_ERROR_TYPE_DEBUG,
3714          "Got peer_id %s from cadet\n",
3715          GNUNET_i2s (peer));
3716     got_peer (peer);
3717   }
3718 }
3719
3720 /**
3721  * @brief Iterator function over stored, valid peers.
3722  *
3723  * We initialise the sampler with those.
3724  *
3725  * @param cls the closure
3726  * @param peer the peer id
3727  * @return #GNUNET_YES if we should continue to
3728  *         iterate,
3729  *         #GNUNET_NO if not.
3730  */
3731 static int
3732 valid_peers_iterator (void *cls,
3733                       const struct GNUNET_PeerIdentity *peer)
3734 {
3735   (void) cls;
3736
3737   if (NULL != peer)
3738   {
3739     LOG (GNUNET_ERROR_TYPE_DEBUG,
3740          "Got stored, valid peer %s\n",
3741          GNUNET_i2s (peer));
3742     got_peer (peer);
3743   }
3744   return GNUNET_YES;
3745 }
3746
3747
3748 /**
3749  * Iterator over peers from peerinfo.
3750  *
3751  * @param cls closure
3752  * @param peer id of the peer, NULL for last call
3753  * @param hello hello message for the peer (can be NULL)
3754  * @param error message
3755  */
3756 void
3757 process_peerinfo_peers (void *cls,
3758                         const struct GNUNET_PeerIdentity *peer,
3759                         const struct GNUNET_HELLO_Message *hello,
3760                         const char *err_msg)
3761 {
3762   (void) cls;
3763   (void) hello;
3764   (void) err_msg;
3765
3766   if (NULL != peer)
3767   {
3768     LOG (GNUNET_ERROR_TYPE_DEBUG,
3769          "Got peer_id %s from peerinfo\n",
3770          GNUNET_i2s (peer));
3771     got_peer (peer);
3772   }
3773 }
3774
3775
3776 /**
3777  * Task run during shutdown.
3778  *
3779  * @param cls unused
3780  */
3781 static void
3782 shutdown_task (void *cls)
3783 {
3784   struct ClientContext *client_ctx;
3785   (void) cls;
3786
3787   in_shutdown = GNUNET_YES;
3788
3789   LOG (GNUNET_ERROR_TYPE_DEBUG,
3790        "RPS is going down\n");
3791
3792   /* Clean all clients */
3793   for (client_ctx = cli_ctx_head;
3794        NULL != cli_ctx_head;
3795        client_ctx = cli_ctx_head)
3796   {
3797     destroy_cli_ctx (client_ctx);
3798   }
3799   GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3800   GNUNET_PEERINFO_disconnect (peerinfo_handle);
3801   peerinfo_handle = NULL;
3802   if (NULL != do_round_task)
3803   {
3804     GNUNET_SCHEDULER_cancel (do_round_task);
3805     do_round_task = NULL;
3806   }
3807
3808   peers_terminate ();
3809
3810   GNUNET_NSE_disconnect (nse);
3811   RPS_sampler_destroy (prot_sampler);
3812   GNUNET_CADET_close_port (cadet_port);
3813   GNUNET_CADET_disconnect (cadet_handle);
3814   cadet_handle = NULL;
3815   View_destroy ();
3816   CustomPeerMap_destroy (push_map);
3817   CustomPeerMap_destroy (pull_map);
3818   if (NULL != stats)
3819   {
3820     GNUNET_STATISTICS_destroy (stats,
3821                                GNUNET_NO);
3822     stats = NULL;
3823   }
3824 #ifdef ENABLE_MALICIOUS
3825   struct AttackedPeer *tmp_att_peer;
3826   /* it is ok to free this const during shutdown: */
3827   GNUNET_free ((char *) file_name_view_log);
3828 #ifdef TO_FILE
3829   GNUNET_free ((char *) file_name_observed_log);
3830   GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
3831 #endif /* TO_FILE */
3832   GNUNET_array_grow (mal_peers,
3833                      num_mal_peers,
3834                      0);
3835   if (NULL != mal_peer_set)
3836     GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3837   if (NULL != att_peer_set)
3838     GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
3839   while (NULL != att_peers_head)
3840   {
3841     tmp_att_peer = att_peers_head;
3842     GNUNET_CONTAINER_DLL_remove (att_peers_head,
3843                                  att_peers_tail,
3844                                  tmp_att_peer);
3845     GNUNET_free (tmp_att_peer);
3846   }
3847 #endif /* ENABLE_MALICIOUS */
3848 }
3849
3850
3851 /**
3852  * Handle client connecting to the service.
3853  *
3854  * @param cls NULL
3855  * @param client the new client
3856  * @param mq the message queue of @a client
3857  * @return @a client
3858  */
3859 static void *
3860 client_connect_cb (void *cls,
3861                    struct GNUNET_SERVICE_Client *client,
3862                    struct GNUNET_MQ_Handle *mq)
3863 {
3864   struct ClientContext *cli_ctx;
3865   (void) cls;
3866
3867   LOG (GNUNET_ERROR_TYPE_DEBUG,
3868        "Client connected\n");
3869   if (NULL == client)
3870     return client; /* Server was destroyed before a client connected. Shutting down */
3871   cli_ctx = GNUNET_new (struct ClientContext);
3872   cli_ctx->mq = mq;
3873   cli_ctx->view_updates_left = -1;
3874   cli_ctx->stream_update = GNUNET_NO;
3875   cli_ctx->client = client;
3876   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
3877                                cli_ctx_tail,
3878                                cli_ctx);
3879   return cli_ctx;
3880 }
3881
3882 /**
3883  * Callback called when a client disconnected from the service
3884  *
3885  * @param cls closure for the service
3886  * @param c the client that disconnected
3887  * @param internal_cls should be equal to @a c
3888  */
3889 static void
3890 client_disconnect_cb (void *cls,
3891                       struct GNUNET_SERVICE_Client *client,
3892                       void *internal_cls)
3893 {
3894   struct ClientContext *cli_ctx = internal_cls;
3895
3896   (void) cls;
3897   GNUNET_assert (client == cli_ctx->client);
3898   if (NULL == client)
3899   {/* shutdown task - destroy all clients */
3900     while (NULL != cli_ctx_head)
3901       destroy_cli_ctx (cli_ctx_head);
3902   }
3903   else
3904   { /* destroy this client */
3905     LOG (GNUNET_ERROR_TYPE_DEBUG,
3906         "Client disconnected. Destroy its context.\n");
3907     destroy_cli_ctx (cli_ctx);
3908   }
3909 }
3910
3911
3912 /**
3913  * Handle random peer sampling clients.
3914  *
3915  * @param cls closure
3916  * @param c configuration to use
3917  * @param service the initialized service
3918  */
3919 static void
3920 run (void *cls,
3921      const struct GNUNET_CONFIGURATION_Handle *c,
3922      struct GNUNET_SERVICE_Handle *service)
3923 {
3924   char *fn_valid_peers;
3925
3926   (void) cls;
3927   (void) service;
3928   GNUNET_log_setup ("rps",
3929                     GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
3930                     NULL);
3931   cfg = c;
3932   /* Get own ID */
3933   GNUNET_CRYPTO_get_peer_identity (cfg,
3934                                    &own_identity); // TODO check return value
3935   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3936               "STARTING SERVICE (rps) for peer [%s]\n",
3937               GNUNET_i2s (&own_identity));
3938 #ifdef ENABLE_MALICIOUS
3939   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3940               "Malicious execution compiled in.\n");
3941 #endif /* ENABLE_MALICIOUS */
3942
3943   /* Get time interval from the configuration */
3944   if (GNUNET_OK !=
3945       GNUNET_CONFIGURATION_get_value_time (cfg,
3946                                            "RPS",
3947                                            "ROUNDINTERVAL",
3948                                            &round_interval))
3949   {
3950     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3951                                "RPS", "ROUNDINTERVAL");
3952     GNUNET_SCHEDULER_shutdown ();
3953     return;
3954   }
3955
3956   /* Get initial size of sampler/view from the configuration */
3957   if (GNUNET_OK !=
3958       GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
3959         (long long unsigned int *) &sampler_size_est_min))
3960   {
3961     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3962                                "RPS", "MINSIZE");
3963     GNUNET_SCHEDULER_shutdown ();
3964     return;
3965   }
3966   sampler_size_est_need = sampler_size_est_min;
3967   view_size_est_min = sampler_size_est_min;
3968   LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
3969
3970   if (GNUNET_OK !=
3971       GNUNET_CONFIGURATION_get_value_filename (cfg,
3972                                                "rps",
3973                                                "FILENAME_VALID_PEERS",
3974                                                &fn_valid_peers))
3975   {
3976     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3977                                "rps", "FILENAME_VALID_PEERS");
3978   }
3979
3980
3981   View_create (view_size_est_min);
3982   GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_min, GNUNET_NO);
3983
3984   /* file_name_view_log */
3985   file_name_view_log = store_prefix_file_name (&own_identity, "view");
3986   #ifdef TO_FILE
3987   file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
3988   observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3989   #endif /* TO_FILE */
3990
3991   /* connect to NSE */
3992   nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
3993
3994
3995   alpha = 0.45;
3996   beta  = 0.45;
3997
3998
3999   /* Initialise cadet */
4000   /* There exists a copy-paste-clone in get_channel() */
4001   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4002     GNUNET_MQ_hd_fixed_size (peer_check,
4003                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4004                              struct GNUNET_MessageHeader,
4005                              NULL),
4006     GNUNET_MQ_hd_fixed_size (peer_push,
4007                              GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4008                              struct GNUNET_MessageHeader,
4009                              NULL),
4010     GNUNET_MQ_hd_fixed_size (peer_pull_request,
4011                              GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4012                              struct GNUNET_MessageHeader,
4013                              NULL),
4014     GNUNET_MQ_hd_var_size (peer_pull_reply,
4015                            GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4016                            struct GNUNET_RPS_P2P_PullReplyMessage,
4017                            NULL),
4018     GNUNET_MQ_handler_end ()
4019   };
4020
4021   cadet_handle = GNUNET_CADET_connect (cfg);
4022   GNUNET_assert (NULL != cadet_handle);
4023   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4024                       strlen (GNUNET_APPLICATION_PORT_RPS),
4025                       &port);
4026   cadet_port = GNUNET_CADET_open_port (cadet_handle,
4027                                        &port,
4028                                        &handle_inbound_channel, /* Connect handler */
4029                                        NULL, /* cls */
4030                                        NULL, /* WindowSize handler */
4031                                        &cleanup_destroyed_channel, /* Disconnect handler */
4032                                        cadet_handlers);
4033   if (NULL == cadet_port)
4034   {
4035     LOG (GNUNET_ERROR_TYPE_ERROR,
4036         "Cadet port `%s' is already in use.\n",
4037         GNUNET_APPLICATION_PORT_RPS);
4038     GNUNET_assert (0);
4039   }
4040
4041
4042   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4043   initialise_peers (fn_valid_peers, cadet_handle);
4044   GNUNET_free (fn_valid_peers);
4045
4046   /* Initialise sampler */
4047   struct GNUNET_TIME_Relative half_round_interval;
4048   struct GNUNET_TIME_Relative  max_round_interval;
4049
4050   half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4051   max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4052
4053   prot_sampler =   RPS_sampler_init     (sampler_size_est_need, max_round_interval);
4054
4055   /* Initialise push and pull maps */
4056   push_map = CustomPeerMap_create (4);
4057   pull_map = CustomPeerMap_create (4);
4058
4059
4060   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4061   //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4062   // TODO send push/pull to each of those peers?
4063   // TODO read stored valid peers from last run
4064   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4065   get_valid_peers (valid_peers_iterator, NULL);
4066
4067   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4068                                                    GNUNET_NO,
4069                                                    process_peerinfo_peers,
4070                                                    NULL);
4071
4072   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4073
4074   do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4075   LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4076
4077   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4078   stats = GNUNET_STATISTICS_create ("rps", cfg);
4079
4080 }
4081
4082
4083 /**
4084  * Define "main" method using service macro.
4085  */
4086 GNUNET_SERVICE_MAIN
4087 ("rps",
4088  GNUNET_SERVICE_OPTION_NONE,
4089  &run,
4090  &client_connect_cb,
4091  &client_disconnect_cb,
4092  NULL,
4093  GNUNET_MQ_hd_var_size (client_seed,
4094    GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4095    struct GNUNET_RPS_CS_SeedMessage,
4096    NULL),
4097 #ifdef ENABLE_MALICIOUS
4098  GNUNET_MQ_hd_var_size (client_act_malicious,
4099    GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4100    struct GNUNET_RPS_CS_ActMaliciousMessage,
4101    NULL),
4102 #endif /* ENABLE_MALICIOUS */
4103  GNUNET_MQ_hd_fixed_size (client_view_request,
4104    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4105    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4106    NULL),
4107  GNUNET_MQ_hd_fixed_size (client_view_cancel,
4108    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4109    struct GNUNET_MessageHeader,
4110    NULL),
4111  GNUNET_MQ_hd_fixed_size (client_stream_request,
4112    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4113    struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4114    NULL),
4115  GNUNET_MQ_handler_end());
4116
4117 /* end of gnunet-service-rps.c */