+/**
+ * A request has timed out (before being transmitted to the service).
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
+ * @param tc scheduler context
+ */
+static void
+timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe = cls;
+
+ GNUNET_STATISTICS_update (qe->h->stats,
+ gettext_noop ("# queue entry timeouts"), 1,
+ GNUNET_NO);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (qe->was_transmitted == GNUNET_NO);
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout of request in datastore queue\n");
+#endif
+ qe->response_proc (qe->h, NULL);
+}
+
+
+/**
+ * Create a new entry for our priority queue (and possibly discard other entires if
+ * the queue is getting too long).
+ *
+ * @param h handle to the datastore
+ * @param msize size of the message to queue
+ * @param queue_priority priority of the entry
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout timeout for the operation
+ * @param response_proc function to call with replies (can be NULL)
+ * @param qc client context (NOT a closure for response_proc)
+ * @return NULL if the queue is full
+ */
+static struct GNUNET_DATASTORE_QueueEntry *
+make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
+ unsigned int queue_priority, unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_CLIENT_MessageHandler response_proc,
+ const union QueueContext *qc)
+{
+ struct GNUNET_DATASTORE_QueueEntry *ret;
+ struct GNUNET_DATASTORE_QueueEntry *pos;
+ unsigned int c;
+
+ c = 0;
+ pos = h->queue_head;
+ while ((pos != NULL) && (c < max_queue_size) &&
+ (pos->priority >= queue_priority))
+ {
+ c++;
+ pos = pos->next;
+ }
+ if (c >= max_queue_size)
+ {
+ GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
+ GNUNET_NO);
+ return NULL;
+ }
+ ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
+ ret->h = h;
+ ret->response_proc = response_proc;
+ ret->qc = *qc;
+ ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ ret->priority = queue_priority;
+ ret->max_queue = max_queue_size;
+ ret->message_size = msize;
+ ret->was_transmitted = GNUNET_NO;
+ if (pos == NULL)
+ {
+ /* append at the tail */
+ pos = h->queue_tail;
+ }
+ else
+ {
+ pos = pos->prev;
+ /* do not insert at HEAD if HEAD query was already
+ * transmitted and we are still receiving replies! */
+ if ((pos == NULL) && (h->queue_head->was_transmitted))
+ pos = h->queue_head;
+ }
+ c++;
+ GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
+ 1, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
+ h->queue_size++;
+ ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
+ pos = ret->next;
+ while (pos != NULL)
+ {
+ if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
+ {
+ GNUNET_assert (pos->response_proc != NULL);
+ /* move 'pos' element to head so that it will be
+ * killed on 'NULL' call below */
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Dropping request from datastore queue\n");
+#endif
+ GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
+ GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop
+ ("# Requests dropped from datastore queue"), 1,
+ GNUNET_NO);
+ GNUNET_assert (h->queue_head == pos);
+ pos->response_proc (h, NULL);
+ break;
+ }
+ pos = pos->next;
+ }
+ return ret;
+}
+
+
+/**
+ * Process entries in the queue (or do nothing if we are already
+ * doing so).
+ *
+ * @param h handle to the datastore
+ */
+static void process_queue (struct GNUNET_DATASTORE_Handle *h);
+
+
+/**
+ * Try reconnecting to the datastore service.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param tc scheduler context
+ */
+static void
+try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+
+ if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
+ h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
+ else
+ h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
+ if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
+ h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+ h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
+ if (h->client == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "DATASTORE reconnect failed (fatally)\n");
+ return;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop
+ ("# datastore connections (re)created"), 1,
+ GNUNET_NO);
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
+#endif
+ process_queue (h);
+}
+
+
+/**
+ * Disconnect from the service and then try reconnecting to the datastore service
+ * after some delay.
+ *
+ * @param h handle to datastore to disconnect and reconnect
+ */
+static void
+do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+{
+ if (h->client == NULL)
+ {
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "client NULL in disconnect, will not try to reconnect\n");
+#endif
+ return;
+ }
+#if 0
+ GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
+ 1, GNUNET_NO);
+#endif
+ GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+ h->skip_next_messages = 0;
+ h->client = NULL;
+ h->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
+}
+
+
+/**
+ * Function called whenever we receive a message from
+ * the service. Calls the appropriate handler.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param msg the received message
+ */
+static void
+receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ h->in_receive = GNUNET_NO;
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
+#endif
+ if (h->skip_next_messages > 0)
+ {
+ h->skip_next_messages--;
+ process_queue (h);
+ return;
+ }
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ process_queue (h);
+ return;
+ }
+ qe->response_proc (h, msg);
+}
+
+
+/**
+ * Transmit request from queue to datastore service.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param size number of bytes that can be copied to buf
+ * @param buf where to copy the drop message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_request (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ size_t msize;
+
+ h->th = NULL;
+ if (NULL == (qe = h->queue_head))
+ return 0; /* no entry in queue */
+ if (buf == NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to transmit request to DATASTORE.\n"));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# transmission request failures"),
+ 1, GNUNET_NO);
+ do_disconnect (h);
+ return 0;
+ }
+ if (size < (msize = qe->message_size))
+ {
+ process_queue (h);
+ return 0;
+ }
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitting %u byte request to DATASTORE\n", msize);
+#endif
+ memcpy (buf, &qe[1], msize);
+ qe->was_transmitted = GNUNET_YES;
+ GNUNET_SCHEDULER_cancel (qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (GNUNET_NO == h->in_receive);
+ h->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (h->client, &receive_cb, h,
+ GNUNET_TIME_absolute_get_remaining (qe->timeout));
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# bytes sent to datastore"), 1,
+ GNUNET_NO);
+ return msize;
+}
+
+
+/**
+ * Process entries in the queue (or do nothing if we are already
+ * doing so).
+ *
+ * @param h handle to the datastore
+ */
+static void
+process_queue (struct GNUNET_DATASTORE_Handle *h)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ if (NULL == (qe = h->queue_head))
+ {
+#if DEBUG_DATASTORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
+#endif
+ return; /* no entry in queue */
+ }
+ if (qe->was_transmitted == GNUNET_YES)
+ {
+#if DEBUG_DATASTORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
+#endif
+ return; /* waiting for replies */
+ }
+ if (h->th != NULL)
+ {
+#if DEBUG_DATASTORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
+#endif
+ return; /* request pending */
+ }
+ if (h->client == NULL)
+ {
+#if DEBUG_DATASTORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
+#endif
+ return; /* waiting for reconnect */
+ }
+ if (GNUNET_YES == h->in_receive)
+ {
+ /* wait for response to previous query */
+ return;
+ }
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing %u byte request to DATASTORE\n", qe->message_size);
+#endif
+ h->th =
+ GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
+ GNUNET_TIME_absolute_get_remaining
+ (qe->timeout), GNUNET_YES,
+ &transmit_request, h);
+ GNUNET_assert (GNUNET_NO == h->in_receive);
+ GNUNET_break (NULL != h->th);
+}
+
+
+/**
+ * Dummy continuation used to do nothing (but be non-zero).
+ *
+ * @param cls closure
+ * @param result result
+ * @param emsg error message
+ */
+static void
+drop_status_cont (void *cls, int32_t result, const char *emsg)
+{
+ /* do nothing */
+}
+
+
+/**
+ * Free a queue entry. Removes the given entry from the
+ * queue and releases associated resources. Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+ struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+ GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
+ if (qe->task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (qe->task);
+ qe->task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ h->queue_size--;
+ qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
+ GNUNET_free (qe);
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct StatusContext rc;
+ const struct StatusMessage *sm;
+ const char *emsg;
+ int32_t status;
+ int was_transmitted;
+
+ if (NULL == (qe = h->queue_head))
+ {
+ GNUNET_break (0);
+ do_disconnect (h);
+ return;
+ }
+ rc = qe->qc.sc;
+ if (msg == NULL)
+ {
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ if (was_transmitted == GNUNET_YES)
+ do_disconnect (h);
+ else
+ process_queue (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ _("Failed to receive status response from database."));
+ return;
+ }
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+ free_queue_entry (qe);
+ if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
+ (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
+ {
+ GNUNET_break (0);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, GNUNET_SYSERR,
+ _("Error reading response from datastore service"));
+ return;
+ }
+ sm = (const struct StatusMessage *) msg;
+ status = ntohl (sm->status);
+ emsg = NULL;
+ if (ntohs (msg->size) > sizeof (struct StatusMessage))
+ {
+ emsg = (const char *) &sm[1];
+ if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
+ {
+ GNUNET_break (0);
+ emsg = _("Invalid error message received from datastore service");
+ }
+ }
+ if ((status == GNUNET_SYSERR) && (emsg == NULL))
+ {
+ GNUNET_break (0);
+ emsg = _("Invalid error message received from datastore service");
+ }
+#if DEBUG_DATASTORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status,
+ emsg);
+#endif
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# status messages received"), 1,
+ GNUNET_NO);
+ h->retry_time.rel_value = 0;
+ process_queue (h);
+ if (rc.cont != NULL)
+ rc.cont (rc.cont_cls, status, emsg);
+}
+
+