From a967815bd7207de41c7e719ef34628f2eaed3ab0 Mon Sep 17 00:00:00 2001 From: "Schanzenbach, Martin" Date: Tue, 24 Dec 2019 22:42:58 +0900 Subject: [PATCH] fix dropped pkts; wip --- src/transport/gnunet-communicator-unix.c | 50 ++++++++++++++++-------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/src/transport/gnunet-communicator-unix.c b/src/transport/gnunet-communicator-unix.c index 29ec087e1..5d7949b97 100644 --- a/src/transport/gnunet-communicator-unix.c +++ b/src/transport/gnunet-communicator-unix.c @@ -42,7 +42,7 @@ * otherwise we may read messages just to have them dropped * by the communicator API. */ -#define DEFAULT_MAX_QUEUE_LENGTH 8 +#define DEFAULT_MAX_QUEUE_LENGTH 8000 /** * Address prefix used by the communicator. @@ -412,16 +412,6 @@ select_write_cb (void *cls) /* take queue of the ready list */ write_task = NULL; - GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue); - if (NULL != queue_head) - write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, - unix_sock, - &select_write_cb, - NULL); - - /* send 'msg' */ - queue->msg = NULL; - GNUNET_MQ_impl_send_continue (queue->mq); resend: /* Send the data */ sent = GNUNET_NETWORK_socket_sendto (unix_sock, @@ -437,6 +427,17 @@ resend: (sent < 0) ? strerror (errno) : "ok"); if (-1 != sent) { + GNUNET_CONTAINER_DLL_remove (queue_head, queue_tail, queue); + if (NULL != queue_head) + write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_write_cb, + NULL); + + /* send 'msg' */ + GNUNET_free (queue->msg); + queue->msg = NULL; + GNUNET_MQ_impl_send_continue (queue->mq); GNUNET_STATISTICS_update (stats, "# bytes sent", (long long) sent, @@ -448,6 +449,10 @@ resend: "# network transmission failures", 1, GNUNET_NO); + write_task = GNUNET_SCHEDULER_add_write_net (GNUNET_TIME_UNIT_FOREVER_REL, + unix_sock, + &select_write_cb, + NULL); switch (errno) { case EAGAIN: @@ -473,7 +478,7 @@ resend: return; } GNUNET_log ( - GNUNET_ERROR_TYPE_DEBUG, + GNUNET_ERROR_TYPE_WARNING, "Trying to increase socket buffer size from %u to %u for message size %u\n", (unsigned int) size, (unsigned int) ((msg_size / 1000) + 2) * 1000, @@ -523,9 +528,9 @@ mq_send (struct GNUNET_MQ_Handle *mq, GNUNET_assert (mq == queue->mq); GNUNET_assert (NULL == queue->msg); - //Convert to UNIXMessage + // Convert to UNIXMessage queue->msg = GNUNET_malloc (msize + sizeof (struct UNIXMessage)); - queue->msg->header.size = htons(msize + sizeof (struct UNIXMessage)); + queue->msg->header.size = htons (msize + sizeof (struct UNIXMessage)); queue->msg->sender = my_identity; memcpy (&queue->msg[1], msg, msize); GNUNET_CONTAINER_DLL_insert (queue_head, queue_tail, queue); @@ -697,14 +702,13 @@ receive_complete_cb (void *cls, int success) { (void) cls; delivering_messages--; - if (GNUNET_OK != success) GNUNET_STATISTICS_update (stats, "# transport transmission failures", 1, GNUNET_NO); - GNUNET_assert (NULL != unix_sock); - if ((NULL == read_task) && (delivering_messages < max_queue_length)) + if ((NULL == read_task) && (delivering_messages < max_queue_length) && + (NULL != unix_sock)) read_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, unix_sock, &select_read_cb, @@ -807,15 +811,26 @@ select_read_cb (void *cls) &receive_complete_cb, NULL); if (GNUNET_SYSERR == ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Transport not up!\n"); return; /* transport not up */ + } if (GNUNET_NO == ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Error sending message to transport\n"); break; + } delivering_messages++; offset += csize; } } if (delivering_messages >= max_queue_length) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Back pressure %llu\n", delivering_messages); + /* we should try to apply 'back pressure' */ GNUNET_SCHEDULER_cancel (read_task); read_task = NULL; @@ -991,6 +1006,7 @@ run (void *cls, struct GNUNET_CRYPTO_EddsaPrivateKey *my_private_key; (void) cls; + delivering_messages = 0; my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration (cfg); if (NULL == my_private_key) -- 2.25.1