#define DEBUG_DHT GNUNET_NO
typedef void (*GNUNET_DHT_MessageReceivedHandler) (void *cls,
- struct GNUNET_MessageHeader *msg);
+ struct GNUNET_MessageHeader
+ * msg);
/**
* Generic DHT message, wrapper for other message types
*/
GNUNET_HashCode key;
+ /**
+ * When does this entry expire?
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
/**
* The size of the data, appended to the end of this message.
*/
#include "gnunet_dht_service.h"
#include "dht.h"
-#define DEBUG_DHT_API GNUNET_NO
+#define DEBUG_DHT_API GNUNET_YES
#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
/**
* Handle to the actual route operation for the request
*/
- struct GNUNET_DHT_RouteHandle *route_handle;
+ struct GNUNET_DHT_RouteHandle *route_handle;
/**
* The context of the get request
*/
- struct GNUNET_DHT_FindPeerContext find_peer_context;
+ struct GNUNET_DHT_FindPeerContext find_peer_context;
};
static struct GNUNET_TIME_Relative default_request_timeout;
/* Forward declaration */
-static void process_pending_message(struct GNUNET_DHT_Handle *handle);
+static void process_pending_message (struct GNUNET_DHT_Handle *handle);
-static GNUNET_HashCode * hash_from_uid(uint64_t uid)
+static GNUNET_HashCode *
+hash_from_uid (uint64_t uid)
{
int count;
int remaining;
GNUNET_HashCode *hash;
- hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
+ hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
count = 0;
- while (count < sizeof(GNUNET_HashCode))
+ while (count < sizeof (GNUNET_HashCode))
{
- remaining = sizeof(GNUNET_HashCode) - count;
- if (remaining > sizeof(uid))
- remaining = sizeof(uid);
+ remaining = sizeof (GNUNET_HashCode) - count;
+ if (remaining > sizeof (uid))
+ remaining = sizeof (uid);
- memcpy(hash, &uid, remaining);
+ memcpy (hash, &uid, remaining);
count += remaining;
}
* a demultiplexer which handles numerous message types
*
*/
-void service_message_handler (void *cls,
- const struct GNUNET_MessageHeader *msg)
+void
+service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_DHT_Handle *handle = cls;
struct GNUNET_DHT_Message *dht_msg;
* Should be a non unique acknowledgment, or unique result. */
if (msg == NULL)
- {
+ {
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received NULL from server, connection down?\n", "DHT API");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Received NULL from server, connection down?\n",
+ "DHT API");
#endif
- return;
- }
+ return;
+ }
- if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
- {
- dht_msg = (struct GNUNET_DHT_Message *)msg;
- uid = GNUNET_ntohll(dht_msg->unique_id);
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_DHT:
+ {
+ dht_msg = (struct GNUNET_DHT_Message *) msg;
+ uid = GNUNET_ntohll (dht_msg->unique_id);
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Received response to message (uid %llu)\n",
+ "DHT API", uid);
#endif
- if (ntohs(dht_msg->unique))
- {
- uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
- route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
- GNUNET_free(uid_hash);
- if (route_handle == NULL) /* We have no recollection of this request */
+ if (ntohs (dht_msg->unique))
{
+ uid_hash = hash_from_uid (uid);
+ route_handle =
+ GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
+ uid_hash);
+ GNUNET_free (uid_hash);
+ if (route_handle == NULL) /* We have no recollection of this request */
+ {
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Received response to message (uid %llu), but have no recollection of it!\n",
+ "DHT API", uid);
#endif
+ }
+ else
+ {
+ enc_size =
+ ntohs (dht_msg->header.size) -
+ sizeof (struct GNUNET_DHT_Message);
+ GNUNET_assert (enc_size > 0);
+ enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
+ route_handle->iter (route_handle->iter_cls, enc_msg);
+
+ }
}
- else
- {
- enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
- GNUNET_assert(enc_size > 0);
- enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
- route_handle->iter(route_handle->iter_cls, enc_msg);
- }
+ break;
}
- }
- else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
- {
- stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
- uid = GNUNET_ntohll(stop_msg->unique_id);
-#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received response to message (uid %llu), current uid %llu\n", "DHT API", uid, handle->current->unique_id);
-#endif
- if (handle->current->unique_id == uid)
+ case GNUNET_MESSAGE_TYPE_DHT_STOP:
{
+ stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
+ uid = GNUNET_ntohll (stop_msg->unique_id);
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
+ "`%s': Received response to message (uid %llu), current uid %llu\n",
+ "DHT API", uid, handle->current->unique_id);
#endif
- if (handle->current->cont != NULL)
- GNUNET_SCHEDULER_add_continuation(handle->sched, handle->current->cont, handle->current->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-
- GNUNET_free(handle->current->msg);
- GNUNET_free(handle->current);
- handle->current = NULL;
- }
- }
- else
- {
+ if (handle->current->unique_id == uid)
+ {
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received unknown message type %d\n", "DHT API", ntohs(msg->type));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Have pending confirmation for this message!\n",
+ "DHT API", uid);
#endif
- }
-
+ if (handle->current->cont != NULL)
+ GNUNET_SCHEDULER_add_continuation (handle->sched,
+ handle->current->cont,
+ handle->current->cont_cls,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+
+ GNUNET_free (handle->current->msg);
+ GNUNET_free (handle->current);
+ handle->current = NULL;
+ }
+ break;
+ }
+ default:
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "`%s': Received unknown message type %d\n", "DHT API",
+ ntohs (msg->type));
+ }
+ }
GNUNET_CLIENT_receive (handle->client,
&service_message_handler,
handle, GNUNET_TIME_UNIT_FOREVER_REL);
{
struct GNUNET_DHT_Handle *handle;
- handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_Handle));
+ handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
- default_request_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
+ default_request_timeout =
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
handle->cfg = cfg;
handle->sched = sched;
handle->do_destroy = GNUNET_NO;
handle->th = NULL;
- handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
- handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
+ handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
+ handle->outstanding_requests =
+ GNUNET_CONTAINER_multihashmap_create (ht_len);
if (handle->client == NULL)
{
- GNUNET_free(handle);
+ GNUNET_free (handle);
return NULL;
}
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
#endif
- GNUNET_assert(handle != NULL);
+ GNUNET_assert (handle != NULL);
- if (handle->th != NULL) /* We have a live transmit request in the Aether */
+ if (handle->th != NULL) /* We have a live transmit request in the Aether */
{
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
}
- if (handle->current != NULL) /* We are trying to send something now, clean it up */
- GNUNET_free(handle->current);
+ if (handle->current != NULL) /* We are trying to send something now, clean it up */
+ GNUNET_free (handle->current);
- if (handle->client != NULL) /* Finally, disconnect from the service */
+ if (handle->client != NULL) /* Finally, disconnect from the service */
{
GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
handle->client = NULL;
/* TODO: if code is not GNUNET_OK, do something! */
struct PendingMessage *pos = handle->current;
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Finish called!\n", "DHT API");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
#endif
- GNUNET_assert(pos != NULL);
+ GNUNET_assert (pos != NULL);
if (pos->is_unique)
{
if (pos->cont != NULL)
- {
- if (code == GNUNET_SYSERR)
- GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
- else
- GNUNET_SCHEDULER_add_continuation(handle->sched, pos->cont, pos->cont_cls, GNUNET_SCHEDULER_REASON_PREREQ_DONE);
- }
-
- GNUNET_free(pos->msg);
+ {
+ if (code == GNUNET_SYSERR)
+ GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+ pos->cont_cls,
+ GNUNET_SCHEDULER_REASON_TIMEOUT);
+ else
+ GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+ pos->cont_cls,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+ }
+
+ GNUNET_free (pos->msg);
handle->current = NULL;
- GNUNET_free(pos);
+ GNUNET_free (pos);
}
/* Otherwise we need to wait for a response to this message! */
}
size_t tsize;
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': In transmit_pending\n", "DHT API");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': In transmit_pending\n", "DHT API");
#endif
if (buf == NULL)
{
"`%s': In transmit_pending buf is NULL\n", "DHT API");
#endif
/* FIXME: free associated resources or summat */
- finish(handle, GNUNET_SYSERR);
+ finish (handle, GNUNET_SYSERR);
return 0;
}
handle->th = NULL;
if (handle->current != NULL)
- {
- tsize = ntohs(handle->current->msg->size);
- if (size >= tsize)
{
+ tsize = ntohs (handle->current->msg->size);
+ if (size >= tsize)
+ {
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Sending message size %d\n", "DHT API", tsize);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Sending message size %d\n", "DHT API", tsize);
#endif
- memcpy(buf, handle->current->msg, tsize);
- finish(handle, GNUNET_OK);
- return tsize;
+ memcpy (buf, handle->current->msg, tsize);
+ finish (handle, GNUNET_OK);
+ return tsize;
+ }
+ else
+ {
+ return 0;
+ }
}
- else
- {
- return 0;
- }
- }
/* Have no pending request */
return 0;
}
/**
* Try to send messages from list of messages to send
*/
-static void process_pending_message(struct GNUNET_DHT_Handle *handle)
+static void
+process_pending_message (struct GNUNET_DHT_Handle *handle)
{
if (handle->current == NULL)
if (NULL ==
(handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
- ntohs(handle->current->msg->size),
- handle->current->timeout,
- GNUNET_YES,
- &transmit_pending, handle)))
+ ntohs (handle->
+ current->msg->
+ size),
+ handle->current->
+ timeout, GNUNET_YES,
+ &transmit_pending,
+ handle)))
{
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
}
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Scheduled sending message of size %d to service\n", "DHT API", ntohs(handle->current->msg->size));
+ "`%s': Scheduled sending message of size %d to service\n",
+ "DHT API", ntohs (handle->current->msg->size));
#endif
}
* Iterator called on each result obtained from a generic route
* operation
*/
-void get_reply_iterator (void *cls,
- const struct GNUNET_MessageHeader *reply)
+void
+get_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
{
+ struct GNUNET_DHT_GetHandle *get_handle = cls;
+ struct GNUNET_DHT_GetResultMessage *result;
+ size_t data_size;
+ char *result_data;
+ if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_GET_RESULT)
+ return;
+
+ GNUNET_assert (ntohs (reply->size) >=
+ sizeof (struct GNUNET_DHT_GetResultMessage));
+ result = (struct GNUNET_DHT_GetResultMessage *) reply;
+ data_size = ntohs (result->data_size);
+ GNUNET_assert (ntohs (reply->size) ==
+ sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
+ result_data = (char *) &result[1]; /* Set data pointer to end of message */
+
+ get_handle->get_context.iter (get_handle->get_context.iter_cls,
+ result->expiration, &result->key,
+ ntohs (result->type), data_size, result_data);
}
* Iterator called on each result obtained from a generic route
* operation
*/
-void find_peer_reply_iterator (void *cls,
- const struct GNUNET_MessageHeader *reply)
+void
+find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
{
+ struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
+ struct GNUNET_DHT_FindPeerResultMessage *result;
+ size_t data_size;
+ struct GNUNET_MessageHeader *result_data;
+ if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
+ return;
+
+ GNUNET_assert (ntohs (reply->size) >=
+ sizeof (struct GNUNET_DHT_FindPeerResultMessage));
+ result = (struct GNUNET_DHT_FindPeerResultMessage *) reply;
+ data_size = ntohs (result->data_size);
+ GNUNET_assert (ntohs (reply->size) ==
+ sizeof (struct GNUNET_DHT_GetResultMessage) + data_size);
+
+ if (data_size > 0)
+ result_data = (struct GNUNET_MessageHeader *) &result[1]; /* Set data pointer to end of message */
+ else
+ result_data = NULL;
+
+ find_peer_handle->find_peer_context.proc (find_peer_handle->
+ find_peer_context.proc_cls,
+ &result->peer, result_data);
}
/**
*/
struct GNUNET_DHT_RouteHandle *
GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
- const GNUNET_HashCode *key,
+ const GNUNET_HashCode * key,
unsigned int desired_replication_level,
enum GNUNET_DHT_RouteOption options,
const struct GNUNET_MessageHeader *enc,
struct GNUNET_TIME_Relative timeout,
GNUNET_DHT_ReplyProcessor iter,
void *iter_cls,
- GNUNET_SCHEDULER_Task cont,
- void *cont_cls)
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
struct GNUNET_DHT_RouteHandle *route_handle;
struct PendingMessage *pending;
uid_key = NULL;
do
- {
- GNUNET_free_non_null(uid_key);
- uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
- uid_key = hash_from_uid(uid);
- } while (GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) == GNUNET_YES);
+ {
+ GNUNET_free_non_null (uid_key);
+ uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
+ uid_key = hash_from_uid (uid);
+ }
+ while (GNUNET_CONTAINER_multihashmap_contains
+ (handle->outstanding_requests, uid_key) == GNUNET_YES);
if (is_unique)
{
- route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
- memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
+ route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
+ memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
route_handle->iter = iter;
route_handle->iter_cls = iter_cls;
route_handle->dht_handle = handle;
route_handle->uid = uid;
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Unique ID is %llu\n", "DHT API", uid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Unique ID is %llu\n", "DHT API", uid);
#endif
/**
* Store based on random identifier!
*/
- GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
+ GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
+ uid_key, route_handle,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
}
else
{
- msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
+ msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
}
- GNUNET_free(uid_key);
- message = GNUNET_malloc(msize);
- message->header.size = htons(msize);
- message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
- memcpy(&message->key, key, sizeof(GNUNET_HashCode));
- message->options = htons(options);
- message->desired_replication_level = htons(options);
- message->unique = htons(is_unique);
- message->unique_id = GNUNET_htonll(uid);
- memcpy(&message[1], enc, ntohs(enc->size));
-
- pending = GNUNET_malloc(sizeof(struct PendingMessage));
+ GNUNET_free (uid_key);
+ message = GNUNET_malloc (msize);
+ message->header.size = htons (msize);
+ message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
+ memcpy (&message->key, key, sizeof (GNUNET_HashCode));
+ message->options = htons (options);
+ message->desired_replication_level = htons (options);
+ message->unique = htons (is_unique);
+ message->unique_id = GNUNET_htonll (uid);
+ memcpy (&message[1], enc, ntohs (enc->size));
+
+ pending = GNUNET_malloc (sizeof (struct PendingMessage));
pending->msg = &message->header;
pending->timeout = timeout;
pending->cont = cont;
pending->is_unique = is_unique;
pending->unique_id = uid;
- GNUNET_assert(handle->current == NULL);
+ GNUNET_assert (handle->current == NULL);
handle->current = pending;
- process_pending_message(handle);
+ process_pending_message (handle);
return route_handle;
}
void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls);
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls);
/**
* Perform an asynchronous GET operation on the DHT identified.
const GNUNET_HashCode * key,
GNUNET_DHT_GetIterator iter,
void *iter_cls,
- GNUNET_SCHEDULER_Task cont,
- void *cont_cls)
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
struct GNUNET_DHT_GetHandle *get_handle;
struct GNUNET_DHT_GetMessage *get_msg;
- if (handle->current != NULL) /* Can't send right now, we have a pending message... */
+ if (handle->current != NULL) /* Can't send right now, we have a pending message... */
return NULL;
- get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle));
+ get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
get_handle->get_context.iter = iter;
get_handle->get_context.iter_cls = iter_cls;
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Inserting pending get request with key %s\n", "DHT API", GNUNET_h2s(key));
+ "`%s': Inserting pending get request with key %s\n", "DHT API",
+ GNUNET_h2s (key));
#endif
- get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
- get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
- get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
- get_msg->type = htonl(type);
+ get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
+ get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+ get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
+ get_msg->type = htonl (type);
- get_handle->route_handle = GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, &get_reply_iterator, get_handle, cont, cont_cls);
+ get_handle->route_handle =
+ GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
+ &get_reply_iterator, get_handle, cont, cont_cls);
return get_handle;
}
void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
struct PendingMessage *pending;
struct GNUNET_DHT_StopMessage *message;
size_t msize;
GNUNET_HashCode *uid_key;
- msize = sizeof(struct GNUNET_DHT_StopMessage);
+ msize = sizeof (struct GNUNET_DHT_StopMessage);
- message = GNUNET_malloc(msize);
- message->header.size = htons(msize);
- message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
+ message = GNUNET_malloc (msize);
+ message->header.size = htons (msize);
+ message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Remove outstanding request for uid %llu\n", "DHT API",
+ route_handle->uid);
#endif
- message->unique_id = GNUNET_htonll(route_handle->uid);
+ message->unique_id = GNUNET_htonll (route_handle->uid);
- GNUNET_assert(route_handle->dht_handle->current == NULL);
+ GNUNET_assert (route_handle->dht_handle->current == NULL);
- pending = GNUNET_malloc(sizeof(struct PendingMessage));
- pending->msg = (struct GNUNET_MessageHeader *)message;
+ pending = GNUNET_malloc (sizeof (struct PendingMessage));
+ pending->msg = (struct GNUNET_MessageHeader *) message;
pending->timeout = DEFAULT_DHT_TIMEOUT;
pending->cont = cont;
pending->cont_cls = cont_cls;
pending->is_unique = GNUNET_NO;
pending->unique_id = route_handle->uid;
- GNUNET_assert(route_handle->dht_handle->current == NULL);
+ GNUNET_assert (route_handle->dht_handle->current == NULL);
route_handle->dht_handle->current = pending;
- process_pending_message(route_handle->dht_handle);
+ process_pending_message (route_handle->dht_handle);
- uid_key = hash_from_uid(route_handle->uid);
+ uid_key = hash_from_uid (route_handle->uid);
- if (GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests, uid_key, route_handle) != GNUNET_YES)
+ if (GNUNET_CONTAINER_multihashmap_remove
+ (route_handle->dht_handle->outstanding_requests, uid_key,
+ route_handle) != GNUNET_YES)
{
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
+ "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n",
+ "DHT API", GNUNET_h2s (uid_key), route_handle->uid);
#endif
}
- GNUNET_free(uid_key);
+ GNUNET_free (uid_key);
return;
}
* @param get_handle handle to the GET operation to stop
*/
void
-GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Removing pending get request with key %s, uid %llu\n", "DHT API", GNUNET_h2s(&get_handle->route_handle->key), get_handle->route_handle->uid);
+ "`%s': Removing pending get request with key %s, uid %llu\n",
+ "DHT API", GNUNET_h2s (&get_handle->route_handle->key),
+ get_handle->route_handle->uid);
#endif
- GNUNET_DHT_route_stop(get_handle->route_handle, cont, cont_cls);
- GNUNET_free(get_handle);
+ GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
+ GNUNET_free (get_handle);
}
*/
struct GNUNET_DHT_FindPeerHandle *
GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
- struct GNUNET_TIME_Relative timeout,
- enum GNUNET_DHT_RouteOption options,
- struct GNUNET_MessageHeader *message,
- const GNUNET_HashCode * key,
- GNUNET_DHT_FindPeerProcessor proc,
- void *proc_cls,
- GNUNET_SCHEDULER_Task cont,
- void *cont_cls)
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_DHT_RouteOption options,
+ struct GNUNET_MessageHeader *message,
+ const GNUNET_HashCode * key,
+ GNUNET_DHT_FindPeerProcessor proc,
+ void *proc_cls,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
size_t msize;
- if (handle->current != NULL) /* Can't send right now, we have a pending message... */
+ if (handle->current != NULL) /* Can't send right now, we have a pending message... */
return NULL;
if (message != NULL)
- msize = ntohs(message->size);
+ msize = ntohs (message->size);
else
msize = 0;
- find_peer_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerHandle));
+ find_peer_handle =
+ GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerHandle));
find_peer_handle->find_peer_context.proc = proc;
find_peer_handle->find_peer_context.proc_cls = proc_cls;
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Inserting pending `%s' request with key %s\n", "DHT API", "FIND PEER", GNUNET_h2s(key));
+ "`%s': Inserting pending `%s' request with key %s\n", "DHT API",
+ "FIND PEER", GNUNET_h2s (key));
#endif
- find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_FindPeerMessage) + msize);
- find_peer_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
- find_peer_msg->header.size = htons(sizeof(struct GNUNET_DHT_FindPeerMessage));
+ find_peer_msg =
+ GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage) + msize);
+ find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+ find_peer_msg->header.size =
+ htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
find_peer_msg->msg_len = msize;
if (message != NULL)
- {
- memcpy(&find_peer_msg[1], message, msize);
- }
+ {
+ memcpy (&find_peer_msg[1], message, msize);
+ }
- find_peer_handle->route_handle = GNUNET_DHT_route_start(handle, key, 0, options, &find_peer_msg->header, timeout, &find_peer_reply_iterator, find_peer_handle, cont, cont_cls);
+ find_peer_handle->route_handle =
+ GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg->header,
+ timeout, &find_peer_reply_iterator,
+ find_peer_handle, cont, cont_cls);
return find_peer_handle;
}
* @param find_peer_handle GET operation to stop.
*/
void
-GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls)
+GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Removing pending `%s' request with key %s, uid %llu\n", "DHT API", "FIND PEER", GNUNET_h2s(&find_peer_handle->route_handle->key), find_peer_handle->route_handle->uid);
+ "`%s': Removing pending `%s' request with key %s, uid %llu\n",
+ "DHT API", "FIND PEER",
+ GNUNET_h2s (&find_peer_handle->route_handle->key),
+ find_peer_handle->route_handle->uid);
#endif
- GNUNET_DHT_route_stop(find_peer_handle->route_handle, cont, cont_cls);
- GNUNET_free(find_peer_handle);
+ GNUNET_DHT_route_stop (find_peer_handle->route_handle, cont, cont_cls);
+ GNUNET_free (find_peer_handle);
}
const char *data,
struct GNUNET_TIME_Absolute exp,
struct GNUNET_TIME_Relative timeout,
- GNUNET_SCHEDULER_Task cont,
- void *cont_cls)
+ GNUNET_SCHEDULER_Task cont, void *cont_cls)
{
struct GNUNET_DHT_PutMessage *put_msg;
size_t msize;
if (handle->current != NULL)
{
- GNUNET_SCHEDULER_add_continuation(handle->sched, cont, cont_cls, GNUNET_SCHEDULER_REASON_TIMEOUT);
+ GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+ GNUNET_SCHEDULER_REASON_TIMEOUT);
return;
}
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Inserting pending put request with key %s\n", "DHT API", GNUNET_h2s(key));
+ "`%s': Inserting pending put request with key %s\n", "DHT API",
+ GNUNET_h2s (key));
#endif
- msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
- put_msg = GNUNET_malloc(msize);
- put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
- put_msg->header.size = htons(msize);
- put_msg->type = htonl(type);
- put_msg->data_size = htons(size);
+ msize = sizeof (struct GNUNET_DHT_PutMessage) + size;
+ put_msg = GNUNET_malloc (msize);
+ put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_PUT);
+ put_msg->header.size = htons (msize);
+ put_msg->type = htonl (type);
+ put_msg->data_size = htons (size);
put_msg->expiration = exp;
- memcpy(&put_msg[1], data, size);
+ memcpy (&put_msg[1], data, size);
- GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, NULL, cont, cont_cls);
+ GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
+ NULL, cont, cont_cls);
- GNUNET_free(put_msg);
+ GNUNET_free (put_msg);
}
/**
* Handle to the datacache service (for inserting/retrieving data)
*/
- struct GNUNET_DATACACHE_Handle *datacache;
+struct GNUNET_DATACACHE_Handle *datacache;
/**
* The main scheduler to use for the DHT service
*/
static GNUNET_SCHEDULER_TaskIdentifier cleanup_task;
-struct ClientList
+/**
+ * Context for handling results from a get request.
+ */
+struct DatacacheGetContext
{
/**
- * This is a linked list
+ * The client to send the result to.
+ */
+ struct GNUNET_SERVER_Client *client;
+
+ /**
+ * The unique id of this request
*/
- struct ClientList *next;
+ unsigned long long unique_id;
+};
+
+struct DHT_MessageContext
+{
/**
- * The client in question
+ * The client this request was received from.
*/
struct GNUNET_SERVER_Client *client;
+
+ /**
+ * The key this request was about
+ */
+ GNUNET_HashCode *key;
+
+ /**
+ * The unique identifier of this request
+ */
+ unsigned long long unique_id;
+
+ /**
+ * Desired replication level
+ */
+ size_t replication;
+
+ /**
+ * Any message options for this request
+ */
+ size_t msg_options;
};
/**
* Server handler for handling locally received dht requests
*/
static void
-handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader *message);
+handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message);
static void
-handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client,
+handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message);
static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
* Core handler for p2p dht get requests.
*/
static int handle_dht_p2p_get (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance);
/**
* Core handler for p2p dht put requests.
*/
static int handle_dht_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance);
/**
* Core handler for p2p dht find peer requests.
*/
static int handle_dht_p2p_find_peer (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader
+ *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance);
static struct GNUNET_CORE_MessageHandler core_handlers[] = {
{&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_GET, 0},
};
+static size_t
+send_reply (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_DHT_Message *reply = cls;
+
+ if (buf == NULL) /* Message timed out, that's crappy... */
+ {
+ GNUNET_free (reply);
+ return 0;
+ }
+
+ if (size >= ntohs (reply->header.size))
+ {
+ memcpy (buf, reply, ntohs (reply->header.size));
+ return ntohs (reply->header.size);
+ }
+ else
+ return 0;
+}
+
+
+static void
+send_reply_to_client (struct GNUNET_SERVER_Client *client,
+ struct GNUNET_MessageHeader *message,
+ unsigned long long uid)
+{
+ struct GNUNET_DHT_Message *reply;
+ size_t msize;
+ size_t tsize;
+#if DEBUG_DHT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Sending reply to client.\n", "DHT");
+#endif
+ msize = ntohs (message->size);
+ tsize = sizeof (struct GNUNET_DHT_Message) + msize;
+ reply = GNUNET_malloc (tsize);
+ reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
+ reply->header.size = htons (tsize);
+ if (uid != 0)
+ reply->unique = htons (GNUNET_YES);
+ reply->unique_id = GNUNET_htonll (uid);
+ memcpy (&reply[1], message, msize);
+
+ GNUNET_SERVER_notify_transmit_ready (client,
+ tsize,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &send_reply, reply);
+
+}
+
+
+/**
+ * Iterator for local get request results, return
+ * GNUNET_OK to continue iteration, anything else
+ * to stop iteration.
+ */
+static int
+datacache_get_iterator (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ uint32_t size, const char *data, uint32_t type)
+{
+ struct DatacacheGetContext *datacache_get_ctx = cls;
+ struct GNUNET_DHT_GetResultMessage *get_result;
+
+ get_result =
+ GNUNET_malloc (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
+ get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
+ get_result->header.size =
+ htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
+ get_result->data_size = htons (size);
+ get_result->expiration = exp;
+ memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
+ get_result->type = htons (type);
+ memcpy (&get_result[1], data, size);
+
+ send_reply_to_client (datacache_get_ctx->client, &get_result->header,
+ datacache_get_ctx->unique_id);
+
+ GNUNET_free (get_result);
+ return GNUNET_OK;
+}
/**
* Server handler for initiating local dht get requests
*/
-static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, GNUNET_HashCode *key)
+static void
+handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
+ struct DHT_MessageContext *message_context)
{
#if DEBUG_DHT
GNUNET_HashCode get_key;
#endif
size_t get_type;
+ unsigned int results;
+ struct DatacacheGetContext *datacache_get_context;
- GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct GNUNET_DHT_GetMessage));
- get_type = ntohs(get_msg->type);
+ GNUNET_assert (ntohs (get_msg->header.size) >=
+ sizeof (struct GNUNET_DHT_GetMessage));
+ get_type = ntohs (get_msg->type);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key));
+ "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
+ "DHT", "GET", get_type, GNUNET_h2s (&get_key),
+ message_context->unique_id);
#endif
+ datacache_get_context = GNUNET_malloc (sizeof (struct DatacacheGetContext));
+ datacache_get_context->client = message_context->client;
+ datacache_get_context->unique_id = message_context->unique_id;
+
+ results = 0;
+ if (datacache != NULL)
+ results =
+ GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
+ datacache_get_iterator, datacache_get_context);
+
+#if DEBUG_DHT
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Found %d results for local `%s' request\n", "DHT",
+ results, "GET");
+#endif
+ GNUNET_free (datacache_get_context);
/* FIXME: Implement get functionality here */
}
/**
* Server handler for initiating local dht find peer requests
*/
-static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg, GNUNET_HashCode *key)
+static void
+handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg,
+ struct DHT_MessageContext *message_context)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n", "DHT", "FIND PEER", GNUNET_h2s(key), ntohs(find_msg->header.size), sizeof(struct GNUNET_DHT_FindPeerMessage));
+ "`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
+ "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
+ ntohs (find_msg->header.size),
+ sizeof (struct GNUNET_DHT_FindPeerMessage));
#endif
- GNUNET_assert(ntohs(find_msg->header.size) >= sizeof(struct GNUNET_DHT_FindPeerMessage));
+ GNUNET_assert (ntohs (find_msg->header.size) >=
+ sizeof (struct GNUNET_DHT_FindPeerMessage));
/* FIXME: Implement find peer functionality here */
}
/**
* Server handler for initiating local dht put requests
*/
-static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, GNUNET_HashCode *key)
+static void
+handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg,
+ struct DHT_MessageContext *message_context)
{
size_t put_type;
size_t data_size;
- char *data;
- GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct GNUNET_DHT_PutMessage));
+ GNUNET_assert (ntohs (put_msg->header.size) >=
+ sizeof (struct GNUNET_DHT_PutMessage));
- put_type = ntohs(put_msg->type);
- data_size = ntohs(put_msg->data_size);
+ put_type = ntohs (put_msg->type);
+ data_size = ntohs (put_msg->data_size);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': %s msg total size is %d, data size %d, struct size %d\n", "DHT", "PUT", ntohs(put_msg->header.size), data_size, sizeof(struct GNUNET_DHT_PutMessage));
+ "`%s': %s msg total size is %d, data size %d, struct size %d\n",
+ "DHT", "PUT", ntohs (put_msg->header.size), data_size,
+ sizeof (struct GNUNET_DHT_PutMessage));
#endif
- GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct GNUNET_DHT_PutMessage) + data_size);
- data = GNUNET_malloc(data_size);
- memcpy(data, &put_msg[1], data_size);
+ GNUNET_assert (ntohs (put_msg->header.size) ==
+ sizeof (struct GNUNET_DHT_PutMessage) + data_size);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, key %s\n", "DHT", "PUT", put_type, GNUNET_h2s(key));
+ "`%s': Received `%s' request from client, message type %d, key %s\n",
+ "DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
#endif
-
+ /**
+ * Simplest DHT functionality, store any message we receive a put request for.
+ */
+ if (datacache != NULL)
+ GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
+ (char *) &put_msg[1], put_type,
+ put_msg->expiration);
/**
* FIXME: Implement dht put request functionality here!
*/
- GNUNET_free(data);
}
/**
/**
* Transmit handle.
*/
- struct GNUNET_CONNECTION_TransmitHandle * transmit_handle;
+ struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
};
-static size_t send_confirmation (void *cls,
- size_t size, void *buf)
+static size_t
+send_confirmation (void *cls, size_t size, void *buf)
{
struct GNUNET_DHT_StopMessage *confirmation_message = cls;
- if (buf == NULL) /* Message timed out, that's crappy... */
- {
- GNUNET_free(confirmation_message);
- return 0;
- }
+ if (buf == NULL) /* Message timed out, that's crappy... */
+ {
+ GNUNET_free (confirmation_message);
+ return 0;
+ }
- if (size >= ntohs(confirmation_message->header.size))
- {
- memcpy(buf, confirmation_message, ntohs(confirmation_message->header.size));
- return ntohs(confirmation_message->header.size);
- }
+ if (size >= ntohs (confirmation_message->header.size))
+ {
+ memcpy (buf, confirmation_message,
+ ntohs (confirmation_message->header.size));
+ return ntohs (confirmation_message->header.size);
+ }
else
return 0;
}
+
static void
-send_client_receipt_confirmation(struct GNUNET_SERVER_Client *client, uint64_t uid)
+send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
+ uint64_t uid)
{
struct GNUNET_DHT_StopMessage *confirm_message;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Sending receipt confirmation for uid %llu\n", "DHT", uid);
+ "`%s': Sending receipt confirmation for uid %llu\n", "DHT",
+ uid);
#endif
- confirm_message = GNUNET_malloc(sizeof(struct GNUNET_DHT_StopMessage));
- confirm_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
- confirm_message->header.size = htons(sizeof(struct GNUNET_DHT_StopMessage));
- confirm_message->unique_id = GNUNET_htonll(uid);
+ confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage));
+ confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
+ confirm_message->header.size =
+ htons (sizeof (struct GNUNET_DHT_StopMessage));
+ confirm_message->unique_id = GNUNET_htonll (uid);
GNUNET_SERVER_notify_transmit_ready (client,
- sizeof(struct GNUNET_DHT_StopMessage),
- GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
+ sizeof (struct GNUNET_DHT_StopMessage),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
&send_confirmation, confirm_message);
}
static void
-handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader *message)
+handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)message;
+ struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message;
struct GNUNET_MessageHeader *enc_msg;
+ struct DHT_MessageContext *message_context;
size_t enc_type;
- enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
- enc_type = ntohs(enc_msg->type);
+ enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
+ enc_type = ntohs (enc_msg->type);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), GNUNET_ntohll(dht_msg->unique_id));
+ "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
+ "DHT", "GENERIC", enc_type, GNUNET_h2s (&dht_msg->key),
+ GNUNET_ntohll (dht_msg->unique_id));
#endif
- /* FIXME: Implement demultiplexing functionality here */
+ message_context = GNUNET_malloc (sizeof (struct DHT_MessageContext));
+ message_context->client = client;
+ message_context->key = &dht_msg->key;
+ message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
+ message_context->replication = ntohs (dht_msg->desired_replication_level);
+ message_context->msg_options = ntohs (dht_msg->options);
+
switch (enc_type)
{
case GNUNET_MESSAGE_TYPE_DHT_GET:
- handle_dht_get(cls, (struct GNUNET_DHT_GetMessage *)enc_msg, &dht_msg->key);
+ handle_dht_get (cls, (struct GNUNET_DHT_GetMessage *) enc_msg,
+ message_context);
break;
case GNUNET_MESSAGE_TYPE_DHT_PUT:
- handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, &dht_msg->key);
- send_client_receipt_confirmation(client, GNUNET_ntohll(dht_msg->unique_id));
+ handle_dht_put (cls, (struct GNUNET_DHT_PutMessage *) enc_msg,
+ message_context);
+ send_client_receipt_confirmation (client,
+ GNUNET_ntohll (dht_msg->unique_id));
break;
case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
- handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, &dht_msg->key);
+ handle_dht_find_peer (cls,
+ (struct GNUNET_DHT_FindPeerMessage *) enc_msg,
+ message_context);
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"`%s': Message type (%d) not handled\n", "DHT", enc_type);
}
- GNUNET_SERVER_receive_done(client, GNUNET_OK);
+ GNUNET_free (message_context);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
static void
-handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader *message)
+handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
{
- struct GNUNET_DHT_StopMessage * dht_stop_msg = (struct GNUNET_DHT_StopMessage *)message;
+ struct GNUNET_DHT_StopMessage *dht_stop_msg =
+ (struct GNUNET_DHT_StopMessage *) message;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll(dht_stop_msg->unique_id));
+ "`%s': Received `%s' request from client, uid %llu\n", "DHT",
+ "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
#endif
- send_client_receipt_confirmation(client, GNUNET_ntohll(dht_stop_msg->unique_id));
- GNUNET_SERVER_receive_done(client, GNUNET_OK);
+
+ /* TODO: Put in demultiplexing here */
+
+ send_client_receipt_confirmation (client,
+ GNUNET_ntohll (dht_stop_msg->unique_id));
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
* Core handler for p2p dht get requests.
*/
-static int handle_dht_p2p_get (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+static int
+handle_dht_p2p_get (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency, uint32_t distance)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from another peer\n", "DHT", "GET");
+ "`%s': Received `%s' request from another peer\n", "DHT",
+ "GET");
#endif
return GNUNET_YES;
/**
* Core handler for p2p dht put requests.
*/
-static int handle_dht_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+static int
+handle_dht_p2p_put (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency, uint32_t distance)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from another peer\n", "DHT", "PUT");
+ "`%s': Received `%s' request from another peer\n", "DHT",
+ "PUT");
#endif
return GNUNET_YES;
/**
* Core handler for p2p dht find peer requests.
*/
-static int handle_dht_p2p_find_peer (void *cls,
- const struct GNUNET_PeerIdentity * peer,
- const struct GNUNET_MessageHeader * message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+static int
+handle_dht_p2p_find_peer (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency,
+ uint32_t distance)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from another peer\n", "DHT", "FIND PEER");
+ "`%s': Received `%s' request from another peer\n", "DHT",
+ "FIND PEER");
#endif
return GNUNET_YES;
* @param tc unused
*/
static void
-shutdown_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_CORE_disconnect (coreAPI);
}
/**
* To be called on core init/fail.
*/
-void core_init (void *cls,
- struct GNUNET_CORE_Handle * server,
- const struct GNUNET_PeerIdentity *identity,
- const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded * publicKey)
+void
+core_init (void *cls,
+ struct GNUNET_CORE_Handle *server,
+ const struct GNUNET_PeerIdentity *identity,
+ const struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *publicKey)
{
if (server == NULL)
{
- GNUNET_SCHEDULER_cancel(sched, cleanup_task);
- GNUNET_SCHEDULER_add_now(sched, &shutdown_task, NULL);
+ GNUNET_SCHEDULER_cancel (sched, cleanup_task);
+ GNUNET_SCHEDULER_add_now (sched, &shutdown_task, NULL);
return;
}
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Core connection initialized, I am peer: %s\n", "dht", GNUNET_i2s(identity));
+ "%s: Core connection initialized, I am peer: %s\n", "dht",
+ GNUNET_i2s (identity));
#endif
- memcpy(&my_identity, identity, sizeof(struct GNUNET_PeerIdentity));
+ memcpy (&my_identity, identity, sizeof (struct GNUNET_PeerIdentity));
coreAPI = server;
}
datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
- client_transmit_timeout = GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
+ client_transmit_timeout =
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
GNUNET_SERVER_add_handlers (server, plugin_handlers);
- coreAPI =
- GNUNET_CORE_connect (sched, /* Main scheduler */
- cfg, /* Main configuration */
- client_transmit_timeout, /* Delay for connecting */
- NULL, /* FIXME: anything we want to pass around? */
- &core_init, /* Call core_init once connected */
- NULL, /* Don't care about pre-connects */
- NULL, /* Don't care about connects */
- NULL, /* Don't care about disconnects */
- NULL, /* Don't want notified about all incoming messages */
- GNUNET_NO, /* For header only inbound notification */
- NULL, /* Don't want notified about all outbound messages */
- GNUNET_NO, /* For header only outbound notification */
- core_handlers); /* Register these handlers */
+ coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
+ cfg, /* Main configuration */
+ client_transmit_timeout, /* Delay for connecting */
+ NULL, /* FIXME: anything we want to pass around? */
+ &core_init, /* Call core_init once connected */
+ NULL, /* Don't care about pre-connects */
+ NULL, /* Don't care about connects */
+ NULL, /* Don't care about disconnects */
+ NULL, /* Don't want notified about all incoming messages */
+ GNUNET_NO, /* For header only inbound notification */
+ NULL, /* Don't want notified about all outbound messages */
+ GNUNET_NO, /* For header only outbound notification */
+ core_handlers); /* Register these handlers */
if (coreAPI == NULL)
return;
/* Scheduled the task to clean up when shutdown is called */
cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &shutdown_task,
- NULL);
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task, NULL);
}
static void
-end (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+end (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
/* do work here */
GNUNET_SCHEDULER_cancel (sched, die_task);
die_task = GNUNET_SCHEDULER_NO_TASK;
if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning FAIL!\n");
- ok = 365;
- }
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DHT disconnected, returning FAIL!\n");
+ ok = 365;
+ }
else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "DHT disconnected, returning success!\n");
- ok = 0;
- }
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "DHT disconnected, returning success!\n");
+ ok = 0;
+ }
}
static void
{
/* do work here */
#if VERBOSE
- fprintf(stderr, "Ending on an unhappy note.\n");
+ fprintf (stderr, "Ending on an unhappy note.\n");
#endif
GNUNET_DHT_disconnect (p1.dht_handle);
* @param cls closure
* @param tc context information (why was this task triggered now)
*/
-void test_find_peer_stop (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+void
+test_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerContext *peer = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer_stop!\n");
if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
- GNUNET_SCHEDULER_add_now(sched, &end_badly, NULL);
+ GNUNET_SCHEDULER_add_now (sched, &end_badly, NULL);
GNUNET_assert (peer->dht_handle != NULL);
- GNUNET_DHT_find_peer_stop(peer->find_peer_handle, &end, &p1);
+ GNUNET_DHT_find_peer_stop (peer->find_peer_handle, &end, &p1);
//GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &end, &p1);
* @param cls closure
* @param tc context information (why was this task triggered now)
*/
-void test_find_peer (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+void
+test_find_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerContext *peer = cls;
GNUNET_HashCode hash;
- memset(&hash, 42, sizeof(GNUNET_HashCode));
+ memset (&hash, 42, sizeof (GNUNET_HashCode));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n");
GNUNET_assert (peer->dht_handle != NULL);
- peer->find_peer_handle = GNUNET_DHT_find_peer_start(peer->dht_handle, TIMEOUT, 0, NULL, &hash, NULL, NULL, &test_find_peer_stop, &p1);
+ peer->find_peer_handle =
+ GNUNET_DHT_find_peer_start (peer->dht_handle, TIMEOUT, 0, NULL, &hash,
+ NULL, NULL, &test_find_peer_stop, &p1);
if (peer->find_peer_handle == NULL)
- GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1);
+ GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
}
/**
* @param cls closure
* @param tc context information (why was this task triggered now)
*/
-void test_put (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+void
+test_get_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerContext *peer = cls;
- GNUNET_HashCode hash;
- char *data;
- size_t data_size = 42;
- memset(&hash, 42, sizeof(GNUNET_HashCode));
- data = GNUNET_malloc(data_size);
- memset(data, 43, data_size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_put!\n");
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get_stop!\n");
+ if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
+ GNUNET_SCHEDULER_add_now (sched, &end_badly, NULL);
+
GNUNET_assert (peer->dht_handle != NULL);
- GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(TIMEOUT), TIMEOUT, &test_find_peer, &p1);
+ GNUNET_DHT_get_stop (peer->get_handle, &test_find_peer, &p1);
+
+ //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1);
+
+}
+
+void
+test_get_iterator (void *cls,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ uint32_t type, uint32_t size, const void *data)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "test_get_iterator called (we got a result), stopping get request!\n");
+ GNUNET_SCHEDULER_add_continuation (sched, &test_get_stop, &p1,
+ GNUNET_SCHEDULER_REASON_PREREQ_DONE);
}
/**
* @param cls closure
* @param tc context information (why was this task triggered now)
*/
-void test_get_stop (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+void
+test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerContext *peer = cls;
+ GNUNET_HashCode hash;
+ memset (&hash, 42, sizeof (GNUNET_HashCode));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get_stop!\n");
- if (tc->reason == GNUNET_SCHEDULER_REASON_TIMEOUT)
- GNUNET_SCHEDULER_add_now(sched, &end_badly, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get!\n");
GNUNET_assert (peer->dht_handle != NULL);
- GNUNET_DHT_get_stop(peer->get_handle, &test_put, &p1);
-
- //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1);
+ peer->get_handle =
+ GNUNET_DHT_get_start (peer->dht_handle, TIMEOUT, 42, &hash,
+ &test_get_iterator, NULL, NULL, NULL);
+ if (peer->get_handle == NULL)
+ GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
}
/**
* @param cls closure
* @param tc context information (why was this task triggered now)
*/
-void test_get (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext * tc)
+void
+test_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct PeerContext *peer = cls;
GNUNET_HashCode hash;
- memset(&hash, 42, sizeof(GNUNET_HashCode));
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_get!\n");
+ char *data;
+ size_t data_size = 42;
+ memset (&hash, 42, sizeof (GNUNET_HashCode));
+ data = GNUNET_malloc (data_size);
+ memset (data, 43, data_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_put!\n");
peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg, 100);
+
GNUNET_assert (peer->dht_handle != NULL);
- peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, TIMEOUT, 42, &hash, NULL, NULL, &test_get_stop, &p1);
+ GNUNET_DHT_put (peer->dht_handle, &hash, 0, data_size, data,
+ GNUNET_TIME_relative_to_absolute (TIMEOUT), TIMEOUT,
+ &test_get, &p1);
- if (peer->get_handle == NULL)
- GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1);
}
static void
sched = s;
die_task = GNUNET_SCHEDULER_add_delayed (sched,
- GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1), &end_badly, NULL);
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_MINUTES, 1),
+ &end_badly, NULL);
setup_peer (&p1, "test_dht_api_peer1.conf");
- GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_get, &p1);
+ GNUNET_SCHEDULER_add_delayed (sched,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 1), &test_put,
+ &p1);
}
static int
ok = 1;
GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
- argv, "test-dht-api", "nohelp",
- options, &run, &ok);
+ argv, "test-dht-api", "nohelp", options, &run, &ok);
stop_arm (&p1);
return ok;
}
NULL);
ret = check ();
- //GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-dht-peer-1");
+ GNUNET_DISK_directory_remove ("/tmp/test-gnunetd-dht-peer-1");
return ret;
}