60c419092ceade57818649ef3d8c2d58d7c26d6a
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_YES
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94
95 /**
96  * Context for all functions in this plugin.
97  */
98 struct Plugin 
99 {
100   /**
101    * Our execution environment.
102    */
103   struct GNUNET_DATASTORE_PluginEnvironment *env;
104
105   /**
106    * Database filename.
107    */
108   char *fn;
109
110   /**
111    * Native SQLite database handle.
112    */
113   sqlite3 *dbh;
114
115   /**
116    * Precompiled SQL for update.
117    */
118   sqlite3_stmt *updPrio;
119
120   /**
121    * Precompiled SQL for insertion.
122    */
123   sqlite3_stmt *insertContent;
124
125   /**
126    * Handle to the statistics service.
127    */
128   struct GNUNET_STATISTICS_Handle *statistics;
129
130   /**
131    * Handle for pending get request.
132    */
133   struct GNUNET_STATISTICS_GetHandle *stat_get;
134
135   /**
136    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
137    */
138   struct NextContext *next_task_nc;
139
140   /**
141    * Pending task with scheduler for running the next request.
142    */
143   GNUNET_SCHEDULER_TaskIdentifier next_task;
144   
145   /**
146    * How much data are we currently storing
147    * in the database?
148    */
149   unsigned long long payload;
150
151   /**
152    * Number of updates that were made to the
153    * payload value since we last synchronized
154    * it with the statistics service.
155    */
156   unsigned int lastSync;
157
158   /**
159    * Should the database be dropped on shutdown?
160    */
161   int drop_on_shutdown;
162 };
163
164
165 /**
166  * @brief Prepare a SQL statement
167  *
168  * @param dbh handle to the database
169  * @param zSql SQL statement, UTF-8 encoded
170  * @param ppStmt set to the prepared statement
171  * @return 0 on success
172  */
173 static int
174 sq_prepare (sqlite3 * dbh, const char *zSql,
175             sqlite3_stmt ** ppStmt)
176 {
177   char *dummy;
178   return sqlite3_prepare (dbh,
179                           zSql,
180                           strlen (zSql), ppStmt, (const char **) &dummy);
181 }
182
183
184 /**
185  * Create our database indices.
186  * 
187  * @param dbh handle to the database
188  */
189 static void
190 create_indices (sqlite3 * dbh)
191 {
192   /* create indices */
193   sqlite3_exec (dbh,
194                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
195   sqlite3_exec (dbh,
196                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
197                 NULL, NULL);
198   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
199                 NULL);
200   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
201                 NULL);
202   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
203                 NULL, NULL);
204   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
205                 NULL, NULL, NULL);
206   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
207                 NULL, NULL);
208 }
209
210
211
212 #if 1
213 #define CHECK(a) GNUNET_break(a)
214 #define ENULL NULL
215 #else
216 #define ENULL &e
217 #define ENULL_DEFINED 1
218 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
219 #endif
220
221
222
223
224 /**
225  * Initialize the database connections and associated
226  * data structures (create tables and indices
227  * as needed as well).
228  *
229  * @param cfg our configuration
230  * @param plugin the plugin context (state for this module)
231  * @return GNUNET_OK on success
232  */
233 static int
234 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
235                 struct Plugin *plugin)
236 {
237   sqlite3_stmt *stmt;
238   char *afsdir;
239 #if ENULL_DEFINED
240   char *e;
241 #endif
242   
243   if (GNUNET_OK != 
244       GNUNET_CONFIGURATION_get_value_filename (cfg,
245                                                "datastore-sqlite",
246                                                "FILENAME",
247                                                &afsdir))
248     {
249       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
250                        "sqlite",
251                        _("Option `%s' in section `%s' missing in configuration!\n"),
252                        "FILENAME",
253                        "datastore-sqlite");
254       return GNUNET_SYSERR;
255     }
256   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
257     {
258       GNUNET_break (0);
259       GNUNET_free (afsdir);
260       return GNUNET_SYSERR;
261     }
262   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
263 #ifdef ENABLE_NLS
264                                               nl_langinfo (CODESET)
265 #else
266                                               "UTF-8"   /* good luck */
267 #endif
268                                               );
269   GNUNET_free (afsdir);
270   
271   /* Open database and precompile statements */
272   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
273     {
274       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
275                        "sqlite",
276                        _("Unable to initialize SQLite: %s.\n"),
277                        sqlite3_errmsg (plugin->dbh));
278       return GNUNET_SYSERR;
279     }
280   CHECK (SQLITE_OK ==
281          sqlite3_exec (plugin->dbh,
282                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
283   CHECK (SQLITE_OK ==
284          sqlite3_exec (plugin->dbh,
285                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
286   CHECK (SQLITE_OK ==
287          sqlite3_exec (plugin->dbh,
288                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
289   CHECK (SQLITE_OK ==
290          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
291
292   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
293
294
295   /* We have to do it here, because otherwise precompiling SQL might fail */
296   CHECK (SQLITE_OK ==
297          sq_prepare (plugin->dbh,
298                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
299                      &stmt));
300   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
301        (sqlite3_exec (plugin->dbh,
302                       "CREATE TABLE gn080 ("
303                       "  size INT4 NOT NULL DEFAULT 0,"
304                       "  type INT4 NOT NULL DEFAULT 0,"
305                       "  prio INT4 NOT NULL DEFAULT 0,"
306                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
307                       "  expire INT8 NOT NULL DEFAULT 0,"
308                       "  hash TEXT NOT NULL DEFAULT '',"
309                       "  vhash TEXT NOT NULL DEFAULT '',"
310                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
311                       NULL) != SQLITE_OK) )
312     {
313       LOG_SQLITE (plugin, NULL,
314                   GNUNET_ERROR_TYPE_ERROR, 
315                   "sqlite3_exec");
316       sqlite3_finalize (stmt);
317       return GNUNET_SYSERR;
318     }
319   sqlite3_finalize (stmt);
320   create_indices (plugin->dbh);
321
322   CHECK (SQLITE_OK ==
323          sq_prepare (plugin->dbh,
324                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
325                      &stmt));
326   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
327        (sqlite3_exec (plugin->dbh,
328                       "CREATE TABLE gn071 ("
329                       "  key TEXT NOT NULL DEFAULT '',"
330                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
331                       NULL) != SQLITE_OK) )
332     {
333       LOG_SQLITE (plugin, NULL,
334                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
335       sqlite3_finalize (stmt);
336       return GNUNET_SYSERR;
337     }
338   sqlite3_finalize (stmt);
339
340   if ((sq_prepare (plugin->dbh,
341                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
342                    "_ROWID_ = ?",
343                    &plugin->updPrio) != SQLITE_OK) ||
344       (sq_prepare (plugin->dbh,
345                    "INSERT INTO gn080 (size, type, prio, "
346                    "anonLevel, expire, hash, vhash, value) VALUES "
347                    "(?, ?, ?, ?, ?, ?, ?, ?)",
348                    &plugin->insertContent) != SQLITE_OK))
349     {
350       LOG_SQLITE (plugin, NULL,
351                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
352       return GNUNET_SYSERR;
353     }
354   return GNUNET_OK;
355 }
356
357
358 /**
359  * Synchronize our utilization statistics with the 
360  * statistics service.
361  * @param plugin the plugin context (state for this module)
362  */
363 static void 
364 sync_stats (struct Plugin *plugin)
365 {
366   GNUNET_STATISTICS_set (plugin->statistics,
367                          QUOTA_STAT_NAME,
368                          plugin->payload,
369                          GNUNET_YES);
370   plugin->lastSync = 0;
371 }
372
373
374 /**
375  * Shutdown database connection and associate data
376  * structures.
377  * @param plugin the plugin context (state for this module)
378  */
379 static void
380 database_shutdown (struct Plugin *plugin)
381 {
382   if (plugin->lastSync > 0)
383     sync_stats (plugin);
384   if (plugin->updPrio != NULL)
385     sqlite3_finalize (plugin->updPrio);
386   if (plugin->insertContent != NULL)
387     sqlite3_finalize (plugin->insertContent);
388   sqlite3_close (plugin->dbh);
389   GNUNET_free_non_null (plugin->fn);
390 }
391
392
393 /**
394  * Get an estimate of how much space the database is
395  * currently using.
396  *
397  * @param cls our plugin context
398  * @return number of bytes used on disk
399  */
400 static unsigned long long sqlite_plugin_get_size (void *cls)
401 {
402   struct Plugin *plugin = cls;
403   return plugin->payload;
404 }
405
406
407 /**
408  * Delete the database entry with the given
409  * row identifier.
410  *
411  * @param plugin the plugin context (state for this module)
412  * @param rid the ID of the row to delete
413  */
414 static int
415 delete_by_rowid (struct Plugin* plugin, 
416                  unsigned long long rid)
417 {
418   sqlite3_stmt *stmt;
419
420   if (sq_prepare (plugin->dbh,
421                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
422     {
423       LOG_SQLITE (plugin, NULL,
424                   GNUNET_ERROR_TYPE_ERROR |
425                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
426       return GNUNET_SYSERR;
427     }
428   sqlite3_bind_int64 (stmt, 1, rid);
429   if (SQLITE_DONE != sqlite3_step (stmt))
430     {
431       LOG_SQLITE (plugin, NULL,
432                   GNUNET_ERROR_TYPE_ERROR |
433                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
434       sqlite3_finalize (stmt);
435       return GNUNET_SYSERR;
436     }
437   sqlite3_finalize (stmt);
438   return GNUNET_OK;
439 }
440
441
442 /**
443  * Context for the universal iterator.
444  */
445 struct NextContext;
446
447 /**
448  * Type of a function that will prepare
449  * the next iteration.
450  *
451  * @param cls closure
452  * @param nc the next context; NULL for the last
453  *         call which gives the callback a chance to
454  *         clean up the closure
455  * @return GNUNET_OK on success, GNUNET_NO if there are
456  *         no more values, GNUNET_SYSERR on error
457  */
458 typedef int (*PrepareFunction)(void *cls,
459                                struct NextContext *nc);
460
461
462 /**
463  * Context we keep for the "next request" callback.
464  */
465 struct NextContext
466 {
467   /**
468    * Internal state.
469    */ 
470   struct Plugin *plugin;
471
472   /**
473    * Function to call on the next value.
474    */
475   PluginIterator iter;
476
477   /**
478    * Closure for iter.
479    */
480   void *iter_cls;
481
482   /**
483    * Function to call to prepare the next
484    * iteration.
485    */
486   PrepareFunction prep;
487
488   /**
489    * Closure for prep.
490    */
491   void *prep_cls;
492
493   /**
494    * Statement that the iterator will get the data
495    * from (updated or set by prep).
496    */ 
497   sqlite3_stmt *stmt;
498
499   /**
500    * Row ID of the last result.
501    */
502   unsigned long long last_rowid;
503
504   /**
505    * Key of the last result.
506    */
507   GNUNET_HashCode lastKey;  
508
509   /**
510    * Expiration time of the last value visited.
511    */
512   struct GNUNET_TIME_Absolute lastExpiration;
513
514   /**
515    * Priority of the last value visited.
516    */ 
517   unsigned int lastPriority; 
518
519   /**
520    * Number of results processed so far.
521    */
522   unsigned int count;
523
524   /**
525    * Set to GNUNET_YES if we must stop now.
526    */
527   int end_it;
528 };
529
530
531 /**
532  * Continuation of "sqlite_next_request".
533  *
534  * @param cls the next context
535  * @param tc the task context (unused)
536  */
537 static void 
538 sqlite_next_request_cont (void *cls,
539                           const struct GNUNET_SCHEDULER_TaskContext *tc)
540 {
541   struct NextContext * nc = cls;
542   struct Plugin *plugin;
543   unsigned long long rowid;
544   sqlite3_stmt *stmtd;
545   int ret;
546   unsigned int type;
547   unsigned int size;
548   unsigned int priority;
549   unsigned int anonymity;
550   struct GNUNET_TIME_Absolute expiration;
551   const GNUNET_HashCode *key;
552   const void *data;
553   
554   plugin = nc->plugin;
555   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
556   plugin->next_task_nc = NULL;
557   if ( (GNUNET_YES == nc->end_it) ||
558        (GNUNET_OK != (nc->prep(nc->prep_cls,
559                                nc))) )
560     {
561     END:
562       nc->iter (nc->iter_cls, 
563                 NULL, NULL, 0, NULL, 0, 0, 0, 
564                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
565       nc->prep (nc->prep_cls, NULL);
566       GNUNET_free (nc);
567       return;
568     }
569
570   rowid = sqlite3_column_int64 (nc->stmt, 7);
571   nc->last_rowid = rowid;
572   type = sqlite3_column_int (nc->stmt, 1);
573   size = sqlite3_column_bytes (nc->stmt, 6);
574   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
575     {
576       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
577                        "sqlite",
578                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
579       if (SQLITE_OK != sqlite3_reset (nc->stmt))
580         LOG_SQLITE (nc->plugin, NULL,
581                     GNUNET_ERROR_TYPE_ERROR |
582                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
583       if (sq_prepare
584           (nc->plugin->dbh,
585            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
586            &stmtd) != SQLITE_OK)
587         {
588           LOG_SQLITE (nc->plugin, NULL,
589                       GNUNET_ERROR_TYPE_ERROR |
590                       GNUNET_ERROR_TYPE_BULK, 
591                       "sq_prepare");
592           goto END;
593         }
594
595       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
596         LOG_SQLITE (nc->plugin, NULL,
597                     GNUNET_ERROR_TYPE_ERROR |
598                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
599       if (SQLITE_DONE != sqlite3_step (stmtd))
600         LOG_SQLITE (nc->plugin, NULL,
601                     GNUNET_ERROR_TYPE_ERROR |
602                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
603       if (SQLITE_OK != sqlite3_finalize (stmtd))
604         LOG_SQLITE (nc->plugin, NULL,
605                     GNUNET_ERROR_TYPE_ERROR |
606                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
607       goto END;
608     }
609
610   priority = sqlite3_column_int (nc->stmt, 2);
611   anonymity = sqlite3_column_int (nc->stmt, 3);
612   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
613   key = sqlite3_column_blob (nc->stmt, 5);
614   nc->lastPriority = priority;
615   nc->lastExpiration = expiration;
616   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
617   data = sqlite3_column_blob (nc->stmt, 6);
618   nc->count++;
619   ret = nc->iter (nc->iter_cls,
620                   nc,
621                   key,
622                   size,
623                   data, 
624                   type,
625                   priority,
626                   anonymity,
627                   expiration,
628                   rowid);
629   if (ret == GNUNET_SYSERR)
630     {
631       nc->end_it = GNUNET_YES;
632       return;
633     }
634 #if DEBUG_SQLITE
635   if (ret == GNUNET_NO)
636     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
637                      "sqlite",
638                      "Asked to remove entry %llu (%u bytes)\n",
639                      (unsigned long long) rowid,
640                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
641 #endif
642   if ( (ret == GNUNET_NO) &&
643        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
644     {
645       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
646       plugin->lastSync++; 
647 #if DEBUG_SQLITE
648       if (ret == GNUNET_NO)
649         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
650                          "sqlite",
651                          "Removed entry %llu (%u bytes), new payload is %llu\n",
652                          (unsigned long long) rowid,
653                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
654                          (unsigned long long) plugin->payload);
655 #endif
656       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
657         sync_stats (plugin);
658     }
659 }
660
661
662 /**
663  * Function invoked on behalf of a "PluginIterator"
664  * asking the database plugin to call the iterator
665  * with the next item.
666  *
667  * @param next_cls whatever argument was given
668  *        to the PluginIterator as "next_cls".
669  * @param end_it set to GNUNET_YES if we
670  *        should terminate the iteration early
671  *        (iterator should be still called once more
672  *         to signal the end of the iteration).
673  */
674 static void 
675 sqlite_next_request (void *next_cls,
676                      int end_it)
677 {
678   struct NextContext * nc= next_cls;
679
680   if (GNUNET_YES == end_it)
681     nc->end_it = GNUNET_YES;
682   nc->plugin->next_task_nc = nc;
683   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (nc->plugin->env->sched,
684                                                     &sqlite_next_request_cont,
685                                                     nc);
686 }
687
688
689
690 /**
691  * Store an item in the datastore.
692  *
693  * @param cls closure
694  * @param key key for the item
695  * @param size number of bytes in data
696  * @param data content stored
697  * @param type type of the content
698  * @param priority priority of the content
699  * @param anonymity anonymity-level for the content
700  * @param expiration expiration time for the content
701  * @param msg set to an error message
702  * @return GNUNET_OK on success
703  */
704 static int
705 sqlite_plugin_put (void *cls,
706                    const GNUNET_HashCode * key,
707                    uint32_t size,
708                    const void *data,
709                    uint32_t type,
710                    uint32_t priority,
711                    uint32_t anonymity,
712                    struct GNUNET_TIME_Absolute expiration,
713                    char ** msg)
714 {
715   struct Plugin *plugin = cls;
716   int n;
717   sqlite3_stmt *stmt;
718   GNUNET_HashCode vhash;
719
720 #if DEBUG_SQLITE
721   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
722                    "sqlite",
723                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
724                    type, 
725                    GNUNET_h2s(key),
726                    priority,
727                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
728                    (long long) expiration.value);
729 #endif
730   GNUNET_CRYPTO_hash (data, size, &vhash);
731   stmt = plugin->insertContent;
732   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
733       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
734       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
735       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
736       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
737       (SQLITE_OK !=
738        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
739                           SQLITE_TRANSIENT)) ||
740       (SQLITE_OK !=
741        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
742                           SQLITE_TRANSIENT))
743       || (SQLITE_OK !=
744           sqlite3_bind_blob (stmt, 8, data, size,
745                              SQLITE_TRANSIENT)))
746     {
747       LOG_SQLITE (plugin,
748                   msg,
749                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
750       if (SQLITE_OK != sqlite3_reset (stmt))
751         LOG_SQLITE (plugin, NULL,
752                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
753       return GNUNET_SYSERR;
754     }
755   n = sqlite3_step (stmt);
756   if (n != SQLITE_DONE)
757     {
758       if (n == SQLITE_BUSY)
759         {
760           LOG_SQLITE (plugin, msg,
761                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
762           sqlite3_reset (stmt);
763           GNUNET_break (0);
764           return GNUNET_NO;
765         }
766       LOG_SQLITE (plugin, msg,
767                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
768       sqlite3_reset (stmt);
769       return GNUNET_SYSERR;
770     }
771   if (SQLITE_OK != sqlite3_reset (stmt))
772     LOG_SQLITE (plugin, NULL,
773                 GNUNET_ERROR_TYPE_ERROR |
774                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
775   plugin->lastSync++;
776   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
777 #if DEBUG_SQLITE
778   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
779                    "sqlite",
780                    "Stored new entry (%u bytes), new payload is %llu\n",
781                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
782                    (unsigned long long) plugin->payload);
783 #endif
784   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
785     sync_stats (plugin);
786   return GNUNET_OK;
787 }
788
789
790 /**
791  * Update the priority for a particular key in the datastore.  If
792  * the expiration time in value is different than the time found in
793  * the datastore, the higher value should be kept.  For the
794  * anonymity level, the lower value is to be used.  The specified
795  * priority should be added to the existing priority, ignoring the
796  * priority in value.
797  *
798  * Note that it is possible for multiple values to match this put.
799  * In that case, all of the respective values are updated.
800  *
801  * @param cls the plugin context (state for this module)
802  * @param uid unique identifier of the datum
803  * @param delta by how much should the priority
804  *     change?  If priority + delta < 0 the
805  *     priority should be set to 0 (never go
806  *     negative).
807  * @param expire new expiration time should be the
808  *     MAX of any existing expiration time and
809  *     this value
810  * @param msg set to an error message
811  * @return GNUNET_OK on success
812  */
813 static int
814 sqlite_plugin_update (void *cls,
815                       uint64_t uid,
816                       int delta, struct GNUNET_TIME_Absolute expire,
817                       char **msg)
818 {
819   struct Plugin *plugin = cls;
820   int n;
821
822   sqlite3_bind_int (plugin->updPrio, 1, delta);
823   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
824   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
825   n = sqlite3_step (plugin->updPrio);
826   if (n != SQLITE_DONE)
827     LOG_SQLITE (plugin, msg,
828                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
829                 "sqlite3_step");
830 #if DEBUG_SQLITE
831   else
832     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
833                      "sqlite",
834                      "Block updated\n");
835 #endif
836   sqlite3_reset (plugin->updPrio);
837
838   if (n == SQLITE_BUSY)
839     return GNUNET_NO;
840   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
841 }
842
843
844 /**
845  * Internal context for an iteration.
846  */
847 struct IterContext
848 {
849   /**
850    * FIXME.
851    */
852   sqlite3_stmt *stmt_1;
853
854   /**
855    * FIXME.
856    */
857   sqlite3_stmt *stmt_2;
858
859   /**
860    * FIXME.
861    */
862   int is_asc;
863
864   /**
865    * FIXME.
866    */
867   int is_prio;
868
869   /**
870    * FIXME.
871    */
872   int is_migr;
873
874   /**
875    * FIXME.
876    */
877   int limit_nonanonymous;
878
879   /**
880    * Desired type for blocks returned by this iterator.
881    */
882   uint32_t type;
883 };
884
885
886 /**
887  * Prepare our SQL query to obtain the next record from the database.
888  *
889  * @param cls our "struct IterContext"
890  * @param nc NULL to terminate the iteration, otherwise our context for
891  *           getting the next result.
892  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
893  *         GNUNET_SYSERR on error (or end of iteration)
894  */
895 static int
896 iter_next_prepare (void *cls,
897                    struct NextContext *nc)
898 {
899   struct IterContext *ic = cls;
900   struct Plugin *plugin;
901   int ret;
902
903   if (nc == NULL)
904     {
905 #if DEBUG_SQLITE
906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907                   "Asked to clean up iterator state.\n");
908 #endif
909       sqlite3_finalize (ic->stmt_1);
910       sqlite3_finalize (ic->stmt_2);
911       return GNUNET_SYSERR;
912     }
913   sqlite3_reset (ic->stmt_1);
914   sqlite3_reset (ic->stmt_2);
915   plugin = nc->plugin;
916   if (ic->is_prio)
917     {
918 #if DEBUG_SQLITE
919       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                   "Restricting to results larger than the last priority %u\n",
921                   nc->lastPriority);
922 #endif
923       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
924       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
925     }
926   else
927     {
928 #if DEBUG_SQLITE
929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
930                   "Restricting to results larger than the last expiration %llu\n",
931                   (unsigned long long) nc->lastExpiration.value);
932 #endif
933       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
934       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
935     }
936 #if DEBUG_SQLITE
937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938               "Restricting to results larger than the last key `%s'\n",
939               GNUNET_h2s(&nc->lastKey));
940 #endif
941   sqlite3_bind_blob (ic->stmt_1, 2, 
942                      &nc->lastKey, 
943                      sizeof (GNUNET_HashCode),
944                      SQLITE_TRANSIENT);
945   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
946     {      
947 #if DEBUG_SQLITE
948       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949                   "Result found using iterator 1\n");
950 #endif
951       nc->stmt = ic->stmt_1;
952       return GNUNET_OK;
953     }
954   if (ret != SQLITE_DONE)
955     {
956       LOG_SQLITE (plugin, NULL,
957                   GNUNET_ERROR_TYPE_ERROR |
958                   GNUNET_ERROR_TYPE_BULK,
959                   "sqlite3_step");
960       return GNUNET_SYSERR;
961     }
962   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
963     LOG_SQLITE (plugin, NULL,
964                 GNUNET_ERROR_TYPE_ERROR | 
965                 GNUNET_ERROR_TYPE_BULK, 
966                 "sqlite3_reset");
967   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
968     {
969 #if DEBUG_SQLITE
970       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971                   "Result found using iterator 2\n");
972 #endif
973       nc->stmt = ic->stmt_2;
974       return GNUNET_OK;
975     }
976   if (ret != SQLITE_DONE)
977     {
978       LOG_SQLITE (plugin, NULL,
979                   GNUNET_ERROR_TYPE_ERROR |
980                   GNUNET_ERROR_TYPE_BULK,
981                   "sqlite3_step");
982       return GNUNET_SYSERR;
983     }
984   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
985     LOG_SQLITE (plugin, NULL,
986                 GNUNET_ERROR_TYPE_ERROR |
987                 GNUNET_ERROR_TYPE_BULK,
988                 "sqlite3_reset");
989 #if DEBUG_SQLITE
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "No result found using either iterator\n");
992 #endif
993   return GNUNET_NO;
994 }
995
996
997 /**
998  * Call a method for each key in the database and
999  * call the callback method on it.
1000  *
1001  * @param plugin our plugin context
1002  * @param type entries of which type should be considered?
1003  * @param is_asc are we iterating in ascending order?
1004  * @param is_prio are we iterating by priority (otherwise by expiration)
1005  * @param is_migr are we iterating in migration order?
1006  * @param limit_nonanonymous are we restricting results to those with anonymity
1007  *              level zero?
1008  * @param stmt_str_1 first SQL statement to execute
1009  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1010  * @param iter function to call on each matching value;
1011  *        will be called once with a NULL value at the end
1012  * @param iter_cls closure for iter
1013  */
1014 static void
1015 basic_iter (struct Plugin *plugin,
1016             uint32_t type,
1017             int is_asc,
1018             int is_prio,
1019             int is_migr,
1020             int limit_nonanonymous,
1021             const char *stmt_str_1,
1022             const char *stmt_str_2,
1023             PluginIterator iter,
1024             void *iter_cls)
1025 {
1026   struct NextContext *nc;
1027   struct IterContext *ic;
1028   sqlite3_stmt *stmt_1;
1029   sqlite3_stmt *stmt_2;
1030
1031 #if DEBUG_SQLITE
1032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033               "At %llu, using queries `%s' and `%s'\n",
1034               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1035               stmt_str_1,
1036               stmt_str_2);
1037 #endif
1038   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1039     {
1040       LOG_SQLITE (plugin, NULL,
1041                   GNUNET_ERROR_TYPE_ERROR |
1042                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1043       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1044       return;
1045     }
1046   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1047     {
1048       LOG_SQLITE (plugin, NULL,
1049                   GNUNET_ERROR_TYPE_ERROR |
1050                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1051       sqlite3_finalize (stmt_1);
1052       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1053       return;
1054     }
1055   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1056                       sizeof(struct IterContext));
1057   nc->plugin = plugin;
1058   nc->iter = iter;
1059   nc->iter_cls = iter_cls;
1060   nc->stmt = NULL;
1061   ic = (struct IterContext*) &nc[1];
1062   ic->stmt_1 = stmt_1;
1063   ic->stmt_2 = stmt_2;
1064   ic->type = type;
1065   ic->is_asc = is_asc;
1066   ic->is_prio = is_prio;
1067   ic->is_migr = is_migr;
1068   ic->limit_nonanonymous = limit_nonanonymous;
1069   nc->prep = &iter_next_prepare;
1070   nc->prep_cls = ic;
1071   if (is_asc)
1072     {
1073       nc->lastPriority = 0;
1074       nc->lastExpiration.value = 0;
1075       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1076     }
1077   else
1078     {
1079       nc->lastPriority = 0x7FFFFFFF;
1080       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1081       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1082     }
1083   sqlite_next_request (nc, GNUNET_NO);
1084 }
1085
1086
1087 /**
1088  * Select a subset of the items in the datastore and call
1089  * the given iterator for each of them.
1090  *
1091  * @param cls our plugin context
1092  * @param type entries of which type should be considered?
1093  *        Use 0 for any type.
1094  * @param iter function to call on each matching value;
1095  *        will be called once with a NULL value at the end
1096  * @param iter_cls closure for iter
1097  */
1098 static void
1099 sqlite_plugin_iter_low_priority (void *cls,
1100                                  uint32_t type,
1101                                  PluginIterator iter,
1102                                  void *iter_cls)
1103 {
1104   basic_iter (cls,
1105               type, 
1106               GNUNET_YES, GNUNET_YES, 
1107               GNUNET_NO, GNUNET_NO,
1108               SELECT_IT_LOW_PRIORITY_1,
1109               SELECT_IT_LOW_PRIORITY_2, 
1110               iter, iter_cls);
1111 }
1112
1113
1114 /**
1115  * Select a subset of the items in the datastore and call
1116  * the given iterator for each of them.
1117  *
1118  * @param cls our plugin context
1119  * @param type entries of which type should be considered?
1120  *        Use 0 for any type.
1121  * @param iter function to call on each matching value;
1122  *        will be called once with a NULL value at the end
1123  * @param iter_cls closure for iter
1124  */
1125 static void
1126 sqlite_plugin_iter_zero_anonymity (void *cls,
1127                                    uint32_t type,
1128                                    PluginIterator iter,
1129                                    void *iter_cls)
1130 {
1131   struct GNUNET_TIME_Absolute now;
1132   char *q1;
1133   char *q2;
1134
1135   now = GNUNET_TIME_absolute_get ();
1136   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1137                    (unsigned long long) now.value);
1138   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1139                    (unsigned long long) now.value);
1140   basic_iter (cls,
1141               type, 
1142               GNUNET_NO, GNUNET_YES, 
1143               GNUNET_NO, GNUNET_YES,
1144               q1,
1145               q2,
1146               iter, iter_cls);
1147   GNUNET_free (q1);
1148   GNUNET_free (q2);
1149 }
1150
1151
1152
1153 /**
1154  * Select a subset of the items in the datastore and call
1155  * the given iterator for each of them.
1156  *
1157  * @param cls our plugin context
1158  * @param type entries of which type should be considered?
1159  *        Use 0 for any type.
1160  * @param iter function to call on each matching value;
1161  *        will be called once with a NULL value at the end
1162  * @param iter_cls closure for iter
1163  */
1164 static void
1165 sqlite_plugin_iter_ascending_expiration (void *cls,
1166                                          uint32_t type,
1167                                          PluginIterator iter,
1168                                          void *iter_cls)
1169 {
1170   struct GNUNET_TIME_Absolute now;
1171   char *q1;
1172   char *q2;
1173
1174   now = GNUNET_TIME_absolute_get ();
1175   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1176                    (unsigned long long) 0*now.value);
1177   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1178                    (unsigned long long) 0*now.value);
1179   basic_iter (cls,
1180               type, 
1181               GNUNET_YES, GNUNET_NO, 
1182               GNUNET_NO, GNUNET_NO,
1183               q1, q2,
1184               iter, iter_cls);
1185   GNUNET_free (q1);
1186   GNUNET_free (q2);
1187 }
1188
1189
1190 /**
1191  * Select a subset of the items in the datastore and call
1192  * the given iterator for each of them.
1193  *
1194  * @param cls our plugin context
1195  * @param type entries of which type should be considered?
1196  *        Use 0 for any type.
1197  * @param iter function to call on each matching value;
1198  *        will be called once with a NULL value at the end
1199  * @param iter_cls closure for iter
1200  */
1201 static void
1202 sqlite_plugin_iter_migration_order (void *cls,
1203                                     uint32_t type,
1204                                     PluginIterator iter,
1205                                     void *iter_cls)
1206 {
1207   struct GNUNET_TIME_Absolute now;
1208   char *q;
1209
1210   now = GNUNET_TIME_absolute_get ();
1211   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1212                    (unsigned long long) now.value);
1213   basic_iter (cls,
1214               type, 
1215               GNUNET_NO, GNUNET_NO, 
1216               GNUNET_YES, GNUNET_NO,
1217               SELECT_IT_MIGRATION_ORDER_1,
1218               q,
1219               iter, iter_cls);
1220   GNUNET_free (q);
1221 }
1222
1223
1224 /**
1225  * Call sqlite using the already prepared query to get
1226  * the next result.
1227  *
1228  * @param cls not used
1229  * @param nc context with the prepared query
1230  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1231  *        there are no more results 
1232  */
1233 static int
1234 all_next_prepare (void *cls,
1235                   struct NextContext *nc)
1236 {
1237   struct Plugin *plugin;
1238   int ret;
1239
1240   if (nc == NULL)
1241     {
1242 #if DEBUG_SQLITE
1243       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244                   "Asked to clean up iterator state.\n");
1245 #endif
1246       return GNUNET_SYSERR;
1247     }
1248   plugin = nc->plugin;
1249   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1250     {      
1251       return GNUNET_OK;
1252     }
1253   if (ret != SQLITE_DONE)
1254     {
1255       LOG_SQLITE (plugin, NULL,
1256                   GNUNET_ERROR_TYPE_ERROR |
1257                   GNUNET_ERROR_TYPE_BULK,
1258                   "sqlite3_step");
1259       return GNUNET_SYSERR;
1260     }
1261   return GNUNET_NO;
1262 }
1263
1264
1265 /**
1266  * Select a subset of the items in the datastore and call
1267  * the given iterator for each of them.
1268  *
1269  * @param cls our plugin context
1270  * @param type entries of which type should be considered?
1271  *        Use 0 for any type.
1272  * @param iter function to call on each matching value;
1273  *        will be called once with a NULL value at the end
1274  * @param iter_cls closure for iter
1275  */
1276 static void
1277 sqlite_plugin_iter_all_now (void *cls,
1278                             uint32_t type,
1279                             PluginIterator iter,
1280                             void *iter_cls)
1281 {
1282   struct Plugin *plugin = cls;
1283   struct NextContext *nc;
1284   sqlite3_stmt *stmt;
1285
1286   if (sq_prepare (plugin->dbh, 
1287                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1288                   &stmt) != SQLITE_OK)
1289     {
1290       LOG_SQLITE (plugin, NULL,
1291                   GNUNET_ERROR_TYPE_ERROR |
1292                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1293       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1294       return;
1295     }
1296   nc = GNUNET_malloc (sizeof(struct NextContext));
1297   nc->plugin = plugin;
1298   nc->iter = iter;
1299   nc->iter_cls = iter_cls;
1300   nc->stmt = stmt;
1301   nc->prep = &all_next_prepare;
1302   nc->prep_cls = NULL;
1303   sqlite_next_request (nc, GNUNET_NO);
1304 }
1305
1306
1307 /**
1308  * FIXME.
1309  */
1310 struct GetNextContext
1311 {
1312
1313   /**
1314    * FIXME.
1315    */
1316   int total;
1317
1318   /**
1319    * FIXME.
1320    */
1321   int off;
1322
1323   /**
1324    * FIXME.
1325    */
1326   int have_vhash;
1327
1328   /**
1329    * FIXME.
1330    */
1331   unsigned int type;
1332
1333   /**
1334    * FIXME.
1335    */
1336   sqlite3_stmt *stmt;
1337
1338   /**
1339    * FIXME.
1340    */
1341   GNUNET_HashCode key;
1342
1343   /**
1344    * FIXME.
1345    */
1346   GNUNET_HashCode vhash;
1347 };
1348
1349
1350
1351 /**
1352  * FIXME.
1353  *
1354  * @param cls our "struct GetNextContext*"
1355  * @param nc FIXME
1356  * @return GNUNET_YES if there are more results, 
1357  *         GNUNET_NO if there are no more results,
1358  *         GNUNET_SYSERR on internal error
1359  */
1360 static int
1361 get_next_prepare (void *cls,
1362                   struct NextContext *nc)
1363 {
1364   struct GetNextContext *gnc = cls;
1365   int sqoff;
1366   int ret;
1367   int limit_off;
1368
1369   if (nc == NULL)
1370     {
1371       sqlite3_finalize (gnc->stmt);
1372       return GNUNET_SYSERR;
1373     }
1374   if (nc->count == gnc->total)
1375     return GNUNET_NO;
1376   if (nc->count + gnc->off == gnc->total)
1377     nc->last_rowid = 0;
1378   if (nc->count == 0)
1379     limit_off = gnc->off;
1380   else
1381     limit_off = 0;
1382   sqoff = 1;
1383   sqlite3_reset (nc->stmt);
1384   ret = sqlite3_bind_blob (nc->stmt,
1385                            sqoff++,
1386                            &gnc->key, 
1387                            sizeof (GNUNET_HashCode),
1388                            SQLITE_TRANSIENT);
1389   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1390     ret = sqlite3_bind_blob (nc->stmt,
1391                              sqoff++,
1392                              &gnc->vhash,
1393                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1394   if ((gnc->type != 0) && (ret == SQLITE_OK))
1395     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1396   if (ret == SQLITE_OK)
1397     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1398   if (ret == SQLITE_OK)
1399     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1400   if (ret != SQLITE_OK)
1401     return GNUNET_SYSERR;
1402   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1403     return GNUNET_NO;
1404   return GNUNET_OK;
1405 }
1406
1407
1408 /**
1409  * Iterate over the results for a particular key
1410  * in the datastore.
1411  *
1412  * @param cls closure
1413  * @param key maybe NULL (to match all entries)
1414  * @param vhash hash of the value, maybe NULL (to
1415  *        match all values that have the right key).
1416  *        Note that for DBlocks there is no difference
1417  *        betwen key and vhash, but for other blocks
1418  *        there may be!
1419  * @param type entries of which type are relevant?
1420  *     Use 0 for any type.
1421  * @param iter function to call on each matching value;
1422  *        will be called once with a NULL value at the end
1423  * @param iter_cls closure for iter
1424  */
1425 static void
1426 sqlite_plugin_get (void *cls,
1427                    const GNUNET_HashCode * key,
1428                    const GNUNET_HashCode * vhash,
1429                    uint32_t type,
1430                    PluginIterator iter, void *iter_cls)
1431 {
1432   struct Plugin *plugin = cls;
1433   struct GetNextContext *gpc;
1434   struct NextContext *nc;
1435   int ret;
1436   int total;
1437   sqlite3_stmt *stmt;
1438   char scratch[256];
1439   int sqoff;
1440
1441   GNUNET_assert (iter != NULL);
1442   if (key == NULL)
1443     {
1444       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1445       return;
1446     }
1447   GNUNET_snprintf (scratch, sizeof (scratch),
1448                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1449                    vhash == NULL ? "" : " AND vhash=:2",
1450                    type == 0 ? "" : (vhash ==
1451                                      NULL) ? " AND type=:2" : " AND type=:3");
1452   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1453     {
1454       LOG_SQLITE (plugin, NULL,
1455                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1456       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1457       return;
1458     }
1459   sqoff = 1;
1460   ret = sqlite3_bind_blob (stmt,
1461                            sqoff++,
1462                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1463   if ((vhash != NULL) && (ret == SQLITE_OK))
1464     ret = sqlite3_bind_blob (stmt,
1465                              sqoff++,
1466                              vhash,
1467                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1468   if ((type != 0) && (ret == SQLITE_OK))
1469     ret = sqlite3_bind_int (stmt, sqoff++, type);
1470   if (SQLITE_OK != ret)
1471     {
1472       LOG_SQLITE (plugin, NULL,
1473                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1474       sqlite3_reset (stmt);
1475       sqlite3_finalize (stmt);
1476       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1477       return;
1478     }
1479   ret = sqlite3_step (stmt);
1480   if (ret != SQLITE_ROW)
1481     {
1482       LOG_SQLITE (plugin, NULL,
1483                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1484                   "sqlite_step");
1485       sqlite3_reset (stmt);
1486       sqlite3_finalize (stmt);
1487       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1488       return;
1489     }
1490   total = sqlite3_column_int (stmt, 0);
1491   sqlite3_reset (stmt);
1492   sqlite3_finalize (stmt);
1493   if (0 == total)
1494     {
1495       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1496       return;
1497     }
1498
1499   GNUNET_snprintf (scratch, sizeof (scratch),
1500                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1501                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1502                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1503                    vhash == NULL ? "" : " AND vhash=:2",
1504                    type == 0 ? "" : (vhash ==
1505                                      NULL) ? " AND type=:2" : " AND type=:3",
1506                    sqoff, sqoff + 1);
1507   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1508     {
1509       LOG_SQLITE (plugin, NULL,
1510                   GNUNET_ERROR_TYPE_ERROR |
1511                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1512       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1513       return;
1514     }
1515   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1516                       sizeof(struct GetNextContext));
1517   nc->plugin = plugin;
1518   nc->iter = iter;
1519   nc->iter_cls = iter_cls;
1520   nc->stmt = stmt;
1521   gpc = (struct GetNextContext*) &nc[1];
1522   gpc->total = total;
1523   gpc->type = type;
1524   gpc->key = *key;
1525   gpc->stmt = stmt; /* alias used for freeing at the end! */
1526   if (NULL != vhash)
1527     {
1528       gpc->have_vhash = GNUNET_YES;
1529       gpc->vhash = *vhash;
1530     }
1531   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1532   nc->prep = &get_next_prepare;
1533   nc->prep_cls = gpc;
1534   sqlite_next_request (nc, GNUNET_NO);
1535 }
1536
1537
1538 /**
1539  * Drop database.
1540  *
1541  * @param cls our plugin context
1542  */
1543 static void 
1544 sqlite_plugin_drop (void *cls)
1545 {
1546   struct Plugin *plugin = cls;
1547   plugin->drop_on_shutdown = GNUNET_YES;
1548 }
1549
1550
1551 /**
1552  * Callback function to process statistic values.
1553  *
1554  * @param cls closure
1555  * @param subsystem name of subsystem that created the statistic
1556  * @param name the name of the datum
1557  * @param value the current value
1558  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1559  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1560  */
1561 static int
1562 process_stat_in (void *cls,
1563                  const char *subsystem,
1564                  const char *name,
1565                  uint64_t value,
1566                  int is_persistent)
1567 {
1568   struct Plugin *plugin = cls;
1569   plugin->payload += value;
1570 #if DEBUG_SQLITE
1571   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1572                    "sqlite",
1573                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1574                    value,
1575                    (unsigned long long) plugin->payload);
1576 #endif
1577   return GNUNET_OK;
1578 }
1579
1580
1581 static void
1582 process_stat_done (void *cls,
1583                    int success)
1584 {
1585   struct Plugin *plugin = cls;
1586   plugin->stat_get = NULL;
1587 }
1588                                          
1589
1590 /**
1591  * Entry point for the plugin.
1592  *
1593  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1594  * @return NULL on error, othrewise the plugin context
1595  */
1596 void *
1597 libgnunet_plugin_datastore_sqlite_init (void *cls)
1598 {
1599   static struct Plugin plugin;
1600   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1601   struct GNUNET_DATASTORE_PluginFunctions *api;
1602
1603   if (plugin.env != NULL)
1604     return NULL; /* can only initialize once! */
1605   memset (&plugin, 0, sizeof(struct Plugin));
1606   plugin.env = env;
1607   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1608                                                 "sqlite",
1609                                                 env->cfg);
1610   plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
1611                                            "sqlite",
1612                                            QUOTA_STAT_NAME,
1613                                            GNUNET_TIME_UNIT_MINUTES,
1614                                            &process_stat_done,
1615                                            &process_stat_in,
1616                                            &plugin);
1617   if (GNUNET_OK !=
1618       database_setup (env->cfg, &plugin))
1619     {
1620       database_shutdown (&plugin);
1621       return NULL;
1622     }
1623   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1624   api->cls = &plugin;
1625   api->get_size = &sqlite_plugin_get_size;
1626   api->put = &sqlite_plugin_put;
1627   api->next_request = &sqlite_next_request;
1628   api->get = &sqlite_plugin_get;
1629   api->update = &sqlite_plugin_update;
1630   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1631   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1632   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1633   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1634   api->iter_all_now = &sqlite_plugin_iter_all_now;
1635   api->drop = &sqlite_plugin_drop;
1636   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1637                    "sqlite", _("Sqlite database running\n"));
1638   return api;
1639 }
1640
1641
1642 /**
1643  * Exit point from the plugin.
1644  *
1645  * @param cls the plugin context (as returned by "init")
1646  * @return always NULL
1647  */
1648 void *
1649 libgnunet_plugin_datastore_sqlite_done (void *cls)
1650 {
1651   char *fn;
1652   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1653   struct Plugin *plugin = api->cls;
1654
1655   if (plugin->stat_get != NULL)
1656     {
1657       GNUNET_STATISTICS_get_cancel (plugin->stat_get);
1658       plugin->stat_get = NULL;
1659     }
1660   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1661     {
1662       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1663                                plugin->next_task);
1664       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1665       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1666       GNUNET_free (plugin->next_task_nc);
1667       plugin->next_task_nc = NULL;
1668     }
1669   fn = NULL;
1670   if (plugin->drop_on_shutdown)
1671     fn = GNUNET_strdup (plugin->fn);
1672   database_shutdown (plugin);
1673   GNUNET_STATISTICS_destroy (plugin->statistics,
1674                              GNUNET_NO);
1675   plugin->env = NULL; 
1676   plugin->payload = 0;
1677   GNUNET_free (api);
1678   if (fn != NULL)
1679     {
1680       if (0 != UNLINK(fn))
1681         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1682                                   "unlink",
1683                                   fn);
1684       GNUNET_free (fn);
1685     }
1686   return NULL;
1687 }
1688
1689 /* end of plugin_datastore_sqlite.c */