const-ing of config-handles
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_NO
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94 /**
95  * Context for all functions in this plugin.
96  */
97 struct Plugin 
98 {
99   /**
100    * Our execution environment.
101    */
102   struct GNUNET_DATASTORE_PluginEnvironment *env;
103
104   /**
105    * Database filename.
106    */
107   char *fn;
108
109   /**
110    * Native SQLite database handle.
111    */
112   sqlite3 *dbh;
113
114   /**
115    * Precompiled SQL for update.
116    */
117   sqlite3_stmt *updPrio;
118
119   /**
120    * Precompiled SQL for insertion.
121    */
122   sqlite3_stmt *insertContent;
123
124   /**
125    * Handle to the statistics service.
126    */
127   struct GNUNET_STATISTICS_Handle *statistics;
128   
129   /**
130    * How much data are we currently storing
131    * in the database?
132    */
133   unsigned long long payload;
134
135   /**
136    * Number of updates that were made to the
137    * payload value since we last synchronized
138    * it with the statistics service.
139    */
140   unsigned int lastSync;
141
142   /**
143    * Should the database be dropped on shutdown?
144    */
145   int drop_on_shutdown;
146 };
147
148
149 /**
150  * @brief Prepare a SQL statement
151  *
152  * @param zSql SQL statement, UTF-8 encoded
153  */
154 static int
155 sq_prepare (sqlite3 * dbh, const char *zSql,
156             sqlite3_stmt ** ppStmt)
157 {
158   char *dummy;
159   return sqlite3_prepare (dbh,
160                           zSql,
161                           strlen (zSql), ppStmt, (const char **) &dummy);
162 }
163
164
165 /**
166  * Create our database indices.
167  */
168 static void
169 create_indices (sqlite3 * dbh)
170 {
171   /* create indices */
172   sqlite3_exec (dbh,
173                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
174   sqlite3_exec (dbh,
175                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
176                 NULL, NULL);
177   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
178                 NULL);
179   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
180                 NULL);
181   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
182                 NULL, NULL);
183   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
184                 NULL, NULL, NULL);
185   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
186                 NULL, NULL);
187 }
188
189
190
191 #if 1
192 #define CHECK(a) GNUNET_break(a)
193 #define ENULL NULL
194 #else
195 #define ENULL &e
196 #define ENULL_DEFINED 1
197 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
198 #endif
199
200
201
202
203 /**
204  * Initialize the database connections and associated
205  * data structures (create tables and indices
206  * as needed as well).
207  *
208  * @return GNUNET_OK on success
209  */
210 static int
211 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
212                 struct Plugin *plugin)
213 {
214   sqlite3_stmt *stmt;
215   char *afsdir;
216 #if ENULL_DEFINED
217   char *e;
218 #endif
219   
220   if (GNUNET_OK != 
221       GNUNET_CONFIGURATION_get_value_filename (cfg,
222                                                "datastore-sqlite",
223                                                "FILENAME",
224                                                &afsdir))
225     {
226       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
227                        "sqlite",
228                        _("Option `%s' in section `%s' missing in configuration!\n"),
229                        "FILENAME",
230                        "datastore-sqlite");
231       return GNUNET_SYSERR;
232     }
233   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
234     {
235       GNUNET_break (0);
236       GNUNET_free (afsdir);
237       return GNUNET_SYSERR;
238     }
239   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
240 #ifdef ENABLE_NLS
241                                               nl_langinfo (CODESET)
242 #else
243                                               "UTF-8"   /* good luck */
244 #endif
245                                               );
246   GNUNET_free (afsdir);
247   
248   /* Open database and precompile statements */
249   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
250     {
251       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
252                        "sqlite",
253                        _("Unable to initialize SQLite: %s.\n"),
254                        sqlite3_errmsg (plugin->dbh));
255       return GNUNET_SYSERR;
256     }
257   CHECK (SQLITE_OK ==
258          sqlite3_exec (plugin->dbh,
259                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
260   CHECK (SQLITE_OK ==
261          sqlite3_exec (plugin->dbh,
262                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
263   CHECK (SQLITE_OK ==
264          sqlite3_exec (plugin->dbh,
265                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
266   CHECK (SQLITE_OK ==
267          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
268
269   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
270
271
272   /* We have to do it here, because otherwise precompiling SQL might fail */
273   CHECK (SQLITE_OK ==
274          sq_prepare (plugin->dbh,
275                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
276                      &stmt));
277   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
278        (sqlite3_exec (plugin->dbh,
279                       "CREATE TABLE gn080 ("
280                       "  size INT4 NOT NULL DEFAULT 0,"
281                       "  type INT4 NOT NULL DEFAULT 0,"
282                       "  prio INT4 NOT NULL DEFAULT 0,"
283                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
284                       "  expire INT8 NOT NULL DEFAULT 0,"
285                       "  hash TEXT NOT NULL DEFAULT '',"
286                       "  vhash TEXT NOT NULL DEFAULT '',"
287                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
288                       NULL) != SQLITE_OK) )
289     {
290       LOG_SQLITE (plugin, NULL,
291                   GNUNET_ERROR_TYPE_ERROR, 
292                   "sqlite3_exec");
293       sqlite3_finalize (stmt);
294       return GNUNET_SYSERR;
295     }
296   sqlite3_finalize (stmt);
297   create_indices (plugin->dbh);
298
299   CHECK (SQLITE_OK ==
300          sq_prepare (plugin->dbh,
301                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
302                      &stmt));
303   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
304        (sqlite3_exec (plugin->dbh,
305                       "CREATE TABLE gn071 ("
306                       "  key TEXT NOT NULL DEFAULT '',"
307                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
308                       NULL) != SQLITE_OK) )
309     {
310       LOG_SQLITE (plugin, NULL,
311                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
312       sqlite3_finalize (stmt);
313       return GNUNET_SYSERR;
314     }
315   sqlite3_finalize (stmt);
316
317   if ((sq_prepare (plugin->dbh,
318                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
319                    "_ROWID_ = ?",
320                    &plugin->updPrio) != SQLITE_OK) ||
321       (sq_prepare (plugin->dbh,
322                    "INSERT INTO gn080 (size, type, prio, "
323                    "anonLevel, expire, hash, vhash, value) VALUES "
324                    "(?, ?, ?, ?, ?, ?, ?, ?)",
325                    &plugin->insertContent) != SQLITE_OK))
326     {
327       LOG_SQLITE (plugin, NULL,
328                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
329       return GNUNET_SYSERR;
330     }
331   return GNUNET_OK;
332 }
333
334
335 /**
336  * Synchronize our utilization statistics with the 
337  * statistics service.
338  */
339 static void 
340 sync_stats (struct Plugin *plugin)
341 {
342   GNUNET_STATISTICS_set (plugin->statistics,
343                          QUOTA_STAT_NAME,
344                          plugin->payload,
345                          GNUNET_YES);
346   plugin->lastSync = 0;
347 }
348
349
350 /**
351  * Shutdown database connection and associate data
352  * structures.
353  */
354 static void
355 database_shutdown (struct Plugin *plugin)
356 {
357   if (plugin->lastSync > 0)
358     sync_stats (plugin);
359   if (plugin->updPrio != NULL)
360     sqlite3_finalize (plugin->updPrio);
361   if (plugin->insertContent != NULL)
362     sqlite3_finalize (plugin->insertContent);
363   sqlite3_close (plugin->dbh);
364   GNUNET_free_non_null (plugin->fn);
365 }
366
367
368 /**
369  * Get an estimate of how much space the database is
370  * currently using.
371  * @return number of bytes used on disk
372  */
373 static unsigned long long sqlite_plugin_get_size (void *cls)
374 {
375   struct Plugin *plugin = cls;
376   return plugin->payload;
377 }
378
379
380 /**
381  * Delete the database entry with the given
382  * row identifier.
383  */
384 static int
385 delete_by_rowid (struct Plugin* plugin, 
386                  unsigned long long rid)
387 {
388   sqlite3_stmt *stmt;
389
390   if (sq_prepare (plugin->dbh,
391                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
392     {
393       LOG_SQLITE (plugin, NULL,
394                   GNUNET_ERROR_TYPE_ERROR |
395                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
396       return GNUNET_SYSERR;
397     }
398   sqlite3_bind_int64 (stmt, 1, rid);
399   if (SQLITE_DONE != sqlite3_step (stmt))
400     {
401       LOG_SQLITE (plugin, NULL,
402                   GNUNET_ERROR_TYPE_ERROR |
403                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
404       sqlite3_finalize (stmt);
405       return GNUNET_SYSERR;
406     }
407   sqlite3_finalize (stmt);
408   return GNUNET_OK;
409 }
410
411
412 /**
413  * Context for the universal iterator.
414  */
415 struct NextContext;
416
417 /**
418  * Type of a function that will prepare
419  * the next iteration.
420  *
421  * @param cls closure
422  * @param nc the next context; NULL for the last
423  *         call which gives the callback a chance to
424  *         clean up the closure
425  * @return GNUNET_OK on success, GNUNET_NO if there are
426  *         no more values, GNUNET_SYSERR on error
427  */
428 typedef int (*PrepareFunction)(void *cls,
429                                struct NextContext *nc);
430
431
432 /**
433  * Context we keep for the "next request" callback.
434  */
435 struct NextContext
436 {
437   /**
438    * Internal state.
439    */ 
440   struct Plugin *plugin;
441
442   /**
443    * Function to call on the next value.
444    */
445   PluginIterator iter;
446
447   /**
448    * Closure for iter.
449    */
450   void *iter_cls;
451
452   /**
453    * Function to call to prepare the next
454    * iteration.
455    */
456   PrepareFunction prep;
457
458   /**
459    * Closure for prep.
460    */
461   void *prep_cls;
462
463   /**
464    * Statement that the iterator will get the data
465    * from (updated or set by prep).
466    */ 
467   sqlite3_stmt *stmt;
468
469   /**
470    * Row ID of the last result.
471    */
472   unsigned long long last_rowid;
473
474   /**
475    * Key of the last result.
476    */
477   GNUNET_HashCode lastKey;  
478
479   /**
480    * Expiration time of the last value visited.
481    */
482   struct GNUNET_TIME_Absolute lastExpiration;
483
484   /**
485    * Priority of the last value visited.
486    */ 
487   unsigned int lastPriority; 
488
489   /**
490    * Number of results processed so far.
491    */
492   unsigned int count;
493
494   /**
495    * Set to GNUNET_YES if we must stop now.
496    */
497   int end_it;
498 };
499
500
501 /**
502  * Continuation of "sqlite_next_request".
503  *
504  * @param cls the next context
505  */
506 static void 
507 sqlite_next_request_cont (void *cls,
508                           const struct GNUNET_SCHEDULER_TaskContext *tc)
509 {
510   static struct GNUNET_TIME_Absolute zero;
511   struct NextContext * nc= cls;
512   struct Plugin *plugin;
513   unsigned long long rowid;
514   sqlite3_stmt *stmtd;
515   int ret;
516   unsigned int type;
517   unsigned int size;
518   unsigned int priority;
519   unsigned int anonymity;
520   struct GNUNET_TIME_Absolute expiration;
521   const GNUNET_HashCode *key;
522   const void *data;
523
524  
525   plugin = nc->plugin;
526   if ( (GNUNET_YES == nc->end_it) ||
527        (GNUNET_OK != (nc->prep(nc->prep_cls,
528                                nc))) )
529     {
530     END:
531       nc->iter (nc->iter_cls, 
532                 NULL, NULL, 0, NULL, 0, 0, 0, 
533                 zero, 0);
534       nc->prep (nc->prep_cls, NULL);
535       GNUNET_free (nc);
536       return;
537     }
538
539   rowid = sqlite3_column_int64 (nc->stmt, 7);
540   nc->last_rowid = rowid;
541   type = sqlite3_column_int (nc->stmt, 1);
542   size = sqlite3_column_bytes (nc->stmt, 6);
543   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
544     {
545       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
546                        "sqlite",
547                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
548       if (SQLITE_OK != sqlite3_reset (nc->stmt))
549         LOG_SQLITE (nc->plugin, NULL,
550                     GNUNET_ERROR_TYPE_ERROR |
551                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
552       if (sq_prepare
553           (nc->plugin->dbh,
554            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
555            &stmtd) != SQLITE_OK)
556         {
557           LOG_SQLITE (nc->plugin, NULL,
558                       GNUNET_ERROR_TYPE_ERROR |
559                       GNUNET_ERROR_TYPE_BULK, 
560                       "sq_prepare");
561           goto END;
562         }
563
564       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
565         LOG_SQLITE (nc->plugin, NULL,
566                     GNUNET_ERROR_TYPE_ERROR |
567                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
568       if (SQLITE_DONE != sqlite3_step (stmtd))
569         LOG_SQLITE (nc->plugin, NULL,
570                     GNUNET_ERROR_TYPE_ERROR |
571                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
572       if (SQLITE_OK != sqlite3_finalize (stmtd))
573         LOG_SQLITE (nc->plugin, NULL,
574                     GNUNET_ERROR_TYPE_ERROR |
575                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
576       goto END;
577     }
578
579   priority = sqlite3_column_int (nc->stmt, 2);
580   anonymity = sqlite3_column_int (nc->stmt, 3);
581   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
582   key = sqlite3_column_blob (nc->stmt, 5);
583   nc->lastPriority = priority;
584   nc->lastExpiration = expiration;
585   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
586   data = sqlite3_column_blob (nc->stmt, 6);
587   nc->count++;
588   ret = nc->iter (nc->iter_cls,
589                   nc,
590                   key,
591                   size,
592                   data, 
593                   type,
594                   priority,
595                   anonymity,
596                   expiration,
597                   rowid);
598   if (ret == GNUNET_SYSERR)
599     {
600       nc->end_it = GNUNET_YES;
601       return;
602     }
603   if ( (ret == GNUNET_NO) &&
604        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
605     {
606       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
607       plugin->lastSync++; 
608       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
609         sync_stats (plugin);
610     }
611 }
612
613
614 /**
615  * Function invoked on behalf of a "PluginIterator"
616  * asking the database plugin to call the iterator
617  * with the next item.
618  *
619  * @param next_cls whatever argument was given
620  *        to the PluginIterator as "next_cls".
621  * @param end_it set to GNUNET_YES if we
622  *        should terminate the iteration early
623  *        (iterator should be still called once more
624  *         to signal the end of the iteration).
625  */
626 static void 
627 sqlite_next_request (void *next_cls,
628                      int end_it)
629 {
630   struct NextContext * nc= next_cls;
631
632   if (GNUNET_YES == end_it)
633     nc->end_it = GNUNET_YES;
634   GNUNET_SCHEDULER_add_continuation (nc->plugin->env->sched,
635                                      GNUNET_NO,
636                                      &sqlite_next_request_cont,
637                                      nc,
638                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
639 }
640
641
642
643 /**
644  * Store an item in the datastore.
645  *
646  * @param cls closure
647  * @param key key for the item
648  * @param size number of bytes in data
649  * @param data content stored
650  * @param type type of the content
651  * @param priority priority of the content
652  * @param anonymity anonymity-level for the content
653  * @param expiration expiration time for the content
654  * @param msg set to an error message
655  * @return GNUNET_OK on success
656  */
657 static int
658 sqlite_plugin_put (void *cls,
659                    const GNUNET_HashCode * key,
660                    uint32_t size,
661                    const void *data,
662                    uint32_t type,
663                    uint32_t priority,
664                    uint32_t anonymity,
665                    struct GNUNET_TIME_Absolute expiration,
666                    char ** msg)
667 {
668   struct Plugin *plugin = cls;
669   int n;
670   sqlite3_stmt *stmt;
671   GNUNET_HashCode vhash;
672
673 #if DEBUG_SQLITE
674   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
675                    "sqlite",
676                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
677                    type, 
678                    GNUNET_h2s(key),
679                    priority,
680                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
681                    (long long) expiration.value);
682 #endif
683   GNUNET_CRYPTO_hash (data, size, &vhash);
684   stmt = plugin->insertContent;
685   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
686       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
687       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
688       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
689       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
690       (SQLITE_OK !=
691        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
692                           SQLITE_TRANSIENT)) ||
693       (SQLITE_OK !=
694        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
695                           SQLITE_TRANSIENT))
696       || (SQLITE_OK !=
697           sqlite3_bind_blob (stmt, 8, data, size,
698                              SQLITE_TRANSIENT)))
699     {
700       LOG_SQLITE (plugin,
701                   msg,
702                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
703       if (SQLITE_OK != sqlite3_reset (stmt))
704         LOG_SQLITE (plugin, NULL,
705                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
706       return GNUNET_SYSERR;
707     }
708   n = sqlite3_step (stmt);
709   if (n != SQLITE_DONE)
710     {
711       if (n == SQLITE_BUSY)
712         {
713           LOG_SQLITE (plugin, msg,
714                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
715           sqlite3_reset (stmt);
716           GNUNET_break (0);
717           return GNUNET_NO;
718         }
719       LOG_SQLITE (plugin, msg,
720                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
721       sqlite3_reset (stmt);
722       return GNUNET_SYSERR;
723     }
724   if (SQLITE_OK != sqlite3_reset (stmt))
725     LOG_SQLITE (plugin, NULL,
726                 GNUNET_ERROR_TYPE_ERROR |
727                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
728   plugin->lastSync++;
729   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
730   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
731     sync_stats (plugin);
732   return GNUNET_OK;
733 }
734
735
736 /**
737  * Update the priority for a particular key in the datastore.  If
738  * the expiration time in value is different than the time found in
739  * the datastore, the higher value should be kept.  For the
740  * anonymity level, the lower value is to be used.  The specified
741  * priority should be added to the existing priority, ignoring the
742  * priority in value.
743  *
744  * Note that it is possible for multiple values to match this put.
745  * In that case, all of the respective values are updated.
746  *
747  * @param uid unique identifier of the datum
748  * @param delta by how much should the priority
749  *     change?  If priority + delta < 0 the
750  *     priority should be set to 0 (never go
751  *     negative).
752  * @param expire new expiration time should be the
753  *     MAX of any existing expiration time and
754  *     this value
755  * @param msg set to an error message
756  * @return GNUNET_OK on success
757  */
758 static int
759 sqlite_plugin_update (void *cls,
760                       uint64_t uid,
761                       int delta, struct GNUNET_TIME_Absolute expire,
762                       char **msg)
763 {
764   struct Plugin *plugin = cls;
765   int n;
766
767   sqlite3_bind_int (plugin->updPrio, 1, delta);
768   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
769   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
770   n = sqlite3_step (plugin->updPrio);
771   if (n != SQLITE_DONE)
772     LOG_SQLITE (plugin, msg,
773                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
774                 "sqlite3_step");
775 #if DEBUG_SQLITE
776   else
777     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
778                      "sqlite",
779                      "Block updated\n");
780 #endif
781   sqlite3_reset (plugin->updPrio);
782
783   if (n == SQLITE_BUSY)
784     return GNUNET_NO;
785   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
786 }
787
788
789 struct IterContext
790 {
791   sqlite3_stmt *stmt_1;
792   sqlite3_stmt *stmt_2;
793   int is_asc;
794   int is_prio;
795   int is_migr;
796   int limit_nonanonymous;
797   uint32_t type;
798 };
799
800
801 static int
802 iter_next_prepare (void *cls,
803                    struct NextContext *nc)
804 {
805   struct IterContext *ic = cls;
806   struct Plugin *plugin;
807   int ret;
808
809   if (nc == NULL)
810     {
811 #if DEBUG_SQLITE
812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                   "Asked to clean up iterator state.\n");
814 #endif
815       sqlite3_finalize (ic->stmt_1);
816       sqlite3_finalize (ic->stmt_2);
817       return GNUNET_SYSERR;
818     }
819   sqlite3_reset (ic->stmt_1);
820   sqlite3_reset (ic->stmt_2);
821   plugin = nc->plugin;
822   if (ic->is_prio)
823     {
824 #if DEBUG_SQLITE
825       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
826                   "Restricting to results larger than the last priority %u\n",
827                   nc->lastPriority);
828 #endif
829       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
830       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
831     }
832   else
833     {
834 #if DEBUG_SQLITE
835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
836                   "Restricting to results larger than the last expiration %llu\n",
837                   (unsigned long long) nc->lastExpiration.value);
838 #endif
839       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
840       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
841     }
842 #if DEBUG_SQLITE
843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844               "Restricting to results larger than the last key `%s'\n",
845               GNUNET_h2s(&nc->lastKey));
846 #endif
847   sqlite3_bind_blob (ic->stmt_1, 2, 
848                      &nc->lastKey, 
849                      sizeof (GNUNET_HashCode),
850                      SQLITE_TRANSIENT);
851   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
852     {      
853 #if DEBUG_SQLITE
854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855                   "Result found using iterator 1\n");
856 #endif
857       nc->stmt = ic->stmt_1;
858       return GNUNET_OK;
859     }
860   if (ret != SQLITE_DONE)
861     {
862       LOG_SQLITE (plugin, NULL,
863                   GNUNET_ERROR_TYPE_ERROR |
864                   GNUNET_ERROR_TYPE_BULK,
865                   "sqlite3_step");
866       return GNUNET_SYSERR;
867     }
868   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
869     LOG_SQLITE (plugin, NULL,
870                 GNUNET_ERROR_TYPE_ERROR | 
871                 GNUNET_ERROR_TYPE_BULK, 
872                 "sqlite3_reset");
873   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
874     {
875 #if DEBUG_SQLITE
876       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877                   "Result found using iterator 2\n");
878 #endif
879       nc->stmt = ic->stmt_2;
880       return GNUNET_OK;
881     }
882   if (ret != SQLITE_DONE)
883     {
884       LOG_SQLITE (plugin, NULL,
885                   GNUNET_ERROR_TYPE_ERROR |
886                   GNUNET_ERROR_TYPE_BULK,
887                   "sqlite3_step");
888       return GNUNET_SYSERR;
889     }
890   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
891     LOG_SQLITE (plugin, NULL,
892                 GNUNET_ERROR_TYPE_ERROR |
893                 GNUNET_ERROR_TYPE_BULK,
894                 "sqlite3_reset");
895 #if DEBUG_SQLITE
896   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
897               "No result found using either iterator\n");
898 #endif
899   return GNUNET_NO;
900 }
901
902
903 /**
904  * Call a method for each key in the database and
905  * call the callback method on it.
906  *
907  * @param type entries of which type should be considered?
908  * @param iter function to call on each matching value;
909  *        will be called once with a NULL value at the end
910  * @param iter_cls closure for iter
911  * @return the number of results processed,
912  *         GNUNET_SYSERR on error
913  */
914 static void
915 basic_iter (struct Plugin *plugin,
916             uint32_t type,
917             int is_asc,
918             int is_prio,
919             int is_migr,
920             int limit_nonanonymous,
921             const char *stmt_str_1,
922             const char *stmt_str_2,
923             PluginIterator iter,
924             void *iter_cls)
925 {
926   static struct GNUNET_TIME_Absolute zero;
927   struct NextContext *nc;
928   struct IterContext *ic;
929   sqlite3_stmt *stmt_1;
930   sqlite3_stmt *stmt_2;
931
932 #if DEBUG_SQLITE
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "At %llu, using queries `%s' and `%s'\n",
935               (unsigned long long) GNUNET_TIME_absolute_get ().value,
936               stmt_str_1,
937               stmt_str_2);
938 #endif
939   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
940     {
941       LOG_SQLITE (plugin, NULL,
942                   GNUNET_ERROR_TYPE_ERROR |
943                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
944       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
945       return;
946     }
947   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
948     {
949       LOG_SQLITE (plugin, NULL,
950                   GNUNET_ERROR_TYPE_ERROR |
951                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
952       sqlite3_finalize (stmt_1);
953       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
954       return;
955     }
956   nc = GNUNET_malloc (sizeof(struct NextContext) + 
957                       sizeof(struct IterContext));
958   nc->plugin = plugin;
959   nc->iter = iter;
960   nc->iter_cls = iter_cls;
961   nc->stmt = NULL;
962   ic = (struct IterContext*) &nc[1];
963   ic->stmt_1 = stmt_1;
964   ic->stmt_2 = stmt_2;
965   ic->type = type;
966   ic->is_asc = is_asc;
967   ic->is_prio = is_prio;
968   ic->is_migr = is_migr;
969   ic->limit_nonanonymous = limit_nonanonymous;
970   nc->prep = &iter_next_prepare;
971   nc->prep_cls = ic;
972   if (is_asc)
973     {
974       nc->lastPriority = 0;
975       nc->lastExpiration.value = 0;
976       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
977     }
978   else
979     {
980       nc->lastPriority = 0x7FFFFFFF;
981       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
982       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
983     }
984   sqlite_next_request (nc, GNUNET_NO);
985 }
986
987
988 /**
989  * Select a subset of the items in the datastore and call
990  * the given iterator for each of them.
991  *
992  * @param type entries of which type should be considered?
993  *        Use 0 for any type.
994  * @param iter function to call on each matching value;
995  *        will be called once with a NULL value at the end
996  * @param iter_cls closure for iter
997  */
998 static void
999 sqlite_plugin_iter_low_priority (void *cls,
1000                                  uint32_t type,
1001                                  PluginIterator iter,
1002                                  void *iter_cls)
1003 {
1004   basic_iter (cls,
1005               type, 
1006               GNUNET_YES, GNUNET_YES, 
1007               GNUNET_NO, GNUNET_NO,
1008               SELECT_IT_LOW_PRIORITY_1,
1009               SELECT_IT_LOW_PRIORITY_2, 
1010               iter, iter_cls);
1011 }
1012
1013
1014 /**
1015  * Select a subset of the items in the datastore and call
1016  * the given iterator for each of them.
1017  *
1018  * @param type entries of which type should be considered?
1019  *        Use 0 for any type.
1020  * @param iter function to call on each matching value;
1021  *        will be called once with a NULL value at the end
1022  * @param iter_cls closure for iter
1023  */
1024 static void
1025 sqlite_plugin_iter_zero_anonymity (void *cls,
1026                                    uint32_t type,
1027                                    PluginIterator iter,
1028                                    void *iter_cls)
1029 {
1030   struct GNUNET_TIME_Absolute now;
1031   char *q1;
1032   char *q2;
1033
1034   now = GNUNET_TIME_absolute_get ();
1035   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1036                    (unsigned long long) now.value);
1037   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1038                    (unsigned long long) now.value);
1039   basic_iter (cls,
1040               type, 
1041               GNUNET_NO, GNUNET_YES, 
1042               GNUNET_NO, GNUNET_YES,
1043               q1,
1044               q2,
1045               iter, iter_cls);
1046   GNUNET_free (q1);
1047   GNUNET_free (q2);
1048 }
1049
1050
1051
1052 /**
1053  * Select a subset of the items in the datastore and call
1054  * the given iterator for each of them.
1055  *
1056  * @param type entries of which type should be considered?
1057  *        Use 0 for any type.
1058  * @param iter function to call on each matching value;
1059  *        will be called once with a NULL value at the end
1060  * @param iter_cls closure for iter
1061  */
1062 static void
1063 sqlite_plugin_iter_ascending_expiration (void *cls,
1064                                          uint32_t type,
1065                                          PluginIterator iter,
1066                                          void *iter_cls)
1067 {
1068   struct GNUNET_TIME_Absolute now;
1069   char *q1;
1070   char *q2;
1071
1072   now = GNUNET_TIME_absolute_get ();
1073   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1074                    (unsigned long long) 0*now.value);
1075   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1076                    (unsigned long long) 0*now.value);
1077   basic_iter (cls,
1078               type, 
1079               GNUNET_YES, GNUNET_NO, 
1080               GNUNET_NO, GNUNET_NO,
1081               q1, q2,
1082               iter, iter_cls);
1083   GNUNET_free (q1);
1084   GNUNET_free (q2);
1085 }
1086
1087
1088 /**
1089  * Select a subset of the items in the datastore and call
1090  * the given iterator for each of them.
1091  *
1092  * @param type entries of which type should be considered?
1093  *        Use 0 for any type.
1094  * @param iter function to call on each matching value;
1095  *        will be called once with a NULL value at the end
1096  * @param iter_cls closure for iter
1097  */
1098 static void
1099 sqlite_plugin_iter_migration_order (void *cls,
1100                                     uint32_t type,
1101                                     PluginIterator iter,
1102                                     void *iter_cls)
1103 {
1104   struct GNUNET_TIME_Absolute now;
1105   char *q;
1106
1107   now = GNUNET_TIME_absolute_get ();
1108   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1109                    (unsigned long long) now.value);
1110   basic_iter (cls,
1111               type, 
1112               GNUNET_NO, GNUNET_NO, 
1113               GNUNET_YES, GNUNET_NO,
1114               SELECT_IT_MIGRATION_ORDER_1,
1115               q,
1116               iter, iter_cls);
1117   GNUNET_free (q);
1118 }
1119
1120
1121 static int
1122 all_next_prepare (void *cls,
1123                   struct NextContext *nc)
1124 {
1125   struct Plugin *plugin;
1126   int ret;
1127
1128   if (nc == NULL)
1129     {
1130 #if DEBUG_SQLITE
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Asked to clean up iterator state.\n");
1133 #endif
1134       return GNUNET_SYSERR;
1135     }
1136   plugin = nc->plugin;
1137   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1138     {      
1139       return GNUNET_OK;
1140     }
1141   if (ret != SQLITE_DONE)
1142     {
1143       LOG_SQLITE (plugin, NULL,
1144                   GNUNET_ERROR_TYPE_ERROR |
1145                   GNUNET_ERROR_TYPE_BULK,
1146                   "sqlite3_step");
1147       return GNUNET_SYSERR;
1148     }
1149   return GNUNET_NO;
1150 }
1151
1152
1153 /**
1154  * Select a subset of the items in the datastore and call
1155  * the given iterator for each of them.
1156  *
1157  * @param type entries of which type should be considered?
1158  *        Use 0 for any type.
1159  * @param iter function to call on each matching value;
1160  *        will be called once with a NULL value at the end
1161  * @param iter_cls closure for iter
1162  */
1163 static void
1164 sqlite_plugin_iter_all_now (void *cls,
1165                             uint32_t type,
1166                             PluginIterator iter,
1167                             void *iter_cls)
1168 {
1169   static struct GNUNET_TIME_Absolute zero;
1170   struct Plugin *plugin = cls;
1171   struct NextContext *nc;
1172   sqlite3_stmt *stmt;
1173
1174   if (sq_prepare (plugin->dbh, 
1175                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1176                   &stmt) != SQLITE_OK)
1177     {
1178       LOG_SQLITE (plugin, NULL,
1179                   GNUNET_ERROR_TYPE_ERROR |
1180                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1181       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1182       return;
1183     }
1184   nc = GNUNET_malloc (sizeof(struct NextContext));
1185   nc->plugin = plugin;
1186   nc->iter = iter;
1187   nc->iter_cls = iter_cls;
1188   nc->stmt = stmt;
1189   nc->prep = &all_next_prepare;
1190   nc->prep_cls = NULL;
1191   sqlite_next_request (nc, GNUNET_NO);
1192 }
1193
1194
1195 struct GetNextContext
1196 {
1197   int total;
1198   int off;
1199   int have_vhash;
1200   unsigned int type;
1201   sqlite3_stmt *stmt;
1202   GNUNET_HashCode key;
1203   GNUNET_HashCode vhash;
1204 };
1205
1206
1207 static int
1208 get_next_prepare (void *cls,
1209                   struct NextContext *nc)
1210 {
1211   struct GetNextContext *gnc = cls;
1212   int sqoff;
1213   int ret;
1214   int limit_off;
1215
1216   if (nc == NULL)
1217     {
1218       sqlite3_finalize (gnc->stmt);
1219       return GNUNET_SYSERR;
1220     }
1221   if (nc->count == gnc->total)
1222     return GNUNET_NO;
1223   if (nc->count + gnc->off == gnc->total)
1224     nc->last_rowid = 0;
1225   if (nc->count == 0)
1226     limit_off = gnc->off;
1227   else
1228     limit_off = 0;
1229   sqoff = 1;
1230   sqlite3_reset (nc->stmt);
1231   ret = sqlite3_bind_blob (nc->stmt,
1232                            sqoff++,
1233                            &gnc->key, 
1234                            sizeof (GNUNET_HashCode),
1235                            SQLITE_TRANSIENT);
1236   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1237     ret = sqlite3_bind_blob (nc->stmt,
1238                              sqoff++,
1239                              &gnc->vhash,
1240                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1241   if ((gnc->type != 0) && (ret == SQLITE_OK))
1242     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1243   if (ret == SQLITE_OK)
1244     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1245   if (ret == SQLITE_OK)
1246     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1247   if (ret != SQLITE_OK)
1248     return GNUNET_SYSERR;
1249   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1250     return GNUNET_NO;
1251   return GNUNET_OK;
1252 }
1253
1254
1255 /**
1256  * Iterate over the results for a particular key
1257  * in the datastore.
1258  *
1259  * @param cls closure
1260  * @param key maybe NULL (to match all entries)
1261  * @param vhash hash of the value, maybe NULL (to
1262  *        match all values that have the right key).
1263  *        Note that for DBlocks there is no difference
1264  *        betwen key and vhash, but for other blocks
1265  *        there may be!
1266  * @param type entries of which type are relevant?
1267  *     Use 0 for any type.
1268  * @param iter function to call on each matching value;
1269  *        will be called once with a NULL value at the end
1270  * @param iter_cls closure for iter
1271  */
1272 static void
1273 sqlite_plugin_get (void *cls,
1274                    const GNUNET_HashCode * key,
1275                    const GNUNET_HashCode * vhash,
1276                    uint32_t type,
1277                    PluginIterator iter, void *iter_cls)
1278 {
1279   static struct GNUNET_TIME_Absolute zero;
1280   struct Plugin *plugin = cls;
1281   struct GetNextContext *gpc;
1282   struct NextContext *nc;
1283   int ret;
1284   int total;
1285   sqlite3_stmt *stmt;
1286   char scratch[256];
1287   int sqoff;
1288
1289   GNUNET_assert (iter != NULL);
1290   if (key == NULL)
1291     {
1292       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1293       return;
1294     }
1295   GNUNET_snprintf (scratch, 256,
1296                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1297                    vhash == NULL ? "" : " AND vhash=:2",
1298                    type == 0 ? "" : (vhash ==
1299                                      NULL) ? " AND type=:2" : " AND type=:3");
1300   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1301     {
1302       LOG_SQLITE (plugin, NULL,
1303                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1304       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1305       return;
1306     }
1307   sqoff = 1;
1308   ret = sqlite3_bind_blob (stmt,
1309                            sqoff++,
1310                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1311   if ((vhash != NULL) && (ret == SQLITE_OK))
1312     ret = sqlite3_bind_blob (stmt,
1313                              sqoff++,
1314                              vhash,
1315                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1316   if ((type != 0) && (ret == SQLITE_OK))
1317     ret = sqlite3_bind_int (stmt, sqoff++, type);
1318   if (SQLITE_OK != ret)
1319     {
1320       LOG_SQLITE (plugin, NULL,
1321                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1322       sqlite3_reset (stmt);
1323       sqlite3_finalize (stmt);
1324       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1325       return;
1326     }
1327   ret = sqlite3_step (stmt);
1328   if (ret != SQLITE_ROW)
1329     {
1330       LOG_SQLITE (plugin, NULL,
1331                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1332                   "sqlite_step");
1333       sqlite3_reset (stmt);
1334       sqlite3_finalize (stmt);
1335       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1336       return;
1337     }
1338   total = sqlite3_column_int (stmt, 0);
1339   sqlite3_reset (stmt);
1340   sqlite3_finalize (stmt);
1341   if (0 == total)
1342     {
1343       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1344       return;
1345     }
1346
1347   GNUNET_snprintf (scratch, 256,
1348                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1349                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1350                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1351                    vhash == NULL ? "" : " AND vhash=:2",
1352                    type == 0 ? "" : (vhash ==
1353                                      NULL) ? " AND type=:2" : " AND type=:3",
1354                    sqoff, sqoff + 1);
1355   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1356     {
1357       LOG_SQLITE (plugin, NULL,
1358                   GNUNET_ERROR_TYPE_ERROR |
1359                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1360       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, zero, 0);
1361       return;
1362     }
1363   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1364                       sizeof(struct GetNextContext));
1365   nc->plugin = plugin;
1366   nc->iter = iter;
1367   nc->iter_cls = iter_cls;
1368   nc->stmt = stmt;
1369   gpc = (struct GetNextContext*) &nc[1];
1370   gpc->total = total;
1371   gpc->type = type;
1372   gpc->key = *key;
1373   gpc->stmt = stmt; /* alias used for freeing at the end! */
1374   if (NULL != vhash)
1375     {
1376       gpc->have_vhash = GNUNET_YES;
1377       gpc->vhash = *vhash;
1378     }
1379   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1380   nc->prep = &get_next_prepare;
1381   nc->prep_cls = gpc;
1382   sqlite_next_request (nc, GNUNET_NO);
1383 }
1384
1385
1386 /**
1387  * Drop database.
1388  */
1389 static void 
1390 sqlite_plugin_drop (void *cls)
1391 {
1392   struct Plugin *plugin = cls;
1393   plugin->drop_on_shutdown = GNUNET_YES;
1394 }
1395
1396
1397 /**
1398  * Callback function to process statistic values.
1399  *
1400  * @param cls closure
1401  * @param subsystem name of subsystem that created the statistic
1402  * @param name the name of the datum
1403  * @param value the current value
1404  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1405  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1406  */
1407 static int
1408 process_stat_in (void *cls,
1409                  const char *subsystem,
1410                  const char *name,
1411                  unsigned long long value,
1412                  int is_persistent)
1413 {
1414   struct Plugin *plugin = cls;
1415   plugin->payload += value;
1416   return GNUNET_OK;
1417 }
1418                                          
1419
1420 /**
1421  * Entry point for the plugin.
1422  */
1423 void *
1424 libgnunet_plugin_datastore_sqlite_init (void *cls)
1425 {
1426   static struct Plugin plugin;
1427   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1428   struct GNUNET_DATASTORE_PluginFunctions *api;
1429
1430   if (plugin.env != NULL)
1431     return NULL; /* can only initialize once! */
1432   memset (&plugin, 0, sizeof(struct Plugin));
1433   plugin.env = env;
1434   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1435                                                 "sqlite",
1436                                                 env->cfg);
1437   GNUNET_STATISTICS_get (plugin.statistics,
1438                          "sqlite",
1439                          QUOTA_STAT_NAME,
1440                          GNUNET_TIME_UNIT_MINUTES,
1441                          NULL,
1442                          &process_stat_in,
1443                          &plugin);
1444   if (GNUNET_OK !=
1445       database_setup (env->cfg, &plugin))
1446     {
1447       database_shutdown (&plugin);
1448       return NULL;
1449     }
1450   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1451   api->cls = &plugin;
1452   api->get_size = &sqlite_plugin_get_size;
1453   api->put = &sqlite_plugin_put;
1454   api->next_request = &sqlite_next_request;
1455   api->get = &sqlite_plugin_get;
1456   api->update = &sqlite_plugin_update;
1457   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1458   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1459   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1460   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1461   api->iter_all_now = &sqlite_plugin_iter_all_now;
1462   api->drop = &sqlite_plugin_drop;
1463   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1464                    "sqlite", _("Sqlite database running\n"));
1465   return api;
1466 }
1467
1468
1469 /**
1470  * Exit point from the plugin.
1471  */
1472 void *
1473 libgnunet_plugin_datastore_sqlite_done (void *cls)
1474 {
1475   char *fn;
1476   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1477   struct Plugin *plugin = api->cls;
1478
1479   fn = NULL;
1480   if (plugin->drop_on_shutdown)
1481     fn = GNUNET_strdup (plugin->fn);
1482   database_shutdown (plugin);
1483   plugin->env = NULL; 
1484   plugin->payload = 0;
1485   GNUNET_free (api);
1486   if (fn != NULL)
1487     {
1488       if (0 != UNLINK(fn))
1489         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1490                                   "unlink",
1491                                   fn);
1492       GNUNET_free (fn);
1493     }
1494   return NULL;
1495 }
1496
1497 /* end of plugin_datastore_sqlite.c */