8a896a7fbdcce5bd2e9cc5ccb90dbc4daac7afe6
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
46
47 /**
48  * After how many payload-changing operations
49  * do we sync our statistics?
50  */
51 #define MAX_STAT_SYNC_LAG 50
52
53
54 /**
55  * Our datastore plugin.
56  */
57 struct DatastorePlugin
58 {
59
60   /**
61    * API of the transport as returned by the plugin's
62    * initialization function.
63    */
64   struct GNUNET_DATASTORE_PluginFunctions *api;
65
66   /**
67    * Short name for the plugin (i.e. "sqlite").
68    */
69   char *short_name;
70
71   /**
72    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
73    */
74   char *lib_name;
75
76   /**
77    * Environment this transport service is using
78    * for this plugin.
79    */
80   struct GNUNET_DATASTORE_PluginEnvironment env;
81
82 };
83
84
85 /**
86  * Linked list of active reservations.
87  */
88 struct ReservationList 
89 {
90
91   /**
92    * This is a linked list.
93    */
94   struct ReservationList *next;
95
96   /**
97    * Client that made the reservation.
98    */
99   struct GNUNET_SERVER_Client *client;
100
101   /**
102    * Number of bytes (still) reserved.
103    */
104   uint64_t amount;
105
106   /**
107    * Number of items (still) reserved.
108    */
109   uint64_t entries;
110
111   /**
112    * Reservation identifier.
113    */
114   int32_t rid;
115
116 };
117
118
119
120 /**
121  * Our datastore plugin (NULL if not available).
122  */
123 static struct DatastorePlugin *plugin;
124
125 /**
126  * Linked list of space reservations made by clients.
127  */
128 static struct ReservationList *reservations;
129
130 /**
131  * Bloomfilter to quickly tell if we don't have the content.
132  */
133 static struct GNUNET_CONTAINER_BloomFilter *filter;
134
135 /**
136  * How much space are we allowed to use?
137  */
138 static unsigned long long quota;
139
140 /**
141  * How much space are we using for the cache?  (space available for
142  * insertions that will be instantly reclaimed by discarding less
143  * important content --- or possibly whatever we just inserted into
144  * the "cache").
145  */
146 static unsigned long long cache_size;
147
148 /**
149  * How much space have we currently reserved?
150  */
151 static unsigned long long reserved;
152   
153 /**
154  * How much data are we currently storing
155  * in the database?
156  */
157 static unsigned long long payload;
158
159 /**
160  * Number of updates that were made to the
161  * payload value since we last synchronized
162  * it with the statistics service.
163  */
164 static unsigned int lastSync;
165
166 /**
167  * Did we get an answer from statistics?
168  */
169 static int stats_worked;
170
171 /**
172  * Identity of the task that is used to delete
173  * expired content.
174  */
175 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
176
177 /**
178  * Our configuration.
179  */
180 const struct GNUNET_CONFIGURATION_Handle *cfg;
181
182 /**
183  * Our scheduler.
184  */
185 struct GNUNET_SCHEDULER_Handle *sched; 
186
187 /**
188  * Handle for reporting statistics.
189  */
190 static struct GNUNET_STATISTICS_Handle *stats;
191
192
193 /**
194  * Synchronize our utilization statistics with the 
195  * statistics service.
196  */
197 static void 
198 sync_stats ()
199 {
200   GNUNET_STATISTICS_set (stats,
201                          QUOTA_STAT_NAME,
202                          payload,
203                          GNUNET_YES);
204   lastSync = 0;
205 }
206
207
208
209
210 /**
211  * Function called once the transmit operation has
212  * either failed or succeeded.
213  *
214  * @param cls closure
215  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
216  */
217 typedef void (*TransmitContinuation)(void *cls,
218                                      int status);
219
220
221 /**
222  * Context for transmitting replies to clients.
223  */
224 struct TransmitCallbackContext 
225 {
226   
227   /**
228    * We keep these in a doubly-linked list (for cleanup).
229    */
230   struct TransmitCallbackContext *next;
231   
232   /**
233    * We keep these in a doubly-linked list (for cleanup).
234    */
235   struct TransmitCallbackContext *prev;
236   
237   /**
238    * The message that we're asked to transmit.
239    */
240   struct GNUNET_MessageHeader *msg;
241   
242   /**
243    * Handle for the transmission request.
244    */
245   struct GNUNET_CONNECTION_TransmitHandle *th;
246
247   /**
248    * Client that we are transmitting to.
249    */
250   struct GNUNET_SERVER_Client *client;
251
252   /**
253    * Function to call once msg has been transmitted
254    * (or at least added to the buffer).
255    */
256   TransmitContinuation tc;
257
258   /**
259    * Closure for tc.
260    */
261   void *tc_cls;
262
263   /**
264    * GNUNET_YES if we are supposed to signal the server
265    * completion of the client's request.
266    */
267   int end;
268 };
269
270   
271 /**
272  * Head of the doubly-linked list (for cleanup).
273  */
274 static struct TransmitCallbackContext *tcc_head;
275
276 /**
277  * Tail of the doubly-linked list (for cleanup).
278  */
279 static struct TransmitCallbackContext *tcc_tail;
280
281 /**
282  * Have we already cleaned up the TCCs and are hence no longer
283  * willing (or able) to transmit anything to anyone?
284  */
285 static int cleaning_done;
286
287 /**
288  * Handle for pending get request.
289  */
290 static struct GNUNET_STATISTICS_GetHandle *stat_get;
291
292
293 /**
294  * Task that is used to remove expired entries from
295  * the datastore.  This task will schedule itself
296  * again automatically to always delete all expired
297  * content quickly.
298  *
299  * @param cls not used
300  * @param tc task context
301  */ 
302 static void
303 delete_expired (void *cls,
304                 const struct GNUNET_SCHEDULER_TaskContext *tc);
305
306
307 /**
308  * Iterate over the expired items stored in the datastore.
309  * Delete all expired items; once we have processed all
310  * expired items, re-schedule the "delete_expired" task.
311  *
312  * @param cls not used
313  * @param next_cls closure to pass to the "next" function.
314  * @param key key for the content
315  * @param size number of bytes in data
316  * @param data content stored
317  * @param type type of the content
318  * @param priority priority of the content
319  * @param anonymity anonymity-level for the content
320  * @param expiration expiration time for the content
321  * @param uid unique identifier for the datum;
322  *        maybe 0 if no unique identifier is available
323  *
324  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
325  *         (continue on call to "next", of course),
326  *         GNUNET_NO to delete the item and continue (if supported)
327  */
328 static int 
329 expired_processor (void *cls,
330                    void *next_cls,
331                    const GNUNET_HashCode * key,
332                    uint32_t size,
333                    const void *data,
334                    enum GNUNET_BLOCK_Type type,
335                    uint32_t priority,
336                    uint32_t anonymity,
337                    struct GNUNET_TIME_Absolute
338                    expiration, 
339                    uint64_t uid)
340 {
341   struct GNUNET_TIME_Absolute now;
342
343   if (key == NULL) 
344     {
345       expired_kill_task 
346         = GNUNET_SCHEDULER_add_delayed (sched,
347                                         MAX_EXPIRE_DELAY,
348                                         &delete_expired,
349                                         NULL);
350       return GNUNET_SYSERR;
351     }
352   now = GNUNET_TIME_absolute_get ();
353   if (expiration.value > now.value)
354     {
355       /* finished processing */
356       plugin->api->next_request (next_cls, GNUNET_YES);
357       return GNUNET_SYSERR;
358     }
359   plugin->api->next_request (next_cls, GNUNET_NO);
360 #if DEBUG_DATASTORE
361   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
362               "Deleting content `%s' of type %u that expired %llu ms ago\n",
363               GNUNET_h2s (key),
364               type,
365               (unsigned long long) (now.value - expiration.value));
366 #endif
367   GNUNET_STATISTICS_update (stats,
368                             gettext_noop ("# bytes expired"),
369                             size,
370                             GNUNET_YES);
371   GNUNET_CONTAINER_bloomfilter_remove (filter,
372                                        key);
373   return GNUNET_NO; /* delete */
374 }
375
376
377 /**
378  * Task that is used to remove expired entries from
379  * the datastore.  This task will schedule itself
380  * again automatically to always delete all expired
381  * content quickly.
382  *
383  * @param cls not used
384  * @param tc task context
385  */ 
386 static void
387 delete_expired (void *cls,
388                 const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
391   plugin->api->iter_ascending_expiration (plugin->api->cls, 
392                                           0,
393                                           &expired_processor,
394                                           NULL);
395 }
396
397
398 /**
399  * An iterator over a set of items stored in the datastore.
400  *
401  * @param cls closure
402  * @param next_cls closure to pass to the "next" function.
403  * @param key key for the content
404  * @param size number of bytes in data
405  * @param data content stored
406  * @param type type of the content
407  * @param priority priority of the content
408  * @param anonymity anonymity-level for the content
409  * @param expiration expiration time for the content
410  * @param uid unique identifier for the datum;
411  *        maybe 0 if no unique identifier is available
412  *
413  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
414  *         (continue on call to "next", of course),
415  *         GNUNET_NO to delete the item and continue (if supported)
416  */
417 static int 
418 manage (void *cls,
419         void *next_cls,
420         const GNUNET_HashCode * key,
421         uint32_t size,
422         const void *data,
423         enum GNUNET_BLOCK_Type type,
424         uint32_t priority,
425         uint32_t anonymity,
426         struct GNUNET_TIME_Absolute
427         expiration, 
428         uint64_t uid)
429 {
430   unsigned long long *need = cls;
431
432   if (NULL == key)
433     {
434       GNUNET_free (need);
435       return GNUNET_SYSERR;
436     }
437   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
438     *need = 0;
439   else
440     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
441   plugin->api->next_request (next_cls, 
442                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
443 #if DEBUG_DATASTORE
444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
445               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
446               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
447               GNUNET_h2s (key),
448               type,
449               *need);
450 #endif
451   GNUNET_STATISTICS_update (stats,
452                             gettext_noop ("# bytes purged (low-priority)"),
453                             size,
454                             GNUNET_YES);
455   GNUNET_CONTAINER_bloomfilter_remove (filter,
456                                        key);
457   return GNUNET_NO;
458 }
459
460
461 /**
462  * Manage available disk space by running tasks
463  * that will discard content if necessary.  This
464  * function will be run whenever a request for
465  * "need" bytes of storage could only be satisfied
466  * by eating into the "cache" (and we want our cache
467  * space back).
468  *
469  * @param need number of bytes of content that were
470  *        placed into the "cache" (and hence the
471  *        number of bytes that should be removed).
472  */
473 static void
474 manage_space (unsigned long long need)
475 {
476   unsigned long long *n;
477
478 #if DEBUG_DATASTORE
479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480               "Asked to free up %llu bytes of cache space\n",
481               need);
482 #endif
483   n = GNUNET_malloc (sizeof(unsigned long long));
484   *n = need;
485   plugin->api->iter_low_priority (plugin->api->cls,
486                                   0,
487                                   &manage,
488                                   n);
489 }
490
491
492 /**
493  * Function called to notify a client about the socket
494  * begin ready to queue more data.  "buf" will be
495  * NULL and "size" zero if the socket was closed for
496  * writing in the meantime.
497  *
498  * @param cls closure
499  * @param size number of bytes available in buf
500  * @param buf where the callee should write the message
501  * @return number of bytes written to buf
502  */
503 static size_t
504 transmit_callback (void *cls,
505                    size_t size, void *buf)
506 {
507   struct TransmitCallbackContext *tcc = cls;
508   size_t msize;
509   
510   tcc->th = NULL;
511   GNUNET_CONTAINER_DLL_remove (tcc_head,
512                                tcc_tail,
513                                tcc);
514   msize = ntohs(tcc->msg->size);
515   if (size == 0)
516     {
517 #if DEBUG_DATASTORE
518       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519                   "Transmission failed.\n");
520 #endif
521       if (tcc->tc != NULL)
522         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
523       if (GNUNET_YES == tcc->end)
524         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
525       GNUNET_SERVER_client_drop (tcc->client);
526       GNUNET_free (tcc->msg);
527       GNUNET_free (tcc);
528       return 0;
529     }
530   GNUNET_assert (size >= msize);
531   memcpy (buf, tcc->msg, msize);
532   if (tcc->tc != NULL)
533     tcc->tc (tcc->tc_cls, GNUNET_OK);
534   if (GNUNET_YES == tcc->end)
535     {
536       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
537     }
538   else
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "Response transmitted, more pending!\n");
543 #endif
544     }
545   GNUNET_SERVER_client_drop (tcc->client);
546   GNUNET_free (tcc->msg);
547   GNUNET_free (tcc);
548   return msize;
549 }
550
551
552 /**
553  * Transmit the given message to the client.
554  *
555  * @param client target of the message
556  * @param msg message to transmit, will be freed!
557  * @param tc function to call afterwards
558  * @param tc_cls closure for tc
559  * @param end is this the last response (and we should
560  *        signal the server completion accodingly after
561  *        transmitting this message)?
562  */
563 static void
564 transmit (struct GNUNET_SERVER_Client *client,
565           struct GNUNET_MessageHeader *msg,
566           TransmitContinuation tc,
567           void *tc_cls,
568           int end)
569 {
570   struct TransmitCallbackContext *tcc;
571
572   if (GNUNET_YES == cleaning_done)
573     {
574 #if DEBUG_DATASTORE
575       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
576                   "Shutdown in progress, aborting transmission.\n");
577 #endif
578       GNUNET_free (msg);
579       if (NULL != tc)
580         tc (tc_cls, GNUNET_SYSERR);
581       return;
582     }
583   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
584   tcc->msg = msg;
585   tcc->client = client;
586   tcc->tc = tc;
587   tcc->tc_cls = tc_cls;
588   tcc->end = end;
589   if (NULL ==
590       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
591                                                       ntohs(msg->size),
592                                                       GNUNET_TIME_UNIT_FOREVER_REL,
593                                                       &transmit_callback,
594                                                       tcc)))
595     {
596       GNUNET_break (0);
597       if (GNUNET_YES == end)
598         {
599 #if DEBUG_DATASTORE
600           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
601                       "Disconnecting client.\n");
602 #endif    
603           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
604         }
605       if (NULL != tc)
606         tc (tc_cls, GNUNET_SYSERR);
607       GNUNET_free (msg);
608       GNUNET_free (tcc);
609       return;
610     }
611   GNUNET_SERVER_client_keep (client);
612   GNUNET_CONTAINER_DLL_insert (tcc_head,
613                                tcc_tail,
614                                tcc);
615 }
616
617
618 /**
619  * Transmit a status code to the client.
620  *
621  * @param client receiver of the response
622  * @param code status code
623  * @param msg optional error message (can be NULL)
624  */
625 static void
626 transmit_status (struct GNUNET_SERVER_Client *client,
627                  int code,
628                  const char *msg)
629 {
630   struct StatusMessage *sm;
631   size_t slen;
632
633 #if DEBUG_DATASTORE
634   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635               "Transmitting `%s' message with value %d and message `%s'\n",
636               "STATUS",
637               code,
638               msg != NULL ? msg : "(none)");
639 #endif
640   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
641   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
642   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
643   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
644   sm->status = htonl(code);
645   if (slen > 0)
646     memcpy (&sm[1], msg, slen);  
647   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
648 }
649
650
651 /**
652  * Function called once the transmit operation has
653  * either failed or succeeded.
654  *
655  * @param next_cls closure for calling "next_request" callback
656  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
657  */
658 static void 
659 get_next(void *next_cls,
660          int status)
661 {
662   if (status != GNUNET_OK)
663     {
664       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
665                   _("Failed to transmit an item to the client; aborting iteration.\n"));
666       if (plugin != NULL)
667         plugin->api->next_request (next_cls, GNUNET_YES);
668       return;
669     }
670   plugin->api->next_request (next_cls, GNUNET_NO);
671 }
672
673
674 /**
675  * Function that will transmit the given datastore entry
676  * to the client.
677  *
678  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
679  * @param next_cls closure to use to ask for the next item
680  * @param key key for the content
681  * @param size number of bytes in data
682  * @param data content stored
683  * @param type type of the content
684  * @param priority priority of the content
685  * @param anonymity anonymity-level for the content
686  * @param expiration expiration time for the content
687  * @param uid unique identifier for the datum;
688  *        maybe 0 if no unique identifier is available
689  *
690  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
691  *         GNUNET_NO to delete the item and continue (if supported)
692  */
693 static int
694 transmit_item (void *cls,
695                void *next_cls,
696                const GNUNET_HashCode * key,
697                uint32_t size,
698                const void *data,
699                enum GNUNET_BLOCK_Type type,
700                uint32_t priority,
701                uint32_t anonymity,
702                struct GNUNET_TIME_Absolute
703                expiration, uint64_t uid)
704 {
705   struct GNUNET_SERVER_Client *client = cls;
706   struct GNUNET_MessageHeader *end;
707   struct DataMessage *dm;
708
709   if (key == NULL)
710     {
711       /* transmit 'DATA_END' */
712 #if DEBUG_DATASTORE
713       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
714                   "Transmitting `%s' message\n",
715                   "DATA_END");
716 #endif
717       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
718       end->size = htons(sizeof(struct GNUNET_MessageHeader));
719       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
720       transmit (client, end, NULL, NULL, GNUNET_YES);
721       GNUNET_SERVER_client_drop (client);
722       return GNUNET_OK;
723     }
724   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
725   dm->header.size = htons(sizeof(struct DataMessage) + size);
726   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
727   dm->rid = htonl(0);
728   dm->size = htonl(size);
729   dm->type = htonl(type);
730   dm->priority = htonl(priority);
731   dm->anonymity = htonl(anonymity);
732   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
733   dm->uid = GNUNET_htonll(uid);
734   dm->key = *key;
735   memcpy (&dm[1], data, size);
736 #if DEBUG_DATASTORE
737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
738               "Transmitting `%s' message for `%s' of type %u\n",
739               "DATA",
740               GNUNET_h2s (key),
741               type);
742 #endif
743   GNUNET_STATISTICS_update (stats,
744                             gettext_noop ("# results found"),
745                             1,
746                             GNUNET_NO);
747   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
748   return GNUNET_OK;
749 }
750
751
752 /**
753  * Handle RESERVE-message.
754  *
755  * @param cls closure
756  * @param client identification of the client
757  * @param message the actual message
758  */
759 static void
760 handle_reserve (void *cls,
761                 struct GNUNET_SERVER_Client *client,
762                 const struct GNUNET_MessageHeader *message)
763 {
764   /**
765    * Static counter to produce reservation identifiers.
766    */
767   static int reservation_gen;
768
769   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
770   struct ReservationList *e;
771   unsigned long long used;
772   unsigned long long req;
773   uint64_t amount;
774   uint32_t entries;
775
776 #if DEBUG_DATASTORE
777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
778               "Processing `%s' request\n",
779               "RESERVE");
780 #endif
781   amount = GNUNET_ntohll(msg->amount);
782   entries = ntohl(msg->entries);
783   used = payload + reserved;
784   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
785   if (used + req > quota)
786     {
787       if (quota < used)
788         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
789       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
790                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
791                   quota - used,
792                   "RESERVE",
793                   req);
794       if (cache_size < req)
795         {
796           /* TODO: document this in the FAQ; essentially, if this
797              message happens, the insertion request could be blocked
798              by less-important content from migration because it is
799              larger than 1/8th of the overall available space, and
800              we only reserve 1/8th for "fresh" insertions */
801           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
802                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
803                       req,
804                       cache_size);
805           transmit_status (client, 0, 
806                            gettext_noop ("Insufficient space to satisfy request and "
807                                          "requested amount is larger than cache size"));
808         }
809       else
810         {
811           transmit_status (client, 0, 
812                            gettext_noop ("Insufficient space to satisfy request"));
813         }
814       return;      
815     }
816   reserved += req;
817   GNUNET_STATISTICS_set (stats,
818                          gettext_noop ("# reserved"),
819                          reserved,
820                          GNUNET_NO);
821   e = GNUNET_malloc (sizeof(struct ReservationList));
822   e->next = reservations;
823   reservations = e;
824   e->client = client;
825   e->amount = amount;
826   e->entries = entries;
827   e->rid = ++reservation_gen;
828   if (reservation_gen < 0)
829     reservation_gen = 0; /* wrap around */
830   transmit_status (client, e->rid, NULL);
831 }
832
833
834 /**
835  * Handle RELEASE_RESERVE-message.
836  *
837  * @param cls closure
838  * @param client identification of the client
839  * @param message the actual message
840  */
841 static void
842 handle_release_reserve (void *cls,
843                         struct GNUNET_SERVER_Client *client,
844                         const struct GNUNET_MessageHeader *message)
845 {
846   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
847   struct ReservationList *pos;
848   struct ReservationList *prev;
849   struct ReservationList *next;
850   int rid = ntohl(msg->rid);
851   unsigned long long rem;
852
853 #if DEBUG_DATASTORE
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Processing `%s' request\n",
856               "RELEASE_RESERVE");
857 #endif
858   next = reservations;
859   prev = NULL;
860   while (NULL != (pos = next))
861     {
862       next = pos->next;
863       if (rid == pos->rid)
864         {
865           if (prev == NULL)
866             reservations = next;
867           else
868             prev->next = next;
869           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
870           GNUNET_assert (reserved >= rem);
871           reserved -= rem;
872           GNUNET_STATISTICS_set (stats,
873                          gettext_noop ("# reserved"),
874                                  reserved,
875                                  GNUNET_NO);
876 #if DEBUG_DATASTORE
877           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878                       "Returning %llu remaining reserved bytes to storage pool\n",
879                       rem);
880 #endif    
881           GNUNET_free (pos);
882           transmit_status (client, GNUNET_OK, NULL);
883           return;
884         }       
885       prev = pos;
886     }
887   GNUNET_break (0);
888   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
889 }
890
891
892 /**
893  * Check that the given message is a valid data message.
894  *
895  * @return NULL if the message is not well-formed, otherwise the message
896  */
897 static const struct DataMessage *
898 check_data (const struct GNUNET_MessageHeader *message)
899 {
900   uint16_t size;
901   uint32_t dsize;
902   const struct DataMessage *dm;
903
904   size = ntohs(message->size);
905   if (size < sizeof(struct DataMessage))
906     { 
907       GNUNET_break (0);
908       return NULL;
909     }
910   dm = (const struct DataMessage *) message;
911   dsize = ntohl(dm->size);
912   if (size != dsize + sizeof(struct DataMessage))
913     {
914       GNUNET_break (0);
915       return NULL;
916     }
917   return dm;
918 }
919
920
921 /**
922  * Context for a put request used to see if the content is
923  * already present.
924  */
925 struct PutContext
926 {
927   /**
928    * Client to notify on completion.
929    */
930   struct GNUNET_SERVER_Client *client;
931
932   /**
933    * Did we find the data already in the database?
934    */
935   int is_present;
936   
937   /* followed by the 'struct DataMessage' */
938 };
939
940
941 /**
942  * Actually put the data message.
943  */
944 static void
945 execute_put (struct GNUNET_SERVER_Client *client,
946              const struct DataMessage *dm)
947 {
948   uint32_t size;
949   char *msg;
950   int ret;
951
952   size = ntohl(dm->size);
953   msg = NULL;
954   ret = plugin->api->put (plugin->api->cls,
955                           &dm->key,
956                           size,
957                           &dm[1],
958                           ntohl(dm->type),
959                           ntohl(dm->priority),
960                           ntohl(dm->anonymity),
961                           GNUNET_TIME_absolute_ntoh(dm->expiration),
962                           &msg);
963   if (GNUNET_OK == ret)
964     {
965       GNUNET_STATISTICS_update (stats,
966                                 gettext_noop ("# bytes stored"),
967                                 size,
968                                 GNUNET_YES);
969       GNUNET_CONTAINER_bloomfilter_add (filter,
970                                         &dm->key);
971 #if DEBUG_DATASTORE
972       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
973                   "Successfully stored %u bytes of type %u under key `%s'\n",
974                   size,
975                   ntohl(dm->type),
976                   GNUNET_h2s (&dm->key));
977 #endif
978     }
979   transmit_status (client, 
980                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
981                    msg);
982   GNUNET_free_non_null (msg);
983   if (quota - reserved - cache_size < payload)
984     {
985       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
986                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
987                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
988                   (unsigned long long) (quota - reserved - cache_size),
989                   (unsigned long long) payload);
990       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
991     }
992 }
993
994
995
996 /**
997  * Function that will check if the given datastore entry
998  * matches the put and if none match executes the put.
999  *
1000  * @param cls closure, pointer to the client (of type 'struct PutContext').
1001  * @param next_cls closure to use to ask for the next item
1002  * @param key key for the content
1003  * @param size number of bytes in data
1004  * @param data content stored
1005  * @param type type of the content
1006  * @param priority priority of the content
1007  * @param anonymity anonymity-level for the content
1008  * @param expiration expiration time for the content
1009  * @param uid unique identifier for the datum;
1010  *        maybe 0 if no unique identifier is available
1011  *
1012  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1013  *         GNUNET_NO to delete the item and continue (if supported)
1014  */
1015 static int
1016 check_present (void *cls,
1017                void *next_cls,
1018                const GNUNET_HashCode * key,
1019                uint32_t size,
1020                const void *data,
1021                enum GNUNET_BLOCK_Type type,
1022                uint32_t priority,
1023                uint32_t anonymity,
1024                struct GNUNET_TIME_Absolute
1025                expiration, uint64_t uid)
1026 {
1027   struct PutContext *pc = cls;
1028   const struct DataMessage *dm;
1029
1030   dm = (const struct DataMessage*) &pc[1];
1031   if (key == NULL)
1032     {
1033       if (pc->is_present == GNUNET_YES) 
1034         transmit_status (pc->client, GNUNET_OK, NULL);
1035       else
1036         execute_put (pc->client, dm);
1037       GNUNET_SERVER_client_drop (pc->client);
1038       GNUNET_free (pc);
1039       return GNUNET_SYSERR;
1040     }
1041   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1042        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1043        ( (size == ntohl(dm->size)) &&
1044          (0 == memcmp (&dm[1],
1045                        data,
1046                        size)) ) )
1047     {
1048       pc->is_present = GNUNET_YES;
1049       plugin->api->next_request (next_cls, GNUNET_YES);
1050     }
1051   else
1052     {
1053       plugin->api->next_request (next_cls, GNUNET_NO);
1054     }
1055   return GNUNET_OK;
1056 }
1057
1058
1059 /**
1060  * Handle PUT-message.
1061  *
1062  * @param cls closure
1063  * @param client identification of the client
1064  * @param message the actual message
1065  */
1066 static void
1067 handle_put (void *cls,
1068             struct GNUNET_SERVER_Client *client,
1069             const struct GNUNET_MessageHeader *message)
1070 {
1071   const struct DataMessage *dm = check_data (message);
1072   int rid;
1073   struct ReservationList *pos;
1074   struct PutContext *pc;
1075   uint32_t size;
1076
1077   if ( (dm == NULL) ||
1078        (ntohl(dm->type) == 0) ) 
1079     {
1080       GNUNET_break (0);
1081       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1082       return;
1083     }
1084 #if DEBUG_DATASTORE
1085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086               "Processing `%s' request for `%s' of type %u\n",
1087               "PUT",
1088               GNUNET_h2s (&dm->key),
1089               ntohl (dm->type));
1090 #endif
1091   rid = ntohl(dm->rid);
1092   size = ntohl(dm->size);
1093   if (rid > 0)
1094     {
1095       pos = reservations;
1096       while ( (NULL != pos) &&
1097               (rid != pos->rid) )
1098         pos = pos->next;
1099       GNUNET_break (pos != NULL);
1100       if (NULL != pos)
1101         {
1102           GNUNET_break (pos->entries > 0);
1103           GNUNET_break (pos->amount >= size);
1104           pos->entries--;
1105           pos->amount -= size;
1106           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1107           GNUNET_STATISTICS_set (stats,
1108                                  gettext_noop ("# reserved"),
1109                                  reserved,
1110                                  GNUNET_NO);
1111         }
1112     }
1113   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1114                                                        &dm->key))
1115     {
1116       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1117       pc->client = client;
1118       GNUNET_SERVER_client_keep (client);
1119       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1120       plugin->api->get (plugin->api->cls,
1121                         &dm->key,
1122                         NULL,
1123                         ntohl (dm->type),
1124                         &check_present,
1125                         pc);      
1126       return;
1127     }
1128   execute_put (client, dm);
1129 }
1130
1131
1132 /**
1133  * Handle GET-message.
1134  *
1135  * @param cls closure
1136  * @param client identification of the client
1137  * @param message the actual message
1138  */
1139 static void
1140 handle_get (void *cls,
1141             struct GNUNET_SERVER_Client *client,
1142             const struct GNUNET_MessageHeader *message)
1143 {
1144   const struct GetMessage *msg;
1145   uint16_t size;
1146
1147   size = ntohs(message->size);
1148   if ( (size != sizeof(struct GetMessage)) &&
1149        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1150     {
1151       GNUNET_break (0);
1152       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1153       return;
1154     }
1155   msg = (const struct GetMessage*) message;
1156 #if DEBUG_DATASTORE
1157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158               "Processing `%s' request for `%s' of type %u\n",
1159               "GET",
1160               GNUNET_h2s (&msg->key),
1161               ntohl (msg->type));
1162 #endif
1163   GNUNET_STATISTICS_update (stats,
1164                             gettext_noop ("# GET requests received"),
1165                             1,
1166                             GNUNET_NO);
1167   GNUNET_SERVER_client_keep (client);
1168   if ( (size == sizeof(struct GetMessage)) &&
1169        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1170                                                          &msg->key)) )
1171     {
1172       /* don't bother database... */
1173 #if DEBUG_DATASTORE
1174       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1176                   "GET",
1177                   GNUNET_h2s (&msg->key));
1178 #endif  
1179       GNUNET_STATISTICS_update (stats,
1180                                 gettext_noop ("# requests filtered by bloomfilter"),
1181                                 1,
1182                                 GNUNET_NO);
1183       transmit_item (client,
1184                      NULL, NULL, 0, NULL, 0, 0, 0, 
1185                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1186       return;
1187     }
1188   plugin->api->get (plugin->api->cls,
1189                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1190                     NULL,
1191                     ntohl(msg->type),
1192                     &transmit_item,
1193                     client);    
1194 }
1195
1196
1197 /**
1198  * Handle UPDATE-message.
1199  *
1200  * @param cls closure
1201  * @param client identification of the client
1202  * @param message the actual message
1203  */
1204 static void
1205 handle_update (void *cls,
1206                struct GNUNET_SERVER_Client *client,
1207                const struct GNUNET_MessageHeader *message)
1208 {
1209   const struct UpdateMessage *msg;
1210   int ret;
1211   char *emsg;
1212
1213   GNUNET_STATISTICS_update (stats,
1214                             gettext_noop ("# UPDATE requests received"),
1215                             1,
1216                             GNUNET_NO);
1217   msg = (const struct UpdateMessage*) message;
1218   emsg = NULL;
1219 #if DEBUG_DATASTORE
1220   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1221               "Processing `%s' request for %llu\n",
1222               "UPDATE",
1223               (unsigned long long) GNUNET_ntohll (msg->uid));
1224 #endif
1225   ret = plugin->api->update (plugin->api->cls,
1226                              GNUNET_ntohll(msg->uid),
1227                              (int32_t) ntohl(msg->priority),
1228                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1229                              &emsg);
1230   transmit_status (client, ret, emsg);
1231   GNUNET_free_non_null (emsg);
1232 }
1233
1234
1235 /**
1236  * Handle GET_RANDOM-message.
1237  *
1238  * @param cls closure
1239  * @param client identification of the client
1240  * @param message the actual message
1241  */
1242 static void
1243 handle_get_random (void *cls,
1244                    struct GNUNET_SERVER_Client *client,
1245                    const struct GNUNET_MessageHeader *message)
1246 {
1247 #if DEBUG_DATASTORE
1248   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1249               "Processing `%s' request\n",
1250               "GET_RANDOM");
1251 #endif
1252   GNUNET_STATISTICS_update (stats,
1253                             gettext_noop ("# GET RANDOM requests received"),
1254                             1,
1255                             GNUNET_NO);
1256   GNUNET_SERVER_client_keep (client);
1257   plugin->api->iter_migration_order (plugin->api->cls,
1258                                      0,
1259                                      &transmit_item,
1260                                      client);  
1261 }
1262
1263
1264 /**
1265  * Context for the 'remove_callback'.
1266  */
1267 struct RemoveContext 
1268 {
1269   /**
1270    * Client for whom we're doing the remvoing.
1271    */
1272   struct GNUNET_SERVER_Client *client;
1273
1274   /**
1275    * GNUNET_YES if we managed to remove something.
1276    */
1277   int found;
1278 };
1279
1280
1281 /**
1282  * Callback function that will cause the item that is passed
1283  * in to be deleted (by returning GNUNET_NO).
1284  */
1285 static int
1286 remove_callback (void *cls,
1287                  void *next_cls,
1288                  const GNUNET_HashCode * key,
1289                  uint32_t size,
1290                  const void *data,
1291                  enum GNUNET_BLOCK_Type type,
1292                  uint32_t priority,
1293                  uint32_t anonymity,
1294                  struct GNUNET_TIME_Absolute
1295                  expiration, uint64_t uid)
1296 {
1297   struct RemoveContext *rc = cls;
1298
1299   if (key == NULL)
1300     {
1301 #if DEBUG_DATASTORE
1302       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303                   "No further matches for `%s' request.\n",
1304                   "REMOVE");
1305 #endif  
1306       if (GNUNET_YES == rc->found)
1307         transmit_status (rc->client, GNUNET_OK, NULL);       
1308       else
1309         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1310       GNUNET_SERVER_client_drop (rc->client);
1311       GNUNET_free (rc);
1312       return GNUNET_OK; /* last item */
1313     }
1314   rc->found = GNUNET_YES;
1315 #if DEBUG_DATASTORE
1316   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1318               (unsigned long long) uid,
1319               "REMOVE",
1320               GNUNET_h2s (key),
1321               type);
1322 #endif  
1323   GNUNET_STATISTICS_update (stats,
1324                             gettext_noop ("# bytes removed (explicit request)"),
1325                             size,
1326                             GNUNET_YES);
1327   GNUNET_CONTAINER_bloomfilter_remove (filter,
1328                                        key);
1329   plugin->api->next_request (next_cls, GNUNET_YES);
1330   return GNUNET_NO;
1331 }
1332
1333
1334 /**
1335  * Handle REMOVE-message.
1336  *
1337  * @param cls closure
1338  * @param client identification of the client
1339  * @param message the actual message
1340  */
1341 static void
1342 handle_remove (void *cls,
1343              struct GNUNET_SERVER_Client *client,
1344              const struct GNUNET_MessageHeader *message)
1345 {
1346   const struct DataMessage *dm = check_data (message);
1347   GNUNET_HashCode vhash;
1348   struct RemoveContext *rc;
1349
1350   if (dm == NULL)
1351     {
1352       GNUNET_break (0);
1353       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1354       return;
1355     }
1356 #if DEBUG_DATASTORE
1357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1358               "Processing `%s' request for `%s' of type %u\n",
1359               "REMOVE",
1360               GNUNET_h2s (&dm->key),
1361               ntohl (dm->type));
1362 #endif
1363   GNUNET_STATISTICS_update (stats,
1364                             gettext_noop ("# REMOVE requests received"),
1365                             1,
1366                             GNUNET_NO);
1367   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1368   GNUNET_SERVER_client_keep (client);
1369   rc->client = client;
1370   GNUNET_CRYPTO_hash (&dm[1],
1371                       ntohl(dm->size),
1372                       &vhash);
1373   plugin->api->get (plugin->api->cls,
1374                     &dm->key,
1375                     &vhash,
1376                     ntohl(dm->type),
1377                     &remove_callback,
1378                     rc);
1379 }
1380
1381
1382 /**
1383  * Handle DROP-message.
1384  *
1385  * @param cls closure
1386  * @param client identification of the client
1387  * @param message the actual message
1388  */
1389 static void
1390 handle_drop (void *cls,
1391              struct GNUNET_SERVER_Client *client,
1392              const struct GNUNET_MessageHeader *message)
1393 {
1394 #if DEBUG_DATASTORE
1395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1396               "Processing `%s' request\n",
1397               "DROP");
1398 #endif
1399   plugin->api->drop (plugin->api->cls);
1400   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1401 }
1402
1403
1404 /**
1405  * Function called by plugins to notify us about a
1406  * change in their disk utilization.
1407  *
1408  * @param cls closure (NULL)
1409  * @param delta change in disk utilization, 
1410  *        0 for "reset to empty"
1411  */
1412 static void
1413 disk_utilization_change_cb (void *cls,
1414                             int delta)
1415 {
1416   if ( (delta < 0) &&
1417        (payload < -delta) )
1418     {
1419       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1420                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1421                   (long long) payload,
1422                   (long long) -delta);
1423       payload = plugin->api->get_size (plugin->api->cls);
1424       sync_stats ();
1425       return;
1426     }
1427   payload += delta;
1428   lastSync++;
1429   if (lastSync >= MAX_STAT_SYNC_LAG)
1430     sync_stats ();
1431 }
1432
1433
1434 /**
1435  * Callback function to process statistic values.
1436  *
1437  * @param cls closure (struct Plugin*)
1438  * @param subsystem name of subsystem that created the statistic
1439  * @param name the name of the datum
1440  * @param value the current value
1441  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1442  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1443  */
1444 static int
1445 process_stat_in (void *cls,
1446                  const char *subsystem,
1447                  const char *name,
1448                  uint64_t value,
1449                  int is_persistent)
1450 {
1451   GNUNET_assert (stats_worked == GNUNET_NO);
1452   stats_worked = GNUNET_YES;
1453   payload += value;
1454 #if DEBUG_SQLITE
1455   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1456               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1457               value,
1458               payload);
1459 #endif
1460   return GNUNET_OK;
1461 }
1462
1463
1464 static void
1465 process_stat_done (void *cls,
1466                    int success)
1467 {
1468   struct DatastorePlugin *plugin = cls;
1469
1470   stat_get = NULL;
1471   if (stats_worked == GNUNET_NO) 
1472     payload = plugin->api->get_size (plugin->api->cls);
1473 }
1474
1475
1476 /**
1477  * Load the datastore plugin.
1478  */
1479 static struct DatastorePlugin *
1480 load_plugin () 
1481 {
1482   struct DatastorePlugin *ret;
1483   char *libname;
1484   char *name;
1485
1486   if (GNUNET_OK !=
1487       GNUNET_CONFIGURATION_get_value_string (cfg,
1488                                              "DATASTORE", "DATABASE", &name))
1489     {
1490       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1491                   _("No `%s' specified for `%s' in configuration!\n"),
1492                   "DATABASE",
1493                   "DATASTORE");
1494       return NULL;
1495     }
1496   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1497   ret->env.cfg = cfg;
1498   ret->env.sched = sched;  
1499   ret->env.duc = &disk_utilization_change_cb;
1500   ret->env.cls = NULL;
1501   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1502               _("Loading `%s' datastore plugin\n"), name);
1503   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1504   ret->short_name = name;
1505   ret->lib_name = libname;
1506   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1507   if (ret->api == NULL)
1508     {
1509       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1510                   _("Failed to load datastore plugin for `%s'\n"), name);
1511       GNUNET_free (ret->short_name);
1512       GNUNET_free (libname);
1513       GNUNET_free (ret);
1514       return NULL;
1515     }
1516   return ret;
1517 }
1518
1519
1520 /**
1521  * Function called when the service shuts
1522  * down.  Unloads our datastore plugin.
1523  *
1524  * @param plug plugin to unload
1525  */
1526 static void
1527 unload_plugin (struct DatastorePlugin *plug)
1528 {
1529 #if DEBUG_DATASTORE
1530   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1531               "Datastore service is unloading plugin...\n");
1532 #endif
1533   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1534   GNUNET_free (plug->lib_name);
1535   GNUNET_free (plug->short_name);
1536   GNUNET_free (plug);
1537 }
1538
1539
1540 /**
1541  * Final task run after shutdown.  Unloads plugins and disconnects us from
1542  * statistics.
1543  */
1544 static void
1545 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1546 {
1547   unload_plugin (plugin);
1548   plugin = NULL;
1549   if (filter != NULL)
1550     {
1551       GNUNET_CONTAINER_bloomfilter_free (filter);
1552       filter = NULL;
1553     }
1554   if (lastSync > 0)
1555     sync_stats ();
1556   if (stat_get != NULL)
1557     {
1558       GNUNET_STATISTICS_get_cancel (stat_get);
1559       stat_get = NULL;
1560     }
1561   if (stats != NULL)
1562     {
1563       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1564       stats = NULL;
1565     }
1566 }
1567
1568
1569 /**
1570  * Last task run during shutdown.  Disconnects us from
1571  * the transport and core.
1572  */
1573 static void
1574 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1575 {
1576   struct TransmitCallbackContext *tcc;
1577
1578   cleaning_done = GNUNET_YES;
1579   while (NULL != (tcc = tcc_head))
1580     {
1581       GNUNET_CONTAINER_DLL_remove (tcc_head,
1582                                    tcc_tail,
1583                                    tcc);
1584       if (tcc->th != NULL)
1585         {
1586           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1587           GNUNET_SERVER_client_drop (tcc->client);
1588         }
1589       if (NULL != tcc->tc)
1590         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1591       GNUNET_free (tcc->msg);
1592       GNUNET_free (tcc);
1593     }
1594   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1595     {
1596       GNUNET_SCHEDULER_cancel (sched,
1597                                expired_kill_task);
1598       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1599     }
1600   GNUNET_SCHEDULER_add_continuation (sched,
1601                                      &unload_task,
1602                                      NULL,
1603                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1604 }
1605
1606
1607 /**
1608  * Function that removes all active reservations made
1609  * by the given client and releases the space for other
1610  * requests.
1611  *
1612  * @param cls closure
1613  * @param client identification of the client
1614  */
1615 static void
1616 cleanup_reservations (void *cls,
1617                       struct GNUNET_SERVER_Client
1618                       * client)
1619 {
1620   struct ReservationList *pos;
1621   struct ReservationList *prev;
1622   struct ReservationList *next;
1623
1624   if (client == NULL)
1625     return;
1626   prev = NULL;
1627   pos = reservations;
1628   while (NULL != pos)
1629     {
1630       next = pos->next;
1631       if (pos->client == client)
1632         {
1633           if (prev == NULL)
1634             reservations = next;
1635           else
1636             prev->next = next;
1637           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1638           GNUNET_free (pos);
1639         }
1640       else
1641         {
1642           prev = pos;
1643         }
1644       pos = next;
1645     }
1646   GNUNET_STATISTICS_set (stats,
1647                          gettext_noop ("# reserved"),
1648                          reserved,
1649                          GNUNET_NO);
1650 }
1651
1652
1653 /**
1654  * Process datastore requests.
1655  *
1656  * @param cls closure
1657  * @param s scheduler to use
1658  * @param server the initialized server
1659  * @param c configuration to use
1660  */
1661 static void
1662 run (void *cls,
1663      struct GNUNET_SCHEDULER_Handle *s,
1664      struct GNUNET_SERVER_Handle *server,
1665      const struct GNUNET_CONFIGURATION_Handle *c)
1666 {
1667   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1668     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1669      sizeof(struct ReserveMessage) }, 
1670     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1671      sizeof(struct ReleaseReserveMessage) }, 
1672     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1673     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1674      sizeof (struct UpdateMessage) }, 
1675     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1676     {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1677      sizeof(struct GNUNET_MessageHeader) }, 
1678     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1679     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1680      sizeof(struct GNUNET_MessageHeader) }, 
1681     {NULL, NULL, 0, 0}
1682   };
1683   char *fn;
1684   unsigned int bf_size;
1685
1686   sched = s;
1687   cfg = c;
1688   if (GNUNET_OK !=
1689       GNUNET_CONFIGURATION_get_value_number (cfg,
1690                                              "DATASTORE", "QUOTA", &quota))
1691     {
1692       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1693                   _("No `%s' specified for `%s' in configuration!\n"),
1694                   "QUOTA",
1695                   "DATASTORE");
1696       return;
1697     }
1698   stats = GNUNET_STATISTICS_create (sched, "datastore", cfg);
1699   GNUNET_STATISTICS_set (stats,
1700                          gettext_noop ("# quota"),
1701                          quota,
1702                          GNUNET_NO);
1703   cache_size = quota / 8; /* Or should we make this an option? */
1704   GNUNET_STATISTICS_set (stats,
1705                          gettext_noop ("# cache size"),
1706                          cache_size,
1707                          GNUNET_NO);
1708   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1709   fn = NULL;
1710   if ( (GNUNET_OK !=
1711         GNUNET_CONFIGURATION_get_value_filename (cfg,
1712                                                  "DATASTORE",
1713                                                  "BLOOMFILTER",
1714                                                  &fn)) ||
1715        (GNUNET_OK !=
1716         GNUNET_DISK_directory_create_for_file (fn)) )
1717     {
1718       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1719                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1720                   fn != NULL ? fn : "");
1721       GNUNET_free_non_null (fn);
1722       fn = NULL;
1723     }
1724   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1725   GNUNET_free_non_null (fn);
1726   if (filter == NULL)
1727     {
1728       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1729                   _("Failed to initialize bloomfilter.\n"));
1730       if (stats != NULL)
1731         {
1732           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1733           stats = NULL;
1734         }
1735       return;
1736     }
1737   plugin = load_plugin ();
1738   if (NULL == plugin)
1739     {
1740       GNUNET_CONTAINER_bloomfilter_free (filter);
1741       filter = NULL;
1742       if (stats != NULL)
1743         {
1744           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1745           stats = NULL;
1746         }
1747       return;
1748     }
1749   stat_get = GNUNET_STATISTICS_get (stats,
1750                                     "datastore",
1751                                     QUOTA_STAT_NAME,
1752                                     GNUNET_TIME_UNIT_SECONDS,
1753                                     &process_stat_done,
1754                                     &process_stat_in,
1755                                     plugin);
1756   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1757   GNUNET_SERVER_add_handlers (server, handlers);
1758   expired_kill_task
1759     = GNUNET_SCHEDULER_add_with_priority (sched,
1760                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1761                                           &delete_expired, NULL);
1762   GNUNET_SCHEDULER_add_delayed (sched,
1763                                 GNUNET_TIME_UNIT_FOREVER_REL,
1764                                 &cleaning_task, NULL);
1765 }
1766
1767
1768 /**
1769  * The main function for the datastore service.
1770  *
1771  * @param argc number of arguments from the command line
1772  * @param argv command line arguments
1773  * @return 0 ok, 1 on error
1774  */
1775 int
1776 main (int argc, char *const *argv)
1777 {
1778   int ret;
1779
1780   ret = (GNUNET_OK ==
1781          GNUNET_SERVICE_run (argc,
1782                              argv,
1783                              "datastore",
1784                              GNUNET_SERVICE_OPTION_NONE,
1785                              &run, NULL)) ? 0 : 1;
1786   return ret;
1787 }
1788
1789
1790 /* end of gnunet-service-datastore.c */