missing function reference
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_YES
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("file-sharing datastore utilization (in bytes)")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94 /**
95  * Context for all functions in this plugin.
96  */
97 struct Plugin 
98 {
99   /**
100    * Our execution environment.
101    */
102   struct GNUNET_DATASTORE_PluginEnvironment *env;
103
104   /**
105    * Database filename.
106    */
107   char *fn;
108
109   /**
110    * Native SQLite database handle.
111    */
112   sqlite3 *dbh;
113
114   /**
115    * Precompiled SQL for update.
116    */
117   sqlite3_stmt *updPrio;
118
119   /**
120    * Precompiled SQL for insertion.
121    */
122   sqlite3_stmt *insertContent;
123
124   /**
125    * Handle to the statistics service.
126    */
127   struct GNUNET_STATISTICS_Handle *statistics;
128   
129   /**
130    * How much data are we currently storing
131    * in the database?
132    */
133   unsigned long long payload;
134
135   /**
136    * Number of updates that were made to the
137    * payload value since we last synchronized
138    * it with the statistics service.
139    */
140   unsigned int lastSync;
141
142   /**
143    * Should the database be dropped on shutdown?
144    */
145   int drop_on_shutdown;
146 };
147
148
149 /**
150  * @brief Prepare a SQL statement
151  *
152  * @param dbh handle to the database
153  * @param zSql SQL statement, UTF-8 encoded
154  * @param ppStmt set to the prepared statement
155  * @return 0 on success
156  */
157 static int
158 sq_prepare (sqlite3 * dbh, const char *zSql,
159             sqlite3_stmt ** ppStmt)
160 {
161   char *dummy;
162   return sqlite3_prepare (dbh,
163                           zSql,
164                           strlen (zSql), ppStmt, (const char **) &dummy);
165 }
166
167
168 /**
169  * Create our database indices.
170  * 
171  * @param dbh handle to the database
172  */
173 static void
174 create_indices (sqlite3 * dbh)
175 {
176   /* create indices */
177   sqlite3_exec (dbh,
178                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
179   sqlite3_exec (dbh,
180                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
181                 NULL, NULL);
182   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
183                 NULL);
184   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
185                 NULL);
186   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
187                 NULL, NULL);
188   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
189                 NULL, NULL, NULL);
190   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
191                 NULL, NULL);
192 }
193
194
195
196 #if 1
197 #define CHECK(a) GNUNET_break(a)
198 #define ENULL NULL
199 #else
200 #define ENULL &e
201 #define ENULL_DEFINED 1
202 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
203 #endif
204
205
206
207
208 /**
209  * Initialize the database connections and associated
210  * data structures (create tables and indices
211  * as needed as well).
212  *
213  * @param cfg our configuration
214  * @param plugin the plugin context (state for this module)
215  * @return GNUNET_OK on success
216  */
217 static int
218 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
219                 struct Plugin *plugin)
220 {
221   sqlite3_stmt *stmt;
222   char *afsdir;
223 #if ENULL_DEFINED
224   char *e;
225 #endif
226   
227   if (GNUNET_OK != 
228       GNUNET_CONFIGURATION_get_value_filename (cfg,
229                                                "datastore-sqlite",
230                                                "FILENAME",
231                                                &afsdir))
232     {
233       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
234                        "sqlite",
235                        _("Option `%s' in section `%s' missing in configuration!\n"),
236                        "FILENAME",
237                        "datastore-sqlite");
238       return GNUNET_SYSERR;
239     }
240   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
241     {
242       GNUNET_break (0);
243       GNUNET_free (afsdir);
244       return GNUNET_SYSERR;
245     }
246   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
247 #ifdef ENABLE_NLS
248                                               nl_langinfo (CODESET)
249 #else
250                                               "UTF-8"   /* good luck */
251 #endif
252                                               );
253   GNUNET_free (afsdir);
254   
255   /* Open database and precompile statements */
256   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
257     {
258       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
259                        "sqlite",
260                        _("Unable to initialize SQLite: %s.\n"),
261                        sqlite3_errmsg (plugin->dbh));
262       return GNUNET_SYSERR;
263     }
264   CHECK (SQLITE_OK ==
265          sqlite3_exec (plugin->dbh,
266                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
267   CHECK (SQLITE_OK ==
268          sqlite3_exec (plugin->dbh,
269                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
270   CHECK (SQLITE_OK ==
271          sqlite3_exec (plugin->dbh,
272                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
273   CHECK (SQLITE_OK ==
274          sqlite3_exec (plugin->dbh, "PRAGMA page_size=4092", NULL, NULL, ENULL));
275
276   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
277
278
279   /* We have to do it here, because otherwise precompiling SQL might fail */
280   CHECK (SQLITE_OK ==
281          sq_prepare (plugin->dbh,
282                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
283                      &stmt));
284   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
285        (sqlite3_exec (plugin->dbh,
286                       "CREATE TABLE gn080 ("
287                       "  size INT4 NOT NULL DEFAULT 0,"
288                       "  type INT4 NOT NULL DEFAULT 0,"
289                       "  prio INT4 NOT NULL DEFAULT 0,"
290                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
291                       "  expire INT8 NOT NULL DEFAULT 0,"
292                       "  hash TEXT NOT NULL DEFAULT '',"
293                       "  vhash TEXT NOT NULL DEFAULT '',"
294                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
295                       NULL) != SQLITE_OK) )
296     {
297       LOG_SQLITE (plugin, NULL,
298                   GNUNET_ERROR_TYPE_ERROR, 
299                   "sqlite3_exec");
300       sqlite3_finalize (stmt);
301       return GNUNET_SYSERR;
302     }
303   sqlite3_finalize (stmt);
304   create_indices (plugin->dbh);
305
306   CHECK (SQLITE_OK ==
307          sq_prepare (plugin->dbh,
308                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
309                      &stmt));
310   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
311        (sqlite3_exec (plugin->dbh,
312                       "CREATE TABLE gn071 ("
313                       "  key TEXT NOT NULL DEFAULT '',"
314                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
315                       NULL) != SQLITE_OK) )
316     {
317       LOG_SQLITE (plugin, NULL,
318                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
319       sqlite3_finalize (stmt);
320       return GNUNET_SYSERR;
321     }
322   sqlite3_finalize (stmt);
323
324   if ((sq_prepare (plugin->dbh,
325                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
326                    "_ROWID_ = ?",
327                    &plugin->updPrio) != SQLITE_OK) ||
328       (sq_prepare (plugin->dbh,
329                    "INSERT INTO gn080 (size, type, prio, "
330                    "anonLevel, expire, hash, vhash, value) VALUES "
331                    "(?, ?, ?, ?, ?, ?, ?, ?)",
332                    &plugin->insertContent) != SQLITE_OK))
333     {
334       LOG_SQLITE (plugin, NULL,
335                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
336       return GNUNET_SYSERR;
337     }
338   return GNUNET_OK;
339 }
340
341
342 /**
343  * Synchronize our utilization statistics with the 
344  * statistics service.
345  * @param plugin the plugin context (state for this module)
346  */
347 static void 
348 sync_stats (struct Plugin *plugin)
349 {
350   GNUNET_STATISTICS_set (plugin->statistics,
351                          QUOTA_STAT_NAME,
352                          plugin->payload,
353                          GNUNET_YES);
354   plugin->lastSync = 0;
355 }
356
357
358 /**
359  * Shutdown database connection and associate data
360  * structures.
361  * @param plugin the plugin context (state for this module)
362  */
363 static void
364 database_shutdown (struct Plugin *plugin)
365 {
366   if (plugin->lastSync > 0)
367     sync_stats (plugin);
368   if (plugin->updPrio != NULL)
369     sqlite3_finalize (plugin->updPrio);
370   if (plugin->insertContent != NULL)
371     sqlite3_finalize (plugin->insertContent);
372   sqlite3_close (plugin->dbh);
373   GNUNET_free_non_null (plugin->fn);
374 }
375
376
377 /**
378  * Get an estimate of how much space the database is
379  * currently using.
380  *
381  * @param cls our plugin context
382  * @return number of bytes used on disk
383  */
384 static unsigned long long sqlite_plugin_get_size (void *cls)
385 {
386   struct Plugin *plugin = cls;
387   return plugin->payload;
388 }
389
390
391 /**
392  * Delete the database entry with the given
393  * row identifier.
394  *
395  * @param plugin the plugin context (state for this module)
396  * @param rid the ID of the row to delete
397  */
398 static int
399 delete_by_rowid (struct Plugin* plugin, 
400                  unsigned long long rid)
401 {
402   sqlite3_stmt *stmt;
403
404   if (sq_prepare (plugin->dbh,
405                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
406     {
407       LOG_SQLITE (plugin, NULL,
408                   GNUNET_ERROR_TYPE_ERROR |
409                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
410       return GNUNET_SYSERR;
411     }
412   sqlite3_bind_int64 (stmt, 1, rid);
413   if (SQLITE_DONE != sqlite3_step (stmt))
414     {
415       LOG_SQLITE (plugin, NULL,
416                   GNUNET_ERROR_TYPE_ERROR |
417                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
418       sqlite3_finalize (stmt);
419       return GNUNET_SYSERR;
420     }
421   sqlite3_finalize (stmt);
422   return GNUNET_OK;
423 }
424
425
426 /**
427  * Context for the universal iterator.
428  */
429 struct NextContext;
430
431 /**
432  * Type of a function that will prepare
433  * the next iteration.
434  *
435  * @param cls closure
436  * @param nc the next context; NULL for the last
437  *         call which gives the callback a chance to
438  *         clean up the closure
439  * @return GNUNET_OK on success, GNUNET_NO if there are
440  *         no more values, GNUNET_SYSERR on error
441  */
442 typedef int (*PrepareFunction)(void *cls,
443                                struct NextContext *nc);
444
445
446 /**
447  * Context we keep for the "next request" callback.
448  */
449 struct NextContext
450 {
451   /**
452    * Internal state.
453    */ 
454   struct Plugin *plugin;
455
456   /**
457    * Function to call on the next value.
458    */
459   PluginIterator iter;
460
461   /**
462    * Closure for iter.
463    */
464   void *iter_cls;
465
466   /**
467    * Function to call to prepare the next
468    * iteration.
469    */
470   PrepareFunction prep;
471
472   /**
473    * Closure for prep.
474    */
475   void *prep_cls;
476
477   /**
478    * Statement that the iterator will get the data
479    * from (updated or set by prep).
480    */ 
481   sqlite3_stmt *stmt;
482
483   /**
484    * Row ID of the last result.
485    */
486   unsigned long long last_rowid;
487
488   /**
489    * Key of the last result.
490    */
491   GNUNET_HashCode lastKey;  
492
493   /**
494    * Expiration time of the last value visited.
495    */
496   struct GNUNET_TIME_Absolute lastExpiration;
497
498   /**
499    * Priority of the last value visited.
500    */ 
501   unsigned int lastPriority; 
502
503   /**
504    * Number of results processed so far.
505    */
506   unsigned int count;
507
508   /**
509    * Set to GNUNET_YES if we must stop now.
510    */
511   int end_it;
512 };
513
514
515 /**
516  * Continuation of "sqlite_next_request".
517  *
518  * @param cls the next context
519  * @param tc the task context (unused)
520  */
521 static void 
522 sqlite_next_request_cont (void *cls,
523                           const struct GNUNET_SCHEDULER_TaskContext *tc)
524 {
525   struct NextContext * nc= cls;
526   struct Plugin *plugin;
527   unsigned long long rowid;
528   sqlite3_stmt *stmtd;
529   int ret;
530   unsigned int type;
531   unsigned int size;
532   unsigned int priority;
533   unsigned int anonymity;
534   struct GNUNET_TIME_Absolute expiration;
535   const GNUNET_HashCode *key;
536   const void *data;
537
538  
539   plugin = nc->plugin;
540   if ( (GNUNET_YES == nc->end_it) ||
541        (GNUNET_OK != (nc->prep(nc->prep_cls,
542                                nc))) )
543     {
544     END:
545       nc->iter (nc->iter_cls, 
546                 NULL, NULL, 0, NULL, 0, 0, 0, 
547                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
548       nc->prep (nc->prep_cls, NULL);
549       GNUNET_free (nc);
550       return;
551     }
552
553   rowid = sqlite3_column_int64 (nc->stmt, 7);
554   nc->last_rowid = rowid;
555   type = sqlite3_column_int (nc->stmt, 1);
556   size = sqlite3_column_bytes (nc->stmt, 6);
557   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
558     {
559       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
560                        "sqlite",
561                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
562       if (SQLITE_OK != sqlite3_reset (nc->stmt))
563         LOG_SQLITE (nc->plugin, NULL,
564                     GNUNET_ERROR_TYPE_ERROR |
565                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
566       if (sq_prepare
567           (nc->plugin->dbh,
568            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
569            &stmtd) != SQLITE_OK)
570         {
571           LOG_SQLITE (nc->plugin, NULL,
572                       GNUNET_ERROR_TYPE_ERROR |
573                       GNUNET_ERROR_TYPE_BULK, 
574                       "sq_prepare");
575           goto END;
576         }
577
578       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
579         LOG_SQLITE (nc->plugin, NULL,
580                     GNUNET_ERROR_TYPE_ERROR |
581                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
582       if (SQLITE_DONE != sqlite3_step (stmtd))
583         LOG_SQLITE (nc->plugin, NULL,
584                     GNUNET_ERROR_TYPE_ERROR |
585                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
586       if (SQLITE_OK != sqlite3_finalize (stmtd))
587         LOG_SQLITE (nc->plugin, NULL,
588                     GNUNET_ERROR_TYPE_ERROR |
589                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
590       goto END;
591     }
592
593   priority = sqlite3_column_int (nc->stmt, 2);
594   anonymity = sqlite3_column_int (nc->stmt, 3);
595   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
596   key = sqlite3_column_blob (nc->stmt, 5);
597   nc->lastPriority = priority;
598   nc->lastExpiration = expiration;
599   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
600   data = sqlite3_column_blob (nc->stmt, 6);
601   nc->count++;
602   ret = nc->iter (nc->iter_cls,
603                   nc,
604                   key,
605                   size,
606                   data, 
607                   type,
608                   priority,
609                   anonymity,
610                   expiration,
611                   rowid);
612   if (ret == GNUNET_SYSERR)
613     {
614       nc->end_it = GNUNET_YES;
615       return;
616     }
617 #if DEBUG_SQLITE
618   if (ret == GNUNET_NO)
619     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
620                      "sqlite",
621                      "Asked to remove entry %llu (%u bytes)\n",
622                      (unsigned long long) rowid,
623                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
624 #endif
625   if ( (ret == GNUNET_NO) &&
626        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
627     {
628       plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
629       plugin->lastSync++; 
630 #if DEBUG_SQLITE
631       if (ret == GNUNET_NO)
632         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
633                          "sqlite",
634                          "Removed entry %llu (%u bytes), new payload is %llu\n",
635                          (unsigned long long) rowid,
636                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
637                          (unsigned long long) plugin->payload);
638 #endif
639       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
640         sync_stats (plugin);
641     }
642 }
643
644
645 /**
646  * Function invoked on behalf of a "PluginIterator"
647  * asking the database plugin to call the iterator
648  * with the next item.
649  *
650  * @param next_cls whatever argument was given
651  *        to the PluginIterator as "next_cls".
652  * @param end_it set to GNUNET_YES if we
653  *        should terminate the iteration early
654  *        (iterator should be still called once more
655  *         to signal the end of the iteration).
656  */
657 static void 
658 sqlite_next_request (void *next_cls,
659                      int end_it)
660 {
661   struct NextContext * nc= next_cls;
662
663   if (GNUNET_YES == end_it)
664     nc->end_it = GNUNET_YES;
665   GNUNET_SCHEDULER_add_continuation (nc->plugin->env->sched,
666                                      &sqlite_next_request_cont,
667                                      nc,
668                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
669 }
670
671
672
673 /**
674  * Store an item in the datastore.
675  *
676  * @param cls closure
677  * @param key key for the item
678  * @param size number of bytes in data
679  * @param data content stored
680  * @param type type of the content
681  * @param priority priority of the content
682  * @param anonymity anonymity-level for the content
683  * @param expiration expiration time for the content
684  * @param msg set to an error message
685  * @return GNUNET_OK on success
686  */
687 static int
688 sqlite_plugin_put (void *cls,
689                    const GNUNET_HashCode * key,
690                    uint32_t size,
691                    const void *data,
692                    uint32_t type,
693                    uint32_t priority,
694                    uint32_t anonymity,
695                    struct GNUNET_TIME_Absolute expiration,
696                    char ** msg)
697 {
698   struct Plugin *plugin = cls;
699   int n;
700   sqlite3_stmt *stmt;
701   GNUNET_HashCode vhash;
702
703 #if DEBUG_SQLITE
704   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
705                    "sqlite",
706                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
707                    type, 
708                    GNUNET_h2s(key),
709                    priority,
710                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
711                    (long long) expiration.value);
712 #endif
713   GNUNET_CRYPTO_hash (data, size, &vhash);
714   stmt = plugin->insertContent;
715   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
716       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
717       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
718       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
719       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
720       (SQLITE_OK !=
721        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
722                           SQLITE_TRANSIENT)) ||
723       (SQLITE_OK !=
724        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
725                           SQLITE_TRANSIENT))
726       || (SQLITE_OK !=
727           sqlite3_bind_blob (stmt, 8, data, size,
728                              SQLITE_TRANSIENT)))
729     {
730       LOG_SQLITE (plugin,
731                   msg,
732                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
733       if (SQLITE_OK != sqlite3_reset (stmt))
734         LOG_SQLITE (plugin, NULL,
735                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
736       return GNUNET_SYSERR;
737     }
738   n = sqlite3_step (stmt);
739   if (n != SQLITE_DONE)
740     {
741       if (n == SQLITE_BUSY)
742         {
743           LOG_SQLITE (plugin, msg,
744                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
745           sqlite3_reset (stmt);
746           GNUNET_break (0);
747           return GNUNET_NO;
748         }
749       LOG_SQLITE (plugin, msg,
750                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
751       sqlite3_reset (stmt);
752       return GNUNET_SYSERR;
753     }
754   if (SQLITE_OK != sqlite3_reset (stmt))
755     LOG_SQLITE (plugin, NULL,
756                 GNUNET_ERROR_TYPE_ERROR |
757                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
758   plugin->lastSync++;
759   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
760 #if DEBUG_SQLITE
761   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
762                    "sqlite",
763                    "Stored new entry (%u bytes), new payload is %llu\n",
764                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
765                    (unsigned long long) plugin->payload);
766 #endif
767   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
768     sync_stats (plugin);
769   return GNUNET_OK;
770 }
771
772
773 /**
774  * Update the priority for a particular key in the datastore.  If
775  * the expiration time in value is different than the time found in
776  * the datastore, the higher value should be kept.  For the
777  * anonymity level, the lower value is to be used.  The specified
778  * priority should be added to the existing priority, ignoring the
779  * priority in value.
780  *
781  * Note that it is possible for multiple values to match this put.
782  * In that case, all of the respective values are updated.
783  *
784  * @param cls the plugin context (state for this module)
785  * @param uid unique identifier of the datum
786  * @param delta by how much should the priority
787  *     change?  If priority + delta < 0 the
788  *     priority should be set to 0 (never go
789  *     negative).
790  * @param expire new expiration time should be the
791  *     MAX of any existing expiration time and
792  *     this value
793  * @param msg set to an error message
794  * @return GNUNET_OK on success
795  */
796 static int
797 sqlite_plugin_update (void *cls,
798                       uint64_t uid,
799                       int delta, struct GNUNET_TIME_Absolute expire,
800                       char **msg)
801 {
802   struct Plugin *plugin = cls;
803   int n;
804
805   sqlite3_bind_int (plugin->updPrio, 1, delta);
806   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
807   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
808   n = sqlite3_step (plugin->updPrio);
809   if (n != SQLITE_DONE)
810     LOG_SQLITE (plugin, msg,
811                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
812                 "sqlite3_step");
813 #if DEBUG_SQLITE
814   else
815     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
816                      "sqlite",
817                      "Block updated\n");
818 #endif
819   sqlite3_reset (plugin->updPrio);
820
821   if (n == SQLITE_BUSY)
822     return GNUNET_NO;
823   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
824 }
825
826
827 /**
828  * Internal context for an iteration.
829  */
830 struct IterContext
831 {
832   /**
833    * FIXME.
834    */
835   sqlite3_stmt *stmt_1;
836
837   /**
838    * FIXME.
839    */
840   sqlite3_stmt *stmt_2;
841
842   /**
843    * FIXME.
844    */
845   int is_asc;
846
847   /**
848    * FIXME.
849    */
850   int is_prio;
851
852   /**
853    * FIXME.
854    */
855   int is_migr;
856
857   /**
858    * FIXME.
859    */
860   int limit_nonanonymous;
861
862   /**
863    * Desired type for blocks returned by this iterator.
864    */
865   uint32_t type;
866 };
867
868
869 /**
870  * Prepare our SQL query to obtain the next record from the database.
871  *
872  * @param cls our "struct IterContext"
873  * @param nc NULL to terminate the iteration, otherwise our context for
874  *           getting the next result.
875  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
876  *         GNUNET_SYSERR on error (or end of iteration)
877  */
878 static int
879 iter_next_prepare (void *cls,
880                    struct NextContext *nc)
881 {
882   struct IterContext *ic = cls;
883   struct Plugin *plugin;
884   int ret;
885
886   if (nc == NULL)
887     {
888 #if DEBUG_SQLITE
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Asked to clean up iterator state.\n");
891 #endif
892       sqlite3_finalize (ic->stmt_1);
893       sqlite3_finalize (ic->stmt_2);
894       return GNUNET_SYSERR;
895     }
896   sqlite3_reset (ic->stmt_1);
897   sqlite3_reset (ic->stmt_2);
898   plugin = nc->plugin;
899   if (ic->is_prio)
900     {
901 #if DEBUG_SQLITE
902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
903                   "Restricting to results larger than the last priority %u\n",
904                   nc->lastPriority);
905 #endif
906       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
907       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
908     }
909   else
910     {
911 #if DEBUG_SQLITE
912       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913                   "Restricting to results larger than the last expiration %llu\n",
914                   (unsigned long long) nc->lastExpiration.value);
915 #endif
916       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
917       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
918     }
919 #if DEBUG_SQLITE
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Restricting to results larger than the last key `%s'\n",
922               GNUNET_h2s(&nc->lastKey));
923 #endif
924   sqlite3_bind_blob (ic->stmt_1, 2, 
925                      &nc->lastKey, 
926                      sizeof (GNUNET_HashCode),
927                      SQLITE_TRANSIENT);
928   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
929     {      
930 #if DEBUG_SQLITE
931       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
932                   "Result found using iterator 1\n");
933 #endif
934       nc->stmt = ic->stmt_1;
935       return GNUNET_OK;
936     }
937   if (ret != SQLITE_DONE)
938     {
939       LOG_SQLITE (plugin, NULL,
940                   GNUNET_ERROR_TYPE_ERROR |
941                   GNUNET_ERROR_TYPE_BULK,
942                   "sqlite3_step");
943       return GNUNET_SYSERR;
944     }
945   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
946     LOG_SQLITE (plugin, NULL,
947                 GNUNET_ERROR_TYPE_ERROR | 
948                 GNUNET_ERROR_TYPE_BULK, 
949                 "sqlite3_reset");
950   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
951     {
952 #if DEBUG_SQLITE
953       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954                   "Result found using iterator 2\n");
955 #endif
956       nc->stmt = ic->stmt_2;
957       return GNUNET_OK;
958     }
959   if (ret != SQLITE_DONE)
960     {
961       LOG_SQLITE (plugin, NULL,
962                   GNUNET_ERROR_TYPE_ERROR |
963                   GNUNET_ERROR_TYPE_BULK,
964                   "sqlite3_step");
965       return GNUNET_SYSERR;
966     }
967   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
968     LOG_SQLITE (plugin, NULL,
969                 GNUNET_ERROR_TYPE_ERROR |
970                 GNUNET_ERROR_TYPE_BULK,
971                 "sqlite3_reset");
972 #if DEBUG_SQLITE
973   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
974               "No result found using either iterator\n");
975 #endif
976   return GNUNET_NO;
977 }
978
979
980 /**
981  * Call a method for each key in the database and
982  * call the callback method on it.
983  *
984  * @param plugin our plugin context
985  * @param type entries of which type should be considered?
986  * @param is_asc are we iterating in ascending order?
987  * @param is_prio are we iterating by priority (otherwise by expiration)
988  * @param is_migr are we iterating in migration order?
989  * @param limit_nonanonymous are we restricting results to those with anonymity
990  *              level zero?
991  * @param stmt_str_1 first SQL statement to execute
992  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
993  * @param iter function to call on each matching value;
994  *        will be called once with a NULL value at the end
995  * @param iter_cls closure for iter
996  */
997 static void
998 basic_iter (struct Plugin *plugin,
999             uint32_t type,
1000             int is_asc,
1001             int is_prio,
1002             int is_migr,
1003             int limit_nonanonymous,
1004             const char *stmt_str_1,
1005             const char *stmt_str_2,
1006             PluginIterator iter,
1007             void *iter_cls)
1008 {
1009   struct NextContext *nc;
1010   struct IterContext *ic;
1011   sqlite3_stmt *stmt_1;
1012   sqlite3_stmt *stmt_2;
1013
1014 #if DEBUG_SQLITE
1015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016               "At %llu, using queries `%s' and `%s'\n",
1017               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1018               stmt_str_1,
1019               stmt_str_2);
1020 #endif
1021   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1022     {
1023       LOG_SQLITE (plugin, NULL,
1024                   GNUNET_ERROR_TYPE_ERROR |
1025                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1026       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1027       return;
1028     }
1029   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1030     {
1031       LOG_SQLITE (plugin, NULL,
1032                   GNUNET_ERROR_TYPE_ERROR |
1033                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1034       sqlite3_finalize (stmt_1);
1035       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1036       return;
1037     }
1038   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1039                       sizeof(struct IterContext));
1040   nc->plugin = plugin;
1041   nc->iter = iter;
1042   nc->iter_cls = iter_cls;
1043   nc->stmt = NULL;
1044   ic = (struct IterContext*) &nc[1];
1045   ic->stmt_1 = stmt_1;
1046   ic->stmt_2 = stmt_2;
1047   ic->type = type;
1048   ic->is_asc = is_asc;
1049   ic->is_prio = is_prio;
1050   ic->is_migr = is_migr;
1051   ic->limit_nonanonymous = limit_nonanonymous;
1052   nc->prep = &iter_next_prepare;
1053   nc->prep_cls = ic;
1054   if (is_asc)
1055     {
1056       nc->lastPriority = 0;
1057       nc->lastExpiration.value = 0;
1058       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1059     }
1060   else
1061     {
1062       nc->lastPriority = 0x7FFFFFFF;
1063       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1064       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1065     }
1066   sqlite_next_request (nc, GNUNET_NO);
1067 }
1068
1069
1070 /**
1071  * Select a subset of the items in the datastore and call
1072  * the given iterator for each of them.
1073  *
1074  * @param cls our plugin context
1075  * @param type entries of which type should be considered?
1076  *        Use 0 for any type.
1077  * @param iter function to call on each matching value;
1078  *        will be called once with a NULL value at the end
1079  * @param iter_cls closure for iter
1080  */
1081 static void
1082 sqlite_plugin_iter_low_priority (void *cls,
1083                                  uint32_t type,
1084                                  PluginIterator iter,
1085                                  void *iter_cls)
1086 {
1087   basic_iter (cls,
1088               type, 
1089               GNUNET_YES, GNUNET_YES, 
1090               GNUNET_NO, GNUNET_NO,
1091               SELECT_IT_LOW_PRIORITY_1,
1092               SELECT_IT_LOW_PRIORITY_2, 
1093               iter, iter_cls);
1094 }
1095
1096
1097 /**
1098  * Select a subset of the items in the datastore and call
1099  * the given iterator for each of them.
1100  *
1101  * @param cls our plugin context
1102  * @param type entries of which type should be considered?
1103  *        Use 0 for any type.
1104  * @param iter function to call on each matching value;
1105  *        will be called once with a NULL value at the end
1106  * @param iter_cls closure for iter
1107  */
1108 static void
1109 sqlite_plugin_iter_zero_anonymity (void *cls,
1110                                    uint32_t type,
1111                                    PluginIterator iter,
1112                                    void *iter_cls)
1113 {
1114   struct GNUNET_TIME_Absolute now;
1115   char *q1;
1116   char *q2;
1117
1118   now = GNUNET_TIME_absolute_get ();
1119   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1120                    (unsigned long long) now.value);
1121   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1122                    (unsigned long long) now.value);
1123   basic_iter (cls,
1124               type, 
1125               GNUNET_NO, GNUNET_YES, 
1126               GNUNET_NO, GNUNET_YES,
1127               q1,
1128               q2,
1129               iter, iter_cls);
1130   GNUNET_free (q1);
1131   GNUNET_free (q2);
1132 }
1133
1134
1135
1136 /**
1137  * Select a subset of the items in the datastore and call
1138  * the given iterator for each of them.
1139  *
1140  * @param cls our plugin context
1141  * @param type entries of which type should be considered?
1142  *        Use 0 for any type.
1143  * @param iter function to call on each matching value;
1144  *        will be called once with a NULL value at the end
1145  * @param iter_cls closure for iter
1146  */
1147 static void
1148 sqlite_plugin_iter_ascending_expiration (void *cls,
1149                                          uint32_t type,
1150                                          PluginIterator iter,
1151                                          void *iter_cls)
1152 {
1153   struct GNUNET_TIME_Absolute now;
1154   char *q1;
1155   char *q2;
1156
1157   now = GNUNET_TIME_absolute_get ();
1158   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1159                    (unsigned long long) 0*now.value);
1160   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1161                    (unsigned long long) 0*now.value);
1162   basic_iter (cls,
1163               type, 
1164               GNUNET_YES, GNUNET_NO, 
1165               GNUNET_NO, GNUNET_NO,
1166               q1, q2,
1167               iter, iter_cls);
1168   GNUNET_free (q1);
1169   GNUNET_free (q2);
1170 }
1171
1172
1173 /**
1174  * Select a subset of the items in the datastore and call
1175  * the given iterator for each of them.
1176  *
1177  * @param cls our plugin context
1178  * @param type entries of which type should be considered?
1179  *        Use 0 for any type.
1180  * @param iter function to call on each matching value;
1181  *        will be called once with a NULL value at the end
1182  * @param iter_cls closure for iter
1183  */
1184 static void
1185 sqlite_plugin_iter_migration_order (void *cls,
1186                                     uint32_t type,
1187                                     PluginIterator iter,
1188                                     void *iter_cls)
1189 {
1190   struct GNUNET_TIME_Absolute now;
1191   char *q;
1192
1193   now = GNUNET_TIME_absolute_get ();
1194   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1195                    (unsigned long long) now.value);
1196   basic_iter (cls,
1197               type, 
1198               GNUNET_NO, GNUNET_NO, 
1199               GNUNET_YES, GNUNET_NO,
1200               SELECT_IT_MIGRATION_ORDER_1,
1201               q,
1202               iter, iter_cls);
1203   GNUNET_free (q);
1204 }
1205
1206
1207 /**
1208  * Call sqlite using the already prepared query to get
1209  * the next result.
1210  *
1211  * @param cls not used
1212  * @param nc context with the prepared query
1213  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1214  *        there are no more results 
1215  */
1216 static int
1217 all_next_prepare (void *cls,
1218                   struct NextContext *nc)
1219 {
1220   struct Plugin *plugin;
1221   int ret;
1222
1223   if (nc == NULL)
1224     {
1225 #if DEBUG_SQLITE
1226       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                   "Asked to clean up iterator state.\n");
1228 #endif
1229       return GNUNET_SYSERR;
1230     }
1231   plugin = nc->plugin;
1232   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1233     {      
1234       return GNUNET_OK;
1235     }
1236   if (ret != SQLITE_DONE)
1237     {
1238       LOG_SQLITE (plugin, NULL,
1239                   GNUNET_ERROR_TYPE_ERROR |
1240                   GNUNET_ERROR_TYPE_BULK,
1241                   "sqlite3_step");
1242       return GNUNET_SYSERR;
1243     }
1244   return GNUNET_NO;
1245 }
1246
1247
1248 /**
1249  * Select a subset of the items in the datastore and call
1250  * the given iterator for each of them.
1251  *
1252  * @param cls our plugin context
1253  * @param type entries of which type should be considered?
1254  *        Use 0 for any type.
1255  * @param iter function to call on each matching value;
1256  *        will be called once with a NULL value at the end
1257  * @param iter_cls closure for iter
1258  */
1259 static void
1260 sqlite_plugin_iter_all_now (void *cls,
1261                             uint32_t type,
1262                             PluginIterator iter,
1263                             void *iter_cls)
1264 {
1265   struct Plugin *plugin = cls;
1266   struct NextContext *nc;
1267   sqlite3_stmt *stmt;
1268
1269   if (sq_prepare (plugin->dbh, 
1270                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1271                   &stmt) != SQLITE_OK)
1272     {
1273       LOG_SQLITE (plugin, NULL,
1274                   GNUNET_ERROR_TYPE_ERROR |
1275                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1276       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1277       return;
1278     }
1279   nc = GNUNET_malloc (sizeof(struct NextContext));
1280   nc->plugin = plugin;
1281   nc->iter = iter;
1282   nc->iter_cls = iter_cls;
1283   nc->stmt = stmt;
1284   nc->prep = &all_next_prepare;
1285   nc->prep_cls = NULL;
1286   sqlite_next_request (nc, GNUNET_NO);
1287 }
1288
1289
1290 /**
1291  * FIXME.
1292  */
1293 struct GetNextContext
1294 {
1295
1296   /**
1297    * FIXME.
1298    */
1299   int total;
1300
1301   /**
1302    * FIXME.
1303    */
1304   int off;
1305
1306   /**
1307    * FIXME.
1308    */
1309   int have_vhash;
1310
1311   /**
1312    * FIXME.
1313    */
1314   unsigned int type;
1315
1316   /**
1317    * FIXME.
1318    */
1319   sqlite3_stmt *stmt;
1320
1321   /**
1322    * FIXME.
1323    */
1324   GNUNET_HashCode key;
1325
1326   /**
1327    * FIXME.
1328    */
1329   GNUNET_HashCode vhash;
1330 };
1331
1332
1333
1334 /**
1335  * FIXME.
1336  *
1337  * @param cls our "struct GetNextContext*"
1338  * @param nc FIXME
1339  * @return GNUNET_YES if there are more results, 
1340  *         GNUNET_NO if there are no more results,
1341  *         GNUNET_SYSERR on internal error
1342  */
1343 static int
1344 get_next_prepare (void *cls,
1345                   struct NextContext *nc)
1346 {
1347   struct GetNextContext *gnc = cls;
1348   int sqoff;
1349   int ret;
1350   int limit_off;
1351
1352   if (nc == NULL)
1353     {
1354       sqlite3_finalize (gnc->stmt);
1355       return GNUNET_SYSERR;
1356     }
1357   if (nc->count == gnc->total)
1358     return GNUNET_NO;
1359   if (nc->count + gnc->off == gnc->total)
1360     nc->last_rowid = 0;
1361   if (nc->count == 0)
1362     limit_off = gnc->off;
1363   else
1364     limit_off = 0;
1365   sqoff = 1;
1366   sqlite3_reset (nc->stmt);
1367   ret = sqlite3_bind_blob (nc->stmt,
1368                            sqoff++,
1369                            &gnc->key, 
1370                            sizeof (GNUNET_HashCode),
1371                            SQLITE_TRANSIENT);
1372   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1373     ret = sqlite3_bind_blob (nc->stmt,
1374                              sqoff++,
1375                              &gnc->vhash,
1376                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1377   if ((gnc->type != 0) && (ret == SQLITE_OK))
1378     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1379   if (ret == SQLITE_OK)
1380     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1381   if (ret == SQLITE_OK)
1382     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1383   if (ret != SQLITE_OK)
1384     return GNUNET_SYSERR;
1385   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1386     return GNUNET_NO;
1387   return GNUNET_OK;
1388 }
1389
1390
1391 /**
1392  * Iterate over the results for a particular key
1393  * in the datastore.
1394  *
1395  * @param cls closure
1396  * @param key maybe NULL (to match all entries)
1397  * @param vhash hash of the value, maybe NULL (to
1398  *        match all values that have the right key).
1399  *        Note that for DBlocks there is no difference
1400  *        betwen key and vhash, but for other blocks
1401  *        there may be!
1402  * @param type entries of which type are relevant?
1403  *     Use 0 for any type.
1404  * @param iter function to call on each matching value;
1405  *        will be called once with a NULL value at the end
1406  * @param iter_cls closure for iter
1407  */
1408 static void
1409 sqlite_plugin_get (void *cls,
1410                    const GNUNET_HashCode * key,
1411                    const GNUNET_HashCode * vhash,
1412                    uint32_t type,
1413                    PluginIterator iter, void *iter_cls)
1414 {
1415   struct Plugin *plugin = cls;
1416   struct GetNextContext *gpc;
1417   struct NextContext *nc;
1418   int ret;
1419   int total;
1420   sqlite3_stmt *stmt;
1421   char scratch[256];
1422   int sqoff;
1423
1424   GNUNET_assert (iter != NULL);
1425   if (key == NULL)
1426     {
1427       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1428       return;
1429     }
1430   GNUNET_snprintf (scratch, 256,
1431                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1432                    vhash == NULL ? "" : " AND vhash=:2",
1433                    type == 0 ? "" : (vhash ==
1434                                      NULL) ? " AND type=:2" : " AND type=:3");
1435   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1436     {
1437       LOG_SQLITE (plugin, NULL,
1438                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1439       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1440       return;
1441     }
1442   sqoff = 1;
1443   ret = sqlite3_bind_blob (stmt,
1444                            sqoff++,
1445                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1446   if ((vhash != NULL) && (ret == SQLITE_OK))
1447     ret = sqlite3_bind_blob (stmt,
1448                              sqoff++,
1449                              vhash,
1450                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1451   if ((type != 0) && (ret == SQLITE_OK))
1452     ret = sqlite3_bind_int (stmt, sqoff++, type);
1453   if (SQLITE_OK != ret)
1454     {
1455       LOG_SQLITE (plugin, NULL,
1456                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1457       sqlite3_reset (stmt);
1458       sqlite3_finalize (stmt);
1459       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1460       return;
1461     }
1462   ret = sqlite3_step (stmt);
1463   if (ret != SQLITE_ROW)
1464     {
1465       LOG_SQLITE (plugin, NULL,
1466                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1467                   "sqlite_step");
1468       sqlite3_reset (stmt);
1469       sqlite3_finalize (stmt);
1470       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1471       return;
1472     }
1473   total = sqlite3_column_int (stmt, 0);
1474   sqlite3_reset (stmt);
1475   sqlite3_finalize (stmt);
1476   if (0 == total)
1477     {
1478       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1479       return;
1480     }
1481
1482   GNUNET_snprintf (scratch, 256,
1483                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1484                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1485                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1486                    vhash == NULL ? "" : " AND vhash=:2",
1487                    type == 0 ? "" : (vhash ==
1488                                      NULL) ? " AND type=:2" : " AND type=:3",
1489                    sqoff, sqoff + 1);
1490   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1491     {
1492       LOG_SQLITE (plugin, NULL,
1493                   GNUNET_ERROR_TYPE_ERROR |
1494                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1495       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1496       return;
1497     }
1498   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1499                       sizeof(struct GetNextContext));
1500   nc->plugin = plugin;
1501   nc->iter = iter;
1502   nc->iter_cls = iter_cls;
1503   nc->stmt = stmt;
1504   gpc = (struct GetNextContext*) &nc[1];
1505   gpc->total = total;
1506   gpc->type = type;
1507   gpc->key = *key;
1508   gpc->stmt = stmt; /* alias used for freeing at the end! */
1509   if (NULL != vhash)
1510     {
1511       gpc->have_vhash = GNUNET_YES;
1512       gpc->vhash = *vhash;
1513     }
1514   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1515   nc->prep = &get_next_prepare;
1516   nc->prep_cls = gpc;
1517   sqlite_next_request (nc, GNUNET_NO);
1518 }
1519
1520
1521 /**
1522  * Drop database.
1523  *
1524  * @param cls our plugin context
1525  */
1526 static void 
1527 sqlite_plugin_drop (void *cls)
1528 {
1529   struct Plugin *plugin = cls;
1530   plugin->drop_on_shutdown = GNUNET_YES;
1531 }
1532
1533
1534 /**
1535  * Callback function to process statistic values.
1536  *
1537  * @param cls closure
1538  * @param subsystem name of subsystem that created the statistic
1539  * @param name the name of the datum
1540  * @param value the current value
1541  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1542  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1543  */
1544 static int
1545 process_stat_in (void *cls,
1546                  const char *subsystem,
1547                  const char *name,
1548                  uint64_t value,
1549                  int is_persistent)
1550 {
1551   struct Plugin *plugin = cls;
1552   plugin->payload += value;
1553 #if DEBUG_SQLITE
1554   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1555                    "sqlite",
1556                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1557                    value,
1558                    (unsigned long long) plugin->payload);
1559 #endif
1560   return GNUNET_OK;
1561 }
1562                                          
1563
1564 /**
1565  * Entry point for the plugin.
1566  *
1567  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1568  * @return NULL on error, othrewise the plugin context
1569  */
1570 void *
1571 libgnunet_plugin_datastore_sqlite_init (void *cls)
1572 {
1573   static struct Plugin plugin;
1574   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1575   struct GNUNET_DATASTORE_PluginFunctions *api;
1576
1577   if (plugin.env != NULL)
1578     return NULL; /* can only initialize once! */
1579   memset (&plugin, 0, sizeof(struct Plugin));
1580   plugin.env = env;
1581   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1582                                                 "sqlite",
1583                                                 env->cfg);
1584   GNUNET_STATISTICS_get (plugin.statistics,
1585                          "sqlite",
1586                          QUOTA_STAT_NAME,
1587                          GNUNET_TIME_UNIT_MINUTES,
1588                          NULL,
1589                          &process_stat_in,
1590                          &plugin);
1591   if (GNUNET_OK !=
1592       database_setup (env->cfg, &plugin))
1593     {
1594       database_shutdown (&plugin);
1595       return NULL;
1596     }
1597   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1598   api->cls = &plugin;
1599   api->get_size = &sqlite_plugin_get_size;
1600   api->put = &sqlite_plugin_put;
1601   api->next_request = &sqlite_next_request;
1602   api->get = &sqlite_plugin_get;
1603   api->update = &sqlite_plugin_update;
1604   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1605   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1606   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1607   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1608   api->iter_all_now = &sqlite_plugin_iter_all_now;
1609   api->drop = &sqlite_plugin_drop;
1610   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1611                    "sqlite", _("Sqlite database running\n"));
1612   return api;
1613 }
1614
1615
1616 /**
1617  * Exit point from the plugin.
1618  *
1619  * @param cls the plugin context (as returned by "init")
1620  * @return always NULL
1621  */
1622 void *
1623 libgnunet_plugin_datastore_sqlite_done (void *cls)
1624 {
1625   char *fn;
1626   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1627   struct Plugin *plugin = api->cls;
1628
1629   fn = NULL;
1630   if (plugin->drop_on_shutdown)
1631     fn = GNUNET_strdup (plugin->fn);
1632   database_shutdown (plugin);
1633   GNUNET_STATISTICS_destroy (plugin->statistics,
1634                              GNUNET_NO);
1635   plugin->env = NULL; 
1636   plugin->payload = 0;
1637   GNUNET_free (api);
1638   if (fn != NULL)
1639     {
1640       if (0 != UNLINK(fn))
1641         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1642                                   "unlink",
1643                                   fn);
1644       GNUNET_free (fn);
1645     }
1646   return NULL;
1647 }
1648
1649 /* end of plugin_datastore_sqlite.c */