From 688f62f7a0fcfc85a3bf611ff855edff541dc52a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Wed, 5 Aug 2015 21:47:36 +0000 Subject: [PATCH] -keep track of messages passed to mq --- src/rps/gnunet-service-rps.c | 115 +++++++++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 11 deletions(-) diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 5d9d2f9c6..77fe80351 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -182,6 +182,34 @@ struct PeerOutstandingOp }; +/** + * List containing all messages that are yet to be send + */ +struct PendingMessage +{ + /** + * DLL next, prev + */ + struct PendingMessage *next; + struct PendingMessage *prev; + + /** + * The envelope to the corresponding message + */ + struct GNUNET_MQ_Envelope *ev; + + /** + * The corresponding context + */ + struct PeerContext *peer_ctx; + + /** + * The message type + */ + const char *type; +}; + + /** * Struct used to keep track of other peer's status * @@ -231,6 +259,12 @@ struct PeerContext */ uint32_t peer_flags; + /** + * DLL with all messages that are yet to be sent + */ + struct PendingMessage *pending_messages_head; + struct PendingMessage *pending_messages_tail; + /** * This is pobably followed by 'statistical' data (when we first saw * him, how did we get his ID, how many pushes (in a timeinterval), @@ -1147,6 +1181,45 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array, } +/** + * @brief Add an envelope to a message passed to mq to list of pending messages + * + * @param peer peer the message was sent to + * @param ev envelope to the message + * @param type type of the message to be sent + */ +static struct PendingMessage * +insert_pending_message (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_MQ_Envelope *ev, + const char *type) +{ + struct PendingMessage *pending_msg; + struct PeerContext *peer_ctx; + + peer_ctx = get_peer_ctx (peer); + pending_msg = GNUNET_new (struct PendingMessage); + pending_msg->ev = ev; + pending_msg->peer_ctx = peer_ctx; + pending_msg->type = type; + GNUNET_CONTAINER_DLL_insert (peer_ctx->pending_messages_head, + peer_ctx->pending_messages_tail, + pending_msg); + return pending_msg; +} + +static void +remove_pending_message (struct PendingMessage *pending_msg) +{ + struct PeerContext *peer_ctx; + + peer_ctx = pending_msg->peer_ctx; + GNUNET_CONTAINER_DLL_remove (peer_ctx->pending_messages_head, + peer_ctx->pending_messages_tail, + pending_msg); + GNUNET_free (pending_msg); +} + + /** * @brief This is called once a message is sent. * @@ -1155,10 +1228,11 @@ add_peer_array_to_set (const struct GNUNET_PeerIdentity *peer_array, static void mq_notify_sent_cb (void *cls) { - const char *type = cls; + struct PendingMessage *pending_msg = (struct PendingMessage *) cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "%s was sent.\n", - type); + pending_msg->type); + remove_pending_message (pending_msg); } @@ -1178,6 +1252,7 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, struct GNUNET_MQ_Handle *mq; struct GNUNET_MQ_Envelope *ev; struct GNUNET_RPS_P2P_PullReplyMessage *out_msg; + struct PendingMessage *pending_msg; /* Compute actual size */ send_size = sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) + @@ -1208,9 +1283,10 @@ send_pull_reply (const struct GNUNET_PeerIdentity *peer_id, memcpy (&out_msg[1], peer_ids, send_size * sizeof (struct GNUNET_PeerIdentity)); + pending_msg = insert_pending_message (peer_id, ev, "PULL REPLY"); GNUNET_MQ_notify_sent (ev, mq_notify_sent_cb, - "PULL REPLY"); + pending_msg); GNUNET_MQ_send (mq, ev); } @@ -1923,11 +1999,12 @@ compute_rand_delay (struct GNUNET_TIME_Relative mean, unsigned int spread) * @param peer_id the peer to send the pull request to. */ static void -send_pull_request (struct GNUNET_PeerIdentity *peer_id) +send_pull_request (const struct GNUNET_PeerIdentity *peer_id) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_MQ_Handle *mq; struct PeerContext *peer_ctx; + struct PendingMessage *pending_msg; peer_ctx = get_peer_ctx (peer_id); GNUNET_assert (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)); @@ -1939,9 +2016,10 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id) ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); mq = get_mq (peer_id); + pending_msg = insert_pending_message (peer_id, ev, "PULL REQUEST"); GNUNET_MQ_notify_sent (ev, mq_notify_sent_cb, - "PULL REQUEST"); + pending_msg); GNUNET_MQ_send (mq, ev); } @@ -1952,10 +2030,11 @@ send_pull_request (struct GNUNET_PeerIdentity *peer_id) * @param peer_id the peer to send the push to. */ static void -send_push (struct GNUNET_PeerIdentity *peer_id) +send_push (const struct GNUNET_PeerIdentity *peer_id) { struct GNUNET_MQ_Envelope *ev; struct GNUNET_MQ_Handle *mq; + struct PendingMessage *pending_msg; LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send PUSH to peer %s.\n", @@ -1963,9 +2042,10 @@ send_push (struct GNUNET_PeerIdentity *peer_id) ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); mq = get_mq (peer_id); + pending_msg = insert_pending_message (peer_id, ev, "PUSH"); GNUNET_MQ_notify_sent (ev, mq_notify_sent_cb, - "PUSH"); + pending_msg); GNUNET_MQ_send (mq, ev); } @@ -2542,7 +2622,7 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value) GNUNET_i2s (&peer_ctx->peer_id)); /* Remove it from the sampler used for the Brahms protocol */ - RPS_sampler_reinitialise_by_value (prot_sampler, key); + RPS_sampler_reinitialise_by_value (prot_sampler, key); /* If operations are still scheduled for this peer cancel those */ if (0 != peer_ctx->num_outstanding_ops) @@ -2585,6 +2665,18 @@ peer_remove_cb (void *cls, const struct GNUNET_PeerIdentity *key, void *value) if (GNUNET_YES == in_arr (pull_list, pull_list_size, key)) rem_from_list (&pull_list, &pull_list_size, key); + /* Cancle messages that have not been sent yet */ + while (NULL != peer_ctx->pending_messages_head) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Removing unsent %s\n", + peer_ctx->pending_messages_head->type); + /* We are not able to cancel messages as #GNUNET_CADET_mq_create () does not + * set a #GNUNET_MQ_CancelImpl */ + /* GNUNET_MQ_send_cancel (peer_ctx->pending_messages_head->ev); */ + remove_pending_message (peer_ctx->pending_messages_head); + } + /* If there is still a mq destroy it */ if (NULL != peer_ctx->mq) { @@ -2636,19 +2728,20 @@ peer_clean (const struct GNUNET_PeerIdentity *peer) if ( (0 == RPS_sampler_count_id (prot_sampler, peer)) && (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains (view, peer)) && - (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) && (GNUNET_NO == in_arr (push_list, push_list_size, peer)) && - (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) ) + (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) && + (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, peer)) ) { peer_ctx = get_peer_ctx (peer); if ( (NULL == peer_ctx->recv_channel) && + (NULL == peer_ctx->pending_messages_head) && (GNUNET_NO == get_peer_flag (peer_ctx, PULL_REPLY_PENDING)) ) { #ifdef ENABLE_MALICIOUS if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer)) peer_remove_cb (NULL, peer, peer_ctx); - #else + #else /* ENABLE_MALICIOUS */ peer_remove_cb (NULL, peer, peer_ctx); #endif /* ENABLE_MALICIOUS */ } -- 2.25.1