+ GNUNET_CLIENT_disconnect (h->client);
+ h->skip_next_messages = 0;
+ h->client = NULL;
+ h->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
+}
+
+
+/**
+ * Function called whenever we receive a message from
+ * the service. Calls the appropriate handler.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param msg the received message
+ */
+static void
+receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ h->in_receive = GNUNET_NO;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
+ if (h->skip_next_messages > 0)
+ {
+ h->skip_next_messages--;
+ process_queue (h);
+ return;
+ }
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ process_queue (h);
+ return;
+ }
+ qe->response_proc (h, msg);
+}
+
+
+/**
+ * Transmit request from queue to datastore service.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param size number of bytes that can be copied to buf
+ * @param buf where to copy the drop message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ size_t msize;
+
+ h->th = NULL;
+ if (NULL == (qe = h->queue_head))
+ return 0; /* no entry in queue */
+ if (buf == NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# transmission request failures"),
+ 1, GNUNET_NO);
+ do_disconnect (h);
+ return 0;
+ }
+ if (size < (msize = qe->message_size))
+ {
+ process_queue (h);
+ return 0;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
+ msize);
+ memcpy (buf, &qe[1], msize);
+ qe->was_transmitted = GNUNET_YES;
+ GNUNET_SCHEDULER_cancel (qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (GNUNET_NO == h->in_receive);
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client, &receive_cb, h,
+ GNUNET_TIME_absolute_get_remaining (qe->timeout));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# bytes sent to datastore"), 1,
+ GNUNET_NO);
+ return msize;
+}
+
+
+/**
+ * Process entries in the queue (or do nothing if we are already
+ * doing so).
+ *
+ * @param h handle to the datastore
+ */
+static void
+process_queue (struct GNUNET_DATASTORE_Handle *h)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ if (NULL == (qe = h->queue_head))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
+ return; /* no entry in queue */
+ }
+ if (qe->was_transmitted == GNUNET_YES)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
+ return; /* waiting for replies */
+ }
+ if (h->th != NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
+ return; /* request pending */
+ }
+ if (h->client == NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
+ return; /* waiting for reconnect */
+ }
+ if (GNUNET_YES == h->in_receive)
+ {
+ /* wait for response to previous query */
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
+ qe->message_size);
+ h->th =
+ GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
+ GNUNET_TIME_absolute_get_remaining
+ (qe->timeout), GNUNET_YES,
+ &transmit_request, h);
+ GNUNET_assert (GNUNET_NO == h->in_receive);
+ GNUNET_break (NULL != h->th);
+}
+
+
+/**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result
+ * @param min_expiration expiration time
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls, int32_t result,
+ struct GNUNET_TIME_Absolute min_expiration,
+ const char *emsg)
+{
+ /* do nothing */
+}
+
+
+/**
+ * Free a queue entry. Removes the given entry from the
+ * queue and releases associated resources. Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+ struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+ GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
+ if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ h->queue_size--;
+ qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
+ GNUNET_free (qe);
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct StatusContext rc;
+ const struct StatusMessage *sm;
+ const char *emsg;
+ int32_t status;
+ int was_transmitted;
+
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return;
+ }
+ rc = qe->qc.sc;
+ if (msg == NULL)
+ {
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ if (was_transmitted == GNUNET_YES)
+ do_disconnect (h);
+ else
+ process_queue (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ _("Failed to receive status response from database."));
+ return;
+ }
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+ free_queue_entry (qe);
+ if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
+ (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
+ {
+ GNUNET_break (0);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ _("Error reading response from datastore service"));
+ return;
+ }
+ sm = (const struct StatusMessage *) msg;
+ status = ntohl (sm->status);
+ emsg = NULL;
+ if (ntohs (msg->size) > sizeof (struct StatusMessage))
+ {
+ emsg = (const char *) &sm[1];
+ if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')