commented out wrong message type
[oweals/gnunet.git] / src / psycstore / plugin_psycstore_mysql.c
index 2ce5775e7bf38da5f159e6e25144ab86bbdfda60..c76e7e6b12d2337ce2e7dcdad624e41494309476 100644 (file)
@@ -90,12 +90,6 @@ struct Plugin
    */
   enum Transactions transaction;
 
-  struct GNUNET_MYSQL_StatementHandle *transaction_begin;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_commit;
-
-  struct GNUNET_MYSQL_StatementHandle *transaction_rollback;
-
   /**
    * Precompiled SQL for channel_key_store()
    */
@@ -301,25 +295,25 @@ database_setup (struct Plugin *plugin)
 
   /* Create tables */
   STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n"
-            " id INT AUTO_INCREMENT,\n"
-            " pub_key BLOB,\n"
-            " max_state_message_id INT,\n"
-            " state_hash_message_id INT,\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
+            " max_state_message_id BIGINT UNSIGNED,\n"
+            " state_hash_message_id BIGINT UNSIGNED,\n"
             " PRIMARY KEY(id),\n"
-            " UNIQUE KEY(pub_key(5))\n"
+            " UNIQUE KEY(pub_key(32))\n"
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n"
-            " id INT AUTO_INCREMENT,\n"
-            " pub_key BLOB,\n"
+            " id BIGINT UNSIGNED AUTO_INCREMENT,\n"
+            " pub_key BLOB(32),\n"
             " PRIMARY KEY(id),\n"
-            " UNIQUE KEY(pub_key(5))\n"
+            " UNIQUE KEY(pub_key(32))\n"
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n"
-            "  channel_id INT NOT NULL REFERENCES channels(id),\n"
-            "  slave_id INT NOT NULL REFERENCES slaves(id),\n"
-            "  did_join INT NOT NULL,\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
+            "  slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n"
+            "  did_join TINYINT NOT NULL,\n"
             "  announced_at BIGINT UNSIGNED NOT NULL,\n"
             "  effective_since BIGINT UNSIGNED NOT NULL,\n"
             "  group_generation BIGINT UNSIGNED NOT NULL\n"
@@ -332,7 +326,7 @@ database_setup (struct Plugin *plugin)
 
   /** @todo messages table: add method_name column */
   STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n"
-            "  channel_id INT NOT NULL REFERENCES channels(id),\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
             "  hop_counter BIGINT UNSIGNED NOT NULL,\n"
             "  signature BLOB,\n"
             "  purpose BLOB,\n"
@@ -348,18 +342,18 @@ database_setup (struct Plugin *plugin)
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n"
-            "  channel_id INT NOT NULL REFERENCES channels(id),\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
             "  name TEXT NOT NULL,\n"
             "  value_current BLOB,\n"
-            "  value_signed BLOB,\n"
-            "  PRIMARY KEY (channel_id, name(5))\n"
+            "  value_signed BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
             ");");
 
   STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n"
-            "  channel_id INT NOT NULL REFERENCES channels(id),\n"
+            "  channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n"
             "  name TEXT NOT NULL,\n"
-            "  value BLOB,\n"
-            "  PRIMARY KEY (channel_id, name(5))\n"
+            "  value BLOB\n"
+            //"  PRIMARY KEY (channel_id, name(255))\n"
             ");");
 #undef STMT_RUN
 
@@ -370,12 +364,6 @@ database_setup (struct Plugin *plugin)
     GNUNET_break (0); \
     return GNUNET_SYSERR; \
   }
-  PREP ("BEGIN",
-        &plugin->transaction_begin);
-  PREP ("COMMIT",
-        &plugin->transaction_commit);
-  PREP ("ROLLBACK;",
-        &plugin->transaction_rollback);
   PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);",
         &plugin->insert_channel_key);
   PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);",
@@ -484,11 +472,11 @@ database_setup (struct Plugin *plugin)
 
   PREP ("REPLACE INTO state\n"
         "  (channel_id, name, value_current, value_signed)\n"
-        "SELECT new.channel_id, new.name,\n"
-        "       new.value_current, old.value_signed\n"
-        "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n"
-        "             AS channel_id,\n"
-        "             ? AS name, ? AS value_current) AS new\n"
+        "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n"
+        "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n"
+        "             (SELECT ?) AS name,\n"
+        "             (SELECT ?) AS value_current\n"
+        "     ) AS new\n"
         "LEFT JOIN (SELECT channel_id, name, value_signed\n"
         "           FROM state) AS old\n"
         "ON new.channel_id = old.channel_id AND new.name = old.name;",
@@ -533,7 +521,7 @@ database_setup (struct Plugin *plugin)
   PREP ("SELECT name, value_current\n"
         "FROM state\n"
         "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n"
-        "      AND (name = ? OR substr(name, 1, ?) = ? || '_');",
+        "      AND (name = ? OR substr(name, 1, ?) = ?);",
         &plugin->select_state_prefix);
 
   PREP ("SELECT name, value_signed\n"
@@ -600,23 +588,9 @@ exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
 static int
 transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_begin;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exexc_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt(stmt)))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed");
     return GNUNET_SYSERR;
   }
 
@@ -631,23 +605,9 @@ transaction_begin (struct Plugin *plugin, enum Transactions transaction)
 static int
 transaction_commit (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_commit;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed");
     return GNUNET_SYSERR;
   }
 
@@ -662,23 +622,9 @@ transaction_commit (struct Plugin *plugin)
 static int
 transaction_rollback (struct Plugin *plugin)
 {
-  struct GNUNET_MYSQL_StatementHandle *stmt = plugin->transaction_rollback;
-
-  struct GNUNET_MY_QueryParam params[] = {
-    GNUNET_MY_query_param_end
-  };
-
-  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params))
+  if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK"))
   {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql exec_prepared", stmt);
-    return GNUNET_SYSERR;
-  }
-
-  if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt)))
-  {
-    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql_stmt_reset", stmt);
+    LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed");
     return GNUNET_SYSERR;
   }
 
@@ -849,8 +795,7 @@ membership_test (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  switch(GNUNET_MY_extract_result (stmt,
-                                results_select))
+  switch (GNUNET_MY_extract_result (stmt, results_select))
   {
     case GNUNET_NO:
       ret = GNUNET_NO;
@@ -960,7 +905,7 @@ static int
 message_add_flags (void *cls,
                    const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                    uint64_t message_id,
-                   uint64_t psycstore_flags)
+                   uint32_t psycstore_flags)
 {
   struct Plugin *plugin = cls;
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags;
@@ -969,7 +914,7 @@ message_add_flags (void *cls,
   int ret = GNUNET_SYSERR;
 
   struct GNUNET_MY_QueryParam params_update[] = {
-    GNUNET_MY_query_param_uint64 (&psycstore_flags),
+    GNUNET_MY_query_param_uint32 (&psycstore_flags),
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&message_id),
     GNUNET_MY_query_param_end
@@ -1037,50 +982,54 @@ fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt,
   };
 
   do
+  {
+    sql_ret = GNUNET_MY_extract_result (stmt, results);
+    switch (sql_ret)
     {
-      sql_ret = GNUNET_MY_extract_result (stmt,
-                                          results);
-      switch (sql_ret)
-        {
-        case GNUNET_NO:
-          if (ret != GNUNET_OK)
-            ret = GNUNET_NO;
-          break;
-
-        case GNUNET_YES:
-          mp = GNUNET_malloc (sizeof (*mp) + buf_size);
-
-          mp->header.size = htons (sizeof (*mp) + buf_size);
-          mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
-          mp->hop_counter = htonl (hop_counter);
-          GNUNET_memcpy (&mp->signature,
-                         signature,
-                         signature_size);
-          GNUNET_memcpy (&mp->purpose,
-                         purpose,
-                         purpose_size);
-          mp->fragment_id = GNUNET_htonll (fragment_id);
-          mp->fragment_offset = GNUNET_htonll (fragment_offset);
-          mp->message_id = GNUNET_htonll (message_id);
-          mp->group_generation = GNUNET_htonll (group_generation);
-          mp->flags = htonl(msg_flags);
-
-          GNUNET_memcpy (&mp[1],
-                         buf,
-                         buf_size);
-          ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
-          if (NULL != returned_fragments)
-            (*returned_fragments)++;
-          GNUNET_MY_cleanup_result (results);
-          break;
-
-        default:
-          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                    "mysql extract_result", stmt);
-        }
+    case GNUNET_NO:
+      if (ret != GNUNET_YES)
+        ret = GNUNET_NO;
+      break;
+
+    case GNUNET_YES:
+      mp = GNUNET_malloc (sizeof (*mp) + buf_size);
+
+      mp->header.size = htons (sizeof (*mp) + buf_size);
+      mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+      mp->hop_counter = htonl (hop_counter);
+      GNUNET_memcpy (&mp->signature,
+                     signature,
+                     signature_size);
+      GNUNET_memcpy (&mp->purpose,
+                     purpose,
+                     purpose_size);
+      mp->fragment_id = GNUNET_htonll (fragment_id);
+      mp->fragment_offset = GNUNET_htonll (fragment_offset);
+      mp->message_id = GNUNET_htonll (message_id);
+      mp->group_generation = GNUNET_htonll (group_generation);
+      mp->flags = htonl(msg_flags);
+
+      GNUNET_memcpy (&mp[1],
+                     buf,
+                     buf_size);
+      ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags);
+      if (NULL != returned_fragments)
+        (*returned_fragments)++;
+      GNUNET_MY_cleanup_result (results);
+      break;
+
+    default:
+      LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+                 "mysql extract_result", stmt);
     }
+  }
   while (GNUNET_YES == sql_ret);
 
+  // for debugging
+  if (GNUNET_NO == ret)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
+               "Empty result set\n");
+
   return ret;
 }
 
@@ -1098,9 +1047,9 @@ fragment_select (struct Plugin *plugin,
 
   sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params);
   switch (sql_ret)
-    {
+  {
     case GNUNET_NO:
-      if (ret != GNUNET_OK)
+      if (ret != GNUNET_YES)
         ret = GNUNET_NO;
       break;
 
@@ -1111,7 +1060,7 @@ fragment_select (struct Plugin *plugin,
     default:
       LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                  "mysql exec_prepared", stmt);
-    }
+  }
   return ret;
 }
 
@@ -1217,6 +1166,10 @@ message_get (void *cls,
   struct Plugin *plugin = cls;
   struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages;
   int ret;
+
+  if (0 == fragment_limit)
+    fragment_limit = UINT64_MAX;
+
   struct GNUNET_MY_QueryParam params_select[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_uint64 (&first_message_id),
@@ -1374,8 +1327,7 @@ counters_message_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1429,8 +1381,7 @@ counters_state_get (void *cls,
     GNUNET_MY_result_spec_end
   };
 
-  ret = GNUNET_MY_extract_result (stmt,
-                                  results_select);
+  ret = GNUNET_MY_extract_result (stmt, results_select);
 
   if (GNUNET_OK != ret)
   {
@@ -1465,7 +1416,7 @@ state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt,
   struct GNUNET_MY_QueryParam params[] = {
     GNUNET_MY_query_param_auto_from_type (channel_key),
     GNUNET_MY_query_param_string (name),
-    GNUNET_MY_query_param_auto_from_type (value),
+    GNUNET_MY_query_param_fixed_size(value, value_size),
     GNUNET_MY_query_param_end
   };
 
@@ -1583,8 +1534,8 @@ state_modify_op (void *cls,
   switch (op)
   {
   case GNUNET_PSYC_OP_ASSIGN:
-    return state_assign (plugin, plugin->insert_state_current, channel_key,
-                         name, value, value_size);
+    return state_assign (plugin, plugin->insert_state_current,
+                         channel_key, name, value, value_size);
 
   default: /** @todo implement more state operations */
     GNUNET_break (0);
@@ -1639,8 +1590,8 @@ state_sync_assign (void *cls,
                 const char *name, const void *value, size_t value_size)
 {
   struct Plugin *plugin = cls;
-  return state_assign (cls, plugin->insert_state_sync, channel_key,
-                       name, value, value_size);
+  return state_assign (cls, plugin->insert_state_sync,
+                       channel_key, name, value, value_size);
 }
 
 
@@ -1756,10 +1707,11 @@ state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
     case GNUNET_NO:
       ret = GNUNET_NO;
       break;
+
     case GNUNET_YES:
-      ret = cb (cb_cls, name, value_current,
-                value_size);
+      ret = cb (cb_cls, name, value_current, value_size);
       break;
+
     default:
       LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
                 "mysql extract_result", stmt);
@@ -1812,30 +1764,29 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
     GNUNET_MY_result_spec_string (&name2),
     GNUNET_MY_result_spec_variable_size (&value_current, &value_size),
     GNUNET_MY_result_spec_end
-  };
+  };;
 
   int sql_ret;
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
   do
   {
-    if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
-    {
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-      break;
-    }
     sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
 
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name2,
-                  value_current,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name2, value_current, value_size);
 
         if (ret != GNUNET_YES)
           sql_ret = GNUNET_NO;
@@ -1843,7 +1794,7 @@ state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_
 
       default:
         LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-              "mysql extract_result", stmt);
+                  "mysql extract_result", stmt);
     }
   }
   while (sql_ret == GNUNET_YES);
@@ -1893,29 +1844,30 @@ state_get_signed (void *cls,
     GNUNET_MY_result_spec_end
   };
 
+  if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
+  {
+    LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+              "mysql exec_prepared", stmt);
+    return GNUNET_SYSERR;
+  }
+
   do
   {
-    if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select))
-    {
-      LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
-                "mysql exec_prepared", stmt);
-      break;
-    }
     sql_ret = GNUNET_MY_extract_result (stmt, results);
     switch (sql_ret)
     {
       case GNUNET_NO:
-        if (ret != GNUNET_OK)
+        if (ret != GNUNET_YES)
           ret = GNUNET_NO;
         break;
+
       case GNUNET_YES:
-        ret = cb (cb_cls, (const char *) name,
-                  value_signed,
-                  value_size);
+        ret = cb (cb_cls, (const char *) name, value_signed, value_size);
 
         if (ret != GNUNET_YES)
             sql_ret = GNUNET_NO;
         break;
+
       default:
          LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
               "mysql extract_result", stmt);