2 This file is part of GNUnet.
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file rps/gnunet-service-rps.c
21 * @brief rps service implementation
22 * @author Julius Bünger
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
42 // TODO modify @brief in every file
44 // TODO check for overflows
46 // TODO align message structs
48 // TODO connect to friends
50 // TODO store peers somewhere persistent
52 // TODO blacklist? (-> mal peer detection on top of brahms)
54 // hist_size_init, hist_size_max
59 static const struct GNUNET_CONFIGURATION_Handle *cfg;
62 * Handle to the statistics service.
64 static struct GNUNET_STATISTICS_Handle *stats;
69 static struct GNUNET_PeerIdentity own_identity;
71 static int in_shutdown = GNUNET_NO;
74 * @brief Port used for cadet.
76 * Don't compute multiple times through making it global
78 static struct GNUNET_HashCode port;
80 /***********************************************************************
81 * Old gnunet-service-rps_peers.c
82 ***********************************************************************/
/**
 * Set the flag(s) given in @a mask in the peer_flags of a peer context.
 *
 * @a peer_ctx may be any expression yielding a pointer to a peer context.
 */
#define SET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) |= (mask))

/**
 * Check whether at least one of the flag(s) in @a mask is set in the
 * peer_flags of a peer context.
 *
 * Evaluates to #GNUNET_YES or #GNUNET_NO.
 */
#define check_peer_flag_set(peer_ctx, mask)\
  ((((peer_ctx)->peer_flags) & (mask)) ? GNUNET_YES : GNUNET_NO)

/**
 * Clear the flag(s) given in @a mask in the peer_flags of a peer context.
 */
#define UNSET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) &= ~(mask))
/**
 * Check whether at least one of the flag(s) in @a mask is set in the flags
 * word that @a channel_flags points to.
 *
 * Evaluates to #GNUNET_YES or #GNUNET_NO.
 */
#define check_channel_flag_set(channel_flags, mask)\
  (((*(channel_flags)) & (mask)) ? GNUNET_YES : GNUNET_NO)

/**
 * Clear the flag(s) given in @a mask in the flags word that
 * @a channel_flags points to.
 */
#define unset_channel_flag(channel_flags, mask) ((*(channel_flags)) &= ~(mask))
114 * Pending operation on peer consisting of callback and closure
116 * When an operation cannot be executed right now this struct is used to store
117 * the callback and closure for later execution.
133 * List containing all messages that are yet to be send
135 * This is used to keep track of all messages that have not been sent yet. When
136 * a peer is to be removed the pending messages can be removed properly.
138 struct PendingMessage
143 struct PendingMessage *next;
144 struct PendingMessage *prev;
147 * The envelope to the corresponding message
149 struct GNUNET_MQ_Envelope *ev;
152 * The corresponding context
154 struct PeerContext *peer_ctx;
163 * @brief Context for a channel
168 * Struct used to keep track of other peer's status
170 * This is stored in a multipeermap.
171 * It contains information such as cadet channels, a message queue for sending,
172 * status about the channels, the pending operations on this peer and some flags
173 * about the status of the peer itself. (live, valid, ...)
178 * Message queue open to client
180 struct GNUNET_MQ_Handle *mq;
183 * Channel open to client.
185 struct ChannelCtx *send_channel_ctx;
188 * Channel open from client.
190 struct ChannelCtx *recv_channel_ctx;
193 * Array of pending operations on this peer.
195 struct PeerPendingOp *pending_ops;
198 * Handle to the callback given to cadet_ntfy_tmt_rdy()
200 * To be canceled on shutdown.
202 struct PendingMessage *liveliness_check_pending;
205 * Number of pending operations.
207 unsigned int num_pending_ops;
210 * Identity of the peer
212 struct GNUNET_PeerIdentity peer_id;
215 * Flags indicating status of peer
220 * Last time we received something from that peer.
222 struct GNUNET_TIME_Absolute last_message_recv;
225 * Last time we received a keepalive message.
227 struct GNUNET_TIME_Absolute last_keepalive;
230 * DLL with all messages that are yet to be sent
232 struct PendingMessage *pending_messages_head;
233 struct PendingMessage *pending_messages_tail;
236 * This is pobably followed by 'statistical' data (when we first saw
237 * it, how did we get its ID, how many pushes (in a timeinterval),
243 * @brief Closure to #valid_peer_iterator
245 struct PeersIteratorCls
250 PeersIterator iterator;
253 * Closure to iterator
259 * @brief Context for a channel
264 * @brief Meant to be used in a DLL
266 struct ChannelCtx *next;
267 struct ChannelCtx *prev;
270 * @brief The channel itself
272 struct GNUNET_CADET_Channel *channel;
275 * @brief The peer context associated with the channel
277 struct PeerContext *peer_ctx;
280 * @brief When channel destruction needs to be delayed (because it is called
281 * from within the cadet routine of another channel destruction) this task
282 * refers to the respective _SCHEDULER_Task.
284 struct GNUNET_SCHEDULER_Task *destruction_task;
288 * @brief Hashmap of valid peers.
290 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
293 * @brief Maximum number of valid peers to keep.
294 * TODO read from config
296 static uint32_t num_valid_peers_max = UINT32_MAX;
299 * @brief Filename of the file that stores the valid peers persistently.
301 static char *filename_valid_peers;
304 * Set of all peers to keep track of them.
306 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
311 static struct GNUNET_CADET_Handle *cadet_handle;
315 * @brief Get the #PeerContext associated with a peer
317 * @param peer the peer id
319 * @return the #PeerContext
321 static struct PeerContext *
322 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
324 struct PeerContext *ctx;
327 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
328 GNUNET_assert (GNUNET_YES == ret);
329 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
330 GNUNET_assert (NULL != ctx);
335 * @brief Check whether we have information about the given peer.
337 * FIXME probably deprecated. Make this the new _online.
339 * @param peer peer in question
341 * @return #GNUNET_YES if peer is known
342 * #GNUNET_NO if peer is not knwon
345 check_peer_known (const struct GNUNET_PeerIdentity *peer)
347 if (NULL != peer_map)
349 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
358 * @brief Create a new #PeerContext and insert it into the peer map
360 * @param peer the peer to create the #PeerContext for
362 * @return the #PeerContext
364 static struct PeerContext *
365 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
367 struct PeerContext *ctx;
370 GNUNET_assert (GNUNET_NO == check_peer_known (peer));
372 ctx = GNUNET_new (struct PeerContext);
373 ctx->peer_id = *peer;
374 ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
375 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
376 GNUNET_assert (GNUNET_OK == ret);
382 * @brief Create or get a #PeerContext
384 * @param peer the peer to get the associated context to
386 * @return the context
388 static struct PeerContext *
389 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
391 if (GNUNET_NO == check_peer_known (peer))
393 return create_peer_ctx (peer);
395 return get_peer_ctx (peer);
400 * @brief Check whether we have a connection to this @a peer
402 * Also sets the #Peers_ONLINE flag accordingly
404 * @param peer the peer in question
406 * @return #GNUNET_YES if we are connected
407 * #GNUNET_NO otherwise
410 check_connected (const struct GNUNET_PeerIdentity *peer)
412 struct PeerContext *peer_ctx;
414 /* If we don't know about this peer we don't know whether it's online */
415 if (GNUNET_NO == check_peer_known (peer))
419 /* Get the context */
420 peer_ctx = get_peer_ctx (peer);
421 /* If we have no channel to this peer we don't know whether it's online */
422 if ( (NULL == peer_ctx->send_channel_ctx) &&
423 (NULL == peer_ctx->recv_channel_ctx) )
425 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
428 /* Otherwise (if we have a channel, we know that it's online */
429 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
435 * @brief The closure to #get_rand_peer_iterator.
437 struct GetRandPeerIteratorCls
440 * @brief The index of the peer to return.
441 * Will be decreased until 0.
442 * Then current peer is returned.
447 * @brief Pointer to peer to return.
449 const struct GNUNET_PeerIdentity *peer;
454 * @brief Iterator function for #get_random_peer_from_peermap.
456 * Implements #GNUNET_CONTAINER_PeerMapIterator.
457 * Decreases the index until the index is null.
458 * Then returns the current peer.
460 * @param cls the #GetRandPeerIteratorCls containing index and peer
461 * @param peer current peer
462 * @param value unused
464 * @return #GNUNET_YES if we should continue to
469 get_rand_peer_iterator (void *cls,
470 const struct GNUNET_PeerIdentity *peer,
473 struct GetRandPeerIteratorCls *iterator_cls = cls;
474 if (0 >= iterator_cls->index)
476 iterator_cls->peer = peer;
479 iterator_cls->index--;
485 * @brief Get a random peer from @a peer_map
487 * @param peer_map the peer_map to get the peer from
489 * @return a random peer
491 static const struct GNUNET_PeerIdentity *
492 get_random_peer_from_peermap (const struct
493 GNUNET_CONTAINER_MultiPeerMap *peer_map)
495 struct GetRandPeerIteratorCls *iterator_cls;
496 const struct GNUNET_PeerIdentity *ret;
498 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
499 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
500 GNUNET_CONTAINER_multipeermap_size (peer_map));
501 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
502 get_rand_peer_iterator,
504 ret = iterator_cls->peer;
505 GNUNET_free (iterator_cls);
511 * @brief Add a given @a peer to valid peers.
513 * If valid peers are already #num_valid_peers_max, delete a peer previously.
515 * @param peer the peer that is added to the valid peers.
517 * @return #GNUNET_YES if no other peer had to be removed
518 * #GNUNET_NO otherwise
521 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
523 const struct GNUNET_PeerIdentity *rand_peer;
527 while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
529 rand_peer = get_random_peer_from_peermap (valid_peers);
530 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
533 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
534 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
539 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
542 * @brief Set the peer flag to living and
543 * call the pending operations on this peer.
545 * Also adds peer to #valid_peers.
547 * @param peer_ctx the #PeerContext of the peer to set live
550 set_peer_live (struct PeerContext *peer_ctx)
552 struct GNUNET_PeerIdentity *peer;
555 peer = &peer_ctx->peer_id;
556 LOG (GNUNET_ERROR_TYPE_DEBUG,
557 "Peer %s is live and valid, calling %i pending operations on it\n",
559 peer_ctx->num_pending_ops);
561 if (NULL != peer_ctx->liveliness_check_pending)
563 LOG (GNUNET_ERROR_TYPE_DEBUG,
564 "Removing pending liveliness check for peer %s\n",
565 GNUNET_i2s (&peer_ctx->peer_id));
566 // TODO wait until cadet sets mq->cancel_impl
567 //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
568 remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
569 peer_ctx->liveliness_check_pending = NULL;
572 (void) add_valid_peer (peer);
573 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
575 /* Call pending operations */
576 for (i = 0; i < peer_ctx->num_pending_ops; i++)
578 peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
580 GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
584 cleanup_destroyed_channel (void *cls,
585 const struct GNUNET_CADET_Channel *channel);
587 /* Declaration of handlers */
589 handle_peer_check (void *cls,
590 const struct GNUNET_MessageHeader *msg);
593 handle_peer_push (void *cls,
594 const struct GNUNET_MessageHeader *msg);
597 handle_peer_pull_request (void *cls,
598 const struct GNUNET_MessageHeader *msg);
601 check_peer_pull_reply (void *cls,
602 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
605 handle_peer_pull_reply (void *cls,
606 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
608 /* End declaration of handlers */
611 * @brief Allocate memory for a new channel context and insert it into DLL
613 * @param peer_ctx context of the according peer
615 * @return The channel context
617 static struct ChannelCtx *
618 add_channel_ctx (struct PeerContext *peer_ctx)
620 struct ChannelCtx *channel_ctx;
621 channel_ctx = GNUNET_new (struct ChannelCtx);
622 channel_ctx->peer_ctx = peer_ctx;
628 * @brief Remove the channel context from the DLL and free the memory.
630 * @param channel_ctx The channel context.
633 remove_channel_ctx (struct ChannelCtx *channel_ctx)
635 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
637 if (channel_ctx == peer_ctx->send_channel_ctx)
639 GNUNET_free (channel_ctx);
640 peer_ctx->send_channel_ctx = NULL;
643 else if (channel_ctx == peer_ctx->recv_channel_ctx)
645 GNUNET_free (channel_ctx);
646 peer_ctx->recv_channel_ctx = NULL;
656 * @brief Get the channel of a peer. If not existing, create.
658 * @param peer the peer id
659 * @return the #GNUNET_CADET_Channel used to send data to @a peer
661 struct GNUNET_CADET_Channel *
662 get_channel (const struct GNUNET_PeerIdentity *peer)
664 struct PeerContext *peer_ctx;
665 struct GNUNET_PeerIdentity *ctx_peer;
666 /* There exists a copy-paste-clone in run() */
667 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
668 GNUNET_MQ_hd_fixed_size (peer_check,
669 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
670 struct GNUNET_MessageHeader,
672 GNUNET_MQ_hd_fixed_size (peer_push,
673 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
674 struct GNUNET_MessageHeader,
676 GNUNET_MQ_hd_fixed_size (peer_pull_request,
677 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
678 struct GNUNET_MessageHeader,
680 GNUNET_MQ_hd_var_size (peer_pull_reply,
681 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
682 struct GNUNET_RPS_P2P_PullReplyMessage,
684 GNUNET_MQ_handler_end ()
688 peer_ctx = get_peer_ctx (peer);
689 if (NULL == peer_ctx->send_channel_ctx)
691 LOG (GNUNET_ERROR_TYPE_DEBUG,
692 "Trying to establish channel to peer %s\n",
694 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
696 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
697 peer_ctx->send_channel_ctx->channel =
698 GNUNET_CADET_channel_create (cadet_handle,
699 peer_ctx->send_channel_ctx, /* context */
702 GNUNET_CADET_OPTION_RELIABLE,
703 NULL, /* WindowSize handler */
704 cleanup_destroyed_channel, /* Disconnect handler */
707 GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
708 GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
709 return peer_ctx->send_channel_ctx->channel;
714 * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
716 * If we already have a message queue open to this client,
717 * simply return it, otherways create one.
719 * @param peer the peer to get the mq to
720 * @return the #GNUNET_MQ_Handle
722 static struct GNUNET_MQ_Handle *
723 get_mq (const struct GNUNET_PeerIdentity *peer)
725 struct PeerContext *peer_ctx;
727 peer_ctx = get_peer_ctx (peer);
729 if (NULL == peer_ctx->mq)
731 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
737 * @brief Add an envelope to a message passed to mq to list of pending messages
739 * @param peer peer the message was sent to
740 * @param ev envelope to the message
741 * @param type type of the message to be sent
742 * @return pointer to pending message
744 static struct PendingMessage *
745 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
746 struct GNUNET_MQ_Envelope *ev,
749 struct PendingMessage *pending_msg;
750 struct PeerContext *peer_ctx;
752 peer_ctx = get_peer_ctx (peer);
753 pending_msg = GNUNET_new (struct PendingMessage);
754 pending_msg->ev = ev;
755 pending_msg->peer_ctx = peer_ctx;
756 pending_msg->type = type;
757 GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
758 peer_ctx->pending_messages_tail,
765 * @brief Remove a pending message from the respective DLL
767 * @param pending_msg the pending message to remove
768 * @param cancel cancel the pending message, too
771 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
773 struct PeerContext *peer_ctx;
775 peer_ctx = pending_msg->peer_ctx;
776 GNUNET_assert (NULL != peer_ctx);
777 GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
778 peer_ctx->pending_messages_tail,
780 // TODO wait for the cadet implementation of message cancellation
781 //if (GNUNET_YES == cancel)
783 // GNUNET_MQ_send_cancel (pending_msg->ev);
785 GNUNET_free (pending_msg);
790 * @brief This is called in response to the first message we sent as a
793 * @param cls #PeerContext of peer with pending liveliness check
796 mq_liveliness_check_successful (void *cls)
798 struct PeerContext *peer_ctx = cls;
800 if (NULL != peer_ctx->liveliness_check_pending)
802 LOG (GNUNET_ERROR_TYPE_DEBUG,
803 "Liveliness check for peer %s was successfull\n",
804 GNUNET_i2s (&peer_ctx->peer_id));
805 remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
806 peer_ctx->liveliness_check_pending = NULL;
807 set_peer_live (peer_ctx);
812 * Issue a check whether peer is live
814 * @param peer_ctx the context of the peer
817 check_peer_live (struct PeerContext *peer_ctx)
819 LOG (GNUNET_ERROR_TYPE_DEBUG,
820 "Get informed about peer %s getting live\n",
821 GNUNET_i2s (&peer_ctx->peer_id));
823 struct GNUNET_MQ_Handle *mq;
824 struct GNUNET_MQ_Envelope *ev;
826 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
827 peer_ctx->liveliness_check_pending =
828 insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
829 mq = get_mq (&peer_ctx->peer_id);
830 GNUNET_MQ_notify_sent (ev,
831 mq_liveliness_check_successful,
833 GNUNET_MQ_send (mq, ev);
838 * @brief Check whether function of type #PeerOp was already scheduled
840 * The array with pending operations will probably never grow really big, so
841 * iterating over it should be ok.
843 * @param peer the peer to check
844 * @param peer_op the operation (#PeerOp) on the peer
846 * @return #GNUNET_YES if this operation is scheduled on that peer
847 * #GNUNET_NO otherwise
850 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
851 const PeerOp peer_op)
853 const struct PeerContext *peer_ctx;
856 peer_ctx = get_peer_ctx (peer);
857 for (i = 0; i < peer_ctx->num_pending_ops; i++)
858 if (peer_op == peer_ctx->pending_ops[i].op)
864 * @brief Callback for scheduler to destroy a channel
866 * @param cls Context of the channel
869 destroy_channel (struct ChannelCtx *channel_ctx)
871 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
873 if (NULL != channel_ctx->destruction_task)
875 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
876 channel_ctx->destruction_task = NULL;
878 GNUNET_CADET_channel_destroy (channel_ctx->channel);
879 channel_ctx->channel = NULL;
880 remove_channel_ctx (channel_ctx);
884 * @brief Destroy a cadet channel.
886 * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
891 destroy_channel_cb (void *cls)
893 struct ChannelCtx *channel_ctx = cls;
894 channel_ctx->destruction_task = NULL;
895 destroy_channel (channel_ctx);
899 * @brief Schedule the destruction of a channel for immediately afterwards.
901 * In case a channel is to be destroyed from within the callback to the
902 * destruction of another channel (send channel), we cannot call
903 * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
906 * @param channel_ctx channel to be destroyed.
909 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
911 channel_ctx->destruction_task =
912 GNUNET_SCHEDULER_add_now (destroy_channel_cb, channel_ctx);
918 * @param peer the peer to clean
919 * @return #GNUNET_YES if peer was removed
920 * #GNUNET_NO otherwise
923 destroy_peer (struct PeerContext *peer_ctx)
925 GNUNET_assert (NULL != peer_ctx);
926 GNUNET_assert (NULL != peer_map);
928 GNUNET_CONTAINER_multipeermap_contains (peer_map,
933 SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
934 LOG (GNUNET_ERROR_TYPE_DEBUG,
935 "Going to remove peer %s\n",
936 GNUNET_i2s (&peer_ctx->peer_id));
937 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
939 /* Clear list of pending operations */
940 // TODO this probably leaks memory
941 // ('only' the cls to the function. Not sure what to do with it)
942 GNUNET_array_grow (peer_ctx->pending_ops,
943 peer_ctx->num_pending_ops,
945 /* Remove all pending messages */
946 while (NULL != peer_ctx->pending_messages_head)
948 LOG (GNUNET_ERROR_TYPE_DEBUG,
949 "Removing unsent %s\n",
950 peer_ctx->pending_messages_head->type);
951 /* Cancle pending message, too */
952 if ( (NULL != peer_ctx->liveliness_check_pending) &&
953 (0 == memcmp (peer_ctx->pending_messages_head,
954 peer_ctx->liveliness_check_pending,
955 sizeof (struct PendingMessage))) )
957 // TODO this may leak memory
958 peer_ctx->liveliness_check_pending = NULL;
960 remove_pending_message (peer_ctx->pending_messages_head,
964 /* If we are still waiting for notification whether this peer is live
965 * cancel the according task */
966 if (NULL != peer_ctx->liveliness_check_pending)
968 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969 "Removing pending liveliness check for peer %s\n",
970 GNUNET_i2s (&peer_ctx->peer_id));
971 // TODO wait until cadet sets mq->cancel_impl
972 //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
973 remove_pending_message (peer_ctx->liveliness_check_pending,
975 peer_ctx->liveliness_check_pending = NULL;
978 if (NULL != peer_ctx->send_channel_ctx)
980 /* This is possibly called from within channel destruction */
981 schedule_channel_destruction (peer_ctx->send_channel_ctx);
983 if (NULL != peer_ctx->recv_channel_ctx)
985 /* This is possibly called from within channel destruction */
986 schedule_channel_destruction (peer_ctx->recv_channel_ctx);
990 GNUNET_CONTAINER_multipeermap_remove_all (peer_map,
993 LOG (GNUNET_ERROR_TYPE_WARNING,
994 "removing peer from peer_map failed\n");
996 GNUNET_free (peer_ctx);
1002 * Iterator over hash map entries. Deletes all contexts of peers.
1004 * @param cls closure
1005 * @param key current public key
1006 * @param value value in the hash map
1007 * @return #GNUNET_YES if we should continue to iterate,
1008 * #GNUNET_NO if not.
1011 peermap_clear_iterator (void *cls,
1012 const struct GNUNET_PeerIdentity *key,
1015 destroy_peer (get_peer_ctx (key));
1021 * @brief This is called once a message is sent.
1023 * Removes the pending message
1025 * @param cls type of the message that was sent
1028 mq_notify_sent_cb (void *cls)
1030 struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1031 LOG (GNUNET_ERROR_TYPE_DEBUG,
1034 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1035 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1036 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1037 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1038 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1039 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1040 /* Do not cancle message */
1041 remove_pending_message (pending_msg, GNUNET_NO);
1046 * @brief Iterator function for #store_valid_peers.
1048 * Implements #GNUNET_CONTAINER_PeerMapIterator.
1049 * Writes single peer to disk.
1051 * @param cls the file handle to write to.
1052 * @param peer current peer
1053 * @param value unused
1055 * @return #GNUNET_YES if we should continue to
1057 * #GNUNET_NO if not.
1060 store_peer_presistently_iterator (void *cls,
1061 const struct GNUNET_PeerIdentity *peer,
1064 const struct GNUNET_DISK_FileHandle *fh = cls;
1065 char peer_string[128];
1073 size = GNUNET_snprintf (peer_string,
1074 sizeof (peer_string),
1076 GNUNET_i2s_full (peer));
1077 GNUNET_assert (53 == size);
1078 ret = GNUNET_DISK_file_write (fh,
1081 GNUNET_assert (size == ret);
1087 * @brief Store the peers currently in #valid_peers to disk.
1090 store_valid_peers ()
1092 struct GNUNET_DISK_FileHandle *fh;
1093 uint32_t number_written_peers;
1096 if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1101 ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
1102 if (GNUNET_SYSERR == ret)
1104 LOG (GNUNET_ERROR_TYPE_WARNING,
1105 "Not able to create directory for file `%s'\n",
1106 filename_valid_peers);
1109 else if (GNUNET_NO == ret)
1111 LOG (GNUNET_ERROR_TYPE_WARNING,
1112 "Directory for file `%s' exists but is not writable for us\n",
1113 filename_valid_peers);
1116 fh = GNUNET_DISK_file_open (filename_valid_peers,
1117 GNUNET_DISK_OPEN_WRITE |
1118 GNUNET_DISK_OPEN_CREATE,
1119 GNUNET_DISK_PERM_USER_READ |
1120 GNUNET_DISK_PERM_USER_WRITE);
1123 LOG (GNUNET_ERROR_TYPE_WARNING,
1124 "Not able to write valid peers to file `%s'\n",
1125 filename_valid_peers);
1128 LOG (GNUNET_ERROR_TYPE_DEBUG,
1129 "Writing %u valid peers to disk\n",
1130 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1131 number_written_peers =
1132 GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1133 store_peer_presistently_iterator,
1135 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1136 GNUNET_assert (number_written_peers ==
1137 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1142 * @brief Convert string representation of peer id to peer id.
1144 * Counterpart to #GNUNET_i2s_full.
1146 * @param string_repr The string representation of the peer id
1148 * @return The peer id
1150 static const struct GNUNET_PeerIdentity *
1151 s2i_full (const char *string_repr)
1153 struct GNUNET_PeerIdentity *peer;
1157 peer = GNUNET_new (struct GNUNET_PeerIdentity);
1158 len = strlen (string_repr);
1161 LOG (GNUNET_ERROR_TYPE_WARNING,
1162 "Not able to convert string representation of PeerID to PeerID\n"
1163 "Sting representation: %s (len %lu) - too short\n",
1172 ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1175 if (GNUNET_OK != ret)
1177 LOG (GNUNET_ERROR_TYPE_WARNING,
1178 "Not able to convert string representation of PeerID to PeerID\n"
1179 "Sting representation: %s\n",
1188 * @brief Restore the peers on disk to #valid_peers.
1191 restore_valid_peers ()
1195 struct GNUNET_DISK_FileHandle *fh;
1200 const struct GNUNET_PeerIdentity *peer;
1202 if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1207 if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1211 fh = GNUNET_DISK_file_open (filename_valid_peers,
1212 GNUNET_DISK_OPEN_READ,
1213 GNUNET_DISK_PERM_NONE);
1214 GNUNET_assert (NULL != fh);
1215 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1216 num_peers = file_size / 53;
1217 buf = GNUNET_malloc (file_size);
1218 size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1219 GNUNET_assert (size_read == file_size);
1220 LOG (GNUNET_ERROR_TYPE_DEBUG,
1221 "Restoring %" PRIu32 " peers from file `%s'\n",
1223 filename_valid_peers);
1224 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1226 str_repr = GNUNET_strndup (iter_buf, 53);
1227 peer = s2i_full (str_repr);
1228 GNUNET_free (str_repr);
1229 add_valid_peer (peer);
1230 LOG (GNUNET_ERROR_TYPE_DEBUG,
1231 "Restored valid peer %s from disk\n",
1232 GNUNET_i2s_full (peer));
1236 LOG (GNUNET_ERROR_TYPE_DEBUG,
1237 "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1239 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1240 if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1242 LOG (GNUNET_ERROR_TYPE_WARNING,
1243 "Number of restored peers does not match file size. Have probably duplicates.\n");
1245 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1246 LOG (GNUNET_ERROR_TYPE_DEBUG,
1247 "Restored %u valid peers from disk\n",
1248 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1253 * @brief Initialise storage of peers
1255 * @param fn_valid_peers filename of the file used to store valid peer ids
1256 * @param cadet_h cadet handle
1257 * @param own_id own peer identity
1260 initialise_peers (char* fn_valid_peers,
1261 struct GNUNET_CADET_Handle *cadet_h)
1263 filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1264 cadet_handle = cadet_h;
1265 peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1266 valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1267 restore_valid_peers ();
1272 * @brief Delete storage of peers that was created with #initialise_peers ()
1277 if (GNUNET_SYSERR ==
1278 GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1279 &peermap_clear_iterator,
1282 LOG (GNUNET_ERROR_TYPE_WARNING,
1283 "Iteration destroying peers was aborted.\n");
1285 GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1287 store_valid_peers ();
1288 GNUNET_free (filename_valid_peers);
1289 filename_valid_peers = NULL;
1290 GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1296 * Iterator over #valid_peers hash map entries.
1298 * @param cls closure - unused
1299 * @param peer current peer id
1300 * @param value value in the hash map - unused
1301 * @return #GNUNET_YES if we should continue to
1303 * #GNUNET_NO if not.
1306 valid_peer_iterator (void *cls,
1307 const struct GNUNET_PeerIdentity *peer,
1310 struct PeersIteratorCls *it_cls = cls;
1312 return it_cls->iterator (it_cls->cls,
1318 * @brief Get all currently known, valid peer ids.
1320 * @param it function to call on each peer id
1321 * @param it_cls extra argument to @a it
1322 * @return the number of key value pairs processed,
1323 * #GNUNET_SYSERR if it aborted iteration
1326 get_valid_peers (PeersIterator iterator,
1329 struct PeersIteratorCls *cls;
1332 cls = GNUNET_new (struct PeersIteratorCls);
1333 cls->iterator = iterator;
1335 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1336 valid_peer_iterator,
1344 * @brief Add peer to known peers.
1346 * This function is called on new peer_ids from 'external' sources
1347 * (client seed, cadet get_peers(), ...)
1349 * @param peer the new #GNUNET_PeerIdentity
1351 * @return #GNUNET_YES if peer was inserted
1352 * #GNUNET_NO otherwise
1355 insert_peer (const struct GNUNET_PeerIdentity *peer)
1357 if (GNUNET_YES == check_peer_known (peer))
1359 return GNUNET_NO; /* We already know this peer - nothing to do */
1361 (void) create_peer_ctx (peer);
1367 * @brief Check whether flags on a peer are set.
1369 * @param peer the peer to check the flag of
1370 * @param flags the flags to check
1372 * @return #GNUNET_SYSERR if peer is not known
1373 * #GNUNET_YES if all given flags are set
1374 * #GNUNET_NO otherwise
1377 check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1378 enum Peers_PeerFlags flags)
1380 struct PeerContext *peer_ctx;
1382 if (GNUNET_NO == check_peer_known (peer))
1384 return GNUNET_SYSERR;
1386 peer_ctx = get_peer_ctx (peer);
1387 return check_peer_flag_set (peer_ctx, flags);
1391 * @brief Try connecting to a peer to see whether it is online
1393 * If not known yet, insert into known peers
1395 * @param peer the peer whose liveliness is to be checked
1396 * @return #GNUNET_YES if peer had to be inserted
1397 * #GNUNET_NO otherwise
1400 issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1402 struct PeerContext *peer_ctx;
1405 ret = insert_peer (peer);
1406 peer_ctx = get_peer_ctx (peer);
1407 if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) &&
1408 (NULL == peer_ctx->liveliness_check_pending) )
1410 check_peer_live (peer_ctx);
1417 * @brief Check if peer is removable.
1420 * - a recv channel exists
1421 * - there are pending messages
1422 * - there is no pending pull reply
1424 * @param peer the peer in question
1425 * @return #GNUNET_YES if peer is removable
1426 * #GNUNET_NO if peer is NOT removable
1427 * #GNUNET_SYSERR if peer is not known
1430 check_removable (const struct GNUNET_PeerIdentity *peer)
1432 struct PeerContext *peer_ctx;
1434 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1436 return GNUNET_SYSERR;
1439 peer_ctx = get_peer_ctx (peer);
1440 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1441 (NULL != peer_ctx->pending_messages_head) ||
1442 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1451 * @brief Check whether @a peer is actually a peer.
1453 * A valid peer is a peer that we know exists eg. we were connected to once.
1455 * @param peer peer in question
1457 * @return #GNUNET_YES if peer is valid
1458 * #GNUNET_NO if peer is not valid
1461 check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1463 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1468 * @brief Indicate that we want to send to the other peer
1470 * This establishes a sending channel
1472 * @param peer the peer to establish channel to
1475 indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1477 GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1478 (void) get_channel (peer);
1483 * @brief Check whether other peer has the intention to send/opened channel
1486 * @param peer the peer in question
1488 * @return #GNUNET_YES if peer has the intention to send
1489 * #GNUNET_NO otherwise
1492 check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1494 const struct PeerContext *peer_ctx;
1496 peer_ctx = get_peer_ctx (peer);
1497 if (NULL != peer_ctx->recv_channel_ctx)
1506 * Handle the channel a peer opens to us.
1508 * @param cls The closure
1509 * @param channel The channel the peer wants to establish
1510 * @param initiator The peer's peer ID
1512 * @return initial channel context for the channel
1513 * (can be NULL -- that's not an error)
1516 handle_inbound_channel (void *cls,
1517 struct GNUNET_CADET_Channel *channel,
1518 const struct GNUNET_PeerIdentity *initiator)
1520 struct PeerContext *peer_ctx;
1521 struct GNUNET_PeerIdentity *ctx_peer;
1522 struct ChannelCtx *channel_ctx;
1524 LOG (GNUNET_ERROR_TYPE_DEBUG,
1525 "New channel was established to us (Peer %s).\n",
1526 GNUNET_i2s (initiator));
1527 GNUNET_assert (NULL != channel); /* according to cadet API */
1528 /* Make sure we 'know' about this peer */
1529 peer_ctx = create_or_get_peer_ctx (initiator);
1530 set_peer_live (peer_ctx);
1531 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1532 *ctx_peer = *initiator;
1533 channel_ctx = add_channel_ctx (peer_ctx);
1534 channel_ctx->channel = channel;
1535 /* We only accept one incoming channel per peer */
1536 if (GNUNET_YES == check_peer_send_intention (initiator))
1538 LOG (GNUNET_ERROR_TYPE_WARNING,
1539 "Already got one receive channel. Destroying old one.\n");
1540 GNUNET_break_op (0);
1541 destroy_channel (peer_ctx->recv_channel_ctx);
1542 peer_ctx->recv_channel_ctx = channel_ctx;
1543 /* return the channel context */
1546 peer_ctx->recv_channel_ctx = channel_ctx;
1552 * @brief Check whether a sending channel towards the given peer exists
1554 * @param peer the peer to check for
1556 * @return #GNUNET_YES if a sending channel towards that peer exists
1557 * #GNUNET_NO otherwise
1560 check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1562 struct PeerContext *peer_ctx;
1564 if (GNUNET_NO == check_peer_known (peer))
1565 { /* If no such peer exists, there is no channel */
1568 peer_ctx = get_peer_ctx (peer);
1569 if (NULL == peer_ctx->send_channel_ctx)
1578 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1579 * intention to another peer
1581 * If there is also no channel to receive messages from that peer, remove it
1585 * @peer the peer identity of the peer whose sending channel to destroy
1586 * @return #GNUNET_YES if channel was destroyed
1587 * #GNUNET_NO otherwise
1590 destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1592 struct PeerContext *peer_ctx;
1594 if (GNUNET_NO == check_peer_known (peer))
1598 peer_ctx = get_peer_ctx (peer);
1599 if (NULL != peer_ctx->send_channel_ctx)
1601 destroy_channel (peer_ctx->send_channel_ctx);
1602 (void) check_connected (peer);
1609 * @brief Send a message to another peer.
1611 * Keeps track about pending messages so they can be properly removed when the
1612 * peer is destroyed.
1614 * @param peer receeiver of the message
1615 * @param ev envelope of the message
1616 * @param type type of the message
1619 send_message (const struct GNUNET_PeerIdentity *peer,
1620 struct GNUNET_MQ_Envelope *ev,
1623 struct PendingMessage *pending_msg;
1624 struct GNUNET_MQ_Handle *mq;
1626 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1627 "Sending message to %s of type %s\n",
1630 pending_msg = insert_pending_message (peer, ev, type);
1632 GNUNET_MQ_notify_sent (ev,
1635 GNUNET_MQ_send (mq, ev);
1639 * @brief Schedule a operation on given peer
1641 * Avoids scheduling an operation twice.
1643 * @param peer the peer we want to schedule the operation for once it gets live
1645 * @return #GNUNET_YES if the operation was scheduled
1646 * #GNUNET_NO otherwise
1649 schedule_operation (const struct GNUNET_PeerIdentity *peer,
1650 const PeerOp peer_op)
1652 struct PeerPendingOp pending_op;
1653 struct PeerContext *peer_ctx;
1655 GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1657 //TODO if LIVE/ONLINE execute immediately
1659 if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1661 peer_ctx = get_peer_ctx (peer);
1662 pending_op.op = peer_op;
1663 pending_op.op_cls = NULL;
1664 GNUNET_array_append (peer_ctx->pending_ops,
1665 peer_ctx->num_pending_ops,
1672 /***********************************************************************
1673 * /Old gnunet-service-rps_peers.c
1674 ***********************************************************************/
1677 /***********************************************************************
1678 * Housekeeping with clients
1679 ***********************************************************************/
/**
 * Closure handed to the callback that answers a client's request.
 * Carries the client and the id of the request.
 */
struct ReplyCls
{
  /**
   * DLL (per-client list of outstanding requests)
   */
  struct ReplyCls *next;
  struct ReplyCls *prev;

  /**
   * The identifier of the request
   */
  uint32_t id;

  /**
   * The handle to the request
   * (NULL once the sampler has delivered its result)
   */
  struct RPS_SamplerRequestHandle *req_handle;

  /**
   * The client handle to send the reply to
   */
  struct ClientContext *cli_ctx;
};
/**
 * Per-client state kept for every connected client.
 */
struct ClientContext
{
  /**
   * DLL (list of all connected clients)
   */
  struct ClientContext *next;
  struct ClientContext *prev;

  /**
   * The message queue to communicate with the client.
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * DLL with handles to single requests from the client
   */
  struct ReplyCls *rep_cls_head;
  struct ReplyCls *rep_cls_tail;

  /**
   * @brief How many updates this client expects to receive.
   */
  int64_t view_updates_left;

  /**
   * The service-side handle of this client
   * (passed to GNUNET_SERVICE_client_continue()/_drop())
   */
  struct GNUNET_SERVICE_Client *client;
};
1744 * DLL with all clients currently connected to us
1746 struct ClientContext *cli_ctx_head;
1747 struct ClientContext *cli_ctx_tail;
1749 /***********************************************************************
1750 * /Housekeeping with clients
1751 ***********************************************************************/
1757 /***********************************************************************
1759 ***********************************************************************/
1762 * Sampler used for the Brahms protocol itself.
1764 static struct RPS_Sampler *prot_sampler;
1767 * Sampler used for the clients.
1769 static struct RPS_Sampler *client_sampler;
1772 * Name to log view to
1774 static const char *file_name_view_log;
1778 * Name to log number of observed peers to
1780 static const char *file_name_observed_log;
1783 * @brief Count the observed peers
1785 static uint32_t num_observed_peers;
1788 * @brief Multipeermap (ab-) used to count unique peer_ids
1790 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1791 #endif /* TO_FILE */
1794 * The size of sampler we need to be able to satisfy the client's need
1797 static unsigned int sampler_size_client_need;
1800 * The size of sampler we need to be able to satisfy the Brahms protocol's
1801 * need of random peers.
1803 * This is one minimum size the sampler grows to.
1805 static unsigned int sampler_size_est_need;
1808 * @brief This is the minimum estimate used as sampler size.
1810 * It is configured by the user.
1812 static unsigned int sampler_size_est_min;
1815 * @brief This is the estimate used as view size.
1817 * It is initialised with the minimum
1819 static unsigned int view_size_est_need;
1822 * @brief This is the minimum estimate used as view size.
1824 * It is configured by the user.
1826 static unsigned int view_size_est_min;
1829 * Percentage of total peer number in the view
1830 * to send random PUSHes to
1835 * Percentage of total peer number in the view
1836 * to send random PULLs to
1841 * Identifier for the main task that runs periodically.
1843 static struct GNUNET_SCHEDULER_Task *do_round_task;
1846 * Time interval the do_round task runs in.
1848 static struct GNUNET_TIME_Relative round_interval;
1851 * List to store peers received through pushes temporary.
1853 static struct CustomPeerMap *push_map;
1856 * List to store peers received through pulls temporary.
1858 static struct CustomPeerMap *pull_map;
1863 static struct GNUNET_NSE_Handle *nse;
1868 static struct GNUNET_CADET_Handle *cadet_handle;
1871 * @brief Port to communicate to other peers.
1873 static struct GNUNET_CADET_Port *cadet_port;
1876 * Handler to PEERINFO.
1878 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1881 * Handle for cancellation of iteration over peers.
1883 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1888 * Counts how many requests clients already issued.
1889 * Only needed in the beginning to check how many of the 64 deltas
1892 static unsigned int req_counter;
1895 * Time of the last request we received.
1897 * Used to compute the expected request rate.
1899 static struct GNUNET_TIME_Absolute last_request;
1902 * Size of #request_deltas.
1904 #define REQUEST_DELTAS_SIZE 64
1905 static unsigned int request_deltas_size = REQUEST_DELTAS_SIZE;
1908 * Last 64 deltas between requests
1910 static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE];
1913 * The prediction of the rate of requests
1915 static struct GNUNET_TIME_Relative request_rate;
1918 #ifdef ENABLE_MALICIOUS
1920 * Type of malicious peer
1922 * 0 Don't act malicious at all - Default
1923 * 1 Try to maximise representation
1924 * 2 Try to partition the network
1927 static uint32_t mal_type;
1930 * Other malicious peers
1932 static struct GNUNET_PeerIdentity *mal_peers;
1935 * Hashmap of malicious peers used as set.
1936 * Used to more efficiently check whether we know that peer.
1938 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
1941 * Number of other malicious peers
1943 static uint32_t num_mal_peers;
1947 * If type is 2 This struct is used to store the attacked peers in a DLL
1954 struct AttackedPeer *next;
1955 struct AttackedPeer *prev;
1960 struct GNUNET_PeerIdentity peer_id;
1964 * If type is 2 this is the DLL of attacked peers
1966 static struct AttackedPeer *att_peers_head;
1967 static struct AttackedPeer *att_peers_tail;
1970 * This index is used to point to an attacked peer to
1971 * implement the round-robin-ish way to select attacked peers.
1973 static struct AttackedPeer *att_peer_index;
1976 * Hashmap of attacked peers used as set.
1977 * Used to more efficiently check whether we know that peer.
1979 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
1982 * Number of attacked peers
1984 static uint32_t num_attacked_peers;
1987 * If type is 1 this is the attacked peer
1989 static struct GNUNET_PeerIdentity attacked_peer;
1992 * The limit of PUSHes we can send in one round.
1993 * This is an assumption of the Brahms protocol and either implemented
1996 * assumed to be the bandwidth limitation.
1998 static uint32_t push_limit = 10000;
1999 #endif /* ENABLE_MALICIOUS */
2002 /***********************************************************************
2004 ***********************************************************************/
2007 /***********************************************************************
2009 ***********************************************************************/
2013 * Print peerlist to log.
2016 print_peer_list (struct GNUNET_PeerIdentity *list,
2021 LOG (GNUNET_ERROR_TYPE_DEBUG,
2022 "Printing peer list of length %u at %p:\n",
2025 for (i = 0 ; i < len ; i++)
2027 LOG (GNUNET_ERROR_TYPE_DEBUG,
2029 i, GNUNET_i2s (&list[i]));
2035 * Remove peer from list.
2038 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2039 unsigned int *list_size,
2040 const struct GNUNET_PeerIdentity *peer)
2043 struct GNUNET_PeerIdentity *tmp;
2047 LOG (GNUNET_ERROR_TYPE_DEBUG,
2048 "Removing peer %s from list at %p\n",
2052 for ( i = 0 ; i < *list_size ; i++ )
2054 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2056 if (i < *list_size -1)
2057 { /* Not at the last entry -- shift peers left */
2058 memmove (&tmp[i], &tmp[i +1],
2059 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2061 /* Remove last entry (should be now useless PeerID) */
2062 GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2070 * Sum all time relatives of an array.
2072 static struct GNUNET_TIME_Relative
2073 T_relative_sum (const struct GNUNET_TIME_Relative *rel_array,
2076 struct GNUNET_TIME_Relative sum;
2079 sum = GNUNET_TIME_UNIT_ZERO;
2080 for ( i = 0 ; i < arr_size ; i++ )
2082 sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
2089 * Compute the average of given time relatives.
2091 static struct GNUNET_TIME_Relative
2092 T_relative_avg (const struct GNUNET_TIME_Relative *rel_array,
2095 return GNUNET_TIME_relative_divide (T_relative_sum (rel_array,
2102 * Insert PeerID in #view
2104 * Called once we know a peer is live.
2105 * Implements #PeerOp
2107 * @return GNUNET_OK if peer was actually inserted
2108 * GNUNET_NO if peer was not inserted
2111 insert_in_view_op (void *cls,
2112 const struct GNUNET_PeerIdentity *peer);
2115 * Insert PeerID in #view
2117 * Called once we know a peer is live.
2119 * @return GNUNET_OK if peer was actually inserted
2120 * GNUNET_NO if peer was not inserted
2123 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2127 online = check_peer_flag (peer, Peers_ONLINE);
2128 if ( (GNUNET_NO == online) ||
2129 (GNUNET_SYSERR == online) ) /* peer is not even known */
2131 (void) issue_peer_liveliness_check (peer);
2132 (void) schedule_operation (peer, insert_in_view_op);
2135 /* Open channel towards peer to keep connection open */
2136 indicate_sending_intention (peer);
2137 return View_put (peer);
2141 * @brief sends updates to clients that are interested
2144 clients_notify_view_update (void);
2147 * Put random peer from sampler into the view as history update.
2150 hist_update (void *cls,
2151 struct GNUNET_PeerIdentity *ids,
2156 for (i = 0; i < num_peers; i++)
2158 (void) insert_in_view (&ids[i]);
2159 to_file (file_name_view_log,
2161 GNUNET_i2s_full (ids));
2163 clients_notify_view_update();
2168 * Wrapper around #RPS_sampler_resize()
2170 * If we do not have enough sampler elements, double current sampler size
2171 * If we have more than enough sampler elements, halv current sampler size
2174 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2176 unsigned int sampler_size;
2179 // TODO respect the min, max
2180 sampler_size = RPS_sampler_get_size (sampler);
2181 if (sampler_size > new_size * 4)
2183 RPS_sampler_resize (sampler, sampler_size / 2);
2185 else if (sampler_size < new_size)
2187 RPS_sampler_resize (sampler, sampler_size * 2);
2189 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2194 * Wrapper around #RPS_sampler_resize() resizing the client sampler
2197 client_resize_wrapper ()
2199 uint32_t bigger_size;
2203 bigger_size = GNUNET_MAX (sampler_size_est_need, sampler_size_client_need);
2205 // TODO respect the min, max
2206 resize_wrapper (client_sampler, bigger_size);
2207 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size_client is now %" PRIu32 "\n",
2213 * Estimate request rate
2215 * Called every time we receive a request from the client.
2220 struct GNUNET_TIME_Relative max_round_duration;
2222 if (request_deltas_size > req_counter)
2224 if ( 1 < req_counter)
2226 /* Shift last request deltas to the right */
2227 memmove (&request_deltas[1],
2229 (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
2231 /* Add current delta to beginning */
2233 GNUNET_TIME_absolute_get_difference (last_request,
2234 GNUNET_TIME_absolute_get ());
2235 request_rate = T_relative_avg (request_deltas, req_counter);
2236 request_rate = (request_rate.rel_value_us < 1) ?
2237 GNUNET_TIME_relative_get_unit_ () : request_rate;
2239 /* Compute the duration a round will maximally take */
2240 max_round_duration =
2241 GNUNET_TIME_relative_add (round_interval,
2242 GNUNET_TIME_relative_divide (round_interval, 2));
2244 /* Set the estimated size the sampler has to have to
2245 * satisfy the current client request rate */
2246 sampler_size_client_need =
2247 max_round_duration.rel_value_us / request_rate.rel_value_us;
2249 /* Resize the sampler */
2250 client_resize_wrapper ();
2252 last_request = GNUNET_TIME_absolute_get ();
2257 * Add all peers in @a peer_array to @a peer_map used as set.
2259 * @param peer_array array containing the peers
2260 * @param num_peers number of peers in @peer_array
2261 * @param peer_map the peermap to use as set
2264 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2265 unsigned int num_peers,
2266 struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2269 if (NULL == peer_map)
2271 LOG (GNUNET_ERROR_TYPE_WARNING,
2272 "Trying to add peers to non-existing peermap.\n");
2276 for (i = 0; i < num_peers; i++)
2278 GNUNET_CONTAINER_multipeermap_put (peer_map,
2281 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2287 * Send a PULL REPLY to @a peer_id
2289 * @param peer_id the peer to send the reply to.
2290 * @param peer_ids the peers to send to @a peer_id
2291 * @param num_peer_ids the number of peers to send to @a peer_id
2294 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2295 const struct GNUNET_PeerIdentity *peer_ids,
2296 unsigned int num_peer_ids)
2299 struct GNUNET_MQ_Envelope *ev;
2300 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2302 /* Compute actual size */
2303 send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2304 num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2306 if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2307 /* Compute number of peers to send
2308 * If too long, simply truncate */
2309 // TODO select random ones via permutation
2310 // or even better: do good protocol design
2312 (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2313 sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2314 sizeof (struct GNUNET_PeerIdentity);
2316 send_size = num_peer_ids;
2318 LOG (GNUNET_ERROR_TYPE_DEBUG,
2319 "Going to send PULL REPLY with %u peers to %s\n",
2320 send_size, GNUNET_i2s (peer_id));
2322 ev = GNUNET_MQ_msg_extra (out_msg,
2323 send_size * sizeof (struct GNUNET_PeerIdentity),
2324 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2325 out_msg->num_peers = htonl (send_size);
2326 GNUNET_memcpy (&out_msg[1], peer_ids,
2327 send_size * sizeof (struct GNUNET_PeerIdentity));
2329 send_message (peer_id, ev, "PULL REPLY");
2330 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2331 // TODO check with send intention: as send_channel is used/opened we indicate
2332 // a sending intention without intending it.
2333 // -> clean peer afterwards?
2338 * Insert PeerID in #pull_map
2340 * Called once we know a peer is live.
2343 insert_in_pull_map (void *cls,
2344 const struct GNUNET_PeerIdentity *peer)
2346 CustomPeerMap_put (pull_map, peer);
/**
 * Insert PeerID in #view
 *
 * Called once we know a peer is live.
 * Implements #PeerOp; the result of insert_in_view() is discarded.
 *
 * @param cls closure (unused)
 * @param peer the peer to insert into the view
 */
static void
insert_in_view_op (void *cls,
                   const struct GNUNET_PeerIdentity *peer)
{
  (void) cls; /* the closure carries no information here */
  (void) insert_in_view (peer);
}
2365 * Update sampler with given PeerID.
2366 * Implements #PeerOp
2369 insert_in_sampler (void *cls,
2370 const struct GNUNET_PeerIdentity *peer)
2372 LOG (GNUNET_ERROR_TYPE_DEBUG,
2373 "Updating samplers with peer %s from insert_in_sampler()\n",
2375 RPS_sampler_update (prot_sampler, peer);
2376 RPS_sampler_update (client_sampler, peer);
2377 if (0 < RPS_sampler_count_id (prot_sampler, peer))
2379 /* Make sure we 'know' about this peer */
2380 (void) issue_peer_liveliness_check (peer);
2381 /* Establish a channel towards that peer to indicate we are going to send
2383 //indicate_sending_intention (peer);
2386 num_observed_peers++;
2387 GNUNET_CONTAINER_multipeermap_put
2388 (observed_unique_peers,
2391 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2392 uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2393 observed_unique_peers);
2394 to_file (file_name_observed_log,
2395 "%" PRIu32 " %" PRIu32 " %f\n",
2397 num_observed_unique_peers,
2398 1.0*num_observed_unique_peers/num_observed_peers)
2399 #endif /* TO_FILE */
2403 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2404 * If the peer is not known, liveliness check is issued and it is
2405 * scheduled to be inserted in sampler and view.
2407 * "External sources" refer to every source except the gossip.
2409 * @param peer peer to insert
2412 got_peer (const struct GNUNET_PeerIdentity *peer)
2414 /* If we did not know this peer already, insert it into sampler and view */
2415 if (GNUNET_YES == issue_peer_liveliness_check (peer))
2417 schedule_operation (peer, insert_in_sampler);
2418 schedule_operation (peer, insert_in_view_op);
2423 * @brief Checks if there is a sending channel and if it is needed
2425 * @param peer the peer whose sending channel is checked
2426 * @return GNUNET_YES if sending channel exists and is still needed
2427 * GNUNET_NO otherwise
2430 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2432 /* struct GNUNET_CADET_Channel *channel; */
2433 if (GNUNET_NO == check_peer_known (peer))
2437 if (GNUNET_YES == check_sending_channel_exists (peer))
2439 if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2440 (GNUNET_YES == View_contains_peer (peer)) ||
2441 (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2442 (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2443 (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2444 { /* If we want to keep the connection to peer open */
2453 * @brief remove peer from our knowledge, the view, push and pull maps and
2456 * @param peer the peer to remove
2459 remove_peer (const struct GNUNET_PeerIdentity *peer)
2461 (void) View_remove_peer (peer);
2462 CustomPeerMap_remove_peer (pull_map, peer);
2463 CustomPeerMap_remove_peer (push_map, peer);
2464 RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2465 RPS_sampler_reinitialise_by_value (client_sampler, peer);
2466 destroy_peer (get_peer_ctx (peer));
2471 * @brief Remove data that is not needed anymore.
2473 * If the sending channel is no longer needed it is destroyed.
2475 * @param peer the peer whose data is about to be cleaned
2478 clean_peer (const struct GNUNET_PeerIdentity *peer)
2480 if (GNUNET_NO == check_sending_channel_needed (peer))
2482 LOG (GNUNET_ERROR_TYPE_DEBUG,
2483 "Going to remove send channel to peer %s\n",
2485 #ifdef ENABLE_MALICIOUS
2486 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2487 (void) destroy_sending_channel (peer);
2488 #else /* ENABLE_MALICIOUS */
2489 (void) destroy_sending_channel (peer);
2490 #endif /* ENABLE_MALICIOUS */
2493 if ( (GNUNET_NO == check_peer_send_intention (peer)) &&
2494 (GNUNET_NO == View_contains_peer (peer)) &&
2495 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2496 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2497 (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
2498 (0 == RPS_sampler_count_id (client_sampler, peer)) &&
2499 (GNUNET_NO != check_removable (peer)) )
2500 { /* We can safely remove this peer */
2501 LOG (GNUNET_ERROR_TYPE_DEBUG,
2502 "Going to remove peer %s\n",
2511 * @brief This is called when a channel is destroyed.
2513 * Removes peer completely from our knowledge if the send_channel was destroyed
2514 * Otherwise simply delete the recv_channel
2515 * Also check if the knowledge about this peer is still needed.
2516 * If not, remove this peer from our knowledge.
2518 * @param cls The closure
2519 * @param channel The channel being closed
2520 * @param channel_ctx The context associated with this channel
2523 cleanup_destroyed_channel (void *cls,
2524 const struct GNUNET_CADET_Channel *channel)
2526 struct ChannelCtx *channel_ctx = cls;
2527 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2529 // What should be done here:
2530 // * cleanup everything related to the channel
2532 // * remove peer if necessary
2534 if (peer_ctx->recv_channel_ctx == channel_ctx)
2536 remove_channel_ctx (channel_ctx);
2538 else if (peer_ctx->send_channel_ctx == channel_ctx)
2540 remove_channel_ctx (channel_ctx);
2541 remove_peer (&peer_ctx->peer_id);
2545 /***********************************************************************
2547 ***********************************************************************/
2550 destroy_reply_cls (struct ReplyCls *rep_cls)
2552 struct ClientContext *cli_ctx;
2554 cli_ctx = rep_cls->cli_ctx;
2555 GNUNET_assert (NULL != cli_ctx);
2556 if (NULL != rep_cls->req_handle)
2558 RPS_sampler_request_cancel (rep_cls->req_handle);
2560 GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head,
2561 cli_ctx->rep_cls_tail,
2563 GNUNET_free (rep_cls);
2568 destroy_cli_ctx (struct ClientContext *cli_ctx)
2570 GNUNET_assert (NULL != cli_ctx);
2571 if (NULL != cli_ctx->rep_cls_head)
2573 LOG (GNUNET_ERROR_TYPE_WARNING,
2574 "Trying to destroy the context of a client that still has pending requests. Going to clean those\n");
2575 while (NULL != cli_ctx->rep_cls_head)
2576 destroy_reply_cls (cli_ctx->rep_cls_head);
2578 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2581 GNUNET_free (cli_ctx);
2586 * Function called by NSE.
2588 * Updates sizes of sampler list and view and adapt those lists
2592 nse_callback (void *cls,
2593 struct GNUNET_TIME_Absolute timestamp,
2594 double logestimate, double std_dev)
2597 //double scale; // TODO this might go gloabal/config
2599 LOG (GNUNET_ERROR_TYPE_DEBUG,
2600 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2601 logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2603 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2604 // GNUNET_NSE_log_estimate_to_n (logestimate);
2605 estimate = pow (estimate, 1.0 / 3);
2606 // TODO add if std_dev is a number
2607 // estimate += (std_dev * scale);
2608 if (view_size_est_min < ceil (estimate))
2610 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2611 sampler_size_est_need = estimate;
2612 view_size_est_need = estimate;
2615 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2616 //sampler_size_est_need = view_size_est_min;
2617 view_size_est_need = view_size_est_min;
2620 /* If the NSE has changed adapt the lists accordingly */
2621 resize_wrapper (prot_sampler, sampler_size_est_need);
2622 client_resize_wrapper ();
2627 * Callback called once the requested PeerIDs are ready.
2629 * Sends those to the requesting client.
2632 client_respond (void *cls,
2633 struct GNUNET_PeerIdentity *peer_ids,
2636 struct ReplyCls *reply_cls = cls;
2638 struct GNUNET_MQ_Envelope *ev;
2639 struct GNUNET_RPS_CS_ReplyMessage *out_msg;
2640 uint32_t size_needed;
2641 struct ClientContext *cli_ctx;
2643 GNUNET_assert (NULL != reply_cls);
2644 LOG (GNUNET_ERROR_TYPE_DEBUG,
2645 "sampler returned %" PRIu32 " peers:\n",
2647 for (i = 0; i < num_peers; i++)
2649 LOG (GNUNET_ERROR_TYPE_DEBUG,
2650 " %" PRIu32 ": %s\n",
2652 GNUNET_i2s (&peer_ids[i]));
2655 size_needed = sizeof (struct GNUNET_RPS_CS_ReplyMessage) +
2656 num_peers * sizeof (struct GNUNET_PeerIdentity);
2658 GNUNET_assert (GNUNET_MAX_MESSAGE_SIZE >= size_needed);
2660 ev = GNUNET_MQ_msg_extra (out_msg,
2661 num_peers * sizeof (struct GNUNET_PeerIdentity),
2662 GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
2663 out_msg->num_peers = htonl (num_peers);
2664 out_msg->id = htonl (reply_cls->id);
2666 GNUNET_memcpy (&out_msg[1],
2668 num_peers * sizeof (struct GNUNET_PeerIdentity));
2670 cli_ctx = reply_cls->cli_ctx;
2671 GNUNET_assert (NULL != cli_ctx);
2672 reply_cls->req_handle = NULL;
2673 destroy_reply_cls (reply_cls);
2674 GNUNET_MQ_send (cli_ctx->mq, ev);
2679 * Handle RPS request from the client.
2681 * @param cls closure
2682 * @param message the actual message
2685 handle_client_request (void *cls,
2686 const struct GNUNET_RPS_CS_RequestMessage *msg)
2688 struct ClientContext *cli_ctx = cls;
2690 uint32_t size_needed;
2691 struct ReplyCls *reply_cls;
2694 num_peers = ntohl (msg->num_peers);
2695 size_needed = sizeof (struct GNUNET_RPS_CS_RequestMessage) +
2696 num_peers * sizeof (struct GNUNET_PeerIdentity);
2698 if (GNUNET_MAX_MESSAGE_SIZE < size_needed)
2700 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2701 "Message received from client has size larger than expected\n");
2702 GNUNET_SERVICE_client_drop (cli_ctx->client);
2706 for (i = 0 ; i < num_peers ; i++)
2709 LOG (GNUNET_ERROR_TYPE_DEBUG,
2710 "Client requested %" PRIu32 " random peer(s).\n",
2713 reply_cls = GNUNET_new (struct ReplyCls);
2714 reply_cls->id = ntohl (msg->id);
2715 reply_cls->cli_ctx = cli_ctx;
2716 reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler,
2721 GNUNET_assert (NULL != cli_ctx);
2722 GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head,
2723 cli_ctx->rep_cls_tail,
2725 GNUNET_SERVICE_client_continue (cli_ctx->client);
2730 * @brief Handle a message that requests the cancellation of a request
2733 * @param message the message containing the id of the request
2736 handle_client_request_cancel (void *cls,
2737 const struct GNUNET_RPS_CS_RequestCancelMessage *msg)
2739 struct ClientContext *cli_ctx = cls;
2740 struct ReplyCls *rep_cls;
2742 GNUNET_assert (NULL != cli_ctx);
2743 GNUNET_assert (NULL != cli_ctx->rep_cls_head);
2744 rep_cls = cli_ctx->rep_cls_head;
2745 LOG (GNUNET_ERROR_TYPE_DEBUG,
2746 "Client cancels request with id %" PRIu32 "\n",
2748 while ( (NULL != rep_cls->next) &&
2749 (rep_cls->id != ntohl (msg->id)) )
2750 rep_cls = rep_cls->next;
2751 GNUNET_assert (rep_cls->id == ntohl (msg->id));
2752 destroy_reply_cls (rep_cls);
2753 GNUNET_SERVICE_client_continue (cli_ctx->client);
2758 * @brief This function is called, when the client seeds peers.
2759 * It verifies that @a msg is well-formed.
2761 * @param cls the closure (#ClientContext)
2762 * @param msg the message
2763 * @return #GNUNET_OK if @a msg is well-formed
2766 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2768 struct ClientContext *cli_ctx = cls;
2769 uint16_t msize = ntohs (msg->header.size);
2770 uint32_t num_peers = ntohl (msg->num_peers);
2772 msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2773 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2774 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2777 GNUNET_SERVICE_client_drop (cli_ctx->client);
2778 return GNUNET_SYSERR;
2785 * Handle seed from the client.
2787 * @param cls closure
2788 * @param message the actual message
2791 handle_client_seed (void *cls,
2792 const struct GNUNET_RPS_CS_SeedMessage *msg)
2794 struct ClientContext *cli_ctx = cls;
2795 struct GNUNET_PeerIdentity *peers;
2799 num_peers = ntohl (msg->num_peers);
2800 peers = (struct GNUNET_PeerIdentity *) &msg[1];
2802 LOG (GNUNET_ERROR_TYPE_DEBUG,
2803 "Client seeded peers:\n");
2804 print_peer_list (peers, num_peers);
2806 for (i = 0; i < num_peers; i++)
2808 LOG (GNUNET_ERROR_TYPE_DEBUG,
2809 "Updating samplers with seed %" PRIu32 ": %s\n",
2811 GNUNET_i2s (&peers[i]));
2813 got_peer (&peers[i]);
2815 GNUNET_SERVICE_client_continue (cli_ctx->client);
2819 * @brief Send view to client
2821 * @param cli_ctx the context of the client
2822 * @param view_array the peerids of the view as array (can be empty)
2823 * @param view_size the size of the view array (can be 0)
2826 send_view (const struct ClientContext *cli_ctx,
2827 const struct GNUNET_PeerIdentity *view_array,
2830 struct GNUNET_MQ_Envelope *ev;
2831 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2833 if (NULL == view_array)
2835 view_size = View_size ();
2836 view_array = View_get_as_array();
2839 ev = GNUNET_MQ_msg_extra (out_msg,
2840 view_size * sizeof (struct GNUNET_PeerIdentity),
2841 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2842 out_msg->num_peers = htonl (view_size);
2844 GNUNET_memcpy (&out_msg[1],
2846 view_size * sizeof (struct GNUNET_PeerIdentity));
2847 GNUNET_MQ_send (cli_ctx->mq, ev);
2851 * @brief sends updates to clients that are interested
2854 clients_notify_view_update (void)
2856 struct ClientContext *cli_ctx_iter;
2858 const struct GNUNET_PeerIdentity *view_array;
2860 num_peers = View_size ();
2861 view_array = View_get_as_array();
2862 /* check size of view is small enough */
2863 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2865 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2866 "View is too big to send\n");
2870 for (cli_ctx_iter = cli_ctx_head;
2871 NULL != cli_ctx_iter;
2872 cli_ctx_iter = cli_ctx_head->next)
2874 if (1 < cli_ctx_iter->view_updates_left)
2876 /* Client wants to receive limited amount of updates */
2877 cli_ctx_iter->view_updates_left -= 1;
2878 } else if (1 == cli_ctx_iter->view_updates_left)
2880 /* Last update of view for client */
2881 cli_ctx_iter->view_updates_left = -1;
2882 } else if (0 > cli_ctx_iter->view_updates_left) {
2883 /* Client is not interested in updates */
2886 /* else _updates_left == 0 - infinite amount of updates */
2889 send_view (cli_ctx_iter, view_array, num_peers);
2895 * Handle RPS request from the client.
2897 * @param cls closure
2898 * @param message the actual message
2901 handle_client_view_request (void *cls,
2902 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2904 struct ClientContext *cli_ctx = cls;
2905 uint64_t num_updates;
2907 num_updates = ntohl (msg->num_updates);
2909 LOG (GNUNET_ERROR_TYPE_DEBUG,
2910 "Client requested %" PRIu64 " updates of view.\n",
2913 GNUNET_assert (NULL != cli_ctx);
2914 cli_ctx->view_updates_left = num_updates;
2915 send_view (cli_ctx, NULL, 0);
2916 GNUNET_SERVICE_client_continue (cli_ctx->client);
2920 * Handle a CHECK_LIVE message from another peer.
2922 * This does nothing. But without calling #GNUNET_CADET_receive_done()
2923 * the channel is blocked for all other communication.
2925 * @param cls Closure
2926 * @param msg The message header
2929 handle_peer_check (void *cls,
2930 const struct GNUNET_MessageHeader *msg)
2932 const struct ChannelCtx *channel_ctx = cls;
2933 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2934 LOG (GNUNET_ERROR_TYPE_DEBUG,
2935 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2937 GNUNET_CADET_receive_done (channel_ctx->channel);
2941 * Handle a PUSH message from another peer.
2943 * Check the proof of work and store the PeerID
2944 * in the temporary list for pushed PeerIDs.
2946 * @param cls Closure
2947 * @param msg The message header
2950 handle_peer_push (void *cls,
2951 const struct GNUNET_MessageHeader *msg)
2953 const struct ChannelCtx *channel_ctx = cls;
2954 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2956 // (check the proof of work (?))
2958 LOG (GNUNET_ERROR_TYPE_DEBUG,
2959 "Received PUSH (%s)\n",
2961 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
2963 #ifdef ENABLE_MALICIOUS
2964 struct AttackedPeer *tmp_att_peer;
2966 if ( (1 == mal_type) ||
2968 { /* Try to maximise representation */
2969 tmp_att_peer = GNUNET_new (struct AttackedPeer);
2970 tmp_att_peer->peer_id = *peer;
2971 if (NULL == att_peer_set)
2972 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2974 GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2977 GNUNET_CONTAINER_DLL_insert (att_peers_head,
2980 add_peer_array_to_set (peer, 1, att_peer_set);
2984 GNUNET_free (tmp_att_peer);
2989 else if (2 == mal_type)
2991 /* We attack one single well-known peer - simply ignore */
2993 #endif /* ENABLE_MALICIOUS */
2995 /* Add the sending peer to the push_map */
2996 CustomPeerMap_put (push_map, peer);
2998 GNUNET_break_op (check_peer_known (peer));
2999 GNUNET_CADET_receive_done (channel_ctx->channel);
3004 * Handle PULL REQUEST request message from another peer.
3006 * Reply with the view of PeerIDs.
3008 * @param cls Closure
3009 * @param msg The message header
3012 handle_peer_pull_request (void *cls,
3013 const struct GNUNET_MessageHeader *msg)
3015 const struct ChannelCtx *channel_ctx = cls;
3016 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3017 const struct GNUNET_PeerIdentity *view_array;
3019 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3020 GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
3022 #ifdef ENABLE_MALICIOUS
3025 { /* Try to maximise representation */
3026 send_pull_reply (peer, mal_peers, num_mal_peers);
3029 else if (2 == mal_type)
3030 { /* Try to partition network */
3031 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3033 send_pull_reply (peer, mal_peers, num_mal_peers);
3036 #endif /* ENABLE_MALICIOUS */
3038 GNUNET_break_op (check_peer_known (peer));
3039 GNUNET_CADET_receive_done (channel_ctx->channel);
3040 view_array = View_get_as_array ();
3041 send_pull_reply (peer, view_array, View_size ());
3046 * Check whether we sent a corresponding request and
3047 * whether this reply is the first one.
3049 * @param cls Closure
3050 * @param msg The message header
3053 check_peer_pull_reply (void *cls,
3054 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3056 struct GNUNET_PeerIdentity *sender = cls;
3058 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3060 GNUNET_break_op (0);
3061 return GNUNET_SYSERR;
3064 if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3065 sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3067 LOG (GNUNET_ERROR_TYPE_ERROR,
3068 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3069 ntohl (msg->num_peers),
3070 (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3071 sizeof (struct GNUNET_PeerIdentity));
3072 GNUNET_break_op (0);
3073 return GNUNET_SYSERR;
3076 if (GNUNET_YES != check_peer_flag (sender, Peers_PULL_REPLY_PENDING))
3078 LOG (GNUNET_ERROR_TYPE_WARNING,
3079 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3080 GNUNET_i2s (sender));
3081 GNUNET_break_op (0);
3082 return GNUNET_SYSERR;
3088 * Handle PULL REPLY message from another peer.
3090 * @param cls Closure
3091 * @param msg The message header
3094 handle_peer_pull_reply (void *cls,
3095 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3097 const struct ChannelCtx *channel_ctx = cls;
3098 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3099 const struct GNUNET_PeerIdentity *peers;
3101 #ifdef ENABLE_MALICIOUS
3102 struct AttackedPeer *tmp_att_peer;
3103 #endif /* ENABLE_MALICIOUS */
3105 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3106 GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
3108 #ifdef ENABLE_MALICIOUS
3109 // We shouldn't even receive pull replies as we're not sending
3113 #endif /* ENABLE_MALICIOUS */
3115 /* Do actual logic */
3116 peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3118 LOG (GNUNET_ERROR_TYPE_DEBUG,
3119 "PULL REPLY received, got following %u peers:\n",
3120 ntohl (msg->num_peers));
3122 for (i = 0; i < ntohl (msg->num_peers); i++)
3124 LOG (GNUNET_ERROR_TYPE_DEBUG,
3127 GNUNET_i2s (&peers[i]));
3129 #ifdef ENABLE_MALICIOUS
3130 if ((NULL != att_peer_set) &&
3131 (1 == mal_type || 3 == mal_type))
3132 { /* Add attacked peer to local list */
3133 // TODO check if we sent a request and this was the first reply
3134 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3136 && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3139 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3140 tmp_att_peer->peer_id = peers[i];
3141 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3144 add_peer_array_to_set (&peers[i], 1, att_peer_set);
3148 #endif /* ENABLE_MALICIOUS */
3149 /* Make sure we 'know' about this peer */
3150 (void) insert_peer (&peers[i]);
3152 if (GNUNET_YES == check_peer_valid (&peers[i]))
3154 CustomPeerMap_put (pull_map, &peers[i]);
3158 schedule_operation (&peers[i], insert_in_pull_map);
3159 (void) issue_peer_liveliness_check (&peers[i]);
3163 UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING);
3164 clean_peer (sender);
3166 GNUNET_break_op (check_peer_known (sender));
3167 GNUNET_CADET_receive_done (channel_ctx->channel);
3172 * Compute a random delay.
3173 * A uniformly distributed value between mean + spread and mean - spread.
3175 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3176 * It would return a random value between 2 and 6 min.
3178 * @param mean the mean
3179 * @param spread the inverse amount of deviation from the mean
3181 static struct GNUNET_TIME_Relative
3182 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3183 unsigned int spread)
3185 struct GNUNET_TIME_Relative half_interval;
3186 struct GNUNET_TIME_Relative ret;
3187 unsigned int rand_delay;
3188 unsigned int max_rand_delay;
3192 LOG (GNUNET_ERROR_TYPE_WARNING,
3193 "Not accepting spread of 0\n");
3197 GNUNET_assert (0 != mean.rel_value_us);
3199 /* Compute random time value between spread * mean and spread * mean */
3200 half_interval = GNUNET_TIME_relative_divide (mean, spread);
3202 max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3204 * Compute random value between (0 and 1) * round_interval
3205 * via multiplying round_interval with a 'fraction' (0 to value)/value
3207 rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3208 ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3209 ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3210 ret = GNUNET_TIME_relative_add (ret, half_interval);
3212 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3213 LOG (GNUNET_ERROR_TYPE_WARNING,
3214 "Returning FOREVER_REL\n");
3221 * Send single pull request
3223 * @param peer_id the peer to send the pull request to.
3226 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3228 struct GNUNET_MQ_Envelope *ev;
3230 GNUNET_assert (GNUNET_NO == check_peer_flag (peer,
3231 Peers_PULL_REPLY_PENDING));
3232 SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING);
3234 LOG (GNUNET_ERROR_TYPE_DEBUG,
3235 "Going to send PULL REQUEST to peer %s.\n",
3238 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3239 send_message (peer, ev, "PULL REQUEST");
3240 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3247 * @param peer_id the peer to send the push to.
3250 send_push (const struct GNUNET_PeerIdentity *peer_id)
3252 struct GNUNET_MQ_Envelope *ev;
3254 LOG (GNUNET_ERROR_TYPE_DEBUG,
3255 "Going to send PUSH to peer %s.\n",
3256 GNUNET_i2s (peer_id));
3258 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3259 send_message (peer_id, ev, "PUSH");
3260 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3265 do_round (void *cls);
3268 do_mal_round (void *cls);
3270 #ifdef ENABLE_MALICIOUS
3274 * @brief This function is called, when the client tells us to act malicious.
3275 * It verifies that @a msg is well-formed.
3277 * @param cls the closure (#ClientContext)
3278 * @param msg the message
3279 * @return #GNUNET_OK if @a msg is well-formed
3282 check_client_act_malicious (void *cls,
3283 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3285 struct ClientContext *cli_ctx = cls;
3286 uint16_t msize = ntohs (msg->header.size);
3287 uint32_t num_peers = ntohl (msg->num_peers);
3289 msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3290 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3291 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3293 LOG (GNUNET_ERROR_TYPE_ERROR,
3294 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3295 ntohl (msg->num_peers),
3296 (msize / sizeof (struct GNUNET_PeerIdentity)));
3298 GNUNET_SERVICE_client_drop (cli_ctx->client);
3299 return GNUNET_SYSERR;
3305 * Turn RPS service to act malicious.
3307 * @param cls Closure
3308 * @param client The client that sent the message
3309 * @param msg The message header
3312 handle_client_act_malicious (void *cls,
3313 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3315 struct ClientContext *cli_ctx = cls;
3316 struct GNUNET_PeerIdentity *peers;
3317 uint32_t num_mal_peers_sent;
3318 uint32_t num_mal_peers_old;
3320 /* Do actual logic */
3321 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3322 mal_type = ntohl (msg->type);
3323 if (NULL == mal_peer_set)
3324 mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3326 LOG (GNUNET_ERROR_TYPE_DEBUG,
3327 "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3329 ntohl (msg->num_peers));
3332 { /* Try to maximise representation */
3333 /* Add other malicious peers to those we already know */
3335 num_mal_peers_sent = ntohl (msg->num_peers);
3336 num_mal_peers_old = num_mal_peers;
3337 GNUNET_array_grow (mal_peers,
3339 num_mal_peers + num_mal_peers_sent);
3340 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3342 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3344 /* Add all mal peers to mal_peer_set */
3345 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3349 /* Substitute do_round () with do_mal_round () */
3350 GNUNET_SCHEDULER_cancel (do_round_task);
3351 do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3354 else if ( (2 == mal_type) ||
3356 { /* Try to partition the network */
3357 /* Add other malicious peers to those we already know */
3359 num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3360 num_mal_peers_old = num_mal_peers;
3361 GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3362 GNUNET_array_grow (mal_peers,
3364 num_mal_peers + num_mal_peers_sent);
3365 if (NULL != mal_peers &&
3368 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3370 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3372 /* Add all mal peers to mal_peer_set */
3373 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3378 /* Store the one attacked peer */
3379 GNUNET_memcpy (&attacked_peer,
3380 &msg->attacked_peer,
3381 sizeof (struct GNUNET_PeerIdentity));
3382 /* Set the flag of the attacked peer to valid to avoid problems */
3383 if (GNUNET_NO == check_peer_known (&attacked_peer))
3385 (void) issue_peer_liveliness_check (&attacked_peer);
3388 LOG (GNUNET_ERROR_TYPE_DEBUG,
3389 "Attacked peer is %s\n",
3390 GNUNET_i2s (&attacked_peer));
3392 /* Substitute do_round () with do_mal_round () */
3393 GNUNET_SCHEDULER_cancel (do_round_task);
3394 do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3396 else if (0 == mal_type)
3397 { /* Stop acting malicious */
3398 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3400 /* Substitute do_mal_round () with do_round () */
3401 GNUNET_SCHEDULER_cancel (do_round_task);
3402 do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3407 GNUNET_SERVICE_client_continue (cli_ctx->client);
3409 GNUNET_SERVICE_client_continue (cli_ctx->client);
3414 * Send out PUSHes and PULLs maliciously.
3416 * This is executed regylary.
3419 do_mal_round (void *cls)
3421 uint32_t num_pushes;
3423 struct GNUNET_TIME_Relative time_next_round;
3424 struct AttackedPeer *tmp_att_peer;
3426 LOG (GNUNET_ERROR_TYPE_DEBUG,
3427 "Going to execute next round maliciously type %" PRIu32 ".\n",
3429 do_round_task = NULL;
3430 GNUNET_assert (mal_type <= 3);
3431 /* Do malicious actions */
3433 { /* Try to maximise representation */
3435 /* The maximum of pushes we're going to send this round */
3436 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3437 num_attacked_peers),
3438 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3440 LOG (GNUNET_ERROR_TYPE_DEBUG,
3441 "Going to send %" PRIu32 " pushes\n",
3444 /* Send PUSHes to attacked peers */
3445 for (i = 0 ; i < num_pushes ; i++)
3447 if (att_peers_tail == att_peer_index)
3448 att_peer_index = att_peers_head;
3450 att_peer_index = att_peer_index->next;
3452 send_push (&att_peer_index->peer_id);
3455 /* Send PULLs to some peers to learn about additional peers to attack */
3456 tmp_att_peer = att_peer_index;
3457 for (i = 0 ; i < num_pushes * alpha ; i++)
3459 if (att_peers_tail == tmp_att_peer)
3460 tmp_att_peer = att_peers_head;
3462 att_peer_index = tmp_att_peer->next;
3464 send_pull_request (&tmp_att_peer->peer_id);
3469 else if (2 == mal_type)
3471 * Try to partition the network
3472 * Send as many pushes to the attacked peer as possible
3473 * That is one push per round as it will ignore more.
3475 (void) issue_peer_liveliness_check (&attacked_peer);
3476 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3477 send_push (&attacked_peer);
3482 { /* Combined attack */
3484 /* Send PUSH to attacked peers */
3485 if (GNUNET_YES == check_peer_known (&attacked_peer))
3487 (void) issue_peer_liveliness_check (&attacked_peer);
3488 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3490 LOG (GNUNET_ERROR_TYPE_DEBUG,
3491 "Goding to send push to attacked peer (%s)\n",
3492 GNUNET_i2s (&attacked_peer));
3493 send_push (&attacked_peer);
3496 (void) issue_peer_liveliness_check (&attacked_peer);
3498 /* The maximum of pushes we're going to send this round */
3499 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3500 num_attacked_peers),
3501 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3503 LOG (GNUNET_ERROR_TYPE_DEBUG,
3504 "Going to send %" PRIu32 " pushes\n",
3507 for (i = 0; i < num_pushes; i++)
3509 if (att_peers_tail == att_peer_index)
3510 att_peer_index = att_peers_head;
3512 att_peer_index = att_peer_index->next;
3514 send_push (&att_peer_index->peer_id);
3517 /* Send PULLs to some peers to learn about additional peers to attack */
3518 tmp_att_peer = att_peer_index;
3519 for (i = 0; i < num_pushes * alpha; i++)
3521 if (att_peers_tail == tmp_att_peer)
3522 tmp_att_peer = att_peers_head;
3524 att_peer_index = tmp_att_peer->next;
3526 send_pull_request (&tmp_att_peer->peer_id);
3530 /* Schedule next round */
3531 time_next_round = compute_rand_delay (round_interval, 2);
3533 //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3535 GNUNET_assert (NULL == do_round_task);
3536 do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3537 &do_mal_round, NULL);
3538 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3540 #endif /* ENABLE_MALICIOUS */
3543 * Send out PUSHes and PULLs, possibly update #view, samplers.
3545 * This is executed regylary.
3548 do_round (void *cls)
3551 const struct GNUNET_PeerIdentity *view_array;
3552 unsigned int *permut;
3553 unsigned int a_peers; /* Number of peers we send pushes to */
3554 unsigned int b_peers; /* Number of peers we send pull requests to */
3555 uint32_t first_border;
3556 uint32_t second_border;
3557 struct GNUNET_PeerIdentity peer;
3558 struct GNUNET_PeerIdentity *update_peer;
3560 LOG (GNUNET_ERROR_TYPE_DEBUG,
3561 "Going to execute next round.\n");
3562 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3563 do_round_task = NULL;
3564 LOG (GNUNET_ERROR_TYPE_DEBUG,
3565 "Printing view:\n");
3566 to_file (file_name_view_log,
3567 "___ new round ___");
3568 view_array = View_get_as_array ();
3569 for (i = 0; i < View_size (); i++)
3571 LOG (GNUNET_ERROR_TYPE_DEBUG,
3572 "\t%s\n", GNUNET_i2s (&view_array[i]));
3573 to_file (file_name_view_log,
3575 GNUNET_i2s_full (&view_array[i]));
3579 /* Send pushes and pull requests */
3580 if (0 < View_size ())
3582 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3586 a_peers = ceil (alpha * View_size ());
3588 LOG (GNUNET_ERROR_TYPE_DEBUG,
3589 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3590 a_peers, alpha, View_size ());
3591 for (i = 0; i < a_peers; i++)
3593 peer = view_array[permut[i]];
3594 // FIXME if this fails schedule/loop this for later
3598 /* Send PULL requests */
3599 b_peers = ceil (beta * View_size ());
3600 first_border = a_peers;
3601 second_border = a_peers + b_peers;
3602 if (second_border > View_size ())
3604 first_border = View_size () - b_peers;
3605 second_border = View_size ();
3607 LOG (GNUNET_ERROR_TYPE_DEBUG,
3608 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3609 b_peers, beta, View_size ());
3610 for (i = first_border; i < second_border; i++)
3612 peer = view_array[permut[i]];
3613 if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3614 { // FIXME if this fails schedule/loop this for later
3615 send_pull_request (&peer);
3619 GNUNET_free (permut);
3625 /* TODO see how many peers are in push-/pull- list! */
3627 if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3628 (0 < CustomPeerMap_size (push_map)) &&
3629 (0 < CustomPeerMap_size (pull_map)))
3630 //if (GNUNET_YES) // disable blocking temporarily
3631 { /* If conditions for update are fulfilled, update */
3632 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3634 uint32_t final_size;
3635 uint32_t peers_to_clean_size;
3636 struct GNUNET_PeerIdentity *peers_to_clean;
3638 peers_to_clean = NULL;
3639 peers_to_clean_size = 0;
3640 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3641 GNUNET_memcpy (peers_to_clean,
3643 View_size () * sizeof (struct GNUNET_PeerIdentity));
3645 /* Seems like recreating is the easiest way of emptying the peermap */
3647 to_file (file_name_view_log,
3650 first_border = GNUNET_MIN (ceil (alpha * view_size_est_need),
3651 CustomPeerMap_size (push_map));
3652 second_border = first_border +
3653 GNUNET_MIN (floor (beta * view_size_est_need),
3654 CustomPeerMap_size (pull_map));
3655 final_size = second_border +
3656 ceil ((1 - (alpha + beta)) * view_size_est_need);
3657 LOG (GNUNET_ERROR_TYPE_DEBUG,
3658 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3663 /* Update view with peers received through PUSHes */
3664 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3665 CustomPeerMap_size (push_map));
3666 for (i = 0; i < first_border; i++)
3668 (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3670 to_file (file_name_view_log,
3672 GNUNET_i2s_full (&view_array[i]));
3673 // TODO change the peer_flags accordingly
3675 GNUNET_free (permut);
3678 /* Update view with peers received through PULLs */
3679 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3680 CustomPeerMap_size (pull_map));
3681 for (i = first_border; i < second_border; i++)
3683 (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3684 permut[i - first_border]));
3685 to_file (file_name_view_log,
3687 GNUNET_i2s_full (&view_array[i]));
3688 // TODO change the peer_flags accordingly
3690 GNUNET_free (permut);
3693 /* Update view with peers from history */
3694 RPS_sampler_get_n_rand_peers (prot_sampler,
3697 final_size - second_border);
3698 // TODO change the peer_flags accordingly
3700 for (i = 0; i < View_size (); i++)
3701 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3703 /* Clean peers that were removed from the view */
3704 for (i = 0; i < peers_to_clean_size; i++)
3706 to_file (file_name_view_log,
3708 GNUNET_i2s_full (&peers_to_clean[i]));
3709 clean_peer (&peers_to_clean[i]);
3712 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3713 clients_notify_view_update();
3715 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3716 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3717 if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3718 !(0 >= CustomPeerMap_size (pull_map)))
3719 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3720 if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3721 (0 >= CustomPeerMap_size (pull_map)))
3722 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3723 if (0 >= CustomPeerMap_size (push_map) &&
3724 !(0 >= CustomPeerMap_size (pull_map)))
3725 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3726 if (0 >= CustomPeerMap_size (push_map) &&
3727 (0 >= CustomPeerMap_size (pull_map)))
3728 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3729 if (0 >= CustomPeerMap_size (pull_map) &&
3730 CustomPeerMap_size (push_map) > alpha * View_size () &&
3731 0 >= CustomPeerMap_size (push_map))
3732 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3734 // TODO independent of that also get some peers from CADET_get_peers()?
3735 GNUNET_STATISTICS_set (stats,
3736 "# peers in push map at end of round",
3737 CustomPeerMap_size (push_map),
3739 GNUNET_STATISTICS_set (stats,
3740 "# peers in pull map at end of round",
3741 CustomPeerMap_size (pull_map),
3743 GNUNET_STATISTICS_set (stats,
3744 "# peers in view at end of round",
3748 LOG (GNUNET_ERROR_TYPE_DEBUG,
3749 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3750 CustomPeerMap_size (push_map),
3751 CustomPeerMap_size (pull_map),
3754 alpha * View_size ());
3756 /* Update samplers */
3757 for (i = 0; i < CustomPeerMap_size (push_map); i++)
3759 update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3760 LOG (GNUNET_ERROR_TYPE_DEBUG,
3761 "Updating with peer %s from push list\n",
3762 GNUNET_i2s (update_peer));
3763 insert_in_sampler (NULL, update_peer);
3764 clean_peer (update_peer); /* This cleans only if it is not in the view */
3767 for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3769 LOG (GNUNET_ERROR_TYPE_DEBUG,
3770 "Updating with peer %s from pull list\n",
3771 GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3772 insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3773 /* This cleans only if it is not in the view */
3774 clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3778 /* Empty push/pull lists */
3779 CustomPeerMap_clear (push_map);
3780 CustomPeerMap_clear (pull_map);
3782 struct GNUNET_TIME_Relative time_next_round;
3784 time_next_round = compute_rand_delay (round_interval, 2);
3786 /* Schedule next round */
3787 do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3789 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3794 * This is called from GNUNET_CADET_get_peers().
3796 * It is called on every peer(ID) that cadet somehow has contact with.
3797 * We use those to initialise the sampler.
3800 init_peer_cb (void *cls,
3801 const struct GNUNET_PeerIdentity *peer,
3802 int tunnel, // "Do we have a tunnel towards this peer?"
3803 unsigned int n_paths, // "Number of known paths towards this peer"
3804 unsigned int best_path) // "How long is the best path?
3805 // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3809 LOG (GNUNET_ERROR_TYPE_DEBUG,
3810 "Got peer_id %s from cadet\n",
3817 * @brief Iterator function over stored, valid peers.
3819 * We initialise the sampler with those.
3821 * @param cls the closure
3822 * @param peer the peer id
3823 * @return #GNUNET_YES if we should continue to
3825 * #GNUNET_NO if not.
3828 valid_peers_iterator (void *cls,
3829 const struct GNUNET_PeerIdentity *peer)
3833 LOG (GNUNET_ERROR_TYPE_DEBUG,
3834 "Got stored, valid peer %s\n",
3843 * Iterator over peers from peerinfo.
3845 * @param cls closure
3846 * @param peer id of the peer, NULL for last call
3847 * @param hello hello message for the peer (can be NULL)
3848 * @param error message
3851 process_peerinfo_peers (void *cls,
3852 const struct GNUNET_PeerIdentity *peer,
3853 const struct GNUNET_HELLO_Message *hello,
3854 const char *err_msg)
3858 LOG (GNUNET_ERROR_TYPE_DEBUG,
3859 "Got peer_id %s from peerinfo\n",
3867 * Task run during shutdown.
3872 shutdown_task (void *cls)
3874 struct ClientContext *client_ctx;
3875 struct ReplyCls *reply_cls;
3877 in_shutdown = GNUNET_YES;
3879 LOG (GNUNET_ERROR_TYPE_DEBUG,
3880 "RPS is going down\n");
3882 /* Clean all clients */
3883 for (client_ctx = cli_ctx_head;
3884 NULL != cli_ctx_head;
3885 client_ctx = cli_ctx_head)
3887 /* Clean pending requests to the sampler */
3888 for (reply_cls = client_ctx->rep_cls_head;
3889 NULL != client_ctx->rep_cls_head;
3890 reply_cls = client_ctx->rep_cls_head)
3892 RPS_sampler_request_cancel (reply_cls->req_handle);
3893 GNUNET_CONTAINER_DLL_remove (client_ctx->rep_cls_head,
3894 client_ctx->rep_cls_tail,
3896 GNUNET_free (reply_cls);
3898 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3901 GNUNET_free (client_ctx);
3903 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3904 GNUNET_PEERINFO_disconnect (peerinfo_handle);
3905 peerinfo_handle = NULL;
3906 if (NULL != do_round_task)
3908 GNUNET_SCHEDULER_cancel (do_round_task);
3909 do_round_task = NULL;
3914 GNUNET_NSE_disconnect (nse);
3915 RPS_sampler_destroy (prot_sampler);
3916 RPS_sampler_destroy (client_sampler);
3917 GNUNET_CADET_close_port (cadet_port);
3918 GNUNET_CADET_disconnect (cadet_handle);
3919 cadet_handle = NULL;
3921 CustomPeerMap_destroy (push_map);
3922 CustomPeerMap_destroy (pull_map);
3925 GNUNET_STATISTICS_destroy (stats,
3929 #ifdef ENABLE_MALICIOUS
3930 struct AttackedPeer *tmp_att_peer;
3931 /* it is ok to free this const during shutdown: */
3932 GNUNET_free ((char *) file_name_view_log);
3934 GNUNET_free ((char *) file_name_observed_log);
3935 GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
3936 #endif /* TO_FILE */
3937 GNUNET_array_grow (mal_peers,
3940 if (NULL != mal_peer_set)
3941 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3942 if (NULL != att_peer_set)
3943 GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
3944 while (NULL != att_peers_head)
3946 tmp_att_peer = att_peers_head;
3947 GNUNET_CONTAINER_DLL_remove (att_peers_head,
3950 GNUNET_free (tmp_att_peer);
3952 #endif /* ENABLE_MALICIOUS */
3957 * Handle client connecting to the service.
3960 * @param client the new client
3961 * @param mq the message queue of @a client
3965 client_connect_cb (void *cls,
3966 struct GNUNET_SERVICE_Client *client,
3967 struct GNUNET_MQ_Handle *mq)
3969 struct ClientContext *cli_ctx;
3971 LOG (GNUNET_ERROR_TYPE_DEBUG,
3972 "Client connected\n");
3974 return client; /* Server was destroyed before a client connected. Shutting down */
3975 cli_ctx = GNUNET_new (struct ClientContext);
3976 cli_ctx->mq = GNUNET_SERVICE_client_get_mq (client);
3977 cli_ctx->view_updates_left = -1;
3978 cli_ctx->client = client;
3979 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
3986 * Callback called when a client disconnected from the service
3988 * @param cls closure for the service
3989 * @param c the client that disconnected
3990 * @param internal_cls should be equal to @a c
3993 client_disconnect_cb (void *cls,
3994 struct GNUNET_SERVICE_Client *client,
3997 struct ClientContext *cli_ctx = internal_cls; /* the per-client closure is used as the client context */
4000 GNUNET_assert (client == cli_ctx->client); /* sanity check: the context must belong to the disconnecting client */
4002 {/* shutdown task - destroy all clients */
4003 while (NULL != cli_ctx_head)
4004 destroy_cli_ctx (cli_ctx_head); /* NOTE(review): the loop only terminates if destroy_cli_ctx() unlinks the context from the list; confirm */
4007 { /* destroy this client */
4008 LOG (GNUNET_ERROR_TYPE_DEBUG,
4009 "Client disconnected. Destroy its context.\n");
4010 destroy_cli_ctx (cli_ctx); /* release only the context of the client that went away */
4016 * Handle random peer sampling clients.
4018 * @param cls closure
4019 * @param c configuration to use
4020 * @param service the initialized service
4024 const struct GNUNET_CONFIGURATION_Handle *c,
4025 struct GNUNET_SERVICE_Handle *service)
4027 char *fn_valid_peers; /* handed to initialise_peers() and freed right afterwards; presumably filled from the FILENAME_VALID_PEERS option */
4031 GNUNET_log_setup ("rps",
4032 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), /* the log level is fixed to DEBUG here */
4036 GNUNET_CRYPTO_get_peer_identity (cfg,
4037 &own_identity); // TODO check return value
4038 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4039 "STARTING SERVICE (rps) for peer [%s]\n",
4040 GNUNET_i2s (&own_identity));
4041 #ifdef ENABLE_MALICIOUS
4042 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4043 "Malicious execution compiled in.\n");
4044 #endif /* ENABLE_MALICIOUS */
4046 /* Get time interval from the configuration */
4048 GNUNET_CONFIGURATION_get_value_time (cfg,
4053 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4054 "RPS", "ROUNDINTERVAL");
4055 GNUNET_SCHEDULER_shutdown (); /* a missing ROUNDINTERVAL is treated as fatal: shutdown is requested */
4059 /* Get initial size of sampler/view from the configuration */
4061 GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
4062 (long long unsigned int *) &sampler_size_est_min)) /* NOTE(review): the value is stored through an unsigned long long pointer, yet sampler_size_est_min is formatted with %u below; if it is declared as unsigned int this store writes past it. Confirm the declaration */
4064 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4066 GNUNET_SCHEDULER_shutdown (); /* a missing MINSIZE is treated as fatal as well */
4069 sampler_size_est_need = sampler_size_est_min; /* the needed sampler size starts at the configured minimum */
4070 view_size_est_min = sampler_size_est_min; /* the view uses the same minimum as the sampler */
4071 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
4074 GNUNET_CONFIGURATION_get_value_filename (cfg,
4076 "FILENAME_VALID_PEERS",
4079 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4080 "rps", "FILENAME_VALID_PEERS"); /* NOTE(review): the section is spelled rps here but RPS in the other reports of this function */
4084 View_create (view_size_est_min); /* the view is created with the minimum size read above */
4086 /* file_name_view_log */
4087 file_name_view_log = store_prefix_file_name (&own_identity, "view");
4089 file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
4090 observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4091 #endif /* TO_FILE */
4093 /* connect to NSE */
4094 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); /* nse_callback is registered with a NULL closure */
4101 /* Initialise cadet */
4102 /* There exists a copy-paste-clone in get_channel() */
4103 struct GNUNET_MQ_MessageHandler cadet_handlers[] = { /* message handlers for the four RPS peer-to-peer message types; presumably passed to GNUNET_CADET_open_port() below */
4104 GNUNET_MQ_hd_fixed_size (peer_check,
4105 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4106 struct GNUNET_MessageHeader,
4108 GNUNET_MQ_hd_fixed_size (peer_push,
4109 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4110 struct GNUNET_MessageHeader,
4112 GNUNET_MQ_hd_fixed_size (peer_pull_request,
4113 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4114 struct GNUNET_MessageHeader,
4116 GNUNET_MQ_hd_var_size (peer_pull_reply, /* the only variable-size peer message in this table */
4117 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4118 struct GNUNET_RPS_P2P_PullReplyMessage,
4120 GNUNET_MQ_handler_end ()
4123 cadet_handle = GNUNET_CADET_connect (cfg);
4124 GNUNET_assert (NULL != cadet_handle); /* a failed CADET connection aborts the service */
4125 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS, /* hash the application port name, without its terminating NUL */
4126 strlen (GNUNET_APPLICATION_PORT_RPS),
4128 cadet_port = GNUNET_CADET_open_port (cadet_handle,
4130 &handle_inbound_channel, /* Connect handler */
4132 NULL, /* WindowSize handler */
4133 &cleanup_destroyed_channel, /* Disconnect handler */
4135 if (NULL == cadet_port) /* opening the port failed; reported below as the port being in use */
4137 LOG (GNUNET_ERROR_TYPE_ERROR,
4138 "Cadet port `%s' is already in use.\n",
4139 GNUNET_APPLICATION_PORT_RPS);
4144 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4145 initialise_peers (fn_valid_peers, cadet_handle);
4146 GNUNET_free (fn_valid_peers); /* the file name is not used again in this function */
4148 /* Initialise sampler */
4149 struct GNUNET_TIME_Relative half_round_interval;
4150 struct GNUNET_TIME_Relative max_round_interval;
4152 half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4153 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); /* i.e. 1.5 times round_interval */
4155 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval); /* both samplers get the same size and interval */
4156 client_sampler = RPS_sampler_mod_init (sampler_size_est_need, max_round_interval);
4158 /* Initialise push and pull maps */
4159 push_map = CustomPeerMap_create (4); /* NOTE(review): 4 is presumably an initial size hint; confirm in gnunet-service-rps_custommap.h */
4160 pull_map = CustomPeerMap_create (4);
4163 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4164 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4165 // TODO send push/pull to each of those peers?
4166 // TODO read stored valid peers from last run
4167 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4168 get_valid_peers (valid_peers_iterator, NULL); /* valid_peers_iterator is called with a NULL closure */
4170 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4172 process_peerinfo_peers,
4175 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4177 do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); /* the first round is scheduled to run immediately */
4178 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4180 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* cleanup is registered only after all of the setup above */
4181 stats = GNUNET_STATISTICS_create ("rps", cfg); /* NOTE(review): stats is created last, after initialise_peers() and get_valid_peers() already ran; confirm nothing they call uses stats before this point */
4187 * Define "main" method using service macro.
4191 GNUNET_SERVICE_OPTION_NONE, /* no special service options */
4194 &client_disconnect_cb, /* callback for clients that disconnect */
4196 GNUNET_MQ_hd_fixed_size (client_request, /* fixed-size client message */
4197 GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST,
4198 struct GNUNET_RPS_CS_RequestMessage,
4200 GNUNET_MQ_hd_fixed_size (client_request_cancel, /* fixed-size client message */
4201 GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL,
4202 struct GNUNET_RPS_CS_RequestCancelMessage,
4204 GNUNET_MQ_hd_var_size (client_seed, /* variable-size client message */
4205 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4206 struct GNUNET_RPS_CS_SeedMessage,
4208 #ifdef ENABLE_MALICIOUS
4209 GNUNET_MQ_hd_var_size (client_act_malicious, /* only registered in ENABLE_MALICIOUS builds */
4210 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4211 struct GNUNET_RPS_CS_ActMaliciousMessage,
4213 #endif /* ENABLE_MALICIOUS */
4214 GNUNET_MQ_hd_fixed_size (client_view_request, /* fixed-size debug request for the view */
4215 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4216 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4218 GNUNET_MQ_handler_end()); /* terminates the handler list */
4220 /* end of gnunet-service-rps.c */
4220 /* end of gnunet-service-rps.c */