fab403a86eabb0fc8a8866faa8c271c4f63f4760
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * Should the database be dropped on exit?
149  */
150 static int do_drop;
151
152 /**
153  * How much space are we using for the cache?  (space available for
154  * insertions that will be instantly reclaimed by discarding less
155  * important content --- or possibly whatever we just inserted into
156  * the "cache").
157  */
158 static unsigned long long cache_size;
159
160 /**
161  * How much space have we currently reserved?
162  */
163 static unsigned long long reserved;
164   
165 /**
166  * How much data are we currently storing
167  * in the database?
168  */
169 static unsigned long long payload;
170
171 /**
172  * Number of updates that were made to the
173  * payload value since we last synchronized
174  * it with the statistics service.
175  */
176 static unsigned int lastSync;
177
178 /**
179  * Did we get an answer from statistics?
180  */
181 static int stats_worked;
182
183 /**
184  * Identity of the task that is used to delete
185  * expired content.
186  */
187 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
188
189 /**
190  * Our configuration.
191  */
192 const struct GNUNET_CONFIGURATION_Handle *cfg;
193
194
195 /**
196  * Handle for reporting statistics.
197  */
198 static struct GNUNET_STATISTICS_Handle *stats;
199
200
201 /**
202  * Synchronize our utilization statistics with the 
203  * statistics service.
204  */
205 static void 
206 sync_stats ()
207 {
208   GNUNET_STATISTICS_set (stats,
209                          QUOTA_STAT_NAME,
210                          payload,
211                          GNUNET_YES);
212   lastSync = 0;
213 }
214
215
216
217 /**
218  * Context for transmitting replies to clients.
219  */
220 struct TransmitCallbackContext 
221 {
222   
223   /**
224    * We keep these in a doubly-linked list (for cleanup).
225    */
226   struct TransmitCallbackContext *next;
227   
228   /**
229    * We keep these in a doubly-linked list (for cleanup).
230    */
231   struct TransmitCallbackContext *prev;
232   
233   /**
234    * The message that we're asked to transmit.
235    */
236   struct GNUNET_MessageHeader *msg;
237   
238   /**
239    * Handle for the transmission request.
240    */
241   struct GNUNET_CONNECTION_TransmitHandle *th;
242
243   /**
244    * Client that we are transmitting to.
245    */
246   struct GNUNET_SERVER_Client *client;
247
248 };
249
250   
251 /**
252  * Head of the doubly-linked list (for cleanup).
253  */
254 static struct TransmitCallbackContext *tcc_head;
255
256 /**
257  * Tail of the doubly-linked list (for cleanup).
258  */
259 static struct TransmitCallbackContext *tcc_tail;
260
261 /**
262  * Have we already cleaned up the TCCs and are hence no longer
263  * willing (or able) to transmit anything to anyone?
264  */
265 static int cleaning_done;
266
267 /**
268  * Handle for pending get request.
269  */
270 static struct GNUNET_STATISTICS_GetHandle *stat_get;
271
272
273 /**
274  * Task that is used to remove expired entries from
275  * the datastore.  This task will schedule itself
276  * again automatically to always delete all expired
277  * content quickly.
278  *
279  * @param cls not used
280  * @param tc task context
281  */ 
282 static void
283 delete_expired (void *cls,
284                 const struct GNUNET_SCHEDULER_TaskContext *tc);
285
286
287 /**
288  * Iterate over the expired items stored in the datastore.
289  * Delete all expired items; once we have processed all
290  * expired items, re-schedule the "delete_expired" task.
291  *
292  * @param cls not used
293  * @param key key for the content
294  * @param size number of bytes in data
295  * @param data content stored
296  * @param type type of the content
297  * @param priority priority of the content
298  * @param anonymity anonymity-level for the content
299  * @param expiration expiration time for the content
300  * @param uid unique identifier for the datum;
301  *        maybe 0 if no unique identifier is available
302  *
303  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
304  *         (continue on call to "next", of course),
305  *         GNUNET_NO to delete the item and continue (if supported)
306  */
307 static int 
308 expired_processor (void *cls,
309                    const GNUNET_HashCode * key,
310                    uint32_t size,
311                    const void *data,
312                    enum GNUNET_BLOCK_Type type,
313                    uint32_t priority,
314                    uint32_t anonymity,
315                    struct GNUNET_TIME_Absolute
316                    expiration, 
317                    uint64_t uid)
318 {
319   struct GNUNET_TIME_Absolute now;
320
321   if (key == NULL) 
322     {
323       expired_kill_task 
324         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
325                                         &delete_expired,
326                                         NULL);
327       return GNUNET_SYSERR;
328     }
329   now = GNUNET_TIME_absolute_get ();
330   if (expiration.abs_value > now.abs_value)
331     {
332       /* finished processing */
333       expired_kill_task 
334         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
335                                         &delete_expired,
336                                         NULL);
337       return GNUNET_SYSERR;
338     }
339 #if DEBUG_DATASTORE
340   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
341               "Deleting content `%s' of type %u that expired %llu ms ago\n",
342               GNUNET_h2s (key),
343               type,
344               (unsigned long long) (now.abs_value - expiration.abs_value));
345 #endif
346   GNUNET_STATISTICS_update (stats,
347                             gettext_noop ("# bytes expired"),
348                             size,
349                             GNUNET_YES);
350   GNUNET_CONTAINER_bloomfilter_remove (filter,
351                                        key);
352   expired_kill_task 
353     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
354                                     &delete_expired,
355                                     NULL);
356   return GNUNET_NO;
357 }
358
359
360 /**
361  * Task that is used to remove expired entries from
362  * the datastore.  This task will schedule itself
363  * again automatically to always delete all expired
364  * content quickly.
365  *
366  * @param cls not used
367  * @param tc task context
368  */ 
369 static void
370 delete_expired (void *cls,
371                 const struct GNUNET_SCHEDULER_TaskContext *tc)
372 {
373   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
374   plugin->api->get_expiration (plugin->api->cls, 
375                                &expired_processor,
376                                NULL);
377 }
378
379
380 /**
381  * An iterator over a set of items stored in the datastore
382  * that deletes until we're happy with respect to our quota.
383  *
384  * @param cls closure
385  * @param key key for the content
386  * @param size number of bytes in data
387  * @param data content stored
388  * @param type type of the content
389  * @param priority priority of the content
390  * @param anonymity anonymity-level for the content
391  * @param expiration expiration time for the content
392  * @param uid unique identifier for the datum;
393  *        maybe 0 if no unique identifier is available
394  *
395  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
396  *         (continue on call to "next", of course),
397  *         GNUNET_NO to delete the item and continue (if supported)
398  */
399 static int 
400 quota_processor (void *cls,
401                  const GNUNET_HashCode * key,
402                  uint32_t size,
403                  const void *data,
404                  enum GNUNET_BLOCK_Type type,
405                  uint32_t priority,
406                  uint32_t anonymity,
407                  struct GNUNET_TIME_Absolute expiration, 
408                  uint64_t uid)
409 {
410   unsigned long long *need = cls;
411
412   if (NULL == key)
413     return GNUNET_SYSERR;    
414 #if DEBUG_DATASTORE
415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
416               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
417               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
418               GNUNET_h2s (key),
419               type,
420               *need);
421 #endif
422   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
423     *need = 0;
424   else
425     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
426   GNUNET_STATISTICS_update (stats,
427                             gettext_noop ("# bytes purged (low-priority)"),
428                             size,
429                             GNUNET_YES);
430   GNUNET_CONTAINER_bloomfilter_remove (filter,
431                                        key);
432   return GNUNET_NO;
433 }
434
435
436 /**
437  * Manage available disk space by running tasks
438  * that will discard content if necessary.  This
439  * function will be run whenever a request for
440  * "need" bytes of storage could only be satisfied
441  * by eating into the "cache" (and we want our cache
442  * space back).
443  *
444  * @param need number of bytes of content that were
445  *        placed into the "cache" (and hence the
446  *        number of bytes that should be removed).
447  */
448 static void
449 manage_space (unsigned long long need)
450 {
451   unsigned long long last;
452
453 #if DEBUG_DATASTORE
454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
455               "Asked to free up %llu bytes of cache space\n",
456               need);
457 #endif
458   last = 0;
459   while ( (need > 0) &&
460           (last != need) )
461     {
462       last = need;
463       plugin->api->get_expiration (plugin->api->cls,
464                                    &quota_processor,
465                                    &need);    
466     }
467 }
468
469
470 /**
471  * Function called to notify a client about the socket
472  * begin ready to queue more data.  "buf" will be
473  * NULL and "size" zero if the socket was closed for
474  * writing in the meantime.
475  *
476  * @param cls closure
477  * @param size number of bytes available in buf
478  * @param buf where the callee should write the message
479  * @return number of bytes written to buf
480  */
481 static size_t
482 transmit_callback (void *cls,
483                    size_t size, void *buf)
484 {
485   struct TransmitCallbackContext *tcc = cls;
486   size_t msize;
487   
488   tcc->th = NULL;
489   GNUNET_CONTAINER_DLL_remove (tcc_head,
490                                tcc_tail,
491                                tcc);
492   msize = ntohs(tcc->msg->size);
493   if (size == 0)
494     {
495       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
496                   _("Transmission to client failed!\n"));
497       GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
498       GNUNET_SERVER_client_drop (tcc->client);
499       GNUNET_free (tcc->msg);
500       GNUNET_free (tcc);
501       return 0;
502     }
503   GNUNET_assert (size >= msize);
504   memcpy (buf, tcc->msg, msize);
505   GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
506   GNUNET_SERVER_client_drop (tcc->client);
507   GNUNET_free (tcc->msg);
508   GNUNET_free (tcc);
509   return msize;
510 }
511
512
513 /**
514  * Transmit the given message to the client.
515  *
516  * @param client target of the message
517  * @param msg message to transmit, will be freed!
518  */
519 static void
520 transmit (struct GNUNET_SERVER_Client *client,
521           struct GNUNET_MessageHeader *msg)
522 {
523   struct TransmitCallbackContext *tcc;
524
525   if (GNUNET_YES == cleaning_done)
526     {
527 #if DEBUG_DATASTORE
528       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                   "Shutdown in progress, aborting transmission.\n");
530 #endif
531       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
532       GNUNET_free (msg);
533       return;
534     }
535   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
536   tcc->msg = msg;
537   tcc->client = client;
538   if (NULL ==
539       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
540                                                       ntohs(msg->size),
541                                                       GNUNET_TIME_UNIT_FOREVER_REL,
542                                                       &transmit_callback,
543                                                       tcc)))
544     {
545       GNUNET_break (0);
546       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
547       GNUNET_free (msg);
548       GNUNET_free (tcc);
549       return;
550     }
551   GNUNET_SERVER_client_keep (client);
552   GNUNET_CONTAINER_DLL_insert (tcc_head,
553                                tcc_tail,
554                                tcc);
555 }
556
557
558 /**
559  * Transmit a status code to the client.
560  *
561  * @param client receiver of the response
562  * @param code status code
563  * @param msg optional error message (can be NULL)
564  */
565 static void
566 transmit_status (struct GNUNET_SERVER_Client *client,
567                  int code,
568                  const char *msg)
569 {
570   struct StatusMessage *sm;
571   size_t slen;
572
573 #if DEBUG_DATASTORE
574   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
575               "Transmitting `%s' message with value %d and message `%s'\n",
576               "STATUS",
577               code,
578               msg != NULL ? msg : "(none)");
579 #endif
580   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
581   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
582   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
583   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
584   sm->status = htonl(code);
585   if (slen > 0)
586     memcpy (&sm[1], msg, slen);  
587   transmit (client, &sm->header);
588 }
589
590
591
592 /**
593  * Function that will transmit the given datastore entry
594  * to the client.
595  *
596  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
597  * @param key key for the content
598  * @param size number of bytes in data
599  * @param data content stored
600  * @param type type of the content
601  * @param priority priority of the content
602  * @param anonymity anonymity-level for the content
603  * @param expiration expiration time for the content
604  * @param uid unique identifier for the datum;
605  *        maybe 0 if no unique identifier is available
606  *
607  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
608  *         GNUNET_NO to delete the item and continue (if supported)
609  */
610 static int
611 transmit_item (void *cls,
612                const GNUNET_HashCode * key,
613                uint32_t size,
614                const void *data,
615                enum GNUNET_BLOCK_Type type,
616                uint32_t priority,
617                uint32_t anonymity,
618                struct GNUNET_TIME_Absolute
619                expiration, uint64_t uid)
620 {
621   struct GNUNET_SERVER_Client *client = cls;
622   struct GNUNET_MessageHeader *end;
623   struct DataMessage *dm;
624
625   if (key == NULL)
626     {
627       /* transmit 'DATA_END' */
628 #if DEBUG_DATASTORE
629       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
630                   "Transmitting `%s' message\n",
631                   "DATA_END");
632 #endif
633       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
634       end->size = htons(sizeof(struct GNUNET_MessageHeader));
635       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
636       transmit (client, end);
637       GNUNET_SERVER_client_drop (client);
638       return GNUNET_OK;
639     }
640   GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
641   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
642   dm->header.size = htons(sizeof(struct DataMessage) + size);
643   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
644   dm->rid = htonl(0);
645   dm->size = htonl(size);
646   dm->type = htonl(type);
647   dm->priority = htonl(priority);
648   dm->anonymity = htonl(anonymity);
649   dm->replication = htonl (0);
650   dm->reserved = htonl (0);
651   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
652   dm->uid = GNUNET_htonll(uid);
653   dm->key = *key;
654   memcpy (&dm[1], data, size);
655 #if DEBUG_DATASTORE
656   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
657               "Transmitting `%s' message for `%s' of type %u with expiration %llu (now: %llu)\n",
658               "DATA",
659               GNUNET_h2s (key),
660               type,
661               (unsigned long long) expiration.abs_value,
662               (unsigned long long) GNUNET_TIME_absolute_get ().abs_value);
663 #endif
664   GNUNET_STATISTICS_update (stats,
665                             gettext_noop ("# results found"),
666                             1,
667                             GNUNET_NO);
668   transmit (client, &dm->header);
669   return GNUNET_OK;
670 }
671
672
673 /**
674  * Handle RESERVE-message.
675  *
676  * @param cls closure
677  * @param client identification of the client
678  * @param message the actual message
679  */
680 static void
681 handle_reserve (void *cls,
682                 struct GNUNET_SERVER_Client *client,
683                 const struct GNUNET_MessageHeader *message)
684 {
685   /**
686    * Static counter to produce reservation identifiers.
687    */
688   static int reservation_gen;
689
690   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
691   struct ReservationList *e;
692   unsigned long long used;
693   unsigned long long req;
694   uint64_t amount;
695   uint32_t entries;
696
697 #if DEBUG_DATASTORE
698   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
699               "Processing `%s' request\n",
700               "RESERVE");
701 #endif
702   amount = GNUNET_ntohll(msg->amount);
703   entries = ntohl(msg->entries);
704   used = payload + reserved;
705   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
706   if (used + req > quota)
707     {
708       if (quota < used)
709         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
710       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
711                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
712                   quota - used,
713                   "RESERVE",
714                   req);
715       if (cache_size < req)
716         {
717           /* TODO: document this in the FAQ; essentially, if this
718              message happens, the insertion request could be blocked
719              by less-important content from migration because it is
720              larger than 1/8th of the overall available space, and
721              we only reserve 1/8th for "fresh" insertions */
722           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
723                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
724                       req,
725                       cache_size);
726           transmit_status (client, 0, 
727                            gettext_noop ("Insufficient space to satisfy request and "
728                                          "requested amount is larger than cache size"));
729         }
730       else
731         {
732           transmit_status (client, 0, 
733                            gettext_noop ("Insufficient space to satisfy request"));
734         }
735       return;      
736     }
737   reserved += req;
738   GNUNET_STATISTICS_set (stats,
739                          gettext_noop ("# reserved"),
740                          reserved,
741                          GNUNET_NO);
742   e = GNUNET_malloc (sizeof(struct ReservationList));
743   e->next = reservations;
744   reservations = e;
745   e->client = client;
746   e->amount = amount;
747   e->entries = entries;
748   e->rid = ++reservation_gen;
749   if (reservation_gen < 0)
750     reservation_gen = 0; /* wrap around */
751   transmit_status (client, e->rid, NULL);
752 }
753
754
755 /**
756  * Handle RELEASE_RESERVE-message.
757  *
758  * @param cls closure
759  * @param client identification of the client
760  * @param message the actual message
761  */
762 static void
763 handle_release_reserve (void *cls,
764                         struct GNUNET_SERVER_Client *client,
765                         const struct GNUNET_MessageHeader *message)
766 {
767   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
768   struct ReservationList *pos;
769   struct ReservationList *prev;
770   struct ReservationList *next;
771   int rid = ntohl(msg->rid);
772   unsigned long long rem;
773
774 #if DEBUG_DATASTORE
775   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
776               "Processing `%s' request\n",
777               "RELEASE_RESERVE");
778 #endif
779   next = reservations;
780   prev = NULL;
781   while (NULL != (pos = next))
782     {
783       next = pos->next;
784       if (rid == pos->rid)
785         {
786           if (prev == NULL)
787             reservations = next;
788           else
789             prev->next = next;
790           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
791           GNUNET_assert (reserved >= rem);
792           reserved -= rem;
793           GNUNET_STATISTICS_set (stats,
794                          gettext_noop ("# reserved"),
795                                  reserved,
796                                  GNUNET_NO);
797 #if DEBUG_DATASTORE
798           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799                       "Returning %llu remaining reserved bytes to storage pool\n",
800                       rem);
801 #endif    
802           GNUNET_free (pos);
803           transmit_status (client, GNUNET_OK, NULL);
804           return;
805         }       
806       prev = pos;
807     }
808   GNUNET_break (0);
809   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
810 }
811
812
813 /**
814  * Check that the given message is a valid data message.
815  *
816  * @return NULL if the message is not well-formed, otherwise the message
817  */
818 static const struct DataMessage *
819 check_data (const struct GNUNET_MessageHeader *message)
820 {
821   uint16_t size;
822   uint32_t dsize;
823   const struct DataMessage *dm;
824
825   size = ntohs(message->size);
826   if (size < sizeof(struct DataMessage))
827     { 
828       GNUNET_break (0);
829       return NULL;
830     }
831   dm = (const struct DataMessage *) message;
832   dsize = ntohl(dm->size);
833   if (size != dsize + sizeof(struct DataMessage))
834     {
835       GNUNET_break (0);
836       return NULL;
837     }
838   return dm;
839 }
840
841
842 /**
843  * Context for a put request used to see if the content is
844  * already present.
845  */
846 struct PutContext
847 {
848   /**
849    * Client to notify on completion.
850    */
851   struct GNUNET_SERVER_Client *client;
852   
853   /* followed by the 'struct DataMessage' */
854 };
855
856
857 /**
858  * Actually put the data message.
859  */
860 static void
861 execute_put (struct GNUNET_SERVER_Client *client,
862              const struct DataMessage *dm)
863 {
864   uint32_t size;
865   char *msg;
866   int ret;
867
868   size = ntohl(dm->size);
869   msg = NULL;
870   ret = plugin->api->put (plugin->api->cls,
871                           &dm->key,
872                           size,
873                           &dm[1],
874                           ntohl(dm->type),
875                           ntohl(dm->priority),
876                           ntohl(dm->anonymity),
877                           ntohl(dm->replication),
878                           GNUNET_TIME_absolute_ntoh(dm->expiration),
879                           &msg);
880   if (GNUNET_OK == ret)
881     {
882       GNUNET_STATISTICS_update (stats,
883                                 gettext_noop ("# bytes stored"),
884                                 size,
885                                 GNUNET_YES);
886       GNUNET_CONTAINER_bloomfilter_add (filter,
887                                         &dm->key);
888 #if DEBUG_DATASTORE
889       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
890                   "Successfully stored %u bytes of type %u under key `%s'\n",
891                   size,
892                   ntohl(dm->type),
893                   GNUNET_h2s (&dm->key));
894 #endif
895     }
896   transmit_status (client, 
897                    ret,
898                    msg);
899   GNUNET_free_non_null (msg);
900   if (quota - reserved - cache_size < payload)
901     {
902       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
903                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
904                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
905                   (unsigned long long) (quota - reserved - cache_size),
906                   (unsigned long long) payload);
907       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
908     }
909 }
910
911
912 /**
913  * Function that will check if the given datastore entry
914  * matches the put and if none match executes the put.
915  *
916  * @param cls closure, pointer to the client (of type 'struct PutContext').
917  * @param key key for the content
918  * @param size number of bytes in data
919  * @param data content stored
920  * @param type type of the content
921  * @param priority priority of the content
922  * @param anonymity anonymity-level for the content
923  * @param expiration expiration time for the content
924  * @param uid unique identifier for the datum;
925  *        maybe 0 if no unique identifier is available
926  *
927  * @return GNUNET_OK usually
928  *         GNUNET_NO to delete the item 
929  */
930 static int
931 check_present (void *cls,
932                const GNUNET_HashCode * key,
933                uint32_t size,
934                const void *data,
935                enum GNUNET_BLOCK_Type type,
936                uint32_t priority,
937                uint32_t anonymity,
938                struct GNUNET_TIME_Absolute
939                expiration, uint64_t uid)
940 {
941   struct PutContext *pc = cls;
942   const struct DataMessage *dm;
943
944   dm = (const struct DataMessage*) &pc[1];
945   if (key == NULL)
946     {
947       execute_put (pc->client, dm);
948       GNUNET_SERVER_client_drop (pc->client);
949       GNUNET_free (pc);
950       return GNUNET_OK;
951     }
952   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
953        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
954        ( (size == ntohl(dm->size)) &&
955          (0 == memcmp (&dm[1],
956                        data,
957                        size)) ) )
958     {
959 #if DEBUG_MYSQL
960       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
961                   "Result already present in datastore\n");
962 #endif
963       /* FIXME: change API to allow increasing 'replication' counter */
964       if ( (ntohl (dm->priority) > 0) ||
965            (GNUNET_TIME_absolute_ntoh(dm->expiration).abs_value >
966             expiration.abs_value) )
967         plugin->api->update (plugin->api->cls,
968                              uid,
969                              (int32_t) ntohl(dm->priority),
970                              GNUNET_TIME_absolute_ntoh(dm->expiration),
971                              NULL);
972       transmit_status (pc->client, GNUNET_NO, NULL);
973       GNUNET_SERVER_client_drop (pc->client);
974       GNUNET_free (pc);
975     }
976   else
977     {
978       execute_put (pc->client, dm);
979       GNUNET_SERVER_client_drop (pc->client);
980       GNUNET_free (pc);
981     }
982   return GNUNET_OK;
983 }
984
985
986 /**
987  * Handle PUT-message.
988  *
989  * @param cls closure
990  * @param client identification of the client
991  * @param message the actual message
992  */
993 static void
994 handle_put (void *cls,
995             struct GNUNET_SERVER_Client *client,
996             const struct GNUNET_MessageHeader *message)
997 {
998   const struct DataMessage *dm = check_data (message);
999   int rid;
1000   struct ReservationList *pos;
1001   struct PutContext *pc;
1002   GNUNET_HashCode vhash;
1003   uint32_t size;
1004
1005   if ( (dm == NULL) ||
1006        (ntohl(dm->type) == 0) ) 
1007     {
1008       GNUNET_break (0);
1009       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1010       return;
1011     }
1012 #if DEBUG_DATASTORE
1013   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014               "Processing `%s' request for `%s' of type %u\n",
1015               "PUT",
1016               GNUNET_h2s (&dm->key),
1017               ntohl (dm->type));
1018 #endif
1019   rid = ntohl(dm->rid);
1020   size = ntohl(dm->size);
1021   if (rid > 0)
1022     {
1023       pos = reservations;
1024       while ( (NULL != pos) &&
1025               (rid != pos->rid) )
1026         pos = pos->next;
1027       GNUNET_break (pos != NULL);
1028       if (NULL != pos)
1029         {
1030           GNUNET_break (pos->entries > 0);
1031           GNUNET_break (pos->amount >= size);
1032           pos->entries--;
1033           pos->amount -= size;
1034           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1035           GNUNET_STATISTICS_set (stats,
1036                                  gettext_noop ("# reserved"),
1037                                  reserved,
1038                                  GNUNET_NO);
1039         }
1040     }
1041   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1042                                                        &dm->key))
1043     {
1044       GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
1045       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1046       pc->client = client;
1047       GNUNET_SERVER_client_keep (client);
1048       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1049       plugin->api->get_key (plugin->api->cls,
1050                             0,
1051                             &dm->key,
1052                             &vhash,
1053                             ntohl (dm->type),
1054                             &check_present,
1055                             pc);      
1056       return;
1057     }
1058   execute_put (client, dm);
1059 }
1060
1061
1062 /**
1063  * Handle GET-message.
1064  *
1065  * @param cls closure
1066  * @param client identification of the client
1067  * @param message the actual message
1068  */
1069 static void
1070 handle_get (void *cls,
1071             struct GNUNET_SERVER_Client *client,
1072             const struct GNUNET_MessageHeader *message)
1073 {
1074   const struct GetMessage *msg;
1075   uint16_t size;
1076
1077   size = ntohs(message->size);
1078   if ( (size != sizeof(struct GetMessage)) &&
1079        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1080     {
1081       GNUNET_break (0);
1082       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1083       return;
1084     }
1085   msg = (const struct GetMessage*) message;
1086 #if DEBUG_DATASTORE
1087   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1088               "Processing `%s' request for `%s' of type %u\n",
1089               "GET",
1090               GNUNET_h2s (&msg->key),
1091               ntohl (msg->type));
1092 #endif
1093   GNUNET_STATISTICS_update (stats,
1094                             gettext_noop ("# GET requests received"),
1095                             1,
1096                             GNUNET_NO);
1097   GNUNET_SERVER_client_keep (client);
1098   if ( (size == sizeof(struct GetMessage)) &&
1099        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1100                                                          &msg->key)) )
1101     {
1102       /* don't bother database... */
1103 #if DEBUG_DATASTORE
1104       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1105                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1106                   "GET",
1107                   GNUNET_h2s (&msg->key));
1108 #endif  
1109       GNUNET_STATISTICS_update (stats,
1110                                 gettext_noop ("# requests filtered by bloomfilter"),
1111                                 1,
1112                                 GNUNET_NO);
1113       transmit_item (client,
1114                      NULL, 0, NULL, 0, 0, 0, 
1115                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1116       return;
1117     }
1118   plugin->api->get_key (plugin->api->cls,
1119                         GNUNET_ntohll (msg->offset),
1120                         ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1121                         NULL,
1122                         ntohl(msg->type),
1123                         &transmit_item,
1124                         client);    
1125 }
1126
1127
1128 /**
1129  * Handle UPDATE-message.
1130  *
1131  * @param cls closure
1132  * @param client identification of the client
1133  * @param message the actual message
1134  */
1135 static void
1136 handle_update (void *cls,
1137                struct GNUNET_SERVER_Client *client,
1138                const struct GNUNET_MessageHeader *message)
1139 {
1140   const struct UpdateMessage *msg;
1141   int ret;
1142   char *emsg;
1143
1144   GNUNET_STATISTICS_update (stats,
1145                             gettext_noop ("# UPDATE requests received"),
1146                             1,
1147                             GNUNET_NO);
1148   msg = (const struct UpdateMessage*) message;
1149   emsg = NULL;
1150 #if DEBUG_DATASTORE
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Processing `%s' request for %llu\n",
1153               "UPDATE",
1154               (unsigned long long) GNUNET_ntohll (msg->uid));
1155 #endif
1156   ret = plugin->api->update (plugin->api->cls,
1157                              GNUNET_ntohll(msg->uid),
1158                              (int32_t) ntohl(msg->priority),
1159                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1160                              &emsg);
1161   transmit_status (client, ret, emsg);
1162   GNUNET_free_non_null (emsg);
1163 }
1164
1165
1166 /**
1167  * Handle GET_REPLICATION-message.
1168  *
1169  * @param cls closure
1170  * @param client identification of the client
1171  * @param message the actual message
1172  */
1173 static void
1174 handle_get_replication (void *cls,
1175                         struct GNUNET_SERVER_Client *client,
1176                         const struct GNUNET_MessageHeader *message)
1177 {
1178 #if DEBUG_DATASTORE
1179   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1180               "Processing `%s' request\n",
1181               "GET_REPLICATION");
1182 #endif
1183   GNUNET_STATISTICS_update (stats,
1184                             gettext_noop ("# GET REPLICATION requests received"),
1185                             1,
1186                             GNUNET_NO);
1187   GNUNET_SERVER_client_keep (client);
1188   plugin->api->get_replication (plugin->api->cls,
1189                                 &transmit_item,
1190                                 client);  
1191 }
1192
1193
1194 /**
1195  * Handle GET_ZERO_ANONYMITY-message.
1196  *
1197  * @param cls closure
1198  * @param client identification of the client
1199  * @param message the actual message
1200  */
1201 static void
1202 handle_get_zero_anonymity (void *cls,
1203                            struct GNUNET_SERVER_Client *client,
1204                            const struct GNUNET_MessageHeader *message)
1205 {
1206   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1207   enum GNUNET_BLOCK_Type type;
1208
1209   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1210   if (type == GNUNET_BLOCK_TYPE_ANY)
1211     {
1212       GNUNET_break (0);
1213       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1214       return;
1215     }
1216 #if DEBUG_DATASTORE
1217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218               "Processing `%s' request\n",
1219               "GET_ZERO_ANONYMITY");
1220 #endif
1221   GNUNET_STATISTICS_update (stats,
1222                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1223                             1,
1224                             GNUNET_NO);
1225   GNUNET_SERVER_client_keep (client);
1226   plugin->api->get_zero_anonymity (plugin->api->cls,
1227                                    GNUNET_ntohll (msg->offset),
1228                                    type,
1229                                    &transmit_item,
1230                                    client);  
1231 }
1232
1233
1234 /**
1235  * Callback function that will cause the item that is passed
1236  * in to be deleted (by returning GNUNET_NO).
1237  */
1238 static int
1239 remove_callback (void *cls,
1240                  const GNUNET_HashCode * key,
1241                  uint32_t size,
1242                  const void *data,
1243                  enum GNUNET_BLOCK_Type type,
1244                  uint32_t priority,
1245                  uint32_t anonymity,
1246                  struct GNUNET_TIME_Absolute
1247                  expiration, uint64_t uid)
1248 {
1249   struct GNUNET_SERVER_Client *client = cls;
1250
1251   if (key == NULL)
1252     {
1253 #if DEBUG_DATASTORE
1254       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1255                   "No further matches for `%s' request.\n",
1256                   "REMOVE");
1257 #endif  
1258       transmit_status (client, GNUNET_NO, _("Content not found"));              
1259       GNUNET_SERVER_client_drop (client);
1260       return GNUNET_OK; /* last item */
1261     }
1262 #if DEBUG_DATASTORE
1263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1264               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1265               (unsigned long long) uid,
1266               "REMOVE",
1267               GNUNET_h2s (key),
1268               type);
1269 #endif  
1270   GNUNET_STATISTICS_update (stats,
1271                             gettext_noop ("# bytes removed (explicit request)"),
1272                             size,
1273                             GNUNET_YES);
1274   GNUNET_CONTAINER_bloomfilter_remove (filter,
1275                                        key);
1276   transmit_status (client, GNUNET_OK, NULL);       
1277   GNUNET_SERVER_client_drop (client);
1278   return GNUNET_NO;
1279 }
1280
1281
1282 /**
1283  * Handle REMOVE-message.
1284  *
1285  * @param cls closure
1286  * @param client identification of the client
1287  * @param message the actual message
1288  */
1289 static void
1290 handle_remove (void *cls,
1291                struct GNUNET_SERVER_Client *client,
1292                const struct GNUNET_MessageHeader *message)
1293 {
1294   const struct DataMessage *dm = check_data (message);
1295   GNUNET_HashCode vhash;
1296
1297   if (dm == NULL)
1298     {
1299       GNUNET_break (0);
1300       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1301       return;
1302     }
1303 #if DEBUG_DATASTORE
1304   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1305               "Processing `%s' request for `%s' of type %u\n",
1306               "REMOVE",
1307               GNUNET_h2s (&dm->key),
1308               ntohl (dm->type));
1309 #endif
1310   GNUNET_STATISTICS_update (stats,
1311                             gettext_noop ("# REMOVE requests received"),
1312                             1,
1313                             GNUNET_NO);
1314   GNUNET_SERVER_client_keep (client);
1315   GNUNET_CRYPTO_hash (&dm[1],
1316                       ntohl(dm->size),
1317                       &vhash);
1318   plugin->api->get_key (plugin->api->cls,
1319                         0,
1320                         &dm->key,
1321                         &vhash,
1322                         (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1323                         &remove_callback,
1324                         client);
1325 }
1326
1327
1328 /**
1329  * Handle DROP-message.
1330  *
1331  * @param cls closure
1332  * @param client identification of the client
1333  * @param message the actual message
1334  */
1335 static void
1336 handle_drop (void *cls,
1337              struct GNUNET_SERVER_Client *client,
1338              const struct GNUNET_MessageHeader *message)
1339 {
1340 #if DEBUG_DATASTORE
1341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1342               "Processing `%s' request\n",
1343               "DROP");
1344 #endif
1345   do_drop = GNUNET_YES;
1346   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1347 }
1348
1349
1350 /**
1351  * Function called by plugins to notify us about a
1352  * change in their disk utilization.
1353  *
1354  * @param cls closure (NULL)
1355  * @param delta change in disk utilization, 
1356  *        0 for "reset to empty"
1357  */
1358 static void
1359 disk_utilization_change_cb (void *cls,
1360                             int delta)
1361 {
1362   if ( (delta < 0) &&
1363        (payload < -delta) )
1364     {
1365       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1366                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1367                   (long long) payload,
1368                   (long long) -delta);
1369       payload = plugin->api->estimate_size (plugin->api->cls);
1370       sync_stats ();
1371       return;
1372     }
1373   payload += delta;
1374   lastSync++;
1375   if (lastSync >= MAX_STAT_SYNC_LAG)
1376     sync_stats ();
1377 }
1378
1379
1380 /**
1381  * Callback function to process statistic values.
1382  *
1383  * @param cls closure (struct Plugin*)
1384  * @param subsystem name of subsystem that created the statistic
1385  * @param name the name of the datum
1386  * @param value the current value
1387  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1388  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1389  */
1390 static int
1391 process_stat_in (void *cls,
1392                  const char *subsystem,
1393                  const char *name,
1394                  uint64_t value,
1395                  int is_persistent)
1396 {
1397   GNUNET_assert (stats_worked == GNUNET_NO);
1398   stats_worked = GNUNET_YES;
1399   payload += value;
1400 #if DEBUG_SQLITE
1401   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1402               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1403               abs_value,
1404               payload);
1405 #endif
1406   return GNUNET_OK;
1407 }
1408
1409
1410 static void
1411 process_stat_done (void *cls,
1412                    int success)
1413 {
1414   struct DatastorePlugin *plugin = cls;
1415
1416   stat_get = NULL;
1417   if (stats_worked == GNUNET_NO) 
1418     payload = plugin->api->estimate_size (plugin->api->cls);
1419 }
1420
1421
1422 /**
1423  * Load the datastore plugin.
1424  */
1425 static struct DatastorePlugin *
1426 load_plugin () 
1427 {
1428   struct DatastorePlugin *ret;
1429   char *libname;
1430   char *name;
1431
1432   if (GNUNET_OK !=
1433       GNUNET_CONFIGURATION_get_value_string (cfg,
1434                                              "DATASTORE", "DATABASE", &name))
1435     {
1436       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1437                   _("No `%s' specified for `%s' in configuration!\n"),
1438                   "DATABASE",
1439                   "DATASTORE");
1440       return NULL;
1441     }
1442   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1443   ret->env.cfg = cfg;
1444   ret->env.duc = &disk_utilization_change_cb;
1445   ret->env.cls = NULL;
1446   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1447               _("Loading `%s' datastore plugin\n"), name);
1448   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1449   ret->short_name = name;
1450   ret->lib_name = libname;
1451   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1452   if (ret->api == NULL)
1453     {
1454       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1455                   _("Failed to load datastore plugin for `%s'\n"), name);
1456       GNUNET_free (ret->short_name);
1457       GNUNET_free (libname);
1458       GNUNET_free (ret);
1459       return NULL;
1460     }
1461   return ret;
1462 }
1463
1464
1465 /**
1466  * Function called when the service shuts
1467  * down.  Unloads our datastore plugin.
1468  *
1469  * @param plug plugin to unload
1470  */
1471 static void
1472 unload_plugin (struct DatastorePlugin *plug)
1473 {
1474 #if DEBUG_DATASTORE
1475   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476               "Datastore service is unloading plugin...\n");
1477 #endif
1478   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1479   GNUNET_free (plug->lib_name);
1480   GNUNET_free (plug->short_name);
1481   GNUNET_free (plug);
1482 }
1483
1484
1485 /**
1486  * Final task run after shutdown.  Unloads plugins and disconnects us from
1487  * statistics.
1488  */
1489 static void
1490 unload_task (void *cls,
1491              const struct GNUNET_SCHEDULER_TaskContext *tc)
1492 {
1493   if (GNUNET_YES == do_drop)
1494     plugin->api->drop (plugin->api->cls);
1495   unload_plugin (plugin);
1496   plugin = NULL;
1497   if (filter != NULL)
1498     {
1499       GNUNET_CONTAINER_bloomfilter_free (filter);
1500       filter = NULL;
1501     }
1502   if (lastSync > 0)
1503     sync_stats ();
1504   if (stat_get != NULL)
1505     {
1506       GNUNET_STATISTICS_get_cancel (stat_get);
1507       stat_get = NULL;
1508     }
1509   if (stats != NULL)
1510     {
1511       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1512       stats = NULL;
1513     }
1514 }
1515
1516
1517 /**
1518  * Last task run during shutdown.  Disconnects us from
1519  * the transport and core.
1520  */
1521 static void
1522 cleaning_task (void *cls,
1523                const struct GNUNET_SCHEDULER_TaskContext *tc)
1524 {
1525   struct TransmitCallbackContext *tcc;
1526
1527   cleaning_done = GNUNET_YES;
1528   while (NULL != (tcc = tcc_head))
1529     {
1530       GNUNET_CONTAINER_DLL_remove (tcc_head,
1531                                    tcc_tail,
1532                                    tcc);
1533       if (tcc->th != NULL)
1534         {
1535           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1536           GNUNET_SERVER_client_drop (tcc->client);
1537         }
1538       GNUNET_free (tcc->msg);
1539       GNUNET_free (tcc);
1540     }
1541   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1542     {
1543       GNUNET_SCHEDULER_cancel (expired_kill_task);
1544       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1545     }
1546   GNUNET_SCHEDULER_add_continuation (&unload_task,
1547                                      NULL,
1548                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1549 }
1550
1551
1552 /**
1553  * Function that removes all active reservations made
1554  * by the given client and releases the space for other
1555  * requests.
1556  *
1557  * @param cls closure
1558  * @param client identification of the client
1559  */
1560 static void
1561 cleanup_reservations (void *cls,
1562                       struct GNUNET_SERVER_Client *client)
1563 {
1564   struct ReservationList *pos;
1565   struct ReservationList *prev;
1566   struct ReservationList *next;
1567
1568   if (client == NULL)
1569     return;
1570   prev = NULL;
1571   pos = reservations;
1572   while (NULL != pos)
1573     {
1574       next = pos->next;
1575       if (pos->client == client)
1576         {
1577           if (prev == NULL)
1578             reservations = next;
1579           else
1580             prev->next = next;
1581           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1582           GNUNET_free (pos);
1583         }
1584       else
1585         {
1586           prev = pos;
1587         }
1588       pos = next;
1589     }
1590   GNUNET_STATISTICS_set (stats,
1591                          gettext_noop ("# reserved"),
1592                          reserved,
1593                          GNUNET_NO);
1594 }
1595
1596
1597 /**
1598  * Process datastore requests.
1599  *
1600  * @param cls closure
1601  * @param server the initialized server
1602  * @param c configuration to use
1603  */
1604 static void
1605 run (void *cls,
1606      struct GNUNET_SERVER_Handle *server,
1607      const struct GNUNET_CONFIGURATION_Handle *c)
1608 {
1609   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1610     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1611      sizeof(struct ReserveMessage) }, 
1612     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1613      sizeof(struct ReleaseReserveMessage) }, 
1614     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1615     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1616      sizeof (struct UpdateMessage) }, 
1617     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1618     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1619      sizeof(struct GNUNET_MessageHeader) }, 
1620     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1621      sizeof(struct GetZeroAnonymityMessage) }, 
1622     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1623     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1624      sizeof(struct GNUNET_MessageHeader) }, 
1625     {NULL, NULL, 0, 0}
1626   };
1627   char *fn;
1628   unsigned int bf_size;
1629
1630   cfg = c;
1631   if (GNUNET_OK !=
1632       GNUNET_CONFIGURATION_get_value_number (cfg,
1633                                              "DATASTORE", "QUOTA", &quota))
1634     {
1635       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1636                   _("No `%s' specified for `%s' in configuration!\n"),
1637                   "QUOTA",
1638                   "DATASTORE");
1639       return;
1640     }
1641   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1642   GNUNET_STATISTICS_set (stats,
1643                          gettext_noop ("# quota"),
1644                          quota,
1645                          GNUNET_NO);
1646   cache_size = quota / 8; /* Or should we make this an option? */
1647   GNUNET_STATISTICS_set (stats,
1648                          gettext_noop ("# cache size"),
1649                          cache_size,
1650                          GNUNET_NO);
1651   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1652   fn = NULL;
1653   if ( (GNUNET_OK !=
1654         GNUNET_CONFIGURATION_get_value_filename (cfg,
1655                                                  "DATASTORE",
1656                                                  "BLOOMFILTER",
1657                                                  &fn)) ||
1658        (GNUNET_OK !=
1659         GNUNET_DISK_directory_create_for_file (fn)) )
1660     {
1661       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1662                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1663                   fn != NULL ? fn : "");
1664       GNUNET_free_non_null (fn);
1665       fn = NULL;
1666     }
1667   if (fn != NULL)
1668     filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1669   else
1670     filter = GNUNET_CONTAINER_bloomfilter_init (NULL, bf_size, 5);  /* approx. 3% false positives at max use */  
1671   GNUNET_free_non_null (fn);
1672   if (filter == NULL)
1673     {
1674       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1675                   _("Failed to initialize bloomfilter.\n"));
1676       if (stats != NULL)
1677         {
1678           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1679           stats = NULL;
1680         }
1681       return;
1682     }
1683   plugin = load_plugin ();
1684   if (NULL == plugin)
1685     {
1686       GNUNET_CONTAINER_bloomfilter_free (filter);
1687       filter = NULL;
1688       if (stats != NULL)
1689         {
1690           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1691           stats = NULL;
1692         }
1693       return;
1694     }
1695   stat_get = GNUNET_STATISTICS_get (stats,
1696                                     "datastore",
1697                                     QUOTA_STAT_NAME,
1698                                     GNUNET_TIME_UNIT_SECONDS,
1699                                     &process_stat_done,
1700                                     &process_stat_in,
1701                                     plugin);
1702   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1703   GNUNET_SERVER_add_handlers (server, handlers);
1704   expired_kill_task
1705     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1706                                           &delete_expired, NULL);
1707   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1708                                 &cleaning_task, NULL);
1709 }
1710
1711
1712 /**
1713  * The main function for the datastore service.
1714  *
1715  * @param argc number of arguments from the command line
1716  * @param argv command line arguments
1717  * @return 0 ok, 1 on error
1718  */
1719 int
1720 main (int argc, char *const *argv)
1721 {
1722   int ret;
1723
1724   ret = (GNUNET_OK ==
1725          GNUNET_SERVICE_run (argc,
1726                              argv,
1727                              "datastore",
1728                              GNUNET_SERVICE_OPTION_NONE,
1729                              &run, NULL)) ? 0 : 1;
1730   return ret;
1731 }
1732
1733
1734 /* end of gnunet-service-datastore.c */