struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
/**
- * Kill off the connection and any pending messages.
+ * Generator for unique ids.
*/
- int do_destroy;
+ uint64_t uid_gen;
};
-static struct GNUNET_TIME_Relative default_request_timeout;
-/* Forward declaration */
-static void process_pending_message (struct GNUNET_DHT_Handle *handle);
-
-static GNUNET_HashCode *
-hash_from_uid (uint64_t uid)
+/**
+ * Convert unique ID to hash code.
+ *
+ * @param uid unique ID to convert
+ * @param hash set to uid (extended with zeros)
+ */
+static void
+hash_from_uid (uint64_t uid,
+ GNUNET_HashCode *hash)
{
- int count;
- int remaining;
- GNUNET_HashCode *hash;
- hash = GNUNET_malloc (sizeof (GNUNET_HashCode));
- count = 0;
-
- while (count < sizeof (GNUNET_HashCode))
- {
- remaining = sizeof (GNUNET_HashCode) - count;
- if (remaining > sizeof (uid))
- remaining = sizeof (uid);
-
- memcpy (hash, &uid, remaining);
- count += remaining;
- }
-
- return hash;
+ memset (hash, 0, sizeof(GNUNET_HashCode));
+ *((uint64_t*)hash) = uid;
}
+
/**
* Handler for messages received from the DHT service
* a demultiplexer which handles numerous message types
*
*/
void
-service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+service_message_handler (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_DHT_Handle *handle = cls;
struct GNUNET_DHT_Message *dht_msg;
struct GNUNET_MessageHeader *enc_msg;
struct GNUNET_DHT_RouteHandle *route_handle;
uint64_t uid;
- GNUNET_HashCode *uid_hash;
+ GNUNET_HashCode uid_hash;
size_t enc_size;
/* TODO: find out message type, handle callbacks for different types of messages.
* Should be a non unique acknowledgment, or unique result. */
{
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received NULL from server, connection down?\n",
+ "`%s': Received NULL from server, connection down!\n",
"DHT API");
#endif
+ GNUNET_CLIENT_disconnect (handle->client);
+ handle->client = GNUNET_CLIENT_connect (handle->sched,
+ "dht",
+ handle->cfg);
+ /* FIXME: re-transmit *all* of our GET requests AND re-start
+ receiving responses! */
return;
}
"`%s': Received response to message (uid %llu)\n",
"DHT API", uid);
#endif
- if (ntohs (dht_msg->unique))
+ if (ntohl (dht_msg->unique))
{
- uid_hash = hash_from_uid (uid);
+ hash_from_uid (uid, &uid_hash);
route_handle =
GNUNET_CONTAINER_multihashmap_get (handle->outstanding_requests,
- uid_hash);
- GNUNET_free (uid_hash);
+ &uid_hash);
if (route_handle == NULL) /* We have no recollection of this request */
{
#if DEBUG_DHT_API
GNUNET_assert (enc_size > 0);
enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
route_handle->iter (route_handle->iter_cls, enc_msg);
-
}
}
break;
struct GNUNET_DHT_Handle *handle;
handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_Handle));
-
- default_request_timeout =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
handle->cfg = cfg;
handle->sched = sched;
-
- handle->current = NULL;
- handle->do_destroy = GNUNET_NO;
- handle->th = NULL;
-
handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
- handle->outstanding_requests =
- GNUNET_CONTAINER_multihashmap_create (ht_len);
-
if (handle->client == NULL)
{
GNUNET_free (handle);
return NULL;
}
+ handle->outstanding_requests =
+ GNUNET_CONTAINER_multihashmap_create (ht_len);
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Connection to service in progress\n", "DHT API");
GNUNET_CLIENT_receive (handle->client,
&service_message_handler,
handle, GNUNET_TIME_UNIT_FOREVER_REL);
-
return handle;
}
"`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
#endif
GNUNET_assert (handle != NULL);
-
if (handle->th != NULL) /* We have a live transmit request in the Aether */
{
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
handle->client = NULL;
}
-
+ /* Either assert that outstanding_requests is empty */
+ /* FIXME: handle->outstanding_requests not freed! */
GNUNET_free (handle);
}
/* Otherwise we need to wait for a response to this message! */
}
+
/**
* Transmit the next pending message, called by notify_transmit_ready
*/
return;
}
- /* TODO: set do_destroy somewhere's, see what needs to happen in that case! */
- if (handle->do_destroy)
- {
- //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper disconnect stuffs */
- }
-
-
if (NULL ==
(handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
ntohs (handle->
"Failed to transmit request to dht service.\n");
#endif
finish (handle, GNUNET_SYSERR);
+ return;
}
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
struct GNUNET_DHT_RouteHandle *route_handle;
struct PendingMessage *pending;
struct GNUNET_DHT_Message *message;
- size_t is_unique;
- size_t msize;
- GNUNET_HashCode *uid_key;
+ size_t expects_response;
+ uint16_t msize;
+ GNUNET_HashCode uid_key;
uint64_t uid;
- is_unique = GNUNET_YES;
- if (iter == NULL)
- is_unique = GNUNET_NO;
-
- route_handle = NULL;
- uid_key = NULL;
-
- do
+ if (sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
- GNUNET_free_non_null (uid_key);
- uid = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
- uid_key = hash_from_uid (uid);
+ GNUNET_break (0);
+ return NULL;
}
- while (GNUNET_CONTAINER_multihashmap_contains
- (handle->outstanding_requests, uid_key) == GNUNET_YES);
-
- if (is_unique)
+ expects_response = GNUNET_YES;
+ if (iter == NULL)
+ expects_response = GNUNET_NO;
+ uid = handle->uid_gen++;
+ if (expects_response)
{
route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Unique ID is %llu\n", "DHT API", uid);
#endif
- /**
- * Store based on random identifier!
- */
GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
- uid_key, route_handle,
+ &uid_key, route_handle,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
-
- }
- else
- {
- msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
}
-
- GNUNET_free (uid_key);
+ msize = sizeof (struct GNUNET_DHT_Message) + ntohs (enc->size);
message = GNUNET_malloc (msize);
message->header.size = htons (msize);
message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
memcpy (&message->key, key, sizeof (GNUNET_HashCode));
- message->options = htons (options);
- message->desired_replication_level = htons (options);
- message->unique = htons (is_unique);
+ message->options = htonl (options);
+ message->desired_replication_level = htonl (options);
+ message->unique = htonl (expects_response);
message->unique_id = GNUNET_htonll (uid);
memcpy (&message[1], enc, ntohs (enc->size));
-
pending = GNUNET_malloc (sizeof (struct PendingMessage));
pending->msg = &message->header;
pending->timeout = timeout;
pending->cont = cont;
pending->cont_cls = cont_cls;
- pending->is_unique = is_unique;
+ pending->expects_response = expects_response;
pending->unique_id = uid;
-
GNUNET_assert (handle->current == NULL);
-
handle->current = pending;
-
process_pending_message (handle);
-
return route_handle;
}
-void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
- GNUNET_SCHEDULER_Task cont, void *cont_cls);
/**
* Perform an asynchronous GET operation on the DHT identified.
return get_handle;
}
+
/**
* Stop a previously issued routing request
*
* @param route_handle handle to the request to stop
* @param cont continuation to call once this message is sent to the service or times out
* @param cont_cls closure for the continuation
- *
*/
void
GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
struct PendingMessage *pending;
struct GNUNET_DHT_StopMessage *message;
size_t msize;
- GNUNET_HashCode *uid_key;
+ GNUNET_HashCode uid_key;
msize = sizeof (struct GNUNET_DHT_StopMessage);
-
message = GNUNET_malloc (msize);
message->header.size = htons (msize);
message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
route_handle->uid);
#endif
message->unique_id = GNUNET_htonll (route_handle->uid);
-
GNUNET_assert (route_handle->dht_handle->current == NULL);
-
pending = GNUNET_malloc (sizeof (struct PendingMessage));
pending->msg = (struct GNUNET_MessageHeader *) message;
pending->timeout = DEFAULT_DHT_TIMEOUT;
pending->cont = cont;
pending->cont_cls = cont_cls;
- pending->is_unique = GNUNET_NO;
pending->unique_id = route_handle->uid;
-
GNUNET_assert (route_handle->dht_handle->current == NULL);
-
route_handle->dht_handle->current = pending;
-
process_pending_message (route_handle->dht_handle);
-
- uid_key = hash_from_uid (route_handle->uid);
-
- if (GNUNET_CONTAINER_multihashmap_remove
- (route_handle->dht_handle->outstanding_requests, uid_key,
- route_handle) != GNUNET_YES)
- {
-#if DEBUG_DHT_API
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Remove outstanding request from hashmap failed for key %s, uid %llu\n",
- "DHT API", GNUNET_h2s (uid_key), route_handle->uid);
-#endif
- }
- GNUNET_free (uid_key);
- return;
+ hash_from_uid (route_handle->uid, &uid_key);
+ GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
+ (route_handle->dht_handle->outstanding_requests, &uid_key,
+ route_handle) == GNUNET_YES);
}
#endif
GNUNET_DHT_route_stop (get_handle->route_handle, cont, cont_cls);
GNUNET_free (get_handle);
-
}
*/
static const struct GNUNET_CONFIGURATION_Handle *cfg;
-/**
- * Timeout for transmissions to clients
- */
-static struct GNUNET_TIME_Relative client_transmit_timeout;
-
/**
* Handle to the core service
*/
struct PendingMessage *next;
/**
- * Actual message to be sent
+ * Pointer to previous item in the list
*/
- struct GNUNET_MessageHeader *msg;
+ struct PendingMessage *prev;
+
+ /**
+ * Actual message to be sent; // avoid allocation
+ */
+ const struct GNUNET_MessageHeader *msg; // msg = (cast) &pm[1]; // memcpy (&pm[1], data, len);
};
*/
struct PendingMessage *pending_head;
+ /**
+ * Tail of linked list of pending messages for this client
+ */
+ struct PendingMessage *pending_tail;
+
};
/**
*/
static struct ClientList *client_list;
-
-/**
- * Server handlers for handling locally received dht requests
- */
-static void
-handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message);
-
-static void
-handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message);
-
-static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
- {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
- {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
- {NULL, NULL, 0, 0}
-};
-
-
-/**
- * Core handler for p2p dht get requests.
- */
-static int handle_dht_p2p_get (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
-
-/**
- * Core handler for p2p dht put requests.
- */
-static int handle_dht_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
-
-/**
- * Core handler for p2p dht find peer requests.
- */
-static int handle_dht_p2p_find_peer (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader
- *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance);
-
-static struct GNUNET_CORE_MessageHandler core_handlers[] = {
- {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_GET, 0},
- {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_PUT, 0},
- {&handle_dht_p2p_find_peer, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0},
- {NULL, 0, 0}
-};
-
/**
* Forward declaration.
*/
static size_t send_generic_reply (void *cls, size_t size, void *buf);
+
/**
* Task run to check for messages that need to be sent to a client.
*
- * @param cls a ClientList, containing the client and any messages to be sent to it
- * @param tc reason this was called
+ * @param client a ClientList, containing the client and any messages to be sent to it
*/
static void
-process_pending_messages (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct ClientList *client = cls;
-
- if (client->pending_head == NULL) /* No messages queued */
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Have no pending messages for client.\n", "DHT");
-#endif
- return;
- }
-
- if (client->transmit_handle == NULL) /* No current pending messages, we can try to send! */
- client->transmit_handle =
- GNUNET_SERVER_notify_transmit_ready (client->client_handle,
- ntohs (client->pending_head->msg->
- size),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &send_generic_reply, client);
- else
- {
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Transmit handle is non-null.\n", "DHT");
-#endif
- }
+process_pending_messages (struct ClientList *client)
+{
+ if (client->pending_head == NULL)
+ return;
+ if (client->transmit_handle != NULL)
+ return;
+ client->transmit_handle =
+ GNUNET_SERVER_notify_transmit_ready (client->client_handle,
+ ntohs (client->pending_head->msg->
+ size),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &send_generic_reply, client);
}
/**
send_generic_reply (void *cls, size_t size, void *buf)
{
struct ClientList *client = cls;
- struct PendingMessage *reply = client->pending_head;
- int ret;
+ char *cbuf = buf;
+ struct PendingMessage *reply;
+ size_t off;
+ size_t msize;
client->transmit_handle = NULL;
- if (buf == NULL) /* Message timed out, that's crappy... */
+ if (buf == NULL)
{
+ /* client disconnected */
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
#endif
- client->pending_head = reply->next;
- GNUNET_free (reply->msg);
- GNUNET_free (reply);
return 0;
}
-
- if (size >= ntohs (reply->msg->size))
+ off = 0;
+ while ( (NULL != (reply = client->pending_head)) &&
+ (size >= off + (msize = ntohs (reply->msg->size))))
{
+ GNUNET_CONTAINER_DLL_remove (client->pending_head,
+ client->pending_tail,
+ reply);
+ memcpy (&cbuf[off], reply->msg, msize);
+ GNUNET_free (reply->msg);
+ GNUNET_free (reply);
+ off += msize;
+ }
#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
#endif
- memcpy (buf, reply->msg, ntohs (reply->msg->size));
-
- ret = ntohs (reply->msg->size);
- }
- else
- ret = 0;
-
- client->pending_head = reply->next;
- GNUNET_free (reply->msg);
- GNUNET_free (reply);
-
- GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
- return ret;
+ process_pending_messages (client);
+ return off;
}
+
/**
* Add a PendingMessage to the clients list of messages to be sent
*
add_pending_message (struct ClientList *client,
struct PendingMessage *pending_message)
{
- struct PendingMessage *pos;
- struct PendingMessage *prev;
-
- pos = client->pending_head;
-
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Adding pending message for client.\n", "DHT");
-#endif
-
- if (pos == NULL)
- {
- client->pending_head = pending_message;
- }
- else /* This means another request is already queued, rely on send_reply to process all pending messages */
- {
- while (pos != NULL) /* Find end of list */
- {
- prev = pos;
- pos = pos->next;
- }
-
- GNUNET_assert (prev != NULL);
- prev->next = pending_message;
- }
-
- GNUNET_SCHEDULER_add_now (sched, &process_pending_messages, client);
-
+ GNUNET_CONTAINER_DLL_insert_after (client->pending_head,
+ client->pending_tail,
+ client->pending_tail,
+ pending_message);
+ process_pending_messages (client);
}
+
/**
* Called when a reply needs to be sent to a client, either as
* a result it found to a GET or FIND PEER request.
*/
static void
send_reply_to_client (struct ClientList *client,
- struct GNUNET_MessageHeader *message,
+ const struct GNUNET_MessageHeader *message,
unsigned long long uid)
{
struct GNUNET_DHT_Message *reply;
struct PendingMessage *pending_message;
-
- size_t msize;
+ uint16_t msize;
size_t tsize;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
#endif
msize = ntohs (message->size);
tsize = sizeof (struct GNUNET_DHT_Message) + msize;
+ if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_BREAK_op (0);
+ return;
+ }
reply = GNUNET_malloc (tsize);
reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT);
reply->header.size = htons (tsize);
if (uid != 0)
- reply->unique = htons (GNUNET_YES);
+ reply->unique = htonl (GNUNET_YES); // ????
reply->unique_id = GNUNET_htonll (uid);
memcpy (&reply[1], message, msize);
-
- pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
+ pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline
pending_message->msg = &reply->header;
-
add_pending_message (client, pending_message);
}
memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
get_result->type = htons (type);
memcpy (&get_result[1], data, size);
-
send_reply_to_client (datacache_get_ctx->client, &get_result->header,
datacache_get_ctx->unique_id);
-
GNUNET_free (get_result);
return GNUNET_OK;
}
+
/**
* Server handler for initiating local dht get requests
*
* @param cls closure for service
- * @param get_msg the actual get message
+ * @param msg the actual get message
* @param message_context struct containing pertinent information about the get request
- *
*/
static void
-handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg,
+handle_dht_get (void *cls,
+ const struct GNUNET_MessageHeader *msg,
struct DHT_MessageContext *message_context)
{
- size_t get_type;
+ const struct GNUNET_DHT_GetMessage *get_msg;
+ uint16_t get_type;
unsigned int results;
- struct DatacacheGetContext *datacache_get_context;
+ struct DatacacheGetContext datacache_get_context;
- GNUNET_assert (ntohs (get_msg->header.size) >=
- sizeof (struct GNUNET_DHT_GetMessage));
+ if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
get_type = ntohs (get_msg->type);
-
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n",
+ "`%s': Received `%s' request from client, message type %u, key %s, uid %llu\n",
"DHT", "GET", get_type, GNUNET_h2s (message_context->key),
message_context->unique_id);
#endif
-
- datacache_get_context = GNUNET_malloc (sizeof (struct DatacacheGetContext));
- datacache_get_context->client = message_context->client;
- datacache_get_context->unique_id = message_context->unique_id;
-
+ datacache_get_context.client = message_context->client;
+ datacache_get_context.unique_id = message_context->unique_id;
results = 0;
if (datacache != NULL)
results =
GNUNET_DATACACHE_get (datacache, message_context->key, get_type,
- &datacache_get_iterator, datacache_get_context);
-
+ &datacache_get_iterator, &datacache_get_context);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Found %d results for local `%s' request\n", "DHT",
results, "GET");
-
- GNUNET_free (datacache_get_context);
- /* FIXME: Implement get functionality here */
}
*
*/
static void
-handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage *find_msg,
+handle_dht_find_peer (void *cls,
+ const struct GNUNET_MessageHeader *find_msg,
struct DHT_MessageContext *message_context)
{
struct GNUNET_DHT_FindPeerResultMessage *find_peer_result;
size_t hello_size;
size_t tsize;
+
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received `%s' request from client, key %s (msg size %d, we expected %d)\n",
ntohs (find_msg->header.size),
sizeof (struct GNUNET_DHT_FindPeerMessage));
#endif
-
- GNUNET_assert (ntohs (find_msg->header.size) >=
- sizeof (struct GNUNET_DHT_FindPeerMessage));
-
if (my_hello == NULL)
{
#if DEBUG_DHT
"`%s': Our HELLO is null, can't return.\n",
"DHT");
#endif
-
return;
}
-
/* Simplistic find_peer functionality, always return our hello */
hello_size = ntohs(my_hello->size);
tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage);
+ // check tsize < MAX
find_peer_result = GNUNET_malloc (tsize);
find_peer_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
find_peer_result->header.size = htons (tsize);
- find_peer_result->data_size = htons (hello_size);
- memcpy(&find_peer_result->peer, &my_identity, sizeof(struct GNUNET_PeerIdentity));
memcpy (&find_peer_result[1], &my_hello, hello_size);
-
send_reply_to_client(message_context->client, &find_peer_result->header, message_context->unique_id);
GNUNET_free(find_peer_result);
- /* FIXME: Implement find peer functionality here */
}
* @param message_context struct containing pertinent information about the request
*/
static void
-handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg,
+handle_dht_put (void *cls,
+ const struct GNUNET_MessageHeader *msg,
struct DHT_MessageContext *message_context)
{
+ struct GNUNET_DHT_PutMessage *put_msg;
size_t put_type;
size_t data_size;
- GNUNET_assert (ntohs (put_msg->header.size) >=
+ GNUNET_assert (ntohs (msg->header.size) >=
sizeof (struct GNUNET_DHT_PutMessage));
-
- put_type = ntohs (put_msg->type);
- data_size = ntohs (put_msg->data_size);
+ put_msg = (struct GNUNET_DHT_PutMessage *)msg;
+ put_type = ntohl (put_msg->type);
+ data_size = ntohs (put_msg->header.size) - sizeof (struct GNUNET_DHT_PutMessage);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': %s msg total size is %d, data size %d, struct size %d\n",
"DHT", "PUT", ntohs (put_msg->header.size), data_size,
sizeof (struct GNUNET_DHT_PutMessage));
-#endif
- GNUNET_assert (ntohs (put_msg->header.size) ==
- sizeof (struct GNUNET_DHT_PutMessage) + data_size);
-
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received `%s' request from client, message type %d, key %s\n",
"DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
#endif
-
- /**
- * Simplest DHT functionality, store any message we receive a put request for.
- */
if (datacache != NULL)
GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
(char *) &put_msg[1], put_type,
put_msg->expiration);
- /**
- * FIXME: Implement dht put request functionality here!
- */
-
}
ret->client_handle = client;
ret->next = client_list;
client_list = ret;
- ret->pending_head = NULL;
-
return ret;
}
handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
- struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *) message;
- struct GNUNET_MessageHeader *enc_msg;
+ const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message *) message;
+ const struct GNUNET_MessageHeader *enc_msg;
struct DHT_MessageContext *message_context;
-
size_t enc_type;
- enc_msg = (struct GNUNET_MessageHeader *) &dht_msg[1];
+ enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
enc_type = ntohs (enc_msg->type);
message_context->client = find_active_client (client);
message_context->key = &dht_msg->key;
message_context->unique_id = GNUNET_ntohll (dht_msg->unique_id);
- message_context->replication = ntohs (dht_msg->desired_replication_level);
- message_context->msg_options = ntohs (dht_msg->options);
+ message_context->replication = ntohl (dht_msg->desired_replication_level);
+ message_context->msg_options = ntohl (dht_msg->options);
+ /* FIXME: Implement *remote* DHT operations here (forward request) */
+ /* FIXME: *IF* handling should be local, then do this: */
switch (enc_type)
{
case GNUNET_MESSAGE_TYPE_DHT_GET:
- handle_dht_get (cls, (struct GNUNET_DHT_GetMessage *) enc_msg,
+ handle_dht_get (cls, enc_msg,
message_context);
break;
case GNUNET_MESSAGE_TYPE_DHT_PUT:
- handle_dht_put (cls, (struct GNUNET_DHT_PutMessage *) enc_msg,
+ handle_dht_put (cls, enc_msg,
message_context);
send_client_receipt_confirmation (client,
GNUNET_ntohll (dht_msg->unique_id));
break;
case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
handle_dht_find_peer (cls,
- (struct GNUNET_DHT_FindPeerMessage *) enc_msg,
+ enc_msg,
message_context);
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"`%s': Message type (%d) not handled\n", "DHT", enc_type);
}
-
GNUNET_free (message_context);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
- struct GNUNET_DHT_StopMessage *dht_stop_msg =
- (struct GNUNET_DHT_StopMessage *) message;
+ const struct GNUNET_DHT_StopMessage *dht_stop_msg =
+ (const struct GNUNET_DHT_StopMessage *) message;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received `%s' request from client, uid %llu\n", "DHT",
"GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
#endif
-
- /* TODO: Put in demultiplexing here */
-
- send_client_receipt_confirmation (client,
- GNUNET_ntohll (dht_stop_msg->unique_id));
+ /* TODO: actually stop... */
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/**
- * Core handler for p2p dht get requests.
+ * Core handler for p2p route requests.
*/
static int
-handle_dht_p2p_get (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency, uint32_t distance)
+handle_dht_p2p_route_request (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency, uint32_t distance)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received `%s' request from another peer\n", "DHT",
"GET");
#endif
-
+ // FIXME: setup tracking for sending replies to peer (with timeout)
+ // FIXME: call code from handle_dht_start_message (refactor...)
return GNUNET_YES;
}
-/**
- * Core handler for p2p dht put requests.
- */
-static int
-handle_dht_p2p_put (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency, uint32_t distance)
-{
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from another peer\n", "DHT",
- "PUT");
-#endif
-
- return GNUNET_YES;
-}
/**
- * Core handler for p2p dht find peer requests.
+ * Core handler for p2p route results.
*/
static int
-handle_dht_p2p_find_peer (void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *message,
- struct GNUNET_TIME_Relative latency,
- uint32_t distance)
+handle_dht_p2p_route_result (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ struct GNUNET_TIME_Relative latency, uint32_t distance)
{
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received `%s' request from another peer\n", "DHT",
- "FIND PEER");
+ "GET");
#endif
-
+ // FIXME: setup tracking for sending replies to peer
+ // FIXME: possibly call code from handle_dht_stop_message? (unique result?) (refactor...)
return GNUNET_YES;
}
}
+static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
+ {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+ {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
+ {NULL, NULL, 0, 0}
+};
+
+
+static struct GNUNET_CORE_MessageHandler core_handlers[] = {
+ {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0},
+ {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0},
+ {NULL, 0, 0}
+};
+
+
/**
* Process dht requests.
*
{
sched = scheduler;
cfg = c;
-
datacache = GNUNET_DATACACHE_create (sched, cfg, "dhtcache");
-
- client_transmit_timeout =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
GNUNET_SERVER_add_handlers (server, plugin_handlers);
-
coreAPI = GNUNET_CORE_connect (sched, /* Main scheduler */
cfg, /* Main configuration */
- client_transmit_timeout, /* Delay for connecting */
+ GNUNET_TIME_UNIT_FOREVER_REL,
NULL, /* FIXME: anything we want to pass around? */
&core_init, /* Call core_init once connected */
NULL, /* Don't care about pre-connects */
NULL, /* Don't want notified about all outbound messages */
GNUNET_NO, /* For header only outbound notification */
core_handlers); /* Register these handlers */
-
+ if (coreAPI == NULL)
+ return;
transport_handle = GNUNET_TRANSPORT_connect(sched, cfg, NULL, NULL, NULL, NULL);
-
if (transport_handle != NULL)
GNUNET_TRANSPORT_get_hello (transport_handle, &process_hello, NULL);
else
GNUNET_log(GNUNET_ERROR_TYPE_WARNING, "Failed to connect to transport service!\n");
-
-
- if (coreAPI == NULL)
- return;
-
/* Scheduled the task to clean up when shutdown is called */
cleanup_task = GNUNET_SCHEDULER_add_delayed (sched,
GNUNET_TIME_UNIT_FOREVER_REL,
#define GNUNET_DHT_SERVICE_H
#include "gnunet_util_lib.h"
+#include "gnunet_hello_lib.h"
#ifdef __cplusplus
extern "C"
*
* @param handle handle to the DHT service
* @param timeout timeout for this request to be sent to the
- * service
- * @param type expected type of the response object
+ * service (this is NOT a timeout for receiving responses)
+ * @param type expected type of the response object (GNUNET_DATASTORE_BLOCKTYPE_*)
* @param key the key to look up
* @param iter function to call on each result
* @param iter_cls closure for iter
- * @param cont continuation to call once message sent
+ * @param cont continuation to call once message sent (and it is now
+ * safe to do another operation on the DHT)
* @param cont_cls closure for continuation
- *
- * @return handle to stop the async get, NULL on error
+ * @return handle to stop the async get, NULL on error (two
+ * concurrent operations scheduled)
*/
struct GNUNET_DHT_GetHandle *
GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
GNUNET_SCHEDULER_Task cont,
void *cont_cls);
+
/**
* Stop async DHT-get. Frees associated resources.
*
* @param get_handle GET operation to stop.
+ * @param cont continuation to call once this message is sent to the service
+ * @param cont_cls closure for the continuation
*/
void
-GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls);
+GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
+ GNUNET_SCHEDULER_Task cont,
+ void *cont_cls);
/**
* operation
*
* @param cls closure
- * @param reply response
+ * @param peer hello of a target (peer near key)
*/
typedef void (*GNUNET_DHT_FindPeerProcessor)(void *cls,
- const struct GNUNET_PeerIdentity *peer,
- const struct GNUNET_MessageHeader *reply);
+ const struct GNUNET_HELLO_Message *peer);
/**
* @param timeout timeout for this request to be sent to the
* service
* @param options routing options for this message
- * @param message a message to inject at found peers (may be null)
* @param key the key to look up
* @param proc function to call on each result
* @param proc_cls closure for proc
* @param cont continuation to call once message sent
* @param cont_cls closure for continuation
- *
* @return handle to stop the async get, NULL on error
*/
struct GNUNET_DHT_FindPeerHandle *
GNUNET_DHT_find_peer_start (struct GNUNET_DHT_Handle *handle,
- struct GNUNET_TIME_Relative timeout,
- enum GNUNET_DHT_RouteOption options,
- struct GNUNET_MessageHeader *message,
- const GNUNET_HashCode * key,
- GNUNET_DHT_FindPeerProcessor proc,
- void *proc_cls,
- GNUNET_SCHEDULER_Task cont,
- void *cont_cls);
+ struct GNUNET_TIME_Relative timeout,
+ enum GNUNET_DHT_RouteOption options,
+ const GNUNET_HashCode * key,
+ GNUNET_DHT_FindPeerProcessor proc,
+ void *proc_cls,
+ GNUNET_SCHEDULER_Task cont,
+ void *cont_cls);
+
/**
* Stop async find peer. Frees associated resources.
*
* @param find_peer_handle GET operation to stop.
+ * @param cont continuation to call once this message is sent to the service
+ * @param cont_cls closure for the continuation
*/
void
-GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls);
+GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
+ GNUNET_SCHEDULER_Task cont,
+ void *cont_cls);
+
/**
* Iterator called on each result obtained from a generic route
* operation
+ *
+ * @param cls closure
+ * @param reply response
*/
typedef void (*GNUNET_DHT_ReplyProcessor)(void *cls,
const struct GNUNET_MessageHeader *reply);
+
/**
- * Perform an asynchronous FIND_PEER operation on the DHT.
+ * Perform an asynchronous ROUTE_START operation on the DHT.
*
* @param handle handle to the DHT service
* @param key the key to look up
* to wait for transmission to the service
* @param iter function to call on each result, NULL if no replies are expected
* @param iter_cls closure for iter
-
* @param cont continuation to call when done, GNUNET_SYSERR if failed
* GNUNET_OK otherwise
* @param cont_cls closure for cont
GNUNET_SCHEDULER_Task cont,
void *cont_cls);
+
+/**
+ * Stop async route stop. Frees associated resources.
+ *
+ * @param route_handle operation to stop.
+ * @param cont continuation to call once this message is sent to the service
+ * @param cont_cls closure for the continuation
+ */
void
-GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle, GNUNET_SCHEDULER_Task cont, void *cont_cls);
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle,
+ GNUNET_SCHEDULER_Task cont,
+ void *cont_cls);
#if 0 /* keep Emacsens' auto-indent happy */