first hack at implementing new replication select code
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 #define DEBUG_SQLITE GNUNET_NO
32
33
34 /**
35  * Log an error message at log-level 'level' that indicates
36  * a failure of the command 'cmd' on file 'filename'
37  * with the message given by strerror(errno).
38  */
39 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40
41 #define SELECT_IT_LOW_PRIORITY_1 \
42   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\
43   "ORDER BY hash ASC LIMIT 1"
44
45 #define SELECT_IT_LOW_PRIORITY_2 \
46   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\
47   "ORDER BY prio ASC, hash ASC LIMIT 1"
48
49 #define SELECT_IT_NON_ANONYMOUS_1 \
50   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
51   " ORDER BY hash DESC LIMIT 1"
52
53 #define SELECT_IT_NON_ANONYMOUS_2 \
54   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
55   " ORDER BY prio DESC, hash DESC LIMIT 1"
56
57 #define SELECT_IT_EXPIRATION_TIME_1 \
58   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\
59   " ORDER BY hash ASC LIMIT 1"
60
61 #define SELECT_IT_EXPIRATION_TIME_2 \
62   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\
63   " ORDER BY expire ASC, hash ASC LIMIT 1"
64
65 #define SELECT_IT_MIGRATION_ORDER_1 \
66   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\
67   " ORDER BY hash DESC LIMIT 1"
68
69 #define SELECT_IT_MIGRATION_ORDER_2 \
70   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\
71   " ORDER BY expire DESC, hash DESC LIMIT 1"
72
73
74 #define SELECT_IT_REPLICATION_ORDER \
75   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\
76   " ORDER BY repl DESC, Random() LIMIT 1"
77
78
79 /**
80  * After how many ms "busy" should a DB operation fail for good?
81  * A low value makes sure that we are more responsive to requests
82  * (especially PUTs).  A high value guarantees a higher success
83  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
84  *
85  * The default value of 250ms should ensure that users do not experience
86  * huge latencies while at the same time allowing operations to succeed
87  * with reasonable probability.
88  */
89 #define BUSY_TIMEOUT_MS 250
90
91
92
93 /**
94  * Context for all functions in this plugin.
95  */
96 struct Plugin 
97 {
98   /**
99    * Our execution environment.
100    */
101   struct GNUNET_DATASTORE_PluginEnvironment *env;
102
103   /**
104    * Database filename.
105    */
106   char *fn;
107
108   /**
109    * Native SQLite database handle.
110    */
111   sqlite3 *dbh;
112
113   /**
114    * Precompiled SQL for deletion.
115    */
116   sqlite3_stmt *delRow;
117
118   /**
119    * Precompiled SQL for update.
120    */
121   sqlite3_stmt *updPrio;
122
123   /**
124    * Precompiled SQL for replication decrement.
125    */
126   sqlite3_stmt *updRepl;
127
128   /**
129    * Precompiled SQL for replication decrement.
130    */
131   sqlite3_stmt *selRepl;
132
133   /**
134    * Precompiled SQL for insertion.
135    */
136   sqlite3_stmt *insertContent;
137
138   /**
139    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
140    */
141   struct NextContext *next_task_nc;
142
143   /**
144    * Pending task with scheduler for running the next request.
145    */
146   GNUNET_SCHEDULER_TaskIdentifier next_task;
147
148   /**
149    * Should the database be dropped on shutdown?
150    */
151   int drop_on_shutdown;
152
153 };
154
155
156 /**
157  * @brief Prepare a SQL statement
158  *
159  * @param dbh handle to the database
160  * @param zSql SQL statement, UTF-8 encoded
161  * @param ppStmt set to the prepared statement
162  * @return 0 on success
163  */
164 static int
165 sq_prepare (sqlite3 * dbh, const char *zSql,
166             sqlite3_stmt ** ppStmt)
167 {
168   char *dummy;
169   int result;
170   result = sqlite3_prepare_v2 (dbh,
171                                zSql,
172                                strlen (zSql), ppStmt, (const char **) &dummy);
173 #if DEBUG_SQLITE
174   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
175                    "sqlite",
176                    "Prepared %p: %d\n", *ppStmt, result);
177 #endif
178   return result;
179 }
180
181
182 /**
183  * Create our database indices.
184  * 
185  * @param dbh handle to the database
186  */
187 static void
188 create_indices (sqlite3 * dbh)
189 {
190   /* create indices */
191   sqlite3_exec (dbh,
192                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
193   sqlite3_exec (dbh,
194                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
195                 NULL, NULL);
196   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
197                 NULL);
198   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL,
199                 NULL);
200   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL,
201                 NULL, NULL);
202   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
203                 NULL, NULL, NULL);
204   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL,
205                 NULL, NULL);
206   sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL,
207                 NULL, NULL);
208 }
209
210
211
212 #if 0
213 #define CHECK(a) GNUNET_break(a)
214 #define ENULL NULL
215 #else
216 #define ENULL &e
217 #define ENULL_DEFINED 1
218 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
219 #endif
220
221
222
223
224 /**
225  * Initialize the database connections and associated
226  * data structures (create tables and indices
227  * as needed as well).
228  *
229  * @param cfg our configuration
230  * @param plugin the plugin context (state for this module)
231  * @return GNUNET_OK on success
232  */
233 static int
234 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
235                 struct Plugin *plugin)
236 {
237   sqlite3_stmt *stmt;
238   char *afsdir;
239 #if ENULL_DEFINED
240   char *e;
241 #endif
242   
243   if (GNUNET_OK != 
244       GNUNET_CONFIGURATION_get_value_filename (cfg,
245                                                "datastore-sqlite",
246                                                "FILENAME",
247                                                &afsdir))
248     {
249       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
250                        "sqlite",
251                        _("Option `%s' in section `%s' missing in configuration!\n"),
252                        "FILENAME",
253                        "datastore-sqlite");
254       return GNUNET_SYSERR;
255     }
256   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
257     {
258       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
259         {
260           GNUNET_break (0);
261           GNUNET_free (afsdir);
262           return GNUNET_SYSERR;
263         }
264       /* database is new or got deleted, reset payload to zero! */
265       plugin->env->duc (plugin->env->cls, 0);
266     }
267 #ifdef ENABLE_NLS
268   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
269                                        nl_langinfo (CODESET));
270 #else
271   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
272                                        "UTF-8");   /* good luck */
273 #endif
274   GNUNET_free (afsdir);
275   
276   /* Open database and precompile statements */
277   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
278     {
279       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
280                        "sqlite",
281                        _("Unable to initialize SQLite: %s.\n"),
282                        sqlite3_errmsg (plugin->dbh));
283       return GNUNET_SYSERR;
284     }
285   CHECK (SQLITE_OK ==
286          sqlite3_exec (plugin->dbh,
287                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
288   CHECK (SQLITE_OK ==
289          sqlite3_exec (plugin->dbh,
290                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
291   CHECK (SQLITE_OK ==
292          sqlite3_exec (plugin->dbh,
293                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
294   CHECK (SQLITE_OK ==
295          sqlite3_exec (plugin->dbh,
296                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
297   CHECK (SQLITE_OK ==
298          sqlite3_exec (plugin->dbh, 
299                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
300
301   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
302
303
304   /* We have to do it here, because otherwise precompiling SQL might fail */
305   CHECK (SQLITE_OK ==
306          sq_prepare (plugin->dbh,
307                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
308                      &stmt));
309   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
310        (sqlite3_exec (plugin->dbh,
311                       "CREATE TABLE gn090 ("
312                       "  repl INT4 NOT NULL DEFAULT 0,"
313                       "  type INT4 NOT NULL DEFAULT 0,"
314                       "  prio INT4 NOT NULL DEFAULT 0,"
315                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
316                       "  expire INT8 NOT NULL DEFAULT 0,"
317                       "  hash TEXT NOT NULL DEFAULT '',"
318                       "  vhash TEXT NOT NULL DEFAULT '',"
319                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
320                       NULL) != SQLITE_OK) )
321     {
322       LOG_SQLITE (plugin, NULL,
323                   GNUNET_ERROR_TYPE_ERROR, 
324                   "sqlite3_exec");
325       sqlite3_finalize (stmt);
326       return GNUNET_SYSERR;
327     }
328   sqlite3_finalize (stmt);
329   create_indices (plugin->dbh);
330
331   CHECK (SQLITE_OK ==
332          sq_prepare (plugin->dbh,
333                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
334                      &stmt));
335   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
336        (sqlite3_exec (plugin->dbh,
337                       "CREATE TABLE gn071 ("
338                       "  key TEXT NOT NULL DEFAULT '',"
339                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
340                       NULL) != SQLITE_OK) )
341     {
342       LOG_SQLITE (plugin, NULL,
343                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
344       sqlite3_finalize (stmt);
345       return GNUNET_SYSERR;
346     }
347   sqlite3_finalize (stmt);
348
349   if ((sq_prepare (plugin->dbh,
350                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
351                    "_ROWID_ = ?",
352                    &plugin->updPrio) != SQLITE_OK) ||
353       (sq_prepare (plugin->dbh,
354                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
355                    "_ROWID_ = ?",
356                    &plugin->updRepl) != SQLITE_OK) ||
357       (sq_prepare (plugin->dbh,
358                    SELECT_IT_REPLICATION_ORDER,
359                    &plugin->selRepl) != SQLITE_OK) ||
360       (sq_prepare (plugin->dbh,
361                    "INSERT INTO gn090 (repl, type, prio, "
362                    "anonLevel, expire, hash, vhash, value) VALUES "
363                    "(?, ?, ?, ?, ?, ?, ?, ?)",
364                    &plugin->insertContent) != SQLITE_OK) ||
365       (sq_prepare (plugin->dbh,
366                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
367                    &plugin->delRow) != SQLITE_OK))
368     {
369       LOG_SQLITE (plugin, NULL,
370                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
371       return GNUNET_SYSERR;
372     }
373
374   return GNUNET_OK;
375 }
376
377
378 /**
379  * Shutdown database connection and associate data
380  * structures.
381  * @param plugin the plugin context (state for this module)
382  */
383 static void
384 database_shutdown (struct Plugin *plugin)
385 {
386   int result;
387 #if SQLITE_VERSION_NUMBER >= 3007000
388   sqlite3_stmt *stmt;
389 #endif
390
391   if (plugin->delRow != NULL)
392     sqlite3_finalize (plugin->delRow);
393   if (plugin->updPrio != NULL)
394     sqlite3_finalize (plugin->updPrio);
395   if (plugin->updRepl != NULL)
396     sqlite3_finalize (plugin->updRepl);
397   if (plugin->selRepl != NULL)
398     sqlite3_finalize (plugin->selRepl);
399   if (plugin->insertContent != NULL)
400     sqlite3_finalize (plugin->insertContent);
401   result = sqlite3_close(plugin->dbh);
402 #if SQLITE_VERSION_NUMBER >= 3007000
403   if (result == SQLITE_BUSY)
404     {
405       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
406                        "sqlite",
407                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
408       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
409       while (stmt != NULL)
410         {
411 #if DEBUG_SQLITE
412           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
413                      "sqlite", "Closing statement %p\n", stmt);
414 #endif
415           result = sqlite3_finalize(stmt);
416 #if DEBUG_SQLITE
417           if (result != SQLITE_OK)
418               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
419                                "sqlite",
420                                "Failed to close statement %p: %d\n", stmt, result);
421 #endif
422           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
423         }
424       result = sqlite3_close(plugin->dbh);
425     }
426 #endif
427   if (SQLITE_OK != result)
428       LOG_SQLITE (plugin, NULL,
429                   GNUNET_ERROR_TYPE_ERROR, 
430                   "sqlite3_close");
431
432   GNUNET_free_non_null (plugin->fn);
433 }
434
435
436 /**
437  * Delete the database entry with the given
438  * row identifier.
439  *
440  * @param plugin the plugin context (state for this module)
441  * @param rid the ID of the row to delete
442  */
443 static int
444 delete_by_rowid (struct Plugin* plugin, 
445                  unsigned long long rid)
446 {
447   sqlite3_bind_int64 (plugin->delRow, 1, rid);
448   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
449     {
450       LOG_SQLITE (plugin, NULL,
451                   GNUNET_ERROR_TYPE_ERROR |
452                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
453       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
454           LOG_SQLITE (plugin, NULL,
455                       GNUNET_ERROR_TYPE_ERROR |
456                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
457       return GNUNET_SYSERR;
458     }
459   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
460       LOG_SQLITE (plugin, NULL,
461                   GNUNET_ERROR_TYPE_ERROR |
462                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
463   return GNUNET_OK;
464 }
465
466
467 /**
468  * Context for the universal iterator.
469  */
470 struct NextContext;
471
472 /**
473  * Type of a function that will prepare
474  * the next iteration.
475  *
476  * @param cls closure
477  * @param nc the next context; NULL for the last
478  *         call which gives the callback a chance to
479  *         clean up the closure
480  * @return GNUNET_OK on success, GNUNET_NO if there are
481  *         no more values, GNUNET_SYSERR on error
482  */
483 typedef int (*PrepareFunction)(void *cls,
484                                struct NextContext *nc);
485
486
487 /**
488  * Context we keep for the "next request" callback.
489  */
490 struct NextContext
491 {
492   /**
493    * Internal state.
494    */ 
495   struct Plugin *plugin;
496
497   /**
498    * Function to call on the next value.
499    */
500   PluginIterator iter;
501
502   /**
503    * Closure for iter.
504    */
505   void *iter_cls;
506
507   /**
508    * Function to call to prepare the next
509    * iteration.
510    */
511   PrepareFunction prep;
512
513   /**
514    * Closure for prep.
515    */
516   void *prep_cls;
517
518   /**
519    * Statement that the iterator will get the data
520    * from (updated or set by prep).
521    */ 
522   sqlite3_stmt *stmt;
523
524   /**
525    * Row ID of the last result.
526    */
527   unsigned long long last_rowid;
528
529   /**
530    * Key of the last result.
531    */
532   GNUNET_HashCode lastKey;  
533
534   /**
535    * Expiration time of the last value visited.
536    */
537   struct GNUNET_TIME_Absolute lastExpiration;
538
539   /**
540    * Priority of the last value visited.
541    */ 
542   unsigned int lastPriority; 
543
544   /**
545    * Number of results processed so far.
546    */
547   unsigned int count;
548
549   /**
550    * Set to GNUNET_YES if we must stop now.
551    */
552   int end_it;
553 };
554
555
556 /**
557  * Continuation of "sqlite_next_request".
558  *
559  * @param cls the next context
560  * @param tc the task context (unused)
561  */
562 static void 
563 sqlite_next_request_cont (void *cls,
564                           const struct GNUNET_SCHEDULER_TaskContext *tc)
565 {
566   struct NextContext * nc = cls;
567   struct Plugin *plugin;
568   unsigned long long rowid;
569   sqlite3_stmt *stmtd;
570   int ret;
571   unsigned int type;
572   unsigned int size;
573   unsigned int priority;
574   unsigned int anonymity;
575   struct GNUNET_TIME_Absolute expiration;
576   const GNUNET_HashCode *key;
577   const void *data;
578   
579   plugin = nc->plugin;
580   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
581   plugin->next_task_nc = NULL;
582   if ( (GNUNET_YES == nc->end_it) ||
583        (GNUNET_OK != (nc->prep(nc->prep_cls,
584                                nc))) )
585     {
586     END:
587       nc->iter (nc->iter_cls, 
588                 NULL, NULL, 0, NULL, 0, 0, 0, 
589                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
590       nc->prep (nc->prep_cls, NULL);
591       GNUNET_free (nc);
592       return;
593     }
594
595   rowid = sqlite3_column_int64 (nc->stmt, 6);
596   nc->last_rowid = rowid;
597   type = sqlite3_column_int (nc->stmt, 0);
598   size = sqlite3_column_bytes (nc->stmt, 5);
599   if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode))
600     {
601       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
602                        "sqlite",
603                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
604       if (SQLITE_OK != sqlite3_reset (nc->stmt))
605         LOG_SQLITE (nc->plugin, NULL,
606                     GNUNET_ERROR_TYPE_ERROR |
607                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
608       if (sq_prepare
609           (nc->plugin->dbh,
610            "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?",
611            &stmtd) != SQLITE_OK)
612         {
613           LOG_SQLITE (nc->plugin, NULL,
614                       GNUNET_ERROR_TYPE_ERROR |
615                       GNUNET_ERROR_TYPE_BULK, 
616                       "sq_prepare");
617           goto END;
618         }
619
620       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
621         LOG_SQLITE (nc->plugin, NULL,
622                     GNUNET_ERROR_TYPE_ERROR |
623                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
624       if (SQLITE_DONE != sqlite3_step (stmtd))
625         LOG_SQLITE (nc->plugin, NULL,
626                     GNUNET_ERROR_TYPE_ERROR |
627                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
628       if (SQLITE_OK != sqlite3_finalize (stmtd))
629         LOG_SQLITE (nc->plugin, NULL,
630                     GNUNET_ERROR_TYPE_ERROR |
631                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
632       goto END;
633     }
634
635   priority = sqlite3_column_int (nc->stmt, 1);
636   anonymity = sqlite3_column_int (nc->stmt, 2);
637   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
638   key = sqlite3_column_blob (nc->stmt, 4);
639   nc->lastPriority = priority;
640   nc->lastExpiration = expiration;
641   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
642   data = sqlite3_column_blob (nc->stmt, 5);
643   nc->count++;
644   ret = nc->iter (nc->iter_cls,
645                   nc,
646                   key,
647                   size,
648                   data, 
649                   type,
650                   priority,
651                   anonymity,
652                   expiration,
653                   rowid);
654   if (ret == GNUNET_SYSERR)
655     {
656       nc->end_it = GNUNET_YES;
657       return;
658     }
659 #if DEBUG_SQLITE
660   if (ret == GNUNET_NO)
661     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
662                      "sqlite",
663                      "Asked to remove entry %llu (%u bytes)\n",
664                      (unsigned long long) rowid,
665                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
666 #endif
667   if ( (ret == GNUNET_NO) &&
668        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
669     {
670       plugin->env->duc (plugin->env->cls,
671                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
672 #if DEBUG_SQLITE
673       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
674                        "sqlite",
675                        "Removed entry %llu (%u bytes)\n",
676                        (unsigned long long) rowid,
677                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
678 #endif
679     }
680 }
681
682
683 /**
684  * Function invoked on behalf of a "PluginIterator"
685  * asking the database plugin to call the iterator
686  * with the next item.
687  *
688  * @param next_cls whatever argument was given
689  *        to the PluginIterator as "next_cls".
690  * @param end_it set to GNUNET_YES if we
691  *        should terminate the iteration early
692  *        (iterator should be still called once more
693  *         to signal the end of the iteration).
694  */
695 static void 
696 sqlite_next_request (void *next_cls,
697                      int end_it)
698 {
699   struct NextContext * nc= next_cls;
700
701   if (GNUNET_YES == end_it)
702     nc->end_it = GNUNET_YES;
703   nc->plugin->next_task_nc = nc;
704   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
705                                                     nc);
706 }
707
708
709 /**
710  * Store an item in the datastore.
711  *
712  * @param cls closure
713  * @param key key for the item
714  * @param size number of bytes in data
715  * @param data content stored
716  * @param type type of the content
717  * @param priority priority of the content
718  * @param anonymity anonymity-level for the content
719  * @param replication replication-level for the content
720  * @param expiration expiration time for the content
721  * @param msg set to an error message
722  * @return GNUNET_OK on success
723  */
724 static int
725 sqlite_plugin_put (void *cls,
726                    const GNUNET_HashCode * key,
727                    uint32_t size,
728                    const void *data,
729                    enum GNUNET_BLOCK_Type type,
730                    uint32_t priority,
731                    uint32_t anonymity,
732                    uint32_t replication,
733                    struct GNUNET_TIME_Absolute expiration,
734                    char ** msg)
735 {
736   struct Plugin *plugin = cls;
737   int n;
738   sqlite3_stmt *stmt;
739   GNUNET_HashCode vhash;
740
741 #if DEBUG_SQLITE
742   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
743                    "sqlite",
744                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
745                    type, 
746                    GNUNET_h2s(key),
747                    priority,
748                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
749                    (long long) expiration.abs_value);
750 #endif
751   GNUNET_CRYPTO_hash (data, size, &vhash);
752   stmt = plugin->insertContent;
753   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
754       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
755       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
756       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
757       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
758       (SQLITE_OK !=
759        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
760                           SQLITE_TRANSIENT)) ||
761       (SQLITE_OK !=
762        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
763                           SQLITE_TRANSIENT))
764       || (SQLITE_OK !=
765           sqlite3_bind_blob (stmt, 8, data, size,
766                              SQLITE_TRANSIENT)))
767     {
768       LOG_SQLITE (plugin,
769                   msg,
770                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
771       if (SQLITE_OK != sqlite3_reset (stmt))
772         LOG_SQLITE (plugin, NULL,
773                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
774       return GNUNET_SYSERR;
775     }
776   n = sqlite3_step (stmt);
777   if (n != SQLITE_DONE) 
778     {
779       if (n == SQLITE_BUSY)
780         {
781           LOG_SQLITE (plugin, msg,
782                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
783           sqlite3_reset (stmt);
784           GNUNET_break (0);
785           return GNUNET_NO;
786         }
787       LOG_SQLITE (plugin, msg,
788                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
789       sqlite3_reset (stmt);
790       database_shutdown (plugin);
791       database_setup (plugin->env->cfg,
792                       plugin);
793       return GNUNET_SYSERR;
794     }
795   if (SQLITE_OK != sqlite3_reset (stmt))
796     LOG_SQLITE (plugin, NULL,
797                 GNUNET_ERROR_TYPE_ERROR |
798                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
799   plugin->env->duc (plugin->env->cls,
800                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
801 #if DEBUG_SQLITE
802   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
803                    "sqlite",
804                    "Stored new entry (%u bytes)\n",
805            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
806 #endif
807   return GNUNET_OK;
808 }
809
810
811 /**
812  * Update the priority for a particular key in the datastore.  If
813  * the expiration time in value is different than the time found in
814  * the datastore, the higher value should be kept.  For the
815  * anonymity level, the lower value is to be used.  The specified
816  * priority should be added to the existing priority, ignoring the
817  * priority in value.
818  *
819  * Note that it is possible for multiple values to match this put.
820  * In that case, all of the respective values are updated.
821  *
822  * @param cls the plugin context (state for this module)
823  * @param uid unique identifier of the datum
824  * @param delta by how much should the priority
825  *     change?  If priority + delta < 0 the
826  *     priority should be set to 0 (never go
827  *     negative).
828  * @param expire new expiration time should be the
829  *     MAX of any existing expiration time and
830  *     this value
831  * @param msg set to an error message
832  * @return GNUNET_OK on success
833  */
834 static int
835 sqlite_plugin_update (void *cls,
836                       uint64_t uid,
837                       int delta, struct GNUNET_TIME_Absolute expire,
838                       char **msg)
839 {
840   struct Plugin *plugin = cls;
841   int n;
842
843   sqlite3_bind_int (plugin->updPrio, 1, delta);
844   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
845   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
846   n = sqlite3_step (plugin->updPrio);
847   if (n != SQLITE_DONE)
848     LOG_SQLITE (plugin, msg,
849                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
850                 "sqlite3_step");
851 #if DEBUG_SQLITE
852   else
853     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
854                      "sqlite",
855                      "Block updated\n");
856 #endif
857   sqlite3_reset (plugin->updPrio);
858
859   if (n == SQLITE_BUSY)
860     return GNUNET_NO;
861   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
862 }
863
864
865 /**
866  * Internal context for an iteration.
867  */
868 struct IterContext
869 {
870   /**
871    * FIXME.
872    */
873   sqlite3_stmt *stmt_1;
874
875   /**
876    * FIXME.
877    */
878   sqlite3_stmt *stmt_2;
879
880   /**
881    * FIXME.
882    */
883   int is_asc;
884
885   /**
886    * FIXME.
887    */
888   int is_prio;
889
890   /**
891    * FIXME.
892    */
893   int is_migr;
894
895   /**
896    * FIXME.
897    */
898   int limit_nonanonymous;
899
900   /**
901    * Desired type for blocks returned by this iterator.
902    */
903   enum GNUNET_BLOCK_Type type;
904 };
905
906
907 /**
908  * Prepare our SQL query to obtain the next record from the database.
909  *
910  * @param cls our "struct IterContext"
911  * @param nc NULL to terminate the iteration, otherwise our context for
912  *           getting the next result.
913  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
914  *         GNUNET_SYSERR on error (or end of iteration)
915  */
916 static int
917 iter_next_prepare (void *cls,
918                    struct NextContext *nc)
919 {
920   struct IterContext *ic = cls;
921   struct Plugin *plugin;
922   int ret;
923
924   if (nc == NULL)
925     {
926 #if DEBUG_SQLITE
927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                   "Asked to clean up iterator state.\n");
929 #endif
930       sqlite3_finalize (ic->stmt_1);
931       sqlite3_finalize (ic->stmt_2);
932       return GNUNET_SYSERR;
933     }
934   sqlite3_reset (ic->stmt_1);
935   sqlite3_reset (ic->stmt_2);
936   plugin = nc->plugin;
937   if (ic->is_prio)
938     {
939 #if DEBUG_SQLITE
940       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941                   "Restricting to results larger than the last priority %u\n",
942                   nc->lastPriority);
943 #endif
944       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
945       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
946     }
947   else
948     {
949 #if DEBUG_SQLITE
950       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951                   "Restricting to results larger than the last expiration %llu\n",
952                   (unsigned long long) nc->lastExpiration.abs_value);
953 #endif
954       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value);
955       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value);
956     }
957 #if DEBUG_SQLITE
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Restricting to results larger than the last key `%s'\n",
960               GNUNET_h2s(&nc->lastKey));
961 #endif
962   sqlite3_bind_blob (ic->stmt_1, 2, 
963                      &nc->lastKey, 
964                      sizeof (GNUNET_HashCode),
965                      SQLITE_TRANSIENT);
966   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
967     {      
968 #if DEBUG_SQLITE
969       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
970                   "Result found using iterator 1\n");
971 #endif
972       nc->stmt = ic->stmt_1;
973       return GNUNET_OK;
974     }
975   if (ret != SQLITE_DONE)
976     {
977       LOG_SQLITE (plugin, NULL,
978                   GNUNET_ERROR_TYPE_ERROR |
979                   GNUNET_ERROR_TYPE_BULK,
980                   "sqlite3_step");
981       return GNUNET_SYSERR;
982     }
983   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
984     LOG_SQLITE (plugin, NULL,
985                 GNUNET_ERROR_TYPE_ERROR | 
986                 GNUNET_ERROR_TYPE_BULK, 
987                 "sqlite3_reset");
988   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
989     {
990 #if DEBUG_SQLITE
991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992                   "Result found using iterator 2\n");
993 #endif
994       nc->stmt = ic->stmt_2;
995       return GNUNET_OK;
996     }
997   if (ret != SQLITE_DONE)
998     {
999       LOG_SQLITE (plugin, NULL,
1000                   GNUNET_ERROR_TYPE_ERROR |
1001                   GNUNET_ERROR_TYPE_BULK,
1002                   "sqlite3_step");
1003       return GNUNET_SYSERR;
1004     }
1005   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
1006     LOG_SQLITE (plugin, NULL,
1007                 GNUNET_ERROR_TYPE_ERROR |
1008                 GNUNET_ERROR_TYPE_BULK,
1009                 "sqlite3_reset");
1010 #if DEBUG_SQLITE
1011   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1012               "No result found using either iterator\n");
1013 #endif
1014   return GNUNET_NO;
1015 }
1016
1017
1018 /**
1019  * Call a method for each key in the database and
1020  * call the callback method on it.
1021  *
1022  * @param plugin our plugin context
1023  * @param type entries of which type should be considered?
1024  * @param is_asc are we iterating in ascending order?
1025  * @param is_prio are we iterating by priority (otherwise by expiration)
1026  * @param is_migr are we iterating in migration order?
1027  * @param limit_nonanonymous are we restricting results to those with anonymity
1028  *              level zero?
1029  * @param stmt_str_1 first SQL statement to execute
1030  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1031  * @param iter function to call on each matching value;
1032  *        will be called once with a NULL value at the end
1033  * @param iter_cls closure for iter
1034  */
1035 static void
1036 basic_iter (struct Plugin *plugin,
1037             enum GNUNET_BLOCK_Type type,
1038             int is_asc,
1039             int is_prio,
1040             int is_migr,
1041             int limit_nonanonymous,
1042             const char *stmt_str_1,
1043             const char *stmt_str_2,
1044             PluginIterator iter,
1045             void *iter_cls)
1046 {
1047   struct NextContext *nc;
1048   struct IterContext *ic;
1049   sqlite3_stmt *stmt_1;
1050   sqlite3_stmt *stmt_2;
1051
1052 #if DEBUG_SQLITE
1053   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054               "At %llu, using queries `%s' and `%s'\n",
1055               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value,
1056               stmt_str_1,
1057               stmt_str_2);
1058 #endif
1059   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1060     {
1061       LOG_SQLITE (plugin, NULL,
1062                   GNUNET_ERROR_TYPE_ERROR |
1063                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1064       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1065       return;
1066     }
1067   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1068     {
1069       LOG_SQLITE (plugin, NULL,
1070                   GNUNET_ERROR_TYPE_ERROR |
1071                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1072       sqlite3_finalize (stmt_1);
1073       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1074       return;
1075     }
1076   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1077                       sizeof(struct IterContext));
1078   nc->plugin = plugin;
1079   nc->iter = iter;
1080   nc->iter_cls = iter_cls;
1081   nc->stmt = NULL;
1082   ic = (struct IterContext*) &nc[1];
1083   ic->stmt_1 = stmt_1;
1084   ic->stmt_2 = stmt_2;
1085   ic->type = type;
1086   ic->is_asc = is_asc;
1087   ic->is_prio = is_prio;
1088   ic->is_migr = is_migr;
1089   ic->limit_nonanonymous = limit_nonanonymous;
1090   nc->prep = &iter_next_prepare;
1091   nc->prep_cls = ic;
1092   if (is_asc)
1093     {
1094       nc->lastPriority = 0;
1095       nc->lastExpiration.abs_value = 0;
1096       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1097     }
1098   else
1099     {
1100       nc->lastPriority = 0x7FFFFFFF;
1101       nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL;
1102       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1103     }
1104   sqlite_next_request (nc, GNUNET_NO);
1105 }
1106
1107
1108 /**
1109  * Select a subset of the items in the datastore and call
1110  * the given iterator for each of them.
1111  *
1112  * @param cls our plugin context
1113  * @param type entries of which type should be considered?
1114  *        Use 0 for any type.
1115  * @param iter function to call on each matching value;
1116  *        will be called once with a NULL value at the end
1117  * @param iter_cls closure for iter
1118  */
1119 static void
1120 sqlite_plugin_iter_low_priority (void *cls,
1121                                  enum GNUNET_BLOCK_Type type,
1122                                  PluginIterator iter,
1123                                  void *iter_cls)
1124 {
1125   basic_iter (cls,
1126               type, 
1127               GNUNET_YES, GNUNET_YES, 
1128               GNUNET_NO, GNUNET_NO,
1129               SELECT_IT_LOW_PRIORITY_1,
1130               SELECT_IT_LOW_PRIORITY_2, 
1131               iter, iter_cls);
1132 }
1133
1134
1135 /**
1136  * Select a subset of the items in the datastore and call
1137  * the given iterator for each of them.
1138  *
1139  * @param cls our plugin context
1140  * @param type entries of which type should be considered?
1141  *        Use 0 for any type.
1142  * @param iter function to call on each matching value;
1143  *        will be called once with a NULL value at the end
1144  * @param iter_cls closure for iter
1145  */
1146 static void
1147 sqlite_plugin_iter_zero_anonymity (void *cls,
1148                                    enum GNUNET_BLOCK_Type type,
1149                                    PluginIterator iter,
1150                                    void *iter_cls)
1151 {
1152   struct GNUNET_TIME_Absolute now;
1153   char *q1;
1154   char *q2;
1155
1156   now = GNUNET_TIME_absolute_get ();
1157   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1158                    (unsigned long long) now.abs_value);
1159   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1160                    (unsigned long long) now.abs_value);
1161   basic_iter (cls,
1162               type, 
1163               GNUNET_NO, GNUNET_YES, 
1164               GNUNET_NO, GNUNET_YES,
1165               q1,
1166               q2,
1167               iter, iter_cls);
1168   GNUNET_free (q1);
1169   GNUNET_free (q2);
1170 }
1171
1172
1173
1174 /**
1175  * Select a subset of the items in the datastore and call
1176  * the given iterator for each of them.
1177  *
1178  * @param cls our plugin context
1179  * @param type entries of which type should be considered?
1180  *        Use 0 for any type.
1181  * @param iter function to call on each matching value;
1182  *        will be called once with a NULL value at the end
1183  * @param iter_cls closure for iter
1184  */
1185 static void
1186 sqlite_plugin_iter_ascending_expiration (void *cls,
1187                                          enum GNUNET_BLOCK_Type type,
1188                                          PluginIterator iter,
1189                                          void *iter_cls)
1190 {
1191   struct GNUNET_TIME_Absolute now;
1192   char *q1;
1193   char *q2;
1194
1195   now = GNUNET_TIME_absolute_get ();
1196   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1197                    (unsigned long long) 0*now.abs_value);
1198   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1199                    (unsigned long long) 0*now.abs_value);
1200   basic_iter (cls,
1201               type, 
1202               GNUNET_YES, GNUNET_NO, 
1203               GNUNET_NO, GNUNET_NO,
1204               q1, q2,
1205               iter, iter_cls);
1206   GNUNET_free (q1);
1207   GNUNET_free (q2);
1208 }
1209
1210
1211 /**
1212  * Select a subset of the items in the datastore and call
1213  * the given iterator for each of them.
1214  *
1215  * @param cls our plugin context
1216  * @param type entries of which type should be considered?
1217  *        Use 0 for any type.
1218  * @param iter function to call on each matching value;
1219  *        will be called once with a NULL value at the end
1220  * @param iter_cls closure for iter
1221  */
1222 static void
1223 sqlite_plugin_iter_migration_order (void *cls,
1224                                     enum GNUNET_BLOCK_Type type,
1225                                     PluginIterator iter,
1226                                     void *iter_cls)
1227 {
1228   struct GNUNET_TIME_Absolute now;
1229   char *q;
1230
1231   now = GNUNET_TIME_absolute_get ();
1232   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1233                    (unsigned long long) now.abs_value);
1234   basic_iter (cls,
1235               type, 
1236               GNUNET_NO, GNUNET_NO, 
1237               GNUNET_YES, GNUNET_NO,
1238               SELECT_IT_MIGRATION_ORDER_1,
1239               q,
1240               iter, iter_cls);
1241   GNUNET_free (q);
1242 }
1243
1244
1245 /**
1246  * Call sqlite using the already prepared query to get
1247  * the next result.
1248  *
1249  * @param cls context with the prepared query
1250  * @param nc context with the prepared query
1251  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1252  *        there are no more results 
1253  */
1254 static int
1255 all_next_prepare (void *cls,
1256                   struct NextContext *nc)
1257 {
1258   struct Plugin *plugin;
1259   int ret;
1260
1261   if (nc == NULL)
1262     {
1263 #if DEBUG_SQLITE
1264       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1265                   "Asked to clean up iterator state.\n");
1266 #endif
1267       nc = (struct NextContext *)cls;
1268       if (nc->stmt)
1269           sqlite3_finalize (nc->stmt);
1270       nc->stmt = NULL;
1271       return GNUNET_SYSERR;
1272     }
1273   plugin = nc->plugin;
1274   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1275     {      
1276       return GNUNET_OK;
1277     }
1278   if (ret != SQLITE_DONE)
1279     {
1280       LOG_SQLITE (plugin, NULL,
1281                   GNUNET_ERROR_TYPE_ERROR |
1282                   GNUNET_ERROR_TYPE_BULK,
1283                   "sqlite3_step");
1284       return GNUNET_SYSERR;
1285     }
1286   return GNUNET_NO;
1287 }
1288
1289
1290 /**
1291  * Select a subset of the items in the datastore and call
1292  * the given iterator for each of them.
1293  *
1294  * @param cls our plugin context
1295  * @param type entries of which type should be considered?
1296  *        Use 0 for any type.
1297  * @param iter function to call on each matching value;
1298  *        will be called once with a NULL value at the end
1299  * @param iter_cls closure for iter
1300  */
1301 static void
1302 sqlite_plugin_iter_all_now (void *cls,
1303                             enum GNUNET_BLOCK_Type type,
1304                             PluginIterator iter,
1305                             void *iter_cls)
1306 {
1307   struct Plugin *plugin = cls;
1308   struct NextContext *nc;
1309   sqlite3_stmt *stmt;
1310
1311   if (sq_prepare (plugin->dbh, 
1312                   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090",
1313                   &stmt) != SQLITE_OK)
1314     {
1315       LOG_SQLITE (plugin, NULL,
1316                   GNUNET_ERROR_TYPE_ERROR |
1317                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1318       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1319       return;
1320     }
1321   nc = GNUNET_malloc (sizeof(struct NextContext));
1322   nc->plugin = plugin;
1323   nc->iter = iter;
1324   nc->iter_cls = iter_cls;
1325   nc->stmt = stmt;
1326   nc->prep = &all_next_prepare;
1327   nc->prep_cls = nc;
1328   sqlite_next_request (nc, GNUNET_NO);
1329 }
1330
1331
1332 /**
1333  * FIXME.
1334  */
1335 struct GetNextContext
1336 {
1337
1338   /**
1339    * FIXME.
1340    */
1341   int total;
1342
1343   /**
1344    * FIXME.
1345    */
1346   int off;
1347
1348   /**
1349    * FIXME.
1350    */
1351   int have_vhash;
1352
1353   /**
1354    * FIXME.
1355    */
1356   unsigned int type;
1357
1358   /**
1359    * FIXME.
1360    */
1361   sqlite3_stmt *stmt;
1362
1363   /**
1364    * FIXME.
1365    */
1366   GNUNET_HashCode key;
1367
1368   /**
1369    * FIXME.
1370    */
1371   GNUNET_HashCode vhash;
1372 };
1373
1374
1375
1376 /**
1377  * FIXME.
1378  *
1379  * @param cls our "struct GetNextContext*"
1380  * @param nc FIXME
1381  * @return GNUNET_YES if there are more results, 
1382  *         GNUNET_NO if there are no more results,
1383  *         GNUNET_SYSERR on internal error
1384  */
1385 static int
1386 get_next_prepare (void *cls,
1387                   struct NextContext *nc)
1388 {
1389   struct GetNextContext *gnc = cls;
1390   int sqoff;
1391   int ret;
1392   int limit_off;
1393
1394   if (nc == NULL)
1395     {
1396       sqlite3_finalize (gnc->stmt);
1397       return GNUNET_SYSERR;
1398     }
1399   if (nc->count == gnc->total)
1400     return GNUNET_NO;
1401   if (nc->count + gnc->off == gnc->total)
1402     nc->last_rowid = 0;
1403   if (nc->count == 0)
1404     limit_off = gnc->off;
1405   else
1406     limit_off = 0;
1407   sqoff = 1;
1408   sqlite3_reset (nc->stmt);
1409   ret = sqlite3_bind_blob (nc->stmt,
1410                            sqoff++,
1411                            &gnc->key, 
1412                            sizeof (GNUNET_HashCode),
1413                            SQLITE_TRANSIENT);
1414   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1415     ret = sqlite3_bind_blob (nc->stmt,
1416                              sqoff++,
1417                              &gnc->vhash,
1418                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1419   if ((gnc->type != 0) && (ret == SQLITE_OK))
1420     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1421   if (ret == SQLITE_OK)
1422     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1423   if (ret == SQLITE_OK)
1424     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1425   if (ret != SQLITE_OK)
1426     return GNUNET_SYSERR;
1427   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1428     return GNUNET_NO;
1429   return GNUNET_OK;
1430 }
1431
1432
1433 /**
1434  * Iterate over the results for a particular key
1435  * in the datastore.
1436  *
1437  * @param cls closure
1438  * @param key maybe NULL (to match all entries)
1439  * @param vhash hash of the value, maybe NULL (to
1440  *        match all values that have the right key).
1441  *        Note that for DBlocks there is no difference
1442  *        betwen key and vhash, but for other blocks
1443  *        there may be!
1444  * @param type entries of which type are relevant?
1445  *     Use 0 for any type.
1446  * @param iter function to call on each matching value;
1447  *        will be called once with a NULL value at the end
1448  * @param iter_cls closure for iter
1449  */
1450 static void
1451 sqlite_plugin_get (void *cls,
1452                    const GNUNET_HashCode * key,
1453                    const GNUNET_HashCode * vhash,
1454                    enum GNUNET_BLOCK_Type type,
1455                    PluginIterator iter, void *iter_cls)
1456 {
1457   struct Plugin *plugin = cls;
1458   struct GetNextContext *gpc;
1459   struct NextContext *nc;
1460   int ret;
1461   int total;
1462   sqlite3_stmt *stmt;
1463   char scratch[256];
1464   int sqoff;
1465
1466   GNUNET_assert (iter != NULL);
1467   if (key == NULL)
1468     {
1469       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1470       return;
1471     }
1472   GNUNET_snprintf (scratch, sizeof (scratch),
1473                    "SELECT count(*) FROM gn090 WHERE hash=:1%s%s",
1474                    vhash == NULL ? "" : " AND vhash=:2",
1475                    type == 0 ? "" : (vhash ==
1476                                      NULL) ? " AND type=:2" : " AND type=:3");
1477   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1478     {
1479       LOG_SQLITE (plugin, NULL,
1480                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1481       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1482       return;
1483     }
1484   sqoff = 1;
1485   ret = sqlite3_bind_blob (stmt,
1486                            sqoff++,
1487                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1488   if ((vhash != NULL) && (ret == SQLITE_OK))
1489     ret = sqlite3_bind_blob (stmt,
1490                              sqoff++,
1491                              vhash,
1492                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1493   if ((type != 0) && (ret == SQLITE_OK))
1494     ret = sqlite3_bind_int (stmt, sqoff++, type);
1495   if (SQLITE_OK != ret)
1496     {
1497       LOG_SQLITE (plugin, NULL,
1498                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1499       sqlite3_reset (stmt);
1500       sqlite3_finalize (stmt);
1501       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1502       return;
1503     }
1504   ret = sqlite3_step (stmt);
1505   if (ret != SQLITE_ROW)
1506     {
1507       LOG_SQLITE (plugin, NULL,
1508                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1509                   "sqlite_step");
1510       sqlite3_reset (stmt);
1511       sqlite3_finalize (stmt);
1512       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1513       return;
1514     }
1515   total = sqlite3_column_int (stmt, 0);
1516   sqlite3_reset (stmt);
1517   sqlite3_finalize (stmt);
1518   if (0 == total)
1519     {
1520       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1521       return;
1522     }
1523
1524   GNUNET_snprintf (scratch, sizeof (scratch),
1525                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1526                    "FROM gn090 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1527                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1528                    vhash == NULL ? "" : " AND vhash=:2",
1529                    type == 0 ? "" : (vhash ==
1530                                      NULL) ? " AND type=:2" : " AND type=:3",
1531                    sqoff, sqoff + 1);
1532   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1533     {
1534       LOG_SQLITE (plugin, NULL,
1535                   GNUNET_ERROR_TYPE_ERROR |
1536                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1537       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1538       return;
1539     }
1540   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1541                       sizeof(struct GetNextContext));
1542   nc->plugin = plugin;
1543   nc->iter = iter;
1544   nc->iter_cls = iter_cls;
1545   nc->stmt = stmt;
1546   gpc = (struct GetNextContext*) &nc[1];
1547   gpc->total = total;
1548   gpc->type = type;
1549   gpc->key = *key;
1550   gpc->stmt = stmt; /* alias used for freeing at the end! */
1551   if (NULL != vhash)
1552     {
1553       gpc->have_vhash = GNUNET_YES;
1554       gpc->vhash = *vhash;
1555     }
1556   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1557   nc->prep = &get_next_prepare;
1558   nc->prep_cls = gpc;
1559   sqlite_next_request (nc, GNUNET_NO);
1560 }
1561
1562
1563 /**
1564  * Get a random item for replication.  Returns a single, not expired, random item
1565  * from those with the highest replication counters.  The item's 
1566  * replication counter is decremented by one IF it was positive before.
1567  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1568  *
1569  * @param cls closure
1570  * @param iter function to call the value (once only).
1571  * @param iter_cls closure for iter
1572  */
1573 static void
1574 sqlite_plugin_replication_get (void *cls,
1575                                PluginIterator iter, void *iter_cls)
1576 {
1577   struct Plugin *plugin = cls;
1578   int n;
1579   sqlite3_stmt *stmt;
1580   struct GNUNET_TIME_Absolute expiration;
1581   unsigned long long rowid;
1582
1583 #if DEBUG_SQLITE
1584   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1585                    "sqlite",
1586                    "Getting random block based on replication order.\n");
1587 #endif
1588   stmt = plugin->selRepl;
1589   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value))
1590     {
1591       LOG_SQLITE (plugin, NULL,           
1592                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1593       if (SQLITE_OK != sqlite3_reset (stmt))
1594         LOG_SQLITE (plugin, NULL,
1595                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1596       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1597             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1598       return;
1599     }
1600   n = sqlite3_step (stmt);
1601   switch (n)
1602     {
1603     case SQLITE_ROW:
1604       rowid = sqlite3_column_int64 (stmt, 6);
1605       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1606         {
1607           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
1608                            "sqlite",
1609                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
1610           if (SQLITE_OK != sqlite3_reset (stmt))
1611             LOG_SQLITE (plugin, NULL,
1612                         GNUNET_ERROR_TYPE_ERROR |
1613                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1614           delete_by_rowid (plugin, rowid);
1615           break;
1616         }
1617       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1618       (void) iter (iter_cls,
1619                    NULL,
1620                    sqlite3_column_blob (stmt, 4) /* key */,
1621                    sqlite3_column_bytes (stmt, 5) /* size of data */,
1622                    sqlite3_column_blob (stmt, 5) /* data */, 
1623                    sqlite3_column_int (stmt, 0) /* type */,
1624                    sqlite3_column_int (stmt, 1) /* priority */,
1625                    sqlite3_column_int (stmt, 2) /* anonymity */,
1626                    expiration,
1627                    rowid);
1628       if (SQLITE_OK != sqlite3_reset (stmt))
1629         LOG_SQLITE (plugin, NULL,
1630                     GNUNET_ERROR_TYPE_ERROR |
1631                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1632       return;
1633     case SQLITE_DONE:
1634       /* database must be empty */
1635       if (SQLITE_OK != sqlite3_reset (stmt))
1636         LOG_SQLITE (plugin, NULL,
1637                     GNUNET_ERROR_TYPE_ERROR |
1638                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1639       break;
1640     case SQLITE_BUSY:    
1641     case SQLITE_ERROR:
1642     case SQLITE_MISUSE:
1643     default:
1644       LOG_SQLITE (plugin, NULL,
1645                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
1646                   "sqlite3_step");
1647       (void) sqlite3_reset (stmt);
1648       GNUNET_break (0);
1649       database_shutdown (plugin);
1650       database_setup (plugin->env->cfg,
1651                       plugin);
1652       break;
1653     }
1654   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,             
1655         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1656 }
1657
1658
1659 /**
1660  * Drop database.
1661  *
1662  * @param cls our plugin context
1663  */
1664 static void 
1665 sqlite_plugin_drop (void *cls)
1666 {
1667   struct Plugin *plugin = cls;
1668   plugin->drop_on_shutdown = GNUNET_YES;
1669 }
1670
1671
1672 static unsigned long long
1673 sqlite_plugin_get_size (void *cls)
1674 {
1675   struct Plugin *plugin = cls;
1676   sqlite3_stmt *stmt;
1677   uint64_t pages;
1678   uint64_t page_size;
1679 #if ENULL_DEFINED
1680   char *e;
1681 #endif
1682
1683   if (SQLITE_VERSION_NUMBER < 3006000)
1684     {
1685       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1686                        "datastore-sqlite",
1687                        _("sqlite version to old to determine size, assuming zero\n"));
1688       return 0;
1689     }
1690   CHECK (SQLITE_OK ==
1691          sqlite3_exec (plugin->dbh,
1692                        "VACUUM", NULL, NULL, ENULL));
1693   CHECK (SQLITE_OK ==
1694          sqlite3_exec (plugin->dbh,
1695                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1696   CHECK (SQLITE_OK ==
1697          sq_prepare (plugin->dbh,
1698                      "PRAGMA page_count",
1699                      &stmt));
1700   if (SQLITE_ROW ==
1701       sqlite3_step (stmt))
1702     pages = sqlite3_column_int64 (stmt, 0);
1703   else
1704     pages = 0;
1705   sqlite3_finalize (stmt);
1706   CHECK (SQLITE_OK ==
1707          sq_prepare (plugin->dbh,
1708                      "PRAGMA page_size",
1709                      &stmt));
1710   CHECK (SQLITE_ROW ==
1711          sqlite3_step (stmt));
1712   page_size = sqlite3_column_int64 (stmt, 0);
1713   sqlite3_finalize (stmt);
1714   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1715               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1716               (unsigned long long) pages,
1717               (unsigned long long) page_size);
1718   return  pages * page_size;
1719 }
1720                                          
1721
1722 /**
1723  * Entry point for the plugin.
1724  *
1725  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1726  * @return NULL on error, othrewise the plugin context
1727  */
1728 void *
1729 libgnunet_plugin_datastore_sqlite_init (void *cls)
1730 {
1731   static struct Plugin plugin;
1732   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1733   struct GNUNET_DATASTORE_PluginFunctions *api;
1734
1735   if (plugin.env != NULL)
1736     return NULL; /* can only initialize once! */
1737   memset (&plugin, 0, sizeof(struct Plugin));
1738   plugin.env = env;
1739   if (GNUNET_OK !=
1740       database_setup (env->cfg, &plugin))
1741     {
1742       database_shutdown (&plugin);
1743       return NULL;
1744     }
1745   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1746   api->cls = &plugin;
1747   api->get_size = &sqlite_plugin_get_size;
1748   api->put = &sqlite_plugin_put;
1749   api->next_request = &sqlite_next_request;
1750   api->get = &sqlite_plugin_get;
1751   api->replication_get = &sqlite_plugin_replication_get;
1752   api->update = &sqlite_plugin_update;
1753   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1754   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1755   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1756   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1757   api->iter_all_now = &sqlite_plugin_iter_all_now;
1758   api->drop = &sqlite_plugin_drop;
1759   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1760                    "sqlite", _("Sqlite database running\n"));
1761   return api;
1762 }
1763
1764
1765 /**
1766  * Exit point from the plugin.
1767  *
1768  * @param cls the plugin context (as returned by "init")
1769  * @return always NULL
1770  */
1771 void *
1772 libgnunet_plugin_datastore_sqlite_done (void *cls)
1773 {
1774   char *fn;
1775   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1776   struct Plugin *plugin = api->cls;
1777
1778 #if DEBUG_SQLITE
1779   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1780                    "sqlite",
1781                    "sqlite plugin is doneing\n");
1782 #endif
1783
1784   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1785     {
1786 #if DEBUG_SQLITE
1787       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1788                        "sqlite",
1789                        "Canceling next task\n");
1790 #endif
1791       GNUNET_SCHEDULER_cancel (plugin->next_task);
1792       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1793 #if DEBUG_SQLITE
1794       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1795                        "sqlite",
1796                        "Prep'ing next task\n");
1797 #endif
1798       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1799       GNUNET_free (plugin->next_task_nc);
1800       plugin->next_task_nc = NULL;
1801     }
1802   fn = NULL;
1803   if (plugin->drop_on_shutdown)
1804     fn = GNUNET_strdup (plugin->fn);
1805 #if DEBUG_SQLITE
1806   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1807                    "sqlite",
1808                    "Shutting down database\n");
1809 #endif
1810   database_shutdown (plugin);
1811   plugin->env = NULL; 
1812   GNUNET_free (api);
1813   if (fn != NULL)
1814     {
1815       if (0 != UNLINK(fn))
1816         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1817                                   "unlink",
1818                                   fn);
1819       GNUNET_free (fn);
1820     }
1821 #if DEBUG_SQLITE
1822   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1823                    "sqlite",
1824                    "sqlite plugin is finished doneing\n");
1825 #endif
1826   return NULL;
1827 }
1828
1829 /* end of plugin_datastore_sqlite.c */