*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
-
/**
* Current connection to the datastore service.
*/
*/
int in_receive;
+ /**
+ * We should either receive (and ignore) an 'END' message or force a
+ * disconnect for the next message from the service.
+ */
+ unsigned int expect_end_or_disconnect;
+
};
h->in_receive = GNUNET_YES;
GNUNET_CLIENT_receive (h->client,
qe->response_proc,
- qe,
+ h,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
GNUNET_STATISTICS_update (h->stats,
gettext_noop ("# bytes sent to datastore"),
const struct
GNUNET_MessageHeader * msg)
{
- struct GNUNET_DATASTORE_QueueEntry *qe = cls;
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- struct StatusContext rc = qe->qc.sc;
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct StatusContext rc;
const struct StatusMessage *sm;
const char *emsg;
int32_t status;
int was_transmitted;
h->in_receive = GNUNET_NO;
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return;
+ }
was_transmitted = qe->was_transmitted;
+ rc = qe->qc.sc;
if (msg == NULL)
{
free_queue_entry (qe);
return;
}
GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- GNUNET_assert (h->queue_head == qe);
free_queue_entry (qe);
if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
(ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) )
*/
static void
process_result_message (void *cls,
- const struct GNUNET_MessageHeader * msg)
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_DATASTORE_QueueEntry *qe = cls;
- struct GNUNET_DATASTORE_Handle *h = qe->h;
- struct ResultContext rc = qe->qc.rc;
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct ResultContext rc;
const struct DataMessage *dm;
int was_transmitted;
h->in_receive = GNUNET_NO;
if (msg == NULL)
- {
- was_transmitted = qe->was_transmitted;
- free_queue_entry (qe);
- if (was_transmitted == GNUNET_YES)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Failed to receive response from database.\n"));
- do_disconnect (h);
- }
- else
+ {
+ if (NULL != (qe = h->queue_head))
{
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ rc = qe->qc.rc;
+ if (was_transmitted == GNUNET_YES)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to receive response from database.\n"));
+ do_disconnect (h);
+ }
+ else
+ {
#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Request dropped due to finite datastore queue length.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Request dropped due to finite datastore queue length.\n");
#endif
+ }
+ if (rc.iter != NULL)
+ rc.iter (rc.iter_cls,
+ NULL, 0, NULL, 0, 0, 0,
+ GNUNET_TIME_UNIT_ZERO_ABS, 0);
}
- if (rc.iter != NULL)
- rc.iter (rc.iter_cls,
- NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
return;
}
- GNUNET_assert (GNUNET_YES == qe->was_transmitted);
- GNUNET_assert (h->queue_head == qe);
if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
{
GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
+ if (h->expect_end_or_disconnect > 0)
+ {
+ h->expect_end_or_disconnect--;
+ process_queue (h);
+ return;
+ }
+ qe = h->queue_head;
+ rc = qe->qc.rc;
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
free_queue_entry (qe);
#if DEBUG_DATASTORE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
process_queue (h);
return;
}
+ if (h->expect_end_or_disconnect > 0)
+ {
+ /* only 'END' allowed, must reconnect */
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ return;
+ }
+ qe = h->queue_head;
+ rc = qe->qc.rc;
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
(ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
(ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
{
struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
- GNUNET_assert (&process_result_message == qe->response_proc);
h->in_receive = GNUNET_YES;
GNUNET_CLIENT_receive (h->client,
- qe->response_proc,
- qe,
+ &process_result_message,
+ h,
GNUNET_TIME_absolute_get_remaining (qe->timeout));
}
if (GNUNET_YES == qe->was_transmitted)
{
free_queue_entry (qe);
- h->retry_time = GNUNET_TIME_UNIT_ZERO;
- do_disconnect (h);
+ h->expect_end_or_disconnect++;
return;
}
free_queue_entry (qe);