paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / datastore / datastore_api.c
index 3f8ef9db447d836dec31761b6302b3a7e43408f9..00258d7f9a1180fba9044854e871595dd121d8a3 100644 (file)
@@ -2,20 +2,18 @@
      This file is part of GNUnet
      Copyright (C) 2004-2013, 2016 GNUnet e.V.
 
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
 
      GNUnet is distributed in the hope that it will be useful, but
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 /**
@@ -33,6 +31,8 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
 
+#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
+
 /**
  * Collect an instane number of statistics?  May cause excessive IPC.
  */
@@ -137,6 +137,12 @@ struct GNUNET_DATASTORE_QueueEntry
    */
   struct GNUNET_MQ_Envelope *env;
 
+  /**
+   * Task we run if this entry stalls the queue and we
+   * need to warn the user.
+   */
+  struct GNUNET_SCHEDULER_Task *delay_warn_task;
+
   /**
    * Priority in the queue.
    */
@@ -269,10 +275,35 @@ free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
   h->queue_size--;
   if (NULL != qe->env)
     GNUNET_MQ_discard (qe->env);
+  if (NULL != qe->delay_warn_task)
+    GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
   GNUNET_free (qe);
 }
 
 
+/**
+ * Task that logs an error after some time.
+ *
+ * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
+ */
+static void
+delay_warning (void *cls)
+{
+  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
+
+  qe->delay_warn_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+              "Request %p of type %u at head of datastore queue for more than %s\n",
+              qe,
+              (unsigned int) qe->response_type,
+              GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
+                                                      GNUNET_YES));
+  qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
+                                                      &delay_warning,
+                                                      qe);
+}
+
+
 /**
  * Handle error in sending drop request to datastore.
  *
@@ -290,8 +321,14 @@ mq_error_handler (void *cls,
        "MQ error, reconnecting to DATASTORE\n");
   do_disconnect (h);
   qe = h->queue_head;
-  if ( (NULL != qe) &&
-       (NULL == qe->env) )
+  if (NULL == qe)
+    return;
+  if (NULL != qe->delay_warn_task)
+  {
+    GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
+    qe->delay_warn_task = NULL;
+  }
+  if (NULL == qe->env)
   {
     union QueueContext qc = qe->qc;
     uint16_t rt = qe->response_type;
@@ -313,7 +350,11 @@ mq_error_handler (void *cls,
         qc.rc.proc (qc.rc.proc_cls,
                     NULL,
                     0,
-                    NULL, 0, 0, 0,
+                    NULL,
+                    0,
+                    0,
+                    0,
+                    0,
                     GNUNET_TIME_UNIT_ZERO_ABS,
                     0);
       break;
@@ -429,7 +470,11 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
         qe->qc.rc.proc (qe->qc.rc.proc_cls,
                         NULL,
                         0,
-                        NULL, 0, 0, 0,
+                        NULL,
+                        0,
+                        0,
+                        0,
+                        0,
                         GNUNET_TIME_UNIT_ZERO_ABS,
                         0);
       break;
@@ -443,7 +488,7 @@ GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Re-connecting to issue DROP!\n");
     GNUNET_assert (NULL == h->mq);
-    h->mq = GNUNET_CLIENT_connecT (h->cfg,
+    h->mq = GNUNET_CLIENT_connect (h->cfg,
                                    "datastore",
                                    NULL,
                                    &disconnect_on_mq_error,
@@ -498,8 +543,17 @@ make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
   struct GNUNET_DATASTORE_QueueEntry *pos;
   unsigned int c;
 
-  c = 0;
-  pos = h->queue_head;
+  if ( (NULL != h->queue_tail) &&
+       (h->queue_tail->priority >= queue_priority) )
+  {
+    c = h->queue_size;
+    pos = NULL;
+  }
+  else
+  {
+    c = 0;
+    pos = h->queue_head;
+  }
   while ( (NULL != pos) &&
           (c < max_queue_size) &&
           (pos->priority >= queue_priority) )
@@ -585,12 +639,56 @@ process_queue (struct GNUNET_DATASTORE_Handle *h)
          "Not connected\n");
     return;
   }
+  GNUNET_assert (NULL == qe->delay_warn_task);
+  qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
+                                                      &delay_warning,
+                                                      qe);
   GNUNET_MQ_send (h->mq,
                   qe->env);
   qe->env = NULL;
 }
 
 
+/**
+ * Get the entry at the head of the message queue.
+ *
+ * @param h handle to the datastore
+ * @param response_type the expected response type
+ * @return the queue entry
+ */
+static struct GNUNET_DATASTORE_QueueEntry *
+get_queue_head (struct GNUNET_DATASTORE_Handle *h,
+                uint16_t response_type)
+{
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  if (h->skip_next_messages > 0)
+  {
+    h->skip_next_messages--;
+    process_queue (h);
+    return NULL;
+  }
+  qe = h->queue_head;
+  if (NULL == qe)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return NULL;
+  }
+  if (NULL != qe->env)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return NULL;
+  }
+  if (response_type != qe->response_type)
+  {
+    GNUNET_break (0);
+    do_disconnect (h);
+    return NULL;
+  }
+  return qe;
+}
 
 
 /**
@@ -642,30 +740,10 @@ handle_status (void *cls,
   const char *emsg;
   int32_t status = ntohl (sm->status);
 
-  if (h->skip_next_messages > 0)
-  {
-    h->skip_next_messages--;
-    process_queue (h);
-    return;
-  }
-  if (NULL == (qe = h->queue_head))
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
-  if (NULL != qe->env)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
-  if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
+  qe = get_queue_head (h,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
+  if (NULL == qe)
     return;
-  }
   rc = qe->qc.sc;
   free_queue_entry (qe);
   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
@@ -725,30 +803,10 @@ handle_data (void *cls,
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ResultContext rc;
 
-  if (h->skip_next_messages > 0)
-  {
-    process_queue (h);
-    return;
-  }
-  qe = h->queue_head;
+  qe = get_queue_head (h,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
   if (NULL == qe)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
     return;
-  }
-  if (NULL != qe->env)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
-  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
 #if INSANE_STATISTICS
   GNUNET_STATISTICS_update (h->stats,
                             gettext_noop ("# Results received"),
@@ -773,6 +831,7 @@ handle_data (void *cls,
              ntohl (dm->type),
              ntohl (dm->priority),
              ntohl (dm->anonymity),
+             ntohl (dm->replication),
              GNUNET_TIME_absolute_ntoh (dm->expiration),
              GNUNET_ntohll (dm->uid));
 }
@@ -793,31 +852,10 @@ handle_data_end (void *cls,
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ResultContext rc;
 
-  if (h->skip_next_messages > 0)
-  {
-    h->skip_next_messages--;
-    process_queue (h);
-    return;
-  }
-  qe = h->queue_head;
+  qe = get_queue_head (h,
+                       GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
   if (NULL == qe)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
     return;
-  }
-  if (NULL != qe->env)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
-  if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
-  {
-    GNUNET_break (0);
-    do_disconnect (h);
-    return;
-  }
   rc = qe->qc.rc;
   free_queue_entry (qe);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -835,6 +873,7 @@ handle_data_end (void *cls,
              0,
              0,
              0,
+             0,
              GNUNET_TIME_UNIT_ZERO_ABS,
              0);
 }
@@ -848,27 +887,27 @@ handle_data_end (void *cls,
 static void
 try_reconnect (void *cls)
 {
-  GNUNET_MQ_hd_var_size (status,
-                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
-                         struct StatusMessage);
-  GNUNET_MQ_hd_var_size (data,
-                         GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
-                         struct DataMessage);
-  GNUNET_MQ_hd_fixed_size (data_end,
-                           GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
-                           struct GNUNET_MessageHeader);
   struct GNUNET_DATASTORE_Handle *h = cls;
   struct GNUNET_MQ_MessageHandler handlers[] = {
-    make_status_handler (h),
-    make_data_handler (h),
-    make_data_end_handler (h),
+    GNUNET_MQ_hd_var_size (status,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
+                           struct StatusMessage,
+                           h),
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
+                           struct DataMessage,
+                           h),
+    GNUNET_MQ_hd_fixed_size (data_end,
+                             GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
+                             struct GNUNET_MessageHeader,
+                             h),
     GNUNET_MQ_handler_end ()
   };
 
   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
   h->reconnect_task = NULL;
   GNUNET_assert (NULL == h->mq);
-  h->mq = GNUNET_CLIENT_connecT (h->cfg,
+  h->mq = GNUNET_CLIENT_connect (h->cfg,
                                  "datastore",
                                  handlers,
                                  &mq_error_handler,
@@ -949,7 +988,7 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
   struct DataMessage *dm;
   union QueueContext qc;
 
-  if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
   {
     GNUNET_break (0);
     return NULL;
@@ -970,8 +1009,6 @@ GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
   dm->priority = htonl (priority);
   dm->anonymity = htonl (anonymity);
   dm->replication = htonl (replication);
-  dm->reserved = htonl (0);
-  dm->uid = GNUNET_htonll (0);
   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
   dm->key = *key;
   GNUNET_memcpy (&dm[1],
@@ -1125,72 +1162,6 @@ GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
 }
 
 
-/**
- * Update a value in the datastore.
- *
- * @param h handle to the datastore
- * @param uid identifier for the value
- * @param priority how much to increase the priority of the value
- * @param expiration new expiration value should be MAX of existing and this argument
- * @param queue_priority ranking of this request in the priority queue
- * @param max_queue_size at what queue size should this request be dropped
- *        (if other requests of higher priority are in the queue)
- * @param cont continuation to call when done
- * @param cont_cls closure for @a cont
- * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
- */
-struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
-                         uint64_t uid,
-                         uint32_t priority,
-                         struct GNUNET_TIME_Absolute expiration,
-                         unsigned int queue_priority,
-                         unsigned int max_queue_size,
-                         GNUNET_DATASTORE_ContinuationWithStatus cont,
-                         void *cont_cls)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe;
-  struct GNUNET_MQ_Envelope *env;
-  struct UpdateMessage *um;
-  union QueueContext qc;
-
-  if (NULL == cont)
-    cont = &drop_status_cont;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to update entry %llu raising priority by %u and expiration to %s\n",
-       uid,
-       (unsigned int) priority,
-       GNUNET_STRINGS_absolute_time_to_string (expiration));
-  env = GNUNET_MQ_msg (um,
-                       GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
-  um->priority = htonl (priority);
-  um->expiration = GNUNET_TIME_absolute_hton (expiration);
-  um->uid = GNUNET_htonll (uid);
-
-  qc.sc.cont = cont;
-  qc.sc.cont_cls = cont_cls;
-  qe = make_queue_entry (h,
-                         env,
-                         queue_priority,
-                         max_queue_size,
-                         GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
-                         &qc);
-  if (NULL == qe)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Could not create queue entry for UPDATE\n");
-    return NULL;
-  }
-  GNUNET_STATISTICS_update (h->stats,
-                            gettext_noop ("# UPDATE requests executed"), 1,
-                            GNUNET_NO);
-  process_queue (h);
-  return qe;
-}
-
-
 /**
  * Explicitly remove some content from the database.
  * The @a cont continuation will be called with `status`
@@ -1226,7 +1197,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
   struct GNUNET_MQ_Envelope *env;
   union QueueContext qc;
 
-  if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
   {
     GNUNET_break (0);
     return NULL;
@@ -1240,13 +1211,7 @@ GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
   env = GNUNET_MQ_msg_extra (dm,
                              size,
                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
-  dm->rid = htonl (0);
   dm->size = htonl (size);
-  dm->type = htonl (0);
-  dm->priority = htonl (0);
-  dm->anonymity = htonl (0);
-  dm->uid = GNUNET_htonll (0);
-  dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
   dm->key = *key;
   GNUNET_memcpy (&dm[1],
           data,
@@ -1339,10 +1304,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
  * Get a single zero-anonymity value from the datastore.
  *
  * @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- *               a random 64-bit value initially; then increment by
- *               one each time; detect that all results have been found by uid
- *               being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
@@ -1356,7 +1318,7 @@ GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
-                                     uint64_t offset,
+                                     uint64_t next_uid,
                                      unsigned int queue_priority,
                                      unsigned int max_queue_size,
                                      enum GNUNET_BLOCK_Type type,
@@ -1371,13 +1333,12 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
   GNUNET_assert (NULL != proc);
   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to get %llu-th zero-anonymity entry of type %d\n",
-       (unsigned long long) offset,
+       "Asked to get a zero-anonymity entry of type %d\n",
        type);
   env = GNUNET_MQ_msg (m,
                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
   m->type = htonl ((uint32_t) type);
-  m->offset = GNUNET_htonll (offset);
+  m->next_uid = GNUNET_htonll (next_uid);
   qc.rc.proc = proc;
   qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h,
@@ -1406,10 +1367,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  * will only be called once.
  *
  * @param h handle to the datastore
- * @param offset offset of the result (modulo num-results); set to
- *               a random 64-bit value initially; then increment by
- *               one each time; detect that all results have been found by uid
- *               being again the first uid ever returned.
+ * @param next_uid return the result with lowest uid >= next_uid
+ * @param random if true, return a random result instead of using next_uid
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
  * @param queue_priority ranking of this request in the priority queue
@@ -1423,7 +1382,8 @@ GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
-                          uint64_t offset,
+                          uint64_t next_uid,
+                          bool random,
                           const struct GNUNET_HashCode *key,
                           enum GNUNET_BLOCK_Type type,
                           unsigned int queue_priority,
@@ -1447,14 +1407,16 @@ GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
     env = GNUNET_MQ_msg (gm,
                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
     gm->type = htonl (type);
-    gm->offset = GNUNET_htonll (offset);
+    gm->next_uid = GNUNET_htonll (next_uid);
+    gm->random = random;
   }
   else
   {
     env = GNUNET_MQ_msg (gkm,
                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
     gkm->type = htonl (type);
-    gkm->offset = GNUNET_htonll (offset);
+    gkm->next_uid = GNUNET_htonll (next_uid);
+    gkm->random = random;
     gkm->key = *key;
   }
   qc.rc.proc = proc;