psyc/store: apply state modifiers
authorGabor X Toth <*@tg-x.net>
Sat, 18 Jul 2015 00:03:06 +0000 (00:03 +0000)
committerGabor X Toth <*@tg-x.net>
Sat, 18 Jul 2015 00:03:06 +0000 (00:03 +0000)
src/include/gnunet_psycstore_plugin.h
src/include/gnunet_psycstore_service.h
src/psyc/gnunet-service-psyc.c
src/psyc/psyc_util_lib.c
src/psycstore/Makefile.am
src/psycstore/gnunet-service-psycstore.c
src/psycstore/plugin_psycstore_sqlite.c
src/psycstore/psycstore.h
src/psycstore/psycstore_api.c
src/psycstore/test_plugin_psycstore.c
src/psycstore/test_psycstore.c

index 12f4e692f1a341f52a2ef0a5ffd00f58bc8bf67b..b0bbfd81976b24ab34503880a5d3a2860f36dbe4 100644 (file)
@@ -240,9 +240,10 @@ struct GNUNET_PSYCSTORE_PluginFunctions
    * @return #GNUNET_OK on success, else #GNUNET_SYSERR
    */
   int
-  (*state_modify_set) (void *cls,
-                       const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                       const char *name, const void *value, size_t value_size);
+  (*state_modify_op) (void *cls,
+                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                      enum GNUNET_ENV_Operator op,
+                      const char *name, const void *value, size_t value_size);
 
 
   /**
@@ -270,20 +271,20 @@ struct GNUNET_PSYCSTORE_PluginFunctions
                          const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key);
 
   /**
-   * Set the value of a state variable while synchronizing state.
+   * Assign value of a state variable while synchronizing state.
    *
    * The state synchronization process is started with state_sync_begin(),
    * which is followed by one or more calls to this function,
-   * and finished with state_sync_end().
+   * and finished using state_sync_end().
    *
    * @see GNUNET_PSYCSTORE_state_sync()
    *
    * @return #GNUNET_OK on success, else #GNUNET_SYSERR
    */
   int
-  (*state_sync_set) (void *cls,
-                     const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                     const char *name, const void *value, size_t value_size);
+  (*state_sync_assign) (void *cls,
+                        const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                        const char *name, const void *value, size_t value_size);
 
 
   /**
@@ -296,7 +297,8 @@ struct GNUNET_PSYCSTORE_PluginFunctions
   int
   (*state_sync_end) (void *cls,
                      const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                     uint64_t message_id);
+                     uint64_t max_state_message_id,
+                     uint64_t state_hash_message_id);
 
 
   /**
index 12a375c44553a501215821d7a6795a4669749e1b..8f3866bdbe7d1aba3fb8a345e95161364ae8b2f0 100644 (file)
@@ -494,10 +494,6 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
  *        ID of the message that contains the @a modifiers.
  * @param state_delta
  *        Value of the @e state_delta PSYC header variable of the message.
- * @param modifier_count
- *        Number of elements in the @a modifiers array.
- * @param modifiers
- *        List of modifiers to apply.
  * @param rcb
  *        Callback to call with the result of the operation.
  * @param rcb_cls
@@ -510,8 +506,6 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                                uint64_t message_id,
                                uint64_t state_delta,
-                               size_t modifier_count,
-                               const struct GNUNET_ENV_Modifier *modifiers,
                                GNUNET_PSYCSTORE_ResultCallback rcb,
                                void *rcb_cls);
 
@@ -523,7 +517,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
  *        Handle for the PSYCstore.
  * @param channel_key
  *        The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ *        ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
  *        ID of the message that contains the state_hash PSYC header variable.
  * @param modifier_count
  *        Number of elements in the @a modifiers array.
@@ -539,7 +535,8 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
 struct GNUNET_PSYCSTORE_OperationHandle *
 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
                              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                             uint64_t message_id,
+                             uint64_t max_state_message_id,
+                             uint64_t state_hash_message_id,
                              size_t modifier_count,
                              const struct GNUNET_ENV_Modifier *modifiers,
                              GNUNET_PSYCSTORE_ResultCallback rcb,
index 72377680d3f9e18cb342a73a56fad04884181e9d..29ef07f109c613f18e510ea366247a3bd518058b 100644 (file)
@@ -416,6 +416,8 @@ struct Slave
 static void
 transmit_message (struct Channel *chn);
 
+static uint64_t
+message_queue_run (struct Channel *chn);
 
 static uint64_t
 message_queue_drop (struct Channel *chn);
@@ -1274,6 +1276,39 @@ fragment_queue_run (struct Channel *chn, uint64_t msg_id,
 }
 
 
+struct StateModifyClosure
+{
+  struct Channel *chn;
+  struct FragmentQueue *fragq;
+  uint64_t message_id;
+};
+
+
+void
+store_recv_state_modify_result (void *cls, int64_t result,
+                                const char *err_msg, uint16_t err_msg_size)
+{
+  struct StateModifyClosure *mcls = cls;
+  struct Channel *chn = mcls->chn;
+  struct FragmentQueue *fragq = mcls->fragq;
+  uint64_t msg_id = mcls->message_id;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p GNUNET_PSYCSTORE_state_modify() returned %" PRId64 " (%.*s)\n",
+              chn, result, err_msg_size, err_msg);
+
+  if (GNUNET_OK == result)
+  {
+    chn->max_state_message_id = msg_id;
+    chn->max_message_id = msg_id;
+
+    fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+    GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
+    message_queue_run (chn);
+  }
+}
+
+
 /**
  * Run message queue.
  *
@@ -1294,6 +1329,7 @@ message_queue_run (struct Channel *chn)
               "%p Running message queue.\n", chn);
   uint64_t n = 0;
   uint64_t msg_id;
+
   while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
                                                     &msg_id))
   {
@@ -1325,7 +1361,7 @@ message_queue_run (struct Channel *chn)
                       "%p Out of order message. "
                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
                       chn, msg_id, chn->max_message_id);
-          break;
+          continue;
         }
       }
       else
@@ -1336,14 +1372,19 @@ message_queue_run (struct Channel *chn)
                       "%p Out of order stateful message. "
                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
                       chn, msg_id, fragq->state_delta, chn->max_state_message_id);
-          break;
+          continue;
         }
-#if TODO
-        /* FIXME: apply modifiers to state in PSYCstore */
-        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
-                                       store_recv_state_modify_result, cls);
-#endif
-        chn->max_state_message_id = msg_id;
+
+        struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
+        mcls->chn = chn;
+        mcls->fragq = fragq;
+        mcls->message_id = msg_id;
+
+        /* Apply modifiers to state in PSYCstore */
+        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, msg_id,
+                                       fragq->state_delta,
+                                       store_recv_state_modify_result, mcls);
+        break;
       }
       chn->max_message_id = msg_id;
     }
@@ -1351,6 +1392,7 @@ message_queue_run (struct Channel *chn)
     GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
   return n;
@@ -2039,6 +2081,11 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
     {
       pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
     }
+
+    if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH)
+    {
+      /// @todo add state_hash to PSYC header
+    }
   }
 }
 
index 13d66e6d66185cb689f76abec4ca5faac52c3357..4ad7a914befa1ca438088d0f2079f592bfe5c816 100644 (file)
@@ -343,7 +343,7 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Queueing message part of type %u and size %u (end: %u)).\n",
-       ntohs (msg->type), size, end);
+       NULL != msg ? ntohs (msg->type) : 0, size, end);
 
   if (NULL != tmit->msg)
   {
@@ -917,7 +917,8 @@ GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
     }
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part from PSYC.\n");
+                "Received message part of type %u and size %u from PSYC.\n",
+                ptype, psize);
     GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
 
     switch (ptype)
@@ -1118,7 +1119,7 @@ GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
                   ptype, psize);
       return GNUNET_SYSERR;
     }
-    /* FIXME: check message part order */
+    /** @todo FIXME: check message part order */
   }
   return parts;
 }
index 8804255d2ade7480b149aa83e12fc9ea6ad4a67e..7dac870846a68da6752419c72a5cea1922a3b697 100644 (file)
@@ -49,6 +49,7 @@ gnunet_service_psycstore_SOURCES = \
 gnunet_service_psycstore_LDADD = \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/psyc/libgnunetpsycutil.la \
   $(GN_LIBINTL)
 
 plugin_LTLIBRARIES = \
index 556712df4bd18c3bf7782261584402448f7c18fc..6e40e7849d5791225e88f352bfd111f33b84e9a6 100644 (file)
@@ -32,6 +32,7 @@
 #include "gnunet_constants.h"
 #include "gnunet_protocols.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_psyc_util_lib.h"
 #include "gnunet_psycstore_service.h"
 #include "gnunet_psycstore_plugin.h"
 #include "psycstore.h"
@@ -493,7 +494,136 @@ handle_counters_get (void *cls,
 }
 
 
-/** @todo FIXME: stop processing further state modify messages after an error */
+struct StateModifyClosure
+{
+  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key;
+  struct GNUNET_PSYC_ReceiveHandle *recv;
+  enum GNUNET_PSYC_MessageState msg_state;
+  char mod_oper;
+  char *mod_name;
+  char *mod_value;
+  uint64_t mod_value_size;
+  uint64_t mod_value_remaining;
+};
+
+
+static void
+recv_state_message_part (void *cls, uint64_t message_id, uint64_t data_offset,
+                         uint32_t flags, const struct GNUNET_MessageHeader *msg)
+{
+  struct StateModifyClosure *scls = cls;
+  uint16_t psize;
+  if (NULL == msg)
+  {
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+    return;
+  }
+
+  switch (ntohs (msg->type))
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+  {
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
+    break;
+  }
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+  {
+    struct GNUNET_PSYC_MessageModifier *
+      pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
+    psize = ntohs (pmod->header.size);
+    uint16_t name_size = ntohs (pmod->name_size);
+    uint16_t value_size = ntohs (pmod->value_size);
+
+    const char *name = (const char *) &pmod[1];
+    const void *value = name + name_size;
+
+    if (GNUNET_ENV_OP_SET != pmod->oper)
+    { // Apply non-transient operation.
+      if (psize == sizeof (*pmod) + name_size + value_size)
+      {
+        db->state_modify_op (db->cls, scls->channel_key,
+                             pmod->oper, name, value, value_size);
+      }
+      else
+      {
+        scls->mod_oper = pmod->oper;
+        scls->mod_name = GNUNET_malloc (name_size);
+        memcpy (scls->mod_name, name, name_size);
+
+        scls->mod_value_size = value_size;
+        scls->mod_value = GNUNET_malloc (scls->mod_value_size);
+        scls->mod_value_remaining
+          = scls->mod_value_size - (psize - sizeof (*pmod) - name_size);
+        memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining);
+      }
+    }
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+    break;
+  }
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+    if (GNUNET_ENV_OP_SET != scls->mod_oper)
+    {
+      if (scls->mod_value_remaining == 0)
+      {
+        GNUNET_break_op (0);
+        scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+      }
+      psize = ntohs (msg->size);
+      memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining),
+              &msg[1], psize - sizeof (*msg));
+      scls->mod_value_remaining -= psize - sizeof (*msg);
+      if (0 == scls->mod_value_remaining)
+      {
+        db->state_modify_op (db->cls, scls->channel_key,
+                             scls->mod_oper, scls->mod_name,
+                             scls->mod_value, scls->mod_value_size);
+        GNUNET_free (scls->mod_name);
+        GNUNET_free (scls->mod_value);
+        scls->mod_oper = 0;
+        scls->mod_name = NULL;
+        scls->mod_value = NULL;
+        scls->mod_value_size = 0;
+      }
+    }
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
+    break;
+
+  default:
+    scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+  }
+}
+
+
+static int
+recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
+                     enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+  struct StateModifyClosure *scls = cls;
+
+  if (NULL == scls->recv)
+  {
+    scls->recv = GNUNET_PSYC_receive_create (NULL, &recv_state_message_part,
+                                             scls);
+  }
+
+  const struct GNUNET_PSYC_MessageHeader *
+    pmsg = (const struct GNUNET_PSYC_MessageHeader *) &msg[1];
+  GNUNET_PSYC_receive_message (scls->recv, pmsg);
+
+  return GNUNET_YES;
+}
+
+
 static void
 handle_state_modify (void *cls,
                      struct GNUNET_SERVER_Client *client,
@@ -502,65 +632,36 @@ handle_state_modify (void *cls,
   const struct StateModifyRequest *req
     = (const struct StateModifyRequest *) msg;
 
-  int ret = GNUNET_SYSERR;
-  const char *name = (const char *) &req[1];
-  uint16_t name_size = ntohs (req->name_size);
+  uint64_t message_id = GNUNET_ntohll (req->message_id);
+  uint64_t state_delta = GNUNET_ntohll (req->state_delta);
+  uint64_t ret_frags = 0;
 
-  if (name_size <= 2 || '\0' != name[name_size - 1])
+  struct StateModifyClosure scls = { 0 };
+
+  if (GNUNET_OK != db->state_modify_begin (db->cls, &req->channel_key,
+                                           message_id, state_delta))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                _("Tried to set invalid state variable name!\n"));
-    GNUNET_break_op (0);
+                _("Failed to begin modifying state!\n"));
+    GNUNET_break (0);
   }
-  else
-  {
-    ret = GNUNET_OK;
 
-    if (req->flags & STATE_OP_FIRST)
-    {
-      ret = db->state_modify_begin (db->cls, &req->channel_key,
-                                    GNUNET_ntohll (req->message_id),
-                                    GNUNET_ntohll (req->state_delta));
-    }
-    if (ret != GNUNET_OK)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  _("Failed to begin modifying state!\n"));
-    }
-    else
-    {
-      switch (req->oper)
-      {
-      case GNUNET_ENV_OP_ASSIGN:
-        ret = db->state_modify_set (db->cls, &req->channel_key,
-                                    (const char *) &req[1],
-                                    name + ntohs (req->name_size),
-                                    ntohs (req->header.size) - sizeof (*req)
-                                    - ntohs (req->name_size));
-        break;
-      default:
-#if TODO
-        ret = GNUNET_ENV_operation ((const char *) &req[1],
-                                    current_value, current_value_size,
-                                    req->oper, name + ntohs (req->name_size),
-                                    ntohs (req->header.size) - sizeof (*req)
-                                    - ntohs (req->name_size), &value, &value_size);
-#endif
-        ret = GNUNET_SYSERR;
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    _("Unknown operator: %c\n"), req->oper);
-      }
-    }
+  int ret = db->message_get (db->cls, &req->channel_key,
+                             message_id, message_id,
+                             &ret_frags, &recv_state_fragment, &scls);
 
-    if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
-    {
-      ret = db->state_modify_end (db->cls, &req->channel_key,
-                                  GNUNET_ntohll (req->message_id));
-      if (ret != GNUNET_OK)
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                    _("Failed to end modifying state!\n"));
-    }
+  if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                _("Failed to end modifying state!\n"));
+    GNUNET_break (0);
   }
+
+  if (NULL != scls.recv)
+  {
+    GNUNET_PSYC_receive_destroy (scls.recv);
+  }
+
   send_result_code (client, req->op_id, ret, NULL);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -600,16 +701,17 @@ handle_state_sync (void *cls,
     }
     else
     {
-      ret = db->state_sync_set (db->cls, &req->channel_key, name,
-                                name + ntohs (req->name_size),
-                                ntohs (req->header.size) - sizeof (*req)
-                                - ntohs (req->name_size));
+      ret = db->state_sync_assign (db->cls, &req->channel_key, name,
+                                   name + ntohs (req->name_size),
+                                   ntohs (req->header.size) - sizeof (*req)
+                                   - ntohs (req->name_size));
     }
 
     if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
     {
       ret = db->state_sync_end (db->cls, &req->channel_key,
-                                GNUNET_ntohll (req->message_id));
+                                GNUNET_ntohll (req->max_state_message_id),
+                                GNUNET_ntohll (req->state_hash_message_id));
       if (ret != GNUNET_OK)
         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                     _("Failed to end synchronizing state!\n"));
index 60dc7430389a415903c2f719c1cda2cc6e587403..1abc479d2035924a3d9faf6d411fff8ea825ba88 100644 (file)
@@ -35,6 +35,7 @@
 #include "gnunet_psycstore_service.h"
 #include "gnunet_multicast_service.h"
 #include "gnunet_crypto_lib.h"
+#include "gnunet_env_lib.h"
 #include "psycstore.h"
 #include <sqlite3.h>
 
@@ -172,14 +173,8 @@ struct Plugin
    */
   sqlite3_stmt *update_max_state_message_id;
 
-
   /**
-   * Precompiled SQL for message_modify_begin()
-   */
-  sqlite3_stmt *select_message_state_delta;
-
-  /**
-   * Precompiled SQL for state_modify_set()
+   * Precompiled SQL for state_modify_op()
    */
   sqlite3_stmt *insert_state_current;
 
@@ -353,8 +348,8 @@ database_setup (struct Plugin *plugin)
             "CREATE TABLE IF NOT EXISTS channels (\n"
             "  id INTEGER PRIMARY KEY,\n"
             "  pub_key BLOB UNIQUE,\n"
-            "  max_state_message_id INTEGER,\n"
-            "  state_hash_message_id INTEGER\n"
+            "  max_state_message_id INTEGER,\n" // last applied state message ID
+            "  state_hash_message_id INTEGER\n" // last message ID with a state hash
             ");");
 
   sql_exec (plugin->dbh,
@@ -542,17 +537,6 @@ database_setup (struct Plugin *plugin)
                "WHERE pub_key = ?;",
                &plugin->update_state_hash_message_id);
 
-  sql_prepare (plugin->dbh,
-               "SELECT 1\n"
-               "FROM channels AS c\n"
-               "LEFT JOIN messages AS m\n"
-               "ON c.id = m.channel_id\n"
-               "WHERE c.pub_key = ?\n"
-               "      AND ((? < c.state_hash_message_id AND c.state_hash_message_id < ?)\n"
-               "           OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
-               "LIMIT 1;",
-               &plugin->select_message_state_delta);
-
   sql_prepare (plugin->dbh,
                "INSERT OR REPLACE INTO state\n"
                "  (channel_id, name, value_current, value_signed)\n"
@@ -1447,14 +1431,14 @@ counters_state_get (void *cls,
 
 
 /**
- * Set a state variable to the given value.
+ * Assign a value to a state variable.
  *
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
-           const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-           const char *name, const void *value, size_t value_size)
+state_assign (struct Plugin *plugin, sqlite3_stmt *stmt,
+              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+              const char *name, const void *value, size_t value_size)
 {
   int ret = GNUNET_SYSERR;
 
@@ -1527,50 +1511,25 @@ state_modify_begin (void *cls,
                     uint64_t message_id, uint64_t state_delta)
 {
   struct Plugin *plugin = cls;
-  sqlite3_stmt *stmt = plugin->select_message_state_delta;
 
   if (state_delta > 0)
   {
-    int ret = GNUNET_SYSERR;
-    if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
-                                        sizeof (*channel_key), SQLITE_STATIC)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
-                                            message_id - state_delta)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
-                                            message_id)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
-                                            message_id - state_delta)
-        || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
-                                            GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
-    {
-      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sqlite3_bind");
-    }
-    else
-    {
-      switch (sqlite3_step (stmt))
-      {
-      case SQLITE_DONE:
-        ret = GNUNET_NO;
-        break;
-      case SQLITE_ROW:
-        ret = GNUNET_OK;
-        break;
-      default:
-        LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "sqlite3_step");
-      }
-    }
-    if (SQLITE_OK != sqlite3_reset (stmt))
-    {
-      ret = GNUNET_SYSERR;
-      LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                  "sqlite3_reset");
-     }
+    /**
+     * We can only apply state modifiers in the current message if modifiers in
+     * the previous stateful message (message_id - state_delta) were already
+     * applied.
+     */
+
+    uint64_t max_state_message_id = 0;
+    int ret = counters_state_get (plugin, channel_key, &max_state_message_id);
     if (GNUNET_OK != ret)
       return ret;
+
+    if (message_id - state_delta != max_state_message_id)
+      return GNUNET_NO;
   }
 
+  // Make sure no other transaction is going on.
   if (TRANSACTION_NONE != plugin->transaction)
       if (GNUNET_OK != transaction_rollback (plugin))
           return GNUNET_SYSERR;
@@ -1587,16 +1546,24 @@ state_modify_begin (void *cls,
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_modify_set (void *cls,
-                  const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                  const char *name, const void *value, size_t value_size)
+state_modify_op (void *cls,
+                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
+                 enum GNUNET_ENV_Operator op,
+                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
   GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
 
-  return state_set (plugin, plugin->insert_state_current, channel_key,
-                    name, value, value_size);
+  switch (op)
+  {
+  case GNUNET_ENV_OP_ASSIGN:
+    return state_assign (plugin, plugin->insert_state_current, channel_key,
+                         name, value, value_size);
 
+  /// @todo implement more state operations
+  default:
+    return GNUNET_SYSERR;
+  }
 }
 
 
@@ -1634,20 +1601,20 @@ state_sync_begin (void *cls,
 
 
 /**
- * Set the current value of state variable.
+ * Assign current value of a state variable.
  *
  * @see GNUNET_PSYCSTORE_state_modify()
  *
  * @return #GNUNET_OK on success, else #GNUNET_SYSERR
  */
 static int
-state_sync_set (void *cls,
+state_sync_assign (void *cls,
                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_set (cls, plugin->insert_state_sync, channel_key,
-                    name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync, channel_key,
+                       name, value, value_size);
 }
 
 
@@ -1657,7 +1624,8 @@ state_sync_set (void *cls,
 static int
 state_sync_end (void *cls,
                 const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                uint64_t message_id)
+                uint64_t max_state_message_id,
+                uint64_t state_hash_message_id)
 {
   struct Plugin *plugin = cls;
   int ret = GNUNET_SYSERR;
@@ -1670,7 +1638,10 @@ state_sync_end (void *cls,
                                   channel_key)
     && GNUNET_OK == update_message_id (plugin,
                                        plugin->update_state_hash_message_id,
-                                       channel_key, message_id)
+                                       channel_key, state_hash_message_id)
+    && GNUNET_OK == update_message_id (plugin,
+                                       plugin->update_max_state_message_id,
+                                       channel_key, max_state_message_id)
     && GNUNET_OK == transaction_commit (plugin)
     ? ret = GNUNET_OK
     : transaction_rollback (plugin);
@@ -1679,7 +1650,7 @@ state_sync_end (void *cls,
 
 
 /**
- * Reset the state of a channel.
+ * Delete the whole state.
  *
  * @see GNUNET_PSYCSTORE_state_reset()
  *
@@ -1922,10 +1893,10 @@ libgnunet_plugin_psycstore_sqlite_init (void *cls)
   api->counters_message_get = &counters_message_get;
   api->counters_state_get = &counters_state_get;
   api->state_modify_begin = &state_modify_begin;
-  api->state_modify_set = &state_modify_set;
+  api->state_modify_op = &state_modify_op;
   api->state_modify_end = &state_modify_end;
   api->state_sync_begin = &state_sync_begin;
-  api->state_sync_set = &state_sync_set;
+  api->state_sync_assign = &state_sync_assign;
   api->state_sync_end = &state_sync_end;
   api->state_reset = &state_reset;
   api->state_update_signed = &state_update_signed;
index 807c3c3dac6414c5629d2b896580f1e6cb70e354..67104e8ad9bf238a88c07fb6af7eeb0aecc2ad6d 100644 (file)
@@ -441,35 +441,24 @@ struct StateModifyRequest
   struct GNUNET_MessageHeader header;
 
   /**
-   * Size of name, including NUL terminator.
-   */
-  uint16_t name_size GNUNET_PACKED;
-
-  /**
-   * OR'd StateOpFlags
+   * Operation ID.
    */
-  uint8_t flags;
+  uint64_t op_id GNUNET_PACKED;
 
   /**
-   * enum GNUNET_ENV_Operator
+   * ID of the message to apply the state changes in.
    */
-  uint8_t oper;
+  uint64_t message_id GNUNET_PACKED;
 
   /**
-   * Operation ID.
+   * State delta of the message with ID @a message_id.
    */
-  uint64_t op_id GNUNET_PACKED;
+  uint64_t state_delta GNUNET_PACKED;
 
   /**
    * Channel's public key.
    */
   struct GNUNET_CRYPTO_EddsaPublicKey channel_key;
-
-  uint64_t message_id GNUNET_PACKED;
-
-  uint64_t state_delta GNUNET_PACKED;
-
-  /* Followed by NUL-terminated name, then the value. */
 };
 
 
@@ -495,13 +484,21 @@ struct StateSyncRequest
 
   uint8_t reserved;
 
-  uint64_t message_id GNUNET_PACKED;
-
   /**
    * Operation ID.
    */
   uint64_t op_id GNUNET_PACKED;
 
+  /**
+   * ID of the message that contains the state_hash PSYC header variable.
+   */
+  uint64_t state_hash_message_id GNUNET_PACKED;
+
+  /**
+   * ID of the last stateful message before @a state_hash_message_id.
+   */
+  uint64_t max_state_message_id GNUNET_PACKED;
+
   /**
    * Channel's public key.
    */
index f5210ac76aaa7ca29e7c112090a0e7b96f0fdc23..214d8ba5d2b1b3a6d67dead6e1e2b4bb5cf86511 100644 (file)
@@ -302,16 +302,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
       GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
       if (NULL != op->res_cb)
       {
-        const struct StateModifyRequest *smreq;
         const struct StateSyncRequest *ssreq;
         switch (ntohs (op->msg->type))
         {
-        case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
-          smreq = (const struct StateModifyRequest *) op->msg;
-          if (!(smreq->flags & STATE_OP_LAST
-                || GNUNET_OK != result_code))
-            op->res_cb = NULL;
-          break;
         case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
           ssreq = (const struct StateSyncRequest *) op->msg;
           if (!(ssreq->flags & STATE_OP_LAST
@@ -1234,10 +1227,6 @@ GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h,
  *        ID of the message that contains the @a modifiers.
  * @param state_delta
  *        Value of the _state_delta PSYC header variable of the message.
- * @param modifier_count
- *        Number of elements in the @a modifiers array.
- * @param modifiers
- *        List of modifiers to apply.
  * @param rcb
  *        Callback to call with the result of the operation.
  * @param rcb_cls
@@ -1250,50 +1239,31 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
                                const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                                uint64_t message_id,
                                uint64_t state_delta,
-                               size_t modifier_count,
-                               const struct GNUNET_ENV_Modifier *modifiers,
                                GNUNET_PSYCSTORE_ResultCallback rcb,
                                void *rcb_cls)
 {
   struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
-  size_t i;
+  struct StateModifyRequest *req;
 
-  for (i = 0; i < modifier_count; i++) {
-    struct StateModifyRequest *req;
-    uint16_t name_size = strlen (modifiers[i].name) + 1;
-
-    op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
-                        modifiers[i].value_size);
-    op->h = h;
-    op->res_cb = rcb;
-    op->cls = rcb_cls;
+  op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+  op->h = h;
+  op->res_cb = rcb;
+  op->cls = rcb_cls;
 
-    req = (struct StateModifyRequest *) &op[1];
-    op->msg = (struct GNUNET_MessageHeader *) req;
-    req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
-    req->header.size = htons (sizeof (*req) + name_size
-                              + modifiers[i].value_size);
-    req->channel_key = *channel_key;
-    req->message_id = GNUNET_htonll (message_id);
-    req->state_delta = GNUNET_htonll (state_delta);
-    req->oper = modifiers[i].oper;
-    req->name_size = htons (name_size);
-    req->flags
-      = 0 == i
-      ? STATE_OP_FIRST
-      : modifier_count - 1 == i
-      ? STATE_OP_LAST
-      : 0;
+  req = (struct StateModifyRequest *) &op[1];
+  op->msg = (struct GNUNET_MessageHeader *) req;
+  req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
+  req->header.size = htons (sizeof (*req));
+  req->channel_key = *channel_key;
+  req->message_id = GNUNET_htonll (message_id);
+  req->state_delta = GNUNET_htonll (state_delta);
 
-    memcpy (&req[1], modifiers[i].name, name_size);
-    memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size);
+  op->op_id = get_next_op_id (h);
+  req->op_id = GNUNET_htonll (op->op_id);
 
-    op->op_id = get_next_op_id (h);
-    req->op_id = GNUNET_htonll (op->op_id);
+  GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+  transmit_next (h);
 
-    GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
-    transmit_next (h);
-  }
   return op;
   /* FIXME: only the last operation is returned,
    *        operation_cancel() should be able to cancel all of them.
@@ -1308,7 +1278,9 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
  *        Handle for the PSYCstore.
  * @param channel_key
  *        The channel we are interested in.
- * @param message_id
+ * @param max_state_message_id
+ *        ID of the last stateful message before @a state_hash_message_id.
+ * @param state_hash_message_id
  *        ID of the message that contains the state_hash PSYC header variable.
  * @param modifier_count
  *        Number of elements in the @a modifiers array.
@@ -1324,7 +1296,8 @@ GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
 struct GNUNET_PSYCSTORE_OperationHandle *
 GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
                              const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
-                             uint64_t message_id,
+                             uint64_t max_state_message_id,
+                             uint64_t state_hash_message_id,
                              size_t modifier_count,
                              const struct GNUNET_ENV_Modifier *modifiers,
                              GNUNET_PSYCSTORE_ResultCallback rcb,
@@ -1349,7 +1322,8 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
     req->header.size = htons (sizeof (*req) + name_size
                               + modifiers[i].value_size);
     req->channel_key = *channel_key;
-    req->message_id = GNUNET_htonll (message_id);
+    req->max_state_message_id = GNUNET_htonll (max_state_message_id);
+    req->state_hash_message_id = GNUNET_htonll (state_hash_message_id);
     req->name_size = htons (name_size);
     req->flags
       = (0 == i)
index 9e4def7eac03ceb623459c6ed25fa37a46ae2fae..0a7824929e74dc38e47286bab71ee18a9f6bc07a 100644 (file)
@@ -85,7 +85,7 @@ load_plugin (const struct GNUNET_CONFIGURATION_Handle *cfg)
   struct GNUNET_PSYCSTORE_PluginFunctions *ret;
   char *libname;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, _("Loading `%s' psycstore plugin\n"),
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Loading `%s' psycstore plugin\n"),
               plugin_name);
   GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name);
   if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg)))
@@ -306,15 +306,17 @@ run (void *cls, char *const *args, const char *cfgfile,
 
   message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
   GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
-                                                      message_id, 1));
+                                                      message_id, 0));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_foo",
-                                                    C2ARG("one two three")));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_foo",
+                                                   C2ARG("one two three")));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_foo_bar", slave_key,
-                                                    sizeof (*slave_key)));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_foo_bar", slave_key,
+                                                   sizeof (*slave_key)));
 
   GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
                                                     message_id));
@@ -366,15 +368,16 @@ run (void *cls, char *const *args, const char *cfgfile,
 
   GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key));
 
-  GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
-                                                  "_sync_bar", scls.value[0],
-                                                  scls.value_size[0]));
+  GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+                                                     "_sync_bar", scls.value[0],
+                                                     scls.value_size[0]));
 
-  GNUNET_assert (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
-                                                  "_sync_foo", scls.value[1],
-                                                  scls.value_size[1]));
+  GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key,
+                                                     "_sync_foo", scls.value[1],
+                                                     scls.value_size[1]));
 
   GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key,
+                                                  max_state_msg_id,
                                                   INT64_MAX - 5));
 
   GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key,
@@ -394,11 +397,13 @@ run (void *cls, char *const *args, const char *cfgfile,
 
   message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
   GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
-                                                      message_id, 3));
+                                                      message_id,
+                                                      message_id - max_state_msg_id));
 
-  GNUNET_assert (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
-                                                    "_sync_foo",
-                                                    C2ARG("five six seven")));
+  GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key,
+                                                   GNUNET_ENV_OP_ASSIGN,
+                                                   "_sync_foo",
+                                                   C2ARG("five six seven")));
 
   GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
                                                     message_id));
index c20868cbca4f123070e5f703d6efd9e04af4fad8..c869a862f2b8d2ba461a15bd1787e4a2d6f96c41 100644 (file)
@@ -224,8 +224,8 @@ state_get_result (void *cls, int64_t result,
   scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
 
   scls.name[1] = "_sync_foo";
-  scls.value[1] = "one two three";
-  scls.value_size[1] = sizeof ("one two three") - 1;
+  scls.value[1] = "three two one";
+  scls.value_size[1] = sizeof ("three two one") - 1;
 
   op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
                                           &state_result,
@@ -253,11 +253,11 @@ counters_result (void *cls, int status, uint64_t max_fragment_id,
   GNUNET_assert (result == 1);
 
   scls.n = 0;
-  scls.name[0] = "_bar";
-  scls.value[0] = "four five six";
-  scls.value_size[0] = sizeof ("four five six") - 1;
+  scls.name[0] = "_sync_bar";
+  scls.value[0] = "ten eleven twelve";
+  scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
 
-  op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz",
+  op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_sync_bar_x_yy_zzz",
                                    &state_result, &state_get_result, &scls);
 }
 
@@ -284,22 +284,9 @@ state_sync_result (void *cls, int64_t result,
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
   GNUNET_assert (GNUNET_OK == result);
 
-  modifiers[0] = (struct GNUNET_ENV_Modifier) {
-    .oper = '=',
-    .name = "_sync_foo",
-    .value = "one two three",
-    .value_size = sizeof ("one two three") - 1
-  };
-  modifiers[1] = (struct GNUNET_ENV_Modifier) {
-    .oper = '=',
-    .name = "_bar",
-    .value = "four five six",
-    .value_size = sizeof ("four five six") - 1
-  };
-
   op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
-                                      GNUNET_ntohll (fcls->msg[0]->message_id), 0,
-                                      2, modifiers, state_modify_result, fcls);
+                                      GNUNET_ntohll (fcls->msg[0]->message_id),
+                                      0, state_modify_result, fcls);
 }
 
 
@@ -356,6 +343,7 @@ message_get_latest_result (void *cls, int64_t result,
 
   op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
                                     GNUNET_ntohll (fcls->msg[0]->message_id) + 1,
+                                    GNUNET_ntohll (fcls->msg[0]->message_id) + 2,
                                     2, modifiers, state_sync_result, fcls);
 }