2 This file is part of GNUnet.
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
22 * @file rps/gnunet-service-rps.c
23 * @brief rps service implementation
24 * @author Julius Bünger
27 #include "gnunet_applications.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_core_service.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_nse_service.h"
33 #include "gnunet_statistics_service.h"
35 #include "rps-test_util.h"
36 #include "gnunet-service-rps_sampler.h"
37 #include "gnunet-service-rps_custommap.h"
38 #include "gnunet-service-rps_view.h"
44 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
46 // TODO check for overflows
48 // TODO align message structs
50 // TODO connect to friends
52 // TODO blacklist? (-> mal peer detection on top of brahms)
54 // hist_size_init, hist_size_max
56 /***********************************************************************
57 * Old gnunet-service-rps_peers.c
58 ***********************************************************************/
61 * Set a peer flag of given peer context.
63 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
66 * Get peer flag of given peer context.
68 #define check_peer_flag_set(peer_ctx, mask)\
69 ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
72 * Unset flag of given peer context.
74 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
77 * Get channel flag of given channel context.
79 #define check_channel_flag_set(channel_flags, mask)\
80 ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
83 * Unset flag of given channel context.
85 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
90 * Pending operation on peer consisting of callback and closure
92 * When an operation cannot be executed right now this struct is used to store
93 * the callback and closure for later execution.
109 * List containing all messages that are yet to be send
111 * This is used to keep track of all messages that have not been sent yet. When
112 * a peer is to be removed the pending messages can be removed properly.
114 struct PendingMessage
119 struct PendingMessage *next;
120 struct PendingMessage *prev;
123 * The envelope to the corresponding message
125 struct GNUNET_MQ_Envelope *ev;
128 * The corresponding context
130 struct PeerContext *peer_ctx;
139 * @brief Context for a channel
144 * Struct used to keep track of other peer's status
146 * This is stored in a multipeermap.
147 * It contains information such as cadet channels, a message queue for sending,
148 * status about the channels, the pending operations on this peer and some flags
149 * about the status of the peer itself. (online, valid, ...)
154 * The Sub this context belongs to.
159 * Message queue open to client
161 struct GNUNET_MQ_Handle *mq;
164 * Channel open to client.
166 struct ChannelCtx *send_channel_ctx;
169 * Channel open from client.
171 struct ChannelCtx *recv_channel_ctx;
174 * Array of pending operations on this peer.
176 struct PeerPendingOp *pending_ops;
179 * Handle to the callback given to cadet_ntfy_tmt_rdy()
181 * To be canceled on shutdown.
183 struct PendingMessage *online_check_pending;
186 * Number of pending operations.
188 unsigned int num_pending_ops;
191 * Identity of the peer
193 struct GNUNET_PeerIdentity peer_id;
196 * Flags indicating status of peer
201 * Last time we received something from that peer.
203 struct GNUNET_TIME_Absolute last_message_recv;
206 * Last time we received a keepalive message.
208 struct GNUNET_TIME_Absolute last_keepalive;
211 * DLL with all messages that are yet to be sent
213 struct PendingMessage *pending_messages_head;
214 struct PendingMessage *pending_messages_tail;
217 * This is pobably followed by 'statistical' data (when we first saw
218 * it, how did we get its ID, how many pushes (in a timeinterval),
221 uint32_t round_pull_req;
225 * @brief Closure to #valid_peer_iterator
227 struct PeersIteratorCls
232 PeersIterator iterator;
235 * Closure to iterator
241 * @brief Context for a channel
246 * @brief The channel itself
248 struct GNUNET_CADET_Channel *channel;
251 * @brief The peer context associated with the channel
253 struct PeerContext *peer_ctx;
256 * @brief When channel destruction needs to be delayed (because it is called
257 * from within the cadet routine of another channel destruction) this task
258 * refers to the respective _SCHEDULER_Task.
260 struct GNUNET_SCHEDULER_Task *destruction_task;
267 * If type is 2 This struct is used to store the attacked peers in a DLL
274 struct AttackedPeer *next;
275 struct AttackedPeer *prev;
280 struct GNUNET_PeerIdentity peer_id;
283 #endif /* ENABLE_MALICIOUS */
286 * @brief This number determines the number of slots for files that represent
289 #define HISTOGRAM_FILE_SLOTS 32
292 * @brief The size (in bytes) a file needs to store the histogram
294 * Per slot: 1 newline, up to 4 chars,
295 * Additionally: 1 null termination
297 #define SIZE_DUMP_FILE (HISTOGRAM_FILE_SLOTS * 5) + 1
302 * Essentially one instance of brahms that only connects to other instances
303 * with the same (secret) value.
308 * @brief Hash of the shared value that defines Subs.
310 struct GNUNET_HashCode hash;
313 * @brief Port to communicate to other peers.
315 struct GNUNET_CADET_Port *cadet_port;
318 * @brief Hashmap of valid peers.
320 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
323 * @brief Filename of the file that stores the valid peers persistently.
325 char *filename_valid_peers;
328 * Set of all peers to keep track of them.
330 struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
333 * @brief This is the minimum estimate used as sampler size.
335 * It is configured by the user.
337 unsigned int sampler_size_est_min;
340 * The size of sampler we need to be able to satisfy the Brahms protocol's
341 * need of random peers.
343 * This is one minimum size the sampler grows to.
345 unsigned int sampler_size_est_need;
348 * Time inverval the do_round task runs in.
350 struct GNUNET_TIME_Relative round_interval;
353 * Sampler used for the Brahms protocol itself.
355 struct RPS_Sampler *sampler;
358 * Name to log view to
360 char *file_name_view_log;
364 * Name to log number of observed peers to
366 char *file_name_observed_log;
369 * @brief Count the observed peers
371 uint32_t num_observed_peers;
374 * @brief Multipeermap (ab-) used to count unique peer_ids
376 struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
380 * List to store peers received through pushes temporary.
382 struct CustomPeerMap *push_map;
385 * List to store peers received through pulls temporary.
387 struct CustomPeerMap *pull_map;
390 * @brief This is the estimate used as view size.
392 * It is initialised with the minimum
394 unsigned int view_size_est_need;
397 * @brief This is the minimum estimate used as view size.
399 * It is configured by the user.
401 unsigned int view_size_est_min;
409 * Identifier for the main task that runs periodically.
411 struct GNUNET_SCHEDULER_Task *do_round_task;
416 * @brief Counts the executed rounds.
421 * @brief This array accumulates the number of received pushes per round.
423 * Number at index i represents the number of rounds with i observed pushes.
425 uint32_t push_recv[HISTOGRAM_FILE_SLOTS];
428 * @brief Histogram of deltas between the expected and actual number of
431 * As half of the entries are expected to be negative, this is shifted by
432 * #HISTOGRAM_FILE_SLOTS/2.
434 uint32_t push_delta[HISTOGRAM_FILE_SLOTS];
437 * @brief Number of pull replies with this delay measured in rounds.
439 * Number at index i represents the number of pull replies with a delay of i
442 uint32_t pull_delays[HISTOGRAM_FILE_SLOTS];
446 /***********************************************************************
448 ***********************************************************************/
453 static const struct GNUNET_CONFIGURATION_Handle *cfg;
456 * Handle to the statistics service.
458 struct GNUNET_STATISTICS_Handle *stats;
463 struct GNUNET_CADET_Handle *cadet_handle;
468 struct GNUNET_CORE_Handle *core_handle;
471 * @brief PeerMap to keep track of connected peers.
473 struct GNUNET_CONTAINER_MultiPeerMap *map_single_hop;
478 static struct GNUNET_PeerIdentity own_identity;
481 * Percentage of total peer number in the view
482 * to send random PUSHes to
487 * Percentage of total peer number in the view
488 * to send random PULLs to
495 static struct GNUNET_NSE_Handle *nse;
498 * Handler to PEERINFO.
500 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
503 * Handle for cancellation of iteration over peers.
505 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
510 * Type of malicious peer
512 * 0 Don't act malicious at all - Default
513 * 1 Try to maximise representation
514 * 2 Try to partition the network
517 static uint32_t mal_type;
520 * Other malicious peers
522 static struct GNUNET_PeerIdentity *mal_peers;
525 * Hashmap of malicious peers used as set.
526 * Used to more efficiently check whether we know that peer.
528 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
531 * Number of other malicious peers
533 static uint32_t num_mal_peers;
537 * If type is 2 this is the DLL of attacked peers
539 static struct AttackedPeer *att_peers_head;
540 static struct AttackedPeer *att_peers_tail;
543 * This index is used to point to an attacked peer to
544 * implement the round-robin-ish way to select attacked peers.
546 static struct AttackedPeer *att_peer_index;
549 * Hashmap of attacked peers used as set.
550 * Used to more efficiently check whether we know that peer.
552 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
555 * Number of attacked peers
557 static uint32_t num_attacked_peers;
560 * If type is 1 this is the attacked peer
562 static struct GNUNET_PeerIdentity attacked_peer;
565 * The limit of PUSHes we can send in one round.
566 * This is an assumption of the Brahms protocol and either implemented
569 * assumend to be the bandwidth limitation.
571 static uint32_t push_limit = 10000;
572 #endif /* ENABLE_MALICIOUS */
577 * This is run in any case by all peers and connects to all peers without
578 * specifying a shared value.
580 static struct Sub *msub;
583 * @brief Maximum number of valid peers to keep.
584 * TODO read from config
586 static const uint32_t num_valid_peers_max = UINT32_MAX;
588 /***********************************************************************
590 ***********************************************************************/
594 do_round (void *cls);
597 do_mal_round (void *cls);
601 * @brief Get the #PeerContext associated with a peer
603 * @param peer_map The peer map containing the context
604 * @param peer the peer id
606 * @return the #PeerContext
608 static struct PeerContext *
609 get_peer_ctx (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
610 const struct GNUNET_PeerIdentity *peer)
612 struct PeerContext *ctx;
615 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
616 GNUNET_assert (GNUNET_YES == ret);
617 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
618 GNUNET_assert (NULL != ctx);
623 * @brief Check whether we have information about the given peer.
625 * FIXME probably deprecated. Make this the new _online.
627 * @param peer_map The peer map to check for the existence of @a peer
628 * @param peer peer in question
630 * @return #GNUNET_YES if peer is known
631 * #GNUNET_NO if peer is not knwon
634 check_peer_known (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
635 const struct GNUNET_PeerIdentity *peer)
637 if (NULL != peer_map)
639 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
649 * @brief Create a new #PeerContext and insert it into the peer map
651 * @param sub The Sub this context belongs to.
652 * @param peer the peer to create the #PeerContext for
654 * @return the #PeerContext
656 static struct PeerContext *
657 create_peer_ctx (struct Sub *sub,
658 const struct GNUNET_PeerIdentity *peer)
660 struct PeerContext *ctx;
663 GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
665 ctx = GNUNET_new (struct PeerContext);
666 ctx->peer_id = *peer;
668 ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
669 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
670 GNUNET_assert (GNUNET_OK == ret);
673 GNUNET_STATISTICS_set (stats,
675 GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
683 * @brief Create or get a #PeerContext
685 * @param sub The Sub to which the created context belongs to
686 * @param peer the peer to get the associated context to
688 * @return the context
690 static struct PeerContext *
691 create_or_get_peer_ctx (struct Sub *sub,
692 const struct GNUNET_PeerIdentity *peer)
694 if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
696 return create_peer_ctx (sub, peer);
698 return get_peer_ctx (sub->peer_map, peer);
703 * @brief Check whether we have a connection to this @a peer
705 * Also sets the #Peers_ONLINE flag accordingly
707 * @param peer_ctx Context of the peer of which connectivity is to be checked
709 * @return #GNUNET_YES if we are connected
710 * #GNUNET_NO otherwise
713 check_connected (struct PeerContext *peer_ctx)
715 /* If we don't know about this peer we don't know whether it's online */
716 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
721 /* Get the context */
722 peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
723 /* If we have no channel to this peer we don't know whether it's online */
724 if ( (NULL == peer_ctx->send_channel_ctx) &&
725 (NULL == peer_ctx->recv_channel_ctx) )
727 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
730 /* Otherwise (if we have a channel, we know that it's online */
731 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
737 * @brief The closure to #get_rand_peer_iterator.
739 struct GetRandPeerIteratorCls
742 * @brief The index of the peer to return.
743 * Will be decreased until 0.
744 * Then current peer is returned.
749 * @brief Pointer to peer to return.
751 const struct GNUNET_PeerIdentity *peer;
756 * @brief Iterator function for #get_random_peer_from_peermap.
758 * Implements #GNUNET_CONTAINER_PeerMapIterator.
759 * Decreases the index until the index is null.
760 * Then returns the current peer.
762 * @param cls the #GetRandPeerIteratorCls containing index and peer
763 * @param peer current peer
764 * @param value unused
766 * @return #GNUNET_YES if we should continue to
771 get_rand_peer_iterator (void *cls,
772 const struct GNUNET_PeerIdentity *peer,
775 struct GetRandPeerIteratorCls *iterator_cls = cls;
778 if (0 >= iterator_cls->index)
780 iterator_cls->peer = peer;
783 iterator_cls->index--;
789 * @brief Get a random peer from @a peer_map
791 * @param valid_peers Peer map containing valid peers from which to select a
794 * @return a random peer
796 static const struct GNUNET_PeerIdentity *
797 get_random_peer_from_peermap (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
799 struct GetRandPeerIteratorCls *iterator_cls;
800 const struct GNUNET_PeerIdentity *ret;
802 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
803 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
804 GNUNET_CONTAINER_multipeermap_size (valid_peers));
805 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
806 get_rand_peer_iterator,
808 ret = iterator_cls->peer;
809 GNUNET_free (iterator_cls);
815 * @brief Add a given @a peer to valid peers.
817 * If valid peers are already #num_valid_peers_max, delete a peer previously.
819 * @param peer The peer that is added to the valid peers.
820 * @param valid_peers Peer map of valid peers to which to add the @a peer
822 * @return #GNUNET_YES if no other peer had to be removed
823 * #GNUNET_NO otherwise
826 add_valid_peer (const struct GNUNET_PeerIdentity *peer,
827 struct GNUNET_CONTAINER_MultiPeerMap *valid_peers)
829 const struct GNUNET_PeerIdentity *rand_peer;
833 /* Remove random peers until there is space for a new one */
834 while (num_valid_peers_max <=
835 GNUNET_CONTAINER_multipeermap_size (valid_peers))
837 rand_peer = get_random_peer_from_peermap (valid_peers);
838 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
841 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
842 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
843 if (valid_peers == msub->valid_peers)
845 GNUNET_STATISTICS_set (stats,
847 GNUNET_CONTAINER_multipeermap_size (valid_peers),
854 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
857 * @brief Set the peer flag to living and
858 * call the pending operations on this peer.
860 * Also adds peer to #valid_peers.
862 * @param peer_ctx the #PeerContext of the peer to set online
865 set_peer_online (struct PeerContext *peer_ctx)
867 struct GNUNET_PeerIdentity *peer;
870 peer = &peer_ctx->peer_id;
871 LOG (GNUNET_ERROR_TYPE_DEBUG,
872 "Peer %s is online and valid, calling %i pending operations on it\n",
874 peer_ctx->num_pending_ops);
876 if (NULL != peer_ctx->online_check_pending)
878 LOG (GNUNET_ERROR_TYPE_DEBUG,
879 "Removing pending online check for peer %s\n",
880 GNUNET_i2s (&peer_ctx->peer_id));
881 // TODO wait until cadet sets mq->cancel_impl
882 //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
883 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
884 peer_ctx->online_check_pending = NULL;
887 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
889 /* Call pending operations */
890 for (i = 0; i < peer_ctx->num_pending_ops; i++)
892 peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
894 GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
898 cleanup_destroyed_channel (void *cls,
899 const struct GNUNET_CADET_Channel *channel);
901 /* Declaration of handlers */
903 handle_peer_check (void *cls,
904 const struct GNUNET_MessageHeader *msg);
907 handle_peer_push (void *cls,
908 const struct GNUNET_MessageHeader *msg);
911 handle_peer_pull_request (void *cls,
912 const struct GNUNET_MessageHeader *msg);
915 check_peer_pull_reply (void *cls,
916 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
919 handle_peer_pull_reply (void *cls,
920 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
922 /* End declaration of handlers */
925 * @brief Allocate memory for a new channel context and insert it into DLL
927 * @param peer_ctx context of the according peer
929 * @return The channel context
931 static struct ChannelCtx *
932 add_channel_ctx (struct PeerContext *peer_ctx)
934 struct ChannelCtx *channel_ctx;
935 channel_ctx = GNUNET_new (struct ChannelCtx);
936 channel_ctx->peer_ctx = peer_ctx;
942 * @brief Free memory and NULL pointers.
944 * @param channel_ctx The channel context.
947 remove_channel_ctx (struct ChannelCtx *channel_ctx)
949 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
951 if (NULL != channel_ctx->destruction_task)
953 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
954 channel_ctx->destruction_task = NULL;
957 GNUNET_free (channel_ctx);
959 if (NULL == peer_ctx) return;
960 if (channel_ctx == peer_ctx->send_channel_ctx)
962 peer_ctx->send_channel_ctx = NULL;
965 else if (channel_ctx == peer_ctx->recv_channel_ctx)
967 peer_ctx->recv_channel_ctx = NULL;
973 * @brief Get the channel of a peer. If not existing, create.
975 * @param peer_ctx Context of the peer of which to get the channel
976 * @return the #GNUNET_CADET_Channel used to send data to @a peer_ctx
978 struct GNUNET_CADET_Channel *
979 get_channel (struct PeerContext *peer_ctx)
981 /* There exists a copy-paste-clone in run() */
982 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
983 GNUNET_MQ_hd_fixed_size (peer_check,
984 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
985 struct GNUNET_MessageHeader,
987 GNUNET_MQ_hd_fixed_size (peer_push,
988 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
989 struct GNUNET_MessageHeader,
991 GNUNET_MQ_hd_fixed_size (peer_pull_request,
992 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
993 struct GNUNET_MessageHeader,
995 GNUNET_MQ_hd_var_size (peer_pull_reply,
996 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
997 struct GNUNET_RPS_P2P_PullReplyMessage,
999 GNUNET_MQ_handler_end ()
1003 if (NULL == peer_ctx->send_channel_ctx)
1005 LOG (GNUNET_ERROR_TYPE_DEBUG,
1006 "Trying to establish channel to peer %s\n",
1007 GNUNET_i2s (&peer_ctx->peer_id));
1008 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
1009 peer_ctx->send_channel_ctx->channel =
1010 GNUNET_CADET_channel_create (cadet_handle,
1011 peer_ctx->send_channel_ctx, /* context */
1013 &peer_ctx->sub->hash,
1014 GNUNET_CADET_OPTION_RELIABLE,
1015 NULL, /* WindowSize handler */
1016 &cleanup_destroyed_channel, /* Disconnect handler */
1019 GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
1020 GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
1021 return peer_ctx->send_channel_ctx->channel;
1026 * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
1028 * If we already have a message queue open to this client,
1029 * simply return it, otherways create one.
1031 * @param peer_ctx Context of the peer of whicht to get the mq
1032 * @return the #GNUNET_MQ_Handle
1034 static struct GNUNET_MQ_Handle *
1035 get_mq (struct PeerContext *peer_ctx)
1037 if (NULL == peer_ctx->mq)
1039 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer_ctx));
1041 return peer_ctx->mq;
1045 * @brief Add an envelope to a message passed to mq to list of pending messages
1047 * @param peer_ctx Context of the peer for which to insert the envelope
1048 * @param ev envelope to the message
1049 * @param type type of the message to be sent
1050 * @return pointer to pending message
1052 static struct PendingMessage *
1053 insert_pending_message (struct PeerContext *peer_ctx,
1054 struct GNUNET_MQ_Envelope *ev,
1057 struct PendingMessage *pending_msg;
1059 pending_msg = GNUNET_new (struct PendingMessage);
1060 pending_msg->ev = ev;
1061 pending_msg->peer_ctx = peer_ctx;
1062 pending_msg->type = type;
1063 GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
1064 peer_ctx->pending_messages_tail,
1071 * @brief Remove a pending message from the respective DLL
1073 * @param pending_msg the pending message to remove
1074 * @param cancel whether to cancel the pending message, too
1077 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
1079 struct PeerContext *peer_ctx;
1082 peer_ctx = pending_msg->peer_ctx;
1083 GNUNET_assert (NULL != peer_ctx);
1084 GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
1085 peer_ctx->pending_messages_tail,
1087 // TODO wait for the cadet implementation of message cancellation
1088 //if (GNUNET_YES == cancel)
1090 // GNUNET_MQ_send_cancel (pending_msg->ev);
1092 GNUNET_free (pending_msg);
1097 * @brief This is called in response to the first message we sent as a
1100 * @param cls #PeerContext of peer with pending online check
1103 mq_online_check_successful (void *cls)
1105 struct PeerContext *peer_ctx = cls;
1107 if (NULL != peer_ctx->online_check_pending)
1109 LOG (GNUNET_ERROR_TYPE_DEBUG,
1110 "Online check for peer %s was successfull\n",
1111 GNUNET_i2s (&peer_ctx->peer_id));
1112 remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
1113 peer_ctx->online_check_pending = NULL;
1114 set_peer_online (peer_ctx);
1115 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1120 * Issue a check whether peer is online
1122 * @param peer_ctx the context of the peer
1125 check_peer_online (struct PeerContext *peer_ctx)
1127 LOG (GNUNET_ERROR_TYPE_DEBUG,
1128 "Get informed about peer %s getting online\n",
1129 GNUNET_i2s (&peer_ctx->peer_id));
1131 struct GNUNET_MQ_Handle *mq;
1132 struct GNUNET_MQ_Envelope *ev;
1134 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
1135 peer_ctx->online_check_pending =
1136 insert_pending_message (peer_ctx, ev, "Check online");
1137 mq = get_mq (peer_ctx);
1138 GNUNET_MQ_notify_sent (ev,
1139 mq_online_check_successful,
1141 GNUNET_MQ_send (mq, ev);
1142 if (peer_ctx->sub == msub)
1144 GNUNET_STATISTICS_update (stats,
1145 "# pending online checks",
1153 * @brief Check whether function of type #PeerOp was already scheduled
1155 * The array with pending operations will probably never grow really big, so
1156 * iterating over it should be ok.
1158 * @param peer_ctx Context of the peer to check for the operation
1159 * @param peer_op the operation (#PeerOp) on the peer
1161 * @return #GNUNET_YES if this operation is scheduled on that peer
1162 * #GNUNET_NO otherwise
1165 check_operation_scheduled (const struct PeerContext *peer_ctx,
1166 const PeerOp peer_op)
1170 for (i = 0; i < peer_ctx->num_pending_ops; i++)
1171 if (peer_op == peer_ctx->pending_ops[i].op)
1178 * @brief Callback for scheduler to destroy a channel
1180 * @param cls Context of the channel
1183 destroy_channel (struct ChannelCtx *channel_ctx)
1185 struct GNUNET_CADET_Channel *channel;
1187 if (NULL != channel_ctx->destruction_task)
1189 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
1190 channel_ctx->destruction_task = NULL;
1192 GNUNET_assert (channel_ctx->channel != NULL);
1193 channel = channel_ctx->channel;
1194 channel_ctx->channel = NULL;
1195 GNUNET_CADET_channel_destroy (channel);
1196 remove_channel_ctx (channel_ctx);
1201 * @brief Destroy a cadet channel.
1203 * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
1208 destroy_channel_cb (void *cls)
1210 struct ChannelCtx *channel_ctx = cls;
1212 channel_ctx->destruction_task = NULL;
1213 destroy_channel (channel_ctx);
1218 * @brief Schedule the destruction of a channel for immediately afterwards.
1220 * In case a channel is to be destroyed from within the callback to the
1221 * destruction of another channel (send channel), we cannot call
1222 * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
1225 * @param channel_ctx channel to be destroyed.
1228 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
1230 GNUNET_assert (NULL ==
1231 channel_ctx->destruction_task);
1232 GNUNET_assert (NULL !=
1233 channel_ctx->channel);
1234 channel_ctx->destruction_task =
1235 GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
1241 * @brief Remove peer
1243 * - Empties the list with pending operations
1244 * - Empties the list with pending messages
1245 * - Cancels potentially existing online check
1246 * - Schedules closing of send and recv channels
1247 * - Removes peer from peer map
1249 * @param peer_ctx Context of the peer to be destroyed
1250 * @return #GNUNET_YES if peer was removed
1251 * #GNUNET_NO otherwise
1254 destroy_peer (struct PeerContext *peer_ctx)
1256 GNUNET_assert (NULL != peer_ctx);
1257 GNUNET_assert (NULL != peer_ctx->sub->peer_map);
1259 GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1260 &peer_ctx->peer_id))
1264 SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
1265 LOG (GNUNET_ERROR_TYPE_DEBUG,
1266 "Going to remove peer %s\n",
1267 GNUNET_i2s (&peer_ctx->peer_id));
1268 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
1270 /* Clear list of pending operations */
1271 // TODO this probably leaks memory
1272 // ('only' the cls to the function. Not sure what to do with it)
1273 GNUNET_array_grow (peer_ctx->pending_ops,
1274 peer_ctx->num_pending_ops,
1276 /* Remove all pending messages */
1277 while (NULL != peer_ctx->pending_messages_head)
1279 LOG (GNUNET_ERROR_TYPE_DEBUG,
1280 "Removing unsent %s\n",
1281 peer_ctx->pending_messages_head->type);
1282 /* Cancle pending message, too */
1283 if ( (NULL != peer_ctx->online_check_pending) &&
1284 (0 == memcmp (peer_ctx->pending_messages_head,
1285 peer_ctx->online_check_pending,
1286 sizeof (struct PendingMessage))) )
1288 peer_ctx->online_check_pending = NULL;
1289 if (peer_ctx->sub == msub)
1291 GNUNET_STATISTICS_update (stats,
1292 "# pending online checks",
1297 remove_pending_message (peer_ctx->pending_messages_head,
1301 /* If we are still waiting for notification whether this peer is online
1302 * cancel the according task */
1303 if (NULL != peer_ctx->online_check_pending)
1305 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306 "Removing pending online check for peer %s\n",
1307 GNUNET_i2s (&peer_ctx->peer_id));
1308 // TODO wait until cadet sets mq->cancel_impl
1309 //GNUNET_MQ_send_cancel (peer_ctx->online_check_pending->ev);
1310 remove_pending_message (peer_ctx->online_check_pending,
1312 peer_ctx->online_check_pending = NULL;
1315 if (NULL != peer_ctx->send_channel_ctx)
1317 /* This is possibly called from within channel destruction */
1318 peer_ctx->send_channel_ctx->peer_ctx = NULL;
1319 schedule_channel_destruction (peer_ctx->send_channel_ctx);
1320 peer_ctx->send_channel_ctx = NULL;
1321 peer_ctx->mq = NULL;
1323 if (NULL != peer_ctx->recv_channel_ctx)
1325 /* This is possibly called from within channel destruction */
1326 peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1327 schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1328 peer_ctx->recv_channel_ctx = NULL;
1332 GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
1333 &peer_ctx->peer_id))
1335 LOG (GNUNET_ERROR_TYPE_WARNING,
1336 "removing peer from peer_ctx->sub->peer_map failed\n");
1338 if (peer_ctx->sub == msub)
1340 GNUNET_STATISTICS_set (stats,
1342 GNUNET_CONTAINER_multipeermap_size (peer_ctx->sub->peer_map),
1345 GNUNET_free (peer_ctx);
1351 * Iterator over hash map entries. Deletes all contexts of peers.
1353 * @param cls closure
1354 * @param key current public key
1355 * @param value value in the hash map
1356 * @return #GNUNET_YES if we should continue to iterate,
1357 * #GNUNET_NO if not.
1360 peermap_clear_iterator (void *cls,
1361 const struct GNUNET_PeerIdentity *key,
1364 struct Sub *sub = cls;
1367 destroy_peer (get_peer_ctx (sub->peer_map, key));
1373 * @brief This is called once a message is sent.
1375 * Removes the pending message
1377 * @param cls type of the message that was sent
1380 mq_notify_sent_cb (void *cls)
1382 struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1383 LOG (GNUNET_ERROR_TYPE_DEBUG,
1386 if (pending_msg->peer_ctx->sub == msub)
1388 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1389 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1390 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1391 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1392 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1393 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1394 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12) &&
1395 NULL != map_single_hop &&
1396 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
1397 &pending_msg->peer_ctx->peer_id))
1398 GNUNET_STATISTICS_update(stats,
1399 "# pull requests sent (multi-hop peer)",
1403 /* Do not cancle message */
1404 remove_pending_message (pending_msg, GNUNET_NO);
1409 * @brief Iterator function for #store_valid_peers.
1411 * Implements #GNUNET_CONTAINER_PeerMapIterator.
1412 * Writes single peer to disk.
1414 * @param cls the file handle to write to.
1415 * @param peer current peer
1416 * @param value unused
1418 * @return #GNUNET_YES if we should continue to
1420 * #GNUNET_NO if not.
1423 store_peer_presistently_iterator (void *cls,
1424 const struct GNUNET_PeerIdentity *peer,
1427 const struct GNUNET_DISK_FileHandle *fh = cls;
1428 char peer_string[128];
1437 size = GNUNET_snprintf (peer_string,
1438 sizeof (peer_string),
1440 GNUNET_i2s_full (peer));
1441 GNUNET_assert (53 == size);
1442 ret = GNUNET_DISK_file_write (fh,
1445 GNUNET_assert (size == ret);
1451 * @brief Store the peers currently in #valid_peers to disk.
1453 * @param sub Sub for which to store the valid peers
1456 store_valid_peers (const struct Sub *sub)
1458 struct GNUNET_DISK_FileHandle *fh;
1459 uint32_t number_written_peers;
1462 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1467 ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
1468 if (GNUNET_SYSERR == ret)
1470 LOG (GNUNET_ERROR_TYPE_WARNING,
1471 "Not able to create directory for file `%s'\n",
1472 sub->filename_valid_peers);
1475 else if (GNUNET_NO == ret)
1477 LOG (GNUNET_ERROR_TYPE_WARNING,
1478 "Directory for file `%s' exists but is not writable for us\n",
1479 sub->filename_valid_peers);
1482 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1483 GNUNET_DISK_OPEN_WRITE |
1484 GNUNET_DISK_OPEN_CREATE,
1485 GNUNET_DISK_PERM_USER_READ |
1486 GNUNET_DISK_PERM_USER_WRITE);
1489 LOG (GNUNET_ERROR_TYPE_WARNING,
1490 "Not able to write valid peers to file `%s'\n",
1491 sub->filename_valid_peers);
1494 LOG (GNUNET_ERROR_TYPE_DEBUG,
1495 "Writing %u valid peers to disk\n",
1496 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1497 number_written_peers =
1498 GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
1499 store_peer_presistently_iterator,
1501 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1502 GNUNET_assert (number_written_peers ==
1503 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1508 * @brief Convert string representation of peer id to peer id.
1510 * Counterpart to #GNUNET_i2s_full.
1512 * @param string_repr The string representation of the peer id
1514 * @return The peer id
1516 static const struct GNUNET_PeerIdentity *
1517 s2i_full (const char *string_repr)
1519 struct GNUNET_PeerIdentity *peer;
1523 peer = GNUNET_new (struct GNUNET_PeerIdentity);
1524 len = strlen (string_repr);
1527 LOG (GNUNET_ERROR_TYPE_WARNING,
1528 "Not able to convert string representation of PeerID to PeerID\n"
1529 "Sting representation: %s (len %lu) - too short\n",
1538 ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1541 if (GNUNET_OK != ret)
1543 LOG (GNUNET_ERROR_TYPE_WARNING,
1544 "Not able to convert string representation of PeerID to PeerID\n"
1545 "Sting representation: %s\n",
1554 * @brief Restore the peers on disk to #valid_peers.
1556 * @param sub Sub for which to restore the valid peers
1559 restore_valid_peers (const struct Sub *sub)
1563 struct GNUNET_DISK_FileHandle *fh;
1568 const struct GNUNET_PeerIdentity *peer;
1570 if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
1575 if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
1579 fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
1580 GNUNET_DISK_OPEN_READ,
1581 GNUNET_DISK_PERM_NONE);
1582 GNUNET_assert (NULL != fh);
1583 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1584 num_peers = file_size / 53;
1585 buf = GNUNET_malloc (file_size);
1586 size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1587 GNUNET_assert (size_read == file_size);
1588 LOG (GNUNET_ERROR_TYPE_DEBUG,
1589 "Restoring %" PRIu32 " peers from file `%s'\n",
1591 sub->filename_valid_peers);
1592 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1594 str_repr = GNUNET_strndup (iter_buf, 53);
1595 peer = s2i_full (str_repr);
1596 GNUNET_free (str_repr);
1597 add_valid_peer (peer, sub->valid_peers);
1598 LOG (GNUNET_ERROR_TYPE_DEBUG,
1599 "Restored valid peer %s from disk\n",
1600 GNUNET_i2s_full (peer));
1604 LOG (GNUNET_ERROR_TYPE_DEBUG,
1605 "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
1607 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1608 if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
1610 LOG (GNUNET_ERROR_TYPE_WARNING,
1611 "Number of restored peers does not match file size. Have probably duplicates.\n");
1613 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1614 LOG (GNUNET_ERROR_TYPE_DEBUG,
1615 "Restored %u valid peers from disk\n",
1616 GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
1621 * @brief Delete storage of peers that was created with #initialise_peers ()
1623 * @param sub Sub for which the storage is deleted
1626 peers_terminate (struct Sub *sub)
1628 if (GNUNET_SYSERR ==
1629 GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
1630 &peermap_clear_iterator,
1633 LOG (GNUNET_ERROR_TYPE_WARNING,
1634 "Iteration destroying peers was aborted.\n");
1636 GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
1637 sub->peer_map = NULL;
1638 store_valid_peers (sub);
1639 GNUNET_free (sub->filename_valid_peers);
1640 sub->filename_valid_peers = NULL;
1641 GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
1642 sub->valid_peers = NULL;
1647 * Iterator over #valid_peers hash map entries.
1649 * @param cls Closure that contains iterator function and closure
1650 * @param peer current peer id
1651 * @param value value in the hash map - unused
1652 * @return #GNUNET_YES if we should continue to
1654 * #GNUNET_NO if not.
1657 valid_peer_iterator (void *cls,
1658 const struct GNUNET_PeerIdentity *peer,
1661 struct PeersIteratorCls *it_cls = cls;
1664 return it_cls->iterator (it_cls->cls, peer);
1669 * @brief Get all currently known, valid peer ids.
1671 * @param valid_peers Peer map containing the valid peers in question
1672 * @param iterator function to call on each peer id
1673 * @param it_cls extra argument to @a iterator
1674 * @return the number of key value pairs processed,
1675 * #GNUNET_SYSERR if it aborted iteration
1678 get_valid_peers (struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1679 PeersIterator iterator,
1682 struct PeersIteratorCls *cls;
1685 cls = GNUNET_new (struct PeersIteratorCls);
1686 cls->iterator = iterator;
1688 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1689 valid_peer_iterator,
1697 * @brief Add peer to known peers.
1699 * This function is called on new peer_ids from 'external' sources
1700 * (client seed, cadet get_peers(), ...)
1702 * @param sub Sub with the peer map that the @a peer will be added to
1703 * @param peer the new #GNUNET_PeerIdentity
1705 * @return #GNUNET_YES if peer was inserted
1706 * #GNUNET_NO otherwise
1709 insert_peer (struct Sub *sub,
1710 const struct GNUNET_PeerIdentity *peer)
1712 if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
1714 return GNUNET_NO; /* We already know this peer - nothing to do */
1716 (void) create_peer_ctx (sub, peer);
1722 * @brief Check whether flags on a peer are set.
1724 * @param peer_map Peer map that is expected to contain the @a peer
1725 * @param peer the peer to check the flag of
1726 * @param flags the flags to check
1728 * @return #GNUNET_SYSERR if peer is not known
1729 * #GNUNET_YES if all given flags are set
1730 * #GNUNET_NO otherwise
1733 check_peer_flag (const struct GNUNET_CONTAINER_MultiPeerMap *peer_map,
1734 const struct GNUNET_PeerIdentity *peer,
1735 enum Peers_PeerFlags flags)
1737 struct PeerContext *peer_ctx;
1739 if (GNUNET_NO == check_peer_known (peer_map, peer))
1741 return GNUNET_SYSERR;
1743 peer_ctx = get_peer_ctx (peer_map, peer);
1744 return check_peer_flag_set (peer_ctx, flags);
1748 * @brief Try connecting to a peer to see whether it is online
1750 * If not known yet, insert into known peers
1752 * @param sub Sub which would contain the @a peer
1753 * @param peer the peer whose online is to be checked
1754 * @return #GNUNET_YES if the check was issued
1755 * #GNUNET_NO otherwise
1758 issue_peer_online_check (struct Sub *sub,
1759 const struct GNUNET_PeerIdentity *peer)
1761 struct PeerContext *peer_ctx;
1763 (void) insert_peer (sub, peer); // TODO even needed?
1764 peer_ctx = get_peer_ctx (sub->peer_map, peer);
1765 if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
1766 (NULL == peer_ctx->online_check_pending) )
1768 check_peer_online (peer_ctx);
1776 * @brief Check if peer is removable.
1779 * - a recv channel exists
1780 * - there are pending messages
1781 * - there is no pending pull reply
1783 * @param peer_ctx Context of the peer in question
1784 * @return #GNUNET_YES if peer is removable
1785 * #GNUNET_NO if peer is NOT removable
1786 * #GNUNET_SYSERR if peer is not known
1789 check_removable (const struct PeerContext *peer_ctx)
1791 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
1792 &peer_ctx->peer_id))
1794 return GNUNET_SYSERR;
1797 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1798 (NULL != peer_ctx->pending_messages_head) ||
1799 (GNUNET_YES == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1808 * @brief Check whether @a peer is actually a peer.
1810 * A valid peer is a peer that we know exists eg. we were connected to once.
1812 * @param valid_peers Peer map that would contain the @a peer
1813 * @param peer peer in question
1815 * @return #GNUNET_YES if peer is valid
1816 * #GNUNET_NO if peer is not valid
1819 check_peer_valid (const struct GNUNET_CONTAINER_MultiPeerMap *valid_peers,
1820 const struct GNUNET_PeerIdentity *peer)
1822 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1827 * @brief Indicate that we want to send to the other peer
1829 * This establishes a sending channel
1831 * @param peer_ctx Context of the target peer
1834 indicate_sending_intention (struct PeerContext *peer_ctx)
1836 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
1837 &peer_ctx->peer_id));
1838 (void) get_channel (peer_ctx);
1843 * @brief Check whether other peer has the intention to send/opened channel
1846 * @param peer_ctx Context of the peer in question
1848 * @return #GNUNET_YES if peer has the intention to send
1849 * #GNUNET_NO otherwise
1852 check_peer_send_intention (const struct PeerContext *peer_ctx)
1854 if (NULL != peer_ctx->recv_channel_ctx)
1863 * Handle the channel a peer opens to us.
1865 * @param cls The closure - Sub
1866 * @param channel The channel the peer wants to establish
1867 * @param initiator The peer's peer ID
1869 * @return initial channel context for the channel
1870 * (can be NULL -- that's not an error)
1873 handle_inbound_channel (void *cls,
1874 struct GNUNET_CADET_Channel *channel,
1875 const struct GNUNET_PeerIdentity *initiator)
1877 struct PeerContext *peer_ctx;
1878 struct ChannelCtx *channel_ctx;
1879 struct Sub *sub = cls;
1881 LOG (GNUNET_ERROR_TYPE_DEBUG,
1882 "New channel was established to us (Peer %s).\n",
1883 GNUNET_i2s (initiator));
1884 GNUNET_assert (NULL != channel); /* according to cadet API */
1885 /* Make sure we 'know' about this peer */
1886 peer_ctx = create_or_get_peer_ctx (sub, initiator);
1887 set_peer_online (peer_ctx);
1888 (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
1889 channel_ctx = add_channel_ctx (peer_ctx);
1890 channel_ctx->channel = channel;
1891 /* We only accept one incoming channel per peer */
1892 if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
1895 LOG (GNUNET_ERROR_TYPE_WARNING,
1896 "Already got one receive channel. Destroying old one.\n");
1897 GNUNET_break_op (0);
1898 destroy_channel (peer_ctx->recv_channel_ctx);
1899 peer_ctx->recv_channel_ctx = channel_ctx;
1900 /* return the channel context */
1903 peer_ctx->recv_channel_ctx = channel_ctx;
1909 * @brief Check whether a sending channel towards the given peer exists
1911 * @param peer_ctx Context of the peer in question
1913 * @return #GNUNET_YES if a sending channel towards that peer exists
1914 * #GNUNET_NO otherwise
1917 check_sending_channel_exists (const struct PeerContext *peer_ctx)
1919 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1920 &peer_ctx->peer_id))
1921 { /* If no such peer exists, there is no channel */
1924 if (NULL == peer_ctx->send_channel_ctx)
1933 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1934 * intention to another peer
1936 * @param peer_ctx Context to the peer
1937 * @return #GNUNET_YES if channel was destroyed
1938 * #GNUNET_NO otherwise
1941 destroy_sending_channel (struct PeerContext *peer_ctx)
1943 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
1944 &peer_ctx->peer_id))
1948 if (NULL != peer_ctx->send_channel_ctx)
1950 destroy_channel (peer_ctx->send_channel_ctx);
1951 (void) check_connected (peer_ctx);
1958 * @brief Send a message to another peer.
1960 * Keeps track about pending messages so they can be properly removed when the
1961 * peer is destroyed.
1963 * @param peer_ctx Context of the peer to which the message is to be sent
1964 * @param ev envelope of the message
1965 * @param type type of the message
1968 send_message (struct PeerContext *peer_ctx,
1969 struct GNUNET_MQ_Envelope *ev,
1972 struct PendingMessage *pending_msg;
1973 struct GNUNET_MQ_Handle *mq;
1975 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1976 "Sending message to %s of type %s\n",
1977 GNUNET_i2s (&peer_ctx->peer_id),
1979 pending_msg = insert_pending_message (peer_ctx, ev, type);
1980 mq = get_mq (peer_ctx);
1981 GNUNET_MQ_notify_sent (ev,
1984 GNUNET_MQ_send (mq, ev);
1988 * @brief Schedule a operation on given peer
1990 * Avoids scheduling an operation twice.
1992 * @param peer_ctx Context of the peer for which to schedule the operation
1993 * @param peer_op the operation to schedule
1994 * @param cls Closure to @a peer_op
1996 * @return #GNUNET_YES if the operation was scheduled
1997 * #GNUNET_NO otherwise
2000 schedule_operation (struct PeerContext *peer_ctx,
2001 const PeerOp peer_op,
2004 struct PeerPendingOp pending_op;
2006 GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
2007 &peer_ctx->peer_id));
2009 //TODO if ONLINE execute immediately
2011 if (GNUNET_NO == check_operation_scheduled (peer_ctx, peer_op))
2013 pending_op.op = peer_op;
2014 pending_op.op_cls = cls;
2015 GNUNET_array_append (peer_ctx->pending_ops,
2016 peer_ctx->num_pending_ops,
2023 /***********************************************************************
2024 * /Old gnunet-service-rps_peers.c
2025 ***********************************************************************/
2028 /***********************************************************************
2029 * Housekeeping with clients
2030 ***********************************************************************/
2033 * Closure used to pass the client and the id to the callback
2034 * that replies to a client's request
2041 struct ReplyCls *next;
2042 struct ReplyCls *prev;
2045 * The identifier of the request
2050 * The handle to the request
2052 struct RPS_SamplerRequestHandle *req_handle;
2055 * The client handle to send the reply to
2057 struct ClientContext *cli_ctx;
2062 * Struct used to store the context of a connected client.
2064 struct ClientContext
2069 struct ClientContext *next;
2070 struct ClientContext *prev;
2073 * The message queue to communicate with the client.
2075 struct GNUNET_MQ_Handle *mq;
2078 * @brief How many updates this client expects to receive.
2080 int64_t view_updates_left;
2083 * @brief Whether this client wants to receive stream updates.
2084 * Either #GNUNET_YES or #GNUNET_NO
2086 int8_t stream_update;
2089 * The client handle to send the reply to
2091 struct GNUNET_SERVICE_Client *client;
2094 * The #Sub this context belongs to
2100 * DLL with all clients currently connected to us
2102 struct ClientContext *cli_ctx_head;
2103 struct ClientContext *cli_ctx_tail;
2105 /***********************************************************************
2106 * /Housekeeping with clients
2107 ***********************************************************************/
2113 /***********************************************************************
2115 ***********************************************************************/
2119 * Print peerlist to log.
2122 print_peer_list (struct GNUNET_PeerIdentity *list,
2127 LOG (GNUNET_ERROR_TYPE_DEBUG,
2128 "Printing peer list of length %u at %p:\n",
2131 for (i = 0 ; i < len ; i++)
2133 LOG (GNUNET_ERROR_TYPE_DEBUG,
2135 i, GNUNET_i2s (&list[i]));
2141 * Remove peer from list.
2144 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2145 unsigned int *list_size,
2146 const struct GNUNET_PeerIdentity *peer)
2149 struct GNUNET_PeerIdentity *tmp;
2153 LOG (GNUNET_ERROR_TYPE_DEBUG,
2154 "Removing peer %s from list at %p\n",
2158 for ( i = 0 ; i < *list_size ; i++ )
2160 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2162 if (i < *list_size -1)
2163 { /* Not at the last entry -- shift peers left */
2164 memmove (&tmp[i], &tmp[i +1],
2165 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2167 /* Remove last entry (should be now useless PeerID) */
2168 GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2176 * Insert PeerID in #view
2178 * Called once we know a peer is online.
2179 * Implements #PeerOp
2181 * @return GNUNET_OK if peer was actually inserted
2182 * GNUNET_NO if peer was not inserted
2185 insert_in_view_op (void *cls,
2186 const struct GNUNET_PeerIdentity *peer);
2189 * Insert PeerID in #view
2191 * Called once we know a peer is online.
2193 * @param sub Sub in with the view to insert in
2194 * @param peer the peer to insert
2196 * @return GNUNET_OK if peer was actually inserted
2197 * GNUNET_NO if peer was not inserted
2200 insert_in_view (struct Sub *sub,
2201 const struct GNUNET_PeerIdentity *peer)
2203 struct PeerContext *peer_ctx;
2207 online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
2208 peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
2209 if ( (GNUNET_NO == online) ||
2210 (GNUNET_SYSERR == online) ) /* peer is not even known */
2212 (void) issue_peer_online_check (sub, peer);
2213 (void) schedule_operation (peer_ctx, insert_in_view_op, sub);
2216 /* Open channel towards peer to keep connection open */
2217 indicate_sending_intention (peer_ctx);
2218 ret = View_put (sub->view, peer);
2219 if (peer_ctx->sub == msub)
2221 GNUNET_STATISTICS_set (stats,
2223 View_size (peer_ctx->sub->view),
2231 * @brief Send view to client
2233 * @param cli_ctx the context of the client
2234 * @param view_array the peerids of the view as array (can be empty)
2235 * @param view_size the size of the view array (can be 0)
2238 send_view (const struct ClientContext *cli_ctx,
2239 const struct GNUNET_PeerIdentity *view_array,
2242 struct GNUNET_MQ_Envelope *ev;
2243 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2246 if (NULL == view_array)
2248 if (NULL == cli_ctx->sub) sub = msub;
2249 else sub = cli_ctx->sub;
2250 view_size = View_size (sub->view);
2251 view_array = View_get_as_array (sub->view);
2254 ev = GNUNET_MQ_msg_extra (out_msg,
2255 view_size * sizeof (struct GNUNET_PeerIdentity),
2256 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2257 out_msg->num_peers = htonl (view_size);
2259 GNUNET_memcpy (&out_msg[1],
2261 view_size * sizeof (struct GNUNET_PeerIdentity));
2262 GNUNET_MQ_send (cli_ctx->mq, ev);
2267 * @brief Send peer from biased stream to client.
2269 * TODO merge with send_view, parameterise
2271 * @param cli_ctx the context of the client
2272 * @param view_array the peerids of the view as array (can be empty)
2273 * @param view_size the size of the view array (can be 0)
2276 send_stream_peers (const struct ClientContext *cli_ctx,
2278 const struct GNUNET_PeerIdentity *peers)
2280 struct GNUNET_MQ_Envelope *ev;
2281 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2283 GNUNET_assert (NULL != peers);
2285 ev = GNUNET_MQ_msg_extra (out_msg,
2286 num_peers * sizeof (struct GNUNET_PeerIdentity),
2287 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2288 out_msg->num_peers = htonl (num_peers);
2290 GNUNET_memcpy (&out_msg[1],
2292 num_peers * sizeof (struct GNUNET_PeerIdentity));
2293 GNUNET_MQ_send (cli_ctx->mq, ev);
2298 * @brief sends updates to clients that are interested
2300 * @param sub Sub for which to notify clients
2303 clients_notify_view_update (const struct Sub *sub)
2305 struct ClientContext *cli_ctx_iter;
2307 const struct GNUNET_PeerIdentity *view_array;
2309 num_peers = View_size (sub->view);
2310 view_array = View_get_as_array(sub->view);
2311 /* check size of view is small enough */
2312 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2314 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2315 "View is too big to send\n");
2319 for (cli_ctx_iter = cli_ctx_head;
2320 NULL != cli_ctx_iter;
2321 cli_ctx_iter = cli_ctx_iter->next)
2323 if (1 < cli_ctx_iter->view_updates_left)
2325 /* Client wants to receive limited amount of updates */
2326 cli_ctx_iter->view_updates_left -= 1;
2327 } else if (1 == cli_ctx_iter->view_updates_left)
2329 /* Last update of view for client */
2330 cli_ctx_iter->view_updates_left = -1;
2331 } else if (0 > cli_ctx_iter->view_updates_left) {
2332 /* Client is not interested in updates */
2335 /* else _updates_left == 0 - infinite amount of updates */
2338 send_view (cli_ctx_iter, view_array, num_peers);
2344 * @brief sends updates to clients that are interested
2346 * @param num_peers Number of peers to send
2347 * @param peers the array of peers to send
2350 clients_notify_stream_peer (const struct Sub *sub,
2352 const struct GNUNET_PeerIdentity *peers)
2353 // TODO enum StreamPeerSource)
2355 struct ClientContext *cli_ctx_iter;
2357 LOG (GNUNET_ERROR_TYPE_DEBUG,
2358 "Got peer (%s) from biased stream - update all clients\n",
2359 GNUNET_i2s (peers));
2361 for (cli_ctx_iter = cli_ctx_head;
2362 NULL != cli_ctx_iter;
2363 cli_ctx_iter = cli_ctx_iter->next)
2365 if (GNUNET_YES == cli_ctx_iter->stream_update &&
2366 (sub == cli_ctx_iter->sub || sub == msub))
2368 send_stream_peers (cli_ctx_iter, num_peers, peers);
2375 * Put random peer from sampler into the view as history update.
2377 * @param ids Array of Peers to insert into view
2378 * @param num_peers Number of peers to insert
2379 * @param cls Closure - The Sub for which this is to be done
2382 hist_update (const struct GNUNET_PeerIdentity *ids,
2387 struct Sub *sub = cls;
2389 for (i = 0; i < num_peers; i++)
2392 if (GNUNET_YES != check_peer_known (sub->peer_map, &ids[i]))
2394 LOG (GNUNET_ERROR_TYPE_WARNING,
2395 "Peer in history update not known!\n");
2398 inserted = insert_in_view (sub, &ids[i]);
2399 if (GNUNET_OK == inserted)
2401 clients_notify_stream_peer (sub, 1, &ids[i]);
2403 to_file (sub->file_name_view_log,
2405 GNUNET_i2s_full (ids));
2407 clients_notify_view_update (sub);
2412 * Wrapper around #RPS_sampler_resize()
2414 * If we do not have enough sampler elements, double current sampler size
2415 * If we have more than enough sampler elements, halv current sampler size
2417 * @param sampler The sampler to resize
2418 * @param new_size New size to which to resize
2421 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2423 unsigned int sampler_size;
2426 // TODO respect the min, max
2427 sampler_size = RPS_sampler_get_size (sampler);
2428 if (sampler_size > new_size * 4)
2430 RPS_sampler_resize (sampler, sampler_size / 2);
2432 else if (sampler_size < new_size)
2434 RPS_sampler_resize (sampler, sampler_size * 2);
2436 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2441 * Add all peers in @a peer_array to @a peer_map used as set.
2443 * @param peer_array array containing the peers
2444 * @param num_peers number of peers in @peer_array
2445 * @param peer_map the peermap to use as set
2448 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2449 unsigned int num_peers,
2450 struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2453 if (NULL == peer_map)
2455 LOG (GNUNET_ERROR_TYPE_WARNING,
2456 "Trying to add peers to non-existing peermap.\n");
2460 for (i = 0; i < num_peers; i++)
2462 GNUNET_CONTAINER_multipeermap_put (peer_map,
2465 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2466 if (msub->peer_map == peer_map)
2468 GNUNET_STATISTICS_set (stats,
2470 GNUNET_CONTAINER_multipeermap_size (peer_map),
2478 * Send a PULL REPLY to @a peer_id
2480 * @param peer_ctx Context of the peer to send the reply to
2481 * @param peer_ids the peers to send to @a peer_id
2482 * @param num_peer_ids the number of peers to send to @a peer_id
2485 send_pull_reply (struct PeerContext *peer_ctx,
2486 const struct GNUNET_PeerIdentity *peer_ids,
2487 unsigned int num_peer_ids)
2490 struct GNUNET_MQ_Envelope *ev;
2491 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2493 /* Compute actual size */
2494 send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2495 num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2497 if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2498 /* Compute number of peers to send
2499 * If too long, simply truncate */
2500 // TODO select random ones via permutation
2501 // or even better: do good protocol design
2503 (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2504 sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2505 sizeof (struct GNUNET_PeerIdentity);
2507 send_size = num_peer_ids;
2509 LOG (GNUNET_ERROR_TYPE_DEBUG,
2510 "Going to send PULL REPLY with %u peers to %s\n",
2511 send_size, GNUNET_i2s (&peer_ctx->peer_id));
2513 ev = GNUNET_MQ_msg_extra (out_msg,
2514 send_size * sizeof (struct GNUNET_PeerIdentity),
2515 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2516 out_msg->num_peers = htonl (send_size);
2517 GNUNET_memcpy (&out_msg[1], peer_ids,
2518 send_size * sizeof (struct GNUNET_PeerIdentity));
2520 send_message (peer_ctx, ev, "PULL REPLY");
2521 if (peer_ctx->sub == msub)
2523 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2525 // TODO check with send intention: as send_channel is used/opened we indicate
2526 // a sending intention without intending it.
2527 // -> clean peer afterwards?
2528 // -> use recv_channel?
2533 * Insert PeerID in #pull_map
2535 * Called once we know a peer is online.
2537 * @param cls Closure - Sub with the pull map to insert into
2538 * @param peer Peer to insert
2541 insert_in_pull_map (void *cls,
2542 const struct GNUNET_PeerIdentity *peer)
2544 struct Sub *sub = cls;
2546 CustomPeerMap_put (sub->pull_map, peer);
2551 * Insert PeerID in #view
2553 * Called once we know a peer is online.
2554 * Implements #PeerOp
2556 * @param cls Closure - Sub with view to insert peer into
2557 * @param peer the peer to insert
2560 insert_in_view_op (void *cls,
2561 const struct GNUNET_PeerIdentity *peer)
2563 struct Sub *sub = cls;
2566 inserted = insert_in_view (sub, peer);
2567 if (GNUNET_OK == inserted)
2569 clients_notify_stream_peer (sub, 1, peer);
2575 * Update sampler with given PeerID.
2576 * Implements #PeerOp
2578 * @param cls Closure - Sub containing the sampler to insert into
2579 * @param peer Peer to insert
2582 insert_in_sampler (void *cls,
2583 const struct GNUNET_PeerIdentity *peer)
2585 struct Sub *sub = cls;
2587 LOG (GNUNET_ERROR_TYPE_DEBUG,
2588 "Updating samplers with peer %s from insert_in_sampler()\n",
2590 RPS_sampler_update (sub->sampler, peer);
2591 if (0 < RPS_sampler_count_id (sub->sampler, peer))
2593 /* Make sure we 'know' about this peer */
2594 (void) issue_peer_online_check (sub, peer);
2595 /* Establish a channel towards that peer to indicate we are going to send
2597 //indicate_sending_intention (peer);
2600 sub->num_observed_peers++;
2601 GNUNET_CONTAINER_multipeermap_put
2602 (sub->observed_unique_peers,
2605 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2606 uint32_t num_observed_unique_peers =
2607 GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
2608 to_file (sub->file_name_observed_log,
2609 "%" PRIu32 " %" PRIu32 " %f\n",
2610 sub->num_observed_peers,
2611 num_observed_unique_peers,
2612 1.0*num_observed_unique_peers/sub->num_observed_peers)
2613 #endif /* TO_FILE */
2618 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2619 * If the peer is not known, online check is issued and it is
2620 * scheduled to be inserted in sampler and view.
2622 * "External sources" refer to every source except the gossip.
2624 * @param sub Sub for which @a peer was received
2625 * @param peer peer to insert/peer received
2628 got_peer (struct Sub *sub,
2629 const struct GNUNET_PeerIdentity *peer)
2631 /* If we did not know this peer already, insert it into sampler and view */
2632 if (GNUNET_YES == issue_peer_online_check (sub, peer))
2634 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2635 &insert_in_sampler, sub);
2636 schedule_operation (get_peer_ctx (sub->peer_map, peer),
2637 &insert_in_view_op, sub);
2641 GNUNET_STATISTICS_update (stats,
2650 * @brief Checks if there is a sending channel and if it is needed
2652 * @param peer_ctx Context of the peer to check
2653 * @return GNUNET_YES if sending channel exists and is still needed
2654 * GNUNET_NO otherwise
2657 check_sending_channel_needed (const struct PeerContext *peer_ctx)
2659 /* struct GNUNET_CADET_Channel *channel; */
2660 if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
2661 &peer_ctx->peer_id))
2665 if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
2667 if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
2668 &peer_ctx->peer_id)) ||
2669 (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
2670 &peer_ctx->peer_id)) ||
2671 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
2672 &peer_ctx->peer_id)) ||
2673 (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
2674 &peer_ctx->peer_id)) ||
2675 (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
2677 Peers_PULL_REPLY_PENDING)))
2678 { /* If we want to keep the connection to peer open */
2688 * @brief remove peer from our knowledge, the view, push and pull maps and
2691 * @param sub Sub with the data structures the peer is to be removed from
2692 * @param peer the peer to remove
2695 remove_peer (struct Sub *sub,
2696 const struct GNUNET_PeerIdentity *peer)
2698 (void) View_remove_peer (sub->view,
2700 CustomPeerMap_remove_peer (sub->pull_map,
2702 CustomPeerMap_remove_peer (sub->push_map,
2704 RPS_sampler_reinitialise_by_value (sub->sampler,
2706 /* We want to destroy the peer now.
2707 * Sometimes, it just seems that it's already been removed from the peer_map,
2708 * so check the peer_map first. */
2709 if (GNUNET_YES == check_peer_known (sub->peer_map,
2712 destroy_peer (get_peer_ctx (sub->peer_map,
2719 * @brief Remove data that is not needed anymore.
2721 * If the sending channel is no longer needed it is destroyed.
2723 * @param sub Sub in which the current peer is to be cleaned
2724 * @param peer the peer whose data is about to be cleaned
2727 clean_peer (struct Sub *sub,
2728 const struct GNUNET_PeerIdentity *peer)
2730 if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
2733 LOG (GNUNET_ERROR_TYPE_DEBUG,
2734 "Going to remove send channel to peer %s\n",
2736 #if ENABLE_MALICIOUS
2737 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer,
2739 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2741 #else /* ENABLE_MALICIOUS */
2742 (void) destroy_sending_channel (get_peer_ctx (sub->peer_map,
2744 #endif /* ENABLE_MALICIOUS */
2747 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (sub->peer_map,
2750 /* Peer was already removed by callback on destroyed channel */
2751 LOG (GNUNET_ERROR_TYPE_WARNING,
2752 "Peer was removed from our knowledge during cleanup\n");
2756 if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
2758 (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
2759 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2760 (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
2761 (0 == RPS_sampler_count_id (sub->sampler, peer)) &&
2762 (GNUNET_YES == check_removable (get_peer_ctx (sub->peer_map, peer))) )
2763 { /* We can safely remove this peer */
2764 LOG (GNUNET_ERROR_TYPE_DEBUG,
2765 "Going to remove peer %s\n",
2767 remove_peer (sub, peer);
2774 * @brief This is called when a channel is destroyed.
2776 * Removes peer completely from our knowledge if the send_channel was destroyed
2777 * Otherwise simply delete the recv_channel
2778 * Also check if the knowledge about this peer is still needed.
2779 * If not, remove this peer from our knowledge.
2781 * @param cls The closure - Context to the channel
2782 * @param channel The channel being closed
2785 cleanup_destroyed_channel (void *cls,
2786 const struct GNUNET_CADET_Channel *channel)
2788 struct ChannelCtx *channel_ctx = cls;
2789 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2792 channel_ctx->channel = NULL;
2793 remove_channel_ctx (channel_ctx);
2794 if (NULL != peer_ctx &&
2795 peer_ctx->send_channel_ctx == channel_ctx &&
2796 GNUNET_YES == check_sending_channel_needed (channel_ctx->peer_ctx))
2798 remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
2802 /***********************************************************************
2804 ***********************************************************************/
2808 /***********************************************************************
2810 ***********************************************************************/
2813 * @brief Create a new Sub
2815 * @param hash Hash of value shared among rps instances on other hosts that
2816 * defines a subgroup to sample from.
2817 * @param sampler_size Size of the sampler
2818 * @param round_interval Interval (in average) between two rounds
2823 new_sub (const struct GNUNET_HashCode *hash,
2824 uint32_t sampler_size,
2825 struct GNUNET_TIME_Relative round_interval)
2829 sub = GNUNET_new (struct Sub);
2831 /* With the hash generated from the secret value this service only connects
2832 * to rps instances that share the value */
2833 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2834 GNUNET_MQ_hd_fixed_size (peer_check,
2835 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
2836 struct GNUNET_MessageHeader,
2838 GNUNET_MQ_hd_fixed_size (peer_push,
2839 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
2840 struct GNUNET_MessageHeader,
2842 GNUNET_MQ_hd_fixed_size (peer_pull_request,
2843 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
2844 struct GNUNET_MessageHeader,
2846 GNUNET_MQ_hd_var_size (peer_pull_reply,
2847 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
2848 struct GNUNET_RPS_P2P_PullReplyMessage,
2850 GNUNET_MQ_handler_end ()
2854 GNUNET_CADET_open_port (cadet_handle,
2856 &handle_inbound_channel, /* Connect handler */
2858 NULL, /* WindowSize handler */
2859 &cleanup_destroyed_channel, /* Disconnect handler */
2861 if (NULL == sub->cadet_port)
2863 LOG (GNUNET_ERROR_TYPE_ERROR,
2864 "Cadet port `%s' is already in use.\n",
2865 GNUNET_APPLICATION_PORT_RPS);
2869 /* Set up general data structure to keep track about peers */
2870 sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2872 GNUNET_CONFIGURATION_get_value_filename (cfg,
2874 "FILENAME_VALID_PEERS",
2875 &sub->filename_valid_peers))
2877 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2879 "FILENAME_VALID_PEERS");
2881 if (0 != strncmp ("DISABLE", sub->filename_valid_peers, 7))
2883 char *tmp_filename_valid_peers;
2886 GNUNET_snprintf (str_hash,
2888 GNUNET_h2s_full (hash));
2889 tmp_filename_valid_peers = sub->filename_valid_peers;
2890 GNUNET_asprintf (&sub->filename_valid_peers,
2892 tmp_filename_valid_peers,
2894 GNUNET_free (tmp_filename_valid_peers);
2896 sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
2898 /* Set up the sampler */
2899 sub->sampler_size_est_min = sampler_size;
2900 sub->sampler_size_est_need = sampler_size;;
2901 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
2902 GNUNET_assert (0 != round_interval.rel_value_us);
2903 sub->round_interval = round_interval;
2904 sub->sampler = RPS_sampler_init (sampler_size,
2907 /* Logging of internals */
2908 sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
2910 sub->file_name_observed_log = store_prefix_file_name (&own_identity,
2912 sub->num_observed_peers = 0;
2913 sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
2915 #endif /* TO_FILE */
2917 /* Set up data structures for gossip */
2918 sub->push_map = CustomPeerMap_create (4);
2919 sub->pull_map = CustomPeerMap_create (4);
2920 sub->view_size_est_min = sampler_size;;
2921 sub->view = View_create (sub->view_size_est_min);
2924 GNUNET_STATISTICS_set (stats,
2926 sub->view_size_est_min,
2930 /* Start executing rounds */
2931 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
2939 * @brief Write all numbers in the given array into the given file
2941 * Single numbers devided by a newline
2943 * @param hist_array[] the array to dump
2944 * @param file_name file to dump into
2947 write_histogram_to_file (const uint32_t hist_array[],
2948 const char *file_name)
2950 char collect_str[SIZE_DUMP_FILE + 1] = "";
2951 char *recv_str_iter;
2952 char *file_name_full;
2954 recv_str_iter = collect_str;
2955 file_name_full = store_prefix_file_name (&own_identity,
2957 for (uint32_t i = 0; i < HISTOGRAM_FILE_SLOTS; i++)
2959 char collect_str_tmp[8];
2961 GNUNET_snprintf (collect_str_tmp,
2962 sizeof (collect_str_tmp),
2965 recv_str_iter = stpncpy (recv_str_iter,
2969 (void) stpcpy (recv_str_iter,
2971 LOG (GNUNET_ERROR_TYPE_DEBUG,
2972 "Writing push stats to disk\n");
2973 to_file_w_len (file_name_full,
2976 GNUNET_free (file_name_full);
2978 #endif /* TO_FILE */
2982 * @brief Destroy Sub.
2984 * @param sub Sub to destroy
2987 destroy_sub (struct Sub *sub)
2989 GNUNET_assert (NULL != sub);
2990 GNUNET_assert (NULL != sub->do_round_task);
2991 GNUNET_SCHEDULER_cancel (sub->do_round_task);
2992 sub->do_round_task = NULL;
2994 /* Disconnect from cadet */
2995 GNUNET_CADET_close_port (sub->cadet_port);
2997 /* Clean up data structures for peers */
2998 RPS_sampler_destroy (sub->sampler);
2999 sub->sampler = NULL;
3000 View_destroy (sub->view);
3002 CustomPeerMap_destroy (sub->push_map);
3003 sub->push_map = NULL;
3004 CustomPeerMap_destroy (sub->pull_map);
3005 sub->pull_map = NULL;
3006 peers_terminate (sub);
3008 /* Free leftover data structures */
3009 GNUNET_free (sub->file_name_view_log);
3010 sub->file_name_view_log = NULL;
3012 GNUNET_free (sub->file_name_observed_log);
3013 sub->file_name_observed_log = NULL;
3015 /* Write push frequencies to disk */
3016 write_histogram_to_file (sub->push_recv,
3019 /* Write push deltas to disk */
3020 write_histogram_to_file (sub->push_delta,
3023 /* Write pull delays to disk */
3024 write_histogram_to_file (sub->pull_delays,
3027 GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
3028 sub->observed_unique_peers = NULL;
3029 #endif /* TO_FILE */
3035 /***********************************************************************
3037 ***********************************************************************/
3040 /***********************************************************************
3042 ***********************************************************************/
3045 * @brief Callback on initialisation of Core.
3047 * @param cls - unused
3048 * @param my_identity - unused
3051 core_init (void *cls,
3052 const struct GNUNET_PeerIdentity *my_identity)
3057 map_single_hop = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
3062 * @brief Callback for core.
3063 * Method called whenever a given peer connects.
3065 * @param cls closure - unused
3066 * @param peer peer identity this notification is about
3067 * @return closure given to #core_disconnects as peer_cls
3070 core_connects (void *cls,
3071 const struct GNUNET_PeerIdentity *peer,
3072 struct GNUNET_MQ_Handle *mq)
3077 GNUNET_assert (GNUNET_YES ==
3078 GNUNET_CONTAINER_multipeermap_put (map_single_hop,
3081 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
3087 * @brief Callback for core.
3088 * Method called whenever a peer disconnects.
3090 * @param cls closure - unused
3091 * @param peer peer identity this notification is about
3092 * @param peer_cls closure given in #core_connects - unused
3095 core_disconnects (void *cls,
3096 const struct GNUNET_PeerIdentity *peer,
3102 GNUNET_CONTAINER_multipeermap_remove_all (map_single_hop, peer);
3105 /***********************************************************************
3107 ***********************************************************************/
3111 * @brief Destroy the context for a (connected) client
3113 * @param cli_ctx Context to destroy
3116 destroy_cli_ctx (struct ClientContext *cli_ctx)
3118 GNUNET_assert (NULL != cli_ctx);
3119 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
3122 if (NULL != cli_ctx->sub)
3124 destroy_sub (cli_ctx->sub);
3125 cli_ctx->sub = NULL;
3127 GNUNET_free (cli_ctx);
3132 * @brief Update sizes in sampler and view on estimate update from nse service
3135 * @param logestimate the log(Base 2) value of the current network size estimate
3136 * @param std_dev standard deviation for the estimate
3139 adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
3142 //double scale; // TODO this might go gloabal/config
3144 LOG (GNUNET_ERROR_TYPE_DEBUG,
3145 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
3146 logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
3148 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
3149 // GNUNET_NSE_log_estimate_to_n (logestimate);
3150 estimate = pow (estimate, 1.0 / 3);
3151 // TODO add if std_dev is a number
3152 // estimate += (std_dev * scale);
3153 if (sub->view_size_est_min < ceil (estimate))
3155 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
3156 sub->sampler_size_est_need = estimate;
3157 sub->view_size_est_need = estimate;
3160 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
3161 //sub->sampler_size_est_need = sub->view_size_est_min;
3162 sub->view_size_est_need = sub->view_size_est_min;
3166 GNUNET_STATISTICS_set (stats,
3168 sub->view_size_est_need,
3172 /* If the NSE has changed adapt the lists accordingly */
3173 resize_wrapper (sub->sampler, sub->sampler_size_est_need);
3174 View_change_len (sub->view, sub->view_size_est_need);
3179 * Function called by NSE.
3181 * Updates sizes of sampler list and view and adapt those lists
3184 * implements #GNUNET_NSE_Callback
3186 * @param cls Closure - unused
3187 * @param timestamp time when the estimate was received from the server (or created by the server)
3188 * @param logestimate the log(Base 2) value of the current network size estimate
3189 * @param std_dev standard deviation for the estimate
3192 nse_callback (void *cls,
3193 struct GNUNET_TIME_Absolute timestamp,
3194 double logestimate, double std_dev)
3198 struct ClientContext *cli_ctx_iter;
3200 adapt_sizes (msub, logestimate, std_dev);
3201 for (cli_ctx_iter = cli_ctx_head;
3202 NULL != cli_ctx_iter;
3203 cli_ctx_iter = cli_ctx_iter->next)
3205 if (NULL != cli_ctx_iter->sub)
3207 adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
3214 * @brief This function is called, when the client seeds peers.
3215 * It verifies that @a msg is well-formed.
3217 * @param cls the closure (#ClientContext)
3218 * @param msg the message
3219 * @return #GNUNET_OK if @a msg is well-formed
3220 * #GNUNET_SYSERR otherwise
3223 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
3225 struct ClientContext *cli_ctx = cls;
3226 uint16_t msize = ntohs (msg->header.size);
3227 uint32_t num_peers = ntohl (msg->num_peers);
3229 msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
3230 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3231 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3233 LOG (GNUNET_ERROR_TYPE_ERROR,
3234 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3235 ntohl (msg->num_peers),
3236 (msize / sizeof (struct GNUNET_PeerIdentity)));
3238 GNUNET_SERVICE_client_drop (cli_ctx->client);
3239 return GNUNET_SYSERR;
3246 * Handle seed from the client.
3248 * @param cls closure
3249 * @param message the actual message
3252 handle_client_seed (void *cls,
3253 const struct GNUNET_RPS_CS_SeedMessage *msg)
3255 struct ClientContext *cli_ctx = cls;
3256 struct GNUNET_PeerIdentity *peers;
3260 num_peers = ntohl (msg->num_peers);
3261 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3263 LOG (GNUNET_ERROR_TYPE_DEBUG,
3264 "Client seeded peers:\n");
3265 print_peer_list (peers, num_peers);
3267 for (i = 0; i < num_peers; i++)
3269 LOG (GNUNET_ERROR_TYPE_DEBUG,
3270 "Updating samplers with seed %" PRIu32 ": %s\n",
3272 GNUNET_i2s (&peers[i]));
3274 if (NULL != msub) got_peer (msub, &peers[i]); /* Condition needed? */
3275 if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
3277 GNUNET_SERVICE_client_continue (cli_ctx->client);
3282 * Handle RPS request from the client.
3284 * @param cls Client context
3285 * @param message Message containing the numer of updates the client wants to
3289 handle_client_view_request (void *cls,
3290 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
3292 struct ClientContext *cli_ctx = cls;
3293 uint64_t num_updates;
3295 num_updates = ntohl (msg->num_updates);
3297 LOG (GNUNET_ERROR_TYPE_DEBUG,
3298 "Client requested %" PRIu64 " updates of view.\n",
3301 GNUNET_assert (NULL != cli_ctx);
3302 cli_ctx->view_updates_left = num_updates;
3303 send_view (cli_ctx, NULL, 0);
3304 GNUNET_SERVICE_client_continue (cli_ctx->client);
3309 * @brief Handle the cancellation of the view updates.
3311 * @param cls The client context
3315 handle_client_view_cancel (void *cls,
3316 const struct GNUNET_MessageHeader *msg)
3318 struct ClientContext *cli_ctx = cls;
3321 LOG (GNUNET_ERROR_TYPE_DEBUG,
3322 "Client does not want to receive updates of view any more.\n");
3324 GNUNET_assert (NULL != cli_ctx);
3325 cli_ctx->view_updates_left = 0;
3326 GNUNET_SERVICE_client_continue (cli_ctx->client);
3327 if (GNUNET_YES == cli_ctx->stream_update)
3329 destroy_cli_ctx (cli_ctx);
3335 * Handle RPS request for biased stream from the client.
3337 * @param cls Client context
3338 * @param message unused
3341 handle_client_stream_request (void *cls,
3342 const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
3344 struct ClientContext *cli_ctx = cls;
3347 LOG (GNUNET_ERROR_TYPE_DEBUG,
3348 "Client requested peers from biased stream.\n");
3349 cli_ctx->stream_update = GNUNET_YES;
3351 GNUNET_assert (NULL != cli_ctx);
3352 GNUNET_SERVICE_client_continue (cli_ctx->client);
3357 * @brief Handles the cancellation of the stream of biased peer ids
3359 * @param cls The client context
3363 handle_client_stream_cancel (void *cls,
3364 const struct GNUNET_MessageHeader *msg)
3366 struct ClientContext *cli_ctx = cls;
3369 LOG (GNUNET_ERROR_TYPE_DEBUG,
3370 "Client canceled receiving peers from biased stream.\n");
3371 cli_ctx->stream_update = GNUNET_NO;
3373 GNUNET_assert (NULL != cli_ctx);
3374 GNUNET_SERVICE_client_continue (cli_ctx->client);
3379 * @brief Create and start a Sub.
3381 * @param cls Closure - unused
3382 * @param msg Message containing the necessary information
3385 handle_client_start_sub (void *cls,
3386 const struct GNUNET_RPS_CS_SubStartMessage *msg)
3388 struct ClientContext *cli_ctx = cls;
3390 LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
3391 if (NULL != cli_ctx->sub &&
3392 0 != memcmp (&cli_ctx->sub->hash,
3394 sizeof (struct GNUNET_HashCode)))
3396 LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share for this client. Remove old one, add new.\n");
3397 destroy_sub (cli_ctx->sub);
3398 cli_ctx->sub = NULL;
3400 cli_ctx->sub = new_sub (&msg->hash,
3401 msub->sampler_size_est_min, // TODO make api input?
3402 GNUNET_TIME_relative_ntoh (msg->round_interval));
3403 GNUNET_SERVICE_client_continue (cli_ctx->client);
3408 * @brief Destroy the Sub
3410 * @param cls Closure - unused
3411 * @param msg Message containing the hash that identifies the Sub
3414 handle_client_stop_sub (void *cls,
3415 const struct GNUNET_RPS_CS_SubStopMessage *msg)
3417 struct ClientContext *cli_ctx = cls;
3419 GNUNET_assert (NULL != cli_ctx->sub);
3420 if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct GNUNET_HashCode)))
3422 LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request differ!\n");
3424 destroy_sub (cli_ctx->sub);
3425 cli_ctx->sub = NULL;
3426 GNUNET_SERVICE_client_continue (cli_ctx->client);
3431 * Handle a CHECK_LIVE message from another peer.
3433 * This does nothing. But without calling #GNUNET_CADET_receive_done()
3434 * the channel is blocked for all other communication.
3436 * @param cls Closure - Context of channel
3437 * @param msg Message - unused
3440 handle_peer_check (void *cls,
3441 const struct GNUNET_MessageHeader *msg)
3443 const struct ChannelCtx *channel_ctx = cls;
3444 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3447 LOG (GNUNET_ERROR_TYPE_DEBUG,
3448 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
3449 if (channel_ctx->peer_ctx->sub == msub)
3451 GNUNET_STATISTICS_update (stats,
3452 "# pending online checks",
3457 GNUNET_CADET_receive_done (channel_ctx->channel);
3462 * Handle a PUSH message from another peer.
3464 * Check the proof of work and store the PeerID
3465 * in the temporary list for pushed PeerIDs.
3467 * @param cls Closure - Context of channel
3468 * @param msg Message - unused
3471 handle_peer_push (void *cls,
3472 const struct GNUNET_MessageHeader *msg)
3474 const struct ChannelCtx *channel_ctx = cls;
3475 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
3478 // (check the proof of work (?))
3480 LOG (GNUNET_ERROR_TYPE_DEBUG,
3481 "Received PUSH (%s)\n",
3483 if (channel_ctx->peer_ctx->sub == msub)
3485 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
3488 #if ENABLE_MALICIOUS
3489 struct AttackedPeer *tmp_att_peer;
3491 if ( (1 == mal_type) ||
3493 { /* Try to maximise representation */
3494 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3495 tmp_att_peer->peer_id = *peer;
3496 if (NULL == att_peer_set)
3497 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3498 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3501 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3504 add_peer_array_to_set (peer, 1, att_peer_set);
3508 GNUNET_free (tmp_att_peer);
3513 else if (2 == mal_type)
3515 /* We attack one single well-known peer - simply ignore */
3517 #endif /* ENABLE_MALICIOUS */
3519 /* Add the sending peer to the push_map */
3520 CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
3522 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3523 &channel_ctx->peer_ctx->peer_id));
3524 GNUNET_CADET_receive_done (channel_ctx->channel);
3529 * Handle PULL REQUEST request message from another peer.
3531 * Reply with the view of PeerIDs.
3533 * @param cls Closure - Context of channel
3534 * @param msg Message - unused
3537 handle_peer_pull_request (void *cls,
3538 const struct GNUNET_MessageHeader *msg)
3540 const struct ChannelCtx *channel_ctx = cls;
3541 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
3542 const struct GNUNET_PeerIdentity *peer = &peer_ctx->peer_id;
3543 const struct GNUNET_PeerIdentity *view_array;
3546 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
3547 if (peer_ctx->sub == msub)
3549 GNUNET_STATISTICS_update(stats,
3550 "# pull request message received",
3553 if (NULL != map_single_hop &&
3554 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3555 &peer_ctx->peer_id))
3557 GNUNET_STATISTICS_update (stats,
3558 "# pull request message received (multi-hop peer)",
3564 #if ENABLE_MALICIOUS
3567 { /* Try to maximise representation */
3568 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3571 else if (2 == mal_type)
3572 { /* Try to partition network */
3573 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
3575 send_pull_reply (peer_ctx, mal_peers, num_mal_peers);
3578 #endif /* ENABLE_MALICIOUS */
3580 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3581 &channel_ctx->peer_ctx->peer_id));
3582 GNUNET_CADET_receive_done (channel_ctx->channel);
3583 view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
3584 send_pull_reply (peer_ctx,
3586 View_size (channel_ctx->peer_ctx->sub->view));
3591 * Check whether we sent a corresponding request and
3592 * whether this reply is the first one.
3594 * @param cls Closure - Context of channel
3595 * @param msg Message containing the replied peers
3598 check_peer_pull_reply (void *cls,
3599 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3601 struct ChannelCtx *channel_ctx = cls;
3602 struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
3604 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
3606 GNUNET_break_op (0);
3607 return GNUNET_SYSERR;
3610 if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3611 sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
3613 LOG (GNUNET_ERROR_TYPE_ERROR,
3614 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3615 ntohl (msg->num_peers),
3616 (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
3617 sizeof (struct GNUNET_PeerIdentity));
3618 GNUNET_break_op (0);
3619 return GNUNET_SYSERR;
3622 if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
3623 &sender_ctx->peer_id,
3624 Peers_PULL_REPLY_PENDING))
3626 LOG (GNUNET_ERROR_TYPE_WARNING,
3627 "Received a pull reply from a peer (%s) we didn't request one from!\n",
3628 GNUNET_i2s (&sender_ctx->peer_id));
3629 if (sender_ctx->sub == msub)
3631 GNUNET_STATISTICS_update (stats,
3632 "# unrequested pull replies",
3642 * Handle PULL REPLY message from another peer.
3644 * @param cls Closure
3645 * @param msg The message header
3648 handle_peer_pull_reply (void *cls,
3649 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
3651 const struct ChannelCtx *channel_ctx = cls;
3652 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
3653 const struct GNUNET_PeerIdentity *peers;
3654 struct Sub *sub = channel_ctx->peer_ctx->sub;
3656 #if ENABLE_MALICIOUS
3657 struct AttackedPeer *tmp_att_peer;
3658 #endif /* ENABLE_MALICIOUS */
3660 sub->pull_delays[sub->num_rounds - channel_ctx->peer_ctx->round_pull_req]++;
3661 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
3662 if (channel_ctx->peer_ctx->sub == msub)
3664 GNUNET_STATISTICS_update (stats,
3665 "# pull reply messages received",
3668 if (NULL != map_single_hop &&
3669 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3670 &channel_ctx->peer_ctx->peer_id))
3672 GNUNET_STATISTICS_update (stats,
3673 "# pull reply messages received (multi-hop peer)",
3679 #if ENABLE_MALICIOUS
3680 // We shouldn't even receive pull replies as we're not sending
3684 #endif /* ENABLE_MALICIOUS */
3686 /* Do actual logic */
3687 peers = (const struct GNUNET_PeerIdentity *) &msg[1];
3689 LOG (GNUNET_ERROR_TYPE_DEBUG,
3690 "PULL REPLY received, got following %u peers:\n",
3691 ntohl (msg->num_peers));
3693 for (i = 0; i < ntohl (msg->num_peers); i++)
3695 LOG (GNUNET_ERROR_TYPE_DEBUG,
3698 GNUNET_i2s (&peers[i]));
3700 #if ENABLE_MALICIOUS
3701 if ((NULL != att_peer_set) &&
3702 (1 == mal_type || 3 == mal_type))
3703 { /* Add attacked peer to local list */
3704 // TODO check if we sent a request and this was the first reply
3705 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3707 && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3710 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3711 tmp_att_peer->peer_id = peers[i];
3712 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3715 add_peer_array_to_set (&peers[i], 1, att_peer_set);
3719 #endif /* ENABLE_MALICIOUS */
3720 /* Make sure we 'know' about this peer */
3721 (void) insert_peer (channel_ctx->peer_ctx->sub,
3724 if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->sub->valid_peers,
3727 CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map,
3732 schedule_operation (channel_ctx->peer_ctx,
3734 channel_ctx->peer_ctx->sub); /* cls */
3735 (void) issue_peer_online_check (channel_ctx->peer_ctx->sub,
3740 UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map,
3742 Peers_PULL_REPLY_PENDING);
3743 clean_peer (channel_ctx->peer_ctx->sub,
3746 GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
3748 GNUNET_CADET_receive_done (channel_ctx->channel);
3753 * Compute a random delay.
3754 * A uniformly distributed value between mean + spread and mean - spread.
3756 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3757 * It would return a random value between 2 and 6 min.
3759 * @param mean the mean time until the next round
3760 * @param spread the inverse amount of deviation from the mean
3762 static struct GNUNET_TIME_Relative
3763 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3764 unsigned int spread)
3766 struct GNUNET_TIME_Relative half_interval;
3767 struct GNUNET_TIME_Relative ret;
3768 unsigned int rand_delay;
3769 unsigned int max_rand_delay;
3773 LOG (GNUNET_ERROR_TYPE_WARNING,
3774 "Not accepting spread of 0\n");
3778 GNUNET_assert (0 != mean.rel_value_us);
3780 /* Compute random time value between spread * mean and spread * mean */
3781 half_interval = GNUNET_TIME_relative_divide (mean, spread);
3783 max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3785 * Compute random value between (0 and 1) * round_interval
3786 * via multiplying round_interval with a 'fraction' (0 to value)/value
3788 rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3789 ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3790 ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3791 ret = GNUNET_TIME_relative_add (ret, half_interval);
3793 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3794 LOG (GNUNET_ERROR_TYPE_WARNING,
3795 "Returning FOREVER_REL\n");
3802 * Send single pull request
3804 * @param peer_ctx Context to the peer to send request to
3807 send_pull_request (struct PeerContext *peer_ctx)
3809 struct GNUNET_MQ_Envelope *ev;
3811 GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
3813 Peers_PULL_REPLY_PENDING));
3814 SET_PEER_FLAG (peer_ctx,
3815 Peers_PULL_REPLY_PENDING);
3816 peer_ctx->round_pull_req = peer_ctx->sub->num_rounds;
3818 LOG (GNUNET_ERROR_TYPE_DEBUG,
3819 "Going to send PULL REQUEST to peer %s.\n",
3820 GNUNET_i2s (&peer_ctx->peer_id));
3822 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3823 send_message (peer_ctx,
3828 GNUNET_STATISTICS_update (stats,
3829 "# pull request send issued",
3832 if (NULL != map_single_hop &&
3833 GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (map_single_hop,
3834 &peer_ctx->peer_id))
3836 GNUNET_STATISTICS_update (stats,
3837 "# pull request send issued (multi-hop peer)",
3848 * @param peer_ctx Context of peer to send push to
3851 send_push (struct PeerContext *peer_ctx)
3853 struct GNUNET_MQ_Envelope *ev;
3855 LOG (GNUNET_ERROR_TYPE_DEBUG,
3856 "Going to send PUSH to peer %s.\n",
3857 GNUNET_i2s (&peer_ctx->peer_id));
3859 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3860 send_message (peer_ctx, ev, "PUSH");
3863 GNUNET_STATISTICS_update (stats,
3864 "# push send issued",
3871 #if ENABLE_MALICIOUS
3875 * @brief This function is called, when the client tells us to act malicious.
3876 * It verifies that @a msg is well-formed.
3878 * @param cls the closure (#ClientContext)
3879 * @param msg the message
3880 * @return #GNUNET_OK if @a msg is well-formed
3883 check_client_act_malicious (void *cls,
3884 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3886 struct ClientContext *cli_ctx = cls;
3887 uint16_t msize = ntohs (msg->header.size);
3888 uint32_t num_peers = ntohl (msg->num_peers);
3890 msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3891 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3892 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3894 LOG (GNUNET_ERROR_TYPE_ERROR,
3895 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3896 ntohl (msg->num_peers),
3897 (msize / sizeof (struct GNUNET_PeerIdentity)));
3899 GNUNET_SERVICE_client_drop (cli_ctx->client);
3900 return GNUNET_SYSERR;
3906 * Turn RPS service to act malicious.
3908 * @param cls Closure
3909 * @param client The client that sent the message
3910 * @param msg The message header
3913 handle_client_act_malicious (void *cls,
3914 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3916 struct ClientContext *cli_ctx = cls;
3917 struct GNUNET_PeerIdentity *peers;
3918 uint32_t num_mal_peers_sent;
3919 uint32_t num_mal_peers_old;
3920 struct Sub *sub = cli_ctx->sub;
3922 if (NULL == sub) sub = msub;
3923 /* Do actual logic */
3924 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3925 mal_type = ntohl (msg->type);
3926 if (NULL == mal_peer_set)
3927 mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3929 LOG (GNUNET_ERROR_TYPE_DEBUG,
3930 "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3932 ntohl (msg->num_peers));
3935 { /* Try to maximise representation */
3936 /* Add other malicious peers to those we already know */
3938 num_mal_peers_sent = ntohl (msg->num_peers);
3939 num_mal_peers_old = num_mal_peers;
3940 GNUNET_array_grow (mal_peers,
3942 num_mal_peers + num_mal_peers_sent);
3943 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3945 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3947 /* Add all mal peers to mal_peer_set */
3948 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3952 /* Substitute do_round () with do_mal_round () */
3953 GNUNET_assert (NULL != sub->do_round_task);
3954 GNUNET_SCHEDULER_cancel (sub->do_round_task);
3955 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
3958 else if ( (2 == mal_type) ||
3960 { /* Try to partition the network */
3961 /* Add other malicious peers to those we already know */
3963 num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3964 num_mal_peers_old = num_mal_peers;
3965 GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3966 GNUNET_array_grow (mal_peers,
3968 num_mal_peers + num_mal_peers_sent);
3969 if (NULL != mal_peers &&
3972 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3974 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3976 /* Add all mal peers to mal_peer_set */
3977 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3982 /* Store the one attacked peer */
3983 GNUNET_memcpy (&attacked_peer,
3984 &msg->attacked_peer,
3985 sizeof (struct GNUNET_PeerIdentity));
3986 /* Set the flag of the attacked peer to valid to avoid problems */
3987 if (GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
3989 (void) issue_peer_online_check (sub, &attacked_peer);
3992 LOG (GNUNET_ERROR_TYPE_DEBUG,
3993 "Attacked peer is %s\n",
3994 GNUNET_i2s (&attacked_peer));
3996 /* Substitute do_round () with do_mal_round () */
3997 if (NULL != sub->do_round_task)
3999 /* Probably in shutdown */
4000 GNUNET_SCHEDULER_cancel (sub->do_round_task);
4001 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
4004 else if (0 == mal_type)
4005 { /* Stop acting malicious */
4006 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
4008 /* Substitute do_mal_round () with do_round () */
4009 GNUNET_SCHEDULER_cancel (sub->do_round_task);
4010 sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
4015 GNUNET_SERVICE_client_continue (cli_ctx->client);
4017 GNUNET_SERVICE_client_continue (cli_ctx->client);
4022 * Send out PUSHes and PULLs maliciously.
4024 * This is executed regylary.
4026 * @param cls Closure - Sub
4029 do_mal_round (void *cls)
4031 uint32_t num_pushes;
4033 struct GNUNET_TIME_Relative time_next_round;
4034 struct AttackedPeer *tmp_att_peer;
4035 struct Sub *sub = cls;
4037 LOG (GNUNET_ERROR_TYPE_DEBUG,
4038 "Going to execute next round maliciously type %" PRIu32 ".\n",
4040 sub->do_round_task = NULL;
4041 GNUNET_assert (mal_type <= 3);
4042 /* Do malicious actions */
4044 { /* Try to maximise representation */
4046 /* The maximum of pushes we're going to send this round */
4047 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
4048 num_attacked_peers),
4049 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4051 LOG (GNUNET_ERROR_TYPE_DEBUG,
4052 "Going to send %" PRIu32 " pushes\n",
4055 /* Send PUSHes to attacked peers */
4056 for (i = 0 ; i < num_pushes ; i++)
4058 if (att_peers_tail == att_peer_index)
4059 att_peer_index = att_peers_head;
4061 att_peer_index = att_peer_index->next;
4063 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4066 /* Send PULLs to some peers to learn about additional peers to attack */
4067 tmp_att_peer = att_peer_index;
4068 for (i = 0 ; i < num_pushes * alpha ; i++)
4070 if (att_peers_tail == tmp_att_peer)
4071 tmp_att_peer = att_peers_head;
4073 att_peer_index = tmp_att_peer->next;
4075 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4080 else if (2 == mal_type)
4082 * Try to partition the network
4083 * Send as many pushes to the attacked peer as possible
4084 * That is one push per round as it will ignore more.
4086 (void) issue_peer_online_check (sub, &attacked_peer);
4087 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4090 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4095 { /* Combined attack */
4097 /* Send PUSH to attacked peers */
4098 if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
4100 (void) issue_peer_online_check (sub, &attacked_peer);
4101 if (GNUNET_YES == check_peer_flag (sub->peer_map,
4105 LOG (GNUNET_ERROR_TYPE_DEBUG,
4106 "Goding to send push to attacked peer (%s)\n",
4107 GNUNET_i2s (&attacked_peer));
4108 send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
4111 (void) issue_peer_online_check (sub, &attacked_peer);
4113 /* The maximum of pushes we're going to send this round */
4114 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
4115 num_attacked_peers),
4116 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
4118 LOG (GNUNET_ERROR_TYPE_DEBUG,
4119 "Going to send %" PRIu32 " pushes\n",
4122 for (i = 0; i < num_pushes; i++)
4124 if (att_peers_tail == att_peer_index)
4125 att_peer_index = att_peers_head;
4127 att_peer_index = att_peer_index->next;
4129 send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
4132 /* Send PULLs to some peers to learn about additional peers to attack */
4133 tmp_att_peer = att_peer_index;
4134 for (i = 0; i < num_pushes * alpha; i++)
4136 if (att_peers_tail == tmp_att_peer)
4137 tmp_att_peer = att_peers_head;
4139 att_peer_index = tmp_att_peer->next;
4141 send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
4145 /* Schedule next round */
4146 time_next_round = compute_rand_delay (sub->round_interval, 2);
4148 GNUNET_assert (NULL == sub->do_round_task);
4149 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4150 &do_mal_round, sub);
4151 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4153 #endif /* ENABLE_MALICIOUS */
4157 * Send out PUSHes and PULLs, possibly update #view, samplers.
4159 * This is executed regylary.
4161 * @param cls Closure - Sub
4164 do_round (void *cls)
4167 const struct GNUNET_PeerIdentity *view_array;
4168 unsigned int *permut;
4169 unsigned int a_peers; /* Number of peers we send pushes to */
4170 unsigned int b_peers; /* Number of peers we send pull requests to */
4171 uint32_t first_border;
4172 uint32_t second_border;
4173 struct GNUNET_PeerIdentity peer;
4174 struct GNUNET_PeerIdentity *update_peer;
4175 struct Sub *sub = cls;
4178 LOG (GNUNET_ERROR_TYPE_DEBUG,
4179 "Going to execute next round.\n");
4182 GNUNET_STATISTICS_update (stats, "# rounds", 1, GNUNET_NO);
4184 sub->do_round_task = NULL;
4185 LOG (GNUNET_ERROR_TYPE_DEBUG,
4186 "Printing view:\n");
4187 to_file (sub->file_name_view_log,
4188 "___ new round ___");
4189 view_array = View_get_as_array (sub->view);
4190 for (i = 0; i < View_size (sub->view); i++)
4192 LOG (GNUNET_ERROR_TYPE_DEBUG,
4193 "\t%s\n", GNUNET_i2s (&view_array[i]));
4194 to_file (sub->file_name_view_log,
4196 GNUNET_i2s_full (&view_array[i]));
4200 /* Send pushes and pull requests */
4201 if (0 < View_size (sub->view))
4203 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4204 View_size (sub->view));
4207 a_peers = ceil (alpha * View_size (sub->view));
4209 LOG (GNUNET_ERROR_TYPE_DEBUG,
4210 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
4211 a_peers, alpha, View_size (sub->view));
4212 for (i = 0; i < a_peers; i++)
4214 peer = view_array[permut[i]];
4215 // FIXME if this fails schedule/loop this for later
4216 send_push (get_peer_ctx (sub->peer_map, &peer));
4219 /* Send PULL requests */
4220 b_peers = ceil (beta * View_size (sub->view));
4221 first_border = a_peers;
4222 second_border = a_peers + b_peers;
4223 if (second_border > View_size (sub->view))
4225 first_border = View_size (sub->view) - b_peers;
4226 second_border = View_size (sub->view);
4228 LOG (GNUNET_ERROR_TYPE_DEBUG,
4229 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
4230 b_peers, beta, View_size (sub->view));
4231 for (i = first_border; i < second_border; i++)
4233 peer = view_array[permut[i]];
4234 if ( GNUNET_NO == check_peer_flag (sub->peer_map,
4236 Peers_PULL_REPLY_PENDING))
4237 { // FIXME if this fails schedule/loop this for later
4238 send_pull_request (get_peer_ctx (sub->peer_map, &peer));
4242 GNUNET_free (permut);
4248 /* TODO see how many peers are in push-/pull- list! */
4250 if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) &&
4251 (0 < CustomPeerMap_size (sub->push_map)) &&
4252 (0 < CustomPeerMap_size (sub->pull_map)))
4253 { /* If conditions for update are fulfilled, update */
4254 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
4256 uint32_t final_size;
4257 uint32_t peers_to_clean_size;
4258 struct GNUNET_PeerIdentity *peers_to_clean;
4260 peers_to_clean = NULL;
4261 peers_to_clean_size = 0;
4262 GNUNET_array_grow (peers_to_clean,
4263 peers_to_clean_size,
4264 View_size (sub->view));
4265 GNUNET_memcpy (peers_to_clean,
4267 View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
4269 /* Seems like recreating is the easiest way of emptying the peermap */
4270 View_clear (sub->view);
4271 to_file (sub->file_name_view_log,
4274 first_border = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
4275 CustomPeerMap_size (sub->push_map));
4276 second_border = first_border +
4277 GNUNET_MIN (floor (beta * sub->view_size_est_need),
4278 CustomPeerMap_size (sub->pull_map));
4279 final_size = second_border +
4280 ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
4281 LOG (GNUNET_ERROR_TYPE_DEBUG,
4282 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
4287 /* Update view with peers received through PUSHes */
4288 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4289 CustomPeerMap_size (sub->push_map));
4290 for (i = 0; i < first_border; i++)
4293 inserted = insert_in_view (sub,
4294 CustomPeerMap_get_peer_by_index (sub->push_map,
4296 if (GNUNET_OK == inserted)
4298 clients_notify_stream_peer (sub,
4300 CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
4302 to_file (sub->file_name_view_log,
4304 GNUNET_i2s_full (&view_array[i]));
4305 // TODO change the peer_flags accordingly
4307 GNUNET_free (permut);
4310 /* Update view with peers received through PULLs */
4311 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
4312 CustomPeerMap_size (sub->pull_map));
4313 for (i = first_border; i < second_border; i++)
4316 inserted = insert_in_view (sub,
4317 CustomPeerMap_get_peer_by_index (sub->pull_map,
4318 permut[i - first_border]));
4319 if (GNUNET_OK == inserted)
4321 clients_notify_stream_peer (sub,
4323 CustomPeerMap_get_peer_by_index (sub->pull_map,
4324 permut[i - first_border]));
4326 to_file (sub->file_name_view_log,
4328 GNUNET_i2s_full (&view_array[i]));
4329 // TODO change the peer_flags accordingly
4331 GNUNET_free (permut);
4334 /* Update view with peers from history */
4335 RPS_sampler_get_n_rand_peers (sub->sampler,
4336 final_size - second_border,
4339 // TODO change the peer_flags accordingly
4341 for (i = 0; i < View_size (sub->view); i++)
4342 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
4344 /* Clean peers that were removed from the view */
4345 for (i = 0; i < peers_to_clean_size; i++)
4347 to_file (sub->file_name_view_log,
4349 GNUNET_i2s_full (&peers_to_clean[i]));
4350 clean_peer (sub, &peers_to_clean[i]);
4353 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
4354 clients_notify_view_update (sub);
4356 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
4359 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
4360 if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4361 !(0 >= CustomPeerMap_size (sub->pull_map)))
4362 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
4363 if (CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4364 (0 >= CustomPeerMap_size (sub->pull_map)))
4365 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
4366 if (0 >= CustomPeerMap_size (sub->push_map) &&
4367 !(0 >= CustomPeerMap_size (sub->pull_map)))
4368 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
4369 if (0 >= CustomPeerMap_size (sub->push_map) &&
4370 (0 >= CustomPeerMap_size (sub->pull_map)))
4371 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
4372 if (0 >= CustomPeerMap_size (sub->pull_map) &&
4373 CustomPeerMap_size (sub->push_map) > alpha * sub->view_size_est_need &&
4374 0 >= CustomPeerMap_size (sub->push_map))
4375 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
4378 // TODO independent of that also get some peers from CADET_get_peers()?
4379 if (CustomPeerMap_size (sub->push_map) < HISTOGRAM_FILE_SLOTS)
4381 sub->push_recv[CustomPeerMap_size (sub->push_map)]++;
4385 LOG (GNUNET_ERROR_TYPE_WARNING,
4386 "Push map size too big for histogram (%u, %u)\n",
4387 CustomPeerMap_size (sub->push_map),
4388 HISTOGRAM_FILE_SLOTS);
4390 // FIXME check bounds of histogram
4391 sub->push_delta[(uint32_t) (CustomPeerMap_size (sub->push_map) -
4392 (alpha * sub->view_size_est_need)) +
4393 (HISTOGRAM_FILE_SLOTS/2)]++;
4396 GNUNET_STATISTICS_set (stats,
4397 "# peers in push map at end of round",
4398 CustomPeerMap_size (sub->push_map),
4400 GNUNET_STATISTICS_set (stats,
4401 "# peers in pull map at end of round",
4402 CustomPeerMap_size (sub->pull_map),
4404 GNUNET_STATISTICS_set (stats,
4405 "# peers in view at end of round",
4406 View_size (sub->view),
4408 GNUNET_STATISTICS_set (stats,
4409 "# expected pushes",
4410 alpha * sub->view_size_est_need,
4412 GNUNET_STATISTICS_set (stats,
4413 "delta expected - received pushes",
4414 CustomPeerMap_size (sub->push_map) - (alpha * sub->view_size_est_need),
4418 LOG (GNUNET_ERROR_TYPE_DEBUG,
4419 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (sub->view%u) = %.2f)\n",
4420 CustomPeerMap_size (sub->push_map),
4421 CustomPeerMap_size (sub->pull_map),
4423 View_size (sub->view),
4424 alpha * View_size (sub->view));
4426 /* Update samplers */
4427 for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
4429 update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
4430 LOG (GNUNET_ERROR_TYPE_DEBUG,
4431 "Updating with peer %s from push list\n",
4432 GNUNET_i2s (update_peer));
4433 insert_in_sampler (sub, update_peer);
4434 clean_peer (sub, update_peer); /* This cleans only if it is not in the view */
4437 for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
4439 LOG (GNUNET_ERROR_TYPE_DEBUG,
4440 "Updating with peer %s from pull list\n",
4441 GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
4442 insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4443 /* This cleans only if it is not in the view */
4444 clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
4448 /* Empty push/pull lists */
4449 CustomPeerMap_clear (sub->push_map);
4450 CustomPeerMap_clear (sub->pull_map);
4454 GNUNET_STATISTICS_set (stats,
4456 View_size(sub->view),
4460 struct GNUNET_TIME_Relative time_next_round;
4462 time_next_round = compute_rand_delay (sub->round_interval, 2);
4464 /* Schedule next round */
4465 sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
4467 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
4472 * This is called from GNUNET_CADET_get_peers().
4474 * It is called on every peer(ID) that cadet somehow has contact with.
4475 * We use those to initialise the sampler.
4477 * implements #GNUNET_CADET_PeersCB
4479 * @param cls Closure - Sub
4480 * @param peer Peer, or NULL on "EOF".
4481 * @param tunnel Do we have a tunnel towards this peer?
4482 * @param n_paths Number of known paths towards this peer.
4483 * @param best_path How long is the best path?
4484 * (0 = unknown, 1 = ourselves, 2 = neighbor)
4487 init_peer_cb (void *cls,
4488 const struct GNUNET_PeerIdentity *peer,
4489 int tunnel, /* "Do we have a tunnel towards this peer?" */
4490 unsigned int n_paths, /* "Number of known paths towards this peer" */
4491 unsigned int best_path) /* "How long is the best path?
4492 * (0 = unknown, 1 = ourselves, 2 = neighbor)" */
4494 struct Sub *sub = cls;
4501 LOG (GNUNET_ERROR_TYPE_DEBUG,
4502 "Got peer_id %s from cadet\n",
4504 got_peer (sub, peer);
4510 * @brief Iterator function over stored, valid peers.
4512 * We initialise the sampler with those.
4514 * @param cls Closure - Sub
4515 * @param peer the peer id
4516 * @return #GNUNET_YES if we should continue to
4518 * #GNUNET_NO if not.
4521 valid_peers_iterator (void *cls,
4522 const struct GNUNET_PeerIdentity *peer)
4524 struct Sub *sub = cls;
4528 LOG (GNUNET_ERROR_TYPE_DEBUG,
4529 "Got stored, valid peer %s\n",
4531 got_peer (sub, peer);
4538 * Iterator over peers from peerinfo.
4540 * @param cls Closure - Sub
4541 * @param peer id of the peer, NULL for last call
4542 * @param hello hello message for the peer (can be NULL)
4543 * @param error message
4546 process_peerinfo_peers (void *cls,
4547 const struct GNUNET_PeerIdentity *peer,
4548 const struct GNUNET_HELLO_Message *hello,
4549 const char *err_msg)
4551 struct Sub *sub = cls;
4557 LOG (GNUNET_ERROR_TYPE_DEBUG,
4558 "Got peer_id %s from peerinfo\n",
4560 got_peer (sub, peer);
4566 * Task run during shutdown.
4568 * @param cls Closure - unused
4571 shutdown_task (void *cls)
4574 struct ClientContext *client_ctx;
4576 LOG (GNUNET_ERROR_TYPE_DEBUG,
4577 "RPS service is going down\n");
4579 /* Clean all clients */
4580 for (client_ctx = cli_ctx_head;
4581 NULL != cli_ctx_head;
4582 client_ctx = cli_ctx_head)
4584 destroy_cli_ctx (client_ctx);
4592 /* Disconnect from other services */
4593 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
4594 GNUNET_PEERINFO_disconnect (peerinfo_handle);
4595 peerinfo_handle = NULL;
4596 GNUNET_NSE_disconnect (nse);
4597 if (NULL != map_single_hop)
4599 /* core_init was called - core was initialised */
4600 /* disconnect first, so no callback tries to access missing peermap */
4601 GNUNET_CORE_disconnect (core_handle);
4603 GNUNET_CONTAINER_multipeermap_destroy (map_single_hop);
4604 map_single_hop = NULL;
4609 GNUNET_STATISTICS_destroy (stats,
4613 GNUNET_CADET_disconnect (cadet_handle);
4614 cadet_handle = NULL;
4615 #if ENABLE_MALICIOUS
4616 struct AttackedPeer *tmp_att_peer;
4617 GNUNET_array_grow (mal_peers,
4620 if (NULL != mal_peer_set)
4621 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
4622 if (NULL != att_peer_set)
4623 GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
4624 while (NULL != att_peers_head)
4626 tmp_att_peer = att_peers_head;
4627 GNUNET_CONTAINER_DLL_remove (att_peers_head,
4630 GNUNET_free (tmp_att_peer);
4632 #endif /* ENABLE_MALICIOUS */
4638 * Handle client connecting to the service.
4641 * @param client the new client
4642 * @param mq the message queue of @a client
4646 client_connect_cb (void *cls,
4647 struct GNUNET_SERVICE_Client *client,
4648 struct GNUNET_MQ_Handle *mq)
4650 struct ClientContext *cli_ctx;
4653 LOG (GNUNET_ERROR_TYPE_DEBUG,
4654 "Client connected\n");
4656 return client; /* Server was destroyed before a client connected. Shutting down */
4657 cli_ctx = GNUNET_new (struct ClientContext);
4659 cli_ctx->view_updates_left = -1;
4660 cli_ctx->stream_update = GNUNET_NO;
4661 cli_ctx->client = client;
4662 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
4669 * Callback called when a client disconnected from the service
4671 * @param cls closure for the service
4672 * @param c the client that disconnected
4673 * @param internal_cls should be equal to @a c
4676 client_disconnect_cb (void *cls,
4677 struct GNUNET_SERVICE_Client *client,
4680 struct ClientContext *cli_ctx = internal_cls;
4683 GNUNET_assert (client == cli_ctx->client);
4685 {/* shutdown task - destroy all clients */
4686 while (NULL != cli_ctx_head)
4687 destroy_cli_ctx (cli_ctx_head);
4690 { /* destroy this client */
4691 LOG (GNUNET_ERROR_TYPE_DEBUG,
4692 "Client disconnected. Destroy its context.\n");
4693 destroy_cli_ctx (cli_ctx);
4699 * Handle random peer sampling clients.
4701 * @param cls closure
4702 * @param c configuration to use
4703 * @param service the initialized service
4707 const struct GNUNET_CONFIGURATION_Handle *c,
4708 struct GNUNET_SERVICE_Handle *service)
4710 struct GNUNET_TIME_Relative round_interval;
4711 long long unsigned int sampler_size;
4712 char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
4713 struct GNUNET_HashCode hash;
4718 GNUNET_log_setup ("rps",
4719 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
4723 GNUNET_CRYPTO_get_peer_identity (cfg,
4724 &own_identity); // TODO check return value
4725 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4726 "STARTING SERVICE (rps) for peer [%s]\n",
4727 GNUNET_i2s (&own_identity));
4728 #if ENABLE_MALICIOUS
4729 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
4730 "Malicious execution compiled in.\n");
4731 #endif /* ENABLE_MALICIOUS */
4733 /* Get time interval from the configuration */
4735 GNUNET_CONFIGURATION_get_value_time (cfg,
4740 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4741 "RPS", "ROUNDINTERVAL");
4742 GNUNET_SCHEDULER_shutdown ();
4746 /* Get initial size of sampler/view from the configuration */
4748 GNUNET_CONFIGURATION_get_value_number (cfg,
4753 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
4755 GNUNET_SCHEDULER_shutdown ();
4759 cadet_handle = GNUNET_CADET_connect (cfg);
4760 GNUNET_assert (NULL != cadet_handle);
4761 core_handle = GNUNET_CORE_connect (cfg,
4763 core_init, /* init */
4764 core_connects, /* connects */
4765 core_disconnects, /* disconnects */
4766 NULL); /* handlers */
4767 GNUNET_assert (NULL != core_handle);
4774 /* Set up main Sub */
4775 GNUNET_CRYPTO_hash (hash_port_string,
4776 strlen (hash_port_string),
4778 msub = new_sub (&hash,
4779 sampler_size, /* Will be overwritten by config */
4783 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4785 /* connect to NSE */
4786 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
4788 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4789 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
4790 // TODO send push/pull to each of those peers?
4791 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4792 restore_valid_peers (msub);
4793 get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
4795 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4797 process_peerinfo_peers,
4800 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4802 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4803 stats = GNUNET_STATISTICS_create ("rps", cfg);
4808 * Define "main" method using service macro.
4812 GNUNET_SERVICE_OPTION_NONE,
4815 &client_disconnect_cb,
4817 GNUNET_MQ_hd_var_size (client_seed,
4818 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4819 struct GNUNET_RPS_CS_SeedMessage,
4821 #if ENABLE_MALICIOUS
4822 GNUNET_MQ_hd_var_size (client_act_malicious,
4823 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4824 struct GNUNET_RPS_CS_ActMaliciousMessage,
4826 #endif /* ENABLE_MALICIOUS */
4827 GNUNET_MQ_hd_fixed_size (client_view_request,
4828 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4829 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4831 GNUNET_MQ_hd_fixed_size (client_view_cancel,
4832 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4833 struct GNUNET_MessageHeader,
4835 GNUNET_MQ_hd_fixed_size (client_stream_request,
4836 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4837 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4839 GNUNET_MQ_hd_fixed_size (client_stream_cancel,
4840 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
4841 struct GNUNET_MessageHeader,
4843 GNUNET_MQ_hd_fixed_size (client_start_sub,
4844 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
4845 struct GNUNET_RPS_CS_SubStartMessage,
4847 GNUNET_MQ_hd_fixed_size (client_stop_sub,
4848 GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
4849 struct GNUNET_RPS_CS_SubStopMessage,
4851 GNUNET_MQ_handler_end());
4853 /* end of gnunet-service-rps.c */