pass repl
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * How much space are we using for the cache?  (space available for
149  * insertions that will be instantly reclaimed by discarding less
150  * important content --- or possibly whatever we just inserted into
151  * the "cache").
152  */
153 static unsigned long long cache_size;
154
155 /**
156  * How much space have we currently reserved?
157  */
158 static unsigned long long reserved;
159   
160 /**
161  * How much data are we currently storing
162  * in the database?
163  */
164 static unsigned long long payload;
165
166 /**
167  * Number of updates that were made to the
168  * payload value since we last synchronized
169  * it with the statistics service.
170  */
171 static unsigned int lastSync;
172
173 /**
174  * Did we get an answer from statistics?
175  */
176 static int stats_worked;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
183
184 /**
185  * Our configuration.
186  */
187 const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189
190 /**
191  * Handle for reporting statistics.
192  */
193 static struct GNUNET_STATISTICS_Handle *stats;
194
195
196 /**
197  * Synchronize our utilization statistics with the 
198  * statistics service.
199  */
200 static void 
201 sync_stats ()
202 {
203   GNUNET_STATISTICS_set (stats,
204                          QUOTA_STAT_NAME,
205                          payload,
206                          GNUNET_YES);
207   lastSync = 0;
208 }
209
210
211
212 /**
213  * Context for transmitting replies to clients.
214  */
215 struct TransmitCallbackContext 
216 {
217   
218   /**
219    * We keep these in a doubly-linked list (for cleanup).
220    */
221   struct TransmitCallbackContext *next;
222   
223   /**
224    * We keep these in a doubly-linked list (for cleanup).
225    */
226   struct TransmitCallbackContext *prev;
227   
228   /**
229    * The message that we're asked to transmit.
230    */
231   struct GNUNET_MessageHeader *msg;
232   
233   /**
234    * Handle for the transmission request.
235    */
236   struct GNUNET_CONNECTION_TransmitHandle *th;
237
238   /**
239    * Client that we are transmitting to.
240    */
241   struct GNUNET_SERVER_Client *client;
242
243 };
244
245   
246 /**
247  * Head of the doubly-linked list (for cleanup).
248  */
249 static struct TransmitCallbackContext *tcc_head;
250
251 /**
252  * Tail of the doubly-linked list (for cleanup).
253  */
254 static struct TransmitCallbackContext *tcc_tail;
255
256 /**
257  * Have we already cleaned up the TCCs and are hence no longer
258  * willing (or able) to transmit anything to anyone?
259  */
260 static int cleaning_done;
261
262 /**
263  * Handle for pending get request.
264  */
265 static struct GNUNET_STATISTICS_GetHandle *stat_get;
266
267
268 /**
269  * Task that is used to remove expired entries from
270  * the datastore.  This task will schedule itself
271  * again automatically to always delete all expired
272  * content quickly.
273  *
274  * @param cls not used
275  * @param tc task context
276  */ 
277 static void
278 delete_expired (void *cls,
279                 const struct GNUNET_SCHEDULER_TaskContext *tc);
280
281
282 /**
283  * Iterate over the expired items stored in the datastore.
284  * Delete all expired items; once we have processed all
285  * expired items, re-schedule the "delete_expired" task.
286  *
287  * @param cls not used
288  * @param next_cls closure to pass to the "next" function.
289  * @param key key for the content
290  * @param size number of bytes in data
291  * @param data content stored
292  * @param type type of the content
293  * @param priority priority of the content
294  * @param anonymity anonymity-level for the content
295  * @param expiration expiration time for the content
296  * @param uid unique identifier for the datum;
297  *        maybe 0 if no unique identifier is available
298  *
299  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
300  *         (continue on call to "next", of course),
301  *         GNUNET_NO to delete the item and continue (if supported)
302  */
303 static int 
304 expired_processor (void *cls,
305                    const GNUNET_HashCode * key,
306                    uint32_t size,
307                    const void *data,
308                    enum GNUNET_BLOCK_Type type,
309                    uint32_t priority,
310                    uint32_t anonymity,
311                    struct GNUNET_TIME_Absolute
312                    expiration, 
313                    uint64_t uid)
314 {
315   struct GNUNET_TIME_Absolute now;
316
317   if (key == NULL) 
318     {
319       expired_kill_task 
320         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
321                                         &delete_expired,
322                                         NULL);
323       return GNUNET_SYSERR;
324     }
325   now = GNUNET_TIME_absolute_get ();
326   if (expiration.abs_value > now.abs_value)
327     {
328       /* finished processing */
329       expired_kill_task 
330         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
331                                         &delete_expired,
332                                         NULL);
333       return GNUNET_SYSERR;
334     }
335 #if DEBUG_DATASTORE
336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
337               "Deleting content `%s' of type %u that expired %llu ms ago\n",
338               GNUNET_h2s (key),
339               type,
340               (unsigned long long) (now.abs_value - expiration.abs_value));
341 #endif
342   GNUNET_STATISTICS_update (stats,
343                             gettext_noop ("# bytes expired"),
344                             size,
345                             GNUNET_YES);
346   GNUNET_CONTAINER_bloomfilter_remove (filter,
347                                        key);
348   expired_kill_task 
349     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
350                                     &delete_expired,
351                                     NULL);
352   return GNUNET_NO;
353 }
354
355
356 /**
357  * Task that is used to remove expired entries from
358  * the datastore.  This task will schedule itself
359  * again automatically to always delete all expired
360  * content quickly.
361  *
362  * @param cls not used
363  * @param tc task context
364  */ 
365 static void
366 delete_expired (void *cls,
367                 const struct GNUNET_SCHEDULER_TaskContext *tc)
368 {
369   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
370   plugin->api->get_expiration (plugin->api->cls, 
371                                &expired_processor,
372                                NULL);
373 }
374
375
376 /**
377  * An iterator over a set of items stored in the datastore
378  * that deletes until we're happy with respect to our quota.
379  *
380  * @param cls closure
381  * @param next_cls closure to pass to the "next" function.
382  * @param key key for the content
383  * @param size number of bytes in data
384  * @param data content stored
385  * @param type type of the content
386  * @param priority priority of the content
387  * @param anonymity anonymity-level for the content
388  * @param expiration expiration time for the content
389  * @param uid unique identifier for the datum;
390  *        maybe 0 if no unique identifier is available
391  *
392  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
393  *         (continue on call to "next", of course),
394  *         GNUNET_NO to delete the item and continue (if supported)
395  */
396 static int 
397 quota_processor (void *cls,
398                  const GNUNET_HashCode * key,
399                  uint32_t size,
400                  const void *data,
401                  enum GNUNET_BLOCK_Type type,
402                  uint32_t priority,
403                  uint32_t anonymity,
404                  struct GNUNET_TIME_Absolute expiration, 
405                  uint64_t uid)
406 {
407   unsigned long long *need = cls;
408
409   if (NULL == key)
410     return GNUNET_SYSERR;    
411 #if DEBUG_DATASTORE
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
414               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
415               GNUNET_h2s (key),
416               type,
417               *need);
418 #endif
419   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
420     *need = 0;
421   else
422     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
423   GNUNET_STATISTICS_update (stats,
424                             gettext_noop ("# bytes purged (low-priority)"),
425                             size,
426                             GNUNET_YES);
427   GNUNET_CONTAINER_bloomfilter_remove (filter,
428                                        key);
429   return GNUNET_NO;
430 }
431
432
433 /**
434  * Manage available disk space by running tasks
435  * that will discard content if necessary.  This
436  * function will be run whenever a request for
437  * "need" bytes of storage could only be satisfied
438  * by eating into the "cache" (and we want our cache
439  * space back).
440  *
441  * @param need number of bytes of content that were
442  *        placed into the "cache" (and hence the
443  *        number of bytes that should be removed).
444  */
445 static void
446 manage_space (unsigned long long need)
447 {
448   unsigned long long last;
449
450 #if DEBUG_DATASTORE
451   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
452               "Asked to free up %llu bytes of cache space\n",
453               need);
454 #endif
455   last = 0;
456   while ( (need > 0) &&
457           (last != need) )
458     {
459       last = need;
460       plugin->api->get_expiration (plugin->api->cls,
461                                    &quota_processor,
462                                    &need);    
463     }
464 }
465
466
467 /**
468  * Function called to notify a client about the socket
469  * begin ready to queue more data.  "buf" will be
470  * NULL and "size" zero if the socket was closed for
471  * writing in the meantime.
472  *
473  * @param cls closure
474  * @param size number of bytes available in buf
475  * @param buf where the callee should write the message
476  * @return number of bytes written to buf
477  */
478 static size_t
479 transmit_callback (void *cls,
480                    size_t size, void *buf)
481 {
482   struct TransmitCallbackContext *tcc = cls;
483   size_t msize;
484   
485   tcc->th = NULL;
486   GNUNET_CONTAINER_DLL_remove (tcc_head,
487                                tcc_tail,
488                                tcc);
489   msize = ntohs(tcc->msg->size);
490   if (size == 0)
491     {
492       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
493                   _("Transmission to client failed!\n"));
494       GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
495       GNUNET_SERVER_client_drop (tcc->client);
496       GNUNET_free (tcc->msg);
497       GNUNET_free (tcc);
498       return 0;
499     }
500   GNUNET_assert (size >= msize);
501   memcpy (buf, tcc->msg, msize);
502   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
503   GNUNET_SERVER_client_drop (tcc->client);
504   GNUNET_free (tcc->msg);
505   GNUNET_free (tcc);
506   return msize;
507 }
508
509
510 /**
511  * Transmit the given message to the client.
512  *
513  * @param client target of the message
514  * @param msg message to transmit, will be freed!
515  * @param tc function to call afterwards
516  * @param tc_cls closure for tc
517  */
518 static void
519 transmit (struct GNUNET_SERVER_Client *client,
520           struct GNUNET_MessageHeader *msg)
521 {
522   struct TransmitCallbackContext *tcc;
523
524   if (GNUNET_YES == cleaning_done)
525     {
526 #if DEBUG_DATASTORE
527       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
528                   "Shutdown in progress, aborting transmission.\n");
529 #endif
530       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
531       GNUNET_free (msg);
532       return;
533     }
534   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
535   tcc->msg = msg;
536   tcc->client = client;
537   if (NULL ==
538       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
539                                                       ntohs(msg->size),
540                                                       GNUNET_TIME_UNIT_FOREVER_REL,
541                                                       &transmit_callback,
542                                                       tcc)))
543     {
544       GNUNET_break (0);
545       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
546       GNUNET_free (msg);
547       GNUNET_free (tcc);
548       return;
549     }
550   GNUNET_SERVER_client_keep (client);
551   GNUNET_CONTAINER_DLL_insert (tcc_head,
552                                tcc_tail,
553                                tcc);
554 }
555
556
557 /**
558  * Transmit a status code to the client.
559  *
560  * @param client receiver of the response
561  * @param code status code
562  * @param msg optional error message (can be NULL)
563  */
564 static void
565 transmit_status (struct GNUNET_SERVER_Client *client,
566                  int code,
567                  const char *msg)
568 {
569   struct StatusMessage *sm;
570   size_t slen;
571
572 #if DEBUG_DATASTORE
573   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
574               "Transmitting `%s' message with value %d and message `%s'\n",
575               "STATUS",
576               code,
577               msg != NULL ? msg : "(none)");
578 #endif
579   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
580   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
581   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
582   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
583   sm->status = htonl(code);
584   if (slen > 0)
585     memcpy (&sm[1], msg, slen);  
586   transmit (client, &sm->header);
587 }
588
589
590
591 /**
592  * Function that will transmit the given datastore entry
593  * to the client.
594  *
595  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
596  * @param next_cls closure to use to ask for the next item
597  * @param key key for the content
598  * @param size number of bytes in data
599  * @param data content stored
600  * @param type type of the content
601  * @param priority priority of the content
602  * @param anonymity anonymity-level for the content
603  * @param expiration expiration time for the content
604  * @param uid unique identifier for the datum;
605  *        maybe 0 if no unique identifier is available
606  *
607  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
608  *         GNUNET_NO to delete the item and continue (if supported)
609  */
610 static int
611 transmit_item (void *cls,
612                const GNUNET_HashCode * key,
613                uint32_t size,
614                const void *data,
615                enum GNUNET_BLOCK_Type type,
616                uint32_t priority,
617                uint32_t anonymity,
618                struct GNUNET_TIME_Absolute
619                expiration, uint64_t uid)
620 {
621   struct GNUNET_SERVER_Client *client = cls;
622   struct GNUNET_MessageHeader *end;
623   struct DataMessage *dm;
624
625   if (key == NULL)
626     {
627       /* transmit 'DATA_END' */
628 #if DEBUG_DATASTORE
629       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630                   "Transmitting `%s' message\n",
631                   "DATA_END");
632 #endif
633       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
634       end->size = htons(sizeof(struct GNUNET_MessageHeader));
635       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
636       transmit (client, end);
637       GNUNET_SERVER_client_drop (client);
638       return GNUNET_OK;
639     }
640   GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
641   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
642   dm->header.size = htons(sizeof(struct DataMessage) + size);
643   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
644   dm->rid = htonl(0);
645   dm->size = htonl(size);
646   dm->type = htonl(type);
647   dm->priority = htonl(priority);
648   dm->anonymity = htonl(anonymity);
649   dm->replication = htonl (0);
650   dm->reserved = htonl (0);
651   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
652   dm->uid = GNUNET_htonll(uid);
653   dm->key = *key;
654   memcpy (&dm[1], data, size);
655 #if DEBUG_DATASTORE
656   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
657               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
658               "DATA",
659               GNUNET_h2s (key),
660               type,
661               (unsigned long long) expiration.abs_value,
662               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
663 #endif
664   GNUNET_STATISTICS_update (stats,
665                             gettext_noop ("# results found"),
666                             1,
667                             GNUNET_NO);
668   transmit (client, &dm->header);
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Handle RESERVE-message.
675  *
676  * @param cls closure
677  * @param client identification of the client
678  * @param message the actual message
679  */
680 static void
681 handle_reserve (void *cls,
682                 struct GNUNET_SERVER_Client *client,
683                 const struct GNUNET_MessageHeader *message)
684 {
685   /**
686    * Static counter to produce reservation identifiers.
687    */
688   static int reservation_gen;
689
690   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
691   struct ReservationList *e;
692   unsigned long long used;
693   unsigned long long req;
694   uint64_t amount;
695   uint32_t entries;
696
697 #if DEBUG_DATASTORE
698   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
699               "Processing `%s' request\n",
700               "RESERVE");
701 #endif
702   amount = GNUNET_ntohll(msg->amount);
703   entries = ntohl(msg->entries);
704   used = payload + reserved;
705   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
706   if (used + req > quota)
707     {
708       if (quota < used)
709         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
710       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
711                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
712                   quota - used,
713                   "RESERVE",
714                   req);
715       if (cache_size < req)
716         {
717           /* TODO: document this in the FAQ; essentially, if this
718              message happens, the insertion request could be blocked
719              by less-important content from migration because it is
720              larger than 1/8th of the overall available space, and
721              we only reserve 1/8th for "fresh" insertions */
722           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
723                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
724                       req,
725                       cache_size);
726           transmit_status (client, 0, 
727                            gettext_noop ("Insufficient space to satisfy request and "
728                                          "requested amount is larger than cache size"));
729         }
730       else
731         {
732           transmit_status (client, 0, 
733                            gettext_noop ("Insufficient space to satisfy request"));
734         }
735       return;      
736     }
737   reserved += req;
738   GNUNET_STATISTICS_set (stats,
739                          gettext_noop ("# reserved"),
740                          reserved,
741                          GNUNET_NO);
742   e = GNUNET_malloc (sizeof(struct ReservationList));
743   e->next = reservations;
744   reservations = e;
745   e->client = client;
746   e->amount = amount;
747   e->entries = entries;
748   e->rid = ++reservation_gen;
749   if (reservation_gen < 0)
750     reservation_gen = 0; /* wrap around */
751   transmit_status (client, e->rid, NULL);
752 }
753
754
755 /**
756  * Handle RELEASE_RESERVE-message.
757  *
758  * @param cls closure
759  * @param client identification of the client
760  * @param message the actual message
761  */
762 static void
763 handle_release_reserve (void *cls,
764                         struct GNUNET_SERVER_Client *client,
765                         const struct GNUNET_MessageHeader *message)
766 {
767   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
768   struct ReservationList *pos;
769   struct ReservationList *prev;
770   struct ReservationList *next;
771   int rid = ntohl(msg->rid);
772   unsigned long long rem;
773
774 #if DEBUG_DATASTORE
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "Processing `%s' request\n",
777               "RELEASE_RESERVE");
778 #endif
779   next = reservations;
780   prev = NULL;
781   while (NULL != (pos = next))
782     {
783       next = pos->next;
784       if (rid == pos->rid)
785         {
786           if (prev == NULL)
787             reservations = next;
788           else
789             prev->next = next;
790           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
791           GNUNET_assert (reserved >= rem);
792           reserved -= rem;
793           GNUNET_STATISTICS_set (stats,
794                          gettext_noop ("# reserved"),
795                                  reserved,
796                                  GNUNET_NO);
797 #if DEBUG_DATASTORE
798           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799                       "Returning %llu remaining reserved bytes to storage pool\n",
800                       rem);
801 #endif    
802           GNUNET_free (pos);
803           transmit_status (client, GNUNET_OK, NULL);
804           return;
805         }       
806       prev = pos;
807     }
808   GNUNET_break (0);
809   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
810 }
811
812
813 /**
814  * Check that the given message is a valid data message.
815  *
816  * @return NULL if the message is not well-formed, otherwise the message
817  */
818 static const struct DataMessage *
819 check_data (const struct GNUNET_MessageHeader *message)
820 {
821   uint16_t size;
822   uint32_t dsize;
823   const struct DataMessage *dm;
824
825   size = ntohs(message->size);
826   if (size < sizeof(struct DataMessage))
827     { 
828       GNUNET_break (0);
829       return NULL;
830     }
831   dm = (const struct DataMessage *) message;
832   dsize = ntohl(dm->size);
833   if (size != dsize + sizeof(struct DataMessage))
834     {
835       GNUNET_break (0);
836       return NULL;
837     }
838   return dm;
839 }
840
841
842 /**
843  * Context for a put request used to see if the content is
844  * already present.
845  */
846 struct PutContext
847 {
848   /**
849    * Client to notify on completion.
850    */
851   struct GNUNET_SERVER_Client *client;
852   
853   /* followed by the 'struct DataMessage' */
854 };
855
856
857 /**
858  * Actually put the data message.
859  */
860 static void
861 execute_put (struct GNUNET_SERVER_Client *client,
862              const struct DataMessage *dm)
863 {
864   uint32_t size;
865   char *msg;
866   int ret;
867
868   size = ntohl(dm->size);
869   msg = NULL;
870   ret = plugin->api->put (plugin->api->cls,
871                           &dm->key,
872                           size,
873                           &dm[1],
874                           ntohl(dm->type),
875                           ntohl(dm->priority),
876                           ntohl(dm->anonymity),
877                           ntohl(dm->replication),
878                           GNUNET_TIME_absolute_ntoh(dm->expiration),
879                           &msg);
880   if (GNUNET_OK == ret)
881     {
882       GNUNET_STATISTICS_update (stats,
883                                 gettext_noop ("# bytes stored"),
884                                 size,
885                                 GNUNET_YES);
886       GNUNET_CONTAINER_bloomfilter_add (filter,
887                                         &dm->key);
888 #if DEBUG_DATASTORE
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Successfully stored %u bytes of type %u under key `%s'\n",
891                   size,
892                   ntohl(dm->type),
893                   GNUNET_h2s (&dm->key));
894 #endif
895     }
896   transmit_status (client, 
897                    ret,
898                    msg);
899   GNUNET_free_non_null (msg);
900   if (quota - reserved - cache_size < payload)
901     {
902       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
903                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
904                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
905                   (unsigned long long) (quota - reserved - cache_size),
906                   (unsigned long long) payload);
907       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
908     }
909 }
910
911
912 /**
913  * Function that will check if the given datastore entry
914  * matches the put and if none match executes the put.
915  *
916  * @param cls closure, pointer to the client (of type 'struct PutContext').
917  * @param key key for the content
918  * @param size number of bytes in data
919  * @param data content stored
920  * @param type type of the content
921  * @param priority priority of the content
922  * @param anonymity anonymity-level for the content
923  * @param expiration expiration time for the content
924  * @param uid unique identifier for the datum;
925  *        maybe 0 if no unique identifier is available
926  *
927  * @return GNUNET_OK usually
928  *         GNUNET_NO to delete the item 
929  */
930 static int
931 check_present (void *cls,
932                const GNUNET_HashCode * key,
933                uint32_t size,
934                const void *data,
935                enum GNUNET_BLOCK_Type type,
936                uint32_t priority,
937                uint32_t anonymity,
938                struct GNUNET_TIME_Absolute
939                expiration, uint64_t uid)
940 {
941   struct PutContext *pc = cls;
942   const struct DataMessage *dm;
943
944   dm = (const struct DataMessage*) &pc[1];
945   if (key == NULL)
946     {
947       execute_put (pc->client, dm);
948       GNUNET_SERVER_client_drop (pc->client);
949       GNUNET_free (pc);
950       return GNUNET_OK;
951     }
952   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
953        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
954        ( (size == ntohl(dm->size)) &&
955          (0 == memcmp (&dm[1],
956                        data,
957                        size)) ) )
958     {
959 #if DEBUG_MYSQL
960       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961                   "Result already present in datastore\n");
962 #endif
963       /* FIXME: change API to allow increasing 'replication' counter */
964       if ( (ntohl (dm->priority) > 0) ||
965            (GNUNET_TIME_absolute_ntoh(dm->expiration).abs_value >
966             expiration.abs_value) )
967         plugin->api->update (plugin->api->cls,
968                              uid,
969                              (int32_t) ntohl(dm->priority),
970                              GNUNET_TIME_absolute_ntoh(dm->expiration),
971                              NULL);
972       transmit_status (pc->client, GNUNET_NO, NULL);
973       GNUNET_SERVER_client_drop (pc->client);
974       GNUNET_free (pc);
975     }
976   else
977     {
978       execute_put (pc->client, dm);
979       GNUNET_SERVER_client_drop (pc->client);
980       GNUNET_free (pc);
981     }
982   return GNUNET_OK;
983 }
984
985
986 /**
987  * Handle PUT-message.
988  *
989  * @param cls closure
990  * @param client identification of the client
991  * @param message the actual message
992  */
993 static void
994 handle_put (void *cls,
995             struct GNUNET_SERVER_Client *client,
996             const struct GNUNET_MessageHeader *message)
997 {
998   const struct DataMessage *dm = check_data (message);
999   int rid;
1000   struct ReservationList *pos;
1001   struct PutContext *pc;
1002   GNUNET_HashCode vhash;
1003   uint32_t size;
1004
1005   if ( (dm == NULL) ||
1006        (ntohl(dm->type) == 0) ) 
1007     {
1008       GNUNET_break (0);
1009       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1010       return;
1011     }
1012 #if DEBUG_DATASTORE
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Processing `%s' request for `%s' of type %u\n",
1015               "PUT",
1016               GNUNET_h2s (&dm->key),
1017               ntohl (dm->type));
1018 #endif
1019   rid = ntohl(dm->rid);
1020   size = ntohl(dm->size);
1021   if (rid > 0)
1022     {
1023       pos = reservations;
1024       while ( (NULL != pos) &&
1025               (rid != pos->rid) )
1026         pos = pos->next;
1027       GNUNET_break (pos != NULL);
1028       if (NULL != pos)
1029         {
1030           GNUNET_break (pos->entries > 0);
1031           GNUNET_break (pos->amount >= size);
1032           pos->entries--;
1033           pos->amount -= size;
1034           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1035           GNUNET_STATISTICS_set (stats,
1036                                  gettext_noop ("# reserved"),
1037                                  reserved,
1038                                  GNUNET_NO);
1039         }
1040     }
1041   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1042                                                        &dm->key))
1043     {
1044       GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1045       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1046       pc->client = client;
1047       GNUNET_SERVER_client_keep (client);
1048       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1049       plugin->api->get_key (plugin->api->cls,
1050                             0,
1051                             &dm->key,
1052                             &vhash,
1053                             ntohl (dm->type),
1054                             &check_present,
1055                             pc);      
1056       return;
1057     }
1058   execute_put (client, dm);
1059 }
1060
1061
1062 /**
1063  * Handle GET-message.
1064  *
1065  * @param cls closure
1066  * @param client identification of the client
1067  * @param message the actual message
1068  */
1069 static void
1070 handle_get (void *cls,
1071             struct GNUNET_SERVER_Client *client,
1072             const struct GNUNET_MessageHeader *message)
1073 {
1074   const struct GetMessage *msg;
1075   uint16_t size;
1076
1077   size = ntohs(message->size);
1078   if ( (size != sizeof(struct GetMessage)) &&
1079        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1080     {
1081       GNUNET_break (0);
1082       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1083       return;
1084     }
1085   msg = (const struct GetMessage*) message;
1086 #if DEBUG_DATASTORE
1087   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088               "Processing `%s' request for `%s' of type %u\n",
1089               "GET",
1090               GNUNET_h2s (&msg->key),
1091               ntohl (msg->type));
1092 #endif
1093   GNUNET_STATISTICS_update (stats,
1094                             gettext_noop ("# GET requests received"),
1095                             1,
1096                             GNUNET_NO);
1097   GNUNET_SERVER_client_keep (client);
1098   if ( (size == sizeof(struct GetMessage)) &&
1099        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1100                                                          &msg->key)) )
1101     {
1102       /* don't bother database... */
1103 #if DEBUG_DATASTORE
1104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1106                   "GET",
1107                   GNUNET_h2s (&msg->key));
1108 #endif  
1109       GNUNET_STATISTICS_update (stats,
1110                                 gettext_noop ("# requests filtered by bloomfilter"),
1111                                 1,
1112                                 GNUNET_NO);
1113       transmit_item (client,
1114                      NULL, 0, NULL, 0, 0, 0, 
1115                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1116       return;
1117     }
1118   plugin->api->get_key (plugin->api->cls,
1119                         GNUNET_ntohll (msg->offset),
1120                         ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1121                         NULL,
1122                         ntohl(msg->type),
1123                         &transmit_item,
1124                         client);    
1125 }
1126
1127
1128 /**
1129  * Handle UPDATE-message.
1130  *
1131  * @param cls closure
1132  * @param client identification of the client
1133  * @param message the actual message
1134  */
1135 static void
1136 handle_update (void *cls,
1137                struct GNUNET_SERVER_Client *client,
1138                const struct GNUNET_MessageHeader *message)
1139 {
1140   const struct UpdateMessage *msg;
1141   int ret;
1142   char *emsg;
1143
1144   GNUNET_STATISTICS_update (stats,
1145                             gettext_noop ("# UPDATE requests received"),
1146                             1,
1147                             GNUNET_NO);
1148   msg = (const struct UpdateMessage*) message;
1149   emsg = NULL;
1150 #if DEBUG_DATASTORE
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Processing `%s' request for %llu\n",
1153               "UPDATE",
1154               (unsigned long long) GNUNET_ntohll (msg->uid));
1155 #endif
1156   ret = plugin->api->update (plugin->api->cls,
1157                              GNUNET_ntohll(msg->uid),
1158                              (int32_t) ntohl(msg->priority),
1159                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1160                              &emsg);
1161   transmit_status (client, ret, emsg);
1162   GNUNET_free_non_null (emsg);
1163 }
1164
1165
1166 /**
1167  * Handle GET_REPLICATION-message.
1168  *
1169  * @param cls closure
1170  * @param client identification of the client
1171  * @param message the actual message
1172  */
1173 static void
1174 handle_get_replication (void *cls,
1175                         struct GNUNET_SERVER_Client *client,
1176                         const struct GNUNET_MessageHeader *message)
1177 {
1178 #if DEBUG_DATASTORE
1179   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180               "Processing `%s' request\n",
1181               "GET_REPLICATION");
1182 #endif
1183   GNUNET_STATISTICS_update (stats,
1184                             gettext_noop ("# GET REPLICATION requests received"),
1185                             1,
1186                             GNUNET_NO);
1187   GNUNET_SERVER_client_keep (client);
1188   plugin->api->get_replication (plugin->api->cls,
1189                                 &transmit_item,
1190                                 client);  
1191 }
1192
1193
1194 /**
1195  * Handle GET_ZERO_ANONYMITY-message.
1196  *
1197  * @param cls closure
1198  * @param client identification of the client
1199  * @param message the actual message
1200  */
1201 static void
1202 handle_get_zero_anonymity (void *cls,
1203                            struct GNUNET_SERVER_Client *client,
1204                            const struct GNUNET_MessageHeader *message)
1205 {
1206   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1207   enum GNUNET_BLOCK_Type type;
1208
1209   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1210   if (type == GNUNET_BLOCK_TYPE_ANY)
1211     {
1212       GNUNET_break (0);
1213       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1214       return;
1215     }
1216 #if DEBUG_DATASTORE
1217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218               "Processing `%s' request\n",
1219               "GET_ZERO_ANONYMITY");
1220 #endif
1221   GNUNET_STATISTICS_update (stats,
1222                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1223                             1,
1224                             GNUNET_NO);
1225   GNUNET_SERVER_client_keep (client);
1226   plugin->api->get_zero_anonymity (plugin->api->cls,
1227                                    GNUNET_ntohll (msg->offset),
1228                                    type,
1229                                    &transmit_item,
1230                                    client);  
1231 }
1232
1233
1234 /**
1235  * Callback function that will cause the item that is passed
1236  * in to be deleted (by returning GNUNET_NO).
1237  */
1238 static int
1239 remove_callback (void *cls,
1240                  const GNUNET_HashCode * key,
1241                  uint32_t size,
1242                  const void *data,
1243                  enum GNUNET_BLOCK_Type type,
1244                  uint32_t priority,
1245                  uint32_t anonymity,
1246                  struct GNUNET_TIME_Absolute
1247                  expiration, uint64_t uid)
1248 {
1249   struct GNUNET_SERVER_Client *client = cls;
1250
1251   if (key == NULL)
1252     {
1253 #if DEBUG_DATASTORE
1254       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                   "No further matches for `%s' request.\n",
1256                   "REMOVE");
1257 #endif  
1258       transmit_status (client, GNUNET_NO, _("Content not found"));              
1259       GNUNET_SERVER_client_drop (client);
1260       return GNUNET_OK; /* last item */
1261     }
1262 #if DEBUG_DATASTORE
1263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1265               (unsigned long long) uid,
1266               "REMOVE",
1267               GNUNET_h2s (key),
1268               type);
1269 #endif  
1270   GNUNET_STATISTICS_update (stats,
1271                             gettext_noop ("# bytes removed (explicit request)"),
1272                             size,
1273                             GNUNET_YES);
1274   GNUNET_CONTAINER_bloomfilter_remove (filter,
1275                                        key);
1276   transmit_status (client, GNUNET_OK, NULL);       
1277   GNUNET_SERVER_client_drop (client);
1278   return GNUNET_NO;
1279 }
1280
1281
1282 /**
1283  * Handle REMOVE-message.
1284  *
1285  * @param cls closure
1286  * @param client identification of the client
1287  * @param message the actual message
1288  */
1289 static void
1290 handle_remove (void *cls,
1291                struct GNUNET_SERVER_Client *client,
1292                const struct GNUNET_MessageHeader *message)
1293 {
1294   const struct DataMessage *dm = check_data (message);
1295   GNUNET_HashCode vhash;
1296
1297   if (dm == NULL)
1298     {
1299       GNUNET_break (0);
1300       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1301       return;
1302     }
1303 #if DEBUG_DATASTORE
1304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1305               "Processing `%s' request for `%s' of type %u\n",
1306               "REMOVE",
1307               GNUNET_h2s (&dm->key),
1308               ntohl (dm->type));
1309 #endif
1310   GNUNET_STATISTICS_update (stats,
1311                             gettext_noop ("# REMOVE requests received"),
1312                             1,
1313                             GNUNET_NO);
1314   GNUNET_SERVER_client_keep (client);
1315   GNUNET_CRYPTO_hash (&dm[1],
1316                       ntohl(dm->size),
1317                       &vhash);
1318   plugin->api->get_key (plugin->api->cls,
1319                         0,
1320                         &dm->key,
1321                         &vhash,
1322                         (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1323                         &remove_callback,
1324                         client);
1325 }
1326
1327
1328 /**
1329  * Handle DROP-message.
1330  *
1331  * @param cls closure
1332  * @param client identification of the client
1333  * @param message the actual message
1334  */
1335 static void
1336 handle_drop (void *cls,
1337              struct GNUNET_SERVER_Client *client,
1338              const struct GNUNET_MessageHeader *message)
1339 {
1340 #if DEBUG_DATASTORE
1341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1342               "Processing `%s' request\n",
1343               "DROP");
1344 #endif
1345   plugin->api->drop (plugin->api->cls);
1346   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1347 }
1348
1349
1350 /**
1351  * Function called by plugins to notify us about a
1352  * change in their disk utilization.
1353  *
1354  * @param cls closure (NULL)
1355  * @param delta change in disk utilization, 
1356  *        0 for "reset to empty"
1357  */
1358 static void
1359 disk_utilization_change_cb (void *cls,
1360                             int delta)
1361 {
1362   if ( (delta < 0) &&
1363        (payload < -delta) )
1364     {
1365       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1366                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1367                   (long long) payload,
1368                   (long long) -delta);
1369       payload = plugin->api->estimate_size (plugin->api->cls);
1370       sync_stats ();
1371       return;
1372     }
1373   payload += delta;
1374   lastSync++;
1375   if (lastSync >= MAX_STAT_SYNC_LAG)
1376     sync_stats ();
1377 }
1378
1379
1380 /**
1381  * Callback function to process statistic values.
1382  *
1383  * @param cls closure (struct Plugin*)
1384  * @param subsystem name of subsystem that created the statistic
1385  * @param name the name of the datum
1386  * @param value the current value
1387  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1388  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1389  */
1390 static int
1391 process_stat_in (void *cls,
1392                  const char *subsystem,
1393                  const char *name,
1394                  uint64_t value,
1395                  int is_persistent)
1396 {
1397   GNUNET_assert (stats_worked == GNUNET_NO);
1398   stats_worked = GNUNET_YES;
1399   payload += value;
1400 #if DEBUG_SQLITE
1401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1403               abs_value,
1404               payload);
1405 #endif
1406   return GNUNET_OK;
1407 }
1408
1409
1410 static void
1411 process_stat_done (void *cls,
1412                    int success)
1413 {
1414   struct DatastorePlugin *plugin = cls;
1415
1416   stat_get = NULL;
1417   if (stats_worked == GNUNET_NO) 
1418     payload = plugin->api->estimate_size (plugin->api->cls);
1419 }
1420
1421
1422 /**
1423  * Load the datastore plugin.
1424  */
1425 static struct DatastorePlugin *
1426 load_plugin () 
1427 {
1428   struct DatastorePlugin *ret;
1429   char *libname;
1430   char *name;
1431
1432   if (GNUNET_OK !=
1433       GNUNET_CONFIGURATION_get_value_string (cfg,
1434                                              "DATASTORE", "DATABASE", &name))
1435     {
1436       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1437                   _("No `%s' specified for `%s' in configuration!\n"),
1438                   "DATABASE",
1439                   "DATASTORE");
1440       return NULL;
1441     }
1442   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1443   ret->env.cfg = cfg;
1444   ret->env.duc = &disk_utilization_change_cb;
1445   ret->env.cls = NULL;
1446   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1447               _("Loading `%s' datastore plugin\n"), name);
1448   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1449   ret->short_name = name;
1450   ret->lib_name = libname;
1451   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1452   if (ret->api == NULL)
1453     {
1454       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1455                   _("Failed to load datastore plugin for `%s'\n"), name);
1456       GNUNET_free (ret->short_name);
1457       GNUNET_free (libname);
1458       GNUNET_free (ret);
1459       return NULL;
1460     }
1461   return ret;
1462 }
1463
1464
1465 /**
1466  * Function called when the service shuts
1467  * down.  Unloads our datastore plugin.
1468  *
1469  * @param plug plugin to unload
1470  */
1471 static void
1472 unload_plugin (struct DatastorePlugin *plug)
1473 {
1474 #if DEBUG_DATASTORE
1475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476               "Datastore service is unloading plugin...\n");
1477 #endif
1478   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1479   GNUNET_free (plug->lib_name);
1480   GNUNET_free (plug->short_name);
1481   GNUNET_free (plug);
1482 }
1483
1484
1485 /**
1486  * Final task run after shutdown.  Unloads plugins and disconnects us from
1487  * statistics.
1488  */
1489 static void
1490 unload_task (void *cls,
1491              const struct GNUNET_SCHEDULER_TaskContext *tc)
1492 {
1493   unload_plugin (plugin);
1494   plugin = NULL;
1495   if (filter != NULL)
1496     {
1497       GNUNET_CONTAINER_bloomfilter_free (filter);
1498       filter = NULL;
1499     }
1500   if (lastSync > 0)
1501     sync_stats ();
1502   if (stat_get != NULL)
1503     {
1504       GNUNET_STATISTICS_get_cancel (stat_get);
1505       stat_get = NULL;
1506     }
1507   if (stats != NULL)
1508     {
1509       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1510       stats = NULL;
1511     }
1512 }
1513
1514
1515 /**
1516  * Last task run during shutdown.  Disconnects us from
1517  * the transport and core.
1518  */
1519 static void
1520 cleaning_task (void *cls,
1521                const struct GNUNET_SCHEDULER_TaskContext *tc)
1522 {
1523   struct TransmitCallbackContext *tcc;
1524
1525   cleaning_done = GNUNET_YES;
1526   while (NULL != (tcc = tcc_head))
1527     {
1528       GNUNET_CONTAINER_DLL_remove (tcc_head,
1529                                    tcc_tail,
1530                                    tcc);
1531       if (tcc->th != NULL)
1532         {
1533           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1534           GNUNET_SERVER_client_drop (tcc->client);
1535         }
1536       GNUNET_free (tcc->msg);
1537       GNUNET_free (tcc);
1538     }
1539   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1540     {
1541       GNUNET_SCHEDULER_cancel (expired_kill_task);
1542       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1543     }
1544   GNUNET_SCHEDULER_add_continuation (&unload_task,
1545                                      NULL,
1546                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1547 }
1548
1549
1550 /**
1551  * Function that removes all active reservations made
1552  * by the given client and releases the space for other
1553  * requests.
1554  *
1555  * @param cls closure
1556  * @param client identification of the client
1557  */
1558 static void
1559 cleanup_reservations (void *cls,
1560                       struct GNUNET_SERVER_Client *client)
1561 {
1562   struct ReservationList *pos;
1563   struct ReservationList *prev;
1564   struct ReservationList *next;
1565
1566   if (client == NULL)
1567     return;
1568   prev = NULL;
1569   pos = reservations;
1570   while (NULL != pos)
1571     {
1572       next = pos->next;
1573       if (pos->client == client)
1574         {
1575           if (prev == NULL)
1576             reservations = next;
1577           else
1578             prev->next = next;
1579           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1580           GNUNET_free (pos);
1581         }
1582       else
1583         {
1584           prev = pos;
1585         }
1586       pos = next;
1587     }
1588   GNUNET_STATISTICS_set (stats,
1589                          gettext_noop ("# reserved"),
1590                          reserved,
1591                          GNUNET_NO);
1592 }
1593
1594
1595 /**
1596  * Process datastore requests.
1597  *
1598  * @param cls closure
1599  * @param server the initialized server
1600  * @param c configuration to use
1601  */
1602 static void
1603 run (void *cls,
1604      struct GNUNET_SERVER_Handle *server,
1605      const struct GNUNET_CONFIGURATION_Handle *c)
1606 {
1607   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1608     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1609      sizeof(struct ReserveMessage) }, 
1610     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1611      sizeof(struct ReleaseReserveMessage) }, 
1612     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1613     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1614      sizeof (struct UpdateMessage) }, 
1615     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1616     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1617      sizeof(struct GNUNET_MessageHeader) }, 
1618     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1619      sizeof(struct GetZeroAnonymityMessage) }, 
1620     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1621     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1622      sizeof(struct GNUNET_MessageHeader) }, 
1623     {NULL, NULL, 0, 0}
1624   };
1625   char *fn;
1626   unsigned int bf_size;
1627
1628   cfg = c;
1629   if (GNUNET_OK !=
1630       GNUNET_CONFIGURATION_get_value_number (cfg,
1631                                              "DATASTORE", "QUOTA", &quota))
1632     {
1633       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1634                   _("No `%s' specified for `%s' in configuration!\n"),
1635                   "QUOTA",
1636                   "DATASTORE");
1637       return;
1638     }
1639   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1640   GNUNET_STATISTICS_set (stats,
1641                          gettext_noop ("# quota"),
1642                          quota,
1643                          GNUNET_NO);
1644   cache_size = quota / 8; /* Or should we make this an option? */
1645   GNUNET_STATISTICS_set (stats,
1646                          gettext_noop ("# cache size"),
1647                          cache_size,
1648                          GNUNET_NO);
1649   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1650   fn = NULL;
1651   if ( (GNUNET_OK !=
1652         GNUNET_CONFIGURATION_get_value_filename (cfg,
1653                                                  "DATASTORE",
1654                                                  "BLOOMFILTER",
1655                                                  &fn)) ||
1656        (GNUNET_OK !=
1657         GNUNET_DISK_directory_create_for_file (fn)) )
1658     {
1659       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1660                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1661                   fn != NULL ? fn : "");
1662       GNUNET_free_non_null (fn);
1663       fn = NULL;
1664     }
1665   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1666   GNUNET_free_non_null (fn);
1667   if (filter == NULL)
1668     {
1669       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1670                   _("Failed to initialize bloomfilter.\n"));
1671       if (stats != NULL)
1672         {
1673           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1674           stats = NULL;
1675         }
1676       return;
1677     }
1678   plugin = load_plugin ();
1679   if (NULL == plugin)
1680     {
1681       GNUNET_CONTAINER_bloomfilter_free (filter);
1682       filter = NULL;
1683       if (stats != NULL)
1684         {
1685           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1686           stats = NULL;
1687         }
1688       return;
1689     }
1690   stat_get = GNUNET_STATISTICS_get (stats,
1691                                     "datastore",
1692                                     QUOTA_STAT_NAME,
1693                                     GNUNET_TIME_UNIT_SECONDS,
1694                                     &process_stat_done,
1695                                     &process_stat_in,
1696                                     plugin);
1697   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1698   GNUNET_SERVER_add_handlers (server, handlers);
1699   expired_kill_task
1700     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1701                                           &delete_expired, NULL);
1702   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1703                                 &cleaning_task, NULL);
1704 }
1705
1706
1707 /**
1708  * The main function for the datastore service.
1709  *
1710  * @param argc number of arguments from the command line
1711  * @param argv command line arguments
1712  * @return 0 ok, 1 on error
1713  */
1714 int
1715 main (int argc, char *const *argv)
1716 {
1717   int ret;
1718
1719   ret = (GNUNET_OK ==
1720          GNUNET_SERVICE_run (argc,
1721                              argv,
1722                              "datastore",
1723                              GNUNET_SERVICE_OPTION_NONE,
1724                              &run, NULL)) ? 0 : 1;
1725   return ret;
1726 }
1727
1728
1729 /* end of gnunet-service-datastore.c */