+struct GNUNET_DATASTORE_QueueEntry *
+GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
+ const struct GNUNET_HashCode * key, size_t size,
+ const void *data, unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_ContinuationWithStatus cont,
+ void *cont_cls)
+{
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct DataMessage *dm;
+ size_t msize;
+ union QueueContext qc;
+
+ if (cont == NULL)
+ cont = &drop_status_cont;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
+ size, GNUNET_h2s (key));
+ qc.sc.cont = cont;
+ qc.sc.cont_cls = cont_cls;
+ msize = sizeof (struct DataMessage) + size;
+ GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+ qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
+ &process_status_message, &qc);
+ if (qe == NULL)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
+ return NULL;
+ }
+ GNUNET_STATISTICS_update (h->stats,
+ gettext_noop ("# REMOVE requests executed"), 1,
+ GNUNET_NO);
+ dm = (struct DataMessage *) &qe[1];
+ dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+ dm->header.size = htons (msize);
+ dm->rid = htonl (0);
+ dm->size = htonl (size);
+ dm->type = htonl (0);
+ dm->priority = htonl (0);
+ dm->anonymity = htonl (0);
+ dm->uid = GNUNET_htonll (0);
+ dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
+ dm->key = *key;
+ memcpy (&dm[1], data, size);
+ process_queue (h);
+ return qe;
+}
+
+
+/**
+ * Type of a function to call when we receive a message
+ * from the service.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_DATASTORE_Handle *h = cls;
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+ struct ResultContext rc;
+ const struct DataMessage *dm;
+ int was_transmitted;
+
+ if (msg == NULL)
+ {
+ qe = h->queue_head;
+ GNUNET_assert (NULL != qe);
+ rc = qe->qc.rc;
+ was_transmitted = qe->was_transmitted;
+ free_queue_entry (qe);
+ if (was_transmitted == GNUNET_YES)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ _("Failed to receive response from database.\n"));
+ do_disconnect (h);
+ }
+ else
+ {
+ process_queue (h);
+ }
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+ 0);
+ return;
+ }
+ if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
+ {
+ GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
+ qe = h->queue_head;
+ rc = qe->qc.rc;
+ GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+ free_queue_entry (qe);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received end of result set, new queue size is %u\n", h->queue_size);
+ h->retry_time.rel_value = 0;
+ h->result_count = 0;
+ process_queue (h);
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+ 0);
+ return;
+ }
+ qe = h->queue_head;
+ GNUNET_assert (NULL != qe);
+ rc = qe->qc.rc;
+ if (GNUNET_YES != qe->was_transmitted)
+ {
+ GNUNET_break (0);
+ free_queue_entry (qe);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+ 0);
+ return;
+ }
+ if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
+ (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
+ (ntohs (msg->size) !=
+ sizeof (struct DataMessage) +
+ ntohl (((const struct DataMessage *) msg)->size)))
+ {
+ GNUNET_break (0);
+ free_queue_entry (qe);
+ h->retry_time = GNUNET_TIME_UNIT_ZERO;
+ do_disconnect (h);
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+ 0);
+ return;
+ }
+ GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
+ GNUNET_NO);
+ dm = (const struct DataMessage *) msg;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received result %llu with type %u and size %u with key %s\n",
+ (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
+ ntohl (dm->size), GNUNET_h2s (&dm->key));
+ free_queue_entry (qe);
+ h->retry_time.rel_value = 0;
+ process_queue (h);
+ if (rc.proc != NULL)
+ rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
+ ntohl (dm->priority), ntohl (dm->anonymity),
+ GNUNET_TIME_absolute_ntoh (dm->expiration),
+ GNUNET_ntohll (dm->uid));
+}
+
+
+/**
+ * Get a random value from the datastore for content replication.
+ * Returns a single, random value among those with the highest
+ * replication score, lowering positive replication scores by one for
+ * the chosen value (if only content with a replication score exists,
+ * a random value is returned and replication scores are not changed).
+ *
+ * @param h handle to the datastore
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ * (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param proc function to call on a random value; it
+ * will be called once with a value (if available)
+ * and always once with a value of NULL.
+ * @param proc_cls closure for proc
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ * cancel
+ */
+struct GNUNET_DATASTORE_QueueEntry *
+GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
+ unsigned int queue_priority,
+ unsigned int max_queue_size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_DATASTORE_DatumProcessor proc,
+ void *proc_cls)