enable stats get cancellation
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_YES
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94
95 /**
96  * Context for all functions in this plugin.
97  */
98 struct Plugin 
99 {
100   /**
101    * Our execution environment.
102    */
103   struct GNUNET_DATASTORE_PluginEnvironment *env;
104
105   /**
106    * Database filename.
107    */
108   char *fn;
109
110   /**
111    * Native SQLite database handle.
112    */
113   sqlite3 *dbh;
114
115   /**
116    * Precompiled SQL for update.
117    */
118   sqlite3_stmt *updPrio;
119
120   /**
121    * Precompiled SQL for insertion.
122    */
123   sqlite3_stmt *insertContent;
124
125   /**
126    * Handle to the statistics service.
127    */
128   struct GNUNET_STATISTICS_Handle *statistics;
129
130   /**
131    * Handle for pending get request.
132    */
133   struct GNUNET_STATISTICS_GetHandle *stat_get;
134   
135   /**
136    * How much data are we currently storing
137    * in the database?
138    */
139   unsigned long long payload;
140
141   /**
142    * Number of updates that were made to the
143    * payload value since we last synchronized
144    * it with the statistics service.
145    */
146   unsigned int lastSync;
147
148   /**
149    * Should the database be dropped on shutdown?
150    */
151   int drop_on_shutdown;
152 };
153
154
155 /**
156  * @brief Prepare a SQL statement
157  *
158  * @param dbh handle to the database
159  * @param zSql SQL statement, UTF-8 encoded
160  * @param ppStmt set to the prepared statement
161  * @return 0 on success
162  */
163 static int
164 sq_prepare (sqlite3 * dbh, const char *zSql,
165             sqlite3_stmt ** ppStmt)
166 {
167   char *dummy;
168   return sqlite3_prepare (dbh,
169                           zSql,
170                           strlen (zSql), ppStmt, (const char **) &dummy);
171 }
172
173
174 /**
175  * Create our database indices.
176  * 
177  * @param dbh handle to the database
178  */
179 static void
180 create_indices (sqlite3 * dbh)
181 {
182   /* create indices */
183   sqlite3_exec (dbh,
184                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
185   sqlite3_exec (dbh,
186                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
187                 NULL, NULL);
188   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
189                 NULL);
190   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
191                 NULL);
192   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
193                 NULL, NULL);
194   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
195                 NULL, NULL, NULL);
196   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
197                 NULL, NULL);
198 }
199
200
201
202 #if 1
203 #define CHECK(a) GNUNET_break(a)
204 #define ENULL NULL
205 #else
206 #define ENULL &e
207 #define ENULL_DEFINED 1
208 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
209 #endif
210
211
212
213
214 /**
215  * Initialize the database connections and associated
216  * data structures (create tables and indices
217  * as needed as well).
218  *
219  * @param cfg our configuration
220  * @param plugin the plugin context (state for this module)
221  * @return GNUNET_OK on success
222  */
223 static int
224 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
225                 struct Plugin *plugin)
226 {
227   sqlite3_stmt *stmt;
228   char *afsdir;
229 #if ENULL_DEFINED
230   char *e;
231 #endif
232   
233   if (GNUNET_OK != 
234       GNUNET_CONFIGURATION_get_value_filename (cfg,
235                                                "datastore-sqlite",
236                                                "FILENAME",
237                                                &afsdir))
238     {
239       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
240                        "sqlite",
241                        _("Option `%s' in section `%s' missing in configuration!\n"),
242                        "FILENAME",
243                        "datastore-sqlite");
244       return GNUNET_SYSERR;
245     }
246   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
247     {
248       GNUNET_break (0);
249       GNUNET_free (afsdir);
250       return GNUNET_SYSERR;
251     }
252   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
253 #ifdef ENABLE_NLS
254                                               nl_langinfo (CODESET)
255 #else
256                                               "UTF-8"   /* good luck */
257 #endif
258                                               );
259   GNUNET_free (afsdir);
260   
261   /* Open database and precompile statements */
262   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
263     {
264       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
265                        "sqlite",
266                        _("Unable to initialize SQLite: %s.\n"),
267                        sqlite3_errmsg (plugin->dbh));
268       return GNUNET_SYSERR;
269     }
270   CHECK (SQLITE_OK ==
271          sqlite3_exec (plugin->dbh,
272                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
273   CHECK (SQLITE_OK ==
274          sqlite3_exec (plugin->dbh,
275                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
276   CHECK (SQLITE_OK ==
277          sqlite3_exec (plugin->dbh,
278                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
279   CHECK (SQLITE_OK ==
280          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
281
282   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
283
284
285   /* We have to do it here, because otherwise precompiling SQL might fail */
286   CHECK (SQLITE_OK ==
287          sq_prepare (plugin->dbh,
288                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
289                      &stmt));
290   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
291        (sqlite3_exec (plugin->dbh,
292                       "CREATE TABLE gn080 ("
293                       "  size INT4 NOT NULL DEFAULT 0,"
294                       "  type INT4 NOT NULL DEFAULT 0,"
295                       "  prio INT4 NOT NULL DEFAULT 0,"
296                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
297                       "  expire INT8 NOT NULL DEFAULT 0,"
298                       "  hash TEXT NOT NULL DEFAULT '',"
299                       "  vhash TEXT NOT NULL DEFAULT '',"
300                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
301                       NULL) != SQLITE_OK) )
302     {
303       LOG_SQLITE (plugin, NULL,
304                   GNUNET_ERROR_TYPE_ERROR, 
305                   "sqlite3_exec");
306       sqlite3_finalize (stmt);
307       return GNUNET_SYSERR;
308     }
309   sqlite3_finalize (stmt);
310   create_indices (plugin->dbh);
311
312   CHECK (SQLITE_OK ==
313          sq_prepare (plugin->dbh,
314                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
315                      &stmt));
316   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
317        (sqlite3_exec (plugin->dbh,
318                       "CREATE TABLE gn071 ("
319                       "  key TEXT NOT NULL DEFAULT '',"
320                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
321                       NULL) != SQLITE_OK) )
322     {
323       LOG_SQLITE (plugin, NULL,
324                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
325       sqlite3_finalize (stmt);
326       return GNUNET_SYSERR;
327     }
328   sqlite3_finalize (stmt);
329
330   if ((sq_prepare (plugin->dbh,
331                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
332                    "_ROWID_ = ?",
333                    &plugin->updPrio) != SQLITE_OK) ||
334       (sq_prepare (plugin->dbh,
335                    "INSERT INTO gn080 (size, type, prio, "
336                    "anonLevel, expire, hash, vhash, value) VALUES "
337                    "(?, ?, ?, ?, ?, ?, ?, ?)",
338                    &plugin->insertContent) != SQLITE_OK))
339     {
340       LOG_SQLITE (plugin, NULL,
341                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
342       return GNUNET_SYSERR;
343     }
344   return GNUNET_OK;
345 }
346
347
348 /**
349  * Synchronize our utilization statistics with the 
350  * statistics service.
351  * @param plugin the plugin context (state for this module)
352  */
353 static void 
354 sync_stats (struct Plugin *plugin)
355 {
356   GNUNET_STATISTICS_set (plugin->statistics,
357                          QUOTA_STAT_NAME,
358                          plugin->payload,
359                          GNUNET_YES);
360   plugin->lastSync = 0;
361 }
362
363
364 /**
365  * Shutdown database connection and associate data
366  * structures.
367  * @param plugin the plugin context (state for this module)
368  */
369 static void
370 database_shutdown (struct Plugin *plugin)
371 {
372   if (plugin->lastSync > 0)
373     sync_stats (plugin);
374   if (plugin->updPrio != NULL)
375     sqlite3_finalize (plugin->updPrio);
376   if (plugin->insertContent != NULL)
377     sqlite3_finalize (plugin->insertContent);
378   sqlite3_close (plugin->dbh);
379   GNUNET_free_non_null (plugin->fn);
380 }
381
382
383 /**
384  * Get an estimate of how much space the database is
385  * currently using.
386  *
387  * @param cls our plugin context
388  * @return number of bytes used on disk
389  */
390 static unsigned long long sqlite_plugin_get_size (void *cls)
391 {
392   struct Plugin *plugin = cls;
393   return plugin->payload;
394 }
395
396
397 /**
398  * Delete the database entry with the given
399  * row identifier.
400  *
401  * @param plugin the plugin context (state for this module)
402  * @param rid the ID of the row to delete
403  */
404 static int
405 delete_by_rowid (struct Plugin* plugin, 
406                  unsigned long long rid)
407 {
408   sqlite3_stmt *stmt;
409
410   if (sq_prepare (plugin->dbh,
411                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
412     {
413       LOG_SQLITE (plugin, NULL,
414                   GNUNET_ERROR_TYPE_ERROR |
415                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
416       return GNUNET_SYSERR;
417     }
418   sqlite3_bind_int64 (stmt, 1, rid);
419   if (SQLITE_DONE != sqlite3_step (stmt))
420     {
421       LOG_SQLITE (plugin, NULL,
422                   GNUNET_ERROR_TYPE_ERROR |
423                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
424       sqlite3_finalize (stmt);
425       return GNUNET_SYSERR;
426     }
427   sqlite3_finalize (stmt);
428   return GNUNET_OK;
429 }
430
431
432 /**
433  * Context for the universal iterator.
434  */
435 struct NextContext;
436
437 /**
438  * Type of a function that will prepare
439  * the next iteration.
440  *
441  * @param cls closure
442  * @param nc the next context; NULL for the last
443  *         call which gives the callback a chance to
444  *         clean up the closure
445  * @return GNUNET_OK on success, GNUNET_NO if there are
446  *         no more values, GNUNET_SYSERR on error
447  */
448 typedef int (*PrepareFunction)(void *cls,
449                                struct NextContext *nc);
450
451
452 /**
453  * Context we keep for the "next request" callback.
454  */
455 struct NextContext
456 {
457   /**
458    * Internal state.
459    */ 
460   struct Plugin *plugin;
461
462   /**
463    * Function to call on the next value.
464    */
465   PluginIterator iter;
466
467   /**
468    * Closure for iter.
469    */
470   void *iter_cls;
471
472   /**
473    * Function to call to prepare the next
474    * iteration.
475    */
476   PrepareFunction prep;
477
478   /**
479    * Closure for prep.
480    */
481   void *prep_cls;
482
483   /**
484    * Statement that the iterator will get the data
485    * from (updated or set by prep).
486    */ 
487   sqlite3_stmt *stmt;
488
489   /**
490    * Row ID of the last result.
491    */
492   unsigned long long last_rowid;
493
494   /**
495    * Key of the last result.
496    */
497   GNUNET_HashCode lastKey;  
498
499   /**
500    * Expiration time of the last value visited.
501    */
502   struct GNUNET_TIME_Absolute lastExpiration;
503
504   /**
505    * Priority of the last value visited.
506    */ 
507   unsigned int lastPriority; 
508
509   /**
510    * Number of results processed so far.
511    */
512   unsigned int count;
513
514   /**
515    * Set to GNUNET_YES if we must stop now.
516    */
517   int end_it;
518 };
519
520
521 /**
522  * Continuation of "sqlite_next_request".
523  *
524  * @param cls the next context
525  * @param tc the task context (unused)
526  */
527 static void 
528 sqlite_next_request_cont (void *cls,
529                           const struct GNUNET_SCHEDULER_TaskContext *tc)
530 {
531   struct NextContext * nc= cls;
532   struct Plugin *plugin;
533   unsigned long long rowid;
534   sqlite3_stmt *stmtd;
535   int ret;
536   unsigned int type;
537   unsigned int size;
538   unsigned int priority;
539   unsigned int anonymity;
540   struct GNUNET_TIME_Absolute expiration;
541   const GNUNET_HashCode *key;
542   const void *data;
543
544  
545   plugin = nc->plugin;
546   if ( (GNUNET_YES == nc->end_it) ||
547        (GNUNET_OK != (nc->prep(nc->prep_cls,
548                                nc))) )
549     {
550     END:
551       nc->iter (nc->iter_cls, 
552                 NULL, NULL, 0, NULL, 0, 0, 0, 
553                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
554       nc->prep (nc->prep_cls, NULL);
555       GNUNET_free (nc);
556       return;
557     }
558
559   rowid = sqlite3_column_int64 (nc->stmt, 7);
560   nc->last_rowid = rowid;
561   type = sqlite3_column_int (nc->stmt, 1);
562   size = sqlite3_column_bytes (nc->stmt, 6);
563   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
564     {
565       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
566                        "sqlite",
567                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
568       if (SQLITE_OK != sqlite3_reset (nc->stmt))
569         LOG_SQLITE (nc->plugin, NULL,
570                     GNUNET_ERROR_TYPE_ERROR |
571                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
572       if (sq_prepare
573           (nc->plugin->dbh,
574            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
575            &stmtd) != SQLITE_OK)
576         {
577           LOG_SQLITE (nc->plugin, NULL,
578                       GNUNET_ERROR_TYPE_ERROR |
579                       GNUNET_ERROR_TYPE_BULK, 
580                       "sq_prepare");
581           goto END;
582         }
583
584       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
585         LOG_SQLITE (nc->plugin, NULL,
586                     GNUNET_ERROR_TYPE_ERROR |
587                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
588       if (SQLITE_DONE != sqlite3_step (stmtd))
589         LOG_SQLITE (nc->plugin, NULL,
590                     GNUNET_ERROR_TYPE_ERROR |
591                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
592       if (SQLITE_OK != sqlite3_finalize (stmtd))
593         LOG_SQLITE (nc->plugin, NULL,
594                     GNUNET_ERROR_TYPE_ERROR |
595                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
596       goto END;
597     }
598
599   priority = sqlite3_column_int (nc->stmt, 2);
600   anonymity = sqlite3_column_int (nc->stmt, 3);
601   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
602   key = sqlite3_column_blob (nc->stmt, 5);
603   nc->lastPriority = priority;
604   nc->lastExpiration = expiration;
605   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
606   data = sqlite3_column_blob (nc->stmt, 6);
607   nc->count++;
608   ret = nc->iter (nc->iter_cls,
609                   nc,
610                   key,
611                   size,
612                   data, 
613                   type,
614                   priority,
615                   anonymity,
616                   expiration,
617                   rowid);
618   if (ret == GNUNET_SYSERR)
619     {
620       nc->end_it = GNUNET_YES;
621       return;
622     }
623 #if DEBUG_SQLITE
624   if (ret == GNUNET_NO)
625     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
626                      "sqlite",
627                      "Asked to remove entry %llu (%u bytes)\n",
628                      (unsigned long long) rowid,
629                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
630 #endif
631   if ( (ret == GNUNET_NO) &&
632        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
633     {
634       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
635       plugin->lastSync++; 
636 #if DEBUG_SQLITE
637       if (ret == GNUNET_NO)
638         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
639                          "sqlite",
640                          "Removed entry %llu (%u bytes), new payload is %llu\n",
641                          (unsigned long long) rowid,
642                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
643                          (unsigned long long) plugin->payload);
644 #endif
645       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
646         sync_stats (plugin);
647     }
648 }
649
650
651 /**
652  * Function invoked on behalf of a "PluginIterator"
653  * asking the database plugin to call the iterator
654  * with the next item.
655  *
656  * @param next_cls whatever argument was given
657  *        to the PluginIterator as "next_cls".
658  * @param end_it set to GNUNET_YES if we
659  *        should terminate the iteration early
660  *        (iterator should be still called once more
661  *         to signal the end of the iteration).
662  */
663 static void 
664 sqlite_next_request (void *next_cls,
665                      int end_it)
666 {
667   struct NextContext * nc= next_cls;
668
669   if (GNUNET_YES == end_it)
670     nc->end_it = GNUNET_YES;
671   GNUNET_SCHEDULER_add_continuation (nc->plugin->env->sched,
672                                      &sqlite_next_request_cont,
673                                      nc,
674                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
675 }
676
677
678
679 /**
680  * Store an item in the datastore.
681  *
682  * @param cls closure
683  * @param key key for the item
684  * @param size number of bytes in data
685  * @param data content stored
686  * @param type type of the content
687  * @param priority priority of the content
688  * @param anonymity anonymity-level for the content
689  * @param expiration expiration time for the content
690  * @param msg set to an error message
691  * @return GNUNET_OK on success
692  */
693 static int
694 sqlite_plugin_put (void *cls,
695                    const GNUNET_HashCode * key,
696                    uint32_t size,
697                    const void *data,
698                    uint32_t type,
699                    uint32_t priority,
700                    uint32_t anonymity,
701                    struct GNUNET_TIME_Absolute expiration,
702                    char ** msg)
703 {
704   struct Plugin *plugin = cls;
705   int n;
706   sqlite3_stmt *stmt;
707   GNUNET_HashCode vhash;
708
709 #if DEBUG_SQLITE
710   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
711                    "sqlite",
712                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
713                    type, 
714                    GNUNET_h2s(key),
715                    priority,
716                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
717                    (long long) expiration.value);
718 #endif
719   GNUNET_CRYPTO_hash (data, size, &vhash);
720   stmt = plugin->insertContent;
721   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
722       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
723       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
724       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
725       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
726       (SQLITE_OK !=
727        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
728                           SQLITE_TRANSIENT)) ||
729       (SQLITE_OK !=
730        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
731                           SQLITE_TRANSIENT))
732       || (SQLITE_OK !=
733           sqlite3_bind_blob (stmt, 8, data, size,
734                              SQLITE_TRANSIENT)))
735     {
736       LOG_SQLITE (plugin,
737                   msg,
738                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
739       if (SQLITE_OK != sqlite3_reset (stmt))
740         LOG_SQLITE (plugin, NULL,
741                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
742       return GNUNET_SYSERR;
743     }
744   n = sqlite3_step (stmt);
745   if (n != SQLITE_DONE)
746     {
747       if (n == SQLITE_BUSY)
748         {
749           LOG_SQLITE (plugin, msg,
750                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
751           sqlite3_reset (stmt);
752           GNUNET_break (0);
753           return GNUNET_NO;
754         }
755       LOG_SQLITE (plugin, msg,
756                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
757       sqlite3_reset (stmt);
758       return GNUNET_SYSERR;
759     }
760   if (SQLITE_OK != sqlite3_reset (stmt))
761     LOG_SQLITE (plugin, NULL,
762                 GNUNET_ERROR_TYPE_ERROR |
763                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
764   plugin->lastSync++;
765   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
766 #if DEBUG_SQLITE
767   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
768                    "sqlite",
769                    "Stored new entry (%u bytes), new payload is %llu\n",
770                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
771                    (unsigned long long) plugin->payload);
772 #endif
773   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
774     sync_stats (plugin);
775   return GNUNET_OK;
776 }
777
778
779 /**
780  * Update the priority for a particular key in the datastore.  If
781  * the expiration time in value is different than the time found in
782  * the datastore, the higher value should be kept.  For the
783  * anonymity level, the lower value is to be used.  The specified
784  * priority should be added to the existing priority, ignoring the
785  * priority in value.
786  *
787  * Note that it is possible for multiple values to match this put.
788  * In that case, all of the respective values are updated.
789  *
790  * @param cls the plugin context (state for this module)
791  * @param uid unique identifier of the datum
792  * @param delta by how much should the priority
793  *     change?  If priority + delta < 0 the
794  *     priority should be set to 0 (never go
795  *     negative).
796  * @param expire new expiration time should be the
797  *     MAX of any existing expiration time and
798  *     this value
799  * @param msg set to an error message
800  * @return GNUNET_OK on success
801  */
802 static int
803 sqlite_plugin_update (void *cls,
804                       uint64_t uid,
805                       int delta, struct GNUNET_TIME_Absolute expire,
806                       char **msg)
807 {
808   struct Plugin *plugin = cls;
809   int n;
810
811   sqlite3_bind_int (plugin->updPrio, 1, delta);
812   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
813   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
814   n = sqlite3_step (plugin->updPrio);
815   if (n != SQLITE_DONE)
816     LOG_SQLITE (plugin, msg,
817                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
818                 "sqlite3_step");
819 #if DEBUG_SQLITE
820   else
821     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
822                      "sqlite",
823                      "Block updated\n");
824 #endif
825   sqlite3_reset (plugin->updPrio);
826
827   if (n == SQLITE_BUSY)
828     return GNUNET_NO;
829   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
830 }
831
832
833 /**
834  * Internal context for an iteration.
835  */
836 struct IterContext
837 {
838   /**
839    * FIXME.
840    */
841   sqlite3_stmt *stmt_1;
842
843   /**
844    * FIXME.
845    */
846   sqlite3_stmt *stmt_2;
847
848   /**
849    * FIXME.
850    */
851   int is_asc;
852
853   /**
854    * FIXME.
855    */
856   int is_prio;
857
858   /**
859    * FIXME.
860    */
861   int is_migr;
862
863   /**
864    * FIXME.
865    */
866   int limit_nonanonymous;
867
868   /**
869    * Desired type for blocks returned by this iterator.
870    */
871   uint32_t type;
872 };
873
874
875 /**
876  * Prepare our SQL query to obtain the next record from the database.
877  *
878  * @param cls our "struct IterContext"
879  * @param nc NULL to terminate the iteration, otherwise our context for
880  *           getting the next result.
881  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
882  *         GNUNET_SYSERR on error (or end of iteration)
883  */
884 static int
885 iter_next_prepare (void *cls,
886                    struct NextContext *nc)
887 {
888   struct IterContext *ic = cls;
889   struct Plugin *plugin;
890   int ret;
891
892   if (nc == NULL)
893     {
894 #if DEBUG_SQLITE
895       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896                   "Asked to clean up iterator state.\n");
897 #endif
898       sqlite3_finalize (ic->stmt_1);
899       sqlite3_finalize (ic->stmt_2);
900       return GNUNET_SYSERR;
901     }
902   sqlite3_reset (ic->stmt_1);
903   sqlite3_reset (ic->stmt_2);
904   plugin = nc->plugin;
905   if (ic->is_prio)
906     {
907 #if DEBUG_SQLITE
908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909                   "Restricting to results larger than the last priority %u\n",
910                   nc->lastPriority);
911 #endif
912       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
913       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
914     }
915   else
916     {
917 #if DEBUG_SQLITE
918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                   "Restricting to results larger than the last expiration %llu\n",
920                   (unsigned long long) nc->lastExpiration.value);
921 #endif
922       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
923       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
924     }
925 #if DEBUG_SQLITE
926   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
927               "Restricting to results larger than the last key `%s'\n",
928               GNUNET_h2s(&nc->lastKey));
929 #endif
930   sqlite3_bind_blob (ic->stmt_1, 2, 
931                      &nc->lastKey, 
932                      sizeof (GNUNET_HashCode),
933                      SQLITE_TRANSIENT);
934   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
935     {      
936 #if DEBUG_SQLITE
937       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938                   "Result found using iterator 1\n");
939 #endif
940       nc->stmt = ic->stmt_1;
941       return GNUNET_OK;
942     }
943   if (ret != SQLITE_DONE)
944     {
945       LOG_SQLITE (plugin, NULL,
946                   GNUNET_ERROR_TYPE_ERROR |
947                   GNUNET_ERROR_TYPE_BULK,
948                   "sqlite3_step");
949       return GNUNET_SYSERR;
950     }
951   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
952     LOG_SQLITE (plugin, NULL,
953                 GNUNET_ERROR_TYPE_ERROR | 
954                 GNUNET_ERROR_TYPE_BULK, 
955                 "sqlite3_reset");
956   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
957     {
958 #if DEBUG_SQLITE
959       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
960                   "Result found using iterator 2\n");
961 #endif
962       nc->stmt = ic->stmt_2;
963       return GNUNET_OK;
964     }
965   if (ret != SQLITE_DONE)
966     {
967       LOG_SQLITE (plugin, NULL,
968                   GNUNET_ERROR_TYPE_ERROR |
969                   GNUNET_ERROR_TYPE_BULK,
970                   "sqlite3_step");
971       return GNUNET_SYSERR;
972     }
973   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
974     LOG_SQLITE (plugin, NULL,
975                 GNUNET_ERROR_TYPE_ERROR |
976                 GNUNET_ERROR_TYPE_BULK,
977                 "sqlite3_reset");
978 #if DEBUG_SQLITE
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980               "No result found using either iterator\n");
981 #endif
982   return GNUNET_NO;
983 }
984
985
986 /**
987  * Call a method for each key in the database and
988  * call the callback method on it.
989  *
990  * @param plugin our plugin context
991  * @param type entries of which type should be considered?
992  * @param is_asc are we iterating in ascending order?
993  * @param is_prio are we iterating by priority (otherwise by expiration)
994  * @param is_migr are we iterating in migration order?
995  * @param limit_nonanonymous are we restricting results to those with anonymity
996  *              level zero?
997  * @param stmt_str_1 first SQL statement to execute
998  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
999  * @param iter function to call on each matching value;
1000  *        will be called once with a NULL value at the end
1001  * @param iter_cls closure for iter
1002  */
1003 static void
1004 basic_iter (struct Plugin *plugin,
1005             uint32_t type,
1006             int is_asc,
1007             int is_prio,
1008             int is_migr,
1009             int limit_nonanonymous,
1010             const char *stmt_str_1,
1011             const char *stmt_str_2,
1012             PluginIterator iter,
1013             void *iter_cls)
1014 {
1015   struct NextContext *nc;
1016   struct IterContext *ic;
1017   sqlite3_stmt *stmt_1;
1018   sqlite3_stmt *stmt_2;
1019
1020 #if DEBUG_SQLITE
1021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1022               "At %llu, using queries `%s' and `%s'\n",
1023               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1024               stmt_str_1,
1025               stmt_str_2);
1026 #endif
1027   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1028     {
1029       LOG_SQLITE (plugin, NULL,
1030                   GNUNET_ERROR_TYPE_ERROR |
1031                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1032       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1033       return;
1034     }
1035   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1036     {
1037       LOG_SQLITE (plugin, NULL,
1038                   GNUNET_ERROR_TYPE_ERROR |
1039                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1040       sqlite3_finalize (stmt_1);
1041       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1042       return;
1043     }
1044   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1045                       sizeof(struct IterContext));
1046   nc->plugin = plugin;
1047   nc->iter = iter;
1048   nc->iter_cls = iter_cls;
1049   nc->stmt = NULL;
1050   ic = (struct IterContext*) &nc[1];
1051   ic->stmt_1 = stmt_1;
1052   ic->stmt_2 = stmt_2;
1053   ic->type = type;
1054   ic->is_asc = is_asc;
1055   ic->is_prio = is_prio;
1056   ic->is_migr = is_migr;
1057   ic->limit_nonanonymous = limit_nonanonymous;
1058   nc->prep = &iter_next_prepare;
1059   nc->prep_cls = ic;
1060   if (is_asc)
1061     {
1062       nc->lastPriority = 0;
1063       nc->lastExpiration.value = 0;
1064       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1065     }
1066   else
1067     {
1068       nc->lastPriority = 0x7FFFFFFF;
1069       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1070       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1071     }
1072   sqlite_next_request (nc, GNUNET_NO);
1073 }
1074
1075
1076 /**
1077  * Select a subset of the items in the datastore and call
1078  * the given iterator for each of them.
1079  *
1080  * @param cls our plugin context
1081  * @param type entries of which type should be considered?
1082  *        Use 0 for any type.
1083  * @param iter function to call on each matching value;
1084  *        will be called once with a NULL value at the end
1085  * @param iter_cls closure for iter
1086  */
1087 static void
1088 sqlite_plugin_iter_low_priority (void *cls,
1089                                  uint32_t type,
1090                                  PluginIterator iter,
1091                                  void *iter_cls)
1092 {
1093   basic_iter (cls,
1094               type, 
1095               GNUNET_YES, GNUNET_YES, 
1096               GNUNET_NO, GNUNET_NO,
1097               SELECT_IT_LOW_PRIORITY_1,
1098               SELECT_IT_LOW_PRIORITY_2, 
1099               iter, iter_cls);
1100 }
1101
1102
1103 /**
1104  * Select a subset of the items in the datastore and call
1105  * the given iterator for each of them.
1106  *
1107  * @param cls our plugin context
1108  * @param type entries of which type should be considered?
1109  *        Use 0 for any type.
1110  * @param iter function to call on each matching value;
1111  *        will be called once with a NULL value at the end
1112  * @param iter_cls closure for iter
1113  */
1114 static void
1115 sqlite_plugin_iter_zero_anonymity (void *cls,
1116                                    uint32_t type,
1117                                    PluginIterator iter,
1118                                    void *iter_cls)
1119 {
1120   struct GNUNET_TIME_Absolute now;
1121   char *q1;
1122   char *q2;
1123
1124   now = GNUNET_TIME_absolute_get ();
1125   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1126                    (unsigned long long) now.value);
1127   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1128                    (unsigned long long) now.value);
1129   basic_iter (cls,
1130               type, 
1131               GNUNET_NO, GNUNET_YES, 
1132               GNUNET_NO, GNUNET_YES,
1133               q1,
1134               q2,
1135               iter, iter_cls);
1136   GNUNET_free (q1);
1137   GNUNET_free (q2);
1138 }
1139
1140
1141
1142 /**
1143  * Select a subset of the items in the datastore and call
1144  * the given iterator for each of them.
1145  *
1146  * @param cls our plugin context
1147  * @param type entries of which type should be considered?
1148  *        Use 0 for any type.
1149  * @param iter function to call on each matching value;
1150  *        will be called once with a NULL value at the end
1151  * @param iter_cls closure for iter
1152  */
1153 static void
1154 sqlite_plugin_iter_ascending_expiration (void *cls,
1155                                          uint32_t type,
1156                                          PluginIterator iter,
1157                                          void *iter_cls)
1158 {
1159   struct GNUNET_TIME_Absolute now;
1160   char *q1;
1161   char *q2;
1162
1163   now = GNUNET_TIME_absolute_get ();
1164   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1165                    (unsigned long long) 0*now.value);
1166   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1167                    (unsigned long long) 0*now.value);
1168   basic_iter (cls,
1169               type, 
1170               GNUNET_YES, GNUNET_NO, 
1171               GNUNET_NO, GNUNET_NO,
1172               q1, q2,
1173               iter, iter_cls);
1174   GNUNET_free (q1);
1175   GNUNET_free (q2);
1176 }
1177
1178
1179 /**
1180  * Select a subset of the items in the datastore and call
1181  * the given iterator for each of them.
1182  *
1183  * @param cls our plugin context
1184  * @param type entries of which type should be considered?
1185  *        Use 0 for any type.
1186  * @param iter function to call on each matching value;
1187  *        will be called once with a NULL value at the end
1188  * @param iter_cls closure for iter
1189  */
1190 static void
1191 sqlite_plugin_iter_migration_order (void *cls,
1192                                     uint32_t type,
1193                                     PluginIterator iter,
1194                                     void *iter_cls)
1195 {
1196   struct GNUNET_TIME_Absolute now;
1197   char *q;
1198
1199   now = GNUNET_TIME_absolute_get ();
1200   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1201                    (unsigned long long) now.value);
1202   basic_iter (cls,
1203               type, 
1204               GNUNET_NO, GNUNET_NO, 
1205               GNUNET_YES, GNUNET_NO,
1206               SELECT_IT_MIGRATION_ORDER_1,
1207               q,
1208               iter, iter_cls);
1209   GNUNET_free (q);
1210 }
1211
1212
1213 /**
1214  * Call sqlite using the already prepared query to get
1215  * the next result.
1216  *
1217  * @param cls not used
1218  * @param nc context with the prepared query
1219  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1220  *        there are no more results 
1221  */
1222 static int
1223 all_next_prepare (void *cls,
1224                   struct NextContext *nc)
1225 {
1226   struct Plugin *plugin;
1227   int ret;
1228
1229   if (nc == NULL)
1230     {
1231 #if DEBUG_SQLITE
1232       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1233                   "Asked to clean up iterator state.\n");
1234 #endif
1235       return GNUNET_SYSERR;
1236     }
1237   plugin = nc->plugin;
1238   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1239     {      
1240       return GNUNET_OK;
1241     }
1242   if (ret != SQLITE_DONE)
1243     {
1244       LOG_SQLITE (plugin, NULL,
1245                   GNUNET_ERROR_TYPE_ERROR |
1246                   GNUNET_ERROR_TYPE_BULK,
1247                   "sqlite3_step");
1248       return GNUNET_SYSERR;
1249     }
1250   return GNUNET_NO;
1251 }
1252
1253
1254 /**
1255  * Select a subset of the items in the datastore and call
1256  * the given iterator for each of them.
1257  *
1258  * @param cls our plugin context
1259  * @param type entries of which type should be considered?
1260  *        Use 0 for any type.
1261  * @param iter function to call on each matching value;
1262  *        will be called once with a NULL value at the end
1263  * @param iter_cls closure for iter
1264  */
1265 static void
1266 sqlite_plugin_iter_all_now (void *cls,
1267                             uint32_t type,
1268                             PluginIterator iter,
1269                             void *iter_cls)
1270 {
1271   struct Plugin *plugin = cls;
1272   struct NextContext *nc;
1273   sqlite3_stmt *stmt;
1274
1275   if (sq_prepare (plugin->dbh, 
1276                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1277                   &stmt) != SQLITE_OK)
1278     {
1279       LOG_SQLITE (plugin, NULL,
1280                   GNUNET_ERROR_TYPE_ERROR |
1281                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1282       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1283       return;
1284     }
1285   nc = GNUNET_malloc (sizeof(struct NextContext));
1286   nc->plugin = plugin;
1287   nc->iter = iter;
1288   nc->iter_cls = iter_cls;
1289   nc->stmt = stmt;
1290   nc->prep = &all_next_prepare;
1291   nc->prep_cls = NULL;
1292   sqlite_next_request (nc, GNUNET_NO);
1293 }
1294
1295
1296 /**
1297  * FIXME.
1298  */
1299 struct GetNextContext
1300 {
1301
1302   /**
1303    * FIXME.
1304    */
1305   int total;
1306
1307   /**
1308    * FIXME.
1309    */
1310   int off;
1311
1312   /**
1313    * FIXME.
1314    */
1315   int have_vhash;
1316
1317   /**
1318    * FIXME.
1319    */
1320   unsigned int type;
1321
1322   /**
1323    * FIXME.
1324    */
1325   sqlite3_stmt *stmt;
1326
1327   /**
1328    * FIXME.
1329    */
1330   GNUNET_HashCode key;
1331
1332   /**
1333    * FIXME.
1334    */
1335   GNUNET_HashCode vhash;
1336 };
1337
1338
1339
1340 /**
1341  * FIXME.
1342  *
1343  * @param cls our "struct GetNextContext*"
1344  * @param nc FIXME
1345  * @return GNUNET_YES if there are more results, 
1346  *         GNUNET_NO if there are no more results,
1347  *         GNUNET_SYSERR on internal error
1348  */
1349 static int
1350 get_next_prepare (void *cls,
1351                   struct NextContext *nc)
1352 {
1353   struct GetNextContext *gnc = cls;
1354   int sqoff;
1355   int ret;
1356   int limit_off;
1357
1358   if (nc == NULL)
1359     {
1360       sqlite3_finalize (gnc->stmt);
1361       return GNUNET_SYSERR;
1362     }
1363   if (nc->count == gnc->total)
1364     return GNUNET_NO;
1365   if (nc->count + gnc->off == gnc->total)
1366     nc->last_rowid = 0;
1367   if (nc->count == 0)
1368     limit_off = gnc->off;
1369   else
1370     limit_off = 0;
1371   sqoff = 1;
1372   sqlite3_reset (nc->stmt);
1373   ret = sqlite3_bind_blob (nc->stmt,
1374                            sqoff++,
1375                            &gnc->key, 
1376                            sizeof (GNUNET_HashCode),
1377                            SQLITE_TRANSIENT);
1378   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1379     ret = sqlite3_bind_blob (nc->stmt,
1380                              sqoff++,
1381                              &gnc->vhash,
1382                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1383   if ((gnc->type != 0) && (ret == SQLITE_OK))
1384     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1385   if (ret == SQLITE_OK)
1386     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1387   if (ret == SQLITE_OK)
1388     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1389   if (ret != SQLITE_OK)
1390     return GNUNET_SYSERR;
1391   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1392     return GNUNET_NO;
1393   return GNUNET_OK;
1394 }
1395
1396
1397 /**
1398  * Iterate over the results for a particular key
1399  * in the datastore.
1400  *
1401  * @param cls closure
1402  * @param key maybe NULL (to match all entries)
1403  * @param vhash hash of the value, maybe NULL (to
1404  *        match all values that have the right key).
1405  *        Note that for DBlocks there is no difference
1406  *        betwen key and vhash, but for other blocks
1407  *        there may be!
1408  * @param type entries of which type are relevant?
1409  *     Use 0 for any type.
1410  * @param iter function to call on each matching value;
1411  *        will be called once with a NULL value at the end
1412  * @param iter_cls closure for iter
1413  */
1414 static void
1415 sqlite_plugin_get (void *cls,
1416                    const GNUNET_HashCode * key,
1417                    const GNUNET_HashCode * vhash,
1418                    uint32_t type,
1419                    PluginIterator iter, void *iter_cls)
1420 {
1421   struct Plugin *plugin = cls;
1422   struct GetNextContext *gpc;
1423   struct NextContext *nc;
1424   int ret;
1425   int total;
1426   sqlite3_stmt *stmt;
1427   char scratch[256];
1428   int sqoff;
1429
1430   GNUNET_assert (iter != NULL);
1431   if (key == NULL)
1432     {
1433       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1434       return;
1435     }
1436   GNUNET_snprintf (scratch, sizeof (scratch),
1437                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1438                    vhash == NULL ? "" : " AND vhash=:2",
1439                    type == 0 ? "" : (vhash ==
1440                                      NULL) ? " AND type=:2" : " AND type=:3");
1441   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1442     {
1443       LOG_SQLITE (plugin, NULL,
1444                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1445       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1446       return;
1447     }
1448   sqoff = 1;
1449   ret = sqlite3_bind_blob (stmt,
1450                            sqoff++,
1451                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1452   if ((vhash != NULL) && (ret == SQLITE_OK))
1453     ret = sqlite3_bind_blob (stmt,
1454                              sqoff++,
1455                              vhash,
1456                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1457   if ((type != 0) && (ret == SQLITE_OK))
1458     ret = sqlite3_bind_int (stmt, sqoff++, type);
1459   if (SQLITE_OK != ret)
1460     {
1461       LOG_SQLITE (plugin, NULL,
1462                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1463       sqlite3_reset (stmt);
1464       sqlite3_finalize (stmt);
1465       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1466       return;
1467     }
1468   ret = sqlite3_step (stmt);
1469   if (ret != SQLITE_ROW)
1470     {
1471       LOG_SQLITE (plugin, NULL,
1472                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1473                   "sqlite_step");
1474       sqlite3_reset (stmt);
1475       sqlite3_finalize (stmt);
1476       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1477       return;
1478     }
1479   total = sqlite3_column_int (stmt, 0);
1480   sqlite3_reset (stmt);
1481   sqlite3_finalize (stmt);
1482   if (0 == total)
1483     {
1484       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1485       return;
1486     }
1487
1488   GNUNET_snprintf (scratch, sizeof (scratch),
1489                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1490                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1491                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1492                    vhash == NULL ? "" : " AND vhash=:2",
1493                    type == 0 ? "" : (vhash ==
1494                                      NULL) ? " AND type=:2" : " AND type=:3",
1495                    sqoff, sqoff + 1);
1496   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1497     {
1498       LOG_SQLITE (plugin, NULL,
1499                   GNUNET_ERROR_TYPE_ERROR |
1500                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1501       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1502       return;
1503     }
1504   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1505                       sizeof(struct GetNextContext));
1506   nc->plugin = plugin;
1507   nc->iter = iter;
1508   nc->iter_cls = iter_cls;
1509   nc->stmt = stmt;
1510   gpc = (struct GetNextContext*) &nc[1];
1511   gpc->total = total;
1512   gpc->type = type;
1513   gpc->key = *key;
1514   gpc->stmt = stmt; /* alias used for freeing at the end! */
1515   if (NULL != vhash)
1516     {
1517       gpc->have_vhash = GNUNET_YES;
1518       gpc->vhash = *vhash;
1519     }
1520   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1521   nc->prep = &get_next_prepare;
1522   nc->prep_cls = gpc;
1523   sqlite_next_request (nc, GNUNET_NO);
1524 }
1525
1526
1527 /**
1528  * Drop database.
1529  *
1530  * @param cls our plugin context
1531  */
1532 static void 
1533 sqlite_plugin_drop (void *cls)
1534 {
1535   struct Plugin *plugin = cls;
1536   plugin->drop_on_shutdown = GNUNET_YES;
1537 }
1538
1539
1540 /**
1541  * Callback function to process statistic values.
1542  *
1543  * @param cls closure
1544  * @param subsystem name of subsystem that created the statistic
1545  * @param name the name of the datum
1546  * @param value the current value
1547  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1548  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1549  */
1550 static int
1551 process_stat_in (void *cls,
1552                  const char *subsystem,
1553                  const char *name,
1554                  uint64_t value,
1555                  int is_persistent)
1556 {
1557   struct Plugin *plugin = cls;
1558   plugin->payload += value;
1559 #if DEBUG_SQLITE
1560   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1561                    "sqlite",
1562                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1563                    value,
1564                    (unsigned long long) plugin->payload);
1565 #endif
1566   return GNUNET_OK;
1567 }
1568
1569
1570 static void
1571 process_stat_done (void *cls,
1572                    int success)
1573 {
1574   struct Plugin *plugin = cls;
1575   plugin->stat_get = NULL;
1576 }
1577                                          
1578
1579 /**
1580  * Entry point for the plugin.
1581  *
1582  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1583  * @return NULL on error, othrewise the plugin context
1584  */
1585 void *
1586 libgnunet_plugin_datastore_sqlite_init (void *cls)
1587 {
1588   static struct Plugin plugin;
1589   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1590   struct GNUNET_DATASTORE_PluginFunctions *api;
1591
1592   if (plugin.env != NULL)
1593     return NULL; /* can only initialize once! */
1594   memset (&plugin, 0, sizeof(struct Plugin));
1595   plugin.env = env;
1596   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1597                                                 "sqlite",
1598                                                 env->cfg);
1599   plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
1600                                            "sqlite",
1601                                            QUOTA_STAT_NAME,
1602                                            GNUNET_TIME_UNIT_MINUTES,
1603                                            &process_stat_done,
1604                                            &process_stat_in,
1605                                            &plugin);
1606   if (GNUNET_OK !=
1607       database_setup (env->cfg, &plugin))
1608     {
1609       database_shutdown (&plugin);
1610       return NULL;
1611     }
1612   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1613   api->cls = &plugin;
1614   api->get_size = &sqlite_plugin_get_size;
1615   api->put = &sqlite_plugin_put;
1616   api->next_request = &sqlite_next_request;
1617   api->get = &sqlite_plugin_get;
1618   api->update = &sqlite_plugin_update;
1619   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1620   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1621   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1622   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1623   api->iter_all_now = &sqlite_plugin_iter_all_now;
1624   api->drop = &sqlite_plugin_drop;
1625   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1626                    "sqlite", _("Sqlite database running\n"));
1627   return api;
1628 }
1629
1630
1631 /**
1632  * Exit point from the plugin.
1633  *
1634  * @param cls the plugin context (as returned by "init")
1635  * @return always NULL
1636  */
1637 void *
1638 libgnunet_plugin_datastore_sqlite_done (void *cls)
1639 {
1640   char *fn;
1641   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1642   struct Plugin *plugin = api->cls;
1643
1644   if (plugin->stat_get != NULL)
1645     {
1646       GNUNET_STATISTICS_get_cancel (plugin->stat_get);
1647       plugin->stat_get = NULL;
1648     }
1649   fn = NULL;
1650   if (plugin->drop_on_shutdown)
1651     fn = GNUNET_strdup (plugin->fn);
1652   database_shutdown (plugin);
1653   GNUNET_STATISTICS_destroy (plugin->statistics,
1654                              GNUNET_NO);
1655   plugin->env = NULL; 
1656   plugin->payload = 0;
1657   GNUNET_free (api);
1658   if (fn != NULL)
1659     {
1660       if (0 != UNLINK(fn))
1661         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1662                                   "unlink",
1663                                   fn);
1664       GNUNET_free (fn);
1665     }
1666   return NULL;
1667 }
1668
1669 /* end of plugin_datastore_sqlite.c */