insanity
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 /**
32  * Enable or disable logging debug messages.
33  */
34 #define DEBUG_SQLITE GNUNET_YES
35
36 /**
37  * We allocate items on the stack at times.  To prevent a stack
38  * overflow, we impose a limit on the maximum size for the data per
39  * item.  64k should be enough.
40  */
41 #define MAX_ITEM_SIZE 65536
42
43 /**
44  * After how many ms "busy" should a DB operation fail for good?
45  * A low value makes sure that we are more responsive to requests
46  * (especially PUTs).  A high value guarantees a higher success
47  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
48  *
49  * The default value of 250ms should ensure that users do not experience
50  * huge latencies while at the same time allowing operations to succeed
51  * with reasonable probability.
52  */
53 #define BUSY_TIMEOUT_MS 250
54
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
62
63
64
65 /**
66  * Context for all functions in this plugin.
67  */
68 struct Plugin 
69 {
70   /**
71    * Our execution environment.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment *env;
74
75   /**
76    * Database filename.
77    */
78   char *fn;
79
80   /**
81    * Native SQLite database handle.
82    */
83   sqlite3 *dbh;
84
85   /**
86    * Precompiled SQL for deletion.
87    */
88   sqlite3_stmt *delRow;
89
90   /**
91    * Precompiled SQL for update.
92    */
93   sqlite3_stmt *updPrio;
94
95   /**
96    * Precompiled SQL for replication decrement.
97    */
98   sqlite3_stmt *updRepl;
99
100   /**
101    * Precompiled SQL for replication selection.
102    */
103   sqlite3_stmt *selRepl;
104
105   /**
106    * Precompiled SQL for expiration selection.
107    */
108   sqlite3_stmt *selExpi;
109
110   /**
111    * Precompiled SQL for insertion.
112    */
113   sqlite3_stmt *insertContent;
114
115   /**
116    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
117    */
118   struct NextContext *next_task_nc;
119
120   /**
121    * Pending task with scheduler for running the next request.
122    */
123   GNUNET_SCHEDULER_TaskIdentifier next_task;
124
125   /**
126    * Should the database be dropped on shutdown?
127    */
128   int drop_on_shutdown;
129
130 };
131
132
133 /**
134  * @brief Prepare a SQL statement
135  *
136  * @param dbh handle to the database
137  * @param zSql SQL statement, UTF-8 encoded
138  * @param ppStmt set to the prepared statement
139  * @return 0 on success
140  */
141 static int
142 sq_prepare (sqlite3 * dbh, 
143             const char *zSql,
144             sqlite3_stmt ** ppStmt)
145 {
146   char *dummy;
147   int result;
148
149   result = sqlite3_prepare_v2 (dbh,
150                                zSql,
151                                strlen (zSql), 
152                                ppStmt,
153                                (const char **) &dummy);
154 #if DEBUG_SQLITE && 0
155   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
156                    "sqlite",
157                    "Prepared `%s' / %p: %d\n",
158                    zSql,
159                    *ppStmt, 
160                    result);
161 #endif
162   return result;
163 }
164
165
166 /**
167  * Create our database indices.
168  * 
169  * @param dbh handle to the database
170  */
171 static void
172 create_indices (sqlite3 * dbh)
173 {
174   /* create indices */
175   sqlite3_exec (dbh,
176                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
179                 NULL, NULL);
180   sqlite3_exec (dbh, "CREATE INDEX idx_expire_repl ON gn090 (expire ASC,repl DESC)", NULL, NULL,
181                 NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (anonLevel ASC,expire ASC,prio,type,hash)",
183                 NULL, NULL, NULL);
184   sqlite3_exec (dbh, "CREATE INDEX expire ON gn090 (expire)",
185                 NULL, NULL, NULL);
186 }
187
188
189 #if 0
190 #define CHECK(a) GNUNET_break(a)
191 #define ENULL NULL
192 #else
193 #define ENULL &e
194 #define ENULL_DEFINED 1
195 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
196 #endif
197
198
199 /**
200  * Initialize the database connections and associated
201  * data structures (create tables and indices
202  * as needed as well).
203  *
204  * @param cfg our configuration
205  * @param plugin the plugin context (state for this module)
206  * @return GNUNET_OK on success
207  */
208 static int
209 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
210                 struct Plugin *plugin)
211 {
212   sqlite3_stmt *stmt;
213   char *afsdir;
214 #if ENULL_DEFINED
215   char *e;
216 #endif
217   
218   if (GNUNET_OK != 
219       GNUNET_CONFIGURATION_get_value_filename (cfg,
220                                                "datastore-sqlite",
221                                                "FILENAME",
222                                                &afsdir))
223     {
224       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
225                        "sqlite",
226                        _("Option `%s' in section `%s' missing in configuration!\n"),
227                        "FILENAME",
228                        "datastore-sqlite");
229       return GNUNET_SYSERR;
230     }
231   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
232     {
233       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
234         {
235           GNUNET_break (0);
236           GNUNET_free (afsdir);
237           return GNUNET_SYSERR;
238         }
239       /* database is new or got deleted, reset payload to zero! */
240       plugin->env->duc (plugin->env->cls, 0);
241     }
242 #ifdef ENABLE_NLS
243   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
244                                        nl_langinfo (CODESET));
245 #else
246   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
247                                        "UTF-8");   /* good luck */
248 #endif
249   GNUNET_free (afsdir);
250   
251   /* Open database and precompile statements */
252   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
253     {
254       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
255                        "sqlite",
256                        _("Unable to initialize SQLite: %s.\n"),
257                        sqlite3_errmsg (plugin->dbh));
258       return GNUNET_SYSERR;
259     }
260   CHECK (SQLITE_OK ==
261          sqlite3_exec (plugin->dbh,
262                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
263   CHECK (SQLITE_OK ==
264          sqlite3_exec (plugin->dbh,
265                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
266   CHECK (SQLITE_OK ==
267          sqlite3_exec (plugin->dbh,
268                        "PRAGMA legacy_file_format=OFF", NULL, NULL, ENULL));
269   CHECK (SQLITE_OK ==
270          sqlite3_exec (plugin->dbh,
271                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
272   CHECK (SQLITE_OK ==
273          sqlite3_exec (plugin->dbh,
274                        "PRAGMA locking_mode=EXCLUSIVE", NULL, NULL, ENULL));
275   CHECK (SQLITE_OK ==
276          sqlite3_exec (plugin->dbh,
277                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
278   CHECK (SQLITE_OK ==
279          sqlite3_exec (plugin->dbh, 
280                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
281
282   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
283
284
285   /* We have to do it here, because otherwise precompiling SQL might fail */
286   CHECK (SQLITE_OK ==
287          sq_prepare (plugin->dbh,
288                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
289                      &stmt));
290   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
291        (sqlite3_exec (plugin->dbh,
292                       "CREATE TABLE gn090 ("
293                       "  repl INT4 NOT NULL DEFAULT 0,"
294                       "  type INT4 NOT NULL DEFAULT 0,"
295                       "  prio INT4 NOT NULL DEFAULT 0,"
296                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
297                       "  expire INT8 NOT NULL DEFAULT 0,"
298                       "  hash TEXT NOT NULL DEFAULT '',"
299                       "  vhash TEXT NOT NULL DEFAULT '',"
300                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
301                       NULL) != SQLITE_OK) )
302     {
303       LOG_SQLITE (plugin, NULL,
304                   GNUNET_ERROR_TYPE_ERROR, 
305                   "sqlite3_exec");
306       sqlite3_finalize (stmt);
307       return GNUNET_SYSERR;
308     }
309   sqlite3_finalize (stmt);
310   create_indices (plugin->dbh);
311
312   CHECK (SQLITE_OK ==
313          sq_prepare (plugin->dbh,
314                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
315                      &stmt));
316   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
317        (sqlite3_exec (plugin->dbh,
318                       "CREATE TABLE gn071 ("
319                       "  key TEXT NOT NULL DEFAULT '',"
320                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
321                       NULL) != SQLITE_OK) )
322     {
323       LOG_SQLITE (plugin, NULL,
324                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
325       sqlite3_finalize (stmt);
326       return GNUNET_SYSERR;
327     }
328   sqlite3_finalize (stmt);
329
330   if ((sq_prepare (plugin->dbh,
331                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE _ROWID_ = ?",
332                    &plugin->updPrio) != SQLITE_OK) ||
333       (sq_prepare (plugin->dbh,
334                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
335                    &plugin->updRepl) != SQLITE_OK) ||
336       (sq_prepare (plugin->dbh,
337                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE expire > ?"
338                    " ORDER BY repl DESC, Random() LIMIT 1",
339                    &plugin->selRepl) != SQLITE_OK) ||
340       (sq_prepare (plugin->dbh,
341                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
342                    " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
343                    " ORDER BY prio ASC LIMIT 1",
344                    &plugin->selExpi) != SQLITE_OK) ||
345       (sq_prepare (plugin->dbh,
346                    "INSERT INTO gn090 (repl, type, prio, "
347                    "anonLevel, expire, hash, vhash, value) "
348                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
349                    &plugin->insertContent) != SQLITE_OK) ||
350       (sq_prepare (plugin->dbh,
351                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
352                    &plugin->delRow) != SQLITE_OK))
353     {
354       LOG_SQLITE (plugin, NULL,
355                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
356       return GNUNET_SYSERR;
357     }
358
359   return GNUNET_OK;
360 }
361
362
363 /**
364  * Shutdown database connection and associate data
365  * structures.
366  * @param plugin the plugin context (state for this module)
367  */
368 static void
369 database_shutdown (struct Plugin *plugin)
370 {
371   int result;
372 #if SQLITE_VERSION_NUMBER >= 3007000
373   sqlite3_stmt *stmt;
374 #endif
375
376   if (plugin->delRow != NULL)
377     sqlite3_finalize (plugin->delRow);
378   if (plugin->updPrio != NULL)
379     sqlite3_finalize (plugin->updPrio);
380   if (plugin->updRepl != NULL)
381     sqlite3_finalize (plugin->updRepl);
382   if (plugin->selRepl != NULL)
383     sqlite3_finalize (plugin->selRepl);
384   if (plugin->selExpi != NULL)
385     sqlite3_finalize (plugin->selExpi);
386   if (plugin->insertContent != NULL)
387     sqlite3_finalize (plugin->insertContent);
388   result = sqlite3_close(plugin->dbh);
389 #if SQLITE_VERSION_NUMBER >= 3007000
390   if (result == SQLITE_BUSY)
391     {
392       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
393                        "sqlite",
394                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
395       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
396       while (stmt != NULL)
397         {
398 #if DEBUG_SQLITE
399           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
400                      "sqlite", "Closing statement %p\n", stmt);
401 #endif
402           result = sqlite3_finalize(stmt);
403 #if DEBUG_SQLITE
404           if (result != SQLITE_OK)
405               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
406                                "sqlite",
407                                "Failed to close statement %p: %d\n", stmt, result);
408 #endif
409           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
410         }
411       result = sqlite3_close(plugin->dbh);
412     }
413 #endif
414   if (SQLITE_OK != result)
415       LOG_SQLITE (plugin, NULL,
416                   GNUNET_ERROR_TYPE_ERROR, 
417                   "sqlite3_close");
418
419   GNUNET_free_non_null (plugin->fn);
420 }
421
422
423 /**
424  * Delete the database entry with the given
425  * row identifier.
426  *
427  * @param plugin the plugin context (state for this module)
428  * @param rid the ID of the row to delete
429  */
430 static int
431 delete_by_rowid (struct Plugin* plugin, 
432                  unsigned long long rid)
433 {
434   sqlite3_bind_int64 (plugin->delRow, 1, rid);
435   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
436     {
437       LOG_SQLITE (plugin, NULL,
438                   GNUNET_ERROR_TYPE_ERROR |
439                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
440       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
441           LOG_SQLITE (plugin, NULL,
442                       GNUNET_ERROR_TYPE_ERROR |
443                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
444       return GNUNET_SYSERR;
445     }
446   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
447     LOG_SQLITE (plugin, NULL,
448                 GNUNET_ERROR_TYPE_ERROR |
449                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
450   return GNUNET_OK;
451 }
452
453
454 /**
455  * Context for the universal iterator.
456  */
457 struct NextContext;
458
459 /**
460  * Type of a function that will prepare
461  * the next iteration.
462  *
463  * @param cls closure
464  * @param nc the next context; NULL for the last
465  *         call which gives the callback a chance to
466  *         clean up the closure
467  * @return GNUNET_OK on success, GNUNET_NO if there are
468  *         no more values, GNUNET_SYSERR on error
469  */
470 typedef int (*PrepareFunction)(void *cls,
471                                struct NextContext *nc);
472
473
474 /**
475  * Context we keep for the "next request" callback.
476  */
477 struct NextContext
478 {
479   /**
480    * Internal state.
481    */ 
482   struct Plugin *plugin;
483
484   /**
485    * Function to call on the next value.
486    */
487   PluginIterator iter;
488
489   /**
490    * Closure for iter.
491    */
492   void *iter_cls;
493
494   /**
495    * Function to call to prepare the next
496    * iteration.
497    */
498   PrepareFunction prep;
499
500   /**
501    * Closure for prep.
502    */
503   void *prep_cls;
504
505   /**
506    * Statement that the iterator will get the data
507    * from (updated or set by prep).
508    */ 
509   sqlite3_stmt *stmt;
510
511   /**
512    * Row ID of the last result.
513    */
514   unsigned long long last_rowid;
515
516   /**
517    * Key of the last result.
518    */
519   GNUNET_HashCode lastKey;  
520
521   /**
522    * Priority of the last value visited.
523    */ 
524   unsigned int lastPriority; 
525
526   /**
527    * Number of results processed so far.
528    */
529   unsigned int count;
530
531   /**
532    * Set to GNUNET_YES if we must stop now.
533    */
534   int end_it;
535 };
536
537
538 /**
539  * Continuation of "sqlite_next_request".
540  *
541  * @param cls the 'struct NextContext*'
542  * @param tc the task context (unused)
543  */
544 static void 
545 sqlite_next_request_cont (void *cls,
546                           const struct GNUNET_SCHEDULER_TaskContext *tc)
547 {
548   struct NextContext * nc = cls;
549   struct Plugin *plugin;
550   unsigned long long rowid;
551   int ret;
552   unsigned int size;
553   unsigned int hsize;
554   uint32_t anonymity;
555   uint32_t priority;
556   enum GNUNET_BLOCK_Type type;
557   const GNUNET_HashCode *key;
558   struct GNUNET_TIME_Absolute expiration;
559   
560   plugin = nc->plugin;
561   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
562   plugin->next_task_nc = NULL;
563   if ( (GNUNET_YES == nc->end_it) ||
564        (GNUNET_OK != (nc->prep(nc->prep_cls,
565                                nc))) )
566     {
567 #if DEBUG_SQLITE
568       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
569                        "sqlite",
570                        "Iteration completes after %u results\n",
571                        nc->count);
572 #endif
573     END:
574       nc->iter (nc->iter_cls, 
575                 NULL, NULL, 0, NULL, 0, 0, 0, 
576                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
577       nc->prep (nc->prep_cls, NULL);
578       GNUNET_free (nc);
579       return;
580     }
581
582   type = sqlite3_column_int (nc->stmt, 0);
583   priority = sqlite3_column_int (nc->stmt, 1);
584   anonymity = sqlite3_column_int (nc->stmt, 2);
585   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
586   hsize = sqlite3_column_bytes (nc->stmt, 4);
587   key = sqlite3_column_blob (nc->stmt, 4);
588   size = sqlite3_column_bytes (nc->stmt, 5);
589   rowid = sqlite3_column_int64 (nc->stmt, 6);
590   if (hsize != sizeof (GNUNET_HashCode))
591     {
592       GNUNET_break (0);
593       if (SQLITE_OK != sqlite3_reset (nc->stmt))
594         LOG_SQLITE (plugin, NULL,
595                     GNUNET_ERROR_TYPE_ERROR |
596                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
597       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
598         plugin->env->duc (plugin->env->cls,
599                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));      
600       goto END;
601     }
602 #if DEBUG_SQLITE
603   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
604                    "sqlite",
605                    "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
606                    type, 
607                    GNUNET_h2s(key),
608                    priority,
609                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
610                    (long long) expiration.abs_value);
611 #endif
612   if (size > MAX_ITEM_SIZE)
613     {
614       GNUNET_break (0);
615       if (SQLITE_OK != sqlite3_reset (nc->stmt))
616         LOG_SQLITE (plugin, NULL,
617                     GNUNET_ERROR_TYPE_ERROR |
618                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
619       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
620         plugin->env->duc (plugin->env->cls,
621                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); 
622       goto END;
623     }
624   {
625     char data[size];
626     
627     memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
628     nc->count++;
629     nc->last_rowid = rowid;
630     nc->lastPriority = priority;
631     nc->lastKey = *key;
632     if (SQLITE_OK != sqlite3_reset (nc->stmt))
633       LOG_SQLITE (plugin, NULL,
634                   GNUNET_ERROR_TYPE_ERROR |
635                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
636     ret = nc->iter (nc->iter_cls, nc,
637                     &nc->lastKey,
638                     size, data,
639                     type, priority,
640                     anonymity, expiration,
641                     rowid);
642   }
643   switch (ret)
644     {
645     case GNUNET_SYSERR:
646       nc->end_it = GNUNET_YES;
647       break;
648     case GNUNET_NO:
649       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
650         {
651 #if DEBUG_SQLITE
652           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
653                            "sqlite",
654                            "Removed entry %llu (%u bytes)\n",
655                            (unsigned long long) rowid,
656                            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
657 #endif
658           plugin->env->duc (plugin->env->cls,
659                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
660         }
661       break;
662     case GNUNET_YES:
663       break;
664     default:
665       GNUNET_break (0);
666     }
667 }
668
669
670 /**
671  * Function invoked on behalf of a "PluginIterator" asking the
672  * database plugin to call the iterator with the next item.
673  *
674  * @param next_cls whatever argument was given
675  *        to the PluginIterator as "next_cls".
676  * @param end_it set to GNUNET_YES if we
677  *        should terminate the iteration early
678  *        (iterator should be still called once more
679  *         to signal the end of the iteration).
680  */
681 static void 
682 sqlite_next_request (void *next_cls,
683                      int end_it)
684 {
685   struct NextContext * nc= next_cls;
686
687   if (GNUNET_YES == end_it)
688     nc->end_it = GNUNET_YES;
689   nc->plugin->next_task_nc = nc;
690   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
691                                                     nc);
692 }
693
694
695 /**
696  * Store an item in the datastore.
697  *
698  * @param cls closure
699  * @param key key for the item
700  * @param size number of bytes in data
701  * @param data content stored
702  * @param type type of the content
703  * @param priority priority of the content
704  * @param anonymity anonymity-level for the content
705  * @param replication replication-level for the content
706  * @param expiration expiration time for the content
707  * @param msg set to an error message
708  * @return GNUNET_OK on success
709  */
710 static int
711 sqlite_plugin_put (void *cls,
712                    const GNUNET_HashCode *key,
713                    uint32_t size,
714                    const void *data,
715                    enum GNUNET_BLOCK_Type type,
716                    uint32_t priority,
717                    uint32_t anonymity,
718                    uint32_t replication,
719                    struct GNUNET_TIME_Absolute expiration,
720                    char ** msg)
721 {
722   struct Plugin *plugin = cls;
723   int n;
724   int ret;
725   sqlite3_stmt *stmt;
726   GNUNET_HashCode vhash;
727
728   if (size > MAX_ITEM_SIZE)
729     return GNUNET_SYSERR;
730 #if DEBUG_SQLITE
731   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
732                    "sqlite",
733                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
734                    type, 
735                    GNUNET_h2s(key),
736                    priority,
737                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
738                    (long long) expiration.abs_value);
739 #endif
740   GNUNET_CRYPTO_hash (data, size, &vhash);
741   stmt = plugin->insertContent;
742   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
743       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
744       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
745       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
746       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
747       (SQLITE_OK !=
748        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
749                           SQLITE_TRANSIENT)) ||
750       (SQLITE_OK !=
751        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
752                           SQLITE_TRANSIENT))
753       || (SQLITE_OK !=
754           sqlite3_bind_blob (stmt, 8, data, size,
755                              SQLITE_TRANSIENT)))
756     {
757       LOG_SQLITE (plugin,
758                   msg,
759                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
760       if (SQLITE_OK != sqlite3_reset (stmt))
761         LOG_SQLITE (plugin, NULL,
762                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
763       return GNUNET_SYSERR;
764     }
765   n = sqlite3_step (stmt);
766   switch (n)
767     {
768     case SQLITE_DONE:
769       plugin->env->duc (plugin->env->cls,
770                         size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
771 #if DEBUG_SQLITE
772       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
773                        "sqlite",
774                        "Stored new entry (%u bytes)\n",
775                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
776 #endif
777       ret = GNUNET_OK;
778       break;
779     case SQLITE_BUSY:      
780       GNUNET_break (0);
781       LOG_SQLITE (plugin, msg,
782                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
783                   "sqlite3_step");
784       ret = GNUNET_SYSERR;
785       break;
786     default:
787       LOG_SQLITE (plugin, msg,
788                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
789                   "sqlite3_step");
790       if (SQLITE_OK != sqlite3_reset (stmt))
791         LOG_SQLITE (plugin, NULL,
792                     GNUNET_ERROR_TYPE_ERROR |
793                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
794       database_shutdown (plugin);
795       database_setup (plugin->env->cfg,
796                       plugin);
797       return GNUNET_SYSERR;    
798     }
799   if (SQLITE_OK != sqlite3_reset (stmt))
800     LOG_SQLITE (plugin, NULL,
801                 GNUNET_ERROR_TYPE_ERROR |
802                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
803   return ret;
804 }
805
806
807 /**
808  * Update the priority for a particular key in the datastore.  If
809  * the expiration time in value is different than the time found in
810  * the datastore, the higher value should be kept.  For the
811  * anonymity level, the lower value is to be used.  The specified
812  * priority should be added to the existing priority, ignoring the
813  * priority in value.
814  *
815  * Note that it is possible for multiple values to match this put.
816  * In that case, all of the respective values are updated.
817  *
818  * @param cls the plugin context (state for this module)
819  * @param uid unique identifier of the datum
820  * @param delta by how much should the priority
821  *     change?  If priority + delta < 0 the
822  *     priority should be set to 0 (never go
823  *     negative).
824  * @param expire new expiration time should be the
825  *     MAX of any existing expiration time and
826  *     this value
827  * @param msg set to an error message
828  * @return GNUNET_OK on success
829  */
830 static int
831 sqlite_plugin_update (void *cls,
832                       uint64_t uid,
833                       int delta, struct GNUNET_TIME_Absolute expire,
834                       char **msg)
835 {
836   struct Plugin *plugin = cls;
837   int n;
838
839   sqlite3_bind_int (plugin->updPrio, 1, delta);
840   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
841   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
842   n = sqlite3_step (plugin->updPrio);
843   sqlite3_reset (plugin->updPrio);
844   switch (n)
845     {
846     case SQLITE_DONE:
847 #if DEBUG_SQLITE
848       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
849                        "sqlite",
850                        "Block updated\n");
851 #endif
852       return GNUNET_OK;
853     case SQLITE_BUSY:
854       LOG_SQLITE (plugin, msg,
855                   GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
856                   "sqlite3_step");
857       return GNUNET_NO;
858     default:
859       LOG_SQLITE (plugin, msg,
860                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
861                   "sqlite3_step");
862       return GNUNET_SYSERR;
863     }
864 }
865
866
867 /**
868  * Internal context for an iteration.
869  */
870 struct ZeroIterContext
871 {
872   /**
873    * First iterator statement for zero-anonymity iteration.
874    */
875   sqlite3_stmt *stmt_1;
876
877   /**
878    * Second iterator statement for zero-anonymity iteration.
879    */
880   sqlite3_stmt *stmt_2;
881
882   /**
883    * Desired type for blocks returned by this iterator.
884    */
885   enum GNUNET_BLOCK_Type type;
886 };
887
888
889 /**
890  * Prepare our SQL query to obtain the next record from the database.
891  *
892  * @param cls our "struct ZeroIterContext"
893  * @param nc NULL to terminate the iteration, otherwise our context for
894  *           getting the next result.
895  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
896  *         GNUNET_SYSERR on error (or end of iteration)
897  */
898 static int
899 zero_iter_next_prepare (void *cls,
900                         struct NextContext *nc)
901 {
902   struct ZeroIterContext *ic = cls;
903   struct Plugin *plugin;
904   int ret;
905
906   if (nc == NULL)
907     {
908 #if DEBUG_SQLITE
909       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
910                   "Asked to clean up iterator state.\n");
911 #endif
912       sqlite3_finalize (ic->stmt_1);
913       sqlite3_finalize (ic->stmt_2);
914       return GNUNET_SYSERR;
915     }
916   plugin = nc->plugin;
917
918   /* first try iter 1 */
919 #if DEBUG_SQLITE
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Restricting to results larger than the last priority %u and key `%s'\n",
922               nc->lastPriority,
923               GNUNET_h2s (&nc->lastKey));
924 #endif
925   if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
926        (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2, 
927                                         &nc->lastKey, 
928                                         sizeof (GNUNET_HashCode),
929                                         SQLITE_TRANSIENT)) )
930     {
931       LOG_SQLITE (plugin, NULL,
932                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
933       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
934         LOG_SQLITE (plugin, NULL,
935                     GNUNET_ERROR_TYPE_ERROR | 
936                     GNUNET_ERROR_TYPE_BULK, 
937                     "sqlite3_reset");  
938       return GNUNET_SYSERR;
939     }
940   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
941     {      
942 #if DEBUG_SQLITE
943       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944                   "Result found using iterator 1\n");
945 #endif
946       nc->stmt = ic->stmt_1;
947       return GNUNET_OK;
948     }
949   if (ret != SQLITE_DONE)
950     {
951       LOG_SQLITE (plugin, NULL,
952                   GNUNET_ERROR_TYPE_ERROR |
953                   GNUNET_ERROR_TYPE_BULK,
954                   "sqlite3_step");
955       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
956         LOG_SQLITE (plugin, NULL,
957                     GNUNET_ERROR_TYPE_ERROR | 
958                     GNUNET_ERROR_TYPE_BULK, 
959                     "sqlite3_reset");  
960       return GNUNET_SYSERR;
961     }
962   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
963     LOG_SQLITE (plugin, NULL,
964                 GNUNET_ERROR_TYPE_ERROR | 
965                 GNUNET_ERROR_TYPE_BULK, 
966                 "sqlite3_reset");  
967
968   /* now try iter 2 */
969   if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
970     {
971       LOG_SQLITE (plugin, NULL,           
972                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
973       return GNUNET_SYSERR;
974     }
975   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
976     {
977 #if DEBUG_SQLITE
978       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
979                   "Result found using iterator 2\n");
980 #endif
981       nc->stmt = ic->stmt_2;
982       return GNUNET_OK;
983     }
984   if (ret != SQLITE_DONE)
985     {
986       LOG_SQLITE (plugin, NULL,
987                   GNUNET_ERROR_TYPE_ERROR |
988                   GNUNET_ERROR_TYPE_BULK,
989                   "sqlite3_step");
990       if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
991         LOG_SQLITE (plugin, NULL,
992                     GNUNET_ERROR_TYPE_ERROR |
993                     GNUNET_ERROR_TYPE_BULK,
994                     "sqlite3_reset");
995       return GNUNET_SYSERR;
996     }
997   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
998     LOG_SQLITE (plugin, NULL,
999                 GNUNET_ERROR_TYPE_ERROR |
1000                 GNUNET_ERROR_TYPE_BULK,
1001                 "sqlite3_reset");
1002 #if DEBUG_SQLITE
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "No result found using either iterator\n");
1005 #endif
1006   return GNUNET_NO;
1007 }
1008
1009
1010 /**
1011  * Select a subset of the items in the datastore and call
1012  * the given iterator for each of them.
1013  *
1014  * @param cls our plugin context
1015  * @param type entries of which type should be considered?
1016  *        Use 0 for any type.
1017  * @param iter function to call on each matching value;
1018  *        will be called once with a NULL value at the end
1019  * @param iter_cls closure for iter
1020  */
1021 static void
1022 sqlite_plugin_iter_zero_anonymity (void *cls,
1023                                    enum GNUNET_BLOCK_Type type,
1024                                    PluginIterator iter,
1025                                    void *iter_cls)
1026 {
1027   struct Plugin *plugin = cls;
1028   struct GNUNET_TIME_Absolute now;
1029   struct NextContext *nc;
1030   struct ZeroIterContext *ic;
1031   sqlite3_stmt *stmt_1;
1032   sqlite3_stmt *stmt_2;
1033   char *q;
1034
1035   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1036   now = GNUNET_TIME_absolute_get ();
1037   GNUNET_asprintf (&q, 
1038                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1039                    "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
1040                    "ORDER BY hash DESC LIMIT 1",
1041                    (unsigned long long) now.abs_value,
1042                    type);
1043   if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
1044     {
1045       LOG_SQLITE (plugin, NULL,
1046                   GNUNET_ERROR_TYPE_ERROR |
1047                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1048       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1049       GNUNET_free (q);
1050       return;
1051     }
1052   GNUNET_free (q);
1053   GNUNET_asprintf (&q, 
1054                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1055                    "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
1056                    "ORDER BY prio DESC, hash DESC LIMIT 1",
1057                    (unsigned long long) now.abs_value,
1058                    type);
1059   if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1060     {
1061       LOG_SQLITE (plugin, NULL,
1062                   GNUNET_ERROR_TYPE_ERROR |
1063                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1064       sqlite3_finalize (stmt_1);
1065       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1066       GNUNET_free (q);
1067       return;
1068     }
1069   GNUNET_free (q);
1070   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1071                       sizeof(struct ZeroIterContext));
1072   nc->plugin = plugin;
1073   nc->iter = iter;
1074   nc->iter_cls = iter_cls;
1075   nc->stmt = NULL;
1076   ic = (struct ZeroIterContext*) &nc[1];
1077   ic->stmt_1 = stmt_1;
1078   ic->stmt_2 = stmt_2;
1079   ic->type = type;
1080   nc->prep = &zero_iter_next_prepare;
1081   nc->prep_cls = ic;
1082   nc->lastPriority = INT32_MAX;
1083   memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1084   sqlite_next_request (nc, GNUNET_NO);
1085 }
1086
1087
1088 /**
1089  * Context for get_next_prepare.
1090  */
1091 struct GetNextContext
1092 {
1093
1094   /**
1095    * Our prepared statement.
1096    */
1097   sqlite3_stmt *stmt;
1098
1099   /**
1100    * Plugin handle.
1101    */
1102   struct Plugin *plugin;
1103
1104   /**
1105    * Key for the query.
1106    */
1107   GNUNET_HashCode key;
1108
1109   /**
1110    * Vhash for the query.
1111    */
1112   GNUNET_HashCode vhash;
1113
1114   /**
1115    * Expected total number of results.
1116    */
1117   unsigned int total;
1118
1119   /**
1120    * Offset to add for the selected result.
1121    */
1122   unsigned int off;
1123
1124   /**
1125    * Is vhash set?
1126    */
1127   int have_vhash;
1128
1129   /**
1130    * Desired block type.
1131    */
1132   enum GNUNET_BLOCK_Type type;
1133
1134 };
1135
1136
1137 /**
1138  * Prepare the stmt in 'nc' for the next round of execution, selecting the
1139  * next return value.
1140  *
1141  * @param cls our "struct GetNextContext*"
1142  * @param nc the general context
1143  * @return GNUNET_YES if there are more results, 
1144  *         GNUNET_NO if there are no more results,
1145  *         GNUNET_SYSERR on internal error
1146  */
1147 static int
1148 get_next_prepare (void *cls,
1149                   struct NextContext *nc)
1150 {
1151   struct GetNextContext *gnc = cls;
1152   int ret;
1153   int limit_off;
1154   unsigned int sqoff;
1155
1156   if (nc == NULL)
1157     {
1158       sqlite3_finalize (gnc->stmt);
1159       return GNUNET_SYSERR;
1160     }
1161   if (nc->count == gnc->total)
1162     return GNUNET_NO;
1163   if (nc->count + gnc->off == gnc->total)
1164     nc->last_rowid = 0;
1165   if (nc->count == 0)
1166     limit_off = gnc->off;
1167   else
1168     limit_off = 0;
1169   sqlite3_reset (nc->stmt);
1170   sqoff = 1;
1171   ret = sqlite3_bind_blob (nc->stmt,
1172                            sqoff++,
1173                            &gnc->key, 
1174                            sizeof (GNUNET_HashCode),
1175                            SQLITE_TRANSIENT);
1176   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1177     ret = sqlite3_bind_blob (nc->stmt,
1178                              sqoff++,
1179                              &gnc->vhash,
1180                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1181   if ((gnc->type != 0) && (ret == SQLITE_OK))
1182     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1183   if (ret == SQLITE_OK)
1184     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
1185   if (ret != SQLITE_OK)
1186     return GNUNET_SYSERR;
1187 #if DEBUG_SQLITE 
1188   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1189                    "sqlite",
1190                    "Preparing to GET for key `%s' with type %d at offset %u\n",
1191                    GNUNET_h2s (&gnc->key),
1192                    gnc->type,
1193                    limit_off);
1194 #endif
1195   ret = sqlite3_step (nc->stmt);
1196   switch (ret)
1197     {
1198     case SQLITE_ROW:
1199       return GNUNET_OK;  
1200     case SQLITE_DONE:
1201       return GNUNET_NO;
1202     default:
1203       LOG_SQLITE (gnc->plugin, NULL,
1204                   GNUNET_ERROR_TYPE_ERROR |
1205                   GNUNET_ERROR_TYPE_BULK,
1206                   "sqlite3_step");
1207       return GNUNET_SYSERR;
1208     }
1209 }
1210
1211
1212 /**
1213  * Iterate over the results for a particular key
1214  * in the datastore.
1215  *
1216  * @param cls closure
1217  * @param key key to match, never NULL
1218  * @param vhash hash of the value, maybe NULL (to
1219  *        match all values that have the right key).
1220  *        Note that for DBlocks there is no difference
1221  *        betwen key and vhash, but for other blocks
1222  *        there may be!
1223  * @param type entries of which type are relevant?
1224  *     Use 0 for any type.
1225  * @param iter function to call on each matching value;
1226  *        will be called once with a NULL value at the end
1227  * @param iter_cls closure for iter
1228  */
1229 static void
1230 sqlite_plugin_get (void *cls,
1231                    const GNUNET_HashCode *key,
1232                    const GNUNET_HashCode *vhash,
1233                    enum GNUNET_BLOCK_Type type,
1234                    PluginIterator iter, void *iter_cls)
1235 {
1236   struct Plugin *plugin = cls;
1237   struct GetNextContext *gnc;
1238   struct NextContext *nc;
1239   int ret;
1240   int total;
1241   sqlite3_stmt *stmt;
1242   char scratch[256];
1243   unsigned int sqoff;
1244
1245   GNUNET_assert (iter != NULL);
1246   GNUNET_assert (key != NULL);
1247   GNUNET_snprintf (scratch, sizeof (scratch),
1248                    "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
1249                    vhash == NULL ? "" : " AND vhash=?",
1250                    type  == 0    ? "" : " AND type=?");
1251   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1252     {
1253       LOG_SQLITE (plugin, NULL,
1254                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1255       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1256       return;
1257     }
1258   sqoff = 1;
1259   ret = sqlite3_bind_blob (stmt, sqoff++,
1260                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1261   if ((vhash != NULL) && (ret == SQLITE_OK))
1262     ret = sqlite3_bind_blob (stmt, sqoff++,
1263                              vhash,
1264                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1265   if ((type != 0) && (ret == SQLITE_OK))
1266     ret = sqlite3_bind_int (stmt, sqoff++, type);
1267   if (SQLITE_OK != ret)
1268     {
1269       LOG_SQLITE (plugin, NULL,
1270                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1271       sqlite3_finalize (stmt);
1272       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1273       return;
1274     }
1275   ret = sqlite3_step (stmt);
1276   if (ret != SQLITE_ROW)
1277     {
1278       LOG_SQLITE (plugin, NULL,
1279                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1280                   "sqlite_step");
1281       sqlite3_finalize (stmt);
1282       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1283       return;
1284     }
1285   total = sqlite3_column_int (stmt, 0);
1286   sqlite3_finalize (stmt);
1287   if (0 == total)
1288     {
1289       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1290       return;
1291     }
1292   GNUNET_snprintf (scratch, sizeof (scratch),
1293                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1294                    "FROM gn090 WHERE hash=?%s%s "
1295                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
1296                    vhash == NULL ? "" : " AND vhash=?",
1297                    type == 0 ? "" : " AND type=?");
1298
1299   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1300     {
1301       LOG_SQLITE (plugin, NULL,
1302                   GNUNET_ERROR_TYPE_ERROR |
1303                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1304       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1305       return;
1306     }
1307   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1308                       sizeof(struct GetNextContext));
1309   nc->plugin = plugin;
1310   nc->iter = iter;
1311   nc->iter_cls = iter_cls;
1312   nc->stmt = stmt;
1313   gnc = (struct GetNextContext*) &nc[1];
1314   gnc->total = total;
1315   gnc->type = type;
1316   gnc->key = *key;
1317   gnc->plugin = plugin;
1318   gnc->stmt = stmt; /* alias used for freeing at the end! */
1319   if (NULL != vhash)
1320     {
1321       gnc->have_vhash = GNUNET_YES;
1322       gnc->vhash = *vhash;
1323     }
1324   gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1325   nc->prep = &get_next_prepare;
1326   nc->prep_cls = gnc;
1327   sqlite_next_request (nc, GNUNET_NO);
1328 }
1329
1330
1331 /**
1332  * Execute statement that gets a row and call the callback
1333  * with the result.  Resets the statement afterwards.
1334  *
1335  * @param plugin the plugin
1336  * @param stmt the statement
1337  * @param iter iterator to call
1338  * @param iter_cls closure for 'iter'
1339  */
1340 static void
1341 execute_get (struct Plugin *plugin,
1342              sqlite3_stmt *stmt,
1343              PluginIterator iter, void *iter_cls)
1344 {
1345   int n;
1346   struct GNUNET_TIME_Absolute expiration;
1347   unsigned long long rowid;
1348   unsigned int size;
1349   int ret;
1350
1351   n = sqlite3_step (stmt);
1352   switch (n)
1353     {
1354     case SQLITE_ROW:
1355       size = sqlite3_column_bytes (stmt, 5);
1356       rowid = sqlite3_column_int64 (stmt, 6);
1357       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1358         {
1359           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
1360                            "sqlite",
1361                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
1362           if (SQLITE_OK != sqlite3_reset (stmt))
1363             LOG_SQLITE (plugin, NULL,
1364                         GNUNET_ERROR_TYPE_ERROR |
1365                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1366           if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1367             plugin->env->duc (plugin->env->cls,
1368                               - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
1369           break;
1370         }
1371       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1372       ret = iter (iter_cls,
1373                   NULL,
1374                   sqlite3_column_blob (stmt, 4) /* key */,
1375                   size,
1376                   sqlite3_column_blob (stmt, 5) /* data */, 
1377                   sqlite3_column_int (stmt, 0) /* type */,
1378                   sqlite3_column_int (stmt, 1) /* priority */,
1379                   sqlite3_column_int (stmt, 2) /* anonymity */,
1380                   expiration,
1381                   rowid);
1382       if (SQLITE_OK != sqlite3_reset (stmt))
1383         LOG_SQLITE (plugin, NULL,
1384                     GNUNET_ERROR_TYPE_ERROR |
1385                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1386       if ( (GNUNET_NO == ret) &&
1387            (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1388         plugin->env->duc (plugin->env->cls,
1389                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
1390       return;
1391     case SQLITE_DONE:
1392       /* database must be empty */
1393       if (SQLITE_OK != sqlite3_reset (stmt))
1394         LOG_SQLITE (plugin, NULL,
1395                     GNUNET_ERROR_TYPE_ERROR |
1396                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1397       break;
1398     case SQLITE_BUSY:    
1399     case SQLITE_ERROR:
1400     case SQLITE_MISUSE:
1401     default:
1402       LOG_SQLITE (plugin, NULL,
1403                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
1404                   "sqlite3_step");
1405       if (SQLITE_OK != sqlite3_reset (stmt))
1406         LOG_SQLITE (plugin, NULL,
1407                     GNUNET_ERROR_TYPE_ERROR |
1408                     GNUNET_ERROR_TYPE_BULK,
1409                     "sqlite3_reset");
1410       GNUNET_break (0);
1411       database_shutdown (plugin);
1412       database_setup (plugin->env->cfg,
1413                       plugin);
1414       break;
1415     }
1416   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,             
1417         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1418 }
1419
1420
1421 /**
1422  * Get a random item for replication.  Returns a single, not expired, random item
1423  * from those with the highest replication counters.  The item's 
1424  * replication counter is decremented by one IF it was positive before.
1425  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1426  *
1427  * @param cls closure
1428  * @param iter function to call the value (once only).
1429  * @param iter_cls closure for iter
1430  */
1431 static void
1432 sqlite_plugin_replication_get (void *cls,
1433                                PluginIterator iter, void *iter_cls)
1434 {
1435   struct Plugin *plugin = cls;
1436   sqlite3_stmt *stmt;
1437   struct GNUNET_TIME_Absolute now;
1438
1439 #if DEBUG_SQLITE
1440   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1441                    "sqlite",
1442                    "Getting random block based on replication order.\n");
1443 #endif
1444   stmt = plugin->selRepl;
1445   now = GNUNET_TIME_absolute_get ();
1446   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1447     {
1448       LOG_SQLITE (plugin, NULL,           
1449                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1450       if (SQLITE_OK != sqlite3_reset (stmt))
1451         LOG_SQLITE (plugin, NULL,
1452                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1453       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1454             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1455       return;
1456     }
1457   execute_get (plugin, stmt, iter, iter_cls);
1458 }
1459
1460
1461
1462 /**
1463  * Get a random item that has expired or has low priority.
1464  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1465  *
1466  * @param cls closure
1467  * @param iter function to call the value (once only).
1468  * @param iter_cls closure for iter
1469  */
1470 static void
1471 sqlite_plugin_expiration_get (void *cls,
1472                               PluginIterator iter, void *iter_cls)
1473 {
1474   struct Plugin *plugin = cls;
1475   sqlite3_stmt *stmt;
1476   struct GNUNET_TIME_Absolute now;
1477
1478 #if DEBUG_SQLITE
1479   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1480                    "sqlite",
1481                    "Getting random block based on expiration and priority order.\n");
1482 #endif
1483   now = GNUNET_TIME_absolute_get ();
1484   stmt = plugin->selExpi;
1485   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1486     {
1487       LOG_SQLITE (plugin, NULL,           
1488                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1489       if (SQLITE_OK != sqlite3_reset (stmt))
1490         LOG_SQLITE (plugin, NULL,
1491                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1492       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1493             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1494       return;
1495     }
1496   execute_get (plugin, stmt, iter, iter_cls);
1497 }
1498
1499
1500 /**
1501  * Drop database.
1502  *
1503  * @param cls our plugin context
1504  */
1505 static void 
1506 sqlite_plugin_drop (void *cls)
1507 {
1508   struct Plugin *plugin = cls;
1509   plugin->drop_on_shutdown = GNUNET_YES;
1510 }
1511
1512
1513 /**
1514  * Get an estimate of how much space the database is
1515  * currently using.
1516  *
1517  * @param cls the 'struct Plugin'
1518  * @return the size of the database on disk (estimate)
1519  */
1520 static unsigned long long
1521 sqlite_plugin_get_size (void *cls)
1522 {
1523   struct Plugin *plugin = cls;
1524   sqlite3_stmt *stmt;
1525   uint64_t pages;
1526   uint64_t page_size;
1527 #if ENULL_DEFINED
1528   char *e;
1529 #endif
1530
1531   if (SQLITE_VERSION_NUMBER < 3006000)
1532     {
1533       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1534                        "datastore-sqlite",
1535                        _("sqlite version to old to determine size, assuming zero\n"));
1536       return 0;
1537     }
1538   CHECK (SQLITE_OK ==
1539          sqlite3_exec (plugin->dbh,
1540                        "VACUUM", NULL, NULL, ENULL));
1541   CHECK (SQLITE_OK ==
1542          sqlite3_exec (plugin->dbh,
1543                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1544   CHECK (SQLITE_OK ==
1545          sq_prepare (plugin->dbh,
1546                      "PRAGMA page_count",
1547                      &stmt));
1548   if (SQLITE_ROW ==
1549       sqlite3_step (stmt))
1550     pages = sqlite3_column_int64 (stmt, 0);
1551   else
1552     pages = 0;
1553   sqlite3_finalize (stmt);
1554   CHECK (SQLITE_OK ==
1555          sq_prepare (plugin->dbh,
1556                      "PRAGMA page_size",
1557                      &stmt));
1558   CHECK (SQLITE_ROW ==
1559          sqlite3_step (stmt));
1560   page_size = sqlite3_column_int64 (stmt, 0);
1561   sqlite3_finalize (stmt);
1562   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1563               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1564               (unsigned long long) pages,
1565               (unsigned long long) page_size);
1566   return  pages * page_size;
1567 }
1568                                          
1569
1570 /**
1571  * Entry point for the plugin.
1572  *
1573  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1574  * @return NULL on error, othrewise the plugin context
1575  */
1576 void *
1577 libgnunet_plugin_datastore_sqlite_init (void *cls)
1578 {
1579   static struct Plugin plugin;
1580   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1581   struct GNUNET_DATASTORE_PluginFunctions *api;
1582
1583   if (plugin.env != NULL)
1584     return NULL; /* can only initialize once! */
1585   memset (&plugin, 0, sizeof(struct Plugin));
1586   plugin.env = env;
1587   if (GNUNET_OK !=
1588       database_setup (env->cfg, &plugin))
1589     {
1590       database_shutdown (&plugin);
1591       return NULL;
1592     }
1593   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1594   api->cls = &plugin;
1595   api->get_size = &sqlite_plugin_get_size;
1596   api->put = &sqlite_plugin_put;
1597   api->next_request = &sqlite_next_request;
1598   api->get = &sqlite_plugin_get;
1599   api->replication_get = &sqlite_plugin_replication_get;
1600   api->expiration_get = &sqlite_plugin_expiration_get;
1601   api->update = &sqlite_plugin_update;
1602   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1603   api->drop = &sqlite_plugin_drop;
1604   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1605                    "sqlite", _("Sqlite database running\n"));
1606   return api;
1607 }
1608
1609
1610 /**
1611  * Exit point from the plugin.
1612  *
1613  * @param cls the plugin context (as returned by "init")
1614  * @return always NULL
1615  */
1616 void *
1617 libgnunet_plugin_datastore_sqlite_done (void *cls)
1618 {
1619   char *fn;
1620   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1621   struct Plugin *plugin = api->cls;
1622
1623 #if DEBUG_SQLITE
1624   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1625                    "sqlite",
1626                    "sqlite plugin is doneing\n");
1627 #endif
1628
1629   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1630     {
1631 #if DEBUG_SQLITE
1632       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1633                        "sqlite",
1634                        "Canceling next task\n");
1635 #endif
1636       GNUNET_SCHEDULER_cancel (plugin->next_task);
1637       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1638 #if DEBUG_SQLITE
1639       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1640                        "sqlite",
1641                        "Prep'ing next task\n");
1642 #endif
1643       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1644       GNUNET_free (plugin->next_task_nc);
1645       plugin->next_task_nc = NULL;
1646     }
1647   fn = NULL;
1648   if (plugin->drop_on_shutdown)
1649     fn = GNUNET_strdup (plugin->fn);
1650 #if DEBUG_SQLITE
1651   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1652                    "sqlite",
1653                    "Shutting down database\n");
1654 #endif
1655   database_shutdown (plugin);
1656   plugin->env = NULL; 
1657   GNUNET_free (api);
1658   if (fn != NULL)
1659     {
1660       if (0 != UNLINK(fn))
1661         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1662                                   "unlink",
1663                                   fn);
1664       GNUNET_free (fn);
1665     }
1666 #if DEBUG_SQLITE
1667   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1668                    "sqlite",
1669                    "sqlite plugin is finished doneing\n");
1670 #endif
1671   return NULL;
1672 }
1673
1674 /* end of plugin_datastore_sqlite.c */