fixing comments and minor bugs
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_NO
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94 /**
95  * Context for all functions in this plugin.
96  */
97 struct Plugin 
98 {
99   /**
100    * Our execution environment.
101    */
102   struct GNUNET_DATASTORE_PluginEnvironment *env;
103
104   /**
105    * Database filename.
106    */
107   char *fn;
108
109   /**
110    * Native SQLite database handle.
111    */
112   sqlite3 *dbh;
113
114   /**
115    * Precompiled SQL for update.
116    */
117   sqlite3_stmt *updPrio;
118
119   /**
120    * Precompiled SQL for insertion.
121    */
122   sqlite3_stmt *insertContent;
123
124   /**
125    * Handle to the statistics service.
126    */
127   struct GNUNET_STATISTICS_Handle *statistics;
128   
129   /**
130    * How much data are we currently storing
131    * in the database?
132    */
133   unsigned long long payload;
134
135   /**
136    * Number of updates that were made to the
137    * payload value since we last synchronized
138    * it with the statistics service.
139    */
140   unsigned int lastSync;
141
142   /**
143    * Should the database be dropped on shutdown?
144    */
145   int drop_on_shutdown;
146 };
147
148
149 /**
150  * @brief Prepare a SQL statement
151  *
152  * @param dbh handle to the database
153  * @param zSql SQL statement, UTF-8 encoded
154  * @param ppStmt set to the prepared statement
155  * @return 0 on success
156  */
157 static int
158 sq_prepare (sqlite3 * dbh, const char *zSql,
159             sqlite3_stmt ** ppStmt)
160 {
161   char *dummy;
162   return sqlite3_prepare (dbh,
163                           zSql,
164                           strlen (zSql), ppStmt, (const char **) &dummy);
165 }
166
167
168 /**
169  * Create our database indices.
170  * 
171  * @param dbh handle to the database
172  */
173 static void
174 create_indices (sqlite3 * dbh)
175 {
176   /* create indices */
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
179   sqlite3_exec (dbh,
180                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
181                 NULL, NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
183                 NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
185                 NULL);
186   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
187                 NULL, NULL);
188   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
189                 NULL, NULL, NULL);
190   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
191                 NULL, NULL);
192 }
193
194
195
196 #if 1
197 #define CHECK(a) GNUNET_break(a)
198 #define ENULL NULL
199 #else
200 #define ENULL &e
201 #define ENULL_DEFINED 1
202 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
203 #endif
204
205
206
207
208 /**
209  * Initialize the database connections and associated
210  * data structures (create tables and indices
211  * as needed as well).
212  *
213  * @param cfg our configuration
214  * @param plugin the plugin context (state for this module)
215  * @return GNUNET_OK on success
216  */
217 static int
218 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
219                 struct Plugin *plugin)
220 {
221   sqlite3_stmt *stmt;
222   char *afsdir;
223 #if ENULL_DEFINED
224   char *e;
225 #endif
226   
227   if (GNUNET_OK != 
228       GNUNET_CONFIGURATION_get_value_filename (cfg,
229                                                "datastore-sqlite",
230                                                "FILENAME",
231                                                &afsdir))
232     {
233       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
234                        "sqlite",
235                        _("Option `%s' in section `%s' missing in configuration!\n"),
236                        "FILENAME",
237                        "datastore-sqlite");
238       return GNUNET_SYSERR;
239     }
240   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
241     {
242       GNUNET_break (0);
243       GNUNET_free (afsdir);
244       return GNUNET_SYSERR;
245     }
246   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
247 #ifdef ENABLE_NLS
248                                               nl_langinfo (CODESET)
249 #else
250                                               "UTF-8"   /* good luck */
251 #endif
252                                               );
253   GNUNET_free (afsdir);
254   
255   /* Open database and precompile statements */
256   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
257     {
258       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
259                        "sqlite",
260                        _("Unable to initialize SQLite: %s.\n"),
261                        sqlite3_errmsg (plugin->dbh));
262       return GNUNET_SYSERR;
263     }
264   CHECK (SQLITE_OK ==
265          sqlite3_exec (plugin->dbh,
266                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh,
269                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
270   CHECK (SQLITE_OK ==
271          sqlite3_exec (plugin->dbh,
272                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
273   CHECK (SQLITE_OK ==
274          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
275
276   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
277
278
279   /* We have to do it here, because otherwise precompiling SQL might fail */
280   CHECK (SQLITE_OK ==
281          sq_prepare (plugin->dbh,
282                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
283                      &stmt));
284   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
285        (sqlite3_exec (plugin->dbh,
286                       "CREATE TABLE gn080 ("
287                       "  size INT4 NOT NULL DEFAULT 0,"
288                       "  type INT4 NOT NULL DEFAULT 0,"
289                       "  prio INT4 NOT NULL DEFAULT 0,"
290                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
291                       "  expire INT8 NOT NULL DEFAULT 0,"
292                       "  hash TEXT NOT NULL DEFAULT '',"
293                       "  vhash TEXT NOT NULL DEFAULT '',"
294                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
295                       NULL) != SQLITE_OK) )
296     {
297       LOG_SQLITE (plugin, NULL,
298                   GNUNET_ERROR_TYPE_ERROR, 
299                   "sqlite3_exec");
300       sqlite3_finalize (stmt);
301       return GNUNET_SYSERR;
302     }
303   sqlite3_finalize (stmt);
304   create_indices (plugin->dbh);
305
306   CHECK (SQLITE_OK ==
307          sq_prepare (plugin->dbh,
308                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
309                      &stmt));
310   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
311        (sqlite3_exec (plugin->dbh,
312                       "CREATE TABLE gn071 ("
313                       "  key TEXT NOT NULL DEFAULT '',"
314                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
315                       NULL) != SQLITE_OK) )
316     {
317       LOG_SQLITE (plugin, NULL,
318                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
319       sqlite3_finalize (stmt);
320       return GNUNET_SYSERR;
321     }
322   sqlite3_finalize (stmt);
323
324   if ((sq_prepare (plugin->dbh,
325                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
326                    "_ROWID_ = ?",
327                    &plugin->updPrio) != SQLITE_OK) ||
328       (sq_prepare (plugin->dbh,
329                    "INSERT INTO gn080 (size, type, prio, "
330                    "anonLevel, expire, hash, vhash, value) VALUES "
331                    "(?, ?, ?, ?, ?, ?, ?, ?)",
332                    &plugin->insertContent) != SQLITE_OK))
333     {
334       LOG_SQLITE (plugin, NULL,
335                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
336       return GNUNET_SYSERR;
337     }
338   return GNUNET_OK;
339 }
340
341
342 /**
343  * Synchronize our utilization statistics with the 
344  * statistics service.
345  * @param plugin the plugin context (state for this module)
346  */
347 static void 
348 sync_stats (struct Plugin *plugin)
349 {
350   GNUNET_STATISTICS_set (plugin->statistics,
351                          QUOTA_STAT_NAME,
352                          plugin->payload,
353                          GNUNET_YES);
354   plugin->lastSync = 0;
355 }
356
357
358 /**
359  * Shutdown database connection and associate data
360  * structures.
361  * @param plugin the plugin context (state for this module)
362  */
363 static void
364 database_shutdown (struct Plugin *plugin)
365 {
366   if (plugin->lastSync > 0)
367     sync_stats (plugin);
368   if (plugin->updPrio != NULL)
369     sqlite3_finalize (plugin->updPrio);
370   if (plugin->insertContent != NULL)
371     sqlite3_finalize (plugin->insertContent);
372   sqlite3_close (plugin->dbh);
373   GNUNET_free_non_null (plugin->fn);
374 }
375
376
377 /**
378  * Get an estimate of how much space the database is
379  * currently using.
380  *
381  * @param cls our plugin context
382  * @return number of bytes used on disk
383  */
384 static unsigned long long sqlite_plugin_get_size (void *cls)
385 {
386   struct Plugin *plugin = cls;
387   return plugin->payload;
388 }
389
390
391 /**
392  * Delete the database entry with the given
393  * row identifier.
394  *
395  * @param plugin the plugin context (state for this module)
396  * @param rid the ID of the row to delete
397  */
398 static int
399 delete_by_rowid (struct Plugin* plugin, 
400                  unsigned long long rid)
401 {
402   sqlite3_stmt *stmt;
403
404   if (sq_prepare (plugin->dbh,
405                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
406     {
407       LOG_SQLITE (plugin, NULL,
408                   GNUNET_ERROR_TYPE_ERROR |
409                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
410       return GNUNET_SYSERR;
411     }
412   sqlite3_bind_int64 (stmt, 1, rid);
413   if (SQLITE_DONE != sqlite3_step (stmt))
414     {
415       LOG_SQLITE (plugin, NULL,
416                   GNUNET_ERROR_TYPE_ERROR |
417                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
418       sqlite3_finalize (stmt);
419       return GNUNET_SYSERR;
420     }
421   sqlite3_finalize (stmt);
422   return GNUNET_OK;
423 }
424
425
426 /**
427  * Context for the universal iterator.
428  */
429 struct NextContext;
430
431 /**
432  * Type of a function that will prepare
433  * the next iteration.
434  *
435  * @param cls closure
436  * @param nc the next context; NULL for the last
437  *         call which gives the callback a chance to
438  *         clean up the closure
439  * @return GNUNET_OK on success, GNUNET_NO if there are
440  *         no more values, GNUNET_SYSERR on error
441  */
442 typedef int (*PrepareFunction)(void *cls,
443                                struct NextContext *nc);
444
445
446 /**
447  * Context we keep for the "next request" callback.
448  */
449 struct NextContext
450 {
451   /**
452    * Internal state.
453    */ 
454   struct Plugin *plugin;
455
456   /**
457    * Function to call on the next value.
458    */
459   PluginIterator iter;
460
461   /**
462    * Closure for iter.
463    */
464   void *iter_cls;
465
466   /**
467    * Function to call to prepare the next
468    * iteration.
469    */
470   PrepareFunction prep;
471
472   /**
473    * Closure for prep.
474    */
475   void *prep_cls;
476
477   /**
478    * Statement that the iterator will get the data
479    * from (updated or set by prep).
480    */ 
481   sqlite3_stmt *stmt;
482
483   /**
484    * Row ID of the last result.
485    */
486   unsigned long long last_rowid;
487
488   /**
489    * Key of the last result.
490    */
491   GNUNET_HashCode lastKey;  
492
493   /**
494    * Expiration time of the last value visited.
495    */
496   struct GNUNET_TIME_Absolute lastExpiration;
497
498   /**
499    * Priority of the last value visited.
500    */ 
501   unsigned int lastPriority; 
502
503   /**
504    * Number of results processed so far.
505    */
506   unsigned int count;
507
508   /**
509    * Set to GNUNET_YES if we must stop now.
510    */
511   int end_it;
512 };
513
514
515 /**
516  * Continuation of "sqlite_next_request".
517  *
518  * @param cls the next context
519  * @param tc the task context (unused)
520  */
521 static void 
522 sqlite_next_request_cont (void *cls,
523                           const struct GNUNET_SCHEDULER_TaskContext *tc)
524 {
525   struct NextContext * nc= cls;
526   struct Plugin *plugin;
527   unsigned long long rowid;
528   sqlite3_stmt *stmtd;
529   int ret;
530   unsigned int type;
531   unsigned int size;
532   unsigned int priority;
533   unsigned int anonymity;
534   struct GNUNET_TIME_Absolute expiration;
535   const GNUNET_HashCode *key;
536   const void *data;
537
538  
539   plugin = nc->plugin;
540   if ( (GNUNET_YES == nc->end_it) ||
541        (GNUNET_OK != (nc->prep(nc->prep_cls,
542                                nc))) )
543     {
544     END:
545       nc->iter (nc->iter_cls, 
546                 NULL, NULL, 0, NULL, 0, 0, 0, 
547                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
548       nc->prep (nc->prep_cls, NULL);
549       GNUNET_free (nc);
550       return;
551     }
552
553   rowid = sqlite3_column_int64 (nc->stmt, 7);
554   nc->last_rowid = rowid;
555   type = sqlite3_column_int (nc->stmt, 1);
556   size = sqlite3_column_bytes (nc->stmt, 6);
557   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
558     {
559       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
560                        "sqlite",
561                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
562       if (SQLITE_OK != sqlite3_reset (nc->stmt))
563         LOG_SQLITE (nc->plugin, NULL,
564                     GNUNET_ERROR_TYPE_ERROR |
565                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
566       if (sq_prepare
567           (nc->plugin->dbh,
568            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
569            &stmtd) != SQLITE_OK)
570         {
571           LOG_SQLITE (nc->plugin, NULL,
572                       GNUNET_ERROR_TYPE_ERROR |
573                       GNUNET_ERROR_TYPE_BULK, 
574                       "sq_prepare");
575           goto END;
576         }
577
578       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
579         LOG_SQLITE (nc->plugin, NULL,
580                     GNUNET_ERROR_TYPE_ERROR |
581                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
582       if (SQLITE_DONE != sqlite3_step (stmtd))
583         LOG_SQLITE (nc->plugin, NULL,
584                     GNUNET_ERROR_TYPE_ERROR |
585                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
586       if (SQLITE_OK != sqlite3_finalize (stmtd))
587         LOG_SQLITE (nc->plugin, NULL,
588                     GNUNET_ERROR_TYPE_ERROR |
589                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
590       goto END;
591     }
592
593   priority = sqlite3_column_int (nc->stmt, 2);
594   anonymity = sqlite3_column_int (nc->stmt, 3);
595   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
596   key = sqlite3_column_blob (nc->stmt, 5);
597   nc->lastPriority = priority;
598   nc->lastExpiration = expiration;
599   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
600   data = sqlite3_column_blob (nc->stmt, 6);
601   nc->count++;
602   ret = nc->iter (nc->iter_cls,
603                   nc,
604                   key,
605                   size,
606                   data, 
607                   type,
608                   priority,
609                   anonymity,
610                   expiration,
611                   rowid);
612   if (ret == GNUNET_SYSERR)
613     {
614       nc->end_it = GNUNET_YES;
615       return;
616     }
617   if ( (ret == GNUNET_NO) &&
618        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
619     {
620       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
621       plugin->lastSync++; 
622       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
623         sync_stats (plugin);
624     }
625 }
626
627
628 /**
629  * Function invoked on behalf of a "PluginIterator"
630  * asking the database plugin to call the iterator
631  * with the next item.
632  *
633  * @param next_cls whatever argument was given
634  *        to the PluginIterator as "next_cls".
635  * @param end_it set to GNUNET_YES if we
636  *        should terminate the iteration early
637  *        (iterator should be still called once more
638  *         to signal the end of the iteration).
639  */
640 static void 
641 sqlite_next_request (void *next_cls,
642                      int end_it)
643 {
644   struct NextContext * nc= next_cls;
645
646   if (GNUNET_YES == end_it)
647     nc->end_it = GNUNET_YES;
648   GNUNET_SCHEDULER_add_continuation (nc->plugin->env->sched,
649                                      GNUNET_NO,
650                                      &sqlite_next_request_cont,
651                                      nc,
652                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
653 }
654
655
656
657 /**
658  * Store an item in the datastore.
659  *
660  * @param cls closure
661  * @param key key for the item
662  * @param size number of bytes in data
663  * @param data content stored
664  * @param type type of the content
665  * @param priority priority of the content
666  * @param anonymity anonymity-level for the content
667  * @param expiration expiration time for the content
668  * @param msg set to an error message
669  * @return GNUNET_OK on success
670  */
671 static int
672 sqlite_plugin_put (void *cls,
673                    const GNUNET_HashCode * key,
674                    uint32_t size,
675                    const void *data,
676                    uint32_t type,
677                    uint32_t priority,
678                    uint32_t anonymity,
679                    struct GNUNET_TIME_Absolute expiration,
680                    char ** msg)
681 {
682   struct Plugin *plugin = cls;
683   int n;
684   sqlite3_stmt *stmt;
685   GNUNET_HashCode vhash;
686
687 #if DEBUG_SQLITE
688   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
689                    "sqlite",
690                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
691                    type, 
692                    GNUNET_h2s(key),
693                    priority,
694                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
695                    (long long) expiration.value);
696 #endif
697   GNUNET_CRYPTO_hash (data, size, &vhash);
698   stmt = plugin->insertContent;
699   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
700       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
701       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
702       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
703       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
704       (SQLITE_OK !=
705        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
706                           SQLITE_TRANSIENT)) ||
707       (SQLITE_OK !=
708        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
709                           SQLITE_TRANSIENT))
710       || (SQLITE_OK !=
711           sqlite3_bind_blob (stmt, 8, data, size,
712                              SQLITE_TRANSIENT)))
713     {
714       LOG_SQLITE (plugin,
715                   msg,
716                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
717       if (SQLITE_OK != sqlite3_reset (stmt))
718         LOG_SQLITE (plugin, NULL,
719                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
720       return GNUNET_SYSERR;
721     }
722   n = sqlite3_step (stmt);
723   if (n != SQLITE_DONE)
724     {
725       if (n == SQLITE_BUSY)
726         {
727           LOG_SQLITE (plugin, msg,
728                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
729           sqlite3_reset (stmt);
730           GNUNET_break (0);
731           return GNUNET_NO;
732         }
733       LOG_SQLITE (plugin, msg,
734                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
735       sqlite3_reset (stmt);
736       return GNUNET_SYSERR;
737     }
738   if (SQLITE_OK != sqlite3_reset (stmt))
739     LOG_SQLITE (plugin, NULL,
740                 GNUNET_ERROR_TYPE_ERROR |
741                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
742   plugin->lastSync++;
743   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
744   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
745     sync_stats (plugin);
746   return GNUNET_OK;
747 }
748
749
750 /**
751  * Update the priority for a particular key in the datastore.  If
752  * the expiration time in value is different than the time found in
753  * the datastore, the higher value should be kept.  For the
754  * anonymity level, the lower value is to be used.  The specified
755  * priority should be added to the existing priority, ignoring the
756  * priority in value.
757  *
758  * Note that it is possible for multiple values to match this put.
759  * In that case, all of the respective values are updated.
760  *
761  * @param cls the plugin context (state for this module)
762  * @param uid unique identifier of the datum
763  * @param delta by how much should the priority
764  *     change?  If priority + delta < 0 the
765  *     priority should be set to 0 (never go
766  *     negative).
767  * @param expire new expiration time should be the
768  *     MAX of any existing expiration time and
769  *     this value
770  * @param msg set to an error message
771  * @return GNUNET_OK on success
772  */
773 static int
774 sqlite_plugin_update (void *cls,
775                       uint64_t uid,
776                       int delta, struct GNUNET_TIME_Absolute expire,
777                       char **msg)
778 {
779   struct Plugin *plugin = cls;
780   int n;
781
782   sqlite3_bind_int (plugin->updPrio, 1, delta);
783   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
784   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
785   n = sqlite3_step (plugin->updPrio);
786   if (n != SQLITE_DONE)
787     LOG_SQLITE (plugin, msg,
788                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
789                 "sqlite3_step");
790 #if DEBUG_SQLITE
791   else
792     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
793                      "sqlite",
794                      "Block updated\n");
795 #endif
796   sqlite3_reset (plugin->updPrio);
797
798   if (n == SQLITE_BUSY)
799     return GNUNET_NO;
800   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
801 }
802
803
804 /**
805  * Internal context for an iteration.
806  */
807 struct IterContext
808 {
809   /**
810    * FIXME.
811    */
812   sqlite3_stmt *stmt_1;
813
814   /**
815    * FIXME.
816    */
817   sqlite3_stmt *stmt_2;
818
819   /**
820    * FIXME.
821    */
822   int is_asc;
823
824   /**
825    * FIXME.
826    */
827   int is_prio;
828
829   /**
830    * FIXME.
831    */
832   int is_migr;
833
834   /**
835    * FIXME.
836    */
837   int limit_nonanonymous;
838
839   /**
840    * Desired type for blocks returned by this iterator.
841    */
842   uint32_t type;
843 };
844
845
846 /**
847  * Prepare our SQL query to obtain the next record from the database.
848  *
849  * @param cls our "struct IterContext"
850  * @param nc NULL to terminate the iteration, otherwise our context for
851  *           getting the next result.
852  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
853  *         GNUNET_SYSERR on error (or end of iteration)
854  */
855 static int
856 iter_next_prepare (void *cls,
857                    struct NextContext *nc)
858 {
859   struct IterContext *ic = cls;
860   struct Plugin *plugin;
861   int ret;
862
863   if (nc == NULL)
864     {
865 #if DEBUG_SQLITE
866       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
867                   "Asked to clean up iterator state.\n");
868 #endif
869       sqlite3_finalize (ic->stmt_1);
870       sqlite3_finalize (ic->stmt_2);
871       return GNUNET_SYSERR;
872     }
873   sqlite3_reset (ic->stmt_1);
874   sqlite3_reset (ic->stmt_2);
875   plugin = nc->plugin;
876   if (ic->is_prio)
877     {
878 #if DEBUG_SQLITE
879       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
880                   "Restricting to results larger than the last priority %u\n",
881                   nc->lastPriority);
882 #endif
883       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
884       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
885     }
886   else
887     {
888 #if DEBUG_SQLITE
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Restricting to results larger than the last expiration %llu\n",
891                   (unsigned long long) nc->lastExpiration.value);
892 #endif
893       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
894       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
895     }
896 #if DEBUG_SQLITE
897   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
898               "Restricting to results larger than the last key `%s'\n",
899               GNUNET_h2s(&nc->lastKey));
900 #endif
901   sqlite3_bind_blob (ic->stmt_1, 2, 
902                      &nc->lastKey, 
903                      sizeof (GNUNET_HashCode),
904                      SQLITE_TRANSIENT);
905   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
906     {      
907 #if DEBUG_SQLITE
908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909                   "Result found using iterator 1\n");
910 #endif
911       nc->stmt = ic->stmt_1;
912       return GNUNET_OK;
913     }
914   if (ret != SQLITE_DONE)
915     {
916       LOG_SQLITE (plugin, NULL,
917                   GNUNET_ERROR_TYPE_ERROR |
918                   GNUNET_ERROR_TYPE_BULK,
919                   "sqlite3_step");
920       return GNUNET_SYSERR;
921     }
922   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
923     LOG_SQLITE (plugin, NULL,
924                 GNUNET_ERROR_TYPE_ERROR | 
925                 GNUNET_ERROR_TYPE_BULK, 
926                 "sqlite3_reset");
927   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
928     {
929 #if DEBUG_SQLITE
930       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
931                   "Result found using iterator 2\n");
932 #endif
933       nc->stmt = ic->stmt_2;
934       return GNUNET_OK;
935     }
936   if (ret != SQLITE_DONE)
937     {
938       LOG_SQLITE (plugin, NULL,
939                   GNUNET_ERROR_TYPE_ERROR |
940                   GNUNET_ERROR_TYPE_BULK,
941                   "sqlite3_step");
942       return GNUNET_SYSERR;
943     }
944   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
945     LOG_SQLITE (plugin, NULL,
946                 GNUNET_ERROR_TYPE_ERROR |
947                 GNUNET_ERROR_TYPE_BULK,
948                 "sqlite3_reset");
949 #if DEBUG_SQLITE
950   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951               "No result found using either iterator\n");
952 #endif
953   return GNUNET_NO;
954 }
955
956
957 /**
958  * Call a method for each key in the database and
959  * call the callback method on it.
960  *
961  * @param plugin our plugin context
962  * @param type entries of which type should be considered?
963  * @param is_asc are we iterating in ascending order?
964  * @param is_prio are we iterating by priority (otherwise by expiration)
965  * @param is_migr are we iterating in migration order?
966  * @param limit_nonanonymous are we restricting results to those with anonymity
967  *              level zero?
968  * @param stmt_str_1 first SQL statement to execute
969  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
970  * @param iter function to call on each matching value;
971  *        will be called once with a NULL value at the end
972  * @param iter_cls closure for iter
973  */
974 static void
975 basic_iter (struct Plugin *plugin,
976             uint32_t type,
977             int is_asc,
978             int is_prio,
979             int is_migr,
980             int limit_nonanonymous,
981             const char *stmt_str_1,
982             const char *stmt_str_2,
983             PluginIterator iter,
984             void *iter_cls)
985 {
986   struct NextContext *nc;
987   struct IterContext *ic;
988   sqlite3_stmt *stmt_1;
989   sqlite3_stmt *stmt_2;
990
991 #if DEBUG_SQLITE
992   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
993               "At %llu, using queries `%s' and `%s'\n",
994               (unsigned long long) GNUNET_TIME_absolute_get ().value,
995               stmt_str_1,
996               stmt_str_2);
997 #endif
998   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
999     {
1000       LOG_SQLITE (plugin, NULL,
1001                   GNUNET_ERROR_TYPE_ERROR |
1002                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1003       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1004       return;
1005     }
1006   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1007     {
1008       LOG_SQLITE (plugin, NULL,
1009                   GNUNET_ERROR_TYPE_ERROR |
1010                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1011       sqlite3_finalize (stmt_1);
1012       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1013       return;
1014     }
1015   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1016                       sizeof(struct IterContext));
1017   nc->plugin = plugin;
1018   nc->iter = iter;
1019   nc->iter_cls = iter_cls;
1020   nc->stmt = NULL;
1021   ic = (struct IterContext*) &nc[1];
1022   ic->stmt_1 = stmt_1;
1023   ic->stmt_2 = stmt_2;
1024   ic->type = type;
1025   ic->is_asc = is_asc;
1026   ic->is_prio = is_prio;
1027   ic->is_migr = is_migr;
1028   ic->limit_nonanonymous = limit_nonanonymous;
1029   nc->prep = &iter_next_prepare;
1030   nc->prep_cls = ic;
1031   if (is_asc)
1032     {
1033       nc->lastPriority = 0;
1034       nc->lastExpiration.value = 0;
1035       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1036     }
1037   else
1038     {
1039       nc->lastPriority = 0x7FFFFFFF;
1040       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1041       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1042     }
1043   sqlite_next_request (nc, GNUNET_NO);
1044 }
1045
1046
1047 /**
1048  * Select a subset of the items in the datastore and call
1049  * the given iterator for each of them.
1050  *
1051  * @param cls our plugin context
1052  * @param type entries of which type should be considered?
1053  *        Use 0 for any type.
1054  * @param iter function to call on each matching value;
1055  *        will be called once with a NULL value at the end
1056  * @param iter_cls closure for iter
1057  */
1058 static void
1059 sqlite_plugin_iter_low_priority (void *cls,
1060                                  uint32_t type,
1061                                  PluginIterator iter,
1062                                  void *iter_cls)
1063 {
1064   basic_iter (cls,
1065               type, 
1066               GNUNET_YES, GNUNET_YES, 
1067               GNUNET_NO, GNUNET_NO,
1068               SELECT_IT_LOW_PRIORITY_1,
1069               SELECT_IT_LOW_PRIORITY_2, 
1070               iter, iter_cls);
1071 }
1072
1073
1074 /**
1075  * Select a subset of the items in the datastore and call
1076  * the given iterator for each of them.
1077  *
1078  * @param cls our plugin context
1079  * @param type entries of which type should be considered?
1080  *        Use 0 for any type.
1081  * @param iter function to call on each matching value;
1082  *        will be called once with a NULL value at the end
1083  * @param iter_cls closure for iter
1084  */
1085 static void
1086 sqlite_plugin_iter_zero_anonymity (void *cls,
1087                                    uint32_t type,
1088                                    PluginIterator iter,
1089                                    void *iter_cls)
1090 {
1091   struct GNUNET_TIME_Absolute now;
1092   char *q1;
1093   char *q2;
1094
1095   now = GNUNET_TIME_absolute_get ();
1096   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1097                    (unsigned long long) now.value);
1098   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1099                    (unsigned long long) now.value);
1100   basic_iter (cls,
1101               type, 
1102               GNUNET_NO, GNUNET_YES, 
1103               GNUNET_NO, GNUNET_YES,
1104               q1,
1105               q2,
1106               iter, iter_cls);
1107   GNUNET_free (q1);
1108   GNUNET_free (q2);
1109 }
1110
1111
1112
1113 /**
1114  * Select a subset of the items in the datastore and call
1115  * the given iterator for each of them.
1116  *
1117  * @param cls our plugin context
1118  * @param type entries of which type should be considered?
1119  *        Use 0 for any type.
1120  * @param iter function to call on each matching value;
1121  *        will be called once with a NULL value at the end
1122  * @param iter_cls closure for iter
1123  */
1124 static void
1125 sqlite_plugin_iter_ascending_expiration (void *cls,
1126                                          uint32_t type,
1127                                          PluginIterator iter,
1128                                          void *iter_cls)
1129 {
1130   struct GNUNET_TIME_Absolute now;
1131   char *q1;
1132   char *q2;
1133
1134   now = GNUNET_TIME_absolute_get ();
1135   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1136                    (unsigned long long) 0*now.value);
1137   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1138                    (unsigned long long) 0*now.value);
1139   basic_iter (cls,
1140               type, 
1141               GNUNET_YES, GNUNET_NO, 
1142               GNUNET_NO, GNUNET_NO,
1143               q1, q2,
1144               iter, iter_cls);
1145   GNUNET_free (q1);
1146   GNUNET_free (q2);
1147 }
1148
1149
1150 /**
1151  * Select a subset of the items in the datastore and call
1152  * the given iterator for each of them.
1153  *
1154  * @param cls our plugin context
1155  * @param type entries of which type should be considered?
1156  *        Use 0 for any type.
1157  * @param iter function to call on each matching value;
1158  *        will be called once with a NULL value at the end
1159  * @param iter_cls closure for iter
1160  */
1161 static void
1162 sqlite_plugin_iter_migration_order (void *cls,
1163                                     uint32_t type,
1164                                     PluginIterator iter,
1165                                     void *iter_cls)
1166 {
1167   struct GNUNET_TIME_Absolute now;
1168   char *q;
1169
1170   now = GNUNET_TIME_absolute_get ();
1171   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1172                    (unsigned long long) now.value);
1173   basic_iter (cls,
1174               type, 
1175               GNUNET_NO, GNUNET_NO, 
1176               GNUNET_YES, GNUNET_NO,
1177               SELECT_IT_MIGRATION_ORDER_1,
1178               q,
1179               iter, iter_cls);
1180   GNUNET_free (q);
1181 }
1182
1183
1184 /**
1185  * Call sqlite using the already prepared query to get
1186  * the next result.
1187  *
1188  * @param cls not used
1189  * @param nc context with the prepared query
1190  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1191  *        there are no more results 
1192  */
1193 static int
1194 all_next_prepare (void *cls,
1195                   struct NextContext *nc)
1196 {
1197   struct Plugin *plugin;
1198   int ret;
1199
1200   if (nc == NULL)
1201     {
1202 #if DEBUG_SQLITE
1203       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204                   "Asked to clean up iterator state.\n");
1205 #endif
1206       return GNUNET_SYSERR;
1207     }
1208   plugin = nc->plugin;
1209   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1210     {      
1211       return GNUNET_OK;
1212     }
1213   if (ret != SQLITE_DONE)
1214     {
1215       LOG_SQLITE (plugin, NULL,
1216                   GNUNET_ERROR_TYPE_ERROR |
1217                   GNUNET_ERROR_TYPE_BULK,
1218                   "sqlite3_step");
1219       return GNUNET_SYSERR;
1220     }
1221   return GNUNET_NO;
1222 }
1223
1224
1225 /**
1226  * Select a subset of the items in the datastore and call
1227  * the given iterator for each of them.
1228  *
1229  * @param cls our plugin context
1230  * @param type entries of which type should be considered?
1231  *        Use 0 for any type.
1232  * @param iter function to call on each matching value;
1233  *        will be called once with a NULL value at the end
1234  * @param iter_cls closure for iter
1235  */
1236 static void
1237 sqlite_plugin_iter_all_now (void *cls,
1238                             uint32_t type,
1239                             PluginIterator iter,
1240                             void *iter_cls)
1241 {
1242   struct Plugin *plugin = cls;
1243   struct NextContext *nc;
1244   sqlite3_stmt *stmt;
1245
1246   if (sq_prepare (plugin->dbh, 
1247                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1248                   &stmt) != SQLITE_OK)
1249     {
1250       LOG_SQLITE (plugin, NULL,
1251                   GNUNET_ERROR_TYPE_ERROR |
1252                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1253       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1254       return;
1255     }
1256   nc = GNUNET_malloc (sizeof(struct NextContext));
1257   nc->plugin = plugin;
1258   nc->iter = iter;
1259   nc->iter_cls = iter_cls;
1260   nc->stmt = stmt;
1261   nc->prep = &all_next_prepare;
1262   nc->prep_cls = NULL;
1263   sqlite_next_request (nc, GNUNET_NO);
1264 }
1265
1266
1267 /**
1268  * FIXME.
1269  */
1270 struct GetNextContext
1271 {
1272
1273   /**
1274    * FIXME.
1275    */
1276   int total;
1277
1278   /**
1279    * FIXME.
1280    */
1281   int off;
1282
1283   /**
1284    * FIXME.
1285    */
1286   int have_vhash;
1287
1288   /**
1289    * FIXME.
1290    */
1291   unsigned int type;
1292
1293   /**
1294    * FIXME.
1295    */
1296   sqlite3_stmt *stmt;
1297
1298   /**
1299    * FIXME.
1300    */
1301   GNUNET_HashCode key;
1302
1303   /**
1304    * FIXME.
1305    */
1306   GNUNET_HashCode vhash;
1307 };
1308
1309
1310
1311 /**
1312  * FIXME.
1313  *
1314  * @param cls our "struct GetNextContext*"
1315  * @param nc FIXME
1316  * @return GNUNET_YES if there are more results, 
1317  *         GNUNET_NO if there are no more results,
1318  *         GNUNET_SYSERR on internal error
1319  */
1320 static int
1321 get_next_prepare (void *cls,
1322                   struct NextContext *nc)
1323 {
1324   struct GetNextContext *gnc = cls;
1325   int sqoff;
1326   int ret;
1327   int limit_off;
1328
1329   if (nc == NULL)
1330     {
1331       sqlite3_finalize (gnc->stmt);
1332       return GNUNET_SYSERR;
1333     }
1334   if (nc->count == gnc->total)
1335     return GNUNET_NO;
1336   if (nc->count + gnc->off == gnc->total)
1337     nc->last_rowid = 0;
1338   if (nc->count == 0)
1339     limit_off = gnc->off;
1340   else
1341     limit_off = 0;
1342   sqoff = 1;
1343   sqlite3_reset (nc->stmt);
1344   ret = sqlite3_bind_blob (nc->stmt,
1345                            sqoff++,
1346                            &gnc->key, 
1347                            sizeof (GNUNET_HashCode),
1348                            SQLITE_TRANSIENT);
1349   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1350     ret = sqlite3_bind_blob (nc->stmt,
1351                              sqoff++,
1352                              &gnc->vhash,
1353                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1354   if ((gnc->type != 0) && (ret == SQLITE_OK))
1355     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1356   if (ret == SQLITE_OK)
1357     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1358   if (ret == SQLITE_OK)
1359     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1360   if (ret != SQLITE_OK)
1361     return GNUNET_SYSERR;
1362   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1363     return GNUNET_NO;
1364   return GNUNET_OK;
1365 }
1366
1367
1368 /**
1369  * Iterate over the results for a particular key
1370  * in the datastore.
1371  *
1372  * @param cls closure
1373  * @param key maybe NULL (to match all entries)
1374  * @param vhash hash of the value, maybe NULL (to
1375  *        match all values that have the right key).
1376  *        Note that for DBlocks there is no difference
1377  *        betwen key and vhash, but for other blocks
1378  *        there may be!
1379  * @param type entries of which type are relevant?
1380  *     Use 0 for any type.
1381  * @param iter function to call on each matching value;
1382  *        will be called once with a NULL value at the end
1383  * @param iter_cls closure for iter
1384  */
1385 static void
1386 sqlite_plugin_get (void *cls,
1387                    const GNUNET_HashCode * key,
1388                    const GNUNET_HashCode * vhash,
1389                    uint32_t type,
1390                    PluginIterator iter, void *iter_cls)
1391 {
1392   struct Plugin *plugin = cls;
1393   struct GetNextContext *gpc;
1394   struct NextContext *nc;
1395   int ret;
1396   int total;
1397   sqlite3_stmt *stmt;
1398   char scratch[256];
1399   int sqoff;
1400
1401   GNUNET_assert (iter != NULL);
1402   if (key == NULL)
1403     {
1404       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1405       return;
1406     }
1407   GNUNET_snprintf (scratch, 256,
1408                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1409                    vhash == NULL ? "" : " AND vhash=:2",
1410                    type == 0 ? "" : (vhash ==
1411                                      NULL) ? " AND type=:2" : " AND type=:3");
1412   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1413     {
1414       LOG_SQLITE (plugin, NULL,
1415                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1416       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1417       return;
1418     }
1419   sqoff = 1;
1420   ret = sqlite3_bind_blob (stmt,
1421                            sqoff++,
1422                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1423   if ((vhash != NULL) && (ret == SQLITE_OK))
1424     ret = sqlite3_bind_blob (stmt,
1425                              sqoff++,
1426                              vhash,
1427                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1428   if ((type != 0) && (ret == SQLITE_OK))
1429     ret = sqlite3_bind_int (stmt, sqoff++, type);
1430   if (SQLITE_OK != ret)
1431     {
1432       LOG_SQLITE (plugin, NULL,
1433                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1434       sqlite3_reset (stmt);
1435       sqlite3_finalize (stmt);
1436       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1437       return;
1438     }
1439   ret = sqlite3_step (stmt);
1440   if (ret != SQLITE_ROW)
1441     {
1442       LOG_SQLITE (plugin, NULL,
1443                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1444                   "sqlite_step");
1445       sqlite3_reset (stmt);
1446       sqlite3_finalize (stmt);
1447       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1448       return;
1449     }
1450   total = sqlite3_column_int (stmt, 0);
1451   sqlite3_reset (stmt);
1452   sqlite3_finalize (stmt);
1453   if (0 == total)
1454     {
1455       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1456       return;
1457     }
1458
1459   GNUNET_snprintf (scratch, 256,
1460                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1461                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1462                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1463                    vhash == NULL ? "" : " AND vhash=:2",
1464                    type == 0 ? "" : (vhash ==
1465                                      NULL) ? " AND type=:2" : " AND type=:3",
1466                    sqoff, sqoff + 1);
1467   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1468     {
1469       LOG_SQLITE (plugin, NULL,
1470                   GNUNET_ERROR_TYPE_ERROR |
1471                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1472       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1473       return;
1474     }
1475   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1476                       sizeof(struct GetNextContext));
1477   nc->plugin = plugin;
1478   nc->iter = iter;
1479   nc->iter_cls = iter_cls;
1480   nc->stmt = stmt;
1481   gpc = (struct GetNextContext*) &nc[1];
1482   gpc->total = total;
1483   gpc->type = type;
1484   gpc->key = *key;
1485   gpc->stmt = stmt; /* alias used for freeing at the end! */
1486   if (NULL != vhash)
1487     {
1488       gpc->have_vhash = GNUNET_YES;
1489       gpc->vhash = *vhash;
1490     }
1491   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1492   nc->prep = &get_next_prepare;
1493   nc->prep_cls = gpc;
1494   sqlite_next_request (nc, GNUNET_NO);
1495 }
1496
1497
1498 /**
1499  * Drop database.
1500  *
1501  * @param cls our plugin context
1502  */
1503 static void 
1504 sqlite_plugin_drop (void *cls)
1505 {
1506   struct Plugin *plugin = cls;
1507   plugin->drop_on_shutdown = GNUNET_YES;
1508 }
1509
1510
1511 /**
1512  * Callback function to process statistic values.
1513  *
1514  * @param cls closure
1515  * @param subsystem name of subsystem that created the statistic
1516  * @param name the name of the datum
1517  * @param value the current value
1518  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1519  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1520  */
1521 static int
1522 process_stat_in (void *cls,
1523                  const char *subsystem,
1524                  const char *name,
1525                  unsigned long long value,
1526                  int is_persistent)
1527 {
1528   struct Plugin *plugin = cls;
1529   plugin->payload += value;
1530   return GNUNET_OK;
1531 }
1532                                          
1533
1534 /**
1535  * Entry point for the plugin.
1536  *
1537  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1538  * @return NULL on error, othrewise the plugin context
1539  */
1540 void *
1541 libgnunet_plugin_datastore_sqlite_init (void *cls)
1542 {
1543   static struct Plugin plugin;
1544   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1545   struct GNUNET_DATASTORE_PluginFunctions *api;
1546
1547   if (plugin.env != NULL)
1548     return NULL; /* can only initialize once! */
1549   memset (&plugin, 0, sizeof(struct Plugin));
1550   plugin.env = env;
1551   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1552                                                 "sqlite",
1553                                                 env->cfg);
1554   GNUNET_STATISTICS_get (plugin.statistics,
1555                          "sqlite",
1556                          QUOTA_STAT_NAME,
1557                          GNUNET_TIME_UNIT_MINUTES,
1558                          NULL,
1559                          &process_stat_in,
1560                          &plugin);
1561   if (GNUNET_OK !=
1562       database_setup (env->cfg, &plugin))
1563     {
1564       database_shutdown (&plugin);
1565       return NULL;
1566     }
1567   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1568   api->cls = &plugin;
1569   api->get_size = &sqlite_plugin_get_size;
1570   api->put = &sqlite_plugin_put;
1571   api->next_request = &sqlite_next_request;
1572   api->get = &sqlite_plugin_get;
1573   api->update = &sqlite_plugin_update;
1574   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1575   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1576   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1577   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1578   api->iter_all_now = &sqlite_plugin_iter_all_now;
1579   api->drop = &sqlite_plugin_drop;
1580   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1581                    "sqlite", _("Sqlite database running\n"));
1582   return api;
1583 }
1584
1585
1586 /**
1587  * Exit point from the plugin.
1588  *
1589  * @param cls the plugin context (as returned by "init")
1590  * @return always NULL
1591  */
1592 void *
1593 libgnunet_plugin_datastore_sqlite_done (void *cls)
1594 {
1595   char *fn;
1596   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1597   struct Plugin *plugin = api->cls;
1598
1599   fn = NULL;
1600   if (plugin->drop_on_shutdown)
1601     fn = GNUNET_strdup (plugin->fn);
1602   database_shutdown (plugin);
1603   plugin->env = NULL; 
1604   plugin->payload = 0;
1605   GNUNET_free (api);
1606   if (fn != NULL)
1607     {
1608       if (0 != UNLINK(fn))
1609         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1610                                   "unlink",
1611                                   fn);
1612       GNUNET_free (fn);
1613     }
1614   return NULL;
1615 }
1616
1617 /* end of plugin_datastore_sqlite.c */