From 099bde38ac80ca84ac7c4a08cc5ac91ea7abab70 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Thu, 21 Jan 2016 16:38:41 +0000 Subject: [PATCH] rps: restructured service. moved peer-related code to gnunet-service-rps_peers MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Julius Bünger --- src/rps/Makefile.am | 1 + src/rps/gnunet-service-rps.c | 1178 +++++++--------------------- src/rps/gnunet-service-rps_peers.c | 1158 +++++++++++++++++++++++++++ src/rps/gnunet-service-rps_peers.h | 371 +++++++++ 4 files changed, 1800 insertions(+), 908 deletions(-) create mode 100644 src/rps/gnunet-service-rps_peers.c create mode 100644 src/rps/gnunet-service-rps_peers.h diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index 48d346aef..f08ded57a 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am @@ -49,6 +49,7 @@ endif gnunet_service_rps_SOURCES = \ gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \ gnunet-service-rps_sampler.h gnunet-service-rps_sampler.c \ + gnunet-service-rps_peers.h gnunet-service-rps_peers.c \ gnunet-service-rps_custommap.h gnunet-service-rps_custommap.c \ gnunet-service-rps_view.h gnunet-service-rps_view.c \ rps-test_util.h rps-test_util.c \ diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index fbf42a808..4d227c8d7 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -29,11 +29,11 @@ #include "gnunet_peerinfo_service.h" #include "gnunet_nse_service.h" #include "rps.h" -#include "gnunet-service-rps_custommap.h" -#include "gnunet-service-rps_view.h" #include "rps-test_util.h" - #include "gnunet-service-rps_sampler.h" +#include "gnunet-service-rps_custommap.h" +#include "gnunet-service-rps_peers.h" +#include "gnunet-service-rps_view.h" #include #include @@ -52,8 +52,6 @@ // TODO blacklist? (-> mal peer detection on top of brahms) -// TODO API request_cancel - // hist_size_init, hist_size_max /** @@ -129,146 +127,6 @@ struct ClientContext struct ClientContext *cli_ctx_head; struct ClientContext *cli_ctx_tail; -/** - * Used to keep track in what lists single peerIDs are. - */ -enum PeerFlags -{ - /** - * If we are waiting for a reply from that peer (sent a pull request). - */ - PULL_REPLY_PENDING = 0x01, - - IN_OTHER_GOSSIP_LIST = 0x02, // unneeded? - IN_OWN_SAMPLER_LIST = 0x04, // unneeded? - IN_OWN_GOSSIP_LIST = 0x08, // unneeded? - - /** - * We set this bit when we can be sure the other peer is/was live. - */ - VALID = 0x10, - - /** - * We set this bit when we are going to destroy the channel to this peer. - * When cleanup_channel is called, we know that we wanted to destroy it. - * Otherwise the channel to the other peer was destroyed. - */ - TO_DESTROY = 0x20, -}; - - -/** - * Functions of this type can be used to be stored at a peer for later execution. - */ -typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer); - -/** - * Outstanding operation on peer consisting of callback and closure - */ -struct PeerOutstandingOp -{ - /** - * Callback - */ - PeerOp op; - - /** - * Closure - */ - void *op_cls; -}; - - -/** - * List containing all messages that are yet to be send - */ -struct PendingMessage -{ - /** - * DLL next, prev - */ - struct PendingMessage *next; - struct PendingMessage *prev; - - /** - * The envelope to the corresponding message - */ - struct GNUNET_MQ_Envelope *ev; - - /** - * The corresponding context - */ - struct PeerContext *peer_ctx; - - /** - * The message type - */ - const char *type; -}; - - -/** - * Struct used to keep track of other peer's status - * - * This is stored in a multipeermap. - */ -struct PeerContext -{ - /** - * Message queue open to client - */ - struct GNUNET_MQ_Handle *mq; - - /** - * Channel open to client. - */ - struct GNUNET_CADET_Channel *send_channel; - - /** - * Channel open from client. - */ - struct GNUNET_CADET_Channel *recv_channel; // unneeded? - - /** - * Array of outstanding operations on this peer. - */ - struct PeerOutstandingOp *outstanding_ops; - - /** - * Handle to the callback given to cadet_ntfy_tmt_rdy() - * - * To be canceled on shutdown. - */ - struct GNUNET_CADET_TransmitHandle *transmit_handle; - - /** - * Number of outstanding operations. - */ - unsigned int num_outstanding_ops; - - /** - * Identity of the peer - */ - struct GNUNET_PeerIdentity peer_id; - - /** - * Flags indicating status of peer - */ - uint32_t peer_flags; - - /** - * DLL with all messages that are yet to be sent - */ - struct PendingMessage *pending_messages_head; - struct PendingMessage *pending_messages_tail; - - /** - * This is pobably followed by 'statistical' data (when we first saw - * him, how did we get his ID, how many pushes (in a timeinterval), - * ...) - */ -}; - /*********************************************************************** * /Housekeeping with peers ***********************************************************************/ @@ -291,11 +149,6 @@ static struct RPS_Sampler *prot_sampler; */ static struct RPS_Sampler *client_sampler; -/** - * Set of all peers to keep track of them. - */ -static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; - /** * Name to log view to @@ -516,27 +369,6 @@ static uint32_t push_limit = 10000; * Util functions ***********************************************************************/ -/** - * Set a peer flag of given peer context. - */ -#define set_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags |= mask) - -/** - * Get peer flag of given peer context. - */ -#define get_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags & mask ? GNUNET_YES : GNUNET_NO) - -/** - * Unset flag of given peer context. - */ -#define unset_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags &= (~mask)) - -/** - * Clean the send channel of a peer - */ -void -peer_clean (const struct GNUNET_PeerIdentity *peer); - /** * Print peerlist to log. @@ -594,201 +426,6 @@ rem_from_list (struct GNUNET_PeerIdentity **peer_list, } -/** - * Get the context of a peer. If not existing, create. - */ - struct PeerContext * -get_peer_ctx (const struct GNUNET_PeerIdentity *peer) -{ - struct PeerContext *ctx; - int ret; - - ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer); - GNUNET_assert (GNUNET_YES == ret); - ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer); - GNUNET_assert (NULL != ctx); - return ctx; -} - -/** - * Create a new peer context and insert it into the peer map - */ -struct PeerContext * -create_peer_ctx (const struct GNUNET_PeerIdentity *peer) -{ - struct PeerContext *ctx; - int ret; - - ctx = GNUNET_new (struct PeerContext); - ctx->peer_id = *peer; - ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - GNUNET_assert (GNUNET_OK == ret); - return ctx; -} - - -/** - * Set the peer flag to living and call the outstanding operations on this peer. - */ -static size_t -peer_is_live (struct PeerContext *peer_ctx) -{ - struct GNUNET_PeerIdentity *peer; - - /* Cancle transmit_handle if still scheduled */ - if (NULL != peer_ctx->transmit_handle) - { - GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle); - peer_ctx->transmit_handle = NULL; - } - - peer = &peer_ctx->peer_id; - set_peer_flag (peer_ctx, VALID); - - LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer)); - - if (0 < peer_ctx->num_outstanding_ops) - { /* Call outstanding operations */ - unsigned int i; - - for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++) - peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer); - GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0); - } - - return 0; -} - - -/** - * Callback that is called when a channel was effectively established. - * This is given to ntfy_tmt_rdy and called when the channel was - * successfully established. - */ -static size_t -cadet_ntfy_tmt_rdy_cb (void *cls, size_t size, void *buf) -{ - struct PeerContext *peer_ctx = (struct PeerContext *) cls; - - peer_ctx->transmit_handle = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Set ->transmit_handle = NULL for peer %s\n", - GNUNET_i2s (&peer_ctx->peer_id)); - - if (NULL != buf - && 0 != size) - { - peer_is_live (peer_ctx); - } - else - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Problems establishing a connection to peer %s in order to check liveliness\n", - GNUNET_i2s (&peer_ctx->peer_id)); - // TODO reschedule? cleanup? - } - - //if (NULL != peer_ctx->transmit_handle) - //{ - // LOG (GNUNET_ERROR_TYPE_DEBUG, - // "Trying to cancle transmit_handle for peer %s\n", - // GNUNET_i2s (&peer_ctx->peer_id)); - // GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle); - // peer_ctx->transmit_handle = NULL; - //} - - return 0; -} - - -/** - * Get the channel of a peer. If not existing, create. - */ - struct GNUNET_CADET_Channel * -get_channel (const struct GNUNET_PeerIdentity *peer) -{ - struct PeerContext *peer_ctx; - - peer_ctx = get_peer_ctx (peer); - if (NULL == peer_ctx->send_channel) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to establish channel to peer %s\n", - GNUNET_i2s (peer)); - - peer_ctx->send_channel = - GNUNET_CADET_channel_create (cadet_handle, - NULL, - peer, - GNUNET_RPS_CADET_PORT, - GNUNET_CADET_OPTION_RELIABLE); - - } - return peer_ctx->send_channel; -} - - -/** - * Get the message queue of a specific peer. - * - * If we already have a message queue open to this client, - * simply return it, otherways create one. - */ - struct GNUNET_MQ_Handle * -get_mq (const struct GNUNET_PeerIdentity *peer_id) -{ - struct PeerContext *peer_ctx; - - peer_ctx = get_peer_ctx (peer_id); - - GNUNET_assert (NULL == peer_ctx->transmit_handle); - - if (NULL == peer_ctx->mq) - { - (void) get_channel (peer_id); - peer_ctx->mq = GNUNET_CADET_mq_create (peer_ctx->send_channel); - //do I have to explicitly put it in the peer_map? - (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer_id, peer_ctx, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); - } - return peer_ctx->mq; -} - - -/** - * Issue check whether peer is live - * - * @param peer_ctx the context of the peer - */ -void -check_peer_live (struct PeerContext *peer_ctx) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Get informed about peer %s getting live\n", - GNUNET_i2s (&peer_ctx->peer_id)); - - if (NULL == peer_ctx->transmit_handle && - NULL == peer_ctx->send_channel) - { - (void) get_channel (&peer_ctx->peer_id); - peer_ctx->transmit_handle = - GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel, - GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - sizeof (struct GNUNET_MessageHeader), - cadet_ntfy_tmt_rdy_cb, - peer_ctx); - } - else if (NULL != peer_ctx->transmit_handle) - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Already waiting for notification\n"); - else if (NULL != peer_ctx->send_channel) - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Already have established channel to peer\n"); -} - - /** * Sum all time relatives of an array. */ @@ -817,92 +454,6 @@ T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) } -/** - * Insert PeerID in #pull_map - * - * Called once we know a peer is live. - */ - void -insert_in_pull_map (void *cls, const struct GNUNET_PeerIdentity *peer) -{ - CustomPeerMap_put (pull_map, peer); -} - -/** - * Check whether #insert_in_pull_map was already scheduled - */ - int -insert_in_pull_map_scheduled (const struct PeerContext *peer_ctx) -{ - unsigned int i; - - for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) - if (insert_in_pull_map == peer_ctx->outstanding_ops[i].op) - return GNUNET_YES; - return GNUNET_NO; -} - - -/** - * Insert PeerID in #view - * - * Called once we know a peer is live. - */ - void -insert_in_view (void *cls, const struct GNUNET_PeerIdentity *peer) -{ - View_put (peer); -} - -/** - * Check whether #insert_in_view was already scheduled - */ - int -insert_in_view_scheduled (const struct PeerContext *peer_ctx) -{ - unsigned int i; - - for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) - if (insert_in_view == peer_ctx->outstanding_ops[i].op) - return GNUNET_YES; - return GNUNET_NO; -} - - -/** - * Update sampler with given PeerID. - */ - void -insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Updating samplers with peer %s from insert_in_sampler()\n", - GNUNET_i2s (peer)); - RPS_sampler_update (prot_sampler, peer); - RPS_sampler_update (client_sampler, peer); - if (0 < RPS_sampler_count_id (prot_sampler, peer)) - { - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) - (void) create_peer_ctx (peer); - (void) get_channel (peer); - } -} - - -/** - * Check whether #insert_in_sampler was already scheduled - */ -static int -insert_in_sampler_scheduled (const struct PeerContext *peer_ctx) -{ - unsigned int i; - - for (i = 0 ; i < peer_ctx->num_outstanding_ops ; i++) - if (insert_in_sampler== peer_ctx->outstanding_ops[i].op) - return GNUNET_YES; - return GNUNET_NO; -} - /** * Put random peer from sampler into the view as history update. */ @@ -1033,68 +584,13 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array, return; } - for (i = 0; i < num_peers; i++) - { - GNUNET_CONTAINER_multipeermap_put (peer_map, - &peer_array[i], - NULL, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - } -} - - -/** - * @brief Add an envelope to a message passed to mq to list of pending messages - * - * @param peer peer the message was sent to - * @param ev envelope to the message - * @param type type of the message to be sent - */ -static struct PendingMessage * -insert_pending_message (const struct GNUNET_PeerIdentity *peer, - struct GNUNET_MQ_Envelope *ev, - const char *type) -{ - struct PendingMessage *pending_msg; - struct PeerContext *peer_ctx; - - peer_ctx = get_peer_ctx (peer); - pending_msg = GNUNET_new (struct PendingMessage); - pending_msg->ev = ev; - pending_msg->peer_ctx = peer_ctx; - pending_msg->type = type; - GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head, - peer_ctx->pending_messages_tail, - pending_msg); - return pending_msg; -} - -static void -remove_pending_message (struct PendingMessage *pending_msg) -{ - struct PeerContext *peer_ctx; - - peer_ctx = pending_msg->peer_ctx; - GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head, - peer_ctx->pending_messages_tail, - pending_msg); - GNUNET_free (pending_msg); -} - - -/** - * @brief This is called once a message is sent. - * - * @param cls type of the message that was sent - */ -static void -mq_notify_sent_cb (void *cls) -{ - struct PendingMessage *pending_msg = (struct PendingMessage *) cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "%s was sent.\n", - pending_msg->type); - remove_pending_message (pending_msg); + for (i = 0; i < num_peers; i++) + { + GNUNET_CONTAINER_multipeermap_put (peer_map, + &peer_array[i], + NULL, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } } @@ -1111,10 +607,8 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, unsigned int num_peer_ids) { uint32_t send_size; - struct GNUNET_MQ_Handle *mq; struct GNUNET_MQ_Envelope *ev; struct GNUNET_RPS_P2P_PullReplyMessage *out_msg; - struct PendingMessage *pending_msg; /* Compute actual size */ send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) + @@ -1136,8 +630,6 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, "Going to send PULL REPLY with %u peers to %s\n", send_size, GNUNET_i2s (peer_id)); - mq = get_mq (peer_id); - ev = GNUNET_MQ_msg_extra (out_msg, send_size * sizeof (struct GNUNET_PeerIdentity), GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY); @@ -1145,69 +637,224 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, memcpy (&out_msg[1], peer_ids, send_size * sizeof (struct GNUNET_PeerIdentity)); - pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY"); - GNUNET_MQ_notify_sent (ev, - mq_notify_sent_cb, - pending_msg); - GNUNET_MQ_send (mq, ev); + Peers_send_message (peer_id, ev, "PULL REPLY"); } /** - * This function is called on new peer_ids from 'external' sources - * (client seed, cadet get_peers(), ...) + * Insert PeerID in #pull_map * - * @param peer_id the new peer_id + * Called once we know a peer is live. */ -static void -new_peer_id (const struct GNUNET_PeerIdentity *peer_id) + void +insert_in_pull_map (void *cls, const struct GNUNET_PeerIdentity *peer) { - struct PeerOutstandingOp out_op; - struct PeerContext *peer_ctx; + CustomPeerMap_put (pull_map, peer); +} - if ((NULL == peer_id) || - (0 == GNUNET_CRYPTO_cmp_peer_identity (&own_identity, peer_id))) - return; +/** + * Insert PeerID in #view + * + * Called once we know a peer is live. + */ + void +insert_in_view (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + View_put (peer); +} + +/** + * Update sampler with given PeerID. + */ + void +insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) +{ LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got peer_id %s (at %p, view size: %u)\n", - GNUNET_i2s (peer_id), - peer_id, - View_size ()); - - /* if the seed peer is already know, skip context creation */ - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer_id)) - peer_ctx = create_peer_ctx (peer_id); - else - peer_ctx = get_peer_ctx (peer_id); + "Updating samplers with peer %s from insert_in_sampler()\n", + GNUNET_i2s (peer)); + RPS_sampler_update (prot_sampler, peer); + RPS_sampler_update (client_sampler, peer); + if (0 < RPS_sampler_count_id (prot_sampler, peer)) + { + /* Make sure we 'know' about this peer */ + (void) Peers_insert_peer (peer); + /* Establish a channel towards that peer to indicate we are going to send + * messages to it */ + Peers_indicate_sending_intention (peer); + //Peers_issue_peer_liveliness_check (peer); + } +} + - if (GNUNET_NO == get_peer_flag (peer_ctx, VALID)) +/** + * @brief Checks if there is a sending channel and if it is needed + * + * @param peer the peer whose sending channel is checked + * @return GNUNET_YES if sending channel exists and is still needed + * GNUNET_NO otherwise + */ +static int +check_sending_channel_needed (const struct GNUNET_PeerIdentity *peer) +{ + /* struct GNUNET_CADET_Channel *channel; */ + if (GNUNET_NO == Peers_check_peer_known (peer)) { - if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx)) - { - out_op.op = insert_in_sampler; - out_op.op_cls = NULL; - GNUNET_array_append (peer_ctx->outstanding_ops, - peer_ctx->num_outstanding_ops, - out_op); + return GNUNET_NO; + } + if (GNUNET_YES == Peers_check_sending_channel_exists (peer)) + { + if ( (0 < RPS_sampler_count_id (prot_sampler, peer)) || + (GNUNET_YES == View_contains_peer (peer)) || + (GNUNET_YES == CustomPeerMap_contains_peer (push_map, peer)) || + (GNUNET_YES == CustomPeerMap_contains_peer (pull_map, peer)) || + (GNUNET_YES == Peers_check_peer_flag (peer, Peers_PULL_REPLY_PENDING))) + { /* If we want to keep the connection to peer open */ + return GNUNET_YES; } + return GNUNET_NO; + } + return GNUNET_NO; +} + +/** + * @brief remove peer from our knowledge, the view, push and pull maps and + * samplers. + * + * @param peer the peer to remove + */ +static void +remove_peer (const struct GNUNET_PeerIdentity *peer) +{ + View_remove_peer (peer); + CustomPeerMap_remove_peer (pull_map, peer); + CustomPeerMap_remove_peer (push_map, peer); + RPS_sampler_reinitialise_by_value (prot_sampler, peer); + RPS_sampler_reinitialise_by_value (client_sampler, peer); + Peers_remove_peer (peer); +} - if (GNUNET_NO == insert_in_view_scheduled (peer_ctx)) - { - out_op.op = insert_in_view; - out_op.op_cls = NULL; - GNUNET_array_append (peer_ctx->outstanding_ops, - peer_ctx->num_outstanding_ops, - out_op); - } - /* Trigger livelyness test on peer */ - check_peer_live (peer_ctx); +/** + * @brief Remove data that is not needed anymore. + * + * If the sending channel is no longer needed it is destroyed. + * + * @param peer the peer whose data is about to be cleaned + */ +static void +clean_peer (const struct GNUNET_PeerIdentity *peer) +{ + if (GNUNET_NO == check_sending_channel_needed (peer)) + { + #ifdef ENABLE_MALICIOUS + if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) + Peers_destroy_sending_channel (peer); + #else /* ENABLE_MALICIOUS */ + Peers_destroy_sending_channel (peer); + #endif /* ENABLE_MALICIOUS */ } - // else...? - // send push/pull to each of those peers? + if ( (GNUNET_NO == Peers_check_peer_send_intention (peer)) && + (GNUNET_NO == View_contains_peer (peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && + (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && + (0 == RPS_sampler_count_id (prot_sampler, peer)) && + (0 == RPS_sampler_count_id (client_sampler, peer)) ) + { /* We can safely remov this peer */ + remove_peer (peer); + return; + } + Peers_clean_peer (peer); } +/** + * @brief This is called when a channel is destroyed. + * + * Removes peer completely from our knowledge if the send_channel was destroyed + * Otherwise simply delete the recv_channel + * + * @param cls The closure + * @param channel The channel being closed + * @param channel_ctx The context associated with this channel + */ +static void +cleanup_destroyed_channel (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx) +{ + struct GNUNET_PeerIdentity *peer; + + peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info ( + (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER); + // FIXME wait for cadet to change this function + + if (GNUNET_NO == Peers_check_peer_known (peer)) + { /* We don't know a context to that peer */ + LOG (GNUNET_ERROR_TYPE_WARNING, + "channel (%s) without associated context was destroyed\n", + GNUNET_i2s (peer)); + return; + } + + if (GNUNET_YES == Peers_check_peer_flag (peer, Peers_TO_DESTROY)) + { /* We are in the middle of removing that peer from our knowledge. In this + case simply make sure that the channels are cleaned. */ + Peers_cleanup_destroyed_channel (cls, channel, channel_ctx); + to_file (file_name_view_log, + "-%s\t(cleanup channel, ourself)", + GNUNET_i2s_full (peer)); + return; + } + + if (GNUNET_YES == + Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_SENDING)) + { /* Channel used for sending was destroyed */ + /* Possible causes of channel destruction: + * - ourselves -> cleaning send channel -> clean context + * - other peer -> peer probably went down -> remove + */ + if (GNUNET_YES == Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_CLEAN)) + { /* We are about to clean the sending channel. Clean the respective + * context */ + Peers_cleanup_destroyed_channel (cls, channel, channel_ctx); + return; + } + else + { /* Other peer destroyed our sending channel that he is supposed to keep + * open. It probably went down. Remove it from our knowledge. */ + Peers_cleanup_destroyed_channel (cls, channel, channel_ctx); + remove_peer (peer); + return; + } + } + else if (GNUNET_YES == + Peers_check_channel_role (peer, channel, Peers_CHANNEL_ROLE_RECEIVING)) + { /* Channel used for receiving was destroyed */ + /* Possible causes of channel destruction: + * - ourselves -> peer tried to establish channel twice -> clean context + * - other peer -> peer doesn't want to send us data -> clean + */ + if (GNUNET_YES == + Peers_check_channel_flag (channel_ctx, Peers_CHANNEL_ESTABLISHED_TWICE)) + { /* Other peer tried to establish a channel to us twice. We do not accept + * that. Clean the context. */ + Peers_cleanup_destroyed_channel (cls, channel, channel_ctx); + return; + } + else + { /* Other peer doesn't want to send us data anymore. We are free to clean + * it. */ + Peers_cleanup_destroyed_channel (cls, channel, channel_ctx); + clean_peer (peer); + return; + } + } + else + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Destroyed channel is neither sending nor receiving channel\n"); + } +} /*********************************************************************** * /Util functions @@ -1285,8 +932,10 @@ nse_callback (void *cls, struct GNUNET_TIME_Absolute timestamp, * * Sends those to the requesting client. */ -void client_respond (void *cls, - struct GNUNET_PeerIdentity *peer_ids, uint32_t num_peers) +void +client_respond (void *cls, + struct GNUNET_PeerIdentity *peer_ids, + uint32_t num_peers) { uint32_t i; struct GNUNET_MQ_Envelope *ev; @@ -1466,14 +1115,18 @@ handle_client_seed (void *cls, "Client seeded peers:\n"); print_peer_list (peers, num_peers); - for (i = 0 ; i < num_peers ; i++) + for (i = 0; i < num_peers; i++) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Updating samplers with seed %" PRIu32 ": %s\n", i, GNUNET_i2s (&peers[i])); - new_peer_id (&peers[i]); + if (GNUNET_YES == Peers_insert_peer (&peers[i])) + { + Peers_schedule_operation (&peers[i], insert_in_sampler); + Peers_schedule_operation (&peers[i], insert_in_view); + } //RPS_sampler_update (prot_sampler, &peers[i]); //RPS_sampler_update (client_sampler, &peers[i]); @@ -1505,7 +1158,7 @@ handle_peer_push (void *cls, { const struct GNUNET_PeerIdentity *peer; - // (check the proof of work) + // (check the proof of work (?)) peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER); @@ -1626,10 +1279,7 @@ handle_peer_pull_reply (void *cls, { struct GNUNET_RPS_P2P_PullReplyMessage *in_msg; struct GNUNET_PeerIdentity *peers; - struct PeerContext *peer_ctx; struct GNUNET_PeerIdentity *sender; - struct PeerContext *sender_ctx; - struct PeerOutstandingOp out_op; uint32_t i; #ifdef ENABLE_MALICIOUS struct AttackedPeer *tmp_att_peer; @@ -1661,11 +1311,10 @@ handle_peer_pull_reply (void *cls, // FIXME wait for cadet to change this function sender = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER); - sender_ctx = get_peer_ctx (sender); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received PULL REPLY (%s)\n", GNUNET_i2s (sender)); - if (GNUNET_YES != get_peer_flag (sender_ctx, PULL_REPLY_PENDING)) + if (GNUNET_YES != Peers_check_peer_flag (sender, Peers_PULL_REPLY_PENDING)) { LOG (GNUNET_ERROR_TYPE_WARNING, "Received a pull reply from a peer we didn't request one from!\n"); @@ -1720,31 +1369,23 @@ handle_peer_pull_reply (void *cls, if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peers[i])) { - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, - &peers[i])) - peer_ctx = create_peer_ctx (&peers[i]); - else - peer_ctx = get_peer_ctx (&peers[i]); + /* Make sure we 'know' about this peer */ + (void) Peers_insert_peer (&peers[i]); - if (GNUNET_YES == get_peer_flag (peer_ctx, VALID)) + if (GNUNET_YES == Peers_check_peer_flag (&peers[i], Peers_VALID)) { CustomPeerMap_put (pull_map, &peers[i]); } - else if (GNUNET_NO == insert_in_pull_map_scheduled (peer_ctx)) + else { - out_op.op = insert_in_pull_map; - out_op.op_cls = NULL; - GNUNET_array_append (peer_ctx->outstanding_ops, - peer_ctx->num_outstanding_ops, - out_op); - check_peer_live (peer_ctx); + Peers_schedule_operation (&peers[i], insert_in_pull_map); + Peers_issue_peer_liveliness_check (&peers[i]); } } } - unset_peer_flag (sender_ctx, PULL_REPLY_PENDING); - - peer_clean (sender); + Peers_unset_peer_flag (sender, Peers_PULL_REPLY_PENDING); + clean_peer (sender); GNUNET_CADET_receive_done (channel); return GNUNET_OK; @@ -1803,28 +1444,20 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread) * @param peer_id the peer to send the pull request to. */ static void -send_pull_request (const struct GNUNET_PeerIdentity *peer_id) +send_pull_request (const struct GNUNET_PeerIdentity *peer) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MQ_Handle *mq; - struct PeerContext *peer_ctx; - struct PendingMessage *pending_msg; - peer_ctx = get_peer_ctx (peer_id); - GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)); - set_peer_flag (peer_ctx, PULL_REPLY_PENDING); + GNUNET_assert (GNUNET_NO == Peers_check_peer_flag (peer, + Peers_PULL_REPLY_PENDING)); + Peers_set_peer_flag (peer, Peers_PULL_REPLY_PENDING); LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send PULL REQUEST to peer %s.\n", - GNUNET_i2s (peer_id)); + GNUNET_i2s (peer)); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); - mq = get_mq (peer_id); - pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST"); - GNUNET_MQ_notify_sent (ev, - mq_notify_sent_cb, - pending_msg); - GNUNET_MQ_send (mq, ev); + Peers_send_message (peer, ev, "PULL REQUEST"); } @@ -1837,20 +1470,13 @@ static void send_push (const struct GNUNET_PeerIdentity *peer_id) { struct GNUNET_MQ_Envelope *ev; - struct GNUNET_MQ_Handle *mq; - struct PendingMessage *pending_msg; LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send PUSH to peer %s.\n", GNUNET_i2s (peer_id)); ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); - mq = get_mq (peer_id); - pending_msg = insert_pending_message (peer_id, ev, "PUSH"); - GNUNET_MQ_notify_sent (ev, - mq_notify_sent_cb, - pending_msg); - GNUNET_MQ_send (mq, ev); + Peers_send_message (peer_id, ev, "PUSH"); } @@ -1932,11 +1558,10 @@ handle_client_act_malicious (void *cls, do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, NULL); } - else if (2 == mal_type - || 3 == mal_type) + else if ( (2 == mal_type) || + (3 == mal_type) ) { /* Try to partition the network */ /* Add other malicious peers to those we already know */ - struct PeerContext *att_ctx; num_mal_peers_sent = ntohl (in_msg->num_peers) - 1; num_mal_peers_old = num_mal_peers; @@ -1961,11 +1586,10 @@ handle_client_act_malicious (void *cls, &in_msg->attacked_peer, sizeof (struct GNUNET_PeerIdentity)); /* Set the flag of the attacked peer to valid to avoid problems */ - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, - &attacked_peer)) + if (GNUNET_NO == Peers_check_peer_known (&attacked_peer)) { - att_ctx = create_peer_ctx (&attacked_peer); - check_peer_live (att_ctx); + Peers_insert_peer (&attacked_peer); + Peers_issue_peer_liveliness_check (&attacked_peer); } LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -2005,7 +1629,6 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) uint32_t i; struct GNUNET_TIME_Relative time_next_round; struct AttackedPeer *tmp_att_peer; - struct PeerContext *peer_ctx; LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to execute next round maliciously type %" PRIu32 ".\n", mal_type); @@ -2055,8 +1678,8 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * Send as many pushes to the attacked peer as possible * That is one push per round as it will ignore more. */ - peer_ctx = get_peer_ctx (&attacked_peer); - if (GNUNET_YES == get_peer_flag (peer_ctx, VALID)) + Peers_insert_peer (&attacked_peer); + if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_VALID)) send_push (&attacked_peer); } @@ -2065,11 +1688,10 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { /* Combined attack */ /* Send PUSH to attacked peers */ - if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, - &attacked_peer)) + if (GNUNET_YES == Peers_check_peer_known (&attacked_peer)) { - peer_ctx = get_peer_ctx (&attacked_peer); - if (GNUNET_YES == get_peer_flag (peer_ctx, VALID)) + Peers_insert_peer (&attacked_peer); + if (GNUNET_YES == Peers_check_peer_flag (&attacked_peer, Peers_VALID)) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Goding to send push to attacked peer (%s)\n", @@ -2077,11 +1699,11 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) send_push (&attacked_peer); } else - check_peer_live (peer_ctx); + Peers_issue_peer_liveliness_check (&attacked_peer); } else - peer_ctx = create_peer_ctx (&attacked_peer); - check_peer_live (peer_ctx); + Peers_insert_peer (&attacked_peer); + Peers_issue_peer_liveliness_check (&attacked_peer); /* The maximum of pushes we're going to send this round */ num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1, @@ -2092,7 +1714,7 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "Going to send %" PRIu32 " pushes\n", num_pushes); - for (i = 0 ; i < num_pushes ; i++) + for (i = 0; i < num_pushes; i++) { if (att_peers_tail == att_peer_index) att_peer_index = att_peers_head; @@ -2104,7 +1726,7 @@ do_mal_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* Send PULLs to some peers to learn about additional peers to attack */ tmp_att_peer = att_peer_index; - for (i = 0 ; i < num_pushes * alpha ; i++) + for (i = 0; i < num_pushes * alpha; i++) { if (att_peers_tail == tmp_att_peer) tmp_att_peer = att_peers_head; @@ -2145,7 +1767,6 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) uint32_t first_border; uint32_t second_border; struct GNUNET_PeerIdentity peer; - struct PeerContext *peer_ctx; struct GNUNET_PeerIdentity *update_peer; do_round_task = NULL; @@ -2200,9 +1821,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for (i = first_border; i < second_border; i++) { peer = view_array[permut[i]]; - peer_ctx = get_peer_ctx (&peer); if (0 != GNUNET_CRYPTO_cmp_peer_identity (&own_identity, &peer) && - GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) // TODO + GNUNET_NO == Peers_check_peer_flag (&peer, Peers_PULL_REPLY_PENDING)) // TODO { // FIXME if this fails schedule/loop this for later send_pull_request (&peer); } @@ -2292,7 +1912,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) to_file (file_name_view_log, "-%s", GNUNET_i2s_full (&peers_to_clean[i])); - peer_clean (&peers_to_clean[i]); + Peers_clean_peer (&peers_to_clean[i]); + //peer_destroy_channel_send (sender); } GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0); @@ -2320,7 +1941,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "Updating with peer %s from push list\n", GNUNET_i2s (update_peer)); insert_in_sampler (NULL, update_peer); - peer_clean (update_peer); /* This cleans only if it is not in the view */ + Peers_clean_peer (update_peer); /* This cleans only if it is not in the view */ + //peer_destroy_channel_send (sender); } for (i = 0; i < CustomPeerMap_size (pull_map); i++) @@ -2330,7 +1952,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_i2s (CustomPeerMap_get_peer_by_index (pull_map, i))); insert_in_sampler (NULL, CustomPeerMap_get_peer_by_index (pull_map, i)); /* This cleans only if it is not in the view */ - peer_clean (CustomPeerMap_get_peer_by_index (pull_map, i)); + Peers_clean_peer (CustomPeerMap_get_peer_by_index (pull_map, i)); + //peer_destroy_channel_send (sender); } @@ -2371,7 +1994,9 @@ init_peer_cb (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer_id %s from cadet\n", GNUNET_i2s (peer)); - new_peer_id (peer); + Peers_insert_peer (peer); + Peers_schedule_operation (peer, insert_in_sampler); + Peers_schedule_operation (peer, insert_in_view); } } @@ -2395,145 +2020,9 @@ process_peerinfo_peers (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Got peer_id %s from peerinfo\n", GNUNET_i2s (peer)); - new_peer_id (peer); - } -} - - -/** - * Callback used to remove peers from the multipeermap. - */ - int -peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value) -{ - struct PeerContext *peer_ctx; - const struct GNUNET_CADET_Channel *channel = - (const struct GNUNET_CADET_Channel *) cls; - - peer_ctx = (struct PeerContext *) value; - set_peer_flag (peer_ctx, TO_DESTROY); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Going to remove peer %s\n", - GNUNET_i2s (&peer_ctx->peer_id)); - - /* Remove it from the sampler used for the Brahms protocol */ - RPS_sampler_reinitialise_by_value (prot_sampler, key); - - /* If operations are still scheduled for this peer cancel those */ - if (0 != peer_ctx->num_outstanding_ops) - { - GNUNET_array_grow (peer_ctx->outstanding_ops, - peer_ctx->num_outstanding_ops, - 0); - } - - /* If we are still waiting for notification whether this peer is live - * cancel the according task */ - if (NULL != peer_ctx->transmit_handle) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Trying to cancle transmit_handle for peer %s\n", - GNUNET_i2s (key)); - GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle); - peer_ctx->transmit_handle = NULL; - } - - unset_peer_flag (peer_ctx, PULL_REPLY_PENDING); - - /* Remove peer from view */ - if (View_contains_peer (key)) - { - to_file (file_name_view_log, - "-%s\t(cleanup channel, other peer)", - GNUNET_i2s_full (key)); - View_remove_peer (key); - } - - /* Remove from push and pull lists */ - CustomPeerMap_remove_peer (push_map, key); - CustomPeerMap_remove_peer (pull_map, key); - - /* Cancle messages that have not been sent yet */ - while (NULL != peer_ctx->pending_messages_head) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Removing unsent %s\n", - peer_ctx->pending_messages_head->type); - /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does not - * set a #GNUNET_MQ_CancelImpl */ - /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */ - remove_pending_message (peer_ctx->pending_messages_head); - } - - /* If there is still a mq destroy it */ - if (NULL != peer_ctx->mq) - { - GNUNET_MQ_destroy (peer_ctx->mq); - peer_ctx->mq = NULL; - } - - - /* Remove the send_channel - * This function should be called again from #cleanup_channel (callback - * called on the destruction of channels) and clean up the rest. */ - if (NULL != peer_ctx->send_channel && - channel != peer_ctx->send_channel) - { - GNUNET_CADET_channel_destroy (peer_ctx->send_channel); - peer_ctx->send_channel = NULL; - } - - /* Remove the recv_channel - * This function should be called again from #cleanup_channel (callback - * called on the destruction of channels) and clean up the rest. */ - if (NULL != peer_ctx->recv_channel && - channel != peer_ctx->recv_channel) - { - GNUNET_CADET_channel_destroy (peer_ctx->recv_channel); - peer_ctx->recv_channel = NULL; - } - - /* If there is no channel we have to remove the context now */ - if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, key)) - LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n"); - - GNUNET_free (peer_ctx); - - return GNUNET_YES; -} - - -/** - * Clean the send channel of a peer - * If there is also no channel to receive messages from that peer, remove it - * from the peermap. - */ -void -peer_clean (const struct GNUNET_PeerIdentity *peer) -{ - struct PeerContext *peer_ctx; - /* struct GNUNET_CADET_Channel *channel; */ - - if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) && - (GNUNET_NO == View_contains_peer (peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (push_map, peer)) && - (GNUNET_NO == CustomPeerMap_contains_peer (pull_map, peer)) && - (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) ) - { - peer_ctx = get_peer_ctx (peer); - - if ( (NULL == peer_ctx->recv_channel) && - (NULL == peer_ctx->pending_messages_head) && - (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) ) - { - #ifdef ENABLE_MALICIOUS - if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) - peer_remove_cb (NULL, peer, peer_ctx); - #else /* ENABLE_MALICIOUS */ - peer_remove_cb (NULL, peer, peer_ctx); - #endif /* ENABLE_MALICIOUS */ - } + Peers_insert_peer (peer); + Peers_schedule_operation (peer, insert_in_sampler); + Peers_schedule_operation (peer, insert_in_view); } } @@ -2560,22 +2049,12 @@ shutdown_task (void *cls, do_round_task = NULL; } - { - if (GNUNET_SYSERR == - GNUNET_CONTAINER_multipeermap_iterate (peer_map, peer_remove_cb, NULL)) - LOG (GNUNET_ERROR_TYPE_WARNING, - "Iterating over peers to disconnect from them was cancelled\n"); - } + Peers_terminate (); GNUNET_NSE_disconnect (nse); RPS_sampler_destroy (prot_sampler); RPS_sampler_destroy (client_sampler); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Size of the peermap: %u\n", - GNUNET_CONTAINER_multipeermap_size (peer_map)); - GNUNET_break (0 == GNUNET_CONTAINER_multipeermap_size (peer_map)); GNUNET_CADET_disconnect (cadet_handle); - GNUNET_CONTAINER_multipeermap_destroy (peer_map); View_destroy (); CustomPeerMap_destroy (push_map); CustomPeerMap_destroy (pull_map); @@ -2644,119 +2123,6 @@ handle_client_disconnect (void *cls, } -/** - * Handle the channel a peer opens to us. - * - * @param cls The closure - * @param channel The channel the peer wants to establish - * @param initiator The peer's peer ID - * @param port The port the channel is being established over - * @param options Further options - */ - static void * -handle_inbound_channel (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - uint32_t port, - enum GNUNET_CADET_ChannelOption options) -{ - struct PeerContext *peer_ctx; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "New channel was established to us (Peer %s).\n", - GNUNET_i2s (initiator)); - GNUNET_assert (NULL != channel); /* according to cadet API */ - if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, initiator)) - peer_ctx = create_peer_ctx (initiator); - else - peer_ctx = get_peer_ctx (initiator); - /* We only accept one incoming channel from peers */ - if (NULL != peer_ctx->recv_channel) - { - GNUNET_CADET_channel_destroy (channel); - return NULL; - } - peer_ctx->recv_channel = channel; - peer_is_live (peer_ctx); - return NULL; -} - - -/** - * This is called when a channel is destroyed. - * - * @param cls The closure - * @param channel The channel being closed - * @param channel_ctx The context associated with this channel - */ - static void -cleanup_channel (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) -{ - struct GNUNET_PeerIdentity *peer; - struct PeerContext *peer_ctx; - - peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info ( - (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER); - // Guess simply casting isn't the nicest way... - // FIXME wait for cadet to change this function - - if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) - {/* We don't want to implicitly create a context that we're about to kill */ - peer_ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer); - if (NULL == peer_ctx) /* It could have been removed by shutdown_task */ - return; - - if (get_peer_flag (peer_ctx, TO_DESTROY)) - {/* We initiatad the destruction of this particular peer */ - if (channel == peer_ctx->send_channel) - peer_ctx->send_channel = NULL; - else if (channel == peer_ctx->recv_channel) - peer_ctx->recv_channel = NULL; - - to_file (file_name_view_log, - "-%s\t(cleanup channel, ourself)", - GNUNET_i2s_full (peer)); - } - - else - { /* We did not initiate the destruction of this peer */ - if (channel == peer_ctx->send_channel) - { /* Something (but us) killd the channel - clean up peer */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "send channel (%s) was destroyed - cleaning up\n", - GNUNET_i2s (peer)); - peer_ctx->send_channel = NULL; - /* Somwewhat {ab,re}use the iterator function */ - /* Cast to void is ok, because it's used as void in peer_remove_cb */ - (void) peer_remove_cb ((void *) channel, peer, peer_ctx); - } - else if (channel == peer_ctx->recv_channel) - { /* Other peer doesn't want to send us messages anymore */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s destroyed recv channel - cleaning up channel\n", - GNUNET_i2s (peer)); - peer_ctx->recv_channel = NULL; - } - else - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "unknown channel (%s) was destroyed\n", - GNUNET_i2s (peer)); - } - } - } - - else - { /* We don't know a context to that peer */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "channel (%s) without associated context was destroyed\n", - GNUNET_i2s (peer)); - } -} - - /** * Actually start the service. */ @@ -2809,8 +2175,6 @@ run (void *cls, int size; int out_size; - // TODO check what this does -- copied from gnunet-boss - // - seems to work as expected GNUNET_log_setup ("rps", GNUNET_error_type_to_string (GNUNET_ERROR_TYPE_DEBUG), NULL); cfg = c; @@ -2883,8 +2247,6 @@ run (void *cls, alpha = 0.45; beta = 0.45; - peer_map = GNUNET_CONTAINER_multipeermap_create (sampler_size_est_need, GNUNET_NO); - /* Initialise cadet */ static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { @@ -2899,12 +2261,12 @@ run (void *cls, const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in src/rps/rps.h cadet_handle = GNUNET_CADET_connect (cfg, cls, - &handle_inbound_channel, - &cleanup_channel, + &Peers_handle_inbound_channel, + &cleanup_destroyed_channel, cadet_handlers, ports); - peerinfo_handle = GNUNET_PEERINFO_connect (cfg); + Peers_initialise (cadet_handle, &own_identity); /* Initialise sampler */ struct GNUNET_TIME_Relative half_round_interval; @@ -2944,7 +2306,7 @@ run (void *cls, * @param argv command line arguments * @return 0 ok, 1 on error */ - int +int main (int argc, char *const *argv) { return (GNUNET_OK == diff --git a/src/rps/gnunet-service-rps_peers.c b/src/rps/gnunet-service-rps_peers.c new file mode 100644 index 000000000..7f656ec94 --- /dev/null +++ b/src/rps/gnunet-service-rps_peers.c @@ -0,0 +1,1158 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +/** + * @file rps/gnunet-service-rps_peers.c + * @brief utilities for managing (information about) peers + * @author Julius Bünger + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_cadet_service.h" +#include +#include "rps.h" +#include "gnunet-service-rps_peers.h" + + + +#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__) + + +/** + * Set a peer flag of given peer context. + */ +#define set_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags |= mask) + +/** + * Get peer flag of given peer context. + */ +#define check_peer_flag_set(peer_ctx, mask) (peer_ctx->peer_flags & mask ? GNUNET_YES : GNUNET_NO) + +/** + * Unset flag of given peer context. + */ +#define unset_peer_flag(peer_ctx, mask) (peer_ctx->peer_flags &= (~mask)) + +/** + * Set a channel flag of given channel context. + */ +#define set_channel_flag(channel_flags, mask) ((*channel_flags) |= mask) + +/** + * Get channel flag of given channel context. + */ +#define check_channel_flag_set(channel_flags, mask) ((*channel_flags) & mask ? GNUNET_YES : GNUNET_NO) + +/** + * Unset flag of given channel context. + */ +#define unset_channel_flag(channel_flags, mask) ((*channel_flags) &= (~mask)) + + + +/** + * Pending operation on peer consisting of callback and closure + * + * When an operation cannot be executed right now this struct is used to store + * the callback and closure for later execution. + */ +struct PeerPendingOp +{ + /** + * Callback + */ + PeerOp op; + + /** + * Closure + */ + void *op_cls; +}; + +/** + * List containing all messages that are yet to be send + * + * This is used to keep track of all messages that have not been sent yet. When + * a peer is to be removed the pending messages can be removed properly. + */ +struct PendingMessage +{ + /** + * DLL next, prev + */ + struct PendingMessage *next; + struct PendingMessage *prev; + + /** + * The envelope to the corresponding message + */ + struct GNUNET_MQ_Envelope *ev; + + /** + * The corresponding context + */ + struct PeerContext *peer_ctx; + + /** + * The message type + */ + const char *type; +}; + +/** + * Struct used to keep track of other peer's status + * + * This is stored in a multipeermap. + * It contains information such as cadet channels, a message queue for sending, + * status about the channels, the pending operations on this peer and some flags + * about the status of the peer itself. (live, valid, ...) + */ +struct PeerContext +{ + /** + * Message queue open to client + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Channel open to client. + */ + struct GNUNET_CADET_Channel *send_channel; + + /** + * Flags to the sending channel + */ + uint32_t *send_channel_flags; + + /** + * Channel open from client. + */ + struct GNUNET_CADET_Channel *recv_channel; // unneeded? + + /** + * Flags to the receiving channel + */ + uint32_t *recv_channel_flags; + + /** + * Array of pending operations on this peer. + */ + struct PeerPendingOp *pending_ops; + + /** + * Handle to the callback given to cadet_ntfy_tmt_rdy() + * + * To be canceled on shutdown. + */ + struct GNUNET_CADET_TransmitHandle *transmit_handle; + + /** + * Number of pending operations. + */ + unsigned int num_pending_ops; + + /** + * Identity of the peer + */ + struct GNUNET_PeerIdentity peer_id; + + /** + * Flags indicating status of peer + */ + uint32_t peer_flags; + + /** + * Last time we received something from that peer. + */ + struct GNUNET_TIME_Absolute last_message_recv; + + /** + * Last time we received a keepalive message. + */ + struct GNUNET_TIME_Absolute last_keepalive; + + /** + * DLL with all messages that are yet to be sent + */ + struct PendingMessage *pending_messages_head; + struct PendingMessage *pending_messages_tail; + + /** + * This is pobably followed by 'statistical' data (when we first saw + * him, how did we get his ID, how many pushes (in a timeinterval), + * ...) + */ +}; + + +/** + * Set of all peers to keep track of them. + */ +static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; + +/** + * Own #GNUNET_PeerIdentity. + */ +static const struct GNUNET_PeerIdentity *own_identity; + +/** + * Cadet handle. + */ +static struct GNUNET_CADET_Handle *cadet_handle; + + + +/** + * @brief Get the #PeerContext associated with a peer + * + * @param peer the peer id + * + * @return the #PeerContext + */ +static struct PeerContext * +get_peer_ctx (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *ctx; + int ret; + + ret = GNUNET_CONTAINER_multipeermap_contains (peer_map, peer); + GNUNET_assert (GNUNET_YES == ret); + ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, peer); + GNUNET_assert (NULL != ctx); + return ctx; +} + +/** + * @brief Create a new #PeerContext and insert it into the peer map + * + * @param peer the peer to create the #PeerContext for + * + * @return the #PeerContext + */ +static struct PeerContext * +create_peer_ctx (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *ctx; + int ret; + + GNUNET_assert (GNUNET_NO == Peers_check_peer_known (peer)); + + ctx = GNUNET_new (struct PeerContext); + ctx->peer_id = *peer; + ctx->send_channel_flags = GNUNET_new (uint32_t); + ctx->recv_channel_flags = GNUNET_new (uint32_t); + ret = GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); + GNUNET_assert (GNUNET_OK == ret); + return ctx; +} + +/** + * @brief Create or get a #PeerContext + * + * @param peer the peer to get the associated context to + * + * @return the context + */ +static struct PeerContext * +create_or_get_peer_ctx (const struct GNUNET_PeerIdentity *peer) +{ + if (GNUNET_NO == Peers_check_peer_known (peer)) + { + return create_peer_ctx (peer); + } + return get_peer_ctx (peer); +} + +/** + * @brief Set the peer flag to living and + * call the pending operations on this peer. + * + * Also sets the #Peers_VALID flag + * + * @param peer_ctx the #PeerContext of the peer to set live + */ +static void +set_peer_live (struct PeerContext *peer_ctx) +{ + struct GNUNET_PeerIdentity *peer; + unsigned int i; + + /* Cancle cadet transmit_handle if still scheduled */ + if (NULL != peer_ctx->transmit_handle) + { + GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle); + peer_ctx->transmit_handle = NULL; + } + + peer = &peer_ctx->peer_id; + set_peer_flag (peer_ctx, Peers_VALID); + // TODO LIVE/ONLINE + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Peer %s is live and valid\n", + GNUNET_i2s (peer)); + + /* Call pending operations */ + for (i = 0; i < peer_ctx->num_pending_ops; i++) + { + peer_ctx->pending_ops[i].op (peer_ctx->pending_ops[i].op_cls, peer); + } + GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0); +} + +/** + * @brief Get the channel of a peer. If not existing, create. + * + * @param peer the peer id + * @return the #GNUNET_CADET_Channel used to send data to @a peer + */ +struct GNUNET_CADET_Channel * +get_channel (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + if (NULL == peer_ctx->send_channel) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Trying to establish channel to peer %s\n", + GNUNET_i2s (peer)); + peer_ctx->send_channel = + GNUNET_CADET_channel_create (cadet_handle, + peer_ctx->send_channel_flags, /* context */ + peer, + GNUNET_RPS_CADET_PORT, + GNUNET_CADET_OPTION_RELIABLE); + } + return peer_ctx->send_channel; +} + +/** + * Get the message queue (#GNUNET_MQ_Handle) of a specific peer. + * + * If we already have a message queue open to this client, + * simply return it, otherways create one. + * + * @param peer the peer to get the mq to + * @return the #GNUNET_MQ_Handle + */ +static struct GNUNET_MQ_Handle * +get_mq (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + GNUNET_assert (NULL == peer_ctx->transmit_handle); + + if (NULL == peer_ctx->mq) + { + (void) get_channel (peer); + peer_ctx->mq = GNUNET_CADET_mq_create (peer_ctx->send_channel); + } + return peer_ctx->mq; +} + +/** + * @brief Callback that is called when a channel was effectively established. + * + * This is an implementation of #GNUNET_CONNECTION_TransmitReadyNotify and + * given to #GNUNET_CADET_notify_transmit_ready_cancel and called when the + * channel was successfully established. + * + * This function type was originally ment to be called to provide the data to + * be sent. This is called when the connection is ready to queue more data. + * However we use it to get notified about the successful establishement of a + * cadet channel. + * + * @a buf will be NULL and @a size zero if the + * connection was closed for writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in @a buf + * @param buf where the callee should write the message + * @return number of bytes written to @a buf + */ +//TODO +static size_t +cadet_notify_transmit_ready_cb (void *cls, size_t size, void *buf) +{ + struct PeerContext *peer_ctx = (struct PeerContext *) cls; + // TODO make sure the context is not deleted or the establishing of the + // channel is cancelled + + peer_ctx->transmit_handle = NULL; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Set ->transmit_handle = NULL for peer %s\n", + GNUNET_i2s (&peer_ctx->peer_id)); + + if ( (NULL != buf) && + (0 != size) ) + { + set_peer_live (peer_ctx); + } + else + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Problems establishing a connection to peer %s in order to check liveliness\n", + GNUNET_i2s (&peer_ctx->peer_id)); + // TODO reschedule? cleanup? + } + return 0; +} + +/** + * Issue a check whether peer is live + * + * @param peer_ctx the context of the peer + */ +static void +check_peer_live (struct PeerContext *peer_ctx) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Get informed about peer %s getting live\n", + GNUNET_i2s (&peer_ctx->peer_id)); + + if (NULL == peer_ctx->transmit_handle && + NULL == peer_ctx->send_channel) + { + (void) get_channel (&peer_ctx->peer_id); + peer_ctx->transmit_handle = + GNUNET_CADET_notify_transmit_ready (peer_ctx->send_channel, + GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + sizeof (struct GNUNET_MessageHeader), + cadet_notify_transmit_ready_cb, + peer_ctx); + } + else if (NULL != peer_ctx->transmit_handle) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Already waiting for notification\n"); + else if (NULL != peer_ctx->send_channel) + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Already have established channel to peer\n"); +} + +/** + * @brief Add an envelope to a message passed to mq to list of pending messages + * + * @param peer peer the message was sent to + * @param ev envelope to the message + * @param type type of the message to be sent + * @return pointer to pending message + */ +static struct PendingMessage * +insert_pending_message (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Envelope *ev, + const char *type) +{ + struct PendingMessage *pending_msg; + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + pending_msg = GNUNET_new (struct PendingMessage); + pending_msg->ev = ev; + pending_msg->peer_ctx = peer_ctx; + pending_msg->type = type; + GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head, + peer_ctx->pending_messages_tail, + pending_msg); + return pending_msg; +} + +/** + * @brief Remove a pending message from the respective DLL + * + * @param pending_msg the pending message to remove + */ +static void +remove_pending_message (struct PendingMessage *pending_msg) +{ + struct PeerContext *peer_ctx; + + peer_ctx = pending_msg->peer_ctx; + GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head, + peer_ctx->pending_messages_tail, + pending_msg); + /* FIXME We are not able to cancel messages as #GNUNET_CADET_mq_create () does + * not set a #GNUNET_MQ_CancelImpl */ + /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */ + GNUNET_free (pending_msg); +} + +/** + * @brief Check whether function of type #PeerOp was already scheduled + * + * The array with pending operations will probably never grow really big, so + * iterating over it should be ok. + * + * @param peer the peer to check + * @param peer_op the operation (#PeerOp) on the peer + * + * @return #GNUNET_YES if this operation is scheduled on that peer + * #GNUNET_NO otherwise + */ +static int +check_operation_scheduled (const struct GNUNET_PeerIdentity *peer, + const PeerOp peer_op) +{ + const struct PeerContext *peer_ctx; + unsigned int i; + + peer_ctx = get_peer_ctx (peer); + for (i = 0; i < peer_ctx->num_pending_ops; i++) + if (peer_op == peer_ctx->pending_ops[i].op) + return GNUNET_YES; + return GNUNET_NO; +} + +/** + * Iterator over hash map entries. Deletes all contexts of peers. + * + * @param cls closure + * @param key current public key + * @param value value in the hash map + * @return #GNUNET_YES if we should continue to iterate, + * #GNUNET_NO if not. + */ +static int +peermap_clear_iterator (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) +{ + Peers_remove_peer (key); + return GNUNET_YES; +} + +/** + * @brief This is called once a message is sent. + * + * Removes the pending message + * + * @param cls type of the message that was sent + */ +static void +mq_notify_sent_cb (void *cls) +{ + struct PendingMessage *pending_msg = (struct PendingMessage *) cls; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s was sent.\n", + pending_msg->type); + remove_pending_message (pending_msg); +} + + +/** + * @brief Initialise storage of peers + * + * @param cadet_h cadet handle + * @param own_id own peer identity + */ +void +Peers_initialise (struct GNUNET_CADET_Handle *cadet_h, + const struct GNUNET_PeerIdentity *own_id) +{ + cadet_handle = cadet_h; + own_identity = own_id; + peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO); +} + +/** + * @brief Delete storage of peers that was created with #Peers_initialise () + */ +void +Peers_terminate () +{ + if (GNUNET_SYSERR == + GNUNET_CONTAINER_multipeermap_iterate (peer_map, + peermap_clear_iterator, + NULL)) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Iteration destroying peers was aborted.\n"); + } + GNUNET_CONTAINER_multipeermap_destroy (peer_map); +} + +/** + * @brief Add peer to known peers. + * + * This function is called on new peer_ids from 'external' sources + * (client seed, cadet get_peers(), ...) + * + * @param peer the new peer + * + * @return #GNUNET_YES if peer was inserted + * #GNUNET_NO if peer was already known + */ +int +Peers_insert_peer (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + if ( (GNUNET_YES == Peers_check_peer_known (peer)) || + (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, own_identity)) ) + { + return GNUNET_NO; /* We already know this peer - nothing to do */ + } + peer_ctx = create_peer_ctx (peer); + // TODO LIVE + if (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_VALID)) + { + check_peer_live (peer_ctx); + } + return GNUNET_YES; +} + +/** + * @brief Remove unecessary data + * + * If the other peer is not intending to send messages, we have messages pending + * to be sent to this peer and we are not waiting for a reply, remove the + * information about it (its #PeerContext). + * + * @param peer the peer to clean + * @return #GNUNET_YES if peer was removed + * #GNUNET_NO otherwise + */ +int +Peers_clean_peer (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + // TODO actually remove unnecessary data + + if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) + { + return GNUNET_NO; + } + + peer_ctx = get_peer_ctx (peer); + if ( (NULL != peer_ctx->recv_channel) || + (NULL != peer_ctx->pending_messages_head) || + (GNUNET_NO == check_peer_flag_set (peer_ctx, Peers_PULL_REPLY_PENDING)) ) + { + return GNUNET_NO; + } + Peers_remove_peer (peer); + return GNUNET_YES; +} + +/** + * @brief Remove peer + * + * @param peer the peer to clean + * @return #GNUNET_YES if peer was removed + * #GNUNET_NO otherwise + */ +int +Peers_remove_peer (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) + { + return GNUNET_NO; + } + + peer_ctx = get_peer_ctx (peer); + set_peer_flag (peer_ctx, Peers_TO_DESTROY); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Going to remove peer %s\n", + GNUNET_i2s (&peer_ctx->peer_id)); + + GNUNET_array_grow (peer_ctx->pending_ops, peer_ctx->num_pending_ops, 0); + // TODO delete struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle + /* Cancle messages that have not been sent yet */ + while (NULL != peer_ctx->pending_messages_head) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Removing unsent %s\n", + peer_ctx->pending_messages_head->type); + remove_pending_message (peer_ctx->pending_messages_head); + } + /* If we are still waiting for notification whether this peer is live + * cancel the according task */ + if (NULL != peer_ctx->transmit_handle) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Trying to cancle transmit_handle for peer %s\n", + GNUNET_i2s (&peer_ctx->peer_id)); + GNUNET_CADET_notify_transmit_ready_cancel (peer_ctx->transmit_handle); + peer_ctx->transmit_handle = NULL; + } + if (NULL != peer_ctx->send_channel) + { + GNUNET_CADET_channel_destroy (peer_ctx->send_channel); + peer_ctx->send_channel = NULL; + } + if (NULL != peer_ctx->recv_channel) + { + GNUNET_CADET_channel_destroy (peer_ctx->recv_channel); + peer_ctx->recv_channel = NULL; + } + if (NULL != peer_ctx->mq) + { + GNUNET_MQ_destroy (peer_ctx->mq); + peer_ctx->mq = NULL; + } + + GNUNET_free (peer_ctx->send_channel_flags); + GNUNET_free (peer_ctx->recv_channel_flags); + + if (GNUNET_YES != GNUNET_CONTAINER_multipeermap_remove_all (peer_map, &peer_ctx->peer_id)) + { + LOG (GNUNET_ERROR_TYPE_WARNING, "removing peer from peer_map failed\n"); + } + GNUNET_free (peer_ctx); + return GNUNET_YES; +} + +/** + * @brief set flags on a given peer. + * + * @param peer the peer to set flags on + * @param flags the flags + */ +void +Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags) +{ + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + set_peer_flag (peer_ctx, flags); +} + +/** + * @brief unset flags on a given peer. + * + * @param peer the peer to unset flags on + * @param flags the flags + */ +void +Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags) +{ + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + unset_peer_flag (peer_ctx, flags); +} + +/** + * @brief Check whether flags on a peer are set. + * + * @param peer the peer to check the flag of + * @param flags the flags to check + * + * @return #GNUNET_YES if all given flags are set + * #GNUNET_NO otherwise + */ +int +Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags) +{ + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + return check_peer_flag_set (peer_ctx, flags); +} + + +/** + * @brief set flags on a given channel. + * + * @param channel the channel to set flags on + * @param flags the flags + */ +void +Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags) +{ + set_channel_flag (channel_flags, flags); +} + +/** + * @brief unset flags on a given channel. + * + * @param channel the channel to unset flags on + * @param flags the flags + */ +void +Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags) +{ + unset_channel_flag (channel_flags, flags); +} + +/** + * @brief Check whether flags on a channel are set. + * + * @param channel the channel to check the flag of + * @param flags the flags to check + * + * @return #GNUNET_YES if all given flags are set + * #GNUNET_NO otherwise + */ +int +Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags) +{ + return check_channel_flag_set (channel_flags, flags); +} + +/** + * @brief Check whether we have information about the given peer. + * + * @param peer peer in question + * + * @return #GNUNET_YES if peer is known + * #GNUNET_NO if peer is not knwon + */ +int +Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer) +{ + return GNUNET_CONTAINER_multipeermap_contains (peer_map, peer); +} + +/** + * @brief Indicate that we want to send to the other peer + * + * This establishes a sending channel + * + * @param peer the peer to establish channel to + */ +void +Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer) +{ + GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer)); + (void) get_channel (peer); +} + +/** + * @brief Check whether other peer has the intention to send/opened channel + * towars us + * + * @param peer the peer in question + * + * @return #GNUNET_YES if peer has the intention to send + * #GNUNET_NO otherwise + */ +int +Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer) +{ + const struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + if (NULL != peer_ctx->recv_channel) + { + return GNUNET_YES; + } + return GNUNET_NO; +} + +/** + * Handle the channel a peer opens to us. + * + * @param cls The closure + * @param channel The channel the peer wants to establish + * @param initiator The peer's peer ID + * @param port The port the channel is being established over + * @param options Further options + * + * @return initial channel context for the channel + * (can be NULL -- that's not an error) + */ +void * +Peers_handle_inbound_channel (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + uint32_t port, + enum GNUNET_CADET_ChannelOption options) +{ + struct PeerContext *peer_ctx; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "New channel was established to us (Peer %s).\n", + GNUNET_i2s (initiator)); + GNUNET_assert (NULL != channel); /* according to cadet API */ + /* Make sure we 'know' about this peer */ + peer_ctx = create_or_get_peer_ctx (initiator); + set_peer_live (peer_ctx); + /* We only accept one incoming channel per peer */ + if (GNUNET_YES == Peers_check_peer_send_intention (initiator)) + { + set_channel_flag (peer_ctx->recv_channel_flags, + Peers_CHANNEL_ESTABLISHED_TWICE); + GNUNET_CADET_channel_destroy (channel); + /* return the channel context */ + return peer_ctx->recv_channel_flags; + } + peer_ctx->recv_channel = channel; + return peer_ctx->recv_channel_flags; +} + +/** + * @brief Check whether a sending channel towards the given peer exists + * + * @param peer the peer to check for + * + * @return #GNUNET_YES if a sending channel towards that peer exists + * #GNUNET_NO otherwise + */ +int +Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + if (GNUNET_NO == Peers_check_peer_known (peer)) + { /* If no such peer exists, there is no channel */ + return GNUNET_NO; + } + peer_ctx = get_peer_ctx (peer); + if (NULL == peer_ctx->send_channel) + { + return GNUNET_NO; + } + return GNUNET_YES; +} + +/** + * @brief check whether the given channel is the sending channel of the given + * peer + * + * @param peer the peer in question + * @param channel the channel to check for + * @param role either #Peers_CHANNEL_ROLE_SENDING, or + * #Peers_CHANNEL_ROLE_RECEIVING + * + * @return #GNUNET_YES if the given chennel is the sending channel of the peer + * #GNUNET_NO otherwise + */ +int +Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_CADET_Channel *channel, + enum Peers_ChannelRole role) +{ + const struct PeerContext *peer_ctx; + + if (GNUNET_NO == Peers_check_peer_known (peer)) + { + return GNUNET_NO; + } + peer_ctx = get_peer_ctx (peer); + if ( (Peers_CHANNEL_ROLE_SENDING == role) && + (channel == peer_ctx->send_channel) ) + { + return GNUNET_YES; + } + if ( (Peers_CHANNEL_ROLE_RECEIVING == role) && + (channel == peer_ctx->recv_channel) ) + { + return GNUNET_YES; + } + return GNUNET_NO; +} + +/** + * @brief Destroy the send channel of a peer e.g. stop indicating a sending + * intention to another peer + * + * If there is also no channel to receive messages from that peer, remove it + * from the peermap. + * TODO really? + * + * @peer the peer identity of the peer whose sending channel to destroy + * @return #GNUNET_YES if channel was destroyed + * #GNUNET_NO otherwise + */ +int +Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + if (GNUNET_NO == Peers_check_peer_known (peer)) + { + return GNUNET_NO; + } + peer_ctx = get_peer_ctx (peer); + if (NULL != peer_ctx->send_channel) + { + set_channel_flag (peer_ctx->send_channel_flags, Peers_CHANNEL_CLEAN); + GNUNET_CADET_channel_destroy (peer_ctx->send_channel); + peer_ctx->send_channel = NULL; + return GNUNET_YES; + } + return GNUNET_NO; +} + +/** + * This is called when a channel is destroyed. + * + * Removes peer completely from our knowledge if the send_channel was destroyed + * Otherwise simply delete the recv_channel + * + * @param cls The closure + * @param channel The channel being closed + * @param channel_ctx The context associated with this channel + */ +void +Peers_cleanup_destroyed_channel (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx) +{ + struct GNUNET_PeerIdentity *peer; + struct PeerContext *peer_ctx; + + peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info ( + (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER); + // FIXME wait for cadet to change this function + + if (GNUNET_NO == Peers_check_peer_known (peer)) + {/* We don't want to implicitly create a context that we're about to kill */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "channel (%s) without associated context was destroyed\n", + GNUNET_i2s (peer)); + return; + } + + peer_ctx = get_peer_ctx (peer); + GNUNET_assert (NULL != peer_ctx); /* It could have been removed by shutdown_task */ + + /* If our peer issued the destruction of the channel, the #Peers_TO_DESTROY + * flag will be set. In this case simply make sure that the channels are + * cleaned. */ + if (Peers_check_peer_flag (peer, Peers_TO_DESTROY)) + {/* We initiatad the destruction of this particular peer */ + if (channel == peer_ctx->send_channel) + peer_ctx->send_channel = NULL; + else if (channel == peer_ctx->recv_channel) + peer_ctx->recv_channel = NULL; + + return; + } + + else + { /* We did not initiate the destruction of this peer */ + if (channel == peer_ctx->send_channel) + { /* Something (but us) killd the channel - clean up peer */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "send channel (%s) was destroyed - cleaning up\n", + GNUNET_i2s (peer)); + peer_ctx->send_channel = NULL; + } + else if (channel == peer_ctx->recv_channel) + { /* Other peer doesn't want to send us messages anymore */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Peer %s destroyed recv channel - cleaning up channel\n", + GNUNET_i2s (peer)); + peer_ctx->recv_channel = NULL; + } + else + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "unknown channel (%s) was destroyed\n", + GNUNET_i2s (peer)); + } + } +} + +/** + * @brief Issue a check whether peer is live + * + * This tries to establish a channel to the given peer. Once the channel is + * established successfully, we know the peer is live. + * + * @param peer the peer to check liveliness + */ +void +Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer) +{ + struct PeerContext *peer_ctx; + + if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, own_identity)) + { + return; /* We know that we are online */ + } + + peer_ctx = create_or_get_peer_ctx (peer); + // TODO if LIVE/ONLINE + check_peer_live (peer_ctx); +} + +/** + * @brief Send a message to another peer. + * + * Keeps track about pending messages so they can be properly removed when the + * peer is destroyed. + * + * @param peer receeiver of the message + * @param ev envelope of the message + * @param type type of the message + */ +void +Peers_send_message (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Envelope *ev, + const char *type) +{ + struct PendingMessage *pending_msg; + struct GNUNET_MQ_Handle *mq; + + pending_msg = insert_pending_message (peer, ev, "PULL REPLY"); + mq = get_mq (peer); + GNUNET_MQ_notify_sent (ev, + mq_notify_sent_cb, + pending_msg); + GNUNET_MQ_send (mq, ev); +} + +/** + * @brief Schedule a operation on given peer + * + * Avoids scheduling an operation twice. + * + * @param peer the peer we want to schedule the operation for once it gets live + * + * @return #GNUNET_YES if the operation was scheduled + * #GNUNET_NO otherwise + */ +int +Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer, + const PeerOp peer_op) +{ + struct PeerPendingOp pending_op; + struct PeerContext *peer_ctx; + + if (0 == GNUNET_CRYPTO_cmp_peer_identity (peer, own_identity)) + { + return GNUNET_NO; + } + GNUNET_assert (GNUNET_YES == Peers_check_peer_known (peer)); + + //TODO if LIVE/ONLINE execute immediately + + if (GNUNET_NO == check_operation_scheduled (peer, peer_op)) + { + peer_ctx = get_peer_ctx (peer); + pending_op.op = peer_op; + pending_op.op_cls = NULL; + GNUNET_array_append (peer_ctx->pending_ops, + peer_ctx->num_pending_ops, + pending_op); + return GNUNET_YES; + } + return GNUNET_NO; +} + +/* end of gnunet-service-rps_peers.c */ diff --git a/src/rps/gnunet-service-rps_peers.h b/src/rps/gnunet-service-rps_peers.h new file mode 100644 index 000000000..718890762 --- /dev/null +++ b/src/rps/gnunet-service-rps_peers.h @@ -0,0 +1,371 @@ +/* + This file is part of GNUnet. + Copyright (C) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, + Boston, MA 02110-1301, USA. +*/ + +/** + * @file rps/gnunet-service-rps_peers.h + * @brief utilities for managing (information about) peers + * @author Julius Bünger + */ +#include "gnunet_util_lib.h" +#include +#include "gnunet_cadet_service.h" + + +/** + * Different flags indicating the status of another peer. + */ +enum Peers_PeerFlags +{ + /** + * If we are waiting for a reply from that peer (sent a pull request). + */ + Peers_PULL_REPLY_PENDING = 0x01, + + /* IN_OTHER_GOSSIP_LIST = 0x02, unneeded? */ + /* IN_OWN_SAMPLER_LIST = 0x04, unneeded? */ + /* IN_OWN_GOSSIP_LIST = 0x08, unneeded? */ + + /** + * We set this bit when we can be sure the other peer is/was live. + */ + Peers_VALID = 0x10, + + /** + * We set this bit when we know the peer is online. + */ + Peers_ONLINE = 0x20, + + /** + * We set this bit when we are going to destroy the channel to this peer. + * When cleanup_channel is called, we know that we wanted to destroy it. + * Otherwise the channel to the other peer was destroyed. + */ + Peers_TO_DESTROY = 0x40, +}; + +/** + * Keep track of the status of a channel. + * + * This is needed in order to know what to do with a channel when it's + * destroyed. + */ +enum Peers_ChannelFlags +{ + /** + * We destroyed the channel because the other peer established a second one. + */ + Peers_CHANNEL_ESTABLISHED_TWICE = 0x1, + + /** + * The channel was removed because it was not needed any more. This should be + * the sending channel. + */ + Peers_CHANNEL_CLEAN = 0x2, +}; + +/** + * @brief The role of a channel. Sending or receiving. + */ +enum Peers_ChannelRole +{ + /** + * Channel is used for sending + */ + Peers_CHANNEL_ROLE_SENDING = 0x01, + + /** + * Channel is used for receiving + */ + Peers_CHANNEL_ROLE_RECEIVING = 0x02, +}; + +/** + * @brief Functions of this type can be used to be stored at a peer for later execution. + * + * @param cls closure + * @param peer peer to execute function on + */ +typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Initialise storage of peers + * + * @param cadet_h cadet handle + * @param own_id own peer identity + */ +void +Peers_initialise (struct GNUNET_CADET_Handle *cadet_h, + const struct GNUNET_PeerIdentity *own_id); + +/** + * @brief Delete storage of peers that was created with #Peers_initialise () + */ +void +Peers_terminate (); + +/** + * @brief Add peer to known peers. + * + * This function is called on new peer_ids from 'external' sources + * (client seed, cadet get_peers(), ...) + * + * @param peer the new peer + * + * @return #GNUNET_YES if peer was inserted + * #GNUNET_NO if peer was already known + */ +int +Peers_insert_peer (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Remove unecessary data + * + * If the other peer is not intending to send messages, we have messages pending + * to be sent to this peer and we are not waiting for a reply, remove the + * information about it (its #PeerContext). + * + * @param peer the peer to clean + * @return #GNUNET_YES if peer was removed + * #GNUNET_NO otherwise + */ +int +Peers_clean_peer (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Remove peer + * + * @param peer the peer to clean + * @return #GNUNET_YES if peer was removed + * #GNUNET_NO otherwise + */ +int +Peers_remove_peer (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief set flags on a given peer. + * + * @param peer the peer to set flags on + * @param flags the flags + */ +void +Peers_set_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags); + +/** + * @brief unset flags on a given peer. + * + * @param peer the peer to unset flags on + * @param flags the flags + */ +void +Peers_unset_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags); + +/** + * @brief Check whether flags on a peer are set. + * + * @param peer the peer to check the flag of + * @param flags the flags to check + * + * @return #GNUNET_YES if all given flags are set + * ##GNUNET_NO otherwise + */ +int +Peers_check_peer_flag (const struct GNUNET_PeerIdentity *peer, enum Peers_PeerFlags flags); + + +/** + * @brief set flags on a given channel. + * + * @param channel the channel to set flags on + * @param flags the flags + */ +void +Peers_set_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags); + +/** + * @brief unset flags on a given channel. + * + * @param channel the channel to unset flags on + * @param flags the flags + */ +void +Peers_unset_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags); + +/** + * @brief Check whether flags on a channel are set. + * + * @param channel the channel to check the flag of + * @param flags the flags to check + * + * @return #GNUNET_YES if all given flags are set + * #GNUNET_NO otherwise + */ +int +Peers_check_channel_flag (uint32_t *channel_flags, enum Peers_ChannelFlags flags); + +/** + * @brief Check whether we have information about the given peer. + * + * @param peer peer in question + * + * @return #GNUNET_YES if peer is known + * #GNUNET_NO if peer is not knwon + */ +int +Peers_check_peer_known (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Indicate that we want to send to the other peer + * + * This establishes a sending channel + * + * @param peer the peer to establish channel to + */ +void +Peers_indicate_sending_intention (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Check whether other peer has the intention to send/opened channel + * towars us + * + * @param peer the peer in question + * + * @return #GNUNET_YES if peer has the intention to send + * #GNUNET_NO otherwise + */ +int +Peers_check_peer_send_intention (const struct GNUNET_PeerIdentity *peer); + +/** + * Handle the channel a peer opens to us. + * + * @param cls The closure + * @param channel The channel the peer wants to establish + * @param initiator The peer's peer ID + * @param port The port the channel is being established over + * @param options Further options + * + * @return initial channel context for the channel + * (can be NULL -- that's not an error) + */ +void * +Peers_handle_inbound_channel (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator, + uint32_t port, + enum GNUNET_CADET_ChannelOption options); + +/** + * @brief Check whether a sending channel towards the given peer exists + * + * @param peer the peer to check for + * + * @return #GNUNET_YES if a sending channel towards that peer exists + * #GNUNET_NO otherwise + */ +int +Peers_check_sending_channel_exists (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief check whether the given channel is the sending channel of the given + * peer + * + * @param peer the peer in question + * @param channel the channel to check for + * @param role either #Peers_CHANNEL_ROLE_SENDING, or + * #Peers_CHANNEL_ROLE_RECEIVING + * + * @return #GNUNET_YES if the given chennel is the sending channel of the peer + * #GNUNET_NO otherwise + */ +int +Peers_check_channel_role (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_CADET_Channel *channel, + enum Peers_ChannelRole role); + +/** + * @brief Destroy the send channel of a peer e.g. stop indicating a sending + * intention to another peer + * + * If there is also no channel to receive messages from that peer, remove it + * from the peermap. + * + * @peer the peer identity of the peer whose sending channel to destroy + * @return #GNUNET_YES if channel was destroyed + * #GNUNET_NO otherwise + */ +int +Peers_destroy_sending_channel (const struct GNUNET_PeerIdentity *peer); + +/** + * This is called when a channel is destroyed. + * + * Removes peer completely from our knowledge if the send_channel was destroyed + * Otherwise simply delete the recv_channel + * + * @param cls The closure + * @param channel The channel being closed + * @param channel_ctx The context associated with this channel + */ +void +Peers_cleanup_destroyed_channel (void *cls, + const struct GNUNET_CADET_Channel *channel, + void *channel_ctx); + +/** + * @brief Issue a check whether peer is live + * + * This tries to establish a channel to the given peer. Once the channel is + * established successfully, we know the peer is live. + * + * @param peer the peer to check liveliness + */ +void +Peers_issue_peer_liveliness_check (const struct GNUNET_PeerIdentity *peer); + +/** + * @brief Send a message to another peer. + * + * Keeps track about pending messages so they can be properly removed when the + * peer is destroyed. + * + * @param peer receeiver of the message + * @param ev envelope of the message + * @param type type of the message + */ +void +Peers_send_message (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Envelope *ev, + const char *type); + +/** + * @brief Schedule a operation on given peer + * + * Avoids scheduling an operation twice. + * + * @param peer the peer we want to schedule the operation for once it gets live + * + * @return #GNUNET_YES if the operation was scheduled + * #GNUNET_NO otherwise + */ +int +Peers_schedule_operation (const struct GNUNET_PeerIdentity *peer, + const PeerOp peer_op); + +/* end of gnunet-service-rps_peers.h */ -- 2.25.1