gpl3
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_NO
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94
95 /**
96  * Context for all functions in this plugin.
97  */
98 struct Plugin 
99 {
100   /**
101    * Our execution environment.
102    */
103   struct GNUNET_DATASTORE_PluginEnvironment *env;
104
105   /**
106    * Database filename.
107    */
108   char *fn;
109
110   /**
111    * Native SQLite database handle.
112    */
113   sqlite3 *dbh;
114
115   /**
116    * Precompiled SQL for update.
117    */
118   sqlite3_stmt *updPrio;
119
120   /**
121    * Precompiled SQL for insertion.
122    */
123   sqlite3_stmt *insertContent;
124
125   /**
126    * Handle to the statistics service.
127    */
128   struct GNUNET_STATISTICS_Handle *statistics;
129
130   /**
131    * Handle for pending get request.
132    */
133   struct GNUNET_STATISTICS_GetHandle *stat_get;
134
135   /**
136    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
137    */
138   struct NextContext *next_task_nc;
139
140   /**
141    * Pending task with scheduler for running the next request.
142    */
143   GNUNET_SCHEDULER_TaskIdentifier next_task;
144   
145   /**
146    * How much data are we currently storing
147    * in the database?
148    */
149   unsigned long long payload;
150
151   /**
152    * Number of updates that were made to the
153    * payload value since we last synchronized
154    * it with the statistics service.
155    */
156   unsigned int lastSync;
157
158   /**
159    * Should the database be dropped on shutdown?
160    */
161   int drop_on_shutdown;
162
163   /**
164    * Did we get an answer from statistics?
165    */
166   int stats_worked;
167 };
168
169
170 /**
171  * @brief Prepare a SQL statement
172  *
173  * @param dbh handle to the database
174  * @param zSql SQL statement, UTF-8 encoded
175  * @param ppStmt set to the prepared statement
176  * @return 0 on success
177  */
178 static int
179 sq_prepare (sqlite3 * dbh, const char *zSql,
180             sqlite3_stmt ** ppStmt)
181 {
182   char *dummy;
183   return sqlite3_prepare_v2 (dbh,
184                              zSql,
185                              strlen (zSql), ppStmt, (const char **) &dummy);
186 }
187
188
189 /**
190  * Create our database indices.
191  * 
192  * @param dbh handle to the database
193  */
194 static void
195 create_indices (sqlite3 * dbh)
196 {
197   /* create indices */
198   sqlite3_exec (dbh,
199                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
200   sqlite3_exec (dbh,
201                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
202                 NULL, NULL);
203   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
204                 NULL);
205   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
206                 NULL);
207   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
208                 NULL, NULL);
209   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
210                 NULL, NULL, NULL);
211   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
212                 NULL, NULL);
213 }
214
215
216
217 #if 1
218 #define CHECK(a) GNUNET_break(a)
219 #define ENULL NULL
220 #else
221 #define ENULL &e
222 #define ENULL_DEFINED 1
223 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
224 #endif
225
226
227
228
229 /**
230  * Initialize the database connections and associated
231  * data structures (create tables and indices
232  * as needed as well).
233  *
234  * @param cfg our configuration
235  * @param plugin the plugin context (state for this module)
236  * @return GNUNET_OK on success
237  */
238 static int
239 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
240                 struct Plugin *plugin)
241 {
242   sqlite3_stmt *stmt;
243   char *afsdir;
244 #if ENULL_DEFINED
245   char *e;
246 #endif
247   
248   if (GNUNET_OK != 
249       GNUNET_CONFIGURATION_get_value_filename (cfg,
250                                                "datastore-sqlite",
251                                                "FILENAME",
252                                                &afsdir))
253     {
254       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
255                        "sqlite",
256                        _("Option `%s' in section `%s' missing in configuration!\n"),
257                        "FILENAME",
258                        "datastore-sqlite");
259       return GNUNET_SYSERR;
260     }
261   if (GNUNET_OK != GNUNET_DISK_file_test (afsdir))
262     {
263       if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
264         {
265           GNUNET_break (0);
266           GNUNET_free (afsdir);
267           return GNUNET_SYSERR;
268         }
269       /* database is new or got deleted, reset payload to zero! */
270       if (plugin->stat_get != NULL)
271         {
272           GNUNET_STATISTICS_get_cancel (plugin->stat_get);
273           plugin->stat_get = NULL;
274         }
275       plugin->payload = 0;
276     }
277   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
278 #ifdef ENABLE_NLS
279                                               nl_langinfo (CODESET)
280 #else
281                                               "UTF-8"   /* good luck */
282 #endif
283                                               );
284   GNUNET_free (afsdir);
285   
286   /* Open database and precompile statements */
287   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
288     {
289       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
290                        "sqlite",
291                        _("Unable to initialize SQLite: %s.\n"),
292                        sqlite3_errmsg (plugin->dbh));
293       return GNUNET_SYSERR;
294     }
295   CHECK (SQLITE_OK ==
296          sqlite3_exec (plugin->dbh,
297                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
298   CHECK (SQLITE_OK ==
299          sqlite3_exec (plugin->dbh,
300                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
301   CHECK (SQLITE_OK ==
302          sqlite3_exec (plugin->dbh,
303                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
304   CHECK (SQLITE_OK ==
305          sqlite3_exec (plugin->dbh,
306                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
307   CHECK (SQLITE_OK ==
308          sqlite3_exec (plugin->dbh, 
309                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
310
311   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
312
313
314   /* We have to do it here, because otherwise precompiling SQL might fail */
315   CHECK (SQLITE_OK ==
316          sq_prepare (plugin->dbh,
317                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
318                      &stmt));
319   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
320        (sqlite3_exec (plugin->dbh,
321                       "CREATE TABLE gn080 ("
322                       "  size INT4 NOT NULL DEFAULT 0,"
323                       "  type INT4 NOT NULL DEFAULT 0,"
324                       "  prio INT4 NOT NULL DEFAULT 0,"
325                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
326                       "  expire INT8 NOT NULL DEFAULT 0,"
327                       "  hash TEXT NOT NULL DEFAULT '',"
328                       "  vhash TEXT NOT NULL DEFAULT '',"
329                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
330                       NULL) != SQLITE_OK) )
331     {
332       LOG_SQLITE (plugin, NULL,
333                   GNUNET_ERROR_TYPE_ERROR, 
334                   "sqlite3_exec");
335       sqlite3_finalize (stmt);
336       return GNUNET_SYSERR;
337     }
338   sqlite3_finalize (stmt);
339   create_indices (plugin->dbh);
340
341   CHECK (SQLITE_OK ==
342          sq_prepare (plugin->dbh,
343                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
344                      &stmt));
345   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
346        (sqlite3_exec (plugin->dbh,
347                       "CREATE TABLE gn071 ("
348                       "  key TEXT NOT NULL DEFAULT '',"
349                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
350                       NULL) != SQLITE_OK) )
351     {
352       LOG_SQLITE (plugin, NULL,
353                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
354       sqlite3_finalize (stmt);
355       return GNUNET_SYSERR;
356     }
357   sqlite3_finalize (stmt);
358
359   if ((sq_prepare (plugin->dbh,
360                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
361                    "_ROWID_ = ?",
362                    &plugin->updPrio) != SQLITE_OK) ||
363       (sq_prepare (plugin->dbh,
364                    "INSERT INTO gn080 (size, type, prio, "
365                    "anonLevel, expire, hash, vhash, value) VALUES "
366                    "(?, ?, ?, ?, ?, ?, ?, ?)",
367                    &plugin->insertContent) != SQLITE_OK))
368     {
369       LOG_SQLITE (plugin, NULL,
370                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
371       return GNUNET_SYSERR;
372     }
373   return GNUNET_OK;
374 }
375
376
377 /**
378  * Synchronize our utilization statistics with the 
379  * statistics service.
380  * @param plugin the plugin context (state for this module)
381  */
382 static void 
383 sync_stats (struct Plugin *plugin)
384 {
385   GNUNET_STATISTICS_set (plugin->statistics,
386                          QUOTA_STAT_NAME,
387                          plugin->payload,
388                          GNUNET_YES);
389   plugin->lastSync = 0;
390 }
391
392
393 /**
394  * Shutdown database connection and associate data
395  * structures.
396  * @param plugin the plugin context (state for this module)
397  */
398 static void
399 database_shutdown (struct Plugin *plugin)
400 {
401   if (plugin->lastSync > 0)
402     sync_stats (plugin);
403   if (plugin->updPrio != NULL)
404     sqlite3_finalize (plugin->updPrio);
405   if (plugin->insertContent != NULL)
406     sqlite3_finalize (plugin->insertContent);
407   sqlite3_close (plugin->dbh);
408   GNUNET_free_non_null (plugin->fn);
409 }
410
411
412 /**
413  * Get an estimate of how much space the database is
414  * currently using.
415  *
416  * @param cls our plugin context
417  * @return number of bytes used on disk
418  */
419 static unsigned long long sqlite_plugin_get_size (void *cls)
420 {
421   struct Plugin *plugin = cls;
422   return plugin->payload;
423 }
424
425
426 /**
427  * Delete the database entry with the given
428  * row identifier.
429  *
430  * @param plugin the plugin context (state for this module)
431  * @param rid the ID of the row to delete
432  */
433 static int
434 delete_by_rowid (struct Plugin* plugin, 
435                  unsigned long long rid)
436 {
437   sqlite3_stmt *stmt;
438
439   if (sq_prepare (plugin->dbh,
440                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
441     {
442       LOG_SQLITE (plugin, NULL,
443                   GNUNET_ERROR_TYPE_ERROR |
444                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
445       return GNUNET_SYSERR;
446     }
447   sqlite3_bind_int64 (stmt, 1, rid);
448   if (SQLITE_DONE != sqlite3_step (stmt))
449     {
450       LOG_SQLITE (plugin, NULL,
451                   GNUNET_ERROR_TYPE_ERROR |
452                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
453       sqlite3_finalize (stmt);
454       return GNUNET_SYSERR;
455     }
456   sqlite3_finalize (stmt);
457   return GNUNET_OK;
458 }
459
460
461 /**
462  * Context for the universal iterator.
463  */
464 struct NextContext;
465
466 /**
467  * Type of a function that will prepare
468  * the next iteration.
469  *
470  * @param cls closure
471  * @param nc the next context; NULL for the last
472  *         call which gives the callback a chance to
473  *         clean up the closure
474  * @return GNUNET_OK on success, GNUNET_NO if there are
475  *         no more values, GNUNET_SYSERR on error
476  */
477 typedef int (*PrepareFunction)(void *cls,
478                                struct NextContext *nc);
479
480
481 /**
482  * Context we keep for the "next request" callback.
483  */
484 struct NextContext
485 {
486   /**
487    * Internal state.
488    */ 
489   struct Plugin *plugin;
490
491   /**
492    * Function to call on the next value.
493    */
494   PluginIterator iter;
495
496   /**
497    * Closure for iter.
498    */
499   void *iter_cls;
500
501   /**
502    * Function to call to prepare the next
503    * iteration.
504    */
505   PrepareFunction prep;
506
507   /**
508    * Closure for prep.
509    */
510   void *prep_cls;
511
512   /**
513    * Statement that the iterator will get the data
514    * from (updated or set by prep).
515    */ 
516   sqlite3_stmt *stmt;
517
518   /**
519    * Row ID of the last result.
520    */
521   unsigned long long last_rowid;
522
523   /**
524    * Key of the last result.
525    */
526   GNUNET_HashCode lastKey;  
527
528   /**
529    * Expiration time of the last value visited.
530    */
531   struct GNUNET_TIME_Absolute lastExpiration;
532
533   /**
534    * Priority of the last value visited.
535    */ 
536   unsigned int lastPriority; 
537
538   /**
539    * Number of results processed so far.
540    */
541   unsigned int count;
542
543   /**
544    * Set to GNUNET_YES if we must stop now.
545    */
546   int end_it;
547 };
548
549
550 /**
551  * Continuation of "sqlite_next_request".
552  *
553  * @param cls the next context
554  * @param tc the task context (unused)
555  */
556 static void 
557 sqlite_next_request_cont (void *cls,
558                           const struct GNUNET_SCHEDULER_TaskContext *tc)
559 {
560   struct NextContext * nc = cls;
561   struct Plugin *plugin;
562   unsigned long long rowid;
563   sqlite3_stmt *stmtd;
564   int ret;
565   unsigned int type;
566   unsigned int size;
567   unsigned int priority;
568   unsigned int anonymity;
569   struct GNUNET_TIME_Absolute expiration;
570   const GNUNET_HashCode *key;
571   const void *data;
572   
573   plugin = nc->plugin;
574   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
575   plugin->next_task_nc = NULL;
576   if ( (GNUNET_YES == nc->end_it) ||
577        (GNUNET_OK != (nc->prep(nc->prep_cls,
578                                nc))) )
579     {
580     END:
581       nc->iter (nc->iter_cls, 
582                 NULL, NULL, 0, NULL, 0, 0, 0, 
583                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
584       nc->prep (nc->prep_cls, NULL);
585       GNUNET_free (nc);
586       return;
587     }
588
589   rowid = sqlite3_column_int64 (nc->stmt, 7);
590   nc->last_rowid = rowid;
591   type = sqlite3_column_int (nc->stmt, 1);
592   size = sqlite3_column_bytes (nc->stmt, 6);
593   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
594     {
595       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
596                        "sqlite",
597                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
598       if (SQLITE_OK != sqlite3_reset (nc->stmt))
599         LOG_SQLITE (nc->plugin, NULL,
600                     GNUNET_ERROR_TYPE_ERROR |
601                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
602       if (sq_prepare
603           (nc->plugin->dbh,
604            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
605            &stmtd) != SQLITE_OK)
606         {
607           LOG_SQLITE (nc->plugin, NULL,
608                       GNUNET_ERROR_TYPE_ERROR |
609                       GNUNET_ERROR_TYPE_BULK, 
610                       "sq_prepare");
611           goto END;
612         }
613
614       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
615         LOG_SQLITE (nc->plugin, NULL,
616                     GNUNET_ERROR_TYPE_ERROR |
617                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
618       if (SQLITE_DONE != sqlite3_step (stmtd))
619         LOG_SQLITE (nc->plugin, NULL,
620                     GNUNET_ERROR_TYPE_ERROR |
621                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
622       if (SQLITE_OK != sqlite3_finalize (stmtd))
623         LOG_SQLITE (nc->plugin, NULL,
624                     GNUNET_ERROR_TYPE_ERROR |
625                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
626       goto END;
627     }
628
629   priority = sqlite3_column_int (nc->stmt, 2);
630   anonymity = sqlite3_column_int (nc->stmt, 3);
631   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
632   key = sqlite3_column_blob (nc->stmt, 5);
633   nc->lastPriority = priority;
634   nc->lastExpiration = expiration;
635   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
636   data = sqlite3_column_blob (nc->stmt, 6);
637   nc->count++;
638   ret = nc->iter (nc->iter_cls,
639                   nc,
640                   key,
641                   size,
642                   data, 
643                   type,
644                   priority,
645                   anonymity,
646                   expiration,
647                   rowid);
648   if (ret == GNUNET_SYSERR)
649     {
650       nc->end_it = GNUNET_YES;
651       return;
652     }
653 #if DEBUG_SQLITE
654   if (ret == GNUNET_NO)
655     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
656                      "sqlite",
657                      "Asked to remove entry %llu (%u bytes)\n",
658                      (unsigned long long) rowid,
659                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
660 #endif
661   if ( (ret == GNUNET_NO) &&
662        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
663     {
664       if (plugin->payload >= size + GNUNET_DATASTORE_ENTRY_OVERHEAD)
665         plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
666       else
667         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
668                     _("Datastore payload inaccurate, please fix and restart!\n"));
669       plugin->lastSync++; 
670 #if DEBUG_SQLITE
671       if (ret == GNUNET_NO)
672         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
673                          "sqlite",
674                          "Removed entry %llu (%u bytes), new payload is %llu\n",
675                          (unsigned long long) rowid,
676                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
677                          plugin->payload);
678 #endif
679       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
680         sync_stats (plugin);
681     }
682 }
683
684
685 /**
686  * Function invoked on behalf of a "PluginIterator"
687  * asking the database plugin to call the iterator
688  * with the next item.
689  *
690  * @param next_cls whatever argument was given
691  *        to the PluginIterator as "next_cls".
692  * @param end_it set to GNUNET_YES if we
693  *        should terminate the iteration early
694  *        (iterator should be still called once more
695  *         to signal the end of the iteration).
696  */
697 static void 
698 sqlite_next_request (void *next_cls,
699                      int end_it)
700 {
701   struct NextContext * nc= next_cls;
702
703   if (GNUNET_YES == end_it)
704     nc->end_it = GNUNET_YES;
705   nc->plugin->next_task_nc = nc;
706   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (nc->plugin->env->sched,
707                                                     &sqlite_next_request_cont,
708                                                     nc);
709 }
710
711
712
713 /**
714  * Store an item in the datastore.
715  *
716  * @param cls closure
717  * @param key key for the item
718  * @param size number of bytes in data
719  * @param data content stored
720  * @param type type of the content
721  * @param priority priority of the content
722  * @param anonymity anonymity-level for the content
723  * @param expiration expiration time for the content
724  * @param msg set to an error message
725  * @return GNUNET_OK on success
726  */
727 static int
728 sqlite_plugin_put (void *cls,
729                    const GNUNET_HashCode * key,
730                    uint32_t size,
731                    const void *data,
732                    enum GNUNET_BLOCK_Type type,
733                    uint32_t priority,
734                    uint32_t anonymity,
735                    struct GNUNET_TIME_Absolute expiration,
736                    char ** msg)
737 {
738   struct Plugin *plugin = cls;
739   int n;
740   sqlite3_stmt *stmt;
741   GNUNET_HashCode vhash;
742
743 #if DEBUG_SQLITE
744   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
745                    "sqlite",
746                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
747                    type, 
748                    GNUNET_h2s(key),
749                    priority,
750                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
751                    (long long) expiration.value);
752 #endif
753   GNUNET_CRYPTO_hash (data, size, &vhash);
754   stmt = plugin->insertContent;
755   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
756       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
757       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
758       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
759       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
760       (SQLITE_OK !=
761        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
762                           SQLITE_TRANSIENT)) ||
763       (SQLITE_OK !=
764        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
765                           SQLITE_TRANSIENT))
766       || (SQLITE_OK !=
767           sqlite3_bind_blob (stmt, 8, data, size,
768                              SQLITE_TRANSIENT)))
769     {
770       LOG_SQLITE (plugin,
771                   msg,
772                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
773       if (SQLITE_OK != sqlite3_reset (stmt))
774         LOG_SQLITE (plugin, NULL,
775                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
776       return GNUNET_SYSERR;
777     }
778   n = sqlite3_step (stmt);
779   if (n != SQLITE_DONE) 
780     {
781       if (n == SQLITE_BUSY)
782         {
783           LOG_SQLITE (plugin, msg,
784                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
785           sqlite3_reset (stmt);
786           GNUNET_break (0);
787           return GNUNET_NO;
788         }
789       LOG_SQLITE (plugin, msg,
790                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
791       sqlite3_reset (stmt);
792       database_shutdown (plugin);
793       database_setup (plugin->env->cfg,
794                       plugin);
795       return GNUNET_SYSERR;
796     }
797   if (SQLITE_OK != sqlite3_reset (stmt))
798     LOG_SQLITE (plugin, NULL,
799                 GNUNET_ERROR_TYPE_ERROR |
800                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
801   plugin->lastSync++;
802   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
803 #if DEBUG_SQLITE
804   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
805                    "sqlite",
806                    "Stored new entry (%u bytes), new payload is %llu\n",
807                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
808                    plugin->payload);
809 #endif
810   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
811     sync_stats (plugin);
812   return GNUNET_OK;
813 }
814
815
816 /**
817  * Update the priority for a particular key in the datastore.  If
818  * the expiration time in value is different than the time found in
819  * the datastore, the higher value should be kept.  For the
820  * anonymity level, the lower value is to be used.  The specified
821  * priority should be added to the existing priority, ignoring the
822  * priority in value.
823  *
824  * Note that it is possible for multiple values to match this put.
825  * In that case, all of the respective values are updated.
826  *
827  * @param cls the plugin context (state for this module)
828  * @param uid unique identifier of the datum
829  * @param delta by how much should the priority
830  *     change?  If priority + delta < 0 the
831  *     priority should be set to 0 (never go
832  *     negative).
833  * @param expire new expiration time should be the
834  *     MAX of any existing expiration time and
835  *     this value
836  * @param msg set to an error message
837  * @return GNUNET_OK on success
838  */
839 static int
840 sqlite_plugin_update (void *cls,
841                       uint64_t uid,
842                       int delta, struct GNUNET_TIME_Absolute expire,
843                       char **msg)
844 {
845   struct Plugin *plugin = cls;
846   int n;
847
848   sqlite3_bind_int (plugin->updPrio, 1, delta);
849   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
850   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
851   n = sqlite3_step (plugin->updPrio);
852   if (n != SQLITE_DONE)
853     LOG_SQLITE (plugin, msg,
854                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
855                 "sqlite3_step");
856 #if DEBUG_SQLITE
857   else
858     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
859                      "sqlite",
860                      "Block updated\n");
861 #endif
862   sqlite3_reset (plugin->updPrio);
863
864   if (n == SQLITE_BUSY)
865     return GNUNET_NO;
866   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
867 }
868
869
870 /**
871  * Internal context for an iteration.
872  */
873 struct IterContext
874 {
875   /**
876    * FIXME.
877    */
878   sqlite3_stmt *stmt_1;
879
880   /**
881    * FIXME.
882    */
883   sqlite3_stmt *stmt_2;
884
885   /**
886    * FIXME.
887    */
888   int is_asc;
889
890   /**
891    * FIXME.
892    */
893   int is_prio;
894
895   /**
896    * FIXME.
897    */
898   int is_migr;
899
900   /**
901    * FIXME.
902    */
903   int limit_nonanonymous;
904
905   /**
906    * Desired type for blocks returned by this iterator.
907    */
908   enum GNUNET_BLOCK_Type type;
909 };
910
911
912 /**
913  * Prepare our SQL query to obtain the next record from the database.
914  *
915  * @param cls our "struct IterContext"
916  * @param nc NULL to terminate the iteration, otherwise our context for
917  *           getting the next result.
918  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
919  *         GNUNET_SYSERR on error (or end of iteration)
920  */
921 static int
922 iter_next_prepare (void *cls,
923                    struct NextContext *nc)
924 {
925   struct IterContext *ic = cls;
926   struct Plugin *plugin;
927   int ret;
928
929   if (nc == NULL)
930     {
931 #if DEBUG_SQLITE
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Asked to clean up iterator state.\n");
934 #endif
935       sqlite3_finalize (ic->stmt_1);
936       sqlite3_finalize (ic->stmt_2);
937       return GNUNET_SYSERR;
938     }
939   sqlite3_reset (ic->stmt_1);
940   sqlite3_reset (ic->stmt_2);
941   plugin = nc->plugin;
942   if (ic->is_prio)
943     {
944 #if DEBUG_SQLITE
945       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
946                   "Restricting to results larger than the last priority %u\n",
947                   nc->lastPriority);
948 #endif
949       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
950       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
951     }
952   else
953     {
954 #if DEBUG_SQLITE
955       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
956                   "Restricting to results larger than the last expiration %llu\n",
957                   (unsigned long long) nc->lastExpiration.value);
958 #endif
959       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
960       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
961     }
962 #if DEBUG_SQLITE
963   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
964               "Restricting to results larger than the last key `%s'\n",
965               GNUNET_h2s(&nc->lastKey));
966 #endif
967   sqlite3_bind_blob (ic->stmt_1, 2, 
968                      &nc->lastKey, 
969                      sizeof (GNUNET_HashCode),
970                      SQLITE_TRANSIENT);
971   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
972     {      
973 #if DEBUG_SQLITE
974       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975                   "Result found using iterator 1\n");
976 #endif
977       nc->stmt = ic->stmt_1;
978       return GNUNET_OK;
979     }
980   if (ret != SQLITE_DONE)
981     {
982       LOG_SQLITE (plugin, NULL,
983                   GNUNET_ERROR_TYPE_ERROR |
984                   GNUNET_ERROR_TYPE_BULK,
985                   "sqlite3_step");
986       return GNUNET_SYSERR;
987     }
988   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
989     LOG_SQLITE (plugin, NULL,
990                 GNUNET_ERROR_TYPE_ERROR | 
991                 GNUNET_ERROR_TYPE_BULK, 
992                 "sqlite3_reset");
993   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
994     {
995 #if DEBUG_SQLITE
996       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997                   "Result found using iterator 2\n");
998 #endif
999       nc->stmt = ic->stmt_2;
1000       return GNUNET_OK;
1001     }
1002   if (ret != SQLITE_DONE)
1003     {
1004       LOG_SQLITE (plugin, NULL,
1005                   GNUNET_ERROR_TYPE_ERROR |
1006                   GNUNET_ERROR_TYPE_BULK,
1007                   "sqlite3_step");
1008       return GNUNET_SYSERR;
1009     }
1010   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
1011     LOG_SQLITE (plugin, NULL,
1012                 GNUNET_ERROR_TYPE_ERROR |
1013                 GNUNET_ERROR_TYPE_BULK,
1014                 "sqlite3_reset");
1015 #if DEBUG_SQLITE
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1017               "No result found using either iterator\n");
1018 #endif
1019   return GNUNET_NO;
1020 }
1021
1022
1023 /**
1024  * Call a method for each key in the database and
1025  * call the callback method on it.
1026  *
1027  * @param plugin our plugin context
1028  * @param type entries of which type should be considered?
1029  * @param is_asc are we iterating in ascending order?
1030  * @param is_prio are we iterating by priority (otherwise by expiration)
1031  * @param is_migr are we iterating in migration order?
1032  * @param limit_nonanonymous are we restricting results to those with anonymity
1033  *              level zero?
1034  * @param stmt_str_1 first SQL statement to execute
1035  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1036  * @param iter function to call on each matching value;
1037  *        will be called once with a NULL value at the end
1038  * @param iter_cls closure for iter
1039  */
1040 static void
1041 basic_iter (struct Plugin *plugin,
1042             enum GNUNET_BLOCK_Type type,
1043             int is_asc,
1044             int is_prio,
1045             int is_migr,
1046             int limit_nonanonymous,
1047             const char *stmt_str_1,
1048             const char *stmt_str_2,
1049             PluginIterator iter,
1050             void *iter_cls)
1051 {
1052   struct NextContext *nc;
1053   struct IterContext *ic;
1054   sqlite3_stmt *stmt_1;
1055   sqlite3_stmt *stmt_2;
1056
1057 #if DEBUG_SQLITE
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "At %llu, using queries `%s' and `%s'\n",
1060               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1061               stmt_str_1,
1062               stmt_str_2);
1063 #endif
1064   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1065     {
1066       LOG_SQLITE (plugin, NULL,
1067                   GNUNET_ERROR_TYPE_ERROR |
1068                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1069       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1070       return;
1071     }
1072   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1073     {
1074       LOG_SQLITE (plugin, NULL,
1075                   GNUNET_ERROR_TYPE_ERROR |
1076                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1077       sqlite3_finalize (stmt_1);
1078       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1079       return;
1080     }
1081   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1082                       sizeof(struct IterContext));
1083   nc->plugin = plugin;
1084   nc->iter = iter;
1085   nc->iter_cls = iter_cls;
1086   nc->stmt = NULL;
1087   ic = (struct IterContext*) &nc[1];
1088   ic->stmt_1 = stmt_1;
1089   ic->stmt_2 = stmt_2;
1090   ic->type = type;
1091   ic->is_asc = is_asc;
1092   ic->is_prio = is_prio;
1093   ic->is_migr = is_migr;
1094   ic->limit_nonanonymous = limit_nonanonymous;
1095   nc->prep = &iter_next_prepare;
1096   nc->prep_cls = ic;
1097   if (is_asc)
1098     {
1099       nc->lastPriority = 0;
1100       nc->lastExpiration.value = 0;
1101       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1102     }
1103   else
1104     {
1105       nc->lastPriority = 0x7FFFFFFF;
1106       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1107       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1108     }
1109   sqlite_next_request (nc, GNUNET_NO);
1110 }
1111
1112
1113 /**
1114  * Select a subset of the items in the datastore and call
1115  * the given iterator for each of them.
1116  *
1117  * @param cls our plugin context
1118  * @param type entries of which type should be considered?
1119  *        Use 0 for any type.
1120  * @param iter function to call on each matching value;
1121  *        will be called once with a NULL value at the end
1122  * @param iter_cls closure for iter
1123  */
1124 static void
1125 sqlite_plugin_iter_low_priority (void *cls,
1126                                  enum GNUNET_BLOCK_Type type,
1127                                  PluginIterator iter,
1128                                  void *iter_cls)
1129 {
1130   basic_iter (cls,
1131               type, 
1132               GNUNET_YES, GNUNET_YES, 
1133               GNUNET_NO, GNUNET_NO,
1134               SELECT_IT_LOW_PRIORITY_1,
1135               SELECT_IT_LOW_PRIORITY_2, 
1136               iter, iter_cls);
1137 }
1138
1139
1140 /**
1141  * Select a subset of the items in the datastore and call
1142  * the given iterator for each of them.
1143  *
1144  * @param cls our plugin context
1145  * @param type entries of which type should be considered?
1146  *        Use 0 for any type.
1147  * @param iter function to call on each matching value;
1148  *        will be called once with a NULL value at the end
1149  * @param iter_cls closure for iter
1150  */
1151 static void
1152 sqlite_plugin_iter_zero_anonymity (void *cls,
1153                                    enum GNUNET_BLOCK_Type type,
1154                                    PluginIterator iter,
1155                                    void *iter_cls)
1156 {
1157   struct GNUNET_TIME_Absolute now;
1158   char *q1;
1159   char *q2;
1160
1161   now = GNUNET_TIME_absolute_get ();
1162   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1163                    (unsigned long long) now.value);
1164   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1165                    (unsigned long long) now.value);
1166   basic_iter (cls,
1167               type, 
1168               GNUNET_NO, GNUNET_YES, 
1169               GNUNET_NO, GNUNET_YES,
1170               q1,
1171               q2,
1172               iter, iter_cls);
1173   GNUNET_free (q1);
1174   GNUNET_free (q2);
1175 }
1176
1177
1178
1179 /**
1180  * Select a subset of the items in the datastore and call
1181  * the given iterator for each of them.
1182  *
1183  * @param cls our plugin context
1184  * @param type entries of which type should be considered?
1185  *        Use 0 for any type.
1186  * @param iter function to call on each matching value;
1187  *        will be called once with a NULL value at the end
1188  * @param iter_cls closure for iter
1189  */
1190 static void
1191 sqlite_plugin_iter_ascending_expiration (void *cls,
1192                                          enum GNUNET_BLOCK_Type type,
1193                                          PluginIterator iter,
1194                                          void *iter_cls)
1195 {
1196   struct GNUNET_TIME_Absolute now;
1197   char *q1;
1198   char *q2;
1199
1200   now = GNUNET_TIME_absolute_get ();
1201   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1202                    (unsigned long long) 0*now.value);
1203   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1204                    (unsigned long long) 0*now.value);
1205   basic_iter (cls,
1206               type, 
1207               GNUNET_YES, GNUNET_NO, 
1208               GNUNET_NO, GNUNET_NO,
1209               q1, q2,
1210               iter, iter_cls);
1211   GNUNET_free (q1);
1212   GNUNET_free (q2);
1213 }
1214
1215
1216 /**
1217  * Select a subset of the items in the datastore and call
1218  * the given iterator for each of them.
1219  *
1220  * @param cls our plugin context
1221  * @param type entries of which type should be considered?
1222  *        Use 0 for any type.
1223  * @param iter function to call on each matching value;
1224  *        will be called once with a NULL value at the end
1225  * @param iter_cls closure for iter
1226  */
1227 static void
1228 sqlite_plugin_iter_migration_order (void *cls,
1229                                     enum GNUNET_BLOCK_Type type,
1230                                     PluginIterator iter,
1231                                     void *iter_cls)
1232 {
1233   struct GNUNET_TIME_Absolute now;
1234   char *q;
1235
1236   now = GNUNET_TIME_absolute_get ();
1237   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1238                    (unsigned long long) now.value);
1239   basic_iter (cls,
1240               type, 
1241               GNUNET_NO, GNUNET_NO, 
1242               GNUNET_YES, GNUNET_NO,
1243               SELECT_IT_MIGRATION_ORDER_1,
1244               q,
1245               iter, iter_cls);
1246   GNUNET_free (q);
1247 }
1248
1249
1250 /**
1251  * Call sqlite using the already prepared query to get
1252  * the next result.
1253  *
1254  * @param cls not used
1255  * @param nc context with the prepared query
1256  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1257  *        there are no more results 
1258  */
1259 static int
1260 all_next_prepare (void *cls,
1261                   struct NextContext *nc)
1262 {
1263   struct Plugin *plugin;
1264   int ret;
1265
1266   if (nc == NULL)
1267     {
1268 #if DEBUG_SQLITE
1269       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1270                   "Asked to clean up iterator state.\n");
1271 #endif
1272       return GNUNET_SYSERR;
1273     }
1274   plugin = nc->plugin;
1275   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1276     {      
1277       return GNUNET_OK;
1278     }
1279   if (ret != SQLITE_DONE)
1280     {
1281       LOG_SQLITE (plugin, NULL,
1282                   GNUNET_ERROR_TYPE_ERROR |
1283                   GNUNET_ERROR_TYPE_BULK,
1284                   "sqlite3_step");
1285       return GNUNET_SYSERR;
1286     }
1287   return GNUNET_NO;
1288 }
1289
1290
1291 /**
1292  * Select a subset of the items in the datastore and call
1293  * the given iterator for each of them.
1294  *
1295  * @param cls our plugin context
1296  * @param type entries of which type should be considered?
1297  *        Use 0 for any type.
1298  * @param iter function to call on each matching value;
1299  *        will be called once with a NULL value at the end
1300  * @param iter_cls closure for iter
1301  */
1302 static void
1303 sqlite_plugin_iter_all_now (void *cls,
1304                             enum GNUNET_BLOCK_Type type,
1305                             PluginIterator iter,
1306                             void *iter_cls)
1307 {
1308   struct Plugin *plugin = cls;
1309   struct NextContext *nc;
1310   sqlite3_stmt *stmt;
1311
1312   if (sq_prepare (plugin->dbh, 
1313                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1314                   &stmt) != SQLITE_OK)
1315     {
1316       LOG_SQLITE (plugin, NULL,
1317                   GNUNET_ERROR_TYPE_ERROR |
1318                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
1319       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1320       return;
1321     }
1322   nc = GNUNET_malloc (sizeof(struct NextContext));
1323   nc->plugin = plugin;
1324   nc->iter = iter;
1325   nc->iter_cls = iter_cls;
1326   nc->stmt = stmt;
1327   nc->prep = &all_next_prepare;
1328   nc->prep_cls = NULL;
1329   sqlite_next_request (nc, GNUNET_NO);
1330 }
1331
1332
1333 /**
1334  * FIXME.
1335  */
1336 struct GetNextContext
1337 {
1338
1339   /**
1340    * FIXME.
1341    */
1342   int total;
1343
1344   /**
1345    * FIXME.
1346    */
1347   int off;
1348
1349   /**
1350    * FIXME.
1351    */
1352   int have_vhash;
1353
1354   /**
1355    * FIXME.
1356    */
1357   unsigned int type;
1358
1359   /**
1360    * FIXME.
1361    */
1362   sqlite3_stmt *stmt;
1363
1364   /**
1365    * FIXME.
1366    */
1367   GNUNET_HashCode key;
1368
1369   /**
1370    * FIXME.
1371    */
1372   GNUNET_HashCode vhash;
1373 };
1374
1375
1376
1377 /**
1378  * FIXME.
1379  *
1380  * @param cls our "struct GetNextContext*"
1381  * @param nc FIXME
1382  * @return GNUNET_YES if there are more results, 
1383  *         GNUNET_NO if there are no more results,
1384  *         GNUNET_SYSERR on internal error
1385  */
1386 static int
1387 get_next_prepare (void *cls,
1388                   struct NextContext *nc)
1389 {
1390   struct GetNextContext *gnc = cls;
1391   int sqoff;
1392   int ret;
1393   int limit_off;
1394
1395   if (nc == NULL)
1396     {
1397       sqlite3_finalize (gnc->stmt);
1398       return GNUNET_SYSERR;
1399     }
1400   if (nc->count == gnc->total)
1401     return GNUNET_NO;
1402   if (nc->count + gnc->off == gnc->total)
1403     nc->last_rowid = 0;
1404   if (nc->count == 0)
1405     limit_off = gnc->off;
1406   else
1407     limit_off = 0;
1408   sqoff = 1;
1409   sqlite3_reset (nc->stmt);
1410   ret = sqlite3_bind_blob (nc->stmt,
1411                            sqoff++,
1412                            &gnc->key, 
1413                            sizeof (GNUNET_HashCode),
1414                            SQLITE_TRANSIENT);
1415   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1416     ret = sqlite3_bind_blob (nc->stmt,
1417                              sqoff++,
1418                              &gnc->vhash,
1419                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1420   if ((gnc->type != 0) && (ret == SQLITE_OK))
1421     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1422   if (ret == SQLITE_OK)
1423     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1424   if (ret == SQLITE_OK)
1425     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1426   if (ret != SQLITE_OK)
1427     return GNUNET_SYSERR;
1428   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1429     return GNUNET_NO;
1430   return GNUNET_OK;
1431 }
1432
1433
1434 /**
1435  * Iterate over the results for a particular key
1436  * in the datastore.
1437  *
1438  * @param cls closure
1439  * @param key maybe NULL (to match all entries)
1440  * @param vhash hash of the value, maybe NULL (to
1441  *        match all values that have the right key).
1442  *        Note that for DBlocks there is no difference
1443  *        betwen key and vhash, but for other blocks
1444  *        there may be!
1445  * @param type entries of which type are relevant?
1446  *     Use 0 for any type.
1447  * @param iter function to call on each matching value;
1448  *        will be called once with a NULL value at the end
1449  * @param iter_cls closure for iter
1450  */
1451 static void
1452 sqlite_plugin_get (void *cls,
1453                    const GNUNET_HashCode * key,
1454                    const GNUNET_HashCode * vhash,
1455                    enum GNUNET_BLOCK_Type type,
1456                    PluginIterator iter, void *iter_cls)
1457 {
1458   struct Plugin *plugin = cls;
1459   struct GetNextContext *gpc;
1460   struct NextContext *nc;
1461   int ret;
1462   int total;
1463   sqlite3_stmt *stmt;
1464   char scratch[256];
1465   int sqoff;
1466
1467   GNUNET_assert (iter != NULL);
1468   if (key == NULL)
1469     {
1470       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1471       return;
1472     }
1473   GNUNET_snprintf (scratch, sizeof (scratch),
1474                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1475                    vhash == NULL ? "" : " AND vhash=:2",
1476                    type == 0 ? "" : (vhash ==
1477                                      NULL) ? " AND type=:2" : " AND type=:3");
1478   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1479     {
1480       LOG_SQLITE (plugin, NULL,
1481                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1482       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1483       return;
1484     }
1485   sqoff = 1;
1486   ret = sqlite3_bind_blob (stmt,
1487                            sqoff++,
1488                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1489   if ((vhash != NULL) && (ret == SQLITE_OK))
1490     ret = sqlite3_bind_blob (stmt,
1491                              sqoff++,
1492                              vhash,
1493                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1494   if ((type != 0) && (ret == SQLITE_OK))
1495     ret = sqlite3_bind_int (stmt, sqoff++, type);
1496   if (SQLITE_OK != ret)
1497     {
1498       LOG_SQLITE (plugin, NULL,
1499                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1500       sqlite3_reset (stmt);
1501       sqlite3_finalize (stmt);
1502       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1503       return;
1504     }
1505   ret = sqlite3_step (stmt);
1506   if (ret != SQLITE_ROW)
1507     {
1508       LOG_SQLITE (plugin, NULL,
1509                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1510                   "sqlite_step");
1511       sqlite3_reset (stmt);
1512       sqlite3_finalize (stmt);
1513       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1514       return;
1515     }
1516   total = sqlite3_column_int (stmt, 0);
1517   sqlite3_reset (stmt);
1518   sqlite3_finalize (stmt);
1519   if (0 == total)
1520     {
1521       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1522       return;
1523     }
1524
1525   GNUNET_snprintf (scratch, sizeof (scratch),
1526                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1527                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1528                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1529                    vhash == NULL ? "" : " AND vhash=:2",
1530                    type == 0 ? "" : (vhash ==
1531                                      NULL) ? " AND type=:2" : " AND type=:3",
1532                    sqoff, sqoff + 1);
1533   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1534     {
1535       LOG_SQLITE (plugin, NULL,
1536                   GNUNET_ERROR_TYPE_ERROR |
1537                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1538       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1539       return;
1540     }
1541   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1542                       sizeof(struct GetNextContext));
1543   nc->plugin = plugin;
1544   nc->iter = iter;
1545   nc->iter_cls = iter_cls;
1546   nc->stmt = stmt;
1547   gpc = (struct GetNextContext*) &nc[1];
1548   gpc->total = total;
1549   gpc->type = type;
1550   gpc->key = *key;
1551   gpc->stmt = stmt; /* alias used for freeing at the end! */
1552   if (NULL != vhash)
1553     {
1554       gpc->have_vhash = GNUNET_YES;
1555       gpc->vhash = *vhash;
1556     }
1557   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1558   nc->prep = &get_next_prepare;
1559   nc->prep_cls = gpc;
1560   sqlite_next_request (nc, GNUNET_NO);
1561 }
1562
1563
1564 /**
1565  * Drop database.
1566  *
1567  * @param cls our plugin context
1568  */
1569 static void 
1570 sqlite_plugin_drop (void *cls)
1571 {
1572   struct Plugin *plugin = cls;
1573   plugin->drop_on_shutdown = GNUNET_YES;
1574 }
1575
1576
1577 /**
1578  * Callback function to process statistic values.
1579  *
1580  * @param cls closure
1581  * @param subsystem name of subsystem that created the statistic
1582  * @param name the name of the datum
1583  * @param value the current value
1584  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1585  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1586  */
1587 static int
1588 process_stat_in (void *cls,
1589                  const char *subsystem,
1590                  const char *name,
1591                  uint64_t value,
1592                  int is_persistent)
1593 {
1594   struct Plugin *plugin = cls;
1595
1596   plugin->stats_worked = GNUNET_YES;
1597   plugin->payload += value;
1598 #if DEBUG_SQLITE
1599   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1600                    "sqlite",
1601                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1602                    value,
1603                    plugin->payload);
1604 #endif
1605   return GNUNET_OK;
1606 }
1607
1608
1609 static void
1610 process_stat_done (void *cls,
1611                    int success)
1612 {
1613   struct Plugin *plugin = cls;
1614   sqlite3_stmt *stmt;
1615   uint64_t pages;
1616   uint64_t page_size;
1617
1618   plugin->stat_get = NULL;
1619   if ( (plugin->stats_worked == GNUNET_NO) &&
1620        (SQLITE_VERSION_NUMBER >= 3006000) )
1621    {
1622       CHECK (SQLITE_OK ==
1623              sqlite3_exec (plugin->dbh,
1624                            "VACUUM", NULL, NULL, ENULL));
1625       CHECK (SQLITE_OK ==
1626              sqlite3_exec (plugin->dbh,
1627                            "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
1628       CHECK (SQLITE_OK ==
1629              sq_prepare (plugin->dbh,
1630                          "PRAGMA page_count",
1631                          &stmt));
1632       if (SQLITE_ROW ==
1633           sqlite3_step (stmt))
1634         pages = sqlite3_column_int64 (stmt, 0);
1635       else
1636         pages = 0;
1637       sqlite3_finalize (stmt);
1638       CHECK (SQLITE_OK ==
1639              sq_prepare (plugin->dbh,
1640                          "PRAGMA page_size",
1641                          &stmt));
1642       CHECK (SQLITE_ROW ==
1643              sqlite3_step (stmt));
1644       page_size = sqlite3_column_int64 (stmt, 0);
1645       sqlite3_finalize (stmt);
1646       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1647                   _("Using sqlite page utilization to estimate payload (%llu pages of size %llu bytes)\n"),
1648                   (unsigned long long) pages,
1649                   (unsigned long long) page_size);
1650       plugin->payload = pages * page_size;
1651     }
1652 }
1653                                          
1654
1655 /**
1656  * Entry point for the plugin.
1657  *
1658  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1659  * @return NULL on error, othrewise the plugin context
1660  */
1661 void *
1662 libgnunet_plugin_datastore_sqlite_init (void *cls)
1663 {
1664   static struct Plugin plugin;
1665   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1666   struct GNUNET_DATASTORE_PluginFunctions *api;
1667
1668   if (plugin.env != NULL)
1669     return NULL; /* can only initialize once! */
1670   memset (&plugin, 0, sizeof(struct Plugin));
1671   plugin.env = env;
1672   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1673                                                 "ds-sqlite",
1674                                                 env->cfg);
1675   plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
1676                                            "ds-sqlite",
1677                                            QUOTA_STAT_NAME,
1678                                            GNUNET_TIME_UNIT_SECONDS,
1679                                            &process_stat_done,
1680                                            &process_stat_in,
1681                                            &plugin);
1682   if (GNUNET_OK !=
1683       database_setup (env->cfg, &plugin))
1684     {
1685       database_shutdown (&plugin);
1686       return NULL;
1687     }
1688   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1689   api->cls = &plugin;
1690   api->get_size = &sqlite_plugin_get_size;
1691   api->put = &sqlite_plugin_put;
1692   api->next_request = &sqlite_next_request;
1693   api->get = &sqlite_plugin_get;
1694   api->update = &sqlite_plugin_update;
1695   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1696   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1697   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1698   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1699   api->iter_all_now = &sqlite_plugin_iter_all_now;
1700   api->drop = &sqlite_plugin_drop;
1701   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1702                    "sqlite", _("Sqlite database running\n"));
1703   return api;
1704 }
1705
1706
1707 /**
1708  * Exit point from the plugin.
1709  *
1710  * @param cls the plugin context (as returned by "init")
1711  * @return always NULL
1712  */
1713 void *
1714 libgnunet_plugin_datastore_sqlite_done (void *cls)
1715 {
1716   char *fn;
1717   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1718   struct Plugin *plugin = api->cls;
1719
1720   if (plugin->stat_get != NULL)
1721     {
1722       GNUNET_STATISTICS_get_cancel (plugin->stat_get);
1723       plugin->stat_get = NULL;
1724     }
1725   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1726     {
1727       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1728                                plugin->next_task);
1729       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1730       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1731       GNUNET_free (plugin->next_task_nc);
1732       plugin->next_task_nc = NULL;
1733     }
1734   fn = NULL;
1735   if (plugin->drop_on_shutdown)
1736     fn = GNUNET_strdup (plugin->fn);
1737   database_shutdown (plugin);
1738   GNUNET_STATISTICS_destroy (plugin->statistics,
1739                              GNUNET_NO);
1740   plugin->env = NULL; 
1741   plugin->payload = 0;
1742   GNUNET_free (api);
1743   if (fn != NULL)
1744     {
1745       if (0 != UNLINK(fn))
1746         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1747                                   "unlink",
1748                                   fn);
1749       GNUNET_free (fn);
1750     }
1751   return NULL;
1752 }
1753
1754 /* end of plugin_datastore_sqlite.c */