From 63b2e5ce20544b22da822848d6e3f3c495f381c3 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 24 Jun 2016 15:45:12 +0000 Subject: [PATCH] fix over-allocation in datastore API --- src/datastore/datastore_api.c | 167 +++++++++++++++++++++++----------- 1 file changed, 112 insertions(+), 55 deletions(-) diff --git a/src/datastore/datastore_api.c b/src/datastore/datastore_api.c index 832829e24..b2de3d35d 100644 --- a/src/datastore/datastore_api.c +++ b/src/datastore/datastore_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2004-2013 GNUnet e.V. + Copyright (C) 2004-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -57,7 +57,7 @@ struct StatusContext GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for cont. + * Closure for @e cont. */ void *cont_cls; @@ -75,7 +75,7 @@ struct ResultContext GNUNET_DATASTORE_DatumProcessor proc; /** - * Closure for proc. + * Closure for @e proc. */ void *proc_cls; @@ -130,7 +130,7 @@ struct GNUNET_DATASTORE_QueueEntry GNUNET_DATASTORE_ContinuationWithStatus cont; /** - * Closure for 'cont'. + * Closure for @e cont. */ void *cont_cls; @@ -142,7 +142,7 @@ struct GNUNET_DATASTORE_QueueEntry /** * Task for timeout signalling. */ - struct GNUNET_SCHEDULER_Task * task; + struct GNUNET_SCHEDULER_Task *task; /** * Timeout for the current operation. @@ -169,7 +169,7 @@ struct GNUNET_DATASTORE_QueueEntry /** * Has this message been transmitted to the service? - * Only ever GNUNET_YES for the head of the queue. + * Only ever #GNUNET_YES for the head of the queue. * Note that the overall struct should end at a * multiple of 64 bits. */ @@ -249,7 +249,6 @@ struct GNUNET_DATASTORE_Handle }; - /** * Connect to the datastore service. * @@ -265,8 +264,7 @@ GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) c = GNUNET_CLIENT_connect ("datastore", cfg); if (c == NULL) return NULL; /* oops */ - h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) + - GNUNET_SERVER_MAX_MESSAGE_SIZE - 1); + h = GNUNET_new (struct GNUNET_DATASTORE_Handle); h->client = c; h->cfg = cfg; h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg); @@ -284,7 +282,8 @@ disconnect_after_drop (void *cls) { struct GNUNET_DATASTORE_Handle *h = cls; - GNUNET_DATASTORE_disconnect (h, GNUNET_NO); + GNUNET_DATASTORE_disconnect (h, + GNUNET_NO); } @@ -313,7 +312,8 @@ transmit_drop (void *cls, size_t size, void *buf) hdr = buf; hdr->size = htons (sizeof (struct GNUNET_MessageHeader)); hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP); - GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h); + GNUNET_SCHEDULER_add_now (&disconnect_after_drop, + h); return sizeof (struct GNUNET_MessageHeader); } @@ -331,7 +331,8 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, { struct GNUNET_DATASTORE_QueueEntry *qe; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Datastore disconnect\n"); if (NULL != h->th) { GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); @@ -363,7 +364,8 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, GNUNET_MessageHeader), GNUNET_TIME_UNIT_SECONDS, GNUNET_YES, - &transmit_drop, h)) + &transmit_drop, + h)) return; GNUNET_CLIENT_disconnect (h->client); h->client = NULL; @@ -389,7 +391,8 @@ timeout_queue_entry (void *cls) struct GNUNET_DATASTORE_Handle *h = qe->h; GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# queue entry timeouts"), 1, + gettext_noop ("# queue entry timeouts"), + 1, GNUNET_NO); qe->task = NULL; GNUNET_assert (GNUNET_NO == qe->was_transmitted); @@ -444,7 +447,9 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, } if (c >= max_queue_size) { - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1, + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue overflows"), + 1, GNUNET_NO); return NULL; } @@ -472,12 +477,18 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, } c++; #if INSANE_STATISTICS - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"), + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# queue entries created"), 1, GNUNET_NO); #endif - GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret); + GNUNET_CONTAINER_DLL_insert_after (h->queue_head, + h->queue_tail, + pos, + ret); h->queue_size++; - ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret); + ret->task = GNUNET_SCHEDULER_add_delayed (timeout, + &timeout_queue_entry, + ret); for (pos = ret->next; NULL != pos; pos = pos->next) { if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO)) @@ -488,8 +499,12 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h, LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n"); /* response_proc's expect request at the head of the queue! */ - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos); - GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + pos); + GNUNET_CONTAINER_DLL_insert (h->queue_head, + h->queue_tail, + pos); GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Requests dropped from datastore queue"), 1, @@ -559,7 +574,9 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h) h->skip_next_messages = 0; h->client = NULL; h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h); + GNUNET_SCHEDULER_add_delayed (h->retry_time, + &try_reconnect, + h); } @@ -700,10 +717,12 @@ process_queue (struct GNUNET_DATASTORE_Handle *h) "Queueing %u byte request to DATASTORE\n", qe->message_size); h->th - = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size, + = GNUNET_CLIENT_notify_transmit_ready (h->client, + qe->message_size, GNUNET_TIME_absolute_get_remaining (qe->timeout), GNUNET_YES, - &transmit_request, h); + &transmit_request, + h); GNUNET_assert (GNUNET_NO == h->in_receive); GNUNET_break (NULL != h->th); } @@ -738,7 +757,9 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe) { struct GNUNET_DATASTORE_Handle *h = qe->h; - GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe); + GNUNET_CONTAINER_DLL_remove (h->queue_head, + h->queue_tail, + qe); if (qe->task != NULL) { GNUNET_SCHEDULER_cancel (qe->task); @@ -883,7 +904,8 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, union QueueContext qc; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to put %u bytes of data under key `%s' for %s\n", size, + "Asked to put %u bytes of data under key `%s' for %s\n", + size, GNUNET_h2s (key), GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration), GNUNET_YES)); @@ -896,14 +918,18 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + &process_status_message, + &qc); if (qe == NULL) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for PUT\n"); return NULL; } - GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"), - 1, GNUNET_NO); + GNUNET_STATISTICS_update (h->stats, + gettext_noop ("# PUT requests executed"), + 1, + GNUNET_NO); dm = (struct DataMessage *) &qe[1]; dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT); dm->header.size = htons (msize); @@ -939,7 +965,8 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, +GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, + uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls) @@ -952,7 +979,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, cont = &drop_status_cont; LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to reserve %llu bytes of data and %u entries\n", - (unsigned long long) amount, (unsigned int) entries); + (unsigned long long) amount, + (unsigned int) entries); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; qe = make_queue_entry (h, @@ -960,7 +988,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, UINT_MAX, UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - &process_status_message, &qc); + &process_status_message, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -968,7 +997,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, return NULL; } GNUNET_STATISTICS_update (h->stats, - gettext_noop ("# RESERVE requests executed"), 1, + gettext_noop ("# RESERVE requests executed"), + 1, GNUNET_NO); rm = (struct ReserveMessage *) &qe[1]; rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE); @@ -1003,7 +1033,8 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, - uint32_t rid, unsigned int queue_priority, + uint32_t rid, + unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, @@ -1015,12 +1046,18 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, if (cont == NULL) cont = &drop_status_cont; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to release reserve %d\n", + rid); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; - qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage), - queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, + sizeof (struct ReleaseReserveMessage), + queue_priority, + max_queue_size, + timeout, + &process_status_message, + &qc); if (qe == NULL) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1058,7 +1095,8 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h, * (or rather, will already have been invoked) */ struct GNUNET_DATASTORE_QueueEntry * -GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, +GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, + uint64_t uid, uint32_t priority, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, @@ -1125,8 +1163,10 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid, */ struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, - const struct GNUNET_HashCode * key, size_t size, - const void *data, unsigned int queue_priority, + const struct GNUNET_HashCode *key, + size_t size, + const void *data, + unsigned int queue_priority, unsigned int max_queue_size, struct GNUNET_TIME_Relative timeout, GNUNET_DATASTORE_ContinuationWithStatus cont, @@ -1139,17 +1179,25 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h, if (cont == NULL) cont = &drop_status_cont; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n", - size, GNUNET_h2s (key)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asked to remove %u bytes under key `%s'\n", + size, + GNUNET_h2s (key)); qc.sc.cont = cont; qc.sc.cont_cls = cont_cls; msize = sizeof (struct DataMessage) + size; GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE); - qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout, - &process_status_message, &qc); + qe = make_queue_entry (h, + msize, + queue_priority, + max_queue_size, + timeout, + &process_status_message, + &qc); if (qe == NULL) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not create queue entry for REMOVE\n"); return NULL; } GNUNET_STATISTICS_update (h->stats, @@ -1372,13 +1420,19 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY); LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get %llu-th zero-anonymity entry of type %d in %s\n", - (unsigned long long) offset, type, - GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES)); + (unsigned long long) offset, + type, + GNUNET_STRINGS_relative_time_to_string (timeout, + GNUNET_YES)); qc.rc.proc = proc; qc.rc.proc_cls = proc_cls; - qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage), - queue_priority, max_queue_size, timeout, - &process_result_message, &qc); + qe = make_queue_entry (h, + sizeof (struct GetZeroAnonymityMessage), + queue_priority, + max_queue_size, + timeout, + &process_result_message, + &qc); if (NULL == qe) { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -1423,7 +1477,7 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h, struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset, - const struct GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, @@ -1450,7 +1504,8 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, &qc); if (qe == NULL) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n", + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Could not queue request for `%s'\n", GNUNET_h2s (key)); return NULL; } @@ -1493,8 +1548,10 @@ GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe) GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted); h = qe->h; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Pending DATASTORE request %p cancelled (%d, %d)\n", qe, - qe->was_transmitted, h->queue_head == qe); + "Pending DATASTORE request %p cancelled (%d, %d)\n", + qe, + qe->was_transmitted, + h->queue_head == qe); if (GNUNET_YES == qe->was_transmitted) { free_queue_entry (qe); -- 2.25.1