/*
This file is part of GNUnet
- (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
#include "gnunet_datastore_service.h"
#include "datastore.h"
+/**
+ * Entry in our priority queue.
+ */
+struct QueueEntry
+{
+
+ /**
+ * This is a linked list.
+ */
+ struct QueueEntry *next;
+
+ /**
+ * This is a linked list.
+ */
+ struct QueueEntry *prev;
+
+ /**
+ * Handle to the master context.
+ */
+ struct GNUNET_DATASTORE_Handle *h;
+
+ /**
+ * Task for timeout signalling.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+
+ /**
+ * Timeout for the current operation.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Priority in the queue.
+ */
+ unsigned int priority;
+
+ /**
+ * Maximum allowed length of queue (otherwise
+ * this request should be discarded).
+ */
+ unsigned int max_queue;
+
+ /**
+ * Number of bytes in the request message following
+ * this struct.
+ */
+ uint16_t message_size;
+
+ /**
+ * Has this message been transmitted to the service?
+ * Only ever GNUNET_YES for the head of the queue.
+ */
+ int16_t was_transmitted;
+
+ /**
+ * Response processor (NULL if we are not waiting for a response).
+ * This struct should be used for the closure, function-specific
+ * arguments can be passed via 'client_ctx'.
+ */
+ GNUNET_CLIENT_MessageHandler response_proc;
+
+ /**
+ * Specific context (variable argument that
+ * can be used by the response processor).
+ */
+ void *client_ctx;
+
+};
+
/**
* Handle to the datastore service. Followed
* by 65536 bytes used for storing messages.
struct GNUNET_CLIENT_Connection *client;
/**
- * Current response processor (NULL if we are not waiting for a
- * response). The specific type depends on the kind of message we
- * just transmitted.
+ * Current head of priority queue.
*/
- void *response_proc;
-
- /**
- * Closure for response_proc.
- */
- void *response_proc_cls;
+ struct QueueEntry *queue_head;
/**
- * Timeout for the current operation.
+ * Current tail of priority queue.
*/
- struct GNUNET_TIME_Absolute timeout;
+ struct QueueEntry *queue_tail;
/**
- * Number of bytes in the message following
- * this struct, 0 if we have no request pending.
+ * Number of entries in the queue.
*/
- size_t message_size;
+ unsigned int queue_size;
};
* @param sched scheduler to use
* @return handle to use to access the service
*/
-struct GNUNET_DATASTORE_Handle *GNUNET_DATASTORE_connect (const struct
- GNUNET_CONFIGURATION_Handle
- *cfg,
- struct
- GNUNET_SCHEDULER_Handle
- *sched)
+struct GNUNET_DATASTORE_Handle *
+GNUNET_DATASTORE_connect (const struct
+ GNUNET_CONFIGURATION_Handle
+ *cfg,
+ struct
+ GNUNET_SCHEDULER_Handle
+ *sched)
{
struct GNUNET_CLIENT_Connection *c;
struct GNUNET_DATASTORE_Handle *h;
/**
* Transmit DROP message to datastore service.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param size number of bytes that can be copied to buf
+ * @param buf where to copy the drop message
+ * @return number of bytes written to buf
*/
static size_t
transmit_drop (void *cls,
- size_t size, void *buf)
+ size_t size,
+ void *buf)
{
struct GNUNET_DATASTORE_Handle *h = cls;
struct GNUNET_MessageHeader *hdr;
void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
int drop)
{
+ struct QueueEntry *qe;
+
if (h->client != NULL)
GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
h->client = NULL;
+ while (NULL != (qe = h->queue_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (h->queue_head,
+ h->queue_tail,
+ qe);
+ if (NULL != qe->response_proc)
+ qe->response_proc (qe, NULL);
+ GNUNET_free (qe);
+ }
if (GNUNET_YES == drop)
{
h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
}
+#if 0
/**
* Type of a function to call when we receive a message
* from the service. This specific function is used
memcpy (&dm[1], data, size);
transmit_for_status (h, cont, cont_cls, timeout);
}
-
+#endif
/* end of datastore_api.c */
*/
static struct GNUNET_CORE_Handle *core;
+/**
+ * Are we allowed to migrate content to this peer.
+ */
+static int active_migration;
/* ******************* clean up functions ************************ */
}
+
+/**
+ * Continuation called to notify client about result of the
+ * operation.
+ *
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
+ */
+static void
+put_migration_continuation (void *cls,
+ int success,
+ const char *msg)
+{
+ /* FIXME */
+}
+
+
/**
* Handle P2P "PUT" message.
*
&query,
&process_reply,
&prq);
- // FIXME: if migration is on and load is low,
- // queue to store data in datastore;
- // use "prq.priority" for that!
+ if (GNUNET_YES == active_migration)
+ {
+ GNUNET_DATASTORE_put (NULL /* FIXME */,
+ 0, &query, dsize, &put[1],
+ type, prq.priority, 1 /* anonymity */,
+ expiration,
+ 0, 64 /* FIXME: use define */,
+ GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+ &put_migration_continuation,
+ NULL);
+ }
return GNUNET_OK;
}
struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
+ active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
+ "FS",
+ "ACTIVEMIGRATION");
if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
(GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
(GNUNET_OK != main_init (sched, server, cfg)) )
/*
This file is part of GNUnet
- (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
* operation.
*
* @param cls closure
- * @param success GNUNET_SYSERR on failure
+ * @param success GNUNET_SYSERR on failure,
+ * GNUNET_NO on timeout/queue drop
+ * GNUNET_YES on success
* @param msg NULL on success, otherwise an error message
*/
typedef void (*GNUNET_DATASTORE_ContinuationWithStatus)(void *cls,
* @param h handle to the datastore
* @param amount how much space (in bytes) should be reserved (for content only)
* @param entries how many entries will be created (to calculate per-entry overhead)
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response (or before dying in queue)
* @param cont continuation to call when done; "success" will be set to
* a positive reservation value if space could be reserved.
* @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
uint64_t amount,
uint32_t entries,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls,
- struct GNUNET_TIME_Relative timeout);
+ void *cont_cls);
/**
* @param priority priority of the content
* @param anonymity anonymity-level for the content
* @param expiration expiration time for the content
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
* @param timeout timeout for the operation
* @param cont continuation to call when done
* @param cont_cls closure for cont
uint32_t priority,
uint32_t anonymity,
struct GNUNET_TIME_Absolute expiration,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_ContinuationWithStatus cont,
void *cont_cls);
* @param h handle to the datastore
* @param rid reservation ID (value of "success" in original continuation
* from the "reserve" function).
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
int rid,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls,
- struct GNUNET_TIME_Relative timeout);
+ void *cont_cls);
/**
* @param uid identifier for the value
* @param priority how much to increase the priority of the value
* @param expiration new expiration value should be MAX of existing and this argument
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
unsigned long long uid,
uint32_t priority,
struct GNUNET_TIME_Absolute expiration,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls,
- struct GNUNET_TIME_Relative timeout);
+ void *cont_cls);
/**
* @param h handle to the datastore
* @param key maybe NULL (to match all entries)
* @param type desired type, 0 for any
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
* @param iter function to call on each matching value;
* will be called once with a NULL value at the end
* @param iter_cls closure for iter
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode * key,
enum GNUNET_BLOCK_Type type,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_Iterator iter,
- void *iter_cls,
- struct GNUNET_TIME_Relative timeout);
+ void *iter_cls);
/**
* Get a random value from the datastore.
*
* @param h handle to the datastore
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
* @param iter function to call on a random value; it
* will be called once with a value (if available)
* and always once with a value of NULL.
* @param iter_cls closure for iter
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
- GNUNET_DATASTORE_Iterator iter, void *iter_cls,
- struct GNUNET_TIME_Relative timeout);
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_Iterator iter,
+ void *iter_cls);
/**
* @param key key for the value
* @param size number of bytes in data
* @param data content stored
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
* @param cont continuation to call when done
* @param cont_cls closure for cont
- * @param timeout how long to wait at most for a response
*/
void
GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
const GNUNET_HashCode *key,
- uint32_t size, const void *data,
+ uint32_t size,
+ const void *data,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DATASTORE_ContinuationWithStatus cont,
- void *cont_cls,
- struct GNUNET_TIME_Relative timeout);
+ void *cont_cls);
#if 0 /* keep Emacsens' auto-indent happy */