2 This file is part of GNUnet.
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file rps/gnunet-service-rps.c
23 * @brief rps service implementation
24 * @author Julius Bünger
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
44 #define LOG(kind, ...) GNUNET_log (kind, __VA_ARGS__)
46 // TODO check for overflows
48 // TODO align message structs
50 // TODO connect to friends
52 // TODO blacklist? (-> mal peer detection on top of brahms)
54 // hist_size_init, hist_size_max
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
61 * Set a peer flag of given peer context.
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
66 * Get peer flag of given peer context.
68 #define check_peer_flag_set(peer_ctx, mask) \
69 ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
72 * Unset flag of given peer context.
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
77 * Get channel flag of given channel context.
79 #define check_channel_flag_set(channel_flags, mask) \
80 ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
83 * Unset flag of given channel context.
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
89 * Pending operation on peer consisting of callback and closure
91 * When an operation cannot be executed right now this struct is used to store
92 * the callback and closure for later execution.
108 * List containing all messages that are yet to be send
110 * This is used to keep track of all messages that have not been sent yet. When
111 * a peer is to be removed the pending messages can be removed properly.
113 struct PendingMessage
118 struct PendingMessage *next;
119 struct PendingMessage *prev;
122 * The envelope to the corresponding message
124 struct GNUNET_MQ_Envelope *ev;
127 * The corresponding context
129 struct PeerContext *peer_ctx;
138 * @brief Context for a channel
143 * Struct used to keep track of other peer's status
145 * This is stored in a multipeermap.
146 * It contains information such as cadet channels, a message queue for sending,
147 * status about the channels, the pending operations on this peer and some flags
148 * about the status of the peer itself. (online, valid, ...)
153 * The Sub this context belongs to.
158 * Message queue open to client
160 struct GNUNET_MQ_Handle *mq;
163 * Channel open to client.
165 struct ChannelCtx *send_channel_ctx;
168 * Channel open from client.
170 struct ChannelCtx *recv_channel_ctx;
173 * Array of pending operations on this peer.
175 struct PeerPendingOp *pending_ops;
178 * Handle to the callback given to cadet_ntfy_tmt_rdy()
180 * To be canceled on shutdown.
182 struct PendingMessage *online_check_pending;
185 * Number of pending operations.
187 unsigned int num_pending_ops;
190 * Identity of the peer
192 struct GNUNET_PeerIdentity peer_id;
195 * Flags indicating status of peer
200 * Last time we received something from that peer.
202 struct GNUNET_TIME_Absolute last_message_recv;
205 * Last time we received a keepalive message.
207 struct GNUNET_TIME_Absolute last_keepalive;
210 * DLL with all messages that are yet to be sent
212 struct PendingMessage *pending_messages_head;
213 struct PendingMessage *pending_messages_tail;
216 * This is pobably followed by 'statistical' data (when we first saw
217 * it, how did we get its ID, how many pushes (in a timeinterval),
220 uint32_t round_pull_req;
224 * @brief Closure to #valid_peer_iterator
226 struct PeersIteratorCls
231 PeersIterator iterator;
234 * Closure to iterator
240 * @brief Context for a channel
245 * @brief The channel itself
247 struct GNUNET_CADET_Channel *channel;
250 * @brief The peer context associated with the channel
252 struct PeerContext *peer_ctx;
255 * @brief When channel destruction needs to be delayed (because it is called
256 * from within the cadet routine of another channel destruction) this task
257 * refers to the respective _SCHEDULER_Task.
259 struct GNUNET_SCHEDULER_Task *destruction_task;
266 * If type is 2 This struct is used to store the attacked peers in a DLL
273 struct AttackedPeer *next;
274 struct AttackedPeer *prev;
279 struct GNUNET_PeerIdentity peer_id;
282 #endif /* ENABLE_MALICIOUS */
285 * @brief This number determines the number of slots for files that represent
288 #define HISTOGRAM_FILE_SLOTS 32
291 * @brief The size (in bytes) a file needs to store the histogram
293 * Per slot: 1 newline, up to 4 chars,
294 * Additionally: 1 null termination
296 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
301 * Essentially one instance of brahms that only connects to other instances
302 * with the same (secret) value.
307 * @brief Hash of the shared value that defines Subs.
309 struct GNUNET_HashCode hash;
312 * @brief Port to communicate to other peers.
314 struct GNUNET_CADET_Port *cadet_port;
317 * @brief Hashmap of valid peers.
319 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
322 * @brief Filename of the file that stores the valid peers persistently.
324 char *filename_valid_peers;
327 * Set of all peers to keep track of them.
329 struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
332 * @brief This is the minimum estimate used as sampler size.
334 * It is configured by the user.
336 unsigned int sampler_size_est_min;
339 * The size of sampler we need to be able to satisfy the Brahms protocol's
340 * need of random peers.
342 * This is one minimum size the sampler grows to.
344 unsigned int sampler_size_est_need;
347 * Time interval the do_round task runs in.
349 struct GNUNET_TIME_Relative round_interval;
352 * Sampler used for the Brahms protocol itself.
354 struct RPS_Sampler *sampler;
358 * Name to log view to
360 char *file_name_view_log;
361 #endif /* TO_FILE_FULL */
366 * Name to log number of observed peers to
368 char *file_name_observed_log;
369 #endif /* TO_FILE_FULL */
372 * @brief Count the observed peers
374 uint32_t num_observed_peers;
377 * @brief Multipeermap (ab-) used to count unique peer_ids
379 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
383 * List to store peers received through pushes temporary.
385 struct CustomPeerMap *push_map;
388 * List to store peers received through pulls temporary.
390 struct CustomPeerMap *pull_map;
393 * @brief This is the estimate used as view size.
395 * It is initialised with the minimum
397 unsigned int view_size_est_need;
400 * @brief This is the minimum estimate used as view size.
402 * It is configured by the user.
404 unsigned int view_size_est_min;
412 * Identifier for the main task that runs periodically.
414 struct GNUNET_SCHEDULER_Task *do_round_task;
419 * @brief Counts the executed rounds.
424 * @brief This array accumulates the number of received pushes per round.
426 * Number at index i represents the number of rounds with i observed pushes.
428 uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
431 * @brief Histogram of deltas between the expected and actual number of
434 * As half of the entries are expected to be negative, this is shifted by
435 * #HISTOGRAM_FILE_SLOTS/2.
437 uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
440 * @brief Number of pull replies with this delay measured in rounds.
442 * Number at index i represents the number of pull replies with a delay of i
445 uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
449 /***********************************************************************
451 ***********************************************************************/
456 static const struct GNUNET_CONFIGURATION_Handle *cfg;
459 * Handle to the statistics service.
461 struct GNUNET_STATISTICS_Handle *stats;
466 struct GNUNET_CADET_Handle *cadet_handle;
471 struct GNUNET_CORE_Handle *core_handle;
474 * @brief PeerMap to keep track of connected peers.
476 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
481 static struct GNUNET_PeerIdentity own_identity;
484 * Percentage of total peer number in the view
485 * to send random PUSHes to
490 * Percentage of total peer number in the view
491 * to send random PULLs to
498 static struct GNUNET_NSE_Handle *nse;
501 * Handler to PEERINFO.
503 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
506 * Handle for cancellation of iteration over peers.
508 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
513 * Type of malicious peer
515 * 0 Don't act malicious at all - Default
516 * 1 Try to maximise representation
517 * 2 Try to partition the network
520 static uint32_t mal_type;
523 * Other malicious peers
525 static struct GNUNET_PeerIdentity *mal_peers;
528 * Hashmap of malicious peers used as set.
529 * Used to more efficiently check whether we know that peer.
531 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
534 * Number of other malicious peers
536 static uint32_t num_mal_peers;
540 * If type is 2 this is the DLL of attacked peers
542 static struct AttackedPeer *att_peers_head;
543 static struct AttackedPeer *att_peers_tail;
546 * This index is used to point to an attacked peer to
547 * implement the round-robin-ish way to select attacked peers.
549 static struct AttackedPeer *att_peer_index;
552 * Hashmap of attacked peers used as set.
553 * Used to more efficiently check whether we know that peer.
555 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
558 * Number of attacked peers
560 static uint32_t num_attacked_peers;
563 * If type is 1 this is the attacked peer
565 static struct GNUNET_PeerIdentity attacked_peer;
568 * The limit of PUSHes we can send in one round.
569 * This is an assumption of the Brahms protocol and either implemented
572 * assumend to be the bandwidth limitation.
574 static uint32_t push_limit = 10000;
575 #endif /* ENABLE_MALICIOUS */
580 * This is run in any case by all peers and connects to all peers without
581 * specifying a shared value.
583 static struct Sub *msub;
586 * @brief Maximum number of valid peers to keep.
587 * TODO read from config
589 static const uint32_t num_valid_peers_max = UINT32_MAX;
591 /***********************************************************************
593 ***********************************************************************/
597 do_round (void *cls);
600 do_mal_round (void *cls);
604 * @brief Get the #PeerContext associated with a peer
606 * @param peer_map The peer map containing the context
607 * @param peer the peer id
609 * @return the #PeerContext
611 static struct PeerContext *
612 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
613 const struct GNUNET_PeerIdentity *peer)
615 struct PeerContext *ctx;
618 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
619 GNUNET_assert (GNUNET_YES == ret);
620 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
621 GNUNET_assert (NULL != ctx);
627 * @brief Check whether we have information about the given peer.
629 * FIXME probably deprecated. Make this the new _online.
631 * @param peer_map The peer map to check for the existence of @a peer
632 * @param peer peer in question
634 * @return #GNUNET_YES if peer is known
635 * #GNUNET_NO if peer is not knwon
638 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
639 const struct GNUNET_PeerIdentity *peer)
641 if (NULL != peer_map)
643 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
653 * @brief Create a new #PeerContext and insert it into the peer map
655 * @param sub The Sub this context belongs to.
656 * @param peer the peer to create the #PeerContext for
658 * @return the #PeerContext
660 static struct PeerContext *
661 create_peer_ctx (struct Sub *sub,
662 const struct GNUNET_PeerIdentity *peer)
664 struct PeerContext *ctx;
667 GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
669 ctx = GNUNET_new (struct PeerContext);
670 ctx->peer_id = *peer;
672 ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
673 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
674 GNUNET_assert (GNUNET_OK == ret);
677 GNUNET_STATISTICS_set (stats,
679 GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
687 * @brief Create or get a #PeerContext
689 * @param sub The Sub to which the created context belongs to
690 * @param peer the peer to get the associated context to
692 * @return the context
694 static struct PeerContext *
695 create_or_get_peer_ctx (struct Sub *sub,
696 const struct GNUNET_PeerIdentity *peer)
698 if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
700 return create_peer_ctx (sub, peer);
702 return get_peer_ctx (sub->peer_map, peer);
707 * @brief Check whether we have a connection to this @a peer
709 * Also sets the #Peers_ONLINE flag accordingly
711 * @param peer_ctx Context of the peer of which connectivity is to be checked
713 * @return #GNUNET_YES if we are connected
714 * #GNUNET_NO otherwise
717 check_connected (struct PeerContext *peer_ctx)
719 /* If we don't know about this peer we don't know whether it's online */
720 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
725 /* Get the context */
726 peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
727 /* If we have no channel to this peer we don't know whether it's online */
728 if ((NULL == peer_ctx->send_channel_ctx) &&
729 (NULL == peer_ctx->recv_channel_ctx))
731 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
734 /* Otherwise (if we have a channel, we know that it's online */
735 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
741 * @brief The closure to #get_rand_peer_iterator.
743 struct GetRandPeerIteratorCls
746 * @brief The index of the peer to return.
747 * Will be decreased until 0.
748 * Then current peer is returned.
753 * @brief Pointer to peer to return.
755 const struct GNUNET_PeerIdentity *peer;
760 * @brief Iterator function for #get_random_peer_from_peermap.
762 * Implements #GNUNET_CONTAINER_PeerMapIterator.
763 * Decreases the index until the index is null.
764 * Then returns the current peer.
766 * @param cls the #GetRandPeerIteratorCls containing index and peer
767 * @param peer current peer
768 * @param value unused
770 * @return #GNUNET_YES if we should continue to
775 get_rand_peer_iterator (void *cls,
776 const struct GNUNET_PeerIdentity *peer,
779 struct GetRandPeerIteratorCls *iterator_cls = cls;
783 if (0 >= iterator_cls->index)
785 iterator_cls->peer = peer;
788 iterator_cls->index--;
794 * @brief Get a random peer from @a peer_map
796 * @param valid_peers Peer map containing valid peers from which to select a
799 * @return a random peer
801 static const struct GNUNET_PeerIdentity *
802 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
804 struct GetRandPeerIteratorCls *iterator_cls;
805 const struct GNUNET_PeerIdentity *ret;
807 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
808 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
809 GNUNET_CONTAINER_multipeermap_size (
811 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
812 get_rand_peer_iterator,
814 ret = iterator_cls->peer;
815 GNUNET_free (iterator_cls);
821 * @brief Add a given @a peer to valid peers.
823 * If valid peers are already #num_valid_peers_max, delete a peer previously.
825 * @param peer The peer that is added to the valid peers.
826 * @param valid_peers Peer map of valid peers to which to add the @a peer
828 * @return #GNUNET_YES if no other peer had to be removed
829 * #GNUNET_NO otherwise
832 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
833 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
835 const struct GNUNET_PeerIdentity *rand_peer;
839 /* Remove random peers until there is space for a new one */
840 while (num_valid_peers_max <=
841 GNUNET_CONTAINER_multipeermap_size (valid_peers))
843 rand_peer = get_random_peer_from_peermap (valid_peers);
844 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
847 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
848 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
849 if (valid_peers == msub->valid_peers)
851 GNUNET_STATISTICS_set (stats,
853 GNUNET_CONTAINER_multipeermap_size (valid_peers),
861 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
864 * @brief Set the peer flag to living and
865 * call the pending operations on this peer.
867 * Also adds peer to #valid_peers.
869 * @param peer_ctx the #PeerContext of the peer to set online
872 set_peer_online (struct PeerContext *peer_ctx)
874 struct GNUNET_PeerIdentity *peer;
877 peer = &peer_ctx->peer_id;
878 LOG (GNUNET_ERROR_TYPE_DEBUG,
879 "Peer %s is online and valid, calling %i pending operations on it\n",
881 peer_ctx->num_pending_ops);
883 if (NULL != peer_ctx->online_check_pending)
885 LOG (GNUNET_ERROR_TYPE_DEBUG,
886 "Removing pending online check for peer %s\n",
887 GNUNET_i2s (&peer_ctx->peer_id));
888 // TODO wait until cadet sets mq->cancel_impl
889 // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
890 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
891 peer_ctx->online_check_pending = NULL;
894 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
896 /* Call pending operations */
897 for (i = 0; i < peer_ctx->num_pending_ops; i++)
899 peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
901 GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
906 cleanup_destroyed_channel (void *cls,
907 const struct GNUNET_CADET_Channel *channel);
909 /* Declaration of handlers */
911 handle_peer_check (void *cls,
912 const struct GNUNET_MessageHeader *msg);
915 handle_peer_push (void *cls,
916 const struct GNUNET_MessageHeader *msg);
919 handle_peer_pull_request (void *cls,
920 const struct GNUNET_MessageHeader *msg);
923 check_peer_pull_reply (void *cls,
924 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
927 handle_peer_pull_reply (void *cls,
928 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
930 /* End declaration of handlers */
933 * @brief Allocate memory for a new channel context and insert it into DLL
935 * @param peer_ctx context of the according peer
937 * @return The channel context
939 static struct ChannelCtx *
940 add_channel_ctx (struct PeerContext *peer_ctx)
942 struct ChannelCtx *channel_ctx;
944 channel_ctx = GNUNET_new (struct ChannelCtx);
945 channel_ctx->peer_ctx = peer_ctx;
951 * @brief Free memory and NULL pointers.
953 * @param channel_ctx The channel context.
956 remove_channel_ctx (struct ChannelCtx *channel_ctx)
958 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
960 if (NULL != channel_ctx->destruction_task)
962 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
963 channel_ctx->destruction_task = NULL;
966 GNUNET_free (channel_ctx);
968 if (NULL == peer_ctx)
970 if (channel_ctx == peer_ctx->send_channel_ctx)
972 peer_ctx->send_channel_ctx = NULL;
975 else if (channel_ctx == peer_ctx->recv_channel_ctx)
977 peer_ctx->recv_channel_ctx = NULL;
983 * @brief Get the channel of a peer. If not existing, create.
985 * @param peer_ctx Context of the peer of which to get the channel
986 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
988 struct GNUNET_CADET_Channel *
989 get_channel (struct PeerContext *peer_ctx)
991 /* There exists a copy-paste-clone in run() */
992 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
993 GNUNET_MQ_hd_fixed_size (peer_check,
994 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
995 struct GNUNET_MessageHeader,
997 GNUNET_MQ_hd_fixed_size (peer_push,
998 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
999 struct GNUNET_MessageHeader,
1001 GNUNET_MQ_hd_fixed_size (peer_pull_request,
1002 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
1003 struct GNUNET_MessageHeader,
1005 GNUNET_MQ_hd_var_size (peer_pull_reply,
1006 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
1007 struct GNUNET_RPS_P2P_PullReplyMessage,
1009 GNUNET_MQ_handler_end ()
1013 if (NULL == peer_ctx->send_channel_ctx)
1015 LOG (GNUNET_ERROR_TYPE_DEBUG,
1016 "Trying to establish channel to peer %s\n",
1017 GNUNET_i2s (&peer_ctx->peer_id));
1018 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1019 peer_ctx->send_channel_ctx->channel =
1020 GNUNET_CADET_channel_create (cadet_handle,
1021 peer_ctx->send_channel_ctx, /* context */
1023 &peer_ctx->sub->hash,
1024 NULL, /* WindowSize handler */
1025 &cleanup_destroyed_channel, /* Disconnect handler */
1028 GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1029 GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1030 return peer_ctx->send_channel_ctx->channel;
1035 * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1037 * If we already have a message queue open to this client,
1038 * simply return it, otherways create one.
1040 * @param peer_ctx Context of the peer of whicht to get the mq
1041 * @return the #GNUNET_MQ_Handle
1043 static struct GNUNET_MQ_Handle *
1044 get_mq (struct PeerContext *peer_ctx)
1046 if (NULL == peer_ctx->mq)
1048 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1050 return peer_ctx->mq;
1055 * @brief Add an envelope to a message passed to mq to list of pending messages
1057 * @param peer_ctx Context of the peer for which to insert the envelope
1058 * @param ev envelope to the message
1059 * @param type type of the message to be sent
1060 * @return pointer to pending message
1062 static struct PendingMessage *
1063 insert_pending_message (struct PeerContext *peer_ctx,
1064 struct GNUNET_MQ_Envelope *ev,
1067 struct PendingMessage *pending_msg;
1069 pending_msg = GNUNET_new (struct PendingMessage);
1070 pending_msg->ev = ev;
1071 pending_msg->peer_ctx = peer_ctx;
1072 pending_msg->type = type;
1073 GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1074 peer_ctx->pending_messages_tail,
1081 * @brief Remove a pending message from the respective DLL
1083 * @param pending_msg the pending message to remove
1084 * @param cancel whether to cancel the pending message, too
1087 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1089 struct PeerContext *peer_ctx;
1093 peer_ctx = pending_msg->peer_ctx;
1094 GNUNET_assert (NULL != peer_ctx);
1095 GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1096 peer_ctx->pending_messages_tail,
1098 // TODO wait for the cadet implementation of message cancellation
1099 // if (GNUNET_YES == cancel)
1101 // GNUNET_MQ_send_cancel (pending_msg->ev);
1103 GNUNET_free (pending_msg);
1108 * @brief This is called in response to the first message we sent as a
1111 * @param cls #PeerContext of peer with pending online check
1114 mq_online_check_successful (void *cls)
1116 struct PeerContext *peer_ctx = cls;
1118 if (NULL != peer_ctx->online_check_pending)
1120 LOG (GNUNET_ERROR_TYPE_DEBUG,
1121 "Online check for peer %s was successfull\n",
1122 GNUNET_i2s (&peer_ctx->peer_id));
1123 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1124 peer_ctx->online_check_pending = NULL;
1125 set_peer_online (peer_ctx);
1126 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1132 * Issue a check whether peer is online
1134 * @param peer_ctx the context of the peer
1137 check_peer_online (struct PeerContext *peer_ctx)
1139 LOG (GNUNET_ERROR_TYPE_DEBUG,
1140 "Get informed about peer %s getting online\n",
1141 GNUNET_i2s (&peer_ctx->peer_id));
1143 struct GNUNET_MQ_Handle *mq;
1144 struct GNUNET_MQ_Envelope *ev;
1146 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1147 peer_ctx->online_check_pending =
1148 insert_pending_message (peer_ctx, ev, "Check online");
1149 mq = get_mq (peer_ctx);
1150 GNUNET_MQ_notify_sent (ev,
1151 mq_online_check_successful,
1153 GNUNET_MQ_send (mq, ev);
1154 if (peer_ctx->sub == msub)
1156 GNUNET_STATISTICS_update (stats,
1157 "# pending online checks",
1165 * @brief Check whether function of type #PeerOp was already scheduled
1167 * The array with pending operations will probably never grow really big, so
1168 * iterating over it should be ok.
1170 * @param peer_ctx Context of the peer to check for the operation
1171 * @param peer_op the operation (#PeerOp) on the peer
1173 * @return #GNUNET_YES if this operation is scheduled on that peer
1174 * #GNUNET_NO otherwise
1177 check_operation_scheduled (const struct PeerContext *peer_ctx,
1178 const PeerOp peer_op)
1182 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1183 if (peer_op == peer_ctx->pending_ops[i].op)
1190 * @brief Callback for scheduler to destroy a channel
1192 * @param cls Context of the channel
1195 destroy_channel (struct ChannelCtx *channel_ctx)
1197 struct GNUNET_CADET_Channel *channel;
1199 if (NULL != channel_ctx->destruction_task)
1201 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1202 channel_ctx->destruction_task = NULL;
1204 GNUNET_assert (channel_ctx->channel != NULL);
1205 channel = channel_ctx->channel;
1206 channel_ctx->channel = NULL;
1207 GNUNET_CADET_channel_destroy (channel);
1208 remove_channel_ctx (channel_ctx);
1213 * @brief Destroy a cadet channel.
1215 * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1220 destroy_channel_cb (void *cls)
1222 struct ChannelCtx *channel_ctx = cls;
1224 channel_ctx->destruction_task = NULL;
1225 destroy_channel (channel_ctx);
1230 * @brief Schedule the destruction of a channel for immediately afterwards.
1232 * In case a channel is to be destroyed from within the callback to the
1233 * destruction of another channel (send channel), we cannot call
1234 * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1237 * @param channel_ctx channel to be destroyed.
1240 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1242 GNUNET_assert (NULL ==
1243 channel_ctx->destruction_task);
1244 GNUNET_assert (NULL !=
1245 channel_ctx->channel);
1246 channel_ctx->destruction_task =
1247 GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1253 * @brief Remove peer
1255 * - Empties the list with pending operations
1256 * - Empties the list with pending messages
1257 * - Cancels potentially existing online check
1258 * - Schedules closing of send and recv channels
1259 * - Removes peer from peer map
1261 * @param peer_ctx Context of the peer to be destroyed
1262 * @return #GNUNET_YES if peer was removed
1263 * #GNUNET_NO otherwise
1266 destroy_peer (struct PeerContext *peer_ctx)
1268 GNUNET_assert (NULL != peer_ctx);
1269 GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1271 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1272 &peer_ctx->peer_id))
1276 SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1277 LOG (GNUNET_ERROR_TYPE_DEBUG,
1278 "Going to remove peer %s\n",
1279 GNUNET_i2s (&peer_ctx->peer_id));
1280 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1282 /* Clear list of pending operations */
1283 // TODO this probably leaks memory
1284 // ('only' the cls to the function. Not sure what to do with it)
1285 GNUNET_array_grow (peer_ctx->pending_ops,
1286 peer_ctx->num_pending_ops,
1288 /* Remove all pending messages */
1289 while (NULL != peer_ctx->pending_messages_head)
1291 LOG (GNUNET_ERROR_TYPE_DEBUG,
1292 "Removing unsent %s\n",
1293 peer_ctx->pending_messages_head->type);
1294 /* Cancle pending message, too */
1295 if ((NULL != peer_ctx->online_check_pending) &&
1296 (0 == memcmp (peer_ctx->pending_messages_head,
1297 peer_ctx->online_check_pending,
1298 sizeof(struct PendingMessage))))
1300 peer_ctx->online_check_pending = NULL;
1301 if (peer_ctx->sub == msub)
1303 GNUNET_STATISTICS_update (stats,
1304 "# pending online checks",
1309 remove_pending_message (peer_ctx->pending_messages_head,
1313 /* If we are still waiting for notification whether this peer is online
1314 * cancel the according task */
1315 if (NULL != peer_ctx->online_check_pending)
1317 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318 "Removing pending online check for peer %s\n",
1319 GNUNET_i2s (&peer_ctx->peer_id));
1320 // TODO wait until cadet sets mq->cancel_impl
1321 // GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1322 remove_pending_message (peer_ctx->online_check_pending,
1324 peer_ctx->online_check_pending = NULL;
1327 if (NULL != peer_ctx->send_channel_ctx)
1329 /* This is possibly called from within channel destruction */
1330 peer_ctx->send_channel_ctx->peer_ctx = NULL;
1331 schedule_channel_destruction (peer_ctx->send_channel_ctx);
1332 peer_ctx->send_channel_ctx = NULL;
1333 peer_ctx->mq = NULL;
1335 if (NULL != peer_ctx->recv_channel_ctx)
1337 /* This is possibly called from within channel destruction */
1338 peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1339 schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1340 peer_ctx->recv_channel_ctx = NULL;
1344 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1345 &peer_ctx->peer_id))
1347 LOG (GNUNET_ERROR_TYPE_WARNING,
1348 "removing peer from peer_ctx->sub->peer_map failed\n");
1350 if (peer_ctx->sub == msub)
1352 GNUNET_STATISTICS_set (stats,
1354 GNUNET_CONTAINER_multipeermap_size (
1355 peer_ctx->sub->peer_map),
1358 GNUNET_free (peer_ctx);
1364 * Iterator over hash map entries. Deletes all contexts of peers.
1366 * @param cls closure
1367 * @param key current public key
1368 * @param value value in the hash map
1369 * @return #GNUNET_YES if we should continue to iterate,
1370 * #GNUNET_NO if not.
1373 peermap_clear_iterator (void *cls,
1374 const struct GNUNET_PeerIdentity *key,
1377 struct Sub *sub = cls;
1381 destroy_peer (get_peer_ctx (sub->peer_map, key));
1387 * @brief This is called once a message is sent.
1389 * Removes the pending message
1391 * @param cls type of the message that was sent
1394 mq_notify_sent_cb (void *cls)
1396 struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1398 LOG (GNUNET_ERROR_TYPE_DEBUG,
1401 if (pending_msg->peer_ctx->sub == msub)
1403 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1404 GNUNET_STATISTICS_update (stats, "# pull replys sent", 1, GNUNET_NO);
1405 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1406 GNUNET_STATISTICS_update (stats, "# pull requests sent", 1, GNUNET_NO);
1407 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1408 GNUNET_STATISTICS_update (stats, "# pushes sent", 1, GNUNET_NO);
1409 if ((0 == strncmp ("PULL REQUEST", pending_msg->type, 12)) &&
1410 (NULL != map_single_hop) &&
1411 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1413 peer_ctx->peer_id)) )
1414 GNUNET_STATISTICS_update (stats,
1415 "# pull requests sent (multi-hop peer)",
1419 /* Do not cancle message */
1420 remove_pending_message (pending_msg, GNUNET_NO);
1425 * @brief Iterator function for #store_valid_peers.
1427 * Implements #GNUNET_CONTAINER_PeerMapIterator.
1428 * Writes single peer to disk.
1430 * @param cls the file handle to write to.
1431 * @param peer current peer
1432 * @param value unused
1434 * @return #GNUNET_YES if we should continue to
1436 * #GNUNET_NO if not.
1439 store_peer_presistently_iterator (void *cls,
1440 const struct GNUNET_PeerIdentity *peer,
1443 const struct GNUNET_DISK_FileHandle *fh = cls;
1444 char peer_string[128];
1454 size = GNUNET_snprintf (peer_string,
1455 sizeof(peer_string),
1457 GNUNET_i2s_full (peer));
1458 GNUNET_assert (53 == size);
1459 ret = GNUNET_DISK_file_write (fh,
1462 GNUNET_assert (size == ret);
1468 * @brief Store the peers currently in #valid_peers to disk.
1470 * @param sub Sub for which to store the valid peers
1473 store_valid_peers (const struct Sub *sub)
1475 struct GNUNET_DISK_FileHandle *fh;
1476 uint32_t number_written_peers;
1479 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1484 ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1485 if (GNUNET_SYSERR == ret)
1487 LOG (GNUNET_ERROR_TYPE_WARNING,
1488 "Not able to create directory for file `%s'\n",
1489 sub->filename_valid_peers);
1492 else if (GNUNET_NO == ret)
1494 LOG (GNUNET_ERROR_TYPE_WARNING,
1495 "Directory for file `%s' exists but is not writable for us\n",
1496 sub->filename_valid_peers);
1499 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1500 GNUNET_DISK_OPEN_WRITE
1501 | GNUNET_DISK_OPEN_CREATE,
1502 GNUNET_DISK_PERM_USER_READ
1503 | GNUNET_DISK_PERM_USER_WRITE);
1506 LOG (GNUNET_ERROR_TYPE_WARNING,
1507 "Not able to write valid peers to file `%s'\n",
1508 sub->filename_valid_peers);
1511 LOG (GNUNET_ERROR_TYPE_DEBUG,
1512 "Writing %u valid peers to disk\n",
1513 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1514 number_written_peers =
1515 GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1516 store_peer_presistently_iterator,
1518 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1519 GNUNET_assert (number_written_peers ==
1520 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1525 * @brief Convert string representation of peer id to peer id.
1527 * Counterpart to #GNUNET_i2s_full.
1529 * @param string_repr The string representation of the peer id
1531 * @return The peer id
1533 static const struct GNUNET_PeerIdentity *
1534 s2i_full (const char *string_repr)
1536 struct GNUNET_PeerIdentity *peer;
1540 peer = GNUNET_new (struct GNUNET_PeerIdentity);
1541 len = strlen (string_repr);
1544 LOG (GNUNET_ERROR_TYPE_WARNING,
1545 "Not able to convert string representation of PeerID to PeerID\n"
1546 "Sting representation: %s (len %lu) - too short\n",
1555 ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1558 if (GNUNET_OK != ret)
1560 LOG (GNUNET_ERROR_TYPE_WARNING,
1561 "Not able to convert string representation of PeerID to PeerID\n"
1562 "Sting representation: %s\n",
1571 * @brief Restore the peers on disk to #valid_peers.
1573 * @param sub Sub for which to restore the valid peers
1576 restore_valid_peers (const struct Sub *sub)
1580 struct GNUNET_DISK_FileHandle *fh;
1585 const struct GNUNET_PeerIdentity *peer;
1587 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1592 if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1596 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1597 GNUNET_DISK_OPEN_READ,
1598 GNUNET_DISK_PERM_NONE);
1599 GNUNET_assert (NULL != fh);
1600 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1601 num_peers = file_size / 53;
1602 buf = GNUNET_malloc (file_size);
1603 size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1604 GNUNET_assert (size_read == file_size);
1605 LOG (GNUNET_ERROR_TYPE_DEBUG,
1606 "Restoring %" PRIu32 " peers from file `%s'\n",
1608 sub->filename_valid_peers);
1609 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1611 str_repr = GNUNET_strndup (iter_buf, 53);
1612 peer = s2i_full (str_repr);
1613 GNUNET_free (str_repr);
1614 add_valid_peer (peer, sub->valid_peers);
1615 LOG (GNUNET_ERROR_TYPE_DEBUG,
1616 "Restored valid peer %s from disk\n",
1617 GNUNET_i2s_full (peer));
1621 LOG (GNUNET_ERROR_TYPE_DEBUG,
1622 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1624 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1625 if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1627 LOG (GNUNET_ERROR_TYPE_WARNING,
1628 "Number of restored peers does not match file size. Have probably duplicates.\n");
1630 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1631 LOG (GNUNET_ERROR_TYPE_DEBUG,
1632 "Restored %u valid peers from disk\n",
1633 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1638 * @brief Delete storage of peers that was created with #initialise_peers ()
1640 * @param sub Sub for which the storage is deleted
1643 peers_terminate (struct Sub *sub)
1645 if (GNUNET_SYSERR ==
1646 GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1647 &peermap_clear_iterator,
1650 LOG (GNUNET_ERROR_TYPE_WARNING,
1651 "Iteration destroying peers was aborted.\n");
1653 GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1654 sub->peer_map = NULL;
1655 store_valid_peers (sub);
1656 GNUNET_free (sub->filename_valid_peers);
1657 sub->filename_valid_peers = NULL;
1658 GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1659 sub->valid_peers = NULL;
1664 * Iterator over #valid_peers hash map entries.
1666 * @param cls Closure that contains iterator function and closure
1667 * @param peer current peer id
1668 * @param value value in the hash map - unused
1669 * @return #GNUNET_YES if we should continue to
1671 * #GNUNET_NO if not.
1674 valid_peer_iterator (void *cls,
1675 const struct GNUNET_PeerIdentity *peer,
1678 struct PeersIteratorCls *it_cls = cls;
1682 return it_cls->iterator (it_cls->cls, peer);
1687 * @brief Get all currently known, valid peer ids.
1689 * @param valid_peers Peer map containing the valid peers in question
1690 * @param iterator function to call on each peer id
1691 * @param it_cls extra argument to @a iterator
1692 * @return the number of key value pairs processed,
1693 * #GNUNET_SYSERR if it aborted iteration
1696 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1697 PeersIterator iterator,
1700 struct PeersIteratorCls *cls;
1703 cls = GNUNET_new (struct PeersIteratorCls);
1704 cls->iterator = iterator;
1706 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1707 valid_peer_iterator,
1715 * @brief Add peer to known peers.
1717 * This function is called on new peer_ids from 'external' sources
1718 * (client seed, cadet get_peers(), ...)
1720 * @param sub Sub with the peer map that the @a peer will be added to
1721 * @param peer the new #GNUNET_PeerIdentity
1723 * @return #GNUNET_YES if peer was inserted
1724 * #GNUNET_NO otherwise
1727 insert_peer (struct Sub *sub,
1728 const struct GNUNET_PeerIdentity *peer)
1730 if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1732 return GNUNET_NO; /* We already know this peer - nothing to do */
1734 (void) create_peer_ctx (sub, peer);
1740 * @brief Check whether flags on a peer are set.
1742 * @param peer_map Peer map that is expected to contain the @a peer
1743 * @param peer the peer to check the flag of
1744 * @param flags the flags to check
1746 * @return #GNUNET_SYSERR if peer is not known
1747 * #GNUNET_YES if all given flags are set
1748 * #GNUNET_NO otherwise
1751 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1752 const struct GNUNET_PeerIdentity *peer,
1753 enum Peers_PeerFlags flags)
1755 struct PeerContext *peer_ctx;
1757 if (GNUNET_NO == check_peer_known (peer_map, peer))
1759 return GNUNET_SYSERR;
1761 peer_ctx = get_peer_ctx (peer_map, peer);
1762 return check_peer_flag_set (peer_ctx, flags);
1767 * @brief Try connecting to a peer to see whether it is online
1769 * If not known yet, insert into known peers
1771 * @param sub Sub which would contain the @a peer
1772 * @param peer the peer whose online is to be checked
1773 * @return #GNUNET_YES if the check was issued
1774 * #GNUNET_NO otherwise
1777 issue_peer_online_check (struct Sub *sub,
1778 const struct GNUNET_PeerIdentity *peer)
1780 struct PeerContext *peer_ctx;
1782 (void) insert_peer (sub, peer); // TODO even needed?
1783 peer_ctx = get_peer_ctx (sub->peer_map, peer);
1784 if ((GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1785 (NULL == peer_ctx->online_check_pending))
1787 check_peer_online (peer_ctx);
1795 * @brief Check if peer is removable.
1798 * - a recv channel exists
1799 * - there are pending messages
1800 * - there is no pending pull reply
1802 * @param peer_ctx Context of the peer in question
1803 * @return #GNUNET_YES if peer is removable
1804 * #GNUNET_NO if peer is NOT removable
1805 * #GNUNET_SYSERR if peer is not known
1808 check_removable (const struct PeerContext *peer_ctx)
1810 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (
1811 peer_ctx->sub->peer_map,
1812 &peer_ctx->peer_id))
1814 return GNUNET_SYSERR;
1817 if ((NULL != peer_ctx->recv_channel_ctx) ||
1818 (NULL != peer_ctx->pending_messages_head) ||
1819 (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)))
1828 * @brief Check whether @a peer is actually a peer.
1830 * A valid peer is a peer that we know exists eg. we were connected to once.
1832 * @param valid_peers Peer map that would contain the @a peer
1833 * @param peer peer in question
1835 * @return #GNUNET_YES if peer is valid
1836 * #GNUNET_NO if peer is not valid
1839 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1840 const struct GNUNET_PeerIdentity *peer)
1842 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1847 * @brief Indicate that we want to send to the other peer
1849 * This establishes a sending channel
1851 * @param peer_ctx Context of the target peer
1854 indicate_sending_intention (struct PeerContext *peer_ctx)
1856 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1857 &peer_ctx->peer_id));
1858 (void) get_channel (peer_ctx);
1863 * @brief Check whether other peer has the intention to send/opened channel
1866 * @param peer_ctx Context of the peer in question
1868 * @return #GNUNET_YES if peer has the intention to send
1869 * #GNUNET_NO otherwise
1872 check_peer_send_intention (const struct PeerContext *peer_ctx)
1874 if (NULL != peer_ctx->recv_channel_ctx)
1883 * Handle the channel a peer opens to us.
1885 * @param cls The closure - Sub
1886 * @param channel The channel the peer wants to establish
1887 * @param initiator The peer's peer ID
1889 * @return initial channel context for the channel
1890 * (can be NULL -- that's not an error)
1893 handle_inbound_channel (void *cls,
1894 struct GNUNET_CADET_Channel *channel,
1895 const struct GNUNET_PeerIdentity *initiator)
1897 struct PeerContext *peer_ctx;
1898 struct ChannelCtx *channel_ctx;
1899 struct Sub *sub = cls;
1901 LOG (GNUNET_ERROR_TYPE_DEBUG,
1902 "New channel was established to us (Peer %s).\n",
1903 GNUNET_i2s (initiator));
1904 GNUNET_assert (NULL != channel); /* according to cadet API */
1905 /* Make sure we 'know' about this peer */
1906 peer_ctx = create_or_get_peer_ctx (sub, initiator);
1907 set_peer_online (peer_ctx);
1908 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1909 channel_ctx = add_channel_ctx (peer_ctx);
1910 channel_ctx->channel = channel;
1911 /* We only accept one incoming channel per peer */
1912 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1915 LOG (GNUNET_ERROR_TYPE_WARNING,
1916 "Already got one receive channel. Destroying old one.\n");
1917 GNUNET_break_op (0);
1918 destroy_channel (peer_ctx->recv_channel_ctx);
1919 peer_ctx->recv_channel_ctx = channel_ctx;
1920 /* return the channel context */
1923 peer_ctx->recv_channel_ctx = channel_ctx;
1929 * @brief Check whether a sending channel towards the given peer exists
1931 * @param peer_ctx Context of the peer in question
1933 * @return #GNUNET_YES if a sending channel towards that peer exists
1934 * #GNUNET_NO otherwise
1937 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1939 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1940 &peer_ctx->peer_id))
1941 { /* If no such peer exists, there is no channel */
1944 if (NULL == peer_ctx->send_channel_ctx)
1953 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1954 * intention to another peer
1956 * @param peer_ctx Context to the peer
1957 * @return #GNUNET_YES if channel was destroyed
1958 * #GNUNET_NO otherwise
1961 destroy_sending_channel (struct PeerContext *peer_ctx)
1963 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1964 &peer_ctx->peer_id))
1968 if (NULL != peer_ctx->send_channel_ctx)
1970 destroy_channel (peer_ctx->send_channel_ctx);
1971 (void) check_connected (peer_ctx);
1979 * @brief Send a message to another peer.
1981 * Keeps track about pending messages so they can be properly removed when the
1982 * peer is destroyed.
1984 * @param peer_ctx Context of the peer to which the message is to be sent
1985 * @param ev envelope of the message
1986 * @param type type of the message
1989 send_message (struct PeerContext *peer_ctx,
1990 struct GNUNET_MQ_Envelope *ev,
1993 struct PendingMessage *pending_msg;
1994 struct GNUNET_MQ_Handle *mq;
1996 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1997 "Sending message to %s of type %s\n",
1998 GNUNET_i2s (&peer_ctx->peer_id),
2000 pending_msg = insert_pending_message (peer_ctx, ev, type);
2001 mq = get_mq (peer_ctx);
2002 GNUNET_MQ_notify_sent (ev,
2005 GNUNET_MQ_send (mq, ev);
2010 * @brief Schedule a operation on given peer
2012 * Avoids scheduling an operation twice.
2014 * @param peer_ctx Context of the peer for which to schedule the operation
2015 * @param peer_op the operation to schedule
2016 * @param cls Closure to @a peer_op
2018 * @return #GNUNET_YES if the operation was scheduled
2019 * #GNUNET_NO otherwise
2022 schedule_operation (struct PeerContext *peer_ctx,
2023 const PeerOp peer_op,
2026 struct PeerPendingOp pending_op;
2028 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2029 &peer_ctx->peer_id));
2031 // TODO if ONLINE execute immediately
2033 if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2035 pending_op.op = peer_op;
2036 pending_op.op_cls = cls;
2037 GNUNET_array_append (peer_ctx->pending_ops,
2038 peer_ctx->num_pending_ops,
2046 /***********************************************************************
2047 * /Old gnunet-service-rps_peers.c
2048 ***********************************************************************/
2051 /***********************************************************************
2052 * Housekeeping with clients
2053 ***********************************************************************/
2056 * Closure used to pass the client and the id to the callback
2057 * that replies to a client's request
2064 struct ReplyCls *next;
2065 struct ReplyCls *prev;
2068 * The identifier of the request
2073 * The handle to the request
2075 struct RPS_SamplerRequestHandle *req_handle;
2078 * The client handle to send the reply to
2080 struct ClientContext *cli_ctx;
2085 * Struct used to store the context of a connected client.
2087 struct ClientContext
2092 struct ClientContext *next;
2093 struct ClientContext *prev;
2096 * The message queue to communicate with the client.
2098 struct GNUNET_MQ_Handle *mq;
2101 * @brief How many updates this client expects to receive.
2103 int64_t view_updates_left;
2106 * @brief Whether this client wants to receive stream updates.
2107 * Either #GNUNET_YES or #GNUNET_NO
2109 int8_t stream_update;
2112 * The client handle to send the reply to
2114 struct GNUNET_SERVICE_Client *client;
2117 * The #Sub this context belongs to
2123 * DLL with all clients currently connected to us
2125 struct ClientContext *cli_ctx_head;
2126 struct ClientContext *cli_ctx_tail;
2128 /***********************************************************************
2129 * /Housekeeping with clients
2130 ***********************************************************************/
2133 /***********************************************************************
2135 ***********************************************************************/
2139 * Print peerlist to log.
2142 print_peer_list (struct GNUNET_PeerIdentity *list,
2147 LOG (GNUNET_ERROR_TYPE_DEBUG,
2148 "Printing peer list of length %u at %p:\n",
2151 for (i = 0; i < len; i++)
2153 LOG (GNUNET_ERROR_TYPE_DEBUG,
2155 i, GNUNET_i2s (&list[i]));
2161 * Remove peer from list.
2164 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2165 unsigned int *list_size,
2166 const struct GNUNET_PeerIdentity *peer)
2169 struct GNUNET_PeerIdentity *tmp;
2173 LOG (GNUNET_ERROR_TYPE_DEBUG,
2174 "Removing peer %s from list at %p\n",
2178 for (i = 0; i < *list_size; i++)
2180 if (0 == GNUNET_memcmp (&tmp[i], peer))
2182 if (i < *list_size - 1)
2183 { /* Not at the last entry -- shift peers left */
2184 memmove (&tmp[i], &tmp[i + 1],
2185 ((*list_size) - i - 1) * sizeof(struct GNUNET_PeerIdentity));
2187 /* Remove last entry (should be now useless PeerID) */
2188 GNUNET_array_grow (tmp, *list_size, (*list_size) - 1);
2196 * Insert PeerID in #view
2198 * Called once we know a peer is online.
2199 * Implements #PeerOp
2201 * @return GNUNET_OK if peer was actually inserted
2202 * GNUNET_NO if peer was not inserted
2205 insert_in_view_op (void *cls,
2206 const struct GNUNET_PeerIdentity *peer);
2209 * Insert PeerID in #view
2211 * Called once we know a peer is online.
2213 * @param sub Sub in with the view to insert in
2214 * @param peer the peer to insert
2216 * @return GNUNET_OK if peer was actually inserted
2217 * GNUNET_NO if peer was not inserted
2220 insert_in_view (struct Sub *sub,
2221 const struct GNUNET_PeerIdentity *peer)
2223 struct PeerContext *peer_ctx;
2227 online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2228 peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2229 if ((GNUNET_NO == online) ||
2230 (GNUNET_SYSERR == online)) /* peer is not even known */
2232 (void) issue_peer_online_check (sub, peer);
2233 (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2236 /* Open channel towards peer to keep connection open */
2237 indicate_sending_intention (peer_ctx);
2238 ret = View_put (sub->view, peer);
2239 if (peer_ctx->sub == msub)
2241 GNUNET_STATISTICS_set (stats,
2243 View_size (peer_ctx->sub->view),
2251 * @brief Send view to client
2253 * @param cli_ctx the context of the client
2254 * @param view_array the peerids of the view as array (can be empty)
2255 * @param view_size the size of the view array (can be 0)
2258 send_view (const struct ClientContext *cli_ctx,
2259 const struct GNUNET_PeerIdentity *view_array,
2262 struct GNUNET_MQ_Envelope *ev;
2263 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2266 if (NULL == view_array)
2268 if (NULL == cli_ctx->sub)
2272 view_size = View_size (sub->view);
2273 view_array = View_get_as_array (sub->view);
2276 ev = GNUNET_MQ_msg_extra (out_msg,
2277 view_size * sizeof(struct GNUNET_PeerIdentity),
2278 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2279 out_msg->num_peers = htonl (view_size);
2281 GNUNET_memcpy (&out_msg[1],
2283 view_size * sizeof(struct GNUNET_PeerIdentity));
2284 GNUNET_MQ_send (cli_ctx->mq, ev);
2289 * @brief Send peer from biased stream to client.
2291 * TODO merge with send_view, parameterise
2293 * @param cli_ctx the context of the client
2294 * @param view_array the peerids of the view as array (can be empty)
2295 * @param view_size the size of the view array (can be 0)
2298 send_stream_peers (const struct ClientContext *cli_ctx,
2300 const struct GNUNET_PeerIdentity *peers)
2302 struct GNUNET_MQ_Envelope *ev;
2303 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2305 GNUNET_assert (NULL != peers);
2307 ev = GNUNET_MQ_msg_extra (out_msg,
2308 num_peers * sizeof(struct GNUNET_PeerIdentity),
2309 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2310 out_msg->num_peers = htonl (num_peers);
2312 GNUNET_memcpy (&out_msg[1],
2314 num_peers * sizeof(struct GNUNET_PeerIdentity));
2315 GNUNET_MQ_send (cli_ctx->mq, ev);
2320 * @brief sends updates to clients that are interested
2322 * @param sub Sub for which to notify clients
2325 clients_notify_view_update (const struct Sub *sub)
2327 struct ClientContext *cli_ctx_iter;
2329 const struct GNUNET_PeerIdentity *view_array;
2331 num_peers = View_size (sub->view);
2332 view_array = View_get_as_array (sub->view);
2333 /* check size of view is small enough */
2334 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2336 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2337 "View is too big to send\n");
2341 for (cli_ctx_iter = cli_ctx_head;
2342 NULL != cli_ctx_iter;
2343 cli_ctx_iter = cli_ctx_iter->next)
2345 if (1 < cli_ctx_iter->view_updates_left)
2347 /* Client wants to receive limited amount of updates */
2348 cli_ctx_iter->view_updates_left -= 1;
2350 else if (1 == cli_ctx_iter->view_updates_left)
2352 /* Last update of view for client */
2353 cli_ctx_iter->view_updates_left = -1;
2355 else if (0 > cli_ctx_iter->view_updates_left)
2357 /* Client is not interested in updates */
2360 /* else _updates_left == 0 - infinite amount of updates */
2363 send_view (cli_ctx_iter, view_array, num_peers);
2369 * @brief sends updates to clients that are interested
2371 * @param num_peers Number of peers to send
2372 * @param peers the array of peers to send
2375 clients_notify_stream_peer (const struct Sub *sub,
2377 const struct GNUNET_PeerIdentity *peers)
2378 // TODO enum StreamPeerSource)
2380 struct ClientContext *cli_ctx_iter;
2382 LOG (GNUNET_ERROR_TYPE_DEBUG,
2383 "Got peer (%s) from biased stream - update all clients\n",
2384 GNUNET_i2s (peers));
2386 for (cli_ctx_iter = cli_ctx_head;
2387 NULL != cli_ctx_iter;
2388 cli_ctx_iter = cli_ctx_iter->next)
2390 if ((GNUNET_YES == cli_ctx_iter->stream_update) &&
2391 ((sub == cli_ctx_iter->sub) || (sub == msub) ))
2393 send_stream_peers (cli_ctx_iter, num_peers, peers);
2400 * Put random peer from sampler into the view as history update.
2402 * @param ids Array of Peers to insert into view
2403 * @param num_peers Number of peers to insert
2404 * @param cls Closure - The Sub for which this is to be done
2407 hist_update (const struct GNUNET_PeerIdentity *ids,
2412 struct Sub *sub = cls;
2414 for (i = 0; i < num_peers; i++)
2417 if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2419 LOG (GNUNET_ERROR_TYPE_WARNING,
2420 "Peer in history update not known!\n");
2423 inserted = insert_in_view (sub, &ids[i]);
2424 if (GNUNET_OK == inserted)
2426 clients_notify_stream_peer (sub, 1, &ids[i]);
2429 to_file (sub->file_name_view_log,
2431 GNUNET_i2s_full (ids));
2432 #endif /* TO_FILE_FULL */
2434 clients_notify_view_update (sub);
2439 * Wrapper around #RPS_sampler_resize()
2441 * If we do not have enough sampler elements, double current sampler size
2442 * If we have more than enough sampler elements, halv current sampler size
2444 * @param sampler The sampler to resize
2445 * @param new_size New size to which to resize
2448 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2450 unsigned int sampler_size;
2453 // TODO respect the min, max
2454 sampler_size = RPS_sampler_get_size (sampler);
2455 if (sampler_size > new_size * 4)
2457 RPS_sampler_resize (sampler, sampler_size / 2);
2459 else if (sampler_size < new_size)
2461 RPS_sampler_resize (sampler, sampler_size * 2);
2463 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2468 * Add all peers in @a peer_array to @a peer_map used as set.
2470 * @param peer_array array containing the peers
2471 * @param num_peers number of peers in @peer_array
2472 * @param peer_map the peermap to use as set
2475 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2476 unsigned int num_peers,
2477 struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2481 if (NULL == peer_map)
2483 LOG (GNUNET_ERROR_TYPE_WARNING,
2484 "Trying to add peers to non-existing peermap.\n");
2488 for (i = 0; i < num_peers; i++)
2490 GNUNET_CONTAINER_multipeermap_put (peer_map,
2493 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2494 if (msub->peer_map == peer_map)
2496 GNUNET_STATISTICS_set (stats,
2498 GNUNET_CONTAINER_multipeermap_size (peer_map),
2506 * Send a PULL REPLY to @a peer_id
2508 * @param peer_ctx Context of the peer to send the reply to
2509 * @param peer_ids the peers to send to @a peer_id
2510 * @param num_peer_ids the number of peers to send to @a peer_id
2513 send_pull_reply (struct PeerContext *peer_ctx,
2514 const struct GNUNET_PeerIdentity *peer_ids,
2515 unsigned int num_peer_ids)
2518 struct GNUNET_MQ_Envelope *ev;
2519 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2521 /* Compute actual size */
2522 send_size = sizeof(struct GNUNET_RPS_P2P_PullReplyMessage)
2523 + num_peer_ids * sizeof(struct GNUNET_PeerIdentity);
2525 if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2526 /* Compute number of peers to send
2527 * If too long, simply truncate */
2528 // TODO select random ones via permutation
2529 // or even better: do good protocol design
2531 (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE
2532 - sizeof(struct GNUNET_RPS_P2P_PullReplyMessage))
2533 / sizeof(struct GNUNET_PeerIdentity);
2535 send_size = num_peer_ids;
2537 LOG (GNUNET_ERROR_TYPE_DEBUG,
2538 "Going to send PULL REPLY with %u peers to %s\n",
2539 send_size, GNUNET_i2s (&peer_ctx->peer_id));
2541 ev = GNUNET_MQ_msg_extra (out_msg,
2542 send_size * sizeof(struct GNUNET_PeerIdentity),
2543 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2544 out_msg->num_peers = htonl (send_size);
2545 GNUNET_memcpy (&out_msg[1], peer_ids,
2546 send_size * sizeof(struct GNUNET_PeerIdentity));
2548 send_message (peer_ctx, ev, "PULL REPLY");
2549 if (peer_ctx->sub == msub)
2551 GNUNET_STATISTICS_update (stats, "# pull reply send issued", 1, GNUNET_NO);
2553 // TODO check with send intention: as send_channel is used/opened we indicate
2554 // a sending intention without intending it.
2555 // -> clean peer afterwards?
2556 // -> use recv_channel?
2561 * Insert PeerID in #pull_map
2563 * Called once we know a peer is online.
2565 * @param cls Closure - Sub with the pull map to insert into
2566 * @param peer Peer to insert
2569 insert_in_pull_map (void *cls,
2570 const struct GNUNET_PeerIdentity *peer)
2572 struct Sub *sub = cls;
2574 CustomPeerMap_put (sub->pull_map, peer);
2579 * Insert PeerID in #view
2581 * Called once we know a peer is online.
2582 * Implements #PeerOp
2584 * @param cls Closure - Sub with view to insert peer into
2585 * @param peer the peer to insert
2588 insert_in_view_op (void *cls,
2589 const struct GNUNET_PeerIdentity *peer)
2591 struct Sub *sub = cls;
2594 inserted = insert_in_view (sub, peer);
2595 if (GNUNET_OK == inserted)
2597 clients_notify_stream_peer (sub, 1, peer);
2603 * Update sampler with given PeerID.
2604 * Implements #PeerOp
2606 * @param cls Closure - Sub containing the sampler to insert into
2607 * @param peer Peer to insert
2610 insert_in_sampler (void *cls,
2611 const struct GNUNET_PeerIdentity *peer)
2613 struct Sub *sub = cls;
2615 LOG (GNUNET_ERROR_TYPE_DEBUG,
2616 "Updating samplers with peer %s from insert_in_sampler()\n",
2618 RPS_sampler_update (sub->sampler, peer);
2619 if (0 < RPS_sampler_count_id (sub->sampler, peer))
2621 /* Make sure we 'know' about this peer */
2622 (void) issue_peer_online_check (sub, peer);
2623 /* Establish a channel towards that peer to indicate we are going to send
2625 // indicate_sending_intention (peer);
2629 GNUNET_STATISTICS_update (stats,
2630 "# observed peers in gossip",
2635 sub->num_observed_peers++;
2636 GNUNET_CONTAINER_multipeermap_put
2637 (sub->observed_unique_peers,
2640 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2641 uint32_t num_observed_unique_peers =
2642 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2643 GNUNET_STATISTICS_set (stats,
2644 "# unique peers in gossip",
2645 num_observed_unique_peers,
2648 to_file (sub->file_name_observed_log,
2649 "%" PRIu32 " %" PRIu32 " %f\n",
2650 sub->num_observed_peers,
2651 num_observed_unique_peers,
2652 1.0 * num_observed_unique_peers / sub->num_observed_peers)
2653 #endif /* TO_FILE_FULL */
2654 #endif /* TO_FILE */
2659 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2660 * If the peer is not known, online check is issued and it is
2661 * scheduled to be inserted in sampler and view.
2663 * "External sources" refer to every source except the gossip.
2665 * @param sub Sub for which @a peer was received
2666 * @param peer peer to insert/peer received
2669 got_peer (struct Sub *sub,
2670 const struct GNUNET_PeerIdentity *peer)
2672 /* If we did not know this peer already, insert it into sampler and view */
2673 if (GNUNET_YES == issue_peer_online_check (sub, peer))
2675 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2676 &insert_in_sampler, sub);
2677 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2678 &insert_in_view_op, sub);
2682 GNUNET_STATISTICS_update (stats,
2691 * @brief Checks if there is a sending channel and if it is needed
2693 * @param peer_ctx Context of the peer to check
2694 * @return GNUNET_YES if sending channel exists and is still needed
2695 * GNUNET_NO otherwise
2698 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2700 /* struct GNUNET_CADET_Channel *channel; */
2701 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2702 &peer_ctx->peer_id))
2706 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2708 if ((0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2709 &peer_ctx->peer_id)) ||
2710 (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2711 &peer_ctx->peer_id)) ||
2712 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2713 &peer_ctx->peer_id)) ||
2714 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2715 &peer_ctx->peer_id)) ||
2716 (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2718 Peers_PULL_REPLY_PENDING)))
2719 { /* If we want to keep the connection to peer open */
2729 * @brief remove peer from our knowledge, the view, push and pull maps and
2732 * @param sub Sub with the data structures the peer is to be removed from
2733 * @param peer the peer to remove
2736 remove_peer (struct Sub *sub,
2737 const struct GNUNET_PeerIdentity *peer)
2739 (void) View_remove_peer (sub->view,
2741 CustomPeerMap_remove_peer (sub->pull_map,
2743 CustomPeerMap_remove_peer (sub->push_map,
2745 RPS_sampler_reinitialise_by_value (sub->sampler,
2747 /* We want to destroy the peer now.
2748 * Sometimes, it just seems that it's already been removed from the peer_map,
2749 * so check the peer_map first. */
2750 if (GNUNET_YES == check_peer_known (sub->peer_map,
2753 destroy_peer (get_peer_ctx (sub->peer_map,
2760 * @brief Remove data that is not needed anymore.
2762 * If the sending channel is no longer needed it is destroyed.
2764 * @param sub Sub in which the current peer is to be cleaned
2765 * @param peer the peer whose data is about to be cleaned
2768 clean_peer (struct Sub *sub,
2769 const struct GNUNET_PeerIdentity *peer)
2771 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2774 LOG (GNUNET_ERROR_TYPE_DEBUG,
2775 "Going to remove send channel to peer %s\n",
2777 #if ENABLE_MALICIOUS
2778 if (0 != GNUNET_memcmp (&attacked_peer,
2780 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2782 #else /* ENABLE_MALICIOUS */
2783 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2785 #endif /* ENABLE_MALICIOUS */
2788 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2791 /* Peer was already removed by callback on destroyed channel */
2792 LOG (GNUNET_ERROR_TYPE_WARNING,
2793 "Peer was removed from our knowledge during cleanup\n");
2797 if ((GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2799 (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2800 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2801 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2802 (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2803 (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))))
2804 { /* We can safely remove this peer */
2805 LOG (GNUNET_ERROR_TYPE_DEBUG,
2806 "Going to remove peer %s\n",
2808 remove_peer (sub, peer);
2815 * @brief This is called when a channel is destroyed.
2817 * Removes peer completely from our knowledge if the send_channel was destroyed
2818 * Otherwise simply delete the recv_channel
2819 * Also check if the knowledge about this peer is still needed.
2820 * If not, remove this peer from our knowledge.
2822 * @param cls The closure - Context to the channel
2823 * @param channel The channel being closed
2826 cleanup_destroyed_channel (void *cls,
2827 const struct GNUNET_CADET_Channel *channel)
2829 struct ChannelCtx *channel_ctx = cls;
2830 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2834 channel_ctx->channel = NULL;
2835 remove_channel_ctx (channel_ctx);
2836 if ((NULL != peer_ctx) &&
2837 (peer_ctx->send_channel_ctx == channel_ctx) &&
2838 (GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx)) )
2840 remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2845 /***********************************************************************
2847 ***********************************************************************/
2850 /***********************************************************************
2852 ***********************************************************************/
2855 * @brief Create a new Sub
2857 * @param hash Hash of value shared among rps instances on other hosts that
2858 * defines a subgroup to sample from.
2859 * @param sampler_size Size of the sampler
2860 * @param round_interval Interval (in average) between two rounds
2865 new_sub (const struct GNUNET_HashCode *hash,
2866 uint32_t sampler_size,
2867 struct GNUNET_TIME_Relative round_interval)
2871 sub = GNUNET_new (struct Sub);
2873 /* With the hash generated from the secret value this service only connects
2874 * to rps instances that share the value */
2875 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2876 GNUNET_MQ_hd_fixed_size (peer_check,
2877 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2878 struct GNUNET_MessageHeader,
2880 GNUNET_MQ_hd_fixed_size (peer_push,
2881 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2882 struct GNUNET_MessageHeader,
2884 GNUNET_MQ_hd_fixed_size (peer_pull_request,
2885 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2886 struct GNUNET_MessageHeader,
2888 GNUNET_MQ_hd_var_size (peer_pull_reply,
2889 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2890 struct GNUNET_RPS_P2P_PullReplyMessage,
2892 GNUNET_MQ_handler_end ()
2896 GNUNET_CADET_open_port (cadet_handle,
2898 &handle_inbound_channel, /* Connect handler */
2900 NULL, /* WindowSize handler */
2901 &cleanup_destroyed_channel, /* Disconnect handler */
2903 if (NULL == sub->cadet_port)
2905 LOG (GNUNET_ERROR_TYPE_ERROR,
2906 "Cadet port `%s' is already in use.\n",
2907 GNUNET_APPLICATION_PORT_RPS);
2911 /* Set up general data structure to keep track about peers */
2912 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2914 GNUNET_CONFIGURATION_get_value_filename (cfg,
2916 "FILENAME_VALID_PEERS",
2917 &sub->filename_valid_peers))
2919 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2921 "FILENAME_VALID_PEERS");
2923 if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2925 char *tmp_filename_valid_peers;
2928 GNUNET_snprintf (str_hash,
2930 GNUNET_h2s_full (hash));
2931 tmp_filename_valid_peers = sub->filename_valid_peers;
2932 GNUNET_asprintf (&sub->filename_valid_peers,
2934 tmp_filename_valid_peers,
2936 GNUNET_free (tmp_filename_valid_peers);
2938 sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2940 /* Set up the sampler */
2941 sub->sampler_size_est_min = sampler_size;
2942 sub->sampler_size_est_need = sampler_size;;
2943 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2944 GNUNET_assert (0 != round_interval.rel_value_us);
2945 sub->round_interval = round_interval;
2946 sub->sampler = RPS_sampler_init (sampler_size,
2949 /* Logging of internals */
2951 sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2952 #endif /* TO_FILE_FULL */
2955 sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2957 #endif /* TO_FILE_FULL */
2958 sub->num_observed_peers = 0;
2959 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2961 #endif /* TO_FILE */
2963 /* Set up data structures for gossip */
2964 sub->push_map = CustomPeerMap_create (4);
2965 sub->pull_map = CustomPeerMap_create (4);
2966 sub->view_size_est_min = sampler_size;;
2967 sub->view = View_create (sub->view_size_est_min);
2970 GNUNET_STATISTICS_set (stats,
2972 sub->view_size_est_min,
2976 /* Start executing rounds */
2977 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2985 * @brief Write all numbers in the given array into the given file
2987 * Single numbers devided by a newline
2989 * @param hist_array[] the array to dump
2990 * @param file_name file to dump into
2993 write_histogram_to_file (const uint32_t hist_array[],
2994 const char *file_name)
2996 char collect_str[SIZE_DUMP_FILE + 1] = "";
2997 char *recv_str_iter;
2998 char *file_name_full;
3000 recv_str_iter = collect_str;
3001 file_name_full = store_prefix_file_name (&own_identity,
3003 for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
3005 char collect_str_tmp[8];
3007 GNUNET_snprintf (collect_str_tmp,
3008 sizeof(collect_str_tmp),
3011 recv_str_iter = stpncpy (recv_str_iter,
3015 (void) stpcpy (recv_str_iter,
3017 LOG (GNUNET_ERROR_TYPE_DEBUG,
3018 "Writing push stats to disk\n");
3019 to_file_w_len (file_name_full,
3022 GNUNET_free (file_name_full);
3026 #endif /* TO_FILE */
3030 * @brief Destroy Sub.
3032 * @param sub Sub to destroy
3035 destroy_sub (struct Sub *sub)
3037 GNUNET_assert (NULL != sub);
3038 GNUNET_assert (NULL != sub->do_round_task);
3039 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3040 sub->do_round_task = NULL;
3042 /* Disconnect from cadet */
3043 GNUNET_CADET_close_port (sub->cadet_port);
3044 sub->cadet_port = NULL;
3046 /* Clean up data structures for peers */
3047 RPS_sampler_destroy (sub->sampler);
3048 sub->sampler = NULL;
3049 View_destroy (sub->view);
3051 CustomPeerMap_destroy (sub->push_map);
3052 sub->push_map = NULL;
3053 CustomPeerMap_destroy (sub->pull_map);
3054 sub->pull_map = NULL;
3055 peers_terminate (sub);
3057 /* Free leftover data structures */
3059 GNUNET_free (sub->file_name_view_log);
3060 sub->file_name_view_log = NULL;
3061 #endif /* TO_FILE_FULL */
3064 GNUNET_free (sub->file_name_observed_log);
3065 sub->file_name_observed_log = NULL;
3066 #endif /* TO_FILE_FULL */
3068 /* Write push frequencies to disk */
3069 write_histogram_to_file (sub->push_recv,
3072 /* Write push deltas to disk */
3073 write_histogram_to_file (sub->push_delta,
3076 /* Write pull delays to disk */
3077 write_histogram_to_file (sub->pull_delays,
3080 GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3081 sub->observed_unique_peers = NULL;
3082 #endif /* TO_FILE */
3088 /***********************************************************************
3090 ***********************************************************************/
3093 /***********************************************************************
3095 ***********************************************************************/
3098 * @brief Callback on initialisation of Core.
3100 * @param cls - unused
3101 * @param my_identity - unused
3104 core_init (void *cls,
3105 const struct GNUNET_PeerIdentity *my_identity)
3110 map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3115 * @brief Callback for core.
3116 * Method called whenever a given peer connects.
3118 * @param cls closure - unused
3119 * @param peer peer identity this notification is about
3120 * @return closure given to #core_disconnects as peer_cls
3123 core_connects (void *cls,
3124 const struct GNUNET_PeerIdentity *peer,
3125 struct GNUNET_MQ_Handle *mq)
3130 GNUNET_assert (GNUNET_YES ==
3131 GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3134 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3140 * @brief Callback for core.
3141 * Method called whenever a peer disconnects.
3143 * @param cls closure - unused
3144 * @param peer peer identity this notification is about
3145 * @param peer_cls closure given in #core_connects - unused
3148 core_disconnects (void *cls,
3149 const struct GNUNET_PeerIdentity *peer,
3155 GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3159 /***********************************************************************
3161 ***********************************************************************/
3165 * @brief Destroy the context for a (connected) client
3167 * @param cli_ctx Context to destroy
3170 destroy_cli_ctx (struct ClientContext *cli_ctx)
3172 GNUNET_assert (NULL != cli_ctx);
3173 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3176 if (NULL != cli_ctx->sub)
3178 destroy_sub (cli_ctx->sub);
3179 cli_ctx->sub = NULL;
3181 GNUNET_free (cli_ctx);
3186 * @brief Update sizes in sampler and view on estimate update from nse service
3189 * @param logestimate the log(Base 2) value of the current network size estimate
3190 * @param std_dev standard deviation for the estimate
3193 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3197 // double scale; // TODO this might go gloabal/config
3199 LOG (GNUNET_ERROR_TYPE_DEBUG,
3200 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3201 logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3203 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3204 // GNUNET_NSE_log_estimate_to_n (logestimate);
3205 estimate = pow (estimate, 1.0 / 3);
3206 // TODO add if std_dev is a number
3207 // estimate += (std_dev * scale);
3208 if (sub->view_size_est_min < ceil (estimate))
3210 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3211 sub->sampler_size_est_need = estimate;
3212 sub->view_size_est_need = estimate;
3216 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3217 // sub->sampler_size_est_need = sub->view_size_est_min;
3218 sub->view_size_est_need = sub->view_size_est_min;
3222 GNUNET_STATISTICS_set (stats,
3224 sub->view_size_est_need,
3228 /* If the NSE has changed adapt the lists accordingly */
3229 resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3230 View_change_len (sub->view, sub->view_size_est_need);
3235 * Function called by NSE.
3237 * Updates sizes of sampler list and view and adapt those lists
3240 * implements #GNUNET_NSE_Callback
3242 * @param cls Closure - unused
3243 * @param timestamp time when the estimate was received from the server (or created by the server)
3244 * @param logestimate the log(Base 2) value of the current network size estimate
3245 * @param std_dev standard deviation for the estimate
3248 nse_callback (void *cls,
3249 struct GNUNET_TIME_Absolute timestamp,
3250 double logestimate, double std_dev)
3254 struct ClientContext *cli_ctx_iter;
3256 adapt_sizes (msub, logestimate, std_dev);
3257 for (cli_ctx_iter = cli_ctx_head;
3258 NULL != cli_ctx_iter;
3259 cli_ctx_iter = cli_ctx_iter->next)
3261 if (NULL != cli_ctx_iter->sub)
3263 adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3270 * @brief This function is called, when the client seeds peers.
3271 * It verifies that @a msg is well-formed.
3273 * @param cls the closure (#ClientContext)
3274 * @param msg the message
3275 * @return #GNUNET_OK if @a msg is well-formed
3276 * #GNUNET_SYSERR otherwise
3279 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3281 struct ClientContext *cli_ctx = cls;
3282 uint16_t msize = ntohs (msg->header.size);
3283 uint32_t num_peers = ntohl (msg->num_peers);
3285 msize -= sizeof(struct GNUNET_RPS_CS_SeedMessage);
3286 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3287 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3289 LOG (GNUNET_ERROR_TYPE_ERROR,
3290 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3291 ntohl (msg->num_peers),
3292 (msize / sizeof(struct GNUNET_PeerIdentity)));
3294 GNUNET_SERVICE_client_drop (cli_ctx->client);
3295 return GNUNET_SYSERR;
3302 * Handle seed from the client.
3304 * @param cls closure
3305 * @param message the actual message
3308 handle_client_seed (void *cls,
3309 const struct GNUNET_RPS_CS_SeedMessage *msg)
3311 struct ClientContext *cli_ctx = cls;
3312 struct GNUNET_PeerIdentity *peers;
3316 num_peers = ntohl (msg->num_peers);
3317 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3319 LOG (GNUNET_ERROR_TYPE_DEBUG,
3320 "Client seeded peers:\n");
3321 print_peer_list (peers, num_peers);
3323 for (i = 0; i < num_peers; i++)
3325 LOG (GNUNET_ERROR_TYPE_DEBUG,
3326 "Updating samplers with seed %" PRIu32 ": %s\n",
3328 GNUNET_i2s (&peers[i]));
3331 got_peer (msub, &peers[i]); /* Condition needed? */
3332 if (NULL != cli_ctx->sub)
3333 got_peer (cli_ctx->sub, &peers[i]);
3335 GNUNET_SERVICE_client_continue (cli_ctx->client);
3340 * Handle RPS request from the client.
3342 * @param cls Client context
3343 * @param message Message containing the numer of updates the client wants to
3347 handle_client_view_request (void *cls,
3348 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3350 struct ClientContext *cli_ctx = cls;
3351 uint64_t num_updates;
3353 num_updates = ntohl (msg->num_updates);
3355 LOG (GNUNET_ERROR_TYPE_DEBUG,
3356 "Client requested %" PRIu64 " updates of view.\n",
3359 GNUNET_assert (NULL != cli_ctx);
3360 cli_ctx->view_updates_left = num_updates;
3361 send_view (cli_ctx, NULL, 0);
3362 GNUNET_SERVICE_client_continue (cli_ctx->client);
3367 * @brief Handle the cancellation of the view updates.
3369 * @param cls The client context
3373 handle_client_view_cancel (void *cls,
3374 const struct GNUNET_MessageHeader *msg)
3376 struct ClientContext *cli_ctx = cls;
3380 LOG (GNUNET_ERROR_TYPE_DEBUG,
3381 "Client does not want to receive updates of view any more.\n");
3383 GNUNET_assert (NULL != cli_ctx);
3384 cli_ctx->view_updates_left = 0;
3385 GNUNET_SERVICE_client_continue (cli_ctx->client);
3386 if (GNUNET_YES == cli_ctx->stream_update)
3388 destroy_cli_ctx (cli_ctx);
3394 * Handle RPS request for biased stream from the client.
3396 * @param cls Client context
3397 * @param message unused
3400 handle_client_stream_request (void *cls,
3402 GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3404 struct ClientContext *cli_ctx = cls;
3408 LOG (GNUNET_ERROR_TYPE_DEBUG,
3409 "Client requested peers from biased stream.\n");
3410 cli_ctx->stream_update = GNUNET_YES;
3412 GNUNET_assert (NULL != cli_ctx);
3413 GNUNET_SERVICE_client_continue (cli_ctx->client);
3418 * @brief Handles the cancellation of the stream of biased peer ids
3420 * @param cls The client context
3424 handle_client_stream_cancel (void *cls,
3425 const struct GNUNET_MessageHeader *msg)
3427 struct ClientContext *cli_ctx = cls;
3431 LOG (GNUNET_ERROR_TYPE_DEBUG,
3432 "Client canceled receiving peers from biased stream.\n");
3433 cli_ctx->stream_update = GNUNET_NO;
3435 GNUNET_assert (NULL != cli_ctx);
3436 GNUNET_SERVICE_client_continue (cli_ctx->client);
3441 * @brief Create and start a Sub.
3443 * @param cls Closure - unused
3444 * @param msg Message containing the necessary information
3447 handle_client_start_sub (void *cls,
3448 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3450 struct ClientContext *cli_ctx = cls;
3452 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3453 if ((NULL != cli_ctx->sub) &&
3454 (0 != memcmp (&cli_ctx->sub->hash,
3456 sizeof(struct GNUNET_HashCode))) )
3458 LOG (GNUNET_ERROR_TYPE_WARNING,
3459 "Already have a Sub with different share for this client. Remove old one, add new.\n");
3460 destroy_sub (cli_ctx->sub);
3461 cli_ctx->sub = NULL;
3463 cli_ctx->sub = new_sub (&msg->hash,
3464 msub->sampler_size_est_min, // TODO make api input?
3465 GNUNET_TIME_relative_ntoh (msg->round_interval));
3466 GNUNET_SERVICE_client_continue (cli_ctx->client);
3471 * @brief Destroy the Sub
3473 * @param cls Closure - unused
3474 * @param msg Message containing the hash that identifies the Sub
3477 handle_client_stop_sub (void *cls,
3478 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3480 struct ClientContext *cli_ctx = cls;
3482 GNUNET_assert (NULL != cli_ctx->sub);
3483 if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof(struct
3486 LOG (GNUNET_ERROR_TYPE_WARNING,
3487 "Share of current sub and request differ!\n");
3489 destroy_sub (cli_ctx->sub);
3490 cli_ctx->sub = NULL;
3491 GNUNET_SERVICE_client_continue (cli_ctx->client);
3496 * Handle a CHECK_LIVE message from another peer.
3498 * This does nothing. But without calling #GNUNET_CADET_receive_done()
3499 * the channel is blocked for all other communication.
3501 * @param cls Closure - Context of channel
3502 * @param msg Message - unused
3505 handle_peer_check (void *cls,
3506 const struct GNUNET_MessageHeader *msg)
3508 const struct ChannelCtx *channel_ctx = cls;
3509 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3513 LOG (GNUNET_ERROR_TYPE_DEBUG,
3514 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3515 if (channel_ctx->peer_ctx->sub == msub)
3517 GNUNET_STATISTICS_update (stats,
3518 "# pending online checks",
3523 GNUNET_CADET_receive_done (channel_ctx->channel);
3528 * Handle a PUSH message from another peer.
3530 * Check the proof of work and store the PeerID
3531 * in the temporary list for pushed PeerIDs.
3533 * @param cls Closure - Context of channel
3534 * @param msg Message - unused
3537 handle_peer_push (void *cls,
3538 const struct GNUNET_MessageHeader *msg)
3540 const struct ChannelCtx *channel_ctx = cls;
3541 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3545 // (check the proof of work (?))
3547 LOG (GNUNET_ERROR_TYPE_DEBUG,
3548 "Received PUSH (%s)\n",
3550 if (channel_ctx->peer_ctx->sub == msub)
3552 GNUNET_STATISTICS_update (stats, "# push message received", 1, GNUNET_NO);
3553 if ((NULL != map_single_hop) &&
3554 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3557 GNUNET_STATISTICS_update (stats,
3558 "# push message received (multi-hop peer)",
3564 #if ENABLE_MALICIOUS
3565 struct AttackedPeer *tmp_att_peer;
3567 if ((1 == mal_type) ||
3569 { /* Try to maximise representation */
3570 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3571 tmp_att_peer->peer_id = *peer;
3572 if (NULL == att_peer_set)
3573 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3574 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3577 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3580 add_peer_array_to_set (peer, 1, att_peer_set);
3584 GNUNET_free (tmp_att_peer);
3589 else if (2 == mal_type)
3591 /* We attack one single well-known peer - simply ignore */
3593 #endif /* ENABLE_MALICIOUS */
3595 /* Add the sending peer to the push_map */
3596 CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3598 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3599 &channel_ctx->peer_ctx->peer_id));
3600 GNUNET_CADET_receive_done (channel_ctx->channel);
3605 * Handle PULL REQUEST request message from another peer.
3607 * Reply with the view of PeerIDs.
3609 * @param cls Closure - Context of channel
3610 * @param msg Message - unused
3613 handle_peer_pull_request (void *cls,
3614 const struct GNUNET_MessageHeader *msg)
3616 const struct ChannelCtx *channel_ctx = cls;
3617 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3618 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3619 const struct GNUNET_PeerIdentity *view_array;
3623 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (
3625 if (peer_ctx->sub == msub)
3627 GNUNET_STATISTICS_update (stats,
3628 "# pull request message received",
3631 if ((NULL != map_single_hop) &&
3632 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3633 &peer_ctx->peer_id)))
3635 GNUNET_STATISTICS_update (stats,
3636 "# pull request message received (multi-hop peer)",
3642 #if ENABLE_MALICIOUS
3645 { /* Try to maximise representation */
3646 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3649 else if (2 == mal_type)
3650 { /* Try to partition network */
3651 if (0 == GNUNET_memcmp (&attacked_peer, peer))
3653 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3656 #endif /* ENABLE_MALICIOUS */
3658 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3659 &channel_ctx->peer_ctx->peer_id));
3660 GNUNET_CADET_receive_done (channel_ctx->channel);
3661 view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3662 send_pull_reply (peer_ctx,
3664 View_size (channel_ctx->peer_ctx->sub->view));
3669 * Check whether we sent a corresponding request and
3670 * whether this reply is the first one.
3672 * @param cls Closure - Context of channel
3673 * @param msg Message containing the replied peers
3676 check_peer_pull_reply (void *cls,
3677 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3679 struct ChannelCtx *channel_ctx = cls;
3680 struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3682 if (sizeof(struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3684 GNUNET_break_op (0);
3685 return GNUNET_SYSERR;
3688 if ((ntohs (msg->header.size) - sizeof(struct
3689 GNUNET_RPS_P2P_PullReplyMessage))
3690 / sizeof(struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3692 LOG (GNUNET_ERROR_TYPE_ERROR,
3693 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3694 ntohl (msg->num_peers),
3695 (ntohs (msg->header.size) - sizeof(struct
3696 GNUNET_RPS_P2P_PullReplyMessage))
3697 / sizeof(struct GNUNET_PeerIdentity));
3698 GNUNET_break_op (0);
3699 return GNUNET_SYSERR;
3702 if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3703 &sender_ctx->peer_id,
3704 Peers_PULL_REPLY_PENDING))
3706 LOG (GNUNET_ERROR_TYPE_WARNING,
3707 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3708 GNUNET_i2s (&sender_ctx->peer_id));
3709 if (sender_ctx->sub == msub)
3711 GNUNET_STATISTICS_update (stats,
3712 "# unrequested pull replies",
3722 * Handle PULL REPLY message from another peer.
3724 * @param cls Closure
3725 * @param msg The message header
3728 handle_peer_pull_reply (void *cls,
3729 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3731 const struct ChannelCtx *channel_ctx = cls;
3732 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3733 const struct GNUNET_PeerIdentity *peers;
3734 struct Sub *sub = channel_ctx->peer_ctx->sub;
3737 #if ENABLE_MALICIOUS
3738 struct AttackedPeer *tmp_att_peer;
3739 #endif /* ENABLE_MALICIOUS */
3741 sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3742 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (
3744 if (channel_ctx->peer_ctx->sub == msub)
3746 GNUNET_STATISTICS_update (stats,
3747 "# pull reply messages received",
3750 if ((NULL != map_single_hop) &&
3751 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3753 peer_ctx->peer_id)) )
3755 GNUNET_STATISTICS_update (stats,
3756 "# pull reply messages received (multi-hop peer)",
3762 #if ENABLE_MALICIOUS
3763 // We shouldn't even receive pull replies as we're not sending
3767 #endif /* ENABLE_MALICIOUS */
3769 /* Do actual logic */
3770 peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3772 LOG (GNUNET_ERROR_TYPE_DEBUG,
3773 "PULL REPLY received, got following %u peers:\n",
3774 ntohl (msg->num_peers));
3776 for (i = 0; i < ntohl (msg->num_peers); i++)
3778 LOG (GNUNET_ERROR_TYPE_DEBUG,
3781 GNUNET_i2s (&peers[i]));
3783 #if ENABLE_MALICIOUS
3784 if ((NULL != att_peer_set) &&
3785 ((1 == mal_type) || (3 == mal_type) ))
3786 { /* Add attacked peer to local list */
3787 // TODO check if we sent a request and this was the first reply
3788 if ((GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3790 && (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3793 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3794 tmp_att_peer->peer_id = peers[i];
3795 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3798 add_peer_array_to_set (&peers[i], 1, att_peer_set);
3802 #endif /* ENABLE_MALICIOUS */
3803 /* Make sure we 'know' about this peer */
3804 (void) insert_peer (channel_ctx->peer_ctx->sub,
3807 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3810 CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3815 schedule_operation (channel_ctx->peer_ctx,
3817 channel_ctx->peer_ctx->sub); /* cls */
3818 (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3823 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3825 Peers_PULL_REPLY_PENDING);
3826 clean_peer (channel_ctx->peer_ctx->sub,
3829 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3831 GNUNET_CADET_receive_done (channel_ctx->channel);
3836 * Compute a random delay.
3837 * A uniformly distributed value between mean + spread and mean - spread.
3839 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3840 * It would return a random value between 2 and 6 min.
3842 * @param mean the mean time until the next round
3843 * @param spread the inverse amount of deviation from the mean
3845 static struct GNUNET_TIME_Relative
3846 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3847 unsigned int spread)
3849 struct GNUNET_TIME_Relative half_interval;
3850 struct GNUNET_TIME_Relative ret;
3851 unsigned int rand_delay;
3852 unsigned int max_rand_delay;
3856 LOG (GNUNET_ERROR_TYPE_WARNING,
3857 "Not accepting spread of 0\n");
3861 GNUNET_assert (0 != mean.rel_value_us);
3863 /* Compute random time value between spread * mean and spread * mean */
3864 half_interval = GNUNET_TIME_relative_divide (mean, spread);
3866 max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us
3867 / mean.rel_value_us * (2 / spread);
3869 * Compute random value between (0 and 1) * round_interval
3870 * via multiplying round_interval with a 'fraction' (0 to value)/value
3872 rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3874 ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3875 ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3876 ret = GNUNET_TIME_relative_add (ret, half_interval);
3878 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3879 LOG (GNUNET_ERROR_TYPE_WARNING,
3880 "Returning FOREVER_REL\n");
3887 * Send single pull request
3889 * @param peer_ctx Context to the peer to send request to
3892 send_pull_request (struct PeerContext *peer_ctx)
3894 struct GNUNET_MQ_Envelope *ev;
3896 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3898 Peers_PULL_REPLY_PENDING));
3899 SET_PEER_FLAG (peer_ctx,
3900 Peers_PULL_REPLY_PENDING);
3901 peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3903 LOG (GNUNET_ERROR_TYPE_DEBUG,
3904 "Going to send PULL REQUEST to peer %s.\n",
3905 GNUNET_i2s (&peer_ctx->peer_id));
3907 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3908 send_message (peer_ctx,
3913 GNUNET_STATISTICS_update (stats,
3914 "# pull request send issued",
3917 if ((NULL != map_single_hop) &&
3918 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3919 &peer_ctx->peer_id)))
3921 GNUNET_STATISTICS_update (stats,
3922 "# pull request send issued (multi-hop peer)",
3933 * @param peer_ctx Context of peer to send push to
3936 send_push (struct PeerContext *peer_ctx)
3938 struct GNUNET_MQ_Envelope *ev;
3940 LOG (GNUNET_ERROR_TYPE_DEBUG,
3941 "Going to send PUSH to peer %s.\n",
3942 GNUNET_i2s (&peer_ctx->peer_id));
3944 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3945 send_message (peer_ctx, ev, "PUSH");
3948 GNUNET_STATISTICS_update (stats,
3949 "# push send issued",
3952 if ((NULL != map_single_hop) &&
3953 (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3954 &peer_ctx->peer_id)))
3956 GNUNET_STATISTICS_update (stats,
3957 "# push send issued (multi-hop peer)",
3965 #if ENABLE_MALICIOUS
3969 * @brief This function is called, when the client tells us to act malicious.
3970 * It verifies that @a msg is well-formed.
3972 * @param cls the closure (#ClientContext)
3973 * @param msg the message
3974 * @return #GNUNET_OK if @a msg is well-formed
3977 check_client_act_malicious (void *cls,
3978 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3980 struct ClientContext *cli_ctx = cls;
3981 uint16_t msize = ntohs (msg->header.size);
3982 uint32_t num_peers = ntohl (msg->num_peers);
3984 msize -= sizeof(struct GNUNET_RPS_CS_ActMaliciousMessage);
3985 if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
3986 (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
3988 LOG (GNUNET_ERROR_TYPE_ERROR,
3989 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3990 ntohl (msg->num_peers),
3991 (msize / sizeof(struct GNUNET_PeerIdentity)));
3993 GNUNET_SERVICE_client_drop (cli_ctx->client);
3994 return GNUNET_SYSERR;
4001 * Turn RPS service to act malicious.
4003 * @param cls Closure
4004 * @param client The client that sent the message
4005 * @param msg The message header
4008 handle_client_act_malicious (void *cls,
4010 GNUNET_RPS_CS_ActMaliciousMessage *msg)
4012 struct ClientContext *cli_ctx = cls;
4013 struct GNUNET_PeerIdentity *peers;
4014 uint32_t num_mal_peers_sent;
4015 uint32_t num_mal_peers_old;
4016 struct Sub *sub = cli_ctx->sub;
4020 /* Do actual logic */
4021 peers = (struct GNUNET_PeerIdentity *) &msg[1];
4022 mal_type = ntohl (msg->type);
4023 if (NULL == mal_peer_set)
4024 mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
4026 LOG (GNUNET_ERROR_TYPE_DEBUG,
4027 "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
4029 ntohl (msg->num_peers));
4032 { /* Try to maximise representation */
4033 /* Add other malicious peers to those we already know */
4035 num_mal_peers_sent = ntohl (msg->num_peers);
4036 num_mal_peers_old = num_mal_peers;
4037 GNUNET_array_grow (mal_peers,
4039 num_mal_peers + num_mal_peers_sent);
4040 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4042 num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4044 /* Add all mal peers to mal_peer_set */
4045 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4049 /* Substitute do_round () with do_mal_round () */
4050 GNUNET_assert (NULL != sub->do_round_task);
4051 GNUNET_SCHEDULER_cancel (sub->do_round_task);
4052 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4055 else if ((2 == mal_type) ||
4057 { /* Try to partition the network */
4058 /* Add other malicious peers to those we already know */
4060 num_mal_peers_sent = ntohl (msg->num_peers) - 1;
4061 num_mal_peers_old = num_mal_peers;
4062 GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
4063 GNUNET_array_grow (mal_peers,
4065 num_mal_peers + num_mal_peers_sent);
4066 if ((NULL != mal_peers) &&
4067 (0 != num_mal_peers) )
4069 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
4071 num_mal_peers_sent * sizeof(struct GNUNET_PeerIdentity));
4073 /* Add all mal peers to mal_peer_set */
4074 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
4079 /* Store the one attacked peer */
4080 GNUNET_memcpy (&attacked_peer,
4081 &msg->attacked_peer,
4082 sizeof(struct GNUNET_PeerIdentity));
4083 /* Set the flag of the attacked peer to valid to avoid problems */
4084 if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
4086 (void) issue_peer_online_check (sub, &attacked_peer);
4089 LOG (GNUNET_ERROR_TYPE_DEBUG,
4090 "Attacked peer is %s\n",
4091 GNUNET_i2s (&attacked_peer));
4093 /* Substitute do_round () with do_mal_round () */
4094 if (NULL != sub->do_round_task)
4096 /* Probably in shutdown */
4097 GNUNET_SCHEDULER_cancel (sub->do_round_task);
4098 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4101 else if (0 == mal_type)
4102 { /* Stop acting malicious */
4103 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4105 /* Substitute do_mal_round () with do_round () */
4106 GNUNET_SCHEDULER_cancel (sub->do_round_task);
4107 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4112 GNUNET_SERVICE_client_continue (cli_ctx->client);
4114 GNUNET_SERVICE_client_continue (cli_ctx->client);
4119 * Send out PUSHes and PULLs maliciously.
4121 * This is executed regylary.
4123 * @param cls Closure - Sub
4126 do_mal_round (void *cls)
4128 uint32_t num_pushes;
4130 struct GNUNET_TIME_Relative time_next_round;
4131 struct AttackedPeer *tmp_att_peer;
4132 struct Sub *sub = cls;
4134 LOG (GNUNET_ERROR_TYPE_DEBUG,
4135 "Going to execute next round maliciously type %" PRIu32 ".\n",
4137 sub->do_round_task = NULL;
4138 GNUNET_assert (mal_type <= 3);
4139 /* Do malicious actions */
4141 { /* Try to maximise representation */
4142 /* The maximum of pushes we're going to send this round */
4143 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4144 num_attacked_peers),
4145 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4147 LOG (GNUNET_ERROR_TYPE_DEBUG,
4148 "Going to send %" PRIu32 " pushes\n",
4151 /* Send PUSHes to attacked peers */
4152 for (i = 0; i < num_pushes; i++)
4154 if (att_peers_tail == att_peer_index)
4155 att_peer_index = att_peers_head;
4157 att_peer_index = att_peer_index->next;
4159 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4162 /* Send PULLs to some peers to learn about additional peers to attack */
4163 tmp_att_peer = att_peer_index;
4164 for (i = 0; i < num_pushes * alpha; i++)
4166 if (att_peers_tail == tmp_att_peer)
4167 tmp_att_peer = att_peers_head;
4169 att_peer_index = tmp_att_peer->next;
4171 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4176 else if (2 == mal_type)
4178 * Try to partition the network
4179 * Send as many pushes to the attacked peer as possible
4180 * That is one push per round as it will ignore more.
4182 (void) issue_peer_online_check (sub, &attacked_peer);
4183 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4186 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4191 { /* Combined attack */
4192 /* Send PUSH to attacked peers */
4193 if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4195 (void) issue_peer_online_check (sub, &attacked_peer);
4196 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4200 LOG (GNUNET_ERROR_TYPE_DEBUG,
4201 "Goding to send push to attacked peer (%s)\n",
4202 GNUNET_i2s (&attacked_peer));
4203 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4206 (void) issue_peer_online_check (sub, &attacked_peer);
4208 /* The maximum of pushes we're going to send this round */
4209 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4210 num_attacked_peers),
4211 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4213 LOG (GNUNET_ERROR_TYPE_DEBUG,
4214 "Going to send %" PRIu32 " pushes\n",
4217 for (i = 0; i < num_pushes; i++)
4219 if (att_peers_tail == att_peer_index)
4220 att_peer_index = att_peers_head;
4222 att_peer_index = att_peer_index->next;
4224 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4227 /* Send PULLs to some peers to learn about additional peers to attack */
4228 tmp_att_peer = att_peer_index;
4229 for (i = 0; i < num_pushes * alpha; i++)
4231 if (att_peers_tail == tmp_att_peer)
4232 tmp_att_peer = att_peers_head;
4234 att_peer_index = tmp_att_peer->next;
4236 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4240 /* Schedule next round */
4241 time_next_round = compute_rand_delay (sub->round_interval, 2);
4243 GNUNET_assert (NULL == sub->do_round_task);
4244 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4245 &do_mal_round, sub);
4246 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4250 #endif /* ENABLE_MALICIOUS */
4254 * Send out PUSHes and PULLs, possibly update #view, samplers.
4256 * This is executed regylary.
4258 * @param cls Closure - Sub
4261 do_round (void *cls)
4264 const struct GNUNET_PeerIdentity *view_array;
4265 unsigned int *permut;
4266 unsigned int a_peers; /* Number of peers we send pushes to */
4267 unsigned int b_peers; /* Number of peers we send pull requests to */
4268 uint32_t first_border;
4269 uint32_t second_border;
4270 struct GNUNET_PeerIdentity peer;
4271 struct GNUNET_PeerIdentity *update_peer;
4272 struct Sub *sub = cls;
4275 LOG (GNUNET_ERROR_TYPE_DEBUG,
4276 "Going to execute next round.\n");
4279 GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4281 sub->do_round_task = NULL;
4283 to_file (sub->file_name_view_log,
4284 "___ new round ___");
4285 #endif /* TO_FILE_FULL */
4286 view_array = View_get_as_array (sub->view);
4287 for (i = 0; i < View_size (sub->view); i++)
4289 LOG (GNUNET_ERROR_TYPE_DEBUG,
4290 "\t%s\n", GNUNET_i2s (&view_array[i]));
4292 to_file (sub->file_name_view_log,
4294 GNUNET_i2s_full (&view_array[i]));
4295 #endif /* TO_FILE_FULL */
4299 /* Send pushes and pull requests */
4300 if (0 < View_size (sub->view))
4302 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4303 View_size (sub->view));
4306 a_peers = ceil (alpha * View_size (sub->view));
4308 LOG (GNUNET_ERROR_TYPE_DEBUG,
4309 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4310 a_peers, alpha, View_size (sub->view));
4311 for (i = 0; i < a_peers; i++)
4313 peer = view_array[permut[i]];
4314 // FIXME if this fails schedule/loop this for later
4315 send_push (get_peer_ctx (sub->peer_map, &peer));
4318 /* Send PULL requests */
4319 b_peers = ceil (beta * View_size (sub->view));
4320 first_border = a_peers;
4321 second_border = a_peers + b_peers;
4322 if (second_border > View_size (sub->view))
4324 first_border = View_size (sub->view) - b_peers;
4325 second_border = View_size (sub->view);
4327 LOG (GNUNET_ERROR_TYPE_DEBUG,
4328 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4329 b_peers, beta, View_size (sub->view));
4330 for (i = first_border; i < second_border; i++)
4332 peer = view_array[permut[i]];
4333 if (GNUNET_NO == check_peer_flag (sub->peer_map,
4335 Peers_PULL_REPLY_PENDING))
4336 { // FIXME if this fails schedule/loop this for later
4337 send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4341 GNUNET_free (permut);
4347 /* TODO see how many peers are in push-/pull- list! */
4349 if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4350 (0 < CustomPeerMap_size (sub->push_map)) &&
4351 (0 < CustomPeerMap_size (sub->pull_map)))
4352 { /* If conditions for update are fulfilled, update */
4353 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4355 uint32_t final_size;
4356 uint32_t peers_to_clean_size;
4357 struct GNUNET_PeerIdentity *peers_to_clean;
4359 peers_to_clean = NULL;
4360 peers_to_clean_size = 0;
4361 GNUNET_array_grow (peers_to_clean,
4362 peers_to_clean_size,
4363 View_size (sub->view));
4364 GNUNET_memcpy (peers_to_clean,
4366 View_size (sub->view) * sizeof(struct GNUNET_PeerIdentity));
4368 /* Seems like recreating is the easiest way of emptying the peermap */
4369 View_clear (sub->view);
4371 to_file (sub->file_name_view_log,
4373 #endif /* TO_FILE_FULL */
4375 first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4376 CustomPeerMap_size (sub->push_map));
4377 second_border = first_border
4378 + GNUNET_MIN (floor (beta * sub->view_size_est_need),
4379 CustomPeerMap_size (sub->pull_map));
4380 final_size = second_border
4381 + ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4382 LOG (GNUNET_ERROR_TYPE_DEBUG,
4383 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"
4389 /* Update view with peers received through PUSHes */
4390 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4391 CustomPeerMap_size (sub->push_map));
4392 for (i = 0; i < first_border; i++)
4395 inserted = insert_in_view (sub,
4396 CustomPeerMap_get_peer_by_index (sub->push_map,
4398 if (GNUNET_OK == inserted)
4400 clients_notify_stream_peer (sub,
4402 CustomPeerMap_get_peer_by_index (
4403 sub->push_map, permut[i]));
4406 to_file (sub->file_name_view_log,
4408 GNUNET_i2s_full (&view_array[i]));
4409 #endif /* TO_FILE_FULL */
4410 // TODO change the peer_flags accordingly
4412 GNUNET_free (permut);
4415 /* Update view with peers received through PULLs */
4416 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4417 CustomPeerMap_size (sub->pull_map));
4418 for (i = first_border; i < second_border; i++)
4421 inserted = insert_in_view (sub,
4422 CustomPeerMap_get_peer_by_index (sub->pull_map,
4427 if (GNUNET_OK == inserted)
4429 clients_notify_stream_peer (sub,
4431 CustomPeerMap_get_peer_by_index (
4437 to_file (sub->file_name_view_log,
4439 GNUNET_i2s_full (&view_array[i]));
4440 #endif /* TO_FILE_FULL */
4441 // TODO change the peer_flags accordingly
4443 GNUNET_free (permut);
4446 /* Update view with peers from history */
4447 RPS_sampler_get_n_rand_peers (sub->sampler,
4448 final_size - second_border,
4451 // TODO change the peer_flags accordingly
4453 for (i = 0; i < View_size (sub->view); i++)
4454 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4456 /* Clean peers that were removed from the view */
4457 for (i = 0; i < peers_to_clean_size; i++)
4460 to_file (sub->file_name_view_log,
4462 GNUNET_i2s_full (&peers_to_clean[i]));
4463 #endif /* TO_FILE_FULL */
4464 clean_peer (sub, &peers_to_clean[i]);
4467 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4468 clients_notify_view_update (sub);
4472 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4475 GNUNET_STATISTICS_update (stats, "# rounds blocked", 1, GNUNET_NO);
4476 if ((CustomPeerMap_size (sub->push_map) > alpha
4477 * sub->view_size_est_need) &&
4478 ! (0 >= CustomPeerMap_size (sub->pull_map)))
4479 GNUNET_STATISTICS_update (stats, "# rounds blocked - too many pushes",
4481 if ((CustomPeerMap_size (sub->push_map) > alpha
4482 * sub->view_size_est_need) &&
4483 (0 >= CustomPeerMap_size (sub->pull_map)))
4484 GNUNET_STATISTICS_update (stats,
4485 "# rounds blocked - too many pushes, no pull replies",
4487 if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4488 ! (0 >= CustomPeerMap_size (sub->pull_map)))
4489 GNUNET_STATISTICS_update (stats, "# rounds blocked - no pushes", 1,
4491 if ((0 >= CustomPeerMap_size (sub->push_map)) &&
4492 (0 >= CustomPeerMap_size (sub->pull_map)))
4493 GNUNET_STATISTICS_update (stats,
4494 "# rounds blocked - no pushes, no pull replies",
4496 if ((0 >= CustomPeerMap_size (sub->pull_map)) &&
4497 (CustomPeerMap_size (sub->push_map) > alpha
4498 * sub->view_size_est_need) &&
4499 (0 >= CustomPeerMap_size (sub->push_map)) )
4500 GNUNET_STATISTICS_update (stats, "# rounds blocked - no pull replies",
4504 // TODO independent of that also get some peers from CADET_get_peers()?
4505 if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4507 sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4511 LOG (GNUNET_ERROR_TYPE_WARNING,
4512 "Push map size too big for histogram (%u, %u)\n",
4513 CustomPeerMap_size (sub->push_map),
4514 HISTOGRAM_FILE_SLOTS);
4516 // FIXME check bounds of histogram
4517 sub->push_delta[(int32_t) (CustomPeerMap_size (sub->push_map)
4518 - (alpha * sub->view_size_est_need))
4519 + (HISTOGRAM_FILE_SLOTS / 2)]++;
4522 GNUNET_STATISTICS_set (stats,
4523 "# peers in push map at end of round",
4524 CustomPeerMap_size (sub->push_map),
4526 GNUNET_STATISTICS_set (stats,
4527 "# peers in pull map at end of round",
4528 CustomPeerMap_size (sub->pull_map),
4530 GNUNET_STATISTICS_set (stats,
4531 "# peers in view at end of round",
4532 View_size (sub->view),
4534 GNUNET_STATISTICS_set (stats,
4535 "# expected pushes",
4536 alpha * sub->view_size_est_need,
4538 GNUNET_STATISTICS_set (stats,
4539 "delta expected - received pushes",
4540 CustomPeerMap_size (sub->push_map) - (alpha
4542 view_size_est_need),
4546 LOG (GNUNET_ERROR_TYPE_DEBUG,
4547 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4548 CustomPeerMap_size (sub->push_map),
4549 CustomPeerMap_size (sub->pull_map),
4551 View_size (sub->view),
4552 alpha * View_size (sub->view));
4554 /* Update samplers */
4555 for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4557 update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4558 LOG (GNUNET_ERROR_TYPE_DEBUG,
4559 "Updating with peer %s from push list\n",
4560 GNUNET_i2s (update_peer));
4561 insert_in_sampler (sub, update_peer);
4562 clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4565 for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4567 LOG (GNUNET_ERROR_TYPE_DEBUG,
4568 "Updating with peer %s from pull list\n",
4569 GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4570 insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4571 /* This cleans only if it is not in the view */
4572 clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4576 /* Empty push/pull lists */
4577 CustomPeerMap_clear (sub->push_map);
4578 CustomPeerMap_clear (sub->pull_map);
4582 GNUNET_STATISTICS_set (stats,
4584 View_size (sub->view),
4588 struct GNUNET_TIME_Relative time_next_round;
4590 time_next_round = compute_rand_delay (sub->round_interval, 2);
4592 /* Schedule next round */
4593 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4595 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4600 * This is called from GNUNET_CADET_get_peers().
4602 * It is called on every peer(ID) that cadet somehow has contact with.
4603 * We use those to initialise the sampler.
4605 * implements #GNUNET_CADET_PeersCB
4607 * @param cls Closure - Sub
4608 * @param peer Peer, or NULL on "EOF".
4609 * @param tunnel Do we have a tunnel towards this peer?
4610 * @param n_paths Number of known paths towards this peer.
4611 * @param best_path How long is the best path?
4612 * (0 = unknown, 1 = ourselves, 2 = neighbor)
4615 init_peer_cb (void *cls,
4616 const struct GNUNET_PeerIdentity *peer,
4617 int tunnel, /* "Do we have a tunnel towards this peer?" */
4618 unsigned int n_paths, /* "Number of known paths towards this peer" */
4619 unsigned int best_path) /* "How long is the best path?
4620 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4622 struct Sub *sub = cls;
4630 LOG (GNUNET_ERROR_TYPE_DEBUG,
4631 "Got peer_id %s from cadet\n",
4633 got_peer (sub, peer);
4639 * @brief Iterator function over stored, valid peers.
4641 * We initialise the sampler with those.
4643 * @param cls Closure - Sub
4644 * @param peer the peer id
4645 * @return #GNUNET_YES if we should continue to
4647 * #GNUNET_NO if not.
4650 valid_peers_iterator (void *cls,
4651 const struct GNUNET_PeerIdentity *peer)
4653 struct Sub *sub = cls;
4657 LOG (GNUNET_ERROR_TYPE_DEBUG,
4658 "Got stored, valid peer %s\n",
4660 got_peer (sub, peer);
4667 * Iterator over peers from peerinfo.
4669 * @param cls Closure - Sub
4670 * @param peer id of the peer, NULL for last call
4671 * @param hello hello message for the peer (can be NULL)
4672 * @param error message
4675 process_peerinfo_peers (void *cls,
4676 const struct GNUNET_PeerIdentity *peer,
4677 const struct GNUNET_HELLO_Message *hello,
4678 const char *err_msg)
4680 struct Sub *sub = cls;
4687 LOG (GNUNET_ERROR_TYPE_DEBUG,
4688 "Got peer_id %s from peerinfo\n",
4690 got_peer (sub, peer);
4696 * Task run during shutdown.
4698 * @param cls Closure - unused
4701 shutdown_task (void *cls)
4704 struct ClientContext *client_ctx;
4706 LOG (GNUNET_ERROR_TYPE_DEBUG,
4707 "RPS service is going down\n");
4709 /* Clean all clients */
4710 for (client_ctx = cli_ctx_head;
4711 NULL != cli_ctx_head;
4712 client_ctx = cli_ctx_head)
4714 destroy_cli_ctx (client_ctx);
4722 /* Disconnect from other services */
4723 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4724 GNUNET_PEERINFO_disconnect (peerinfo_handle);
4725 peerinfo_handle = NULL;
4726 GNUNET_NSE_disconnect (nse);
4727 if (NULL != map_single_hop)
4729 /* core_init was called - core was initialised */
4730 /* disconnect first, so no callback tries to access missing peermap */
4731 GNUNET_CORE_disconnect (core_handle);
4733 GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4734 map_single_hop = NULL;
4739 GNUNET_STATISTICS_destroy (stats,
4743 GNUNET_CADET_disconnect (cadet_handle);
4744 cadet_handle = NULL;
4745 #if ENABLE_MALICIOUS
4746 struct AttackedPeer *tmp_att_peer;
4747 GNUNET_array_grow (mal_peers,
4750 if (NULL != mal_peer_set)
4751 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4752 if (NULL != att_peer_set)
4753 GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4754 while (NULL != att_peers_head)
4756 tmp_att_peer = att_peers_head;
4757 GNUNET_CONTAINER_DLL_remove (att_peers_head,
4760 GNUNET_free (tmp_att_peer);
4762 #endif /* ENABLE_MALICIOUS */
4768 * Handle client connecting to the service.
4771 * @param client the new client
4772 * @param mq the message queue of @a client
4776 client_connect_cb (void *cls,
4777 struct GNUNET_SERVICE_Client *client,
4778 struct GNUNET_MQ_Handle *mq)
4780 struct ClientContext *cli_ctx;
4784 LOG (GNUNET_ERROR_TYPE_DEBUG,
4785 "Client connected\n");
4787 return client; /* Server was destroyed before a client connected. Shutting down */
4788 cli_ctx = GNUNET_new (struct ClientContext);
4790 cli_ctx->view_updates_left = -1;
4791 cli_ctx->stream_update = GNUNET_NO;
4792 cli_ctx->client = client;
4793 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4801 * Callback called when a client disconnected from the service
4803 * @param cls closure for the service
4804 * @param c the client that disconnected
4805 * @param internal_cls should be equal to @a c
4808 client_disconnect_cb (void *cls,
4809 struct GNUNET_SERVICE_Client *client,
4812 struct ClientContext *cli_ctx = internal_cls;
4815 GNUNET_assert (client == cli_ctx->client);
4817 { /* shutdown task - destroy all clients */
4818 while (NULL != cli_ctx_head)
4819 destroy_cli_ctx (cli_ctx_head);
4822 { /* destroy this client */
4823 LOG (GNUNET_ERROR_TYPE_DEBUG,
4824 "Client disconnected. Destroy its context.\n");
4825 destroy_cli_ctx (cli_ctx);
4831 * Handle random peer sampling clients.
4833 * @param cls closure
4834 * @param c configuration to use
4835 * @param service the initialized service
4839 const struct GNUNET_CONFIGURATION_Handle *c,
4840 struct GNUNET_SERVICE_Handle *service)
4842 struct GNUNET_TIME_Relative round_interval;
4843 long long unsigned int sampler_size;
4844 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4845 struct GNUNET_HashCode hash;
4850 GNUNET_log_setup ("rps",
4851 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4855 GNUNET_CRYPTO_get_peer_identity (cfg,
4856 &own_identity); // TODO check return value
4857 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4858 "STARTING SERVICE (rps) for peer [%s]\n",
4859 GNUNET_i2s (&own_identity));
4860 #if ENABLE_MALICIOUS
4861 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4862 "Malicious execution compiled in.\n");
4863 #endif /* ENABLE_MALICIOUS */
4865 /* Get time interval from the configuration */
4867 GNUNET_CONFIGURATION_get_value_time (cfg,
4872 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4873 "RPS", "ROUNDINTERVAL");
4874 GNUNET_SCHEDULER_shutdown ();
4878 /* Get initial size of sampler/view from the configuration */
4880 GNUNET_CONFIGURATION_get_value_number (cfg,
4885 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4887 GNUNET_SCHEDULER_shutdown ();
4891 cadet_handle = GNUNET_CADET_connect (cfg);
4892 GNUNET_assert (NULL != cadet_handle);
4893 core_handle = GNUNET_CORE_connect (cfg,
4895 core_init, /* init */
4896 core_connects, /* connects */
4897 core_disconnects, /* disconnects */
4898 NULL); /* handlers */
4899 GNUNET_assert (NULL != core_handle);
4906 /* Set up main Sub */
4907 GNUNET_CRYPTO_hash (hash_port_string,
4908 strlen (hash_port_string),
4910 msub = new_sub (&hash,
4911 sampler_size, /* Will be overwritten by config */
4915 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4917 /* connect to NSE */
4918 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4920 // LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4921 // GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4922 // TODO send push/pull to each of those peers?
4923 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4924 restore_valid_peers (msub);
4925 get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4927 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4929 process_peerinfo_peers,
4932 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4934 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4935 stats = GNUNET_STATISTICS_create ("rps", cfg);
4940 * Define "main" method using service macro.
4944 GNUNET_SERVICE_OPTION_NONE,
4947 &client_disconnect_cb,
4949 GNUNET_MQ_hd_var_size (client_seed,
4950 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4951 struct GNUNET_RPS_CS_SeedMessage,
4953 #if ENABLE_MALICIOUS
4954 GNUNET_MQ_hd_var_size (client_act_malicious,
4955 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4956 struct GNUNET_RPS_CS_ActMaliciousMessage,
4958 #endif /* ENABLE_MALICIOUS */
4959 GNUNET_MQ_hd_fixed_size (client_view_request,
4960 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4961 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4963 GNUNET_MQ_hd_fixed_size (client_view_cancel,
4964 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4965 struct GNUNET_MessageHeader,
4967 GNUNET_MQ_hd_fixed_size (client_stream_request,
4968 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4969 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4971 GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4972 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4973 struct GNUNET_MessageHeader,
4975 GNUNET_MQ_hd_fixed_size (client_start_sub,
4976 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4977 struct GNUNET_RPS_CS_SubStartMessage,
4979 GNUNET_MQ_hd_fixed_size (client_stop_sub,
4980 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4981 struct GNUNET_RPS_CS_SubStopMessage,
4983 GNUNET_MQ_handler_end ());
4985 /* end of gnunet-service-rps.c */