566a227c1306ff8fc41d47e830dc8871070f9feb
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * How much space are we using for the cache?  (space available for
149  * insertions that will be instantly reclaimed by discarding less
150  * important content --- or possibly whatever we just inserted into
151  * the "cache").
152  */
153 static unsigned long long cache_size;
154
155 /**
156  * How much space have we currently reserved?
157  */
158 static unsigned long long reserved;
159   
160 /**
161  * How much data are we currently storing
162  * in the database?
163  */
164 static unsigned long long payload;
165
166 /**
167  * Number of updates that were made to the
168  * payload value since we last synchronized
169  * it with the statistics service.
170  */
171 static unsigned int lastSync;
172
173 /**
174  * Did we get an answer from statistics?
175  */
176 static int stats_worked;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
183
184 /**
185  * Our configuration.
186  */
187 const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189
190 /**
191  * Handle for reporting statistics.
192  */
193 static struct GNUNET_STATISTICS_Handle *stats;
194
195
196 /**
197  * Synchronize our utilization statistics with the 
198  * statistics service.
199  */
200 static void 
201 sync_stats ()
202 {
203   GNUNET_STATISTICS_set (stats,
204                          QUOTA_STAT_NAME,
205                          payload,
206                          GNUNET_YES);
207   lastSync = 0;
208 }
209
210
211
212
213 /**
214  * Function called once the transmit operation has
215  * either failed or succeeded.
216  *
217  * @param cls closure
218  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
219  */
220 typedef void (*TransmitContinuation)(void *cls,
221                                      int status);
222
223
224 /**
225  * Context for transmitting replies to clients.
226  */
227 struct TransmitCallbackContext 
228 {
229   
230   /**
231    * We keep these in a doubly-linked list (for cleanup).
232    */
233   struct TransmitCallbackContext *next;
234   
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *prev;
239   
240   /**
241    * The message that we're asked to transmit.
242    */
243   struct GNUNET_MessageHeader *msg;
244   
245   /**
246    * Handle for the transmission request.
247    */
248   struct GNUNET_CONNECTION_TransmitHandle *th;
249
250   /**
251    * Client that we are transmitting to.
252    */
253   struct GNUNET_SERVER_Client *client;
254
255   /**
256    * Function to call once msg has been transmitted
257    * (or at least added to the buffer).
258    */
259   TransmitContinuation tc;
260
261   /**
262    * Closure for tc.
263    */
264   void *tc_cls;
265
266   /**
267    * GNUNET_YES if we are supposed to signal the server
268    * completion of the client's request.
269    */
270   int end;
271 };
272
273   
274 /**
275  * Head of the doubly-linked list (for cleanup).
276  */
277 static struct TransmitCallbackContext *tcc_head;
278
279 /**
280  * Tail of the doubly-linked list (for cleanup).
281  */
282 static struct TransmitCallbackContext *tcc_tail;
283
284 /**
285  * Have we already cleaned up the TCCs and are hence no longer
286  * willing (or able) to transmit anything to anyone?
287  */
288 static int cleaning_done;
289
290 /**
291  * Handle for pending get request.
292  */
293 static struct GNUNET_STATISTICS_GetHandle *stat_get;
294
295
296 /**
297  * Task that is used to remove expired entries from
298  * the datastore.  This task will schedule itself
299  * again automatically to always delete all expired
300  * content quickly.
301  *
302  * @param cls not used
303  * @param tc task context
304  */ 
305 static void
306 delete_expired (void *cls,
307                 const struct GNUNET_SCHEDULER_TaskContext *tc);
308
309
310 /**
311  * Iterate over the expired items stored in the datastore.
312  * Delete all expired items; once we have processed all
313  * expired items, re-schedule the "delete_expired" task.
314  *
315  * @param cls not used
316  * @param next_cls closure to pass to the "next" function.
317  * @param key key for the content
318  * @param size number of bytes in data
319  * @param data content stored
320  * @param type type of the content
321  * @param priority priority of the content
322  * @param anonymity anonymity-level for the content
323  * @param expiration expiration time for the content
324  * @param uid unique identifier for the datum;
325  *        maybe 0 if no unique identifier is available
326  *
327  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
328  *         (continue on call to "next", of course),
329  *         GNUNET_NO to delete the item and continue (if supported)
330  */
331 static int 
332 expired_processor (void *cls,
333                    void *next_cls,
334                    const GNUNET_HashCode * key,
335                    uint32_t size,
336                    const void *data,
337                    enum GNUNET_BLOCK_Type type,
338                    uint32_t priority,
339                    uint32_t anonymity,
340                    struct GNUNET_TIME_Absolute
341                    expiration, 
342                    uint64_t uid)
343 {
344   struct GNUNET_TIME_Absolute now;
345
346   if (key == NULL) 
347     {
348       expired_kill_task 
349         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
350                                         &delete_expired,
351                                         NULL);
352       return GNUNET_SYSERR;
353     }
354   now = GNUNET_TIME_absolute_get ();
355   if (expiration.abs_value > now.abs_value)
356     {
357       /* finished processing */
358       expired_kill_task 
359         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
360                                         &delete_expired,
361                                         NULL);
362       return GNUNET_SYSERR;
363     }
364 #if DEBUG_DATASTORE
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Deleting content `%s' of type %u that expired %llu ms ago\n",
367               GNUNET_h2s (key),
368               type,
369               (unsigned long long) (now.abs_value - expiration.abs_value));
370 #endif
371   GNUNET_STATISTICS_update (stats,
372                             gettext_noop ("# bytes expired"),
373                             size,
374                             GNUNET_YES);
375   GNUNET_CONTAINER_bloomfilter_remove (filter,
376                                        key);
377   expired_kill_task 
378     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
379                                     &delete_expired,
380                                     NULL);
381   return GNUNET_NO;
382 }
383
384
385 /**
386  * Task that is used to remove expired entries from
387  * the datastore.  This task will schedule itself
388  * again automatically to always delete all expired
389  * content quickly.
390  *
391  * @param cls not used
392  * @param tc task context
393  */ 
394 static void
395 delete_expired (void *cls,
396                 const struct GNUNET_SCHEDULER_TaskContext *tc)
397 {
398   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
399   plugin->api->expiration_get (plugin->api->cls, 
400                                &expired_processor,
401                                NULL);
402 }
403
404
405 /**
406  * An iterator over a set of items stored in the datastore
407  * that deletes until we're happy with respect to our quota.
408  *
409  * @param cls closure
410  * @param next_cls closure to pass to the "next" function.
411  * @param key key for the content
412  * @param size number of bytes in data
413  * @param data content stored
414  * @param type type of the content
415  * @param priority priority of the content
416  * @param anonymity anonymity-level for the content
417  * @param expiration expiration time for the content
418  * @param uid unique identifier for the datum;
419  *        maybe 0 if no unique identifier is available
420  *
421  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
422  *         (continue on call to "next", of course),
423  *         GNUNET_NO to delete the item and continue (if supported)
424  */
425 static int 
426 quota_processor (void *cls,
427                  void *next_cls,
428                  const GNUNET_HashCode * key,
429                  uint32_t size,
430                  const void *data,
431                  enum GNUNET_BLOCK_Type type,
432                  uint32_t priority,
433                  uint32_t anonymity,
434                  struct GNUNET_TIME_Absolute expiration, 
435                  uint64_t uid)
436 {
437   unsigned long long *need = cls;
438
439   if (NULL == key)
440     return GNUNET_SYSERR;    
441 #if DEBUG_DATASTORE
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
444               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
445               GNUNET_h2s (key),
446               type,
447               *need);
448 #endif
449   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
450     *need = 0;
451   else
452     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
453   GNUNET_STATISTICS_update (stats,
454                             gettext_noop ("# bytes purged (low-priority)"),
455                             size,
456                             GNUNET_YES);
457   GNUNET_CONTAINER_bloomfilter_remove (filter,
458                                        key);
459   return GNUNET_NO;
460 }
461
462
463 /**
464  * Manage available disk space by running tasks
465  * that will discard content if necessary.  This
466  * function will be run whenever a request for
467  * "need" bytes of storage could only be satisfied
468  * by eating into the "cache" (and we want our cache
469  * space back).
470  *
471  * @param need number of bytes of content that were
472  *        placed into the "cache" (and hence the
473  *        number of bytes that should be removed).
474  */
475 static void
476 manage_space (unsigned long long need)
477 {
478   unsigned long long last;
479
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Asked to free up %llu bytes of cache space\n",
483               need);
484 #endif
485   last = 0;
486   while ( (need > 0) &&
487           (last != need) )
488     {
489       last = need;
490       plugin->api->expiration_get (plugin->api->cls,
491                                    &quota_processor,
492                                    &need);    
493     }
494 }
495
496
497 /**
498  * Function called to notify a client about the socket
499  * begin ready to queue more data.  "buf" will be
500  * NULL and "size" zero if the socket was closed for
501  * writing in the meantime.
502  *
503  * @param cls closure
504  * @param size number of bytes available in buf
505  * @param buf where the callee should write the message
506  * @return number of bytes written to buf
507  */
508 static size_t
509 transmit_callback (void *cls,
510                    size_t size, void *buf)
511 {
512   struct TransmitCallbackContext *tcc = cls;
513   size_t msize;
514   
515   tcc->th = NULL;
516   GNUNET_CONTAINER_DLL_remove (tcc_head,
517                                tcc_tail,
518                                tcc);
519   msize = ntohs(tcc->msg->size);
520   if (size == 0)
521     {
522       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
523                   _("Transmission to client failed!\n"));
524       if (tcc->tc != NULL)
525         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
526       if (GNUNET_YES == tcc->end)
527         {
528           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                       _("Disconnecting client due to transmission failure!\n"));
530           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
531         }
532       GNUNET_SERVER_client_drop (tcc->client);
533       GNUNET_free (tcc->msg);
534       GNUNET_free (tcc);
535       return 0;
536     }
537   GNUNET_assert (size >= msize);
538   memcpy (buf, tcc->msg, msize);
539   if (tcc->tc != NULL)
540     tcc->tc (tcc->tc_cls, GNUNET_OK);
541   if (GNUNET_YES == tcc->end)
542     {
543 #if DEBUG_DATASTORE
544       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545                   "Done processing client request\n");
546 #endif
547       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
548     }
549   else
550     {
551 #if DEBUG_DATASTORE
552       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553                   "Response transmitted, more pending!\n");
554 #endif
555     }
556   GNUNET_SERVER_client_drop (tcc->client);
557   GNUNET_free (tcc->msg);
558   GNUNET_free (tcc);
559   return msize;
560 }
561
562
563 /**
564  * Transmit the given message to the client.
565  *
566  * @param client target of the message
567  * @param msg message to transmit, will be freed!
568  * @param tc function to call afterwards
569  * @param tc_cls closure for tc
570  * @param end is this the last response (and we should
571  *        signal the server completion accodingly after
572  *        transmitting this message)?
573  */
574 static void
575 transmit (struct GNUNET_SERVER_Client *client,
576           struct GNUNET_MessageHeader *msg,
577           TransmitContinuation tc,
578           void *tc_cls,
579           int end)
580 {
581   struct TransmitCallbackContext *tcc;
582
583   if (GNUNET_YES == cleaning_done)
584     {
585 #if DEBUG_DATASTORE
586       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
587                   "Shutdown in progress, aborting transmission.\n");
588 #endif
589       GNUNET_free (msg);
590       if (NULL != tc)
591         tc (tc_cls, GNUNET_SYSERR);
592       return;
593     }
594   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
595   tcc->msg = msg;
596   tcc->client = client;
597   tcc->tc = tc;
598   tcc->tc_cls = tc_cls;
599   tcc->end = end;
600   if (NULL ==
601       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
602                                                       ntohs(msg->size),
603                                                       GNUNET_TIME_UNIT_FOREVER_REL,
604                                                       &transmit_callback,
605                                                       tcc)))
606     {
607       GNUNET_break (0);
608       if (GNUNET_YES == end)
609         {
610           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
611                       _("Forcefully disconnecting client.\n"));
612           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
613         }
614       if (NULL != tc)
615         tc (tc_cls, GNUNET_SYSERR);
616       GNUNET_free (msg);
617       GNUNET_free (tcc);
618       return;
619     }
620   GNUNET_SERVER_client_keep (client);
621   GNUNET_CONTAINER_DLL_insert (tcc_head,
622                                tcc_tail,
623                                tcc);
624 }
625
626
627 /**
628  * Transmit a status code to the client.
629  *
630  * @param client receiver of the response
631  * @param code status code
632  * @param msg optional error message (can be NULL)
633  */
634 static void
635 transmit_status (struct GNUNET_SERVER_Client *client,
636                  int code,
637                  const char *msg)
638 {
639   struct StatusMessage *sm;
640   size_t slen;
641
642 #if DEBUG_DATASTORE
643   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644               "Transmitting `%s' message with value %d and message `%s'\n",
645               "STATUS",
646               code,
647               msg != NULL ? msg : "(none)");
648 #endif
649   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
650   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
651   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
652   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
653   sm->status = htonl(code);
654   if (slen > 0)
655     memcpy (&sm[1], msg, slen);  
656   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
657 }
658
659
660 /**
661  * Function called once the transmit operation has
662  * either failed or succeeded.
663  *
664  * @param next_cls closure for calling "next_request" callback
665  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
666  */
667 static void 
668 get_next(void *next_cls,
669          int status)
670 {
671   if (status != GNUNET_OK)
672     {
673       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
674                   _("Failed to transmit an item to the client; aborting iteration.\n"));
675       if (plugin != NULL)
676         plugin->api->next_request (next_cls, GNUNET_YES);
677       return;
678     }
679   if (next_cls != NULL)
680    plugin->api->next_request (next_cls, GNUNET_NO);
681 }
682
683
684 /**
685  * Function that will transmit the given datastore entry
686  * to the client.
687  *
688  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
689  * @param next_cls closure to use to ask for the next item
690  * @param key key for the content
691  * @param size number of bytes in data
692  * @param data content stored
693  * @param type type of the content
694  * @param priority priority of the content
695  * @param anonymity anonymity-level for the content
696  * @param expiration expiration time for the content
697  * @param uid unique identifier for the datum;
698  *        maybe 0 if no unique identifier is available
699  *
700  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
701  *         GNUNET_NO to delete the item and continue (if supported)
702  */
703 static int
704 transmit_item (void *cls,
705                void *next_cls,
706                const GNUNET_HashCode * key,
707                uint32_t size,
708                const void *data,
709                enum GNUNET_BLOCK_Type type,
710                uint32_t priority,
711                uint32_t anonymity,
712                struct GNUNET_TIME_Absolute
713                expiration, uint64_t uid)
714 {
715   struct GNUNET_SERVER_Client *client = cls;
716   struct GNUNET_MessageHeader *end;
717   struct DataMessage *dm;
718
719   if (key == NULL)
720     {
721       /* transmit 'DATA_END' */
722 #if DEBUG_DATASTORE
723       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                   "Transmitting `%s' message\n",
725                   "DATA_END");
726 #endif
727       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
728       end->size = htons(sizeof(struct GNUNET_MessageHeader));
729       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
730       transmit (client, end, NULL, NULL, GNUNET_YES);
731       GNUNET_SERVER_client_drop (client);
732       return GNUNET_OK;
733     }
734   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
735   dm->header.size = htons(sizeof(struct DataMessage) + size);
736   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
737   dm->rid = htonl(0);
738   dm->size = htonl(size);
739   dm->type = htonl(type);
740   dm->priority = htonl(priority);
741   dm->anonymity = htonl(anonymity);
742   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
743   dm->uid = GNUNET_htonll(uid);
744   dm->key = *key;
745   memcpy (&dm[1], data, size);
746 #if DEBUG_DATASTORE
747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748               "Transmitting `%s' message for `%s' of type %u\n",
749               "DATA",
750               GNUNET_h2s (key),
751               type);
752 #endif
753   GNUNET_STATISTICS_update (stats,
754                             gettext_noop ("# results found"),
755                             1,
756                             GNUNET_NO);
757   transmit (client, &dm->header, &get_next, next_cls, 
758             (next_cls != NULL) ? GNUNET_NO : GNUNET_YES);
759   return GNUNET_OK;
760 }
761
762
763 /**
764  * Handle RESERVE-message.
765  *
766  * @param cls closure
767  * @param client identification of the client
768  * @param message the actual message
769  */
770 static void
771 handle_reserve (void *cls,
772                 struct GNUNET_SERVER_Client *client,
773                 const struct GNUNET_MessageHeader *message)
774 {
775   /**
776    * Static counter to produce reservation identifiers.
777    */
778   static int reservation_gen;
779
780   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
781   struct ReservationList *e;
782   unsigned long long used;
783   unsigned long long req;
784   uint64_t amount;
785   uint32_t entries;
786
787 #if DEBUG_DATASTORE
788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789               "Processing `%s' request\n",
790               "RESERVE");
791 #endif
792   amount = GNUNET_ntohll(msg->amount);
793   entries = ntohl(msg->entries);
794   used = payload + reserved;
795   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
796   if (used + req > quota)
797     {
798       if (quota < used)
799         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
800       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
801                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
802                   quota - used,
803                   "RESERVE",
804                   req);
805       if (cache_size < req)
806         {
807           /* TODO: document this in the FAQ; essentially, if this
808              message happens, the insertion request could be blocked
809              by less-important content from migration because it is
810              larger than 1/8th of the overall available space, and
811              we only reserve 1/8th for "fresh" insertions */
812           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
813                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
814                       req,
815                       cache_size);
816           transmit_status (client, 0, 
817                            gettext_noop ("Insufficient space to satisfy request and "
818                                          "requested amount is larger than cache size"));
819         }
820       else
821         {
822           transmit_status (client, 0, 
823                            gettext_noop ("Insufficient space to satisfy request"));
824         }
825       return;      
826     }
827   reserved += req;
828   GNUNET_STATISTICS_set (stats,
829                          gettext_noop ("# reserved"),
830                          reserved,
831                          GNUNET_NO);
832   e = GNUNET_malloc (sizeof(struct ReservationList));
833   e->next = reservations;
834   reservations = e;
835   e->client = client;
836   e->amount = amount;
837   e->entries = entries;
838   e->rid = ++reservation_gen;
839   if (reservation_gen < 0)
840     reservation_gen = 0; /* wrap around */
841   transmit_status (client, e->rid, NULL);
842 }
843
844
845 /**
846  * Handle RELEASE_RESERVE-message.
847  *
848  * @param cls closure
849  * @param client identification of the client
850  * @param message the actual message
851  */
852 static void
853 handle_release_reserve (void *cls,
854                         struct GNUNET_SERVER_Client *client,
855                         const struct GNUNET_MessageHeader *message)
856 {
857   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
858   struct ReservationList *pos;
859   struct ReservationList *prev;
860   struct ReservationList *next;
861   int rid = ntohl(msg->rid);
862   unsigned long long rem;
863
864 #if DEBUG_DATASTORE
865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866               "Processing `%s' request\n",
867               "RELEASE_RESERVE");
868 #endif
869   next = reservations;
870   prev = NULL;
871   while (NULL != (pos = next))
872     {
873       next = pos->next;
874       if (rid == pos->rid)
875         {
876           if (prev == NULL)
877             reservations = next;
878           else
879             prev->next = next;
880           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
881           GNUNET_assert (reserved >= rem);
882           reserved -= rem;
883           GNUNET_STATISTICS_set (stats,
884                          gettext_noop ("# reserved"),
885                                  reserved,
886                                  GNUNET_NO);
887 #if DEBUG_DATASTORE
888           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                       "Returning %llu remaining reserved bytes to storage pool\n",
890                       rem);
891 #endif    
892           GNUNET_free (pos);
893           transmit_status (client, GNUNET_OK, NULL);
894           return;
895         }       
896       prev = pos;
897     }
898   GNUNET_break (0);
899   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
900 }
901
902
903 /**
904  * Check that the given message is a valid data message.
905  *
906  * @return NULL if the message is not well-formed, otherwise the message
907  */
908 static const struct DataMessage *
909 check_data (const struct GNUNET_MessageHeader *message)
910 {
911   uint16_t size;
912   uint32_t dsize;
913   const struct DataMessage *dm;
914
915   size = ntohs(message->size);
916   if (size < sizeof(struct DataMessage))
917     { 
918       GNUNET_break (0);
919       return NULL;
920     }
921   dm = (const struct DataMessage *) message;
922   dsize = ntohl(dm->size);
923   if (size != dsize + sizeof(struct DataMessage))
924     {
925       GNUNET_break (0);
926       return NULL;
927     }
928   return dm;
929 }
930
931
932 /**
933  * Context for a put request used to see if the content is
934  * already present.
935  */
936 struct PutContext
937 {
938   /**
939    * Client to notify on completion.
940    */
941   struct GNUNET_SERVER_Client *client;
942
943   /**
944    * Did we find the data already in the database?
945    */
946   int is_present;
947   
948   /* followed by the 'struct DataMessage' */
949 };
950
951
952 /**
953  * Actually put the data message.
954  */
955 static void
956 execute_put (struct GNUNET_SERVER_Client *client,
957              const struct DataMessage *dm)
958 {
959   uint32_t size;
960   char *msg;
961   int ret;
962
963   size = ntohl(dm->size);
964   msg = NULL;
965   ret = plugin->api->put (plugin->api->cls,
966                           &dm->key,
967                           size,
968                           &dm[1],
969                           ntohl(dm->type),
970                           ntohl(dm->priority),
971                           ntohl(dm->anonymity),
972                           0 /* FIXME: replication */,
973                           GNUNET_TIME_absolute_ntoh(dm->expiration),
974                           &msg);
975   if (GNUNET_OK == ret)
976     {
977       GNUNET_STATISTICS_update (stats,
978                                 gettext_noop ("# bytes stored"),
979                                 size,
980                                 GNUNET_YES);
981       GNUNET_CONTAINER_bloomfilter_add (filter,
982                                         &dm->key);
983 #if DEBUG_DATASTORE
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Successfully stored %u bytes of type %u under key `%s'\n",
986                   size,
987                   ntohl(dm->type),
988                   GNUNET_h2s (&dm->key));
989 #endif
990     }
991   transmit_status (client, 
992                    ret,
993                    msg);
994   GNUNET_free_non_null (msg);
995   if (quota - reserved - cache_size < payload)
996     {
997       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
998                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
999                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
1000                   (unsigned long long) (quota - reserved - cache_size),
1001                   (unsigned long long) payload);
1002       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1003     }
1004 }
1005
1006
1007 /**
1008  * Function that will check if the given datastore entry
1009  * matches the put and if none match executes the put.
1010  *
1011  * @param cls closure, pointer to the client (of type 'struct PutContext').
1012  * @param next_cls closure to use to ask for the next item
1013  * @param key key for the content
1014  * @param size number of bytes in data
1015  * @param data content stored
1016  * @param type type of the content
1017  * @param priority priority of the content
1018  * @param anonymity anonymity-level for the content
1019  * @param expiration expiration time for the content
1020  * @param uid unique identifier for the datum;
1021  *        maybe 0 if no unique identifier is available
1022  *
1023  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1024  *         GNUNET_NO to delete the item and continue (if supported)
1025  */
1026 static int
1027 check_present (void *cls,
1028                void *next_cls,
1029                const GNUNET_HashCode * key,
1030                uint32_t size,
1031                const void *data,
1032                enum GNUNET_BLOCK_Type type,
1033                uint32_t priority,
1034                uint32_t anonymity,
1035                struct GNUNET_TIME_Absolute
1036                expiration, uint64_t uid)
1037 {
1038   struct PutContext *pc = cls;
1039   const struct DataMessage *dm;
1040
1041   dm = (const struct DataMessage*) &pc[1];
1042   if (key == NULL)
1043     {
1044       if (pc->is_present == GNUNET_YES) 
1045         transmit_status (pc->client, GNUNET_NO, NULL);
1046       else
1047         execute_put (pc->client, dm);
1048       GNUNET_SERVER_client_drop (pc->client);
1049       GNUNET_free (pc);
1050       return GNUNET_SYSERR;
1051     }
1052   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1053        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1054        ( (size == ntohl(dm->size)) &&
1055          (0 == memcmp (&dm[1],
1056                        data,
1057                        size)) ) )
1058     {
1059       pc->is_present = GNUNET_YES;
1060       plugin->api->next_request (next_cls, GNUNET_YES);
1061     }
1062   else
1063     {
1064       plugin->api->next_request (next_cls, GNUNET_NO);
1065     }
1066   return GNUNET_OK;
1067 }
1068
1069
1070 /**
1071  * Handle PUT-message.
1072  *
1073  * @param cls closure
1074  * @param client identification of the client
1075  * @param message the actual message
1076  */
1077 static void
1078 handle_put (void *cls,
1079             struct GNUNET_SERVER_Client *client,
1080             const struct GNUNET_MessageHeader *message)
1081 {
1082   const struct DataMessage *dm = check_data (message);
1083   int rid;
1084   struct ReservationList *pos;
1085   struct PutContext *pc;
1086   uint32_t size;
1087
1088   if ( (dm == NULL) ||
1089        (ntohl(dm->type) == 0) ) 
1090     {
1091       GNUNET_break (0);
1092       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1093       return;
1094     }
1095 #if DEBUG_DATASTORE
1096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097               "Processing `%s' request for `%s' of type %u\n",
1098               "PUT",
1099               GNUNET_h2s (&dm->key),
1100               ntohl (dm->type));
1101 #endif
1102   rid = ntohl(dm->rid);
1103   size = ntohl(dm->size);
1104   if (rid > 0)
1105     {
1106       pos = reservations;
1107       while ( (NULL != pos) &&
1108               (rid != pos->rid) )
1109         pos = pos->next;
1110       GNUNET_break (pos != NULL);
1111       if (NULL != pos)
1112         {
1113           GNUNET_break (pos->entries > 0);
1114           GNUNET_break (pos->amount >= size);
1115           pos->entries--;
1116           pos->amount -= size;
1117           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1118           GNUNET_STATISTICS_set (stats,
1119                                  gettext_noop ("# reserved"),
1120                                  reserved,
1121                                  GNUNET_NO);
1122         }
1123     }
1124   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1125                                                        &dm->key))
1126     {
1127       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1128       pc->client = client;
1129       GNUNET_SERVER_client_keep (client);
1130       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1131       plugin->api->get (plugin->api->cls,
1132                         &dm->key,
1133                         NULL,
1134                         ntohl (dm->type),
1135                         &check_present,
1136                         pc);      
1137       return;
1138     }
1139   execute_put (client, dm);
1140 }
1141
1142
1143 /**
1144  * Handle GET-message.
1145  *
1146  * @param cls closure
1147  * @param client identification of the client
1148  * @param message the actual message
1149  */
1150 static void
1151 handle_get (void *cls,
1152             struct GNUNET_SERVER_Client *client,
1153             const struct GNUNET_MessageHeader *message)
1154 {
1155   const struct GetMessage *msg;
1156   uint16_t size;
1157
1158   size = ntohs(message->size);
1159   if ( (size != sizeof(struct GetMessage)) &&
1160        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1161     {
1162       GNUNET_break (0);
1163       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1164       return;
1165     }
1166   msg = (const struct GetMessage*) message;
1167 #if DEBUG_DATASTORE
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "Processing `%s' request for `%s' of type %u\n",
1170               "GET",
1171               GNUNET_h2s (&msg->key),
1172               ntohl (msg->type));
1173 #endif
1174   GNUNET_STATISTICS_update (stats,
1175                             gettext_noop ("# GET requests received"),
1176                             1,
1177                             GNUNET_NO);
1178   GNUNET_SERVER_client_keep (client);
1179   if ( (size == sizeof(struct GetMessage)) &&
1180        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1181                                                          &msg->key)) )
1182     {
1183       /* don't bother database... */
1184 #if DEBUG_DATASTORE
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1187                   "GET",
1188                   GNUNET_h2s (&msg->key));
1189 #endif  
1190       GNUNET_STATISTICS_update (stats,
1191                                 gettext_noop ("# requests filtered by bloomfilter"),
1192                                 1,
1193                                 GNUNET_NO);
1194       transmit_item (client,
1195                      NULL, NULL, 0, NULL, 0, 0, 0, 
1196                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1197       return;
1198     }
1199   plugin->api->get (plugin->api->cls,
1200                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1201                     NULL,
1202                     ntohl(msg->type),
1203                     &transmit_item,
1204                     client);    
1205 }
1206
1207
1208 /**
1209  * Handle UPDATE-message.
1210  *
1211  * @param cls closure
1212  * @param client identification of the client
1213  * @param message the actual message
1214  */
1215 static void
1216 handle_update (void *cls,
1217                struct GNUNET_SERVER_Client *client,
1218                const struct GNUNET_MessageHeader *message)
1219 {
1220   const struct UpdateMessage *msg;
1221   int ret;
1222   char *emsg;
1223
1224   GNUNET_STATISTICS_update (stats,
1225                             gettext_noop ("# UPDATE requests received"),
1226                             1,
1227                             GNUNET_NO);
1228   msg = (const struct UpdateMessage*) message;
1229   emsg = NULL;
1230 #if DEBUG_DATASTORE
1231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232               "Processing `%s' request for %llu\n",
1233               "UPDATE",
1234               (unsigned long long) GNUNET_ntohll (msg->uid));
1235 #endif
1236   ret = plugin->api->update (plugin->api->cls,
1237                              GNUNET_ntohll(msg->uid),
1238                              (int32_t) ntohl(msg->priority),
1239                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1240                              &emsg);
1241   transmit_status (client, ret, emsg);
1242   GNUNET_free_non_null (emsg);
1243 }
1244
1245
1246 /**
1247  * Handle GET_REPLICATION-message.
1248  *
1249  * @param cls closure
1250  * @param client identification of the client
1251  * @param message the actual message
1252  */
1253 static void
1254 handle_get_replication (void *cls,
1255                         struct GNUNET_SERVER_Client *client,
1256                         const struct GNUNET_MessageHeader *message)
1257 {
1258 #if DEBUG_DATASTORE
1259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260               "Processing `%s' request\n",
1261               "GET_REPLICATION");
1262 #endif
1263   GNUNET_STATISTICS_update (stats,
1264                             gettext_noop ("# GET REPLICATION requests received"),
1265                             1,
1266                             GNUNET_NO);
1267   GNUNET_SERVER_client_keep (client);
1268   plugin->api->replication_get (plugin->api->cls,
1269                                 &transmit_item,
1270                                 client);  
1271 }
1272
1273
1274 /**
1275  * Handle GET_ZERO_ANONYMITY-message.
1276  *
1277  * @param cls closure
1278  * @param client identification of the client
1279  * @param message the actual message
1280  */
1281 static void
1282 handle_get_zero_anonymity (void *cls,
1283                            struct GNUNET_SERVER_Client *client,
1284                            const struct GNUNET_MessageHeader *message)
1285 {
1286   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1287   enum GNUNET_BLOCK_Type type;
1288
1289   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1290   if (type == GNUNET_BLOCK_TYPE_ANY)
1291     {
1292       GNUNET_break (0);
1293       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1294       return;
1295     }
1296 #if DEBUG_DATASTORE
1297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298               "Processing `%s' request\n",
1299               "GET_ZERO_ANONYMITY");
1300 #endif
1301   GNUNET_STATISTICS_update (stats,
1302                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1303                             1,
1304                             GNUNET_NO);
1305   GNUNET_SERVER_client_keep (client);
1306   plugin->api->iter_zero_anonymity (plugin->api->cls,
1307                                     type,
1308                                     &transmit_item,
1309                                     client);  
1310 }
1311
1312
1313 /**
1314  * Context for the 'remove_callback'.
1315  */
1316 struct RemoveContext 
1317 {
1318   /**
1319    * Client for whom we're doing the remvoing.
1320    */
1321   struct GNUNET_SERVER_Client *client;
1322
1323   /**
1324    * GNUNET_YES if we managed to remove something.
1325    */
1326   int found;
1327 };
1328
1329
1330 /**
1331  * Callback function that will cause the item that is passed
1332  * in to be deleted (by returning GNUNET_NO).
1333  */
1334 static int
1335 remove_callback (void *cls,
1336                  void *next_cls,
1337                  const GNUNET_HashCode * key,
1338                  uint32_t size,
1339                  const void *data,
1340                  enum GNUNET_BLOCK_Type type,
1341                  uint32_t priority,
1342                  uint32_t anonymity,
1343                  struct GNUNET_TIME_Absolute
1344                  expiration, uint64_t uid)
1345 {
1346   struct RemoveContext *rc = cls;
1347
1348   if (key == NULL)
1349     {
1350 #if DEBUG_DATASTORE
1351       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1352                   "No further matches for `%s' request.\n",
1353                   "REMOVE");
1354 #endif  
1355       if (GNUNET_YES == rc->found)
1356         transmit_status (rc->client, GNUNET_OK, NULL);       
1357       else
1358         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1359       GNUNET_SERVER_client_drop (rc->client);
1360       GNUNET_free (rc);
1361       return GNUNET_OK; /* last item */
1362     }
1363   rc->found = GNUNET_YES;
1364 #if DEBUG_DATASTORE
1365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1367               (unsigned long long) uid,
1368               "REMOVE",
1369               GNUNET_h2s (key),
1370               type);
1371 #endif  
1372   GNUNET_STATISTICS_update (stats,
1373                             gettext_noop ("# bytes removed (explicit request)"),
1374                             size,
1375                             GNUNET_YES);
1376   GNUNET_CONTAINER_bloomfilter_remove (filter,
1377                                        key);
1378   plugin->api->next_request (next_cls, GNUNET_YES);
1379   return GNUNET_NO;
1380 }
1381
1382
1383 /**
1384  * Handle REMOVE-message.
1385  *
1386  * @param cls closure
1387  * @param client identification of the client
1388  * @param message the actual message
1389  */
1390 static void
1391 handle_remove (void *cls,
1392                struct GNUNET_SERVER_Client *client,
1393                const struct GNUNET_MessageHeader *message)
1394 {
1395   const struct DataMessage *dm = check_data (message);
1396   GNUNET_HashCode vhash;
1397   struct RemoveContext *rc;
1398
1399   if (dm == NULL)
1400     {
1401       GNUNET_break (0);
1402       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1403       return;
1404     }
1405 #if DEBUG_DATASTORE
1406   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1407               "Processing `%s' request for `%s' of type %u\n",
1408               "REMOVE",
1409               GNUNET_h2s (&dm->key),
1410               ntohl (dm->type));
1411 #endif
1412   GNUNET_STATISTICS_update (stats,
1413                             gettext_noop ("# REMOVE requests received"),
1414                             1,
1415                             GNUNET_NO);
1416   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1417   GNUNET_SERVER_client_keep (client);
1418   rc->client = client;
1419   GNUNET_CRYPTO_hash (&dm[1],
1420                       ntohl(dm->size),
1421                       &vhash);
1422   plugin->api->get (plugin->api->cls,
1423                     &dm->key,
1424                     &vhash,
1425                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1426                     &remove_callback,
1427                     rc);
1428 }
1429
1430
1431 /**
1432  * Handle DROP-message.
1433  *
1434  * @param cls closure
1435  * @param client identification of the client
1436  * @param message the actual message
1437  */
1438 static void
1439 handle_drop (void *cls,
1440              struct GNUNET_SERVER_Client *client,
1441              const struct GNUNET_MessageHeader *message)
1442 {
1443 #if DEBUG_DATASTORE
1444   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445               "Processing `%s' request\n",
1446               "DROP");
1447 #endif
1448   plugin->api->drop (plugin->api->cls);
1449   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1450 }
1451
1452
1453 /**
1454  * Function called by plugins to notify us about a
1455  * change in their disk utilization.
1456  *
1457  * @param cls closure (NULL)
1458  * @param delta change in disk utilization, 
1459  *        0 for "reset to empty"
1460  */
1461 static void
1462 disk_utilization_change_cb (void *cls,
1463                             int delta)
1464 {
1465   if ( (delta < 0) &&
1466        (payload < -delta) )
1467     {
1468       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1469                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1470                   (long long) payload,
1471                   (long long) -delta);
1472       payload = plugin->api->get_size (plugin->api->cls);
1473       sync_stats ();
1474       return;
1475     }
1476   payload += delta;
1477   lastSync++;
1478   if (lastSync >= MAX_STAT_SYNC_LAG)
1479     sync_stats ();
1480 }
1481
1482
1483 /**
1484  * Callback function to process statistic values.
1485  *
1486  * @param cls closure (struct Plugin*)
1487  * @param subsystem name of subsystem that created the statistic
1488  * @param name the name of the datum
1489  * @param value the current value
1490  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1491  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1492  */
1493 static int
1494 process_stat_in (void *cls,
1495                  const char *subsystem,
1496                  const char *name,
1497                  uint64_t value,
1498                  int is_persistent)
1499 {
1500   GNUNET_assert (stats_worked == GNUNET_NO);
1501   stats_worked = GNUNET_YES;
1502   payload += value;
1503 #if DEBUG_SQLITE
1504   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1505               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1506               abs_value,
1507               payload);
1508 #endif
1509   return GNUNET_OK;
1510 }
1511
1512
1513 static void
1514 process_stat_done (void *cls,
1515                    int success)
1516 {
1517   struct DatastorePlugin *plugin = cls;
1518
1519   stat_get = NULL;
1520   if (stats_worked == GNUNET_NO) 
1521     payload = plugin->api->get_size (plugin->api->cls);
1522 }
1523
1524
1525 /**
1526  * Load the datastore plugin.
1527  */
1528 static struct DatastorePlugin *
1529 load_plugin () 
1530 {
1531   struct DatastorePlugin *ret;
1532   char *libname;
1533   char *name;
1534
1535   if (GNUNET_OK !=
1536       GNUNET_CONFIGURATION_get_value_string (cfg,
1537                                              "DATASTORE", "DATABASE", &name))
1538     {
1539       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1540                   _("No `%s' specified for `%s' in configuration!\n"),
1541                   "DATABASE",
1542                   "DATASTORE");
1543       return NULL;
1544     }
1545   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1546   ret->env.cfg = cfg;
1547   ret->env.duc = &disk_utilization_change_cb;
1548   ret->env.cls = NULL;
1549   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1550               _("Loading `%s' datastore plugin\n"), name);
1551   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1552   ret->short_name = name;
1553   ret->lib_name = libname;
1554   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1555   if (ret->api == NULL)
1556     {
1557       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1558                   _("Failed to load datastore plugin for `%s'\n"), name);
1559       GNUNET_free (ret->short_name);
1560       GNUNET_free (libname);
1561       GNUNET_free (ret);
1562       return NULL;
1563     }
1564   return ret;
1565 }
1566
1567
1568 /**
1569  * Function called when the service shuts
1570  * down.  Unloads our datastore plugin.
1571  *
1572  * @param plug plugin to unload
1573  */
1574 static void
1575 unload_plugin (struct DatastorePlugin *plug)
1576 {
1577 #if DEBUG_DATASTORE
1578   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1579               "Datastore service is unloading plugin...\n");
1580 #endif
1581   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1582   GNUNET_free (plug->lib_name);
1583   GNUNET_free (plug->short_name);
1584   GNUNET_free (plug);
1585 }
1586
1587
1588 /**
1589  * Final task run after shutdown.  Unloads plugins and disconnects us from
1590  * statistics.
1591  */
1592 static void
1593 unload_task (void *cls,
1594              const struct GNUNET_SCHEDULER_TaskContext *tc)
1595 {
1596   unload_plugin (plugin);
1597   plugin = NULL;
1598   if (filter != NULL)
1599     {
1600       GNUNET_CONTAINER_bloomfilter_free (filter);
1601       filter = NULL;
1602     }
1603   if (lastSync > 0)
1604     sync_stats ();
1605   if (stat_get != NULL)
1606     {
1607       GNUNET_STATISTICS_get_cancel (stat_get);
1608       stat_get = NULL;
1609     }
1610   if (stats != NULL)
1611     {
1612       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1613       stats = NULL;
1614     }
1615 }
1616
1617
1618 /**
1619  * Last task run during shutdown.  Disconnects us from
1620  * the transport and core.
1621  */
1622 static void
1623 cleaning_task (void *cls,
1624                const struct GNUNET_SCHEDULER_TaskContext *tc)
1625 {
1626   struct TransmitCallbackContext *tcc;
1627
1628   cleaning_done = GNUNET_YES;
1629   while (NULL != (tcc = tcc_head))
1630     {
1631       GNUNET_CONTAINER_DLL_remove (tcc_head,
1632                                    tcc_tail,
1633                                    tcc);
1634       if (tcc->th != NULL)
1635         {
1636           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1637           GNUNET_SERVER_client_drop (tcc->client);
1638         }
1639       if (NULL != tcc->tc)
1640         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1641       GNUNET_free (tcc->msg);
1642       GNUNET_free (tcc);
1643     }
1644   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1645     {
1646       GNUNET_SCHEDULER_cancel (expired_kill_task);
1647       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1648     }
1649   GNUNET_SCHEDULER_add_continuation (&unload_task,
1650                                      NULL,
1651                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1652 }
1653
1654
1655 /**
1656  * Function that removes all active reservations made
1657  * by the given client and releases the space for other
1658  * requests.
1659  *
1660  * @param cls closure
1661  * @param client identification of the client
1662  */
1663 static void
1664 cleanup_reservations (void *cls,
1665                       struct GNUNET_SERVER_Client *client)
1666 {
1667   struct ReservationList *pos;
1668   struct ReservationList *prev;
1669   struct ReservationList *next;
1670
1671   if (client == NULL)
1672     return;
1673   prev = NULL;
1674   pos = reservations;
1675   while (NULL != pos)
1676     {
1677       next = pos->next;
1678       if (pos->client == client)
1679         {
1680           if (prev == NULL)
1681             reservations = next;
1682           else
1683             prev->next = next;
1684           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1685           GNUNET_free (pos);
1686         }
1687       else
1688         {
1689           prev = pos;
1690         }
1691       pos = next;
1692     }
1693   GNUNET_STATISTICS_set (stats,
1694                          gettext_noop ("# reserved"),
1695                          reserved,
1696                          GNUNET_NO);
1697 }
1698
1699
1700 /**
1701  * Process datastore requests.
1702  *
1703  * @param cls closure
1704  * @param server the initialized server
1705  * @param c configuration to use
1706  */
1707 static void
1708 run (void *cls,
1709      struct GNUNET_SERVER_Handle *server,
1710      const struct GNUNET_CONFIGURATION_Handle *c)
1711 {
1712   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1713     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1714      sizeof(struct ReserveMessage) }, 
1715     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1716      sizeof(struct ReleaseReserveMessage) }, 
1717     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1718     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1719      sizeof (struct UpdateMessage) }, 
1720     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1721     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1722      sizeof(struct GNUNET_MessageHeader) }, 
1723     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1724      sizeof(struct GetZeroAnonymityMessage) }, 
1725     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1726     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1727      sizeof(struct GNUNET_MessageHeader) }, 
1728     {NULL, NULL, 0, 0}
1729   };
1730   char *fn;
1731   unsigned int bf_size;
1732
1733   cfg = c;
1734   if (GNUNET_OK !=
1735       GNUNET_CONFIGURATION_get_value_number (cfg,
1736                                              "DATASTORE", "QUOTA", &quota))
1737     {
1738       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1739                   _("No `%s' specified for `%s' in configuration!\n"),
1740                   "QUOTA",
1741                   "DATASTORE");
1742       return;
1743     }
1744   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1745   GNUNET_STATISTICS_set (stats,
1746                          gettext_noop ("# quota"),
1747                          quota,
1748                          GNUNET_NO);
1749   cache_size = quota / 8; /* Or should we make this an option? */
1750   GNUNET_STATISTICS_set (stats,
1751                          gettext_noop ("# cache size"),
1752                          cache_size,
1753                          GNUNET_NO);
1754   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1755   fn = NULL;
1756   if ( (GNUNET_OK !=
1757         GNUNET_CONFIGURATION_get_value_filename (cfg,
1758                                                  "DATASTORE",
1759                                                  "BLOOMFILTER",
1760                                                  &fn)) ||
1761        (GNUNET_OK !=
1762         GNUNET_DISK_directory_create_for_file (fn)) )
1763     {
1764       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1765                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1766                   fn != NULL ? fn : "");
1767       GNUNET_free_non_null (fn);
1768       fn = NULL;
1769     }
1770   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1771   GNUNET_free_non_null (fn);
1772   if (filter == NULL)
1773     {
1774       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1775                   _("Failed to initialize bloomfilter.\n"));
1776       if (stats != NULL)
1777         {
1778           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1779           stats = NULL;
1780         }
1781       return;
1782     }
1783   plugin = load_plugin ();
1784   if (NULL == plugin)
1785     {
1786       GNUNET_CONTAINER_bloomfilter_free (filter);
1787       filter = NULL;
1788       if (stats != NULL)
1789         {
1790           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1791           stats = NULL;
1792         }
1793       return;
1794     }
1795   stat_get = GNUNET_STATISTICS_get (stats,
1796                                     "datastore",
1797                                     QUOTA_STAT_NAME,
1798                                     GNUNET_TIME_UNIT_SECONDS,
1799                                     &process_stat_done,
1800                                     &process_stat_in,
1801                                     plugin);
1802   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1803   GNUNET_SERVER_add_handlers (server, handlers);
1804   expired_kill_task
1805     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1806                                           &delete_expired, NULL);
1807   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1808                                 &cleaning_task, NULL);
1809 }
1810
1811
1812 /**
1813  * The main function for the datastore service.
1814  *
1815  * @param argc number of arguments from the command line
1816  * @param argv command line arguments
1817  * @return 0 ok, 1 on error
1818  */
1819 int
1820 main (int argc, char *const *argv)
1821 {
1822   int ret;
1823
1824   ret = (GNUNET_OK ==
1825          GNUNET_SERVICE_run (argc,
1826                              argv,
1827                              "datastore",
1828                              GNUNET_SERVICE_OPTION_NONE,
1829                              &run, NULL)) ? 0 : 1;
1830   return ret;
1831 }
1832
1833
1834 /* end of gnunet-service-datastore.c */