fixing mess with search update serialization and parenting
[oweals/gnunet.git] / src / datastore / plugin_datastore_sqlite.c
1  /*
2      This file is part of GNUnet
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/plugin_datastore_sqlite.c
23  * @brief sqlite-based datastore backend
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_statistics_service.h"
29 #include "plugin_datastore.h"
30 #include <sqlite3.h>
31
32 #define DEBUG_SQLITE GNUNET_YES
33
34 /**
35  * After how many payload-changing operations
36  * do we sync our statistics?
37  */
38 #define MAX_STAT_SYNC_LAG 50
39
40 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
41
42 /**
43  * Log an error message at log-level 'level' that indicates
44  * a failure of the command 'cmd' on file 'filename'
45  * with the message given by strerror(errno).
46  */
47 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed with error: %s"), cmd, sqlite3_errmsg(db->dbh)); } while(0)
48
49 #define SELECT_IT_LOW_PRIORITY_1 \
50   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash > ?) "\
51   "ORDER BY hash ASC LIMIT 1"
52
53 #define SELECT_IT_LOW_PRIORITY_2 \
54   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio > ?) "\
55   "ORDER BY prio ASC, hash ASC LIMIT 1"
56
57 #define SELECT_IT_NON_ANONYMOUS_1 \
58   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
59   " ORDER BY hash DESC LIMIT 1"
60
61 #define SELECT_IT_NON_ANONYMOUS_2 \
62   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\
63   " ORDER BY prio DESC, hash DESC LIMIT 1"
64
65 #define SELECT_IT_EXPIRATION_TIME_1 \
66   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash > ?) "\
67   " ORDER BY hash ASC LIMIT 1"
68
69 #define SELECT_IT_EXPIRATION_TIME_2 \
70   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire > ?) "\
71   " ORDER BY expire ASC, hash ASC LIMIT 1"
72
73 #define SELECT_IT_MIGRATION_ORDER_1 \
74   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire = ? AND hash < ?) "\
75   " ORDER BY hash DESC LIMIT 1"
76
77 #define SELECT_IT_MIGRATION_ORDER_2 \
78   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE (expire < ? AND expire > %llu) "\
79   " ORDER BY expire DESC, hash DESC LIMIT 1"
80
81 /**
82  * After how many ms "busy" should a DB operation fail for good?
83  * A low value makes sure that we are more responsive to requests
84  * (especially PUTs).  A high value guarantees a higher success
85  * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
86  *
87  * The default value of 250ms should ensure that users do not experience
88  * huge latencies while at the same time allowing operations to succeed
89  * with reasonable probability.
90  */
91 #define BUSY_TIMEOUT_MS 250
92
93
94
95 /**
96  * Context for all functions in this plugin.
97  */
98 struct Plugin 
99 {
100   /**
101    * Our execution environment.
102    */
103   struct GNUNET_DATASTORE_PluginEnvironment *env;
104
105   /**
106    * Database filename.
107    */
108   char *fn;
109
110   /**
111    * Native SQLite database handle.
112    */
113   sqlite3 *dbh;
114
115   /**
116    * Precompiled SQL for update.
117    */
118   sqlite3_stmt *updPrio;
119
120   /**
121    * Precompiled SQL for insertion.
122    */
123   sqlite3_stmt *insertContent;
124
125   /**
126    * Handle to the statistics service.
127    */
128   struct GNUNET_STATISTICS_Handle *statistics;
129
130   /**
131    * Handle for pending get request.
132    */
133   struct GNUNET_STATISTICS_GetHandle *stat_get;
134
135   /**
136    * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
137    */
138   struct NextContext *next_task_nc;
139
140   /**
141    * Pending task with scheduler for running the next request.
142    */
143   GNUNET_SCHEDULER_TaskIdentifier next_task;
144   
145   /**
146    * How much data are we currently storing
147    * in the database?
148    */
149   unsigned long long payload;
150
151   /**
152    * Number of updates that were made to the
153    * payload value since we last synchronized
154    * it with the statistics service.
155    */
156   unsigned int lastSync;
157
158   /**
159    * Should the database be dropped on shutdown?
160    */
161   int drop_on_shutdown;
162
163   /**
164    * Did we get an answer from statistics?
165    */
166   int stats_worked;
167 };
168
169
170 /**
171  * @brief Prepare a SQL statement
172  *
173  * @param dbh handle to the database
174  * @param zSql SQL statement, UTF-8 encoded
175  * @param ppStmt set to the prepared statement
176  * @return 0 on success
177  */
178 static int
179 sq_prepare (sqlite3 * dbh, const char *zSql,
180             sqlite3_stmt ** ppStmt)
181 {
182   char *dummy;
183   return sqlite3_prepare (dbh,
184                           zSql,
185                           strlen (zSql), ppStmt, (const char **) &dummy);
186 }
187
188
189 /**
190  * Create our database indices.
191  * 
192  * @param dbh handle to the database
193  */
194 static void
195 create_indices (sqlite3 * dbh)
196 {
197   /* create indices */
198   sqlite3_exec (dbh,
199                 "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
200   sqlite3_exec (dbh,
201                 "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
202                 NULL, NULL);
203   sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
204                 NULL);
205   sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
206                 NULL);
207   sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
208                 NULL, NULL);
209   sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
210                 NULL, NULL, NULL);
211   sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
212                 NULL, NULL);
213 }
214
215
216
217 #if 1
218 #define CHECK(a) GNUNET_break(a)
219 #define ENULL NULL
220 #else
221 #define ENULL &e
222 #define ENULL_DEFINED 1
223 #define CHECK(a) if (! a) { GNUNET_log(GNUNET_ERROR_TYPE_ERRROR, "%s\n", e); sqlite3_free(e); }
224 #endif
225
226
227
228
229 /**
230  * Initialize the database connections and associated
231  * data structures (create tables and indices
232  * as needed as well).
233  *
234  * @param cfg our configuration
235  * @param plugin the plugin context (state for this module)
236  * @return GNUNET_OK on success
237  */
238 static int
239 database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg,
240                 struct Plugin *plugin)
241 {
242   sqlite3_stmt *stmt;
243   char *afsdir;
244 #if ENULL_DEFINED
245   char *e;
246 #endif
247   
248   if (GNUNET_OK != 
249       GNUNET_CONFIGURATION_get_value_filename (cfg,
250                                                "datastore-sqlite",
251                                                "FILENAME",
252                                                &afsdir))
253     {
254       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
255                        "sqlite",
256                        _("Option `%s' in section `%s' missing in configuration!\n"),
257                        "FILENAME",
258                        "datastore-sqlite");
259       return GNUNET_SYSERR;
260     }
261   if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (afsdir))
262     {
263       GNUNET_break (0);
264       GNUNET_free (afsdir);
265       return GNUNET_SYSERR;
266     }
267   plugin->fn = GNUNET_STRINGS_to_utf8 (afsdir, strlen (afsdir),
268 #ifdef ENABLE_NLS
269                                               nl_langinfo (CODESET)
270 #else
271                                               "UTF-8"   /* good luck */
272 #endif
273                                               );
274   GNUNET_free (afsdir);
275   
276   /* Open database and precompile statements */
277   if (sqlite3_open (plugin->fn, &plugin->dbh) != SQLITE_OK)
278     {
279       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
280                        "sqlite",
281                        _("Unable to initialize SQLite: %s.\n"),
282                        sqlite3_errmsg (plugin->dbh));
283       return GNUNET_SYSERR;
284     }
285   CHECK (SQLITE_OK ==
286          sqlite3_exec (plugin->dbh,
287                        "PRAGMA temp_store=MEMORY", NULL, NULL, ENULL));
288   CHECK (SQLITE_OK ==
289          sqlite3_exec (plugin->dbh,
290                        "PRAGMA synchronous=OFF", NULL, NULL, ENULL));
291   CHECK (SQLITE_OK ==
292          sqlite3_exec (plugin->dbh,
293                        "PRAGMA auto_vacuum=INCREMENTAL", NULL, NULL, ENULL));
294   CHECK (SQLITE_OK ==
295          sqlite3_exec (plugin->dbh,
296                        "PRAGMA count_changes=OFF", NULL, NULL, ENULL));
297   CHECK (SQLITE_OK ==
298          sqlite3_exec (plugin->dbh, 
299                        "PRAGMA page_size=4092", NULL, NULL, ENULL));
300
301   CHECK (SQLITE_OK == sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS));
302
303
304   /* We have to do it here, because otherwise precompiling SQL might fail */
305   CHECK (SQLITE_OK ==
306          sq_prepare (plugin->dbh,
307                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
308                      &stmt));
309   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
310        (sqlite3_exec (plugin->dbh,
311                       "CREATE TABLE gn080 ("
312                       "  size INT4 NOT NULL DEFAULT 0,"
313                       "  type INT4 NOT NULL DEFAULT 0,"
314                       "  prio INT4 NOT NULL DEFAULT 0,"
315                       "  anonLevel INT4 NOT NULL DEFAULT 0,"
316                       "  expire INT8 NOT NULL DEFAULT 0,"
317                       "  hash TEXT NOT NULL DEFAULT '',"
318                       "  vhash TEXT NOT NULL DEFAULT '',"
319                       "  value BLOB NOT NULL DEFAULT '')", NULL, NULL,
320                       NULL) != SQLITE_OK) )
321     {
322       LOG_SQLITE (plugin, NULL,
323                   GNUNET_ERROR_TYPE_ERROR, 
324                   "sqlite3_exec");
325       sqlite3_finalize (stmt);
326       return GNUNET_SYSERR;
327     }
328   sqlite3_finalize (stmt);
329   create_indices (plugin->dbh);
330
331   CHECK (SQLITE_OK ==
332          sq_prepare (plugin->dbh,
333                      "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn071'",
334                      &stmt));
335   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
336        (sqlite3_exec (plugin->dbh,
337                       "CREATE TABLE gn071 ("
338                       "  key TEXT NOT NULL DEFAULT '',"
339                       "  value INTEGER NOT NULL DEFAULT 0)", NULL, NULL,
340                       NULL) != SQLITE_OK) )
341     {
342       LOG_SQLITE (plugin, NULL,
343                   GNUNET_ERROR_TYPE_ERROR, "sqlite3_exec");
344       sqlite3_finalize (stmt);
345       return GNUNET_SYSERR;
346     }
347   sqlite3_finalize (stmt);
348
349   if ((sq_prepare (plugin->dbh,
350                    "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) WHERE "
351                    "_ROWID_ = ?",
352                    &plugin->updPrio) != SQLITE_OK) ||
353       (sq_prepare (plugin->dbh,
354                    "INSERT INTO gn080 (size, type, prio, "
355                    "anonLevel, expire, hash, vhash, value) VALUES "
356                    "(?, ?, ?, ?, ?, ?, ?, ?)",
357                    &plugin->insertContent) != SQLITE_OK))
358     {
359       LOG_SQLITE (plugin, NULL,
360                   GNUNET_ERROR_TYPE_ERROR, "precompiling");
361       return GNUNET_SYSERR;
362     }
363   return GNUNET_OK;
364 }
365
366
367 /**
368  * Synchronize our utilization statistics with the 
369  * statistics service.
370  * @param plugin the plugin context (state for this module)
371  */
372 static void 
373 sync_stats (struct Plugin *plugin)
374 {
375   GNUNET_STATISTICS_set (plugin->statistics,
376                          QUOTA_STAT_NAME,
377                          plugin->payload,
378                          GNUNET_YES);
379   plugin->lastSync = 0;
380 }
381
382
383 /**
384  * Shutdown database connection and associate data
385  * structures.
386  * @param plugin the plugin context (state for this module)
387  */
388 static void
389 database_shutdown (struct Plugin *plugin)
390 {
391   if (plugin->lastSync > 0)
392     sync_stats (plugin);
393   if (plugin->updPrio != NULL)
394     sqlite3_finalize (plugin->updPrio);
395   if (plugin->insertContent != NULL)
396     sqlite3_finalize (plugin->insertContent);
397   sqlite3_close (plugin->dbh);
398   GNUNET_free_non_null (plugin->fn);
399 }
400
401
402 /**
403  * Get an estimate of how much space the database is
404  * currently using.
405  *
406  * @param cls our plugin context
407  * @return number of bytes used on disk
408  */
409 static unsigned long long sqlite_plugin_get_size (void *cls)
410 {
411   struct Plugin *plugin = cls;
412   return plugin->payload;
413 }
414
415
416 /**
417  * Delete the database entry with the given
418  * row identifier.
419  *
420  * @param plugin the plugin context (state for this module)
421  * @param rid the ID of the row to delete
422  */
423 static int
424 delete_by_rowid (struct Plugin* plugin, 
425                  unsigned long long rid)
426 {
427   sqlite3_stmt *stmt;
428
429   if (sq_prepare (plugin->dbh,
430                   "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != SQLITE_OK)
431     {
432       LOG_SQLITE (plugin, NULL,
433                   GNUNET_ERROR_TYPE_ERROR |
434                   GNUNET_ERROR_TYPE_BULK, "sq_prepare");
435       return GNUNET_SYSERR;
436     }
437   sqlite3_bind_int64 (stmt, 1, rid);
438   if (SQLITE_DONE != sqlite3_step (stmt))
439     {
440       LOG_SQLITE (plugin, NULL,
441                   GNUNET_ERROR_TYPE_ERROR |
442                   GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
443       sqlite3_finalize (stmt);
444       return GNUNET_SYSERR;
445     }
446   sqlite3_finalize (stmt);
447   return GNUNET_OK;
448 }
449
450
451 /**
452  * Context for the universal iterator.
453  */
454 struct NextContext;
455
456 /**
457  * Type of a function that will prepare
458  * the next iteration.
459  *
460  * @param cls closure
461  * @param nc the next context; NULL for the last
462  *         call which gives the callback a chance to
463  *         clean up the closure
464  * @return GNUNET_OK on success, GNUNET_NO if there are
465  *         no more values, GNUNET_SYSERR on error
466  */
467 typedef int (*PrepareFunction)(void *cls,
468                                struct NextContext *nc);
469
470
471 /**
472  * Context we keep for the "next request" callback.
473  */
474 struct NextContext
475 {
476   /**
477    * Internal state.
478    */ 
479   struct Plugin *plugin;
480
481   /**
482    * Function to call on the next value.
483    */
484   PluginIterator iter;
485
486   /**
487    * Closure for iter.
488    */
489   void *iter_cls;
490
491   /**
492    * Function to call to prepare the next
493    * iteration.
494    */
495   PrepareFunction prep;
496
497   /**
498    * Closure for prep.
499    */
500   void *prep_cls;
501
502   /**
503    * Statement that the iterator will get the data
504    * from (updated or set by prep).
505    */ 
506   sqlite3_stmt *stmt;
507
508   /**
509    * Row ID of the last result.
510    */
511   unsigned long long last_rowid;
512
513   /**
514    * Key of the last result.
515    */
516   GNUNET_HashCode lastKey;  
517
518   /**
519    * Expiration time of the last value visited.
520    */
521   struct GNUNET_TIME_Absolute lastExpiration;
522
523   /**
524    * Priority of the last value visited.
525    */ 
526   unsigned int lastPriority; 
527
528   /**
529    * Number of results processed so far.
530    */
531   unsigned int count;
532
533   /**
534    * Set to GNUNET_YES if we must stop now.
535    */
536   int end_it;
537 };
538
539
540 /**
541  * Continuation of "sqlite_next_request".
542  *
543  * @param cls the next context
544  * @param tc the task context (unused)
545  */
546 static void 
547 sqlite_next_request_cont (void *cls,
548                           const struct GNUNET_SCHEDULER_TaskContext *tc)
549 {
550   struct NextContext * nc = cls;
551   struct Plugin *plugin;
552   unsigned long long rowid;
553   sqlite3_stmt *stmtd;
554   int ret;
555   unsigned int type;
556   unsigned int size;
557   unsigned int priority;
558   unsigned int anonymity;
559   struct GNUNET_TIME_Absolute expiration;
560   const GNUNET_HashCode *key;
561   const void *data;
562   
563   plugin = nc->plugin;
564   plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
565   plugin->next_task_nc = NULL;
566   if ( (GNUNET_YES == nc->end_it) ||
567        (GNUNET_OK != (nc->prep(nc->prep_cls,
568                                nc))) )
569     {
570     END:
571       nc->iter (nc->iter_cls, 
572                 NULL, NULL, 0, NULL, 0, 0, 0, 
573                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
574       nc->prep (nc->prep_cls, NULL);
575       GNUNET_free (nc);
576       return;
577     }
578
579   rowid = sqlite3_column_int64 (nc->stmt, 7);
580   nc->last_rowid = rowid;
581   type = sqlite3_column_int (nc->stmt, 1);
582   size = sqlite3_column_bytes (nc->stmt, 6);
583   if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
584     {
585       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
586                        "sqlite",
587                        _("Invalid data in database.  Trying to fix (by deletion).\n"));
588       if (SQLITE_OK != sqlite3_reset (nc->stmt))
589         LOG_SQLITE (nc->plugin, NULL,
590                     GNUNET_ERROR_TYPE_ERROR |
591                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
592       if (sq_prepare
593           (nc->plugin->dbh,
594            "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
595            &stmtd) != SQLITE_OK)
596         {
597           LOG_SQLITE (nc->plugin, NULL,
598                       GNUNET_ERROR_TYPE_ERROR |
599                       GNUNET_ERROR_TYPE_BULK, 
600                       "sq_prepare");
601           goto END;
602         }
603
604       if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode)))
605         LOG_SQLITE (nc->plugin, NULL,
606                     GNUNET_ERROR_TYPE_ERROR |
607                     GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int");
608       if (SQLITE_DONE != sqlite3_step (stmtd))
609         LOG_SQLITE (nc->plugin, NULL,
610                     GNUNET_ERROR_TYPE_ERROR |
611                     GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
612       if (SQLITE_OK != sqlite3_finalize (stmtd))
613         LOG_SQLITE (nc->plugin, NULL,
614                     GNUNET_ERROR_TYPE_ERROR |
615                     GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize");
616       goto END;
617     }
618
619   priority = sqlite3_column_int (nc->stmt, 2);
620   anonymity = sqlite3_column_int (nc->stmt, 3);
621   expiration.value = sqlite3_column_int64 (nc->stmt, 4);
622   key = sqlite3_column_blob (nc->stmt, 5);
623   nc->lastPriority = priority;
624   nc->lastExpiration = expiration;
625   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
626   data = sqlite3_column_blob (nc->stmt, 6);
627   nc->count++;
628   ret = nc->iter (nc->iter_cls,
629                   nc,
630                   key,
631                   size,
632                   data, 
633                   type,
634                   priority,
635                   anonymity,
636                   expiration,
637                   rowid);
638   if (ret == GNUNET_SYSERR)
639     {
640       nc->end_it = GNUNET_YES;
641       return;
642     }
643 #if DEBUG_SQLITE
644   if (ret == GNUNET_NO)
645     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
646                      "sqlite",
647                      "Asked to remove entry %llu (%u bytes)\n",
648                      (unsigned long long) rowid,
649                      size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
650 #endif
651   if ( (ret == GNUNET_NO) &&
652        (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
653     {
654       if (plugin->payload >= size + GNUNET_DATASTORE_ENTRY_OVERHEAD)
655         plugin->payload -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
656       else
657         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
658                     _("Datastore payload inaccurate, please fix and restart!\n"));
659       plugin->lastSync++; 
660 #if DEBUG_SQLITE
661       if (ret == GNUNET_NO)
662         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
663                          "sqlite",
664                          "Removed entry %llu (%u bytes), new payload is %llu\n",
665                          (unsigned long long) rowid,
666                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
667                          plugin->payload);
668 #endif
669       if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
670         sync_stats (plugin);
671     }
672 }
673
674
675 /**
676  * Function invoked on behalf of a "PluginIterator"
677  * asking the database plugin to call the iterator
678  * with the next item.
679  *
680  * @param next_cls whatever argument was given
681  *        to the PluginIterator as "next_cls".
682  * @param end_it set to GNUNET_YES if we
683  *        should terminate the iteration early
684  *        (iterator should be still called once more
685  *         to signal the end of the iteration).
686  */
687 static void 
688 sqlite_next_request (void *next_cls,
689                      int end_it)
690 {
691   struct NextContext * nc= next_cls;
692
693   if (GNUNET_YES == end_it)
694     nc->end_it = GNUNET_YES;
695   nc->plugin->next_task_nc = nc;
696   nc->plugin->next_task = GNUNET_SCHEDULER_add_now (nc->plugin->env->sched,
697                                                     &sqlite_next_request_cont,
698                                                     nc);
699 }
700
701
702
703 /**
704  * Store an item in the datastore.
705  *
706  * @param cls closure
707  * @param key key for the item
708  * @param size number of bytes in data
709  * @param data content stored
710  * @param type type of the content
711  * @param priority priority of the content
712  * @param anonymity anonymity-level for the content
713  * @param expiration expiration time for the content
714  * @param msg set to an error message
715  * @return GNUNET_OK on success
716  */
717 static int
718 sqlite_plugin_put (void *cls,
719                    const GNUNET_HashCode * key,
720                    uint32_t size,
721                    const void *data,
722                    enum GNUNET_BLOCK_Type type,
723                    uint32_t priority,
724                    uint32_t anonymity,
725                    struct GNUNET_TIME_Absolute expiration,
726                    char ** msg)
727 {
728   struct Plugin *plugin = cls;
729   int n;
730   sqlite3_stmt *stmt;
731   GNUNET_HashCode vhash;
732
733 #if DEBUG_SQLITE
734   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
735                    "sqlite",
736                    "Storing in database block with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
737                    type, 
738                    GNUNET_h2s(key),
739                    priority,
740                    (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).value,
741                    (long long) expiration.value);
742 #endif
743   GNUNET_CRYPTO_hash (data, size, &vhash);
744   stmt = plugin->insertContent;
745   if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
746       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
747       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
748       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
749       (SQLITE_OK != sqlite3_bind_int64 (stmt, 5, (sqlite3_int64) expiration.value)) ||
750       (SQLITE_OK !=
751        sqlite3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
752                           SQLITE_TRANSIENT)) ||
753       (SQLITE_OK !=
754        sqlite3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
755                           SQLITE_TRANSIENT))
756       || (SQLITE_OK !=
757           sqlite3_bind_blob (stmt, 8, data, size,
758                              SQLITE_TRANSIENT)))
759     {
760       LOG_SQLITE (plugin,
761                   msg,
762                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
763       if (SQLITE_OK != sqlite3_reset (stmt))
764         LOG_SQLITE (plugin, NULL,
765                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
766       return GNUNET_SYSERR;
767     }
768   n = sqlite3_step (stmt);
769   if (n != SQLITE_DONE)
770     {
771       if (n == SQLITE_BUSY)
772         {
773           LOG_SQLITE (plugin, msg,
774                       GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
775           sqlite3_reset (stmt);
776           GNUNET_break (0);
777           return GNUNET_NO;
778         }
779       LOG_SQLITE (plugin, msg,
780                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step");
781       sqlite3_reset (stmt);
782       return GNUNET_SYSERR;
783     }
784   if (SQLITE_OK != sqlite3_reset (stmt))
785     LOG_SQLITE (plugin, NULL,
786                 GNUNET_ERROR_TYPE_ERROR |
787                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
788   plugin->lastSync++;
789   plugin->payload += size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
790 #if DEBUG_SQLITE
791   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
792                    "sqlite",
793                    "Stored new entry (%u bytes), new payload is %llu\n",
794                    size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
795                    plugin->payload);
796 #endif
797   if (plugin->lastSync >= MAX_STAT_SYNC_LAG)
798     sync_stats (plugin);
799   return GNUNET_OK;
800 }
801
802
803 /**
804  * Update the priority for a particular key in the datastore.  If
805  * the expiration time in value is different than the time found in
806  * the datastore, the higher value should be kept.  For the
807  * anonymity level, the lower value is to be used.  The specified
808  * priority should be added to the existing priority, ignoring the
809  * priority in value.
810  *
811  * Note that it is possible for multiple values to match this put.
812  * In that case, all of the respective values are updated.
813  *
814  * @param cls the plugin context (state for this module)
815  * @param uid unique identifier of the datum
816  * @param delta by how much should the priority
817  *     change?  If priority + delta < 0 the
818  *     priority should be set to 0 (never go
819  *     negative).
820  * @param expire new expiration time should be the
821  *     MAX of any existing expiration time and
822  *     this value
823  * @param msg set to an error message
824  * @return GNUNET_OK on success
825  */
826 static int
827 sqlite_plugin_update (void *cls,
828                       uint64_t uid,
829                       int delta, struct GNUNET_TIME_Absolute expire,
830                       char **msg)
831 {
832   struct Plugin *plugin = cls;
833   int n;
834
835   sqlite3_bind_int (plugin->updPrio, 1, delta);
836   sqlite3_bind_int64 (plugin->updPrio, 2, expire.value);
837   sqlite3_bind_int64 (plugin->updPrio, 3, uid);
838   n = sqlite3_step (plugin->updPrio);
839   if (n != SQLITE_DONE)
840     LOG_SQLITE (plugin, msg,
841                 GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
842                 "sqlite3_step");
843 #if DEBUG_SQLITE
844   else
845     GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
846                      "sqlite",
847                      "Block updated\n");
848 #endif
849   sqlite3_reset (plugin->updPrio);
850
851   if (n == SQLITE_BUSY)
852     return GNUNET_NO;
853   return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR;
854 }
855
856
857 /**
858  * Internal context for an iteration.
859  */
860 struct IterContext
861 {
862   /**
863    * FIXME.
864    */
865   sqlite3_stmt *stmt_1;
866
867   /**
868    * FIXME.
869    */
870   sqlite3_stmt *stmt_2;
871
872   /**
873    * FIXME.
874    */
875   int is_asc;
876
877   /**
878    * FIXME.
879    */
880   int is_prio;
881
882   /**
883    * FIXME.
884    */
885   int is_migr;
886
887   /**
888    * FIXME.
889    */
890   int limit_nonanonymous;
891
892   /**
893    * Desired type for blocks returned by this iterator.
894    */
895   enum GNUNET_BLOCK_Type type;
896 };
897
898
899 /**
900  * Prepare our SQL query to obtain the next record from the database.
901  *
902  * @param cls our "struct IterContext"
903  * @param nc NULL to terminate the iteration, otherwise our context for
904  *           getting the next result.
905  * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
906  *         GNUNET_SYSERR on error (or end of iteration)
907  */
908 static int
909 iter_next_prepare (void *cls,
910                    struct NextContext *nc)
911 {
912   struct IterContext *ic = cls;
913   struct Plugin *plugin;
914   int ret;
915
916   if (nc == NULL)
917     {
918 #if DEBUG_SQLITE
919       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920                   "Asked to clean up iterator state.\n");
921 #endif
922       sqlite3_finalize (ic->stmt_1);
923       sqlite3_finalize (ic->stmt_2);
924       return GNUNET_SYSERR;
925     }
926   sqlite3_reset (ic->stmt_1);
927   sqlite3_reset (ic->stmt_2);
928   plugin = nc->plugin;
929   if (ic->is_prio)
930     {
931 #if DEBUG_SQLITE
932       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933                   "Restricting to results larger than the last priority %u\n",
934                   nc->lastPriority);
935 #endif
936       sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority);
937       sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority);
938     }
939   else
940     {
941 #if DEBUG_SQLITE
942       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943                   "Restricting to results larger than the last expiration %llu\n",
944                   (unsigned long long) nc->lastExpiration.value);
945 #endif
946       sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.value);
947       sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.value);
948     }
949 #if DEBUG_SQLITE
950   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
951               "Restricting to results larger than the last key `%s'\n",
952               GNUNET_h2s(&nc->lastKey));
953 #endif
954   sqlite3_bind_blob (ic->stmt_1, 2, 
955                      &nc->lastKey, 
956                      sizeof (GNUNET_HashCode),
957                      SQLITE_TRANSIENT);
958   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
959     {      
960 #if DEBUG_SQLITE
961       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962                   "Result found using iterator 1\n");
963 #endif
964       nc->stmt = ic->stmt_1;
965       return GNUNET_OK;
966     }
967   if (ret != SQLITE_DONE)
968     {
969       LOG_SQLITE (plugin, NULL,
970                   GNUNET_ERROR_TYPE_ERROR |
971                   GNUNET_ERROR_TYPE_BULK,
972                   "sqlite3_step");
973       return GNUNET_SYSERR;
974     }
975   if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
976     LOG_SQLITE (plugin, NULL,
977                 GNUNET_ERROR_TYPE_ERROR | 
978                 GNUNET_ERROR_TYPE_BULK, 
979                 "sqlite3_reset");
980   if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
981     {
982 #if DEBUG_SQLITE
983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
984                   "Result found using iterator 2\n");
985 #endif
986       nc->stmt = ic->stmt_2;
987       return GNUNET_OK;
988     }
989   if (ret != SQLITE_DONE)
990     {
991       LOG_SQLITE (plugin, NULL,
992                   GNUNET_ERROR_TYPE_ERROR |
993                   GNUNET_ERROR_TYPE_BULK,
994                   "sqlite3_step");
995       return GNUNET_SYSERR;
996     }
997   if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
998     LOG_SQLITE (plugin, NULL,
999                 GNUNET_ERROR_TYPE_ERROR |
1000                 GNUNET_ERROR_TYPE_BULK,
1001                 "sqlite3_reset");
1002 #if DEBUG_SQLITE
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "No result found using either iterator\n");
1005 #endif
1006   return GNUNET_NO;
1007 }
1008
1009
1010 /**
1011  * Call a method for each key in the database and
1012  * call the callback method on it.
1013  *
1014  * @param plugin our plugin context
1015  * @param type entries of which type should be considered?
1016  * @param is_asc are we iterating in ascending order?
1017  * @param is_prio are we iterating by priority (otherwise by expiration)
1018  * @param is_migr are we iterating in migration order?
1019  * @param limit_nonanonymous are we restricting results to those with anonymity
1020  *              level zero?
1021  * @param stmt_str_1 first SQL statement to execute
1022  * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration)
1023  * @param iter function to call on each matching value;
1024  *        will be called once with a NULL value at the end
1025  * @param iter_cls closure for iter
1026  */
1027 static void
1028 basic_iter (struct Plugin *plugin,
1029             enum GNUNET_BLOCK_Type type,
1030             int is_asc,
1031             int is_prio,
1032             int is_migr,
1033             int limit_nonanonymous,
1034             const char *stmt_str_1,
1035             const char *stmt_str_2,
1036             PluginIterator iter,
1037             void *iter_cls)
1038 {
1039   struct NextContext *nc;
1040   struct IterContext *ic;
1041   sqlite3_stmt *stmt_1;
1042   sqlite3_stmt *stmt_2;
1043
1044 #if DEBUG_SQLITE
1045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1046               "At %llu, using queries `%s' and `%s'\n",
1047               (unsigned long long) GNUNET_TIME_absolute_get ().value,
1048               stmt_str_1,
1049               stmt_str_2);
1050 #endif
1051   if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK)
1052     {
1053       LOG_SQLITE (plugin, NULL,
1054                   GNUNET_ERROR_TYPE_ERROR |
1055                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1056       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1057       return;
1058     }
1059   if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK)
1060     {
1061       LOG_SQLITE (plugin, NULL,
1062                   GNUNET_ERROR_TYPE_ERROR |
1063                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1064       sqlite3_finalize (stmt_1);
1065       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1066       return;
1067     }
1068   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1069                       sizeof(struct IterContext));
1070   nc->plugin = plugin;
1071   nc->iter = iter;
1072   nc->iter_cls = iter_cls;
1073   nc->stmt = NULL;
1074   ic = (struct IterContext*) &nc[1];
1075   ic->stmt_1 = stmt_1;
1076   ic->stmt_2 = stmt_2;
1077   ic->type = type;
1078   ic->is_asc = is_asc;
1079   ic->is_prio = is_prio;
1080   ic->is_migr = is_migr;
1081   ic->limit_nonanonymous = limit_nonanonymous;
1082   nc->prep = &iter_next_prepare;
1083   nc->prep_cls = ic;
1084   if (is_asc)
1085     {
1086       nc->lastPriority = 0;
1087       nc->lastExpiration.value = 0;
1088       memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode));
1089     }
1090   else
1091     {
1092       nc->lastPriority = 0x7FFFFFFF;
1093       nc->lastExpiration.value = 0x7FFFFFFFFFFFFFFFLL;
1094       memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
1095     }
1096   sqlite_next_request (nc, GNUNET_NO);
1097 }
1098
1099
1100 /**
1101  * Select a subset of the items in the datastore and call
1102  * the given iterator for each of them.
1103  *
1104  * @param cls our plugin context
1105  * @param type entries of which type should be considered?
1106  *        Use 0 for any type.
1107  * @param iter function to call on each matching value;
1108  *        will be called once with a NULL value at the end
1109  * @param iter_cls closure for iter
1110  */
1111 static void
1112 sqlite_plugin_iter_low_priority (void *cls,
1113                                  enum GNUNET_BLOCK_Type type,
1114                                  PluginIterator iter,
1115                                  void *iter_cls)
1116 {
1117   basic_iter (cls,
1118               type, 
1119               GNUNET_YES, GNUNET_YES, 
1120               GNUNET_NO, GNUNET_NO,
1121               SELECT_IT_LOW_PRIORITY_1,
1122               SELECT_IT_LOW_PRIORITY_2, 
1123               iter, iter_cls);
1124 }
1125
1126
1127 /**
1128  * Select a subset of the items in the datastore and call
1129  * the given iterator for each of them.
1130  *
1131  * @param cls our plugin context
1132  * @param type entries of which type should be considered?
1133  *        Use 0 for any type.
1134  * @param iter function to call on each matching value;
1135  *        will be called once with a NULL value at the end
1136  * @param iter_cls closure for iter
1137  */
1138 static void
1139 sqlite_plugin_iter_zero_anonymity (void *cls,
1140                                    enum GNUNET_BLOCK_Type type,
1141                                    PluginIterator iter,
1142                                    void *iter_cls)
1143 {
1144   struct GNUNET_TIME_Absolute now;
1145   char *q1;
1146   char *q2;
1147
1148   now = GNUNET_TIME_absolute_get ();
1149   GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1,
1150                    (unsigned long long) now.value);
1151   GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2,
1152                    (unsigned long long) now.value);
1153   basic_iter (cls,
1154               type, 
1155               GNUNET_NO, GNUNET_YES, 
1156               GNUNET_NO, GNUNET_YES,
1157               q1,
1158               q2,
1159               iter, iter_cls);
1160   GNUNET_free (q1);
1161   GNUNET_free (q2);
1162 }
1163
1164
1165
1166 /**
1167  * Select a subset of the items in the datastore and call
1168  * the given iterator for each of them.
1169  *
1170  * @param cls our plugin context
1171  * @param type entries of which type should be considered?
1172  *        Use 0 for any type.
1173  * @param iter function to call on each matching value;
1174  *        will be called once with a NULL value at the end
1175  * @param iter_cls closure for iter
1176  */
1177 static void
1178 sqlite_plugin_iter_ascending_expiration (void *cls,
1179                                          enum GNUNET_BLOCK_Type type,
1180                                          PluginIterator iter,
1181                                          void *iter_cls)
1182 {
1183   struct GNUNET_TIME_Absolute now;
1184   char *q1;
1185   char *q2;
1186
1187   now = GNUNET_TIME_absolute_get ();
1188   GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1,
1189                    (unsigned long long) 0*now.value);
1190   GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2,
1191                    (unsigned long long) 0*now.value);
1192   basic_iter (cls,
1193               type, 
1194               GNUNET_YES, GNUNET_NO, 
1195               GNUNET_NO, GNUNET_NO,
1196               q1, q2,
1197               iter, iter_cls);
1198   GNUNET_free (q1);
1199   GNUNET_free (q2);
1200 }
1201
1202
1203 /**
1204  * Select a subset of the items in the datastore and call
1205  * the given iterator for each of them.
1206  *
1207  * @param cls our plugin context
1208  * @param type entries of which type should be considered?
1209  *        Use 0 for any type.
1210  * @param iter function to call on each matching value;
1211  *        will be called once with a NULL value at the end
1212  * @param iter_cls closure for iter
1213  */
1214 static void
1215 sqlite_plugin_iter_migration_order (void *cls,
1216                                     enum GNUNET_BLOCK_Type type,
1217                                     PluginIterator iter,
1218                                     void *iter_cls)
1219 {
1220   struct GNUNET_TIME_Absolute now;
1221   char *q;
1222
1223   now = GNUNET_TIME_absolute_get ();
1224   GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2,
1225                    (unsigned long long) now.value);
1226   basic_iter (cls,
1227               type, 
1228               GNUNET_NO, GNUNET_NO, 
1229               GNUNET_YES, GNUNET_NO,
1230               SELECT_IT_MIGRATION_ORDER_1,
1231               q,
1232               iter, iter_cls);
1233   GNUNET_free (q);
1234 }
1235
1236
1237 /**
1238  * Call sqlite using the already prepared query to get
1239  * the next result.
1240  *
1241  * @param cls not used
1242  * @param nc context with the prepared query
1243  * @return GNUNET_OK on success, GNUNET_SYSERR on error, GNUNET_NO if
1244  *        there are no more results 
1245  */
1246 static int
1247 all_next_prepare (void *cls,
1248                   struct NextContext *nc)
1249 {
1250   struct Plugin *plugin;
1251   int ret;
1252
1253   if (nc == NULL)
1254     {
1255 #if DEBUG_SQLITE
1256       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257                   "Asked to clean up iterator state.\n");
1258 #endif
1259       return GNUNET_SYSERR;
1260     }
1261   plugin = nc->plugin;
1262   if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt)))
1263     {      
1264       return GNUNET_OK;
1265     }
1266   if (ret != SQLITE_DONE)
1267     {
1268       LOG_SQLITE (plugin, NULL,
1269                   GNUNET_ERROR_TYPE_ERROR |
1270                   GNUNET_ERROR_TYPE_BULK,
1271                   "sqlite3_step");
1272       return GNUNET_SYSERR;
1273     }
1274   return GNUNET_NO;
1275 }
1276
1277
1278 /**
1279  * Select a subset of the items in the datastore and call
1280  * the given iterator for each of them.
1281  *
1282  * @param cls our plugin context
1283  * @param type entries of which type should be considered?
1284  *        Use 0 for any type.
1285  * @param iter function to call on each matching value;
1286  *        will be called once with a NULL value at the end
1287  * @param iter_cls closure for iter
1288  */
1289 static void
1290 sqlite_plugin_iter_all_now (void *cls,
1291                             enum GNUNET_BLOCK_Type type,
1292                             PluginIterator iter,
1293                             void *iter_cls)
1294 {
1295   struct Plugin *plugin = cls;
1296   struct NextContext *nc;
1297   sqlite3_stmt *stmt;
1298
1299   if (sq_prepare (plugin->dbh, 
1300                   "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080",
1301                   &stmt) != SQLITE_OK)
1302     {
1303       LOG_SQLITE (plugin, NULL,
1304                   GNUNET_ERROR_TYPE_ERROR |
1305                   GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare");
1306       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1307       return;
1308     }
1309   nc = GNUNET_malloc (sizeof(struct NextContext));
1310   nc->plugin = plugin;
1311   nc->iter = iter;
1312   nc->iter_cls = iter_cls;
1313   nc->stmt = stmt;
1314   nc->prep = &all_next_prepare;
1315   nc->prep_cls = NULL;
1316   sqlite_next_request (nc, GNUNET_NO);
1317 }
1318
1319
1320 /**
1321  * FIXME.
1322  */
1323 struct GetNextContext
1324 {
1325
1326   /**
1327    * FIXME.
1328    */
1329   int total;
1330
1331   /**
1332    * FIXME.
1333    */
1334   int off;
1335
1336   /**
1337    * FIXME.
1338    */
1339   int have_vhash;
1340
1341   /**
1342    * FIXME.
1343    */
1344   unsigned int type;
1345
1346   /**
1347    * FIXME.
1348    */
1349   sqlite3_stmt *stmt;
1350
1351   /**
1352    * FIXME.
1353    */
1354   GNUNET_HashCode key;
1355
1356   /**
1357    * FIXME.
1358    */
1359   GNUNET_HashCode vhash;
1360 };
1361
1362
1363
1364 /**
1365  * FIXME.
1366  *
1367  * @param cls our "struct GetNextContext*"
1368  * @param nc FIXME
1369  * @return GNUNET_YES if there are more results, 
1370  *         GNUNET_NO if there are no more results,
1371  *         GNUNET_SYSERR on internal error
1372  */
1373 static int
1374 get_next_prepare (void *cls,
1375                   struct NextContext *nc)
1376 {
1377   struct GetNextContext *gnc = cls;
1378   int sqoff;
1379   int ret;
1380   int limit_off;
1381
1382   if (nc == NULL)
1383     {
1384       sqlite3_finalize (gnc->stmt);
1385       return GNUNET_SYSERR;
1386     }
1387   if (nc->count == gnc->total)
1388     return GNUNET_NO;
1389   if (nc->count + gnc->off == gnc->total)
1390     nc->last_rowid = 0;
1391   if (nc->count == 0)
1392     limit_off = gnc->off;
1393   else
1394     limit_off = 0;
1395   sqoff = 1;
1396   sqlite3_reset (nc->stmt);
1397   ret = sqlite3_bind_blob (nc->stmt,
1398                            sqoff++,
1399                            &gnc->key, 
1400                            sizeof (GNUNET_HashCode),
1401                            SQLITE_TRANSIENT);
1402   if ((gnc->have_vhash) && (ret == SQLITE_OK))
1403     ret = sqlite3_bind_blob (nc->stmt,
1404                              sqoff++,
1405                              &gnc->vhash,
1406                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1407   if ((gnc->type != 0) && (ret == SQLITE_OK))
1408     ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
1409   if (ret == SQLITE_OK)
1410     ret = sqlite3_bind_int64 (nc->stmt, sqoff++, nc->last_rowid + 1);
1411   if (ret == SQLITE_OK)
1412     ret = sqlite3_bind_int (nc->stmt, sqoff++, limit_off);
1413   if (ret != SQLITE_OK)
1414     return GNUNET_SYSERR;
1415   if (SQLITE_ROW != sqlite3_step (nc->stmt))
1416     return GNUNET_NO;
1417   return GNUNET_OK;
1418 }
1419
1420
1421 /**
1422  * Iterate over the results for a particular key
1423  * in the datastore.
1424  *
1425  * @param cls closure
1426  * @param key maybe NULL (to match all entries)
1427  * @param vhash hash of the value, maybe NULL (to
1428  *        match all values that have the right key).
1429  *        Note that for DBlocks there is no difference
1430  *        betwen key and vhash, but for other blocks
1431  *        there may be!
1432  * @param type entries of which type are relevant?
1433  *     Use 0 for any type.
1434  * @param iter function to call on each matching value;
1435  *        will be called once with a NULL value at the end
1436  * @param iter_cls closure for iter
1437  */
1438 static void
1439 sqlite_plugin_get (void *cls,
1440                    const GNUNET_HashCode * key,
1441                    const GNUNET_HashCode * vhash,
1442                    enum GNUNET_BLOCK_Type type,
1443                    PluginIterator iter, void *iter_cls)
1444 {
1445   struct Plugin *plugin = cls;
1446   struct GetNextContext *gpc;
1447   struct NextContext *nc;
1448   int ret;
1449   int total;
1450   sqlite3_stmt *stmt;
1451   char scratch[256];
1452   int sqoff;
1453
1454   GNUNET_assert (iter != NULL);
1455   if (key == NULL)
1456     {
1457       sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls);
1458       return;
1459     }
1460   GNUNET_snprintf (scratch, sizeof (scratch),
1461                    "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
1462                    vhash == NULL ? "" : " AND vhash=:2",
1463                    type == 0 ? "" : (vhash ==
1464                                      NULL) ? " AND type=:2" : " AND type=:3");
1465   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1466     {
1467       LOG_SQLITE (plugin, NULL,
1468                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1469       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1470       return;
1471     }
1472   sqoff = 1;
1473   ret = sqlite3_bind_blob (stmt,
1474                            sqoff++,
1475                            key, sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1476   if ((vhash != NULL) && (ret == SQLITE_OK))
1477     ret = sqlite3_bind_blob (stmt,
1478                              sqoff++,
1479                              vhash,
1480                              sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
1481   if ((type != 0) && (ret == SQLITE_OK))
1482     ret = sqlite3_bind_int (stmt, sqoff++, type);
1483   if (SQLITE_OK != ret)
1484     {
1485       LOG_SQLITE (plugin, NULL,
1486                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
1487       sqlite3_reset (stmt);
1488       sqlite3_finalize (stmt);
1489       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1490       return;
1491     }
1492   ret = sqlite3_step (stmt);
1493   if (ret != SQLITE_ROW)
1494     {
1495       LOG_SQLITE (plugin, NULL,
1496                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
1497                   "sqlite_step");
1498       sqlite3_reset (stmt);
1499       sqlite3_finalize (stmt);
1500       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1501       return;
1502     }
1503   total = sqlite3_column_int (stmt, 0);
1504   sqlite3_reset (stmt);
1505   sqlite3_finalize (stmt);
1506   if (0 == total)
1507     {
1508       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1509       return;
1510     }
1511
1512   GNUNET_snprintf (scratch, sizeof (scratch),
1513                    "SELECT size, type, prio, anonLevel, expire, hash, value, _ROWID_ "
1514                    "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
1515                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
1516                    vhash == NULL ? "" : " AND vhash=:2",
1517                    type == 0 ? "" : (vhash ==
1518                                      NULL) ? " AND type=:2" : " AND type=:3",
1519                    sqoff, sqoff + 1);
1520   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
1521     {
1522       LOG_SQLITE (plugin, NULL,
1523                   GNUNET_ERROR_TYPE_ERROR |
1524                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
1525       iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1526       return;
1527     }
1528   nc = GNUNET_malloc (sizeof(struct NextContext) + 
1529                       sizeof(struct GetNextContext));
1530   nc->plugin = plugin;
1531   nc->iter = iter;
1532   nc->iter_cls = iter_cls;
1533   nc->stmt = stmt;
1534   gpc = (struct GetNextContext*) &nc[1];
1535   gpc->total = total;
1536   gpc->type = type;
1537   gpc->key = *key;
1538   gpc->stmt = stmt; /* alias used for freeing at the end! */
1539   if (NULL != vhash)
1540     {
1541       gpc->have_vhash = GNUNET_YES;
1542       gpc->vhash = *vhash;
1543     }
1544   gpc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
1545   nc->prep = &get_next_prepare;
1546   nc->prep_cls = gpc;
1547   sqlite_next_request (nc, GNUNET_NO);
1548 }
1549
1550
1551 /**
1552  * Drop database.
1553  *
1554  * @param cls our plugin context
1555  */
1556 static void 
1557 sqlite_plugin_drop (void *cls)
1558 {
1559   struct Plugin *plugin = cls;
1560   plugin->drop_on_shutdown = GNUNET_YES;
1561 }
1562
1563
1564 /**
1565  * Callback function to process statistic values.
1566  *
1567  * @param cls closure
1568  * @param subsystem name of subsystem that created the statistic
1569  * @param name the name of the datum
1570  * @param value the current value
1571  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1572  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1573  */
1574 static int
1575 process_stat_in (void *cls,
1576                  const char *subsystem,
1577                  const char *name,
1578                  uint64_t value,
1579                  int is_persistent)
1580 {
1581   struct Plugin *plugin = cls;
1582
1583   plugin->stats_worked = GNUNET_YES;
1584   plugin->payload += value;
1585 #if DEBUG_SQLITE
1586   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1587                    "sqlite",
1588                    "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1589                    value,
1590                    plugin->payload);
1591 #endif
1592   return GNUNET_OK;
1593 }
1594
1595
1596 static void
1597 process_stat_done (void *cls,
1598                    int success)
1599 {
1600   struct Plugin *plugin = cls;
1601   sqlite3_stmt *stmt;
1602   uint64_t pages;
1603   uint64_t free_pages;
1604
1605   plugin->stat_get = NULL;
1606   if (plugin->stats_worked == GNUNET_NO)
1607     {
1608       CHECK (SQLITE_OK ==
1609              sq_prepare (plugin->dbh,
1610                          "PRAGMA page_count",
1611                          &stmt));
1612       if (SQLITE_ROW ==
1613           sqlite3_step (stmt))
1614         pages = sqlite3_column_int64 (stmt, 0);
1615       else
1616         pages = 0;
1617       sqlite3_finalize (stmt);
1618       CHECK (SQLITE_OK ==
1619              sq_prepare (plugin->dbh,
1620                          "PRAGMA freelist_count",
1621                          &stmt));
1622       CHECK (SQLITE_ROW ==
1623              sqlite3_step (stmt));
1624       free_pages = sqlite3_column_int64 (stmt, 0);
1625       sqlite3_finalize (stmt);
1626       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1627                   _("Using sqlite page utilization to estimate payload (%llu pages total, %llu free)\n"),
1628                   (unsigned long long) pages,
1629                   (unsigned long long) free_pages);
1630       plugin->payload = (pages - free_pages) * 4092LL;
1631     }
1632 }
1633                                          
1634
1635 /**
1636  * Entry point for the plugin.
1637  *
1638  * @param cls the "struct GNUNET_DATASTORE_PluginEnvironment*"
1639  * @return NULL on error, othrewise the plugin context
1640  */
1641 void *
1642 libgnunet_plugin_datastore_sqlite_init (void *cls)
1643 {
1644   static struct Plugin plugin;
1645   struct GNUNET_DATASTORE_PluginEnvironment *env = cls;
1646   struct GNUNET_DATASTORE_PluginFunctions *api;
1647
1648   if (plugin.env != NULL)
1649     return NULL; /* can only initialize once! */
1650   memset (&plugin, 0, sizeof(struct Plugin));
1651   plugin.env = env;
1652   plugin.statistics = GNUNET_STATISTICS_create (env->sched,
1653                                                 "ds-sqlite",
1654                                                 env->cfg);
1655   plugin.stat_get = GNUNET_STATISTICS_get (plugin.statistics,
1656                                            "ds-sqlite",
1657                                            QUOTA_STAT_NAME,
1658                                            GNUNET_TIME_UNIT_SECONDS,
1659                                            &process_stat_done,
1660                                            &process_stat_in,
1661                                            &plugin);
1662   if (GNUNET_OK !=
1663       database_setup (env->cfg, &plugin))
1664     {
1665       database_shutdown (&plugin);
1666       return NULL;
1667     }
1668   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
1669   api->cls = &plugin;
1670   api->get_size = &sqlite_plugin_get_size;
1671   api->put = &sqlite_plugin_put;
1672   api->next_request = &sqlite_next_request;
1673   api->get = &sqlite_plugin_get;
1674   api->update = &sqlite_plugin_update;
1675   api->iter_low_priority = &sqlite_plugin_iter_low_priority;
1676   api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
1677   api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration;
1678   api->iter_migration_order = &sqlite_plugin_iter_migration_order;
1679   api->iter_all_now = &sqlite_plugin_iter_all_now;
1680   api->drop = &sqlite_plugin_drop;
1681   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1682                    "sqlite", _("Sqlite database running\n"));
1683   return api;
1684 }
1685
1686
1687 /**
1688  * Exit point from the plugin.
1689  *
1690  * @param cls the plugin context (as returned by "init")
1691  * @return always NULL
1692  */
1693 void *
1694 libgnunet_plugin_datastore_sqlite_done (void *cls)
1695 {
1696   char *fn;
1697   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
1698   struct Plugin *plugin = api->cls;
1699
1700   if (plugin->stat_get != NULL)
1701     {
1702       GNUNET_STATISTICS_get_cancel (plugin->stat_get);
1703       plugin->stat_get = NULL;
1704     }
1705   if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
1706     {
1707       GNUNET_SCHEDULER_cancel (plugin->env->sched,
1708                                plugin->next_task);
1709       plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
1710       plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
1711       GNUNET_free (plugin->next_task_nc);
1712       plugin->next_task_nc = NULL;
1713     }
1714   fn = NULL;
1715   if (plugin->drop_on_shutdown)
1716     fn = GNUNET_strdup (plugin->fn);
1717   database_shutdown (plugin);
1718   GNUNET_STATISTICS_destroy (plugin->statistics,
1719                              GNUNET_NO);
1720   plugin->env = NULL; 
1721   plugin->payload = 0;
1722   GNUNET_free (api);
1723   if (fn != NULL)
1724     {
1725       if (0 != UNLINK(fn))
1726         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
1727                                   "unlink",
1728                                   fn);
1729       GNUNET_free (fn);
1730     }
1731   return NULL;
1732 }
1733
1734 /* end of plugin_datastore_sqlite.c */