avoid disconnect on cancel
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "plugin_datastore.h"
29 #include <sqlite3.h>
30
31 #define DEBUG_SQLITE GNUNET_NO
32
33
34 /**
35  * Log an error message at log-level 'level' that indicates
36  * a failure of the command 'cmd' on file 'filename'
37  * with the message given by strerror(errno).
38  */
39 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40
41 #define SELECT_IT_LOW_PRIORITY_1 \
42   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
43   "ORDER BY hash ASC LIMIT 1"
44
45 #define SELECT_IT_LOW_PRIORITY_2 \
46   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
47   "ORDER BY prio ASC, hash ASC LIMIT 1"
48
49 #define SELECT_IT_NON_ANONYMOUS_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
51   " ORDER BY hash DESC LIMIT 1"
52
53 #define SELECT_IT_NON_ANONYMOUS_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
55   " ORDER BY prio DESC, hash DESC LIMIT 1"
56
57 #define SELECT_IT_EXPIRATION_TIME_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
59   " ORDER BY hash ASC LIMIT 1"
60
61 #define SELECT_IT_EXPIRATION_TIME_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
63   " ORDER BY expire ASC, hash ASC LIMIT 1"
64
65 #define SELECT_IT_MIGRATION_ORDER_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
67   " ORDER BY hash DESC LIMIT 1"
68
69 #define SELECT_IT_MIGRATION_ORDER_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
71   " ORDER BY expire DESC, hash DESC LIMIT 1"
72
73 /**
74  * After how many ms "busy" should a DB operation fail for good?
75  * A low value makes sure that we are more responsive to requests
76  * (especially PUTs).  A high value guarantees a higher success
77  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78  *
79  * The default value of 250ms should ensure that users do not experience
80  * huge latencies while at the same time allowing operations to succeed
81  * with reasonable probability.
82  */
83 #define BUSY_TIMEOUT_MS 250
84
85
86
87 /**
88  * Context for all functions in this plugin.
89  */
90 struct Plugin 
91 {
92   /**
93    * Our execution environment.
94    */
95   struct GNUNET_DATASTORE_PluginEnvironment *env;
96
97   /**
98    * Database filename.
99    */
100   char *fn;
101
102   /**
103    * Native SQLite database handle.
104    */
105   sqlite3 *dbh;
106
107   /**
108    * Precompiled SQL for update.
109    */
110   sqlite3_stmt *updPrio;
111
112   /**
113    * Precompiled SQL for insertion.
114    */
115   sqlite3_stmt *insertContent;
116
117   /**
118    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
119    */
120   struct NextContext *next_task_nc;
121
122   /**
123    * Pending task with scheduler for running the next request.
124    */
125   GNUNET_SCHEDULER_TaskIdentifier next_task;
126
127   /**
128    * Should the database be dropped on shutdown?
129    */
130   int drop_on_shutdown;
131
132 };
133
134
135 /**
136  * @brief Prepare a SQL statement
137  *
138  * @param dbh handle to the database
139  * @param zSql SQL statement, UTF-8 encoded
140  * @param ppStmt set to the prepared statement
141  * @return 0 on success
142  */
143 static int
144 sq_prepare (sqlite3 * dbh, const char *zSql,
145             sqlite3_stmt ** ppStmt)
146 {
147   char *dummy;
148   return sqlite3_prepare_v2 (dbh,
149                              zSql,
150                              strlen (zSql), ppStmt, (const char **) &dummy);
151 }
152
153
154 /**
155  * Create our database indices.
156  * 
157  * @param dbh handle to the database
158  */
159 static void
160 create_indices (sqlite3 * dbh)
161 {
162   /* create indices */
163   sqlite3_exec (dbh,
164                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
165   sqlite3_exec (dbh,
166                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
167                 NULL, NULL);
168   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
169                 NULL);
170   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
171                 NULL);
172   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
173                 NULL, NULL);
174   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
175                 NULL, NULL, NULL);
176   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
177                 NULL, NULL);
178 }
179
180
181
182 #if 0
183 #define CHECK(a) GNUNET_break(a)
184 #define ENULL NULL
185 #else
186 #define ENULL &e
187 #define ENULL_DEFINED 1
188 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
189 #endif
190
191
192
193
194 /**
195  * Initialize the database connections and associated
196  * data structures (create tables and indices
197  * as needed as well).
198  *
199  * @param cfg our configuration
200  * @param plugin the plugin context (state for this module)
201  * @return GNUNET_OK on success
202  */
203 static int
204 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
205                 struct Plugin *plugin)
206 {
207   sqlite3_stmt *stmt;
208   char *afsdir;
209 #if ENULL_DEFINED
210   char *e;
211 #endif
212   
213   if (GNUNET_OK != 
214       GNUNET_CONFIGURATION_get_value_filename (cfg,
215                                                "datastore-sqlite",
216                                                "FILENAME",
217                                                &afsdir))
218     {
219       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
220                        "sqlite",
221                        _("Option `%s' in section `%s' missing in configuration!\n"),
222                        "FILENAME",
223                        "datastore-sqlite");
224       return GNUNET_SYSERR;
225     }
226   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
227     {
228       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
229         {
230           GNUNET_break (0);
231           GNUNET_free (afsdir);
232           return GNUNET_SYSERR;
233         }
234       /* database is new or got deleted, reset payload to zero! */
235       plugin->env->duc (plugin->env->cls, 0);
236     }
237   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
238 #ifdef ENABLE_NLS
239                                               nl_langinfo (CODESET)
240 #else
241                                               "UTF-8"   /* good luck */
242 #endif
243                                               );
244   GNUNET_free (afsdir);
245   
246   /* Open database and precompile statements */
247   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
248     {
249       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
250                        "sqlite",
251                        _("Unable to initialize SQLite: %s.\n"),
252                        sqlite3_errmsg (plugin->dbh));
253       return GNUNET_SYSERR;
254     }
255   CHECK (SQLITE_OK ==
256          sqlite3_exec (plugin->dbh,
257                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
258   CHECK (SQLITE_OK ==
259          sqlite3_exec (plugin->dbh,
260                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
261   CHECK (SQLITE_OK ==
262          sqlite3_exec (plugin->dbh,
263                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
264   CHECK (SQLITE_OK ==
265          sqlite3_exec (plugin->dbh,
266                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh, 
269                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
270
271   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
272
273
274   /* We have to do it here, because otherwise precompiling SQL might fail */
275   CHECK (SQLITE_OK ==
276          sq_prepare (plugin->dbh,
277                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
278                      &stmt));
279   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
280        (sqlite3_exec (plugin->dbh,
281                       "CREATE TABLE gn080 ("
282                       "  size INT4 NOT NULL DEFAULT 0,"
283                       "  type INT4 NOT NULL DEFAULT 0,"
284                       "  prio INT4 NOT NULL DEFAULT 0,"
285                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
286                       "  expire INT8 NOT NULL DEFAULT 0,"
287                       "  hash TEXT NOT NULL DEFAULT '',"
288                       "  vhash TEXT NOT NULL DEFAULT '',"
289                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
290                       NULL) != SQLITE_OK) )
291     {
292       LOG_SQLITE (plugin, NULL,
293                   GNUNET_ERROR_TYPE_ERROR, 
294                   "sqlite3_exec");
295       sqlite3_finalize (stmt);
296       return GNUNET_SYSERR;
297     }
298   sqlite3_finalize (stmt);
299   create_indices (plugin->dbh);
300
301   CHECK (SQLITE_OK ==
302          sq_prepare (plugin->dbh,
303                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
304                      &stmt));
305   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
306        (sqlite3_exec (plugin->dbh,
307                       "CREATE TABLE gn071 ("
308                       "  key TEXT NOT NULL DEFAULT '',"
309                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
310                       NULL) != SQLITE_OK) )
311     {
312       LOG_SQLITE (plugin, NULL,
313                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
314       sqlite3_finalize (stmt);
315       return GNUNET_SYSERR;
316     }
317   sqlite3_finalize (stmt);
318
319   if ((sq_prepare (plugin->dbh,
320                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
321                    "_ROWID_ = ?",
322                    &plugin->updPrio) != SQLITE_OK) ||
323       (sq_prepare (plugin->dbh,
324                    "INSERT INTO gn080 (size, type, prio, "
325                    "anonLevel, expire, hash, vhash, value) VALUES "
326                    "(?, ?, ?, ?, ?, ?, ?, ?)",
327                    &plugin->insertContent) != SQLITE_OK))
328     {
329       LOG_SQLITE (plugin, NULL,
330                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
331       return GNUNET_SYSERR;
332     }
333   return GNUNET_OK;
334 }
335
336
337 /**
338  * Shutdown database connection and associate data
339  * structures.
340  * @param plugin the plugin context (state for this module)
341  */
342 static void
343 database_shutdown (struct Plugin *plugin)
344 {
345   if (plugin->updPrio != NULL)
346     sqlite3_finalize (plugin->updPrio);
347   if (plugin->insertContent != NULL)
348     sqlite3_finalize (plugin->insertContent);
349   sqlite3_close (plugin->dbh);
350   GNUNET_free_non_null (plugin->fn);
351 }
352
353
354 /**
355  * Delete the database entry with the given
356  * row identifier.
357  *
358  * @param plugin the plugin context (state for this module)
359  * @param rid the ID of the row to delete
360  */
361 static int
362 delete_by_rowid (struct Plugin* plugin, 
363                  unsigned long long rid)
364 {
365   sqlite3_stmt *stmt;
366
367   if (sq_prepare (plugin->dbh,
368                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
369     {
370       LOG_SQLITE (plugin, NULL,
371                   GNUNET_ERROR_TYPE_ERROR |
372                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
373       return GNUNET_SYSERR;
374     }
375   sqlite3_bind_int64 (stmt, 1, rid);
376   if (SQLITE_DONE != sqlite3_step (stmt))
377     {
378       LOG_SQLITE (plugin, NULL,
379                   GNUNET_ERROR_TYPE_ERROR |
380                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
381       sqlite3_finalize (stmt);
382       return GNUNET_SYSERR;
383     }
384   sqlite3_finalize (stmt);
385   return GNUNET_OK;
386 }
387
388
389 /**
390  * Context for the universal iterator.
391  */
392 struct NextContext;
393
394 /**
395  * Type of a function that will prepare
396  * the next iteration.
397  *
398  * @param cls closure
399  * @param nc the next context; NULL for the last
400  *         call which gives the callback a chance to
401  *         clean up the closure
402  * @return GNUNET_OK on success, GNUNET_NO if there are
403  *         no more values, GNUNET_SYSERR on error
404  */
405 typedef int (*PrepareFunction)(void *cls,
406                                struct NextContext *nc);
407
408
409 /**
410  * Context we keep for the "next request" callback.
411  */
412 struct NextContext
413 {
414   /**
415    * Internal state.
416    */ 
417   struct Plugin *plugin;
418
419   /**
420    * Function to call on the next value.
421    */
422   PluginIterator iter;
423
424   /**
425    * Closure for iter.
426    */
427   void *iter_cls;
428
429   /**
430    * Function to call to prepare the next
431    * iteration.
432    */
433   PrepareFunction prep;
434
435   /**
436    * Closure for prep.
437    */
438   void *prep_cls;
439
440   /**
441    * Statement that the iterator will get the data
442    * from (updated or set by prep).
443    */ 
444   sqlite3_stmt *stmt;
445
446   /**
447    * Row ID of the last result.
448    */
449   unsigned long long last_rowid;
450
451   /**
452    * Key of the last result.
453    */
454   GNUNET_HashCode lastKey;  
455
456   /**
457    * Expiration time of the last value visited.
458    */
459   struct GNUNET_TIME_Absolute lastExpiration;
460
461   /**
462    * Priority of the last value visited.
463    */ 
464   unsigned int lastPriority; 
465
466   /**
467    * Number of results processed so far.
468    */
469   unsigned int count;
470
471   /**
472    * Set to GNUNET_YES if we must stop now.
473    */
474   int end_it;
475 };
476
477
478 /**
479  * Continuation of "sqlite_next_request".
480  *
481  * @param cls the next context
482  * @param tc the task context (unused)
483  */
484 static void 
485 sqlite_next_request_cont (void *cls,
486                           const struct GNUNET_SCHEDULER_TaskContext *tc)
487 {
488   struct NextContext * nc = cls;
489   struct Plugin *plugin;
490   unsigned long long rowid;
491   sqlite3_stmt *stmtd;
492   int ret;
493   unsigned int type;
494   unsigned int size;
495   unsigned int priority;
496   unsigned int anonymity;
497   struct GNUNET_TIME_Absolute expiration;
498   const GNUNET_HashCode *key;
499   const void *data;
500   
501   plugin = nc->plugin;
502   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
503   plugin->next_task_nc = NULL;
504   if ( (GNUNET_YES == nc->end_it) ||
505        (GNUNET_OK != (nc->prep(nc->prep_cls,
506                                nc))) )
507     {
508     END:
509       nc->iter (nc->iter_cls, 
510                 NULL, NULL, 0, NULL, 0, 0, 0, 
511                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
512       nc->prep (nc->prep_cls, NULL);
513       GNUNET_free (nc);
514       return;
515     }
516
517   rowid = sqlite3_column_int64 (nc->stmt, 7);
518   nc->last_rowid = rowid;
519   type = sqlite3_column_int (nc->stmt, 1);
520   size = sqlite3_column_bytes (nc->stmt, 6);
521   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
522     {
523       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
524                        "sqlite",
525                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
526       if (SQLITE_OK != sqlite3_reset (nc->stmt))
527         LOG_SQLITE (nc->plugin, NULL,
528                     GNUNET_ERROR_TYPE_ERROR |
529                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
530       if (sq_prepare
531           (nc->plugin->dbh,
532            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
533            &stmtd) != SQLITE_OK)
534         {
535           LOG_SQLITE (nc->plugin, NULL,
536                       GNUNET_ERROR_TYPE_ERROR |
537                       GNUNET_ERROR_TYPE_BULK, 
538                       "sq_prepare");
539           goto END;
540         }
541
542       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
543         LOG_SQLITE (nc->plugin, NULL,
544                     GNUNET_ERROR_TYPE_ERROR |
545                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
546       if (SQLITE_DONE != sqlite3_step (stmtd))
547         LOG_SQLITE (nc->plugin, NULL,
548                     GNUNET_ERROR_TYPE_ERROR |
549                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
550       if (SQLITE_OK != sqlite3_finalize (stmtd))
551         LOG_SQLITE (nc->plugin, NULL,
552                     GNUNET_ERROR_TYPE_ERROR |
553                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
554       goto END;
555     }
556
557   priority = sqlite3_column_int (nc->stmt, 2);
558   anonymity = sqlite3_column_int (nc->stmt, 3);
559   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
560   key = sqlite3_column_blob (nc->stmt, 5);
561   nc->lastPriority = priority;
562   nc->lastExpiration = expiration;
563   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
564   data = sqlite3_column_blob (nc->stmt, 6);
565   nc->count++;
566   ret = nc->iter (nc->iter_cls,
567                   nc,
568                   key,
569                   size,
570                   data, 
571                   type,
572                   priority,
573                   anonymity,
574                   expiration,
575                   rowid);
576   if (ret == GNUNET_SYSERR)
577     {
578       nc->end_it = GNUNET_YES;
579       return;
580     }
581 #if DEBUG_SQLITE
582   if (ret == GNUNET_NO)
583     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
584                      "sqlite",
585                      "Asked to remove entry %llu (%u bytes)\n",
586                      (unsigned long long) rowid,
587                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
588 #endif
589   if ( (ret == GNUNET_NO) &&
590        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
591     {
592       plugin->env->duc (plugin->env->cls,
593                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
594 #if DEBUG_SQLITE
595       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
596                        "sqlite",
597                        "Removed entry %llu (%u bytes)\n",
598                        (unsigned long long) rowid,
599                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
600 #endif
601     }
602 }
603
604
605 /**
606  * Function invoked on behalf of a "PluginIterator"
607  * asking the database plugin to call the iterator
608  * with the next item.
609  *
610  * @param next_cls whatever argument was given
611  *        to the PluginIterator as "next_cls".
612  * @param end_it set to GNUNET_YES if we
613  *        should terminate the iteration early
614  *        (iterator should be still called once more
615  *         to signal the end of the iteration).
616  */
617 static void 
618 sqlite_next_request (void *next_cls,
619                      int end_it)
620 {
621   struct NextContext * nc= next_cls;
622
623   if (GNUNET_YES == end_it)
624     nc->end_it = GNUNET_YES;
625   nc->plugin->next_task_nc = nc;
626   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (nc->plugin->env->sched,
627                                                     &sqlite_next_request_cont,
628                                                     nc);
629 }
630
631
632
633 /**
634  * Store an item in the datastore.
635  *
636  * @param cls closure
637  * @param key key for the item
638  * @param size number of bytes in data
639  * @param data content stored
640  * @param type type of the content
641  * @param priority priority of the content
642  * @param anonymity anonymity-level for the content
643  * @param expiration expiration time for the content
644  * @param msg set to an error message
645  * @return GNUNET_OK on success
646  */
647 static int
648 sqlite_plugin_put (void *cls,
649                    const GNUNET_HashCode * key,
650                    uint32_t size,
651                    const void *data,
652                    enum GNUNET_BLOCK_Type type,
653                    uint32_t priority,
654                    uint32_t anonymity,
655                    struct GNUNET_TIME_Absolute expiration,
656                    char ** msg)
657 {
658   struct Plugin *plugin = cls;
659   int n;
660   sqlite3_stmt *stmt;
661   GNUNET_HashCode vhash;
662
663 #if DEBUG_SQLITE
664   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
665                    "sqlite",
666                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
667                    type, 
668                    GNUNET_h2s(key),
669                    priority,
670                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
671                    (long long) expiration.value);
672 #endif
673   GNUNET_CRYPTO_hash (data, size, &vhash);
674   stmt = plugin->insertContent;
675   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
676       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
677       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
678       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
679       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.value)) ||
680       (SQLITE_OK !=
681        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
682                           SQLITE_TRANSIENT)) ||
683       (SQLITE_OK !=
684        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
685                           SQLITE_TRANSIENT))
686       || (SQLITE_OK !=
687           sqlite3_bind_blob (stmt, 8, data, size,
688                              SQLITE_TRANSIENT)))
689     {
690       LOG_SQLITE (plugin,
691                   msg,
692                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
693       if (SQLITE_OK != sqlite3_reset (stmt))
694         LOG_SQLITE (plugin, NULL,
695                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
696       return GNUNET_SYSERR;
697     }
698   n = sqlite3_step (stmt);
699   if (n != SQLITE_DONE) 
700     {
701       if (n == SQLITE_BUSY)
702         {
703           LOG_SQLITE (plugin, msg,
704                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
705           sqlite3_reset (stmt);
706           GNUNET_break (0);
707           return GNUNET_NO;
708         }
709       LOG_SQLITE (plugin, msg,
710                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
711       sqlite3_reset (stmt);
712       database_shutdown (plugin);
713       database_setup (plugin->env->cfg,
714                       plugin);
715       return GNUNET_SYSERR;
716     }
717   if (SQLITE_OK != sqlite3_reset (stmt))
718     LOG_SQLITE (plugin, NULL,
719                 GNUNET_ERROR_TYPE_ERROR |
720                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
721   plugin->env->duc (plugin->env->cls,
722                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
723 #if DEBUG_SQLITE
724   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
725                    "sqlite",
726                    "Stored new entry (%u bytes)\n",
727            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
728 #endif
729   return GNUNET_OK;
730 }
731
732
733 /**
734  * Update the priority for a particular key in the datastore.  If
735  * the expiration time in value is different than the time found in
736  * the datastore, the higher value should be kept.  For the
737  * anonymity level, the lower value is to be used.  The specified
738  * priority should be added to the existing priority, ignoring the
739  * priority in value.
740  *
741  * Note that it is possible for multiple values to match this put.
742  * In that case, all of the respective values are updated.
743  *
744  * @param cls the plugin context (state for this module)
745  * @param uid unique identifier of the datum
746  * @param delta by how much should the priority
747  *     change?  If priority + delta < 0 the
748  *     priority should be set to 0 (never go
749  *     negative).
750  * @param expire new expiration time should be the
751  *     MAX of any existing expiration time and
752  *     this value
753  * @param msg set to an error message
754  * @return GNUNET_OK on success
755  */
756 static int
757 sqlite_plugin_update (void *cls,
758                       uint64_t uid,
759                       int delta, struct GNUNET_TIME_Absolute expire,
760                       char **msg)
761 {
762   struct Plugin *plugin = cls;
763   int n;
764
765   sqlite3_bind_int (plugin->updPrio, 1, delta);
766   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
767   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
768   n = sqlite3_step (plugin->updPrio);
769   if (n != SQLITE_DONE)
770     LOG_SQLITE (plugin, msg,
771                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
772                 "sqlite3_step");
773 #if DEBUG_SQLITE
774   else
775     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
776                      "sqlite",
777                      "Block updated\n");
778 #endif
779   sqlite3_reset (plugin->updPrio);
780
781   if (n == SQLITE_BUSY)
782     return GNUNET_NO;
783   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
784 }
785
786
787 /**
788  * Internal context for an iteration.
789  */
790 struct IterContext
791 {
792   /**
793    * FIXME.
794    */
795   sqlite3_stmt *stmt_1;
796
797   /**
798    * FIXME.
799    */
800   sqlite3_stmt *stmt_2;
801
802   /**
803    * FIXME.
804    */
805   int is_asc;
806
807   /**
808    * FIXME.
809    */
810   int is_prio;
811
812   /**
813    * FIXME.
814    */
815   int is_migr;
816
817   /**
818    * FIXME.
819    */
820   int limit_nonanonymous;
821
822   /**
823    * Desired type for blocks returned by this iterator.
824    */
825   enum GNUNET_BLOCK_Type type;
826 };
827
828
829 /**
830  * Prepare our SQL query to obtain the next record from the database.
831  *
832  * @param cls our "struct IterContext"
833  * @param nc NULL to terminate the iteration, otherwise our context for
834  *           getting the next result.
835  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
836  *         GNUNET_SYSERR on error (or end of iteration)
837  */
838 static int
839 iter_next_prepare (void *cls,
840                    struct NextContext *nc)
841 {
842   struct IterContext *ic = cls;
843   struct Plugin *plugin;
844   int ret;
845
846   if (nc == NULL)
847     {
848 #if DEBUG_SQLITE
849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850                   "Asked to clean up iterator state.\n");
851 #endif
852       sqlite3_finalize (ic->stmt_1);
853       sqlite3_finalize (ic->stmt_2);
854       return GNUNET_SYSERR;
855     }
856   sqlite3_reset (ic->stmt_1);
857   sqlite3_reset (ic->stmt_2);
858   plugin = nc->plugin;
859   if (ic->is_prio)
860     {
861 #if DEBUG_SQLITE
862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863                   "Restricting to results larger than the last priority %u\n",
864                   nc->lastPriority);
865 #endif
866       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
867       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
868     }
869   else
870     {
871 #if DEBUG_SQLITE
872       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873                   "Restricting to results larger than the last expiration %llu\n",
874                   (unsigned long long) nc->lastExpiration.value);
875 #endif
876       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
877       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
878     }
879 #if DEBUG_SQLITE
880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
881               "Restricting to results larger than the last key `%s'\n",
882               GNUNET_h2s(&nc->lastKey));
883 #endif
884   sqlite3_bind_blob (ic->stmt_1, 2, 
885                      &nc->lastKey, 
886                      sizeof (GNUNET_HashCode),
887                      SQLITE_TRANSIENT);
888   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
889     {      
890 #if DEBUG_SQLITE
891       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
892                   "Result found using iterator 1\n");
893 #endif
894       nc->stmt = ic->stmt_1;
895       return GNUNET_OK;
896     }
897   if (ret != SQLITE_DONE)
898     {
899       LOG_SQLITE (plugin, NULL,
900                   GNUNET_ERROR_TYPE_ERROR |
901                   GNUNET_ERROR_TYPE_BULK,
902                   "sqlite3_step");
903       return GNUNET_SYSERR;
904     }
905   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
906     LOG_SQLITE (plugin, NULL,
907                 GNUNET_ERROR_TYPE_ERROR | 
908                 GNUNET_ERROR_TYPE_BULK, 
909                 "sqlite3_reset");
910   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
911     {
912 #if DEBUG_SQLITE
913       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                   "Result found using iterator 2\n");
915 #endif
916       nc->stmt = ic->stmt_2;
917       return GNUNET_OK;
918     }
919   if (ret != SQLITE_DONE)
920     {
921       LOG_SQLITE (plugin, NULL,
922                   GNUNET_ERROR_TYPE_ERROR |
923                   GNUNET_ERROR_TYPE_BULK,
924                   "sqlite3_step");
925       return GNUNET_SYSERR;
926     }
927   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
928     LOG_SQLITE (plugin, NULL,
929                 GNUNET_ERROR_TYPE_ERROR |
930                 GNUNET_ERROR_TYPE_BULK,
931                 "sqlite3_reset");
932 #if DEBUG_SQLITE
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "No result found using either iterator\n");
935 #endif
936   return GNUNET_NO;
937 }
938
939
940 /**
941  * Call a method for each key in the database and
942  * call the callback method on it.
943  *
944  * @param plugin our plugin context
945  * @param type entries of which type should be considered?
946  * @param is_asc are we iterating in ascending order?
947  * @param is_prio are we iterating by priority (otherwise by expiration)
948  * @param is_migr are we iterating in migration order?
949  * @param limit_nonanonymous are we restricting results to those with anonymity
950  *              level zero?
951  * @param stmt_str_1 first SQL statement to execute
952  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
953  * @param iter function to call on each matching value;
954  *        will be called once with a NULL value at the end
955  * @param iter_cls closure for iter
956  */
957 static void
958 basic_iter (struct Plugin *plugin,
959             enum GNUNET_BLOCK_Type type,
960             int is_asc,
961             int is_prio,
962             int is_migr,
963             int limit_nonanonymous,
964             const char *stmt_str_1,
965             const char *stmt_str_2,
966             PluginIterator iter,
967             void *iter_cls)
968 {
969   struct NextContext *nc;
970   struct IterContext *ic;
971   sqlite3_stmt *stmt_1;
972   sqlite3_stmt *stmt_2;
973
974 #if DEBUG_SQLITE
975   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
976               "At %llu, using queries `%s' and `%s'\n",
977               (unsigned long long) GNUNET_TIME_absolute_get ().value,
978               stmt_str_1,
979               stmt_str_2);
980 #endif
981   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
982     {
983       LOG_SQLITE (plugin, NULL,
984                   GNUNET_ERROR_TYPE_ERROR |
985                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
986       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
987       return;
988     }
989   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
990     {
991       LOG_SQLITE (plugin, NULL,
992                   GNUNET_ERROR_TYPE_ERROR |
993                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
994       sqlite3_finalize (stmt_1);
995       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
996       return;
997     }
998   nc = GNUNET_malloc (sizeof(struct NextContext) + 
999                       sizeof(struct IterContext));
1000   nc->plugin = plugin;
1001   nc->iter = iter;
1002   nc->iter_cls = iter_cls;
1003   nc->stmt = NULL;
1004   ic = (struct IterContext*) &nc[1];
1005   ic->stmt_1 = stmt_1;
1006   ic->stmt_2 = stmt_2;
1007   ic->type = type;
1008   ic->is_asc = is_asc;
1009   ic->is_prio = is_prio;
1010   ic->is_migr = is_migr;
1011   ic->limit_nonanonymous = limit_nonanonymous;
1012   nc->prep = &iter_next_prepare;
1013   nc->prep_cls = ic;
1014   if (is_asc)
1015     {
1016       nc->lastPriority = 0;
1017       nc->lastExpiration.value = 0;
1018       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1019     }
1020   else
1021     {
1022       nc->lastPriority = 0x7FFFFFFF;
1023       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1024       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1025     }
1026   sqlite_next_request (nc, GNUNET_NO);
1027 }
1028
1029
1030 /**
1031  * Select a subset of the items in the datastore and call
1032  * the given iterator for each of them.
1033  *
1034  * @param cls our plugin context
1035  * @param type entries of which type should be considered?
1036  *        Use 0 for any type.
1037  * @param iter function to call on each matching value;
1038  *        will be called once with a NULL value at the end
1039  * @param iter_cls closure for iter
1040  */
1041 static void
1042 sqlite_plugin_iter_low_priority (void *cls,
1043                                  enum GNUNET_BLOCK_Type type,
1044                                  PluginIterator iter,
1045                                  void *iter_cls)
1046 {
1047   basic_iter (cls,
1048               type, 
1049               GNUNET_YES, GNUNET_YES, 
1050               GNUNET_NO, GNUNET_NO,
1051               SELECT_IT_LOW_PRIORITY_1,
1052               SELECT_IT_LOW_PRIORITY_2, 
1053               iter, iter_cls);
1054 }
1055
1056
1057 /**
1058  * Select a subset of the items in the datastore and call
1059  * the given iterator for each of them.
1060  *
1061  * @param cls our plugin context
1062  * @param type entries of which type should be considered?
1063  *        Use 0 for any type.
1064  * @param iter function to call on each matching value;
1065  *        will be called once with a NULL value at the end
1066  * @param iter_cls closure for iter
1067  */
1068 static void
1069 sqlite_plugin_iter_zero_anonymity (void *cls,
1070                                    enum GNUNET_BLOCK_Type type,
1071                                    PluginIterator iter,
1072                                    void *iter_cls)
1073 {
1074   struct GNUNET_TIME_Absolute now;
1075   char *q1;
1076   char *q2;
1077
1078   now = GNUNET_TIME_absolute_get ();
1079   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1080                    (unsigned long long) now.value);
1081   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1082                    (unsigned long long) now.value);
1083   basic_iter (cls,
1084               type, 
1085               GNUNET_NO, GNUNET_YES, 
1086               GNUNET_NO, GNUNET_YES,
1087               q1,
1088               q2,
1089               iter, iter_cls);
1090   GNUNET_free (q1);
1091   GNUNET_free (q2);
1092 }
1093
1094
1095
1096 /**
1097  * Select a subset of the items in the datastore and call
1098  * the given iterator for each of them.
1099  *
1100  * @param cls our plugin context
1101  * @param type entries of which type should be considered?
1102  *        Use 0 for any type.
1103  * @param iter function to call on each matching value;
1104  *        will be called once with a NULL value at the end
1105  * @param iter_cls closure for iter
1106  */
1107 static void
1108 sqlite_plugin_iter_ascending_expiration (void *cls,
1109                                          enum GNUNET_BLOCK_Type type,
1110                                          PluginIterator iter,
1111                                          void *iter_cls)
1112 {
1113   struct GNUNET_TIME_Absolute now;
1114   char *q1;
1115   char *q2;
1116
1117   now = GNUNET_TIME_absolute_get ();
1118   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1119                    (unsigned long long) 0*now.value);
1120   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1121                    (unsigned long long) 0*now.value);
1122   basic_iter (cls,
1123               type, 
1124               GNUNET_YES, GNUNET_NO, 
1125               GNUNET_NO, GNUNET_NO,
1126               q1, q2,
1127               iter, iter_cls);
1128   GNUNET_free (q1);
1129   GNUNET_free (q2);
1130 }
1131
1132
1133 /**
1134  * Select a subset of the items in the datastore and call
1135  * the given iterator for each of them.
1136  *
1137  * @param cls our plugin context
1138  * @param type entries of which type should be considered?
1139  *        Use 0 for any type.
1140  * @param iter function to call on each matching value;
1141  *        will be called once with a NULL value at the end
1142  * @param iter_cls closure for iter
1143  */
1144 static void
1145 sqlite_plugin_iter_migration_order (void *cls,
1146                                     enum GNUNET_BLOCK_Type type,
1147                                     PluginIterator iter,
1148                                     void *iter_cls)
1149 {
1150   struct GNUNET_TIME_Absolute now;
1151   char *q;
1152
1153   now = GNUNET_TIME_absolute_get ();
1154   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1155                    (unsigned long long) now.value);
1156   basic_iter (cls,
1157               type, 
1158               GNUNET_NO, GNUNET_NO, 
1159               GNUNET_YES, GNUNET_NO,
1160               SELECT_IT_MIGRATION_ORDER_1,
1161               q,
1162               iter, iter_cls);
1163   GNUNET_free (q);
1164 }
1165
1166
1167 /**
1168  * Call sqlite using the already prepared query to get
1169  * the next result.
1170  *
1171  * @param cls not used
1172  * @param nc context with the prepared query
1173  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1174  *        there are no more results 
1175  */
1176 static int
1177 all_next_prepare (void *cls,
1178                   struct NextContext *nc)
1179 {
1180   struct Plugin *plugin;
1181   int ret;
1182
1183   if (nc == NULL)
1184     {
1185 #if DEBUG_SQLITE
1186       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1187                   "Asked to clean up iterator state.\n");
1188 #endif
1189       return GNUNET_SYSERR;
1190     }
1191   plugin = nc->plugin;
1192   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1193     {      
1194       return GNUNET_OK;
1195     }
1196   if (ret != SQLITE_DONE)
1197     {
1198       LOG_SQLITE (plugin, NULL,
1199                   GNUNET_ERROR_TYPE_ERROR |
1200                   GNUNET_ERROR_TYPE_BULK,
1201                   "sqlite3_step");
1202       return GNUNET_SYSERR;
1203     }
1204   return GNUNET_NO;
1205 }
1206
1207
1208 /**
1209  * Select a subset of the items in the datastore and call
1210  * the given iterator for each of them.
1211  *
1212  * @param cls our plugin context
1213  * @param type entries of which type should be considered?
1214  *        Use 0 for any type.
1215  * @param iter function to call on each matching value;
1216  *        will be called once with a NULL value at the end
1217  * @param iter_cls closure for iter
1218  */
1219 static void
1220 sqlite_plugin_iter_all_now (void *cls,
1221                             enum GNUNET_BLOCK_Type type,
1222                             PluginIterator iter,
1223                             void *iter_cls)
1224 {
1225   struct Plugin *plugin = cls;
1226   struct NextContext *nc;
1227   sqlite3_stmt *stmt;
1228
1229   if (sq_prepare (plugin->dbh, 
1230                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1231                   &stmt) != SQLITE_OK)
1232     {
1233       LOG_SQLITE (plugin, NULL,
1234                   GNUNET_ERROR_TYPE_ERROR |
1235                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1236       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1237       return;
1238     }
1239   nc = GNUNET_malloc (sizeof(struct NextContext));
1240   nc->plugin = plugin;
1241   nc->iter = iter;
1242   nc->iter_cls = iter_cls;
1243   nc->stmt = stmt;
1244   nc->prep = &all_next_prepare;
1245   nc->prep_cls = NULL;
1246   sqlite_next_request (nc, GNUNET_NO);
1247 }
1248
1249
1250 /**
1251  * FIXME.
1252  */
1253 struct GetNextContext
1254 {
1255
1256   /**
1257    * FIXME.
1258    */
1259   int total;
1260
1261   /**
1262    * FIXME.
1263    */
1264   int off;
1265
1266   /**
1267    * FIXME.
1268    */
1269   int have_vhash;
1270
1271   /**
1272    * FIXME.
1273    */
1274   unsigned int type;
1275
1276   /**
1277    * FIXME.
1278    */
1279   sqlite3_stmt *stmt;
1280
1281   /**
1282    * FIXME.
1283    */
1284   GNUNET_HashCode key;
1285
1286   /**
1287    * FIXME.
1288    */
1289   GNUNET_HashCode vhash;
1290 };
1291
1292
1293
1294 /**
1295  * FIXME.
1296  *
1297  * @param cls our "struct GetNextContext*"
1298  * @param nc FIXME
1299  * @return GNUNET_YES if there are more results, 
1300  *         GNUNET_NO if there are no more results,
1301  *         GNUNET_SYSERR on internal error
1302  */
1303 static int
1304 get_next_prepare (void *cls,
1305                   struct NextContext *nc)
1306 {
1307   struct GetNextContext *gnc = cls;
1308   int sqoff;
1309   int ret;
1310   int limit_off;
1311
1312   if (nc == NULL)
1313     {
1314       sqlite3_finalize (gnc->stmt);
1315       return GNUNET_SYSERR;
1316     }
1317   if (nc->count == gnc->total)
1318     return GNUNET_NO;
1319   if (nc->count + gnc->off == gnc->total)
1320     nc->last_rowid = 0;
1321   if (nc->count == 0)
1322     limit_off = gnc->off;
1323   else
1324     limit_off = 0;
1325   sqoff = 1;
1326   sqlite3_reset (nc->stmt);
1327   ret = sqlite3_bind_blob (nc->stmt,
1328                            sqoff++,
1329                            &gnc->key, 
1330                            sizeof (GNUNET_HashCode),
1331                            SQLITE_TRANSIENT);
1332   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1333     ret = sqlite3_bind_blob (nc->stmt,
1334                              sqoff++,
1335                              &gnc->vhash,
1336                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1337   if ((gnc->type != 0) && (ret == SQLITE_OK))
1338     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1339   if (ret == SQLITE_OK)
1340     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1341   if (ret == SQLITE_OK)
1342     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1343   if (ret != SQLITE_OK)
1344     return GNUNET_SYSERR;
1345   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1346     return GNUNET_NO;
1347   return GNUNET_OK;
1348 }
1349
1350
1351 /**
1352  * Iterate over the results for a particular key
1353  * in the datastore.
1354  *
1355  * @param cls closure
1356  * @param key maybe NULL (to match all entries)
1357  * @param vhash hash of the value, maybe NULL (to
1358  *        match all values that have the right key).
1359  *        Note that for DBlocks there is no difference
1360  *        betwen key and vhash, but for other blocks
1361  *        there may be!
1362  * @param type entries of which type are relevant?
1363  *     Use 0 for any type.
1364  * @param iter function to call on each matching value;
1365  *        will be called once with a NULL value at the end
1366  * @param iter_cls closure for iter
1367  */
1368 static void
1369 sqlite_plugin_get (void *cls,
1370                    const GNUNET_HashCode * key,
1371                    const GNUNET_HashCode * vhash,
1372                    enum GNUNET_BLOCK_Type type,
1373                    PluginIterator iter, void *iter_cls)
1374 {
1375   struct Plugin *plugin = cls;
1376   struct GetNextContext *gpc;
1377   struct NextContext *nc;
1378   int ret;
1379   int total;
1380   sqlite3_stmt *stmt;
1381   char scratch[256];
1382   int sqoff;
1383
1384   GNUNET_assert (iter != NULL);
1385   if (key == NULL)
1386     {
1387       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1388       return;
1389     }
1390   GNUNET_snprintf (scratch, sizeof (scratch),
1391                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1392                    vhash == NULL ? "" : " AND vhash=:2",
1393                    type == 0 ? "" : (vhash ==
1394                                      NULL) ? " AND type=:2" : " AND type=:3");
1395   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1396     {
1397       LOG_SQLITE (plugin, NULL,
1398                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1399       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1400       return;
1401     }
1402   sqoff = 1;
1403   ret = sqlite3_bind_blob (stmt,
1404                            sqoff++,
1405                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1406   if ((vhash != NULL) && (ret == SQLITE_OK))
1407     ret = sqlite3_bind_blob (stmt,
1408                              sqoff++,
1409                              vhash,
1410                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1411   if ((type != 0) && (ret == SQLITE_OK))
1412     ret = sqlite3_bind_int (stmt, sqoff++, type);
1413   if (SQLITE_OK != ret)
1414     {
1415       LOG_SQLITE (plugin, NULL,
1416                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1417       sqlite3_reset (stmt);
1418       sqlite3_finalize (stmt);
1419       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1420       return;
1421     }
1422   ret = sqlite3_step (stmt);
1423   if (ret != SQLITE_ROW)
1424     {
1425       LOG_SQLITE (plugin, NULL,
1426                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1427                   "sqlite_step");
1428       sqlite3_reset (stmt);
1429       sqlite3_finalize (stmt);
1430       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1431       return;
1432     }
1433   total = sqlite3_column_int (stmt, 0);
1434   sqlite3_reset (stmt);
1435   sqlite3_finalize (stmt);
1436   if (0 == total)
1437     {
1438       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1439       return;
1440     }
1441
1442   GNUNET_snprintf (scratch, sizeof (scratch),
1443                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1444                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1445                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1446                    vhash == NULL ? "" : " AND vhash=:2",
1447                    type == 0 ? "" : (vhash ==
1448                                      NULL) ? " AND type=:2" : " AND type=:3",
1449                    sqoff, sqoff + 1);
1450   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1451     {
1452       LOG_SQLITE (plugin, NULL,
1453                   GNUNET_ERROR_TYPE_ERROR |
1454                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1455       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1456       return;
1457     }
1458   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1459                       sizeof(struct GetNextContext));
1460   nc->plugin = plugin;
1461   nc->iter = iter;
1462   nc->iter_cls = iter_cls;
1463   nc->stmt = stmt;
1464   gpc = (struct GetNextContext*) &nc[1];
1465   gpc->total = total;
1466   gpc->type = type;
1467   gpc->key = *key;
1468   gpc->stmt = stmt; /* alias used for freeing at the end! */
1469   if (NULL != vhash)
1470     {
1471       gpc->have_vhash = GNUNET_YES;
1472       gpc->vhash = *vhash;
1473     }
1474   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1475   nc->prep = &get_next_prepare;
1476   nc->prep_cls = gpc;
1477   sqlite_next_request (nc, GNUNET_NO);
1478 }
1479
1480
1481 /**
1482  * Drop database.
1483  *
1484  * @param cls our plugin context
1485  */
1486 static void 
1487 sqlite_plugin_drop (void *cls)
1488 {
1489   struct Plugin *plugin = cls;
1490   plugin->drop_on_shutdown = GNUNET_YES;
1491 }
1492
1493
1494 static unsigned long long
1495 sqlite_plugin_get_size (void *cls)
1496 {
1497   struct Plugin *plugin = cls;
1498   sqlite3_stmt *stmt;
1499   uint64_t pages;
1500   uint64_t page_size;
1501 #if ENULL_DEFINED
1502   char *e;
1503 #endif
1504
1505   if (SQLITE_VERSION_NUMBER < 3006000)
1506     {
1507       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1508                        "datastore-sqlite",
1509                        _("sqlite version to old to determine size, assuming zero\n"));
1510       return 0;
1511     }
1512   CHECK (SQLITE_OK ==
1513          sqlite3_exec (plugin->dbh,
1514                        "VACUUM", NULL, NULL, ENULL));
1515   CHECK (SQLITE_OK ==
1516          sqlite3_exec (plugin->dbh,
1517                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1518   CHECK (SQLITE_OK ==
1519          sq_prepare (plugin->dbh,
1520                      "PRAGMA page_count",
1521                      &stmt));
1522   if (SQLITE_ROW ==
1523       sqlite3_step (stmt))
1524     pages = sqlite3_column_int64 (stmt, 0);
1525   else
1526     pages = 0;
1527   sqlite3_finalize (stmt);
1528   CHECK (SQLITE_OK ==
1529          sq_prepare (plugin->dbh,
1530                      "PRAGMA page_size",
1531                      &stmt));
1532   CHECK (SQLITE_ROW ==
1533          sqlite3_step (stmt));
1534   page_size = sqlite3_column_int64 (stmt, 0);
1535   sqlite3_finalize (stmt);
1536   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1537               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1538               (unsigned long long) pages,
1539               (unsigned long long) page_size);
1540   return  pages * page_size;
1541 }
1542                                          
1543
1544 /**
1545  * Entry point for the plugin.
1546  *
1547  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1548  * @return NULL on error, othrewise the plugin context
1549  */
1550 void *
1551 libgnunet_plugin_datastore_sqlite_init (void *cls)
1552 {
1553   static struct Plugin plugin;
1554   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1555   struct GNUNET_DATASTORE_PluginFunctions *api;
1556
1557   if (plugin.env != NULL)
1558     return NULL; /* can only initialize once! */
1559   memset (&plugin, 0, sizeof(struct Plugin));
1560   plugin.env = env;
1561   if (GNUNET_OK !=
1562       database_setup (env->cfg, &plugin))
1563     {
1564       database_shutdown (&plugin);
1565       return NULL;
1566     }
1567   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1568   api->cls = &plugin;
1569   api->get_size = &sqlite_plugin_get_size;
1570   api->put = &sqlite_plugin_put;
1571   api->next_request = &sqlite_next_request;
1572   api->get = &sqlite_plugin_get;
1573   api->update = &sqlite_plugin_update;
1574   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1575   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1576   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1577   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1578   api->iter_all_now = &sqlite_plugin_iter_all_now;
1579   api->drop = &sqlite_plugin_drop;
1580   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1581                    "sqlite", _("Sqlite database running\n"));
1582   return api;
1583 }
1584
1585
1586 /**
1587  * Exit point from the plugin.
1588  *
1589  * @param cls the plugin context (as returned by "init")
1590  * @return always NULL
1591  */
1592 void *
1593 libgnunet_plugin_datastore_sqlite_done (void *cls)
1594 {
1595   char *fn;
1596   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1597   struct Plugin *plugin = api->cls;
1598
1599   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1600     {
1601       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1602                                plugin->next_task);
1603       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1604       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1605       GNUNET_free (plugin->next_task_nc);
1606       plugin->next_task_nc = NULL;
1607     }
1608   fn = NULL;
1609   if (plugin->drop_on_shutdown)
1610     fn = GNUNET_strdup (plugin->fn);
1611   database_shutdown (plugin);
1612   plugin->env = NULL; 
1613   GNUNET_free (api);
1614   if (fn != NULL)
1615     {
1616       if (0 != UNLINK(fn))
1617         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1618                                   "unlink",
1619                                   fn);
1620       GNUNET_free (fn);
1621     }
1622   return NULL;
1623 }
1624
1625 /* end of plugin_datastore_sqlite.c */