*/
uint64_t uid_gen;
+ /**
+ * Did we start our receive loop yet?
+ */
+ int in_receive;
};
{
if (handle->client != NULL)
return GNUNET_OK;
+ handle->in_receive = GNUNET_NO;
handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
if (handle->client == NULL)
{
_("Failed to connect to the DHT service!\n"));
return GNUNET_NO;
}
-#if DEBUG_DHT
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
-#endif
- GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
- GNUNET_TIME_UNIT_FOREVER_REL);
return GNUNET_YES;
}
struct GNUNET_DHT_GetHandle *rh = value;
if (GNUNET_NO == rh->message->in_pending_queue)
- {
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Retransmitting request related to %s to DHT %p\n",
+ GNUNET_h2s (key),
+ handle);
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
rh->message);
rh->message->in_pending_queue = GNUNET_YES;
struct GNUNET_DHT_Handle *handle = cls;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Reconnedting with DHT %p\n",
+ "Reconnecting with DHT %p\n",
handle);
handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
- if (handle->client == NULL)
+ if (GNUNET_YES != try_connect (handle))
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "dht reconnect failed(!)\n");
return;
memcpy (buf, head->msg, tsize);
GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
head);
+ head->in_pending_queue = GNUNET_NO;
if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (head->timeout_task);
head->cont = NULL;
head->cont_cls = NULL;
}
- head->in_pending_queue = GNUNET_NO;
if (GNUNET_YES == head->free_on_send)
GNUNET_free (head);
process_pending_messages (handle);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Forwarded request of %u bytes to DHT service\n", (unsigned int) tsize);
#endif
+ if (GNUNET_NO == handle->in_receive)
+ {
+#if DEBUG_DHT
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Starting to process replies from DHT\n");
+#endif
+ handle->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
return tsize;
}
}
while (NULL != (pm = handle->pending_head))
{
+ GNUNET_assert (GNUNET_YES == pm->in_pending_queue);
GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
pm);
+ pm->in_pending_queue = GNUNET_NO;
GNUNET_assert (GNUNET_YES == pm->free_on_send);
if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
GNUNET_SCHEDULER_cancel (pm->timeout_task);
if (NULL != pm->cont)
GNUNET_SCHEDULER_add_continuation (pm->cont, pm->cont_cls,
GNUNET_SCHEDULER_REASON_TIMEOUT);
- pm->in_pending_queue = GNUNET_NO;
GNUNET_free (pm);
}
if (handle->client != NULL)
struct GNUNET_DHT_Handle *handle;
handle = pending->handle;
+ GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
pending);
+ pending->in_pending_queue = GNUNET_NO;
if (pending->cont != NULL)
pending->cont (pending->cont_cls, tc);
GNUNET_free (pending);