const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_DHT_Handle *handle = cls;
- struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)msg;
+ struct GNUNET_DHT_Message *dht_msg;
+ struct GNUNET_DHT_StopMessage *stop_msg;
struct GNUNET_MessageHeader *enc_msg;
struct GNUNET_DHT_RouteHandle *route_handle;
+ uint64_t uid;
GNUNET_HashCode *uid_hash;
size_t enc_size;
/* TODO: find out message type, handle callbacks for different types of messages.
* Should be a non unique acknowledgment, or unique result. */
+ if (msg == NULL)
+ {
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received response to message (uid %llu)\n", "DHT API", ntohl(dht_msg->unique_id));
+ "`%s': Received NULL from server, connection down?\n", "DHT API");
#endif
+ return;
+ }
- if (ntohs(dht_msg->unique))
- {
- uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
- route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
- if (route_handle == NULL) /* We have no recollection of this request */
- {
+ if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT)
+ {
+ dht_msg = (struct GNUNET_DHT_Message *)msg;
+ uid = GNUNET_ntohll(dht_msg->unique_id);
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
+#endif
+ if (ntohs(dht_msg->unique))
+ {
+ uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
+
+ route_handle = GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
+ if (route_handle == NULL) /* We have no recollection of this request */
+ {
#if DEBUG_DHT_API
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"`%s': Received response to message (uid %llu), but have no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
#endif
- }
- else
- {
- enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
- GNUNET_assert(enc_size > 0);
- enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
- route_handle->iter(route_handle->iter_cls, enc_msg);
- }
- }
- else
- {
- if (handle->current->unique_id == ntohl(dht_msg->unique_id))
- {
+ }
+ else
+ {
+ enc_size = ntohs(dht_msg->header.size) - sizeof(struct GNUNET_DHT_Message);
+ GNUNET_assert(enc_size > 0);
+ enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
+ route_handle->iter(route_handle->iter_cls, enc_msg);
+ }
+ }
+ }
+ else if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DHT_STOP)
+ {
+ stop_msg = (struct GNUNET_DHT_StopMessage *)msg;
+ uid = GNUNET_ntohll(stop_msg->unique_id);
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Received response to message (uid %llu)\n", "DHT API", uid);
+#endif
+ if (handle->current->unique_id == uid)
+ {
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Have pending confirmation for this message!\n", "DHT API", uid);
+#endif
+ if (handle->current->cont != NULL)
handle->current->cont(handle->current->cont_cls, GNUNET_OK);
- GNUNET_free(handle->current->msg);
- handle->current = NULL;
- GNUNET_free(handle->current);
- }
- }
-
+ GNUNET_free(handle->current->msg);
+ handle->current = NULL;
+ GNUNET_free(handle->current);
+ }
+ }
}
{
/* TODO: if code is not GNUNET_OK, do something! */
struct PendingMessage *pos = handle->current;
-
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Finish called!\n", "DHT API");
+#endif
GNUNET_assert(pos != NULL);
if (pos->is_unique)
struct GNUNET_DHT_Handle *handle = cls;
size_t tsize;
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': In transmit_pending\n", "DHT API");
+#endif
if (buf == NULL)
{
#if DEBUG_DHT_API
"`%s': Sending message size %d\n", "DHT API", tsize);
#endif
memcpy(buf, handle->current->msg, tsize);
+ finish(handle, GNUNET_OK);
return tsize;
}
else
static void process_pending_message(struct GNUNET_DHT_Handle *handle)
{
- if (handle->current != NULL)
+ if (handle->current == NULL)
return; /* action already pending */
if (GNUNET_YES != try_connect (handle))
{
size_t msize;
GNUNET_HashCode *uid_key;
int count;
+ uint64_t uid;
+ uid = 0;
is_unique = GNUNET_YES;
if (iter == NULL)
is_unique = GNUNET_NO;
route_handle->iter_cls = iter_cls;
route_handle->dht_handle = handle;
route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
-
+ uid = route_handle->uid;
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Unique ID is %llu\n", "DHT API", uid);
+#endif
count = 0;
uid_key = hash_from_uid(route_handle->uid);
/* While we have an outstanding request with the same identifier! */
* Store based on random identifier!
*/
GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size) + sizeof(route_handle->uid);
+ msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
GNUNET_free(uid_key);
}
else
message->options = htons(options);
message->desired_replication_level = htons(options);
message->unique = htons(is_unique);
+ message->unique_id = GNUNET_htonll(uid);
+ memcpy(&message[1], enc, ntohs(enc->size));
pending = GNUNET_malloc(sizeof(struct PendingMessage));
pending->msg = &message->header;
GNUNET_assert(handle->current == NULL);
+ handle->current = pending;
+
process_pending_message(handle);
return route_handle;
message = GNUNET_malloc(msize);
message->header.size = htons(msize);
message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
- message->unique_id = htonl(route_handle->uid);
+#if DEBUG_DHT_API
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "`%s': Remove outstanding request for uid %llu\n", "DHT API", route_handle->uid);
+#endif
+ message->unique_id = GNUNET_htonll(route_handle->uid);
pending = GNUNET_malloc(sizeof(struct PendingMessage));
pending->msg = (struct GNUNET_MessageHeader *)message;
GNUNET_assert(route_handle->dht_handle->current == NULL);
+ route_handle->dht_handle->current = pending;
+
process_pending_message(route_handle->dht_handle);
uid_key = hash_from_uid(route_handle->uid);
};
/**
- * Server handler for initiating local dht get requests
+ * Server handler for handling locally received dht requests
*/
-static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader *message);
+static void
+handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client,
+ const struct GNUNET_MessageHeader *message);
+
+static void
+handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client,
+ const struct GNUNET_MessageHeader *message);
static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
- {&handle_dht_plugin_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+ {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+ {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
/* {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0},
{&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0},
{&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0},
GNUNET_free(data);
}
+/**
+ * Context for sending receipt confirmations. Not used yet.
+ */
+struct SendConfirmationContext
+{
+ /**
+ * The message to send.
+ */
+ struct GNUNET_DHT_StopMessage *message;
+
+ /**
+ * Transmit handle.
+ */
+ struct GNUNET_CONNECTION_TransmitHandle * transmit_handle;
+};
+
+size_t send_confirmation (void *cls,
+ size_t size, void *buf)
+{
+ struct GNUNET_DHT_StopMessage *confirmation_message = cls;
+
+ if (buf == NULL) /* Message timed out, that's crappy... */
+ {
+ GNUNET_free(confirmation_message);
+ return 0;
+ }
+
+ if (size >= ntohs(confirmation_message->header.size))
+ {
+ memcpy(buf, confirmation_message, ntohs(confirmation_message->header.size));
+ return ntohs(confirmation_message->header.size);
+ }
+ else
+ return 0;
+}
static void
-handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg)
+send_client_receipt_confirmation(struct GNUNET_SERVER_Client *client, uint64_t uid)
{
+ struct GNUNET_DHT_StopMessage *confirm_message;
+
+ confirm_message = GNUNET_malloc(sizeof(struct GNUNET_DHT_StopMessage));
+ confirm_message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
+ confirm_message->header.size = htons(sizeof(struct GNUNET_DHT_StopMessage));
+ confirm_message->unique_id = GNUNET_htonll(uid);
+
+ GNUNET_SERVER_notify_transmit_ready (client,
+ sizeof(struct GNUNET_DHT_StopMessage),
+ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5),
+ &send_confirmation, confirm_message);
+
+}
+
+static void
+handle_dht_start_message(void *cls, struct GNUNET_SERVER_Client * client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)message;
struct GNUNET_MessageHeader *enc_msg;
size_t enc_type;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), ntohl(dht_msg->unique_id));
+ "`%s': Received `%s' request from client, message type %d, key %s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), GNUNET_ntohll(dht_msg->unique_id));
#endif
/* FIXME: Implement demultiplexing functionality here */
#endif
}
+ GNUNET_SERVER_receive_done(client, GNUNET_OK);
+
}
static void
-handle_dht_stop_message(void *cls, struct GNUNET_DHT_StopMessage *dht_stop_msg)
+handle_dht_stop_message(void *cls, struct GNUNET_SERVER_Client * client,
+ const struct GNUNET_MessageHeader *message)
{
+ struct GNUNET_DHT_StopMessage * dht_stop_msg = (struct GNUNET_DHT_StopMessage *)message;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", ntohl(dht_stop_msg->unique_id));
-#endif
-}
-
-
-
-/**
- * Server handler for initiating local dht get requests
- */
-static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client * client,
- const struct GNUNET_MessageHeader *message)
-{
-
- #if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "`%s': Received `%s' request from client, message type %d, size %d\n", "DHT", "GENERIC", ntohs(message->type), ntohs(message->size));
+ "`%s': Received `%s' request from client, uid %llu\n", "DHT", "GENERIC STOP", GNUNET_ntohll(dht_stop_msg->unique_id));
#endif
-
- switch(ntohs(message->type))
- {
- case GNUNET_MESSAGE_TYPE_DHT:
- handle_dht_start_message(cls, (struct GNUNET_DHT_Message *)message);
- case GNUNET_MESSAGE_TYPE_DHT_STOP:
- handle_dht_stop_message(cls, (struct GNUNET_DHT_StopMessage *)message);
- }
-
+ send_client_receipt_confirmation(client, GNUNET_ntohll(dht_stop_msg->unique_id));
GNUNET_SERVER_receive_done(client, GNUNET_OK);
}
+
/**
* Core handler for p2p dht get requests.
*/
struct GNUNET_CONFIGURATION_Handle *cfg;
struct GNUNET_DHT_Handle *dht_handle;
struct GNUNET_PeerIdentity id;
- struct GNUNET_DHT_GetHandle *get_handle;
- struct GNUNET_DHT_GetHandle *find_peer_handle;
+ struct GNUNET_DHT_RouteHandle *get_handle;
+ struct GNUNET_DHT_RouteHandle *find_peer_handle;
#if START_ARM
pid_t arm_pid;
GNUNET_assert (peer->dht_handle != NULL);
- GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), NULL, NULL);
+ GNUNET_DHT_put(peer->dht_handle, &hash, 0, data_size, data, GNUNET_TIME_relative_to_absolute(GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360)) ,GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 360), NULL, NULL);
//GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1);
GNUNET_assert (peer->dht_handle != NULL);
- GNUNET_DHT_get_stop(peer->dht_handle, peer->get_handle);
+ GNUNET_DHT_get_stop(peer->get_handle);
- GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1);
+ //GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &test_put, &p1);
+ GNUNET_SCHEDULER_add_delayed(sched, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 1), &end, &p1);
}
GNUNET_HashCode hash;
memset(&hash, 42, sizeof(GNUNET_HashCode));
- peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg);
+ peer->dht_handle = GNUNET_DHT_connect (sched, peer->cfg, 100);
GNUNET_assert (peer->dht_handle != NULL);
- peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, 42, &hash, NULL, NULL);
+ peer->get_handle = GNUNET_DHT_get_start(peer->dht_handle, GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 100), 42, &hash, NULL, NULL);
if (peer->get_handle == NULL)
GNUNET_SCHEDULER_add_now(sched, &end_badly, &p1);