make sure stats that are being set can be committed on destroy
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_YES
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94 /**
95  * Context for all functions in this plugin.
96  */
97 struct Plugin 
98 {
99   /**
100    * Our execution environment.
101    */
102   struct GNUNET_DATASTORE_PluginEnvironment *env;
103
104   /**
105    * Database filename.
106    */
107   char *fn;
108
109   /**
110    * Native SQLite database handle.
111    */
112   sqlite3 *dbh;
113
114   /**
115    * Precompiled SQL for update.
116    */
117   sqlite3_stmt *updPrio;
118
119   /**
120    * Precompiled SQL for insertion.
121    */
122   sqlite3_stmt *insertContent;
123
124   /**
125    * Handle to the statistics service.
126    */
127   struct GNUNET_STATISTICS_Handle *statistics;
128   
129   /**
130    * How much data are we currently storing
131    * in the database?
132    */
133   unsigned long long payload;
134
135   /**
136    * Number of updates that were made to the
137    * payload value since we last synchronized
138    * it with the statistics service.
139    */
140   unsigned int lastSync;
141
142   /**
143    * Should the database be dropped on shutdown?
144    */
145   int drop_on_shutdown;
146 };
147
148
149 /**
150  * @brief Prepare a SQL statement
151  *
152  * @param dbh handle to the database
153  * @param zSql SQL statement, UTF-8 encoded
154  * @param ppStmt set to the prepared statement
155  * @return 0 on success
156  */
157 static int
158 sq_prepare (sqlite3 * dbh, const char *zSql,
159             sqlite3_stmt ** ppStmt)
160 {
161   char *dummy;
162   return sqlite3_prepare (dbh,
163                           zSql,
164                           strlen (zSql), ppStmt, (const char **) &dummy);
165 }
166
167
168 /**
169  * Create our database indices.
170  * 
171  * @param dbh handle to the database
172  */
173 static void
174 create_indices (sqlite3 * dbh)
175 {
176   /* create indices */
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
179   sqlite3_exec (dbh,
180                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
181                 NULL, NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
183                 NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
185                 NULL);
186   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
187                 NULL, NULL);
188   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
189                 NULL, NULL, NULL);
190   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
191                 NULL, NULL);
192 }
193
194
195
196 #if 1
197 #define CHECK(a) GNUNET_break(a)
198 #define ENULL NULL
199 #else
200 #define ENULL &e
201 #define ENULL_DEFINED 1
202 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
203 #endif
204
205
206
207
208 /**
209  * Initialize the database connections and associated
210  * data structures (create tables and indices
211  * as needed as well).
212  *
213  * @param cfg our configuration
214  * @param plugin the plugin context (state for this module)
215  * @return GNUNET_OK on success
216  */
217 static int
218 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
219                 struct Plugin *plugin)
220 {
221   sqlite3_stmt *stmt;
222   char *afsdir;
223 #if ENULL_DEFINED
224   char *e;
225 #endif
226   
227   if (GNUNET_OK != 
228       GNUNET_CONFIGURATION_get_value_filename (cfg,
229                                                "datastore-sqlite",
230                                                "FILENAME",
231                                                &afsdir))
232     {
233       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
234                        "sqlite",
235                        _("Option `%s' in section `%s' missing in configuration!\n"),
236                        "FILENAME",
237                        "datastore-sqlite");
238       return GNUNET_SYSERR;
239     }
240   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
241     {
242       GNUNET_break (0);
243       GNUNET_free (afsdir);
244       return GNUNET_SYSERR;
245     }
246   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
247 #ifdef ENABLE_NLS
248                                               nl_langinfo (CODESET)
249 #else
250                                               "UTF-8"   /* good luck */
251 #endif
252                                               );
253   GNUNET_free (afsdir);
254   
255   /* Open database and precompile statements */
256   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
257     {
258       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
259                        "sqlite",
260                        _("Unable to initialize SQLite: %s.\n"),
261                        sqlite3_errmsg (plugin->dbh));
262       return GNUNET_SYSERR;
263     }
264   CHECK (SQLITE_OK ==
265          sqlite3_exec (plugin->dbh,
266                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh,
269                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
270   CHECK (SQLITE_OK ==
271          sqlite3_exec (plugin->dbh,
272                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
273   CHECK (SQLITE_OK ==
274          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
275
276   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
277
278
279   /* We have to do it here, because otherwise precompiling SQL might fail */
280   CHECK (SQLITE_OK ==
281          sq_prepare (plugin->dbh,
282                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
283                      &stmt));
284   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
285        (sqlite3_exec (plugin->dbh,
286                       "CREATE TABLE gn080 ("
287                       "  size INT4 NOT NULL DEFAULT 0,"
288                       "  type INT4 NOT NULL DEFAULT 0,"
289                       "  prio INT4 NOT NULL DEFAULT 0,"
290                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
291                       "  expire INT8 NOT NULL DEFAULT 0,"
292                       "  hash TEXT NOT NULL DEFAULT '',"
293                       "  vhash TEXT NOT NULL DEFAULT '',"
294                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
295                       NULL) != SQLITE_OK) )
296     {
297       LOG_SQLITE (plugin, NULL,
298                   GNUNET_ERROR_TYPE_ERROR, 
299                   "sqlite3_exec");
300       sqlite3_finalize (stmt);
301       return GNUNET_SYSERR;
302     }
303   sqlite3_finalize (stmt);
304   create_indices (plugin->dbh);
305
306   CHECK (SQLITE_OK ==
307          sq_prepare (plugin->dbh,
308                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
309                      &stmt));
310   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
311        (sqlite3_exec (plugin->dbh,
312                       "CREATE TABLE gn071 ("
313                       "  key TEXT NOT NULL DEFAULT '',"
314                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
315                       NULL) != SQLITE_OK) )
316     {
317       LOG_SQLITE (plugin, NULL,
318                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
319       sqlite3_finalize (stmt);
320       return GNUNET_SYSERR;
321     }
322   sqlite3_finalize (stmt);
323
324   if ((sq_prepare (plugin->dbh,
325                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
326                    "_ROWID_ = ?",
327                    &plugin->updPrio) != SQLITE_OK) ||
328       (sq_prepare (plugin->dbh,
329                    "INSERT INTO gn080 (size, type, prio, "
330                    "anonLevel, expire, hash, vhash, value) VALUES "
331                    "(?, ?, ?, ?, ?, ?, ?, ?)",
332                    &plugin->insertContent) != SQLITE_OK))
333     {
334       LOG_SQLITE (plugin, NULL,
335                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
336       return GNUNET_SYSERR;
337     }
338   return GNUNET_OK;
339 }
340
341
342 /**
343  * Synchronize our utilization statistics with the 
344  * statistics service.
345  * @param plugin the plugin context (state for this module)
346  */
347 static void 
348 sync_stats (struct Plugin *plugin)
349 {
350   GNUNET_STATISTICS_set (plugin->statistics,
351                          QUOTA_STAT_NAME,
352                          plugin->payload,
353                          GNUNET_YES);
354   plugin->lastSync = 0;
355 }
356
357
358 /**
359  * Shutdown database connection and associate data
360  * structures.
361  * @param plugin the plugin context (state for this module)
362  */
363 static void
364 database_shutdown (struct Plugin *plugin)
365 {
366   if (plugin->lastSync > 0)
367     sync_stats (plugin);
368   if (plugin->updPrio != NULL)
369     sqlite3_finalize (plugin->updPrio);
370   if (plugin->insertContent != NULL)
371     sqlite3_finalize (plugin->insertContent);
372   sqlite3_close (plugin->dbh);
373   GNUNET_free_non_null (plugin->fn);
374 }
375
376
377 /**
378  * Get an estimate of how much space the database is
379  * currently using.
380  *
381  * @param cls our plugin context
382  * @return number of bytes used on disk
383  */
384 static unsigned long long sqlite_plugin_get_size (void *cls)
385 {
386   struct Plugin *plugin = cls;
387   return plugin->payload;
388 }
389
390
391 /**
392  * Delete the database entry with the given
393  * row identifier.
394  *
395  * @param plugin the plugin context (state for this module)
396  * @param rid the ID of the row to delete
397  */
398 static int
399 delete_by_rowid (struct Plugin* plugin, 
400                  unsigned long long rid)
401 {
402   sqlite3_stmt *stmt;
403
404   if (sq_prepare (plugin->dbh,
405                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
406     {
407       LOG_SQLITE (plugin, NULL,
408                   GNUNET_ERROR_TYPE_ERROR |
409                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
410       return GNUNET_SYSERR;
411     }
412   sqlite3_bind_int64 (stmt, 1, rid);
413   if (SQLITE_DONE != sqlite3_step (stmt))
414     {
415       LOG_SQLITE (plugin, NULL,
416                   GNUNET_ERROR_TYPE_ERROR |
417                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
418       sqlite3_finalize (stmt);
419       return GNUNET_SYSERR;
420     }
421   sqlite3_finalize (stmt);
422   return GNUNET_OK;
423 }
424
425
426 /**
427  * Context for the universal iterator.
428  */
429 struct NextContext;
430
431 /**
432  * Type of a function that will prepare
433  * the next iteration.
434  *
435  * @param cls closure
436  * @param nc the next context; NULL for the last
437  *         call which gives the callback a chance to
438  *         clean up the closure
439  * @return GNUNET_OK on success, GNUNET_NO if there are
440  *         no more values, GNUNET_SYSERR on error
441  */
442 typedef int (*PrepareFunction)(void *cls,
443                                struct NextContext *nc);
444
445
446 /**
447  * Context we keep for the "next request" callback.
448  */
449 struct NextContext
450 {
451   /**
452    * Internal state.
453    */ 
454   struct Plugin *plugin;
455
456   /**
457    * Function to call on the next value.
458    */
459   PluginIterator iter;
460
461   /**
462    * Closure for iter.
463    */
464   void *iter_cls;
465
466   /**
467    * Function to call to prepare the next
468    * iteration.
469    */
470   PrepareFunction prep;
471
472   /**
473    * Closure for prep.
474    */
475   void *prep_cls;
476
477   /**
478    * Statement that the iterator will get the data
479    * from (updated or set by prep).
480    */ 
481   sqlite3_stmt *stmt;
482
483   /**
484    * Row ID of the last result.
485    */
486   unsigned long long last_rowid;
487
488   /**
489    * Key of the last result.
490    */
491   GNUNET_HashCode lastKey;  
492
493   /**
494    * Expiration time of the last value visited.
495    */
496   struct GNUNET_TIME_Absolute lastExpiration;
497
498   /**
499    * Priority of the last value visited.
500    */ 
501   unsigned int lastPriority; 
502
503   /**
504    * Number of results processed so far.
505    */
506   unsigned int count;
507
508   /**
509    * Set to GNUNET_YES if we must stop now.
510    */
511   int end_it;
512 };
513
514
515 /**
516  * Continuation of "sqlite_next_request".
517  *
518  * @param cls the next context
519  * @param tc the task context (unused)
520  */
521 static void 
522 sqlite_next_request_cont (void *cls,
523                           const struct GNUNET_SCHEDULER_TaskContext *tc)
524 {
525   struct NextContext * nc= cls;
526   struct Plugin *plugin;
527   unsigned long long rowid;
528   sqlite3_stmt *stmtd;
529   int ret;
530   unsigned int type;
531   unsigned int size;
532   unsigned int priority;
533   unsigned int anonymity;
534   struct GNUNET_TIME_Absolute expiration;
535   const GNUNET_HashCode *key;
536   const void *data;
537
538  
539   plugin = nc->plugin;
540   if ( (GNUNET_YES == nc->end_it) ||
541        (GNUNET_OK != (nc->prep(nc->prep_cls,
542                                nc))) )
543     {
544     END:
545       nc->iter (nc->iter_cls, 
546                 NULL, NULL, 0, NULL, 0, 0, 0, 
547                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
548       nc->prep (nc->prep_cls, NULL);
549       GNUNET_free (nc);
550       return;
551     }
552
553   rowid = sqlite3_column_int64 (nc->stmt, 7);
554   nc->last_rowid = rowid;
555   type = sqlite3_column_int (nc->stmt, 1);
556   size = sqlite3_column_bytes (nc->stmt, 6);
557   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
558     {
559       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
560                        "sqlite",
561                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
562       if (SQLITE_OK != sqlite3_reset (nc->stmt))
563         LOG_SQLITE (nc->plugin, NULL,
564                     GNUNET_ERROR_TYPE_ERROR |
565                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
566       if (sq_prepare
567           (nc->plugin->dbh,
568            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
569            &stmtd) != SQLITE_OK)
570         {
571           LOG_SQLITE (nc->plugin, NULL,
572                       GNUNET_ERROR_TYPE_ERROR |
573                       GNUNET_ERROR_TYPE_BULK, 
574                       "sq_prepare");
575           goto END;
576         }
577
578       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
579         LOG_SQLITE (nc->plugin, NULL,
580                     GNUNET_ERROR_TYPE_ERROR |
581                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
582       if (SQLITE_DONE != sqlite3_step (stmtd))
583         LOG_SQLITE (nc->plugin, NULL,
584                     GNUNET_ERROR_TYPE_ERROR |
585                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
586       if (SQLITE_OK != sqlite3_finalize (stmtd))
587         LOG_SQLITE (nc->plugin, NULL,
588                     GNUNET_ERROR_TYPE_ERROR |
589                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
590       goto END;
591     }
592
593   priority = sqlite3_column_int (nc->stmt, 2);
594   anonymity = sqlite3_column_int (nc->stmt, 3);
595   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
596   key = sqlite3_column_blob (nc->stmt, 5);
597   nc->lastPriority = priority;
598   nc->lastExpiration = expiration;
599   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
600   data = sqlite3_column_blob (nc->stmt, 6);
601   nc->count++;
602   ret = nc->iter (nc->iter_cls,
603                   nc,
604                   key,
605                   size,
606                   data, 
607                   type,
608                   priority,
609                   anonymity,
610                   expiration,
611                   rowid);
612   if (ret == GNUNET_SYSERR)
613     {
614       nc->end_it = GNUNET_YES;
615       return;
616     }
617 #if DEBUG_SQLITE
618   if (ret == GNUNET_NO)
619     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
620                      "sqlite",
621                      "Asked to remove entry %llu (%u bytes)\n",
622                      (unsigned long long) rowid,
623                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
624 #endif
625   if ( (ret == GNUNET_NO) &&
626        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
627     {
628       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
629       plugin->lastSync++; 
630 #if DEBUG_SQLITE
631       if (ret == GNUNET_NO)
632         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
633                          "sqlite",
634                          "Removed entry %llu (%u bytes), new payload is %llu\n",
635                          (unsigned long long) rowid,
636                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
637                          (unsigned long long) plugin->payload);
638 #endif
639       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
640         sync_stats (plugin);
641     }
642 }
643
644
645 /**
646  * Function invoked on behalf of a "PluginIterator"
647  * asking the database plugin to call the iterator
648  * with the next item.
649  *
650  * @param next_cls whatever argument was given
651  *        to the PluginIterator as "next_cls".
652  * @param end_it set to GNUNET_YES if we
653  *        should terminate the iteration early
654  *        (iterator should be still called once more
655  *         to signal the end of the iteration).
656  */
657 static void 
658 sqlite_next_request (void *next_cls,
659                      int end_it)
660 {
661   struct NextContext * nc= next_cls;
662
663   if (GNUNET_YES == end_it)
664     nc->end_it = GNUNET_YES;
665   GNUNET_SCHEDULER_add_continuation (nc->plugin->env->sched,
666                                      GNUNET_NO,
667                                      &sqlite_next_request_cont,
668                                      nc,
669                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
670 }
671
672
673
674 /**
675  * Store an item in the datastore.
676  *
677  * @param cls closure
678  * @param key key for the item
679  * @param size number of bytes in data
680  * @param data content stored
681  * @param type type of the content
682  * @param priority priority of the content
683  * @param anonymity anonymity-level for the content
684  * @param expiration expiration time for the content
685  * @param msg set to an error message
686  * @return GNUNET_OK on success
687  */
688 static int
689 sqlite_plugin_put (void *cls,
690                    const GNUNET_HashCode * key,
691                    uint32_t size,
692                    const void *data,
693                    uint32_t type,
694                    uint32_t priority,
695                    uint32_t anonymity,
696                    struct GNUNET_TIME_Absolute expiration,
697                    char ** msg)
698 {
699   struct Plugin *plugin = cls;
700   int n;
701   sqlite3_stmt *stmt;
702   GNUNET_HashCode vhash;
703
704 #if DEBUG_SQLITE
705   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
706                    "sqlite",
707                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
708                    type, 
709                    GNUNET_h2s(key),
710                    priority,
711                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
712                    (long long) expiration.value);
713 #endif
714   GNUNET_CRYPTO_hash (data, size, &vhash);
715   stmt = plugin->insertContent;
716   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
717       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
718       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
719       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
720       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
721       (SQLITE_OK !=
722        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
723                           SQLITE_TRANSIENT)) ||
724       (SQLITE_OK !=
725        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
726                           SQLITE_TRANSIENT))
727       || (SQLITE_OK !=
728           sqlite3_bind_blob (stmt, 8, data, size,
729                              SQLITE_TRANSIENT)))
730     {
731       LOG_SQLITE (plugin,
732                   msg,
733                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
734       if (SQLITE_OK != sqlite3_reset (stmt))
735         LOG_SQLITE (plugin, NULL,
736                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
737       return GNUNET_SYSERR;
738     }
739   n = sqlite3_step (stmt);
740   if (n != SQLITE_DONE)
741     {
742       if (n == SQLITE_BUSY)
743         {
744           LOG_SQLITE (plugin, msg,
745                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
746           sqlite3_reset (stmt);
747           GNUNET_break (0);
748           return GNUNET_NO;
749         }
750       LOG_SQLITE (plugin, msg,
751                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
752       sqlite3_reset (stmt);
753       return GNUNET_SYSERR;
754     }
755   if (SQLITE_OK != sqlite3_reset (stmt))
756     LOG_SQLITE (plugin, NULL,
757                 GNUNET_ERROR_TYPE_ERROR |
758                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
759   plugin->lastSync++;
760   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
761 #if DEBUG_SQLITE
762   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
763                    "sqlite",
764                    "Stored new entry (%u bytes), new payload is %llu\n",
765                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
766                    (unsigned long long) plugin->payload);
767 #endif
768   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
769     sync_stats (plugin);
770   return GNUNET_OK;
771 }
772
773
774 /**
775  * Update the priority for a particular key in the datastore.  If
776  * the expiration time in value is different than the time found in
777  * the datastore, the higher value should be kept.  For the
778  * anonymity level, the lower value is to be used.  The specified
779  * priority should be added to the existing priority, ignoring the
780  * priority in value.
781  *
782  * Note that it is possible for multiple values to match this put.
783  * In that case, all of the respective values are updated.
784  *
785  * @param cls the plugin context (state for this module)
786  * @param uid unique identifier of the datum
787  * @param delta by how much should the priority
788  *     change?  If priority + delta < 0 the
789  *     priority should be set to 0 (never go
790  *     negative).
791  * @param expire new expiration time should be the
792  *     MAX of any existing expiration time and
793  *     this value
794  * @param msg set to an error message
795  * @return GNUNET_OK on success
796  */
797 static int
798 sqlite_plugin_update (void *cls,
799                       uint64_t uid,
800                       int delta, struct GNUNET_TIME_Absolute expire,
801                       char **msg)
802 {
803   struct Plugin *plugin = cls;
804   int n;
805
806   sqlite3_bind_int (plugin->updPrio, 1, delta);
807   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
808   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
809   n = sqlite3_step (plugin->updPrio);
810   if (n != SQLITE_DONE)
811     LOG_SQLITE (plugin, msg,
812                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
813                 "sqlite3_step");
814 #if DEBUG_SQLITE
815   else
816     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
817                      "sqlite",
818                      "Block updated\n");
819 #endif
820   sqlite3_reset (plugin->updPrio);
821
822   if (n == SQLITE_BUSY)
823     return GNUNET_NO;
824   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
825 }
826
827
828 /**
829  * Internal context for an iteration.
830  */
831 struct IterContext
832 {
833   /**
834    * FIXME.
835    */
836   sqlite3_stmt *stmt_1;
837
838   /**
839    * FIXME.
840    */
841   sqlite3_stmt *stmt_2;
842
843   /**
844    * FIXME.
845    */
846   int is_asc;
847
848   /**
849    * FIXME.
850    */
851   int is_prio;
852
853   /**
854    * FIXME.
855    */
856   int is_migr;
857
858   /**
859    * FIXME.
860    */
861   int limit_nonanonymous;
862
863   /**
864    * Desired type for blocks returned by this iterator.
865    */
866   uint32_t type;
867 };
868
869
870 /**
871  * Prepare our SQL query to obtain the next record from the database.
872  *
873  * @param cls our "struct IterContext"
874  * @param nc NULL to terminate the iteration, otherwise our context for
875  *           getting the next result.
876  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
877  *         GNUNET_SYSERR on error (or end of iteration)
878  */
879 static int
880 iter_next_prepare (void *cls,
881                    struct NextContext *nc)
882 {
883   struct IterContext *ic = cls;
884   struct Plugin *plugin;
885   int ret;
886
887   if (nc == NULL)
888     {
889 #if DEBUG_SQLITE
890       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891                   "Asked to clean up iterator state.\n");
892 #endif
893       sqlite3_finalize (ic->stmt_1);
894       sqlite3_finalize (ic->stmt_2);
895       return GNUNET_SYSERR;
896     }
897   sqlite3_reset (ic->stmt_1);
898   sqlite3_reset (ic->stmt_2);
899   plugin = nc->plugin;
900   if (ic->is_prio)
901     {
902 #if DEBUG_SQLITE
903       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                   "Restricting to results larger than the last priority %u\n",
905                   nc->lastPriority);
906 #endif
907       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
908       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
909     }
910   else
911     {
912 #if DEBUG_SQLITE
913       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                   "Restricting to results larger than the last expiration %llu\n",
915                   (unsigned long long) nc->lastExpiration.value);
916 #endif
917       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
918       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
919     }
920 #if DEBUG_SQLITE
921   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922               "Restricting to results larger than the last key `%s'\n",
923               GNUNET_h2s(&nc->lastKey));
924 #endif
925   sqlite3_bind_blob (ic->stmt_1, 2, 
926                      &nc->lastKey, 
927                      sizeof (GNUNET_HashCode),
928                      SQLITE_TRANSIENT);
929   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
930     {      
931 #if DEBUG_SQLITE
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Result found using iterator 1\n");
934 #endif
935       nc->stmt = ic->stmt_1;
936       return GNUNET_OK;
937     }
938   if (ret != SQLITE_DONE)
939     {
940       LOG_SQLITE (plugin, NULL,
941                   GNUNET_ERROR_TYPE_ERROR |
942                   GNUNET_ERROR_TYPE_BULK,
943                   "sqlite3_step");
944       return GNUNET_SYSERR;
945     }
946   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
947     LOG_SQLITE (plugin, NULL,
948                 GNUNET_ERROR_TYPE_ERROR | 
949                 GNUNET_ERROR_TYPE_BULK, 
950                 "sqlite3_reset");
951   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
952     {
953 #if DEBUG_SQLITE
954       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955                   "Result found using iterator 2\n");
956 #endif
957       nc->stmt = ic->stmt_2;
958       return GNUNET_OK;
959     }
960   if (ret != SQLITE_DONE)
961     {
962       LOG_SQLITE (plugin, NULL,
963                   GNUNET_ERROR_TYPE_ERROR |
964                   GNUNET_ERROR_TYPE_BULK,
965                   "sqlite3_step");
966       return GNUNET_SYSERR;
967     }
968   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
969     LOG_SQLITE (plugin, NULL,
970                 GNUNET_ERROR_TYPE_ERROR |
971                 GNUNET_ERROR_TYPE_BULK,
972                 "sqlite3_reset");
973 #if DEBUG_SQLITE
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "No result found using either iterator\n");
976 #endif
977   return GNUNET_NO;
978 }
979
980
981 /**
982  * Call a method for each key in the database and
983  * call the callback method on it.
984  *
985  * @param plugin our plugin context
986  * @param type entries of which type should be considered?
987  * @param is_asc are we iterating in ascending order?
988  * @param is_prio are we iterating by priority (otherwise by expiration)
989  * @param is_migr are we iterating in migration order?
990  * @param limit_nonanonymous are we restricting results to those with anonymity
991  *              level zero?
992  * @param stmt_str_1 first SQL statement to execute
993  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
994  * @param iter function to call on each matching value;
995  *        will be called once with a NULL value at the end
996  * @param iter_cls closure for iter
997  */
998 static void
999 basic_iter (struct Plugin *plugin,
1000             uint32_t type,
1001             int is_asc,
1002             int is_prio,
1003             int is_migr,
1004             int limit_nonanonymous,
1005             const char *stmt_str_1,
1006             const char *stmt_str_2,
1007             PluginIterator iter,
1008             void *iter_cls)
1009 {
1010   struct NextContext *nc;
1011   struct IterContext *ic;
1012   sqlite3_stmt *stmt_1;
1013   sqlite3_stmt *stmt_2;
1014
1015 #if DEBUG_SQLITE
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               "At %llu, using queries `%s' and `%s'\n",
1018               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1019               stmt_str_1,
1020               stmt_str_2);
1021 #endif
1022   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1023     {
1024       LOG_SQLITE (plugin, NULL,
1025                   GNUNET_ERROR_TYPE_ERROR |
1026                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1027       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1028       return;
1029     }
1030   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1031     {
1032       LOG_SQLITE (plugin, NULL,
1033                   GNUNET_ERROR_TYPE_ERROR |
1034                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1035       sqlite3_finalize (stmt_1);
1036       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1037       return;
1038     }
1039   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1040                       sizeof(struct IterContext));
1041   nc->plugin = plugin;
1042   nc->iter = iter;
1043   nc->iter_cls = iter_cls;
1044   nc->stmt = NULL;
1045   ic = (struct IterContext*) &nc[1];
1046   ic->stmt_1 = stmt_1;
1047   ic->stmt_2 = stmt_2;
1048   ic->type = type;
1049   ic->is_asc = is_asc;
1050   ic->is_prio = is_prio;
1051   ic->is_migr = is_migr;
1052   ic->limit_nonanonymous = limit_nonanonymous;
1053   nc->prep = &iter_next_prepare;
1054   nc->prep_cls = ic;
1055   if (is_asc)
1056     {
1057       nc->lastPriority = 0;
1058       nc->lastExpiration.value = 0;
1059       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1060     }
1061   else
1062     {
1063       nc->lastPriority = 0x7FFFFFFF;
1064       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1065       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1066     }
1067   sqlite_next_request (nc, GNUNET_NO);
1068 }
1069
1070
1071 /**
1072  * Select a subset of the items in the datastore and call
1073  * the given iterator for each of them.
1074  *
1075  * @param cls our plugin context
1076  * @param type entries of which type should be considered?
1077  *        Use 0 for any type.
1078  * @param iter function to call on each matching value;
1079  *        will be called once with a NULL value at the end
1080  * @param iter_cls closure for iter
1081  */
1082 static void
1083 sqlite_plugin_iter_low_priority (void *cls,
1084                                  uint32_t type,
1085                                  PluginIterator iter,
1086                                  void *iter_cls)
1087 {
1088   basic_iter (cls,
1089               type, 
1090               GNUNET_YES, GNUNET_YES, 
1091               GNUNET_NO, GNUNET_NO,
1092               SELECT_IT_LOW_PRIORITY_1,
1093               SELECT_IT_LOW_PRIORITY_2, 
1094               iter, iter_cls);
1095 }
1096
1097
1098 /**
1099  * Select a subset of the items in the datastore and call
1100  * the given iterator for each of them.
1101  *
1102  * @param cls our plugin context
1103  * @param type entries of which type should be considered?
1104  *        Use 0 for any type.
1105  * @param iter function to call on each matching value;
1106  *        will be called once with a NULL value at the end
1107  * @param iter_cls closure for iter
1108  */
1109 static void
1110 sqlite_plugin_iter_zero_anonymity (void *cls,
1111                                    uint32_t type,
1112                                    PluginIterator iter,
1113                                    void *iter_cls)
1114 {
1115   struct GNUNET_TIME_Absolute now;
1116   char *q1;
1117   char *q2;
1118
1119   now = GNUNET_TIME_absolute_get ();
1120   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1121                    (unsigned long long) now.value);
1122   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1123                    (unsigned long long) now.value);
1124   basic_iter (cls,
1125               type, 
1126               GNUNET_NO, GNUNET_YES, 
1127               GNUNET_NO, GNUNET_YES,
1128               q1,
1129               q2,
1130               iter, iter_cls);
1131   GNUNET_free (q1);
1132   GNUNET_free (q2);
1133 }
1134
1135
1136
1137 /**
1138  * Select a subset of the items in the datastore and call
1139  * the given iterator for each of them.
1140  *
1141  * @param cls our plugin context
1142  * @param type entries of which type should be considered?
1143  *        Use 0 for any type.
1144  * @param iter function to call on each matching value;
1145  *        will be called once with a NULL value at the end
1146  * @param iter_cls closure for iter
1147  */
1148 static void
1149 sqlite_plugin_iter_ascending_expiration (void *cls,
1150                                          uint32_t type,
1151                                          PluginIterator iter,
1152                                          void *iter_cls)
1153 {
1154   struct GNUNET_TIME_Absolute now;
1155   char *q1;
1156   char *q2;
1157
1158   now = GNUNET_TIME_absolute_get ();
1159   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1160                    (unsigned long long) 0*now.value);
1161   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1162                    (unsigned long long) 0*now.value);
1163   basic_iter (cls,
1164               type, 
1165               GNUNET_YES, GNUNET_NO, 
1166               GNUNET_NO, GNUNET_NO,
1167               q1, q2,
1168               iter, iter_cls);
1169   GNUNET_free (q1);
1170   GNUNET_free (q2);
1171 }
1172
1173
1174 /**
1175  * Select a subset of the items in the datastore and call
1176  * the given iterator for each of them.
1177  *
1178  * @param cls our plugin context
1179  * @param type entries of which type should be considered?
1180  *        Use 0 for any type.
1181  * @param iter function to call on each matching value;
1182  *        will be called once with a NULL value at the end
1183  * @param iter_cls closure for iter
1184  */
1185 static void
1186 sqlite_plugin_iter_migration_order (void *cls,
1187                                     uint32_t type,
1188                                     PluginIterator iter,
1189                                     void *iter_cls)
1190 {
1191   struct GNUNET_TIME_Absolute now;
1192   char *q;
1193
1194   now = GNUNET_TIME_absolute_get ();
1195   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1196                    (unsigned long long) now.value);
1197   basic_iter (cls,
1198               type, 
1199               GNUNET_NO, GNUNET_NO, 
1200               GNUNET_YES, GNUNET_NO,
1201               SELECT_IT_MIGRATION_ORDER_1,
1202               q,
1203               iter, iter_cls);
1204   GNUNET_free (q);
1205 }
1206
1207
1208 /**
1209  * Call sqlite using the already prepared query to get
1210  * the next result.
1211  *
1212  * @param cls not used
1213  * @param nc context with the prepared query
1214  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1215  *        there are no more results 
1216  */
1217 static int
1218 all_next_prepare (void *cls,
1219                   struct NextContext *nc)
1220 {
1221   struct Plugin *plugin;
1222   int ret;
1223
1224   if (nc == NULL)
1225     {
1226 #if DEBUG_SQLITE
1227       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1228                   "Asked to clean up iterator state.\n");
1229 #endif
1230       return GNUNET_SYSERR;
1231     }
1232   plugin = nc->plugin;
1233   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1234     {      
1235       return GNUNET_OK;
1236     }
1237   if (ret != SQLITE_DONE)
1238     {
1239       LOG_SQLITE (plugin, NULL,
1240                   GNUNET_ERROR_TYPE_ERROR |
1241                   GNUNET_ERROR_TYPE_BULK,
1242                   "sqlite3_step");
1243       return GNUNET_SYSERR;
1244     }
1245   return GNUNET_NO;
1246 }
1247
1248
1249 /**
1250  * Select a subset of the items in the datastore and call
1251  * the given iterator for each of them.
1252  *
1253  * @param cls our plugin context
1254  * @param type entries of which type should be considered?
1255  *        Use 0 for any type.
1256  * @param iter function to call on each matching value;
1257  *        will be called once with a NULL value at the end
1258  * @param iter_cls closure for iter
1259  */
1260 static void
1261 sqlite_plugin_iter_all_now (void *cls,
1262                             uint32_t type,
1263                             PluginIterator iter,
1264                             void *iter_cls)
1265 {
1266   struct Plugin *plugin = cls;
1267   struct NextContext *nc;
1268   sqlite3_stmt *stmt;
1269
1270   if (sq_prepare (plugin->dbh, 
1271                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1272                   &stmt) != SQLITE_OK)
1273     {
1274       LOG_SQLITE (plugin, NULL,
1275                   GNUNET_ERROR_TYPE_ERROR |
1276                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1277       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1278       return;
1279     }
1280   nc = GNUNET_malloc (sizeof(struct NextContext));
1281   nc->plugin = plugin;
1282   nc->iter = iter;
1283   nc->iter_cls = iter_cls;
1284   nc->stmt = stmt;
1285   nc->prep = &all_next_prepare;
1286   nc->prep_cls = NULL;
1287   sqlite_next_request (nc, GNUNET_NO);
1288 }
1289
1290
1291 /**
1292  * FIXME.
1293  */
1294 struct GetNextContext
1295 {
1296
1297   /**
1298    * FIXME.
1299    */
1300   int total;
1301
1302   /**
1303    * FIXME.
1304    */
1305   int off;
1306
1307   /**
1308    * FIXME.
1309    */
1310   int have_vhash;
1311
1312   /**
1313    * FIXME.
1314    */
1315   unsigned int type;
1316
1317   /**
1318    * FIXME.
1319    */
1320   sqlite3_stmt *stmt;
1321
1322   /**
1323    * FIXME.
1324    */
1325   GNUNET_HashCode key;
1326
1327   /**
1328    * FIXME.
1329    */
1330   GNUNET_HashCode vhash;
1331 };
1332
1333
1334
1335 /**
1336  * FIXME.
1337  *
1338  * @param cls our "struct GetNextContext*"
1339  * @param nc FIXME
1340  * @return GNUNET_YES if there are more results, 
1341  *         GNUNET_NO if there are no more results,
1342  *         GNUNET_SYSERR on internal error
1343  */
1344 static int
1345 get_next_prepare (void *cls,
1346                   struct NextContext *nc)
1347 {
1348   struct GetNextContext *gnc = cls;
1349   int sqoff;
1350   int ret;
1351   int limit_off;
1352
1353   if (nc == NULL)
1354     {
1355       sqlite3_finalize (gnc->stmt);
1356       return GNUNET_SYSERR;
1357     }
1358   if (nc->count == gnc->total)
1359     return GNUNET_NO;
1360   if (nc->count + gnc->off == gnc->total)
1361     nc->last_rowid = 0;
1362   if (nc->count == 0)
1363     limit_off = gnc->off;
1364   else
1365     limit_off = 0;
1366   sqoff = 1;
1367   sqlite3_reset (nc->stmt);
1368   ret = sqlite3_bind_blob (nc->stmt,
1369                            sqoff++,
1370                            &gnc->key, 
1371                            sizeof (GNUNET_HashCode),
1372                            SQLITE_TRANSIENT);
1373   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1374     ret = sqlite3_bind_blob (nc->stmt,
1375                              sqoff++,
1376                              &gnc->vhash,
1377                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1378   if ((gnc->type != 0) && (ret == SQLITE_OK))
1379     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1380   if (ret == SQLITE_OK)
1381     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1382   if (ret == SQLITE_OK)
1383     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1384   if (ret != SQLITE_OK)
1385     return GNUNET_SYSERR;
1386   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1387     return GNUNET_NO;
1388   return GNUNET_OK;
1389 }
1390
1391
1392 /**
1393  * Iterate over the results for a particular key
1394  * in the datastore.
1395  *
1396  * @param cls closure
1397  * @param key maybe NULL (to match all entries)
1398  * @param vhash hash of the value, maybe NULL (to
1399  *        match all values that have the right key).
1400  *        Note that for DBlocks there is no difference
1401  *        betwen key and vhash, but for other blocks
1402  *        there may be!
1403  * @param type entries of which type are relevant?
1404  *     Use 0 for any type.
1405  * @param iter function to call on each matching value;
1406  *        will be called once with a NULL value at the end
1407  * @param iter_cls closure for iter
1408  */
1409 static void
1410 sqlite_plugin_get (void *cls,
1411                    const GNUNET_HashCode * key,
1412                    const GNUNET_HashCode * vhash,
1413                    uint32_t type,
1414                    PluginIterator iter, void *iter_cls)
1415 {
1416   struct Plugin *plugin = cls;
1417   struct GetNextContext *gpc;
1418   struct NextContext *nc;
1419   int ret;
1420   int total;
1421   sqlite3_stmt *stmt;
1422   char scratch[256];
1423   int sqoff;
1424
1425   GNUNET_assert (iter != NULL);
1426   if (key == NULL)
1427     {
1428       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1429       return;
1430     }
1431   GNUNET_snprintf (scratch, 256,
1432                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1433                    vhash == NULL ? "" : " AND vhash=:2",
1434                    type == 0 ? "" : (vhash ==
1435                                      NULL) ? " AND type=:2" : " AND type=:3");
1436   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1437     {
1438       LOG_SQLITE (plugin, NULL,
1439                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1440       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1441       return;
1442     }
1443   sqoff = 1;
1444   ret = sqlite3_bind_blob (stmt,
1445                            sqoff++,
1446                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1447   if ((vhash != NULL) && (ret == SQLITE_OK))
1448     ret = sqlite3_bind_blob (stmt,
1449                              sqoff++,
1450                              vhash,
1451                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1452   if ((type != 0) && (ret == SQLITE_OK))
1453     ret = sqlite3_bind_int (stmt, sqoff++, type);
1454   if (SQLITE_OK != ret)
1455     {
1456       LOG_SQLITE (plugin, NULL,
1457                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1458       sqlite3_reset (stmt);
1459       sqlite3_finalize (stmt);
1460       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1461       return;
1462     }
1463   ret = sqlite3_step (stmt);
1464   if (ret != SQLITE_ROW)
1465     {
1466       LOG_SQLITE (plugin, NULL,
1467                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1468                   "sqlite_step");
1469       sqlite3_reset (stmt);
1470       sqlite3_finalize (stmt);
1471       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1472       return;
1473     }
1474   total = sqlite3_column_int (stmt, 0);
1475   sqlite3_reset (stmt);
1476   sqlite3_finalize (stmt);
1477   if (0 == total)
1478     {
1479       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1480       return;
1481     }
1482
1483   GNUNET_snprintf (scratch, 256,
1484                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1485                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1486                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1487                    vhash == NULL ? "" : " AND vhash=:2",
1488                    type == 0 ? "" : (vhash ==
1489                                      NULL) ? " AND type=:2" : " AND type=:3",
1490                    sqoff, sqoff + 1);
1491   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1492     {
1493       LOG_SQLITE (plugin, NULL,
1494                   GNUNET_ERROR_TYPE_ERROR |
1495                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1496       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1497       return;
1498     }
1499   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1500                       sizeof(struct GetNextContext));
1501   nc->plugin = plugin;
1502   nc->iter = iter;
1503   nc->iter_cls = iter_cls;
1504   nc->stmt = stmt;
1505   gpc = (struct GetNextContext*) &nc[1];
1506   gpc->total = total;
1507   gpc->type = type;
1508   gpc->key = *key;
1509   gpc->stmt = stmt; /* alias used for freeing at the end! */
1510   if (NULL != vhash)
1511     {
1512       gpc->have_vhash = GNUNET_YES;
1513       gpc->vhash = *vhash;
1514     }
1515   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1516   nc->prep = &get_next_prepare;
1517   nc->prep_cls = gpc;
1518   sqlite_next_request (nc, GNUNET_NO);
1519 }
1520
1521
1522 /**
1523  * Drop database.
1524  *
1525  * @param cls our plugin context
1526  */
1527 static void 
1528 sqlite_plugin_drop (void *cls)
1529 {
1530   struct Plugin *plugin = cls;
1531   plugin->drop_on_shutdown = GNUNET_YES;
1532 }
1533
1534
1535 /**
1536  * Callback function to process statistic values.
1537  *
1538  * @param cls closure
1539  * @param subsystem name of subsystem that created the statistic
1540  * @param name the name of the datum
1541  * @param value the current value
1542  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1543  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1544  */
1545 static int
1546 process_stat_in (void *cls,
1547                  const char *subsystem,
1548                  const char *name,
1549                  uint64_t value,
1550                  int is_persistent)
1551 {
1552   struct Plugin *plugin = cls;
1553   plugin->payload += value;
1554 #if DEBUG_SQLITE
1555   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1556                    "sqlite",
1557                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1558                    value,
1559                    (unsigned long long) plugin->payload);
1560 #endif
1561   return GNUNET_OK;
1562 }
1563                                          
1564
1565 /**
1566  * Entry point for the plugin.
1567  *
1568  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1569  * @return NULL on error, othrewise the plugin context
1570  */
1571 void *
1572 libgnunet_plugin_datastore_sqlite_init (void *cls)
1573 {
1574   static struct Plugin plugin;
1575   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1576   struct GNUNET_DATASTORE_PluginFunctions *api;
1577
1578   if (plugin.env != NULL)
1579     return NULL; /* can only initialize once! */
1580   memset (&plugin, 0, sizeof(struct Plugin));
1581   plugin.env = env;
1582   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1583                                                 "sqlite",
1584                                                 env->cfg);
1585   GNUNET_STATISTICS_get (plugin.statistics,
1586                          "sqlite",
1587                          QUOTA_STAT_NAME,
1588                          GNUNET_TIME_UNIT_MINUTES,
1589                          NULL,
1590                          &process_stat_in,
1591                          &plugin);
1592   if (GNUNET_OK !=
1593       database_setup (env->cfg, &plugin))
1594     {
1595       database_shutdown (&plugin);
1596       return NULL;
1597     }
1598   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1599   api->cls = &plugin;
1600   api->get_size = &sqlite_plugin_get_size;
1601   api->put = &sqlite_plugin_put;
1602   api->next_request = &sqlite_next_request;
1603   api->get = &sqlite_plugin_get;
1604   api->update = &sqlite_plugin_update;
1605   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1606   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1607   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1608   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1609   api->iter_all_now = &sqlite_plugin_iter_all_now;
1610   api->drop = &sqlite_plugin_drop;
1611   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1612                    "sqlite", _("Sqlite database running\n"));
1613   return api;
1614 }
1615
1616
1617 /**
1618  * Exit point from the plugin.
1619  *
1620  * @param cls the plugin context (as returned by "init")
1621  * @return always NULL
1622  */
1623 void *
1624 libgnunet_plugin_datastore_sqlite_done (void *cls)
1625 {
1626   char *fn;
1627   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1628   struct Plugin *plugin = api->cls;
1629
1630   fn = NULL;
1631   if (plugin->drop_on_shutdown)
1632     fn = GNUNET_strdup (plugin->fn);
1633   database_shutdown (plugin);
1634   GNUNET_STATISTICS_destroy (plugin->statistics,
1635                              GNUNET_YES);
1636   plugin->env = NULL; 
1637   plugin->payload = 0;
1638   GNUNET_free (api);
1639   if (fn != NULL)
1640     {
1641       if (0 != UNLINK(fn))
1642         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1643                                   "unlink",
1644                                   fn);
1645       GNUNET_free (fn);
1646     }
1647   return NULL;
1648 }
1649
1650 /* end of plugin_datastore_sqlite.c */