- const struct GNUNET_MessageHeader *hdr;
- uint16_t msize;
-
- GNUNET_assert (cont != NULL);
- hdr = (const struct GNUNET_MessageHeader*) &h[1];
- msize = ntohs(hdr->size);
-#if DEBUG_DATASTORE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u byte message of type %u to datastore service\n",
- msize,
- ntohs(hdr->type));
-#endif
- GNUNET_assert (h->response_proc == NULL);
- h->response_proc = cont;
- h->response_proc_cls = cont_cls;
- h->timeout = GNUNET_TIME_relative_to_absolute (timeout);
- h->message_size = msize;
- if (NULL == GNUNET_CLIENT_notify_transmit_ready (h->client,
- msize,
- timeout,
- &transmit_get_status,
- h))
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ if (NULL == (qe = h->queue_head))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
+ return; /* no entry in queue */
+ }
+ if (qe->was_transmitted == GNUNET_YES)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
+ return; /* waiting for replies */
+ }
+ if (h->th != NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
+ return; /* request pending */
+ }
+ if (h->client == NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
+ return; /* waiting for reconnect */
+ }
+ if (GNUNET_YES == h->in_receive)
+ {
+ /* wait for response to previous query */
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
+ qe->message_size);
+ h->th =
+ GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
+ GNUNET_TIME_absolute_get_remaining
+ (qe->timeout), GNUNET_YES,
+ &transmit_request, h);
+ GNUNET_assert (GNUNET_NO == h->in_receive);
+ GNUNET_break (NULL != h->th);
+}
+
+
+/**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result
+ * @param min_expiration expiration time
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls, int32_t result,
+ struct GNUNET_TIME_Absolute min_expiration,
+ const char *emsg)
+{
+ /* do nothing */
+}
+
+
+/**
+ * Free a queue entry. Removes the given entry from the
+ * queue and releases associated resources. Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+ struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+ GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
+ if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ h->queue_size--;
+ qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
+ GNUNET_free (qe);
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct StatusContext rc;
+ const struct StatusMessage *sm;
+ const char *emsg;
+ int32_t status;
+ int was_transmitted;
+
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return;
+ }
+ rc = qe->qc.sc;
+ if (msg == NULL)
+ {
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ if (was_transmitted == GNUNET_YES)
+ do_disconnect (h);
+ else
+ process_queue (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ _("Failed to receive status response from database."));
+ return;
+ }
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+ free_queue_entry (qe);
+ if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
+ (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
+ {
+ GNUNET_break (0);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ GNUNET_TIME_UNIT_ZERO_ABS,
+ _("Error reading response from datastore service"));
+ return;
+ }
+ sm = (const struct StatusMessage *) msg;
+ status = ntohl (sm->status);
+ emsg = NULL;
+ if (ntohs (msg->size) > sizeof (struct StatusMessage))
+ {
+ emsg = (const char *) &sm[1];
+ if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')