From: Christian Grothoff Date: Wed, 10 Jun 2009 02:19:45 +0000 (+0000) Subject: bound queue size, clean up code X-Git-Tag: initial-import-from-subversion-38251~23823 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=e40233928df9f3970d24f06103844489883c879d;p=oweals%2Fgnunet.git bound queue size, clean up code --- diff --git a/src/core/gnunet-service-core.c b/src/core/gnunet-service-core.c index 8f64bf366..f7f5a5fe7 100644 --- a/src/core/gnunet-service-core.c +++ b/src/core/gnunet-service-core.c @@ -24,16 +24,9 @@ * @author Christian Grothoff * * TODO: - * TESTING: - * - write test for basic core functions: - * + connect to peer - * + transmit (encrypted) message [with handshake] - * + receive (encrypted) message, forward plaintext to clients * POST-TESTING: * - revisit API (which arguments are used, needed)? - * - add code to bound queue size when handling client's SEND message * - add code to bound message queue size when passing messages to clients - * - add code to discard_expired_messages * - add code to re-transmit key if first attempt failed * + timeout on connect / key exchange, etc. * + timeout for automatic re-try, etc. @@ -82,6 +75,16 @@ #define DEFAULT_BPM_IN_OUT 2048 +/** + * After how much time past the "official" expiration time do + * we discard messages? Should not be zero since we may + * intentionally defer transmission until close to the deadline + * and then may be slightly past the deadline due to inaccuracy + * in sleep and our own CPU consumption. + */ +#define PAST_EXPIRATION_DISCARD_TIME GNUNET_TIME_UNIT_SECONDS + + /** * What is the maximum delay for a SET_KEY message? */ @@ -124,6 +127,12 @@ #define PONG_PRIORITY 0xFFFFFF +/** + * How many messages do we queue per peer at most? + */ +#define MAX_PEER_QUEUE_SIZE 16 + + /** * What is the maximum age of a message for us to consider * processing it? Note that this looks at the timestamp used @@ -933,6 +942,7 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { struct Client *pos; struct Client *prev; + struct Event *e; #if DEBUG_CORE_CLIENT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -950,6 +960,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) prev->next = pos->next; if (pos->th != NULL) GNUNET_NETWORK_notify_transmit_ready_cancel (pos->th); + while (NULL != (e = pos->event_head)) + { + pos->event_head = e->next; + GNUNET_free (e); + } GNUNET_free (pos); return; } @@ -1104,6 +1119,8 @@ static void process_plaintext_neighbour_queue (struct Neighbour *n); static void process_encrypted_neighbour_queue (struct Neighbour *n) { + struct MessageEntry *m; + if (n->th != NULL) return; /* request already pending */ if (n->encrypted_head == NULL) @@ -1132,7 +1149,13 @@ process_encrypted_neighbour_queue (struct Neighbour *n) { /* message request too large (oops) */ GNUNET_break (0); - /* FIXME: handle error somehow! */ + /* discard encrypted message */ + GNUNET_assert (NULL != (m = n->encrypted_head)); + n->encrypted_head = m->next; + if (m->next == NULL) + n->encrypted_tail = NULL; + GNUNET_free (m); + process_encrypted_neighbour_queue (n); } } @@ -1448,7 +1471,29 @@ batch_message (struct Neighbour *n, static void discard_expired_messages (struct Neighbour *n) { - /* FIXME */ + struct MessageEntry *prev; + struct MessageEntry *next; + struct MessageEntry *pos; + struct GNUNET_TIME_Absolute cutoff; + + cutoff = GNUNET_TIME_relative_to_absolute(PAST_EXPIRATION_DISCARD_TIME); + prev = NULL; + pos = n->messages; + while (pos != NULL) + { + next = pos->next; + if (pos->deadline.value < cutoff.value) + { + if (prev == NULL) + n->messages = next; + else + prev->next = next; + GNUNET_free (pos); + } + else + prev = pos; + pos = next; + } } @@ -1651,8 +1696,6 @@ send_connect_continuation (void *cls, size_t size, void *buf) GNUNET_i2s (&sm->peer)); #endif GNUNET_free (sm); - /* FIXME: do we need to do something here to let the - client know about the failure!? */ return 0; } #if DEBUG_CORE @@ -1678,9 +1721,13 @@ handle_client_send (void *cls, struct SendMessage *smc; const struct GNUNET_MessageHeader *mh; struct Neighbour *n; - struct MessageEntry *pred; + struct MessageEntry *prev; struct MessageEntry *pos; - struct MessageEntry *e; + struct MessageEntry *e; + struct MessageEntry *min_prio_entry; + struct MessageEntry *min_prio_prev; + unsigned int min_prio; + unsigned int queue_size; uint16_t msize; msize = ntohs (message->size); @@ -1714,20 +1761,26 @@ handle_client_send (void *cls, #endif msize += sizeof (struct SendMessage); /* ask transport to connect to the peer */ - /* FIXME: this code does not handle the - case where we get multiple SendMessages before - transport responds to this request; - => need to track pending requests! */ smc = GNUNET_malloc (msize); memcpy (smc, sm, msize); - GNUNET_TRANSPORT_notify_transmit_ready (transport, - &sm->peer, - 0, - GNUNET_TIME_absolute_get_remaining - (GNUNET_TIME_absolute_ntoh - (sm->deadline)), - &send_connect_continuation, - smc); + if (NULL == + GNUNET_TRANSPORT_notify_transmit_ready (transport, + &sm->peer, + 0, + GNUNET_TIME_absolute_get_remaining + (GNUNET_TIME_absolute_ntoh + (sm->deadline)), + &send_connect_continuation, + smc)) + { + /* transport has already a request pending for this peer! */ +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Dropped second message destined for `%4s' since connection is still down.\n", + GNUNET_i2s(&sm->peer)); +#endif + GNUNET_free (smc); + } if (client != NULL) GNUNET_SERVER_receive_done (client, GNUNET_OK); return; @@ -1739,7 +1792,50 @@ handle_client_send (void *cls, msize, GNUNET_i2s (&sm->peer)); #endif - /* FIXME: consider bounding queue size */ + /* bound queue size */ + discard_expired_messages (n); + min_prio = (unsigned int) -1; + queue_size = 0; + prev = NULL; + pos = n->messages; + while (pos != NULL) + { + if (pos->priority < min_prio) + { + min_prio_entry = pos; + min_prio_prev = prev; + min_prio = pos->priority; + } + queue_size++; + prev = pos; + pos = pos->next; + } + if (queue_size >= MAX_PEER_QUEUE_SIZE) + { + /* queue full */ + if (ntohl(sm->priority) <= min_prio) + { + /* discard new entry */ +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue full, discarding new request\n"); +#endif + if (client != NULL) + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; + } + /* discard "min_prio_entry" */ +#if DEBUG_CORE + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queue full, discarding existing older request\n"); +#endif + if (min_prio_prev == NULL) + n->messages = min_prio_entry->next; + else + min_prio_prev->next = min_prio_entry->next; + GNUNET_free (min_prio_entry); + } + e = GNUNET_malloc (sizeof (struct MessageEntry) + msize); e->deadline = GNUNET_TIME_absolute_ntoh (sm->deadline); e->priority = ntohl (sm->priority); @@ -1747,17 +1843,17 @@ handle_client_send (void *cls, memcpy (&e[1], mh, msize); /* insert, keep list sorted by deadline */ - pred = NULL; + prev = NULL; pos = n->messages; while ((pos != NULL) && (pos->deadline.value < e->deadline.value)) { - pred = pos; + prev = pos; pos = pos->next; } - if (pred == NULL) + if (prev == NULL) n->messages = e; else - pred->next = e; + prev->next = e; e->next = pos; /* consider scheduling now */ @@ -2813,6 +2909,7 @@ static void cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Neighbour *n; + struct Client *c; #if DEBUG_CORE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2826,6 +2923,8 @@ cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) neighbours = n->next; free_neighbour (n); } + while (NULL != (c = clients)) + handle_client_disconnect (NULL, c->client_handle); } @@ -2926,10 +3025,6 @@ cleanup (void *cls, struct GNUNET_CONFIGURATION_Handle *cfg) if (my_private_key != NULL) GNUNET_CRYPTO_rsa_key_free (my_private_key); - /* - FIXME: - - free clients - */ }