seaspidery
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 #define DEBUG_SQLITE GNUNET_YES
32
33
34 /**
35  * Log an error message at log-level 'level' that indicates
36  * a failure of the command 'cmd' on file 'filename'
37  * with the message given by strerror(errno).
38  */
39 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40
41 #define SELECT_IT_LOW_PRIORITY_1 \
42   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
43   "ORDER BY hash ASC LIMIT 1"
44
45 #define SELECT_IT_LOW_PRIORITY_2 \
46   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
47   "ORDER BY prio ASC, hash ASC LIMIT 1"
48
49 #define SELECT_IT_NON_ANONYMOUS_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
51   " ORDER BY hash DESC LIMIT 1"
52
53 #define SELECT_IT_NON_ANONYMOUS_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
55   " ORDER BY prio DESC, hash DESC LIMIT 1"
56
57 #define SELECT_IT_EXPIRATION_TIME_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
59   " ORDER BY hash ASC LIMIT 1"
60
61 #define SELECT_IT_EXPIRATION_TIME_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
63   " ORDER BY expire ASC, hash ASC LIMIT 1"
64
65 #define SELECT_IT_MIGRATION_ORDER_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
67   " ORDER BY hash DESC LIMIT 1"
68
69 #define SELECT_IT_MIGRATION_ORDER_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
71   " ORDER BY expire DESC, hash DESC LIMIT 1"
72
73 /**
74  * After how many ms "busy" should a DB operation fail for good?
75  * A low value makes sure that we are more responsive to requests
76  * (especially PUTs).  A high value guarantees a higher success
77  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78  *
79  * The default value of 250ms should ensure that users do not experience
80  * huge latencies while at the same time allowing operations to succeed
81  * with reasonable probability.
82  */
83 #define BUSY_TIMEOUT_MS 250
84
85
86
87 /**
88  * Context for all functions in this plugin.
89  */
90 struct Plugin 
91 {
92   /**
93    * Our execution environment.
94    */
95   struct GNUNET_DATASTORE_PluginEnvironment *env;
96
97   /**
98    * Database filename.
99    */
100   char *fn;
101
102   /**
103    * Native SQLite database handle.
104    */
105   sqlite3 *dbh;
106
107   /**
108    * Precompiled SQL for update.
109    */
110   sqlite3_stmt *updPrio;
111
112   /**
113    * Precompiled SQL for insertion.
114    */
115   sqlite3_stmt *insertContent;
116
117   /**
118    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
119    */
120   struct NextContext *next_task_nc;
121
122   /**
123    * Pending task with scheduler for running the next request.
124    */
125   GNUNET_SCHEDULER_TaskIdentifier next_task;
126
127   /**
128    * Should the database be dropped on shutdown?
129    */
130   int drop_on_shutdown;
131
132 };
133
134
135 /**
136  * @brief Prepare a SQL statement
137  *
138  * @param dbh handle to the database
139  * @param zSql SQL statement, UTF-8 encoded
140  * @param ppStmt set to the prepared statement
141  * @return 0 on success
142  */
143 static int
144 sq_prepare (sqlite3 * dbh, const char *zSql,
145             sqlite3_stmt ** ppStmt)
146 {
147   char *dummy;
148   return sqlite3_prepare_v2 (dbh,
149                              zSql,
150                              strlen (zSql), ppStmt, (const char **) &dummy);
151 }
152
153
154 /**
155  * Create our database indices.
156  * 
157  * @param dbh handle to the database
158  */
159 static void
160 create_indices (sqlite3 * dbh)
161 {
162   /* create indices */
163   sqlite3_exec (dbh,
164                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
165   sqlite3_exec (dbh,
166                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
167                 NULL, NULL);
168   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
169                 NULL);
170   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
171                 NULL);
172   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
173                 NULL, NULL);
174   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
175                 NULL, NULL, NULL);
176   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
177                 NULL, NULL);
178 }
179
180
181
182 #if 0
183 #define CHECK(a) GNUNET_break(a)
184 #define ENULL NULL
185 #else
186 #define ENULL &e
187 #define ENULL_DEFINED 1
188 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
189 #endif
190
191
192
193
194 /**
195  * Initialize the database connections and associated
196  * data structures (create tables and indices
197  * as needed as well).
198  *
199  * @param cfg our configuration
200  * @param plugin the plugin context (state for this module)
201  * @return GNUNET_OK on success
202  */
203 static int
204 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
205                 struct Plugin *plugin)
206 {
207   sqlite3_stmt *stmt;
208   char *afsdir;
209 #if ENULL_DEFINED
210   char *e;
211 #endif
212   
213   if (GNUNET_OK != 
214       GNUNET_CONFIGURATION_get_value_filename (cfg,
215                                                "datastore-sqlite",
216                                                "FILENAME",
217                                                &afsdir))
218     {
219       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
220                        "sqlite",
221                        _("Option `%s' in section `%s' missing in configuration!\n"),
222                        "FILENAME",
223                        "datastore-sqlite");
224       return GNUNET_SYSERR;
225     }
226   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
227     {
228       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
229         {
230           GNUNET_break (0);
231           GNUNET_free (afsdir);
232           return GNUNET_SYSERR;
233         }
234       /* database is new or got deleted, reset payload to zero! */
235       plugin->env->duc (plugin->env->cls, 0);
236     }
237 #ifdef ENABLE_NLS
238   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
239                                        nl_langinfo (CODESET));
240 #else
241   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
242                                        "UTF-8");   /* good luck */
243 #endif
244   GNUNET_free (afsdir);
245   
246   /* Open database and precompile statements */
247   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
248     {
249       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
250                        "sqlite",
251                        _("Unable to initialize SQLite: %s.\n"),
252                        sqlite3_errmsg (plugin->dbh));
253       return GNUNET_SYSERR;
254     }
255   CHECK (SQLITE_OK ==
256          sqlite3_exec (plugin->dbh,
257                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
258   CHECK (SQLITE_OK ==
259          sqlite3_exec (plugin->dbh,
260                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
261   CHECK (SQLITE_OK ==
262          sqlite3_exec (plugin->dbh,
263                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
264   CHECK (SQLITE_OK ==
265          sqlite3_exec (plugin->dbh,
266                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh, 
269                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
270
271   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
272
273
274   /* We have to do it here, because otherwise precompiling SQL might fail */
275   CHECK (SQLITE_OK ==
276          sq_prepare (plugin->dbh,
277                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
278                      &stmt));
279   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
280        (sqlite3_exec (plugin->dbh,
281                       "CREATE TABLE gn080 ("
282                       "  size INT4 NOT NULL DEFAULT 0,"
283                       "  type INT4 NOT NULL DEFAULT 0,"
284                       "  prio INT4 NOT NULL DEFAULT 0,"
285                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
286                       "  expire INT8 NOT NULL DEFAULT 0,"
287                       "  hash TEXT NOT NULL DEFAULT '',"
288                       "  vhash TEXT NOT NULL DEFAULT '',"
289                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
290                       NULL) != SQLITE_OK) )
291     {
292       LOG_SQLITE (plugin, NULL,
293                   GNUNET_ERROR_TYPE_ERROR, 
294                   "sqlite3_exec");
295       sqlite3_finalize (stmt);
296       return GNUNET_SYSERR;
297     }
298   sqlite3_finalize (stmt);
299   create_indices (plugin->dbh);
300
301   CHECK (SQLITE_OK ==
302          sq_prepare (plugin->dbh,
303                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
304                      &stmt));
305   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
306        (sqlite3_exec (plugin->dbh,
307                       "CREATE TABLE gn071 ("
308                       "  key TEXT NOT NULL DEFAULT '',"
309                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
310                       NULL) != SQLITE_OK) )
311     {
312       LOG_SQLITE (plugin, NULL,
313                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
314       sqlite3_finalize (stmt);
315       return GNUNET_SYSERR;
316     }
317   sqlite3_finalize (stmt);
318
319   if ((sq_prepare (plugin->dbh,
320                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
321                    "_ROWID_ = ?",
322                    &plugin->updPrio) != SQLITE_OK) ||
323       (sq_prepare (plugin->dbh,
324                    "INSERT INTO gn080 (size, type, prio, "
325                    "anonLevel, expire, hash, vhash, value) VALUES "
326                    "(?, ?, ?, ?, ?, ?, ?, ?)",
327                    &plugin->insertContent) != SQLITE_OK))
328     {
329       LOG_SQLITE (plugin, NULL,
330                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
331       return GNUNET_SYSERR;
332     }
333   return GNUNET_OK;
334 }
335
336
337 /**
338  * Shutdown database connection and associate data
339  * structures.
340  * @param plugin the plugin context (state for this module)
341  */
342 static void
343 database_shutdown (struct Plugin *plugin)
344 {
345   if (plugin->updPrio != NULL)
346     sqlite3_finalize (plugin->updPrio);
347   if (plugin->insertContent != NULL)
348     sqlite3_finalize (plugin->insertContent);
349   sqlite3_close (plugin->dbh);
350   GNUNET_free_non_null (plugin->fn);
351 }
352
353
354 /**
355  * Delete the database entry with the given
356  * row identifier.
357  *
358  * @param plugin the plugin context (state for this module)
359  * @param rid the ID of the row to delete
360  */
361 static int
362 delete_by_rowid (struct Plugin* plugin, 
363                  unsigned long long rid)
364 {
365   sqlite3_stmt *stmt;
366
367   if (sq_prepare (plugin->dbh,
368                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
369     {
370       LOG_SQLITE (plugin, NULL,
371                   GNUNET_ERROR_TYPE_ERROR |
372                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
373       return GNUNET_SYSERR;
374     }
375   sqlite3_bind_int64 (stmt, 1, rid);
376   if (SQLITE_DONE != sqlite3_step (stmt))
377     {
378       LOG_SQLITE (plugin, NULL,
379                   GNUNET_ERROR_TYPE_ERROR |
380                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
381       sqlite3_finalize (stmt);
382       return GNUNET_SYSERR;
383     }
384   sqlite3_finalize (stmt);
385   return GNUNET_OK;
386 }
387
388
389 /**
390  * Context for the universal iterator.
391  */
392 struct NextContext;
393
394 /**
395  * Type of a function that will prepare
396  * the next iteration.
397  *
398  * @param cls closure
399  * @param nc the next context; NULL for the last
400  *         call which gives the callback a chance to
401  *         clean up the closure
402  * @return GNUNET_OK on success, GNUNET_NO if there are
403  *         no more values, GNUNET_SYSERR on error
404  */
405 typedef int (*PrepareFunction)(void *cls,
406                                struct NextContext *nc);
407
408
409 /**
410  * Context we keep for the "next request" callback.
411  */
412 struct NextContext
413 {
414   /**
415    * Internal state.
416    */ 
417   struct Plugin *plugin;
418
419   /**
420    * Function to call on the next value.
421    */
422   PluginIterator iter;
423
424   /**
425    * Closure for iter.
426    */
427   void *iter_cls;
428
429   /**
430    * Function to call to prepare the next
431    * iteration.
432    */
433   PrepareFunction prep;
434
435   /**
436    * Closure for prep.
437    */
438   void *prep_cls;
439
440   /**
441    * Statement that the iterator will get the data
442    * from (updated or set by prep).
443    */ 
444   sqlite3_stmt *stmt;
445
446   /**
447    * Row ID of the last result.
448    */
449   unsigned long long last_rowid;
450
451   /**
452    * Key of the last result.
453    */
454   GNUNET_HashCode lastKey;  
455
456   /**
457    * Expiration time of the last value visited.
458    */
459   struct GNUNET_TIME_Absolute lastExpiration;
460
461   /**
462    * Priority of the last value visited.
463    */ 
464   unsigned int lastPriority; 
465
466   /**
467    * Number of results processed so far.
468    */
469   unsigned int count;
470
471   /**
472    * Set to GNUNET_YES if we must stop now.
473    */
474   int end_it;
475 };
476
477
478 /**
479  * Continuation of "sqlite_next_request".
480  *
481  * @param cls the next context
482  * @param tc the task context (unused)
483  */
484 static void 
485 sqlite_next_request_cont (void *cls,
486                           const struct GNUNET_SCHEDULER_TaskContext *tc)
487 {
488   struct NextContext * nc = cls;
489   struct Plugin *plugin;
490   unsigned long long rowid;
491   sqlite3_stmt *stmtd;
492   int ret;
493   unsigned int type;
494   unsigned int size;
495   unsigned int priority;
496   unsigned int anonymity;
497   struct GNUNET_TIME_Absolute expiration;
498   const GNUNET_HashCode *key;
499   const void *data;
500   
501   plugin = nc->plugin;
502   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
503   plugin->next_task_nc = NULL;
504   if ( (GNUNET_YES == nc->end_it) ||
505        (GNUNET_OK != (nc->prep(nc->prep_cls,
506                                nc))) )
507     {
508     END:
509       nc->iter (nc->iter_cls, 
510                 NULL, NULL, 0, NULL, 0, 0, 0, 
511                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
512       nc->prep (nc->prep_cls, NULL);
513       GNUNET_free (nc);
514       return;
515     }
516
517   rowid = sqlite3_column_int64 (nc->stmt, 7);
518   nc->last_rowid = rowid;
519   type = sqlite3_column_int (nc->stmt, 1);
520   size = sqlite3_column_bytes (nc->stmt, 6);
521   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
522     {
523       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
524                        "sqlite",
525                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
526       if (SQLITE_OK != sqlite3_reset (nc->stmt))
527         LOG_SQLITE (nc->plugin, NULL,
528                     GNUNET_ERROR_TYPE_ERROR |
529                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
530       if (sq_prepare
531           (nc->plugin->dbh,
532            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
533            &stmtd) != SQLITE_OK)
534         {
535           LOG_SQLITE (nc->plugin, NULL,
536                       GNUNET_ERROR_TYPE_ERROR |
537                       GNUNET_ERROR_TYPE_BULK, 
538                       "sq_prepare");
539           goto END;
540         }
541
542       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
543         LOG_SQLITE (nc->plugin, NULL,
544                     GNUNET_ERROR_TYPE_ERROR |
545                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
546       if (SQLITE_DONE != sqlite3_step (stmtd))
547         LOG_SQLITE (nc->plugin, NULL,
548                     GNUNET_ERROR_TYPE_ERROR |
549                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
550       if (SQLITE_OK != sqlite3_finalize (stmtd))
551         LOG_SQLITE (nc->plugin, NULL,
552                     GNUNET_ERROR_TYPE_ERROR |
553                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
554       goto END;
555     }
556
557   priority = sqlite3_column_int (nc->stmt, 2);
558   anonymity = sqlite3_column_int (nc->stmt, 3);
559   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4);
560   key = sqlite3_column_blob (nc->stmt, 5);
561   nc->lastPriority = priority;
562   nc->lastExpiration = expiration;
563   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
564   data = sqlite3_column_blob (nc->stmt, 6);
565   nc->count++;
566   ret = nc->iter (nc->iter_cls,
567                   nc,
568                   key,
569                   size,
570                   data, 
571                   type,
572                   priority,
573                   anonymity,
574                   expiration,
575                   rowid);
576   if (ret == GNUNET_SYSERR)
577     {
578       nc->end_it = GNUNET_YES;
579       return;
580     }
581 #if DEBUG_SQLITE
582   if (ret == GNUNET_NO)
583     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
584                      "sqlite",
585                      "Asked to remove entry %llu (%u bytes)\n",
586                      (unsigned long long) rowid,
587                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
588 #endif
589   if ( (ret == GNUNET_NO) &&
590        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
591     {
592       plugin->env->duc (plugin->env->cls,
593                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
594 #if DEBUG_SQLITE
595       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
596                        "sqlite",
597                        "Removed entry %llu (%u bytes)\n",
598                        (unsigned long long) rowid,
599                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
600 #endif
601     }
602 }
603
604
605 /**
606  * Function invoked on behalf of a "PluginIterator"
607  * asking the database plugin to call the iterator
608  * with the next item.
609  *
610  * @param next_cls whatever argument was given
611  *        to the PluginIterator as "next_cls".
612  * @param end_it set to GNUNET_YES if we
613  *        should terminate the iteration early
614  *        (iterator should be still called once more
615  *         to signal the end of the iteration).
616  */
617 static void 
618 sqlite_next_request (void *next_cls,
619                      int end_it)
620 {
621   struct NextContext * nc= next_cls;
622
623   if (GNUNET_YES == end_it)
624     nc->end_it = GNUNET_YES;
625   nc->plugin->next_task_nc = nc;
626   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
627                                                     nc);
628 }
629
630
631
632 /**
633  * Store an item in the datastore.
634  *
635  * @param cls closure
636  * @param key key for the item
637  * @param size number of bytes in data
638  * @param data content stored
639  * @param type type of the content
640  * @param priority priority of the content
641  * @param anonymity anonymity-level for the content
642  * @param expiration expiration time for the content
643  * @param msg set to an error message
644  * @return GNUNET_OK on success
645  */
646 static int
647 sqlite_plugin_put (void *cls,
648                    const GNUNET_HashCode * key,
649                    uint32_t size,
650                    const void *data,
651                    enum GNUNET_BLOCK_Type type,
652                    uint32_t priority,
653                    uint32_t anonymity,
654                    struct GNUNET_TIME_Absolute expiration,
655                    char ** msg)
656 {
657   struct Plugin *plugin = cls;
658   int n;
659   sqlite3_stmt *stmt;
660   GNUNET_HashCode vhash;
661
662 #if DEBUG_SQLITE
663   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
664                    "sqlite",
665                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
666                    type, 
667                    GNUNET_h2s(key),
668                    priority,
669                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
670                    (long long) expiration.abs_value);
671 #endif
672   GNUNET_CRYPTO_hash (data, size, &vhash);
673   stmt = plugin->insertContent;
674   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
675       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
676       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
677       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
678       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
679       (SQLITE_OK !=
680        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
681                           SQLITE_TRANSIENT)) ||
682       (SQLITE_OK !=
683        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
684                           SQLITE_TRANSIENT))
685       || (SQLITE_OK !=
686           sqlite3_bind_blob (stmt, 8, data, size,
687                              SQLITE_TRANSIENT)))
688     {
689       LOG_SQLITE (plugin,
690                   msg,
691                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
692       if (SQLITE_OK != sqlite3_reset (stmt))
693         LOG_SQLITE (plugin, NULL,
694                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
695       return GNUNET_SYSERR;
696     }
697   n = sqlite3_step (stmt);
698   if (n != SQLITE_DONE) 
699     {
700       if (n == SQLITE_BUSY)
701         {
702           LOG_SQLITE (plugin, msg,
703                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
704           sqlite3_reset (stmt);
705           GNUNET_break (0);
706           return GNUNET_NO;
707         }
708       LOG_SQLITE (plugin, msg,
709                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
710       sqlite3_reset (stmt);
711       database_shutdown (plugin);
712       database_setup (plugin->env->cfg,
713                       plugin);
714       return GNUNET_SYSERR;
715     }
716   if (SQLITE_OK != sqlite3_reset (stmt))
717     LOG_SQLITE (plugin, NULL,
718                 GNUNET_ERROR_TYPE_ERROR |
719                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
720   plugin->env->duc (plugin->env->cls,
721                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
722 #if DEBUG_SQLITE
723   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
724                    "sqlite",
725                    "Stored new entry (%u bytes)\n",
726            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
727 #endif
728   return GNUNET_OK;
729 }
730
731
732 /**
733  * Update the priority for a particular key in the datastore.  If
734  * the expiration time in value is different than the time found in
735  * the datastore, the higher value should be kept.  For the
736  * anonymity level, the lower value is to be used.  The specified
737  * priority should be added to the existing priority, ignoring the
738  * priority in value.
739  *
740  * Note that it is possible for multiple values to match this put.
741  * In that case, all of the respective values are updated.
742  *
743  * @param cls the plugin context (state for this module)
744  * @param uid unique identifier of the datum
745  * @param delta by how much should the priority
746  *     change?  If priority + delta < 0 the
747  *     priority should be set to 0 (never go
748  *     negative).
749  * @param expire new expiration time should be the
750  *     MAX of any existing expiration time and
751  *     this value
752  * @param msg set to an error message
753  * @return GNUNET_OK on success
754  */
755 static int
756 sqlite_plugin_update (void *cls,
757                       uint64_t uid,
758                       int delta, struct GNUNET_TIME_Absolute expire,
759                       char **msg)
760 {
761   struct Plugin *plugin = cls;
762   int n;
763
764   sqlite3_bind_int (plugin->updPrio, 1, delta);
765   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
766   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
767   n = sqlite3_step (plugin->updPrio);
768   if (n != SQLITE_DONE)
769     LOG_SQLITE (plugin, msg,
770                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
771                 "sqlite3_step");
772 #if DEBUG_SQLITE
773   else
774     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
775                      "sqlite",
776                      "Block updated\n");
777 #endif
778   sqlite3_reset (plugin->updPrio);
779
780   if (n == SQLITE_BUSY)
781     return GNUNET_NO;
782   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
783 }
784
785
786 /**
787  * Internal context for an iteration.
788  */
789 struct IterContext
790 {
791   /**
792    * FIXME.
793    */
794   sqlite3_stmt *stmt_1;
795
796   /**
797    * FIXME.
798    */
799   sqlite3_stmt *stmt_2;
800
801   /**
802    * FIXME.
803    */
804   int is_asc;
805
806   /**
807    * FIXME.
808    */
809   int is_prio;
810
811   /**
812    * FIXME.
813    */
814   int is_migr;
815
816   /**
817    * FIXME.
818    */
819   int limit_nonanonymous;
820
821   /**
822    * Desired type for blocks returned by this iterator.
823    */
824   enum GNUNET_BLOCK_Type type;
825 };
826
827
828 /**
829  * Prepare our SQL query to obtain the next record from the database.
830  *
831  * @param cls our "struct IterContext"
832  * @param nc NULL to terminate the iteration, otherwise our context for
833  *           getting the next result.
834  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
835  *         GNUNET_SYSERR on error (or end of iteration)
836  */
837 static int
838 iter_next_prepare (void *cls,
839                    struct NextContext *nc)
840 {
841   struct IterContext *ic = cls;
842   struct Plugin *plugin;
843   int ret;
844
845   if (nc == NULL)
846     {
847 #if DEBUG_SQLITE
848       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
849                   "Asked to clean up iterator state.\n");
850 #endif
851       sqlite3_finalize (ic->stmt_1);
852       sqlite3_finalize (ic->stmt_2);
853       return GNUNET_SYSERR;
854     }
855   sqlite3_reset (ic->stmt_1);
856   sqlite3_reset (ic->stmt_2);
857   plugin = nc->plugin;
858   if (ic->is_prio)
859     {
860 #if DEBUG_SQLITE
861       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
862                   "Restricting to results larger than the last priority %u\n",
863                   nc->lastPriority);
864 #endif
865       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
866       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
867     }
868   else
869     {
870 #if DEBUG_SQLITE
871       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872                   "Restricting to results larger than the last expiration %llu\n",
873                   (unsigned long long) nc->lastExpiration.abs_value);
874 #endif
875       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value);
876       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value);
877     }
878 #if DEBUG_SQLITE
879   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880               "Restricting to results larger than the last key `%s'\n",
881               GNUNET_h2s(&nc->lastKey));
882 #endif
883   sqlite3_bind_blob (ic->stmt_1, 2, 
884                      &nc->lastKey, 
885                      sizeof (GNUNET_HashCode),
886                      SQLITE_TRANSIENT);
887   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
888     {      
889 #if DEBUG_SQLITE
890       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891                   "Result found using iterator 1\n");
892 #endif
893       nc->stmt = ic->stmt_1;
894       return GNUNET_OK;
895     }
896   if (ret != SQLITE_DONE)
897     {
898       LOG_SQLITE (plugin, NULL,
899                   GNUNET_ERROR_TYPE_ERROR |
900                   GNUNET_ERROR_TYPE_BULK,
901                   "sqlite3_step");
902       return GNUNET_SYSERR;
903     }
904   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
905     LOG_SQLITE (plugin, NULL,
906                 GNUNET_ERROR_TYPE_ERROR | 
907                 GNUNET_ERROR_TYPE_BULK, 
908                 "sqlite3_reset");
909   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
910     {
911 #if DEBUG_SQLITE
912       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                   "Result found using iterator 2\n");
914 #endif
915       nc->stmt = ic->stmt_2;
916       return GNUNET_OK;
917     }
918   if (ret != SQLITE_DONE)
919     {
920       LOG_SQLITE (plugin, NULL,
921                   GNUNET_ERROR_TYPE_ERROR |
922                   GNUNET_ERROR_TYPE_BULK,
923                   "sqlite3_step");
924       return GNUNET_SYSERR;
925     }
926   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
927     LOG_SQLITE (plugin, NULL,
928                 GNUNET_ERROR_TYPE_ERROR |
929                 GNUNET_ERROR_TYPE_BULK,
930                 "sqlite3_reset");
931 #if DEBUG_SQLITE
932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933               "No result found using either iterator\n");
934 #endif
935   return GNUNET_NO;
936 }
937
938
939 /**
940  * Call a method for each key in the database and
941  * call the callback method on it.
942  *
943  * @param plugin our plugin context
944  * @param type entries of which type should be considered?
945  * @param is_asc are we iterating in ascending order?
946  * @param is_prio are we iterating by priority (otherwise by expiration)
947  * @param is_migr are we iterating in migration order?
948  * @param limit_nonanonymous are we restricting results to those with anonymity
949  *              level zero?
950  * @param stmt_str_1 first SQL statement to execute
951  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
952  * @param iter function to call on each matching value;
953  *        will be called once with a NULL value at the end
954  * @param iter_cls closure for iter
955  */
956 static void
957 basic_iter (struct Plugin *plugin,
958             enum GNUNET_BLOCK_Type type,
959             int is_asc,
960             int is_prio,
961             int is_migr,
962             int limit_nonanonymous,
963             const char *stmt_str_1,
964             const char *stmt_str_2,
965             PluginIterator iter,
966             void *iter_cls)
967 {
968   struct NextContext *nc;
969   struct IterContext *ic;
970   sqlite3_stmt *stmt_1;
971   sqlite3_stmt *stmt_2;
972
973 #if DEBUG_SQLITE
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "At %llu, using queries `%s' and `%s'\n",
976               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value,
977               stmt_str_1,
978               stmt_str_2);
979 #endif
980   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
981     {
982       LOG_SQLITE (plugin, NULL,
983                   GNUNET_ERROR_TYPE_ERROR |
984                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
985       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
986       return;
987     }
988   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
989     {
990       LOG_SQLITE (plugin, NULL,
991                   GNUNET_ERROR_TYPE_ERROR |
992                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
993       sqlite3_finalize (stmt_1);
994       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
995       return;
996     }
997   nc = GNUNET_malloc (sizeof(struct NextContext) + 
998                       sizeof(struct IterContext));
999   nc->plugin = plugin;
1000   nc->iter = iter;
1001   nc->iter_cls = iter_cls;
1002   nc->stmt = NULL;
1003   ic = (struct IterContext*) &nc[1];
1004   ic->stmt_1 = stmt_1;
1005   ic->stmt_2 = stmt_2;
1006   ic->type = type;
1007   ic->is_asc = is_asc;
1008   ic->is_prio = is_prio;
1009   ic->is_migr = is_migr;
1010   ic->limit_nonanonymous = limit_nonanonymous;
1011   nc->prep = &iter_next_prepare;
1012   nc->prep_cls = ic;
1013   if (is_asc)
1014     {
1015       nc->lastPriority = 0;
1016       nc->lastExpiration.abs_value = 0;
1017       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1018     }
1019   else
1020     {
1021       nc->lastPriority = 0x7FFFFFFF;
1022       nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL;
1023       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1024     }
1025   sqlite_next_request (nc, GNUNET_NO);
1026 }
1027
1028
1029 /**
1030  * Select a subset of the items in the datastore and call
1031  * the given iterator for each of them.
1032  *
1033  * @param cls our plugin context
1034  * @param type entries of which type should be considered?
1035  *        Use 0 for any type.
1036  * @param iter function to call on each matching value;
1037  *        will be called once with a NULL value at the end
1038  * @param iter_cls closure for iter
1039  */
1040 static void
1041 sqlite_plugin_iter_low_priority (void *cls,
1042                                  enum GNUNET_BLOCK_Type type,
1043                                  PluginIterator iter,
1044                                  void *iter_cls)
1045 {
1046   basic_iter (cls,
1047               type, 
1048               GNUNET_YES, GNUNET_YES, 
1049               GNUNET_NO, GNUNET_NO,
1050               SELECT_IT_LOW_PRIORITY_1,
1051               SELECT_IT_LOW_PRIORITY_2, 
1052               iter, iter_cls);
1053 }
1054
1055
1056 /**
1057  * Select a subset of the items in the datastore and call
1058  * the given iterator for each of them.
1059  *
1060  * @param cls our plugin context
1061  * @param type entries of which type should be considered?
1062  *        Use 0 for any type.
1063  * @param iter function to call on each matching value;
1064  *        will be called once with a NULL value at the end
1065  * @param iter_cls closure for iter
1066  */
1067 static void
1068 sqlite_plugin_iter_zero_anonymity (void *cls,
1069                                    enum GNUNET_BLOCK_Type type,
1070                                    PluginIterator iter,
1071                                    void *iter_cls)
1072 {
1073   struct GNUNET_TIME_Absolute now;
1074   char *q1;
1075   char *q2;
1076
1077   now = GNUNET_TIME_absolute_get ();
1078   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1079                    (unsigned long long) now.abs_value);
1080   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1081                    (unsigned long long) now.abs_value);
1082   basic_iter (cls,
1083               type, 
1084               GNUNET_NO, GNUNET_YES, 
1085               GNUNET_NO, GNUNET_YES,
1086               q1,
1087               q2,
1088               iter, iter_cls);
1089   GNUNET_free (q1);
1090   GNUNET_free (q2);
1091 }
1092
1093
1094
1095 /**
1096  * Select a subset of the items in the datastore and call
1097  * the given iterator for each of them.
1098  *
1099  * @param cls our plugin context
1100  * @param type entries of which type should be considered?
1101  *        Use 0 for any type.
1102  * @param iter function to call on each matching value;
1103  *        will be called once with a NULL value at the end
1104  * @param iter_cls closure for iter
1105  */
1106 static void
1107 sqlite_plugin_iter_ascending_expiration (void *cls,
1108                                          enum GNUNET_BLOCK_Type type,
1109                                          PluginIterator iter,
1110                                          void *iter_cls)
1111 {
1112   struct GNUNET_TIME_Absolute now;
1113   char *q1;
1114   char *q2;
1115
1116   now = GNUNET_TIME_absolute_get ();
1117   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1118                    (unsigned long long) 0*now.abs_value);
1119   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1120                    (unsigned long long) 0*now.abs_value);
1121   basic_iter (cls,
1122               type, 
1123               GNUNET_YES, GNUNET_NO, 
1124               GNUNET_NO, GNUNET_NO,
1125               q1, q2,
1126               iter, iter_cls);
1127   GNUNET_free (q1);
1128   GNUNET_free (q2);
1129 }
1130
1131
1132 /**
1133  * Select a subset of the items in the datastore and call
1134  * the given iterator for each of them.
1135  *
1136  * @param cls our plugin context
1137  * @param type entries of which type should be considered?
1138  *        Use 0 for any type.
1139  * @param iter function to call on each matching value;
1140  *        will be called once with a NULL value at the end
1141  * @param iter_cls closure for iter
1142  */
1143 static void
1144 sqlite_plugin_iter_migration_order (void *cls,
1145                                     enum GNUNET_BLOCK_Type type,
1146                                     PluginIterator iter,
1147                                     void *iter_cls)
1148 {
1149   struct GNUNET_TIME_Absolute now;
1150   char *q;
1151
1152   now = GNUNET_TIME_absolute_get ();
1153   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1154                    (unsigned long long) now.abs_value);
1155   basic_iter (cls,
1156               type, 
1157               GNUNET_NO, GNUNET_NO, 
1158               GNUNET_YES, GNUNET_NO,
1159               SELECT_IT_MIGRATION_ORDER_1,
1160               q,
1161               iter, iter_cls);
1162   GNUNET_free (q);
1163 }
1164
1165
1166 /**
1167  * Call sqlite using the already prepared query to get
1168  * the next result.
1169  *
1170  * @param cls not used
1171  * @param nc context with the prepared query
1172  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1173  *        there are no more results 
1174  */
1175 static int
1176 all_next_prepare (void *cls,
1177                   struct NextContext *nc)
1178 {
1179   struct Plugin *plugin;
1180   int ret;
1181
1182   if (nc == NULL)
1183     {
1184 #if DEBUG_SQLITE
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Asked to clean up iterator state.\n");
1187 #endif
1188       return GNUNET_SYSERR;
1189     }
1190   plugin = nc->plugin;
1191   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1192     {      
1193       return GNUNET_OK;
1194     }
1195   if (ret != SQLITE_DONE)
1196     {
1197       LOG_SQLITE (plugin, NULL,
1198                   GNUNET_ERROR_TYPE_ERROR |
1199                   GNUNET_ERROR_TYPE_BULK,
1200                   "sqlite3_step");
1201       return GNUNET_SYSERR;
1202     }
1203   return GNUNET_NO;
1204 }
1205
1206
1207 /**
1208  * Select a subset of the items in the datastore and call
1209  * the given iterator for each of them.
1210  *
1211  * @param cls our plugin context
1212  * @param type entries of which type should be considered?
1213  *        Use 0 for any type.
1214  * @param iter function to call on each matching value;
1215  *        will be called once with a NULL value at the end
1216  * @param iter_cls closure for iter
1217  */
1218 static void
1219 sqlite_plugin_iter_all_now (void *cls,
1220                             enum GNUNET_BLOCK_Type type,
1221                             PluginIterator iter,
1222                             void *iter_cls)
1223 {
1224   struct Plugin *plugin = cls;
1225   struct NextContext *nc;
1226   sqlite3_stmt *stmt;
1227
1228   if (sq_prepare (plugin->dbh, 
1229                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1230                   &stmt) != SQLITE_OK)
1231     {
1232       LOG_SQLITE (plugin, NULL,
1233                   GNUNET_ERROR_TYPE_ERROR |
1234                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1235       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1236       return;
1237     }
1238   nc = GNUNET_malloc (sizeof(struct NextContext));
1239   nc->plugin = plugin;
1240   nc->iter = iter;
1241   nc->iter_cls = iter_cls;
1242   nc->stmt = stmt;
1243   nc->prep = &all_next_prepare;
1244   nc->prep_cls = NULL;
1245   sqlite_next_request (nc, GNUNET_NO);
1246 }
1247
1248
1249 /**
1250  * FIXME.
1251  */
1252 struct GetNextContext
1253 {
1254
1255   /**
1256    * FIXME.
1257    */
1258   int total;
1259
1260   /**
1261    * FIXME.
1262    */
1263   int off;
1264
1265   /**
1266    * FIXME.
1267    */
1268   int have_vhash;
1269
1270   /**
1271    * FIXME.
1272    */
1273   unsigned int type;
1274
1275   /**
1276    * FIXME.
1277    */
1278   sqlite3_stmt *stmt;
1279
1280   /**
1281    * FIXME.
1282    */
1283   GNUNET_HashCode key;
1284
1285   /**
1286    * FIXME.
1287    */
1288   GNUNET_HashCode vhash;
1289 };
1290
1291
1292
1293 /**
1294  * FIXME.
1295  *
1296  * @param cls our "struct GetNextContext*"
1297  * @param nc FIXME
1298  * @return GNUNET_YES if there are more results, 
1299  *         GNUNET_NO if there are no more results,
1300  *         GNUNET_SYSERR on internal error
1301  */
1302 static int
1303 get_next_prepare (void *cls,
1304                   struct NextContext *nc)
1305 {
1306   struct GetNextContext *gnc = cls;
1307   int sqoff;
1308   int ret;
1309   int limit_off;
1310
1311   if (nc == NULL)
1312     {
1313       sqlite3_finalize (gnc->stmt);
1314       return GNUNET_SYSERR;
1315     }
1316   if (nc->count == gnc->total)
1317     return GNUNET_NO;
1318   if (nc->count + gnc->off == gnc->total)
1319     nc->last_rowid = 0;
1320   if (nc->count == 0)
1321     limit_off = gnc->off;
1322   else
1323     limit_off = 0;
1324   sqoff = 1;
1325   sqlite3_reset (nc->stmt);
1326   ret = sqlite3_bind_blob (nc->stmt,
1327                            sqoff++,
1328                            &gnc->key, 
1329                            sizeof (GNUNET_HashCode),
1330                            SQLITE_TRANSIENT);
1331   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1332     ret = sqlite3_bind_blob (nc->stmt,
1333                              sqoff++,
1334                              &gnc->vhash,
1335                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1336   if ((gnc->type != 0) && (ret == SQLITE_OK))
1337     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1338   if (ret == SQLITE_OK)
1339     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1340   if (ret == SQLITE_OK)
1341     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1342   if (ret != SQLITE_OK)
1343     return GNUNET_SYSERR;
1344   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1345     return GNUNET_NO;
1346   return GNUNET_OK;
1347 }
1348
1349
1350 /**
1351  * Iterate over the results for a particular key
1352  * in the datastore.
1353  *
1354  * @param cls closure
1355  * @param key maybe NULL (to match all entries)
1356  * @param vhash hash of the value, maybe NULL (to
1357  *        match all values that have the right key).
1358  *        Note that for DBlocks there is no difference
1359  *        betwen key and vhash, but for other blocks
1360  *        there may be!
1361  * @param type entries of which type are relevant?
1362  *     Use 0 for any type.
1363  * @param iter function to call on each matching value;
1364  *        will be called once with a NULL value at the end
1365  * @param iter_cls closure for iter
1366  */
1367 static void
1368 sqlite_plugin_get (void *cls,
1369                    const GNUNET_HashCode * key,
1370                    const GNUNET_HashCode * vhash,
1371                    enum GNUNET_BLOCK_Type type,
1372                    PluginIterator iter, void *iter_cls)
1373 {
1374   struct Plugin *plugin = cls;
1375   struct GetNextContext *gpc;
1376   struct NextContext *nc;
1377   int ret;
1378   int total;
1379   sqlite3_stmt *stmt;
1380   char scratch[256];
1381   int sqoff;
1382
1383   GNUNET_assert (iter != NULL);
1384   if (key == NULL)
1385     {
1386       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1387       return;
1388     }
1389   GNUNET_snprintf (scratch, sizeof (scratch),
1390                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1391                    vhash == NULL ? "" : " AND vhash=:2",
1392                    type == 0 ? "" : (vhash ==
1393                                      NULL) ? " AND type=:2" : " AND type=:3");
1394   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1395     {
1396       LOG_SQLITE (plugin, NULL,
1397                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1398       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1399       return;
1400     }
1401   sqoff = 1;
1402   ret = sqlite3_bind_blob (stmt,
1403                            sqoff++,
1404                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1405   if ((vhash != NULL) && (ret == SQLITE_OK))
1406     ret = sqlite3_bind_blob (stmt,
1407                              sqoff++,
1408                              vhash,
1409                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1410   if ((type != 0) && (ret == SQLITE_OK))
1411     ret = sqlite3_bind_int (stmt, sqoff++, type);
1412   if (SQLITE_OK != ret)
1413     {
1414       LOG_SQLITE (plugin, NULL,
1415                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1416       sqlite3_reset (stmt);
1417       sqlite3_finalize (stmt);
1418       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1419       return;
1420     }
1421   ret = sqlite3_step (stmt);
1422   if (ret != SQLITE_ROW)
1423     {
1424       LOG_SQLITE (plugin, NULL,
1425                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1426                   "sqlite_step");
1427       sqlite3_reset (stmt);
1428       sqlite3_finalize (stmt);
1429       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1430       return;
1431     }
1432   total = sqlite3_column_int (stmt, 0);
1433   sqlite3_reset (stmt);
1434   sqlite3_finalize (stmt);
1435   if (0 == total)
1436     {
1437       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1438       return;
1439     }
1440
1441   GNUNET_snprintf (scratch, sizeof (scratch),
1442                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1443                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1444                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1445                    vhash == NULL ? "" : " AND vhash=:2",
1446                    type == 0 ? "" : (vhash ==
1447                                      NULL) ? " AND type=:2" : " AND type=:3",
1448                    sqoff, sqoff + 1);
1449   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1450     {
1451       LOG_SQLITE (plugin, NULL,
1452                   GNUNET_ERROR_TYPE_ERROR |
1453                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1454       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1455       return;
1456     }
1457   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1458                       sizeof(struct GetNextContext));
1459   nc->plugin = plugin;
1460   nc->iter = iter;
1461   nc->iter_cls = iter_cls;
1462   nc->stmt = stmt;
1463   gpc = (struct GetNextContext*) &nc[1];
1464   gpc->total = total;
1465   gpc->type = type;
1466   gpc->key = *key;
1467   gpc->stmt = stmt; /* alias used for freeing at the end! */
1468   if (NULL != vhash)
1469     {
1470       gpc->have_vhash = GNUNET_YES;
1471       gpc->vhash = *vhash;
1472     }
1473   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1474   nc->prep = &get_next_prepare;
1475   nc->prep_cls = gpc;
1476   sqlite_next_request (nc, GNUNET_NO);
1477 }
1478
1479
1480 /**
1481  * Drop database.
1482  *
1483  * @param cls our plugin context
1484  */
1485 static void 
1486 sqlite_plugin_drop (void *cls)
1487 {
1488   struct Plugin *plugin = cls;
1489   plugin->drop_on_shutdown = GNUNET_YES;
1490 }
1491
1492
1493 static unsigned long long
1494 sqlite_plugin_get_size (void *cls)
1495 {
1496   struct Plugin *plugin = cls;
1497   sqlite3_stmt *stmt;
1498   uint64_t pages;
1499   uint64_t page_size;
1500 #if ENULL_DEFINED
1501   char *e;
1502 #endif
1503
1504   if (SQLITE_VERSION_NUMBER < 3006000)
1505     {
1506       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1507                        "datastore-sqlite",
1508                        _("sqlite version to old to determine size, assuming zero\n"));
1509       return 0;
1510     }
1511   CHECK (SQLITE_OK ==
1512          sqlite3_exec (plugin->dbh,
1513                        "VACUUM", NULL, NULL, ENULL));
1514   CHECK (SQLITE_OK ==
1515          sqlite3_exec (plugin->dbh,
1516                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1517   CHECK (SQLITE_OK ==
1518          sq_prepare (plugin->dbh,
1519                      "PRAGMA page_count",
1520                      &stmt));
1521   if (SQLITE_ROW ==
1522       sqlite3_step (stmt))
1523     pages = sqlite3_column_int64 (stmt, 0);
1524   else
1525     pages = 0;
1526   sqlite3_finalize (stmt);
1527   CHECK (SQLITE_OK ==
1528          sq_prepare (plugin->dbh,
1529                      "PRAGMA page_size",
1530                      &stmt));
1531   CHECK (SQLITE_ROW ==
1532          sqlite3_step (stmt));
1533   page_size = sqlite3_column_int64 (stmt, 0);
1534   sqlite3_finalize (stmt);
1535   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1536               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1537               (unsigned long long) pages,
1538               (unsigned long long) page_size);
1539   return  pages * page_size;
1540 }
1541                                          
1542
1543 /**
1544  * Entry point for the plugin.
1545  *
1546  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1547  * @return NULL on error, othrewise the plugin context
1548  */
1549 void *
1550 libgnunet_plugin_datastore_sqlite_init (void *cls)
1551 {
1552   static struct Plugin plugin;
1553   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1554   struct GNUNET_DATASTORE_PluginFunctions *api;
1555
1556   if (plugin.env != NULL)
1557     return NULL; /* can only initialize once! */
1558   memset (&plugin, 0, sizeof(struct Plugin));
1559   plugin.env = env;
1560   if (GNUNET_OK !=
1561       database_setup (env->cfg, &plugin))
1562     {
1563       database_shutdown (&plugin);
1564       return NULL;
1565     }
1566   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1567   api->cls = &plugin;
1568   api->get_size = &sqlite_plugin_get_size;
1569   api->put = &sqlite_plugin_put;
1570   api->next_request = &sqlite_next_request;
1571   api->get = &sqlite_plugin_get;
1572   api->update = &sqlite_plugin_update;
1573   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1574   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1575   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1576   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1577   api->iter_all_now = &sqlite_plugin_iter_all_now;
1578   api->drop = &sqlite_plugin_drop;
1579   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1580                    "sqlite", _("Sqlite database running\n"));
1581   return api;
1582 }
1583
1584
1585 /**
1586  * Exit point from the plugin.
1587  *
1588  * @param cls the plugin context (as returned by "init")
1589  * @return always NULL
1590  */
1591 void *
1592 libgnunet_plugin_datastore_sqlite_done (void *cls)
1593 {
1594   char *fn;
1595   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1596   struct Plugin *plugin = api->cls;
1597
1598   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1599     {
1600       GNUNET_SCHEDULER_cancel (plugin->next_task);
1601       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1602       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1603       GNUNET_free (plugin->next_task_nc);
1604       plugin->next_task_nc = NULL;
1605     }
1606   fn = NULL;
1607   if (plugin->drop_on_shutdown)
1608     fn = GNUNET_strdup (plugin->fn);
1609   database_shutdown (plugin);
1610   plugin->env = NULL; 
1611   GNUNET_free (api);
1612   if (fn != NULL)
1613     {
1614       if (0 != UNLINK(fn))
1615         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1616                                   "unlink",
1617                                   fn);
1618       GNUNET_free (fn);
1619     }
1620   return NULL;
1621 }
1622
1623 /* end of plugin_datastore_sqlite.c */