better reporting
[oweals/gnunet.git] / src / datastore / datastore_api.c
index 2218fffc3eb56e7c5813ee85dbcdecc3bf24c1d7..f8826ce667bd7762423e4b9c2fdcafe031460ee3 100644 (file)
@@ -1,10 +1,10 @@
 /*
      This file is part of GNUnet
 /*
      This file is part of GNUnet
-     (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 2, or (at your
+     by the Free Software Foundation; either version 3, or (at your
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
 #include "gnunet_arm_service.h"
 #include "gnunet_constants.h"
 #include "gnunet_datastore_service.h"
 #include "gnunet_arm_service.h"
 #include "gnunet_constants.h"
 #include "gnunet_datastore_service.h"
+#include "gnunet_statistics_service.h"
 #include "datastore.h"
 
 #include "datastore.h"
 
+/**
+ * If a client stopped asking for more results, how many more do
+ * we receive from the DB before killing the connection?  Trade-off
+ * between re-doing TCP handshakes and (needlessly) receiving
+ * useless results.
+ */
+#define MAX_EXCESS_RESULTS 8
 
 /**
  * Context for processing status messages.
 
 /**
  * Context for processing status messages.
@@ -55,14 +63,14 @@ struct StatusContext
 struct ResultContext
 {
   /**
 struct ResultContext
 {
   /**
-   * Iterator to call with the result.
+   * Function to call with the result.
    */
    */
-  GNUNET_DATASTORE_Iterator iter;
+  GNUNET_DATASTORE_DatumProcessor proc;
 
   /**
 
   /**
-   * Closure for iter.
+   * Closure for proc.
    */
    */
-  void *iter_cls;
+  void *proc_cls;
 
 };
 
 
 };
 
@@ -74,7 +82,7 @@ union QueueContext
 {
 
   struct StatusContext sc;
 {
 
   struct StatusContext sc;
-  
+
   struct ResultContext rc;
 
 };
   struct ResultContext rc;
 
 };
@@ -113,7 +121,7 @@ struct GNUNET_DATASTORE_QueueEntry
    * Function to call after transmission of the request.
    */
   GNUNET_DATASTORE_ContinuationWithStatus cont;
    * Function to call after transmission of the request.
    */
   GNUNET_DATASTORE_ContinuationWithStatus cont;
-   
+
   /**
    * Closure for 'cont'.
    */
   /**
    * Closure for 'cont'.
    */
@@ -155,15 +163,15 @@ struct GNUNET_DATASTORE_QueueEntry
   /**
    * Has this message been transmitted to the service?
    * Only ever GNUNET_YES for the head of the queue.
   /**
    * Has this message been transmitted to the service?
    * Only ever GNUNET_YES for the head of the queue.
-   * Note that the overall struct should end at a 
+   * Note that the overall struct should end at a
    * multiple of 64 bits.
    */
    * multiple of 64 bits.
    */
-  int32_t was_transmitted;
+  int was_transmitted;
 
 };
 
 /**
 
 };
 
 /**
- * Handle to the datastore service. 
+ * Handle to the datastore service.
  */
 struct GNUNET_DATASTORE_Handle
 {
  */
 struct GNUNET_DATASTORE_Handle
 {
@@ -174,14 +182,14 @@ struct GNUNET_DATASTORE_Handle
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Our scheduler.
+   * Current connection to the datastore service.
    */
    */
-  struct GNUNET_SCHEDULER_Handle *sched;
+  struct GNUNET_CLIENT_Connection *client;
 
   /**
 
   /**
-   * Current connection to the datastore service.
+   * Handle for statistics.
    */
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_STATISTICS_Handle *stats;
 
   /**
    * Current transmit handle.
 
   /**
    * Current transmit handle.
@@ -214,11 +222,23 @@ struct GNUNET_DATASTORE_Handle
    */
   unsigned int queue_size;
 
    */
   unsigned int queue_size;
 
+  /**
+   * Number of results we're receiving for the current query
+   * after application stopped to care.  Used to determine when
+   * to reset the connection.
+   */
+  unsigned int result_count;
+
   /**
    * Are we currently trying to receive from the service?
    */
   int in_receive;
 
   /**
    * Are we currently trying to receive from the service?
    */
   int in_receive;
 
+  /**
+   * We should ignore the next message(s) from the service.
+   */
+  unsigned int skip_next_messages;
+
 };
 
 
 };
 
 
@@ -227,28 +247,22 @@ struct GNUNET_DATASTORE_Handle
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
  * Connect to the datastore service.
  *
  * @param cfg configuration to use
- * @param sched scheduler to use
  * @return handle to use to access the service
  */
 struct GNUNET_DATASTORE_Handle *
  * @return handle to use to access the service
  */
 struct GNUNET_DATASTORE_Handle *
-GNUNET_DATASTORE_connect (const struct
-                         GNUNET_CONFIGURATION_Handle
-                         *cfg,
-                         struct
-                         GNUNET_SCHEDULER_Handle
-                         *sched)
+GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   struct GNUNET_CLIENT_Connection *c;
   struct GNUNET_DATASTORE_Handle *h;
 {
   struct GNUNET_CLIENT_Connection *c;
   struct GNUNET_DATASTORE_Handle *h;
-  
-  c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
+
+  c = GNUNET_CLIENT_connect ("datastore", cfg);
   if (c == NULL)
   if (c == NULL)
-    return NULL; /* oops */
-  h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
-                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
+    return NULL;                /* oops */
+  h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
+                     GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
   h->client = c;
   h->cfg = cfg;
   h->client = c;
   h->cfg = cfg;
-  h->sched = sched;
+  h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
   return h;
 }
 
   return h;
 }
 
@@ -262,26 +276,24 @@ GNUNET_DATASTORE_connect (const struct
  * @return number of bytes written to buf
  */
 static size_t
  * @return number of bytes written to buf
  */
 static size_t
-transmit_drop (void *cls,
-              size_t size, 
-              void *buf)
+transmit_drop (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_MessageHeader *hdr;
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_MessageHeader *hdr;
-  
+
   if (buf == NULL)
   if (buf == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to drop database.\n"));
-      GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
-      return 0;
-    }
-  GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                _("Failed to transmit request to drop database.\n"));
+    GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
+    return 0;
+  }
+  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
   hdr = buf;
   hdr = buf;
-  hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
-  hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
+  hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
+  hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
-  return sizeof(struct GNUNET_MessageHeader);
+  return sizeof (struct GNUNET_MessageHeader);
 }
 
 
 }
 
 
@@ -292,44 +304,53 @@ transmit_drop (void *cls,
  * @param h handle to the datastore
  * @param drop set to GNUNET_YES to delete all data in datastore (!)
  */
  * @param h handle to the datastore
  * @param drop set to GNUNET_YES to delete all data in datastore (!)
  */
-void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
-                                 int drop)
+void
+GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
+#endif
+  if (NULL != h->th)
+  {
+    GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
+    h->th = NULL;
+  }
   if (h->client != NULL)
   if (h->client != NULL)
-    {
-      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-      h->client = NULL;
-    }
+  {
+    GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+    h->client = NULL;
+  }
   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              h->reconnect_task);
-      h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-    }
+  {
+    GNUNET_SCHEDULER_cancel (h->reconnect_task);
+    h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+  }
   while (NULL != (qe = h->queue_head))
   while (NULL != (qe = h->queue_head))
+  {
+    GNUNET_assert (NULL != qe->response_proc);
+    qe->response_proc (h, NULL);
+  }
+  if (GNUNET_YES == drop)
+  {
+    h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
+    if (h->client != NULL)
     {
     {
-      GNUNET_assert (NULL != qe->response_proc);
-      qe->response_proc (qe, NULL);
-    }
-  if (GNUNET_YES == drop) 
-    {
-      h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
-      if (h->client != NULL)
-       {
-         if (NULL != 
-             GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                                  sizeof(struct GNUNET_MessageHeader),
-                                                  GNUNET_TIME_UNIT_MINUTES,
-                                                  GNUNET_YES,
-                                                  &transmit_drop,
-                                                  h))
-           return;
-         GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
-       }
-      GNUNET_break (0);
+      if (NULL !=
+          GNUNET_CLIENT_notify_transmit_ready (h->client,
+                                               sizeof (struct
+                                                       GNUNET_MessageHeader),
+                                               GNUNET_TIME_UNIT_MINUTES,
+                                               GNUNET_YES, &transmit_drop, h))
+        return;
+      GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+      h->client = NULL;
     }
     }
+    GNUNET_break (0);
+  }
+  GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
+  h->stats = NULL;
   GNUNET_free (h);
 }
 
   GNUNET_free (h);
 }
 
@@ -341,14 +362,20 @@ void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
  * @param tc scheduler context
  */
 static void
  * @param tc scheduler context
  */
 static void
-timeout_queue_entry (void *cls,
-                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
 
 {
   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
 
+  GNUNET_STATISTICS_update (qe->h->stats,
+                            gettext_noop ("# queue entry timeouts"), 1,
+                            GNUNET_NO);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
-  qe->response_proc (qe, NULL);
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Timeout of request in datastore queue\n");
+#endif
+  qe->response_proc (qe->h, NULL);
 }
 
 
 }
 
 
@@ -364,16 +391,14 @@ timeout_queue_entry (void *cls,
  * @param timeout timeout for the operation
  * @param response_proc function to call with replies (can be NULL)
  * @param qc client context (NOT a closure for response_proc)
  * @param timeout timeout for the operation
  * @param response_proc function to call with replies (can be NULL)
  * @param qc client context (NOT a closure for response_proc)
- * @return NULL if the queue is full (and this entry was dropped)
+ * @return NULL if the queue is full
  */
 static struct GNUNET_DATASTORE_QueueEntry *
  */
 static struct GNUNET_DATASTORE_QueueEntry *
-make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
-                 size_t msize,
-                 unsigned int queue_priority,
-                 unsigned int max_queue_size,
-                 struct GNUNET_TIME_Relative timeout,
-                 GNUNET_CLIENT_MessageHandler response_proc,            
-                 const union QueueContext *qc)
+make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
+                  unsigned int queue_priority, unsigned int max_queue_size,
+                  struct GNUNET_TIME_Relative timeout,
+                  GNUNET_CLIENT_MessageHandler response_proc,
+                  const union QueueContext *qc)
 {
   struct GNUNET_DATASTORE_QueueEntry *ret;
   struct GNUNET_DATASTORE_QueueEntry *pos;
 {
   struct GNUNET_DATASTORE_QueueEntry *ret;
   struct GNUNET_DATASTORE_QueueEntry *pos;
@@ -381,13 +406,18 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
 
   c = 0;
   pos = h->queue_head;
 
   c = 0;
   pos = h->queue_head;
-  while ( (pos != NULL) &&
-         (c < max_queue_size) &&
-         (pos->priority >= queue_priority) )
-    {
-      c++;
-      pos = pos->next;
-    }
+  while ((pos != NULL) && (c < max_queue_size) &&
+         (pos->priority >= queue_priority))
+  {
+    c++;
+    pos = pos->next;
+  }
+  if (c >= max_queue_size)
+  {
+    GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
+                              GNUNET_NO);
+    return NULL;
+  }
   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
   ret->h = h;
   ret->response_proc = response_proc;
   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
   ret->h = h;
   ret->response_proc = response_proc;
@@ -398,45 +428,48 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
   ret->message_size = msize;
   ret->was_transmitted = GNUNET_NO;
   if (pos == NULL)
   ret->message_size = msize;
   ret->was_transmitted = GNUNET_NO;
   if (pos == NULL)
-    {
-      /* append at the tail */
-      pos = h->queue_tail;
-    }
+  {
+    /* append at the tail */
+    pos = h->queue_tail;
+  }
   else
   else
-    {
-      pos = pos->prev; 
-      /* do not insert at HEAD if HEAD query was already
-        transmitted and we are still receiving replies! */
-      if ( (pos == NULL) &&
-          (h->queue_head->was_transmitted) )
-       pos = h->queue_head;
-    }
+  {
+    pos = pos->prev;
+    /* do not insert at HEAD if HEAD query was already
+     * transmitted and we are still receiving replies! */
+    if ((pos == NULL) && (h->queue_head->was_transmitted))
+      pos = h->queue_head;
+  }
   c++;
   c++;
-  GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
-                                    h->queue_tail,
-                                    pos,
-                                    ret);
+  GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
+                            1, GNUNET_NO);
+  GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
   h->queue_size++;
   h->queue_size++;
-  if (c > max_queue_size)
-    {
-      response_proc (ret, NULL);
-      return NULL;
-    }
-  ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                           timeout,
-                                           &timeout_queue_entry,
-                                           ret);
+  ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
   pos = ret->next;
   pos = ret->next;
-  while (pos != NULL) 
+  while (pos != NULL)
+  {
+    if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
     {
     {
-      if (pos->max_queue < h->queue_size)
-       {
-         GNUNET_assert (pos->response_proc != NULL);
-         pos->response_proc (pos, NULL);
-         break;
-       }
-      pos = pos->next;
+      GNUNET_assert (pos->response_proc != NULL);
+      /* move 'pos' element to head so that it will be
+       * killed on 'NULL' call below */
+#if DEBUG_DATASTORE
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Dropping request from datastore queue\n");
+#endif
+      GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
+      GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
+      GNUNET_STATISTICS_update (h->stats,
+                                gettext_noop
+                                ("# Requests dropped from datastore queue"), 1,
+                                GNUNET_NO);
+      GNUNET_assert (h->queue_head == pos);
+      pos->response_proc (h, NULL);
+      break;
     }
     }
+    pos = pos->next;
+  }
   return ret;
 }
 
   return ret;
 }
 
@@ -444,7 +477,7 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
- * 
+ *
  * @param h handle to the datastore
  */
 static void
  * @param h handle to the datastore
  */
 static void
@@ -458,28 +491,30 @@ process_queue (struct GNUNET_DATASTORE_Handle *h);
  * @param tc scheduler context
  */
 static void
  * @param tc scheduler context
  */
 static void
-try_reconnect (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
+try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
 
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
 
-  if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
+  if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
   else
     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
   else
     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
-  if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
+  if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
+  h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
   if (h->client == NULL)
   if (h->client == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 "DATASTORE reconnect failed (fatally)\n");
-      return;
-    }
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "DATASTORE reconnect failed (fatally)\n");
+    return;
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# datastore connections (re)created"), 1,
+                            GNUNET_NO);
 #if DEBUG_DATASTORE
 #if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Reconnected to DATASTORE\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
 #endif
   process_queue (h);
 }
 #endif
   process_queue (h);
 }
@@ -495,25 +530,55 @@ static void
 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
   if (h->client == NULL)
 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
 {
   if (h->client == NULL)
-    {
+  {
 #if DEBUG_DATASTORE
 #if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "client NULL in disconnect, will not try to reconnect\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "client NULL in disconnect, will not try to reconnect\n");
 #endif
 #endif
-      return;
-    }
+    return;
+  }
 #if 0
 #if 0
-  GNUNET_STATISTICS_update (stats,
-                           gettext_noop ("# reconnected to DATASTORE"),
-                           1,
-                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
+                            1, GNUNET_NO);
 #endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
 #endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+  h->skip_next_messages = 0;
   h->client = NULL;
   h->client = NULL;
-  h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
-                                                   h->retry_time,
-                                                   &try_reconnect,
-                                                   h);      
+  h->reconnect_task =
+      GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
+}
+
+
+/**
+ * Function called whenever we receive a message from
+ * the service.  Calls the appropriate handler.
+ *
+ * @param cls the 'struct GNUNET_DATASTORE_Handle'
+ * @param msg the received message
+ */
+static void
+receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  h->in_receive = GNUNET_NO;
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
+#endif
+  if (h->skip_next_messages > 0)
+  {
+    h->skip_next_messages--;
+    process_queue (h);
+    return;
+  }
+  if (NULL == (qe = h->queue_head))
+  {
+    GNUNET_break (0);
+    process_queue (h);
+    return;
+  }
+  qe->response_proc (h, msg);
 }
 
 
 }
 
 
@@ -526,9 +591,7 @@ do_disconnect (struct GNUNET_DATASTORE_Handle *h)
  * @return number of bytes written to buf
  */
 static size_t
  * @return number of bytes written to buf
  */
 static size_t
-transmit_request (void *cls,
-                 size_t size, 
-                 void *buf)
+transmit_request (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -536,34 +599,37 @@ transmit_request (void *cls,
 
   h->th = NULL;
   if (NULL == (qe = h->queue_head))
 
   h->th = NULL;
   if (NULL == (qe = h->queue_head))
-    return 0; /* no entry in queue */
+    return 0;                   /* no entry in queue */
   if (buf == NULL)
   if (buf == NULL)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                 _("Failed to transmit request to DATASTORE.\n"));
-      do_disconnect (h);
-      return 0;
-    }
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                _("Failed to transmit request to DATASTORE.\n"));
+    GNUNET_STATISTICS_update (h->stats,
+                              gettext_noop ("# transmission request failures"),
+                              1, GNUNET_NO);
+    do_disconnect (h);
+    return 0;
+  }
   if (size < (msize = qe->message_size))
   if (size < (msize = qe->message_size))
-    {
-      process_queue (h);
-      return 0;
-    }
- #if DEBUG_DATASTORE
+  {
+    process_queue (h);
+    return 0;
+  }
+#if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitting %u byte request to DATASTORE\n",
-             msize);
+              "Transmitting %u byte request to DATASTORE\n", msize);
 #endif
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
 #endif
   memcpy (buf, &qe[1], msize);
   qe->was_transmitted = GNUNET_YES;
-  GNUNET_SCHEDULER_cancel (h->sched,
-                          qe->task);
+  GNUNET_SCHEDULER_cancel (qe->task);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
   qe->task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (GNUNET_NO == h->in_receive);
   h->in_receive = GNUNET_YES;
   h->in_receive = GNUNET_YES;
-  GNUNET_CLIENT_receive (h->client,
-                        qe->response_proc,
-                        qe,
-                        GNUNET_TIME_absolute_get_remaining (qe->timeout));
+  GNUNET_CLIENT_receive (h->client, &receive_cb, h,
+                         GNUNET_TIME_absolute_get_remaining (qe->timeout));
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# bytes sent to datastore"), 1,
+                            GNUNET_NO);
   return msize;
 }
 
   return msize;
 }
 
@@ -571,7 +637,7 @@ transmit_request (void *cls,
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
 /**
  * Process entries in the queue (or do nothing if we are already
  * doing so).
- * 
+ *
  * @param h handle to the datastore
  */
 static void
  * @param h handle to the datastore
  */
 static void
@@ -580,48 +646,49 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   if (NULL == (qe = h->queue_head))
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   if (NULL == (qe = h->queue_head))
-    {
+  {
 #if DEBUG_DATASTORE > 1
 #if DEBUG_DATASTORE > 1
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Queue empty\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
 #endif
 #endif
-      return; /* no entry in queue */
-    }
+    return;                     /* no entry in queue */
+  }
   if (qe->was_transmitted == GNUNET_YES)
   if (qe->was_transmitted == GNUNET_YES)
-    {
+  {
 #if DEBUG_DATASTORE > 1
 #if DEBUG_DATASTORE > 1
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Head request already transmitted\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
 #endif
 #endif
-      return; /* waiting for replies */
-    }
+    return;                     /* waiting for replies */
+  }
   if (h->th != NULL)
   if (h->th != NULL)
-    {
+  {
 #if DEBUG_DATASTORE > 1
 #if DEBUG_DATASTORE > 1
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Pending transmission request\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
 #endif
 #endif
-      return; /* request pending */
-    }
+    return;                     /* request pending */
+  }
   if (h->client == NULL)
   if (h->client == NULL)
-    {
+  {
 #if DEBUG_DATASTORE > 1
 #if DEBUG_DATASTORE > 1
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Not connected\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
 #endif
 #endif
-      return; /* waiting for reconnect */
-    }
+    return;                     /* waiting for reconnect */
+  }
+  if (GNUNET_YES == h->in_receive)
+  {
+    /* wait for response to previous query */
+    return;
+  }
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Queueing %u byte request to DATASTORE\n",
-             qe->message_size);
+              "Queueing %u byte request to DATASTORE\n", qe->message_size);
 #endif
 #endif
-  h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
-                                              qe->message_size,
-                                              GNUNET_TIME_absolute_get_remaining (qe->timeout),
-                                              GNUNET_YES,
-                                              &transmit_request,
-                                              h);
+  h->th =
+      GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
+                                           GNUNET_TIME_absolute_get_remaining
+                                           (qe->timeout), GNUNET_YES,
+                                           &transmit_request, h);
+  GNUNET_assert (GNUNET_NO == h->in_receive);
+  GNUNET_break (NULL != h->th);
 }
 
 
 }
 
 
@@ -629,34 +696,40 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
  * Dummy continuation used to do nothing (but be non-zero).
  *
  * @param cls closure
  * Dummy continuation used to do nothing (but be non-zero).
  *
  * @param cls closure
- * @param result result 
+ * @param result result
  * @param emsg error message
  */
 static void
  * @param emsg error message
  */
 static void
-drop_status_cont (void *cls, int result, const char *emsg)
+drop_status_cont (void *cls, int32_t result, const char *emsg)
 {
   /* do nothing */
 }
 
 
 {
   /* do nothing */
 }
 
 
+/**
+ * Free a queue entry.  Removes the given entry from the
+ * queue and releases associated resources.  Does NOT
+ * call the callback.
+ *
+ * @param qe entry to free.
+ */
 static void
 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h = qe->h;
 
 static void
 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h = qe->h;
 
-  GNUNET_CONTAINER_DLL_remove (h->queue_head,
-                              h->queue_tail,
-                              qe);
+  GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (h->sched,
-                              qe->task);
-      qe->task = GNUNET_SCHEDULER_NO_TASK;
-    }
+  {
+    GNUNET_SCHEDULER_cancel (qe->task);
+    qe->task = GNUNET_SCHEDULER_NO_TASK;
+  }
   h->queue_size--;
   h->queue_size--;
+  qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
   GNUNET_free (qe);
 }
 
   GNUNET_free (qe);
 }
 
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -664,76 +737,78 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
  * @param cls closure
  * @param msg message received, NULL on timeout or fatal error
  */
  * @param cls closure
  * @param msg message received, NULL on timeout or fatal error
  */
-static void 
-process_status_message (void *cls,
-                       const struct
-                       GNUNET_MessageHeader * msg)
+static void
+process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
 {
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct StatusContext rc = qe->qc.sc;
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct StatusContext rc;
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
   int was_transmitted;
 
   const struct StatusMessage *sm;
   const char *emsg;
   int32_t status;
   int was_transmitted;
 
-  h->in_receive = GNUNET_NO;
-  was_transmitted = qe->was_transmitted;
+  if (NULL == (qe = h->queue_head))
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return;
+  }
+  rc = qe->qc.sc;
   if (msg == NULL)
   if (msg == NULL)
-    {      
-      free_queue_entry (qe);
-      if (NULL == h->client)
-       return; /* forced disconnect */
-      if (was_transmitted == GNUNET_YES)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Failed to receive response from database.\n"));
-         do_disconnect (h);
-       }
-      return;
-    }
+  {
+    was_transmitted = qe->was_transmitted;
+    free_queue_entry (qe);
+    if (was_transmitted == GNUNET_YES)
+      do_disconnect (h);
+    else
+      process_queue (h);
+    if (rc.cont != NULL)
+      rc.cont (rc.cont_cls, GNUNET_SYSERR,
+               _("Failed to receive status response from database."));
+    return;
+  }
   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  GNUNET_assert (h->queue_head == qe);
   free_queue_entry (qe);
   free_queue_entry (qe);
-  if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
-    {
-      GNUNET_break (0);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      rc.cont (rc.cont_cls, 
-              GNUNET_SYSERR,
-              _("Error reading response from datastore service"));
-      return;
-    }
-  sm = (const struct StatusMessage*) msg;
-  status = ntohl(sm->status);
+  if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
+      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
+  {
+    GNUNET_break (0);
+    h->retry_time = GNUNET_TIME_UNIT_ZERO;
+    do_disconnect (h);
+    if (rc.cont != NULL)
+      rc.cont (rc.cont_cls, GNUNET_SYSERR,
+               _("Error reading response from datastore service"));
+    return;
+  }
+  sm = (const struct StatusMessage *) msg;
+  status = ntohl (sm->status);
   emsg = NULL;
   emsg = NULL;
-  if (ntohs(msg->size) > sizeof(struct StatusMessage))
-    {
-      emsg = (const char*) &sm[1];
-      if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
-       {
-         GNUNET_break (0);
-         emsg = _("Invalid error message received from datastore service");
-       }
-    }  
-  if ( (status == GNUNET_SYSERR) &&
-       (emsg == NULL) )
+  if (ntohs (msg->size) > sizeof (struct StatusMessage))
+  {
+    emsg = (const char *) &sm[1];
+    if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
     {
       GNUNET_break (0);
       emsg = _("Invalid error message received from datastore service");
     }
     {
       GNUNET_break (0);
       emsg = _("Invalid error message received from datastore service");
     }
+  }
+  if ((status == GNUNET_SYSERR) && (emsg == NULL))
+  {
+    GNUNET_break (0);
+    emsg = _("Invalid error message received from datastore service");
+  }
 #if DEBUG_DATASTORE
 #if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received status %d/%s\n",
-             (int) status,
-             emsg);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status,
+              emsg);
 #endif
 #endif
-  rc.cont (rc.cont_cls, 
-          status,
-          emsg);
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# status messages received"), 1,
+                            GNUNET_NO);
+  h->retry_time.rel_value = 0;
   process_queue (h);
   process_queue (h);
+  if (rc.cont != NULL)
+    rc.cont (rc.cont_cls, status, emsg);
 }
 
 
 }
 
 
@@ -751,6 +826,7 @@ process_status_message (void *cls,
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
  * @param type type of the content
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
+ * @param replication how often should the content be replicated to other peers?
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  * @param expiration expiration time for the content
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
@@ -763,20 +839,16 @@ process_status_message (void *cls,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
-                     int rid,
-                      const GNUNET_HashCode * key,
-                      uint32_t size,
-                      const void *data,
-                      enum GNUNET_BLOCK_Type type,
-                      uint32_t priority,
-                      uint32_t anonymity,
+GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
+                      const GNUNET_HashCode * key, size_t size,
+                      const void *data, enum GNUNET_BLOCK_Type type,
+                      uint32_t priority, uint32_t anonymity,
+                      uint32_t replication,
                       struct GNUNET_TIME_Absolute expiration,
                       struct GNUNET_TIME_Absolute expiration,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
+                      unsigned int queue_priority, unsigned int max_queue_size,
                       struct GNUNET_TIME_Relative timeout,
                       struct GNUNET_TIME_Relative timeout,
-                     GNUNET_DATASTORE_ContinuationWithStatus cont,
-                     void *cont_cls)
+                      GNUNET_DATASTORE_ContinuationWithStatus cont,
+                      void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct DataMessage *dm;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct DataMessage *dm;
@@ -785,29 +857,38 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
 
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to put %u bytes of data under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
+              "Asked to put %u bytes of data under key `%s' for %llu ms\n",
+              size, GNUNET_h2s (key),
+              GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
 #endif
 #endif
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  msize = sizeof (struct DataMessage) + size;
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, msize,
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
+  qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
+                         &process_status_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry for PUT\n");
+#endif
     return NULL;
     return NULL;
-  dm = (struct DataMessage* ) &qe[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(rid);
-  dm->size = htonl(size);
-  dm->type = htonl(type);
-  dm->priority = htonl(priority);
-  dm->anonymity = htonl(anonymity);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(expiration);
+  }
+  GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
+                            1, GNUNET_NO);
+  dm = (struct DataMessage *) &qe[1];
+  dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
+  dm->header.size = htons (msize);
+  dm->rid = htonl (rid);
+  dm->size = htonl ((uint32_t) size);
+  dm->type = htonl (type);
+  dm->priority = htonl (priority);
+  dm->anonymity = htonl (anonymity);
+  dm->replication = htonl (replication);
+  dm->reserved = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
   dm->key = *key;
   memcpy (&dm[1], data, size);
   process_queue (h);
   dm->key = *key;
   memcpy (&dm[1], data, size);
   process_queue (h);
@@ -835,14 +916,12 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
-                         uint64_t amount,
-                         uint32_t entries,
-                         unsigned int queue_priority,
-                         unsigned int max_queue_size,
-                         struct GNUNET_TIME_Relative timeout,
-                         GNUNET_DATASTORE_ContinuationWithStatus cont,
-                         void *cont_cls)
+GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
+                          uint32_t entries, unsigned int queue_priority,
+                          unsigned int max_queue_size,
+                          struct GNUNET_TIME_Relative timeout,
+                          GNUNET_DATASTORE_ContinuationWithStatus cont,
+                          void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ReserveMessage *rm;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ReserveMessage *rm;
@@ -852,22 +931,29 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to reserve %llu bytes of data and %u entries'\n",
-             (unsigned long long) amount,
-             (unsigned int) entries);
+              "Asked to reserve %llu bytes of data and %u entries\n",
+              (unsigned long long) amount, (unsigned int) entries);
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct ReserveMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
+  qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
+                         max_queue_size, timeout, &process_status_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry to reserve\n");
+#endif
     return NULL;
     return NULL;
-  rm = (struct ReserveMessage*) &qe[1];
-  rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
-  rm->header.size = htons(sizeof (struct ReserveMessage));
-  rm->entries = htonl(entries);
-  rm->amount = GNUNET_htonll(amount);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# RESERVE requests executed"), 1,
+                            GNUNET_NO);
+  rm = (struct ReserveMessage *) &qe[1];
+  rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
+  rm->header.size = htons (sizeof (struct ReserveMessage));
+  rm->entries = htonl (entries);
+  rm->amount = GNUNET_htonll (amount);
   process_queue (h);
   return qe;
 }
   process_queue (h);
   return qe;
 }
@@ -896,12 +982,11 @@ GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
-                                 int rid,
-                                 unsigned int queue_priority,
-                                 unsigned int max_queue_size,
-                                 struct GNUNET_TIME_Relative timeout,
-                                 GNUNET_DATASTORE_ContinuationWithStatus cont,
-                                 void *cont_cls)
+                                  uint32_t rid, unsigned int queue_priority,
+                                  unsigned int max_queue_size,
+                                  struct GNUNET_TIME_Relative timeout,
+                                  GNUNET_DATASTORE_ContinuationWithStatus cont,
+                                  void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ReleaseReserveMessage *rrm;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ReleaseReserveMessage *rrm;
@@ -910,21 +995,29 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
   if (cont == NULL)
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   if (cont == NULL)
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to release reserve %d\n",
-             rid);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
+  qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
+                         queue_priority, max_queue_size, timeout,
+                         &process_status_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry to release reserve\n");
+#endif
     return NULL;
     return NULL;
-  rrm = (struct ReleaseReserveMessage*) &qe[1];
-  rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
-  rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
-  rrm->rid = htonl(rid);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# RELEASE RESERVE requests executed"), 1,
+                            GNUNET_NO);
+  rrm = (struct ReleaseReserveMessage *) &qe[1];
+  rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
+  rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
+  rrm->rid = htonl (rid);
   process_queue (h);
   return qe;
 }
   process_queue (h);
   return qe;
 }
@@ -948,15 +1041,14 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
  *         (or rather, will already have been invoked)
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
-                        unsigned long long uid,
-                        uint32_t priority,
-                        struct GNUNET_TIME_Absolute expiration,
-                        unsigned int queue_priority,
-                        unsigned int max_queue_size,
-                        struct GNUNET_TIME_Relative timeout,
-                        GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls)
+GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
+                         uint32_t priority,
+                         struct GNUNET_TIME_Absolute expiration,
+                         unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         struct GNUNET_TIME_Relative timeout,
+                         GNUNET_DATASTORE_ContinuationWithStatus cont,
+                         void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct UpdateMessage *um;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct UpdateMessage *um;
@@ -966,24 +1058,31 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
-             uid,
-             (unsigned int) priority,
-             (unsigned long long) expiration.value);
+              "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
+              uid, (unsigned int) priority,
+              (unsigned long long) expiration.abs_value);
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h, sizeof(struct UpdateMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
+  qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
+                         max_queue_size, timeout, &process_status_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry for UPDATE\n");
+#endif
     return NULL;
     return NULL;
-  um = (struct UpdateMessage*) &qe[1];
-  um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
-  um->header.size = htons(sizeof (struct UpdateMessage));
-  um->priority = htonl(priority);
-  um->expiration = GNUNET_TIME_absolute_hton(expiration);
-  um->uid = GNUNET_htonll(uid);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# UPDATE requests executed"), 1,
+                            GNUNET_NO);
+  um = (struct UpdateMessage *) &qe[1];
+  um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
+  um->header.size = htons (sizeof (struct UpdateMessage));
+  um->priority = htonl (priority);
+  um->expiration = GNUNET_TIME_absolute_hton (expiration);
+  um->uid = GNUNET_htonll (uid);
   process_queue (h);
   return qe;
 }
   process_queue (h);
   return qe;
 }
@@ -1012,14 +1111,12 @@ GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
-                         const GNUNET_HashCode *key,
-                         uint32_t size, 
-                        const void *data,
-                        unsigned int queue_priority,
-                        unsigned int max_queue_size,
-                        struct GNUNET_TIME_Relative timeout,
-                        GNUNET_DATASTORE_ContinuationWithStatus cont,
-                        void *cont_cls)
+                         const GNUNET_HashCode * key, size_t size,
+                         const void *data, unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         struct GNUNET_TIME_Relative timeout,
+                         GNUNET_DATASTORE_ContinuationWithStatus cont,
+                         void *cont_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct DataMessage *dm;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct DataMessage *dm;
@@ -1030,29 +1127,36 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
     cont = &drop_status_cont;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to remove %u bytes under key `%s'\n",
-             size,
-             GNUNET_h2s (key));
+              "Asked to remove %u bytes under key `%s'\n", size,
+              GNUNET_h2s (key));
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
 #endif
   qc.sc.cont = cont;
   qc.sc.cont_cls = cont_cls;
-  msize = sizeof(struct DataMessage) + size;
-  GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
-  qe = make_queue_entry (h, msize,
-                        queue_priority, max_queue_size, timeout,
-                        &process_status_message, &qc);
+  msize = sizeof (struct DataMessage) + size;
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
+                         &process_status_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry for REMOVE\n");
+#endif
     return NULL;
     return NULL;
-  dm = (struct DataMessage*) &qe[1];
-  dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
-  dm->header.size = htons(msize);
-  dm->rid = htonl(0);
-  dm->size = htonl(size);
-  dm->type = htonl(0);
-  dm->priority = htonl(0);
-  dm->anonymity = htonl(0);
-  dm->uid = GNUNET_htonll(0);
-  dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop ("# REMOVE requests executed"), 1,
+                            GNUNET_NO);
+  dm = (struct DataMessage *) &qe[1];
+  dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
+  dm->header.size = htons (msize);
+  dm->rid = htonl (0);
+  dm->size = htonl (size);
+  dm->type = htonl (0);
+  dm->priority = htonl (0);
+  dm->anonymity = htonl (0);
+  dm->uid = GNUNET_htonll (0);
+  dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
   dm->key = *key;
   memcpy (&dm[1], data, size);
   process_queue (h);
   dm->key = *key;
   memcpy (&dm[1], data, size);
   process_queue (h);
@@ -1067,287 +1171,336 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
  * @param cls closure
  * @param msg message received, NULL on timeout or fatal error
  */
  * @param cls closure
  * @param msg message received, NULL on timeout or fatal error
  */
-static void 
-process_result_message (void *cls,
-                       const struct GNUNET_MessageHeader * msg)
+static void
+process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
 {
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
-  struct GNUNET_DATASTORE_Handle *h = qe->h;
-  struct ResultContext rc = qe->qc.rc;
+  struct GNUNET_DATASTORE_Handle *h = cls;
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct ResultContext rc;
   const struct DataMessage *dm;
   int was_transmitted;
 
   const struct DataMessage *dm;
   int was_transmitted;
 
-  h->in_receive = GNUNET_NO;
   if (msg == NULL)
   if (msg == NULL)
+  {
+    qe = h->queue_head;
+    GNUNET_assert (NULL != qe);
+    rc = qe->qc.rc;
+    was_transmitted = qe->was_transmitted;
+    free_queue_entry (qe);
+    if (was_transmitted == GNUNET_YES)
     {
     {
-      was_transmitted = qe->was_transmitted;
-      free_queue_entry (qe);
-      if (was_transmitted == GNUNET_YES)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Failed to receive response from database.\n"));
-         do_disconnect (h);
-       }
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
-      return;
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  _("Failed to receive response from database.\n"));
+      do_disconnect (h);
     }
     }
-  GNUNET_assert (GNUNET_YES == qe->was_transmitted);
-  GNUNET_assert (h->queue_head == qe);
-  if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
+    else
     {
     {
-      GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Received end of result set\n");
-#endif
-      free_queue_entry (qe);
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       process_queue (h);
       process_queue (h);
-      return;
-    }
-  if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
-       (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
-       (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
-    {
-      GNUNET_break (0);
-      free_queue_entry (qe);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
-                NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
-      return;
     }
     }
-  if (rc.iter == NULL)
-    {
-      /* abort iteration */
+    if (rc.proc != NULL)
+      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+               0);
+    return;
+  }
+  if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
+  {
+    GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
+    qe = h->queue_head;
+    rc = qe->qc.rc;
+    GNUNET_assert (GNUNET_YES == qe->was_transmitted);
+    free_queue_entry (qe);
 #if DEBUG_DATASTORE
 #if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Aborting iteration via disconnect (client has cancelled)\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received end of result set, new queue size is %u\n",
+                h->queue_size);
 #endif
 #endif
-      free_queue_entry (qe);
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      return;
-    }
-  dm = (const struct DataMessage*) msg;
+    if (rc.proc != NULL)
+      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+               0);
+    h->retry_time.rel_value = 0;
+    h->result_count = 0;
+    process_queue (h);
+    return;
+  }
+  qe = h->queue_head;
+  GNUNET_assert (NULL != qe);
+  rc = qe->qc.rc;
+  if (GNUNET_YES != qe->was_transmitted)
+  {
+    GNUNET_break (0);
+    free_queue_entry (qe);
+    h->retry_time = GNUNET_TIME_UNIT_ZERO;
+    do_disconnect (h);
+    if (rc.proc != NULL)
+      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+               0);
+    return;
+  }
+  if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
+      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
+      (ntohs (msg->size) !=
+       sizeof (struct DataMessage) +
+       ntohl (((const struct DataMessage *) msg)->size)))
+  {
+    GNUNET_break (0);
+    free_queue_entry (qe);
+    h->retry_time = GNUNET_TIME_UNIT_ZERO;
+    do_disconnect (h);
+    if (rc.proc != NULL)
+      rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
+               0);
+    return;
+  }
+  GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
+                            GNUNET_NO);
+  dm = (const struct DataMessage *) msg;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received result %llu with type %u and size %u with key %s\n",
-             (unsigned long long) GNUNET_ntohll(dm->uid),
-             ntohl(dm->type),
-             ntohl(dm->size),
-             GNUNET_h2s(&dm->key));
+              "Received result %llu with type %u and size %u with key %s\n",
+              (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
+              ntohl (dm->size), GNUNET_h2s (&dm->key));
 #endif
 #endif
-  rc.iter (rc.iter_cls,
-          &dm->key,
-          ntohl(dm->size),
-          &dm[1],
-          ntohl(dm->type),
-          ntohl(dm->priority),
-          ntohl(dm->anonymity),
-          GNUNET_TIME_absolute_ntoh(dm->expiration),   
-          GNUNET_ntohll(dm->uid));
+  free_queue_entry (qe);
+  h->retry_time.rel_value = 0;
+  process_queue (h);
+  if (rc.proc != NULL)
+    rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
+             ntohl (dm->priority), ntohl (dm->anonymity),
+             GNUNET_TIME_absolute_ntoh (dm->expiration),
+             GNUNET_ntohll (dm->uid));
 }
 
 
 /**
 }
 
 
 /**
- * Get a random value from the datastore.
+ * Get a random value from the datastore for content replication.
+ * Returns a single, random value among those with the highest
+ * replication score, lowering positive replication scores by one for
+ * the chosen value (if only content with a replication score exists,
+ * a random value is returned and replication scores are not changed).
  *
  * @param h handle to the datastore
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
  *
  * @param h handle to the datastore
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
-                            unsigned int queue_priority,
-                            unsigned int max_queue_size,
-                            struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                            void *iter_cls)
+GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
+                                      unsigned int queue_priority,
+                                      unsigned int max_queue_size,
+                                      struct GNUNET_TIME_Relative timeout,
+                                      GNUNET_DATASTORE_DatumProcessor proc,
+                                      void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_MessageHeader *m;
   union QueueContext qc;
 
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_MessageHeader *m;
   union QueueContext qc;
 
+  GNUNET_assert (NULL != proc);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to get random entry in %llu ms\n",
-             (unsigned long long) timeout.value);
+              "Asked to get replication entry in %llu ms\n",
+              (unsigned long long) timeout.rel_value);
 #endif
 #endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
-  qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
-                        queue_priority, max_queue_size, timeout,
-                        &process_result_message, &qc);
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
+                         queue_priority, max_queue_size, timeout,
+                         &process_result_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
-    return NULL;    
-  m = (struct GNUNET_MessageHeader*) &qe[1];
-  m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
-  m->size = htons(sizeof (struct GNUNET_MessageHeader));
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry for GET REPLICATION\n");
+#endif
+    return NULL;
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# GET REPLICATION requests executed"), 1,
+                            GNUNET_NO);
+  m = (struct GNUNET_MessageHeader *) &qe[1];
+  m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
+  m->size = htons (sizeof (struct GNUNET_MessageHeader));
   process_queue (h);
   return qe;
 }
 
 
   process_queue (h);
   return qe;
 }
 
 
-
 /**
 /**
- * Iterate over the results for a particular key
- * in the datastore.  The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_get_next" with the given argument.
+ * Get a single zero-anonymity value from the datastore.
  *
  * @param h handle to the datastore
  *
  * @param h handle to the datastore
- * @param key maybe NULL (to match all entries)
- * @param type desired type, 0 for any
+ * @param offset offset of the result (modulo num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param type allowed type for the operation (never zero)
+ * @param proc function to call on a random value; it
+ *        will be called once with a value (if available)
+ *        or with NULL if none value exists.
+ * @param proc_cls closure for proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
-                      const GNUNET_HashCode * key,
-                     enum GNUNET_BLOCK_Type type,
-                     unsigned int queue_priority,
-                     unsigned int max_queue_size,
-                     struct GNUNET_TIME_Relative timeout,
-                      GNUNET_DATASTORE_Iterator iter, 
-                     void *iter_cls)
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                     uint64_t offset,
+                                     unsigned int queue_priority,
+                                     unsigned int max_queue_size,
+                                     struct GNUNET_TIME_Relative timeout,
+                                     enum GNUNET_BLOCK_Type type,
+                                     GNUNET_DATASTORE_DatumProcessor proc,
+                                     void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct GetMessage *gm;
+  struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
   union QueueContext qc;
 
+  GNUNET_assert (NULL != proc);
+  GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to look for data of type %u under key `%s'\n",
-             (unsigned int) type,
-             GNUNET_h2s (key));
+              "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
+              (unsigned long long) offset, type,
+              (unsigned long long) timeout.rel_value);
 #endif
 #endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
-  qe = make_queue_entry (h, sizeof(struct GetMessage),
-                        queue_priority, max_queue_size, timeout,
-                        &process_result_message, &qc);
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
+                         queue_priority, max_queue_size, timeout,
+                         &process_result_message, &qc);
   if (qe == NULL)
   if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Could not create queue entry for zero-anonymity procation\n");
+#endif
     return NULL;
     return NULL;
-  gm = (struct GetMessage*) &qe[1];
-  gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
-  gm->type = htonl(type);
-  if (key != NULL)
-    {
-      gm->header.size = htons(sizeof (struct GetMessage));
-      gm->key = *key;
-    }
-  else
-    {
-      gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
-    }
+  }
+  GNUNET_STATISTICS_update (h->stats,
+                            gettext_noop
+                            ("# GET ZERO ANONYMITY requests executed"), 1,
+                            GNUNET_NO);
+  m = (struct GetZeroAnonymityMessage *) &qe[1];
+  m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
+  m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
+  m->type = htonl ((uint32_t) type);
+  m->offset = GNUNET_htonll (offset);
   process_queue (h);
   return qe;
 }
 
 
 /**
   process_queue (h);
   return qe;
 }
 
 
 /**
- * Function called to trigger obtaining the next result
- * from the datastore.
- * 
+ * Get a result for a particular key from the datastore.  The processor
+ * will only be called once.
+ *
  * @param h handle to the datastore
  * @param h handle to the datastore
- * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
- *        iteration (with a final call to "iter" with key/data == NULL).
+ * @param offset offset of the result (modulo num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
+ * @param key maybe NULL (to match all entries)
+ * @param type desired type, 0 for any
+ * @param queue_priority ranking of this request in the priority queue
+ * @param max_queue_size at what queue size should this request be dropped
+ *        (if other requests of higher priority are in the queue)
+ * @param timeout how long to wait at most for a response
+ * @param proc function to call on each matching value;
+ *        will be called once with a NULL value at the end
+ * @param proc_cls closure for proc
+ * @return NULL if the entry was not queued, otherwise a handle that can be used to
+ *         cancel
  */
  */
-void 
-GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
-                          int more)
+struct GNUNET_DATASTORE_QueueEntry *
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
+                          const GNUNET_HashCode * key,
+                          enum GNUNET_BLOCK_Type type,
+                          unsigned int queue_priority,
+                          unsigned int max_queue_size,
+                          struct GNUNET_TIME_Relative timeout,
+                          GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
 {
 {
-  struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-  struct ResultContext rc = qe->qc.rc;
-
-  GNUNET_assert (&process_result_message == qe->response_proc);
-  if (GNUNET_YES == more)
-    {     
-      h->in_receive = GNUNET_YES;
-      GNUNET_CLIENT_receive (h->client,
-                            qe->response_proc,
-                            qe,
-                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
-      return;
-    }
-  free_queue_entry (qe);
-  h->retry_time = GNUNET_TIME_UNIT_ZERO;
-  do_disconnect (h);
-  rc.iter (rc.iter_cls,
-          NULL, 0, NULL, 0, 0, 0, 
-          GNUNET_TIME_UNIT_ZERO_ABS, 0);       
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+  struct GetMessage *gm;
+  union QueueContext qc;
+
+  GNUNET_assert (NULL != proc);
+#if DEBUG_DATASTORE
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Asked to look for data of type %u under key `%s'\n",
+              (unsigned int) type, GNUNET_h2s (key));
+#endif
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
+  qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
+                         max_queue_size, timeout, &process_result_message, &qc);
+  if (qe == NULL)
+  {
+#if DEBUG_DATASTORE
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
+                GNUNET_h2s (key));
+#endif
+    return NULL;
+  }
+  GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
+                            1, GNUNET_NO);
+  gm = (struct GetMessage *) &qe[1];
+  gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
+  gm->type = htonl (type);
+  gm->offset = GNUNET_htonll (offset);
+  if (key != NULL)
+  {
+    gm->header.size = htons (sizeof (struct GetMessage));
+    gm->key = *key;
+  }
+  else
+  {
+    gm->header.size =
+        htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
+  }
+  process_queue (h);
+  return qe;
 }
 
 
 /**
  * Cancel a datastore operation.  The final callback from the
  * operation must not have been done yet.
 }
 
 
 /**
  * Cancel a datastore operation.  The final callback from the
  * operation must not have been done yet.
- * 
+ *
  * @param qe operation to cancel
  */
 void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h;
  * @param qe operation to cancel
  */
 void
 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
 {
   struct GNUNET_DATASTORE_Handle *h;
-  int reconnect;
 
 
+  GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
   h = qe->h;
 #if DEBUG_DATASTORE
   h = qe->h;
 #if DEBUG_DATASTORE
-  GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
-              "Pending DATASTORE request %p cancelled (%d, %d)\n",
-              qe,
-              qe->was_transmitted,
-              h->queue_head == qe);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
+              qe->was_transmitted, h->queue_head == qe);
 #endif
 #endif
-  reconnect = GNUNET_NO;
-  if (GNUNET_YES == qe->was_transmitted) 
-    {
-      if (qe->response_proc == &process_result_message)        
-       {
-         qe->qc.rc.iter = NULL;    
-         if (GNUNET_YES != h->in_receive)
-           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
-         return;
-       }
-      reconnect = GNUNET_YES;
-    }
+  if (GNUNET_YES == qe->was_transmitted)
+  {
+    free_queue_entry (qe);
+    h->skip_next_messages++;
+    return;
+  }
   free_queue_entry (qe);
   free_queue_entry (qe);
-  if (reconnect)
-    {
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-    }
-  else
-    {
-      process_queue (h);
-    }
+  process_queue (h);
 }
 
 
 }