fixes
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
46
47 /**
48  * After how many payload-changing operations
49  * do we sync our statistics?
50  */
51 #define MAX_STAT_SYNC_LAG 50
52
53
54 /**
55  * Our datastore plugin.
56  */
57 struct DatastorePlugin
58 {
59
60   /**
61    * API of the transport as returned by the plugin's
62    * initialization function.
63    */
64   struct GNUNET_DATASTORE_PluginFunctions *api;
65
66   /**
67    * Short name for the plugin (i.e. "sqlite").
68    */
69   char *short_name;
70
71   /**
72    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
73    */
74   char *lib_name;
75
76   /**
77    * Environment this transport service is using
78    * for this plugin.
79    */
80   struct GNUNET_DATASTORE_PluginEnvironment env;
81
82 };
83
84
85 /**
86  * Linked list of active reservations.
87  */
88 struct ReservationList 
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct ReservationList *next;
95
96   /**
97    * Client that made the reservation.
98    */
99   struct GNUNET_SERVER_Client *client;
100
101   /**
102    * Number of bytes (still) reserved.
103    */
104   uint64_t amount;
105
106   /**
107    * Number of items (still) reserved.
108    */
109   uint64_t entries;
110
111   /**
112    * Reservation identifier.
113    */
114   int32_t rid;
115
116 };
117
118
119
120 /**
121  * Our datastore plugin (NULL if not available).
122  */
123 static struct DatastorePlugin *plugin;
124
125 /**
126  * Linked list of space reservations made by clients.
127  */
128 static struct ReservationList *reservations;
129
130 /**
131  * Bloomfilter to quickly tell if we don't have the content.
132  */
133 static struct GNUNET_CONTAINER_BloomFilter *filter;
134
135 /**
136  * How much space are we allowed to use?
137  */
138 static unsigned long long quota;
139
140 /**
141  * How much space are we using for the cache?  (space available for
142  * insertions that will be instantly reclaimed by discarding less
143  * important content --- or possibly whatever we just inserted into
144  * the "cache").
145  */
146 static unsigned long long cache_size;
147
148 /**
149  * How much space have we currently reserved?
150  */
151 static unsigned long long reserved;
152   
153 /**
154  * How much data are we currently storing
155  * in the database?
156  */
157 static unsigned long long payload;
158
159 /**
160  * Number of updates that were made to the
161  * payload value since we last synchronized
162  * it with the statistics service.
163  */
164 static unsigned int lastSync;
165
166 /**
167  * Did we get an answer from statistics?
168  */
169 static int stats_worked;
170
171 /**
172  * Identity of the task that is used to delete
173  * expired content.
174  */
175 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
176
177 /**
178  * Our configuration.
179  */
180 const struct GNUNET_CONFIGURATION_Handle *cfg;
181
182 /**
183  * Our scheduler.
184  */
185 struct GNUNET_SCHEDULER_Handle *sched; 
186
187 /**
188  * Handle for reporting statistics.
189  */
190 static struct GNUNET_STATISTICS_Handle *stats;
191
192
193 /**
194  * Synchronize our utilization statistics with the 
195  * statistics service.
196  */
197 static void 
198 sync_stats ()
199 {
200   GNUNET_STATISTICS_set (stats,
201                          QUOTA_STAT_NAME,
202                          payload,
203                          GNUNET_YES);
204   lastSync = 0;
205 }
206
207
208
209
210 /**
211  * Function called once the transmit operation has
212  * either failed or succeeded.
213  *
214  * @param cls closure
215  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
216  */
217 typedef void (*TransmitContinuation)(void *cls,
218                                      int status);
219
220
221 /**
222  * Context for transmitting replies to clients.
223  */
224 struct TransmitCallbackContext 
225 {
226   
227   /**
228    * We keep these in a doubly-linked list (for cleanup).
229    */
230   struct TransmitCallbackContext *next;
231   
232   /**
233    * We keep these in a doubly-linked list (for cleanup).
234    */
235   struct TransmitCallbackContext *prev;
236   
237   /**
238    * The message that we're asked to transmit.
239    */
240   struct GNUNET_MessageHeader *msg;
241   
242   /**
243    * Handle for the transmission request.
244    */
245   struct GNUNET_CONNECTION_TransmitHandle *th;
246
247   /**
248    * Client that we are transmitting to.
249    */
250   struct GNUNET_SERVER_Client *client;
251
252   /**
253    * Function to call once msg has been transmitted
254    * (or at least added to the buffer).
255    */
256   TransmitContinuation tc;
257
258   /**
259    * Closure for tc.
260    */
261   void *tc_cls;
262
263   /**
264    * GNUNET_YES if we are supposed to signal the server
265    * completion of the client's request.
266    */
267   int end;
268 };
269
270   
271 /**
272  * Head of the doubly-linked list (for cleanup).
273  */
274 static struct TransmitCallbackContext *tcc_head;
275
276 /**
277  * Tail of the doubly-linked list (for cleanup).
278  */
279 static struct TransmitCallbackContext *tcc_tail;
280
281 /**
282  * Have we already cleaned up the TCCs and are hence no longer
283  * willing (or able) to transmit anything to anyone?
284  */
285 static int cleaning_done;
286
287 /**
288  * Handle for pending get request.
289  */
290 static struct GNUNET_STATISTICS_GetHandle *stat_get;
291
292
293 /**
294  * Task that is used to remove expired entries from
295  * the datastore.  This task will schedule itself
296  * again automatically to always delete all expired
297  * content quickly.
298  *
299  * @param cls not used
300  * @param tc task context
301  */ 
302 static void
303 delete_expired (void *cls,
304                 const struct GNUNET_SCHEDULER_TaskContext *tc);
305
306
307 /**
308  * Iterate over the expired items stored in the datastore.
309  * Delete all expired items; once we have processed all
310  * expired items, re-schedule the "delete_expired" task.
311  *
312  * @param cls not used
313  * @param next_cls closure to pass to the "next" function.
314  * @param key key for the content
315  * @param size number of bytes in data
316  * @param data content stored
317  * @param type type of the content
318  * @param priority priority of the content
319  * @param anonymity anonymity-level for the content
320  * @param expiration expiration time for the content
321  * @param uid unique identifier for the datum;
322  *        maybe 0 if no unique identifier is available
323  *
324  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
325  *         (continue on call to "next", of course),
326  *         GNUNET_NO to delete the item and continue (if supported)
327  */
328 static int 
329 expired_processor (void *cls,
330                    void *next_cls,
331                    const GNUNET_HashCode * key,
332                    uint32_t size,
333                    const void *data,
334                    enum GNUNET_BLOCK_Type type,
335                    uint32_t priority,
336                    uint32_t anonymity,
337                    struct GNUNET_TIME_Absolute
338                    expiration, 
339                    uint64_t uid)
340 {
341   struct GNUNET_TIME_Absolute now;
342
343   if (key == NULL) 
344     {
345       expired_kill_task 
346         = GNUNET_SCHEDULER_add_delayed (sched,
347                                         MAX_EXPIRE_DELAY,
348                                         &delete_expired,
349                                         NULL);
350       return GNUNET_SYSERR;
351     }
352   now = GNUNET_TIME_absolute_get ();
353   if (expiration.value > now.value)
354     {
355       /* finished processing */
356       plugin->api->next_request (next_cls, GNUNET_YES);
357       return GNUNET_SYSERR;
358     }
359   plugin->api->next_request (next_cls, GNUNET_NO);
360 #if DEBUG_DATASTORE
361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362               "Deleting content `%s' of type %u that expired %llu ms ago\n",
363               GNUNET_h2s (key),
364               type,
365               (unsigned long long) (now.value - expiration.value));
366 #endif
367   GNUNET_STATISTICS_update (stats,
368                             gettext_noop ("# bytes expired"),
369                             size,
370                             GNUNET_YES);
371   GNUNET_CONTAINER_bloomfilter_remove (filter,
372                                        key);
373   return GNUNET_NO; /* delete */
374 }
375
376
377 /**
378  * Task that is used to remove expired entries from
379  * the datastore.  This task will schedule itself
380  * again automatically to always delete all expired
381  * content quickly.
382  *
383  * @param cls not used
384  * @param tc task context
385  */ 
386 static void
387 delete_expired (void *cls,
388                 const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
391   plugin->api->iter_ascending_expiration (plugin->api->cls, 
392                                           0,
393                                           &expired_processor,
394                                           NULL);
395 }
396
397
398 /**
399  * An iterator over a set of items stored in the datastore.
400  *
401  * @param cls closure
402  * @param next_cls closure to pass to the "next" function.
403  * @param key key for the content
404  * @param size number of bytes in data
405  * @param data content stored
406  * @param type type of the content
407  * @param priority priority of the content
408  * @param anonymity anonymity-level for the content
409  * @param expiration expiration time for the content
410  * @param uid unique identifier for the datum;
411  *        maybe 0 if no unique identifier is available
412  *
413  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
414  *         (continue on call to "next", of course),
415  *         GNUNET_NO to delete the item and continue (if supported)
416  */
417 static int 
418 manage (void *cls,
419         void *next_cls,
420         const GNUNET_HashCode * key,
421         uint32_t size,
422         const void *data,
423         enum GNUNET_BLOCK_Type type,
424         uint32_t priority,
425         uint32_t anonymity,
426         struct GNUNET_TIME_Absolute
427         expiration, 
428         uint64_t uid)
429 {
430   unsigned long long *need = cls;
431
432   if (NULL == key)
433     {
434       GNUNET_free (need);
435       return GNUNET_SYSERR;
436     }
437   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
438     *need = 0;
439   else
440     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
441   plugin->api->next_request (next_cls, 
442                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
443 #if DEBUG_DATASTORE
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
446               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
447               GNUNET_h2s (key),
448               type,
449               *need);
450 #endif
451   GNUNET_STATISTICS_update (stats,
452                             gettext_noop ("# bytes purged (low-priority)"),
453                             size,
454                             GNUNET_YES);
455   GNUNET_CONTAINER_bloomfilter_remove (filter,
456                                        key);
457   return GNUNET_NO;
458 }
459
460
461 /**
462  * Manage available disk space by running tasks
463  * that will discard content if necessary.  This
464  * function will be run whenever a request for
465  * "need" bytes of storage could only be satisfied
466  * by eating into the "cache" (and we want our cache
467  * space back).
468  *
469  * @param need number of bytes of content that were
470  *        placed into the "cache" (and hence the
471  *        number of bytes that should be removed).
472  */
473 static void
474 manage_space (unsigned long long need)
475 {
476   unsigned long long *n;
477
478 #if DEBUG_DATASTORE
479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480               "Asked to free up %llu bytes of cache space\n",
481               need);
482 #endif
483   n = GNUNET_malloc (sizeof(unsigned long long));
484   *n = need;
485   plugin->api->iter_low_priority (plugin->api->cls,
486                                   0,
487                                   &manage,
488                                   n);
489 }
490
491
492 /**
493  * Function called to notify a client about the socket
494  * begin ready to queue more data.  "buf" will be
495  * NULL and "size" zero if the socket was closed for
496  * writing in the meantime.
497  *
498  * @param cls closure
499  * @param size number of bytes available in buf
500  * @param buf where the callee should write the message
501  * @return number of bytes written to buf
502  */
503 static size_t
504 transmit_callback (void *cls,
505                    size_t size, void *buf)
506 {
507   struct TransmitCallbackContext *tcc = cls;
508   size_t msize;
509   
510   tcc->th = NULL;
511   GNUNET_CONTAINER_DLL_remove (tcc_head,
512                                tcc_tail,
513                                tcc);
514   msize = ntohs(tcc->msg->size);
515   if (size == 0)
516     {
517 #if DEBUG_DATASTORE
518       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519                   "Transmission failed.\n");
520 #endif
521       if (tcc->tc != NULL)
522         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
523       if (GNUNET_YES == tcc->end)
524         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
525       GNUNET_SERVER_client_drop (tcc->client);
526       GNUNET_free (tcc->msg);
527       GNUNET_free (tcc);
528       return 0;
529     }
530   GNUNET_assert (size >= msize);
531   memcpy (buf, tcc->msg, msize);
532   if (tcc->tc != NULL)
533     tcc->tc (tcc->tc_cls, GNUNET_OK);
534   if (GNUNET_YES == tcc->end)
535     {
536       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
537     }
538   else
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "Response transmitted, more pending!\n");
543 #endif
544     }
545   GNUNET_SERVER_client_drop (tcc->client);
546   GNUNET_free (tcc->msg);
547   GNUNET_free (tcc);
548   return msize;
549 }
550
551
552 /**
553  * Transmit the given message to the client.
554  *
555  * @param client target of the message
556  * @param msg message to transmit, will be freed!
557  * @param tc function to call afterwards
558  * @param tc_cls closure for tc
559  * @param end is this the last response (and we should
560  *        signal the server completion accodingly after
561  *        transmitting this message)?
562  */
563 static void
564 transmit (struct GNUNET_SERVER_Client *client,
565           struct GNUNET_MessageHeader *msg,
566           TransmitContinuation tc,
567           void *tc_cls,
568           int end)
569 {
570   struct TransmitCallbackContext *tcc;
571
572   if (GNUNET_YES == cleaning_done)
573     {
574 #if DEBUG_DATASTORE
575       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
576                   "Shutdown in progress, aborting transmission.\n");
577 #endif
578       GNUNET_free (msg);
579       if (NULL != tc)
580         tc (tc_cls, GNUNET_SYSERR);
581       return;
582     }
583   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
584   tcc->msg = msg;
585   tcc->client = client;
586   tcc->tc = tc;
587   tcc->tc_cls = tc_cls;
588   tcc->end = end;
589   if (NULL ==
590       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
591                                                       ntohs(msg->size),
592                                                       GNUNET_TIME_UNIT_FOREVER_REL,
593                                                       &transmit_callback,
594                                                       tcc)))
595     {
596       GNUNET_break (0);
597       if (GNUNET_YES == end)
598         {
599 #if DEBUG_DATASTORE
600           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
601                       "Disconnecting client.\n");
602 #endif    
603           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
604         }
605       if (NULL != tc)
606         tc (tc_cls, GNUNET_SYSERR);
607       GNUNET_free (msg);
608       GNUNET_free (tcc);
609       return;
610     }
611   GNUNET_SERVER_client_keep (client);
612   GNUNET_CONTAINER_DLL_insert (tcc_head,
613                                tcc_tail,
614                                tcc);
615 }
616
617
618 /**
619  * Transmit a status code to the client.
620  *
621  * @param client receiver of the response
622  * @param code status code
623  * @param msg optional error message (can be NULL)
624  */
625 static void
626 transmit_status (struct GNUNET_SERVER_Client *client,
627                  int code,
628                  const char *msg)
629 {
630   struct StatusMessage *sm;
631   size_t slen;
632
633 #if DEBUG_DATASTORE
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635               "Transmitting `%s' message with value %d and message `%s'\n",
636               "STATUS",
637               code,
638               msg != NULL ? msg : "(none)");
639 #endif
640   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
641   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
642   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
643   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
644   sm->status = htonl(code);
645   if (slen > 0)
646     memcpy (&sm[1], msg, slen);  
647   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
648 }
649
650
651 /**
652  * Function called once the transmit operation has
653  * either failed or succeeded.
654  *
655  * @param next_cls closure for calling "next_request" callback
656  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
657  */
658 static void 
659 get_next(void *next_cls,
660          int status)
661 {
662   if (status != GNUNET_OK)
663     {
664       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
665                   _("Failed to transmit an item to the client; aborting iteration.\n"));
666       if (plugin != NULL)
667         plugin->api->next_request (next_cls, GNUNET_YES);
668       return;
669     }
670   plugin->api->next_request (next_cls, GNUNET_NO);
671 }
672
673
674 /**
675  * Function that will transmit the given datastore entry
676  * to the client.
677  *
678  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
679  * @param next_cls closure to use to ask for the next item
680  * @param key key for the content
681  * @param size number of bytes in data
682  * @param data content stored
683  * @param type type of the content
684  * @param priority priority of the content
685  * @param anonymity anonymity-level for the content
686  * @param expiration expiration time for the content
687  * @param uid unique identifier for the datum;
688  *        maybe 0 if no unique identifier is available
689  *
690  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
691  *         GNUNET_NO to delete the item and continue (if supported)
692  */
693 static int
694 transmit_item (void *cls,
695                void *next_cls,
696                const GNUNET_HashCode * key,
697                uint32_t size,
698                const void *data,
699                enum GNUNET_BLOCK_Type type,
700                uint32_t priority,
701                uint32_t anonymity,
702                struct GNUNET_TIME_Absolute
703                expiration, uint64_t uid)
704 {
705   struct GNUNET_SERVER_Client *client = cls;
706   struct GNUNET_MessageHeader *end;
707   struct DataMessage *dm;
708
709   if (key == NULL)
710     {
711       /* transmit 'DATA_END' */
712 #if DEBUG_DATASTORE
713       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714                   "Transmitting `%s' message\n",
715                   "DATA_END");
716 #endif
717       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
718       end->size = htons(sizeof(struct GNUNET_MessageHeader));
719       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
720       transmit (client, end, NULL, NULL, GNUNET_YES);
721       GNUNET_SERVER_client_drop (client);
722       return GNUNET_OK;
723     }
724   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
725   dm->header.size = htons(sizeof(struct DataMessage) + size);
726   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
727   dm->rid = htonl(0);
728   dm->size = htonl(size);
729   dm->type = htonl(type);
730   dm->priority = htonl(priority);
731   dm->anonymity = htonl(anonymity);
732   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
733   dm->uid = GNUNET_htonll(uid);
734   dm->key = *key;
735   memcpy (&dm[1], data, size);
736 #if DEBUG_DATASTORE
737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
738               "Transmitting `%s' message for `%s' of type %u\n",
739               "DATA",
740               GNUNET_h2s (key),
741               type);
742 #endif
743   GNUNET_STATISTICS_update (stats,
744                             gettext_noop ("# results found"),
745                             1,
746                             GNUNET_NO);
747   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
748   return GNUNET_OK;
749 }
750
751
752 /**
753  * Handle RESERVE-message.
754  *
755  * @param cls closure
756  * @param client identification of the client
757  * @param message the actual message
758  */
759 static void
760 handle_reserve (void *cls,
761                 struct GNUNET_SERVER_Client *client,
762                 const struct GNUNET_MessageHeader *message)
763 {
764   /**
765    * Static counter to produce reservation identifiers.
766    */
767   static int reservation_gen;
768
769   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
770   struct ReservationList *e;
771   unsigned long long used;
772   unsigned long long req;
773   uint64_t amount;
774   uint32_t entries;
775
776 #if DEBUG_DATASTORE
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "Processing `%s' request\n",
779               "RESERVE");
780 #endif
781   amount = GNUNET_ntohll(msg->amount);
782   entries = ntohl(msg->entries);
783   used = payload + reserved;
784   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
785   if (used + req > quota)
786     {
787       if (quota < used)
788         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
789       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
790                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
791                   quota - used,
792                   "RESERVE",
793                   req);
794       if (cache_size < req)
795         {
796           /* TODO: document this in the FAQ; essentially, if this
797              message happens, the insertion request could be blocked
798              by less-important content from migration because it is
799              larger than 1/8th of the overall available space, and
800              we only reserve 1/8th for "fresh" insertions */
801           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
802                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
803                       req,
804                       cache_size);
805           transmit_status (client, 0, 
806                            gettext_noop ("Insufficient space to satisfy request and "
807                                          "requested amount is larger than cache size"));
808         }
809       else
810         {
811           transmit_status (client, 0, 
812                            gettext_noop ("Insufficient space to satisfy request"));
813         }
814       return;      
815     }
816   reserved += req;
817   GNUNET_STATISTICS_set (stats,
818                          gettext_noop ("# reserved"),
819                          reserved,
820                          GNUNET_NO);
821   e = GNUNET_malloc (sizeof(struct ReservationList));
822   e->next = reservations;
823   reservations = e;
824   e->client = client;
825   e->amount = amount;
826   e->entries = entries;
827   e->rid = ++reservation_gen;
828   if (reservation_gen < 0)
829     reservation_gen = 0; /* wrap around */
830   transmit_status (client, e->rid, NULL);
831 }
832
833
834 /**
835  * Handle RELEASE_RESERVE-message.
836  *
837  * @param cls closure
838  * @param client identification of the client
839  * @param message the actual message
840  */
841 static void
842 handle_release_reserve (void *cls,
843                         struct GNUNET_SERVER_Client *client,
844                         const struct GNUNET_MessageHeader *message)
845 {
846   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
847   struct ReservationList *pos;
848   struct ReservationList *prev;
849   struct ReservationList *next;
850   int rid = ntohl(msg->rid);
851   unsigned long long rem;
852
853 #if DEBUG_DATASTORE
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Processing `%s' request\n",
856               "RELEASE_RESERVE");
857 #endif
858   next = reservations;
859   prev = NULL;
860   while (NULL != (pos = next))
861     {
862       next = pos->next;
863       if (rid == pos->rid)
864         {
865           if (prev == NULL)
866             reservations = next;
867           else
868             prev->next = next;
869           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
870           GNUNET_assert (reserved >= rem);
871           reserved -= rem;
872           GNUNET_STATISTICS_set (stats,
873                          gettext_noop ("# reserved"),
874                                  reserved,
875                                  GNUNET_NO);
876 #if DEBUG_DATASTORE
877           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                       "Returning %llu remaining reserved bytes to storage pool\n",
879                       rem);
880 #endif    
881           GNUNET_free (pos);
882           transmit_status (client, GNUNET_OK, NULL);
883           return;
884         }       
885       prev = pos;
886     }
887   GNUNET_break (0);
888   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
889 }
890
891
892 /**
893  * Check that the given message is a valid data message.
894  *
895  * @return NULL if the message is not well-formed, otherwise the message
896  */
897 static const struct DataMessage *
898 check_data (const struct GNUNET_MessageHeader *message)
899 {
900   uint16_t size;
901   uint32_t dsize;
902   const struct DataMessage *dm;
903
904   size = ntohs(message->size);
905   if (size < sizeof(struct DataMessage))
906     { 
907       GNUNET_break (0);
908       return NULL;
909     }
910   dm = (const struct DataMessage *) message;
911   dsize = ntohl(dm->size);
912   if (size != dsize + sizeof(struct DataMessage))
913     {
914       GNUNET_break (0);
915       return NULL;
916     }
917   return dm;
918 }
919
920
921 /**
922  * Context for a put request used to see if the content is
923  * already present.
924  */
925 struct PutContext
926 {
927   /**
928    * Client to notify on completion.
929    */
930   struct GNUNET_SERVER_Client *client;
931
932   /**
933    * Did we find the data already in the database?
934    */
935   int is_present;
936   
937   /* followed by the 'struct DataMessage' */
938 };
939
940
941 /**
942  * Actually put the data message.
943  */
944 static void
945 execute_put (struct GNUNET_SERVER_Client *client,
946              const struct DataMessage *dm)
947 {
948   uint32_t size;
949   char *msg;
950   int ret;
951
952   size = ntohl(dm->size);
953   msg = NULL;
954   ret = plugin->api->put (plugin->api->cls,
955                           &dm->key,
956                           size,
957                           &dm[1],
958                           ntohl(dm->type),
959                           ntohl(dm->priority),
960                           ntohl(dm->anonymity),
961                           GNUNET_TIME_absolute_ntoh(dm->expiration),
962                           &msg);
963   if (GNUNET_OK == ret)
964     {
965       GNUNET_STATISTICS_update (stats,
966                                 gettext_noop ("# bytes stored"),
967                                 size,
968                                 GNUNET_YES);
969       GNUNET_CONTAINER_bloomfilter_add (filter,
970                                         &dm->key);
971 #if DEBUG_DATASTORE
972       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973                   "Successfully stored %u bytes of type %u under key `%s'\n",
974                   size,
975                   ntohl(dm->type),
976                   GNUNET_h2s (&dm->key));
977 #endif
978     }
979   transmit_status (client, 
980                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
981                    msg);
982   GNUNET_free_non_null (msg);
983   if (quota - reserved - cache_size < payload)
984     {
985       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
986                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
987                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
988                   (unsigned long long) (quota - reserved - cache_size),
989                   (unsigned long long) payload);
990       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
991     }
992 }
993
994
995
996 /**
997  * Function that will check if the given datastore entry
998  * matches the put and if none match executes the put.
999  *
1000  * @param cls closure, pointer to the client (of type 'struct PutContext').
1001  * @param next_cls closure to use to ask for the next item
1002  * @param key key for the content
1003  * @param size number of bytes in data
1004  * @param data content stored
1005  * @param type type of the content
1006  * @param priority priority of the content
1007  * @param anonymity anonymity-level for the content
1008  * @param expiration expiration time for the content
1009  * @param uid unique identifier for the datum;
1010  *        maybe 0 if no unique identifier is available
1011  *
1012  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1013  *         GNUNET_NO to delete the item and continue (if supported)
1014  */
1015 static int
1016 check_present (void *cls,
1017                void *next_cls,
1018                const GNUNET_HashCode * key,
1019                uint32_t size,
1020                const void *data,
1021                enum GNUNET_BLOCK_Type type,
1022                uint32_t priority,
1023                uint32_t anonymity,
1024                struct GNUNET_TIME_Absolute
1025                expiration, uint64_t uid)
1026 {
1027   struct PutContext *pc = cls;
1028   const struct DataMessage *dm;
1029
1030   dm = (const struct DataMessage*) &pc[1];
1031   if (key == NULL)
1032     {
1033       if (pc->is_present == GNUNET_YES) 
1034         transmit_status (pc->client, GNUNET_OK, NULL);
1035       else
1036         execute_put (pc->client, dm);
1037       GNUNET_SERVER_client_drop (pc->client);
1038       GNUNET_free (pc);
1039       return GNUNET_SYSERR;
1040     }
1041   if ( (size == ntohl(dm->size)) &&
1042        (0 == memcmp (&dm[1],
1043                      data,
1044                      size)) )
1045     {
1046       pc->is_present = GNUNET_YES;
1047       plugin->api->next_request (next_cls, GNUNET_YES);
1048     }
1049   else
1050     {
1051       plugin->api->next_request (next_cls, GNUNET_NO);
1052     }
1053   return GNUNET_OK;
1054 }
1055
1056
1057 /**
1058  * Handle PUT-message.
1059  *
1060  * @param cls closure
1061  * @param client identification of the client
1062  * @param message the actual message
1063  */
1064 static void
1065 handle_put (void *cls,
1066             struct GNUNET_SERVER_Client *client,
1067             const struct GNUNET_MessageHeader *message)
1068 {
1069   const struct DataMessage *dm = check_data (message);
1070   int rid;
1071   struct ReservationList *pos;
1072   struct PutContext *pc;
1073   uint32_t size;
1074
1075   if ( (dm == NULL) ||
1076        (ntohl(dm->type) == 0) ) 
1077     {
1078       GNUNET_break (0);
1079       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1080       return;
1081     }
1082 #if DEBUG_DATASTORE
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084               "Processing `%s' request for `%s' of type %u\n",
1085               "PUT",
1086               GNUNET_h2s (&dm->key),
1087               ntohl (dm->type));
1088 #endif
1089   rid = ntohl(dm->rid);
1090   size = ntohl(dm->size);
1091   if (rid > 0)
1092     {
1093       pos = reservations;
1094       while ( (NULL != pos) &&
1095               (rid != pos->rid) )
1096         pos = pos->next;
1097       GNUNET_break (pos != NULL);
1098       if (NULL != pos)
1099         {
1100           GNUNET_break (pos->entries > 0);
1101           GNUNET_break (pos->amount >= size);
1102           pos->entries--;
1103           pos->amount -= size;
1104           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1105           GNUNET_STATISTICS_set (stats,
1106                                  gettext_noop ("# reserved"),
1107                                  reserved,
1108                                  GNUNET_NO);
1109         }
1110     }
1111   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1112                                                        &dm->key))
1113     {
1114       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1115       pc->client = client;
1116       GNUNET_SERVER_client_keep (client);
1117       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1118       plugin->api->get (plugin->api->cls,
1119                         &dm->key,
1120                         NULL,
1121                         ntohl (dm->type),
1122                         &check_present,
1123                         pc);      
1124       return;
1125     }
1126   execute_put (client, dm);
1127 }
1128
1129
1130 /**
1131  * Handle GET-message.
1132  *
1133  * @param cls closure
1134  * @param client identification of the client
1135  * @param message the actual message
1136  */
1137 static void
1138 handle_get (void *cls,
1139             struct GNUNET_SERVER_Client *client,
1140             const struct GNUNET_MessageHeader *message)
1141 {
1142   const struct GetMessage *msg;
1143   uint16_t size;
1144
1145   size = ntohs(message->size);
1146   if ( (size != sizeof(struct GetMessage)) &&
1147        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1148     {
1149       GNUNET_break (0);
1150       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1151       return;
1152     }
1153   msg = (const struct GetMessage*) message;
1154 #if DEBUG_DATASTORE
1155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156               "Processing `%s' request for `%s' of type %u\n",
1157               "GET",
1158               GNUNET_h2s (&msg->key),
1159               ntohl (msg->type));
1160 #endif
1161   GNUNET_STATISTICS_update (stats,
1162                             gettext_noop ("# GET requests received"),
1163                             1,
1164                             GNUNET_NO);
1165   GNUNET_SERVER_client_keep (client);
1166   if ( (size == sizeof(struct GetMessage)) &&
1167        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1168                                                          &msg->key)) )
1169     {
1170       /* don't bother database... */
1171 #if DEBUG_DATASTORE
1172       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1173                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1174                   "GET",
1175                   GNUNET_h2s (&msg->key));
1176 #endif  
1177       GNUNET_STATISTICS_update (stats,
1178                                 gettext_noop ("# requests filtered by bloomfilter"),
1179                                 1,
1180                                 GNUNET_NO);
1181       transmit_item (client,
1182                      NULL, NULL, 0, NULL, 0, 0, 0, 
1183                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1184       return;
1185     }
1186   plugin->api->get (plugin->api->cls,
1187                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1188                     NULL,
1189                     ntohl(msg->type),
1190                     &transmit_item,
1191                     client);    
1192 }
1193
1194
1195 /**
1196  * Handle UPDATE-message.
1197  *
1198  * @param cls closure
1199  * @param client identification of the client
1200  * @param message the actual message
1201  */
1202 static void
1203 handle_update (void *cls,
1204                struct GNUNET_SERVER_Client *client,
1205                const struct GNUNET_MessageHeader *message)
1206 {
1207   const struct UpdateMessage *msg;
1208   int ret;
1209   char *emsg;
1210
1211   GNUNET_STATISTICS_update (stats,
1212                             gettext_noop ("# UPDATE requests received"),
1213                             1,
1214                             GNUNET_NO);
1215   msg = (const struct UpdateMessage*) message;
1216   emsg = NULL;
1217 #if DEBUG_DATASTORE
1218   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1219               "Processing `%s' request for %llu\n",
1220               "UPDATE",
1221               (unsigned long long) GNUNET_ntohll (msg->uid));
1222 #endif
1223   ret = plugin->api->update (plugin->api->cls,
1224                              GNUNET_ntohll(msg->uid),
1225                              (int32_t) ntohl(msg->priority),
1226                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1227                              &emsg);
1228   transmit_status (client, ret, emsg);
1229   GNUNET_free_non_null (emsg);
1230 }
1231
1232
1233 /**
1234  * Handle GET_RANDOM-message.
1235  *
1236  * @param cls closure
1237  * @param client identification of the client
1238  * @param message the actual message
1239  */
1240 static void
1241 handle_get_random (void *cls,
1242                    struct GNUNET_SERVER_Client *client,
1243                    const struct GNUNET_MessageHeader *message)
1244 {
1245 #if DEBUG_DATASTORE
1246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247               "Processing `%s' request\n",
1248               "GET_RANDOM");
1249 #endif
1250   GNUNET_STATISTICS_update (stats,
1251                             gettext_noop ("# GET RANDOM requests received"),
1252                             1,
1253                             GNUNET_NO);
1254   GNUNET_SERVER_client_keep (client);
1255   plugin->api->iter_migration_order (plugin->api->cls,
1256                                      0,
1257                                      &transmit_item,
1258                                      client);  
1259 }
1260
1261
1262 /**
1263  * Context for the 'remove_callback'.
1264  */
1265 struct RemoveContext 
1266 {
1267   /**
1268    * Client for whom we're doing the remvoing.
1269    */
1270   struct GNUNET_SERVER_Client *client;
1271
1272   /**
1273    * GNUNET_YES if we managed to remove something.
1274    */
1275   int found;
1276 };
1277
1278
1279 /**
1280  * Callback function that will cause the item that is passed
1281  * in to be deleted (by returning GNUNET_NO).
1282  */
1283 static int
1284 remove_callback (void *cls,
1285                  void *next_cls,
1286                  const GNUNET_HashCode * key,
1287                  uint32_t size,
1288                  const void *data,
1289                  enum GNUNET_BLOCK_Type type,
1290                  uint32_t priority,
1291                  uint32_t anonymity,
1292                  struct GNUNET_TIME_Absolute
1293                  expiration, uint64_t uid)
1294 {
1295   struct RemoveContext *rc = cls;
1296
1297   if (key == NULL)
1298     {
1299 #if DEBUG_DATASTORE
1300       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301                   "No further matches for `%s' request.\n",
1302                   "REMOVE");
1303 #endif  
1304       if (GNUNET_YES == rc->found)
1305         transmit_status (rc->client, GNUNET_OK, NULL);       
1306       else
1307         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1308       GNUNET_SERVER_client_drop (rc->client);
1309       GNUNET_free (rc);
1310       return GNUNET_OK; /* last item */
1311     }
1312   rc->found = GNUNET_YES;
1313 #if DEBUG_DATASTORE
1314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1316               (unsigned long long) uid,
1317               "REMOVE",
1318               GNUNET_h2s (key),
1319               type);
1320 #endif  
1321   GNUNET_STATISTICS_update (stats,
1322                             gettext_noop ("# bytes removed (explicit request)"),
1323                             size,
1324                             GNUNET_YES);
1325   GNUNET_CONTAINER_bloomfilter_remove (filter,
1326                                        key);
1327   plugin->api->next_request (next_cls, GNUNET_YES);
1328   return GNUNET_NO;
1329 }
1330
1331
1332 /**
1333  * Handle REMOVE-message.
1334  *
1335  * @param cls closure
1336  * @param client identification of the client
1337  * @param message the actual message
1338  */
1339 static void
1340 handle_remove (void *cls,
1341              struct GNUNET_SERVER_Client *client,
1342              const struct GNUNET_MessageHeader *message)
1343 {
1344   const struct DataMessage *dm = check_data (message);
1345   GNUNET_HashCode vhash;
1346   struct RemoveContext *rc;
1347
1348   if (dm == NULL)
1349     {
1350       GNUNET_break (0);
1351       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1352       return;
1353     }
1354 #if DEBUG_DATASTORE
1355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1356               "Processing `%s' request for `%s' of type %u\n",
1357               "REMOVE",
1358               GNUNET_h2s (&dm->key),
1359               ntohl (dm->type));
1360 #endif
1361   GNUNET_STATISTICS_update (stats,
1362                             gettext_noop ("# REMOVE requests received"),
1363                             1,
1364                             GNUNET_NO);
1365   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1366   GNUNET_SERVER_client_keep (client);
1367   rc->client = client;
1368   GNUNET_CRYPTO_hash (&dm[1],
1369                       ntohl(dm->size),
1370                       &vhash);
1371   plugin->api->get (plugin->api->cls,
1372                     &dm->key,
1373                     &vhash,
1374                     ntohl(dm->type),
1375                     &remove_callback,
1376                     rc);
1377 }
1378
1379
1380 /**
1381  * Handle DROP-message.
1382  *
1383  * @param cls closure
1384  * @param client identification of the client
1385  * @param message the actual message
1386  */
1387 static void
1388 handle_drop (void *cls,
1389              struct GNUNET_SERVER_Client *client,
1390              const struct GNUNET_MessageHeader *message)
1391 {
1392 #if DEBUG_DATASTORE
1393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1394               "Processing `%s' request\n",
1395               "DROP");
1396 #endif
1397   plugin->api->drop (plugin->api->cls);
1398   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1399 }
1400
1401
1402 /**
1403  * Function called by plugins to notify us about a
1404  * change in their disk utilization.
1405  *
1406  * @param cls closure (NULL)
1407  * @param delta change in disk utilization, 
1408  *        0 for "reset to empty"
1409  */
1410 static void
1411 disk_utilization_change_cb (void *cls,
1412                             int delta)
1413 {
1414   if ( (delta < 0) &&
1415        (payload < -delta) )
1416     {
1417       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1418                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1419                   (long long) payload,
1420                   (long long) -delta);
1421       payload = plugin->api->get_size (plugin->api->cls);
1422       sync_stats ();
1423       return;
1424     }
1425   payload += delta;
1426   lastSync++;
1427   if (lastSync >= MAX_STAT_SYNC_LAG)
1428     sync_stats ();
1429 }
1430
1431
1432 /**
1433  * Callback function to process statistic values.
1434  *
1435  * @param cls closure (struct Plugin*)
1436  * @param subsystem name of subsystem that created the statistic
1437  * @param name the name of the datum
1438  * @param value the current value
1439  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1440  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1441  */
1442 static int
1443 process_stat_in (void *cls,
1444                  const char *subsystem,
1445                  const char *name,
1446                  uint64_t value,
1447                  int is_persistent)
1448 {
1449   GNUNET_assert (stats_worked == GNUNET_NO);
1450   stats_worked = GNUNET_YES;
1451   payload += value;
1452 #if DEBUG_SQLITE
1453   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1454               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1455               value,
1456               payload);
1457 #endif
1458   return GNUNET_OK;
1459 }
1460
1461
1462 static void
1463 process_stat_done (void *cls,
1464                    int success)
1465 {
1466   struct DatastorePlugin *plugin = cls;
1467
1468   stat_get = NULL;
1469   if (stats_worked == GNUNET_NO) 
1470     payload = plugin->api->get_size (plugin->api->cls);
1471 }
1472
1473
1474 /**
1475  * Load the datastore plugin.
1476  */
1477 static struct DatastorePlugin *
1478 load_plugin () 
1479 {
1480   struct DatastorePlugin *ret;
1481   char *libname;
1482   char *name;
1483
1484   if (GNUNET_OK !=
1485       GNUNET_CONFIGURATION_get_value_string (cfg,
1486                                              "DATASTORE", "DATABASE", &name))
1487     {
1488       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1489                   _("No `%s' specified for `%s' in configuration!\n"),
1490                   "DATABASE",
1491                   "DATASTORE");
1492       return NULL;
1493     }
1494   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1495   ret->env.cfg = cfg;
1496   ret->env.sched = sched;  
1497   ret->env.duc = &disk_utilization_change_cb;
1498   ret->env.cls = NULL;
1499   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1500               _("Loading `%s' datastore plugin\n"), name);
1501   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1502   ret->short_name = name;
1503   ret->lib_name = libname;
1504   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1505   if (ret->api == NULL)
1506     {
1507       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1508                   _("Failed to load datastore plugin for `%s'\n"), name);
1509       GNUNET_free (ret->short_name);
1510       GNUNET_free (libname);
1511       GNUNET_free (ret);
1512       return NULL;
1513     }
1514   return ret;
1515 }
1516
1517
1518 /**
1519  * Function called when the service shuts
1520  * down.  Unloads our datastore plugin.
1521  *
1522  * @param plug plugin to unload
1523  */
1524 static void
1525 unload_plugin (struct DatastorePlugin *plug)
1526 {
1527 #if DEBUG_DATASTORE
1528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1529               "Datastore service is unloading plugin...\n");
1530 #endif
1531   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1532   GNUNET_free (plug->lib_name);
1533   GNUNET_free (plug->short_name);
1534   GNUNET_free (plug);
1535 }
1536
1537
1538 /**
1539  * Final task run after shutdown.  Unloads plugins and disconnects us from
1540  * statistics.
1541  */
1542 static void
1543 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1544 {
1545   unload_plugin (plugin);
1546   plugin = NULL;
1547   if (filter != NULL)
1548     {
1549       GNUNET_CONTAINER_bloomfilter_free (filter);
1550       filter = NULL;
1551     }
1552   if (lastSync > 0)
1553     sync_stats ();
1554   if (stat_get != NULL)
1555     {
1556       GNUNET_STATISTICS_get_cancel (stat_get);
1557       stat_get = NULL;
1558     }
1559   if (stats != NULL)
1560     {
1561       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1562       stats = NULL;
1563     }
1564 }
1565
1566
1567 /**
1568  * Last task run during shutdown.  Disconnects us from
1569  * the transport and core.
1570  */
1571 static void
1572 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1573 {
1574   struct TransmitCallbackContext *tcc;
1575
1576   cleaning_done = GNUNET_YES;
1577   while (NULL != (tcc = tcc_head))
1578     {
1579       GNUNET_CONTAINER_DLL_remove (tcc_head,
1580                                    tcc_tail,
1581                                    tcc);
1582       if (tcc->th != NULL)
1583         {
1584           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1585           GNUNET_SERVER_client_drop (tcc->client);
1586         }
1587       if (NULL != tcc->tc)
1588         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1589       GNUNET_free (tcc->msg);
1590       GNUNET_free (tcc);
1591     }
1592   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1593     {
1594       GNUNET_SCHEDULER_cancel (sched,
1595                                expired_kill_task);
1596       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1597     }
1598   GNUNET_SCHEDULER_add_continuation (sched,
1599                                      &unload_task,
1600                                      NULL,
1601                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1602 }
1603
1604
1605 /**
1606  * Function that removes all active reservations made
1607  * by the given client and releases the space for other
1608  * requests.
1609  *
1610  * @param cls closure
1611  * @param client identification of the client
1612  */
1613 static void
1614 cleanup_reservations (void *cls,
1615                       struct GNUNET_SERVER_Client
1616                       * client)
1617 {
1618   struct ReservationList *pos;
1619   struct ReservationList *prev;
1620   struct ReservationList *next;
1621
1622   if (client == NULL)
1623     return;
1624   prev = NULL;
1625   pos = reservations;
1626   while (NULL != pos)
1627     {
1628       next = pos->next;
1629       if (pos->client == client)
1630         {
1631           if (prev == NULL)
1632             reservations = next;
1633           else
1634             prev->next = next;
1635           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1636           GNUNET_free (pos);
1637         }
1638       else
1639         {
1640           prev = pos;
1641         }
1642       pos = next;
1643     }
1644   GNUNET_STATISTICS_set (stats,
1645                          gettext_noop ("# reserved"),
1646                          reserved,
1647                          GNUNET_NO);
1648 }
1649
1650
1651 /**
1652  * Process datastore requests.
1653  *
1654  * @param cls closure
1655  * @param s scheduler to use
1656  * @param server the initialized server
1657  * @param c configuration to use
1658  */
1659 static void
1660 run (void *cls,
1661      struct GNUNET_SCHEDULER_Handle *s,
1662      struct GNUNET_SERVER_Handle *server,
1663      const struct GNUNET_CONFIGURATION_Handle *c)
1664 {
1665   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1666     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1667      sizeof(struct ReserveMessage) }, 
1668     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1669      sizeof(struct ReleaseReserveMessage) }, 
1670     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1671     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1672      sizeof (struct UpdateMessage) }, 
1673     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1674     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1675      sizeof(struct GNUNET_MessageHeader) }, 
1676     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1677     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1678      sizeof(struct GNUNET_MessageHeader) }, 
1679     {NULL, NULL, 0, 0}
1680   };
1681   char *fn;
1682   unsigned int bf_size;
1683
1684   sched = s;
1685   cfg = c;
1686   if (GNUNET_OK !=
1687       GNUNET_CONFIGURATION_get_value_number (cfg,
1688                                              "DATASTORE", "QUOTA", &quota))
1689     {
1690       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1691                   _("No `%s' specified for `%s' in configuration!\n"),
1692                   "QUOTA",
1693                   "DATASTORE");
1694       return;
1695     }
1696   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1697   GNUNET_STATISTICS_set (stats,
1698                          gettext_noop ("# quota"),
1699                          quota,
1700                          GNUNET_NO);
1701   cache_size = quota / 8; /* Or should we make this an option? */
1702   GNUNET_STATISTICS_set (stats,
1703                          gettext_noop ("# cache size"),
1704                          cache_size,
1705                          GNUNET_NO);
1706   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1707   fn = NULL;
1708   if ( (GNUNET_OK !=
1709         GNUNET_CONFIGURATION_get_value_filename (cfg,
1710                                                  "DATASTORE",
1711                                                  "BLOOMFILTER",
1712                                                  &fn)) ||
1713        (GNUNET_OK !=
1714         GNUNET_DISK_directory_create_for_file (fn)) )
1715     {
1716       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1717                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1718                   fn != NULL ? fn : "");
1719       GNUNET_free_non_null (fn);
1720       fn = NULL;
1721     }
1722   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1723   GNUNET_free_non_null (fn);
1724   if (filter == NULL)
1725     {
1726       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1727                   _("Failed to initialize bloomfilter.\n"));
1728       if (stats != NULL)
1729         {
1730           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1731           stats = NULL;
1732         }
1733       return;
1734     }
1735   plugin = load_plugin ();
1736   if (NULL == plugin)
1737     {
1738       GNUNET_CONTAINER_bloomfilter_free (filter);
1739       filter = NULL;
1740       if (stats != NULL)
1741         {
1742           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1743           stats = NULL;
1744         }
1745       return;
1746     }
1747   stat_get = GNUNET_STATISTICS_get (stats,
1748                                     "datastore",
1749                                     QUOTA_STAT_NAME,
1750                                     GNUNET_TIME_UNIT_SECONDS,
1751                                     &process_stat_done,
1752                                     &process_stat_in,
1753                                     plugin);
1754   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1755   GNUNET_SERVER_add_handlers (server, handlers);
1756   expired_kill_task
1757     = GNUNET_SCHEDULER_add_with_priority (sched,
1758                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1759                                           &delete_expired, NULL);
1760   GNUNET_SCHEDULER_add_delayed (sched,
1761                                 GNUNET_TIME_UNIT_FOREVER_REL,
1762                                 &cleaning_task, NULL);
1763 }
1764
1765
1766 /**
1767  * The main function for the datastore service.
1768  *
1769  * @param argc number of arguments from the command line
1770  * @param argv command line arguments
1771  * @return 0 ok, 1 on error
1772  */
1773 int
1774 main (int argc, char *const *argv)
1775 {
1776   int ret;
1777
1778   ret = (GNUNET_OK ==
1779          GNUNET_SERVICE_run (argc,
1780                              argv,
1781                              "datastore",
1782                              GNUNET_SERVICE_OPTION_NONE,
1783                              &run, NULL)) ? 0 : 1;
1784   return ret;
1785 }
1786
1787
1788 /* end of gnunet-service-datastore.c */