2 This file is part of GNUnet.
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file rps/gnunet-service-rps.c
23 * @brief rps service implementation
24 * @author Julius Bünger
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
46 // TODO check for overflows
48 // TODO align message structs
50 // TODO connect to friends
52 // TODO blacklist? (-> mal peer detection on top of brahms)
54 // hist_size_init, hist_size_max
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
/**
 * Set a peer flag of given peer context.
 *
 * Both macro parameters are parenthesized so that any pointer expression
 * (for example `&ctx`) can be passed as @a peer_ctx.
 *
 * @param peer_ctx pointer to the peer context whose `peer_flags` is modified
 * @param mask flag bit(s) to set
 */
#define SET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) |= (mask))
/**
 * Get peer flag of given peer context.
 *
 * The parameters and the bit test are parenthesized so that the macro can be
 * used with arbitrary pointer expressions and inside larger expressions.
 *
 * @param peer_ctx pointer to the peer context whose `peer_flags` is inspected
 * @param mask flag bit(s) to test
 * @return #GNUNET_YES if at least one bit of @a mask is set,
 *         #GNUNET_NO otherwise
 */
#define check_peer_flag_set(peer_ctx, mask) \
  ((((peer_ctx)->peer_flags) & (mask)) ? GNUNET_YES : GNUNET_NO)
/**
 * Unset flag of given peer context.
 *
 * Both macro parameters are parenthesized so that any pointer expression
 * (for example `&ctx`) can be passed as @a peer_ctx.
 *
 * @param peer_ctx pointer to the peer context whose `peer_flags` is modified
 * @param mask flag bit(s) to clear
 */
#define UNSET_PEER_FLAG(peer_ctx, mask) (((peer_ctx)->peer_flags) &= ~(mask))
/**
 * Get channel flag of given channel context.
 *
 * The pointer parameter is parenthesized before it is dereferenced, so
 * pointer arithmetic such as `flags + 1` is dereferenced as a whole.
 *
 * @param channel_flags pointer to the flags word to inspect
 * @param mask flag bit(s) to test
 * @return #GNUNET_YES if at least one bit of @a mask is set,
 *         #GNUNET_NO otherwise
 */
#define check_channel_flag_set(channel_flags, mask) \
  (((*(channel_flags)) & (mask)) ? GNUNET_YES : GNUNET_NO)
/**
 * Unset flag of given channel context.
 *
 * The pointer parameter is parenthesized before it is dereferenced, so
 * pointer arithmetic such as `flags + 1` is dereferenced as a whole.
 *
 * @param channel_flags pointer to the flags word to modify
 * @param mask flag bit(s) to clear
 */
#define unset_channel_flag(channel_flags, mask) ((*(channel_flags)) &= ~(mask))
90 * Pending operation on peer consisting of callback and closure
92 * When an operation cannot be executed right now this struct is used to store
93 * the callback and closure for later execution.
95 struct PeerPendingOp {
108 * List containing all messages that are yet to be sent
110 * This is used to keep track of all messages that have not been sent yet. When
111 * a peer is to be removed the pending messages can be removed properly.
113 struct PendingMessage {
117 struct PendingMessage *next;
118 struct PendingMessage *prev;
121 * The envelope to the corresponding message
123 struct GNUNET_MQ_Envelope *ev;
126 * The corresponding context
128 struct PeerContext *peer_ctx;
137 * @brief Context for a channel
142 * Struct used to keep track of other peer's status
144 * This is stored in a multipeermap.
145 * It contains information such as cadet channels, a message queue for sending,
146 * status about the channels, the pending operations on this peer and some flags
147 * about the status of the peer itself. (online, valid, ...)
151 * The Sub this context belongs to.
156 * Message queue open to client
158 struct GNUNET_MQ_Handle *mq;
161 * Channel open to client.
163 struct ChannelCtx *send_channel_ctx;
166 * Channel open from client.
168 struct ChannelCtx *recv_channel_ctx;
171 * Array of pending operations on this peer.
173 struct PeerPendingOp *pending_ops;
176 * Handle to the callback given to cadet_ntfy_tmt_rdy()
178 * To be canceled on shutdown.
180 struct PendingMessage *online_check_pending;
183 * Number of pending operations.
185 unsigned int num_pending_ops;
188 * Identity of the peer
190 struct GNUNET_PeerIdentity peer_id;
193 * Flags indicating status of peer
198 * Last time we received something from that peer.
200 struct GNUNET_TIME_Absolute last_message_recv;
203 * Last time we received a keepalive message.
205 struct GNUNET_TIME_Absolute last_keepalive;
208 * DLL with all messages that are yet to be sent
210 struct PendingMessage *pending_messages_head;
211 struct PendingMessage *pending_messages_tail;
214 * This is probably followed by 'statistical' data (when we first saw
215 * it, how did we get its ID, how many pushes (in a timeinterval),
218 uint32_t round_pull_req;
222 * @brief Closure to #valid_peer_iterator
224 struct PeersIteratorCls {
228 PeersIterator iterator;
231 * Closure to iterator
237 * @brief Context for a channel
241 * @brief The channel itself
243 struct GNUNET_CADET_Channel *channel;
246 * @brief The peer context associated with the channel
248 struct PeerContext *peer_ctx;
251 * @brief When channel destruction needs to be delayed (because it is called
252 * from within the cadet routine of another channel destruction) this task
253 * refers to the respective _SCHEDULER_Task.
255 struct GNUNET_SCHEDULER_Task *destruction_task;
262 * If type is 2 This struct is used to store the attacked peers in a DLL
264 struct AttackedPeer {
268 struct AttackedPeer *next;
269 struct AttackedPeer *prev;
274 struct GNUNET_PeerIdentity peer_id;
277 #endif /* ENABLE_MALICIOUS */
/**
 * @brief This number determines the number of slots for files that represent
 * histograms.
 */
#define HISTOGRAM_FILE_SLOTS 32

/**
 * @brief The size (in bytes) a file needs to store the histogram
 *
 * Per slot: 1 newline, up to 4 chars,
 * Additionally: 1 null termination
 *
 * The expansion is fully parenthesized so that it behaves as a single
 * operand when used inside a larger expression (e.g. `2 * SIZE_DUMP_FILE`).
 */
#define SIZE_DUMP_FILE ((HISTOGRAM_FILE_SLOTS * 5) + 1)
296 * Essentially one instance of brahms that only connects to other instances
297 * with the same (secret) value.
301 * @brief Hash of the shared value that defines Subs.
303 struct GNUNET_HashCode hash;
306 * @brief Port to communicate to other peers.
308 struct GNUNET_CADET_Port *cadet_port;
311 * @brief Hashmap of valid peers.
313 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
316 * @brief Filename of the file that stores the valid peers persistently.
318 char *filename_valid_peers;
321 * Set of all peers to keep track of them.
323 struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
326 * @brief This is the minimum estimate used as sampler size.
328 * It is configured by the user.
330 unsigned int sampler_size_est_min;
333 * The size of sampler we need to be able to satisfy the Brahms protocol's
334 * need of random peers.
336 * This is one minimum size the sampler grows to.
338 unsigned int sampler_size_est_need;
341 * Time interval the do_round task runs in.
343 struct GNUNET_TIME_Relative round_interval;
346 * Sampler used for the Brahms protocol itself.
348 struct RPS_Sampler *sampler;
352 * Name to log view to
354 char *file_name_view_log;
355 #endif /* TO_FILE_FULL */
360 * Name to log number of observed peers to
362 char *file_name_observed_log;
363 #endif /* TO_FILE_FULL */
366 * @brief Count the observed peers
368 uint32_t num_observed_peers;
371 * @brief Multipeermap (ab-) used to count unique peer_ids
373 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
377 * List to temporarily store peers received through pushes.
379 struct CustomPeerMap *push_map;
382 * List to temporarily store peers received through pulls.
384 struct CustomPeerMap *pull_map;
387 * @brief This is the estimate used as view size.
389 * It is initialised with the minimum
391 unsigned int view_size_est_need;
394 * @brief This is the minimum estimate used as view size.
396 * It is configured by the user.
398 unsigned int view_size_est_min;
406 * Identifier for the main task that runs periodically.
408 struct GNUNET_SCHEDULER_Task *do_round_task;
413 * @brief Counts the executed rounds.
418 * @brief This array accumulates the number of received pushes per round.
420 * Number at index i represents the number of rounds with i observed pushes.
422 uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
425 * @brief Histogram of deltas between the expected and actual number of
428 * As half of the entries are expected to be negative, this is shifted by
429 * #HISTOGRAM_FILE_SLOTS/2.
431 uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
434 * @brief Number of pull replies with this delay measured in rounds.
436 * Number at index i represents the number of pull replies with a delay of i
439 uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
443 /***********************************************************************
445 ***********************************************************************/
450 static const struct GNUNET_CONFIGURATION_Handle *cfg;
453 * Handle to the statistics service.
455 struct GNUNET_STATISTICS_Handle *stats;
460 struct GNUNET_CADET_Handle *cadet_handle;
465 struct GNUNET_CORE_Handle *core_handle;
468 * @brief PeerMap to keep track of connected peers.
470 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
475 static struct GNUNET_PeerIdentity own_identity;
478 * Percentage of total peer number in the view
479 * to send random PUSHes to
484 * Percentage of total peer number in the view
485 * to send random PULLs to
492 static struct GNUNET_NSE_Handle *nse;
495 * Handler to PEERINFO.
497 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
500 * Handle for cancellation of iteration over peers.
502 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
507 * Type of malicious peer
509 * 0 Don't act malicious at all - Default
510 * 1 Try to maximise representation
511 * 2 Try to partition the network
514 static uint32_t mal_type;
517 * Other malicious peers
519 static struct GNUNET_PeerIdentity *mal_peers;
522 * Hashmap of malicious peers used as set.
523 * Used to more efficiently check whether we know that peer.
525 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
528 * Number of other malicious peers
530 static uint32_t num_mal_peers;
534 * If type is 2 this is the DLL of attacked peers
536 static struct AttackedPeer *att_peers_head;
537 static struct AttackedPeer *att_peers_tail;
540 * This index is used to point to an attacked peer to
541 * implement the round-robin-ish way to select attacked peers.
543 static struct AttackedPeer *att_peer_index;
546 * Hashmap of attacked peers used as set.
547 * Used to more efficiently check whether we know that peer.
549 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
552 * Number of attacked peers
554 static uint32_t num_attacked_peers;
557 * If type is 1 this is the attacked peer
559 static struct GNUNET_PeerIdentity attacked_peer;
562 * The limit of PUSHes we can send in one round.
563 * This is an assumption of the Brahms protocol and either implemented
566 * assumed to be the bandwidth limitation.
568 static uint32_t push_limit = 10000;
569 #endif /* ENABLE_MALICIOUS */
574 * This is run in any case by all peers and connects to all peers without
575 * specifying a shared value.
577 static struct Sub *msub;
580 * @brief Maximum number of valid peers to keep.
581 * TODO read from config
583 static const uint32_t num_valid_peers_max = UINT32_MAX;
585 /***********************************************************************
587 ***********************************************************************/
594 do_mal_round(void *cls);
598 * @brief Get the #PeerContext associated with a peer
600 * @param peer_map The peer map containing the context
601 * @param peer the peer id
603 * @return the #PeerContext
605 static struct PeerContext *
606 get_peer_ctx(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
607 const struct GNUNET_PeerIdentity *peer)
609 struct PeerContext *ctx;
612 ret = GNUNET_CONTAINER_multipeermap_contains(peer_map, peer);
613 GNUNET_assert(GNUNET_YES == ret);
614 ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
615 GNUNET_assert(NULL != ctx);
620 * @brief Check whether we have information about the given peer.
622 * FIXME probably deprecated. Make this the new _online.
624 * @param peer_map The peer map to check for the existence of @a peer
625 * @param peer peer in question
627 * @return #GNUNET_YES if peer is known
628 * #GNUNET_NO if peer is not known
631 check_peer_known(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
632 const struct GNUNET_PeerIdentity *peer)
634 if (NULL != peer_map)
636 return GNUNET_CONTAINER_multipeermap_contains(peer_map, peer);
646 * @brief Create a new #PeerContext and insert it into the peer map
648 * @param sub The Sub this context belongs to.
649 * @param peer the peer to create the #PeerContext for
651 * @return the #PeerContext
653 static struct PeerContext *
654 create_peer_ctx(struct Sub *sub,
655 const struct GNUNET_PeerIdentity *peer)
657 struct PeerContext *ctx;
660 GNUNET_assert(GNUNET_NO == check_peer_known(sub->peer_map, peer));
662 ctx = GNUNET_new(struct PeerContext);
663 ctx->peer_id = *peer;
665 ret = GNUNET_CONTAINER_multipeermap_put(sub->peer_map, peer, ctx,
666 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
667 GNUNET_assert(GNUNET_OK == ret);
670 GNUNET_STATISTICS_set(stats,
672 GNUNET_CONTAINER_multipeermap_size(sub->peer_map),
680 * @brief Create or get a #PeerContext
682 * @param sub The Sub to which the created context belongs to
683 * @param peer the peer to get the associated context to
685 * @return the context
687 static struct PeerContext *
688 create_or_get_peer_ctx(struct Sub *sub,
689 const struct GNUNET_PeerIdentity *peer)
691 if (GNUNET_NO == check_peer_known(sub->peer_map, peer))
693 return create_peer_ctx(sub, peer);
695 return get_peer_ctx(sub->peer_map, peer);
700 * @brief Check whether we have a connection to this @a peer
702 * Also sets the #Peers_ONLINE flag accordingly
704 * @param peer_ctx Context of the peer of which connectivity is to be checked
706 * @return #GNUNET_YES if we are connected
707 * #GNUNET_NO otherwise
710 check_connected(struct PeerContext *peer_ctx)
712 /* If we don't know about this peer we don't know whether it's online */
713 if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
718 /* Get the context */
719 peer_ctx = get_peer_ctx(peer_ctx->sub->peer_map, &peer_ctx->peer_id);
720 /* If we have no channel to this peer we don't know whether it's online */
721 if ((NULL == peer_ctx->send_channel_ctx) &&
722 (NULL == peer_ctx->recv_channel_ctx))
724 UNSET_PEER_FLAG(peer_ctx, Peers_ONLINE);
727 /* Otherwise (if we have a channel), we know that it's online */
728 SET_PEER_FLAG(peer_ctx, Peers_ONLINE);
734 * @brief The closure to #get_rand_peer_iterator.
736 struct GetRandPeerIteratorCls {
738 * @brief The index of the peer to return.
739 * Will be decreased until 0.
740 * Then current peer is returned.
745 * @brief Pointer to peer to return.
747 const struct GNUNET_PeerIdentity *peer;
752 * @brief Iterator function for #get_random_peer_from_peermap.
754 * Implements #GNUNET_CONTAINER_PeerMapIterator.
755 * Decreases the index until the index is zero.
756 * Then returns the current peer.
758 * @param cls the #GetRandPeerIteratorCls containing index and peer
759 * @param peer current peer
760 * @param value unused
762 * @return #GNUNET_YES if we should continue to
767 get_rand_peer_iterator(void *cls,
768 const struct GNUNET_PeerIdentity *peer,
771 struct GetRandPeerIteratorCls *iterator_cls = cls;
775 if (0 >= iterator_cls->index)
777 iterator_cls->peer = peer;
780 iterator_cls->index--;
786 * @brief Get a random peer from @a peer_map
788 * @param valid_peers Peer map containing valid peers from which to select a
791 * @return a random peer
793 static const struct GNUNET_PeerIdentity *
794 get_random_peer_from_peermap(struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
796 struct GetRandPeerIteratorCls *iterator_cls;
797 const struct GNUNET_PeerIdentity *ret;
799 iterator_cls = GNUNET_new(struct GetRandPeerIteratorCls);
800 iterator_cls->index = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
801 GNUNET_CONTAINER_multipeermap_size(valid_peers));
802 (void)GNUNET_CONTAINER_multipeermap_iterate(valid_peers,
803 get_rand_peer_iterator,
805 ret = iterator_cls->peer;
806 GNUNET_free(iterator_cls);
812 * @brief Add a given @a peer to valid peers.
814 * If the number of valid peers has already reached #num_valid_peers_max, random peers are removed first.
816 * @param peer The peer that is added to the valid peers.
817 * @param valid_peers Peer map of valid peers to which to add the @a peer
819 * @return #GNUNET_YES if no other peer had to be removed
820 * #GNUNET_NO otherwise
823 add_valid_peer(const struct GNUNET_PeerIdentity *peer,
824 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
826 const struct GNUNET_PeerIdentity *rand_peer;
830 /* Remove random peers until there is space for a new one */
831 while (num_valid_peers_max <=
832 GNUNET_CONTAINER_multipeermap_size(valid_peers))
834 rand_peer = get_random_peer_from_peermap(valid_peers);
835 GNUNET_CONTAINER_multipeermap_remove_all(valid_peers, rand_peer);
838 (void)GNUNET_CONTAINER_multipeermap_put(valid_peers, peer, NULL,
839 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
840 if (valid_peers == msub->valid_peers)
842 GNUNET_STATISTICS_set(stats,
844 GNUNET_CONTAINER_multipeermap_size(valid_peers),
851 remove_pending_message(struct PendingMessage *pending_msg, int cancel);
854 * @brief Set the peer flag to living and
855 * call the pending operations on this peer.
857 * Also adds peer to #valid_peers.
859 * @param peer_ctx the #PeerContext of the peer to set online
862 set_peer_online(struct PeerContext *peer_ctx)
864 struct GNUNET_PeerIdentity *peer;
867 peer = &peer_ctx->peer_id;
868 LOG(GNUNET_ERROR_TYPE_DEBUG,
869 "Peer %s is online and valid, calling %i pending operations on it\n",
871 peer_ctx->num_pending_ops);
873 if (NULL != peer_ctx->online_check_pending)
875 LOG(GNUNET_ERROR_TYPE_DEBUG,
876 "Removing pending online check for peer %s\n",
877 GNUNET_i2s(&peer_ctx->peer_id));
878 // TODO wait until cadet sets mq->cancel_impl
879 //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
880 remove_pending_message(peer_ctx->online_check_pending, GNUNET_YES);
881 peer_ctx->online_check_pending = NULL;
884 SET_PEER_FLAG(peer_ctx, Peers_ONLINE);
886 /* Call pending operations */
887 for (i = 0; i < peer_ctx->num_pending_ops; i++)
889 peer_ctx->pending_ops[i].op(peer_ctx->pending_ops[i].op_cls, peer);
891 GNUNET_array_grow(peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
895 cleanup_destroyed_channel(void *cls,
896 const struct GNUNET_CADET_Channel *channel);
898 /* Declaration of handlers */
900 handle_peer_check(void *cls,
901 const struct GNUNET_MessageHeader *msg);
904 handle_peer_push(void *cls,
905 const struct GNUNET_MessageHeader *msg);
908 handle_peer_pull_request(void *cls,
909 const struct GNUNET_MessageHeader *msg);
912 check_peer_pull_reply(void *cls,
913 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
916 handle_peer_pull_reply(void *cls,
917 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
919 /* End declaration of handlers */
922 * @brief Allocate memory for a new channel context and insert it into DLL
924 * @param peer_ctx context of the according peer
926 * @return The channel context
928 static struct ChannelCtx *
929 add_channel_ctx(struct PeerContext *peer_ctx)
931 struct ChannelCtx *channel_ctx;
933 channel_ctx = GNUNET_new(struct ChannelCtx);
934 channel_ctx->peer_ctx = peer_ctx;
940 * @brief Free memory and NULL pointers.
942 * @param channel_ctx The channel context.
945 remove_channel_ctx(struct ChannelCtx *channel_ctx)
/* Remember the owning peer context up front; channel_ctx is released below. */
947 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
/* Cancel a pending delayed-destruction task, if one is registered. */
949 if (NULL != channel_ctx->destruction_task)
951 GNUNET_SCHEDULER_cancel(channel_ctx->destruction_task);
952 channel_ctx->destruction_task = NULL;
/* NOTE(review): channel_ctx is released here, yet its pointer value is still
 * compared against the peer's channel contexts further down. Using the value
 * of a pointer after the object it referred to was freed is not well defined
 * in C; consider moving this free after the comparisons - confirm against the
 * definition of GNUNET_free. */
955 GNUNET_free(channel_ctx);
/* A channel context without an associated peer context is checked for here. */
957 if (NULL == peer_ctx)
/* Clear whichever of the peer's two channel slots referred to this context. */
959 if (channel_ctx == peer_ctx->send_channel_ctx)
961 peer_ctx->send_channel_ctx = NULL;
964 else if (channel_ctx == peer_ctx->recv_channel_ctx)
966 peer_ctx->recv_channel_ctx = NULL;
972 * @brief Get the channel of a peer. If not existing, create.
974 * @param peer_ctx Context of the peer of which to get the channel
975 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
977 struct GNUNET_CADET_Channel *
978 get_channel(struct PeerContext *peer_ctx)
980 /* There exists a copy-paste-clone in run() */
981 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
982 GNUNET_MQ_hd_fixed_size(peer_check,
983 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
984 struct GNUNET_MessageHeader,
986 GNUNET_MQ_hd_fixed_size(peer_push,
987 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
988 struct GNUNET_MessageHeader,
990 GNUNET_MQ_hd_fixed_size(peer_pull_request,
991 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
992 struct GNUNET_MessageHeader,
994 GNUNET_MQ_hd_var_size(peer_pull_reply,
995 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
996 struct GNUNET_RPS_P2P_PullReplyMessage,
998 GNUNET_MQ_handler_end()
1002 if (NULL == peer_ctx->send_channel_ctx)
1004 LOG(GNUNET_ERROR_TYPE_DEBUG,
1005 "Trying to establish channel to peer %s\n",
1006 GNUNET_i2s(&peer_ctx->peer_id));
1007 peer_ctx->send_channel_ctx = add_channel_ctx(peer_ctx);
1008 peer_ctx->send_channel_ctx->channel =
1009 GNUNET_CADET_channel_create(cadet_handle,
1010 peer_ctx->send_channel_ctx, /* context */
1012 &peer_ctx->sub->hash,
1013 NULL, /* WindowSize handler */
1014 &cleanup_destroyed_channel, /* Disconnect handler */
1017 GNUNET_assert(NULL != peer_ctx->send_channel_ctx);
1018 GNUNET_assert(NULL != peer_ctx->send_channel_ctx->channel);
1019 return peer_ctx->send_channel_ctx->channel;
1024 * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1026 * If we already have a message queue open to this client,
1027 * simply return it, otherways create one.
1029 * @param peer_ctx Context of the peer of whicht to get the mq
1030 * @return the #GNUNET_MQ_Handle
1032 static struct GNUNET_MQ_Handle *
1033 get_mq(struct PeerContext *peer_ctx)
1035 if (NULL == peer_ctx->mq)
1037 peer_ctx->mq = GNUNET_CADET_get_mq(get_channel(peer_ctx));
1039 return peer_ctx->mq;
1043 * @brief Add an envelope to a message passed to mq to list of pending messages
1045 * @param peer_ctx Context of the peer for which to insert the envelope
1046 * @param ev envelope to the message
1047 * @param type type of the message to be sent
1048 * @return pointer to pending message
1050 static struct PendingMessage *
1051 insert_pending_message(struct PeerContext *peer_ctx,
1052 struct GNUNET_MQ_Envelope *ev,
1055 struct PendingMessage *pending_msg;
1057 pending_msg = GNUNET_new(struct PendingMessage);
1058 pending_msg->ev = ev;
1059 pending_msg->peer_ctx = peer_ctx;
1060 pending_msg->type = type;
1061 GNUNET_CONTAINER_DLL_insert(peer_ctx->pending_messages_head,
1062 peer_ctx->pending_messages_tail,
1069 * @brief Remove a pending message from the respective DLL
1071 * @param pending_msg the pending message to remove
1072 * @param cancel whether to cancel the pending message, too
1075 remove_pending_message(struct PendingMessage *pending_msg, int cancel)
1077 struct PeerContext *peer_ctx;
1081 peer_ctx = pending_msg->peer_ctx;
1082 GNUNET_assert(NULL != peer_ctx);
1083 GNUNET_CONTAINER_DLL_remove(peer_ctx->pending_messages_head,
1084 peer_ctx->pending_messages_tail,
1086 // TODO wait for the cadet implementation of message cancellation
1087 //if (GNUNET_YES == cancel)
1089 // GNUNET_MQ_send_cancel (pending_msg->ev);
1091 GNUNET_free(pending_msg);
1096 * @brief This is called in response to the first message we sent as a
1099 * @param cls #PeerContext of peer with pending online check
1102 mq_online_check_successful(void *cls)
1104 struct PeerContext *peer_ctx = cls;
1106 if (NULL != peer_ctx->online_check_pending)
1108 LOG(GNUNET_ERROR_TYPE_DEBUG,
1109 "Online check for peer %s was successfull\n",
1110 GNUNET_i2s(&peer_ctx->peer_id));
1111 remove_pending_message(peer_ctx->online_check_pending, GNUNET_YES);
1112 peer_ctx->online_check_pending = NULL;
1113 set_peer_online(peer_ctx);
1114 (void)add_valid_peer(&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1119 * Issue a check whether peer is online
1121 * @param peer_ctx the context of the peer
1124 check_peer_online(struct PeerContext *peer_ctx)
1126 LOG(GNUNET_ERROR_TYPE_DEBUG,
1127 "Get informed about peer %s getting online\n",
1128 GNUNET_i2s(&peer_ctx->peer_id));
1130 struct GNUNET_MQ_Handle *mq;
1131 struct GNUNET_MQ_Envelope *ev;
1133 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1134 peer_ctx->online_check_pending =
1135 insert_pending_message(peer_ctx, ev, "Check online");
1136 mq = get_mq(peer_ctx);
1137 GNUNET_MQ_notify_sent(ev,
1138 mq_online_check_successful,
1140 GNUNET_MQ_send(mq, ev);
1141 if (peer_ctx->sub == msub)
1143 GNUNET_STATISTICS_update(stats,
1144 "# pending online checks",
1152 * @brief Check whether function of type #PeerOp was already scheduled
1154 * The array with pending operations will probably never grow really big, so
1155 * iterating over it should be ok.
1157 * @param peer_ctx Context of the peer to check for the operation
1158 * @param peer_op the operation (#PeerOp) on the peer
1160 * @return #GNUNET_YES if this operation is scheduled on that peer
1161 * #GNUNET_NO otherwise
1164 check_operation_scheduled(const struct PeerContext *peer_ctx,
1165 const PeerOp peer_op)
1169 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1170 if (peer_op == peer_ctx->pending_ops[i].op)
1177 * @brief Destroy a channel and release its context
1179 * @param channel_ctx Context of the channel
1182 destroy_channel(struct ChannelCtx *channel_ctx)
1184 struct GNUNET_CADET_Channel *channel;
1186 if (NULL != channel_ctx->destruction_task)
1188 GNUNET_SCHEDULER_cancel(channel_ctx->destruction_task);
1189 channel_ctx->destruction_task = NULL;
1191 GNUNET_assert(channel_ctx->channel != NULL);
1192 channel = channel_ctx->channel;
1193 channel_ctx->channel = NULL;
1194 GNUNET_CADET_channel_destroy(channel);
1195 remove_channel_ctx(channel_ctx);
1200 * @brief Destroy a cadet channel.
1202 * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1207 destroy_channel_cb(void *cls)
1209 struct ChannelCtx *channel_ctx = cls;
1211 channel_ctx->destruction_task = NULL;
1212 destroy_channel(channel_ctx);
1217 * @brief Schedule the destruction of a channel for immediately afterwards.
1219 * In case a channel is to be destroyed from within the callback to the
1220 * destruction of another channel (send channel), we cannot call
1221 * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1224 * @param channel_ctx channel to be destroyed.
1227 schedule_channel_destruction(struct ChannelCtx *channel_ctx)
1229 GNUNET_assert(NULL ==
1230 channel_ctx->destruction_task);
1231 GNUNET_assert(NULL !=
1232 channel_ctx->channel);
1233 channel_ctx->destruction_task =
1234 GNUNET_SCHEDULER_add_now(&destroy_channel_cb,
1240 * @brief Remove peer
1242 * - Empties the list with pending operations
1243 * - Empties the list with pending messages
1244 * - Cancels potentially existing online check
1245 * - Schedules closing of send and recv channels
1246 * - Removes peer from peer map
1248 * @param peer_ctx Context of the peer to be destroyed
1249 * @return #GNUNET_YES if peer was removed
1250 * #GNUNET_NO otherwise
1253 destroy_peer(struct PeerContext *peer_ctx)
/* Preconditions: a peer context and the peer map of its Sub must exist. */
1255 GNUNET_assert(NULL != peer_ctx);
1256 GNUNET_assert(NULL != peer_ctx->sub->peer_map);
1258 GNUNET_CONTAINER_multipeermap_contains(peer_ctx->sub->peer_map,
1259 &peer_ctx->peer_id))
/* Flag the peer as being destroyed and as no longer online. */
1263 SET_PEER_FLAG(peer_ctx, Peers_TO_DESTROY);
1264 LOG(GNUNET_ERROR_TYPE_DEBUG,
1265 "Going to remove peer %s\n",
1266 GNUNET_i2s(&peer_ctx->peer_id));
1267 UNSET_PEER_FLAG(peer_ctx, Peers_ONLINE);
1269 /* Clear list of pending operations */
1270 // TODO this probably leaks memory
1271 // ('only' the cls to the function. Not sure what to do with it)
1272 GNUNET_array_grow(peer_ctx->pending_ops,
1273 peer_ctx->num_pending_ops,
1275 /* Remove all pending messages */
1276 while (NULL != peer_ctx->pending_messages_head)
1278 LOG(GNUNET_ERROR_TYPE_DEBUG,
1279 "Removing unsent %s\n",
1280 peer_ctx->pending_messages_head->type);
1281 /* Cancel pending message, too */
/* NOTE(review): whether the queue head is the pending online check is decided
 * by comparing the two structs byte-wise with memcmp. A plain pointer
 * comparison looks like what is intended here - confirm. */
1282 if ((NULL != peer_ctx->online_check_pending) &&
1283 (0 == memcmp(peer_ctx->pending_messages_head,
1284 peer_ctx->online_check_pending,
1285 sizeof(struct PendingMessage))))
1287 peer_ctx->online_check_pending = NULL;
/* Statistics are only updated for peers belonging to the main Sub (msub). */
1288 if (peer_ctx->sub == msub)
1290 GNUNET_STATISTICS_update(stats,
1291 "# pending online checks",
1296 remove_pending_message(peer_ctx->pending_messages_head,
1300 /* If we are still waiting for notification whether this peer is online
1301 * cancel the according task */
1302 if (NULL != peer_ctx->online_check_pending)
1304 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1305 "Removing pending online check for peer %s\n",
1306 GNUNET_i2s(&peer_ctx->peer_id));
1307 // TODO wait until cadet sets mq->cancel_impl
1308 //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1309 remove_pending_message(peer_ctx->online_check_pending,
1311 peer_ctx->online_check_pending = NULL;
/* Detach each existing channel context from this peer before scheduling the
 * channel's destruction, then drop the peer's own references to it. */
1314 if (NULL != peer_ctx->send_channel_ctx)
1316 /* This is possibly called from within channel destruction */
1317 peer_ctx->send_channel_ctx->peer_ctx = NULL;
1318 schedule_channel_destruction(peer_ctx->send_channel_ctx);
1319 peer_ctx->send_channel_ctx = NULL;
1320 peer_ctx->mq = NULL;
1322 if (NULL != peer_ctx->recv_channel_ctx)
1324 /* This is possibly called from within channel destruction */
1325 peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1326 schedule_channel_destruction(peer_ctx->recv_channel_ctx);
1327 peer_ctx->recv_channel_ctx = NULL;
/* Remove the peer's entry from the Sub's peer map; a failure is only logged. */
1331 GNUNET_CONTAINER_multipeermap_remove_all(peer_ctx->sub->peer_map,
1332 &peer_ctx->peer_id))
1334 LOG(GNUNET_ERROR_TYPE_WARNING,
1335 "removing peer from peer_ctx->sub->peer_map failed\n");
1337 if (peer_ctx->sub == msub)
1339 GNUNET_STATISTICS_set(stats,
1341 GNUNET_CONTAINER_multipeermap_size(peer_ctx->sub->peer_map),
/* Finally release the peer context itself. */
1344 GNUNET_free(peer_ctx);
1350 * Iterator over hash map entries. Deletes all contexts of peers.
1352 * @param cls closure
1353 * @param key current public key
1354 * @param value value in the hash map
1355 * @return #GNUNET_YES if we should continue to iterate,
1356 * #GNUNET_NO if not.
1359 peermap_clear_iterator(void *cls,
1360 const struct GNUNET_PeerIdentity *key,
1363 struct Sub *sub = cls;
1367 destroy_peer(get_peer_ctx(sub->peer_map, key));
1373 * @brief This is called once a message is sent.
1375 * Removes the pending message
1377 * @param cls the #PendingMessage that was sent
1380 mq_notify_sent_cb(void *cls)
1382 struct PendingMessage *pending_msg = (struct PendingMessage *)cls;
1384 LOG(GNUNET_ERROR_TYPE_DEBUG,
1387 if (pending_msg->peer_ctx->sub == msub)
1389 if (0 == strncmp("PULL REPLY", pending_msg->type, 10))
1390 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1391 if (0 == strncmp("PULL REQUEST", pending_msg->type, 12))
1392 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1393 if (0 == strncmp("PUSH", pending_msg->type, 4))
1394 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1395 if (0 == strncmp("PULL REQUEST", pending_msg->type, 12) &&
1396 NULL != map_single_hop &&
1397 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
1398 &pending_msg->peer_ctx->peer_id))
1399 GNUNET_STATISTICS_update(stats,
1400 "# pull requests sent (multi-hop peer)",
1404 /* Do not cancel message */
1405 remove_pending_message(pending_msg, GNUNET_NO);
1410 * @brief Iterator function for #store_valid_peers.
1412 * Implements #GNUNET_CONTAINER_PeerMapIterator.
1413 * Writes single peer to disk.
1415 * @param cls the file handle to write to.
1416 * @param peer current peer
1417 * @param value unused
1419 * @return #GNUNET_YES if we should continue to
1421 * #GNUNET_NO if not.
1424 store_peer_presistently_iterator(void *cls,
1425 const struct GNUNET_PeerIdentity *peer,
1428 const struct GNUNET_DISK_FileHandle *fh = cls;
1429 char peer_string[128];
1439 size = GNUNET_snprintf(peer_string,
1440 sizeof(peer_string),
1442 GNUNET_i2s_full(peer));
1443 GNUNET_assert(53 == size);
1444 ret = GNUNET_DISK_file_write(fh,
1447 GNUNET_assert(size == ret);
1453 * @brief Store the peers currently in #valid_peers to disk.
1455 * @param sub Sub for which to store the valid peers
1458 store_valid_peers(const struct Sub *sub)
1460 struct GNUNET_DISK_FileHandle *fh;
1461 uint32_t number_written_peers;
1464 if (0 == strncmp("DISABLE", sub->filename_valid_peers, 7))
1469 ret = GNUNET_DISK_directory_create_for_file(sub->filename_valid_peers);
1470 if (GNUNET_SYSERR == ret)
1472 LOG(GNUNET_ERROR_TYPE_WARNING,
1473 "Not able to create directory for file `%s'\n",
1474 sub->filename_valid_peers);
1477 else if (GNUNET_NO == ret)
1479 LOG(GNUNET_ERROR_TYPE_WARNING,
1480 "Directory for file `%s' exists but is not writable for us\n",
1481 sub->filename_valid_peers);
1484 fh = GNUNET_DISK_file_open(sub->filename_valid_peers,
1485 GNUNET_DISK_OPEN_WRITE |
1486 GNUNET_DISK_OPEN_CREATE,
1487 GNUNET_DISK_PERM_USER_READ |
1488 GNUNET_DISK_PERM_USER_WRITE);
1491 LOG(GNUNET_ERROR_TYPE_WARNING,
1492 "Not able to write valid peers to file `%s'\n",
1493 sub->filename_valid_peers);
1496 LOG(GNUNET_ERROR_TYPE_DEBUG,
1497 "Writing %u valid peers to disk\n",
1498 GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1499 number_written_peers =
1500 GNUNET_CONTAINER_multipeermap_iterate(sub->valid_peers,
1501 store_peer_presistently_iterator,
1503 GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_close(fh));
1504 GNUNET_assert(number_written_peers ==
1505 GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1510 * @brief Convert string representation of peer id to peer id.
1512 * Counterpart to #GNUNET_i2s_full.
1514 * @param string_repr The string representation of the peer id
1516 * @return The peer id
1518 static const struct GNUNET_PeerIdentity *
1519 s2i_full(const char *string_repr)
1521 struct GNUNET_PeerIdentity *peer;
1525 peer = GNUNET_new(struct GNUNET_PeerIdentity);
1526 len = strlen(string_repr);
1529 LOG(GNUNET_ERROR_TYPE_WARNING,
1530 "Not able to convert string representation of PeerID to PeerID\n"
1531 "Sting representation: %s (len %lu) - too short\n",
1540 ret = GNUNET_CRYPTO_eddsa_public_key_from_string(string_repr,
1543 if (GNUNET_OK != ret)
1545 LOG(GNUNET_ERROR_TYPE_WARNING,
1546 "Not able to convert string representation of PeerID to PeerID\n"
1547 "Sting representation: %s\n",
1556 * @brief Restore the peers on disk to #valid_peers.
1558 * @param sub Sub for which to restore the valid peers
1561 restore_valid_peers(const struct Sub *sub)
1565 struct GNUNET_DISK_FileHandle *fh;
1570 const struct GNUNET_PeerIdentity *peer;
1572 if (0 == strncmp("DISABLE", sub->filename_valid_peers, 7))
1577 if (GNUNET_OK != GNUNET_DISK_file_test(sub->filename_valid_peers))
1581 fh = GNUNET_DISK_file_open(sub->filename_valid_peers,
1582 GNUNET_DISK_OPEN_READ,
1583 GNUNET_DISK_PERM_NONE);
1584 GNUNET_assert(NULL != fh);
1585 GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_handle_size(fh, &file_size));
1586 num_peers = file_size / 53;
1587 buf = GNUNET_malloc(file_size);
1588 size_read = GNUNET_DISK_file_read(fh, buf, file_size);
1589 GNUNET_assert(size_read == file_size);
1590 LOG(GNUNET_ERROR_TYPE_DEBUG,
1591 "Restoring %" PRIu32 " peers from file `%s'\n",
1593 sub->filename_valid_peers);
1594 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1596 str_repr = GNUNET_strndup(iter_buf, 53);
1597 peer = s2i_full(str_repr);
1598 GNUNET_free(str_repr);
1599 add_valid_peer(peer, sub->valid_peers);
1600 LOG(GNUNET_ERROR_TYPE_DEBUG,
1601 "Restored valid peer %s from disk\n",
1602 GNUNET_i2s_full(peer));
1606 LOG(GNUNET_ERROR_TYPE_DEBUG,
1607 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1609 GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1610 if (num_peers != GNUNET_CONTAINER_multipeermap_size(sub->valid_peers))
1612 LOG(GNUNET_ERROR_TYPE_WARNING,
1613 "Number of restored peers does not match file size. Have probably duplicates.\n");
1615 GNUNET_assert(GNUNET_OK == GNUNET_DISK_file_close(fh));
1616 LOG(GNUNET_ERROR_TYPE_DEBUG,
1617 "Restored %u valid peers from disk\n",
1618 GNUNET_CONTAINER_multipeermap_size(sub->valid_peers));
1623 * @brief Delete storage of peers that was created with #initialise_peers ()
1625 * @param sub Sub for which the storage is deleted
1628 peers_terminate(struct Sub *sub)
1630 if (GNUNET_SYSERR ==
1631 GNUNET_CONTAINER_multipeermap_iterate(sub->peer_map,
1632 &peermap_clear_iterator,
1635 LOG(GNUNET_ERROR_TYPE_WARNING,
1636 "Iteration destroying peers was aborted.\n");
1638 GNUNET_CONTAINER_multipeermap_destroy(sub->peer_map);
1639 sub->peer_map = NULL;
1640 store_valid_peers(sub);
1641 GNUNET_free(sub->filename_valid_peers);
1642 sub->filename_valid_peers = NULL;
1643 GNUNET_CONTAINER_multipeermap_destroy(sub->valid_peers);
1644 sub->valid_peers = NULL;
1649 * Iterator over #valid_peers hash map entries.
1651 * @param cls Closure that contains iterator function and closure
1652 * @param peer current peer id
1653 * @param value value in the hash map - unused
1654 * @return #GNUNET_YES if we should continue to
1656 * #GNUNET_NO if not.
1659 valid_peer_iterator(void *cls,
1660 const struct GNUNET_PeerIdentity *peer,
1663 struct PeersIteratorCls *it_cls = cls;
1667 return it_cls->iterator(it_cls->cls, peer);
1672 * @brief Get all currently known, valid peer ids.
1674 * @param valid_peers Peer map containing the valid peers in question
1675 * @param iterator function to call on each peer id
1676 * @param it_cls extra argument to @a iterator
1677 * @return the number of key value pairs processed,
1678 * #GNUNET_SYSERR if it aborted iteration
1681 get_valid_peers(struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1682 PeersIterator iterator,
1685 struct PeersIteratorCls *cls;
1688 cls = GNUNET_new(struct PeersIteratorCls);
1689 cls->iterator = iterator;
1691 ret = GNUNET_CONTAINER_multipeermap_iterate(valid_peers,
1692 valid_peer_iterator,
1700 * @brief Add peer to known peers.
1702 * This function is called on new peer_ids from 'external' sources
1703 * (client seed, cadet get_peers(), ...)
1705 * @param sub Sub with the peer map that the @a peer will be added to
1706 * @param peer the new #GNUNET_PeerIdentity
1708 * @return #GNUNET_YES if peer was inserted
1709 * #GNUNET_NO otherwise
1712 insert_peer(struct Sub *sub,
1713 const struct GNUNET_PeerIdentity *peer)
1715 if (GNUNET_YES == check_peer_known(sub->peer_map, peer))
1717 return GNUNET_NO; /* We already know this peer - nothing to do */
1719 (void)create_peer_ctx(sub, peer);
1725 * @brief Check whether flags on a peer are set.
1727 * @param peer_map Peer map that is expected to contain the @a peer
1728 * @param peer the peer to check the flag of
1729 * @param flags the flags to check
1731 * @return #GNUNET_SYSERR if peer is not known
1732 * #GNUNET_YES if all given flags are set
1733 * #GNUNET_NO otherwise
1736 check_peer_flag(const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1737 const struct GNUNET_PeerIdentity *peer,
1738 enum Peers_PeerFlags flags)
1740 struct PeerContext *peer_ctx;
1742 if (GNUNET_NO == check_peer_known(peer_map, peer))
1744 return GNUNET_SYSERR;
1746 peer_ctx = get_peer_ctx(peer_map, peer);
1747 return check_peer_flag_set(peer_ctx, flags);
1751 * @brief Try connecting to a peer to see whether it is online
1753 * If not known yet, insert into known peers
1755 * @param sub Sub which would contain the @a peer
1756 * @param peer the peer whose online is to be checked
1757 * @return #GNUNET_YES if the check was issued
1758 * #GNUNET_NO otherwise
1761 issue_peer_online_check(struct Sub *sub,
1762 const struct GNUNET_PeerIdentity *peer)
1764 struct PeerContext *peer_ctx;
1766 (void)insert_peer(sub, peer); // TODO even needed?
1767 peer_ctx = get_peer_ctx(sub->peer_map, peer);
1768 if ((GNUNET_NO == check_peer_flag(sub->peer_map, peer, Peers_ONLINE)) &&
1769 (NULL == peer_ctx->online_check_pending))
1771 check_peer_online(peer_ctx);
1779 * @brief Check if peer is removable.
1782 * - a recv channel exists
1783 * - there are pending messages
1784 * - there is no pending pull reply
1786 * @param peer_ctx Context of the peer in question
1787 * @return #GNUNET_YES if peer is removable
1788 * #GNUNET_NO if peer is NOT removable
1789 * #GNUNET_SYSERR if peer is not known
1792 check_removable(const struct PeerContext *peer_ctx)
1794 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(peer_ctx->sub->peer_map,
1795 &peer_ctx->peer_id))
1797 return GNUNET_SYSERR;
1800 if ((NULL != peer_ctx->recv_channel_ctx) ||
1801 (NULL != peer_ctx->pending_messages_head) ||
1802 (GNUNET_YES == check_peer_flag_set(peer_ctx, Peers_PULL_REPLY_PENDING)))
/**
 * @brief Check whether @a peer is actually a peer.
 *
 * A valid peer is a peer that we know exists eg. we were connected to once.
 * Validity is decided by membership in @a valid_peers.
 *
 * @param valid_peers Peer map that would contain the @a peer
 * @param peer peer in question
 *
 * @return #GNUNET_YES if peer is valid
 *         #GNUNET_NO  if peer is not valid
 */
static int
check_peer_valid(const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
                 const struct GNUNET_PeerIdentity *peer)
{
  int is_member;

  is_member = GNUNET_CONTAINER_multipeermap_contains(valid_peers, peer);
  return is_member;
}
1830 * @brief Indicate that we want to send to the other peer
1832 * This establishes a sending channel
1834 * @param peer_ctx Context of the target peer
1837 indicate_sending_intention(struct PeerContext *peer_ctx)
1839 GNUNET_assert(GNUNET_YES == check_peer_known(peer_ctx->sub->peer_map,
1840 &peer_ctx->peer_id));
1841 (void)get_channel(peer_ctx);
1846 * @brief Check whether other peer has the intention to send/opened channel
1849 * @param peer_ctx Context of the peer in question
1851 * @return #GNUNET_YES if peer has the intention to send
1852 * #GNUNET_NO otherwise
1855 check_peer_send_intention(const struct PeerContext *peer_ctx)
1857 if (NULL != peer_ctx->recv_channel_ctx)
1866 * Handle the channel a peer opens to us.
1868 * @param cls The closure - Sub
1869 * @param channel The channel the peer wants to establish
1870 * @param initiator The peer's peer ID
1872 * @return initial channel context for the channel
1873 * (can be NULL -- that's not an error)
1876 handle_inbound_channel(void *cls,
1877 struct GNUNET_CADET_Channel *channel,
1878 const struct GNUNET_PeerIdentity *initiator)
1880 struct PeerContext *peer_ctx;
1881 struct ChannelCtx *channel_ctx;
1882 struct Sub *sub = cls;
1884 LOG(GNUNET_ERROR_TYPE_DEBUG,
1885 "New channel was established to us (Peer %s).\n",
1886 GNUNET_i2s(initiator));
1887 GNUNET_assert(NULL != channel); /* according to cadet API */
1888 /* Make sure we 'know' about this peer */
1889 peer_ctx = create_or_get_peer_ctx(sub, initiator);
1890 set_peer_online(peer_ctx);
1891 (void)add_valid_peer(&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1892 channel_ctx = add_channel_ctx(peer_ctx);
1893 channel_ctx->channel = channel;
1894 /* We only accept one incoming channel per peer */
1895 if (GNUNET_YES == check_peer_send_intention(get_peer_ctx(sub->peer_map,
1898 LOG(GNUNET_ERROR_TYPE_WARNING,
1899 "Already got one receive channel. Destroying old one.\n");
1901 destroy_channel(peer_ctx->recv_channel_ctx);
1902 peer_ctx->recv_channel_ctx = channel_ctx;
1903 /* return the channel context */
1906 peer_ctx->recv_channel_ctx = channel_ctx;
1912 * @brief Check whether a sending channel towards the given peer exists
1914 * @param peer_ctx Context of the peer in question
1916 * @return #GNUNET_YES if a sending channel towards that peer exists
1917 * #GNUNET_NO otherwise
1920 check_sending_channel_exists(const struct PeerContext *peer_ctx)
1922 if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
1923 &peer_ctx->peer_id))
1924 { /* If no such peer exists, there is no channel */
1927 if (NULL == peer_ctx->send_channel_ctx)
1936 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1937 * intention to another peer
1939 * @param peer_ctx Context to the peer
1940 * @return #GNUNET_YES if channel was destroyed
1941 * #GNUNET_NO otherwise
1944 destroy_sending_channel(struct PeerContext *peer_ctx)
1946 if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
1947 &peer_ctx->peer_id))
1951 if (NULL != peer_ctx->send_channel_ctx)
1953 destroy_channel(peer_ctx->send_channel_ctx);
1954 (void)check_connected(peer_ctx);
1961 * @brief Send a message to another peer.
1963 * Keeps track about pending messages so they can be properly removed when the
1964 * peer is destroyed.
1966 * @param peer_ctx Context of the peer to which the message is to be sent
1967 * @param ev envelope of the message
1968 * @param type type of the message
1971 send_message(struct PeerContext *peer_ctx,
1972 struct GNUNET_MQ_Envelope *ev,
1975 struct PendingMessage *pending_msg;
1976 struct GNUNET_MQ_Handle *mq;
1978 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1979 "Sending message to %s of type %s\n",
1980 GNUNET_i2s(&peer_ctx->peer_id),
1982 pending_msg = insert_pending_message(peer_ctx, ev, type);
1983 mq = get_mq(peer_ctx);
1984 GNUNET_MQ_notify_sent(ev,
1987 GNUNET_MQ_send(mq, ev);
1991 * @brief Schedule a operation on given peer
1993 * Avoids scheduling an operation twice.
1995 * @param peer_ctx Context of the peer for which to schedule the operation
1996 * @param peer_op the operation to schedule
1997 * @param cls Closure to @a peer_op
1999 * @return #GNUNET_YES if the operation was scheduled
2000 * #GNUNET_NO otherwise
2003 schedule_operation(struct PeerContext *peer_ctx,
2004 const PeerOp peer_op,
2007 struct PeerPendingOp pending_op;
2009 GNUNET_assert(GNUNET_YES == check_peer_known(peer_ctx->sub->peer_map,
2010 &peer_ctx->peer_id));
2012 //TODO if ONLINE execute immediately
2014 if (GNUNET_NO == check_operation_scheduled(peer_ctx, peer_op))
2016 pending_op.op = peer_op;
2017 pending_op.op_cls = cls;
2018 GNUNET_array_append(peer_ctx->pending_ops,
2019 peer_ctx->num_pending_ops,
2026 /***********************************************************************
2027 * /Old gnunet-service-rps_peers.c
2028 ***********************************************************************/
2031 /***********************************************************************
2032 * Housekeeping with clients
2033 ***********************************************************************/
2036 * Closure used to pass the client and the id to the callback
2037 * that replies to a client's request
2043 struct ReplyCls *next;
2044 struct ReplyCls *prev;
2047 * The identifier of the request
2052 * The handle to the request
2054 struct RPS_SamplerRequestHandle *req_handle;
2057 * The client handle to send the reply to
2059 struct ClientContext *cli_ctx;
/**
 * Struct used to store the context of a connected client.
 */
struct ClientContext {
  /**
   * DLL: next and previous element of the doubly linked list
   */
  struct ClientContext *next;
  struct ClientContext *prev;

  /**
   * The message queue to communicate with the client.
   */
  struct GNUNET_MQ_Handle *mq;

  /**
   * @brief How many updates this client expects to receive.
   *
   * As interpreted by clients_notify_view_update(): 0 means an unlimited
   * number of updates, a positive value is the number of updates still to
   * be sent, and a negative value means no (further) updates are sent.
   */
  int64_t view_updates_left;

  /**
   * @brief Whether this client wants to receive stream updates.
   * Either #GNUNET_YES or #GNUNET_NO
   */
  int8_t stream_update;

  /**
   * The client handle to send the reply to
   */
  struct GNUNET_SERVICE_Client *client;

  /**
   * The #Sub this context belongs to
   */
  struct Sub *sub;
};
/**
 * DLL with all clients currently connected to us:
 * head and tail of the list of #ClientContext elements.
 */
struct ClientContext *cli_ctx_head;
struct ClientContext *cli_ctx_tail;
2106 /***********************************************************************
2107 * /Housekeeping with clients
2108 ***********************************************************************/
2114 /***********************************************************************
2116 ***********************************************************************/
2120 * Print peerlist to log.
2123 print_peer_list(struct GNUNET_PeerIdentity *list,
2128 LOG(GNUNET_ERROR_TYPE_DEBUG,
2129 "Printing peer list of length %u at %p:\n",
2132 for (i = 0; i < len; i++)
2134 LOG(GNUNET_ERROR_TYPE_DEBUG,
2136 i, GNUNET_i2s(&list[i]));
2142 * Remove peer from list.
2145 rem_from_list(struct GNUNET_PeerIdentity **peer_list,
2146 unsigned int *list_size,
2147 const struct GNUNET_PeerIdentity *peer)
2150 struct GNUNET_PeerIdentity *tmp;
2154 LOG(GNUNET_ERROR_TYPE_DEBUG,
2155 "Removing peer %s from list at %p\n",
2159 for (i = 0; i < *list_size; i++)
2161 if (0 == GNUNET_memcmp(&tmp[i], peer))
2163 if (i < *list_size - 1)
2164 { /* Not at the last entry -- shift peers left */
2165 memmove(&tmp[i], &tmp[i + 1],
2166 ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
2168 /* Remove last entry (should be now useless PeerID) */
2169 GNUNET_array_grow(tmp, *list_size, (*list_size) - 1);
/**
 * Insert PeerID in #view
 *
 * Called once we know a peer is online.
 * Implements #PeerOp
 *
 * Forward declaration; the definition follows further down in this file.
 *
 * @param cls Closure - Sub with view to insert peer into
 * @param peer the peer to insert
 */
static void
insert_in_view_op(void *cls,
                  const struct GNUNET_PeerIdentity *peer);
2190 * Insert PeerID in #view
2192 * Called once we know a peer is online.
2194 * @param sub Sub in with the view to insert in
2195 * @param peer the peer to insert
2197 * @return GNUNET_OK if peer was actually inserted
2198 * GNUNET_NO if peer was not inserted
2201 insert_in_view(struct Sub *sub,
2202 const struct GNUNET_PeerIdentity *peer)
2204 struct PeerContext *peer_ctx;
2208 online = check_peer_flag(sub->peer_map, peer, Peers_ONLINE);
2209 peer_ctx = get_peer_ctx(sub->peer_map, peer); // TODO indirection needed?
2210 if ((GNUNET_NO == online) ||
2211 (GNUNET_SYSERR == online)) /* peer is not even known */
2213 (void)issue_peer_online_check(sub, peer);
2214 (void)schedule_operation(peer_ctx, insert_in_view_op, sub);
2217 /* Open channel towards peer to keep connection open */
2218 indicate_sending_intention(peer_ctx);
2219 ret = View_put(sub->view, peer);
2220 if (peer_ctx->sub == msub)
2222 GNUNET_STATISTICS_set(stats,
2224 View_size(peer_ctx->sub->view),
2232 * @brief Send view to client
2234 * @param cli_ctx the context of the client
2235 * @param view_array the peerids of the view as array (can be empty)
2236 * @param view_size the size of the view array (can be 0)
2239 send_view(const struct ClientContext *cli_ctx,
2240 const struct GNUNET_PeerIdentity *view_array,
2243 struct GNUNET_MQ_Envelope *ev;
2244 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2247 if (NULL == view_array)
2249 if (NULL == cli_ctx->sub)
2253 view_size = View_size(sub->view);
2254 view_array = View_get_as_array(sub->view);
2257 ev = GNUNET_MQ_msg_extra(out_msg,
2258 view_size * sizeof(struct GNUNET_PeerIdentity),
2259 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2260 out_msg->num_peers = htonl(view_size);
2262 GNUNET_memcpy(&out_msg[1],
2264 view_size * sizeof(struct GNUNET_PeerIdentity));
2265 GNUNET_MQ_send(cli_ctx->mq, ev);
2270 * @brief Send peer from biased stream to client.
2272 * TODO merge with send_view, parameterise
2274 * @param cli_ctx the context of the client
2275 * @param view_array the peerids of the view as array (can be empty)
2276 * @param view_size the size of the view array (can be 0)
2279 send_stream_peers(const struct ClientContext *cli_ctx,
2281 const struct GNUNET_PeerIdentity *peers)
2283 struct GNUNET_MQ_Envelope *ev;
2284 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2286 GNUNET_assert(NULL != peers);
2288 ev = GNUNET_MQ_msg_extra(out_msg,
2289 num_peers * sizeof(struct GNUNET_PeerIdentity),
2290 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2291 out_msg->num_peers = htonl(num_peers);
2293 GNUNET_memcpy(&out_msg[1],
2295 num_peers * sizeof(struct GNUNET_PeerIdentity));
2296 GNUNET_MQ_send(cli_ctx->mq, ev);
2301 * @brief sends updates to clients that are interested
2303 * @param sub Sub for which to notify clients
2306 clients_notify_view_update(const struct Sub *sub)
2308 struct ClientContext *cli_ctx_iter;
2310 const struct GNUNET_PeerIdentity *view_array;
2312 num_peers = View_size(sub->view);
2313 view_array = View_get_as_array(sub->view);
2314 /* check size of view is small enough */
2315 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2317 GNUNET_log(GNUNET_ERROR_TYPE_ERROR,
2318 "View is too big to send\n");
2322 for (cli_ctx_iter = cli_ctx_head;
2323 NULL != cli_ctx_iter;
2324 cli_ctx_iter = cli_ctx_iter->next)
2326 if (1 < cli_ctx_iter->view_updates_left)
2328 /* Client wants to receive limited amount of updates */
2329 cli_ctx_iter->view_updates_left -= 1;
2331 else if (1 == cli_ctx_iter->view_updates_left)
2333 /* Last update of view for client */
2334 cli_ctx_iter->view_updates_left = -1;
2336 else if (0 > cli_ctx_iter->view_updates_left)
2338 /* Client is not interested in updates */
2341 /* else _updates_left == 0 - infinite amount of updates */
2344 send_view(cli_ctx_iter, view_array, num_peers);
2350 * @brief sends updates to clients that are interested
2352 * @param num_peers Number of peers to send
2353 * @param peers the array of peers to send
2356 clients_notify_stream_peer(const struct Sub *sub,
2358 const struct GNUNET_PeerIdentity *peers)
2359 // TODO enum StreamPeerSource)
2361 struct ClientContext *cli_ctx_iter;
2363 LOG(GNUNET_ERROR_TYPE_DEBUG,
2364 "Got peer (%s) from biased stream - update all clients\n",
2367 for (cli_ctx_iter = cli_ctx_head;
2368 NULL != cli_ctx_iter;
2369 cli_ctx_iter = cli_ctx_iter->next)
2371 if (GNUNET_YES == cli_ctx_iter->stream_update &&
2372 (sub == cli_ctx_iter->sub || sub == msub))
2374 send_stream_peers(cli_ctx_iter, num_peers, peers);
2381 * Put random peer from sampler into the view as history update.
2383 * @param ids Array of Peers to insert into view
2384 * @param num_peers Number of peers to insert
2385 * @param cls Closure - The Sub for which this is to be done
2388 hist_update(const struct GNUNET_PeerIdentity *ids,
2393 struct Sub *sub = cls;
2395 for (i = 0; i < num_peers; i++)
2398 if (GNUNET_YES != check_peer_known(sub->peer_map, &ids[i]))
2400 LOG(GNUNET_ERROR_TYPE_WARNING,
2401 "Peer in history update not known!\n");
2404 inserted = insert_in_view(sub, &ids[i]);
2405 if (GNUNET_OK == inserted)
2407 clients_notify_stream_peer(sub, 1, &ids[i]);
2410 to_file(sub->file_name_view_log,
2412 GNUNET_i2s_full(ids));
2413 #endif /* TO_FILE_FULL */
2415 clients_notify_view_update(sub);
2420 * Wrapper around #RPS_sampler_resize()
2422 * If we do not have enough sampler elements, double current sampler size
2423 * If we have more than enough sampler elements, halv current sampler size
2425 * @param sampler The sampler to resize
2426 * @param new_size New size to which to resize
2429 resize_wrapper(struct RPS_Sampler *sampler, uint32_t new_size)
2431 unsigned int sampler_size;
2434 // TODO respect the min, max
2435 sampler_size = RPS_sampler_get_size(sampler);
2436 if (sampler_size > new_size * 4)
2438 RPS_sampler_resize(sampler, sampler_size / 2);
2440 else if (sampler_size < new_size)
2442 RPS_sampler_resize(sampler, sampler_size * 2);
2444 LOG(GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2449 * Add all peers in @a peer_array to @a peer_map used as set.
2451 * @param peer_array array containing the peers
2452 * @param num_peers number of peers in @peer_array
2453 * @param peer_map the peermap to use as set
2456 add_peer_array_to_set(const struct GNUNET_PeerIdentity *peer_array,
2457 unsigned int num_peers,
2458 struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2462 if (NULL == peer_map)
2464 LOG(GNUNET_ERROR_TYPE_WARNING,
2465 "Trying to add peers to non-existing peermap.\n");
2469 for (i = 0; i < num_peers; i++)
2471 GNUNET_CONTAINER_multipeermap_put(peer_map,
2474 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2475 if (msub->peer_map == peer_map)
2477 GNUNET_STATISTICS_set(stats,
2479 GNUNET_CONTAINER_multipeermap_size(peer_map),
2487 * Send a PULL REPLY to @a peer_id
2489 * @param peer_ctx Context of the peer to send the reply to
2490 * @param peer_ids the peers to send to @a peer_id
2491 * @param num_peer_ids the number of peers to send to @a peer_id
2494 send_pull_reply(struct PeerContext *peer_ctx,
2495 const struct GNUNET_PeerIdentity *peer_ids,
2496 unsigned int num_peer_ids)
2499 struct GNUNET_MQ_Envelope *ev;
2500 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2502 /* Compute actual size */
2503 send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) +
2504 num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
2506 if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2507 /* Compute number of peers to send
2508 * If too long, simply truncate */
2509 // TODO select random ones via permutation
2510 // or even better: do good protocol design
2512 (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2513 sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
2514 sizeof(struct GNUNET_PeerIdentity);
2516 send_size = num_peer_ids;
2518 LOG(GNUNET_ERROR_TYPE_DEBUG,
2519 "Going to send PULL REPLY with %u peers to %s\n",
2520 send_size, GNUNET_i2s(&peer_ctx->peer_id));
2522 ev = GNUNET_MQ_msg_extra(out_msg,
2523 send_size * sizeof(struct GNUNET_PeerIdentity),
2524 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2525 out_msg->num_peers = htonl(send_size);
2526 GNUNET_memcpy(&out_msg[1], peer_ids,
2527 send_size * sizeof(struct GNUNET_PeerIdentity));
2529 send_message(peer_ctx, ev, "PULL REPLY");
2530 if (peer_ctx->sub == msub)
2532 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2534 // TODO check with send intention: as send_channel is used/opened we indicate
2535 // a sending intention without intending it.
2536 // -> clean peer afterwards?
2537 // -> use recv_channel?
2542 * Insert PeerID in #pull_map
2544 * Called once we know a peer is online.
2546 * @param cls Closure - Sub with the pull map to insert into
2547 * @param peer Peer to insert
2550 insert_in_pull_map(void *cls,
2551 const struct GNUNET_PeerIdentity *peer)
2553 struct Sub *sub = cls;
2555 CustomPeerMap_put(sub->pull_map, peer);
2560 * Insert PeerID in #view
2562 * Called once we know a peer is online.
2563 * Implements #PeerOp
2565 * @param cls Closure - Sub with view to insert peer into
2566 * @param peer the peer to insert
2569 insert_in_view_op(void *cls,
2570 const struct GNUNET_PeerIdentity *peer)
2572 struct Sub *sub = cls;
2575 inserted = insert_in_view(sub, peer);
2576 if (GNUNET_OK == inserted)
2578 clients_notify_stream_peer(sub, 1, peer);
2584 * Update sampler with given PeerID.
2585 * Implements #PeerOp
2587 * @param cls Closure - Sub containing the sampler to insert into
2588 * @param peer Peer to insert
2591 insert_in_sampler(void *cls,
2592 const struct GNUNET_PeerIdentity *peer)
2594 struct Sub *sub = cls;
2596 LOG(GNUNET_ERROR_TYPE_DEBUG,
2597 "Updating samplers with peer %s from insert_in_sampler()\n",
2599 RPS_sampler_update(sub->sampler, peer);
2600 if (0 < RPS_sampler_count_id(sub->sampler, peer))
2602 /* Make sure we 'know' about this peer */
2603 (void)issue_peer_online_check(sub, peer);
2604 /* Establish a channel towards that peer to indicate we are going to send
2606 //indicate_sending_intention (peer);
2610 GNUNET_STATISTICS_update(stats,
2611 "# observed peers in gossip",
2616 sub->num_observed_peers++;
2617 GNUNET_CONTAINER_multipeermap_put
2618 (sub->observed_unique_peers,
2621 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2622 uint32_t num_observed_unique_peers =
2623 GNUNET_CONTAINER_multipeermap_size(sub->observed_unique_peers);
2624 GNUNET_STATISTICS_set(stats,
2625 "# unique peers in gossip",
2626 num_observed_unique_peers,
2629 to_file(sub->file_name_observed_log,
2630 "%" PRIu32 " %" PRIu32 " %f\n",
2631 sub->num_observed_peers,
2632 num_observed_unique_peers,
2633 1.0 * num_observed_unique_peers / sub->num_observed_peers)
2634 #endif /* TO_FILE_FULL */
2635 #endif /* TO_FILE */
2640 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2641 * If the peer is not known, online check is issued and it is
2642 * scheduled to be inserted in sampler and view.
2644 * "External sources" refer to every source except the gossip.
2646 * @param sub Sub for which @a peer was received
2647 * @param peer peer to insert/peer received
2650 got_peer(struct Sub *sub,
2651 const struct GNUNET_PeerIdentity *peer)
2653 /* If we did not know this peer already, insert it into sampler and view */
2654 if (GNUNET_YES == issue_peer_online_check(sub, peer))
2656 schedule_operation(get_peer_ctx(sub->peer_map, peer),
2657 &insert_in_sampler, sub);
2658 schedule_operation(get_peer_ctx(sub->peer_map, peer),
2659 &insert_in_view_op, sub);
2663 GNUNET_STATISTICS_update(stats,
2672 * @brief Checks if there is a sending channel and if it is needed
2674 * @param peer_ctx Context of the peer to check
2675 * @return GNUNET_YES if sending channel exists and is still needed
2676 * GNUNET_NO otherwise
2679 check_sending_channel_needed(const struct PeerContext *peer_ctx)
2681 /* struct GNUNET_CADET_Channel *channel; */
2682 if (GNUNET_NO == check_peer_known(peer_ctx->sub->peer_map,
2683 &peer_ctx->peer_id))
2687 if (GNUNET_YES == check_sending_channel_exists(peer_ctx))
2689 if ((0 < RPS_sampler_count_id(peer_ctx->sub->sampler,
2690 &peer_ctx->peer_id)) ||
2691 (GNUNET_YES == View_contains_peer(peer_ctx->sub->view,
2692 &peer_ctx->peer_id)) ||
2693 (GNUNET_YES == CustomPeerMap_contains_peer(peer_ctx->sub->push_map,
2694 &peer_ctx->peer_id)) ||
2695 (GNUNET_YES == CustomPeerMap_contains_peer(peer_ctx->sub->pull_map,
2696 &peer_ctx->peer_id)) ||
2697 (GNUNET_YES == check_peer_flag(peer_ctx->sub->peer_map,
2699 Peers_PULL_REPLY_PENDING)))
2700 { /* If we want to keep the connection to peer open */
2710 * @brief remove peer from our knowledge, the view, push and pull maps and
2713 * @param sub Sub with the data structures the peer is to be removed from
2714 * @param peer the peer to remove
2717 remove_peer(struct Sub *sub,
2718 const struct GNUNET_PeerIdentity *peer)
2720 (void)View_remove_peer(sub->view,
2722 CustomPeerMap_remove_peer(sub->pull_map,
2724 CustomPeerMap_remove_peer(sub->push_map,
2726 RPS_sampler_reinitialise_by_value(sub->sampler,
2728 /* We want to destroy the peer now.
2729 * Sometimes, it just seems that it's already been removed from the peer_map,
2730 * so check the peer_map first. */
2731 if (GNUNET_YES == check_peer_known(sub->peer_map,
2734 destroy_peer(get_peer_ctx(sub->peer_map,
2741 * @brief Remove data that is not needed anymore.
2743 * If the sending channel is no longer needed it is destroyed.
2745 * @param sub Sub in which the current peer is to be cleaned
2746 * @param peer the peer whose data is about to be cleaned
2749 clean_peer(struct Sub *sub,
2750 const struct GNUNET_PeerIdentity *peer)
2752 if (GNUNET_NO == check_sending_channel_needed(get_peer_ctx(sub->peer_map,
2755 LOG(GNUNET_ERROR_TYPE_DEBUG,
2756 "Going to remove send channel to peer %s\n",
2758 #if ENABLE_MALICIOUS
2759 if (0 != GNUNET_memcmp(&attacked_peer,
2761 (void)destroy_sending_channel(get_peer_ctx(sub->peer_map,
2763 #else /* ENABLE_MALICIOUS */
2764 (void)destroy_sending_channel(get_peer_ctx(sub->peer_map,
2766 #endif /* ENABLE_MALICIOUS */
2769 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(sub->peer_map,
2772 /* Peer was already removed by callback on destroyed channel */
2773 LOG(GNUNET_ERROR_TYPE_WARNING,
2774 "Peer was removed from our knowledge during cleanup\n");
2778 if ((GNUNET_NO == check_peer_send_intention(get_peer_ctx(sub->peer_map,
2780 (GNUNET_NO == View_contains_peer(sub->view, peer)) &&
2781 (GNUNET_NO == CustomPeerMap_contains_peer(sub->push_map, peer)) &&
2782 (GNUNET_NO == CustomPeerMap_contains_peer(sub->push_map, peer)) &&
2783 (0 == RPS_sampler_count_id(sub->sampler, peer)) &&
2784 (GNUNET_YES == check_removable(get_peer_ctx(sub->peer_map, peer))))
2785 { /* We can safely remove this peer */
2786 LOG(GNUNET_ERROR_TYPE_DEBUG,
2787 "Going to remove peer %s\n",
2789 remove_peer(sub, peer);
2796 * @brief This is called when a channel is destroyed.
2798 * Removes peer completely from our knowledge if the send_channel was destroyed
2799 * Otherwise simply delete the recv_channel
2800 * Also check if the knowledge about this peer is still needed.
2801 * If not, remove this peer from our knowledge.
2803 * @param cls The closure - Context to the channel
2804 * @param channel The channel being closed
2807 cleanup_destroyed_channel(void *cls,
2808 const struct GNUNET_CADET_Channel *channel)
2810 struct ChannelCtx *channel_ctx = cls;
2811 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2815 channel_ctx->channel = NULL;
2816 remove_channel_ctx(channel_ctx);
2817 if (NULL != peer_ctx &&
2818 peer_ctx->send_channel_ctx == channel_ctx &&
2819 GNUNET_YES == check_sending_channel_needed(channel_ctx->peer_ctx))
2821 remove_peer(peer_ctx->sub, &peer_ctx->peer_id);
2825 /***********************************************************************
2827 ***********************************************************************/
2831 /***********************************************************************
2833 ***********************************************************************/
2836 * @brief Create a new Sub
2838 * @param hash Hash of value shared among rps instances on other hosts that
2839 * defines a subgroup to sample from.
2840 * @param sampler_size Size of the sampler
2841 * @param round_interval Interval (in average) between two rounds
/*
 * new_sub: allocate a Sub and bring it into operation.
 *
 * Visible steps: allocate the Sub, open a CADET port with handlers for the
 * CHECK_LIVE, PUSH, PULL_REQUEST and PULL_REPLY peer messages, create the
 * valid-peer map and the peer map, read FILENAME_VALID_PEERS from the
 * configuration, initialise the sampler, create the push/pull maps and the
 * view, and schedule do_round() to run right away.
 *
 * NOTE(review): sub->filename_valid_peers is handed to strncmp() directly
 * after the configuration lookup; presumably a failed lookup is only logged.
 * Confirm the pointer cannot still be NULL at that point.
 */
2846 new_sub(const struct GNUNET_HashCode *hash,
2847 uint32_t sampler_size,
2848 struct GNUNET_TIME_Relative round_interval)
2852 sub = GNUNET_new(struct Sub);
2854 /* With the hash generated from the secret value this service only connects
2855 * to rps instances that share the value */
2856 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2857 GNUNET_MQ_hd_fixed_size(peer_check,
2858 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2859 struct GNUNET_MessageHeader,
2861 GNUNET_MQ_hd_fixed_size(peer_push,
2862 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2863 struct GNUNET_MessageHeader,
2865 GNUNET_MQ_hd_fixed_size(peer_pull_request,
2866 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2867 struct GNUNET_MessageHeader,
2869 GNUNET_MQ_hd_var_size(peer_pull_reply,
2870 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2871 struct GNUNET_RPS_P2P_PullReplyMessage,
2873 GNUNET_MQ_handler_end()
2877 GNUNET_CADET_open_port(cadet_handle,
2879 &handle_inbound_channel, /* Connect handler */
2881 NULL, /* WindowSize handler */
2882 &cleanup_destroyed_channel, /* Disconnect handler */
/* A NULL port is reported as "already in use". */
2884 if (NULL == sub->cadet_port)
2886 LOG(GNUNET_ERROR_TYPE_ERROR,
2887 "Cadet port `%s' is already in use.\n",
2888 GNUNET_APPLICATION_PORT_RPS);
2892 /* Set up general data structure to keep track about peers */
2893 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
2895 GNUNET_CONFIGURATION_get_value_filename(cfg,
2897 "FILENAME_VALID_PEERS",
2898 &sub->filename_valid_peers))
2900 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
2902 "FILENAME_VALID_PEERS");
/* Unless the configured name starts with "DISABLE", the file name is rebuilt
 * from the configured name and the full hash string of this Sub. */
2904 if (0 != strncmp("DISABLE", sub->filename_valid_peers, 7))
2906 char *tmp_filename_valid_peers;
2909 GNUNET_snprintf(str_hash,
2911 GNUNET_h2s_full(hash));
2912 tmp_filename_valid_peers = sub->filename_valid_peers;
2913 GNUNET_asprintf(&sub->filename_valid_peers,
2915 tmp_filename_valid_peers,
2917 GNUNET_free(tmp_filename_valid_peers);
2919 sub->peer_map = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
2921 /* Set up the sampler */
2922 sub->sampler_size_est_min = sampler_size;
2923 sub->sampler_size_est_need = sampler_size;;
2924 LOG(GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
/* A round interval of zero is rejected outright. */
2925 GNUNET_assert(0 != round_interval.rel_value_us);
2926 sub->round_interval = round_interval;
2927 sub->sampler = RPS_sampler_init(sampler_size,
2930 /* Logging of internals */
2932 sub->file_name_view_log = store_prefix_file_name(&own_identity, "view");
2933 #endif /* TO_FILE_FULL */
2936 sub->file_name_observed_log = store_prefix_file_name(&own_identity,
2938 #endif /* TO_FILE_FULL */
2939 sub->num_observed_peers = 0;
2940 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create(1,
2942 #endif /* TO_FILE */
2944 /* Set up data structures for gossip */
2945 sub->push_map = CustomPeerMap_create(4);
2946 sub->pull_map = CustomPeerMap_create(4);
/* The minimum view size starts out equal to the requested sampler size. */
2947 sub->view_size_est_min = sampler_size;;
2948 sub->view = View_create(sub->view_size_est_min);
2951 GNUNET_STATISTICS_set(stats,
2953 sub->view_size_est_min,
2957 /* Start executing rounds */
2958 sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_round, sub);
2966 * @brief Write all numbers in the given array into the given file
2968 * Single numbers divided by a newline
2970 * @param hist_array[] the array to dump
2971 * @param file_name file to dump into
/*
 * write_histogram_to_file: dump a histogram array as text.
 *
 * Loops over HISTOGRAM_FILE_SLOTS entries, formats each one into the small
 * buffer collect_str_tmp, appends it to collect_str and finally hands the
 * collected text to to_file_w_len() under a name derived from own_identity.
 *
 * NOTE(review): collect_str_tmp holds 8 bytes and collect_str holds
 * SIZE_DUMP_FILE + 1 bytes; confirm that every formatted entry and the sum
 * of all entries fit into these buffers.
 */
2974 write_histogram_to_file(const uint32_t hist_array[],
2975 const char *file_name)
2977 char collect_str[SIZE_DUMP_FILE + 1] = "";
2978 char *recv_str_iter;
2979 char *file_name_full;
/* recv_str_iter is the write cursor inside collect_str. */
2981 recv_str_iter = collect_str;
2982 file_name_full = store_prefix_file_name(&own_identity,
2984 for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2986 char collect_str_tmp[8];
2988 GNUNET_snprintf(collect_str_tmp,
2989 sizeof(collect_str_tmp),
/* stpncpy() returns the new end position, which becomes the next cursor. */
2992 recv_str_iter = stpncpy(recv_str_iter,
2996 (void)stpcpy(recv_str_iter,
2998 LOG(GNUNET_ERROR_TYPE_DEBUG,
2999 "Writing push stats to disk\n");
3000 to_file_w_len(file_name_full,
/* The full file name is released once the data was handed over. */
3003 GNUNET_free(file_name_full);
3005 #endif /* TO_FILE */
3009 * @brief Destroy Sub.
3011 * @param sub Sub to destroy
/*
 * destroy_sub: tear down a Sub.
 *
 * Visible order: cancel the pending round task, close the CADET port,
 * destroy sampler, view, push map and pull map, call peers_terminate(),
 * then (in the TO_FILE sections) free the log file names, write the three
 * histograms to disk and destroy the map of observed peers.
 *
 * Both sub and sub->do_round_task must be non-NULL; this is asserted.
 */
3014 destroy_sub(struct Sub *sub)
3016 GNUNET_assert(NULL != sub);
3017 GNUNET_assert(NULL != sub->do_round_task);
3018 GNUNET_SCHEDULER_cancel(sub->do_round_task);
3019 sub->do_round_task = NULL;
3021 /* Disconnect from cadet */
3022 GNUNET_CADET_close_port(sub->cadet_port);
3023 sub->cadet_port = NULL;
3025 /* Clean up data structures for peers */
3026 RPS_sampler_destroy(sub->sampler);
3027 sub->sampler = NULL;
3028 View_destroy(sub->view);
3030 CustomPeerMap_destroy(sub->push_map);
3031 sub->push_map = NULL;
3032 CustomPeerMap_destroy(sub->pull_map);
3033 sub->pull_map = NULL;
/* Runs only after sampler, view and both maps are gone. */
3034 peers_terminate(sub);
3036 /* Free leftover data structures */
3038 GNUNET_free(sub->file_name_view_log);
3039 sub->file_name_view_log = NULL;
3040 #endif /* TO_FILE_FULL */
3043 GNUNET_free(sub->file_name_observed_log);
3044 sub->file_name_observed_log = NULL;
3045 #endif /* TO_FILE_FULL */
3047 /* Write push frequencies to disk */
3048 write_histogram_to_file(sub->push_recv,
3051 /* Write push deltas to disk */
3052 write_histogram_to_file(sub->push_delta,
3055 /* Write pull delays to disk */
3056 write_histogram_to_file(sub->pull_delays,
3059 GNUNET_CONTAINER_multipeermap_destroy(sub->observed_unique_peers);
3060 sub->observed_unique_peers = NULL;
3061 #endif /* TO_FILE */
3067 /***********************************************************************
3069 ***********************************************************************/
3072 /***********************************************************************
3074 ***********************************************************************/
3077 * @brief Callback on initialisation of Core.
3079 * @param cls - unused
3080 * @param my_identity - unused
/* core_init: creates the peer map map_single_hop (initial size 4). */
3083 core_init(void *cls,
3084 const struct GNUNET_PeerIdentity *my_identity)
3089 map_single_hop = GNUNET_CONTAINER_multipeermap_create(4, GNUNET_NO);
3094 * @brief Callback for core.
3095 * Method called whenever a given peer connects.
3097 * @param cls closure - unused
3098 * @param peer peer identity this notification is about
3099 * @return closure given to #core_disconnects as peer_cls
/*
 * core_connects: records a newly connected peer in map_single_hop.
 * The insertion uses the UNIQUE_ONLY option and is asserted to succeed.
 */
3102 core_connects(void *cls,
3103 const struct GNUNET_PeerIdentity *peer,
3104 struct GNUNET_MQ_Handle *mq)
3109 GNUNET_assert(GNUNET_YES ==
3110 GNUNET_CONTAINER_multipeermap_put(map_single_hop,
3113 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3119 * @brief Callback for core.
3120 * Method called whenever a peer disconnects.
3122 * @param cls closure - unused
3123 * @param peer peer identity this notification is about
3124 * @param peer_cls closure given in #core_connects - unused
/* core_disconnects: drops every entry for the peer from map_single_hop. */
3127 core_disconnects(void *cls,
3128 const struct GNUNET_PeerIdentity *peer,
3134 GNUNET_CONTAINER_multipeermap_remove_all(map_single_hop, peer);
3137 /***********************************************************************
3139 ***********************************************************************/
3143 * @brief Destroy the context for a (connected) client
3145 * @param cli_ctx Context to destroy
/*
 * destroy_cli_ctx: unlink a client context from the list starting at
 * cli_ctx_head, destroy the client's Sub if it has one, and free the context.
 * cli_ctx must not be NULL (asserted).
 */
3148 destroy_cli_ctx(struct ClientContext *cli_ctx)
3150 GNUNET_assert(NULL != cli_ctx);
3151 GNUNET_CONTAINER_DLL_remove(cli_ctx_head,
3154 if (NULL != cli_ctx->sub)
3156 destroy_sub(cli_ctx->sub);
3157 cli_ctx->sub = NULL;
3159 GNUNET_free(cli_ctx);
3164 * @brief Update sizes in sampler and view on estimate update from nse service
3167 * @param logestimate the log(Base 2) value of the current network size estimate
3168 * @param std_dev standard deviation for the estimate
/*
 * adapt_sizes: derive new target sizes for sampler and view of one Sub from
 * a network size estimate and apply them.
 *
 * The log estimate is converted with GNUNET_NSE_log_estimate_to_n() and its
 * cube root is taken. std_dev is only logged here.
 */
3171 adapt_sizes(struct Sub *sub, double logestimate, double std_dev)
3175 //double scale; // TODO this might go global/config
3177 LOG(GNUNET_ERROR_TYPE_DEBUG,
3178 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3179 logestimate, std_dev, RPS_sampler_get_size(sub->sampler));
3181 estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
3182 // GNUNET_NSE_log_estimate_to_n (logestimate);
/* Target size is the cube root of the estimated network size. */
3183 estimate = pow(estimate, 1.0 / 3);
3184 // TODO add if std_dev is a number
3185 // estimate += (std_dev * scale);
/* The estimate is only adopted when its ceiling exceeds the minimum view
 * size; otherwise the view falls back to that minimum. */
3186 if (sub->view_size_est_min < ceil(estimate))
3188 LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3189 sub->sampler_size_est_need = estimate;
3190 sub->view_size_est_need = estimate;
3194 LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3195 //sub->sampler_size_est_need = sub->view_size_est_min;
3196 sub->view_size_est_need = sub->view_size_est_min;
3200 GNUNET_STATISTICS_set(stats,
3202 sub->view_size_est_need,
3206 /* If the NSE has changed adapt the lists accordingly */
3207 resize_wrapper(sub->sampler, sub->sampler_size_est_need);
3208 View_change_len(sub->view, sub->view_size_est_need);
3213 * Function called by NSE.
3215 * Updates sizes of sampler list and view and adapt those lists
3218 * implements #GNUNET_NSE_Callback
3220 * @param cls Closure - unused
3221 * @param timestamp time when the estimate was received from the server (or created by the server)
3222 * @param logestimate the log(Base 2) value of the current network size estimate
3223 * @param std_dev standard deviation for the estimate
/*
 * nse_callback: forwards a new network size estimate to adapt_sizes() for
 * the main Sub (msub) and for the Sub of every client context in the list
 * starting at cli_ctx_head that has one.
 */
3226 nse_callback(void *cls,
3227 struct GNUNET_TIME_Absolute timestamp,
3228 double logestimate, double std_dev)
3232 struct ClientContext *cli_ctx_iter;
3234 adapt_sizes(msub, logestimate, std_dev);
3235 for (cli_ctx_iter = cli_ctx_head;
3236 NULL != cli_ctx_iter;
3237 cli_ctx_iter = cli_ctx_iter->next)
/* Clients without a Sub of their own are skipped. */
3239 if (NULL != cli_ctx_iter->sub)
3241 adapt_sizes(cli_ctx_iter->sub, logestimate, std_dev);
3248 * @brief This function is called, when the client seeds peers.
3249 * It verifies that @a msg is well-formed.
3251 * @param cls the closure (#ClientContext)
3252 * @param msg the message
3253 * @return #GNUNET_OK if @a msg is well-formed
3254 * #GNUNET_SYSERR otherwise
3257 check_client_seed(void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3259 struct ClientContext *cli_ctx = cls;
3260 uint16_t msize = ntohs(msg->header.size);
3261 uint32_t num_peers = ntohl(msg->num_peers);
3263 msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
3264 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3265 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3267 LOG(GNUNET_ERROR_TYPE_ERROR,
3268 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3269 ntohl(msg->num_peers),
3270 (msize / sizeof(struct GNUNET_PeerIdentity)));
3272 GNUNET_SERVICE_client_drop(cli_ctx->client);
3273 return GNUNET_SYSERR;
3280 * Handle seed from the client.
3282 * @param cls closure
3283 * @param message the actual message
/*
 * handle_client_seed: feeds every peer identity that follows the fixed part
 * of the seed message to got_peer() for the main Sub (msub) and, if the
 * client has a Sub of its own, for that Sub as well. The client is then
 * allowed to continue.
 */
3286 handle_client_seed(void *cls,
3287 const struct GNUNET_RPS_CS_SeedMessage *msg)
3289 struct ClientContext *cli_ctx = cls;
3290 struct GNUNET_PeerIdentity *peers;
3294 num_peers = ntohl(msg->num_peers);
/* The peer identities start directly behind the fixed message part. */
3295 peers = (struct GNUNET_PeerIdentity *)&msg[1];
3297 LOG(GNUNET_ERROR_TYPE_DEBUG,
3298 "Client seeded peers:\n");
3299 print_peer_list(peers, num_peers);
3301 for (i = 0; i < num_peers; i++)
3303 LOG(GNUNET_ERROR_TYPE_DEBUG,
3304 "Updating samplers with seed %" PRIu32 ": %s\n",
3306 GNUNET_i2s(&peers[i]));
3309 got_peer(msub, &peers[i]); /* Condition needed? */
3310 if (NULL != cli_ctx->sub)
3311 got_peer(cli_ctx->sub, &peers[i]);
3313 GNUNET_SERVICE_client_continue(cli_ctx->client);
3318 * Handle RPS request from the client.
3320 * @param cls Client context
3321 * @param message Message containing the number of updates the client wants to
/*
 * handle_client_view_request: stores the number of view updates the client
 * asked for in cli_ctx->view_updates_left, sends the view once via
 * send_view() and lets the client continue.
 */
3325 handle_client_view_request(void *cls,
3326 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3328 struct ClientContext *cli_ctx = cls;
3329 uint64_t num_updates;
/* The wire value is converted with ntohl() and kept in a 64 bit local. */
3331 num_updates = ntohl(msg->num_updates);
3333 LOG(GNUNET_ERROR_TYPE_DEBUG,
3334 "Client requested %" PRIu64 " updates of view.\n",
3337 GNUNET_assert(NULL != cli_ctx);
3338 cli_ctx->view_updates_left = num_updates;
3339 send_view(cli_ctx, NULL, 0);
3340 GNUNET_SERVICE_client_continue(cli_ctx->client);
3345 * @brief Handle the cancellation of the view updates.
3347 * @param cls The client context
/*
 * handle_client_view_cancel: resets cli_ctx->view_updates_left to 0 and lets
 * the client continue.
 *
 * NOTE(review): the client context is destroyed afterwards when
 * stream_update is GNUNET_YES; confirm that this condition is the intended
 * one for a view cancellation.
 */
3351 handle_client_view_cancel(void *cls,
3352 const struct GNUNET_MessageHeader *msg)
3354 struct ClientContext *cli_ctx = cls;
3358 LOG(GNUNET_ERROR_TYPE_DEBUG,
3359 "Client does not want to receive updates of view any more.\n");
3361 GNUNET_assert(NULL != cli_ctx);
3362 cli_ctx->view_updates_left = 0;
3363 GNUNET_SERVICE_client_continue(cli_ctx->client);
3364 if (GNUNET_YES == cli_ctx->stream_update)
3366 destroy_cli_ctx(cli_ctx);
3372 * Handle RPS request for biased stream from the client.
3374 * @param cls Client context
3375 * @param message unused
3378 handle_client_stream_request(void *cls,
3379 const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3381 struct ClientContext *cli_ctx = cls;
3385 LOG(GNUNET_ERROR_TYPE_DEBUG,
3386 "Client requested peers from biased stream.\n");
3387 cli_ctx->stream_update = GNUNET_YES;
3389 GNUNET_assert(NULL != cli_ctx);
3390 GNUNET_SERVICE_client_continue(cli_ctx->client);
3395 * @brief Handles the cancellation of the stream of biased peer ids
3397 * @param cls The client context
3401 handle_client_stream_cancel(void *cls,
3402 const struct GNUNET_MessageHeader *msg)
3404 struct ClientContext *cli_ctx = cls;
3408 LOG(GNUNET_ERROR_TYPE_DEBUG,
3409 "Client canceled receiving peers from biased stream.\n");
3410 cli_ctx->stream_update = GNUNET_NO;
3412 GNUNET_assert(NULL != cli_ctx);
3413 GNUNET_SERVICE_client_continue(cli_ctx->client);
3418 * @brief Create and start a Sub.
3420 * @param cls Closure - unused
3421 * @param msg Message containing the necessary information
3424 handle_client_start_sub(void *cls,
3425 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3427 struct ClientContext *cli_ctx = cls;
3429 LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3430 if (NULL != cli_ctx->sub &&
3431 0 != memcmp(&cli_ctx->sub->hash,
3433 sizeof(struct GNUNET_HashCode)))
3435 LOG(GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3436 destroy_sub(cli_ctx->sub);
3437 cli_ctx->sub = NULL;
3439 cli_ctx->sub = new_sub(&msg->hash,
3440 msub->sampler_size_est_min, // TODO make api input?
3441 GNUNET_TIME_relative_ntoh(msg->round_interval));
3442 GNUNET_SERVICE_client_continue(cli_ctx->client);
3447 * @brief Destroy the Sub
3449 * @param cls Closure - unused
3450 * @param msg Message containing the hash that identifies the Sub
/*
 * handle_client_stop_sub: destroys the Sub of the requesting client and lets
 * the client continue. A hash that differs from the one of the current Sub
 * only produces a warning.
 *
 * NOTE(review): a stop request from a client without a Sub trips the
 * assertion below; confirm that aborting is the intended reaction.
 */
3453 handle_client_stop_sub(void *cls,
3454 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3456 struct ClientContext *cli_ctx = cls;
3458 GNUNET_assert(NULL != cli_ctx->sub);
3459 if (0 != memcmp(&cli_ctx->sub->hash, &msg->hash, sizeof(struct GNUNET_HashCode)))
3461 LOG(GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3463 destroy_sub(cli_ctx->sub);
3464 cli_ctx->sub = NULL;
3465 GNUNET_SERVICE_client_continue(cli_ctx->client);
3470 * Handle a CHECK_LIVE message from another peer.
3472 * This does nothing. But without calling #GNUNET_CADET_receive_done()
3473 * the channel is blocked for all other communication.
3475 * @param cls Closure - Context of channel
3476 * @param msg Message - unused
/*
 * handle_peer_check: logs the CHECK_LIVE message, updates the
 * "# pending online checks" statistic for the main Sub and acknowledges the
 * message towards CADET.
 */
3479 handle_peer_check(void *cls,
3480 const struct GNUNET_MessageHeader *msg)
3482 const struct ChannelCtx *channel_ctx = cls;
3483 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3487 LOG(GNUNET_ERROR_TYPE_DEBUG,
3488 "Received CHECK_LIVE (%s)\n", GNUNET_i2s(peer));
/* Statistics are only kept for the main Sub. */
3489 if (channel_ctx->peer_ctx->sub == msub)
3491 GNUNET_STATISTICS_update(stats,
3492 "# pending online checks",
3497 GNUNET_CADET_receive_done(channel_ctx->channel);
3502 * Handle a PUSH message from another peer.
3504 * Check the proof of work and store the PeerID
3505 * in the temporary list for pushed PeerIDs.
3507 * @param cls Closure - Context of channel
3508 * @param msg Message - unused
/*
 * handle_peer_push: records the sender of a PUSH in the push map of its Sub
 * and acknowledges the message towards CADET.
 *
 * For the main Sub the push statistics are updated, with a separate counter
 * for senders that are not in map_single_hop. With ENABLE_MALICIOUS the
 * sender may additionally be stored in the set of attacked peers, depending
 * on mal_type.
 */
3511 handle_peer_push(void *cls,
3512 const struct GNUNET_MessageHeader *msg)
3514 const struct ChannelCtx *channel_ctx = cls;
3515 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3519 // (check the proof of work (?))
3521 LOG(GNUNET_ERROR_TYPE_DEBUG,
3522 "Received PUSH (%s)\n",
/* Statistics are only kept for the main Sub. */
3524 if (channel_ctx->peer_ctx->sub == msub)
3526 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3527 if (NULL != map_single_hop &&
3528 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3531 GNUNET_STATISTICS_update(stats,
3532 "# push message received (multi-hop peer)",
3538 #if ENABLE_MALICIOUS
3539 struct AttackedPeer *tmp_att_peer;
3541 if ((1 == mal_type) ||
3543 { /* Try to maximise representation */
3544 tmp_att_peer = GNUNET_new(struct AttackedPeer);
3545 tmp_att_peer->peer_id = *peer;
/* The set of attacked peers is created on first use. */
3546 if (NULL == att_peer_set)
3547 att_peer_set = GNUNET_CONTAINER_multipeermap_create(1, GNUNET_NO);
3548 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(att_peer_set,
3551 GNUNET_CONTAINER_DLL_insert(att_peers_head,
3554 add_peer_array_to_set(peer, 1, att_peer_set);
3558 GNUNET_free(tmp_att_peer);
3563 else if (2 == mal_type)
3565 /* We attack one single well-known peer - simply ignore */
3567 #endif /* ENABLE_MALICIOUS */
3569 /* Add the sending peer to the push_map */
3570 CustomPeerMap_put(channel_ctx->peer_ctx->sub->push_map, peer);
3572 GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3573 &channel_ctx->peer_ctx->peer_id));
3574 GNUNET_CADET_receive_done(channel_ctx->channel);
3579 * Handle PULL REQUEST request message from another peer.
3581 * Reply with the view of PeerIDs.
3583 * @param cls Closure - Context of channel
3584 * @param msg Message - unused
/*
 * handle_peer_pull_request: answers a PULL REQUEST with the current view of
 * the Sub the requesting peer belongs to.
 *
 * For the main Sub the pull request statistics are updated, with a separate
 * counter for senders that are not in map_single_hop. With ENABLE_MALICIOUS
 * the list of malicious peers may be sent instead, depending on mal_type.
 */
3587 handle_peer_pull_request(void *cls,
3588 const struct GNUNET_MessageHeader *msg)
3590 const struct ChannelCtx *channel_ctx = cls;
3591 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3592 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3593 const struct GNUNET_PeerIdentity *view_array;
3597 LOG(GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s(peer));
/* Statistics are only kept for the main Sub. */
3598 if (peer_ctx->sub == msub)
3600 GNUNET_STATISTICS_update(stats,
3601 "# pull request message received",
3604 if (NULL != map_single_hop &&
3605 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3606 &peer_ctx->peer_id))
3608 GNUNET_STATISTICS_update(stats,
3609 "# pull request message received (multi-hop peer)",
3615 #if ENABLE_MALICIOUS
3618 { /* Try to maximise representation */
3619 send_pull_reply(peer_ctx, mal_peers, num_mal_peers);
3622 else if (2 == mal_type)
3623 { /* Try to partition network */
3624 if (0 == GNUNET_memcmp(&attacked_peer, peer))
3626 send_pull_reply(peer_ctx, mal_peers, num_mal_peers);
3629 #endif /* ENABLE_MALICIOUS */
3631 GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3632 &channel_ctx->peer_ctx->peer_id));
/* The message is acknowledged before the reply is sent. */
3633 GNUNET_CADET_receive_done(channel_ctx->channel);
3634 view_array = View_get_as_array(channel_ctx->peer_ctx->sub->view);
3635 send_pull_reply(peer_ctx,
3637 View_size(channel_ctx->peer_ctx->sub->view));
3642 * Check whether we sent a corresponding request and
3643 * whether this reply is the first one.
3645 * @param cls Closure - Context of channel
3646 * @param msg Message containing the replied peers
3649 check_peer_pull_reply(void *cls,
3650 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3652 struct ChannelCtx *channel_ctx = cls;
3653 struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3655 if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs(msg->header.size))
3658 return GNUNET_SYSERR;
3661 if ((ntohs(msg->header.size) - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
3662 sizeof(struct GNUNET_PeerIdentity) != ntohl(msg->num_peers))
3664 LOG(GNUNET_ERROR_TYPE_ERROR,
3665 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3666 ntohl(msg->num_peers),
3667 (ntohs(msg->header.size) - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)) /
3668 sizeof(struct GNUNET_PeerIdentity));
3670 return GNUNET_SYSERR;
3673 if (GNUNET_YES != check_peer_flag(sender_ctx->sub->peer_map,
3674 &sender_ctx->peer_id,
3675 Peers_PULL_REPLY_PENDING))
3677 LOG(GNUNET_ERROR_TYPE_WARNING,
3678 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3679 GNUNET_i2s(&sender_ctx->peer_id));
3680 if (sender_ctx->sub == msub)
3682 GNUNET_STATISTICS_update(stats,
3683 "# unrequested pull replies",
3693 * Handle PULL REPLY message from another peer.
3695 * @param cls Closure
3696 * @param msg The message header
/*
 * handle_peer_pull_reply: processes the peer identities carried by a
 * PULL REPLY.
 *
 * Every received peer is inserted into the Sub via insert_peer(). Peers that
 * are already valid are put into the pull map; for the others a pull-map
 * insertion is scheduled and an online check is issued. Finally the
 * PULL_REPLY_PENDING flag of the sender is cleared, clean_peer() is run for
 * it and the message is acknowledged towards CADET.
 */
3699 handle_peer_pull_reply(void *cls,
3700 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3702 const struct ChannelCtx *channel_ctx = cls;
3703 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3704 const struct GNUNET_PeerIdentity *peers;
3705 struct Sub *sub = channel_ctx->peer_ctx->sub;
3708 #if ENABLE_MALICIOUS
3709 struct AttackedPeer *tmp_att_peer;
3710 #endif /* ENABLE_MALICIOUS */
/* NOTE(review): the index is the number of rounds since the pull request was
 * sent and is not bounded here; confirm it can never exceed the size of
 * pull_delays. */
3712 sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3713 LOG(GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s(sender));
/* Statistics are only kept for the main Sub. */
3714 if (channel_ctx->peer_ctx->sub == msub)
3716 GNUNET_STATISTICS_update(stats,
3717 "# pull reply messages received",
3720 if (NULL != map_single_hop &&
3721 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3722 &channel_ctx->peer_ctx->peer_id))
3724 GNUNET_STATISTICS_update(stats,
3725 "# pull reply messages received (multi-hop peer)",
3731 #if ENABLE_MALICIOUS
3732 // We shouldn't even receive pull replies as we're not sending
3736 #endif /* ENABLE_MALICIOUS */
3738 /* Do actual logic */
/* The peer identities start directly behind the fixed message part. */
3739 peers = (const struct GNUNET_PeerIdentity *)&msg[1];
3741 LOG(GNUNET_ERROR_TYPE_DEBUG,
3742 "PULL REPLY received, got following %u peers:\n",
3743 ntohl(msg->num_peers));
3745 for (i = 0; i < ntohl(msg->num_peers); i++)
3747 LOG(GNUNET_ERROR_TYPE_DEBUG,
3750 GNUNET_i2s(&peers[i]));
3752 #if ENABLE_MALICIOUS
3753 if ((NULL != att_peer_set) &&
3754 (1 == mal_type || 3 == mal_type))
3755 { /* Add attacked peer to local list */
3756 // TODO check if we sent a request and this was the first reply
3757 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(att_peer_set,
3759 && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(mal_peer_set,
3762 tmp_att_peer = GNUNET_new(struct AttackedPeer);
3763 tmp_att_peer->peer_id = peers[i];
3764 GNUNET_CONTAINER_DLL_insert(att_peers_head,
3767 add_peer_array_to_set(&peers[i], 1, att_peer_set);
3771 #endif /* ENABLE_MALICIOUS */
3772 /* Make sure we 'know' about this peer */
3773 (void)insert_peer(channel_ctx->peer_ctx->sub,
3776 if (GNUNET_YES == check_peer_valid(channel_ctx->peer_ctx->sub->valid_peers,
3779 CustomPeerMap_put(channel_ctx->peer_ctx->sub->pull_map,
3784 schedule_operation(channel_ctx->peer_ctx,
3786 channel_ctx->peer_ctx->sub); /* cls */
3787 (void)issue_peer_online_check(channel_ctx->peer_ctx->sub,
/* The reply has been handled: the sender no longer owes us one. */
3792 UNSET_PEER_FLAG(get_peer_ctx(channel_ctx->peer_ctx->sub->peer_map,
3794 Peers_PULL_REPLY_PENDING);
3795 clean_peer(channel_ctx->peer_ctx->sub,
3798 GNUNET_break_op(check_peer_known(channel_ctx->peer_ctx->sub->peer_map,
3800 GNUNET_CADET_receive_done(channel_ctx->channel);
3805 * Compute a random delay.
3806 * A uniformly distributed value between mean + spread and mean - spread.
3808 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3809 * It would return a random value between 2 and 6 min.
3811 * @param mean the mean time until the next round
3812 * @param spread the inverse amount of deviation from the mean
/*
 * compute_rand_delay: returns (mean * r / max_rand_delay) + (mean / spread),
 * where r is a weak random number below max_rand_delay.
 *
 * A mean of zero is rejected by an assertion, and a warning is logged for a
 * spread of 0.
 *
 * NOTE(review): (2 / spread) is an integer division and evaluates to 0 for
 * every spread above 2, which makes max_rand_delay 0; confirm the intended
 * range of spread. Also confirm that the quotient of the two rel_value_us
 * values fits into the unsigned int max_rand_delay.
 */
3814 static struct GNUNET_TIME_Relative
3815 compute_rand_delay(struct GNUNET_TIME_Relative mean,
3816 unsigned int spread)
3818 struct GNUNET_TIME_Relative half_interval;
3819 struct GNUNET_TIME_Relative ret;
3820 unsigned int rand_delay;
3821 unsigned int max_rand_delay;
3825 LOG(GNUNET_ERROR_TYPE_WARNING,
3826 "Not accepting spread of 0\n");
3830 GNUNET_assert(0 != mean.rel_value_us);
3832 /* half_interval is mean / spread; it is added to the random part below */
3833 half_interval = GNUNET_TIME_relative_divide(mean, spread);
3835 max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2 / spread);
3837 * Compute random value between (0 and 1) * round_interval
3838 * via multiplying round_interval with a 'fraction' (0 to value)/value
3840 rand_delay = GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
/* Scale mean by rand_delay / max_rand_delay, multiplying first. */
3841 ret = GNUNET_TIME_relative_saturating_multiply(mean, rand_delay);
3842 ret = GNUNET_TIME_relative_divide(ret, max_rand_delay);
3843 ret = GNUNET_TIME_relative_add(ret, half_interval);
3845 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3846 LOG(GNUNET_ERROR_TYPE_WARNING,
3847 "Returning FOREVER_REL\n");
3854 * Send single pull request
3856 * @param peer_ctx Context to the peer to send request to
/*
 * send_pull_request: sends a PULL REQUEST to the given peer.
 *
 * The peer must not already have the PULL_REPLY_PENDING flag set (asserted).
 * The flag is set, the current round number is remembered in
 * round_pull_req, and the pull request statistics are updated, with a
 * separate counter for peers that are not in map_single_hop.
 */
3859 send_pull_request(struct PeerContext *peer_ctx)
3861 struct GNUNET_MQ_Envelope *ev;
3863 GNUNET_assert(GNUNET_NO == check_peer_flag(peer_ctx->sub->peer_map,
3865 Peers_PULL_REPLY_PENDING));
3866 SET_PEER_FLAG(peer_ctx,
3867 Peers_PULL_REPLY_PENDING);
/* Remember in which round the request was issued. */
3868 peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3870 LOG(GNUNET_ERROR_TYPE_DEBUG,
3871 "Going to send PULL REQUEST to peer %s.\n",
3872 GNUNET_i2s(&peer_ctx->peer_id));
3874 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3875 send_message(peer_ctx,
3880 GNUNET_STATISTICS_update(stats,
3881 "# pull request send issued",
3884 if (NULL != map_single_hop &&
3885 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3886 &peer_ctx->peer_id))
3888 GNUNET_STATISTICS_update(stats,
3889 "# pull request send issued (multi-hop peer)",
3900 * @param peer_ctx Context of peer to send push to
/*
 * send_push: sends a PUSH message (header only) to the given peer and
 * updates the push statistics, with a separate counter for peers that are
 * not in map_single_hop.
 */
3903 send_push(struct PeerContext *peer_ctx)
3905 struct GNUNET_MQ_Envelope *ev;
3907 LOG(GNUNET_ERROR_TYPE_DEBUG,
3908 "Going to send PUSH to peer %s.\n",
3909 GNUNET_i2s(&peer_ctx->peer_id));
3911 ev = GNUNET_MQ_msg_header(GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3912 send_message(peer_ctx, ev, "PUSH");
3915 GNUNET_STATISTICS_update(stats,
3916 "# push send issued",
3919 if (NULL != map_single_hop &&
3920 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains(map_single_hop,
3921 &peer_ctx->peer_id))
3923 GNUNET_STATISTICS_update(stats,
3924 "# push send issued (multi-hop peer)",
3932 #if ENABLE_MALICIOUS
3936 * @brief This function is called, when the client tells us to act malicious.
3937 * It verifies that @a msg is well-formed.
3939 * @param cls the closure (#ClientContext)
3940 * @param msg the message
3941 * @return #GNUNET_OK if @a msg is well-formed
3944 check_client_act_malicious(void *cls,
3945 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3947 struct ClientContext *cli_ctx = cls;
3948 uint16_t msize = ntohs(msg->header.size);
3949 uint32_t num_peers = ntohl(msg->num_peers);
3951 msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
3952 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3953 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3955 LOG(GNUNET_ERROR_TYPE_ERROR,
3956 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3957 ntohl(msg->num_peers),
3958 (msize / sizeof(struct GNUNET_PeerIdentity)));
3960 GNUNET_SERVICE_client_drop(cli_ctx->client);
3961 return GNUNET_SYSERR;
3967 * Turn RPS service to act malicious.
3969 * @param cls Closure
3970 * @param client The client that sent the message
3971 * @param msg The message header
/*
 * handle_client_act_malicious: switches the service into (or out of) one of
 * the malicious modes selected by msg->type.
 *
 * The peer identities behind the fixed message part are appended to
 * mal_peers and added to mal_peer_set. For the partitioning mode the
 * attacked peer is stored as well. In the malicious modes the round task is
 * replaced by do_mal_round(); for type 0 mal_peers is emptied and do_round()
 * is scheduled again.
 *
 * NOTE(review): the function ends with two consecutive calls to
 * GNUNET_SERVICE_client_continue(); confirm that no path executes both.
 */
3974 handle_client_act_malicious(void *cls,
3975 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3977 struct ClientContext *cli_ctx = cls;
3978 struct GNUNET_PeerIdentity *peers;
3979 uint32_t num_mal_peers_sent;
3980 uint32_t num_mal_peers_old;
3981 struct Sub *sub = cli_ctx->sub;
3985 /* Do actual logic */
3986 peers = (struct GNUNET_PeerIdentity *)&msg[1];
3987 mal_type = ntohl(msg->type);
/* The set of malicious peers is created on first use. */
3988 if (NULL == mal_peer_set)
3989 mal_peer_set = GNUNET_CONTAINER_multipeermap_create(1, GNUNET_NO);
3991 LOG(GNUNET_ERROR_TYPE_DEBUG,
3992 "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3994 ntohl(msg->num_peers));
3997 { /* Try to maximise representation */
3998 /* Add other malicious peers to those we already know */
4000 num_mal_peers_sent = ntohl(msg->num_peers);
4001 num_mal_peers_old = num_mal_peers;
4002 GNUNET_array_grow(mal_peers,
4004 num_mal_peers + num_mal_peers_sent);
4005 GNUNET_memcpy(&mal_peers[num_mal_peers_old],
4007 num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4009 /* Add all mal peers to mal_peer_set */
4010 add_peer_array_to_set(&mal_peers[num_mal_peers_old],
4014 /* Substitute do_round () with do_mal_round () */
4015 GNUNET_assert(NULL != sub->do_round_task);
4016 GNUNET_SCHEDULER_cancel(sub->do_round_task);
4017 sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_mal_round, sub);
4020 else if ((2 == mal_type) ||
4022 { /* Try to partition the network */
4023 /* Add other malicious peers to those we already know */
/* NOTE(review): this wraps around when the message announces 0 peers; the
 * assertion two lines below would then fire. Confirm that is intended. */
4025 num_mal_peers_sent = ntohl(msg->num_peers) - 1;
4026 num_mal_peers_old = num_mal_peers;
4027 GNUNET_assert(GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4028 GNUNET_array_grow(mal_peers,
4030 num_mal_peers + num_mal_peers_sent);
4031 if (NULL != mal_peers &&
4034 GNUNET_memcpy(&mal_peers[num_mal_peers_old],
4036 num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4038 /* Add all mal peers to mal_peer_set */
4039 add_peer_array_to_set(&mal_peers[num_mal_peers_old],
4044 /* Store the one attacked peer */
4045 GNUNET_memcpy(&attacked_peer,
4046 &msg->attacked_peer,
4047 sizeof(struct GNUNET_PeerIdentity));
4048 /* Set the flag of the attacked peer to valid to avoid problems */
4049 if (GNUNET_NO == check_peer_known(sub->peer_map, &attacked_peer))
4051 (void)issue_peer_online_check(sub, &attacked_peer);
4054 LOG(GNUNET_ERROR_TYPE_DEBUG,
4055 "Attacked peer is %s\n",
4056 GNUNET_i2s(&attacked_peer));
4058 /* Substitute do_round () with do_mal_round () */
4059 if (NULL != sub->do_round_task)
4061 /* Probably in shutdown */
4062 GNUNET_SCHEDULER_cancel(sub->do_round_task);
4063 sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_mal_round, sub);
4066 else if (0 == mal_type)
4067 { /* Stop acting malicious */
4068 GNUNET_array_grow(mal_peers, num_mal_peers, 0);
4070 /* Substitute do_mal_round () with do_round () */
4071 GNUNET_SCHEDULER_cancel(sub->do_round_task);
4072 sub->do_round_task = GNUNET_SCHEDULER_add_now(&do_round, sub);
4077 GNUNET_SERVICE_client_continue(cli_ctx->client);
4079 GNUNET_SERVICE_client_continue(cli_ctx->client);
4084 * Send out PUSHes and PULLs maliciously.
4086 * This is executed regularly.
4088 * @param cls Closure - Sub
4091 do_mal_round(void *cls)
4093 uint32_t num_pushes;
4095 struct GNUNET_TIME_Relative time_next_round;
4096 struct AttackedPeer *tmp_att_peer;
4097 struct Sub *sub = cls;
4099 LOG(GNUNET_ERROR_TYPE_DEBUG,
4100 "Going to execute next round maliciously type %" PRIu32 ".\n",
4102 sub->do_round_task = NULL;
4103 GNUNET_assert(mal_type <= 3);
4104 /* Do malicious actions */
4106 { /* Try to maximise representation */
4107 /* The maximum of pushes we're going to send this round */
4108 num_pushes = GNUNET_MIN(GNUNET_MIN(push_limit,
4109 num_attacked_peers),
4110 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4112 LOG(GNUNET_ERROR_TYPE_DEBUG,
4113 "Going to send %" PRIu32 " pushes\n",
4116 /* Send PUSHes to attacked peers */
4117 for (i = 0; i < num_pushes; i++)
4119 if (att_peers_tail == att_peer_index)
4120 att_peer_index = att_peers_head;
4122 att_peer_index = att_peer_index->next;
4124 send_push(get_peer_ctx(sub->peer_map, &att_peer_index->peer_id));
4127 /* Send PULLs to some peers to learn about additional peers to attack */
4128 tmp_att_peer = att_peer_index;
4129 for (i = 0; i < num_pushes * alpha; i++)
4131 if (att_peers_tail == tmp_att_peer)
4132 tmp_att_peer = att_peers_head;
4134 att_peer_index = tmp_att_peer->next;
4136 send_pull_request(get_peer_ctx(sub->peer_map, &tmp_att_peer->peer_id));
4141 else if (2 == mal_type)
4143 * Try to partition the network
4144 * Send as many pushes to the attacked peer as possible
4145 * That is one push per round as it will ignore more.
4147 (void)issue_peer_online_check(sub, &attacked_peer);
4148 if (GNUNET_YES == check_peer_flag(sub->peer_map,
4151 send_push(get_peer_ctx(sub->peer_map, &attacked_peer));
4156 { /* Combined attack */
4157 /* Send PUSH to attacked peers */
4158 if (GNUNET_YES == check_peer_known(sub->peer_map, &attacked_peer))
4160 (void)issue_peer_online_check(sub, &attacked_peer);
4161 if (GNUNET_YES == check_peer_flag(sub->peer_map,
4165 LOG(GNUNET_ERROR_TYPE_DEBUG,
4166 "Goding to send push to attacked peer (%s)\n",
4167 GNUNET_i2s(&attacked_peer));
4168 send_push(get_peer_ctx(sub->peer_map, &attacked_peer));
4171 (void)issue_peer_online_check(sub, &attacked_peer);
4173 /* The maximum of pushes we're going to send this round */
4174 num_pushes = GNUNET_MIN(GNUNET_MIN(push_limit - 1,
4175 num_attacked_peers),
4176 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4178 LOG(GNUNET_ERROR_TYPE_DEBUG,
4179 "Going to send %" PRIu32 " pushes\n",
4182 for (i = 0; i < num_pushes; i++)
4184 if (att_peers_tail == att_peer_index)
4185 att_peer_index = att_peers_head;
4187 att_peer_index = att_peer_index->next;
4189 send_push(get_peer_ctx(sub->peer_map, &att_peer_index->peer_id));
4192 /* Send PULLs to some peers to learn about additional peers to attack */
4193 tmp_att_peer = att_peer_index;
4194 for (i = 0; i < num_pushes * alpha; i++)
4196 if (att_peers_tail == tmp_att_peer)
4197 tmp_att_peer = att_peers_head;
4199 att_peer_index = tmp_att_peer->next;
4201 send_pull_request(get_peer_ctx(sub->peer_map, &tmp_att_peer->peer_id));
4205 /* Schedule next round */
4206 time_next_round = compute_rand_delay(sub->round_interval, 2);
4208 GNUNET_assert(NULL == sub->do_round_task);
4209 sub->do_round_task = GNUNET_SCHEDULER_add_delayed(time_next_round,
4210 &do_mal_round, sub);
4211 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4213 #endif /* ENABLE_MALICIOUS */
4217 * Send out PUSHes and PULLs, possibly update #view, samplers.
4219 * This is executed regularly.
4221 * @param cls Closure - Sub
/* Regular (non-malicious) round for the Sub passed as cls.
 * Visible steps, in order: log the current view, send pushes and pull
 * requests to randomly chosen view members, rebuild the view if the update
 * conditions hold, update statistics/histograms, feed the received peers
 * into the sampler, clear the push/pull maps and schedule the next round.
 * NOTE(review): not every line of this function is visible in this excerpt;
 * the comments below only describe what the visible lines show. */
4227 const struct GNUNET_PeerIdentity *view_array;
4228 unsigned int *permut;
4229 unsigned int a_peers; /* Number of peers we send pushes to */
4230 unsigned int b_peers; /* Number of peers we send pull requests to */
4231 uint32_t first_border;
4232 uint32_t second_border;
4233 struct GNUNET_PeerIdentity peer;
4234 struct GNUNET_PeerIdentity *update_peer;
4235 struct Sub *sub = cls;
4238 LOG(GNUNET_ERROR_TYPE_DEBUG,
4239 "Going to execute next round.\n");
4242 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
/* Clear the task handle; a new one is stored when the next round is
 * scheduled at the end of this function. */
4244 sub->do_round_task = NULL;
4246 to_file(sub->file_name_view_log,
4247 "___ new round ___");
4248 #endif /* TO_FILE_FULL */
/* view_array is read again further down, also after the view has been
 * cleared and refilled. */
4249 view_array = View_get_as_array(sub->view);
4250 for (i = 0; i < View_size(sub->view); i++)
4252 LOG(GNUNET_ERROR_TYPE_DEBUG,
4253 "\t%s\n", GNUNET_i2s(&view_array[i]));
4255 to_file(sub->file_name_view_log,
4257 GNUNET_i2s_full(&view_array[i]));
4258 #endif /* TO_FILE_FULL */
4262 /* Send pushes and pull requests */
4263 if (0 < View_size(sub->view))
/* One random permutation of the view indices is shared by the push and the
 * pull selection below; it is freed after the pull loop. */
4265 permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4266 View_size(sub->view));
/* Pushes go to the first ceil(alpha * view size) entries of the permutation */
4269 a_peers = ceil(alpha * View_size(sub->view));
4271 LOG(GNUNET_ERROR_TYPE_DEBUG,
4272 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4273 a_peers, alpha, View_size(sub->view));
4274 for (i = 0; i < a_peers; i++)
4276 peer = view_array[permut[i]];
4277 // FIXME if this fails schedule/loop this for later
4278 send_push(get_peer_ctx(sub->peer_map, &peer));
4281 /* Send PULL requests */
/* Pull targets are permut[first_border .. second_border - 1]: normally the
 * b_peers entries right after the push targets.  If that window would run
 * past the end of the view it is moved to the last b_peers entries instead,
 * so it can overlap with the push targets. */
4282 b_peers = ceil(beta * View_size(sub->view));
4283 first_border = a_peers;
4284 second_border = a_peers + b_peers;
4285 if (second_border > View_size(sub->view))
4287 first_border = View_size(sub->view) - b_peers;
4288 second_border = View_size(sub->view);
4290 LOG(GNUNET_ERROR_TYPE_DEBUG,
4291 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4292 b_peers, beta, View_size(sub->view));
4293 for (i = first_border; i < second_border; i++)
4295 peer = view_array[permut[i]];
/* Only request from peers whose PULL_REPLY_PENDING flag check yields
 * GNUNET_NO */
4296 if (GNUNET_NO == check_peer_flag(sub->peer_map,
4298 Peers_PULL_REPLY_PENDING))
4299 { // FIXME if this fails schedule/loop this for later
4300 send_pull_request(get_peer_ctx(sub->peer_map, &peer));
4304 GNUNET_free(permut);
4310 /* TODO see how many peers are in push-/pull- list! */
/* The view is only updated if the number of received pushes does not exceed
 * alpha * view_size_est_need and both the push map and the pull map are
 * non-empty. */
4312 if ((CustomPeerMap_size(sub->push_map) <= alpha * sub->view_size_est_need) &&
4313 (0 < CustomPeerMap_size(sub->push_map)) &&
4314 (0 < CustomPeerMap_size(sub->pull_map)))
4315 { /* If conditions for update are fulfilled, update */
4316 LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4318 uint32_t final_size;
4319 uint32_t peers_to_clean_size;
4320 struct GNUNET_PeerIdentity *peers_to_clean;
/* peers_to_clean receives View_size(sub->view) peer identities before the
 * view is cleared; entries that end up in the new view are removed from it
 * again below and the rest is passed to clean_peer().  (The source argument
 * of the copy is not visible here - presumably view_array.) */
4322 peers_to_clean = NULL;
4323 peers_to_clean_size = 0;
4324 GNUNET_array_grow(peers_to_clean,
4325 peers_to_clean_size,
4326 View_size(sub->view));
4327 GNUNET_memcpy(peers_to_clean,
4329 View_size(sub->view) * sizeof(struct GNUNET_PeerIdentity));
4331 /* Seems like recreating is the easiest way of emptying the peermap */
4332 View_clear(sub->view);
4334 to_file(sub->file_name_view_log,
4336 #endif /* TO_FILE_FULL */
/* Slots [0, first_border) are filled from the push map,
 * [first_border, second_border) from the pull map and the remaining
 * (final_size - second_border) peers are requested from the sampler. */
4338 first_border = GNUNET_MIN(ceil(alpha * sub->view_size_est_need),
4339 CustomPeerMap_size(sub->push_map));
4340 second_border = first_border +
4341 GNUNET_MIN(floor(beta * sub->view_size_est_need),
4342 CustomPeerMap_size(sub->pull_map));
4343 final_size = second_border +
4344 ceil((1 - (alpha + beta)) * sub->view_size_est_need);
4345 LOG(GNUNET_ERROR_TYPE_DEBUG,
4346 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %" PRIu32 "\n",
4351 /* Update view with peers received through PUSHes */
4352 permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4353 CustomPeerMap_size(sub->push_map));
4354 for (i = 0; i < first_border; i++)
4357 inserted = insert_in_view(sub,
4358 CustomPeerMap_get_peer_by_index(sub->push_map,
/* Stream clients are only notified about peers that were actually inserted */
4360 if (GNUNET_OK == inserted)
4362 clients_notify_stream_peer(sub,
4364 CustomPeerMap_get_peer_by_index(sub->push_map, permut[i]));
4367 to_file(sub->file_name_view_log,
4369 GNUNET_i2s_full(&view_array[i]));
4370 #endif /* TO_FILE_FULL */
4371 // TODO change the peer_flags accordingly
4373 GNUNET_free(permut);
4376 /* Update view with peers received through PULLs */
4377 permut = GNUNET_CRYPTO_random_permute(GNUNET_CRYPTO_QUALITY_STRONG,
4378 CustomPeerMap_size(sub->pull_map));
/* i counts view slots, so the permutation of the pull map is indexed with
 * (i - first_border) */
4379 for (i = first_border; i < second_border; i++)
4382 inserted = insert_in_view(sub,
4383 CustomPeerMap_get_peer_by_index(sub->pull_map,
4384 permut[i - first_border]));
4385 if (GNUNET_OK == inserted)
4387 clients_notify_stream_peer(sub,
4389 CustomPeerMap_get_peer_by_index(sub->pull_map,
4390 permut[i - first_border]));
4393 to_file(sub->file_name_view_log,
4395 GNUNET_i2s_full(&view_array[i]));
4396 #endif /* TO_FILE_FULL */
4397 // TODO change the peer_flags accordingly
4399 GNUNET_free(permut);
4402 /* Update view with peers from history */
4403 RPS_sampler_get_n_rand_peers(sub->sampler,
4404 final_size - second_border,
4407 // TODO change the peer_flags accordingly
/* Peers that are in the view now must not be cleaned.
 * NOTE(review): view_array was obtained before View_clear(); confirm that it
 * still refers to the current contents of the view at this point. */
4409 for (i = 0; i < View_size(sub->view); i++)
4410 rem_from_list(&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4412 /* Clean peers that were removed from the view */
4413 for (i = 0; i < peers_to_clean_size; i++)
4416 to_file(sub->file_name_view_log,
4418 GNUNET_i2s_full(&peers_to_clean[i]));
4419 #endif /* TO_FILE_FULL */
4420 clean_peer(sub, &peers_to_clean[i]);
/* Growing to size 0 releases the temporary array */
4423 GNUNET_array_grow(peers_to_clean, peers_to_clean_size, 0);
4424 clients_notify_view_update(sub);
4428 LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
/* The counters below break down why the round was blocked */
4431 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4432 if (CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4433 !(0 >= CustomPeerMap_size(sub->pull_map)))
4434 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4435 if (CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4436 (0 >= CustomPeerMap_size(sub->pull_map)))
4437 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4438 if (0 >= CustomPeerMap_size(sub->push_map) &&
4439 !(0 >= CustomPeerMap_size(sub->pull_map)))
4440 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4441 if (0 >= CustomPeerMap_size(sub->push_map) &&
4442 (0 >= CustomPeerMap_size(sub->pull_map)))
4443 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
/* NOTE(review): the condition below requires the push map to be larger than
 * alpha * view_size_est_need AND to be empty at the same time.  With a
 * non-negative threshold this can never hold, so this counter is presumably
 * never incremented - confirm the intended condition (probably "pushes
 * within the limit and non-empty, but no pull replies"). */
4444 if (0 >= CustomPeerMap_size(sub->pull_map) &&
4445 CustomPeerMap_size(sub->push_map) > alpha * sub->view_size_est_need &&
4446 0 >= CustomPeerMap_size(sub->push_map))
4447 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4450 // TODO independent of that also get some peers from CADET_get_peers()?
/* Histogram of the number of pushes received per round; sizes of
 * HISTOGRAM_FILE_SLOTS or more are only reported with a warning */
4451 if (CustomPeerMap_size(sub->push_map) < HISTOGRAM_FILE_SLOTS)
4453 sub->push_recv[CustomPeerMap_size(sub->push_map)]++;
4457 LOG(GNUNET_ERROR_TYPE_WARNING,
4458 "Push map size too big for histogram (%u, %u)\n",
4459 CustomPeerMap_size(sub->push_map),
4460 HISTOGRAM_FILE_SLOTS);
/* Histogram of (received - expected) pushes, shifted by
 * HISTOGRAM_FILE_SLOTS / 2 so that negative deltas get a slot.
 * NOTE(review): no visible line range-checks this index (see FIXME). */
4462 // FIXME check bounds of histogram
4463 sub->push_delta[(int32_t)(CustomPeerMap_size(sub->push_map) -
4464 (alpha * sub->view_size_est_need)) +
4465 (HISTOGRAM_FILE_SLOTS / 2)]++;
4468 GNUNET_STATISTICS_set(stats,
4469 "# peers in push map at end of round",
4470 CustomPeerMap_size(sub->push_map),
4472 GNUNET_STATISTICS_set(stats,
4473 "# peers in pull map at end of round",
4474 CustomPeerMap_size(sub->pull_map),
4476 GNUNET_STATISTICS_set(stats,
4477 "# peers in view at end of round",
4478 View_size(sub->view),
4480 GNUNET_STATISTICS_set(stats,
4481 "# expected pushes",
4482 alpha * sub->view_size_est_need,
4484 GNUNET_STATISTICS_set(stats,
4485 "delta expected - received pushes",
4486 CustomPeerMap_size(sub->push_map) - (alpha * sub->view_size_est_need),
4490 LOG(GNUNET_ERROR_TYPE_DEBUG,
4491 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4492 CustomPeerMap_size(sub->push_map),
4493 CustomPeerMap_size(sub->pull_map),
4495 View_size(sub->view),
4496 alpha * View_size(sub->view));
4498 /* Update samplers */
/* Every peer of the push map is offered to the sampler and then passed to
 * clean_peer() */
4499 for (i = 0; i < CustomPeerMap_size(sub->push_map); i++)
4501 update_peer = CustomPeerMap_get_peer_by_index(sub->push_map, i);
4502 LOG(GNUNET_ERROR_TYPE_DEBUG,
4503 "Updating with peer %s from push list\n",
4504 GNUNET_i2s(update_peer));
4505 insert_in_sampler(sub, update_peer);
4506 clean_peer(sub, update_peer); /* This cleans only if it is not in the view */
/* Same treatment for every peer of the pull map */
4509 for (i = 0; i < CustomPeerMap_size(sub->pull_map); i++)
4511 LOG(GNUNET_ERROR_TYPE_DEBUG,
4512 "Updating with peer %s from pull list\n",
4513 GNUNET_i2s(CustomPeerMap_get_peer_by_index(sub->pull_map, i)));
4514 insert_in_sampler(sub, CustomPeerMap_get_peer_by_index(sub->pull_map, i));
4515 /* This cleans only if it is not in the view */
4516 clean_peer(sub, CustomPeerMap_get_peer_by_index(sub->pull_map, i));
4520 /* Empty push/pull lists */
4521 CustomPeerMap_clear(sub->push_map);
4522 CustomPeerMap_clear(sub->pull_map);
4526 GNUNET_STATISTICS_set(stats,
4528 View_size(sub->view),
4532 struct GNUNET_TIME_Relative time_next_round;
/* The delay until the next round is derived from sub->round_interval by
 * compute_rand_delay() */
4534 time_next_round = compute_rand_delay(sub->round_interval, 2);
4536 /* Schedule next round */
4537 sub->do_round_task = GNUNET_SCHEDULER_add_delayed(time_next_round,
4539 LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4544 * This is called from GNUNET_CADET_get_peers().
4546 * It is called on every peer(ID) that cadet somehow has contact with.
4547 * We use those to initialise the sampler.
4549 * implements #GNUNET_CADET_PeersCB
4551 * @param cls Closure - Sub
4552 * @param peer Peer, or NULL on "EOF".
4553 * @param tunnel Do we have a tunnel towards this peer?
4554 * @param n_paths Number of known paths towards this peer.
4555 * @param best_path How long is the best path?
4556 * (0 = unknown, 1 = ourselves, 2 = neighbor)
/* Callback for peer identities reported by CADET: logs the peer and hands it
 * to got_peer() together with the Sub taken from cls.
 * NOTE(review): the handling of the NULL ("EOF") peer mentioned in the
 * documentation is not visible in this excerpt - confirm a guard exists. */
4559 init_peer_cb(void *cls,
4560 const struct GNUNET_PeerIdentity *peer,
4561 int tunnel, /* "Do we have a tunnel towards this peer?" */
4562 unsigned int n_paths, /* "Number of known paths towards this peer" */
4563 unsigned int best_path) /* "How long is the best path?
4564 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
/* The closure is the Sub the peer is reported for */
4566 struct Sub *sub = cls;
/* tunnel, n_paths and best_path do not influence any of the visible
 * statements below */
4574 LOG(GNUNET_ERROR_TYPE_DEBUG,
4575 "Got peer_id %s from cadet\n",
4577 got_peer(sub, peer);
4583 * @brief Iterator function over stored, valid peers.
4585 * We initialise the sampler with those.
4587 * @param cls Closure - Sub
4588 * @param peer the peer id
4589 * @return #GNUNET_YES if we should continue to
4591 * #GNUNET_NO if not.
/* Iterator callback for stored valid peers: logs each peer and hands it to
 * got_peer() for the Sub taken from cls.
 * NOTE(review): the return statement and any NULL check on peer are not
 * visible in this excerpt. */
4594 valid_peers_iterator(void *cls,
4595 const struct GNUNET_PeerIdentity *peer)
/* The closure is the Sub the stored peers belong to */
4597 struct Sub *sub = cls;
4601 LOG(GNUNET_ERROR_TYPE_DEBUG,
4602 "Got stored, valid peer %s\n",
4604 got_peer(sub, peer);
4611 * Iterator over peers from peerinfo.
4613 * @param cls Closure - Sub
4614 * @param peer id of the peer, NULL for last call
4615 * @param hello hello message for the peer (can be NULL)
4616 * @param err_msg error message
/* Callback for peers reported by peerinfo: logs the peer and hands it to
 * got_peer() for the Sub taken from cls.
 * NOTE(review): how the "last call" (peer == NULL) case is handled is not
 * visible in this excerpt. */
4619 process_peerinfo_peers(void *cls,
4620 const struct GNUNET_PeerIdentity *peer,
4621 const struct GNUNET_HELLO_Message *hello,
4622 const char *err_msg)
/* The closure is the Sub that should learn about the peer */
4624 struct Sub *sub = cls;
/* hello and err_msg do not influence any of the visible statements below */
4631 LOG(GNUNET_ERROR_TYPE_DEBUG,
4632 "Got peer_id %s from peerinfo\n",
4634 got_peer(sub, peer);
4640 * Task run during shutdown.
4642 * @param cls Closure - unused
/* Shutdown handler.  Visible steps: destroy all client contexts, cancel the
 * peerinfo notification and disconnect from peerinfo, NSE, CORE (only if
 * map_single_hop exists), statistics and CADET; when compiled with
 * ENABLE_MALICIOUS, also release the state of the malicious mode. */
4645 shutdown_task(void *cls)
4648 struct ClientContext *client_ctx;
4650 LOG(GNUNET_ERROR_TYPE_DEBUG,
4651 "RPS service is going down\n");
4653 /* Clean all clients */
/* The loop condition tests cli_ctx_head rather than client_ctx, so it runs
 * until the list is empty; this presumably relies on destroy_cli_ctx()
 * unlinking the context from the list - confirm there. */
4654 for (client_ctx = cli_ctx_head;
4655 NULL != cli_ctx_head;
4656 client_ctx = cli_ctx_head)
4658 destroy_cli_ctx(client_ctx);
4666 /* Disconnect from other services */
4667 GNUNET_PEERINFO_notify_cancel(peerinfo_notify_handle);
4668 GNUNET_PEERINFO_disconnect(peerinfo_handle);
4669 peerinfo_handle = NULL;
4670 GNUNET_NSE_disconnect(nse);
4671 if (NULL != map_single_hop)
4673 /* core_init was called - core was initialised */
4674 /* disconnect first, so no callback tries to access missing peermap */
4675 GNUNET_CORE_disconnect(core_handle);
4677 GNUNET_CONTAINER_multipeermap_destroy(map_single_hop);
4678 map_single_hop = NULL;
4683 GNUNET_STATISTICS_destroy(stats,
4687 GNUNET_CADET_disconnect(cadet_handle);
4688 cadet_handle = NULL;
4689 #if ENABLE_MALICIOUS
4690 struct AttackedPeer *tmp_att_peer;
/* Resize the array of malicious peers (the new size is not visible here;
 * presumably 0, i.e. free it) */
4691 GNUNET_array_grow(mal_peers,
4694 if (NULL != mal_peer_set)
4695 GNUNET_CONTAINER_multipeermap_destroy(mal_peer_set);
4696 if (NULL != att_peer_set)
4697 GNUNET_CONTAINER_multipeermap_destroy(att_peer_set);
/* Unlink and free the attacked peers one by one, always taking the head */
4698 while (NULL != att_peers_head)
4700 tmp_att_peer = att_peers_head;
4701 GNUNET_CONTAINER_DLL_remove(att_peers_head,
4704 GNUNET_free(tmp_att_peer);
4706 #endif /* ENABLE_MALICIOUS */
4712 * Handle client connecting to the service.
4715 * @param client the new client
4716 * @param mq the message queue of @a client
/* Connect callback: allocates a ClientContext for the new client,
 * initialises it and inserts it into the list headed by cli_ctx_head.
 * NOTE(review): the final return statement is not visible in this excerpt. */
4720 client_connect_cb(void *cls,
4721 struct GNUNET_SERVICE_Client *client,
4722 struct GNUNET_MQ_Handle *mq)
4724 struct ClientContext *cli_ctx;
4728 LOG(GNUNET_ERROR_TYPE_DEBUG,
4729 "Client connected\n");
/* Early exit; the condition guarding it is not visible here (presumably a
 * NULL check on client) */
4731 return client; /* Server was destroyed before a client connected. Shutting down */
4732 cli_ctx = GNUNET_new(struct ClientContext);
/* Initial state: view_updates_left is -1 and stream updates are off
 * (the meaning of -1 is not visible here - TODO confirm) */
4734 cli_ctx->view_updates_left = -1;
4735 cli_ctx->stream_update = GNUNET_NO;
4736 cli_ctx->client = client;
4737 GNUNET_CONTAINER_DLL_insert(cli_ctx_head,
4744 * Callback called when a client disconnected from the service
4746 * @param cls closure for the service
4747 * @param client the client that disconnected
4748 * @param internal_cls the struct ClientContext belonging to @a client
/* Disconnect callback: either destroys every client context (shutdown case)
 * or only the context of the client that disconnected.
 * NOTE(review): the condition selecting between the two branches is not
 * visible in this excerpt. */
4751 client_disconnect_cb(void *cls,
4752 struct GNUNET_SERVICE_Client *client,
/* internal_cls is used as the ClientContext of this client */
4755 struct ClientContext *cli_ctx = internal_cls;
/* Sanity check: the context must belong to the disconnecting client */
4758 GNUNET_assert(client == cli_ctx->client);
4760 {/* shutdown task - destroy all clients */
/* Terminates once cli_ctx_head becomes NULL; presumably destroy_cli_ctx()
 * unlinks the context it destroys - confirm there */
4761 while (NULL != cli_ctx_head)
4762 destroy_cli_ctx(cli_ctx_head);
4765 { /* destroy this client */
4766 LOG(GNUNET_ERROR_TYPE_DEBUG,
4767 "Client disconnected. Destroy its context.\n");
4768 destroy_cli_ctx(cli_ctx);
4774 * Handle random peer sampling clients.
4776 * @param cls closure
4777 * @param c configuration to use
4778 * @param service the initialized service
/* Service initialisation.  Visible steps: set up logging, determine the own
 * peer identity, read the round interval and the sampler size from the
 * configuration, connect to CADET and CORE, create the main Sub (msub),
 * connect to peerinfo and NSE, feed stored valid peers into msub, subscribe
 * to peerinfo notifications, register the shutdown task and create the
 * statistics handle.
 * NOTE(review): the first lines of the signature and several short lines
 * are not visible in this excerpt. */
4782 const struct GNUNET_CONFIGURATION_Handle *c,
4783 struct GNUNET_SERVICE_Handle *service)
4785 struct GNUNET_TIME_Relative round_interval;
4786 long long unsigned int sampler_size;
/* The application port string is hashed below; the hash is passed to
 * new_sub() for the main Sub */
4787 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4788 struct GNUNET_HashCode hash;
4793 GNUNET_log_setup("rps",
4794 GNUNET_error_type_to_string(GNUNET_ERROR_TYPE_DEBUG),
4798 GNUNET_CRYPTO_get_peer_identity(cfg,
4799 &own_identity); // TODO check return value
4800 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
4801 "STARTING SERVICE (rps) for peer [%s]\n",
4802 GNUNET_i2s(&own_identity));
4803 #if ENABLE_MALICIOUS
4804 GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
4805 "Malicious execution compiled in.\n");
4806 #endif /* ENABLE_MALICIOUS */
4808 /* Get time interval from the configuration */
/* If the lookup fails, the missing option RPS/ROUNDINTERVAL is reported and
 * the scheduler is shut down */
4810 GNUNET_CONFIGURATION_get_value_time(cfg,
4815 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
4816 "RPS", "ROUNDINTERVAL");
4817 GNUNET_SCHEDULER_shutdown();
4821 /* Get initial size of sampler/view from the configuration */
/* Same failure handling as for the round interval (the option name is not
 * visible in this excerpt) */
4823 GNUNET_CONFIGURATION_get_value_number(cfg,
4828 GNUNET_log_config_missing(GNUNET_ERROR_TYPE_ERROR,
4830 GNUNET_SCHEDULER_shutdown();
/* Both connections are mandatory: a NULL handle aborts via GNUNET_assert */
4834 cadet_handle = GNUNET_CADET_connect(cfg);
4835 GNUNET_assert(NULL != cadet_handle);
4836 core_handle = GNUNET_CORE_connect(cfg,
4838 core_init, /* init */
4839 core_connects, /* connects */
4840 core_disconnects, /* disconnects */
4841 NULL); /* handlers */
4842 GNUNET_assert(NULL != core_handle);
4849 /* Set up main Sub */
4850 GNUNET_CRYPTO_hash(hash_port_string,
4851 strlen(hash_port_string),
4853 msub = new_sub(&hash,
4854 sampler_size, /* Will be overwritten by config */
4858 peerinfo_handle = GNUNET_PEERINFO_connect(cfg);
4860 /* connect to NSE */
4861 nse = GNUNET_NSE_connect(cfg, nse_callback, NULL);
4863 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4864 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4865 // TODO send push/pull to each of those peers?
4866 LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
/* Restore the stored valid peers of msub, then pass each of them to
 * valid_peers_iterator() */
4867 restore_valid_peers(msub);
4868 get_valid_peers(msub->valid_peers, valid_peers_iterator, msub);
/* From here on peerinfo reports peers to process_peerinfo_peers() */
4870 peerinfo_notify_handle = GNUNET_PEERINFO_notify(cfg,
4872 process_peerinfo_peers,
4875 LOG(GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4877 GNUNET_SCHEDULER_add_shutdown(&shutdown_task, NULL);
/* NOTE(review): stats is only assigned here, after new_sub(),
 * restore_valid_peers() and get_valid_peers() have already run - confirm
 * that none of them relies on stats being set. */
4878 stats = GNUNET_STATISTICS_create("rps", cfg);
4883 * Define "main" method using service macro.
/* Arguments of the service "main" macro: service options, the client
 * callbacks and the table of client message handlers.  Each handler entry
 * names the handler suffix, the message type constant and the message
 * struct; the table is terminated by GNUNET_MQ_handler_end().
 * NOTE(review): the macro name and some short argument lines are not
 * visible in this excerpt. */
4887 GNUNET_SERVICE_OPTION_NONE,
4890 &client_disconnect_cb,
/* Variable-size message: seeding peers from a client */
4892 GNUNET_MQ_hd_var_size(client_seed,
4893 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4894 struct GNUNET_RPS_CS_SeedMessage,
4896 #if ENABLE_MALICIOUS
/* Only registered in malicious builds: switch the malicious behaviour */
4897 GNUNET_MQ_hd_var_size(client_act_malicious,
4898 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4899 struct GNUNET_RPS_CS_ActMaliciousMessage,
4901 #endif /* ENABLE_MALICIOUS */
/* Fixed-size debug messages: request/cancel view updates */
4902 GNUNET_MQ_hd_fixed_size(client_view_request,
4903 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4904 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4906 GNUNET_MQ_hd_fixed_size(client_view_cancel,
4907 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4908 struct GNUNET_MessageHeader,
/* Fixed-size debug messages: request/cancel the peer stream */
4910 GNUNET_MQ_hd_fixed_size(client_stream_request,
4911 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4912 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4914 GNUNET_MQ_hd_fixed_size(client_stream_cancel,
4915 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4916 struct GNUNET_MessageHeader,
/* Fixed-size messages: start/stop a Sub on behalf of a client */
4918 GNUNET_MQ_hd_fixed_size(client_start_sub,
4919 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4920 struct GNUNET_RPS_CS_SubStartMessage,
4922 GNUNET_MQ_hd_fixed_size(client_stop_sub,
4923 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4924 struct GNUNET_RPS_CS_SubStopMessage,
4926 GNUNET_MQ_handler_end());
4928 /* end of gnunet-service-rps.c */