sqlite clean up
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 /**
32  * Enable or disable logging debug messages.
33  */
34 #define DEBUG_SQLITE GNUNET_NO
35
36 /**
37  * We allocate items on the stack at times.  To prevent a stack
38  * overflow, we impose a limit on the maximum size for the data per
39  * item.  64k should be enough.
40  */
41 #define MAX_ITEM_SIZE 65536
42
43 /**
44  * After how many ms "busy" should a DB operation fail for good?
45  * A low value makes sure that we are more responsive to requests
46  * (especially PUTs).  A high value guarantees a higher success
47  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
48  *
49  * The default value of 250ms should ensure that users do not experience
50  * huge latencies while at the same time allowing operations to succeed
51  * with reasonable probability.
52  */
53 #define BUSY_TIMEOUT_MS 250
54
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
62
63
64
65 /**
66  * Context for all functions in this plugin.
67  */
68 struct Plugin 
69 {
70   /**
71    * Our execution environment.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment *env;
74
75   /**
76    * Database filename.
77    */
78   char *fn;
79
80   /**
81    * Native SQLite database handle.
82    */
83   sqlite3 *dbh;
84
85   /**
86    * Precompiled SQL for deletion.
87    */
88   sqlite3_stmt *delRow;
89
90   /**
91    * Precompiled SQL for update.
92    */
93   sqlite3_stmt *updPrio;
94
95   /**
96    * Precompiled SQL for replication decrement.
97    */
98   sqlite3_stmt *updRepl;
99
100   /**
101    * Precompiled SQL for replication selection.
102    */
103   sqlite3_stmt *selRepl;
104
105   /**
106    * Precompiled SQL for expiration selection.
107    */
108   sqlite3_stmt *selExpi;
109
110   /**
111    * Precompiled SQL for insertion.
112    */
113   sqlite3_stmt *insertContent;
114
115   /**
116    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
117    */
118   struct NextContext *next_task_nc;
119
120   /**
121    * Pending task with scheduler for running the next request.
122    */
123   GNUNET_SCHEDULER_TaskIdentifier next_task;
124
125   /**
126    * Should the database be dropped on shutdown?
127    */
128   int drop_on_shutdown;
129
130 };
131
132
133 /**
134  * @brief Prepare a SQL statement
135  *
136  * @param dbh handle to the database
137  * @param zSql SQL statement, UTF-8 encoded
138  * @param ppStmt set to the prepared statement
139  * @return 0 on success
140  */
141 static int
142 sq_prepare (sqlite3 * dbh, 
143             const char *zSql,
144             sqlite3_stmt ** ppStmt)
145 {
146   char *dummy;
147   int result;
148
149   result = sqlite3_prepare_v2 (dbh,
150                                zSql,
151                                strlen (zSql), 
152                                ppStmt,
153                                (const char **) &dummy);
154 #if DEBUG_SQLITE && 0
155   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
156                    "sqlite",
157                    "Prepared `%s' / %p: %d\n",
158                    zSql,
159                    *ppStmt, 
160                    result);
161 #endif
162   return result;
163 }
164
165
166 /**
167  * Create our database indices.
168  * 
169  * @param dbh handle to the database
170  */
171 static void
172 create_indices (sqlite3 * dbh)
173 {
174   /* create indices */
175   sqlite3_exec (dbh,
176                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
177   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
178                 NULL);
179   sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL,
180                 NULL);
181   sqlite3_exec (dbh,
182                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
183                 NULL, NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)",
185                 NULL, NULL, NULL);
186 }
187
188
189 #if 0
190 #define CHECK(a) GNUNET_break(a)
191 #define ENULL NULL
192 #else
193 #define ENULL &e
194 #define ENULL_DEFINED 1
195 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
196 #endif
197
198
199 /**
200  * Initialize the database connections and associated
201  * data structures (create tables and indices
202  * as needed as well).
203  *
204  * @param cfg our configuration
205  * @param plugin the plugin context (state for this module)
206  * @return GNUNET_OK on success
207  */
208 static int
209 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
210                 struct Plugin *plugin)
211 {
212   sqlite3_stmt *stmt;
213   char *afsdir;
214 #if ENULL_DEFINED
215   char *e;
216 #endif
217   
218   if (GNUNET_OK != 
219       GNUNET_CONFIGURATION_get_value_filename (cfg,
220                                                "datastore-sqlite",
221                                                "FILENAME",
222                                                &afsdir))
223     {
224       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
225                        "sqlite",
226                        _("Option `%s' in section `%s' missing in configuration!\n"),
227                        "FILENAME",
228                        "datastore-sqlite");
229       return GNUNET_SYSERR;
230     }
231   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
232     {
233       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
234         {
235           GNUNET_break (0);
236           GNUNET_free (afsdir);
237           return GNUNET_SYSERR;
238         }
239       /* database is new or got deleted, reset payload to zero! */
240       plugin->env->duc (plugin->env->cls, 0);
241     }
242 #ifdef ENABLE_NLS
243   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
244                                        nl_langinfo (CODESET));
245 #else
246   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
247                                        "UTF-8");   /* good luck */
248 #endif
249   GNUNET_free (afsdir);
250   
251   /* Open database and precompile statements */
252   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
253     {
254       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
255                        "sqlite",
256                        _("Unable to initialize SQLite: %s.\n"),
257                        sqlite3_errmsg (plugin->dbh));
258       return GNUNET_SYSERR;
259     }
260   CHECK (SQLITE_OK ==
261          sqlite3_exec (plugin->dbh,
262                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
263   CHECK (SQLITE_OK ==
264          sqlite3_exec (plugin->dbh,
265                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
266   CHECK (SQLITE_OK ==
267          sqlite3_exec (plugin->dbh,
268                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
269   CHECK (SQLITE_OK ==
270          sqlite3_exec (plugin->dbh,
271                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
272   CHECK (SQLITE_OK ==
273          sqlite3_exec (plugin->dbh, 
274                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
275
276   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
277
278
279   /* We have to do it here, because otherwise precompiling SQL might fail */
280   CHECK (SQLITE_OK ==
281          sq_prepare (plugin->dbh,
282                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
283                      &stmt));
284   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
285        (sqlite3_exec (plugin->dbh,
286                       "CREATE TABLE gn090 ("
287                       "  repl INT4 NOT NULL DEFAULT 0,"
288                       "  type INT4 NOT NULL DEFAULT 0,"
289                       "  prio INT4 NOT NULL DEFAULT 0,"
290                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
291                       "  expire INT8 NOT NULL DEFAULT 0,"
292                       "  hash TEXT NOT NULL DEFAULT '',"
293                       "  vhash TEXT NOT NULL DEFAULT '',"
294                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
295                       NULL) != SQLITE_OK) )
296     {
297       LOG_SQLITE (plugin, NULL,
298                   GNUNET_ERROR_TYPE_ERROR, 
299                   "sqlite3_exec");
300       sqlite3_finalize (stmt);
301       return GNUNET_SYSERR;
302     }
303   sqlite3_finalize (stmt);
304   create_indices (plugin->dbh);
305
306   CHECK (SQLITE_OK ==
307          sq_prepare (plugin->dbh,
308                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
309                      &stmt));
310   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
311        (sqlite3_exec (plugin->dbh,
312                       "CREATE TABLE gn071 ("
313                       "  key TEXT NOT NULL DEFAULT '',"
314                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
315                       NULL) != SQLITE_OK) )
316     {
317       LOG_SQLITE (plugin, NULL,
318                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
319       sqlite3_finalize (stmt);
320       return GNUNET_SYSERR;
321     }
322   sqlite3_finalize (stmt);
323
324   if ((sq_prepare (plugin->dbh,
325                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
326                    "_ROWID_ = ?",
327                    &plugin->updPrio) != SQLITE_OK) ||
328       (sq_prepare (plugin->dbh,
329                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
330                    "_ROWID_ = ?",
331                    &plugin->updRepl) != SQLITE_OK) ||
332       (sq_prepare (plugin->dbh,
333                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "
334                    " ORDER BY repl DESC, Random() LIMIT 1",
335                    &plugin->selRepl) != SQLITE_OK) ||
336       (sq_prepare (plugin->dbh,
337                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "
338                    " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "
339                    " ORDER BY prio ASC LIMIT 1",
340                    &plugin->selExpi) != SQLITE_OK) ||
341       (sq_prepare (plugin->dbh,
342                    "INSERT INTO gn090 (repl, type, prio, "
343                    "anonLevel, expire, hash, vhash, value) VALUES "
344                    "(?, ?, ?, ?, ?, ?, ?, ?)",
345                    &plugin->insertContent) != SQLITE_OK) ||
346       (sq_prepare (plugin->dbh,
347                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
348                    &plugin->delRow) != SQLITE_OK))
349     {
350       LOG_SQLITE (plugin, NULL,
351                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
352       return GNUNET_SYSERR;
353     }
354
355   return GNUNET_OK;
356 }
357
358
359 /**
360  * Shutdown database connection and associate data
361  * structures.
362  * @param plugin the plugin context (state for this module)
363  */
364 static void
365 database_shutdown (struct Plugin *plugin)
366 {
367   int result;
368 #if SQLITE_VERSION_NUMBER >= 3007000
369   sqlite3_stmt *stmt;
370 #endif
371
372   if (plugin->delRow != NULL)
373     sqlite3_finalize (plugin->delRow);
374   if (plugin->updPrio != NULL)
375     sqlite3_finalize (plugin->updPrio);
376   if (plugin->updRepl != NULL)
377     sqlite3_finalize (plugin->updRepl);
378   if (plugin->selRepl != NULL)
379     sqlite3_finalize (plugin->selRepl);
380   if (plugin->selExpi != NULL)
381     sqlite3_finalize (plugin->selExpi);
382   if (plugin->insertContent != NULL)
383     sqlite3_finalize (plugin->insertContent);
384   result = sqlite3_close(plugin->dbh);
385 #if SQLITE_VERSION_NUMBER >= 3007000
386   if (result == SQLITE_BUSY)
387     {
388       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
389                        "sqlite",
390                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
391       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
392       while (stmt != NULL)
393         {
394 #if DEBUG_SQLITE
395           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
396                      "sqlite", "Closing statement %p\n", stmt);
397 #endif
398           result = sqlite3_finalize(stmt);
399 #if DEBUG_SQLITE
400           if (result != SQLITE_OK)
401               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
402                                "sqlite",
403                                "Failed to close statement %p: %d\n", stmt, result);
404 #endif
405           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
406         }
407       result = sqlite3_close(plugin->dbh);
408     }
409 #endif
410   if (SQLITE_OK != result)
411       LOG_SQLITE (plugin, NULL,
412                   GNUNET_ERROR_TYPE_ERROR, 
413                   "sqlite3_close");
414
415   GNUNET_free_non_null (plugin->fn);
416 }
417
418
419 /**
420  * Delete the database entry with the given
421  * row identifier.
422  *
423  * @param plugin the plugin context (state for this module)
424  * @param rid the ID of the row to delete
425  */
426 static int
427 delete_by_rowid (struct Plugin* plugin, 
428                  unsigned long long rid)
429 {
430   sqlite3_bind_int64 (plugin->delRow, 1, rid);
431   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
432     {
433       LOG_SQLITE (plugin, NULL,
434                   GNUNET_ERROR_TYPE_ERROR |
435                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
436       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
437           LOG_SQLITE (plugin, NULL,
438                       GNUNET_ERROR_TYPE_ERROR |
439                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
440       return GNUNET_SYSERR;
441     }
442   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
443     LOG_SQLITE (plugin, NULL,
444                 GNUNET_ERROR_TYPE_ERROR |
445                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
446   return GNUNET_OK;
447 }
448
449
450 /**
451  * Context for the universal iterator.
452  */
453 struct NextContext;
454
455 /**
456  * Type of a function that will prepare
457  * the next iteration.
458  *
459  * @param cls closure
460  * @param nc the next context; NULL for the last
461  *         call which gives the callback a chance to
462  *         clean up the closure
463  * @return GNUNET_OK on success, GNUNET_NO if there are
464  *         no more values, GNUNET_SYSERR on error
465  */
466 typedef int (*PrepareFunction)(void *cls,
467                                struct NextContext *nc);
468
469
470 /**
471  * Context we keep for the "next request" callback.
472  */
473 struct NextContext
474 {
475   /**
476    * Internal state.
477    */ 
478   struct Plugin *plugin;
479
480   /**
481    * Function to call on the next value.
482    */
483   PluginIterator iter;
484
485   /**
486    * Closure for iter.
487    */
488   void *iter_cls;
489
490   /**
491    * Function to call to prepare the next
492    * iteration.
493    */
494   PrepareFunction prep;
495
496   /**
497    * Closure for prep.
498    */
499   void *prep_cls;
500
501   /**
502    * Statement that the iterator will get the data
503    * from (updated or set by prep).
504    */ 
505   sqlite3_stmt *stmt;
506
507   /**
508    * Row ID of the last result.
509    */
510   unsigned long long last_rowid;
511
512   /**
513    * Key of the last result.
514    */
515   GNUNET_HashCode lastKey;  
516
517   /**
518    * Priority of the last value visited.
519    */ 
520   unsigned int lastPriority; 
521
522   /**
523    * Number of results processed so far.
524    */
525   unsigned int count;
526
527   /**
528    * Set to GNUNET_YES if we must stop now.
529    */
530   int end_it;
531 };
532
533
534 /**
535  * Continuation of "sqlite_next_request".
536  *
537  * @param cls the 'struct NextContext*'
538  * @param tc the task context (unused)
539  */
540 static void 
541 sqlite_next_request_cont (void *cls,
542                           const struct GNUNET_SCHEDULER_TaskContext *tc)
543 {
544   struct NextContext * nc = cls;
545   struct Plugin *plugin;
546   unsigned long long rowid;
547   int ret;
548   unsigned int size;
549   unsigned int hsize;
550   uint32_t anonymity;
551   uint32_t priority;
552   enum GNUNET_BLOCK_Type type;
553   const GNUNET_HashCode *key;
554   struct GNUNET_TIME_Absolute expiration;
555   
556   plugin = nc->plugin;
557   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
558   plugin->next_task_nc = NULL;
559   if ( (GNUNET_YES == nc->end_it) ||
560        (GNUNET_OK != (nc->prep(nc->prep_cls,
561                                nc))) )
562     {
563 #if DEBUG_SQLITE
564       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
565                        "sqlite",
566                        "Iteration completes after %u results\n",
567                        nc->count);
568 #endif
569     END:
570       nc->iter (nc->iter_cls, 
571                 NULL, NULL, 0, NULL, 0, 0, 0, 
572                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
573       nc->prep (nc->prep_cls, NULL);
574       GNUNET_free (nc);
575       return;
576     }
577
578   type = sqlite3_column_int (nc->stmt, 0);
579   priority = sqlite3_column_int (nc->stmt, 1);
580   anonymity = sqlite3_column_int (nc->stmt, 2);
581   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
582   hsize = sqlite3_column_bytes (nc->stmt, 4);
583   key = sqlite3_column_blob (nc->stmt, 4);
584   size = sqlite3_column_bytes (nc->stmt, 5);
585   rowid = sqlite3_column_int64 (nc->stmt, 6);
586   if (hsize != sizeof (GNUNET_HashCode))
587     {
588       GNUNET_break (0);
589       if (SQLITE_OK != sqlite3_reset (nc->stmt))
590         LOG_SQLITE (plugin, NULL,
591                     GNUNET_ERROR_TYPE_ERROR |
592                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
593       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
594         plugin->env->duc (plugin->env->cls,
595                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));      
596       goto END;
597     }
598 #if DEBUG_SQLITE
599   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
600                    "sqlite",
601                    "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
602                    type, 
603                    GNUNET_h2s(key),
604                    priority,
605                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
606                    (long long) expiration.abs_value);
607 #endif
608   if (size > MAX_ITEM_SIZE)
609     {
610       GNUNET_break (0);
611       if (SQLITE_OK != sqlite3_reset (nc->stmt))
612         LOG_SQLITE (plugin, NULL,
613                     GNUNET_ERROR_TYPE_ERROR |
614                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
615       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
616         plugin->env->duc (plugin->env->cls,
617                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); 
618       goto END;
619     }
620   {
621     char data[size];
622     
623     memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
624     nc->count++;
625     nc->last_rowid = rowid;
626     nc->lastPriority = priority;
627     nc->lastKey = *key;
628     if (SQLITE_OK != sqlite3_reset (nc->stmt))
629       LOG_SQLITE (plugin, NULL,
630                   GNUNET_ERROR_TYPE_ERROR |
631                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
632     ret = nc->iter (nc->iter_cls, nc,
633                     &nc->lastKey,
634                     size, data,
635                     type, priority,
636                     anonymity, expiration,
637                     rowid);
638   }
639   switch (ret)
640     {
641     case GNUNET_SYSERR:
642       nc->end_it = GNUNET_YES;
643       break;
644     case GNUNET_NO:
645       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
646         {
647 #if DEBUG_SQLITE
648           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
649                            "sqlite",
650                            "Removed entry %llu (%u bytes)\n",
651                            (unsigned long long) rowid,
652                            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
653 #endif
654           plugin->env->duc (plugin->env->cls,
655                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
656         }
657       break;
658     case GNUNET_YES:
659       break;
660     default:
661       GNUNET_break (0);
662     }
663 }
664
665
666 /**
667  * Function invoked on behalf of a "PluginIterator" asking the
668  * database plugin to call the iterator with the next item.
669  *
670  * @param next_cls whatever argument was given
671  *        to the PluginIterator as "next_cls".
672  * @param end_it set to GNUNET_YES if we
673  *        should terminate the iteration early
674  *        (iterator should be still called once more
675  *         to signal the end of the iteration).
676  */
677 static void 
678 sqlite_next_request (void *next_cls,
679                      int end_it)
680 {
681   struct NextContext * nc= next_cls;
682
683   if (GNUNET_YES == end_it)
684     nc->end_it = GNUNET_YES;
685   nc->plugin->next_task_nc = nc;
686   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
687                                                     nc);
688 }
689
690
691 /**
692  * Store an item in the datastore.
693  *
694  * @param cls closure
695  * @param key key for the item
696  * @param size number of bytes in data
697  * @param data content stored
698  * @param type type of the content
699  * @param priority priority of the content
700  * @param anonymity anonymity-level for the content
701  * @param replication replication-level for the content
702  * @param expiration expiration time for the content
703  * @param msg set to an error message
704  * @return GNUNET_OK on success
705  */
706 static int
707 sqlite_plugin_put (void *cls,
708                    const GNUNET_HashCode *key,
709                    uint32_t size,
710                    const void *data,
711                    enum GNUNET_BLOCK_Type type,
712                    uint32_t priority,
713                    uint32_t anonymity,
714                    uint32_t replication,
715                    struct GNUNET_TIME_Absolute expiration,
716                    char ** msg)
717 {
718   struct Plugin *plugin = cls;
719   int n;
720   int ret;
721   sqlite3_stmt *stmt;
722   GNUNET_HashCode vhash;
723
724   if (size > MAX_ITEM_SIZE)
725     return GNUNET_SYSERR;
726 #if DEBUG_SQLITE
727   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
728                    "sqlite",
729                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
730                    type, 
731                    GNUNET_h2s(key),
732                    priority,
733                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
734                    (long long) expiration.abs_value);
735 #endif
736   GNUNET_CRYPTO_hash (data, size, &vhash);
737   stmt = plugin->insertContent;
738   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
739       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
740       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
741       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
742       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
743       (SQLITE_OK !=
744        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
745                           SQLITE_TRANSIENT)) ||
746       (SQLITE_OK !=
747        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
748                           SQLITE_TRANSIENT))
749       || (SQLITE_OK !=
750           sqlite3_bind_blob (stmt, 8, data, size,
751                              SQLITE_TRANSIENT)))
752     {
753       LOG_SQLITE (plugin,
754                   msg,
755                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
756       if (SQLITE_OK != sqlite3_reset (stmt))
757         LOG_SQLITE (plugin, NULL,
758                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
759       return GNUNET_SYSERR;
760     }
761   n = sqlite3_step (stmt);
762   switch (n)
763     {
764     case SQLITE_DONE:
765       plugin->env->duc (plugin->env->cls,
766                         size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
767 #if DEBUG_SQLITE
768       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
769                        "sqlite",
770                        "Stored new entry (%u bytes)\n",
771                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
772 #endif
773       ret = GNUNET_OK;
774       break;
775     case SQLITE_BUSY:      
776       GNUNET_break (0);
777       LOG_SQLITE (plugin, msg,
778                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
779                   "sqlite3_step");
780       ret = GNUNET_SYSERR;
781       break;
782     default:
783       LOG_SQLITE (plugin, msg,
784                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
785                   "sqlite3_step");
786       if (SQLITE_OK != sqlite3_reset (stmt))
787         LOG_SQLITE (plugin, NULL,
788                     GNUNET_ERROR_TYPE_ERROR |
789                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
790       database_shutdown (plugin);
791       database_setup (plugin->env->cfg,
792                       plugin);
793       return GNUNET_SYSERR;    
794     }
795   if (SQLITE_OK != sqlite3_reset (stmt))
796     LOG_SQLITE (plugin, NULL,
797                 GNUNET_ERROR_TYPE_ERROR |
798                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
799   return ret;
800 }
801
802
803 /**
804  * Update the priority for a particular key in the datastore.  If
805  * the expiration time in value is different than the time found in
806  * the datastore, the higher value should be kept.  For the
807  * anonymity level, the lower value is to be used.  The specified
808  * priority should be added to the existing priority, ignoring the
809  * priority in value.
810  *
811  * Note that it is possible for multiple values to match this put.
812  * In that case, all of the respective values are updated.
813  *
814  * @param cls the plugin context (state for this module)
815  * @param uid unique identifier of the datum
816  * @param delta by how much should the priority
817  *     change?  If priority + delta < 0 the
818  *     priority should be set to 0 (never go
819  *     negative).
820  * @param expire new expiration time should be the
821  *     MAX of any existing expiration time and
822  *     this value
823  * @param msg set to an error message
824  * @return GNUNET_OK on success
825  */
826 static int
827 sqlite_plugin_update (void *cls,
828                       uint64_t uid,
829                       int delta, struct GNUNET_TIME_Absolute expire,
830                       char **msg)
831 {
832   struct Plugin *plugin = cls;
833   int n;
834
835   sqlite3_bind_int (plugin->updPrio, 1, delta);
836   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
837   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
838   n = sqlite3_step (plugin->updPrio);
839   sqlite3_reset (plugin->updPrio);
840   switch (n)
841     {
842     case SQLITE_DONE:
843 #if DEBUG_SQLITE
844       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
845                        "sqlite",
846                        "Block updated\n");
847 #endif
848       return GNUNET_OK;
849     case SQLITE_BUSY:
850       LOG_SQLITE (plugin, msg,
851                   GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
852                   "sqlite3_step");
853       return GNUNET_NO;
854     default:
855       LOG_SQLITE (plugin, msg,
856                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
857                   "sqlite3_step");
858       return GNUNET_SYSERR;
859     }
860 }
861
862
863 /**
864  * Internal context for an iteration.
865  */
866 struct ZeroIterContext
867 {
868   /**
869    * First iterator statement for zero-anonymity iteration.
870    */
871   sqlite3_stmt *stmt_1;
872
873   /**
874    * Second iterator statement for zero-anonymity iteration.
875    */
876   sqlite3_stmt *stmt_2;
877
878   /**
879    * Desired type for blocks returned by this iterator.
880    */
881   enum GNUNET_BLOCK_Type type;
882 };
883
884
885 /**
886  * Prepare our SQL query to obtain the next record from the database.
887  *
888  * @param cls our "struct ZeroIterContext"
889  * @param nc NULL to terminate the iteration, otherwise our context for
890  *           getting the next result.
891  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
892  *         GNUNET_SYSERR on error (or end of iteration)
893  */
894 static int
895 zero_iter_next_prepare (void *cls,
896                         struct NextContext *nc)
897 {
898   struct ZeroIterContext *ic = cls;
899   struct Plugin *plugin;
900   int ret;
901
902   if (nc == NULL)
903     {
904 #if DEBUG_SQLITE
905       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906                   "Asked to clean up iterator state.\n");
907 #endif
908       sqlite3_finalize (ic->stmt_1);
909       sqlite3_finalize (ic->stmt_2);
910       return GNUNET_SYSERR;
911     }
912   plugin = nc->plugin;
913
914   /* first try iter 1 */
915 #if DEBUG_SQLITE
916   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
917               "Restricting to results larger than the last priority %u and key `%s'\n",
918               nc->lastPriority,
919               GNUNET_h2s (&nc->lastKey));
920 #endif
921   if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
922        (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2, 
923                                         &nc->lastKey, 
924                                         sizeof (GNUNET_HashCode),
925                                         SQLITE_TRANSIENT)) )
926     {
927       LOG_SQLITE (plugin, NULL,
928                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
929       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
930         LOG_SQLITE (plugin, NULL,
931                     GNUNET_ERROR_TYPE_ERROR | 
932                     GNUNET_ERROR_TYPE_BULK, 
933                     "sqlite3_reset");  
934       return GNUNET_SYSERR;
935     }
936   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
937     {      
938 #if DEBUG_SQLITE
939       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940                   "Result found using iterator 1\n");
941 #endif
942       nc->stmt = ic->stmt_1;
943       return GNUNET_OK;
944     }
945   if (ret != SQLITE_DONE)
946     {
947       LOG_SQLITE (plugin, NULL,
948                   GNUNET_ERROR_TYPE_ERROR |
949                   GNUNET_ERROR_TYPE_BULK,
950                   "sqlite3_step");
951       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
952         LOG_SQLITE (plugin, NULL,
953                     GNUNET_ERROR_TYPE_ERROR | 
954                     GNUNET_ERROR_TYPE_BULK, 
955                     "sqlite3_reset");  
956       return GNUNET_SYSERR;
957     }
958   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
959     LOG_SQLITE (plugin, NULL,
960                 GNUNET_ERROR_TYPE_ERROR | 
961                 GNUNET_ERROR_TYPE_BULK, 
962                 "sqlite3_reset");  
963
964   /* now try iter 2 */
965   if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
966     {
967       LOG_SQLITE (plugin, NULL,           
968                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
969       return GNUNET_SYSERR;
970     }
971   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
972     {
973 #if DEBUG_SQLITE
974       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                   "Result found using iterator 2\n");
976 #endif
977       nc->stmt = ic->stmt_2;
978       return GNUNET_OK;
979     }
980   if (ret != SQLITE_DONE)
981     {
982       LOG_SQLITE (plugin, NULL,
983                   GNUNET_ERROR_TYPE_ERROR |
984                   GNUNET_ERROR_TYPE_BULK,
985                   "sqlite3_step");
986       if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
987         LOG_SQLITE (plugin, NULL,
988                     GNUNET_ERROR_TYPE_ERROR |
989                     GNUNET_ERROR_TYPE_BULK,
990                     "sqlite3_reset");
991       return GNUNET_SYSERR;
992     }
993   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
994     LOG_SQLITE (plugin, NULL,
995                 GNUNET_ERROR_TYPE_ERROR |
996                 GNUNET_ERROR_TYPE_BULK,
997                 "sqlite3_reset");
998 #if DEBUG_SQLITE
999   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000               "No result found using either iterator\n");
1001 #endif
1002   return GNUNET_NO;
1003 }
1004
1005
1006 /**
1007  * Select a subset of the items in the datastore and call
1008  * the given iterator for each of them.
1009  *
1010  * @param cls our plugin context
1011  * @param type entries of which type should be considered?
1012  *        Use 0 for any type.
1013  * @param iter function to call on each matching value;
1014  *        will be called once with a NULL value at the end
1015  * @param iter_cls closure for iter
1016  */
1017 static void
1018 sqlite_plugin_iter_zero_anonymity (void *cls,
1019                                    enum GNUNET_BLOCK_Type type,
1020                                    PluginIterator iter,
1021                                    void *iter_cls)
1022 {
1023   struct Plugin *plugin = cls;
1024   struct GNUNET_TIME_Absolute now;
1025   struct NextContext *nc;
1026   struct ZeroIterContext *ic;
1027   sqlite3_stmt *stmt_1;
1028   sqlite3_stmt *stmt_2;
1029   char *q;
1030
1031   now = GNUNET_TIME_absolute_get ();
1032   GNUNET_asprintf (&q, 
1033                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1034                    "WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND hash < ?2) "
1035                    "ORDER BY hash DESC LIMIT 1",
1036                    (unsigned long long) now.abs_value);
1037   if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
1038     {
1039       LOG_SQLITE (plugin, NULL,
1040                   GNUNET_ERROR_TYPE_ERROR |
1041                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1042       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1043       GNUNET_free (q);
1044       return;
1045     }
1046   GNUNET_free (q);
1047   GNUNET_asprintf (&q, 
1048                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1049                    "WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0) "
1050                    "ORDER BY prio DESC, hash DESC LIMIT 1",
1051                    (unsigned long long) now.abs_value);
1052   if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1053     {
1054       LOG_SQLITE (plugin, NULL,
1055                   GNUNET_ERROR_TYPE_ERROR |
1056                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1057       sqlite3_finalize (stmt_1);
1058       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1059       GNUNET_free (q);
1060       return;
1061     }
1062   GNUNET_free (q);
1063   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1064                       sizeof(struct ZeroIterContext));
1065   nc->plugin = plugin;
1066   nc->iter = iter;
1067   nc->iter_cls = iter_cls;
1068   nc->stmt = NULL;
1069   ic = (struct ZeroIterContext*) &nc[1];
1070   ic->stmt_1 = stmt_1;
1071   ic->stmt_2 = stmt_2;
1072   ic->type = type;
1073   nc->prep = &zero_iter_next_prepare;
1074   nc->prep_cls = ic;
1075   nc->lastPriority = INT32_MAX;
1076   memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1077   sqlite_next_request (nc, GNUNET_NO);
1078 }
1079
1080
1081 /**
1082  * Closure for 'all_next_prepare'.
1083  */
1084 struct IterateAllContext
1085 {
1086
1087   /**
1088    * Offset for the current result.
1089    */
1090   unsigned int off;
1091
1092   /**
1093    * Requested block type.
1094    */
1095   enum GNUNET_BLOCK_Type type;
1096
1097   /**
1098    * Our prepared statement.
1099    */
1100   sqlite3_stmt *stmt;
1101 };
1102
1103
1104 /**
1105  * Call sqlite using the already prepared query to get
1106  * the next result.
1107  *
1108  * @param cls context with the prepared query (of type 'struct IterateAllContext')
1109  * @param nc generic context with the prepared query
1110  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1111  *        there are no more results 
1112  */
1113 static int
1114 all_next_prepare (void *cls,
1115                   struct NextContext *nc)
1116 {
1117   struct IterateAllContext *iac = cls;
1118   struct Plugin *plugin;
1119   int ret;  
1120   unsigned int sqoff;
1121
1122   if (nc == NULL)
1123     {
1124 #if DEBUG_SQLITE
1125       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1126                   "Asked to clean up iterator state.\n");
1127 #endif
1128       if (NULL != iac->stmt)
1129         {
1130           sqlite3_finalize (iac->stmt);
1131           iac->stmt = NULL;
1132         }
1133       return GNUNET_SYSERR;
1134     }
1135   plugin = nc->plugin;
1136   sqoff = 1;
1137   ret = SQLITE_OK;
1138   if (iac->type != 0)
1139     ret = sqlite3_bind_int (nc->stmt, sqoff++, iac->type);
1140   if (SQLITE_OK == ret)
1141     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, iac->off++);
1142   if (ret != SQLITE_OK)
1143     {
1144       LOG_SQLITE (plugin, NULL,
1145                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1146       return GNUNET_SYSERR;
1147     }
1148   ret = sqlite3_step (nc->stmt);
1149   switch (ret)
1150     {
1151     case SQLITE_ROW:
1152       return GNUNET_OK;  
1153     case SQLITE_DONE:
1154       return GNUNET_NO;
1155     default:
1156       LOG_SQLITE (plugin, NULL,
1157                   GNUNET_ERROR_TYPE_ERROR |
1158                   GNUNET_ERROR_TYPE_BULK,
1159                   "sqlite3_step");
1160       return GNUNET_SYSERR;
1161     }
1162 }
1163
1164
1165 /**
1166  * Select a subset of the items in the datastore and call
1167  * the given iterator for each of them.
1168  *
1169  * @param cls our plugin context
1170  * @param type entries of which type should be considered?
1171  *        Use 0 for any type.
1172  * @param iter function to call on each matching value;
1173  *        will be called once with a NULL value at the end
1174  * @param iter_cls closure for iter
1175  */
1176 static void
1177 sqlite_plugin_iter_all_now (void *cls,
1178                             enum GNUNET_BLOCK_Type type,
1179                             PluginIterator iter,
1180                             void *iter_cls)
1181 {
1182   struct Plugin *plugin = cls;
1183   struct NextContext *nc;
1184   struct IterateAllContext *iac;
1185   sqlite3_stmt *stmt;
1186   const char *q;
1187
1188   if (type == 0)
1189     q = "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?";
1190   else
1191     q = "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE type=? ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?";
1192   if (sq_prepare (plugin->dbh, q, &stmt) != SQLITE_OK)
1193     {
1194       LOG_SQLITE (plugin, NULL,
1195                   GNUNET_ERROR_TYPE_ERROR |
1196                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1197       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1198       return;
1199     }
1200   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1201                       sizeof(struct IterateAllContext));
1202   iac = (struct IterateAllContext*) &nc[1];
1203   nc->plugin = plugin;
1204   nc->iter = iter;
1205   nc->iter_cls = iter_cls;
1206   nc->stmt = stmt;
1207   nc->prep = &all_next_prepare;
1208   nc->prep_cls = iac;
1209   iac->off = 0;
1210   iac->type = type;
1211   iac->stmt = stmt; /* alias used for freeing at the end */
1212   sqlite_next_request (nc, GNUNET_NO);
1213 }
1214
1215
1216 /**
1217  * Context for get_next_prepare.
1218  */
1219 struct GetNextContext
1220 {
1221
1222   /**
1223    * Our prepared statement.
1224    */
1225   sqlite3_stmt *stmt;
1226
1227   /**
1228    * Plugin handle.
1229    */
1230   struct Plugin *plugin;
1231
1232   /**
1233    * Key for the query.
1234    */
1235   GNUNET_HashCode key;
1236
1237   /**
1238    * Vhash for the query.
1239    */
1240   GNUNET_HashCode vhash;
1241
1242   /**
1243    * Expected total number of results.
1244    */
1245   unsigned int total;
1246
1247   /**
1248    * Offset to add for the selected result.
1249    */
1250   unsigned int off;
1251
1252   /**
1253    * Is vhash set?
1254    */
1255   int have_vhash;
1256
1257   /**
1258    * Desired block type.
1259    */
1260   enum GNUNET_BLOCK_Type type;
1261
1262 };
1263
1264
1265 /**
1266  * Prepare the stmt in 'nc' for the next round of execution, selecting the
1267  * next return value.
1268  *
1269  * @param cls our "struct GetNextContext*"
1270  * @param nc the general context
1271  * @return GNUNET_YES if there are more results, 
1272  *         GNUNET_NO if there are no more results,
1273  *         GNUNET_SYSERR on internal error
1274  */
1275 static int
1276 get_next_prepare (void *cls,
1277                   struct NextContext *nc)
1278 {
1279   struct GetNextContext *gnc = cls;
1280   int ret;
1281   int limit_off;
1282   unsigned int sqoff;
1283
1284   if (nc == NULL)
1285     {
1286       sqlite3_finalize (gnc->stmt);
1287       return GNUNET_SYSERR;
1288     }
1289   if (nc->count == gnc->total)
1290     return GNUNET_NO;
1291   if (nc->count + gnc->off == gnc->total)
1292     nc->last_rowid = 0;
1293   if (nc->count == 0)
1294     limit_off = gnc->off;
1295   else
1296     limit_off = 0;
1297   sqlite3_reset (nc->stmt);
1298   sqoff = 1;
1299   ret = sqlite3_bind_blob (nc->stmt,
1300                            sqoff++,
1301                            &gnc->key, 
1302                            sizeof (GNUNET_HashCode),
1303                            SQLITE_TRANSIENT);
1304   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1305     ret = sqlite3_bind_blob (nc->stmt,
1306                              sqoff++,
1307                              &gnc->vhash,
1308                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1309   if ((gnc->type != 0) && (ret == SQLITE_OK))
1310     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1311   if (ret == SQLITE_OK)
1312     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
1313   if (ret != SQLITE_OK)
1314     return GNUNET_SYSERR;
1315 #if DEBUG_SQLITE 
1316   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1317                    "sqlite",
1318                    "Preparing to GET for key `%s' with type %d at offset %u\n",
1319                    GNUNET_h2s (&gnc->key),
1320                    gnc->type,
1321                    limit_off);
1322 #endif
1323   ret = sqlite3_step (nc->stmt);
1324   switch (ret)
1325     {
1326     case SQLITE_ROW:
1327       return GNUNET_OK;  
1328     case SQLITE_DONE:
1329       return GNUNET_NO;
1330     default:
1331       LOG_SQLITE (gnc->plugin, NULL,
1332                   GNUNET_ERROR_TYPE_ERROR |
1333                   GNUNET_ERROR_TYPE_BULK,
1334                   "sqlite3_step");
1335       return GNUNET_SYSERR;
1336     }
1337 }
1338
1339
1340 /**
1341  * Iterate over the results for a particular key
1342  * in the datastore.
1343  *
1344  * @param cls closure
1345  * @param key key to match, never NULL
1346  * @param vhash hash of the value, maybe NULL (to
1347  *        match all values that have the right key).
1348  *        Note that for DBlocks there is no difference
1349  *        betwen key and vhash, but for other blocks
1350  *        there may be!
1351  * @param type entries of which type are relevant?
1352  *     Use 0 for any type.
1353  * @param iter function to call on each matching value;
1354  *        will be called once with a NULL value at the end
1355  * @param iter_cls closure for iter
1356  */
1357 static void
1358 sqlite_plugin_get (void *cls,
1359                    const GNUNET_HashCode *key,
1360                    const GNUNET_HashCode *vhash,
1361                    enum GNUNET_BLOCK_Type type,
1362                    PluginIterator iter, void *iter_cls)
1363 {
1364   struct Plugin *plugin = cls;
1365   struct GetNextContext *gnc;
1366   struct NextContext *nc;
1367   int ret;
1368   int total;
1369   sqlite3_stmt *stmt;
1370   char scratch[256];
1371   unsigned int sqoff;
1372
1373   GNUNET_assert (iter != NULL);
1374   GNUNET_assert (key != NULL);
1375   GNUNET_snprintf (scratch, sizeof (scratch),
1376                    "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
1377                    vhash == NULL ? "" : " AND vhash=?",
1378                    type  == 0    ? "" : " AND type=?");
1379   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1380     {
1381       LOG_SQLITE (plugin, NULL,
1382                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1383       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1384       return;
1385     }
1386   sqoff = 1;
1387   ret = sqlite3_bind_blob (stmt, sqoff++,
1388                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1389   if ((vhash != NULL) && (ret == SQLITE_OK))
1390     ret = sqlite3_bind_blob (stmt, sqoff++,
1391                              vhash,
1392                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1393   if ((type != 0) && (ret == SQLITE_OK))
1394     ret = sqlite3_bind_int (stmt, sqoff++, type);
1395   if (SQLITE_OK != ret)
1396     {
1397       LOG_SQLITE (plugin, NULL,
1398                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1399       sqlite3_finalize (stmt);
1400       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1401       return;
1402     }
1403   ret = sqlite3_step (stmt);
1404   if (ret != SQLITE_ROW)
1405     {
1406       LOG_SQLITE (plugin, NULL,
1407                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1408                   "sqlite_step");
1409       sqlite3_finalize (stmt);
1410       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1411       return;
1412     }
1413   total = sqlite3_column_int (stmt, 0);
1414   sqlite3_finalize (stmt);
1415   if (0 == total)
1416     {
1417       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1418       return;
1419     }
1420   GNUNET_snprintf (scratch, sizeof (scratch),
1421                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1422                    "FROM gn090 WHERE hash=?%s%s "
1423                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
1424                    vhash == NULL ? "" : " AND vhash=?",
1425                    type == 0 ? "" : " AND type=?");
1426
1427   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1428     {
1429       LOG_SQLITE (plugin, NULL,
1430                   GNUNET_ERROR_TYPE_ERROR |
1431                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1432       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1433       return;
1434     }
1435   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1436                       sizeof(struct GetNextContext));
1437   nc->plugin = plugin;
1438   nc->iter = iter;
1439   nc->iter_cls = iter_cls;
1440   nc->stmt = stmt;
1441   gnc = (struct GetNextContext*) &nc[1];
1442   gnc->total = total;
1443   gnc->type = type;
1444   gnc->key = *key;
1445   gnc->plugin = plugin;
1446   gnc->stmt = stmt; /* alias used for freeing at the end! */
1447   if (NULL != vhash)
1448     {
1449       gnc->have_vhash = GNUNET_YES;
1450       gnc->vhash = *vhash;
1451     }
1452   gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1453   nc->prep = &get_next_prepare;
1454   nc->prep_cls = gnc;
1455   sqlite_next_request (nc, GNUNET_NO);
1456 }
1457
1458
1459 /**
1460  * Execute statement that gets a row and call the callback
1461  * with the result.  Resets the statement afterwards.
1462  *
1463  * @param plugin the plugin
1464  * @param stmt the statement
1465  * @param iter iterator to call
1466  * @param iter_cls closure for 'iter'
1467  */
1468 static void
1469 execute_get (struct Plugin *plugin,
1470              sqlite3_stmt *stmt,
1471              PluginIterator iter, void *iter_cls)
1472 {
1473   int n;
1474   struct GNUNET_TIME_Absolute expiration;
1475   unsigned long long rowid;
1476   unsigned int size;
1477   int ret;
1478
1479   n = sqlite3_step (stmt);
1480   switch (n)
1481     {
1482     case SQLITE_ROW:
1483       size = sqlite3_column_bytes (stmt, 5);
1484       rowid = sqlite3_column_int64 (stmt, 6);
1485       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1486         {
1487           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
1488                            "sqlite",
1489                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
1490           if (SQLITE_OK != sqlite3_reset (stmt))
1491             LOG_SQLITE (plugin, NULL,
1492                         GNUNET_ERROR_TYPE_ERROR |
1493                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1494           if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1495             plugin->env->duc (plugin->env->cls,
1496                               - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
1497           break;
1498         }
1499       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1500       ret = iter (iter_cls,
1501                   NULL,
1502                   sqlite3_column_blob (stmt, 4) /* key */,
1503                   size,
1504                   sqlite3_column_blob (stmt, 5) /* data */, 
1505                   sqlite3_column_int (stmt, 0) /* type */,
1506                   sqlite3_column_int (stmt, 1) /* priority */,
1507                   sqlite3_column_int (stmt, 2) /* anonymity */,
1508                   expiration,
1509                   rowid);
1510       if (SQLITE_OK != sqlite3_reset (stmt))
1511         LOG_SQLITE (plugin, NULL,
1512                     GNUNET_ERROR_TYPE_ERROR |
1513                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1514       if ( (GNUNET_NO == ret) &&
1515            (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1516         plugin->env->duc (plugin->env->cls,
1517                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
1518       return;
1519     case SQLITE_DONE:
1520       /* database must be empty */
1521       if (SQLITE_OK != sqlite3_reset (stmt))
1522         LOG_SQLITE (plugin, NULL,
1523                     GNUNET_ERROR_TYPE_ERROR |
1524                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1525       break;
1526     case SQLITE_BUSY:    
1527     case SQLITE_ERROR:
1528     case SQLITE_MISUSE:
1529     default:
1530       LOG_SQLITE (plugin, NULL,
1531                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
1532                   "sqlite3_step");
1533       if (SQLITE_OK != sqlite3_reset (stmt))
1534         LOG_SQLITE (plugin, NULL,
1535                     GNUNET_ERROR_TYPE_ERROR |
1536                     GNUNET_ERROR_TYPE_BULK,
1537                     "sqlite3_reset");
1538       GNUNET_break (0);
1539       database_shutdown (plugin);
1540       database_setup (plugin->env->cfg,
1541                       plugin);
1542       break;
1543     }
1544   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,             
1545         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1546 }
1547
1548
1549 /**
1550  * Get a random item for replication.  Returns a single, not expired, random item
1551  * from those with the highest replication counters.  The item's 
1552  * replication counter is decremented by one IF it was positive before.
1553  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1554  *
1555  * @param cls closure
1556  * @param iter function to call the value (once only).
1557  * @param iter_cls closure for iter
1558  */
1559 static void
1560 sqlite_plugin_replication_get (void *cls,
1561                                PluginIterator iter, void *iter_cls)
1562 {
1563   struct Plugin *plugin = cls;
1564   sqlite3_stmt *stmt;
1565   struct GNUNET_TIME_Absolute now;
1566
1567 #if DEBUG_SQLITE
1568   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1569                    "sqlite",
1570                    "Getting random block based on replication order.\n");
1571 #endif
1572   stmt = plugin->selRepl;
1573   now = GNUNET_TIME_absolute_get ();
1574   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1575     {
1576       LOG_SQLITE (plugin, NULL,           
1577                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1578       if (SQLITE_OK != sqlite3_reset (stmt))
1579         LOG_SQLITE (plugin, NULL,
1580                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1581       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1582             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1583       return;
1584     }
1585   execute_get (plugin, stmt, iter, iter_cls);
1586 }
1587
1588
1589
1590 /**
1591  * Get a random item that has expired or has low priority.
1592  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1593  *
1594  * @param cls closure
1595  * @param iter function to call the value (once only).
1596  * @param iter_cls closure for iter
1597  */
1598 static void
1599 sqlite_plugin_expiration_get (void *cls,
1600                               PluginIterator iter, void *iter_cls)
1601 {
1602   struct Plugin *plugin = cls;
1603   sqlite3_stmt *stmt;
1604   struct GNUNET_TIME_Absolute now;
1605
1606 #if DEBUG_SQLITE
1607   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1608                    "sqlite",
1609                    "Getting random block based on expiration and priority order.\n");
1610 #endif
1611   now = GNUNET_TIME_absolute_get ();
1612   stmt = plugin->selExpi;
1613   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1614     {
1615       LOG_SQLITE (plugin, NULL,           
1616                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1617       if (SQLITE_OK != sqlite3_reset (stmt))
1618         LOG_SQLITE (plugin, NULL,
1619                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1620       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1621             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1622       return;
1623     }
1624   execute_get (plugin, stmt, iter, iter_cls);
1625 }
1626
1627
1628 /**
1629  * Drop database.
1630  *
1631  * @param cls our plugin context
1632  */
1633 static void 
1634 sqlite_plugin_drop (void *cls)
1635 {
1636   struct Plugin *plugin = cls;
1637   plugin->drop_on_shutdown = GNUNET_YES;
1638 }
1639
1640
1641 /**
1642  * Get an estimate of how much space the database is
1643  * currently using.
1644  *
1645  * @param cls the 'struct Plugin'
1646  * @return the size of the database on disk (estimate)
1647  */
1648 static unsigned long long
1649 sqlite_plugin_get_size (void *cls)
1650 {
1651   struct Plugin *plugin = cls;
1652   sqlite3_stmt *stmt;
1653   uint64_t pages;
1654   uint64_t page_size;
1655 #if ENULL_DEFINED
1656   char *e;
1657 #endif
1658
1659   if (SQLITE_VERSION_NUMBER < 3006000)
1660     {
1661       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1662                        "datastore-sqlite",
1663                        _("sqlite version to old to determine size, assuming zero\n"));
1664       return 0;
1665     }
1666   if (SQLITE_OK !=
1667       sqlite3_exec (plugin->dbh,
1668                     "VACUUM", NULL, NULL, ENULL))
1669     abort ();
1670   CHECK (SQLITE_OK ==
1671          sqlite3_exec (plugin->dbh,
1672                        "VACUUM", NULL, NULL, ENULL));
1673   CHECK (SQLITE_OK ==
1674          sqlite3_exec (plugin->dbh,
1675                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1676   CHECK (SQLITE_OK ==
1677          sq_prepare (plugin->dbh,
1678                      "PRAGMA page_count",
1679                      &stmt));
1680   if (SQLITE_ROW ==
1681       sqlite3_step (stmt))
1682     pages = sqlite3_column_int64 (stmt, 0);
1683   else
1684     pages = 0;
1685   sqlite3_finalize (stmt);
1686   CHECK (SQLITE_OK ==
1687          sq_prepare (plugin->dbh,
1688                      "PRAGMA page_size",
1689                      &stmt));
1690   CHECK (SQLITE_ROW ==
1691          sqlite3_step (stmt));
1692   page_size = sqlite3_column_int64 (stmt, 0);
1693   sqlite3_finalize (stmt);
1694   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1695               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1696               (unsigned long long) pages,
1697               (unsigned long long) page_size);
1698   return  pages * page_size;
1699 }
1700                                          
1701
1702 /**
1703  * Entry point for the plugin.
1704  *
1705  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1706  * @return NULL on error, othrewise the plugin context
1707  */
1708 void *
1709 libgnunet_plugin_datastore_sqlite_init (void *cls)
1710 {
1711   static struct Plugin plugin;
1712   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1713   struct GNUNET_DATASTORE_PluginFunctions *api;
1714
1715   if (plugin.env != NULL)
1716     return NULL; /* can only initialize once! */
1717   memset (&plugin, 0, sizeof(struct Plugin));
1718   plugin.env = env;
1719   if (GNUNET_OK !=
1720       database_setup (env->cfg, &plugin))
1721     {
1722       database_shutdown (&plugin);
1723       return NULL;
1724     }
1725   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1726   api->cls = &plugin;
1727   api->get_size = &sqlite_plugin_get_size;
1728   api->put = &sqlite_plugin_put;
1729   api->next_request = &sqlite_next_request;
1730   api->get = &sqlite_plugin_get;
1731   api->replication_get = &sqlite_plugin_replication_get;
1732   api->expiration_get = &sqlite_plugin_expiration_get;
1733   api->update = &sqlite_plugin_update;
1734   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1735   api->iter_all_now = &sqlite_plugin_iter_all_now;
1736   api->drop = &sqlite_plugin_drop;
1737   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1738                    "sqlite", _("Sqlite database running\n"));
1739   return api;
1740 }
1741
1742
1743 /**
1744  * Exit point from the plugin.
1745  *
1746  * @param cls the plugin context (as returned by "init")
1747  * @return always NULL
1748  */
1749 void *
1750 libgnunet_plugin_datastore_sqlite_done (void *cls)
1751 {
1752   char *fn;
1753   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1754   struct Plugin *plugin = api->cls;
1755
1756 #if DEBUG_SQLITE
1757   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1758                    "sqlite",
1759                    "sqlite plugin is doneing\n");
1760 #endif
1761
1762   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1763     {
1764 #if DEBUG_SQLITE
1765       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1766                        "sqlite",
1767                        "Canceling next task\n");
1768 #endif
1769       GNUNET_SCHEDULER_cancel (plugin->next_task);
1770       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1771 #if DEBUG_SQLITE
1772       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1773                        "sqlite",
1774                        "Prep'ing next task\n");
1775 #endif
1776       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1777       GNUNET_free (plugin->next_task_nc);
1778       plugin->next_task_nc = NULL;
1779     }
1780   fn = NULL;
1781   if (plugin->drop_on_shutdown)
1782     fn = GNUNET_strdup (plugin->fn);
1783 #if DEBUG_SQLITE
1784   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1785                    "sqlite",
1786                    "Shutting down database\n");
1787 #endif
1788   database_shutdown (plugin);
1789   plugin->env = NULL; 
1790   GNUNET_free (api);
1791   if (fn != NULL)
1792     {
1793       if (0 != UNLINK(fn))
1794         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1795                                   "unlink",
1796                                   fn);
1797       GNUNET_free (fn);
1798     }
1799 #if DEBUG_SQLITE
1800   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1801                    "sqlite",
1802                    "sqlite plugin is finished doneing\n");
1803 #endif
1804   return NULL;
1805 }
1806
1807 /* end of plugin_datastore_sqlite.c */