This file is part of GNUnet
Copyright (C) 2004-2013, 2016 GNUnet e.V.
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
+ GNUnet is free software: you can redistribute it and/or modify it
+ under the terms of the GNU Affero General Public License as published
+ by the Free Software Foundation, either version 3 of the License,
+ or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
- Boston, MA 02110-1301, USA.
+ Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
+#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
+
/**
* Collect an instane number of statistics? May cause excessive IPC.
*/
*/
struct GNUNET_MQ_Envelope *env;
+ /**
+ * Task we run if this entry stalls the queue and we
+ * need to warn the user.
+ */
+ struct GNUNET_SCHEDULER_Task *delay_warn_task;
+
/**
* Priority in the queue.
*/
h->queue_size--;
if (NULL != qe->env)
GNUNET_MQ_discard (qe->env);
+ if (NULL != qe->delay_warn_task)
+ GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
GNUNET_free (qe);
}
+/**
+ * Task that logs an error after some time.
+ *
+ * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
+ */
+static void
+delay_warning (void *cls)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe = cls;
+
+ qe->delay_warn_task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Request %p of type %u at head of datastore queue for more than %s\n",
+ qe,
+ (unsigned int) qe->response_type,
+ GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
+ GNUNET_YES));
+ qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
+ &delay_warning,
+ qe);
+}
+
+
/**
* Handle error in sending drop request to datastore.
*
"MQ error, reconnecting to DATASTORE\n");
do_disconnect (h);
qe = h->queue_head;
- if ( (NULL != qe) &&
- (NULL == qe->env) )
+ if (NULL == qe)
+ return;
+ if (NULL != qe->delay_warn_task)
+ {
+ GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
+ qe->delay_warn_task = NULL;
+ }
+ if (NULL == qe->env)
{
union QueueContext qc = qe->qc;
uint16_t rt = qe->response_type;
qc.rc.proc (qc.rc.proc_cls,
NULL,
0,
- NULL, 0, 0, 0,
+ NULL,
+ 0,
+ 0,
+ 0,
+ 0,
GNUNET_TIME_UNIT_ZERO_ABS,
0);
break;
qe->qc.rc.proc (qe->qc.rc.proc_cls,
NULL,
0,
- NULL, 0, 0, 0,
+ NULL,
+ 0,
+ 0,
+ 0,
+ 0,
GNUNET_TIME_UNIT_ZERO_ABS,
0);
break;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Re-connecting to issue DROP!\n");
GNUNET_assert (NULL == h->mq);
- h->mq = GNUNET_CLIENT_connecT (h->cfg,
+ h->mq = GNUNET_CLIENT_connect (h->cfg,
"datastore",
NULL,
&disconnect_on_mq_error,
struct GNUNET_DATASTORE_QueueEntry *pos;
unsigned int c;
- c = 0;
- pos = h->queue_head;
+ if ( (NULL != h->queue_tail) &&
+ (h->queue_tail->priority >= queue_priority) )
+ {
+ c = h->queue_size;
+ pos = NULL;
+ }
+ else
+ {
+ c = 0;
+ pos = h->queue_head;
+ }
while ( (NULL != pos) &&
(c < max_queue_size) &&
(pos->priority >= queue_priority) )
"Not connected\n");
return;
}
+ GNUNET_assert (NULL == qe->delay_warn_task);
+ qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
+ &delay_warning,
+ qe);
GNUNET_MQ_send (h->mq,
qe->env);
qe->env = NULL;
}
+/**
+ * Get the entry at the head of the message queue.
+ *
+ * @param h handle to the datastore
+ * @param response_type the expected response type
+ * @return the queue entry
+ */
+static struct GNUNET_DATASTORE_QueueEntry *
+get_queue_head (struct GNUNET_DATASTORE_Handle *h,
+ uint16_t response_type)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ if (h->skip_next_messages > 0)
+ {
+ h->skip_next_messages--;
+ process_queue (h);
+ return NULL;
+ }
+ qe = h->queue_head;
+ if (NULL == qe)
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return NULL;
+ }
+ if (NULL != qe->env)
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return NULL;
+ }
+ if (response_type != qe->response_type)
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return NULL;
+ }
+ return qe;
+}
/**
const char *emsg;
int32_t status = ntohl (sm->status);
- if (h->skip_next_messages > 0)
- {
- h->skip_next_messages--;
- process_queue (h);
- return;
- }
- if (NULL == (qe = h->queue_head))
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
- if (NULL != qe->env)
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
- if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
- {
- GNUNET_break (0);
- do_disconnect (h);
+ qe = get_queue_head (h,
+ GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
+ if (NULL == qe)
return;
- }
rc = qe->qc.sc;
free_queue_entry (qe);
if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
struct GNUNET_DATASTORE_QueueEntry *qe;
struct ResultContext rc;
- if (h->skip_next_messages > 0)
- {
- process_queue (h);
- return;
- }
- qe = h->queue_head;
+ qe = get_queue_head (h,
+ GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
if (NULL == qe)
- {
- GNUNET_break (0);
- do_disconnect (h);
return;
- }
- if (NULL != qe->env)
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
- if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
#if INSANE_STATISTICS
GNUNET_STATISTICS_update (h->stats,
gettext_noop ("# Results received"),
ntohl (dm->type),
ntohl (dm->priority),
ntohl (dm->anonymity),
+ ntohl (dm->replication),
GNUNET_TIME_absolute_ntoh (dm->expiration),
GNUNET_ntohll (dm->uid));
}
struct GNUNET_DATASTORE_QueueEntry *qe;
struct ResultContext rc;
- if (h->skip_next_messages > 0)
- {
- h->skip_next_messages--;
- process_queue (h);
- return;
- }
- qe = h->queue_head;
+ qe = get_queue_head (h,
+ GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
if (NULL == qe)
- {
- GNUNET_break (0);
- do_disconnect (h);
return;
- }
- if (NULL != qe->env)
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
- if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
- {
- GNUNET_break (0);
- do_disconnect (h);
- return;
- }
rc = qe->qc.rc;
free_queue_entry (qe);
LOG (GNUNET_ERROR_TYPE_DEBUG,
0,
0,
0,
+ 0,
GNUNET_TIME_UNIT_ZERO_ABS,
0);
}
static void
try_reconnect (void *cls)
{
- GNUNET_MQ_hd_var_size (status,
- GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
- struct StatusMessage);
- GNUNET_MQ_hd_var_size (data,
- GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
- struct DataMessage);
- GNUNET_MQ_hd_fixed_size (data_end,
- GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
- struct GNUNET_MessageHeader);
struct GNUNET_DATASTORE_Handle *h = cls;
struct GNUNET_MQ_MessageHandler handlers[] = {
- make_status_handler (h),
- make_data_handler (h),
- make_data_end_handler (h),
+ GNUNET_MQ_hd_var_size (status,
+ GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+ struct StatusMessage,
+ h),
+ GNUNET_MQ_hd_var_size (data,
+ GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+ struct DataMessage,
+ h),
+ GNUNET_MQ_hd_fixed_size (data_end,
+ GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
+ struct GNUNET_MessageHeader,
+ h),
GNUNET_MQ_handler_end ()
};
h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
h->reconnect_task = NULL;
GNUNET_assert (NULL == h->mq);
- h->mq = GNUNET_CLIENT_connecT (h->cfg,
+ h->mq = GNUNET_CLIENT_connect (h->cfg,
"datastore",
handlers,
&mq_error_handler,
struct DataMessage *dm;
union QueueContext qc;
- if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
return NULL;
dm->priority = htonl (priority);
dm->anonymity = htonl (anonymity);
dm->replication = htonl (replication);
- dm->reserved = htonl (0);
- dm->uid = GNUNET_htonll (0);
dm->expiration = GNUNET_TIME_absolute_hton (expiration);
dm->key = *key;
GNUNET_memcpy (&dm[1],
}
-/**
- * Update a value in the datastore.
- *
- * @param h handle to the datastore
- * @param uid identifier for the value
- * @param priority how much to increase the priority of the value
- * @param expiration new expiration value should be MAX of existing and this argument
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- * (if other requests of higher priority are in the queue)
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- * cancel; note that even if NULL is returned, the callback will be invoked
- * (or rather, will already have been invoked)
- */
-struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
- uint64_t uid,
- uint32_t priority,
- struct GNUNET_TIME_Absolute expiration,
- unsigned int queue_priority,
- unsigned int max_queue_size,
- GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls)
-{
- struct GNUNET_DATASTORE_QueueEntry *qe;
- struct GNUNET_MQ_Envelope *env;
- struct UpdateMessage *um;
- union QueueContext qc;
-
- if (NULL == cont)
- cont = &drop_status_cont;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to update entry %llu raising priority by %u and expiration to %s\n",
- uid,
- (unsigned int) priority,
- GNUNET_STRINGS_absolute_time_to_string (expiration));
- env = GNUNET_MQ_msg (um,
- GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
- um->priority = htonl (priority);
- um->expiration = GNUNET_TIME_absolute_hton (expiration);
- um->uid = GNUNET_htonll (uid);
-
- qc.sc.cont = cont;
- qc.sc.cont_cls = cont_cls;
- qe = make_queue_entry (h,
- env,
- queue_priority,
- max_queue_size,
- GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
- &qc);
- if (NULL == qe)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Could not create queue entry for UPDATE\n");
- return NULL;
- }
- GNUNET_STATISTICS_update (h->stats,
- gettext_noop ("# UPDATE requests executed"), 1,
- GNUNET_NO);
- process_queue (h);
- return qe;
-}
-
-
/**
* Explicitly remove some content from the database.
* The @a cont continuation will be called with `status`
struct GNUNET_MQ_Envelope *env;
union QueueContext qc;
- if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
{
GNUNET_break (0);
return NULL;
env = GNUNET_MQ_msg_extra (dm,
size,
GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
- dm->rid = htonl (0);
dm->size = htonl (size);
- dm->type = htonl (0);
- dm->priority = htonl (0);
- dm->anonymity = htonl (0);
- dm->uid = GNUNET_htonll (0);
- dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
dm->key = *key;
GNUNET_memcpy (&dm[1],
data,
* Get a single zero-anonymity value from the datastore.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
* @param queue_priority ranking of this request in the priority queue
* @param max_queue_size at what queue size should this request be dropped
* (if other requests of higher priority are in the queue)
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
unsigned int queue_priority,
unsigned int max_queue_size,
enum GNUNET_BLOCK_Type type,
GNUNET_assert (NULL != proc);
GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Asked to get %llu-th zero-anonymity entry of type %d\n",
- (unsigned long long) offset,
+ "Asked to get a zero-anonymity entry of type %d\n",
type);
env = GNUNET_MQ_msg (m,
GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
m->type = htonl ((uint32_t) type);
- m->offset = GNUNET_htonll (offset);
+ m->next_uid = GNUNET_htonll (next_uid);
qc.rc.proc = proc;
qc.rc.proc_cls = proc_cls;
qe = make_queue_entry (h,
* will only be called once.
*
* @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- * a random 64-bit value initially; then increment by
- * one each time; detect that all results have been found by uid
- * being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
* @param queue_priority ranking of this request in the priority queue
*/
struct GNUNET_DATASTORE_QueueEntry *
GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
- uint64_t offset,
+ uint64_t next_uid,
+ bool random,
const struct GNUNET_HashCode *key,
enum GNUNET_BLOCK_Type type,
unsigned int queue_priority,
env = GNUNET_MQ_msg (gm,
GNUNET_MESSAGE_TYPE_DATASTORE_GET);
gm->type = htonl (type);
- gm->offset = GNUNET_htonll (offset);
+ gm->next_uid = GNUNET_htonll (next_uid);
+ gm->random = random;
}
else
{
env = GNUNET_MQ_msg (gkm,
GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
gkm->type = htonl (type);
- gkm->offset = GNUNET_htonll (offset);
+ gkm->next_uid = GNUNET_htonll (next_uid);
+ gkm->random = random;
gkm->key = *key;
}
qc.rc.proc = proc;