fix indentation, typo, improve logging
[oweals/gnunet.git] / src / datastore / datastore_api.c
index a487689e8d98e3b447eebe84a0ea769c48eef27d..916e6acaef930121af9a4bfa04f0be4c72437d35 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet
 /*
      This file is part of GNUnet
-     (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
+     Copyright (C) 2004-2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
 
      You should have received a copy of the GNU General Public License
      along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-     Boston, MA 02111-1307, USA.
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
 */
 
 /**
  * @file datastore/datastore_api.c
  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
 */
 
 /**
  * @file datastore/datastore_api.c
  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
- *        a priority queue for requests (with timeouts).
+ *        a priority queue for requests
  * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_arm_service.h"
 #include "gnunet_constants.h"
 #include "gnunet_datastore_service.h"
  * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_arm_service.h"
 #include "gnunet_constants.h"
 #include "gnunet_datastore_service.h"
+#include "gnunet_statistics_service.h"
 #include "datastore.h"
 
 #include "datastore.h"
 
+#define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
+
+/**
+ * Collect an instane number of statistics?  May cause excessive IPC.
+ */
+#define INSANE_STATISTICS GNUNET_NO
+
+/**
+ * If a client stopped asking for more results, how many more do
+ * we receive from the DB before killing the connection?  Trade-off
+ * between re-doing TCP handshakes and (needlessly) receiving
+ * useless results.
+ */
+#define MAX_EXCESS_RESULTS 8
 
 /**
  * Context for processing status messages.
 
 /**
  * Context for processing status messages.
@@ -42,7 +57,7 @@ struct StatusContext
   GNUNET_DATASTORE_ContinuationWithStatus cont;
 
   /**
   GNUNET_DATASTORE_ContinuationWithStatus cont;
 
   /**
-   * Closure for cont.
+   * Closure for @e cont.
    */
   void *cont_cls;
 
    */
   void *cont_cls;
 
@@ -55,14 +70,14 @@ struct StatusContext
 struct ResultContext
 {
   /**
 struct ResultContext
 {
   /**
-   * Iterator to call with the result.
+   * Function to call with the result.
    */
    */
-  GNUNET_DATASTORE_Iterator iter;
+  GNUNET_DATASTORE_DatumProcessor proc;
 
   /**
 
   /**
-   * Closure for iter.
+   * Closure for @e proc.
    */
    */
-  void *iter_cls;
+  void *proc_cls;
 
 };
 
 
 };
 
@@ -74,13 +89,12 @@ union QueueContext
 {
 
   struct StatusContext sc;
 {
 
   struct StatusContext sc;
-  
+
   struct ResultContext rc;
 
 };
 
 
   struct ResultContext rc;
 
 };
 
 
-
 /**
  * Entry in our priority queue.
  */
 /**
  * Entry in our priority queue.
  */
@@ -102,20 +116,13 @@ struct GNUNET_DATASTORE_QueueEntry
    */
   struct GNUNET_DATASTORE_Handle *h;
 
    */
   struct GNUNET_DATASTORE_Handle *h;
 
-  /**
-   * Response processor (NULL if we are not waiting for a response).
-   * This struct should be used for the closure, function-specific
-   * arguments can be passed via 'qc'.
-   */
-  GNUNET_CLIENT_MessageHandler response_proc;
-
   /**
    * Function to call after transmission of the request.
    */
   GNUNET_DATASTORE_ContinuationWithStatus cont;
   /**
    * Function to call after transmission of the request.
    */
   GNUNET_DATASTORE_ContinuationWithStatus cont;
-   
+
   /**
   /**
-   * Closure for 'cont'.
+   * Closure for @e cont.
    */
   void *cont_cls;
 
    */
   void *cont_cls;
 
@@ -125,14 +132,10 @@ struct GNUNET_DATASTORE_QueueEntry
   union QueueContext qc;
 
   /**
   union QueueContext qc;
 
   /**
-   * Task for timeout signalling.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier task;
-
-  /**
-   * Timeout for the current operation.
+   * Envelope of the request to transmit, NULL after
+   * transmission.
    */
    */
-  struct GNUNET_TIME_Absolute timeout;
+  struct GNUNET_MQ_Envelope *env;
 
   /**
    * Priority in the queue.
 
   /**
    * Priority in the queue.
@@ -146,24 +149,15 @@ struct GNUNET_DATASTORE_QueueEntry
   unsigned int max_queue;
 
   /**
   unsigned int max_queue;
 
   /**
-   * Number of bytes in the request message following
-   * this struct.  32-bit value for nicer memory
-   * access (and overall struct alignment).
+   * Expected response type.
    */
    */
-  uint32_t message_size;
-
-  /**
-   * Has this message been transmitted to the service?
-   * Only ever GNUNET_YES for the head of the queue.
-   * Note that the overall struct should end at a 
-   * multiple of 64 bits.
-   */
-  int32_t was_transmitted;
+  uint16_t response_type;
 
 };
 
 
 };
 
+
 /**
 /**
- * Handle to the datastore service. 
+ * Handle to the datastore service.
  */
 struct GNUNET_DATASTORE_Handle
 {
  */
 struct GNUNET_DATASTORE_Handle
 {
@@ -173,20 +167,15 @@ struct GNUNET_DATASTORE_Handle
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
-  /**
-   * Our scheduler.
-   */
-  struct GNUNET_SCHEDULER_Handle *sched;
-
   /**
    * Current connection to the datastore service.
    */
   /**
    * Current connection to the datastore service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
 
   /**
-   * Current transmit handle.
+   * Handle for statistics.
    */
    */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_STATISTICS_Handle *stats;
 
   /**
    * Current head of priority queue.
 
   /**
    * Current head of priority queue.
@@ -201,7 +190,7 @@ struct GNUNET_DATASTORE_Handle
   /**
    * Task for trying to reconnect.
    */
   /**
    * Task for trying to reconnect.
    */
-  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
   /**
    * How quickly should we retry?  Used for exponential back-off on
 
   /**
    * How quickly should we retry?  Used for exponential back-off on
@@ -214,69 +203,188 @@ struct GNUNET_DATASTORE_Handle
    */
   unsigned int queue_size;
 
    */
   unsigned int queue_size;
 
+  /**
+   * Number of results we're receiving for the current query
+   * after application stopped to care.  Used to determine when
+   * to reset the connection.
+   */
+  unsigned int result_count;
+
+  /**
+   * We should ignore the next message(s) from the service.
+   */
+  unsigned int skip_next_messages;
+
 };
 
 
 };
 
 
+/**
+ * Try reconnecting to the datastore service.
+ *
+ * @param cls the `struct GNUNET_DATASTORE_Handle`
+ */
+static void
+try_reconnect (void *cls);
+
+
+/**
+ * Disconnect from the service and then try reconnecting to the datastore service
+ * after some delay.
+ *
+ * @param h handle to datastore to disconnect and reconnect
+ */
+static void
+do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+{
+  if (NULL == h->mq)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  GNUNET_MQ_destroy (h->mq);
+  h->mq = NULL;
+  h->skip_next_messages = 0;
+  h->reconnect_task
+    = GNUNET_SCHEDULER_add_delayed (h->retry_time,
+                                    &try_reconnect,
+                                    h);
+}
+
+
+/**
+ * Free a queue entry.  Removes the given entry from the
+ * queue and releases associated resources.  Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
+static void
+free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
+{
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
+
+  GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                               h->queue_tail,
+                               qe);
+  h->queue_size--;
+  if (NULL != qe->env)
+    GNUNET_MQ_discard (qe->env);
+  GNUNET_free (qe);
+}
+
+
+/**
+ * Handle error in sending drop request to datastore.
+ *
+ * @param cls closure with the datastore handle
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "MQ error, reconnecting to DATASTORE\n");
+  do_disconnect (h);
+  qe = h->queue_head;
+  if ( (NULL != qe) &&
+       (NULL == qe->env) )
+  {
+    union QueueContext qc = qe->qc;
+    uint16_t rt = qe->response_type;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Failed to receive response from database.\n");
+    free_queue_entry (qe);
+    switch (rt)
+    {
+    case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
+      if (NULL != qc.sc.cont)
+        qc.sc.cont (qc.sc.cont_cls,
+                    GNUNET_SYSERR,
+                    GNUNET_TIME_UNIT_ZERO_ABS,
+                    _("DATASTORE disconnected"));
+      break;
+    case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
+      if (NULL != qc.rc.proc)
+        qc.rc.proc (qc.rc.proc_cls,
+                    NULL,
+                    0,
+                    NULL, 0, 0, 0,
+                    GNUNET_TIME_UNIT_ZERO_ABS,
+                    0);
+      break;
+    default:
+      GNUNET_break (0);
+    }
+  }
+}
+
 
 /**
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
 
 /**
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
- * @param sched scheduler to use
  * @return handle to use to access the service
  */
 struct GNUNET_DATASTORE_Handle *
  * @return handle to use to access the service
  */
 struct GNUNET_DATASTORE_Handle *
-GNUNET_DATASTORE_connect (const struct
-                         GNUNET_CONFIGURATION_Handle
-                         *cfg,
-                         struct
-                         GNUNET_SCHEDULER_Handle
-                         *sched)
+GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
 {
-  struct GNUNET_CLIENT_Connection *c;
   struct GNUNET_DATASTORE_Handle *h;
   struct GNUNET_DATASTORE_Handle *h;
-  
-  c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
-  if (c == NULL)
-    return NULL; /* oops */
-  h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
-                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  h->client = c;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Establishing DATASTORE connection!\n");
+  h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
   h->cfg = cfg;
   h->cfg = cfg;
-  h->sched = sched;
+  try_reconnect (h);
+  if (NULL == h->mq)
+  {
+    GNUNET_free (h);
+    return NULL;
+  }
+  h->stats = GNUNET_STATISTICS_create ("datastore-api",
+                                       cfg);
   return h;
 }
 
 
 /**
   return h;
 }
 
 
 /**
- * Transmit DROP message to datastore service.
+ * Task used by to disconnect from the datastore after
+ * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
  *
  *
- * @param cls the 'struct GNUNET_DATASTORE_Handle'
- * @param size number of bytes that can be copied to buf
- * @param buf where to copy the drop message
- * @return number of bytes written to buf
+ * @param cls the datastore handle
  */
  */
-static size_t
-transmit_drop (void *cls,
-              size_t size, 
-              void *buf)
+static void
+disconnect_after_drop (void *cls)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
-  struct GNUNET_MessageHeader *hdr;
-  
-  if (buf == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to drop database.\n"));
-      GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
-      return 0;
-    }
-  GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
-  hdr = buf;
-  hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
-  hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
-  GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
-  return sizeof(struct GNUNET_MessageHeader);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Drop sent, disconnecting\n");
+  GNUNET_DATASTORE_disconnect (h,
+                               GNUNET_NO);
+}
+
+
+/**
+ * Handle error in sending drop request to datastore.
+ *
+ * @param cls closure with the datastore handle
+ * @param error error code
+ */
+static void
+disconnect_on_mq_error (void *cls,
+                        enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "Failed to ask datastore to drop tables\n");
+  GNUNET_DATASTORE_disconnect (h,
+                               GNUNET_NO);
 }
 
 
 }
 
 
@@ -285,387 +393,522 @@ transmit_drop (void *cls,
  * associated resources).
  *
  * @param h handle to the datastore
  * associated resources).
  *
  * @param h handle to the datastore
- * @param drop set to GNUNET_YES to delete all data in datastore (!)
+ * @param drop set to #GNUNET_YES to delete all data in datastore (!)
  */
  */
-void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
-                                 int drop)
+void
+GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
+                             int drop)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
-  if (h->client != NULL)
-    {
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = NULL;
-    }
-  if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              h->reconnect_task);
-      h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-    }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Datastore disconnect\n");
+  if (NULL != h->mq)
+  {
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
+  }
+  if (NULL != h->reconnect_task)
+  {
+    GNUNET_SCHEDULER_cancel (h->reconnect_task);
+    h->reconnect_task = NULL;
+  }
   while (NULL != (qe = h->queue_head))
   while (NULL != (qe = h->queue_head))
+  {
+    switch (qe->response_type)
     {
     {
-      GNUNET_assert (NULL != qe->response_proc);
-      qe->response_proc (qe, NULL);
+    case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
+      if (NULL != qe->qc.sc.cont)
+        qe->qc.sc.cont (qe->qc.sc.cont_cls,
+                        GNUNET_SYSERR,
+                        GNUNET_TIME_UNIT_ZERO_ABS,
+                        _("Disconnected from DATASTORE"));
+      break;
+    case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
+      if (NULL != qe->qc.rc.proc)
+        qe->qc.rc.proc (qe->qc.rc.proc_cls,
+                        NULL,
+                        0,
+                        NULL, 0, 0, 0,
+                        GNUNET_TIME_UNIT_ZERO_ABS,
+                        0);
+      break;
+    default:
+      GNUNET_break (0);
     }
     }
-  if (GNUNET_YES == drop) 
+    free_queue_entry (qe);
+  }
+  if (GNUNET_YES == drop)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Re-connecting to issue DROP!\n");
+    GNUNET_assert (NULL == h->mq);
+    h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                   "datastore",
+                                   NULL,
+                                   &disconnect_on_mq_error,
+                                   h);
+    if (NULL != h->mq)
     {
     {
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      if (h->client != NULL)
-       {
-         if (NULL != 
-             GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                  sizeof(struct GNUNET_MessageHeader),
-                                                  GNUNET_TIME_UNIT_MINUTES,
-                                                  GNUNET_YES,
-                                                  &transmit_drop,
-                                                  h))
-           return;
-         GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-       }
-      GNUNET_break (0);
+      struct GNUNET_MessageHeader *hdr;
+      struct GNUNET_MQ_Envelope *env;
+
+      env = GNUNET_MQ_msg (hdr,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
+      GNUNET_MQ_notify_sent (env,
+                             &disconnect_after_drop,
+                             h);
+      GNUNET_MQ_send (h->mq,
+                      env);
+      return;
     }
     }
+    GNUNET_break (0);
+  }
+  GNUNET_STATISTICS_destroy (h->stats,
+                             GNUNET_NO);
+  h->stats = NULL;
   GNUNET_free (h);
 }
 
 
   GNUNET_free (h);
 }
 
 
-/**
- * A request has timed out (before being transmitted to the service).
- *
- * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
- * @param tc scheduler context
- */
-static void
-timeout_queue_entry (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-
-  qe->task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_assert (qe->was_transmitted == GNUNET_NO);
-  qe->response_proc (qe, NULL);
-}
-
-
 /**
  * Create a new entry for our priority queue (and possibly discard other entires if
  * the queue is getting too long).
  *
  * @param h handle to the datastore
 /**
  * Create a new entry for our priority queue (and possibly discard other entires if
  * the queue is getting too long).
  *
  * @param h handle to the datastore
- * @param msize size of the message to queue
+ * @param env envelope with the message to queue
  * @param queue_priority priority of the entry
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param queue_priority priority of the entry
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
- * @param response_proc function to call with replies (can be NULL)
- * @param qc client context (NOT a closure for response_proc)
- * @return NULL if the queue is full (and this entry was dropped)
+ * @param expected_type which type of response do we expect,
+ *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
+ *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
+ * @param qc client context (NOT a closure for @a response_proc)
+ * @return NULL if the queue is full
  */
 static struct GNUNET_DATASTORE_QueueEntry *
 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
  */
 static struct GNUNET_DATASTORE_QueueEntry *
 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
-                 size_t msize,
-                 unsigned int queue_priority,
-                 unsigned int max_queue_size,
-                 struct GNUNET_TIME_Relative timeout,
-                 GNUNET_CLIENT_MessageHandler response_proc,            
-                 const union QueueContext *qc)
+                  struct GNUNET_MQ_Envelope *env,
+                  unsigned int queue_priority,
+                  unsigned int max_queue_size,
+                  uint16_t expected_type,
+                  const union QueueContext *qc)
 {
 {
-  struct GNUNET_DATASTORE_QueueEntry *ret;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_DATASTORE_QueueEntry *pos;
   unsigned int c;
 
   struct GNUNET_DATASTORE_QueueEntry *pos;
   unsigned int c;
 
-  c = 0;
-  pos = h->queue_head;
-  while ( (pos != NULL) &&
-         (c < max_queue_size) &&
-         (pos->priority >= queue_priority) )
-    {
-      c++;
-      pos = pos->next;
-    }
-  ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
-  ret->h = h;
-  ret->response_proc = response_proc;
-  ret->qc = *qc;
-  ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
-  ret->priority = queue_priority;
-  ret->max_queue = max_queue_size;
-  ret->message_size = msize;
-  ret->was_transmitted = GNUNET_NO;
-  if (pos == NULL)
-    {
-      /* append at the tail */
-      pos = h->queue_tail;
-    }
+  if ( (NULL != h->queue_tail) &&
+       (h->queue_tail->priority >= queue_priority) )
+  {
+    c = h->queue_size;
+    pos = NULL;
+  }
   else
   else
-    {
-      pos = pos->prev; 
-      /* do not insert at HEAD if HEAD query was already
-        transmitted and we are still receiving replies! */
-      if ( (pos == NULL) &&
-          (h->queue_head->was_transmitted) )
-       pos = h->queue_head;
-    }
+  {
+    c = 0;
+    pos = h->queue_head;
+  }
+  while ( (NULL != pos) &&
+          (c < max_queue_size) &&
+          (pos->priority >= queue_priority) )
+  {
+    c++;
+    pos = pos->next;
+  }
+  if (c >= max_queue_size)
+  {
+    GNUNET_STATISTICS_update (h->stats,
+                              gettext_noop ("# queue overflows"),
+                              1,
+                              GNUNET_NO);
+    GNUNET_MQ_discard (env);
+    return NULL;
+  }
+  qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
+  qe->h = h;
+  qe->env = env;
+  qe->response_type = expected_type;
+  qe->qc = *qc;
+  qe->priority = queue_priority;
+  qe->max_queue = max_queue_size;
+  if (NULL == pos)
+  {
+    /* append at the tail */
+    pos = h->queue_tail;
+  }
+  else
+  {
+    pos = pos->prev;
+    /* do not insert at HEAD if HEAD query was already
+     * transmitted and we are still receiving replies! */
+    if ( (NULL == pos) &&
+         (NULL == h->queue_head->env) )
+      pos = h->queue_head;
+  }
   c++;
   c++;
+#if INSANE_STATISTICS
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# queue entries created"),
+                            1,
+                            GNUNET_NO);
+#endif
   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
-                                    h->queue_tail,
-                                    pos,
-                                    ret);
+                                     h->queue_tail,
+                                     pos,
+                                     qe);
   h->queue_size++;
   h->queue_size++;
-  if (c > max_queue_size)
-    {
-      response_proc (ret, NULL);
-      return NULL;
-    }
-  ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                           timeout,
-                                           &timeout_queue_entry,
-                                           ret);
-  pos = ret->next;
-  while (pos != NULL) 
-    {
-      if (pos->max_queue < h->queue_size)
-       {
-         GNUNET_assert (pos->response_proc != NULL);
-         pos->response_proc (pos, NULL);
-         break;
-       }
-      pos = pos->next;
-    }
-  return ret;
+  return qe;
 }
 
 
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
 }
 
 
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
- * 
+ *
  * @param h handle to the datastore
  */
 static void
  * @param h handle to the datastore
  */
 static void
-process_queue (struct GNUNET_DATASTORE_Handle *h);
+process_queue (struct GNUNET_DATASTORE_Handle *h)
+{
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  if (NULL == (qe = h->queue_head))
+  {
+    /* no entry in queue */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Queue empty\n");
+    return;
+  }
+  if (NULL == qe->env)
+  {
+    /* waiting for replies */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Head request already transmitted\n");
+    return;
+  }
+  if (NULL == h->mq)
+  {
+    /* waiting for reconnect */
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Not connected\n");
+    return;
+  }
+  GNUNET_MQ_send (h->mq,
+                  qe->env);
+  qe->env = NULL;
+}
+
+
 
 
 /**
 
 
 /**
- * Try reconnecting to the datastore service.
+ * Function called to check status message from the service.
+ *
+ * @param cls closure
+ * @param sm status message received
+ * @return #GNUNET_OK if the message is well-formed
+ */
+static int
+check_status (void *cls,
+              const struct StatusMessage *sm)
+{
+  uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
+  int32_t status = ntohl (sm->status);
+
+  if (msize > 0)
+  {
+    const char *emsg = (const char *) &sm[1];
+
+    if ('\0' != emsg[msize - 1])
+    {
+      GNUNET_break (0);
+      return GNUNET_SYSERR;
+    }
+  }
+  else if (GNUNET_SYSERR == status)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called to handle status message from the service.
  *
  *
- * @param cls the 'struct GNUNET_DATASTORE_Handle'
- * @param tc scheduler context
+ * @param cls closure
+ * @param sm status message received
  */
 static void
  */
 static void
-try_reconnect (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
+handle_status (void *cls,
+               const struct StatusMessage *sm)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct StatusContext rc;
+  const char *emsg;
+  int32_t status = ntohl (sm->status);
 
 
-  if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
-    h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
-  else
-    h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
-  if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
-    h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
-  h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-  if (h->client == NULL)
+  if (h->skip_next_messages > 0)
+  {
+    h->skip_next_messages--;
+    process_queue (h);
     return;
     return;
+  }
+  if (NULL == (qe = h->queue_head))
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (NULL != qe->env)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  rc = qe->qc.sc;
+  free_queue_entry (qe);
+  if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
+    emsg = (const char *) &sm[1];
+  else
+    emsg = NULL;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received status %d/%s\n",
+       (int) status,
+       emsg);
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# status messages received"),
+                            1,
+                            GNUNET_NO);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
   process_queue (h);
   process_queue (h);
+  if (NULL != rc.cont)
+    rc.cont (rc.cont_cls,
+             status,
+            GNUNET_TIME_absolute_ntoh (sm->min_expiration),
+            emsg);
 }
 
 
 /**
 }
 
 
 /**
- * Disconnect from the service and then try reconnecting to the datastore service
- * after some delay.
+ * Check data message we received from the service.
  *
  *
- * @param h handle to datastore to disconnect and reconnect
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param dm message received
  */
  */
-static void
-do_disconnect (struct GNUNET_DATASTORE_Handle *h)
+static int
+check_data (void *cls,
+            const struct DataMessage *dm)
 {
 {
-  if (h->client == NULL)
-    return;
-  GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-  h->client = NULL;
-  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                                   h->retry_time,
-                                                   &try_reconnect,
-                                                   h);      
+  uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
+
+  if (msize != ntohl (dm->size))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
 }
 
 
 /**
 }
 
 
 /**
- * Transmit request from queue to datastore service.
+ * Handle data message we got from the service.
  *
  *
- * @param cls the 'struct GNUNET_DATASTORE_Handle'
- * @param size number of bytes that can be copied to buf
- * @param buf where to copy the drop message
- * @return number of bytes written to buf
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param dm message received
  */
  */
-static size_t
-transmit_request (void *cls,
-                 size_t size, 
-                 void *buf)
+static void
+handle_data (void *cls,
+             const struct DataMessage *dm)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  size_t msize;
+  struct ResultContext rc;
 
 
-  h->th = NULL;
-  if (NULL == (qe = h->queue_head))
-    return 0; /* no entry in queue */
-  if (buf == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to database.\n"));
-      do_disconnect (h);
-      return 0;
-    }
-  if (size < (msize = qe->message_size))
-    {
-      process_queue (h);
-      return 0;
-    }
-  memcpy (buf, &qe[1], msize);
-  qe->was_transmitted = GNUNET_YES;
-  GNUNET_SCHEDULER_cancel (h->sched,
-                          qe->task);
-  qe->task = GNUNET_SCHEDULER_NO_TASK;
-  GNUNET_CLIENT_receive (h->client,
-                        qe->response_proc,
-                        qe,
-                        GNUNET_TIME_absolute_get_remaining (qe->timeout));
-  return msize;
+  if (h->skip_next_messages > 0)
+  {
+    process_queue (h);
+    return;
+  }
+  qe = h->queue_head;
+  if (NULL == qe)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (NULL != qe->env)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+#if INSANE_STATISTICS
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# Results received"),
+                            1,
+                            GNUNET_NO);
+#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received result %llu with type %u and size %u with key %s\n",
+       (unsigned long long) GNUNET_ntohll (dm->uid),
+       ntohl (dm->type),
+       ntohl (dm->size),
+       GNUNET_h2s (&dm->key));
+  rc = qe->qc.rc;
+  free_queue_entry (qe);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  process_queue (h);
+  if (NULL != rc.proc)
+    rc.proc (rc.proc_cls,
+             &dm->key,
+             ntohl (dm->size),
+             &dm[1],
+             ntohl (dm->type),
+             ntohl (dm->priority),
+             ntohl (dm->anonymity),
+             GNUNET_TIME_absolute_ntoh (dm->expiration),
+             GNUNET_ntohll (dm->uid));
 }
 
 
 /**
 }
 
 
 /**
- * Process entries in the queue (or do nothing if we are already
- * doing so).
- * 
- * @param h handle to the datastore
+ * Type of a function to call when we receive a
+ * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
+ *
+ * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
+ * @param msg message received
  */
 static void
  */
 static void
-process_queue (struct GNUNET_DATASTORE_Handle *h)
+handle_data_end (void *cls,
+                 const struct GNUNET_MessageHeader *msg)
 {
 {
+  struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct ResultContext rc;
 
 
-  if (NULL == (qe = h->queue_head))
-    return; /* no entry in queue */
-  if (qe->was_transmitted == GNUNET_YES)
-    return; /* waiting for replies */
-  if (h->th != NULL)
-    return; /* request pending */
-  if (h->client == NULL)
-    return; /* waiting for reconnect */
-  h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                              qe->message_size,
-                                              GNUNET_TIME_absolute_get_remaining (qe->timeout),
-                                              GNUNET_YES,
-                                              &transmit_request,
-                                              h);
+  if (h->skip_next_messages > 0)
+  {
+    h->skip_next_messages--;
+    process_queue (h);
+    return;
+  }
+  qe = h->queue_head;
+  if (NULL == qe)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (NULL != qe->env)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  rc = qe->qc.rc;
+  free_queue_entry (qe);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Received end of result set, new queue size is %u\n",
+       h->queue_size);
+  h->retry_time = GNUNET_TIME_UNIT_ZERO;
+  h->result_count = 0;
+  process_queue (h);
+  /* signal end of iteration */
+  if (NULL != rc.proc)
+    rc.proc (rc.proc_cls,
+             NULL,
+             0,
+             NULL,
+             0,
+             0,
+             0,
+             GNUNET_TIME_UNIT_ZERO_ABS,
+             0);
 }
 
 
 /**
 }
 
 
 /**
- * Dummy continuation used to do nothing (but be non-zero).
+ * Try reconnecting to the datastore service.
  *
  *
- * @param cls closure
- * @param result result 
- * @param emsg error message
+ * @param cls the `struct GNUNET_DATASTORE_Handle`
  */
 static void
  */
 static void
-drop_status_cont (void *cls, int result, const char *emsg)
+try_reconnect (void *cls)
 {
 {
-  /* do nothing */
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (status,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                           struct StatusMessage,
+                           h),
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                           struct DataMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (data_end,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
+                             struct GNUNET_MessageHeader,
+                             h),
+    GNUNET_MQ_handler_end ()
+  };
+
+  h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
+  h->reconnect_task = NULL;
+  GNUNET_assert (NULL == h->mq);
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
+                                 "datastore",
+                                 handlers,
+                                 &mq_error_handler,
+                                 h);
+  if (NULL == h->mq)
+    return;
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# datastore connections (re)created"),
+                            1,
+                            GNUNET_NO);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Reconnected to DATASTORE\n");
+  process_queue (h);
 }
 
 
 }
 
 
-static void
-free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
-{
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
-  if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              qe->task);
-      qe->task = GNUNET_SCHEDULER_NO_TASK;
-    }
-  h->queue_size--;
-  GNUNET_free (qe);
-}
-
 /**
 /**
- * Type of a function to call when we receive a message
- * from the service.
+ * Dummy continuation used to do nothing (but be non-zero).
  *
  * @param cls closure
  *
  * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
+ * @param result result
+ * @param min_expiration expiration time
+ * @param emsg error message
  */
  */
-static void 
-process_status_message (void *cls,
-                       const struct
-                       GNUNET_MessageHeader * msg)
+static void
+drop_status_cont (void *cls,
+                  int32_t result,
+                 struct GNUNET_TIME_Absolute min_expiration,
+                 const char *emsg)
 {
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct StatusContext rc = qe->qc.sc;
-  const struct StatusMessage *sm;
-  const char *emsg;
-  int32_t status;
-
-  free_queue_entry (qe);
-  if (msg == NULL)
-    {      
-      if (NULL == h->client)
-       return; /* forced disconnect */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to receive response from database.\n"));
-      do_disconnect (h);
-      return;
-    }
-
-  if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
-    {
-      GNUNET_break (0);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      rc.cont (rc.cont_cls, 
-              GNUNET_SYSERR,
-              _("Error reading response from datastore service"));
-      return;
-    }
-  sm = (const struct StatusMessage*) msg;
-  status = ntohl(sm->status);
-  emsg = NULL;
-  if (ntohs(msg->size) > sizeof(struct StatusMessage))
-    {
-      emsg = (const char*) &sm[1];
-      if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
-       {
-         GNUNET_break (0);
-         emsg = _("Invalid error message received from datastore service");
-       }
-    }  
-  if ( (status == GNUNET_SYSERR) &&
-       (emsg == NULL) )
-    {
-      GNUNET_break (0);
-      emsg = _("Invalid error message received from datastore service");
-    }
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received status %d/%s\n",
-             (int) status,
-             emsg);
-#endif
-  rc.cont (rc.cont_cls, 
-          status,
-          emsg);
-  process_queue (h);
+  /* do nothing */
 }
 
 
 }
 
 
@@ -683,65 +926,84 @@ process_status_message (void *cls,
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication how often should the content be replicated to other peers?
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout timeout for the operation
  * @param cont continuation to call when done
  * @param cont continuation to call when done
- * @param cont_cls closure for cont
+ * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
-                     int rid,
-                      const GNUNET_HashCode * key,
-                      uint32_t size,
+                      uint32_t rid,
+                      const struct GNUNET_HashCode *key,
+                      size_t size,
                       const void *data,
                       enum GNUNET_BLOCK_Type type,
                       uint32_t priority,
                       uint32_t anonymity,
                       const void *data,
                       enum GNUNET_BLOCK_Type type,
                       uint32_t priority,
                       uint32_t anonymity,
+                      uint32_t replication,
                       struct GNUNET_TIME_Absolute expiration,
                       struct GNUNET_TIME_Absolute expiration,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                      struct GNUNET_TIME_Relative timeout,
-                     GNUNET_DATASTORE_ContinuationWithStatus cont,
-                     void *cont_cls)
+                      unsigned int queue_priority,
+                      unsigned int max_queue_size,
+                      GNUNET_DATASTORE_ContinuationWithStatus cont,
+                      void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct DataMessage *dm;
   struct DataMessage *dm;
-  size_t msize;
   union QueueContext qc;
 
   union QueueContext qc;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to put %u bytes of data under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
-#endif
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to put %u bytes of data under key `%s' for %s\n",
+       size,
+       GNUNET_h2s (key),
+       GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
+                                              GNUNET_YES));
+  env = GNUNET_MQ_msg_extra (dm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
+  dm->rid = htonl (rid);
+  dm->size = htonl ((uint32_t) size);
+  dm->type = htonl (type);
+  dm->priority = htonl (priority);
+  dm->anonymity = htonl (anonymity);
+  dm->replication = htonl (replication);
+  dm->reserved = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  dm->key = *key;
+  GNUNET_memcpy (&dm[1],
+          data,
+          size);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, msize,
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
-  if (qe == NULL)
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry for PUT\n");
     return NULL;
     return NULL;
-  dm = (struct DataMessage* ) &qe[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(rid);
-  dm->size = htonl(size);
-  dm->type = htonl(type);
-  dm->priority = htonl(priority);
-  dm->anonymity = htonl(anonymity);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(expiration);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# PUT requests executed"),
+                            1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
   process_queue (h);
   return qe;
 }
@@ -755,51 +1017,54 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
  * @param h handle to the datastore
  * @param amount how much space (in bytes) should be reserved (for content only)
  * @param entries how many entries will be created (to calculate per-entry overhead)
  * @param h handle to the datastore
  * @param amount how much space (in bytes) should be reserved (for content only)
  * @param entries how many entries will be created (to calculate per-entry overhead)
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response (or before dying in queue)
  * @param cont continuation to call when done; "success" will be set to
  *             a positive reservation value if space could be reserved.
  * @param cont continuation to call when done; "success" will be set to
  *             a positive reservation value if space could be reserved.
- * @param cont_cls closure for cont
+ * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
-                         uint64_t amount,
-                         uint32_t entries,
-                         unsigned int queue_priority,
-                         unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
-                         GNUNET_DATASTORE_ContinuationWithStatus cont,
-                         void *cont_cls)
+                          uint64_t amount,
+                          uint32_t entries,
+                          GNUNET_DATASTORE_ContinuationWithStatus cont,
+                          void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct ReserveMessage *rm;
   union QueueContext qc;
 
   struct ReserveMessage *rm;
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (NULL == cont)
     cont = &drop_status_cont;
     cont = &drop_status_cont;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to reserve %llu bytes of data and %u entries'\n",
-             (unsigned long long) amount,
-             (unsigned int) entries);
-#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to reserve %llu bytes of data and %u entries\n",
+       (unsigned long long) amount,
+       (unsigned int) entries);
+  env = GNUNET_MQ_msg (rm,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
+  rm->entries = htonl (entries);
+  rm->amount = GNUNET_htonll (amount);
+
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct ReserveMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
-  if (qe == NULL)
+  qe = make_queue_entry (h,
+                         env,
+                         UINT_MAX,
+                         UINT_MAX,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry to reserve\n");
     return NULL;
     return NULL;
-  rm = (struct ReserveMessage*) &qe[1];
-  rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
-  rm->header.size = htons(sizeof (struct ReserveMessage));
-  rm->entries = htonl(entries);
-  rm->amount = GNUNET_htonll(amount);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# RESERVE requests executed"),
+                            1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
   process_queue (h);
   return qe;
 }
@@ -819,430 +1084,372 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont continuation to call when done
- * @param cont_cls closure for cont
+ * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
-                                 int rid,
-                                 unsigned int queue_priority,
-                                 unsigned int max_queue_size,
-                                 struct GNUNET_TIME_Relative timeout,
-                                 GNUNET_DATASTORE_ContinuationWithStatus cont,
-                                 void *cont_cls)
+                                  uint32_t rid,
+                                  unsigned int queue_priority,
+                                  unsigned int max_queue_size,
+                                  GNUNET_DATASTORE_ContinuationWithStatus cont,
+                                  void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
   struct ReleaseReserveMessage *rrm;
   union QueueContext qc;
 
   struct ReleaseReserveMessage *rrm;
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (NULL == cont)
     cont = &drop_status_cont;
     cont = &drop_status_cont;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to release reserve %d\n",
-             rid);
-#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to release reserve %d\n",
+       rid);
+  env = GNUNET_MQ_msg (rrm,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
+  rrm->rid = htonl (rid);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
-  if (qe == NULL)
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry to release reserve\n");
     return NULL;
     return NULL;
-  rrm = (struct ReleaseReserveMessage*) &qe[1];
-  rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
-  rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
-  rrm->rid = htonl(rid);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# RELEASE RESERVE requests executed"), 1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
 
 
 /**
   process_queue (h);
   return qe;
 }
 
 
 /**
- * Update a value in the datastore.
+ * Explicitly remove some content from the database.
+ * The @a cont continuation will be called with `status`
+ * #GNUNET_OK" if content was removed, #GNUNET_NO
+ * if no matching entry was found and #GNUNET_SYSERR
+ * on all other types of errors.
  *
  * @param h handle to the datastore
  *
  * @param h handle to the datastore
- * @param uid identifier for the value
- * @param priority how much to increase the priority of the value
- * @param expiration new expiration value should be MAX of existing and this argument
+ * @param key key for the value
+ * @param size number of bytes in data
+ * @param data content stored
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
  * @param cont continuation to call when done
  * @param cont continuation to call when done
- * @param cont_cls closure for cont
+ * @param cont_cls closure for @a cont
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  *         cancel; note that even if NULL is returned, the callback will be invoked
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
-                        unsigned long long uid,
-                        uint32_t priority,
-                        struct GNUNET_TIME_Absolute expiration,
-                        unsigned int queue_priority,
-                        unsigned int max_queue_size,
-                        struct GNUNET_TIME_Relative timeout,
-                        GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls)
+GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
+                         const struct GNUNET_HashCode *key,
+                         size_t size,
+                         const void *data,
+                         unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         GNUNET_DATASTORE_ContinuationWithStatus cont,
+                         void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct UpdateMessage *um;
+  struct DataMessage *dm;
+  struct GNUNET_MQ_Envelope *env;
   union QueueContext qc;
 
   union QueueContext qc;
 
-  if (cont == NULL)
+  if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return NULL;
+  }
+  if (NULL == cont)
     cont = &drop_status_cont;
     cont = &drop_status_cont;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
-             uid,
-             (unsigned int) priority,
-             (unsigned long long) expiration.value);
-#endif
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to remove %u bytes under key `%s'\n",
+       size,
+       GNUNET_h2s (key));
+  env = GNUNET_MQ_msg_extra (dm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+  dm->rid = htonl (0);
+  dm->size = htonl (size);
+  dm->type = htonl (0);
+  dm->priority = htonl (0);
+  dm->anonymity = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
+  dm->key = *key;
+  GNUNET_memcpy (&dm[1],
+          data,
+          size);
+
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct UpdateMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
-  if (qe == NULL)
+
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry for REMOVE\n");
     return NULL;
     return NULL;
-  um = (struct UpdateMessage*) &qe[1];
-  um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
-  um->header.size = htons(sizeof (struct UpdateMessage));
-  um->priority = htonl(priority);
-  um->expiration = GNUNET_TIME_absolute_hton(expiration);
-  um->uid = GNUNET_htonll(uid);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# REMOVE requests executed"),
+                            1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
 
 
   process_queue (h);
   return qe;
 }
 
 
+
 /**
 /**
- * Explicitly remove some content from the database.
- * The "cont"inuation will be called with status
- * "GNUNET_OK" if content was removed, "GNUNET_NO"
- * if no matching entry was found and "GNUNET_SYSERR"
- * on all other types of errors.
+ * Get a random value from the datastore for content replication.
+ * Returns a single, random value among those with the highest
+ * replication score, lowering positive replication scores by one for
+ * the chosen value (if only content with a replication score exists,
+ * a random value is returned and replication scores are not changed).
  *
  * @param h handle to the datastore
  *
  * @param h handle to the datastore
- * @param key key for the value
- * @param size number of bytes in data
- * @param data content stored
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param cont continuation to call when done
- * @param cont_cls closure for cont
+ * @param proc function to call on a random value; it
+ *        will be called once with a value (if available)
+ *        and always once with a value of NULL.
+ * @param proc_cls closure for @a proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
-                         const GNUNET_HashCode *key,
-                         uint32_t size, 
-                        const void *data,
-                        unsigned int queue_priority,
-                        unsigned int max_queue_size,
-                        struct GNUNET_TIME_Relative timeout,
-                        GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls)
+GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
+                                      unsigned int queue_priority,
+                                      unsigned int max_queue_size,
+                                      GNUNET_DATASTORE_DatumProcessor proc,
+                                      void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct DataMessage *dm;
-  size_t msize;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *m;
   union QueueContext qc;
 
   union QueueContext qc;
 
-  if (cont == NULL)
-    cont = &drop_status_cont;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to remove %u bytes under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
-#endif
-  qc.sc.cont = cont;
-  qc.sc.cont_cls = cont_cls;
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  qe = make_queue_entry (h, msize,
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
-  if (qe == NULL)
+  GNUNET_assert (NULL != proc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to get replication entry\n");
+  env = GNUNET_MQ_msg (m,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry for GET REPLICATION\n");
     return NULL;
     return NULL;
-  dm = (struct DataMessage*) &qe[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(0);
-  dm->size = htonl(size);
-  dm->type = htonl(0);
-  dm->priority = htonl(0);
-  dm->anonymity = htonl(0);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
-  dm->key = *key;
-  memcpy (&dm[1], data, size);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# GET REPLICATION requests executed"), 1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
 
 
 /**
   process_queue (h);
   return qe;
 }
 
 
 /**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
-static void 
-process_result_message (void *cls,
-                       const struct GNUNET_MessageHeader * msg)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct ResultContext rc = qe->qc.rc;
-  const struct DataMessage *dm;
-  int was_transmitted;
-
-  if (msg == NULL)
-    {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Failed to receive response from datastore or queue full\n");
-#endif
-      was_transmitted = qe->was_transmitted;
-      free_queue_entry (qe);
-      if (GNUNET_YES == was_transmitted)       
-       do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
-      return;
-    }
-  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  GNUNET_assert (h->queue_head == qe);
-  if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
-    {
-      GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received end of result set\n");
-#endif
-      free_queue_entry (qe);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
-      process_queue (h);
-      return;
-    }
-  if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
-       (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
-    {
-      GNUNET_break (0);
-      free_queue_entry (qe);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      rc.iter (rc.iter_cls,
-              NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);   
-      return;
-    }
-  dm = (const struct DataMessage*) msg;
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received result %llu with type %u and size %u with key %s\n",
-             (unsigned long long) GNUNET_ntohll(dm->uid),
-             ntohl(dm->type),
-             ntohl(dm->size),
-             GNUNET_h2s(&dm->key));
-#endif
-  rc.iter (rc.iter_cls,
-          &dm->key,
-          ntohl(dm->size),
-          &dm[1],
-          ntohl(dm->type),
-          ntohl(dm->priority),
-          ntohl(dm->anonymity),
-          GNUNET_TIME_absolute_ntoh(dm->expiration),   
-          GNUNET_ntohll(dm->uid));
-}
-
-
-/**
- * Get a random value from the datastore.
+ * Get a single zero-anonymity value from the datastore.
  *
  * @param h handle to the datastore
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (modulo num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param type allowed type for the operation (never zero)
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        will be called once with a value (if available)
- *        and always once with a value of NULL.
- * @param iter_cls closure for iter
+ *        or with NULL if none value exists.
+ * @param proc_cls closure for @a proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
-                            unsigned int queue_priority,
-                            unsigned int max_queue_size,
-                            struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                            void *iter_cls)
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                     uint64_t offset,
+                                     unsigned int queue_priority,
+                                     unsigned int max_queue_size,
+                                     enum GNUNET_BLOCK_Type type,
+                                     GNUNET_DATASTORE_DatumProcessor proc,
+                                     void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct GNUNET_MessageHeader *m;
+  struct GNUNET_MQ_Envelope *env;
+  struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
   union QueueContext qc;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to get random entry in %llu ms\n",
-             (unsigned long long) timeout.value);
-#endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
-  qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
-                        queue_priority, max_queue_size, timeout,
-                        &process_result_message, &qc);
-  if (qe == NULL)
-    return NULL;    
-  m = (struct GNUNET_MessageHeader*) &qe[1];
-  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
-  m->size = htons(sizeof (struct GNUNET_MessageHeader));
+  GNUNET_assert (NULL != proc);
+  GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to get %llu-th zero-anonymity entry of type %d\n",
+       (unsigned long long) offset,
+       type);
+  env = GNUNET_MQ_msg (m,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
+  m->type = htonl ((uint32_t) type);
+  m->offset = GNUNET_htonll (offset);
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not create queue entry for zero-anonymity procation\n");
+    return NULL;
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# GET ZERO ANONYMITY requests executed"), 1,
+                            GNUNET_NO);
   process_queue (h);
   return qe;
 }
 
 
   process_queue (h);
   return qe;
 }
 
 
-
 /**
 /**
- * Iterate over the results for a particular key
- * in the datastore.  The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_get_next" with the given argument.
+ * Get a result for a particular key from the datastore.  The processor
+ * will only be called once.
  *
  * @param h handle to the datastore
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (modulo num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
- * @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
  *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for @a proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
-                      const GNUNET_HashCode * key,
-                     enum GNUNET_BLOCK_Type type,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                     struct GNUNET_TIME_Relative timeout,
-                      GNUNET_DATASTORE_Iterator iter, 
-                     void *iter_cls)
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
+                          uint64_t offset,
+                          const struct GNUNET_HashCode *key,
+                          enum GNUNET_BLOCK_Type type,
+                          unsigned int queue_priority,
+                          unsigned int max_queue_size,
+                          GNUNET_DATASTORE_DatumProcessor proc,
+                          void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GNUNET_MQ_Envelope *env;
+  struct GetKeyMessage *gkm;
   struct GetMessage *gm;
   union QueueContext qc;
 
   struct GetMessage *gm;
   union QueueContext qc;
 
-#if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to look for data of type %u under key `%s'\n",
-             (unsigned int) type,
-             GNUNET_h2s (key));
-#endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
-  qe = make_queue_entry (h, sizeof(struct GetMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_result_message, &qc);
-  if (qe == NULL)
-    return NULL;
-  gm = (struct GetMessage*) &qe[1];
-  gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
-  gm->type = htonl(type);
-  if (key != NULL)
-    {
-      gm->header.size = htons(sizeof (struct GetMessage));
-      gm->key = *key;
-    }
+  GNUNET_assert (NULL != proc);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Asked to look for data of type %u under key `%s'\n",
+       (unsigned int) type,
+       GNUNET_h2s (key));
+  if (NULL == key)
+  {
+    env = GNUNET_MQ_msg (gm,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_GET);
+    gm->type = htonl (type);
+    gm->offset = GNUNET_htonll (offset);
+  }
   else
   else
-    {
-      gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
-    }
+  {
+    env = GNUNET_MQ_msg (gkm,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
+    gkm->type = htonl (type);
+    gkm->offset = GNUNET_htonll (offset);
+    gkm->key = *key;
+  }
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h,
+                         env,
+                         queue_priority,
+                         max_queue_size,
+                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                         &qc);
+  if (NULL == qe)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Could not queue request for `%s'\n",
+         GNUNET_h2s (key));
+    return NULL;
+  }
+#if INSANE_STATISTICS
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# GET requests executed"),
+                            1,
+                            GNUNET_NO);
+#endif
   process_queue (h);
   return qe;
 }
 
 
   process_queue (h);
   return qe;
 }
 
 
-/**
- * Function called to trigger obtaining the next result
- * from the datastore.
- * 
- * @param h handle to the datastore
- * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
- *        iteration (with a final call to "iter" with key/data == NULL).
- */
-void 
-GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
-                          int more)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-  struct ResultContext rc = qe->qc.rc;
-
-  GNUNET_assert (NULL != qe);
-  GNUNET_assert (&process_result_message == qe->response_proc);
-  if (GNUNET_YES == more)
-    {     
-      GNUNET_CLIENT_receive (h->client,
-                            qe->response_proc,
-                            qe,
-                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
-      return;
-    }
-  free_queue_entry (qe);
-  h->retry_time = GNUNET_TIME_UNIT_ZERO;
-  do_disconnect (h);
-  rc.iter (rc.iter_cls,
-          NULL, 0, NULL, 0, 0, 0, 
-          GNUNET_TIME_UNIT_ZERO_ABS, 0);       
-}
-
-
 /**
  * Cancel a datastore operation.  The final callback from the
  * operation must not have been done yet.
 /**
  * Cancel a datastore operation.  The final callback from the
  * operation must not have been done yet.
- * 
+ *
  * @param qe operation to cancel
  */
 void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
  * @param qe operation to cancel
  */
 void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
-  struct GNUNET_DATASTORE_Handle *h;
-  int reconnect;
+  struct GNUNET_DATASTORE_Handle *h = qe->h;
 
 
-  h = qe->h;
-  reconnect = qe->was_transmitted;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Pending DATASTORE request %p cancelled (%d, %d)\n",
+       qe,
+       NULL == qe->env,
+       h->queue_head == qe);
+  if (NULL == qe->env)
+  {
+    free_queue_entry (qe);
+    h->skip_next_messages++;
+    return;
+  }
   free_queue_entry (qe);
   free_queue_entry (qe);
-  h->queue_size--;
-  if (reconnect)
-    {
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-    }
+  process_queue (h);
 }
 
 
 }