de-experimentalizing
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 /**
32  * Enable or disable logging debug messages.
33  */
34 #define DEBUG_SQLITE GNUNET_NO
35
36 /**
37  * We allocate items on the stack at times.  To prevent a stack
38  * overflow, we impose a limit on the maximum size for the data per
39  * item.  64k should be enough.
40  */
41 #define MAX_ITEM_SIZE 65536
42
43 /**
44  * After how many ms "busy" should a DB operation fail for good?
45  * A low value makes sure that we are more responsive to requests
46  * (especially PUTs).  A high value guarantees a higher success
47  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
48  *
49  * The default value of 250ms should ensure that users do not experience
50  * huge latencies while at the same time allowing operations to succeed
51  * with reasonable probability.
52  */
53 #define BUSY_TIMEOUT_MS 250
54
55
56 /**
57  * Log an error message at log-level 'level' that indicates
58  * a failure of the command 'cmd' on file 'filename'
59  * with the message given by strerror(errno).
60  */
61 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
62
63
64
65 /**
66  * Context for all functions in this plugin.
67  */
68 struct Plugin 
69 {
70   /**
71    * Our execution environment.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment *env;
74
75   /**
76    * Database filename.
77    */
78   char *fn;
79
80   /**
81    * Native SQLite database handle.
82    */
83   sqlite3 *dbh;
84
85   /**
86    * Precompiled SQL for deletion.
87    */
88   sqlite3_stmt *delRow;
89
90   /**
91    * Precompiled SQL for update.
92    */
93   sqlite3_stmt *updPrio;
94
95   /**
96    * Precompiled SQL for replication decrement.
97    */
98   sqlite3_stmt *updRepl;
99
100   /**
101    * Precompiled SQL for replication selection.
102    */
103   sqlite3_stmt *selRepl;
104
105   /**
106    * Precompiled SQL for expiration selection.
107    */
108   sqlite3_stmt *selExpi;
109
110   /**
111    * Precompiled SQL for insertion.
112    */
113   sqlite3_stmt *insertContent;
114
115   /**
116    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
117    */
118   struct NextContext *next_task_nc;
119
120   /**
121    * Pending task with scheduler for running the next request.
122    */
123   GNUNET_SCHEDULER_TaskIdentifier next_task;
124
125   /**
126    * Should the database be dropped on shutdown?
127    */
128   int drop_on_shutdown;
129
130 };
131
132
133 /**
134  * @brief Prepare a SQL statement
135  *
136  * @param dbh handle to the database
137  * @param zSql SQL statement, UTF-8 encoded
138  * @param ppStmt set to the prepared statement
139  * @return 0 on success
140  */
141 static int
142 sq_prepare (sqlite3 * dbh, 
143             const char *zSql,
144             sqlite3_stmt ** ppStmt)
145 {
146   char *dummy;
147   int result;
148
149   result = sqlite3_prepare_v2 (dbh,
150                                zSql,
151                                strlen (zSql), 
152                                ppStmt,
153                                (const char **) &dummy);
154 #if DEBUG_SQLITE && 0
155   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
156                    "sqlite",
157                    "Prepared `%s' / %p: %d\n",
158                    zSql,
159                    *ppStmt, 
160                    result);
161 #endif
162   return result;
163 }
164
165
166 /**
167  * Create our database indices.
168  * 
169  * @param dbh handle to the database
170  */
171 static void
172 create_indices (sqlite3 * dbh)
173 {
174   /* create indices */
175   sqlite3_exec (dbh,
176                 "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
179                 NULL, NULL);
180   sqlite3_exec (dbh, "CREATE INDEX idx_expire_repl ON gn090 (expire ASC,repl DESC)", NULL, NULL,
181                 NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (anonLevel ASC,expire ASC,prio,type,hash)",
183                 NULL, NULL, NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)",
185                 NULL, NULL, NULL);
186   sqlite3_exec (dbh, "CREATE INDEX idx_repl ON gn090 (repl)",
187                 NULL, NULL, NULL);
188 }
189
190
191 #if 0
192 #define CHECK(a) GNUNET_break(a)
193 #define ENULL NULL
194 #else
195 #define ENULL &e
196 #define ENULL_DEFINED 1
197 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
198 #endif
199
200
201 /**
202  * Initialize the database connections and associated
203  * data structures (create tables and indices
204  * as needed as well).
205  *
206  * @param cfg our configuration
207  * @param plugin the plugin context (state for this module)
208  * @return GNUNET_OK on success
209  */
210 static int
211 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
212                 struct Plugin *plugin)
213 {
214   sqlite3_stmt *stmt;
215   char *afsdir;
216 #if ENULL_DEFINED
217   char *e;
218 #endif
219   
220   if (GNUNET_OK != 
221       GNUNET_CONFIGURATION_get_value_filename (cfg,
222                                                "datastore-sqlite",
223                                                "FILENAME",
224                                                &afsdir))
225     {
226       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
227                        "sqlite",
228                        _("Option `%s' in section `%s' missing in configuration!\n"),
229                        "FILENAME",
230                        "datastore-sqlite");
231       return GNUNET_SYSERR;
232     }
233   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
234     {
235       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
236         {
237           GNUNET_break (0);
238           GNUNET_free (afsdir);
239           return GNUNET_SYSERR;
240         }
241       /* database is new or got deleted, reset payload to zero! */
242       plugin->env->duc (plugin->env->cls, 0);
243     }
244 #ifdef ENABLE_NLS
245   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
246                                        nl_langinfo (CODESET));
247 #else
248   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
249                                        "UTF-8");   /* good luck */
250 #endif
251   GNUNET_free (afsdir);
252   
253   /* Open database and precompile statements */
254   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
255     {
256       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
257                        "sqlite",
258                        _("Unable to initialize SQLite: %s.\n"),
259                        sqlite3_errmsg (plugin->dbh));
260       return GNUNET_SYSERR;
261     }
262   CHECK (SQLITE_OK ==
263          sqlite3_exec (plugin->dbh,
264                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
265   CHECK (SQLITE_OK ==
266          sqlite3_exec (plugin->dbh,
267                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
268   CHECK (SQLITE_OK ==
269          sqlite3_exec (plugin->dbh,
270                        "PRAGMA legacy_file_format=OFF", NULL, NULL, ENULL));
271   CHECK (SQLITE_OK ==
272          sqlite3_exec (plugin->dbh,
273                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
274   CHECK (SQLITE_OK ==
275          sqlite3_exec (plugin->dbh,
276                        "PRAGMA locking_mode=EXCLUSIVE", NULL, NULL, ENULL));
277   CHECK (SQLITE_OK ==
278          sqlite3_exec (plugin->dbh,
279                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
280   CHECK (SQLITE_OK ==
281          sqlite3_exec (plugin->dbh, 
282                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
283
284   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
285
286
287   /* We have to do it here, because otherwise precompiling SQL might fail */
288   CHECK (SQLITE_OK ==
289          sq_prepare (plugin->dbh,
290                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
291                      &stmt));
292   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
293        (sqlite3_exec (plugin->dbh,
294                       "CREATE TABLE gn090 ("
295                       "  repl INT4 NOT NULL DEFAULT 0,"
296                       "  type INT4 NOT NULL DEFAULT 0,"
297                       "  prio INT4 NOT NULL DEFAULT 0,"
298                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
299                       "  expire INT8 NOT NULL DEFAULT 0,"
300                       "  hash TEXT NOT NULL DEFAULT '',"
301                       "  vhash TEXT NOT NULL DEFAULT '',"
302                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
303                       NULL) != SQLITE_OK) )
304     {
305       LOG_SQLITE (plugin, NULL,
306                   GNUNET_ERROR_TYPE_ERROR, 
307                   "sqlite3_exec");
308       sqlite3_finalize (stmt);
309       return GNUNET_SYSERR;
310     }
311   sqlite3_finalize (stmt);
312   create_indices (plugin->dbh);
313
314   if ((sq_prepare (plugin->dbh,
315                    "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) WHERE _ROWID_ = ?",
316                    &plugin->updPrio) != SQLITE_OK) ||
317       (sq_prepare (plugin->dbh,
318                    "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE _ROWID_ = ?",
319                    &plugin->updRepl) != SQLITE_OK) ||
320       (sq_prepare (plugin->dbh,
321                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090"
322                    " ORDER BY repl DESC, Random() LIMIT 1",
323                    &plugin->selRepl) != SQLITE_OK) ||
324       (sq_prepare (plugin->dbh,
325                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
326                    " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
327                    " ORDER BY prio ASC LIMIT 1",
328                    &plugin->selExpi) != SQLITE_OK) ||
329       (sq_prepare (plugin->dbh,
330                    "INSERT INTO gn090 (repl, type, prio, "
331                    "anonLevel, expire, hash, vhash, value) "
332                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
333                    &plugin->insertContent) != SQLITE_OK) ||
334       (sq_prepare (plugin->dbh,
335                    "DELETE FROM gn090 WHERE _ROWID_ = ?",
336                    &plugin->delRow) != SQLITE_OK))
337     {
338       LOG_SQLITE (plugin, NULL,
339                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
340       return GNUNET_SYSERR;
341     }
342
343   return GNUNET_OK;
344 }
345
346
347 /**
348  * Shutdown database connection and associate data
349  * structures.
350  * @param plugin the plugin context (state for this module)
351  */
352 static void
353 database_shutdown (struct Plugin *plugin)
354 {
355   int result;
356 #if SQLITE_VERSION_NUMBER >= 3007000
357   sqlite3_stmt *stmt;
358 #endif
359
360   if (plugin->delRow != NULL)
361     sqlite3_finalize (plugin->delRow);
362   if (plugin->updPrio != NULL)
363     sqlite3_finalize (plugin->updPrio);
364   if (plugin->updRepl != NULL)
365     sqlite3_finalize (plugin->updRepl);
366   if (plugin->selRepl != NULL)
367     sqlite3_finalize (plugin->selRepl);
368   if (plugin->selExpi != NULL)
369     sqlite3_finalize (plugin->selExpi);
370   if (plugin->insertContent != NULL)
371     sqlite3_finalize (plugin->insertContent);
372   result = sqlite3_close(plugin->dbh);
373 #if SQLITE_VERSION_NUMBER >= 3007000
374   if (result == SQLITE_BUSY)
375     {
376       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
377                        "sqlite",
378                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
379       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
380       while (stmt != NULL)
381         {
382 #if DEBUG_SQLITE
383           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
384                      "sqlite", "Closing statement %p\n", stmt);
385 #endif
386           result = sqlite3_finalize(stmt);
387 #if DEBUG_SQLITE
388           if (result != SQLITE_OK)
389               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
390                                "sqlite",
391                                "Failed to close statement %p: %d\n", stmt, result);
392 #endif
393           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
394         }
395       result = sqlite3_close(plugin->dbh);
396     }
397 #endif
398   if (SQLITE_OK != result)
399       LOG_SQLITE (plugin, NULL,
400                   GNUNET_ERROR_TYPE_ERROR, 
401                   "sqlite3_close");
402
403   GNUNET_free_non_null (plugin->fn);
404 }
405
406
407 /**
408  * Delete the database entry with the given
409  * row identifier.
410  *
411  * @param plugin the plugin context (state for this module)
412  * @param rid the ID of the row to delete
413  */
414 static int
415 delete_by_rowid (struct Plugin* plugin, 
416                  unsigned long long rid)
417 {
418   sqlite3_bind_int64 (plugin->delRow, 1, rid);
419   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
420     {
421       LOG_SQLITE (plugin, NULL,
422                   GNUNET_ERROR_TYPE_ERROR |
423                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
424       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
425           LOG_SQLITE (plugin, NULL,
426                       GNUNET_ERROR_TYPE_ERROR |
427                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
428       return GNUNET_SYSERR;
429     }
430   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
431     LOG_SQLITE (plugin, NULL,
432                 GNUNET_ERROR_TYPE_ERROR |
433                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
434   return GNUNET_OK;
435 }
436
437
438 /**
439  * Context for the universal iterator.
440  */
441 struct NextContext;
442
443 /**
444  * Type of a function that will prepare
445  * the next iteration.
446  *
447  * @param cls closure
448  * @param nc the next context; NULL for the last
449  *         call which gives the callback a chance to
450  *         clean up the closure
451  * @return GNUNET_OK on success, GNUNET_NO if there are
452  *         no more values, GNUNET_SYSERR on error
453  */
454 typedef int (*PrepareFunction)(void *cls,
455                                struct NextContext *nc);
456
457
458 /**
459  * Context we keep for the "next request" callback.
460  */
461 struct NextContext
462 {
463   /**
464    * Internal state.
465    */ 
466   struct Plugin *plugin;
467
468   /**
469    * Function to call on the next value.
470    */
471   PluginIterator iter;
472
473   /**
474    * Closure for iter.
475    */
476   void *iter_cls;
477
478   /**
479    * Function to call to prepare the next
480    * iteration.
481    */
482   PrepareFunction prep;
483
484   /**
485    * Closure for prep.
486    */
487   void *prep_cls;
488
489   /**
490    * Statement that the iterator will get the data
491    * from (updated or set by prep).
492    */ 
493   sqlite3_stmt *stmt;
494
495   /**
496    * Row ID of the last result.
497    */
498   unsigned long long last_rowid;
499
500   /**
501    * Key of the last result.
502    */
503   GNUNET_HashCode lastKey;  
504
505   /**
506    * Priority of the last value visited.
507    */ 
508   unsigned int lastPriority; 
509
510   /**
511    * Number of results processed so far.
512    */
513   unsigned int count;
514
515   /**
516    * Set to GNUNET_YES if we must stop now.
517    */
518   int end_it;
519 };
520
521
522 /**
523  * Continuation of "sqlite_next_request".
524  *
525  * @param cls the 'struct NextContext*'
526  * @param tc the task context (unused)
527  */
528 static void 
529 sqlite_next_request_cont (void *cls,
530                           const struct GNUNET_SCHEDULER_TaskContext *tc)
531 {
532   struct NextContext * nc = cls;
533   struct Plugin *plugin;
534   unsigned long long rowid;
535   int ret;
536   unsigned int size;
537   unsigned int hsize;
538   uint32_t anonymity;
539   uint32_t priority;
540   enum GNUNET_BLOCK_Type type;
541   const GNUNET_HashCode *key;
542   struct GNUNET_TIME_Absolute expiration;
543   
544   plugin = nc->plugin;
545   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
546   plugin->next_task_nc = NULL;
547   if ( (GNUNET_YES == nc->end_it) ||
548        (GNUNET_OK != (nc->prep(nc->prep_cls,
549                                nc))) )
550     {
551 #if DEBUG_SQLITE
552       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
553                        "sqlite",
554                        "Iteration completes after %u results\n",
555                        nc->count);
556 #endif
557     END:
558       nc->iter (nc->iter_cls, 
559                 NULL, NULL, 0, NULL, 0, 0, 0, 
560                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
561       nc->prep (nc->prep_cls, NULL);
562       GNUNET_free (nc);
563       return;
564     }
565
566   type = sqlite3_column_int (nc->stmt, 0);
567   priority = sqlite3_column_int (nc->stmt, 1);
568   anonymity = sqlite3_column_int (nc->stmt, 2);
569   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
570   hsize = sqlite3_column_bytes (nc->stmt, 4);
571   key = sqlite3_column_blob (nc->stmt, 4);
572   size = sqlite3_column_bytes (nc->stmt, 5);
573   rowid = sqlite3_column_int64 (nc->stmt, 6);
574   if (hsize != sizeof (GNUNET_HashCode))
575     {
576       GNUNET_break (0);
577       if (SQLITE_OK != sqlite3_reset (nc->stmt))
578         LOG_SQLITE (plugin, NULL,
579                     GNUNET_ERROR_TYPE_ERROR |
580                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
581       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
582         plugin->env->duc (plugin->env->cls,
583                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));      
584       goto END;
585     }
586 #if DEBUG_SQLITE
587   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
588                    "sqlite",
589                    "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
590                    type, 
591                    GNUNET_h2s(key),
592                    priority,
593                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
594                    (long long) expiration.abs_value);
595 #endif
596   if (size > MAX_ITEM_SIZE)
597     {
598       GNUNET_break (0);
599       if (SQLITE_OK != sqlite3_reset (nc->stmt))
600         LOG_SQLITE (plugin, NULL,
601                     GNUNET_ERROR_TYPE_ERROR |
602                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
603       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
604         plugin->env->duc (plugin->env->cls,
605                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); 
606       goto END;
607     }
608   {
609     char data[size];
610     
611     memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
612     nc->count++;
613     nc->last_rowid = rowid;
614     nc->lastPriority = priority;
615     nc->lastKey = *key;
616     if (SQLITE_OK != sqlite3_reset (nc->stmt))
617       LOG_SQLITE (plugin, NULL,
618                   GNUNET_ERROR_TYPE_ERROR |
619                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
620     ret = nc->iter (nc->iter_cls, nc,
621                     &nc->lastKey,
622                     size, data,
623                     type, priority,
624                     anonymity, expiration,
625                     rowid);
626   }
627   switch (ret)
628     {
629     case GNUNET_SYSERR:
630       nc->end_it = GNUNET_YES;
631       break;
632     case GNUNET_NO:
633       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
634         {
635 #if DEBUG_SQLITE
636           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
637                            "sqlite",
638                            "Removed entry %llu (%u bytes)\n",
639                            (unsigned long long) rowid,
640                            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
641 #endif
642           plugin->env->duc (plugin->env->cls,
643                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
644         }
645       break;
646     case GNUNET_YES:
647       break;
648     default:
649       GNUNET_break (0);
650     }
651 }
652
653
654 /**
655  * Function invoked on behalf of a "PluginIterator" asking the
656  * database plugin to call the iterator with the next item.
657  *
658  * @param next_cls whatever argument was given
659  *        to the PluginIterator as "next_cls".
660  * @param end_it set to GNUNET_YES if we
661  *        should terminate the iteration early
662  *        (iterator should be still called once more
663  *         to signal the end of the iteration).
664  */
665 static void 
666 sqlite_next_request (void *next_cls,
667                      int end_it)
668 {
669   struct NextContext * nc= next_cls;
670
671   if (GNUNET_YES == end_it)
672     nc->end_it = GNUNET_YES;
673   nc->plugin->next_task_nc = nc;
674   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
675                                                     nc);
676 }
677
678
679 /**
680  * Store an item in the datastore.
681  *
682  * @param cls closure
683  * @param key key for the item
684  * @param size number of bytes in data
685  * @param data content stored
686  * @param type type of the content
687  * @param priority priority of the content
688  * @param anonymity anonymity-level for the content
689  * @param replication replication-level for the content
690  * @param expiration expiration time for the content
691  * @param msg set to an error message
692  * @return GNUNET_OK on success
693  */
694 static int
695 sqlite_plugin_put (void *cls,
696                    const GNUNET_HashCode *key,
697                    uint32_t size,
698                    const void *data,
699                    enum GNUNET_BLOCK_Type type,
700                    uint32_t priority,
701                    uint32_t anonymity,
702                    uint32_t replication,
703                    struct GNUNET_TIME_Absolute expiration,
704                    char ** msg)
705 {
706   struct Plugin *plugin = cls;
707   int n;
708   int ret;
709   sqlite3_stmt *stmt;
710   GNUNET_HashCode vhash;
711
712   if (size > MAX_ITEM_SIZE)
713     return GNUNET_SYSERR;
714 #if DEBUG_SQLITE
715   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
716                    "sqlite",
717                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
718                    type, 
719                    GNUNET_h2s(key),
720                    priority,
721                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
722                    (long long) expiration.abs_value);
723 #endif
724   GNUNET_CRYPTO_hash (data, size, &vhash);
725   stmt = plugin->insertContent;
726   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
727       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
728       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
729       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
730       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
731       (SQLITE_OK !=
732        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
733                           SQLITE_TRANSIENT)) ||
734       (SQLITE_OK !=
735        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
736                           SQLITE_TRANSIENT))
737       || (SQLITE_OK !=
738           sqlite3_bind_blob (stmt, 8, data, size,
739                              SQLITE_TRANSIENT)))
740     {
741       LOG_SQLITE (plugin,
742                   msg,
743                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
744       if (SQLITE_OK != sqlite3_reset (stmt))
745         LOG_SQLITE (plugin, NULL,
746                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
747       return GNUNET_SYSERR;
748     }
749   n = sqlite3_step (stmt);
750   switch (n)
751     {
752     case SQLITE_DONE:
753       plugin->env->duc (plugin->env->cls,
754                         size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
755 #if DEBUG_SQLITE
756       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
757                        "sqlite",
758                        "Stored new entry (%u bytes)\n",
759                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
760 #endif
761       ret = GNUNET_OK;
762       break;
763     case SQLITE_BUSY:      
764       GNUNET_break (0);
765       LOG_SQLITE (plugin, msg,
766                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
767                   "sqlite3_step");
768       ret = GNUNET_SYSERR;
769       break;
770     default:
771       LOG_SQLITE (plugin, msg,
772                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
773                   "sqlite3_step");
774       if (SQLITE_OK != sqlite3_reset (stmt))
775         LOG_SQLITE (plugin, NULL,
776                     GNUNET_ERROR_TYPE_ERROR |
777                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
778       database_shutdown (plugin);
779       database_setup (plugin->env->cfg,
780                       plugin);
781       return GNUNET_SYSERR;    
782     }
783   if (SQLITE_OK != sqlite3_reset (stmt))
784     LOG_SQLITE (plugin, NULL,
785                 GNUNET_ERROR_TYPE_ERROR |
786                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
787   return ret;
788 }
789
790
791 /**
792  * Update the priority for a particular key in the datastore.  If
793  * the expiration time in value is different than the time found in
794  * the datastore, the higher value should be kept.  For the
795  * anonymity level, the lower value is to be used.  The specified
796  * priority should be added to the existing priority, ignoring the
797  * priority in value.
798  *
799  * Note that it is possible for multiple values to match this put.
800  * In that case, all of the respective values are updated.
801  *
802  * @param cls the plugin context (state for this module)
803  * @param uid unique identifier of the datum
804  * @param delta by how much should the priority
805  *     change?  If priority + delta < 0 the
806  *     priority should be set to 0 (never go
807  *     negative).
808  * @param expire new expiration time should be the
809  *     MAX of any existing expiration time and
810  *     this value
811  * @param msg set to an error message
812  * @return GNUNET_OK on success
813  */
814 static int
815 sqlite_plugin_update (void *cls,
816                       uint64_t uid,
817                       int delta, struct GNUNET_TIME_Absolute expire,
818                       char **msg)
819 {
820   struct Plugin *plugin = cls;
821   int n;
822
823   sqlite3_bind_int (plugin->updPrio, 1, delta);
824   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
825   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
826   n = sqlite3_step (plugin->updPrio);
827   sqlite3_reset (plugin->updPrio);
828   switch (n)
829     {
830     case SQLITE_DONE:
831 #if DEBUG_SQLITE
832       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
833                        "sqlite",
834                        "Block updated\n");
835 #endif
836       return GNUNET_OK;
837     case SQLITE_BUSY:
838       LOG_SQLITE (plugin, msg,
839                   GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
840                   "sqlite3_step");
841       return GNUNET_NO;
842     default:
843       LOG_SQLITE (plugin, msg,
844                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
845                   "sqlite3_step");
846       return GNUNET_SYSERR;
847     }
848 }
849
850
851 /**
852  * Internal context for an iteration.
853  */
854 struct ZeroIterContext
855 {
856   /**
857    * First iterator statement for zero-anonymity iteration.
858    */
859   sqlite3_stmt *stmt_1;
860
861   /**
862    * Second iterator statement for zero-anonymity iteration.
863    */
864   sqlite3_stmt *stmt_2;
865
866   /**
867    * Desired type for blocks returned by this iterator.
868    */
869   enum GNUNET_BLOCK_Type type;
870 };
871
872
873 /**
874  * Prepare our SQL query to obtain the next record from the database.
875  *
876  * @param cls our "struct ZeroIterContext"
877  * @param nc NULL to terminate the iteration, otherwise our context for
878  *           getting the next result.
879  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
880  *         GNUNET_SYSERR on error (or end of iteration)
881  */
882 static int
883 zero_iter_next_prepare (void *cls,
884                         struct NextContext *nc)
885 {
886   struct ZeroIterContext *ic = cls;
887   struct Plugin *plugin;
888   int ret;
889
890   if (nc == NULL)
891     {
892 #if DEBUG_SQLITE
893       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
894                   "Asked to clean up iterator state.\n");
895 #endif
896       sqlite3_finalize (ic->stmt_1);
897       sqlite3_finalize (ic->stmt_2);
898       return GNUNET_SYSERR;
899     }
900   plugin = nc->plugin;
901
902   /* first try iter 1 */
903 #if DEBUG_SQLITE
904   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
905               "Restricting to results larger than the last priority %u and key `%s'\n",
906               nc->lastPriority,
907               GNUNET_h2s (&nc->lastKey));
908 #endif
909   if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
910        (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2, 
911                                         &nc->lastKey, 
912                                         sizeof (GNUNET_HashCode),
913                                         SQLITE_TRANSIENT)) )
914     {
915       LOG_SQLITE (plugin, NULL,
916                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
917       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
918         LOG_SQLITE (plugin, NULL,
919                     GNUNET_ERROR_TYPE_ERROR | 
920                     GNUNET_ERROR_TYPE_BULK, 
921                     "sqlite3_reset");  
922       return GNUNET_SYSERR;
923     }
924   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
925     {      
926 #if DEBUG_SQLITE
927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928                   "Result found using iterator 1\n");
929 #endif
930       nc->stmt = ic->stmt_1;
931       return GNUNET_OK;
932     }
933   if (ret != SQLITE_DONE)
934     {
935       LOG_SQLITE (plugin, NULL,
936                   GNUNET_ERROR_TYPE_ERROR |
937                   GNUNET_ERROR_TYPE_BULK,
938                   "sqlite3_step");
939       if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
940         LOG_SQLITE (plugin, NULL,
941                     GNUNET_ERROR_TYPE_ERROR | 
942                     GNUNET_ERROR_TYPE_BULK, 
943                     "sqlite3_reset");  
944       return GNUNET_SYSERR;
945     }
946   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
947     LOG_SQLITE (plugin, NULL,
948                 GNUNET_ERROR_TYPE_ERROR | 
949                 GNUNET_ERROR_TYPE_BULK, 
950                 "sqlite3_reset");  
951
952   /* now try iter 2 */
953   if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
954     {
955       LOG_SQLITE (plugin, NULL,           
956                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
957       return GNUNET_SYSERR;
958     }
959   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
960     {
961 #if DEBUG_SQLITE
962       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
963                   "Result found using iterator 2\n");
964 #endif
965       nc->stmt = ic->stmt_2;
966       return GNUNET_OK;
967     }
968   if (ret != SQLITE_DONE)
969     {
970       LOG_SQLITE (plugin, NULL,
971                   GNUNET_ERROR_TYPE_ERROR |
972                   GNUNET_ERROR_TYPE_BULK,
973                   "sqlite3_step");
974       if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
975         LOG_SQLITE (plugin, NULL,
976                     GNUNET_ERROR_TYPE_ERROR |
977                     GNUNET_ERROR_TYPE_BULK,
978                     "sqlite3_reset");
979       return GNUNET_SYSERR;
980     }
981   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
982     LOG_SQLITE (plugin, NULL,
983                 GNUNET_ERROR_TYPE_ERROR |
984                 GNUNET_ERROR_TYPE_BULK,
985                 "sqlite3_reset");
986 #if DEBUG_SQLITE
987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
988               "No result found using either iterator\n");
989 #endif
990   return GNUNET_NO;
991 }
992
993
994 /**
995  * Select a subset of the items in the datastore and call
996  * the given iterator for each of them.
997  *
998  * @param cls our plugin context
999  * @param type entries of which type should be considered?
1000  *        Use 0 for any type.
1001  * @param iter function to call on each matching value;
1002  *        will be called once with a NULL value at the end
1003  * @param iter_cls closure for iter
1004  */
1005 static void
1006 sqlite_plugin_iter_zero_anonymity (void *cls,
1007                                    enum GNUNET_BLOCK_Type type,
1008                                    PluginIterator iter,
1009                                    void *iter_cls)
1010 {
1011   struct Plugin *plugin = cls;
1012   struct GNUNET_TIME_Absolute now;
1013   struct NextContext *nc;
1014   struct ZeroIterContext *ic;
1015   sqlite3_stmt *stmt_1;
1016   sqlite3_stmt *stmt_2;
1017   char *q;
1018
1019   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1020   now = GNUNET_TIME_absolute_get ();
1021   GNUNET_asprintf (&q, 
1022                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1023                    "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
1024                    "ORDER BY hash DESC LIMIT 1",
1025                    (unsigned long long) now.abs_value,
1026                    type);
1027   if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
1028     {
1029       LOG_SQLITE (plugin, NULL,
1030                   GNUNET_ERROR_TYPE_ERROR |
1031                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1032       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1033       GNUNET_free (q);
1034       return;
1035     }
1036   GNUNET_free (q);
1037   GNUNET_asprintf (&q, 
1038                    "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
1039                    "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
1040                    "ORDER BY prio DESC, hash DESC LIMIT 1",
1041                    (unsigned long long) now.abs_value,
1042                    type);
1043   if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
1044     {
1045       LOG_SQLITE (plugin, NULL,
1046                   GNUNET_ERROR_TYPE_ERROR |
1047                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1048       sqlite3_finalize (stmt_1);
1049       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1050       GNUNET_free (q);
1051       return;
1052     }
1053   GNUNET_free (q);
1054   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1055                       sizeof(struct ZeroIterContext));
1056   nc->plugin = plugin;
1057   nc->iter = iter;
1058   nc->iter_cls = iter_cls;
1059   nc->stmt = NULL;
1060   ic = (struct ZeroIterContext*) &nc[1];
1061   ic->stmt_1 = stmt_1;
1062   ic->stmt_2 = stmt_2;
1063   ic->type = type;
1064   nc->prep = &zero_iter_next_prepare;
1065   nc->prep_cls = ic;
1066   nc->lastPriority = INT32_MAX;
1067   memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1068   sqlite_next_request (nc, GNUNET_NO);
1069 }
1070
1071
1072 /**
1073  * Context for get_next_prepare.
1074  */
1075 struct GetNextContext
1076 {
1077
1078   /**
1079    * Our prepared statement.
1080    */
1081   sqlite3_stmt *stmt;
1082
1083   /**
1084    * Plugin handle.
1085    */
1086   struct Plugin *plugin;
1087
1088   /**
1089    * Key for the query.
1090    */
1091   GNUNET_HashCode key;
1092
1093   /**
1094    * Vhash for the query.
1095    */
1096   GNUNET_HashCode vhash;
1097
1098   /**
1099    * Expected total number of results.
1100    */
1101   unsigned int total;
1102
1103   /**
1104    * Offset to add for the selected result.
1105    */
1106   unsigned int off;
1107
1108   /**
1109    * Is vhash set?
1110    */
1111   int have_vhash;
1112
1113   /**
1114    * Desired block type.
1115    */
1116   enum GNUNET_BLOCK_Type type;
1117
1118 };
1119
1120
1121 /**
1122  * Prepare the stmt in 'nc' for the next round of execution, selecting the
1123  * next return value.
1124  *
1125  * @param cls our "struct GetNextContext*"
1126  * @param nc the general context
1127  * @return GNUNET_YES if there are more results, 
1128  *         GNUNET_NO if there are no more results,
1129  *         GNUNET_SYSERR on internal error
1130  */
1131 static int
1132 get_next_prepare (void *cls,
1133                   struct NextContext *nc)
1134 {
1135   struct GetNextContext *gnc = cls;
1136   int ret;
1137   int limit_off;
1138   unsigned int sqoff;
1139
1140   if (nc == NULL)
1141     {
1142       sqlite3_finalize (gnc->stmt);
1143       return GNUNET_SYSERR;
1144     }
1145   if (nc->count == gnc->total)
1146     return GNUNET_NO;
1147   if (nc->count + gnc->off == gnc->total)
1148     nc->last_rowid = 0;
1149   if (nc->count == 0)
1150     limit_off = gnc->off;
1151   else
1152     limit_off = 0;
1153   sqlite3_reset (nc->stmt);
1154   sqoff = 1;
1155   ret = sqlite3_bind_blob (nc->stmt,
1156                            sqoff++,
1157                            &gnc->key, 
1158                            sizeof (GNUNET_HashCode),
1159                            SQLITE_TRANSIENT);
1160   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1161     ret = sqlite3_bind_blob (nc->stmt,
1162                              sqoff++,
1163                              &gnc->vhash,
1164                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1165   if ((gnc->type != 0) && (ret == SQLITE_OK))
1166     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1167   if (ret == SQLITE_OK)
1168     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
1169   if (ret != SQLITE_OK)
1170     return GNUNET_SYSERR;
1171 #if DEBUG_SQLITE 
1172   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1173                    "sqlite",
1174                    "Preparing to GET for key `%s' with type %d at offset %u\n",
1175                    GNUNET_h2s (&gnc->key),
1176                    gnc->type,
1177                    limit_off);
1178 #endif
1179   ret = sqlite3_step (nc->stmt);
1180   switch (ret)
1181     {
1182     case SQLITE_ROW:
1183       return GNUNET_OK;  
1184     case SQLITE_DONE:
1185       return GNUNET_NO;
1186     default:
1187       LOG_SQLITE (gnc->plugin, NULL,
1188                   GNUNET_ERROR_TYPE_ERROR |
1189                   GNUNET_ERROR_TYPE_BULK,
1190                   "sqlite3_step");
1191       return GNUNET_SYSERR;
1192     }
1193 }
1194
1195
1196 /**
1197  * Iterate over the results for a particular key
1198  * in the datastore.
1199  *
1200  * @param cls closure
1201  * @param key key to match, never NULL
1202  * @param vhash hash of the value, maybe NULL (to
1203  *        match all values that have the right key).
1204  *        Note that for DBlocks there is no difference
1205  *        betwen key and vhash, but for other blocks
1206  *        there may be!
1207  * @param type entries of which type are relevant?
1208  *     Use 0 for any type.
1209  * @param iter function to call on each matching value;
1210  *        will be called once with a NULL value at the end
1211  * @param iter_cls closure for iter
1212  */
1213 static void
1214 sqlite_plugin_get (void *cls,
1215                    const GNUNET_HashCode *key,
1216                    const GNUNET_HashCode *vhash,
1217                    enum GNUNET_BLOCK_Type type,
1218                    PluginIterator iter, void *iter_cls)
1219 {
1220   struct Plugin *plugin = cls;
1221   struct GetNextContext *gnc;
1222   struct NextContext *nc;
1223   int ret;
1224   int total;
1225   sqlite3_stmt *stmt;
1226   char scratch[256];
1227   unsigned int sqoff;
1228
1229   GNUNET_assert (iter != NULL);
1230   GNUNET_assert (key != NULL);
1231   GNUNET_snprintf (scratch, sizeof (scratch),
1232                    "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
1233                    vhash == NULL ? "" : " AND vhash=?",
1234                    type  == 0    ? "" : " AND type=?");
1235   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1236     {
1237       LOG_SQLITE (plugin, NULL,
1238                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1239       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1240       return;
1241     }
1242   sqoff = 1;
1243   ret = sqlite3_bind_blob (stmt, sqoff++,
1244                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1245   if ((vhash != NULL) && (ret == SQLITE_OK))
1246     ret = sqlite3_bind_blob (stmt, sqoff++,
1247                              vhash,
1248                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1249   if ((type != 0) && (ret == SQLITE_OK))
1250     ret = sqlite3_bind_int (stmt, sqoff++, type);
1251   if (SQLITE_OK != ret)
1252     {
1253       LOG_SQLITE (plugin, NULL,
1254                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1255       sqlite3_finalize (stmt);
1256       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1257       return;
1258     }
1259   ret = sqlite3_step (stmt);
1260   if (ret != SQLITE_ROW)
1261     {
1262       LOG_SQLITE (plugin, NULL,
1263                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1264                   "sqlite_step");
1265       sqlite3_finalize (stmt);
1266       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1267       return;
1268     }
1269   total = sqlite3_column_int (stmt, 0);
1270   sqlite3_finalize (stmt);
1271   if (0 == total)
1272     {
1273       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1274       return;
1275     }
1276   GNUNET_snprintf (scratch, sizeof (scratch),
1277                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
1278                    "FROM gn090 WHERE hash=?%s%s "
1279                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
1280                    vhash == NULL ? "" : " AND vhash=?",
1281                    type == 0 ? "" : " AND type=?");
1282
1283   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1284     {
1285       LOG_SQLITE (plugin, NULL,
1286                   GNUNET_ERROR_TYPE_ERROR |
1287                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1288       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1289       return;
1290     }
1291   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1292                       sizeof(struct GetNextContext));
1293   nc->plugin = plugin;
1294   nc->iter = iter;
1295   nc->iter_cls = iter_cls;
1296   nc->stmt = stmt;
1297   gnc = (struct GetNextContext*) &nc[1];
1298   gnc->total = total;
1299   gnc->type = type;
1300   gnc->key = *key;
1301   gnc->plugin = plugin;
1302   gnc->stmt = stmt; /* alias used for freeing at the end! */
1303   if (NULL != vhash)
1304     {
1305       gnc->have_vhash = GNUNET_YES;
1306       gnc->vhash = *vhash;
1307     }
1308   gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1309   nc->prep = &get_next_prepare;
1310   nc->prep_cls = gnc;
1311   sqlite_next_request (nc, GNUNET_NO);
1312 }
1313
1314
1315 /**
1316  * Execute statement that gets a row and call the callback
1317  * with the result.  Resets the statement afterwards.
1318  *
1319  * @param plugin the plugin
1320  * @param stmt the statement
1321  * @param iter iterator to call
1322  * @param iter_cls closure for 'iter'
1323  */
1324 static void
1325 execute_get (struct Plugin *plugin,
1326              sqlite3_stmt *stmt,
1327              PluginIterator iter, void *iter_cls)
1328 {
1329   int n;
1330   struct GNUNET_TIME_Absolute expiration;
1331   unsigned long long rowid;
1332   unsigned int size;
1333   int ret;
1334
1335   n = sqlite3_step (stmt);
1336   switch (n)
1337     {
1338     case SQLITE_ROW:
1339       size = sqlite3_column_bytes (stmt, 5);
1340       rowid = sqlite3_column_int64 (stmt, 6);
1341       if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
1342         {
1343           GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
1344                            "sqlite",
1345                            _("Invalid data in database.  Trying to fix (by deletion).\n"));
1346           if (SQLITE_OK != sqlite3_reset (stmt))
1347             LOG_SQLITE (plugin, NULL,
1348                         GNUNET_ERROR_TYPE_ERROR |
1349                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1350           if (GNUNET_OK == delete_by_rowid (plugin, rowid))
1351             plugin->env->duc (plugin->env->cls,
1352                               - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
1353           break;
1354         }
1355       expiration.abs_value = sqlite3_column_int64 (stmt, 3);
1356       ret = iter (iter_cls,
1357                   NULL,
1358                   sqlite3_column_blob (stmt, 4) /* key */,
1359                   size,
1360                   sqlite3_column_blob (stmt, 5) /* data */, 
1361                   sqlite3_column_int (stmt, 0) /* type */,
1362                   sqlite3_column_int (stmt, 1) /* priority */,
1363                   sqlite3_column_int (stmt, 2) /* anonymity */,
1364                   expiration,
1365                   rowid);
1366       if (SQLITE_OK != sqlite3_reset (stmt))
1367         LOG_SQLITE (plugin, NULL,
1368                     GNUNET_ERROR_TYPE_ERROR |
1369                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1370       if ( (GNUNET_NO == ret) &&
1371            (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
1372         plugin->env->duc (plugin->env->cls,
1373                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
1374       return;
1375     case SQLITE_DONE:
1376       /* database must be empty */
1377       if (SQLITE_OK != sqlite3_reset (stmt))
1378         LOG_SQLITE (plugin, NULL,
1379                     GNUNET_ERROR_TYPE_ERROR |
1380                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1381       break;
1382     case SQLITE_BUSY:    
1383     case SQLITE_ERROR:
1384     case SQLITE_MISUSE:
1385     default:
1386       LOG_SQLITE (plugin, NULL,
1387                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
1388                   "sqlite3_step");
1389       if (SQLITE_OK != sqlite3_reset (stmt))
1390         LOG_SQLITE (plugin, NULL,
1391                     GNUNET_ERROR_TYPE_ERROR |
1392                     GNUNET_ERROR_TYPE_BULK,
1393                     "sqlite3_reset");
1394       GNUNET_break (0);
1395       database_shutdown (plugin);
1396       database_setup (plugin->env->cfg,
1397                       plugin);
1398       break;
1399     }
1400   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,             
1401         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1402 }
1403
1404
1405 /**
1406  * Context for 'repl_iter' function.
1407  */
1408 struct ReplCtx
1409 {
1410   
1411   /**
1412    * Plugin handle.
1413    */
1414   struct Plugin *plugin;
1415   
1416   /**
1417    * Function to call for the result (or the NULL).
1418    */
1419   PluginIterator iter;
1420   
1421   /**
1422    * Closure for iter.
1423    */
1424   void *iter_cls;
1425 };
1426
1427
1428 /**
1429  * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
1430  * Decrements the replication counter and calls the original
1431  * iterator.
1432  *
1433  * @param cls closure
1434  * @param next_cls closure to pass to the "next" function.
1435  * @param key key for the content
1436  * @param size number of bytes in data
1437  * @param data content stored
1438  * @param type type of the content
1439  * @param priority priority of the content
1440  * @param anonymity anonymity-level for the content
1441  * @param expiration expiration time for the content
1442  * @param uid unique identifier for the datum;
1443  *        maybe 0 if no unique identifier is available
1444  *
1445  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
1446  *         (continue on call to "next", of course),
1447  *         GNUNET_NO to delete the item and continue (if supported)
1448  */
1449 static int
1450 repl_iter (void *cls,
1451            void *next_cls,
1452            const GNUNET_HashCode *key,
1453            uint32_t size,
1454            const void *data,
1455            enum GNUNET_BLOCK_Type type,
1456            uint32_t priority,
1457            uint32_t anonymity,
1458            struct GNUNET_TIME_Absolute expiration, 
1459            uint64_t uid)
1460 {
1461   struct ReplCtx *rc = cls;
1462   struct Plugin *plugin = rc->plugin;
1463   int ret;
1464
1465   ret = rc->iter (rc->iter_cls,
1466                   next_cls, key,
1467                   size, data, 
1468                   type, priority, anonymity, expiration,
1469                   uid);
1470   if (NULL != key)
1471     {
1472       sqlite3_bind_int64 (plugin->updRepl, 1, uid);
1473       if (SQLITE_DONE != sqlite3_step (plugin->updRepl))
1474         {
1475           LOG_SQLITE (plugin, NULL,
1476                       GNUNET_ERROR_TYPE_ERROR |
1477                       GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
1478           if (SQLITE_OK != sqlite3_reset (plugin->updRepl))
1479             LOG_SQLITE (plugin, NULL,
1480                         GNUNET_ERROR_TYPE_ERROR |
1481                         GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1482           return GNUNET_SYSERR;
1483         }
1484       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
1485         LOG_SQLITE (plugin, NULL,
1486                     GNUNET_ERROR_TYPE_ERROR |
1487                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1488     }
1489   return ret;
1490 }
1491
1492
1493 /**
1494  * Get a random item for replication.  Returns a single random item
1495  * from those with the highest replication counters.  The item's 
1496  * replication counter is decremented by one IF it was positive before.
1497  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1498  *
1499  * @param cls closure
1500  * @param iter function to call the value (once only).
1501  * @param iter_cls closure for iter
1502  */
1503 static void
1504 sqlite_plugin_replication_get (void *cls,
1505                                PluginIterator iter, void *iter_cls)
1506 {
1507   struct Plugin *plugin = cls;
1508   struct ReplCtx rc;
1509
1510 #if DEBUG_SQLITE
1511   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1512                    "sqlite",
1513                    "Getting random block based on replication order.\n");
1514 #endif
1515   rc.plugin = plugin;
1516   rc.iter = iter;
1517   rc.iter_cls = iter_cls;
1518   execute_get (plugin, plugin->selRepl, &repl_iter, &rc);
1519 }
1520
1521
1522
1523 /**
1524  * Get a random item that has expired or has low priority.
1525  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1526  *
1527  * @param cls closure
1528  * @param iter function to call the value (once only).
1529  * @param iter_cls closure for iter
1530  */
1531 static void
1532 sqlite_plugin_expiration_get (void *cls,
1533                               PluginIterator iter, void *iter_cls)
1534 {
1535   struct Plugin *plugin = cls;
1536   sqlite3_stmt *stmt;
1537   struct GNUNET_TIME_Absolute now;
1538
1539 #if DEBUG_SQLITE
1540   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1541                    "sqlite",
1542                    "Getting random block based on expiration and priority order.\n");
1543 #endif
1544   now = GNUNET_TIME_absolute_get ();
1545   stmt = plugin->selExpi;
1546   if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value))
1547     {
1548       LOG_SQLITE (plugin, NULL,           
1549                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
1550       if (SQLITE_OK != sqlite3_reset (stmt))
1551         LOG_SQLITE (plugin, NULL,
1552                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
1553       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1554             GNUNET_TIME_UNIT_ZERO_ABS, 0);
1555       return;
1556     }
1557   execute_get (plugin, stmt, iter, iter_cls);
1558 }
1559
1560
1561 /**
1562  * Drop database.
1563  *
1564  * @param cls our plugin context
1565  */
1566 static void 
1567 sqlite_plugin_drop (void *cls)
1568 {
1569   struct Plugin *plugin = cls;
1570   plugin->drop_on_shutdown = GNUNET_YES;
1571 }
1572
1573
1574 /**
1575  * Get an estimate of how much space the database is
1576  * currently using.
1577  *
1578  * @param cls the 'struct Plugin'
1579  * @return the size of the database on disk (estimate)
1580  */
1581 static unsigned long long
1582 sqlite_plugin_get_size (void *cls)
1583 {
1584   struct Plugin *plugin = cls;
1585   sqlite3_stmt *stmt;
1586   uint64_t pages;
1587   uint64_t page_size;
1588 #if ENULL_DEFINED
1589   char *e;
1590 #endif
1591
1592   if (SQLITE_VERSION_NUMBER < 3006000)
1593     {
1594       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1595                        "datastore-sqlite",
1596                        _("sqlite version to old to determine size, assuming zero\n"));
1597       return 0;
1598     }
1599   CHECK (SQLITE_OK ==
1600          sqlite3_exec (plugin->dbh,
1601                        "VACUUM", NULL, NULL, ENULL));
1602   CHECK (SQLITE_OK ==
1603          sqlite3_exec (plugin->dbh,
1604                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1605   CHECK (SQLITE_OK ==
1606          sq_prepare (plugin->dbh,
1607                      "PRAGMA page_count",
1608                      &stmt));
1609   if (SQLITE_ROW ==
1610       sqlite3_step (stmt))
1611     pages = sqlite3_column_int64 (stmt, 0);
1612   else
1613     pages = 0;
1614   sqlite3_finalize (stmt);
1615   CHECK (SQLITE_OK ==
1616          sq_prepare (plugin->dbh,
1617                      "PRAGMA page_size",
1618                      &stmt));
1619   CHECK (SQLITE_ROW ==
1620          sqlite3_step (stmt));
1621   page_size = sqlite3_column_int64 (stmt, 0);
1622   sqlite3_finalize (stmt);
1623   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1624               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1625               (unsigned long long) pages,
1626               (unsigned long long) page_size);
1627   return  pages * page_size;
1628 }
1629                                          
1630
1631 /**
1632  * Entry point for the plugin.
1633  *
1634  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1635  * @return NULL on error, othrewise the plugin context
1636  */
1637 void *
1638 libgnunet_plugin_datastore_sqlite_init (void *cls)
1639 {
1640   static struct Plugin plugin;
1641   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1642   struct GNUNET_DATASTORE_PluginFunctions *api;
1643
1644   if (plugin.env != NULL)
1645     return NULL; /* can only initialize once! */
1646   memset (&plugin, 0, sizeof(struct Plugin));
1647   plugin.env = env;
1648   if (GNUNET_OK !=
1649       database_setup (env->cfg, &plugin))
1650     {
1651       database_shutdown (&plugin);
1652       return NULL;
1653     }
1654   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1655   api->cls = &plugin;
1656   api->get_size = &sqlite_plugin_get_size;
1657   api->put = &sqlite_plugin_put;
1658   api->next_request = &sqlite_next_request;
1659   api->get = &sqlite_plugin_get;
1660   api->replication_get = &sqlite_plugin_replication_get;
1661   api->expiration_get = &sqlite_plugin_expiration_get;
1662   api->update = &sqlite_plugin_update;
1663   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1664   api->drop = &sqlite_plugin_drop;
1665   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1666                    "sqlite", _("Sqlite database running\n"));
1667   return api;
1668 }
1669
1670
1671 /**
1672  * Exit point from the plugin.
1673  *
1674  * @param cls the plugin context (as returned by "init")
1675  * @return always NULL
1676  */
1677 void *
1678 libgnunet_plugin_datastore_sqlite_done (void *cls)
1679 {
1680   char *fn;
1681   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1682   struct Plugin *plugin = api->cls;
1683
1684 #if DEBUG_SQLITE
1685   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1686                    "sqlite",
1687                    "sqlite plugin is doneing\n");
1688 #endif
1689
1690   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1691     {
1692 #if DEBUG_SQLITE
1693       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1694                        "sqlite",
1695                        "Canceling next task\n");
1696 #endif
1697       GNUNET_SCHEDULER_cancel (plugin->next_task);
1698       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1699 #if DEBUG_SQLITE
1700       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1701                        "sqlite",
1702                        "Prep'ing next task\n");
1703 #endif
1704       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1705       GNUNET_free (plugin->next_task_nc);
1706       plugin->next_task_nc = NULL;
1707     }
1708   fn = NULL;
1709   if (plugin->drop_on_shutdown)
1710     fn = GNUNET_strdup (plugin->fn);
1711 #if DEBUG_SQLITE
1712   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1713                    "sqlite",
1714                    "Shutting down database\n");
1715 #endif
1716   database_shutdown (plugin);
1717   plugin->env = NULL; 
1718   GNUNET_free (api);
1719   if (fn != NULL)
1720     {
1721       if (0 != UNLINK(fn))
1722         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1723                                   "unlink",
1724                                   fn);
1725       GNUNET_free (fn);
1726     }
1727 #if DEBUG_SQLITE
1728   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1729                    "sqlite",
1730                    "sqlite plugin is finished doneing\n");
1731 #endif
1732   return NULL;
1733 }
1734
1735 /* end of plugin_datastore_sqlite.c */