2 This file is part of GNUnet.
3 Copyright (C) 2013-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * @file rps/gnunet-service-rps.c
21 * @brief rps service implementation
22 * @author Julius Bünger
25 #include "gnunet_applications.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_cadet_service.h"
28 #include "gnunet_peerinfo_service.h"
29 #include "gnunet_nse_service.h"
30 #include "gnunet_statistics_service.h"
32 #include "rps-test_util.h"
33 #include "gnunet-service-rps_sampler.h"
34 #include "gnunet-service-rps_custommap.h"
35 #include "gnunet-service-rps_view.h"
40 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
42 // TODO modify @brief in every file
44 // TODO check for overflows
46 // TODO align message structs
48 // TODO connect to friends
50 // TODO blacklist? (-> mal peer detection on top of brahms)
52 // hist_size_init, hist_size_max
57 static const struct GNUNET_CONFIGURATION_Handle *cfg;
60 * Handle to the statistics service.
62 struct GNUNET_STATISTICS_Handle *stats;
67 static struct GNUNET_PeerIdentity own_identity;
69 static int in_shutdown = GNUNET_NO;
72 * @brief Port used for cadet.
74 * Don't compute multiple times through making it global
76 static struct GNUNET_HashCode port;
78 /***********************************************************************
79 * Old gnunet-service-rps_peers.c
80 ***********************************************************************/
83 * Set a peer flag of given peer context.
85 #define SET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) |= (mask))
88 * Get peer flag of given peer context.
90 #define check_peer_flag_set(peer_ctx, mask)\
91 ((peer_ctx->peer_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
94 * Unset flag of given peer context.
96 #define UNSET_PEER_FLAG(peer_ctx, mask) ((peer_ctx->peer_flags) &= ~(mask))
99 * Get channel flag of given channel context.
101 #define check_channel_flag_set(channel_flags, mask)\
102 ((*channel_flags) & (mask) ? GNUNET_YES : GNUNET_NO)
105 * Unset flag of given channel context.
107 #define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= ~(mask))
112 * Pending operation on peer consisting of callback and closure
114 * When an operation cannot be executed right now this struct is used to store
115 * the callback and closure for later execution.
131 * List containing all messages that are yet to be send
133 * This is used to keep track of all messages that have not been sent yet. When
134 * a peer is to be removed the pending messages can be removed properly.
136 struct PendingMessage
141 struct PendingMessage *next;
142 struct PendingMessage *prev;
145 * The envelope to the corresponding message
147 struct GNUNET_MQ_Envelope *ev;
150 * The corresponding context
152 struct PeerContext *peer_ctx;
161 * @brief Context for a channel
166 * Struct used to keep track of other peer's status
168 * This is stored in a multipeermap.
169 * It contains information such as cadet channels, a message queue for sending,
170 * status about the channels, the pending operations on this peer and some flags
171 * about the status of the peer itself. (live, valid, ...)
176 * Message queue open to client
178 struct GNUNET_MQ_Handle *mq;
181 * Channel open to client.
183 struct ChannelCtx *send_channel_ctx;
186 * Channel open from client.
188 struct ChannelCtx *recv_channel_ctx;
191 * Array of pending operations on this peer.
193 struct PeerPendingOp *pending_ops;
196 * Handle to the callback given to cadet_ntfy_tmt_rdy()
198 * To be canceled on shutdown.
200 struct PendingMessage *liveliness_check_pending;
203 * Number of pending operations.
205 unsigned int num_pending_ops;
208 * Identity of the peer
210 struct GNUNET_PeerIdentity peer_id;
213 * Flags indicating status of peer
218 * Last time we received something from that peer.
220 struct GNUNET_TIME_Absolute last_message_recv;
223 * Last time we received a keepalive message.
225 struct GNUNET_TIME_Absolute last_keepalive;
228 * DLL with all messages that are yet to be sent
230 struct PendingMessage *pending_messages_head;
231 struct PendingMessage *pending_messages_tail;
234 * This is pobably followed by 'statistical' data (when we first saw
235 * it, how did we get its ID, how many pushes (in a timeinterval),
241 * @brief Closure to #valid_peer_iterator
243 struct PeersIteratorCls
248 PeersIterator iterator;
251 * Closure to iterator
257 * @brief Context for a channel
262 * @brief The channel itself
264 struct GNUNET_CADET_Channel *channel;
267 * @brief The peer context associated with the channel
269 struct PeerContext *peer_ctx;
272 * @brief When channel destruction needs to be delayed (because it is called
273 * from within the cadet routine of another channel destruction) this task
274 * refers to the respective _SCHEDULER_Task.
276 struct GNUNET_SCHEDULER_Task *destruction_task;
280 * @brief Hashmap of valid peers.
282 static struct GNUNET_CONTAINER_MultiPeerMap *valid_peers;
285 * @brief Maximum number of valid peers to keep.
286 * TODO read from config
288 static uint32_t num_valid_peers_max = UINT32_MAX;
291 * @brief Filename of the file that stores the valid peers persistently.
293 static char *filename_valid_peers;
296 * Set of all peers to keep track of them.
298 static struct GNUNET_CONTAINER_MultiPeerMap *peer_map;
303 static struct GNUNET_CADET_Handle *cadet_handle;
307 * @brief Get the #PeerContext associated with a peer
309 * @param peer the peer id
311 * @return the #PeerContext
313 static struct PeerContext *
314 get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
316 struct PeerContext *ctx;
319 ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
320 GNUNET_assert (GNUNET_YES == ret);
321 ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer);
322 GNUNET_assert (NULL != ctx);
327 * @brief Check whether we have information about the given peer.
329 * FIXME probably deprecated. Make this the new _online.
331 * @param peer peer in question
333 * @return #GNUNET_YES if peer is known
334 * #GNUNET_NO if peer is not knwon
337 check_peer_known (const struct GNUNET_PeerIdentity *peer)
339 if (NULL != peer_map)
341 return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer);
350 * @brief Create a new #PeerContext and insert it into the peer map
352 * @param peer the peer to create the #PeerContext for
354 * @return the #PeerContext
356 static struct PeerContext *
357 create_peer_ctx (const struct GNUNET_PeerIdentity *peer)
359 struct PeerContext *ctx;
362 GNUNET_assert (GNUNET_NO == check_peer_known (peer));
364 ctx = GNUNET_new (struct PeerContext);
365 ctx->peer_id = *peer;
366 ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx,
367 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
368 GNUNET_assert (GNUNET_OK == ret);
369 GNUNET_STATISTICS_set (stats,
371 GNUNET_CONTAINER_multipeermap_size (peer_map),
378 * @brief Create or get a #PeerContext
380 * @param peer the peer to get the associated context to
382 * @return the context
384 static struct PeerContext *
385 create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer)
387 if (GNUNET_NO == check_peer_known (peer))
389 return create_peer_ctx (peer);
391 return get_peer_ctx (peer);
396 * @brief Check whether we have a connection to this @a peer
398 * Also sets the #Peers_ONLINE flag accordingly
400 * @param peer the peer in question
402 * @return #GNUNET_YES if we are connected
403 * #GNUNET_NO otherwise
406 check_connected (const struct GNUNET_PeerIdentity *peer)
408 struct PeerContext *peer_ctx;
410 /* If we don't know about this peer we don't know whether it's online */
411 if (GNUNET_NO == check_peer_known (peer))
415 /* Get the context */
416 peer_ctx = get_peer_ctx (peer);
417 /* If we have no channel to this peer we don't know whether it's online */
418 if ( (NULL == peer_ctx->send_channel_ctx) &&
419 (NULL == peer_ctx->recv_channel_ctx) )
421 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
424 /* Otherwise (if we have a channel, we know that it's online */
425 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
431 * @brief The closure to #get_rand_peer_iterator.
433 struct GetRandPeerIteratorCls
436 * @brief The index of the peer to return.
437 * Will be decreased until 0.
438 * Then current peer is returned.
443 * @brief Pointer to peer to return.
445 const struct GNUNET_PeerIdentity *peer;
450 * @brief Iterator function for #get_random_peer_from_peermap.
452 * Implements #GNUNET_CONTAINER_PeerMapIterator.
453 * Decreases the index until the index is null.
454 * Then returns the current peer.
456 * @param cls the #GetRandPeerIteratorCls containing index and peer
457 * @param peer current peer
458 * @param value unused
460 * @return #GNUNET_YES if we should continue to
465 get_rand_peer_iterator (void *cls,
466 const struct GNUNET_PeerIdentity *peer,
469 struct GetRandPeerIteratorCls *iterator_cls = cls;
472 if (0 >= iterator_cls->index)
474 iterator_cls->peer = peer;
477 iterator_cls->index--;
483 * @brief Get a random peer from @a peer_map
485 * @param peer_map the peer_map to get the peer from
487 * @return a random peer
489 static const struct GNUNET_PeerIdentity *
490 get_random_peer_from_peermap (const struct
491 GNUNET_CONTAINER_MultiPeerMap *peer_map)
493 struct GetRandPeerIteratorCls *iterator_cls;
494 const struct GNUNET_PeerIdentity *ret;
496 iterator_cls = GNUNET_new (struct GetRandPeerIteratorCls);
497 iterator_cls->index = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
498 GNUNET_CONTAINER_multipeermap_size (peer_map));
499 (void) GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
500 get_rand_peer_iterator,
502 ret = iterator_cls->peer;
503 GNUNET_free (iterator_cls);
509 * @brief Add a given @a peer to valid peers.
511 * If valid peers are already #num_valid_peers_max, delete a peer previously.
513 * @param peer the peer that is added to the valid peers.
515 * @return #GNUNET_YES if no other peer had to be removed
516 * #GNUNET_NO otherwise
519 add_valid_peer (const struct GNUNET_PeerIdentity *peer)
521 const struct GNUNET_PeerIdentity *rand_peer;
525 while (GNUNET_CONTAINER_multipeermap_size (valid_peers) >= num_valid_peers_max)
527 rand_peer = get_random_peer_from_peermap (valid_peers);
528 GNUNET_CONTAINER_multipeermap_remove_all (valid_peers, rand_peer);
531 (void) GNUNET_CONTAINER_multipeermap_put (valid_peers, peer, NULL,
532 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
533 GNUNET_STATISTICS_set (stats,
535 GNUNET_CONTAINER_multipeermap_size (valid_peers),
541 remove_pending_message (struct PendingMessage *pending_msg, int cancel);
544 * @brief Set the peer flag to living and
545 * call the pending operations on this peer.
547 * Also adds peer to #valid_peers.
549 * @param peer_ctx the #PeerContext of the peer to set live
552 set_peer_live (struct PeerContext *peer_ctx)
554 struct GNUNET_PeerIdentity *peer;
557 peer = &peer_ctx->peer_id;
558 LOG (GNUNET_ERROR_TYPE_DEBUG,
559 "Peer %s is live and valid, calling %i pending operations on it\n",
561 peer_ctx->num_pending_ops);
563 if (NULL != peer_ctx->liveliness_check_pending)
565 LOG (GNUNET_ERROR_TYPE_DEBUG,
566 "Removing pending liveliness check for peer %s\n",
567 GNUNET_i2s (&peer_ctx->peer_id));
568 // TODO wait until cadet sets mq->cancel_impl
569 //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
570 remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
571 peer_ctx->liveliness_check_pending = NULL;
574 (void) add_valid_peer (peer);
575 SET_PEER_FLAG (peer_ctx, Peers_ONLINE);
577 /* Call pending operations */
578 for (i = 0; i < peer_ctx->num_pending_ops; i++)
580 peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer);
582 GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0);
586 cleanup_destroyed_channel (void *cls,
587 const struct GNUNET_CADET_Channel *channel);
589 /* Declaration of handlers */
591 handle_peer_check (void *cls,
592 const struct GNUNET_MessageHeader *msg);
595 handle_peer_push (void *cls,
596 const struct GNUNET_MessageHeader *msg);
599 handle_peer_pull_request (void *cls,
600 const struct GNUNET_MessageHeader *msg);
603 check_peer_pull_reply (void *cls,
604 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
607 handle_peer_pull_reply (void *cls,
608 const struct GNUNET_RPS_P2P_PullReplyMessage *msg);
610 /* End declaration of handlers */
613 * @brief Allocate memory for a new channel context and insert it into DLL
615 * @param peer_ctx context of the according peer
617 * @return The channel context
619 static struct ChannelCtx *
620 add_channel_ctx (struct PeerContext *peer_ctx)
622 struct ChannelCtx *channel_ctx;
623 channel_ctx = GNUNET_new (struct ChannelCtx);
624 channel_ctx->peer_ctx = peer_ctx;
630 * @brief Free memory and NULL pointers.
632 * @param channel_ctx The channel context.
635 remove_channel_ctx (struct ChannelCtx *channel_ctx)
637 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
639 if (NULL != channel_ctx->destruction_task)
641 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
642 channel_ctx->destruction_task = NULL;
645 GNUNET_free (channel_ctx);
647 if (NULL == peer_ctx) return;
648 if (channel_ctx == peer_ctx->send_channel_ctx)
650 peer_ctx->send_channel_ctx = NULL;
653 else if (channel_ctx == peer_ctx->recv_channel_ctx)
655 peer_ctx->recv_channel_ctx = NULL;
661 * @brief Get the channel of a peer. If not existing, create.
663 * @param peer the peer id
664 * @return the #GNUNET_CADET_Channel used to send data to @a peer
666 struct GNUNET_CADET_Channel *
667 get_channel (const struct GNUNET_PeerIdentity *peer)
669 struct PeerContext *peer_ctx;
670 struct GNUNET_PeerIdentity *ctx_peer;
671 /* There exists a copy-paste-clone in run() */
672 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
673 GNUNET_MQ_hd_fixed_size (peer_check,
674 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
675 struct GNUNET_MessageHeader,
677 GNUNET_MQ_hd_fixed_size (peer_push,
678 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
679 struct GNUNET_MessageHeader,
681 GNUNET_MQ_hd_fixed_size (peer_pull_request,
682 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
683 struct GNUNET_MessageHeader,
685 GNUNET_MQ_hd_var_size (peer_pull_reply,
686 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
687 struct GNUNET_RPS_P2P_PullReplyMessage,
689 GNUNET_MQ_handler_end ()
693 peer_ctx = get_peer_ctx (peer);
694 if (NULL == peer_ctx->send_channel_ctx)
696 LOG (GNUNET_ERROR_TYPE_DEBUG,
697 "Trying to establish channel to peer %s\n",
699 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
701 peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
702 peer_ctx->send_channel_ctx->channel =
703 GNUNET_CADET_channel_create (cadet_handle,
704 peer_ctx->send_channel_ctx, /* context */
707 GNUNET_CADET_OPTION_RELIABLE,
708 NULL, /* WindowSize handler */
709 &cleanup_destroyed_channel, /* Disconnect handler */
712 GNUNET_assert (NULL != peer_ctx->send_channel_ctx);
713 GNUNET_assert (NULL != peer_ctx->send_channel_ctx->channel);
714 return peer_ctx->send_channel_ctx->channel;
719 * Get the message queue (#GNUNET_MQ_Handle) of a specific peer.
721 * If we already have a message queue open to this client,
722 * simply return it, otherways create one.
724 * @param peer the peer to get the mq to
725 * @return the #GNUNET_MQ_Handle
727 static struct GNUNET_MQ_Handle *
728 get_mq (const struct GNUNET_PeerIdentity *peer)
730 struct PeerContext *peer_ctx;
732 peer_ctx = get_peer_ctx (peer);
734 if (NULL == peer_ctx->mq)
736 peer_ctx->mq = GNUNET_CADET_get_mq (get_channel (peer));
742 * @brief Add an envelope to a message passed to mq to list of pending messages
744 * @param peer peer the message was sent to
745 * @param ev envelope to the message
746 * @param type type of the message to be sent
747 * @return pointer to pending message
749 static struct PendingMessage *
750 insert_pending_message (const struct GNUNET_PeerIdentity *peer,
751 struct GNUNET_MQ_Envelope *ev,
754 struct PendingMessage *pending_msg;
755 struct PeerContext *peer_ctx;
757 peer_ctx = get_peer_ctx (peer);
758 pending_msg = GNUNET_new (struct PendingMessage);
759 pending_msg->ev = ev;
760 pending_msg->peer_ctx = peer_ctx;
761 pending_msg->type = type;
762 GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head,
763 peer_ctx->pending_messages_tail,
770 * @brief Remove a pending message from the respective DLL
772 * @param pending_msg the pending message to remove
773 * @param cancel whether to cancel the pending message, too
776 remove_pending_message (struct PendingMessage *pending_msg, int cancel)
778 struct PeerContext *peer_ctx;
781 peer_ctx = pending_msg->peer_ctx;
782 GNUNET_assert (NULL != peer_ctx);
783 GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head,
784 peer_ctx->pending_messages_tail,
786 // TODO wait for the cadet implementation of message cancellation
787 //if (GNUNET_YES == cancel)
789 // GNUNET_MQ_send_cancel (pending_msg->ev);
791 GNUNET_free (pending_msg);
796 * @brief This is called in response to the first message we sent as a
799 * @param cls #PeerContext of peer with pending liveliness check
802 mq_liveliness_check_successful (void *cls)
804 struct PeerContext *peer_ctx = cls;
806 if (NULL != peer_ctx->liveliness_check_pending)
808 LOG (GNUNET_ERROR_TYPE_DEBUG,
809 "Liveliness check for peer %s was successfull\n",
810 GNUNET_i2s (&peer_ctx->peer_id));
811 remove_pending_message (peer_ctx->liveliness_check_pending, GNUNET_YES);
812 peer_ctx->liveliness_check_pending = NULL;
813 set_peer_live (peer_ctx);
818 * Issue a check whether peer is live
820 * @param peer_ctx the context of the peer
823 check_peer_live (struct PeerContext *peer_ctx)
825 LOG (GNUNET_ERROR_TYPE_DEBUG,
826 "Get informed about peer %s getting live\n",
827 GNUNET_i2s (&peer_ctx->peer_id));
829 struct GNUNET_MQ_Handle *mq;
830 struct GNUNET_MQ_Envelope *ev;
832 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE);
833 peer_ctx->liveliness_check_pending =
834 insert_pending_message (&peer_ctx->peer_id, ev, "Check liveliness");
835 mq = get_mq (&peer_ctx->peer_id);
836 GNUNET_MQ_notify_sent (ev,
837 mq_liveliness_check_successful,
839 GNUNET_MQ_send (mq, ev);
840 GNUNET_STATISTICS_update (stats,
841 "# pending liveliness checks",
848 * @brief Check whether function of type #PeerOp was already scheduled
850 * The array with pending operations will probably never grow really big, so
851 * iterating over it should be ok.
853 * @param peer the peer to check
854 * @param peer_op the operation (#PeerOp) on the peer
856 * @return #GNUNET_YES if this operation is scheduled on that peer
857 * #GNUNET_NO otherwise
860 check_operation_scheduled (const struct GNUNET_PeerIdentity *peer,
861 const PeerOp peer_op)
863 const struct PeerContext *peer_ctx;
866 peer_ctx = get_peer_ctx (peer);
867 for (i = 0; i < peer_ctx->num_pending_ops; i++)
868 if (peer_op == peer_ctx->pending_ops[i].op)
875 * @brief Callback for scheduler to destroy a channel
877 * @param cls Context of the channel
880 destroy_channel (struct ChannelCtx *channel_ctx)
882 struct GNUNET_CADET_Channel *channel;
884 if (NULL != channel_ctx->destruction_task)
886 GNUNET_SCHEDULER_cancel (channel_ctx->destruction_task);
887 channel_ctx->destruction_task = NULL;
889 GNUNET_assert (channel_ctx->channel != NULL);
890 channel = channel_ctx->channel;
891 channel_ctx->channel = NULL;
892 GNUNET_CADET_channel_destroy (channel);
893 remove_channel_ctx (channel_ctx);
898 * @brief Destroy a cadet channel.
900 * This satisfies the function signature of #GNUNET_SCHEDULER_TaskCallback.
905 destroy_channel_cb (void *cls)
907 struct ChannelCtx *channel_ctx = cls;
909 channel_ctx->destruction_task = NULL;
910 destroy_channel (channel_ctx);
915 * @brief Schedule the destruction of a channel for immediately afterwards.
917 * In case a channel is to be destroyed from within the callback to the
918 * destruction of another channel (send channel), we cannot call
919 * GNUNET_CADET_channel_destroy directly, but need to use this scheduling
922 * @param channel_ctx channel to be destroyed.
925 schedule_channel_destruction (struct ChannelCtx *channel_ctx)
927 GNUNET_assert (NULL ==
928 channel_ctx->destruction_task);
929 GNUNET_assert (NULL !=
930 channel_ctx->channel);
931 channel_ctx->destruction_task =
932 GNUNET_SCHEDULER_add_now (&destroy_channel_cb,
940 * @param peer the peer to clean
941 * @return #GNUNET_YES if peer was removed
942 * #GNUNET_NO otherwise
945 destroy_peer (struct PeerContext *peer_ctx)
947 GNUNET_assert (NULL != peer_ctx);
948 GNUNET_assert (NULL != peer_map);
950 GNUNET_CONTAINER_multipeermap_contains (peer_map,
955 SET_PEER_FLAG (peer_ctx, Peers_TO_DESTROY);
956 LOG (GNUNET_ERROR_TYPE_DEBUG,
957 "Going to remove peer %s\n",
958 GNUNET_i2s (&peer_ctx->peer_id));
959 UNSET_PEER_FLAG (peer_ctx, Peers_ONLINE);
961 /* Clear list of pending operations */
962 // TODO this probably leaks memory
963 // ('only' the cls to the function. Not sure what to do with it)
964 GNUNET_array_grow (peer_ctx->pending_ops,
965 peer_ctx->num_pending_ops,
967 /* Remove all pending messages */
968 while (NULL != peer_ctx->pending_messages_head)
970 LOG (GNUNET_ERROR_TYPE_DEBUG,
971 "Removing unsent %s\n",
972 peer_ctx->pending_messages_head->type);
973 /* Cancle pending message, too */
974 if ( (NULL != peer_ctx->liveliness_check_pending) &&
975 (0 == memcmp (peer_ctx->pending_messages_head,
976 peer_ctx->liveliness_check_pending,
977 sizeof (struct PendingMessage))) )
979 peer_ctx->liveliness_check_pending = NULL;
980 GNUNET_STATISTICS_update (stats,
981 "# pending liveliness checks",
985 remove_pending_message (peer_ctx->pending_messages_head,
989 /* If we are still waiting for notification whether this peer is live
990 * cancel the according task */
991 if (NULL != peer_ctx->liveliness_check_pending)
993 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994 "Removing pending liveliness check for peer %s\n",
995 GNUNET_i2s (&peer_ctx->peer_id));
996 // TODO wait until cadet sets mq->cancel_impl
997 //GNUNET_MQ_send_cancel (peer_ctx->liveliness_check_pending->ev);
998 remove_pending_message (peer_ctx->liveliness_check_pending,
1000 peer_ctx->liveliness_check_pending = NULL;
1003 if (NULL != peer_ctx->send_channel_ctx)
1005 /* This is possibly called from within channel destruction */
1006 peer_ctx->send_channel_ctx->peer_ctx = NULL;
1007 schedule_channel_destruction (peer_ctx->send_channel_ctx);
1008 peer_ctx->send_channel_ctx = NULL;
1009 peer_ctx->mq = NULL;
1011 if (NULL != peer_ctx->recv_channel_ctx)
1013 /* This is possibly called from within channel destruction */
1014 peer_ctx->recv_channel_ctx->peer_ctx = NULL;
1015 schedule_channel_destruction (peer_ctx->recv_channel_ctx);
1016 peer_ctx->recv_channel_ctx = NULL;
1020 GNUNET_CONTAINER_multipeermap_remove_all (peer_map,
1021 &peer_ctx->peer_id))
1023 LOG (GNUNET_ERROR_TYPE_WARNING,
1024 "removing peer from peer_map failed\n");
1026 GNUNET_STATISTICS_set (stats,
1028 GNUNET_CONTAINER_multipeermap_size (peer_map),
1030 GNUNET_free (peer_ctx);
1036 * Iterator over hash map entries. Deletes all contexts of peers.
1038 * @param cls closure
1039 * @param key current public key
1040 * @param value value in the hash map
1041 * @return #GNUNET_YES if we should continue to iterate,
1042 * #GNUNET_NO if not.
1045 peermap_clear_iterator (void *cls,
1046 const struct GNUNET_PeerIdentity *key,
1051 destroy_peer (get_peer_ctx (key));
1057 * @brief This is called once a message is sent.
1059 * Removes the pending message
1061 * @param cls type of the message that was sent
1064 mq_notify_sent_cb (void *cls)
1066 struct PendingMessage *pending_msg = (struct PendingMessage *) cls;
1067 LOG (GNUNET_ERROR_TYPE_DEBUG,
1070 if (0 == strncmp ("PULL REPLY", pending_msg->type, 10))
1071 GNUNET_STATISTICS_update(stats, "# pull replys sent", 1, GNUNET_NO);
1072 if (0 == strncmp ("PULL REQUEST", pending_msg->type, 12))
1073 GNUNET_STATISTICS_update(stats, "# pull requests sent", 1, GNUNET_NO);
1074 if (0 == strncmp ("PUSH", pending_msg->type, 4))
1075 GNUNET_STATISTICS_update(stats, "# pushes sent", 1, GNUNET_NO);
1076 /* Do not cancle message */
1077 remove_pending_message (pending_msg, GNUNET_NO);
1082 * @brief Iterator function for #store_valid_peers.
1084 * Implements #GNUNET_CONTAINER_PeerMapIterator.
1085 * Writes single peer to disk.
1087 * @param cls the file handle to write to.
1088 * @param peer current peer
1089 * @param value unused
1091 * @return #GNUNET_YES if we should continue to
1093 * #GNUNET_NO if not.
1096 store_peer_presistently_iterator (void *cls,
1097 const struct GNUNET_PeerIdentity *peer,
1100 const struct GNUNET_DISK_FileHandle *fh = cls;
1101 char peer_string[128];
1110 size = GNUNET_snprintf (peer_string,
1111 sizeof (peer_string),
1113 GNUNET_i2s_full (peer));
1114 GNUNET_assert (53 == size);
1115 ret = GNUNET_DISK_file_write (fh,
1118 GNUNET_assert (size == ret);
1124 * @brief Store the peers currently in #valid_peers to disk.
1127 store_valid_peers ()
1129 struct GNUNET_DISK_FileHandle *fh;
1130 uint32_t number_written_peers;
1133 if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1138 ret = GNUNET_DISK_directory_create_for_file (filename_valid_peers);
1139 if (GNUNET_SYSERR == ret)
1141 LOG (GNUNET_ERROR_TYPE_WARNING,
1142 "Not able to create directory for file `%s'\n",
1143 filename_valid_peers);
1146 else if (GNUNET_NO == ret)
1148 LOG (GNUNET_ERROR_TYPE_WARNING,
1149 "Directory for file `%s' exists but is not writable for us\n",
1150 filename_valid_peers);
1153 fh = GNUNET_DISK_file_open (filename_valid_peers,
1154 GNUNET_DISK_OPEN_WRITE |
1155 GNUNET_DISK_OPEN_CREATE,
1156 GNUNET_DISK_PERM_USER_READ |
1157 GNUNET_DISK_PERM_USER_WRITE);
1160 LOG (GNUNET_ERROR_TYPE_WARNING,
1161 "Not able to write valid peers to file `%s'\n",
1162 filename_valid_peers);
1165 LOG (GNUNET_ERROR_TYPE_DEBUG,
1166 "Writing %u valid peers to disk\n",
1167 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1168 number_written_peers =
1169 GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1170 store_peer_presistently_iterator,
1172 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1173 GNUNET_assert (number_written_peers ==
1174 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1179 * @brief Convert string representation of peer id to peer id.
1181 * Counterpart to #GNUNET_i2s_full.
1183 * @param string_repr The string representation of the peer id
1185 * @return The peer id
1187 static const struct GNUNET_PeerIdentity *
1188 s2i_full (const char *string_repr)
1190 struct GNUNET_PeerIdentity *peer;
1194 peer = GNUNET_new (struct GNUNET_PeerIdentity);
1195 len = strlen (string_repr);
1198 LOG (GNUNET_ERROR_TYPE_WARNING,
1199 "Not able to convert string representation of PeerID to PeerID\n"
1200 "Sting representation: %s (len %lu) - too short\n",
1209 ret = GNUNET_CRYPTO_eddsa_public_key_from_string (string_repr,
1212 if (GNUNET_OK != ret)
1214 LOG (GNUNET_ERROR_TYPE_WARNING,
1215 "Not able to convert string representation of PeerID to PeerID\n"
1216 "Sting representation: %s\n",
1225 * @brief Restore the peers on disk to #valid_peers.
1228 restore_valid_peers ()
1232 struct GNUNET_DISK_FileHandle *fh;
1237 const struct GNUNET_PeerIdentity *peer;
1239 if (0 == strncmp ("DISABLE", filename_valid_peers, 7))
1244 if (GNUNET_OK != GNUNET_DISK_file_test (filename_valid_peers))
1248 fh = GNUNET_DISK_file_open (filename_valid_peers,
1249 GNUNET_DISK_OPEN_READ,
1250 GNUNET_DISK_PERM_NONE);
1251 GNUNET_assert (NULL != fh);
1252 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_handle_size (fh, &file_size));
1253 num_peers = file_size / 53;
1254 buf = GNUNET_malloc (file_size);
1255 size_read = GNUNET_DISK_file_read (fh, buf, file_size);
1256 GNUNET_assert (size_read == file_size);
1257 LOG (GNUNET_ERROR_TYPE_DEBUG,
1258 "Restoring %" PRIu32 " peers from file `%s'\n",
1260 filename_valid_peers);
1261 for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
1263 str_repr = GNUNET_strndup (iter_buf, 53);
1264 peer = s2i_full (str_repr);
1265 GNUNET_free (str_repr);
1266 add_valid_peer (peer);
1267 LOG (GNUNET_ERROR_TYPE_DEBUG,
1268 "Restored valid peer %s from disk\n",
1269 GNUNET_i2s_full (peer));
1273 LOG (GNUNET_ERROR_TYPE_DEBUG,
1274 "num_peers: %" PRIu32 ", _size (valid_peers): %u\n",
1276 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1277 if (num_peers != GNUNET_CONTAINER_multipeermap_size (valid_peers))
1279 LOG (GNUNET_ERROR_TYPE_WARNING,
1280 "Number of restored peers does not match file size. Have probably duplicates.\n");
1282 GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
1283 LOG (GNUNET_ERROR_TYPE_DEBUG,
1284 "Restored %u valid peers from disk\n",
1285 GNUNET_CONTAINER_multipeermap_size (valid_peers));
1290 * @brief Initialise storage of peers
1292 * @param fn_valid_peers filename of the file used to store valid peer ids
1293 * @param cadet_h cadet handle
1294 * @param own_id own peer identity
1297 initialise_peers (char* fn_valid_peers,
1298 struct GNUNET_CADET_Handle *cadet_h)
1300 filename_valid_peers = GNUNET_strdup (fn_valid_peers);
1301 cadet_handle = cadet_h;
1302 peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1303 valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
1304 restore_valid_peers ();
1309 * @brief Delete storage of peers that was created with #initialise_peers ()
1314 if (GNUNET_SYSERR ==
1315 GNUNET_CONTAINER_multipeermap_iterate (peer_map,
1316 &peermap_clear_iterator,
1319 LOG (GNUNET_ERROR_TYPE_WARNING,
1320 "Iteration destroying peers was aborted.\n");
1322 GNUNET_CONTAINER_multipeermap_destroy (peer_map);
1324 store_valid_peers ();
1325 GNUNET_free (filename_valid_peers);
1326 filename_valid_peers = NULL;
1327 GNUNET_CONTAINER_multipeermap_destroy (valid_peers);
1333 * Iterator over #valid_peers hash map entries.
1335 * @param cls closure - unused
1336 * @param peer current peer id
1337 * @param value value in the hash map - unused
1338 * @return #GNUNET_YES if we should continue to
1340 * #GNUNET_NO if not.
1343 valid_peer_iterator (void *cls,
1344 const struct GNUNET_PeerIdentity *peer,
1347 struct PeersIteratorCls *it_cls = cls;
1350 return it_cls->iterator (it_cls->cls,
1356 * @brief Get all currently known, valid peer ids.
1358 * @param it function to call on each peer id
1359 * @param it_cls extra argument to @a it
1360 * @return the number of key value pairs processed,
1361 * #GNUNET_SYSERR if it aborted iteration
1364 get_valid_peers (PeersIterator iterator,
1367 struct PeersIteratorCls *cls;
1370 cls = GNUNET_new (struct PeersIteratorCls);
1371 cls->iterator = iterator;
1373 ret = GNUNET_CONTAINER_multipeermap_iterate (valid_peers,
1374 valid_peer_iterator,
1382 * @brief Add peer to known peers.
1384 * This function is called on new peer_ids from 'external' sources
1385 * (client seed, cadet get_peers(), ...)
1387 * @param peer the new #GNUNET_PeerIdentity
1389 * @return #GNUNET_YES if peer was inserted
1390 * #GNUNET_NO otherwise
1393 insert_peer (const struct GNUNET_PeerIdentity *peer)
1395 if (GNUNET_YES == check_peer_known (peer))
1397 return GNUNET_NO; /* We already know this peer - nothing to do */
1399 (void) create_peer_ctx (peer);
1405 * @brief Check whether flags on a peer are set.
1407 * @param peer the peer to check the flag of
1408 * @param flags the flags to check
1410 * @return #GNUNET_SYSERR if peer is not known
1411 * #GNUNET_YES if all given flags are set
1412 * #GNUNET_NO otherwise
1415 check_peer_flag (const struct GNUNET_PeerIdentity *peer,
1416 enum Peers_PeerFlags flags)
1418 struct PeerContext *peer_ctx;
1420 if (GNUNET_NO == check_peer_known (peer))
1422 return GNUNET_SYSERR;
1424 peer_ctx = get_peer_ctx (peer);
1425 return check_peer_flag_set (peer_ctx, flags);
1429 * @brief Try connecting to a peer to see whether it is online
1431 * If not known yet, insert into known peers
1433 * @param peer the peer whose liveliness is to be checked
1434 * @return #GNUNET_YES if the check was issued
1435 * #GNUNET_NO otherwise
1438 issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer)
1440 struct PeerContext *peer_ctx;
1442 (void) insert_peer (peer);
1443 peer_ctx = get_peer_ctx (peer);
1444 if ( (GNUNET_NO == check_peer_flag (peer, Peers_ONLINE)) &&
1445 (NULL == peer_ctx->liveliness_check_pending) )
1447 check_peer_live (peer_ctx);
1455 * @brief Check if peer is removable.
1458 * - a recv channel exists
1459 * - there are pending messages
1460 * - there is no pending pull reply
1462 * @param peer the peer in question
1463 * @return #GNUNET_YES if peer is removable
1464 * #GNUNET_NO if peer is NOT removable
1465 * #GNUNET_SYSERR if peer is not known
1468 check_removable (const struct GNUNET_PeerIdentity *peer)
1470 struct PeerContext *peer_ctx;
1472 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer))
1474 return GNUNET_SYSERR;
1477 peer_ctx = get_peer_ctx (peer);
1478 if ( (NULL != peer_ctx->recv_channel_ctx) ||
1479 (NULL != peer_ctx->pending_messages_head) ||
1480 (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) )
1489 * @brief Check whether @a peer is actually a peer.
1491 * A valid peer is a peer that we know exists eg. we were connected to once.
1493 * @param peer peer in question
1495 * @return #GNUNET_YES if peer is valid
1496 * #GNUNET_NO if peer is not valid
1499 check_peer_valid (const struct GNUNET_PeerIdentity *peer)
1501 return GNUNET_CONTAINER_multipeermap_contains (valid_peers, peer);
1506 * @brief Indicate that we want to send to the other peer
1508 * This establishes a sending channel
1510 * @param peer the peer to establish channel to
1513 indicate_sending_intention (const struct GNUNET_PeerIdentity *peer)
1515 GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1516 (void) get_channel (peer);
1521 * @brief Check whether other peer has the intention to send/opened channel
1524 * @param peer the peer in question
1526 * @return #GNUNET_YES if peer has the intention to send
1527 * #GNUNET_NO otherwise
1530 check_peer_send_intention (const struct GNUNET_PeerIdentity *peer)
1532 const struct PeerContext *peer_ctx;
1534 peer_ctx = get_peer_ctx (peer);
1535 if (NULL != peer_ctx->recv_channel_ctx)
1544 * Handle the channel a peer opens to us.
1546 * @param cls The closure
1547 * @param channel The channel the peer wants to establish
1548 * @param initiator The peer's peer ID
1550 * @return initial channel context for the channel
1551 * (can be NULL -- that's not an error)
1554 handle_inbound_channel (void *cls,
1555 struct GNUNET_CADET_Channel *channel,
1556 const struct GNUNET_PeerIdentity *initiator)
1558 struct PeerContext *peer_ctx;
1559 struct GNUNET_PeerIdentity *ctx_peer;
1560 struct ChannelCtx *channel_ctx;
1563 LOG (GNUNET_ERROR_TYPE_DEBUG,
1564 "New channel was established to us (Peer %s).\n",
1565 GNUNET_i2s (initiator));
1566 GNUNET_assert (NULL != channel); /* according to cadet API */
1567 /* Make sure we 'know' about this peer */
1568 peer_ctx = create_or_get_peer_ctx (initiator);
1569 set_peer_live (peer_ctx);
1570 ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
1571 *ctx_peer = *initiator;
1572 channel_ctx = add_channel_ctx (peer_ctx);
1573 channel_ctx->channel = channel;
1574 /* We only accept one incoming channel per peer */
1575 if (GNUNET_YES == check_peer_send_intention (initiator))
1577 LOG (GNUNET_ERROR_TYPE_WARNING,
1578 "Already got one receive channel. Destroying old one.\n");
1579 GNUNET_break_op (0);
1580 destroy_channel (peer_ctx->recv_channel_ctx);
1581 peer_ctx->recv_channel_ctx = channel_ctx;
1582 /* return the channel context */
1585 peer_ctx->recv_channel_ctx = channel_ctx;
1591 * @brief Check whether a sending channel towards the given peer exists
1593 * @param peer the peer to check for
1595 * @return #GNUNET_YES if a sending channel towards that peer exists
1596 * #GNUNET_NO otherwise
1599 check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer)
1601 struct PeerContext *peer_ctx;
1603 if (GNUNET_NO == check_peer_known (peer))
1604 { /* If no such peer exists, there is no channel */
1607 peer_ctx = get_peer_ctx (peer);
1608 if (NULL == peer_ctx->send_channel_ctx)
1617 * @brief Destroy the send channel of a peer e.g. stop indicating a sending
1618 * intention to another peer
1620 * @peer the peer identity of the peer whose sending channel to destroy
1621 * @return #GNUNET_YES if channel was destroyed
1622 * #GNUNET_NO otherwise
1625 destroy_sending_channel (const struct GNUNET_PeerIdentity *peer)
1627 struct PeerContext *peer_ctx;
1629 if (GNUNET_NO == check_peer_known (peer))
1633 peer_ctx = get_peer_ctx (peer);
1634 if (NULL != peer_ctx->send_channel_ctx)
1636 destroy_channel (peer_ctx->send_channel_ctx);
1637 (void) check_connected (peer);
1644 * @brief Send a message to another peer.
1646 * Keeps track about pending messages so they can be properly removed when the
1647 * peer is destroyed.
1649 * @param peer receeiver of the message
1650 * @param ev envelope of the message
1651 * @param type type of the message
1654 send_message (const struct GNUNET_PeerIdentity *peer,
1655 struct GNUNET_MQ_Envelope *ev,
1658 struct PendingMessage *pending_msg;
1659 struct GNUNET_MQ_Handle *mq;
1661 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1662 "Sending message to %s of type %s\n",
1665 pending_msg = insert_pending_message (peer, ev, type);
1667 GNUNET_MQ_notify_sent (ev,
1670 GNUNET_MQ_send (mq, ev);
1674 * @brief Schedule a operation on given peer
1676 * Avoids scheduling an operation twice.
1678 * @param peer the peer we want to schedule the operation for once it gets live
1680 * @return #GNUNET_YES if the operation was scheduled
1681 * #GNUNET_NO otherwise
1684 schedule_operation (const struct GNUNET_PeerIdentity *peer,
1685 const PeerOp peer_op)
1687 struct PeerPendingOp pending_op;
1688 struct PeerContext *peer_ctx;
1690 GNUNET_assert (GNUNET_YES == check_peer_known (peer));
1692 //TODO if LIVE/ONLINE execute immediately
1694 if (GNUNET_NO == check_operation_scheduled (peer, peer_op))
1696 peer_ctx = get_peer_ctx (peer);
1697 pending_op.op = peer_op;
1698 pending_op.op_cls = NULL;
1699 GNUNET_array_append (peer_ctx->pending_ops,
1700 peer_ctx->num_pending_ops,
1707 /***********************************************************************
1708 * /Old gnunet-service-rps_peers.c
1709 ***********************************************************************/
1712 /***********************************************************************
1713 * Housekeeping with clients
1714 ***********************************************************************/
1717 * Closure used to pass the client and the id to the callback
1718 * that replies to a client's request
1725 struct ReplyCls *next;
1726 struct ReplyCls *prev;
1729 * The identifier of the request
1734 * The handle to the request
1736 struct RPS_SamplerRequestHandle *req_handle;
1739 * The client handle to send the reply to
1741 struct ClientContext *cli_ctx;
1746 * Struct used to store the context of a connected client.
1748 struct ClientContext
1753 struct ClientContext *next;
1754 struct ClientContext *prev;
1757 * The message queue to communicate with the client.
1759 struct GNUNET_MQ_Handle *mq;
1762 * @brief How many updates this client expects to receive.
1764 int64_t view_updates_left;
1767 * @brief Whether this client wants to receive stream updates.
1768 * Either #GNUNET_YES or #GNUNET_NO
1770 int8_t stream_update;
1773 * The client handle to send the reply to
1775 struct GNUNET_SERVICE_Client *client;
1779 * DLL with all clients currently connected to us
1781 struct ClientContext *cli_ctx_head;
1782 struct ClientContext *cli_ctx_tail;
1784 /***********************************************************************
1785 * /Housekeeping with clients
1786 ***********************************************************************/
1792 /***********************************************************************
1794 ***********************************************************************/
1797 * Sampler used for the Brahms protocol itself.
1799 static struct RPS_Sampler *prot_sampler;
1802 * Name to log view to
1804 static const char *file_name_view_log;
1808 * Name to log number of observed peers to
1810 static const char *file_name_observed_log;
1813 * @brief Count the observed peers
1815 static uint32_t num_observed_peers;
1818 * @brief Multipeermap (ab-) used to count unique peer_ids
1820 static struct GNUNET_CONTAINER_MultiPeerMap *observed_unique_peers;
1821 #endif /* TO_FILE */
1824 * The size of sampler we need to be able to satisfy the Brahms protocol's
1825 * need of random peers.
1827 * This is one minimum size the sampler grows to.
1829 static unsigned int sampler_size_est_need;
1832 * @brief This is the minimum estimate used as sampler size.
1834 * It is configured by the user.
1836 static unsigned int sampler_size_est_min;
1839 * @brief This is the estimate used as view size.
1841 * It is initialised with the minimum
1843 static unsigned int view_size_est_need;
1846 * @brief This is the minimum estimate used as view size.
1848 * It is configured by the user.
1850 static unsigned int view_size_est_min;
1853 * Percentage of total peer number in the view
1854 * to send random PUSHes to
1859 * Percentage of total peer number in the view
1860 * to send random PULLs to
1865 * Identifier for the main task that runs periodically.
1867 static struct GNUNET_SCHEDULER_Task *do_round_task;
1870 * Time inverval the do_round task runs in.
1872 static struct GNUNET_TIME_Relative round_interval;
1875 * List to store peers received through pushes temporary.
1877 static struct CustomPeerMap *push_map;
1880 * List to store peers received through pulls temporary.
1882 static struct CustomPeerMap *pull_map;
1887 static struct GNUNET_NSE_Handle *nse;
1892 static struct GNUNET_CADET_Handle *cadet_handle;
1895 * @brief Port to communicate to other peers.
1897 static struct GNUNET_CADET_Port *cadet_port;
1900 * Handler to PEERINFO.
1902 static struct GNUNET_PEERINFO_Handle *peerinfo_handle;
1905 * Handle for cancellation of iteration over peers.
1907 static struct GNUNET_PEERINFO_NotifyContext *peerinfo_notify_handle;
1910 #ifdef ENABLE_MALICIOUS
1912 * Type of malicious peer
1914 * 0 Don't act malicious at all - Default
1915 * 1 Try to maximise representation
1916 * 2 Try to partition the network
1919 static uint32_t mal_type;
1922 * Other malicious peers
1924 static struct GNUNET_PeerIdentity *mal_peers;
1927 * Hashmap of malicious peers used as set.
1928 * Used to more efficiently check whether we know that peer.
1930 static struct GNUNET_CONTAINER_MultiPeerMap *mal_peer_set;
1933 * Number of other malicious peers
1935 static uint32_t num_mal_peers;
1939 * If type is 2 This struct is used to store the attacked peers in a DLL
1946 struct AttackedPeer *next;
1947 struct AttackedPeer *prev;
1952 struct GNUNET_PeerIdentity peer_id;
1956 * If type is 2 this is the DLL of attacked peers
1958 static struct AttackedPeer *att_peers_head;
1959 static struct AttackedPeer *att_peers_tail;
1962 * This index is used to point to an attacked peer to
1963 * implement the round-robin-ish way to select attacked peers.
1965 static struct AttackedPeer *att_peer_index;
1968 * Hashmap of attacked peers used as set.
1969 * Used to more efficiently check whether we know that peer.
1971 static struct GNUNET_CONTAINER_MultiPeerMap *att_peer_set;
1974 * Number of attacked peers
1976 static uint32_t num_attacked_peers;
1979 * If type is 1 this is the attacked peer
1981 static struct GNUNET_PeerIdentity attacked_peer;
1984 * The limit of PUSHes we can send in one round.
1985 * This is an assumption of the Brahms protocol and either implemented
1988 * assumend to be the bandwidth limitation.
1990 static uint32_t push_limit = 10000;
1991 #endif /* ENABLE_MALICIOUS */
1994 /***********************************************************************
1996 ***********************************************************************/
1999 /***********************************************************************
2001 ***********************************************************************/
2005 * Print peerlist to log.
2008 print_peer_list (struct GNUNET_PeerIdentity *list,
2013 LOG (GNUNET_ERROR_TYPE_DEBUG,
2014 "Printing peer list of length %u at %p:\n",
2017 for (i = 0 ; i < len ; i++)
2019 LOG (GNUNET_ERROR_TYPE_DEBUG,
2021 i, GNUNET_i2s (&list[i]));
2027 * Remove peer from list.
2030 rem_from_list (struct GNUNET_PeerIdentity **peer_list,
2031 unsigned int *list_size,
2032 const struct GNUNET_PeerIdentity *peer)
2035 struct GNUNET_PeerIdentity *tmp;
2039 LOG (GNUNET_ERROR_TYPE_DEBUG,
2040 "Removing peer %s from list at %p\n",
2044 for ( i = 0 ; i < *list_size ; i++ )
2046 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&tmp[i], peer))
2048 if (i < *list_size -1)
2049 { /* Not at the last entry -- shift peers left */
2050 memmove (&tmp[i], &tmp[i +1],
2051 ((*list_size) - i -1) * sizeof (struct GNUNET_PeerIdentity));
2053 /* Remove last entry (should be now useless PeerID) */
2054 GNUNET_array_grow (tmp, *list_size, (*list_size) -1);
2062 * Insert PeerID in #view
2064 * Called once we know a peer is live.
2065 * Implements #PeerOp
2067 * @return GNUNET_OK if peer was actually inserted
2068 * GNUNET_NO if peer was not inserted
2071 insert_in_view_op (void *cls,
2072 const struct GNUNET_PeerIdentity *peer);
2075 * Insert PeerID in #view
2077 * Called once we know a peer is live.
2079 * @return GNUNET_OK if peer was actually inserted
2080 * GNUNET_NO if peer was not inserted
2083 insert_in_view (const struct GNUNET_PeerIdentity *peer)
2088 online = check_peer_flag (peer, Peers_ONLINE);
2089 if ( (GNUNET_NO == online) ||
2090 (GNUNET_SYSERR == online) ) /* peer is not even known */
2092 (void) issue_peer_liveliness_check (peer);
2093 (void) schedule_operation (peer, insert_in_view_op);
2096 /* Open channel towards peer to keep connection open */
2097 indicate_sending_intention (peer);
2098 ret = View_put (peer);
2099 GNUNET_STATISTICS_set (stats, "view size", View_size(), GNUNET_NO);
2105 * @brief Send view to client
2107 * @param cli_ctx the context of the client
2108 * @param view_array the peerids of the view as array (can be empty)
2109 * @param view_size the size of the view array (can be 0)
2112 send_view (const struct ClientContext *cli_ctx,
2113 const struct GNUNET_PeerIdentity *view_array,
2116 struct GNUNET_MQ_Envelope *ev;
2117 struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
2119 if (NULL == view_array)
2121 view_size = View_size ();
2122 view_array = View_get_as_array();
2125 ev = GNUNET_MQ_msg_extra (out_msg,
2126 view_size * sizeof (struct GNUNET_PeerIdentity),
2127 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
2128 out_msg->num_peers = htonl (view_size);
2130 GNUNET_memcpy (&out_msg[1],
2132 view_size * sizeof (struct GNUNET_PeerIdentity));
2133 GNUNET_MQ_send (cli_ctx->mq, ev);
2138 * @brief Send peer from biased stream to client.
2140 * TODO merge with send_view, parameterise
2142 * @param cli_ctx the context of the client
2143 * @param view_array the peerids of the view as array (can be empty)
2144 * @param view_size the size of the view array (can be 0)
2147 send_stream_peers (const struct ClientContext *cli_ctx,
2149 const struct GNUNET_PeerIdentity *peers)
2151 struct GNUNET_MQ_Envelope *ev;
2152 struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
2154 GNUNET_assert (NULL != peers);
2156 ev = GNUNET_MQ_msg_extra (out_msg,
2157 num_peers * sizeof (struct GNUNET_PeerIdentity),
2158 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
2159 out_msg->num_peers = htonl (num_peers);
2161 GNUNET_memcpy (&out_msg[1],
2163 num_peers * sizeof (struct GNUNET_PeerIdentity));
2164 GNUNET_MQ_send (cli_ctx->mq, ev);
2169 * @brief sends updates to clients that are interested
2172 clients_notify_view_update (void)
2174 struct ClientContext *cli_ctx_iter;
2176 const struct GNUNET_PeerIdentity *view_array;
2178 num_peers = View_size ();
2179 view_array = View_get_as_array();
2180 /* check size of view is small enough */
2181 if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
2183 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2184 "View is too big to send\n");
2188 for (cli_ctx_iter = cli_ctx_head;
2189 NULL != cli_ctx_iter;
2190 cli_ctx_iter = cli_ctx_iter->next)
2192 if (1 < cli_ctx_iter->view_updates_left)
2194 /* Client wants to receive limited amount of updates */
2195 cli_ctx_iter->view_updates_left -= 1;
2196 } else if (1 == cli_ctx_iter->view_updates_left)
2198 /* Last update of view for client */
2199 cli_ctx_iter->view_updates_left = -1;
2200 } else if (0 > cli_ctx_iter->view_updates_left) {
2201 /* Client is not interested in updates */
2204 /* else _updates_left == 0 - infinite amount of updates */
2207 send_view (cli_ctx_iter, view_array, num_peers);
2213 * @brief sends updates to clients that are interested
2216 clients_notify_stream_peer (uint64_t num_peers,
2217 const struct GNUNET_PeerIdentity *peers)
2218 // TODO enum StreamPeerSource)
2220 struct ClientContext *cli_ctx_iter;
2222 LOG (GNUNET_ERROR_TYPE_DEBUG,
2223 "Got peer (%s) from biased stream - update all clients\n",
2224 GNUNET_i2s (peers));
2226 for (cli_ctx_iter = cli_ctx_head;
2227 NULL != cli_ctx_iter;
2228 cli_ctx_iter = cli_ctx_iter->next)
2230 if (GNUNET_YES == cli_ctx_iter->stream_update)
2232 send_stream_peers (cli_ctx_iter, num_peers, peers);
2238 * Put random peer from sampler into the view as history update.
2241 hist_update (const struct GNUNET_PeerIdentity *ids,
2248 for (i = 0; i < num_peers; i++)
2251 inserted = insert_in_view (&ids[i]);
2252 if (GNUNET_OK == inserted)
2254 clients_notify_stream_peer (1, &ids[i]);
2256 to_file (file_name_view_log,
2258 GNUNET_i2s_full (ids));
2260 clients_notify_view_update();
2265 * Wrapper around #RPS_sampler_resize()
2267 * If we do not have enough sampler elements, double current sampler size
2268 * If we have more than enough sampler elements, halv current sampler size
2271 resize_wrapper (struct RPS_Sampler *sampler, uint32_t new_size)
2273 unsigned int sampler_size;
2276 // TODO respect the min, max
2277 sampler_size = RPS_sampler_get_size (sampler);
2278 if (sampler_size > new_size * 4)
2280 RPS_sampler_resize (sampler, sampler_size / 2);
2282 else if (sampler_size < new_size)
2284 RPS_sampler_resize (sampler, sampler_size * 2);
2286 LOG (GNUNET_ERROR_TYPE_DEBUG, "sampler_size is now %u\n", sampler_size);
2291 * Add all peers in @a peer_array to @a peer_map used as set.
2293 * @param peer_array array containing the peers
2294 * @param num_peers number of peers in @peer_array
2295 * @param peer_map the peermap to use as set
2298 add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array,
2299 unsigned int num_peers,
2300 struct GNUNET_CONTAINER_MultiPeerMap *peer_map)
2303 if (NULL == peer_map)
2305 LOG (GNUNET_ERROR_TYPE_WARNING,
2306 "Trying to add peers to non-existing peermap.\n");
2310 for (i = 0; i < num_peers; i++)
2312 GNUNET_CONTAINER_multipeermap_put (peer_map,
2315 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
2316 GNUNET_STATISTICS_set (stats,
2318 GNUNET_CONTAINER_multipeermap_size (peer_map),
2325 * Send a PULL REPLY to @a peer_id
2327 * @param peer_id the peer to send the reply to.
2328 * @param peer_ids the peers to send to @a peer_id
2329 * @param num_peer_ids the number of peers to send to @a peer_id
2332 send_pull_reply (const struct GNUNET_PeerIdentity *peer_id,
2333 const struct GNUNET_PeerIdentity *peer_ids,
2334 unsigned int num_peer_ids)
2337 struct GNUNET_MQ_Envelope *ev;
2338 struct GNUNET_RPS_P2P_PullReplyMessage *out_msg;
2340 /* Compute actual size */
2341 send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) +
2342 num_peer_ids * sizeof (struct GNUNET_PeerIdentity);
2344 if (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE < send_size)
2345 /* Compute number of peers to send
2346 * If too long, simply truncate */
2347 // TODO select random ones via permutation
2348 // or even better: do good protocol design
2350 (GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE -
2351 sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2352 sizeof (struct GNUNET_PeerIdentity);
2354 send_size = num_peer_ids;
2356 LOG (GNUNET_ERROR_TYPE_DEBUG,
2357 "Going to send PULL REPLY with %u peers to %s\n",
2358 send_size, GNUNET_i2s (peer_id));
2360 ev = GNUNET_MQ_msg_extra (out_msg,
2361 send_size * sizeof (struct GNUNET_PeerIdentity),
2362 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY);
2363 out_msg->num_peers = htonl (send_size);
2364 GNUNET_memcpy (&out_msg[1], peer_ids,
2365 send_size * sizeof (struct GNUNET_PeerIdentity));
2367 send_message (peer_id, ev, "PULL REPLY");
2368 GNUNET_STATISTICS_update(stats, "# pull reply send issued", 1, GNUNET_NO);
2369 // TODO check with send intention: as send_channel is used/opened we indicate
2370 // a sending intention without intending it.
2371 // -> clean peer afterwards?
2372 // -> use recv_channel?
2377 * Insert PeerID in #pull_map
2379 * Called once we know a peer is live.
2382 insert_in_pull_map (void *cls,
2383 const struct GNUNET_PeerIdentity *peer)
2386 CustomPeerMap_put (pull_map, peer);
2391 * Insert PeerID in #view
2393 * Called once we know a peer is live.
2394 * Implements #PeerOp
2397 insert_in_view_op (void *cls,
2398 const struct GNUNET_PeerIdentity *peer)
2403 inserted = insert_in_view (peer);
2404 if (GNUNET_OK == inserted)
2406 clients_notify_stream_peer (1, peer);
2412 * Update sampler with given PeerID.
2413 * Implements #PeerOp
2416 insert_in_sampler (void *cls,
2417 const struct GNUNET_PeerIdentity *peer)
2420 LOG (GNUNET_ERROR_TYPE_DEBUG,
2421 "Updating samplers with peer %s from insert_in_sampler()\n",
2423 RPS_sampler_update (prot_sampler, peer);
2424 if (0 < RPS_sampler_count_id (prot_sampler, peer))
2426 /* Make sure we 'know' about this peer */
2427 (void) issue_peer_liveliness_check (peer);
2428 /* Establish a channel towards that peer to indicate we are going to send
2430 //indicate_sending_intention (peer);
2433 num_observed_peers++;
2434 GNUNET_CONTAINER_multipeermap_put
2435 (observed_unique_peers,
2438 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2439 uint32_t num_observed_unique_peers = GNUNET_CONTAINER_multipeermap_size (
2440 observed_unique_peers);
2441 to_file (file_name_observed_log,
2442 "%" PRIu32 " %" PRIu32 " %f\n",
2444 num_observed_unique_peers,
2445 1.0*num_observed_unique_peers/num_observed_peers)
2446 #endif /* TO_FILE */
2450 * @brief This is called on peers from external sources (cadet, peerinfo, ...)
2451 * If the peer is not known, liveliness check is issued and it is
2452 * scheduled to be inserted in sampler and view.
2454 * "External sources" refer to every source except the gossip.
2456 * @param peer peer to insert
2459 got_peer (const struct GNUNET_PeerIdentity *peer)
2461 /* If we did not know this peer already, insert it into sampler and view */
2462 if (GNUNET_YES == issue_peer_liveliness_check (peer))
2464 schedule_operation (peer, insert_in_sampler);
2465 schedule_operation (peer, insert_in_view_op);
2467 GNUNET_STATISTICS_update (stats,
2474 * @brief Checks if there is a sending channel and if it is needed
2476 * @param peer the peer whose sending channel is checked
2477 * @return GNUNET_YES if sending channel exists and is still needed
2478 * GNUNET_NO otherwise
2481 check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer)
2483 /* struct GNUNET_CADET_Channel *channel; */
2484 if (GNUNET_NO == check_peer_known (peer))
2488 if (GNUNET_YES == check_sending_channel_exists (peer))
2490 if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) ||
2491 (GNUNET_YES == View_contains_peer (peer)) ||
2492 (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) ||
2493 (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) ||
2494 (GNUNET_YES == check_peer_flag (peer, Peers_PULL_REPLY_PENDING)))
2495 { /* If we want to keep the connection to peer open */
2504 * @brief remove peer from our knowledge, the view, push and pull maps and
2507 * @param peer the peer to remove
2510 remove_peer (const struct GNUNET_PeerIdentity *peer)
2512 (void) View_remove_peer (peer);
2513 CustomPeerMap_remove_peer (pull_map, peer);
2514 CustomPeerMap_remove_peer (push_map, peer);
2515 RPS_sampler_reinitialise_by_value (prot_sampler, peer);
2516 destroy_peer (get_peer_ctx (peer));
2521 * @brief Remove data that is not needed anymore.
2523 * If the sending channel is no longer needed it is destroyed.
2525 * @param peer the peer whose data is about to be cleaned
2528 clean_peer (const struct GNUNET_PeerIdentity *peer)
2530 if (GNUNET_NO == check_sending_channel_needed (peer))
2532 LOG (GNUNET_ERROR_TYPE_DEBUG,
2533 "Going to remove send channel to peer %s\n",
2535 #ifdef ENABLE_MALICIOUS
2536 if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2537 (void) destroy_sending_channel (peer);
2538 #else /* ENABLE_MALICIOUS */
2539 (void) destroy_sending_channel (peer);
2540 #endif /* ENABLE_MALICIOUS */
2543 if ( (GNUNET_NO == check_peer_send_intention (peer)) &&
2544 (GNUNET_NO == View_contains_peer (peer)) &&
2545 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2546 (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) &&
2547 (0 == RPS_sampler_count_id (prot_sampler, peer)) &&
2548 (GNUNET_NO != check_removable (peer)) )
2549 { /* We can safely remove this peer */
2550 LOG (GNUNET_ERROR_TYPE_DEBUG,
2551 "Going to remove peer %s\n",
2560 * @brief This is called when a channel is destroyed.
2562 * Removes peer completely from our knowledge if the send_channel was destroyed
2563 * Otherwise simply delete the recv_channel
2564 * Also check if the knowledge about this peer is still needed.
2565 * If not, remove this peer from our knowledge.
2567 * @param cls The closure
2568 * @param channel The channel being closed
2569 * @param channel_ctx The context associated with this channel
2572 cleanup_destroyed_channel (void *cls,
2573 const struct GNUNET_CADET_Channel *channel)
2575 struct ChannelCtx *channel_ctx = cls;
2576 struct PeerContext *peer_ctx = channel_ctx->peer_ctx;
2580 channel_ctx->channel = NULL;
2581 remove_channel_ctx (channel_ctx);
2582 if (NULL != peer_ctx &&
2583 peer_ctx->send_channel_ctx == channel_ctx)
2585 remove_peer (&peer_ctx->peer_id);
2589 /***********************************************************************
2591 ***********************************************************************/
2594 destroy_cli_ctx (struct ClientContext *cli_ctx)
2596 GNUNET_assert (NULL != cli_ctx);
2597 GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
2600 GNUNET_free (cli_ctx);
2605 * Function called by NSE.
2607 * Updates sizes of sampler list and view and adapt those lists
2611 nse_callback (void *cls,
2612 struct GNUNET_TIME_Absolute timestamp,
2613 double logestimate, double std_dev)
2616 //double scale; // TODO this might go gloabal/config
2620 LOG (GNUNET_ERROR_TYPE_DEBUG,
2621 "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
2622 logestimate, std_dev, RPS_sampler_get_size (prot_sampler));
2624 estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
2625 // GNUNET_NSE_log_estimate_to_n (logestimate);
2626 estimate = pow (estimate, 1.0 / 3);
2627 // TODO add if std_dev is a number
2628 // estimate += (std_dev * scale);
2629 if (view_size_est_min < ceil (estimate))
2631 LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
2632 sampler_size_est_need = estimate;
2633 view_size_est_need = estimate;
2636 LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
2637 //sampler_size_est_need = view_size_est_min;
2638 view_size_est_need = view_size_est_min;
2640 GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_need, GNUNET_NO);
2642 /* If the NSE has changed adapt the lists accordingly */
2643 resize_wrapper (prot_sampler, sampler_size_est_need);
2644 View_change_len (view_size_est_need);
2649 * @brief This function is called, when the client seeds peers.
2650 * It verifies that @a msg is well-formed.
2652 * @param cls the closure (#ClientContext)
2653 * @param msg the message
2654 * @return #GNUNET_OK if @a msg is well-formed
2657 check_client_seed (void *cls, const struct GNUNET_RPS_CS_SeedMessage *msg)
2659 struct ClientContext *cli_ctx = cls;
2660 uint16_t msize = ntohs (msg->header.size);
2661 uint32_t num_peers = ntohl (msg->num_peers);
2663 msize -= sizeof (struct GNUNET_RPS_CS_SeedMessage);
2664 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
2665 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
2668 GNUNET_SERVICE_client_drop (cli_ctx->client);
2669 return GNUNET_SYSERR;
2676 * Handle seed from the client.
2678 * @param cls closure
2679 * @param message the actual message
2682 handle_client_seed (void *cls,
2683 const struct GNUNET_RPS_CS_SeedMessage *msg)
2685 struct ClientContext *cli_ctx = cls;
2686 struct GNUNET_PeerIdentity *peers;
2690 num_peers = ntohl (msg->num_peers);
2691 peers = (struct GNUNET_PeerIdentity *) &msg[1];
2693 LOG (GNUNET_ERROR_TYPE_DEBUG,
2694 "Client seeded peers:\n");
2695 print_peer_list (peers, num_peers);
2697 for (i = 0; i < num_peers; i++)
2699 LOG (GNUNET_ERROR_TYPE_DEBUG,
2700 "Updating samplers with seed %" PRIu32 ": %s\n",
2702 GNUNET_i2s (&peers[i]));
2704 got_peer (&peers[i]);
2706 GNUNET_SERVICE_client_continue (cli_ctx->client);
2711 * Handle RPS request from the client.
2713 * @param cls Client context
2714 * @param message unused
2717 handle_client_view_request (void *cls,
2718 const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
2720 struct ClientContext *cli_ctx = cls;
2721 uint64_t num_updates;
2723 num_updates = ntohl (msg->num_updates);
2725 LOG (GNUNET_ERROR_TYPE_DEBUG,
2726 "Client requested %" PRIu64 " updates of view.\n",
2729 GNUNET_assert (NULL != cli_ctx);
2730 cli_ctx->view_updates_left = num_updates;
2731 send_view (cli_ctx, NULL, 0);
2732 GNUNET_SERVICE_client_continue (cli_ctx->client);
2737 * @brief Handle the cancellation of the view updates.
2739 * @param cls The client context
2743 handle_client_view_cancel (void *cls,
2744 const struct GNUNET_MessageHeader *msg)
2746 struct ClientContext *cli_ctx = cls;
2749 LOG (GNUNET_ERROR_TYPE_DEBUG,
2750 "Client does not want to receive updates of view any more.\n");
2752 GNUNET_assert (NULL != cli_ctx);
2753 cli_ctx->view_updates_left = 0;
2754 GNUNET_SERVICE_client_continue (cli_ctx->client);
2755 if (GNUNET_YES == cli_ctx->stream_update)
2757 destroy_cli_ctx (cli_ctx);
2763 * Handle RPS request for biased stream from the client.
2765 * @param cls Client context
2766 * @param message unused
2769 handle_client_stream_request (void *cls,
2770 const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
2772 struct ClientContext *cli_ctx = cls;
2775 LOG (GNUNET_ERROR_TYPE_DEBUG,
2776 "Client requested peers from biased stream.\n");
2777 cli_ctx->stream_update = GNUNET_YES;
2779 GNUNET_assert (NULL != cli_ctx);
2780 GNUNET_SERVICE_client_continue (cli_ctx->client);
2785 * Handle a CHECK_LIVE message from another peer.
2787 * This does nothing. But without calling #GNUNET_CADET_receive_done()
2788 * the channel is blocked for all other communication.
2790 * @param cls Closure
2791 * @param msg The message header
2794 handle_peer_check (void *cls,
2795 const struct GNUNET_MessageHeader *msg)
2797 const struct ChannelCtx *channel_ctx = cls;
2798 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2801 LOG (GNUNET_ERROR_TYPE_DEBUG,
2802 "Received CHECK_LIVE (%s)\n", GNUNET_i2s (peer));
2803 GNUNET_STATISTICS_update (stats,
2804 "# pending liveliness checks",
2808 GNUNET_CADET_receive_done (channel_ctx->channel);
2812 * Handle a PUSH message from another peer.
2814 * Check the proof of work and store the PeerID
2815 * in the temporary list for pushed PeerIDs.
2817 * @param cls Closure
2818 * @param msg The message header
2821 handle_peer_push (void *cls,
2822 const struct GNUNET_MessageHeader *msg)
2824 const struct ChannelCtx *channel_ctx = cls;
2825 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2828 // (check the proof of work (?))
2830 LOG (GNUNET_ERROR_TYPE_DEBUG,
2831 "Received PUSH (%s)\n",
2833 GNUNET_STATISTICS_update(stats, "# push message received", 1, GNUNET_NO);
2835 #ifdef ENABLE_MALICIOUS
2836 struct AttackedPeer *tmp_att_peer;
2838 if ( (1 == mal_type) ||
2840 { /* Try to maximise representation */
2841 tmp_att_peer = GNUNET_new (struct AttackedPeer);
2842 tmp_att_peer->peer_id = *peer;
2843 if (NULL == att_peer_set)
2844 att_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
2846 GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
2849 GNUNET_CONTAINER_DLL_insert (att_peers_head,
2852 add_peer_array_to_set (peer, 1, att_peer_set);
2856 GNUNET_free (tmp_att_peer);
2861 else if (2 == mal_type)
2863 /* We attack one single well-known peer - simply ignore */
2865 #endif /* ENABLE_MALICIOUS */
2867 /* Add the sending peer to the push_map */
2868 CustomPeerMap_put (push_map, peer);
2870 GNUNET_break_op (check_peer_known (peer));
2871 GNUNET_CADET_receive_done (channel_ctx->channel);
2876 * Handle PULL REQUEST request message from another peer.
2878 * Reply with the view of PeerIDs.
2880 * @param cls Closure
2881 * @param msg The message header
2884 handle_peer_pull_request (void *cls,
2885 const struct GNUNET_MessageHeader *msg)
2887 const struct ChannelCtx *channel_ctx = cls;
2888 const struct GNUNET_PeerIdentity *peer = &channel_ctx->peer_ctx->peer_id;
2889 const struct GNUNET_PeerIdentity *view_array;
2892 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REQUEST (%s)\n", GNUNET_i2s (peer));
2893 GNUNET_STATISTICS_update(stats, "# pull request message received", 1, GNUNET_NO);
2895 #ifdef ENABLE_MALICIOUS
2898 { /* Try to maximise representation */
2899 send_pull_reply (peer, mal_peers, num_mal_peers);
2902 else if (2 == mal_type)
2903 { /* Try to partition network */
2904 if (0 == GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
2906 send_pull_reply (peer, mal_peers, num_mal_peers);
2909 #endif /* ENABLE_MALICIOUS */
2911 GNUNET_break_op (check_peer_known (peer));
2912 GNUNET_CADET_receive_done (channel_ctx->channel);
2913 view_array = View_get_as_array ();
2914 send_pull_reply (peer, view_array, View_size ());
2919 * Check whether we sent a corresponding request and
2920 * whether this reply is the first one.
2922 * @param cls Closure
2923 * @param msg The message header
2926 check_peer_pull_reply (void *cls,
2927 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
2929 struct ChannelCtx *channel_ctx = cls;
2930 struct PeerContext *sender_ctx = channel_ctx->peer_ctx;
2932 if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->header.size))
2934 GNUNET_break_op (0);
2935 return GNUNET_SYSERR;
2938 if ((ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2939 sizeof (struct GNUNET_PeerIdentity) != ntohl (msg->num_peers))
2941 LOG (GNUNET_ERROR_TYPE_ERROR,
2942 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
2943 ntohl (msg->num_peers),
2944 (ntohs (msg->header.size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage)) /
2945 sizeof (struct GNUNET_PeerIdentity));
2946 GNUNET_break_op (0);
2947 return GNUNET_SYSERR;
2950 if (GNUNET_YES != check_peer_flag (&sender_ctx->peer_id,
2951 Peers_PULL_REPLY_PENDING))
2953 LOG (GNUNET_ERROR_TYPE_WARNING,
2954 "Received a pull reply from a peer (%s) we didn't request one from!\n",
2955 GNUNET_i2s (&sender_ctx->peer_id));
2956 GNUNET_STATISTICS_update (stats,
2957 "# unrequested pull replies",
2960 GNUNET_break_op (0);
2961 return GNUNET_SYSERR;
2967 * Handle PULL REPLY message from another peer.
2969 * @param cls Closure
2970 * @param msg The message header
2973 handle_peer_pull_reply (void *cls,
2974 const struct GNUNET_RPS_P2P_PullReplyMessage *msg)
2976 const struct ChannelCtx *channel_ctx = cls;
2977 const struct GNUNET_PeerIdentity *sender = &channel_ctx->peer_ctx->peer_id;
2978 const struct GNUNET_PeerIdentity *peers;
2980 #ifdef ENABLE_MALICIOUS
2981 struct AttackedPeer *tmp_att_peer;
2982 #endif /* ENABLE_MALICIOUS */
2984 LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender));
2985 GNUNET_STATISTICS_update(stats, "# pull reply messages received", 1, GNUNET_NO);
2987 #ifdef ENABLE_MALICIOUS
2988 // We shouldn't even receive pull replies as we're not sending
2992 #endif /* ENABLE_MALICIOUS */
2994 /* Do actual logic */
2995 peers = (const struct GNUNET_PeerIdentity *) &msg[1];
2997 LOG (GNUNET_ERROR_TYPE_DEBUG,
2998 "PULL REPLY received, got following %u peers:\n",
2999 ntohl (msg->num_peers));
3001 for (i = 0; i < ntohl (msg->num_peers); i++)
3003 LOG (GNUNET_ERROR_TYPE_DEBUG,
3006 GNUNET_i2s (&peers[i]));
3008 #ifdef ENABLE_MALICIOUS
3009 if ((NULL != att_peer_set) &&
3010 (1 == mal_type || 3 == mal_type))
3011 { /* Add attacked peer to local list */
3012 // TODO check if we sent a request and this was the first reply
3013 if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (att_peer_set,
3015 && GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (mal_peer_set,
3018 tmp_att_peer = GNUNET_new (struct AttackedPeer);
3019 tmp_att_peer->peer_id = peers[i];
3020 GNUNET_CONTAINER_DLL_insert (att_peers_head,
3023 add_peer_array_to_set (&peers[i], 1, att_peer_set);
3027 #endif /* ENABLE_MALICIOUS */
3028 /* Make sure we 'know' about this peer */
3029 (void) insert_peer (&peers[i]);
3031 if (GNUNET_YES == check_peer_valid (&peers[i]))
3033 CustomPeerMap_put (pull_map, &peers[i]);
3037 schedule_operation (&peers[i], insert_in_pull_map);
3038 (void) issue_peer_liveliness_check (&peers[i]);
3042 UNSET_PEER_FLAG (get_peer_ctx (sender), Peers_PULL_REPLY_PENDING);
3043 clean_peer (sender);
3045 GNUNET_break_op (check_peer_known (sender));
3046 GNUNET_CADET_receive_done (channel_ctx->channel);
3051 * Compute a random delay.
3052 * A uniformly distributed value between mean + spread and mean - spread.
3054 * For example for mean 4 min and spread 2 the minimum is (4 min - (1/2 * 4 min))
3055 * It would return a random value between 2 and 6 min.
3057 * @param mean the mean
3058 * @param spread the inverse amount of deviation from the mean
3060 static struct GNUNET_TIME_Relative
3061 compute_rand_delay (struct GNUNET_TIME_Relative mean,
3062 unsigned int spread)
3064 struct GNUNET_TIME_Relative half_interval;
3065 struct GNUNET_TIME_Relative ret;
3066 unsigned int rand_delay;
3067 unsigned int max_rand_delay;
3071 LOG (GNUNET_ERROR_TYPE_WARNING,
3072 "Not accepting spread of 0\n");
3076 GNUNET_assert (0 != mean.rel_value_us);
3078 /* Compute random time value between spread * mean and spread * mean */
3079 half_interval = GNUNET_TIME_relative_divide (mean, spread);
3081 max_rand_delay = GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us / mean.rel_value_us * (2/spread);
3083 * Compute random value between (0 and 1) * round_interval
3084 * via multiplying round_interval with a 'fraction' (0 to value)/value
3086 rand_delay = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, max_rand_delay);
3087 ret = GNUNET_TIME_relative_saturating_multiply (mean, rand_delay);
3088 ret = GNUNET_TIME_relative_divide (ret, max_rand_delay);
3089 ret = GNUNET_TIME_relative_add (ret, half_interval);
3091 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == ret.rel_value_us)
3092 LOG (GNUNET_ERROR_TYPE_WARNING,
3093 "Returning FOREVER_REL\n");
3100 * Send single pull request
3102 * @param peer_id the peer to send the pull request to.
3105 send_pull_request (const struct GNUNET_PeerIdentity *peer)
3107 struct GNUNET_MQ_Envelope *ev;
3109 GNUNET_assert (GNUNET_NO == check_peer_flag (peer,
3110 Peers_PULL_REPLY_PENDING));
3111 SET_PEER_FLAG (get_peer_ctx (peer), Peers_PULL_REPLY_PENDING);
3113 LOG (GNUNET_ERROR_TYPE_DEBUG,
3114 "Going to send PULL REQUEST to peer %s.\n",
3117 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
3118 send_message (peer, ev, "PULL REQUEST");
3119 GNUNET_STATISTICS_update(stats, "# pull request send issued", 1, GNUNET_NO);
3126 * @param peer_id the peer to send the push to.
3129 send_push (const struct GNUNET_PeerIdentity *peer_id)
3131 struct GNUNET_MQ_Envelope *ev;
3133 LOG (GNUNET_ERROR_TYPE_DEBUG,
3134 "Going to send PUSH to peer %s.\n",
3135 GNUNET_i2s (peer_id));
3137 ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH);
3138 send_message (peer_id, ev, "PUSH");
3139 GNUNET_STATISTICS_update(stats, "# push send issued", 1, GNUNET_NO);
3144 do_round (void *cls);
3147 do_mal_round (void *cls);
3149 #ifdef ENABLE_MALICIOUS
3153 * @brief This function is called, when the client tells us to act malicious.
3154 * It verifies that @a msg is well-formed.
3156 * @param cls the closure (#ClientContext)
3157 * @param msg the message
3158 * @return #GNUNET_OK if @a msg is well-formed
3161 check_client_act_malicious (void *cls,
3162 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3164 struct ClientContext *cli_ctx = cls;
3165 uint16_t msize = ntohs (msg->header.size);
3166 uint32_t num_peers = ntohl (msg->num_peers);
3168 msize -= sizeof (struct GNUNET_RPS_CS_ActMaliciousMessage);
3169 if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
3170 (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
3172 LOG (GNUNET_ERROR_TYPE_ERROR,
3173 "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
3174 ntohl (msg->num_peers),
3175 (msize / sizeof (struct GNUNET_PeerIdentity)));
3177 GNUNET_SERVICE_client_drop (cli_ctx->client);
3178 return GNUNET_SYSERR;
3184 * Turn RPS service to act malicious.
3186 * @param cls Closure
3187 * @param client The client that sent the message
3188 * @param msg The message header
3191 handle_client_act_malicious (void *cls,
3192 const struct GNUNET_RPS_CS_ActMaliciousMessage *msg)
3194 struct ClientContext *cli_ctx = cls;
3195 struct GNUNET_PeerIdentity *peers;
3196 uint32_t num_mal_peers_sent;
3197 uint32_t num_mal_peers_old;
3199 /* Do actual logic */
3200 peers = (struct GNUNET_PeerIdentity *) &msg[1];
3201 mal_type = ntohl (msg->type);
3202 if (NULL == mal_peer_set)
3203 mal_peer_set = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3205 LOG (GNUNET_ERROR_TYPE_DEBUG,
3206 "Now acting malicious type %" PRIu32 ", got %" PRIu32 " peers.\n",
3208 ntohl (msg->num_peers));
3211 { /* Try to maximise representation */
3212 /* Add other malicious peers to those we already know */
3214 num_mal_peers_sent = ntohl (msg->num_peers);
3215 num_mal_peers_old = num_mal_peers;
3216 GNUNET_array_grow (mal_peers,
3218 num_mal_peers + num_mal_peers_sent);
3219 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3221 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3223 /* Add all mal peers to mal_peer_set */
3224 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3228 /* Substitute do_round () with do_mal_round () */
3229 GNUNET_SCHEDULER_cancel (do_round_task);
3230 do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3233 else if ( (2 == mal_type) ||
3235 { /* Try to partition the network */
3236 /* Add other malicious peers to those we already know */
3238 num_mal_peers_sent = ntohl (msg->num_peers) - 1;
3239 num_mal_peers_old = num_mal_peers;
3240 GNUNET_assert (GNUNET_MAX_MALLOC_CHECKED > num_mal_peers_sent);
3241 GNUNET_array_grow (mal_peers,
3243 num_mal_peers + num_mal_peers_sent);
3244 if (NULL != mal_peers &&
3247 GNUNET_memcpy (&mal_peers[num_mal_peers_old],
3249 num_mal_peers_sent * sizeof (struct GNUNET_PeerIdentity));
3251 /* Add all mal peers to mal_peer_set */
3252 add_peer_array_to_set (&mal_peers[num_mal_peers_old],
3257 /* Store the one attacked peer */
3258 GNUNET_memcpy (&attacked_peer,
3259 &msg->attacked_peer,
3260 sizeof (struct GNUNET_PeerIdentity));
3261 /* Set the flag of the attacked peer to valid to avoid problems */
3262 if (GNUNET_NO == check_peer_known (&attacked_peer))
3264 (void) issue_peer_liveliness_check (&attacked_peer);
3267 LOG (GNUNET_ERROR_TYPE_DEBUG,
3268 "Attacked peer is %s\n",
3269 GNUNET_i2s (&attacked_peer));
3271 /* Substitute do_round () with do_mal_round () */
3272 GNUNET_SCHEDULER_cancel (do_round_task);
3273 do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL);
3275 else if (0 == mal_type)
3276 { /* Stop acting malicious */
3277 GNUNET_array_grow (mal_peers, num_mal_peers, 0);
3279 /* Substitute do_mal_round () with do_round () */
3280 GNUNET_SCHEDULER_cancel (do_round_task);
3281 do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
3286 GNUNET_SERVICE_client_continue (cli_ctx->client);
3288 GNUNET_SERVICE_client_continue (cli_ctx->client);
3293 * Send out PUSHes and PULLs maliciously.
3295 * This is executed regylary.
3298 do_mal_round (void *cls)
3300 uint32_t num_pushes;
3302 struct GNUNET_TIME_Relative time_next_round;
3303 struct AttackedPeer *tmp_att_peer;
3306 LOG (GNUNET_ERROR_TYPE_DEBUG,
3307 "Going to execute next round maliciously type %" PRIu32 ".\n",
3309 do_round_task = NULL;
3310 GNUNET_assert (mal_type <= 3);
3311 /* Do malicious actions */
3313 { /* Try to maximise representation */
3315 /* The maximum of pushes we're going to send this round */
3316 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit,
3317 num_attacked_peers),
3318 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3320 LOG (GNUNET_ERROR_TYPE_DEBUG,
3321 "Going to send %" PRIu32 " pushes\n",
3324 /* Send PUSHes to attacked peers */
3325 for (i = 0 ; i < num_pushes ; i++)
3327 if (att_peers_tail == att_peer_index)
3328 att_peer_index = att_peers_head;
3330 att_peer_index = att_peer_index->next;
3332 send_push (&att_peer_index->peer_id);
3335 /* Send PULLs to some peers to learn about additional peers to attack */
3336 tmp_att_peer = att_peer_index;
3337 for (i = 0 ; i < num_pushes * alpha ; i++)
3339 if (att_peers_tail == tmp_att_peer)
3340 tmp_att_peer = att_peers_head;
3342 att_peer_index = tmp_att_peer->next;
3344 send_pull_request (&tmp_att_peer->peer_id);
3349 else if (2 == mal_type)
3351 * Try to partition the network
3352 * Send as many pushes to the attacked peer as possible
3353 * That is one push per round as it will ignore more.
3355 (void) issue_peer_liveliness_check (&attacked_peer);
3356 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3357 send_push (&attacked_peer);
3362 { /* Combined attack */
3364 /* Send PUSH to attacked peers */
3365 if (GNUNET_YES == check_peer_known (&attacked_peer))
3367 (void) issue_peer_liveliness_check (&attacked_peer);
3368 if (GNUNET_YES == check_peer_flag (&attacked_peer, Peers_ONLINE))
3370 LOG (GNUNET_ERROR_TYPE_DEBUG,
3371 "Goding to send push to attacked peer (%s)\n",
3372 GNUNET_i2s (&attacked_peer));
3373 send_push (&attacked_peer);
3376 (void) issue_peer_liveliness_check (&attacked_peer);
3378 /* The maximum of pushes we're going to send this round */
3379 num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
3380 num_attacked_peers),
3381 GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE);
3383 LOG (GNUNET_ERROR_TYPE_DEBUG,
3384 "Going to send %" PRIu32 " pushes\n",
3387 for (i = 0; i < num_pushes; i++)
3389 if (att_peers_tail == att_peer_index)
3390 att_peer_index = att_peers_head;
3392 att_peer_index = att_peer_index->next;
3394 send_push (&att_peer_index->peer_id);
3397 /* Send PULLs to some peers to learn about additional peers to attack */
3398 tmp_att_peer = att_peer_index;
3399 for (i = 0; i < num_pushes * alpha; i++)
3401 if (att_peers_tail == tmp_att_peer)
3402 tmp_att_peer = att_peers_head;
3404 att_peer_index = tmp_att_peer->next;
3406 send_pull_request (&tmp_att_peer->peer_id);
3410 /* Schedule next round */
3411 time_next_round = compute_rand_delay (round_interval, 2);
3413 //do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_mal_round,
3415 GNUNET_assert (NULL == do_round_task);
3416 do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3417 &do_mal_round, NULL);
3418 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3420 #endif /* ENABLE_MALICIOUS */
3423 * Send out PUSHes and PULLs, possibly update #view, samplers.
3425 * This is executed regylary.
3428 do_round (void *cls)
3431 const struct GNUNET_PeerIdentity *view_array;
3432 unsigned int *permut;
3433 unsigned int a_peers; /* Number of peers we send pushes to */
3434 unsigned int b_peers; /* Number of peers we send pull requests to */
3435 uint32_t first_border;
3436 uint32_t second_border;
3437 struct GNUNET_PeerIdentity peer;
3438 struct GNUNET_PeerIdentity *update_peer;
3441 LOG (GNUNET_ERROR_TYPE_DEBUG,
3442 "Going to execute next round.\n");
3443 GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
3444 do_round_task = NULL;
3445 LOG (GNUNET_ERROR_TYPE_DEBUG,
3446 "Printing view:\n");
3447 to_file (file_name_view_log,
3448 "___ new round ___");
3449 view_array = View_get_as_array ();
3450 for (i = 0; i < View_size (); i++)
3452 LOG (GNUNET_ERROR_TYPE_DEBUG,
3453 "\t%s\n", GNUNET_i2s (&view_array[i]));
3454 to_file (file_name_view_log,
3456 GNUNET_i2s_full (&view_array[i]));
3460 /* Send pushes and pull requests */
3461 if (0 < View_size ())
3463 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3467 a_peers = ceil (alpha * View_size ());
3469 LOG (GNUNET_ERROR_TYPE_DEBUG,
3470 "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
3471 a_peers, alpha, View_size ());
3472 for (i = 0; i < a_peers; i++)
3474 peer = view_array[permut[i]];
3475 // FIXME if this fails schedule/loop this for later
3479 /* Send PULL requests */
3480 b_peers = ceil (beta * View_size ());
3481 first_border = a_peers;
3482 second_border = a_peers + b_peers;
3483 if (second_border > View_size ())
3485 first_border = View_size () - b_peers;
3486 second_border = View_size ();
3488 LOG (GNUNET_ERROR_TYPE_DEBUG,
3489 "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
3490 b_peers, beta, View_size ());
3491 for (i = first_border; i < second_border; i++)
3493 peer = view_array[permut[i]];
3494 if ( GNUNET_NO == check_peer_flag (&peer, Peers_PULL_REPLY_PENDING))
3495 { // FIXME if this fails schedule/loop this for later
3496 send_pull_request (&peer);
3500 GNUNET_free (permut);
3506 /* TODO see how many peers are in push-/pull- list! */
3508 if ((CustomPeerMap_size (push_map) <= alpha * view_size_est_need) &&
3509 (0 < CustomPeerMap_size (push_map)) &&
3510 (0 < CustomPeerMap_size (pull_map)))
3511 //if (GNUNET_YES) // disable blocking temporarily
3512 { /* If conditions for update are fulfilled, update */
3513 LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
3515 uint32_t final_size;
3516 uint32_t peers_to_clean_size;
3517 struct GNUNET_PeerIdentity *peers_to_clean;
3519 peers_to_clean = NULL;
3520 peers_to_clean_size = 0;
3521 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, View_size ());
3522 GNUNET_memcpy (peers_to_clean,
3524 View_size () * sizeof (struct GNUNET_PeerIdentity));
3526 /* Seems like recreating is the easiest way of emptying the peermap */
3528 to_file (file_name_view_log,
3531 first_border = GNUNET_MIN (ceil (alpha * view_size_est_need),
3532 CustomPeerMap_size (push_map));
3533 second_border = first_border +
3534 GNUNET_MIN (floor (beta * view_size_est_need),
3535 CustomPeerMap_size (pull_map));
3536 final_size = second_border +
3537 ceil ((1 - (alpha + beta)) * view_size_est_need);
3538 LOG (GNUNET_ERROR_TYPE_DEBUG,
3539 "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: %"PRIu32 "\n",
3544 /* Update view with peers received through PUSHes */
3545 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3546 CustomPeerMap_size (push_map));
3547 for (i = 0; i < first_border; i++)
3550 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
3552 if (GNUNET_OK == inserted)
3554 clients_notify_stream_peer (1,
3555 CustomPeerMap_get_peer_by_index (push_map, permut[i]));
3557 to_file (file_name_view_log,
3559 GNUNET_i2s_full (&view_array[i]));
3560 // TODO change the peer_flags accordingly
3562 GNUNET_free (permut);
3565 /* Update view with peers received through PULLs */
3566 permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
3567 CustomPeerMap_size (pull_map));
3568 for (i = first_border; i < second_border; i++)
3571 inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
3572 permut[i - first_border]));
3573 if (GNUNET_OK == inserted)
3575 clients_notify_stream_peer (1,
3576 CustomPeerMap_get_peer_by_index (pull_map,
3577 permut[i - first_border]));
3579 to_file (file_name_view_log,
3581 GNUNET_i2s_full (&view_array[i]));
3582 // TODO change the peer_flags accordingly
3584 GNUNET_free (permut);
3587 /* Update view with peers from history */
3588 RPS_sampler_get_n_rand_peers (prot_sampler,
3589 final_size - second_border,
3592 // TODO change the peer_flags accordingly
3594 for (i = 0; i < View_size (); i++)
3595 rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
3597 /* Clean peers that were removed from the view */
3598 for (i = 0; i < peers_to_clean_size; i++)
3600 to_file (file_name_view_log,
3602 GNUNET_i2s_full (&peers_to_clean[i]));
3603 clean_peer (&peers_to_clean[i]);
3606 GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
3607 clients_notify_view_update();
3609 LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
3610 GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
3611 if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3612 !(0 >= CustomPeerMap_size (pull_map)))
3613 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, GNUNET_NO);
3614 if (CustomPeerMap_size (push_map) > alpha * View_size () &&
3615 (0 >= CustomPeerMap_size (pull_map)))
3616 GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no pull replies", 1, GNUNET_NO);
3617 if (0 >= CustomPeerMap_size (push_map) &&
3618 !(0 >= CustomPeerMap_size (pull_map)))
3619 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, GNUNET_NO);
3620 if (0 >= CustomPeerMap_size (push_map) &&
3621 (0 >= CustomPeerMap_size (pull_map)))
3622 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull replies", 1, GNUNET_NO);
3623 if (0 >= CustomPeerMap_size (pull_map) &&
3624 CustomPeerMap_size (push_map) > alpha * View_size () &&
3625 0 >= CustomPeerMap_size (push_map))
3626 GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, GNUNET_NO);
3628 // TODO independent of that also get some peers from CADET_get_peers()?
3629 GNUNET_STATISTICS_set (stats,
3630 "# peers in push map at end of round",
3631 CustomPeerMap_size (push_map),
3633 GNUNET_STATISTICS_set (stats,
3634 "# peers in pull map at end of round",
3635 CustomPeerMap_size (pull_map),
3637 GNUNET_STATISTICS_set (stats,
3638 "# peers in view at end of round",
3642 LOG (GNUNET_ERROR_TYPE_DEBUG,
3643 "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size (%u) = %.2f)\n",
3644 CustomPeerMap_size (push_map),
3645 CustomPeerMap_size (pull_map),
3648 alpha * View_size ());
3650 /* Update samplers */
3651 for (i = 0; i < CustomPeerMap_size (push_map); i++)
3653 update_peer = CustomPeerMap_get_peer_by_index (push_map, i);
3654 LOG (GNUNET_ERROR_TYPE_DEBUG,
3655 "Updating with peer %s from push list\n",
3656 GNUNET_i2s (update_peer));
3657 insert_in_sampler (NULL, update_peer);
3658 clean_peer (update_peer); /* This cleans only if it is not in the view */
3661 for (i = 0; i < CustomPeerMap_size (pull_map); i++)
3663 LOG (GNUNET_ERROR_TYPE_DEBUG,
3664 "Updating with peer %s from pull list\n",
3665 GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i)));
3666 insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i));
3667 /* This cleans only if it is not in the view */
3668 clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i));
3672 /* Empty push/pull lists */
3673 CustomPeerMap_clear (push_map);
3674 CustomPeerMap_clear (pull_map);
3676 GNUNET_STATISTICS_set (stats,
3681 struct GNUNET_TIME_Relative time_next_round;
3683 time_next_round = compute_rand_delay (round_interval, 2);
3685 /* Schedule next round */
3686 do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
3688 LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
3693 * This is called from GNUNET_CADET_get_peers().
3695 * It is called on every peer(ID) that cadet somehow has contact with.
3696 * We use those to initialise the sampler.
3699 init_peer_cb (void *cls,
3700 const struct GNUNET_PeerIdentity *peer,
3701 int tunnel, // "Do we have a tunnel towards this peer?"
3702 unsigned int n_paths, // "Number of known paths towards this peer"
3703 unsigned int best_path) // "How long is the best path?
3704 // (0 = unknown, 1 = ourselves, 2 = neighbor)"
3713 LOG (GNUNET_ERROR_TYPE_DEBUG,
3714 "Got peer_id %s from cadet\n",
3721 * @brief Iterator function over stored, valid peers.
3723 * We initialise the sampler with those.
3725 * @param cls the closure
3726 * @param peer the peer id
3727 * @return #GNUNET_YES if we should continue to
3729 * #GNUNET_NO if not.
3732 valid_peers_iterator (void *cls,
3733 const struct GNUNET_PeerIdentity *peer)
3739 LOG (GNUNET_ERROR_TYPE_DEBUG,
3740 "Got stored, valid peer %s\n",
3749 * Iterator over peers from peerinfo.
3751 * @param cls closure
3752 * @param peer id of the peer, NULL for last call
3753 * @param hello hello message for the peer (can be NULL)
3754 * @param error message
3757 process_peerinfo_peers (void *cls,
3758 const struct GNUNET_PeerIdentity *peer,
3759 const struct GNUNET_HELLO_Message *hello,
3760 const char *err_msg)
3768 LOG (GNUNET_ERROR_TYPE_DEBUG,
3769 "Got peer_id %s from peerinfo\n",
3777 * Task run during shutdown.
3782 shutdown_task (void *cls)
3784 struct ClientContext *client_ctx;
3787 in_shutdown = GNUNET_YES;
3789 LOG (GNUNET_ERROR_TYPE_DEBUG,
3790 "RPS is going down\n");
3792 /* Clean all clients */
3793 for (client_ctx = cli_ctx_head;
3794 NULL != cli_ctx_head;
3795 client_ctx = cli_ctx_head)
3797 destroy_cli_ctx (client_ctx);
3799 GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
3800 GNUNET_PEERINFO_disconnect (peerinfo_handle);
3801 peerinfo_handle = NULL;
3802 if (NULL != do_round_task)
3804 GNUNET_SCHEDULER_cancel (do_round_task);
3805 do_round_task = NULL;
3810 GNUNET_NSE_disconnect (nse);
3811 RPS_sampler_destroy (prot_sampler);
3812 GNUNET_CADET_close_port (cadet_port);
3813 GNUNET_CADET_disconnect (cadet_handle);
3814 cadet_handle = NULL;
3816 CustomPeerMap_destroy (push_map);
3817 CustomPeerMap_destroy (pull_map);
3820 GNUNET_STATISTICS_destroy (stats,
3824 #ifdef ENABLE_MALICIOUS
3825 struct AttackedPeer *tmp_att_peer;
3826 /* it is ok to free this const during shutdown: */
3827 GNUNET_free ((char *) file_name_view_log);
3829 GNUNET_free ((char *) file_name_observed_log);
3830 GNUNET_CONTAINER_multipeermap_destroy (observed_unique_peers);
3831 #endif /* TO_FILE */
3832 GNUNET_array_grow (mal_peers,
3835 if (NULL != mal_peer_set)
3836 GNUNET_CONTAINER_multipeermap_destroy (mal_peer_set);
3837 if (NULL != att_peer_set)
3838 GNUNET_CONTAINER_multipeermap_destroy (att_peer_set);
3839 while (NULL != att_peers_head)
3841 tmp_att_peer = att_peers_head;
3842 GNUNET_CONTAINER_DLL_remove (att_peers_head,
3845 GNUNET_free (tmp_att_peer);
3847 #endif /* ENABLE_MALICIOUS */
3852 * Handle client connecting to the service.
3855 * @param client the new client
3856 * @param mq the message queue of @a client
3860 client_connect_cb (void *cls,
3861 struct GNUNET_SERVICE_Client *client,
3862 struct GNUNET_MQ_Handle *mq)
3864 struct ClientContext *cli_ctx;
3867 LOG (GNUNET_ERROR_TYPE_DEBUG,
3868 "Client connected\n");
3870 return client; /* Server was destroyed before a client connected. Shutting down */
3871 cli_ctx = GNUNET_new (struct ClientContext);
3873 cli_ctx->view_updates_left = -1;
3874 cli_ctx->stream_update = GNUNET_NO;
3875 cli_ctx->client = client;
3876 GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
3883 * Callback called when a client disconnected from the service
3885 * @param cls closure for the service
3886 * @param c the client that disconnected
3887 * @param internal_cls should be equal to @a c
3890 client_disconnect_cb (void *cls,
3891 struct GNUNET_SERVICE_Client *client,
3894 struct ClientContext *cli_ctx = internal_cls;
3897 GNUNET_assert (client == cli_ctx->client);
3899 {/* shutdown task - destroy all clients */
3900 while (NULL != cli_ctx_head)
3901 destroy_cli_ctx (cli_ctx_head);
3904 { /* destroy this client */
3905 LOG (GNUNET_ERROR_TYPE_DEBUG,
3906 "Client disconnected. Destroy its context.\n");
3907 destroy_cli_ctx (cli_ctx);
3913 * Handle random peer sampling clients.
3915 * @param cls closure
3916 * @param c configuration to use
3917 * @param service the initialized service
3921 const struct GNUNET_CONFIGURATION_Handle *c,
3922 struct GNUNET_SERVICE_Handle *service)
3924 char *fn_valid_peers;
3928 GNUNET_log_setup ("rps",
3929 GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG),
3933 GNUNET_CRYPTO_get_peer_identity (cfg,
3934 &own_identity); // TODO check return value
3935 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3936 "STARTING SERVICE (rps) for peer [%s]\n",
3937 GNUNET_i2s (&own_identity));
3938 #ifdef ENABLE_MALICIOUS
3939 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
3940 "Malicious execution compiled in.\n");
3941 #endif /* ENABLE_MALICIOUS */
3943 /* Get time interval from the configuration */
3945 GNUNET_CONFIGURATION_get_value_time (cfg,
3950 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3951 "RPS", "ROUNDINTERVAL");
3952 GNUNET_SCHEDULER_shutdown ();
3956 /* Get initial size of sampler/view from the configuration */
3958 GNUNET_CONFIGURATION_get_value_number (cfg, "RPS", "MINSIZE",
3959 (long long unsigned int *) &sampler_size_est_min))
3961 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3963 GNUNET_SCHEDULER_shutdown ();
3966 sampler_size_est_need = sampler_size_est_min;
3967 view_size_est_min = sampler_size_est_min;
3968 LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sampler_size_est_min);
3971 GNUNET_CONFIGURATION_get_value_filename (cfg,
3973 "FILENAME_VALID_PEERS",
3976 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
3977 "rps", "FILENAME_VALID_PEERS");
3981 View_create (view_size_est_min);
3982 GNUNET_STATISTICS_set (stats, "view size aim", view_size_est_min, GNUNET_NO);
3984 /* file_name_view_log */
3985 file_name_view_log = store_prefix_file_name (&own_identity, "view");
3987 file_name_observed_log = store_prefix_file_name (&own_identity, "observed");
3988 observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1, GNUNET_NO);
3989 #endif /* TO_FILE */
3991 /* connect to NSE */
3992 nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
3999 /* Initialise cadet */
4000 /* There exists a copy-paste-clone in get_channel() */
4001 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4002 GNUNET_MQ_hd_fixed_size (peer_check,
4003 GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
4004 struct GNUNET_MessageHeader,
4006 GNUNET_MQ_hd_fixed_size (peer_push,
4007 GNUNET_MESSAGE_TYPE_RPS_PP_PUSH,
4008 struct GNUNET_MessageHeader,
4010 GNUNET_MQ_hd_fixed_size (peer_pull_request,
4011 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST,
4012 struct GNUNET_MessageHeader,
4014 GNUNET_MQ_hd_var_size (peer_pull_reply,
4015 GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY,
4016 struct GNUNET_RPS_P2P_PullReplyMessage,
4018 GNUNET_MQ_handler_end ()
4021 cadet_handle = GNUNET_CADET_connect (cfg);
4022 GNUNET_assert (NULL != cadet_handle);
4023 GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_RPS,
4024 strlen (GNUNET_APPLICATION_PORT_RPS),
4026 cadet_port = GNUNET_CADET_open_port (cadet_handle,
4028 &handle_inbound_channel, /* Connect handler */
4030 NULL, /* WindowSize handler */
4031 &cleanup_destroyed_channel, /* Disconnect handler */
4033 if (NULL == cadet_port)
4035 LOG (GNUNET_ERROR_TYPE_ERROR,
4036 "Cadet port `%s' is already in use.\n",
4037 GNUNET_APPLICATION_PORT_RPS);
4042 peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
4043 initialise_peers (fn_valid_peers, cadet_handle);
4044 GNUNET_free (fn_valid_peers);
4046 /* Initialise sampler */
4047 struct GNUNET_TIME_Relative half_round_interval;
4048 struct GNUNET_TIME_Relative max_round_interval;
4050 half_round_interval = GNUNET_TIME_relative_divide (round_interval, 2);
4051 max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval);
4053 prot_sampler = RPS_sampler_init (sampler_size_est_need, max_round_interval);
4055 /* Initialise push and pull maps */
4056 push_map = CustomPeerMap_create (4);
4057 pull_map = CustomPeerMap_create (4);
4060 //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
4061 //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, NULL);
4062 // TODO send push/pull to each of those peers?
4063 // TODO read stored valid peers from last run
4064 LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
4065 get_valid_peers (valid_peers_iterator, NULL);
4067 peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
4069 process_peerinfo_peers,
4072 LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
4074 do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL);
4075 LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
4077 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
4078 stats = GNUNET_STATISTICS_create ("rps", cfg);
4084 * Define "main" method using service macro.
4088 GNUNET_SERVICE_OPTION_NONE,
4091 &client_disconnect_cb,
4093 GNUNET_MQ_hd_var_size (client_seed,
4094 GNUNET_MESSAGE_TYPE_RPS_CS_SEED,
4095 struct GNUNET_RPS_CS_SeedMessage,
4097 #ifdef ENABLE_MALICIOUS
4098 GNUNET_MQ_hd_var_size (client_act_malicious,
4099 GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS,
4100 struct GNUNET_RPS_CS_ActMaliciousMessage,
4102 #endif /* ENABLE_MALICIOUS */
4103 GNUNET_MQ_hd_fixed_size (client_view_request,
4104 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
4105 struct GNUNET_RPS_CS_DEBUG_ViewRequest,
4107 GNUNET_MQ_hd_fixed_size (client_view_cancel,
4108 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL,
4109 struct GNUNET_MessageHeader,
4111 GNUNET_MQ_hd_fixed_size (client_stream_request,
4112 GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
4113 struct GNUNET_RPS_CS_DEBUG_StreamRequest,
4115 GNUNET_MQ_handler_end());
4117 /* end of gnunet-service-rps.c */