improving datastore API --- not working yet
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 #define DEBUG_SQLITE GNUNET_NO
32
33
34 /**
35  * Log an error message at log-level 'level' that indicates
36  * a failure of the command 'cmd' on file 'filename'
37  * with the message given by strerror(errno).
38  */
39 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40
41
42 #define SELECT_IT_NON_ANONYMOUS_1 \
43   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND hash < ?2) "\
44   " ORDER BY hash DESC LIMIT 1"
45
46 #define SELECT_IT_NON_ANONYMOUS_2 \
47   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0)"\
48   " ORDER BY prio DESC, hash DESC LIMIT 1"
49
50
51 #define SELECT_IT_REPLICATION_ORDER \
52   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "\
53   " ORDER BY repl DESC, Random() LIMIT 1"
54
55 #define SELECT_IT_EXPIRATION_ORDER \
56   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "\
57   " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "\
58   " ORDER BY prio ASC LIMIT 1"
59
60
61 /**
62  * After how many ms "busy" should a DB operation fail for good?
63  * A low value makes sure that we are more responsive to requests
64  * (especially PUTs).  A high value guarantees a higher success
65  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
66  *
67  * The default value of 250ms should ensure that users do not experience
68  * huge latencies while at the same time allowing operations to succeed
69  * with reasonable probability.
70  */
71 #define BUSY_TIMEOUT_MS 250
72
73
74
75 /**
76  * Context for all functions in this plugin.
77  */
78 struct Plugin 
79 {
80   /**
81    * Our execution environment.
82    */
83   struct GNUNET_DATASTORE_PluginEnvironment *env;
84
85   /**
86    * Database filename.
87    */
88   char *fn;
89
90   /**
91    * Native SQLite database handle.
92    */
93   sqlite3 *dbh;
94
95   /**
96    * Precompiled SQL for deletion.
97    */
98   sqlite3_stmt *delRow;
99
100   /**
101    * Precompiled SQL for update.
102    */
103   sqlite3_stmt *updPrio;
104
105   /**
106    * Precompiled SQL for replication decrement.
107    */
108   sqlite3_stmt *updRepl;
109
110   /**
111    * Precompiled SQL for replication selection.
112    */
113   sqlite3_stmt *selRepl;
114
115   /**
116    * Precompiled SQL for expiration selection.
117    */
118   sqlite3_stmt *selExpi;
119
120   /**
121    * Precompiled SQL for insertion.
122    */
123   sqlite3_stmt *insertContent;
124
125   /**
126    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
127    */
128   struct NextContext *next_task_nc;
129
130   /**
131    * Pending task with scheduler for running the next request.
132    */
133   GNUNET_SCHEDULER_TaskIdentifier next_task;
134
135   /**
136    * Should the database be dropped on shutdown?
137    */
138   int drop_on_shutdown;
139
140 };
141
142
143 /**
144  * @brief Prepare a SQL statement
145  *
146  * @param dbh handle to the database
147  * @param zSql SQL statement, UTF-8 encoded
148  * @param ppStmt set to the prepared statement
149  * @return 0 on success
150  */
151 static int
152 sq_prepare (sqlite3 * dbh, 
153             const char *zSql,
154             sqlite3_stmt ** ppStmt)
155 {
156   char *dummy;
157   int result;
158   result = sqlite3_prepare_v2 (dbh,
159                                zSql,
160                                strlen (zSql), 
161                                ppStmt,
162                                (const char **) &dummy);
163 #if DEBUG_SQLITE
164   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
165                    "sqlite",
166                    "Prepared %p: %d\n",
167                    *ppStmt, 
168                    result);
169 #endif
170   return result;
171 }
172
173
174 /**
175  * Create our database indices.
176  * 
177  * @param dbh handle to the database
178  */
179 static void
180 create_indices (sqlite3 * dbh)
181 {
182   /* create indices */
183   sqlite3_exec (dbh,
184                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
185   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
186                 NULL);
187   sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL,
188                 NULL);
189   sqlite3_exec (dbh,
190                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
191                 NULL, NULL);
192   sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)",
193                 NULL, NULL, NULL);
194 }
195
196
197
198 #if 0
199 #define CHECK(a) GNUNET_break(a)
200 #define ENULL NULL
201 #else
202 #define ENULL &e
203 #define ENULL_DEFINED 1
204 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
205 #endif
206
207
208
209
210 /**
211  * Initialize the database connections and associated
212  * data structures (create tables and indices
213  * as needed as well).
214  *
215  * @param cfg our configuration
216  * @param plugin the plugin context (state for this module)
217  * @return GNUNET_OK on success
218  */
219 static int
220 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
221                 struct Plugin *plugin)
222 {
223   sqlite3_stmt *stmt;
224   char *afsdir;
225 #if ENULL_DEFINED
226   char *e;
227 #endif
228   
229   if (GNUNET_OK != 
230       GNUNET_CONFIGURATION_get_value_filename (cfg,
231                                                "datastore-sqlite",
232                                                "FILENAME",
233                                                &afsdir))
234     {
235       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
236                        "sqlite",
237                        _("Option `%s' in section `%s' missing in configuration!\n"),
238                        "FILENAME",
239                        "datastore-sqlite");
240       return GNUNET_SYSERR;
241     }
242   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
243     {
244       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
245         {
246           GNUNET_break (0);
247           GNUNET_free (afsdir);
248           return GNUNET_SYSERR;
249         }
250       /* database is new or got deleted, reset payload to zero! */
251       plugin->env->duc (plugin->env->cls, 0);
252     }
253 #ifdef ENABLE_NLS
254   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
255                                        nl_langinfo (CODESET));
256 #else
257   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
258                                        "UTF-8");   /* good luck */
259 #endif
260   GNUNET_free (afsdir);
261   
262   /* Open database and precompile statements */
263   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
264     {
265       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
266                        "sqlite",
267                        _("Unable to initialize SQLite: %s.\n"),
268                        sqlite3_errmsg (plugin->dbh));
269       return GNUNET_SYSERR;
270     }
271   CHECK (SQLITE_OK ==
272          sqlite3_exec (plugin->dbh,
273                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
274   CHECK (SQLITE_OK ==
275          sqlite3_exec (plugin->dbh,
276                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
277   CHECK (SQLITE_OK ==
278          sqlite3_exec (plugin->dbh,
279                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
280   CHECK (SQLITE_OK ==
281          sqlite3_exec (plugin->dbh,
282                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
283   CHECK (SQLITE_OK ==
284          sqlite3_exec (plugin->dbh, 
285                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
286
287   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
288
289
290   /* We have to do it here, because otherwise precompiling SQL might fail */
291   CHECK (SQLITE_OK ==
292          sq_prepare (plugin->dbh,
293                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
294                      &stmt));
295   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
296        (sqlite3_exec (plugin->dbh,
297                       "CREATE TABLE gn090 ("
298                       "  repl INT4 NOT NULL DEFAULT 0,"
299                       "  type INT4 NOT NULL DEFAULT 0,"
300                       "  prio INT4 NOT NULL DEFAULT 0,"
301                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
302                       "  expire INT8 NOT NULL DEFAULT 0,"
303                       "  hash TEXT NOT NULL DEFAULT '',"
304                       "  vhash TEXT NOT NULL DEFAULT '',"
305                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
306                       NULL) != SQLITE_OK) )
307     {
308       LOG_SQLITE (plugin, NULL,
309                   GNUNET_ERROR_TYPE_ERROR, 
310                   "sqlite3_exec");
311       sqlite3_finalize (stmt);
312       return GNUNET_SYSERR;
313     }
314   sqlite3_finalize (stmt);
315   create_indices (plugin->dbh);
316
317   CHECK (SQLITE_OK ==
318          sq_prepare (plugin->dbh,
319                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
320                      &stmt));
321   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
322        (sqlite3_exec (plugin->dbh,
323                       "CREATE TABLE gn071 ("
324                       "  key TEXT NOT NULL DEFAULT '',"
325                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
326                       NULL) != SQLITE_OK) )
327     {
328       LOG_SQLITE (plugin, NULL,
329                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
330       sqlite3_finalize (stmt);
331       return GNUNET_SYSERR;
332     }
333   sqlite3_finalize (stmt);
334
335   if ((sq_prepare (plugin->dbh,
336                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
337                    "_ROWID_ = ?",
338                    &plugin->updPrio) != SQLITE_OK) ||
339       (sq_prepare (plugin->dbh,
340                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
341                    "_ROWID_ = ?",
342                    &plugin->updRepl) != SQLITE_OK) ||
343       (sq_prepare (plugin->dbh,
344                    SELECT_IT_REPLICATION_ORDER,
345                    &plugin->selRepl) != SQLITE_OK) ||
346       (sq_prepare (plugin->dbh,
347                    SELECT_IT_EXPIRATION_ORDER,
348                    &plugin->selExpi) != SQLITE_OK) ||
349       (sq_prepare (plugin->dbh,
350                    "INSERT INTO gn090 (repl, type, prio, "
351                    "anonLevel, expire, hash, vhash, value) VALUES "
352                    "(?, ?, ?, ?, ?, ?, ?, ?)",
353                    &plugin->insertContent) != SQLITE_OK) ||
354       (sq_prepare (plugin->dbh,
355                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
356                    &plugin->delRow) != SQLITE_OK))
357     {
358       LOG_SQLITE (plugin, NULL,
359                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
360       return GNUNET_SYSERR;
361     }
362
363   return GNUNET_OK;
364 }
365
366
367 /**
368  * Shutdown database connection and associate data
369  * structures.
370  * @param plugin the plugin context (state for this module)
371  */
372 static void
373 database_shutdown (struct Plugin *plugin)
374 {
375   int result;
376 #if SQLITE_VERSION_NUMBER >= 3007000
377   sqlite3_stmt *stmt;
378 #endif
379
380   if (plugin->delRow != NULL)
381     sqlite3_finalize (plugin->delRow);
382   if (plugin->updPrio != NULL)
383     sqlite3_finalize (plugin->updPrio);
384   if (plugin->updRepl != NULL)
385     sqlite3_finalize (plugin->updRepl);
386   if (plugin->selRepl != NULL)
387     sqlite3_finalize (plugin->selRepl);
388   if (plugin->selExpi != NULL)
389     sqlite3_finalize (plugin->selExpi);
390   if (plugin->insertContent != NULL)
391     sqlite3_finalize (plugin->insertContent);
392   result = sqlite3_close(plugin->dbh);
393 #if SQLITE_VERSION_NUMBER >= 3007000
394   if (result == SQLITE_BUSY)
395     {
396       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
397                        "sqlite",
398                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
399       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
400       while (stmt != NULL)
401         {
402 #if DEBUG_SQLITE
403           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
404                      "sqlite", "Closing statement %p\n", stmt);
405 #endif
406           result = sqlite3_finalize(stmt);
407 #if DEBUG_SQLITE
408           if (result != SQLITE_OK)
409               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
410                                "sqlite",
411                                "Failed to close statement %p: %d\n", stmt, result);
412 #endif
413           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
414         }
415       result = sqlite3_close(plugin->dbh);
416     }
417 #endif
418   if (SQLITE_OK != result)
419       LOG_SQLITE (plugin, NULL,
420                   GNUNET_ERROR_TYPE_ERROR, 
421                   "sqlite3_close");
422
423   GNUNET_free_non_null (plugin->fn);
424 }
425
426
427 /**
428  * Delete the database entry with the given
429  * row identifier.
430  *
431  * @param plugin the plugin context (state for this module)
432  * @param rid the ID of the row to delete
433  */
434 static int
435 delete_by_rowid (struct Plugin* plugin, 
436                  unsigned long long rid)
437 {
438   sqlite3_bind_int64 (plugin->delRow, 1, rid);
439   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
440     {
441       LOG_SQLITE (plugin, NULL,
442                   GNUNET_ERROR_TYPE_ERROR |
443                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
444       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
445           LOG_SQLITE (plugin, NULL,
446                       GNUNET_ERROR_TYPE_ERROR |
447                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
448       return GNUNET_SYSERR;
449     }
450   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
451     LOG_SQLITE (plugin, NULL,
452                 GNUNET_ERROR_TYPE_ERROR |
453                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
454   return GNUNET_OK;
455 }
456
457
458 /**
459  * Context for the universal iterator.
460  */
461 struct NextContext;
462
463 /**
464  * Type of a function that will prepare
465  * the next iteration.
466  *
467  * @param cls closure
468  * @param nc the next context; NULL for the last
469  *         call which gives the callback a chance to
470  *         clean up the closure
471  * @return GNUNET_OK on success, GNUNET_NO if there are
472  *         no more values, GNUNET_SYSERR on error
473  */
474 typedef int (*PrepareFunction)(void *cls,
475                                struct NextContext *nc);
476
477
478 /**
479  * Context we keep for the "next request" callback.
480  */
481 struct NextContext
482 {
483   /**
484    * Internal state.
485    */ 
486   struct Plugin *plugin;
487
488   /**
489    * Function to call on the next value.
490    */
491   PluginIterator iter;
492
493   /**
494    * Closure for iter.
495    */
496   void *iter_cls;
497
498   /**
499    * Function to call to prepare the next
500    * iteration.
501    */
502   PrepareFunction prep;
503
504   /**
505    * Closure for prep.
506    */
507   void *prep_cls;
508
509   /**
510    * Statement that the iterator will get the data
511    * from (updated or set by prep).
512    */ 
513   sqlite3_stmt *stmt;
514
515   /**
516    * Row ID of the last result.
517    */
518   unsigned long long last_rowid;
519
520   /**
521    * Key of the last result.
522    */
523   GNUNET_HashCode lastKey;  
524
525   /**
526    * Priority of the last value visited.
527    */ 
528   unsigned int lastPriority; 
529
530   /**
531    * Number of results processed so far.
532    */
533   unsigned int count;
534
535   /**
536    * Set to GNUNET_YES if we must stop now.
537    */
538   int end_it;
539 };
540
541
542 /**
543  * Continuation of "sqlite_next_request".
544  *
545  * @param cls the next context
546  * @param tc the task context (unused)
547  */
548 static void 
549 sqlite_next_request_cont (void *cls,
550                           const struct GNUNET_SCHEDULER_TaskContext *tc)
551 {
552   struct NextContext * nc = cls;
553   struct Plugin *plugin;
554   unsigned long long rowid;
555   int ret;
556   unsigned int size;
557   uint32_t anonymity;
558   uint32_t priority;
559   enum GNUNET_BLOCK_Type type;
560   const GNUNET_HashCode *key;
561   struct GNUNET_TIME_Absolute expiration;
562   char data[GNUNET_SERVER_MAX_MESSAGE_SIZE];
563   
564   plugin = nc->plugin;
565   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
566   plugin->next_task_nc = NULL;
567   if ( (GNUNET_YES == nc->end_it) ||
568        (GNUNET_OK != (nc->prep(nc->prep_cls,
569                                nc))) )
570     {
571     END:
572       nc->iter (nc->iter_cls, 
573                 NULL, NULL, 0, NULL, 0, 0, 0, 
574                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
575       nc->prep (nc->prep_cls, NULL);
576       GNUNET_free (nc);
577       return;
578     }
579
580   type = sqlite3_column_int (nc->stmt, 0);
581   priority = sqlite3_column_int (nc->stmt, 1);
582   anonymity = sqlite3_column_int (nc->stmt, 2);
583   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
584   key = sqlite3_column_blob (nc->stmt, 4);
585   size = sqlite3_column_bytes (nc->stmt, 5);
586   memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
587   rowid = sqlite3_column_int64 (nc->stmt, 6);
588   if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode))
589     {
590       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
591                        "sqlite",
592                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
593       if (SQLITE_OK != sqlite3_reset (nc->stmt))
594         LOG_SQLITE (plugin, NULL,
595                     GNUNET_ERROR_TYPE_ERROR |
596                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
597       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
598         plugin->env->duc (plugin->env->cls,
599                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));      
600       goto END;
601     }
602   nc->count++;
603   nc->last_rowid = rowid;
604   nc->lastPriority = priority;
605   nc->lastKey = *key;
606   if (SQLITE_OK != sqlite3_reset (nc->stmt))
607     LOG_SQLITE (plugin, NULL,
608                 GNUNET_ERROR_TYPE_ERROR |
609                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
610   ret = nc->iter (nc->iter_cls, nc,
611                   key,
612                   size, data,
613                   type, priority,
614                   anonymity, expiration,
615                   rowid);
616   switch (ret)
617     {
618     case GNUNET_SYSERR:
619       nc->end_it = GNUNET_YES;
620       break;
621     case GNUNET_NO:
622 #if DEBUG_SQLITE
623       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
624                        "sqlite",
625                        "Asked to remove entry %llu (%u bytes)\n",
626                        (unsigned long long) rowid,
627                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
628 #endif
629       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
630         {
631           plugin->env->duc (plugin->env->cls,
632                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
633 #if DEBUG_SQLITE
634           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
635                            "sqlite",
636                            "Removed entry %llu (%u bytes)\n",
637                            (unsigned long long) rowid,
638                            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
639 #endif
640         }
641       break;
642     case GNUNET_YES:
643       break;
644     default:
645       GNUNET_break (0);
646     }
647 }
648
649
650 /**
651  * Function invoked on behalf of a "PluginIterator"
652  * asking the database plugin to call the iterator
653  * with the next item.
654  *
655  * @param next_cls whatever argument was given
656  *        to the PluginIterator as "next_cls".
657  * @param end_it set to GNUNET_YES if we
658  *        should terminate the iteration early
659  *        (iterator should be still called once more
660  *         to signal the end of the iteration).
661  */
662 static void 
663 sqlite_next_request (void *next_cls,
664                      int end_it)
665 {
666   struct NextContext * nc= next_cls;
667
668   if (GNUNET_YES == end_it)
669     nc->end_it = GNUNET_YES;
670   nc->plugin->next_task_nc = nc;
671   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
672                                                     nc);
673 }
674
675
676 /**
677  * Store an item in the datastore.
678  *
679  * @param cls closure
680  * @param key key for the item
681  * @param size number of bytes in data
682  * @param data content stored
683  * @param type type of the content
684  * @param priority priority of the content
685  * @param anonymity anonymity-level for the content
686  * @param replication replication-level for the content
687  * @param expiration expiration time for the content
688  * @param msg set to an error message
689  * @return GNUNET_OK on success
690  */
691 static int
692 sqlite_plugin_put (void *cls,
693                    const GNUNET_HashCode *key,
694                    uint32_t size,
695                    const void *data,
696                    enum GNUNET_BLOCK_Type type,
697                    uint32_t priority,
698                    uint32_t anonymity,
699                    uint32_t replication,
700                    struct GNUNET_TIME_Absolute expiration,
701                    char ** msg)
702 {
703   struct Plugin *plugin = cls;
704   int n;
705   sqlite3_stmt *stmt;
706   GNUNET_HashCode vhash;
707
708 #if DEBUG_SQLITE
709   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
710                    "sqlite",
711                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
712                    type, 
713                    GNUNET_h2s(key),
714                    priority,
715                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
716                    (long long) expiration.abs_value);
717 #endif
718   GNUNET_CRYPTO_hash (data, size, &vhash);
719   stmt = plugin->insertContent;
720   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
721       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
722       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
723       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
724       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
725       (SQLITE_OK !=
726        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
727                           SQLITE_TRANSIENT)) ||
728       (SQLITE_OK !=
729        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
730                           SQLITE_TRANSIENT))
731       || (SQLITE_OK !=
732           sqlite3_bind_blob (stmt, 8, data, size,
733                              SQLITE_TRANSIENT)))
734     {
735       LOG_SQLITE (plugin,
736                   msg,
737                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
738       if (SQLITE_OK != sqlite3_reset (stmt))
739         LOG_SQLITE (plugin, NULL,
740                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
741       return GNUNET_SYSERR;
742     }
743   n = sqlite3_step (stmt);
744   switch (n)
745     {
746     case SQLITE_DONE:
747       if (SQLITE_OK != sqlite3_reset (stmt))
748         LOG_SQLITE (plugin, NULL,
749                     GNUNET_ERROR_TYPE_ERROR |
750                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
751       plugin->env->duc (plugin->env->cls,
752                         size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
753 #if DEBUG_SQLITE
754       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
755                        "sqlite",
756                        "Stored new entry (%u bytes)\n",
757                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
758 #endif
759       return GNUNET_OK;
760     case SQLITE_BUSY:      
761       GNUNET_break (0);
762       LOG_SQLITE (plugin, msg,
763                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
764                   "sqlite3_step");
765       sqlite3_reset (stmt);
766       return GNUNET_SYSERR;
767     default:
768       LOG_SQLITE (plugin, msg,
769                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
770                   "sqlite3_step");
771       sqlite3_reset (stmt);
772       database_shutdown (plugin);
773       database_setup (plugin->env->cfg,
774                       plugin);
775       return GNUNET_SYSERR;    
776     }
777 }
778
779
780 /**
781  * Update the priority for a particular key in the datastore.  If
782  * the expiration time in value is different than the time found in
783  * the datastore, the higher value should be kept.  For the
784  * anonymity level, the lower value is to be used.  The specified
785  * priority should be added to the existing priority, ignoring the
786  * priority in value.
787  *
788  * Note that it is possible for multiple values to match this put.
789  * In that case, all of the respective values are updated.
790  *
791  * @param cls the plugin context (state for this module)
792  * @param uid unique identifier of the datum
793  * @param delta by how much should the priority
794  *     change?  If priority + delta < 0 the
795  *     priority should be set to 0 (never go
796  *     negative).
797  * @param expire new expiration time should be the
798  *     MAX of any existing expiration time and
799  *     this value
800  * @param msg set to an error message
801  * @return GNUNET_OK on success
802  */
803 static int
804 sqlite_plugin_update (void *cls,
805                       uint64_t uid,
806                       int delta, struct GNUNET_TIME_Absolute expire,
807                       char **msg)
808 {
809   struct Plugin *plugin = cls;
810   int n;
811
812   sqlite3_bind_int (plugin->updPrio, 1, delta);
813   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
814   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
815   n = sqlite3_step (plugin->updPrio);
816   sqlite3_reset (plugin->updPrio);
817   switch (n)
818     {
819     case SQLITE_DONE:
820 #if DEBUG_SQLITE
821       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
822                        "sqlite",
823                        "Block updated\n");
824 #endif
825       return GNUNET_OK;
826     case SQLITE_BUSY:
827       LOG_SQLITE (plugin, msg,
828                   GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
829                   "sqlite3_step");
830       return GNUNET_NO;
831     default:
832       LOG_SQLITE (plugin, msg,
833                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
834                   "sqlite3_step");
835       return GNUNET_SYSERR;
836     }
837 }
838
839
840 /**
841  * Internal context for an iteration.
842  */
843 struct IterContext
844 {
845   /**
846    * FIXME.
847    */
848   sqlite3_stmt *stmt_1;
849
850   /**
851    * FIXME.
852    */
853   sqlite3_stmt *stmt_2;
854
855   /**
856    * Desired type for blocks returned by this iterator.
857    */
858   enum GNUNET_BLOCK_Type type;
859 };
860
861
862 /**
863  * Prepare our SQL query to obtain the next record from the database.
864  *
865  * @param cls our "struct IterContext"
866  * @param nc NULL to terminate the iteration, otherwise our context for
867  *           getting the next result.
868  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
869  *         GNUNET_SYSERR on error (or end of iteration)
870  */
871 static int
872 iter_next_prepare (void *cls,
873                    struct NextContext *nc)
874 {
875   struct IterContext *ic = cls;
876   struct Plugin *plugin;
877   int ret;
878
879   if (nc == NULL)
880     {
881 #if DEBUG_SQLITE
882       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883                   "Asked to clean up iterator state.\n");
884 #endif
885       sqlite3_finalize (ic->stmt_1);
886       sqlite3_finalize (ic->stmt_2);
887       return GNUNET_SYSERR;
888     }
889   sqlite3_reset (ic->stmt_1);
890   sqlite3_reset (ic->stmt_2);
891   plugin = nc->plugin;
892 #if DEBUG_SQLITE
893   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894               "Restricting to results larger than the last priority %u\n",
895               nc->lastPriority);
896 #endif
897   sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
898   sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
899 #if DEBUG_SQLITE
900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901               "Restricting to results larger than the last key `%s'\n",
902               GNUNET_h2s(&nc->lastKey));
903 #endif
904   sqlite3_bind_blob (ic->stmt_1, 2, 
905                      &nc->lastKey, 
906                      sizeof (GNUNET_HashCode),
907                      SQLITE_TRANSIENT);
908   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
909     {      
910 #if DEBUG_SQLITE
911       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
912                   "Result found using iterator 1\n");
913 #endif
914       nc->stmt = ic->stmt_1;
915       return GNUNET_OK;
916     }
917   if (ret != SQLITE_DONE)
918     {
919       LOG_SQLITE (plugin, NULL,
920                   GNUNET_ERROR_TYPE_ERROR |
921                   GNUNET_ERROR_TYPE_BULK,
922                   "sqlite3_step");
923       return GNUNET_SYSERR;
924     }
925   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
926     LOG_SQLITE (plugin, NULL,
927                 GNUNET_ERROR_TYPE_ERROR | 
928                 GNUNET_ERROR_TYPE_BULK, 
929                 "sqlite3_reset");
930   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
931     {
932 #if DEBUG_SQLITE
933       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934                   "Result found using iterator 2\n");
935 #endif
936       nc->stmt = ic->stmt_2;
937       return GNUNET_OK;
938     }
939   if (ret != SQLITE_DONE)
940     {
941       LOG_SQLITE (plugin, NULL,
942                   GNUNET_ERROR_TYPE_ERROR |
943                   GNUNET_ERROR_TYPE_BULK,
944                   "sqlite3_step");
945       return GNUNET_SYSERR;
946     }
947   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
948     LOG_SQLITE (plugin, NULL,
949                 GNUNET_ERROR_TYPE_ERROR |
950                 GNUNET_ERROR_TYPE_BULK,
951                 "sqlite3_reset");
952 #if DEBUG_SQLITE
953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954               "No result found using either iterator\n");
955 #endif
956   return GNUNET_NO;
957 }
958
959
960 /**
961  * Select a subset of the items in the datastore and call
962  * the given iterator for each of them.
963  *
964  * @param cls our plugin context
965  * @param type entries of which type should be considered?
966  *        Use 0 for any type.
967  * @param iter function to call on each matching value;
968  *        will be called once with a NULL value at the end
969  * @param iter_cls closure for iter
970  */
971 static void
972 sqlite_plugin_iter_zero_anonymity (void *cls,
973                                    enum GNUNET_BLOCK_Type type,
974                                    PluginIterator iter,
975                                    void *iter_cls)
976 {
977   struct Plugin *plugin = cls;
978   struct GNUNET_TIME_Absolute now;
979   struct NextContext *nc;
980   struct IterContext *ic;
981   sqlite3_stmt *stmt_1;
982   sqlite3_stmt *stmt_2;
983   char *q;
984
985   now = GNUNET_TIME_absolute_get ();
986   GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_1,
987                    (unsigned long long) now.abs_value);
988   if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
989     {
990       LOG_SQLITE (plugin, NULL,
991                   GNUNET_ERROR_TYPE_ERROR |
992                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
993       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
994       GNUNET_free (q);
995       return;
996     }
997   GNUNET_free (q);
998   GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_2,
999                    (unsigned long long) now.abs_value);
1000   if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1001     {
1002       LOG_SQLITE (plugin, NULL,
1003                   GNUNET_ERROR_TYPE_ERROR |
1004                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1005       sqlite3_finalize (stmt_1);
1006       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1007       GNUNET_free (q);
1008       return;
1009     }
1010   GNUNET_free (q);
1011   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1012                       sizeof(struct IterContext));
1013   nc->plugin = plugin;
1014   nc->iter = iter;
1015   nc->iter_cls = iter_cls;
1016   nc->stmt = NULL;
1017   ic = (struct IterContext*) &nc[1];
1018   ic->stmt_1 = stmt_1;
1019   ic->stmt_2 = stmt_2;
1020   ic->type = type;
1021   nc->prep = &iter_next_prepare;
1022   nc->prep_cls = ic;
1023   nc->lastPriority = 0x7FFFFFFF;
1024   memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1025   sqlite_next_request (nc, GNUNET_NO);
1026 }
1027
1028
1029 /**
1030  * Call sqlite using the already prepared query to get
1031  * the next result.
1032  *
1033  * @param cls context with the prepared query
1034  * @param nc context with the prepared query
1035  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1036  *        there are no more results 
1037  */
1038 static int
1039 all_next_prepare (void *cls,
1040                   struct NextContext *nc)
1041 {
1042   struct Plugin *plugin;
1043   int ret;
1044
1045   if (nc == NULL)
1046     {
1047 #if DEBUG_SQLITE
1048       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049                   "Asked to clean up iterator state.\n");
1050 #endif
1051       nc = (struct NextContext *)cls;
1052       if (nc->stmt)
1053           sqlite3_finalize (nc->stmt);
1054       nc->stmt = NULL;
1055       return GNUNET_SYSERR;
1056     }
1057   plugin = nc->plugin;
1058   ret = sqlite3_step (nc->stmt);
1059   switch (ret)
1060     {
1061     case SQLITE_ROW:
1062       return GNUNET_OK;  
1063     case SQLITE_DONE:
1064       return GNUNET_NO;
1065     default:
1066       LOG_SQLITE (plugin, NULL,
1067                   GNUNET_ERROR_TYPE_ERROR |
1068                   GNUNET_ERROR_TYPE_BULK,
1069                   "sqlite3_step");
1070       return GNUNET_SYSERR;
1071     }
1072 }
1073
1074
1075 /**
1076  * Select a subset of the items in the datastore and call
1077  * the given iterator for each of them.
1078  *
1079  * @param cls our plugin context
1080  * @param type entries of which type should be considered?
1081  *        Use 0 for any type.
1082  * @param iter function to call on each matching value;
1083  *        will be called once with a NULL value at the end
1084  * @param iter_cls closure for iter
1085  */
1086 static void
1087 sqlite_plugin_iter_all_now (void *cls,
1088                             enum GNUNET_BLOCK_Type type,
1089                             PluginIterator iter,
1090                             void *iter_cls)
1091 {
1092   struct Plugin *plugin = cls;
1093   struct NextContext *nc;
1094   sqlite3_stmt *stmt;
1095
1096   if (sq_prepare (plugin->dbh, 
1097                   "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090",
1098                   &stmt) != SQLITE_OK)
1099     {
1100       LOG_SQLITE (plugin, NULL,
1101                   GNUNET_ERROR_TYPE_ERROR |
1102                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1103       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1104       return;
1105     }
1106   nc = GNUNET_malloc (sizeof(struct NextContext));
1107   nc->plugin = plugin;
1108   nc->iter = iter;
1109   nc->iter_cls = iter_cls;
1110   nc->stmt = stmt;
1111   nc->prep = &all_next_prepare;
1112   nc->prep_cls = nc;
1113   sqlite_next_request (nc, GNUNET_NO);
1114 }
1115
1116
1117 /**
1118  * FIXME.
1119  */
1120 struct GetNextContext
1121 {
1122
1123   /**
1124    * FIXME.
1125    */
1126   int total;
1127
1128   /**
1129    * FIXME.
1130    */
1131   int off;
1132
1133   /**
1134    * FIXME.
1135    */
1136   int have_vhash;
1137
1138   /**
1139    * FIXME.
1140    */
1141   unsigned int type;
1142
1143   /**
1144    * FIXME.
1145    */
1146   sqlite3_stmt *stmt;
1147
1148   /**
1149    * FIXME.
1150    */
1151   GNUNET_HashCode key;
1152
1153   /**
1154    * FIXME.
1155    */
1156   GNUNET_HashCode vhash;
1157 };
1158
1159
1160
1161 /**
1162  * FIXME.
1163  *
1164  * @param cls our "struct GetNextContext*"
1165  * @param nc FIXME
1166  * @return GNUNET_YES if there are more results, 
1167  *         GNUNET_NO if there are no more results,
1168  *         GNUNET_SYSERR on internal error
1169  */
1170 static int
1171 get_next_prepare (void *cls,
1172                   struct NextContext *nc)
1173 {
1174   struct GetNextContext *gnc = cls;
1175   int sqoff;
1176   int ret;
1177   int limit_off;
1178
1179   if (nc == NULL)
1180     {
1181       sqlite3_finalize (gnc->stmt);
1182       return GNUNET_SYSERR;
1183     }
1184   if (nc->count == gnc->total)
1185     return GNUNET_NO;
1186   if (nc->count + gnc->off == gnc->total)
1187     nc->last_rowid = 0;
1188   if (nc->count == 0)
1189     limit_off = gnc->off;
1190   else
1191     limit_off = 0;
1192   sqoff = 1;
1193   sqlite3_reset (nc->stmt);
1194   ret = sqlite3_bind_blob (nc->stmt,
1195                            sqoff++,
1196                            &gnc->key, 
1197                            sizeof (GNUNET_HashCode),
1198                            SQLITE_TRANSIENT);
1199   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1200     ret = sqlite3_bind_blob (nc->stmt,
1201                              sqoff++,
1202                              &gnc->vhash,
1203                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1204   if ((gnc->type != 0) && (ret == SQLITE_OK))
1205     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1206   if (ret == SQLITE_OK)
1207     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1208   if (ret == SQLITE_OK)
1209     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1210   if (ret != SQLITE_OK)
1211     return GNUNET_SYSERR;
1212   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1213     return GNUNET_NO;
1214   return GNUNET_OK;
1215 }
1216
1217
1218 /**
1219  * Iterate over the results for a particular key
1220  * in the datastore.
1221  *
1222  * @param cls closure
1223  * @param key maybe NULL (to match all entries)
1224  * @param vhash hash of the value, maybe NULL (to
1225  *        match all values that have the right key).
1226  *        Note that for DBlocks there is no difference
1227  *        betwen key and vhash, but for other blocks
1228  *        there may be!
1229  * @param type entries of which type are relevant?
1230  *     Use 0 for any type.
1231  * @param iter function to call on each matching value;
1232  *        will be called once with a NULL value at the end
1233  * @param iter_cls closure for iter
1234  */
1235 static void
1236 sqlite_plugin_get (void *cls,
1237                    const GNUNET_HashCode * key,
1238                    const GNUNET_HashCode * vhash,
1239                    enum GNUNET_BLOCK_Type type,
1240                    PluginIterator iter, void *iter_cls)
1241 {
1242   struct Plugin *plugin = cls;
1243   struct GetNextContext *gpc;
1244   struct NextContext *nc;
1245   int ret;
1246   int total;
1247   sqlite3_stmt *stmt;
1248   char scratch[256];
1249   int sqoff;
1250
1251   GNUNET_assert (iter != NULL);
1252   if (key == NULL)
1253     {
1254       sqlite_plugin_iter_all_now (cls, type, iter, iter_cls);
1255       return;
1256     }
1257   GNUNET_snprintf (scratch, sizeof (scratch),
1258                    "SELECT count(*) FROM gn090 WHERE hash=:1%s%s",
1259                    vhash == NULL ? "" : " AND vhash=:2",
1260                    type == 0 ? "" : (vhash ==
1261                                      NULL) ? " AND type=:2" : " AND type=:3");
1262   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1263     {
1264       LOG_SQLITE (plugin, NULL,
1265                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1266       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1267       return;
1268     }
1269   sqoff = 1;
1270   ret = sqlite3_bind_blob (stmt,
1271                            sqoff++,
1272                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1273   if ((vhash != NULL) && (ret == SQLITE_OK))
1274     ret = sqlite3_bind_blob (stmt,
1275                              sqoff++,
1276                              vhash,
1277                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1278   if ((type != 0) && (ret == SQLITE_OK))
1279     ret = sqlite3_bind_int (stmt, sqoff++, type);
1280   if (SQLITE_OK != ret)
1281     {
1282       LOG_SQLITE (plugin, NULL,
1283                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1284       sqlite3_reset (stmt);
1285       sqlite3_finalize (stmt);
1286       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1287       return;
1288     }
1289   ret = sqlite3_step (stmt);
1290   if (ret != SQLITE_ROW)
1291     {
1292       LOG_SQLITE (plugin, NULL,
1293                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1294                   "sqlite_step");
1295       sqlite3_reset (stmt);
1296       sqlite3_finalize (stmt);
1297       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1298       return;
1299     }
1300   total = sqlite3_column_int (stmt, 0);
1301   sqlite3_reset (stmt);
1302   sqlite3_finalize (stmt);
1303   if (0 == total)
1304     {
1305       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1306       return;
1307     }
1308
1309   GNUNET_snprintf (scratch, sizeof (scratch),
1310                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1311                    "FROM gn090 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1312                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1313                    vhash == NULL ? "" : " AND vhash=:2",
1314                    type == 0 ? "" : (vhash ==
1315                                      NULL) ? " AND type=:2" : " AND type=:3",
1316                    sqoff, sqoff + 1);
1317   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1318     {
1319       LOG_SQLITE (plugin, NULL,
1320                   GNUNET_ERROR_TYPE_ERROR |
1321                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1322       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1323       return;
1324     }
1325   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1326                       sizeof(struct GetNextContext));
1327   nc->plugin = plugin;
1328   nc->iter = iter;
1329   nc->iter_cls = iter_cls;
1330   nc->stmt = stmt;
1331   gpc = (struct GetNextContext*) &nc[1];
1332   gpc->total = total;
1333   gpc->type = type;
1334   gpc->key = *key;
1335   gpc->stmt = stmt; /* alias used for freeing at the end! */
1336   if (NULL != vhash)
1337     {
1338       gpc->have_vhash = GNUNET_YES;
1339       gpc->vhash = *vhash;
1340     }
1341   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1342   nc->prep = &get_next_prepare;
1343   nc->prep_cls = gpc;
1344   sqlite_next_request (nc, GNUNET_NO);
1345 }
1346
1347
1348 /**
1349  * Execute statement that gets a row and call the iterator
1350  * with the result.  Resets the statement afterwards.
1351  *
1352  * @param plugin the plugin
1353  * @param stmt the statement
1354  * @param iter iterator to call
1355  * @param iter_cls closure for 'iter'
1356  */
1357 static void
1358 execute_get (struct Plugin *plugin,
1359              sqlite3_stmt *stmt,
1360              PluginIterator iter, void *iter_cls)
1361 {
1362   int n;
1363   struct GNUNET_TIME_Absolute expiration;
1364   unsigned long long rowid;
1365   unsigned int size;
1366   int ret;
1367
1368   n = sqlite3_step (stmt);
1369   switch (n)
1370     {
1371     case SQLITE_ROW:
1372       size = sqlite3_column_bytes (stmt, 5);
1373       rowid = sqlite3_column_int64 (stmt, 6);
1374       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1375         {
1376           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
1377                            "sqlite",
1378                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
1379           if (SQLITE_OK != sqlite3_reset (stmt))
1380             LOG_SQLITE (plugin, NULL,
1381                         GNUNET_ERROR_TYPE_ERROR |
1382                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1383           if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1384             plugin->env->duc (plugin->env->cls,
1385                               - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
1386           break;
1387         }
1388       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1389       ret = iter (iter_cls,
1390                   NULL,
1391                   sqlite3_column_blob (stmt, 4) /* key */,
1392                   size,
1393                   sqlite3_column_blob (stmt, 5) /* data */, 
1394                   sqlite3_column_int (stmt, 0) /* type */,
1395                   sqlite3_column_int (stmt, 1) /* priority */,
1396                   sqlite3_column_int (stmt, 2) /* anonymity */,
1397                   expiration,
1398                   rowid);
1399       if (SQLITE_OK != sqlite3_reset (stmt))
1400         LOG_SQLITE (plugin, NULL,
1401                     GNUNET_ERROR_TYPE_ERROR |
1402                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1403       if ( (GNUNET_NO == ret) &&
1404            (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1405         plugin->env->duc (plugin->env->cls,
1406                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
1407       return;
1408     case SQLITE_DONE:
1409       /* database must be empty */
1410       if (SQLITE_OK != sqlite3_reset (stmt))
1411         LOG_SQLITE (plugin, NULL,
1412                     GNUNET_ERROR_TYPE_ERROR |
1413                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1414       break;
1415     case SQLITE_BUSY:    
1416     case SQLITE_ERROR:
1417     case SQLITE_MISUSE:
1418     default:
1419       LOG_SQLITE (plugin, NULL,
1420                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
1421                   "sqlite3_step");
1422       (void) sqlite3_reset (stmt);
1423       GNUNET_break (0);
1424       database_shutdown (plugin);
1425       database_setup (plugin->env->cfg,
1426                       plugin);
1427       break;
1428     }
1429   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,             
1430         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1431 }
1432
1433
1434 /**
1435  * Get a random item for replication.  Returns a single, not expired, random item
1436  * from those with the highest replication counters.  The item's 
1437  * replication counter is decremented by one IF it was positive before.
1438  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1439  *
1440  * @param cls closure
1441  * @param iter function to call the value (once only).
1442  * @param iter_cls closure for iter
1443  */
1444 static void
1445 sqlite_plugin_replication_get (void *cls,
1446                                PluginIterator iter, void *iter_cls)
1447 {
1448   struct Plugin *plugin = cls;
1449   sqlite3_stmt *stmt;
1450   struct GNUNET_TIME_Absolute now;
1451
1452 #if DEBUG_SQLITE
1453   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1454                    "sqlite",
1455                    "Getting random block based on replication order.\n");
1456 #endif
1457   stmt = plugin->selRepl;
1458   now = GNUNET_TIME_absolute_get ();
1459   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1460     {
1461       LOG_SQLITE (plugin, NULL,           
1462                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1463       if (SQLITE_OK != sqlite3_reset (stmt))
1464         LOG_SQLITE (plugin, NULL,
1465                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1466       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1467             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1468       return;
1469     }
1470   execute_get (plugin, stmt, iter, iter_cls);
1471 }
1472
1473
1474
1475 /**
1476  * Get a random item that has expired or has low priority.
1477  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1478  *
1479  * @param cls closure
1480  * @param iter function to call the value (once only).
1481  * @param iter_cls closure for iter
1482  */
1483 static void
1484 sqlite_plugin_expiration_get (void *cls,
1485                               PluginIterator iter, void *iter_cls)
1486 {
1487   struct Plugin *plugin = cls;
1488   sqlite3_stmt *stmt;
1489   struct GNUNET_TIME_Absolute now;
1490
1491 #if DEBUG_SQLITE
1492   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1493                    "sqlite",
1494                    "Getting random block based on expiration and priority order.\n");
1495 #endif
1496   now = GNUNET_TIME_absolute_get ();
1497   stmt = plugin->selExpi;
1498   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1499     {
1500       LOG_SQLITE (plugin, NULL,           
1501                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1502       if (SQLITE_OK != sqlite3_reset (stmt))
1503         LOG_SQLITE (plugin, NULL,
1504                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1505       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1506             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1507       return;
1508     }
1509   execute_get (plugin, stmt, iter, iter_cls);
1510 }
1511
1512
1513 /**
1514  * Drop database.
1515  *
1516  * @param cls our plugin context
1517  */
1518 static void 
1519 sqlite_plugin_drop (void *cls)
1520 {
1521   struct Plugin *plugin = cls;
1522   plugin->drop_on_shutdown = GNUNET_YES;
1523 }
1524
1525
1526 /**
1527  * FIXME.
1528  *
1529  * @param cls the 'struct Plugin'
1530  * @return the size of the database on disk (estimate)
1531  */
1532 static unsigned long long
1533 sqlite_plugin_get_size (void *cls)
1534 {
1535   struct Plugin *plugin = cls;
1536   sqlite3_stmt *stmt;
1537   uint64_t pages;
1538   uint64_t page_size;
1539 #if ENULL_DEFINED
1540   char *e;
1541 #endif
1542
1543   if (SQLITE_VERSION_NUMBER < 3006000)
1544     {
1545       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1546                        "datastore-sqlite",
1547                        _("sqlite version to old to determine size, assuming zero\n"));
1548       return 0;
1549     }
1550   CHECK (SQLITE_OK ==
1551          sqlite3_exec (plugin->dbh,
1552                        "VACUUM", NULL, NULL, ENULL));
1553   CHECK (SQLITE_OK ==
1554          sqlite3_exec (plugin->dbh,
1555                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1556   CHECK (SQLITE_OK ==
1557          sq_prepare (plugin->dbh,
1558                      "PRAGMA page_count",
1559                      &stmt));
1560   if (SQLITE_ROW ==
1561       sqlite3_step (stmt))
1562     pages = sqlite3_column_int64 (stmt, 0);
1563   else
1564     pages = 0;
1565   sqlite3_finalize (stmt);
1566   CHECK (SQLITE_OK ==
1567          sq_prepare (plugin->dbh,
1568                      "PRAGMA page_size",
1569                      &stmt));
1570   CHECK (SQLITE_ROW ==
1571          sqlite3_step (stmt));
1572   page_size = sqlite3_column_int64 (stmt, 0);
1573   sqlite3_finalize (stmt);
1574   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1575               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1576               (unsigned long long) pages,
1577               (unsigned long long) page_size);
1578   return  pages * page_size;
1579 }
1580                                          
1581
1582 /**
1583  * Entry point for the plugin.
1584  *
1585  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1586  * @return NULL on error, othrewise the plugin context
1587  */
1588 void *
1589 libgnunet_plugin_datastore_sqlite_init (void *cls)
1590 {
1591   static struct Plugin plugin;
1592   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1593   struct GNUNET_DATASTORE_PluginFunctions *api;
1594
1595   if (plugin.env != NULL)
1596     return NULL; /* can only initialize once! */
1597   memset (&plugin, 0, sizeof(struct Plugin));
1598   plugin.env = env;
1599   if (GNUNET_OK !=
1600       database_setup (env->cfg, &plugin))
1601     {
1602       database_shutdown (&plugin);
1603       return NULL;
1604     }
1605   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1606   api->cls = &plugin;
1607   api->get_size = &sqlite_plugin_get_size;
1608   api->put = &sqlite_plugin_put;
1609   api->next_request = &sqlite_next_request;
1610   api->get = &sqlite_plugin_get;
1611   api->replication_get = &sqlite_plugin_replication_get;
1612   api->expiration_get = &sqlite_plugin_expiration_get;
1613   api->update = &sqlite_plugin_update;
1614   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1615   api->iter_all_now = &sqlite_plugin_iter_all_now;
1616   api->drop = &sqlite_plugin_drop;
1617   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1618                    "sqlite", _("Sqlite database running\n"));
1619   return api;
1620 }
1621
1622
1623 /**
1624  * Exit point from the plugin.
1625  *
1626  * @param cls the plugin context (as returned by "init")
1627  * @return always NULL
1628  */
1629 void *
1630 libgnunet_plugin_datastore_sqlite_done (void *cls)
1631 {
1632   char *fn;
1633   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1634   struct Plugin *plugin = api->cls;
1635
1636 #if DEBUG_SQLITE
1637   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1638                    "sqlite",
1639                    "sqlite plugin is doneing\n");
1640 #endif
1641
1642   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1643     {
1644 #if DEBUG_SQLITE
1645       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1646                        "sqlite",
1647                        "Canceling next task\n");
1648 #endif
1649       GNUNET_SCHEDULER_cancel (plugin->next_task);
1650       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1651 #if DEBUG_SQLITE
1652       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1653                        "sqlite",
1654                        "Prep'ing next task\n");
1655 #endif
1656       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1657       GNUNET_free (plugin->next_task_nc);
1658       plugin->next_task_nc = NULL;
1659     }
1660   fn = NULL;
1661   if (plugin->drop_on_shutdown)
1662     fn = GNUNET_strdup (plugin->fn);
1663 #if DEBUG_SQLITE
1664   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1665                    "sqlite",
1666                    "Shutting down database\n");
1667 #endif
1668   database_shutdown (plugin);
1669   plugin->env = NULL; 
1670   GNUNET_free (api);
1671   if (fn != NULL)
1672     {
1673       if (0 != UNLINK(fn))
1674         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1675                                   "unlink",
1676                                   fn);
1677       GNUNET_free (fn);
1678     }
1679 #if DEBUG_SQLITE
1680   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1681                    "sqlite",
1682                    "sqlite plugin is finished doneing\n");
1683 #endif
1684   return NULL;
1685 }
1686
1687 /* end of plugin_datastore_sqlite.c */