260bd54cc189313c9cd7f02031d64ada1dc85630
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_datastore_plugin.h"
29 #include <sqlite3.h>
30
31 #define DEBUG_SQLITE GNUNET_NO
32
33
34 /**
35  * Log an error message at log-level 'level' that indicates
36  * a failure of the command 'cmd' on file 'filename'
37  * with the message given by strerror(errno).
38  */
39 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
40
41 #define SELECT_IT_LOW_PRIORITY_1 \
42   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
43   "ORDER BY hash ASC LIMIT 1"
44
45 #define SELECT_IT_LOW_PRIORITY_2 \
46   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
47   "ORDER BY prio ASC, hash ASC LIMIT 1"
48
49 #define SELECT_IT_NON_ANONYMOUS_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
51   " ORDER BY hash DESC LIMIT 1"
52
53 #define SELECT_IT_NON_ANONYMOUS_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
55   " ORDER BY prio DESC, hash DESC LIMIT 1"
56
57 #define SELECT_IT_EXPIRATION_TIME_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
59   " ORDER BY hash ASC LIMIT 1"
60
61 #define SELECT_IT_EXPIRATION_TIME_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
63   " ORDER BY expire ASC, hash ASC LIMIT 1"
64
65 #define SELECT_IT_MIGRATION_ORDER_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
67   " ORDER BY hash DESC LIMIT 1"
68
69 #define SELECT_IT_MIGRATION_ORDER_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
71   " ORDER BY expire DESC, hash DESC LIMIT 1"
72
73 /**
74  * After how many ms "busy" should a DB operation fail for good?
75  * A low value makes sure that we are more responsive to requests
76  * (especially PUTs).  A high value guarantees a higher success
77  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
78  *
79  * The default value of 250ms should ensure that users do not experience
80  * huge latencies while at the same time allowing operations to succeed
81  * with reasonable probability.
82  */
83 #define BUSY_TIMEOUT_MS 250
84
85
86
87 /**
88  * Context for all functions in this plugin.
89  */
90 struct Plugin 
91 {
92   /**
93    * Our execution environment.
94    */
95   struct GNUNET_DATASTORE_PluginEnvironment *env;
96
97   /**
98    * Database filename.
99    */
100   char *fn;
101
102   /**
103    * Native SQLite database handle.
104    */
105   sqlite3 *dbh;
106
107   /**
108    * Precompiled SQL for deletion.
109    */
110   sqlite3_stmt *delRow;
111
112   /**
113    * Precompiled SQL for update.
114    */
115   sqlite3_stmt *updPrio;
116
117   /**
118    * Precompiled SQL for insertion.
119    */
120   sqlite3_stmt *insertContent;
121
122   /**
123    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
124    */
125   struct NextContext *next_task_nc;
126
127   /**
128    * Pending task with scheduler for running the next request.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier next_task;
131
132   /**
133    * Should the database be dropped on shutdown?
134    */
135   int drop_on_shutdown;
136
137 };
138
139
140 /**
141  * @brief Prepare a SQL statement
142  *
143  * @param dbh handle to the database
144  * @param zSql SQL statement, UTF-8 encoded
145  * @param ppStmt set to the prepared statement
146  * @return 0 on success
147  */
148 static int
149 sq_prepare (sqlite3 * dbh, const char *zSql,
150             sqlite3_stmt ** ppStmt)
151 {
152   char *dummy;
153   int result;
154   result = sqlite3_prepare_v2 (dbh,
155                                zSql,
156                                strlen (zSql), ppStmt, (const char **) &dummy);
157 #if DEBUG_SQLITE
158   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
159                    "sqlite",
160                    "Prepared %p: %d\n", *ppStmt, result);
161 #endif
162   return result;
163 }
164
165
166 /**
167  * Create our database indices.
168  * 
169  * @param dbh handle to the database
170  */
171 static void
172 create_indices (sqlite3 * dbh)
173 {
174   /* create indices */
175   sqlite3_exec (dbh,
176                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
179                 NULL, NULL);
180   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
181                 NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
183                 NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
185                 NULL, NULL);
186   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
187                 NULL, NULL, NULL);
188   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
189                 NULL, NULL);
190 }
191
192
193
194 #if 0
195 #define CHECK(a) GNUNET_break(a)
196 #define ENULL NULL
197 #else
198 #define ENULL &e
199 #define ENULL_DEFINED 1
200 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "%s\n", e); sqlite3_free(e); }
201 #endif
202
203
204
205
206 /**
207  * Initialize the database connections and associated
208  * data structures (create tables and indices
209  * as needed as well).
210  *
211  * @param cfg our configuration
212  * @param plugin the plugin context (state for this module)
213  * @return GNUNET_OK on success
214  */
215 static int
216 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
217                 struct Plugin *plugin)
218 {
219   sqlite3_stmt *stmt;
220   char *afsdir;
221 #if ENULL_DEFINED
222   char *e;
223 #endif
224   
225   if (GNUNET_OK != 
226       GNUNET_CONFIGURATION_get_value_filename (cfg,
227                                                "datastore-sqlite",
228                                                "FILENAME",
229                                                &afsdir))
230     {
231       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
232                        "sqlite",
233                        _("Option `%s' in section `%s' missing in configuration!\n"),
234                        "FILENAME",
235                        "datastore-sqlite");
236       return GNUNET_SYSERR;
237     }
238   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
239     {
240       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
241         {
242           GNUNET_break (0);
243           GNUNET_free (afsdir);
244           return GNUNET_SYSERR;
245         }
246       /* database is new or got deleted, reset payload to zero! */
247       plugin->env->duc (plugin->env->cls, 0);
248     }
249 #ifdef ENABLE_NLS
250   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
251                                        nl_langinfo (CODESET));
252 #else
253   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
254                                        "UTF-8");   /* good luck */
255 #endif
256   GNUNET_free (afsdir);
257   
258   /* Open database and precompile statements */
259   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
260     {
261       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
262                        "sqlite",
263                        _("Unable to initialize SQLite: %s.\n"),
264                        sqlite3_errmsg (plugin->dbh));
265       return GNUNET_SYSERR;
266     }
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh,
269                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
270   CHECK (SQLITE_OK ==
271          sqlite3_exec (plugin->dbh,
272                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
273   CHECK (SQLITE_OK ==
274          sqlite3_exec (plugin->dbh,
275                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
276   CHECK (SQLITE_OK ==
277          sqlite3_exec (plugin->dbh,
278                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
279   CHECK (SQLITE_OK ==
280          sqlite3_exec (plugin->dbh, 
281                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
282
283   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
284
285
286   /* We have to do it here, because otherwise precompiling SQL might fail */
287   CHECK (SQLITE_OK ==
288          sq_prepare (plugin->dbh,
289                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
290                      &stmt));
291   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
292        (sqlite3_exec (plugin->dbh,
293                       "CREATE TABLE gn080 ("
294                       "  size INT4 NOT NULL DEFAULT 0,"
295                       "  type INT4 NOT NULL DEFAULT 0,"
296                       "  prio INT4 NOT NULL DEFAULT 0,"
297                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
298                       "  expire INT8 NOT NULL DEFAULT 0,"
299                       "  hash TEXT NOT NULL DEFAULT '',"
300                       "  vhash TEXT NOT NULL DEFAULT '',"
301                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
302                       NULL) != SQLITE_OK) )
303     {
304       LOG_SQLITE (plugin, NULL,
305                   GNUNET_ERROR_TYPE_ERROR, 
306                   "sqlite3_exec");
307       sqlite3_finalize (stmt);
308       return GNUNET_SYSERR;
309     }
310   sqlite3_finalize (stmt);
311   create_indices (plugin->dbh);
312
313   CHECK (SQLITE_OK ==
314          sq_prepare (plugin->dbh,
315                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
316                      &stmt));
317   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
318        (sqlite3_exec (plugin->dbh,
319                       "CREATE TABLE gn071 ("
320                       "  key TEXT NOT NULL DEFAULT '',"
321                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
322                       NULL) != SQLITE_OK) )
323     {
324       LOG_SQLITE (plugin, NULL,
325                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
326       sqlite3_finalize (stmt);
327       return GNUNET_SYSERR;
328     }
329   sqlite3_finalize (stmt);
330
331   if ((sq_prepare (plugin->dbh,
332                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
333                    "_ROWID_ = ?",
334                    &plugin->updPrio) != SQLITE_OK) ||
335       (sq_prepare (plugin->dbh,
336                    "INSERT INTO gn080 (size, type, prio, "
337                    "anonLevel, expire, hash, vhash, value) VALUES "
338                    "(?, ?, ?, ?, ?, ?, ?, ?)",
339                    &plugin->insertContent) != SQLITE_OK) ||
340       (sq_prepare (plugin->dbh,
341                    "DELETE FROM gn080 WHERE _ROWID_ = ?",
342                    &plugin->delRow) != SQLITE_OK))
343     {
344       LOG_SQLITE (plugin, NULL,
345                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
346       return GNUNET_SYSERR;
347     }
348
349   return GNUNET_OK;
350 }
351
352
353 /**
354  * Shutdown database connection and associate data
355  * structures.
356  * @param plugin the plugin context (state for this module)
357  */
358 static void
359 database_shutdown (struct Plugin *plugin)
360 {
361   int result;
362 #if SQLITE_VERSION_NUMBER >= 3007000
363   sqlite3_stmt *stmt;
364 #endif
365
366   if (plugin->delRow != NULL)
367     sqlite3_finalize (plugin->delRow);
368   if (plugin->updPrio != NULL)
369     sqlite3_finalize (plugin->updPrio);
370   if (plugin->insertContent != NULL)
371     sqlite3_finalize (plugin->insertContent);
372   result = sqlite3_close(plugin->dbh);
373 #if SQLITE_VERSION_NUMBER >= 3007000
374   if (result == SQLITE_BUSY)
375     {
376       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
377                        "sqlite",
378                        _("Tried to close sqlite without finalizing all prepared statements.\n"));
379       stmt = sqlite3_next_stmt(plugin->dbh, NULL); 
380       while (stmt != NULL)
381         {
382 #if DEBUG_SQLITE
383           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
384                      "sqlite", "Closing statement %p\n", stmt);
385 #endif
386           result = sqlite3_finalize(stmt);
387 #if DEBUG_SQLITE
388           if (result != SQLITE_OK)
389               GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
390                                "sqlite",
391                                "Failed to close statement %p: %d\n", stmt, result);
392 #endif
393           stmt = sqlite3_next_stmt(plugin->dbh, NULL);
394         }
395       result = sqlite3_close(plugin->dbh);
396     }
397 #endif
398   if (SQLITE_OK != result)
399       LOG_SQLITE (plugin, NULL,
400                   GNUNET_ERROR_TYPE_ERROR, 
401                   "sqlite3_close");
402
403   GNUNET_free_non_null (plugin->fn);
404 }
405
406
407 /**
408  * Delete the database entry with the given
409  * row identifier.
410  *
411  * @param plugin the plugin context (state for this module)
412  * @param rid the ID of the row to delete
413  */
414 static int
415 delete_by_rowid (struct Plugin* plugin, 
416                  unsigned long long rid)
417 {
418
419   sqlite3_bind_int64 (plugin->delRow, 1, rid);
420   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
421     {
422       LOG_SQLITE (plugin, NULL,
423                   GNUNET_ERROR_TYPE_ERROR |
424                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
425       if (SQLITE_OK != sqlite3_reset (plugin->delRow))
426           LOG_SQLITE (plugin, NULL,
427                       GNUNET_ERROR_TYPE_ERROR |
428                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
429       return GNUNET_SYSERR;
430     }
431   if (SQLITE_OK != sqlite3_reset (plugin->delRow))
432       LOG_SQLITE (plugin, NULL,
433                   GNUNET_ERROR_TYPE_ERROR |
434                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
435   return GNUNET_OK;
436 }
437
438
439 /**
440  * Context for the universal iterator.
441  */
442 struct NextContext;
443
444 /**
445  * Type of a function that will prepare
446  * the next iteration.
447  *
448  * @param cls closure
449  * @param nc the next context; NULL for the last
450  *         call which gives the callback a chance to
451  *         clean up the closure
452  * @return GNUNET_OK on success, GNUNET_NO if there are
453  *         no more values, GNUNET_SYSERR on error
454  */
455 typedef int (*PrepareFunction)(void *cls,
456                                struct NextContext *nc);
457
458
459 /**
460  * Context we keep for the "next request" callback.
461  */
462 struct NextContext
463 {
464   /**
465    * Internal state.
466    */ 
467   struct Plugin *plugin;
468
469   /**
470    * Function to call on the next value.
471    */
472   PluginIterator iter;
473
474   /**
475    * Closure for iter.
476    */
477   void *iter_cls;
478
479   /**
480    * Function to call to prepare the next
481    * iteration.
482    */
483   PrepareFunction prep;
484
485   /**
486    * Closure for prep.
487    */
488   void *prep_cls;
489
490   /**
491    * Statement that the iterator will get the data
492    * from (updated or set by prep).
493    */ 
494   sqlite3_stmt *stmt;
495
496   /**
497    * Row ID of the last result.
498    */
499   unsigned long long last_rowid;
500
501   /**
502    * Key of the last result.
503    */
504   GNUNET_HashCode lastKey;  
505
506   /**
507    * Expiration time of the last value visited.
508    */
509   struct GNUNET_TIME_Absolute lastExpiration;
510
511   /**
512    * Priority of the last value visited.
513    */ 
514   unsigned int lastPriority; 
515
516   /**
517    * Number of results processed so far.
518    */
519   unsigned int count;
520
521   /**
522    * Set to GNUNET_YES if we must stop now.
523    */
524   int end_it;
525 };
526
527
528 /**
529  * Continuation of "sqlite_next_request".
530  *
531  * @param cls the next context
532  * @param tc the task context (unused)
533  */
534 static void 
535 sqlite_next_request_cont (void *cls,
536                           const struct GNUNET_SCHEDULER_TaskContext *tc)
537 {
538   struct NextContext * nc = cls;
539   struct Plugin *plugin;
540   unsigned long long rowid;
541   sqlite3_stmt *stmtd;
542   int ret;
543   unsigned int type;
544   unsigned int size;
545   unsigned int priority;
546   unsigned int anonymity;
547   struct GNUNET_TIME_Absolute expiration;
548   const GNUNET_HashCode *key;
549   const void *data;
550   
551   plugin = nc->plugin;
552   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
553   plugin->next_task_nc = NULL;
554   if ( (GNUNET_YES == nc->end_it) ||
555        (GNUNET_OK != (nc->prep(nc->prep_cls,
556                                nc))) )
557     {
558     END:
559       nc->iter (nc->iter_cls, 
560                 NULL, NULL, 0, NULL, 0, 0, 0, 
561                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
562       nc->prep (nc->prep_cls, NULL);
563       GNUNET_free (nc);
564       return;
565     }
566
567   rowid = sqlite3_column_int64 (nc->stmt, 7);
568   nc->last_rowid = rowid;
569   type = sqlite3_column_int (nc->stmt, 1);
570   size = sqlite3_column_bytes (nc->stmt, 6);
571   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
572     {
573       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
574                        "sqlite",
575                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
576       if (SQLITE_OK != sqlite3_reset (nc->stmt))
577         LOG_SQLITE (nc->plugin, NULL,
578                     GNUNET_ERROR_TYPE_ERROR |
579                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
580       if (sq_prepare
581           (nc->plugin->dbh,
582            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
583            &stmtd) != SQLITE_OK)
584         {
585           LOG_SQLITE (nc->plugin, NULL,
586                       GNUNET_ERROR_TYPE_ERROR |
587                       GNUNET_ERROR_TYPE_BULK, 
588                       "sq_prepare");
589           goto END;
590         }
591
592       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
593         LOG_SQLITE (nc->plugin, NULL,
594                     GNUNET_ERROR_TYPE_ERROR |
595                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
596       if (SQLITE_DONE != sqlite3_step (stmtd))
597         LOG_SQLITE (nc->plugin, NULL,
598                     GNUNET_ERROR_TYPE_ERROR |
599                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
600       if (SQLITE_OK != sqlite3_finalize (stmtd))
601         LOG_SQLITE (nc->plugin, NULL,
602                     GNUNET_ERROR_TYPE_ERROR |
603                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
604       goto END;
605     }
606
607   priority = sqlite3_column_int (nc->stmt, 2);
608   anonymity = sqlite3_column_int (nc->stmt, 3);
609   expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4);
610   key = sqlite3_column_blob (nc->stmt, 5);
611   nc->lastPriority = priority;
612   nc->lastExpiration = expiration;
613   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
614   data = sqlite3_column_blob (nc->stmt, 6);
615   nc->count++;
616   ret = nc->iter (nc->iter_cls,
617                   nc,
618                   key,
619                   size,
620                   data, 
621                   type,
622                   priority,
623                   anonymity,
624                   expiration,
625                   rowid);
626   if (ret == GNUNET_SYSERR)
627     {
628       nc->end_it = GNUNET_YES;
629       return;
630     }
631 #if DEBUG_SQLITE
632   if (ret == GNUNET_NO)
633     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
634                      "sqlite",
635                      "Asked to remove entry %llu (%u bytes)\n",
636                      (unsigned long long) rowid,
637                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
638 #endif
639   if ( (ret == GNUNET_NO) &&
640        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
641     {
642       plugin->env->duc (plugin->env->cls,
643                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
644 #if DEBUG_SQLITE
645       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
646                        "sqlite",
647                        "Removed entry %llu (%u bytes)\n",
648                        (unsigned long long) rowid,
649                        size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
650 #endif
651     }
652 }
653
654
655 /**
656  * Function invoked on behalf of a "PluginIterator"
657  * asking the database plugin to call the iterator
658  * with the next item.
659  *
660  * @param next_cls whatever argument was given
661  *        to the PluginIterator as "next_cls".
662  * @param end_it set to GNUNET_YES if we
663  *        should terminate the iteration early
664  *        (iterator should be still called once more
665  *         to signal the end of the iteration).
666  */
667 static void 
668 sqlite_next_request (void *next_cls,
669                      int end_it)
670 {
671   struct NextContext * nc= next_cls;
672
673   if (GNUNET_YES == end_it)
674     nc->end_it = GNUNET_YES;
675   nc->plugin->next_task_nc = nc;
676   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
677                                                     nc);
678 }
679
680
681
682 /**
683  * Store an item in the datastore.
684  *
685  * @param cls closure
686  * @param key key for the item
687  * @param size number of bytes in data
688  * @param data content stored
689  * @param type type of the content
690  * @param priority priority of the content
691  * @param anonymity anonymity-level for the content
692  * @param replication replication-level for the content
693  * @param expiration expiration time for the content
694  * @param msg set to an error message
695  * @return GNUNET_OK on success
696  */
697 static int
698 sqlite_plugin_put (void *cls,
699                    const GNUNET_HashCode * key,
700                    uint32_t size,
701                    const void *data,
702                    enum GNUNET_BLOCK_Type type,
703                    uint32_t priority,
704                    uint32_t anonymity,
705                    uint32_t replication,
706                    struct GNUNET_TIME_Absolute expiration,
707                    char ** msg)
708 {
709   struct Plugin *plugin = cls;
710   int n;
711   sqlite3_stmt *stmt;
712   GNUNET_HashCode vhash;
713
714 #if DEBUG_SQLITE
715   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
716                    "sqlite",
717                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
718                    type, 
719                    GNUNET_h2s(key),
720                    priority,
721                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
722                    (long long) expiration.abs_value);
723 #endif
724   GNUNET_CRYPTO_hash (data, size, &vhash);
725   stmt = plugin->insertContent;
726   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
727       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
728       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
729       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
730       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, expiration.abs_value)) ||
731       (SQLITE_OK !=
732        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
733                           SQLITE_TRANSIENT)) ||
734       (SQLITE_OK !=
735        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
736                           SQLITE_TRANSIENT))
737       || (SQLITE_OK !=
738           sqlite3_bind_blob (stmt, 8, data, size,
739                              SQLITE_TRANSIENT)))
740     {
741       LOG_SQLITE (plugin,
742                   msg,
743                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
744       if (SQLITE_OK != sqlite3_reset (stmt))
745         LOG_SQLITE (plugin, NULL,
746                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
747       return GNUNET_SYSERR;
748     }
749   n = sqlite3_step (stmt);
750   if (n != SQLITE_DONE) 
751     {
752       if (n == SQLITE_BUSY)
753         {
754           LOG_SQLITE (plugin, msg,
755                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
756           sqlite3_reset (stmt);
757           GNUNET_break (0);
758           return GNUNET_NO;
759         }
760       LOG_SQLITE (plugin, msg,
761                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
762       sqlite3_reset (stmt);
763       database_shutdown (plugin);
764       database_setup (plugin->env->cfg,
765                       plugin);
766       return GNUNET_SYSERR;
767     }
768   if (SQLITE_OK != sqlite3_reset (stmt))
769     LOG_SQLITE (plugin, NULL,
770                 GNUNET_ERROR_TYPE_ERROR |
771                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
772   plugin->env->duc (plugin->env->cls,
773                     size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
774 #if DEBUG_SQLITE
775   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
776                    "sqlite",
777                    "Stored new entry (%u bytes)\n",
778            size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
779 #endif
780   return GNUNET_OK;
781 }
782
783
784 /**
785  * Update the priority for a particular key in the datastore.  If
786  * the expiration time in value is different than the time found in
787  * the datastore, the higher value should be kept.  For the
788  * anonymity level, the lower value is to be used.  The specified
789  * priority should be added to the existing priority, ignoring the
790  * priority in value.
791  *
792  * Note that it is possible for multiple values to match this put.
793  * In that case, all of the respective values are updated.
794  *
795  * @param cls the plugin context (state for this module)
796  * @param uid unique identifier of the datum
797  * @param delta by how much should the priority
798  *     change?  If priority + delta < 0 the
799  *     priority should be set to 0 (never go
800  *     negative).
801  * @param expire new expiration time should be the
802  *     MAX of any existing expiration time and
803  *     this value
804  * @param msg set to an error message
805  * @return GNUNET_OK on success
806  */
807 static int
808 sqlite_plugin_update (void *cls,
809                       uint64_t uid,
810                       int delta, struct GNUNET_TIME_Absolute expire,
811                       char **msg)
812 {
813   struct Plugin *plugin = cls;
814   int n;
815
816   sqlite3_bind_int (plugin->updPrio, 1, delta);
817   sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value);
818   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
819   n = sqlite3_step (plugin->updPrio);
820   if (n != SQLITE_DONE)
821     LOG_SQLITE (plugin, msg,
822                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
823                 "sqlite3_step");
824 #if DEBUG_SQLITE
825   else
826     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
827                      "sqlite",
828                      "Block updated\n");
829 #endif
830   sqlite3_reset (plugin->updPrio);
831
832   if (n == SQLITE_BUSY)
833     return GNUNET_NO;
834   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
835 }
836
837
838 /**
839  * Internal context for an iteration.
840  */
841 struct IterContext
842 {
843   /**
844    * FIXME.
845    */
846   sqlite3_stmt *stmt_1;
847
848   /**
849    * FIXME.
850    */
851   sqlite3_stmt *stmt_2;
852
853   /**
854    * FIXME.
855    */
856   int is_asc;
857
858   /**
859    * FIXME.
860    */
861   int is_prio;
862
863   /**
864    * FIXME.
865    */
866   int is_migr;
867
868   /**
869    * FIXME.
870    */
871   int limit_nonanonymous;
872
873   /**
874    * Desired type for blocks returned by this iterator.
875    */
876   enum GNUNET_BLOCK_Type type;
877 };
878
879
880 /**
881  * Prepare our SQL query to obtain the next record from the database.
882  *
883  * @param cls our "struct IterContext"
884  * @param nc NULL to terminate the iteration, otherwise our context for
885  *           getting the next result.
886  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
887  *         GNUNET_SYSERR on error (or end of iteration)
888  */
889 static int
890 iter_next_prepare (void *cls,
891                    struct NextContext *nc)
892 {
893   struct IterContext *ic = cls;
894   struct Plugin *plugin;
895   int ret;
896
897   if (nc == NULL)
898     {
899 #if DEBUG_SQLITE
900       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901                   "Asked to clean up iterator state.\n");
902 #endif
903       sqlite3_finalize (ic->stmt_1);
904       sqlite3_finalize (ic->stmt_2);
905       return GNUNET_SYSERR;
906     }
907   sqlite3_reset (ic->stmt_1);
908   sqlite3_reset (ic->stmt_2);
909   plugin = nc->plugin;
910   if (ic->is_prio)
911     {
912 #if DEBUG_SQLITE
913       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                   "Restricting to results larger than the last priority %u\n",
915                   nc->lastPriority);
916 #endif
917       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
918       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
919     }
920   else
921     {
922 #if DEBUG_SQLITE
923       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
924                   "Restricting to results larger than the last expiration %llu\n",
925                   (unsigned long long) nc->lastExpiration.abs_value);
926 #endif
927       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value);
928       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value);
929     }
930 #if DEBUG_SQLITE
931   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932               "Restricting to results larger than the last key `%s'\n",
933               GNUNET_h2s(&nc->lastKey));
934 #endif
935   sqlite3_bind_blob (ic->stmt_1, 2, 
936                      &nc->lastKey, 
937                      sizeof (GNUNET_HashCode),
938                      SQLITE_TRANSIENT);
939   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
940     {      
941 #if DEBUG_SQLITE
942       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943                   "Result found using iterator 1\n");
944 #endif
945       nc->stmt = ic->stmt_1;
946       return GNUNET_OK;
947     }
948   if (ret != SQLITE_DONE)
949     {
950       LOG_SQLITE (plugin, NULL,
951                   GNUNET_ERROR_TYPE_ERROR |
952                   GNUNET_ERROR_TYPE_BULK,
953                   "sqlite3_step");
954       return GNUNET_SYSERR;
955     }
956   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
957     LOG_SQLITE (plugin, NULL,
958                 GNUNET_ERROR_TYPE_ERROR | 
959                 GNUNET_ERROR_TYPE_BULK, 
960                 "sqlite3_reset");
961   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
962     {
963 #if DEBUG_SQLITE
964       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965                   "Result found using iterator 2\n");
966 #endif
967       nc->stmt = ic->stmt_2;
968       return GNUNET_OK;
969     }
970   if (ret != SQLITE_DONE)
971     {
972       LOG_SQLITE (plugin, NULL,
973                   GNUNET_ERROR_TYPE_ERROR |
974                   GNUNET_ERROR_TYPE_BULK,
975                   "sqlite3_step");
976       return GNUNET_SYSERR;
977     }
978   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
979     LOG_SQLITE (plugin, NULL,
980                 GNUNET_ERROR_TYPE_ERROR |
981                 GNUNET_ERROR_TYPE_BULK,
982                 "sqlite3_reset");
983 #if DEBUG_SQLITE
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               "No result found using either iterator\n");
986 #endif
987   return GNUNET_NO;
988 }
989
990
991 /**
992  * Call a method for each key in the database and
993  * call the callback method on it.
994  *
995  * @param plugin our plugin context
996  * @param type entries of which type should be considered?
997  * @param is_asc are we iterating in ascending order?
998  * @param is_prio are we iterating by priority (otherwise by expiration)
999  * @param is_migr are we iterating in migration order?
1000  * @param limit_nonanonymous are we restricting results to those with anonymity
1001  *              level zero?
1002  * @param stmt_str_1 first SQL statement to execute
1003  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1004  * @param iter function to call on each matching value;
1005  *        will be called once with a NULL value at the end
1006  * @param iter_cls closure for iter
1007  */
1008 static void
1009 basic_iter (struct Plugin *plugin,
1010             enum GNUNET_BLOCK_Type type,
1011             int is_asc,
1012             int is_prio,
1013             int is_migr,
1014             int limit_nonanonymous,
1015             const char *stmt_str_1,
1016             const char *stmt_str_2,
1017             PluginIterator iter,
1018             void *iter_cls)
1019 {
1020   struct NextContext *nc;
1021   struct IterContext *ic;
1022   sqlite3_stmt *stmt_1;
1023   sqlite3_stmt *stmt_2;
1024
1025 #if DEBUG_SQLITE
1026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027               "At %llu, using queries `%s' and `%s'\n",
1028               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value,
1029               stmt_str_1,
1030               stmt_str_2);
1031 #endif
1032   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1033     {
1034       LOG_SQLITE (plugin, NULL,
1035                   GNUNET_ERROR_TYPE_ERROR |
1036                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1037       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1038       return;
1039     }
1040   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1041     {
1042       LOG_SQLITE (plugin, NULL,
1043                   GNUNET_ERROR_TYPE_ERROR |
1044                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1045       sqlite3_finalize (stmt_1);
1046       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1047       return;
1048     }
1049   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1050                       sizeof(struct IterContext));
1051   nc->plugin = plugin;
1052   nc->iter = iter;
1053   nc->iter_cls = iter_cls;
1054   nc->stmt = NULL;
1055   ic = (struct IterContext*) &nc[1];
1056   ic->stmt_1 = stmt_1;
1057   ic->stmt_2 = stmt_2;
1058   ic->type = type;
1059   ic->is_asc = is_asc;
1060   ic->is_prio = is_prio;
1061   ic->is_migr = is_migr;
1062   ic->limit_nonanonymous = limit_nonanonymous;
1063   nc->prep = &iter_next_prepare;
1064   nc->prep_cls = ic;
1065   if (is_asc)
1066     {
1067       nc->lastPriority = 0;
1068       nc->lastExpiration.abs_value = 0;
1069       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1070     }
1071   else
1072     {
1073       nc->lastPriority = 0x7FFFFFFF;
1074       nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL;
1075       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1076     }
1077   sqlite_next_request (nc, GNUNET_NO);
1078 }
1079
1080
1081 /**
1082  * Select a subset of the items in the datastore and call
1083  * the given iterator for each of them.
1084  *
1085  * @param cls our plugin context
1086  * @param type entries of which type should be considered?
1087  *        Use 0 for any type.
1088  * @param iter function to call on each matching value;
1089  *        will be called once with a NULL value at the end
1090  * @param iter_cls closure for iter
1091  */
1092 static void
1093 sqlite_plugin_iter_low_priority (void *cls,
1094                                  enum GNUNET_BLOCK_Type type,
1095                                  PluginIterator iter,
1096                                  void *iter_cls)
1097 {
1098   basic_iter (cls,
1099               type, 
1100               GNUNET_YES, GNUNET_YES, 
1101               GNUNET_NO, GNUNET_NO,
1102               SELECT_IT_LOW_PRIORITY_1,
1103               SELECT_IT_LOW_PRIORITY_2, 
1104               iter, iter_cls);
1105 }
1106
1107
1108 /**
1109  * Select a subset of the items in the datastore and call
1110  * the given iterator for each of them.
1111  *
1112  * @param cls our plugin context
1113  * @param type entries of which type should be considered?
1114  *        Use 0 for any type.
1115  * @param iter function to call on each matching value;
1116  *        will be called once with a NULL value at the end
1117  * @param iter_cls closure for iter
1118  */
1119 static void
1120 sqlite_plugin_iter_zero_anonymity (void *cls,
1121                                    enum GNUNET_BLOCK_Type type,
1122                                    PluginIterator iter,
1123                                    void *iter_cls)
1124 {
1125   struct GNUNET_TIME_Absolute now;
1126   char *q1;
1127   char *q2;
1128
1129   now = GNUNET_TIME_absolute_get ();
1130   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1131                    (unsigned long long) now.abs_value);
1132   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1133                    (unsigned long long) now.abs_value);
1134   basic_iter (cls,
1135               type, 
1136               GNUNET_NO, GNUNET_YES, 
1137               GNUNET_NO, GNUNET_YES,
1138               q1,
1139               q2,
1140               iter, iter_cls);
1141   GNUNET_free (q1);
1142   GNUNET_free (q2);
1143 }
1144
1145
1146
1147 /**
1148  * Select a subset of the items in the datastore and call
1149  * the given iterator for each of them.
1150  *
1151  * @param cls our plugin context
1152  * @param type entries of which type should be considered?
1153  *        Use 0 for any type.
1154  * @param iter function to call on each matching value;
1155  *        will be called once with a NULL value at the end
1156  * @param iter_cls closure for iter
1157  */
1158 static void
1159 sqlite_plugin_iter_ascending_expiration (void *cls,
1160                                          enum GNUNET_BLOCK_Type type,
1161                                          PluginIterator iter,
1162                                          void *iter_cls)
1163 {
1164   struct GNUNET_TIME_Absolute now;
1165   char *q1;
1166   char *q2;
1167
1168   now = GNUNET_TIME_absolute_get ();
1169   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1170                    (unsigned long long) 0*now.abs_value);
1171   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1172                    (unsigned long long) 0*now.abs_value);
1173   basic_iter (cls,
1174               type, 
1175               GNUNET_YES, GNUNET_NO, 
1176               GNUNET_NO, GNUNET_NO,
1177               q1, q2,
1178               iter, iter_cls);
1179   GNUNET_free (q1);
1180   GNUNET_free (q2);
1181 }
1182
1183
1184 /**
1185  * Select a subset of the items in the datastore and call
1186  * the given iterator for each of them.
1187  *
1188  * @param cls our plugin context
1189  * @param type entries of which type should be considered?
1190  *        Use 0 for any type.
1191  * @param iter function to call on each matching value;
1192  *        will be called once with a NULL value at the end
1193  * @param iter_cls closure for iter
1194  */
1195 static void
1196 sqlite_plugin_iter_migration_order (void *cls,
1197                                     enum GNUNET_BLOCK_Type type,
1198                                     PluginIterator iter,
1199                                     void *iter_cls)
1200 {
1201   struct GNUNET_TIME_Absolute now;
1202   char *q;
1203
1204   now = GNUNET_TIME_absolute_get ();
1205   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1206                    (unsigned long long) now.abs_value);
1207   basic_iter (cls,
1208               type, 
1209               GNUNET_NO, GNUNET_NO, 
1210               GNUNET_YES, GNUNET_NO,
1211               SELECT_IT_MIGRATION_ORDER_1,
1212               q,
1213               iter, iter_cls);
1214   GNUNET_free (q);
1215 }
1216
1217
1218 /**
1219  * Call sqlite using the already prepared query to get
1220  * the next result.
1221  *
1222  * @param cls context with the prepared query
1223  * @param nc context with the prepared query
1224  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1225  *        there are no more results 
1226  */
1227 static int
1228 all_next_prepare (void *cls,
1229                   struct NextContext *nc)
1230 {
1231   struct Plugin *plugin;
1232   int ret;
1233
1234   if (nc == NULL)
1235     {
1236 #if DEBUG_SQLITE
1237       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238                   "Asked to clean up iterator state.\n");
1239 #endif
1240       nc = (struct NextContext *)cls;
1241       if (nc->stmt)
1242           sqlite3_finalize (nc->stmt);
1243       nc->stmt = NULL;
1244       return GNUNET_SYSERR;
1245     }
1246   plugin = nc->plugin;
1247   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1248     {      
1249       return GNUNET_OK;
1250     }
1251   if (ret != SQLITE_DONE)
1252     {
1253       LOG_SQLITE (plugin, NULL,
1254                   GNUNET_ERROR_TYPE_ERROR |
1255                   GNUNET_ERROR_TYPE_BULK,
1256                   "sqlite3_step");
1257       return GNUNET_SYSERR;
1258     }
1259   return GNUNET_NO;
1260 }
1261
1262
1263 /**
1264  * Select a subset of the items in the datastore and call
1265  * the given iterator for each of them.
1266  *
1267  * @param cls our plugin context
1268  * @param type entries of which type should be considered?
1269  *        Use 0 for any type.
1270  * @param iter function to call on each matching value;
1271  *        will be called once with a NULL value at the end
1272  * @param iter_cls closure for iter
1273  */
1274 static void
1275 sqlite_plugin_iter_all_now (void *cls,
1276                             enum GNUNET_BLOCK_Type type,
1277                             PluginIterator iter,
1278                             void *iter_cls)
1279 {
1280   struct Plugin *plugin = cls;
1281   struct NextContext *nc;
1282   sqlite3_stmt *stmt;
1283
1284   if (sq_prepare (plugin->dbh, 
1285                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1286                   &stmt) != SQLITE_OK)
1287     {
1288       LOG_SQLITE (plugin, NULL,
1289                   GNUNET_ERROR_TYPE_ERROR |
1290                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1291       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1292       return;
1293     }
1294   nc = GNUNET_malloc (sizeof(struct NextContext));
1295   nc->plugin = plugin;
1296   nc->iter = iter;
1297   nc->iter_cls = iter_cls;
1298   nc->stmt = stmt;
1299   nc->prep = &all_next_prepare;
1300   nc->prep_cls = nc;
1301   sqlite_next_request (nc, GNUNET_NO);
1302 }
1303
1304
1305 /**
1306  * FIXME.
1307  */
1308 struct GetNextContext
1309 {
1310
1311   /**
1312    * FIXME.
1313    */
1314   int total;
1315
1316   /**
1317    * FIXME.
1318    */
1319   int off;
1320
1321   /**
1322    * FIXME.
1323    */
1324   int have_vhash;
1325
1326   /**
1327    * FIXME.
1328    */
1329   unsigned int type;
1330
1331   /**
1332    * FIXME.
1333    */
1334   sqlite3_stmt *stmt;
1335
1336   /**
1337    * FIXME.
1338    */
1339   GNUNET_HashCode key;
1340
1341   /**
1342    * FIXME.
1343    */
1344   GNUNET_HashCode vhash;
1345 };
1346
1347
1348
1349 /**
1350  * FIXME.
1351  *
1352  * @param cls our "struct GetNextContext*"
1353  * @param nc FIXME
1354  * @return GNUNET_YES if there are more results, 
1355  *         GNUNET_NO if there are no more results,
1356  *         GNUNET_SYSERR on internal error
1357  */
1358 static int
1359 get_next_prepare (void *cls,
1360                   struct NextContext *nc)
1361 {
1362   struct GetNextContext *gnc = cls;
1363   int sqoff;
1364   int ret;
1365   int limit_off;
1366
1367   if (nc == NULL)
1368     {
1369       sqlite3_finalize (gnc->stmt);
1370       return GNUNET_SYSERR;
1371     }
1372   if (nc->count == gnc->total)
1373     return GNUNET_NO;
1374   if (nc->count + gnc->off == gnc->total)
1375     nc->last_rowid = 0;
1376   if (nc->count == 0)
1377     limit_off = gnc->off;
1378   else
1379     limit_off = 0;
1380   sqoff = 1;
1381   sqlite3_reset (nc->stmt);
1382   ret = sqlite3_bind_blob (nc->stmt,
1383                            sqoff++,
1384                            &gnc->key, 
1385                            sizeof (GNUNET_HashCode),
1386                            SQLITE_TRANSIENT);
1387   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1388     ret = sqlite3_bind_blob (nc->stmt,
1389                              sqoff++,
1390                              &gnc->vhash,
1391                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1392   if ((gnc->type != 0) && (ret == SQLITE_OK))
1393     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1394   if (ret == SQLITE_OK)
1395     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1396   if (ret == SQLITE_OK)
1397     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1398   if (ret != SQLITE_OK)
1399     return GNUNET_SYSERR;
1400   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1401     return GNUNET_NO;
1402   return GNUNET_OK;
1403 }
1404
1405
1406 /**
1407  * Iterate over the results for a particular key
1408  * in the datastore.
1409  *
1410  * @param cls closure
1411  * @param key maybe NULL (to match all entries)
1412  * @param vhash hash of the value, maybe NULL (to
1413  *        match all values that have the right key).
1414  *        Note that for DBlocks there is no difference
1415  *        betwen key and vhash, but for other blocks
1416  *        there may be!
1417  * @param type entries of which type are relevant?
1418  *     Use 0 for any type.
1419  * @param iter function to call on each matching value;
1420  *        will be called once with a NULL value at the end
1421  * @param iter_cls closure for iter
1422  */
1423 static void
1424 sqlite_plugin_get (void *cls,
1425                    const GNUNET_HashCode * key,
1426                    const GNUNET_HashCode * vhash,
1427                    enum GNUNET_BLOCK_Type type,
1428                    PluginIterator iter, void *iter_cls)
1429 {
1430   struct Plugin *plugin = cls;
1431   struct GetNextContext *gpc;
1432   struct NextContext *nc;
1433   int ret;
1434   int total;
1435   sqlite3_stmt *stmt;
1436   char scratch[256];
1437   int sqoff;
1438
1439   GNUNET_assert (iter != NULL);
1440   if (key == NULL)
1441     {
1442       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1443       return;
1444     }
1445   GNUNET_snprintf (scratch, sizeof (scratch),
1446                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1447                    vhash == NULL ? "" : " AND vhash=:2",
1448                    type == 0 ? "" : (vhash ==
1449                                      NULL) ? " AND type=:2" : " AND type=:3");
1450   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1451     {
1452       LOG_SQLITE (plugin, NULL,
1453                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1454       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1455       return;
1456     }
1457   sqoff = 1;
1458   ret = sqlite3_bind_blob (stmt,
1459                            sqoff++,
1460                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1461   if ((vhash != NULL) && (ret == SQLITE_OK))
1462     ret = sqlite3_bind_blob (stmt,
1463                              sqoff++,
1464                              vhash,
1465                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1466   if ((type != 0) && (ret == SQLITE_OK))
1467     ret = sqlite3_bind_int (stmt, sqoff++, type);
1468   if (SQLITE_OK != ret)
1469     {
1470       LOG_SQLITE (plugin, NULL,
1471                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1472       sqlite3_reset (stmt);
1473       sqlite3_finalize (stmt);
1474       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1475       return;
1476     }
1477   ret = sqlite3_step (stmt);
1478   if (ret != SQLITE_ROW)
1479     {
1480       LOG_SQLITE (plugin, NULL,
1481                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1482                   "sqlite_step");
1483       sqlite3_reset (stmt);
1484       sqlite3_finalize (stmt);
1485       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1486       return;
1487     }
1488   total = sqlite3_column_int (stmt, 0);
1489   sqlite3_reset (stmt);
1490   sqlite3_finalize (stmt);
1491   if (0 == total)
1492     {
1493       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1494       return;
1495     }
1496
1497   GNUNET_snprintf (scratch, sizeof (scratch),
1498                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1499                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1500                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1501                    vhash == NULL ? "" : " AND vhash=:2",
1502                    type == 0 ? "" : (vhash ==
1503                                      NULL) ? " AND type=:2" : " AND type=:3",
1504                    sqoff, sqoff + 1);
1505   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1506     {
1507       LOG_SQLITE (plugin, NULL,
1508                   GNUNET_ERROR_TYPE_ERROR |
1509                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1510       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1511       return;
1512     }
1513   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1514                       sizeof(struct GetNextContext));
1515   nc->plugin = plugin;
1516   nc->iter = iter;
1517   nc->iter_cls = iter_cls;
1518   nc->stmt = stmt;
1519   gpc = (struct GetNextContext*) &nc[1];
1520   gpc->total = total;
1521   gpc->type = type;
1522   gpc->key = *key;
1523   gpc->stmt = stmt; /* alias used for freeing at the end! */
1524   if (NULL != vhash)
1525     {
1526       gpc->have_vhash = GNUNET_YES;
1527       gpc->vhash = *vhash;
1528     }
1529   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1530   nc->prep = &get_next_prepare;
1531   nc->prep_cls = gpc;
1532   sqlite_next_request (nc, GNUNET_NO);
1533 }
1534
1535
1536 /**
1537  * Get a random item for replication.  Returns a single, not expired, random item
1538  * from those with the highest replication counters.  The item's 
1539  * replication counter is decremented by one IF it was positive before.
1540  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
1541  *
1542  * @param cls closure
1543  * @param iter function to call the value (once only).
1544  * @param iter_cls closure for iter
1545  */
1546 static void
1547 sqlite_plugin_replication_get (void *cls,
1548                                PluginIterator iter, void *iter_cls)
1549 {
1550   /* FIXME: not implemented! */
1551   iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
1552         GNUNET_TIME_UNIT_ZERO_ABS, 0);
1553 }
1554
1555
1556 /**
1557  * Drop database.
1558  *
1559  * @param cls our plugin context
1560  */
1561 static void 
1562 sqlite_plugin_drop (void *cls)
1563 {
1564   struct Plugin *plugin = cls;
1565   plugin->drop_on_shutdown = GNUNET_YES;
1566 }
1567
1568
1569 static unsigned long long
1570 sqlite_plugin_get_size (void *cls)
1571 {
1572   struct Plugin *plugin = cls;
1573   sqlite3_stmt *stmt;
1574   uint64_t pages;
1575   uint64_t page_size;
1576 #if ENULL_DEFINED
1577   char *e;
1578 #endif
1579
1580   if (SQLITE_VERSION_NUMBER < 3006000)
1581     {
1582       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1583                        "datastore-sqlite",
1584                        _("sqlite version to old to determine size, assuming zero\n"));
1585       return 0;
1586     }
1587   CHECK (SQLITE_OK ==
1588          sqlite3_exec (plugin->dbh,
1589                        "VACUUM", NULL, NULL, ENULL));
1590   CHECK (SQLITE_OK ==
1591          sqlite3_exec (plugin->dbh,
1592                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1593   CHECK (SQLITE_OK ==
1594          sq_prepare (plugin->dbh,
1595                      "PRAGMA page_count",
1596                      &stmt));
1597   if (SQLITE_ROW ==
1598       sqlite3_step (stmt))
1599     pages = sqlite3_column_int64 (stmt, 0);
1600   else
1601     pages = 0;
1602   sqlite3_finalize (stmt);
1603   CHECK (SQLITE_OK ==
1604          sq_prepare (plugin->dbh,
1605                      "PRAGMA page_size",
1606                      &stmt));
1607   CHECK (SQLITE_ROW ==
1608          sqlite3_step (stmt));
1609   page_size = sqlite3_column_int64 (stmt, 0);
1610   sqlite3_finalize (stmt);
1611   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1612               _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1613               (unsigned long long) pages,
1614               (unsigned long long) page_size);
1615   return  pages * page_size;
1616 }
1617                                          
1618
1619 /**
1620  * Entry point for the plugin.
1621  *
1622  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1623  * @return NULL on error, othrewise the plugin context
1624  */
1625 void *
1626 libgnunet_plugin_datastore_sqlite_init (void *cls)
1627 {
1628   static struct Plugin plugin;
1629   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1630   struct GNUNET_DATASTORE_PluginFunctions *api;
1631
1632   if (plugin.env != NULL)
1633     return NULL; /* can only initialize once! */
1634   memset (&plugin, 0, sizeof(struct Plugin));
1635   plugin.env = env;
1636   if (GNUNET_OK !=
1637       database_setup (env->cfg, &plugin))
1638     {
1639       database_shutdown (&plugin);
1640       return NULL;
1641     }
1642   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1643   api->cls = &plugin;
1644   api->get_size = &sqlite_plugin_get_size;
1645   api->put = &sqlite_plugin_put;
1646   api->next_request = &sqlite_next_request;
1647   api->get = &sqlite_plugin_get;
1648   api->replication_get = &sqlite_plugin_replication_get;
1649   api->update = &sqlite_plugin_update;
1650   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1651   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1652   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1653   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1654   api->iter_all_now = &sqlite_plugin_iter_all_now;
1655   api->drop = &sqlite_plugin_drop;
1656   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1657                    "sqlite", _("Sqlite database running\n"));
1658   return api;
1659 }
1660
1661
1662 /**
1663  * Exit point from the plugin.
1664  *
1665  * @param cls the plugin context (as returned by "init")
1666  * @return always NULL
1667  */
1668 void *
1669 libgnunet_plugin_datastore_sqlite_done (void *cls)
1670 {
1671   char *fn;
1672   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1673   struct Plugin *plugin = api->cls;
1674
1675 #if DEBUG_SQLITE
1676   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1677                    "sqlite",
1678                    "sqlite plugin is doneing\n");
1679 #endif
1680
1681   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1682     {
1683 #if DEBUG_SQLITE
1684       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1685                        "sqlite",
1686                        "Canceling next task\n");
1687 #endif
1688       GNUNET_SCHEDULER_cancel (plugin->next_task);
1689       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1690 #if DEBUG_SQLITE
1691       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1692                        "sqlite",
1693                        "Prep'ing next task\n");
1694 #endif
1695       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1696       GNUNET_free (plugin->next_task_nc);
1697       plugin->next_task_nc = NULL;
1698     }
1699   fn = NULL;
1700   if (plugin->drop_on_shutdown)
1701     fn = GNUNET_strdup (plugin->fn);
1702 #if DEBUG_SQLITE
1703   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1704                    "sqlite",
1705                    "Shutting down database\n");
1706 #endif
1707   database_shutdown (plugin);
1708   plugin->env = NULL; 
1709   GNUNET_free (api);
1710   if (fn != NULL)
1711     {
1712       if (0 != UNLINK(fn))
1713         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1714                                   "unlink",
1715                                   fn);
1716       GNUNET_free (fn);
1717     }
1718 #if DEBUG_SQLITE
1719   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1720                    "sqlite",
1721                    "sqlite plugin is finished doneing\n");
1722 #endif
1723   return NULL;
1724 }
1725
1726 /* end of plugin_datastore_sqlite.c */