debugging
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_protocols.h"
30 #include "gnunet_statistics_service.h"
31 #include "gnunet_datastore_plugin.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45 /**
46  * How fast are we allowed to query the database for deleting
47  * expired content? (1 item per second).
48  */
49 #define MIN_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
50
51
52 #define QUOTA_STAT_NAME gettext_noop ("# bytes used in file-sharing datastore")
53
54 /**
55  * After how many payload-changing operations
56  * do we sync our statistics?
57  */
58 #define MAX_STAT_SYNC_LAG 50
59
60
61 /**
62  * Our datastore plugin.
63  */
64 struct DatastorePlugin
65 {
66
67   /**
68    * API of the transport as returned by the plugin's
69    * initialization function.
70    */
71   struct GNUNET_DATASTORE_PluginFunctions *api;
72
73   /**
74    * Short name for the plugin (i.e. "sqlite").
75    */
76   char *short_name;
77
78   /**
79    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
80    */
81   char *lib_name;
82
83   /**
84    * Environment this transport service is using
85    * for this plugin.
86    */
87   struct GNUNET_DATASTORE_PluginEnvironment env;
88
89 };
90
91
92 /**
93  * Linked list of active reservations.
94  */
95 struct ReservationList 
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct ReservationList *next;
102
103   /**
104    * Client that made the reservation.
105    */
106   struct GNUNET_SERVER_Client *client;
107
108   /**
109    * Number of bytes (still) reserved.
110    */
111   uint64_t amount;
112
113   /**
114    * Number of items (still) reserved.
115    */
116   uint64_t entries;
117
118   /**
119    * Reservation identifier.
120    */
121   int32_t rid;
122
123 };
124
125
126
127 /**
128  * Our datastore plugin (NULL if not available).
129  */
130 static struct DatastorePlugin *plugin;
131
132 /**
133  * Linked list of space reservations made by clients.
134  */
135 static struct ReservationList *reservations;
136
137 /**
138  * Bloomfilter to quickly tell if we don't have the content.
139  */
140 static struct GNUNET_CONTAINER_BloomFilter *filter;
141
142 /**
143  * How much space are we allowed to use?
144  */
145 static unsigned long long quota;
146
147 /**
148  * How much space are we using for the cache?  (space available for
149  * insertions that will be instantly reclaimed by discarding less
150  * important content --- or possibly whatever we just inserted into
151  * the "cache").
152  */
153 static unsigned long long cache_size;
154
155 /**
156  * How much space have we currently reserved?
157  */
158 static unsigned long long reserved;
159   
160 /**
161  * How much data are we currently storing
162  * in the database?
163  */
164 static unsigned long long payload;
165
166 /**
167  * Number of updates that were made to the
168  * payload value since we last synchronized
169  * it with the statistics service.
170  */
171 static unsigned int lastSync;
172
173 /**
174  * Did we get an answer from statistics?
175  */
176 static int stats_worked;
177
178 /**
179  * Identity of the task that is used to delete
180  * expired content.
181  */
182 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
183
184 /**
185  * Our configuration.
186  */
187 const struct GNUNET_CONFIGURATION_Handle *cfg;
188
189
190 /**
191  * Handle for reporting statistics.
192  */
193 static struct GNUNET_STATISTICS_Handle *stats;
194
195
196 /**
197  * Synchronize our utilization statistics with the 
198  * statistics service.
199  */
200 static void 
201 sync_stats ()
202 {
203   GNUNET_STATISTICS_set (stats,
204                          QUOTA_STAT_NAME,
205                          payload,
206                          GNUNET_YES);
207   lastSync = 0;
208 }
209
210
211
212
213 /**
214  * Function called once the transmit operation has
215  * either failed or succeeded.
216  *
217  * @param cls closure
218  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
219  */
220 typedef void (*TransmitContinuation)(void *cls,
221                                      int status);
222
223
224 /**
225  * Context for transmitting replies to clients.
226  */
227 struct TransmitCallbackContext 
228 {
229   
230   /**
231    * We keep these in a doubly-linked list (for cleanup).
232    */
233   struct TransmitCallbackContext *next;
234   
235   /**
236    * We keep these in a doubly-linked list (for cleanup).
237    */
238   struct TransmitCallbackContext *prev;
239   
240   /**
241    * The message that we're asked to transmit.
242    */
243   struct GNUNET_MessageHeader *msg;
244   
245   /**
246    * Handle for the transmission request.
247    */
248   struct GNUNET_CONNECTION_TransmitHandle *th;
249
250   /**
251    * Client that we are transmitting to.
252    */
253   struct GNUNET_SERVER_Client *client;
254
255   /**
256    * Function to call once msg has been transmitted
257    * (or at least added to the buffer).
258    */
259   TransmitContinuation tc;
260
261   /**
262    * Closure for tc.
263    */
264   void *tc_cls;
265
266   /**
267    * GNUNET_YES if we are supposed to signal the server
268    * completion of the client's request.
269    */
270   int end;
271 };
272
273   
274 /**
275  * Head of the doubly-linked list (for cleanup).
276  */
277 static struct TransmitCallbackContext *tcc_head;
278
279 /**
280  * Tail of the doubly-linked list (for cleanup).
281  */
282 static struct TransmitCallbackContext *tcc_tail;
283
284 /**
285  * Have we already cleaned up the TCCs and are hence no longer
286  * willing (or able) to transmit anything to anyone?
287  */
288 static int cleaning_done;
289
290 /**
291  * Handle for pending get request.
292  */
293 static struct GNUNET_STATISTICS_GetHandle *stat_get;
294
295
296 /**
297  * Task that is used to remove expired entries from
298  * the datastore.  This task will schedule itself
299  * again automatically to always delete all expired
300  * content quickly.
301  *
302  * @param cls not used
303  * @param tc task context
304  */ 
305 static void
306 delete_expired (void *cls,
307                 const struct GNUNET_SCHEDULER_TaskContext *tc);
308
309
310 /**
311  * Iterate over the expired items stored in the datastore.
312  * Delete all expired items; once we have processed all
313  * expired items, re-schedule the "delete_expired" task.
314  *
315  * @param cls not used
316  * @param next_cls closure to pass to the "next" function.
317  * @param key key for the content
318  * @param size number of bytes in data
319  * @param data content stored
320  * @param type type of the content
321  * @param priority priority of the content
322  * @param anonymity anonymity-level for the content
323  * @param expiration expiration time for the content
324  * @param uid unique identifier for the datum;
325  *        maybe 0 if no unique identifier is available
326  *
327  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
328  *         (continue on call to "next", of course),
329  *         GNUNET_NO to delete the item and continue (if supported)
330  */
331 static int 
332 expired_processor (void *cls,
333                    void *next_cls,
334                    const GNUNET_HashCode * key,
335                    uint32_t size,
336                    const void *data,
337                    enum GNUNET_BLOCK_Type type,
338                    uint32_t priority,
339                    uint32_t anonymity,
340                    struct GNUNET_TIME_Absolute
341                    expiration, 
342                    uint64_t uid)
343 {
344   struct GNUNET_TIME_Absolute now;
345
346   if (key == NULL) 
347     {
348       expired_kill_task 
349         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
350                                         &delete_expired,
351                                         NULL);
352       return GNUNET_SYSERR;
353     }
354   now = GNUNET_TIME_absolute_get ();
355   if (expiration.abs_value > now.abs_value)
356     {
357       /* finished processing */
358       expired_kill_task 
359         = GNUNET_SCHEDULER_add_delayed (MAX_EXPIRE_DELAY,
360                                         &delete_expired,
361                                         NULL);
362       return GNUNET_SYSERR;
363     }
364 #if DEBUG_DATASTORE
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Deleting content `%s' of type %u that expired %llu ms ago\n",
367               GNUNET_h2s (key),
368               type,
369               (unsigned long long) (now.abs_value - expiration.abs_value));
370 #endif
371   GNUNET_STATISTICS_update (stats,
372                             gettext_noop ("# bytes expired"),
373                             size,
374                             GNUNET_YES);
375   GNUNET_CONTAINER_bloomfilter_remove (filter,
376                                        key);
377   expired_kill_task 
378     = GNUNET_SCHEDULER_add_delayed (MIN_EXPIRE_DELAY,
379                                     &delete_expired,
380                                     NULL);
381   return GNUNET_NO;
382 }
383
384
385 /**
386  * Task that is used to remove expired entries from
387  * the datastore.  This task will schedule itself
388  * again automatically to always delete all expired
389  * content quickly.
390  *
391  * @param cls not used
392  * @param tc task context
393  */ 
394 static void
395 delete_expired (void *cls,
396                 const struct GNUNET_SCHEDULER_TaskContext *tc)
397 {
398   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
399   plugin->api->expiration_get (plugin->api->cls, 
400                                &expired_processor,
401                                NULL);
402 }
403
404
405 /**
406  * An iterator over a set of items stored in the datastore
407  * that deletes until we're happy with respect to our quota.
408  *
409  * @param cls closure
410  * @param next_cls closure to pass to the "next" function.
411  * @param key key for the content
412  * @param size number of bytes in data
413  * @param data content stored
414  * @param type type of the content
415  * @param priority priority of the content
416  * @param anonymity anonymity-level for the content
417  * @param expiration expiration time for the content
418  * @param uid unique identifier for the datum;
419  *        maybe 0 if no unique identifier is available
420  *
421  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
422  *         (continue on call to "next", of course),
423  *         GNUNET_NO to delete the item and continue (if supported)
424  */
425 static int 
426 quota_processor (void *cls,
427                  void *next_cls,
428                  const GNUNET_HashCode * key,
429                  uint32_t size,
430                  const void *data,
431                  enum GNUNET_BLOCK_Type type,
432                  uint32_t priority,
433                  uint32_t anonymity,
434                  struct GNUNET_TIME_Absolute expiration, 
435                  uint64_t uid)
436 {
437   unsigned long long *need = cls;
438
439   if (NULL == key)
440     return GNUNET_SYSERR;    
441 #if DEBUG_DATASTORE
442   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
443               "Deleting %llu bytes of low-priority content `%s' of type %u (still trying to free another %llu bytes)\n",
444               (unsigned long long) (size + GNUNET_DATASTORE_ENTRY_OVERHEAD),
445               GNUNET_h2s (key),
446               type,
447               *need);
448 #endif
449   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
450     *need = 0;
451   else
452     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
453   GNUNET_STATISTICS_update (stats,
454                             gettext_noop ("# bytes purged (low-priority)"),
455                             size,
456                             GNUNET_YES);
457   GNUNET_CONTAINER_bloomfilter_remove (filter,
458                                        key);
459   return GNUNET_NO;
460 }
461
462
463 /**
464  * Manage available disk space by running tasks
465  * that will discard content if necessary.  This
466  * function will be run whenever a request for
467  * "need" bytes of storage could only be satisfied
468  * by eating into the "cache" (and we want our cache
469  * space back).
470  *
471  * @param need number of bytes of content that were
472  *        placed into the "cache" (and hence the
473  *        number of bytes that should be removed).
474  */
475 static void
476 manage_space (unsigned long long need)
477 {
478   unsigned long long last;
479
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Asked to free up %llu bytes of cache space\n",
483               need);
484 #endif
485   last = 0;
486   while ( (need > 0) &&
487           (last != need) )
488     {
489       last = need;
490       plugin->api->expiration_get (plugin->api->cls,
491                                    &quota_processor,
492                                    &need);    
493     }
494 }
495
496
497 /**
498  * Function called to notify a client about the socket
499  * begin ready to queue more data.  "buf" will be
500  * NULL and "size" zero if the socket was closed for
501  * writing in the meantime.
502  *
503  * @param cls closure
504  * @param size number of bytes available in buf
505  * @param buf where the callee should write the message
506  * @return number of bytes written to buf
507  */
508 static size_t
509 transmit_callback (void *cls,
510                    size_t size, void *buf)
511 {
512   struct TransmitCallbackContext *tcc = cls;
513   size_t msize;
514   
515   tcc->th = NULL;
516   GNUNET_CONTAINER_DLL_remove (tcc_head,
517                                tcc_tail,
518                                tcc);
519   msize = ntohs(tcc->msg->size);
520   if (size == 0)
521     {
522       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
523                   _("Transmission to client failed!\n"));
524       if (tcc->tc != NULL)
525         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
526       if (GNUNET_YES == tcc->end)
527         {
528           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                       _("Disconnecting client due to transmission failure!\n"));
530           GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
531         }
532       GNUNET_SERVER_client_drop (tcc->client);
533       GNUNET_free (tcc->msg);
534       GNUNET_free (tcc);
535       return 0;
536     }
537   GNUNET_assert (size >= msize);
538   memcpy (buf, tcc->msg, msize);
539   if (tcc->tc != NULL)
540     tcc->tc (tcc->tc_cls, GNUNET_OK);
541   if (GNUNET_YES == tcc->end)
542     {
543 #if DEBUG_DATASTORE
544       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
545                   "Done processing client request\n");
546 #endif
547       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
548     }
549   else
550     {
551 #if DEBUG_DATASTORE
552       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
553                   "Response transmitted, more pending!\n");
554 #endif
555     }
556   GNUNET_SERVER_client_drop (tcc->client);
557   GNUNET_free (tcc->msg);
558   GNUNET_free (tcc);
559   return msize;
560 }
561
562
563 /**
564  * Transmit the given message to the client.
565  *
566  * @param client target of the message
567  * @param msg message to transmit, will be freed!
568  * @param tc function to call afterwards
569  * @param tc_cls closure for tc
570  * @param end is this the last response (and we should
571  *        signal the server completion accodingly after
572  *        transmitting this message)?
573  */
574 static void
575 transmit (struct GNUNET_SERVER_Client *client,
576           struct GNUNET_MessageHeader *msg,
577           TransmitContinuation tc,
578           void *tc_cls,
579           int end)
580 {
581   struct TransmitCallbackContext *tcc;
582
583   if (GNUNET_YES == cleaning_done)
584     {
585 #if DEBUG_DATASTORE
586       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
587                   "Shutdown in progress, aborting transmission.\n");
588 #endif
589       GNUNET_free (msg);
590       if (NULL != tc)
591         tc (tc_cls, GNUNET_SYSERR);
592       return;
593     }
594   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
595   tcc->msg = msg;
596   tcc->client = client;
597   tcc->tc = tc;
598   tcc->tc_cls = tc_cls;
599   tcc->end = end;
600   if (NULL ==
601       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
602                                                       ntohs(msg->size),
603                                                       GNUNET_TIME_UNIT_FOREVER_REL,
604                                                       &transmit_callback,
605                                                       tcc)))
606     {
607       GNUNET_break (0);
608       if (GNUNET_YES == end)
609         {
610           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
611                       _("Forcefully disconnecting client.\n"));
612           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
613         }
614       if (NULL != tc)
615         tc (tc_cls, GNUNET_SYSERR);
616       GNUNET_free (msg);
617       GNUNET_free (tcc);
618       return;
619     }
620   GNUNET_SERVER_client_keep (client);
621   GNUNET_CONTAINER_DLL_insert (tcc_head,
622                                tcc_tail,
623                                tcc);
624 }
625
626
627 /**
628  * Transmit a status code to the client.
629  *
630  * @param client receiver of the response
631  * @param code status code
632  * @param msg optional error message (can be NULL)
633  */
634 static void
635 transmit_status (struct GNUNET_SERVER_Client *client,
636                  int code,
637                  const char *msg)
638 {
639   struct StatusMessage *sm;
640   size_t slen;
641
642 #if DEBUG_DATASTORE
643   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
644               "Transmitting `%s' message with value %d and message `%s'\n",
645               "STATUS",
646               code,
647               msg != NULL ? msg : "(none)");
648 #endif
649   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
650   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
651   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
652   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
653   sm->status = htonl(code);
654   if (slen > 0)
655     memcpy (&sm[1], msg, slen);  
656   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
657 }
658
659
660 /**
661  * Function called once the transmit operation has
662  * either failed or succeeded.
663  *
664  * @param next_cls closure for calling "next_request" callback
665  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
666  */
667 static void 
668 get_next(void *next_cls,
669          int status)
670 {
671   if (status != GNUNET_OK)
672     {
673       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
674                   _("Failed to transmit an item to the client; aborting iteration.\n"));
675       if (plugin != NULL)
676         plugin->api->next_request (next_cls, GNUNET_YES);
677       return;
678     }
679   if (next_cls != NULL)
680    plugin->api->next_request (next_cls, GNUNET_NO);
681 }
682
683
684 /**
685  * Function that will transmit the given datastore entry
686  * to the client.
687  *
688  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
689  * @param next_cls closure to use to ask for the next item
690  * @param key key for the content
691  * @param size number of bytes in data
692  * @param data content stored
693  * @param type type of the content
694  * @param priority priority of the content
695  * @param anonymity anonymity-level for the content
696  * @param expiration expiration time for the content
697  * @param uid unique identifier for the datum;
698  *        maybe 0 if no unique identifier is available
699  *
700  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
701  *         GNUNET_NO to delete the item and continue (if supported)
702  */
703 static int
704 transmit_item (void *cls,
705                void *next_cls,
706                const GNUNET_HashCode * key,
707                uint32_t size,
708                const void *data,
709                enum GNUNET_BLOCK_Type type,
710                uint32_t priority,
711                uint32_t anonymity,
712                struct GNUNET_TIME_Absolute
713                expiration, uint64_t uid)
714 {
715   struct GNUNET_SERVER_Client *client = cls;
716   struct GNUNET_MessageHeader *end;
717   struct DataMessage *dm;
718
719   if (key == NULL)
720     {
721       /* transmit 'DATA_END' */
722 #if DEBUG_DATASTORE
723       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
724                   "Transmitting `%s' message\n",
725                   "DATA_END");
726 #endif
727       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
728       end->size = htons(sizeof(struct GNUNET_MessageHeader));
729       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
730       transmit (client, end, NULL, NULL, GNUNET_YES);
731       GNUNET_SERVER_client_drop (client);
732       return GNUNET_OK;
733     }
734   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
735   dm->header.size = htons(sizeof(struct DataMessage) + size);
736   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
737   dm->rid = htonl(0);
738   dm->size = htonl(size);
739   dm->type = htonl(type);
740   dm->priority = htonl(priority);
741   dm->anonymity = htonl(anonymity);
742   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
743   dm->uid = GNUNET_htonll(uid);
744   dm->key = *key;
745   memcpy (&dm[1], data, size);
746 #if DEBUG_DATASTORE
747   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
748               "Transmitting `%s' message for `%s' of type %u\n",
749               "DATA",
750               GNUNET_h2s (key),
751               type);
752 #endif
753   GNUNET_STATISTICS_update (stats,
754                             gettext_noop ("# results found"),
755                             1,
756                             GNUNET_NO);
757   transmit (client, &dm->header, &get_next, next_cls, 
758             (next_cls != NULL) ? GNUNET_NO : GNUNET_YES);
759   return GNUNET_OK;
760 }
761
762
763 /**
764  * Handle RESERVE-message.
765  *
766  * @param cls closure
767  * @param client identification of the client
768  * @param message the actual message
769  */
770 static void
771 handle_reserve (void *cls,
772                 struct GNUNET_SERVER_Client *client,
773                 const struct GNUNET_MessageHeader *message)
774 {
775   /**
776    * Static counter to produce reservation identifiers.
777    */
778   static int reservation_gen;
779
780   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
781   struct ReservationList *e;
782   unsigned long long used;
783   unsigned long long req;
784   uint64_t amount;
785   uint32_t entries;
786
787 #if DEBUG_DATASTORE
788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
789               "Processing `%s' request\n",
790               "RESERVE");
791 #endif
792   amount = GNUNET_ntohll(msg->amount);
793   entries = ntohl(msg->entries);
794   used = payload + reserved;
795   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
796   if (used + req > quota)
797     {
798       if (quota < used)
799         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
800       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
801                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
802                   quota - used,
803                   "RESERVE",
804                   req);
805       if (cache_size < req)
806         {
807           /* TODO: document this in the FAQ; essentially, if this
808              message happens, the insertion request could be blocked
809              by less-important content from migration because it is
810              larger than 1/8th of the overall available space, and
811              we only reserve 1/8th for "fresh" insertions */
812           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
813                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
814                       req,
815                       cache_size);
816           transmit_status (client, 0, 
817                            gettext_noop ("Insufficient space to satisfy request and "
818                                          "requested amount is larger than cache size"));
819         }
820       else
821         {
822           transmit_status (client, 0, 
823                            gettext_noop ("Insufficient space to satisfy request"));
824         }
825       return;      
826     }
827   reserved += req;
828   GNUNET_STATISTICS_set (stats,
829                          gettext_noop ("# reserved"),
830                          reserved,
831                          GNUNET_NO);
832   e = GNUNET_malloc (sizeof(struct ReservationList));
833   e->next = reservations;
834   reservations = e;
835   e->client = client;
836   e->amount = amount;
837   e->entries = entries;
838   e->rid = ++reservation_gen;
839   if (reservation_gen < 0)
840     reservation_gen = 0; /* wrap around */
841   transmit_status (client, e->rid, NULL);
842 }
843
844
845 /**
846  * Handle RELEASE_RESERVE-message.
847  *
848  * @param cls closure
849  * @param client identification of the client
850  * @param message the actual message
851  */
852 static void
853 handle_release_reserve (void *cls,
854                         struct GNUNET_SERVER_Client *client,
855                         const struct GNUNET_MessageHeader *message)
856 {
857   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
858   struct ReservationList *pos;
859   struct ReservationList *prev;
860   struct ReservationList *next;
861   int rid = ntohl(msg->rid);
862   unsigned long long rem;
863
864 #if DEBUG_DATASTORE
865   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866               "Processing `%s' request\n",
867               "RELEASE_RESERVE");
868 #endif
869   next = reservations;
870   prev = NULL;
871   while (NULL != (pos = next))
872     {
873       next = pos->next;
874       if (rid == pos->rid)
875         {
876           if (prev == NULL)
877             reservations = next;
878           else
879             prev->next = next;
880           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
881           GNUNET_assert (reserved >= rem);
882           reserved -= rem;
883           GNUNET_STATISTICS_set (stats,
884                          gettext_noop ("# reserved"),
885                                  reserved,
886                                  GNUNET_NO);
887 #if DEBUG_DATASTORE
888           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
889                       "Returning %llu remaining reserved bytes to storage pool\n",
890                       rem);
891 #endif    
892           GNUNET_free (pos);
893           transmit_status (client, GNUNET_OK, NULL);
894           return;
895         }       
896       prev = pos;
897     }
898   GNUNET_break (0);
899   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
900 }
901
902
903 /**
904  * Check that the given message is a valid data message.
905  *
906  * @return NULL if the message is not well-formed, otherwise the message
907  */
908 static const struct DataMessage *
909 check_data (const struct GNUNET_MessageHeader *message)
910 {
911   uint16_t size;
912   uint32_t dsize;
913   const struct DataMessage *dm;
914
915   size = ntohs(message->size);
916   if (size < sizeof(struct DataMessage))
917     { 
918       GNUNET_break (0);
919       return NULL;
920     }
921   dm = (const struct DataMessage *) message;
922   dsize = ntohl(dm->size);
923   if (size != dsize + sizeof(struct DataMessage))
924     {
925       GNUNET_break (0);
926       return NULL;
927     }
928   return dm;
929 }
930
931
932 /**
933  * Context for a put request used to see if the content is
934  * already present.
935  */
936 struct PutContext
937 {
938   /**
939    * Client to notify on completion.
940    */
941   struct GNUNET_SERVER_Client *client;
942
943   /**
944    * Did we find the data already in the database?
945    */
946   int is_present;
947   
948   /* followed by the 'struct DataMessage' */
949 };
950
951
952 /**
953  * Actually put the data message.
954  */
955 static void
956 execute_put (struct GNUNET_SERVER_Client *client,
957              const struct DataMessage *dm)
958 {
959   uint32_t size;
960   char *msg;
961   int ret;
962
963   size = ntohl(dm->size);
964   msg = NULL;
965   ret = plugin->api->put (plugin->api->cls,
966                           &dm->key,
967                           size,
968                           &dm[1],
969                           ntohl(dm->type),
970                           ntohl(dm->priority),
971                           ntohl(dm->anonymity),
972                           0 /* FIXME: replication */,
973                           GNUNET_TIME_absolute_ntoh(dm->expiration),
974                           &msg);
975   if (GNUNET_OK == ret)
976     {
977       GNUNET_STATISTICS_update (stats,
978                                 gettext_noop ("# bytes stored"),
979                                 size,
980                                 GNUNET_YES);
981       GNUNET_CONTAINER_bloomfilter_add (filter,
982                                         &dm->key);
983 #if DEBUG_DATASTORE
984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985                   "Successfully stored %u bytes of type %u under key `%s'\n",
986                   size,
987                   ntohl(dm->type),
988                   GNUNET_h2s (&dm->key));
989 #endif
990     }
991   transmit_status (client, 
992                    ret,
993                    msg);
994   GNUNET_free_non_null (msg);
995   if (quota - reserved - cache_size < payload)
996     {
997       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
998                   _("Need %llu bytes more space (%llu allowed, using %llu)\n"),
999                   (unsigned long long) size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
1000                   (unsigned long long) (quota - reserved - cache_size),
1001                   (unsigned long long) payload);
1002       manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1003     }
1004 }
1005
1006
1007 /**
1008  * Function that will check if the given datastore entry
1009  * matches the put and if none match executes the put.
1010  *
1011  * @param cls closure, pointer to the client (of type 'struct PutContext').
1012  * @param next_cls closure to use to ask for the next item
1013  * @param key key for the content
1014  * @param size number of bytes in data
1015  * @param data content stored
1016  * @param type type of the content
1017  * @param priority priority of the content
1018  * @param anonymity anonymity-level for the content
1019  * @param expiration expiration time for the content
1020  * @param uid unique identifier for the datum;
1021  *        maybe 0 if no unique identifier is available
1022  *
1023  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
1024  *         GNUNET_NO to delete the item and continue (if supported)
1025  */
1026 static int
1027 check_present (void *cls,
1028                void *next_cls,
1029                const GNUNET_HashCode * key,
1030                uint32_t size,
1031                const void *data,
1032                enum GNUNET_BLOCK_Type type,
1033                uint32_t priority,
1034                uint32_t anonymity,
1035                struct GNUNET_TIME_Absolute
1036                expiration, uint64_t uid)
1037 {
1038   struct PutContext *pc = cls;
1039   const struct DataMessage *dm;
1040
1041   dm = (const struct DataMessage*) &pc[1];
1042   if (key == NULL)
1043     {
1044       if (pc->is_present == GNUNET_YES) 
1045         transmit_status (pc->client, GNUNET_NO, NULL);
1046       else
1047         execute_put (pc->client, dm);
1048       GNUNET_SERVER_client_drop (pc->client);
1049       GNUNET_free (pc);
1050       return GNUNET_SYSERR;
1051     }
1052   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
1053        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
1054        ( (size == ntohl(dm->size)) &&
1055          (0 == memcmp (&dm[1],
1056                        data,
1057                        size)) ) )
1058     {
1059       pc->is_present = GNUNET_YES;
1060       plugin->api->next_request (next_cls, GNUNET_YES);
1061     }
1062   else
1063     {
1064       plugin->api->next_request (next_cls, GNUNET_NO);
1065     }
1066   return GNUNET_OK;
1067 }
1068
1069
1070 /**
1071  * Handle PUT-message.
1072  *
1073  * @param cls closure
1074  * @param client identification of the client
1075  * @param message the actual message
1076  */
1077 static void
1078 handle_put (void *cls,
1079             struct GNUNET_SERVER_Client *client,
1080             const struct GNUNET_MessageHeader *message)
1081 {
1082   const struct DataMessage *dm = check_data (message);
1083   int rid;
1084   struct ReservationList *pos;
1085   struct PutContext *pc;
1086   uint32_t size;
1087
1088   if ( (dm == NULL) ||
1089        (ntohl(dm->type) == 0) ) 
1090     {
1091       GNUNET_break (0);
1092       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1093       return;
1094     }
1095 #if DEBUG_DATASTORE
1096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097               "Processing `%s' request for `%s' of type %u\n",
1098               "PUT",
1099               GNUNET_h2s (&dm->key),
1100               ntohl (dm->type));
1101 #endif
1102   rid = ntohl(dm->rid);
1103   size = ntohl(dm->size);
1104   if (rid > 0)
1105     {
1106       pos = reservations;
1107       while ( (NULL != pos) &&
1108               (rid != pos->rid) )
1109         pos = pos->next;
1110       GNUNET_break (pos != NULL);
1111       if (NULL != pos)
1112         {
1113           GNUNET_break (pos->entries > 0);
1114           GNUNET_break (pos->amount >= size);
1115           pos->entries--;
1116           pos->amount -= size;
1117           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
1118           GNUNET_STATISTICS_set (stats,
1119                                  gettext_noop ("# reserved"),
1120                                  reserved,
1121                                  GNUNET_NO);
1122         }
1123     }
1124   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
1125                                                        &dm->key))
1126     {
1127       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
1128       pc->client = client;
1129       GNUNET_SERVER_client_keep (client);
1130       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
1131       plugin->api->get (plugin->api->cls,
1132                         &dm->key,
1133                         NULL,
1134                         ntohl (dm->type),
1135                         &check_present,
1136                         pc);      
1137       return;
1138     }
1139   execute_put (client, dm);
1140 }
1141
1142
1143 /**
1144  * Handle GET-message.
1145  *
1146  * @param cls closure
1147  * @param client identification of the client
1148  * @param message the actual message
1149  */
1150 static void
1151 handle_get (void *cls,
1152             struct GNUNET_SERVER_Client *client,
1153             const struct GNUNET_MessageHeader *message)
1154 {
1155   const struct GetMessage *msg;
1156   uint16_t size;
1157
1158   size = ntohs(message->size);
1159   if ( (size != sizeof(struct GetMessage)) &&
1160        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
1161     {
1162       GNUNET_break (0);
1163       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1164       return;
1165     }
1166   msg = (const struct GetMessage*) message;
1167 #if DEBUG_DATASTORE
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "Processing `%s' request for `%s' of type %u\n",
1170               "GET",
1171               GNUNET_h2s (&msg->key),
1172               ntohl (msg->type));
1173 #endif
1174   GNUNET_STATISTICS_update (stats,
1175                             gettext_noop ("# GET requests received"),
1176                             1,
1177                             GNUNET_NO);
1178   GNUNET_SERVER_client_keep (client);
1179   if ( (size == sizeof(struct GetMessage)) &&
1180        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
1181                                                          &msg->key)) )
1182     {
1183       /* don't bother database... */
1184 #if DEBUG_DATASTORE
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Empty result set for `%s' request for `%s' (bloomfilter).\n",
1187                   "GET",
1188                   GNUNET_h2s (&msg->key));
1189 #endif  
1190       GNUNET_STATISTICS_update (stats,
1191                                 gettext_noop ("# requests filtered by bloomfilter"),
1192                                 1,
1193                                 GNUNET_NO);
1194       transmit_item (client,
1195                      NULL, NULL, 0, NULL, 0, 0, 0, 
1196                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
1197       return;
1198     }
1199   plugin->api->get (plugin->api->cls,
1200                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
1201                     NULL,
1202                     ntohl(msg->type),
1203                     &transmit_item,
1204                     client);    
1205 }
1206
1207
1208 /**
1209  * Handle UPDATE-message.
1210  *
1211  * @param cls closure
1212  * @param client identification of the client
1213  * @param message the actual message
1214  */
1215 static void
1216 handle_update (void *cls,
1217                struct GNUNET_SERVER_Client *client,
1218                const struct GNUNET_MessageHeader *message)
1219 {
1220   const struct UpdateMessage *msg;
1221   int ret;
1222   char *emsg;
1223
1224   GNUNET_STATISTICS_update (stats,
1225                             gettext_noop ("# UPDATE requests received"),
1226                             1,
1227                             GNUNET_NO);
1228   msg = (const struct UpdateMessage*) message;
1229   emsg = NULL;
1230 #if DEBUG_DATASTORE
1231   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1232               "Processing `%s' request for %llu\n",
1233               "UPDATE",
1234               (unsigned long long) GNUNET_ntohll (msg->uid));
1235 #endif
1236   ret = plugin->api->update (plugin->api->cls,
1237                              GNUNET_ntohll(msg->uid),
1238                              (int32_t) ntohl(msg->priority),
1239                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1240                              &emsg);
1241   transmit_status (client, ret, emsg);
1242   GNUNET_free_non_null (emsg);
1243 }
1244
1245
1246 /**
1247  * Handle GET_REPLICATION-message.
1248  *
1249  * @param cls closure
1250  * @param client identification of the client
1251  * @param message the actual message
1252  */
1253 static void
1254 handle_get_replication (void *cls,
1255                         struct GNUNET_SERVER_Client *client,
1256                         const struct GNUNET_MessageHeader *message)
1257 {
1258 #if DEBUG_DATASTORE
1259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260               "Processing `%s' request\n",
1261               "GET_REPLICATION");
1262 #endif
1263   GNUNET_STATISTICS_update (stats,
1264                             gettext_noop ("# GET REPLICATION requests received"),
1265                             1,
1266                             GNUNET_NO);
1267   GNUNET_SERVER_client_keep (client);
1268   plugin->api->replication_get (plugin->api->cls,
1269                                 &transmit_item,
1270                                 client);  
1271 }
1272
1273
1274 /**
1275  * Handle GET_ZERO_ANONYMITY-message.
1276  *
1277  * @param cls closure
1278  * @param client identification of the client
1279  * @param message the actual message
1280  */
1281 static void
1282 handle_get_zero_anonymity (void *cls,
1283                            struct GNUNET_SERVER_Client *client,
1284                            const struct GNUNET_MessageHeader *message)
1285 {
1286   const struct GetZeroAnonymityMessage * msg = (const struct GetZeroAnonymityMessage*) message;
1287   enum GNUNET_BLOCK_Type type;
1288
1289   type = (enum GNUNET_BLOCK_Type) ntohl (msg->type);
1290   if (type == GNUNET_BLOCK_TYPE_ANY)
1291     {
1292       GNUNET_break (0);
1293       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1294       return;
1295     }
1296 #if DEBUG_DATASTORE
1297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1298               "Processing `%s' request\n",
1299               "GET_ZERO_ANONYMITY");
1300 #endif
1301   GNUNET_STATISTICS_update (stats,
1302                             gettext_noop ("# GET ZERO ANONYMITY requests received"),
1303                             1,
1304                             GNUNET_NO);
1305   GNUNET_SERVER_client_keep (client);
1306   plugin->api->iter_zero_anonymity (plugin->api->cls,
1307                                     type,
1308                                     &transmit_item,
1309                                     client);  
1310 }
1311
1312
1313 /**
1314  * Context for the 'remove_callback'.
1315  */
1316 struct RemoveContext 
1317 {
1318   /**
1319    * Client for whom we're doing the remvoing.
1320    */
1321   struct GNUNET_SERVER_Client *client;
1322
1323   /**
1324    * GNUNET_YES if we managed to remove something.
1325    */
1326   int found;
1327 };
1328
1329
1330 /**
1331  * Callback function that will cause the item that is passed
1332  * in to be deleted (by returning GNUNET_NO).
1333  */
1334 static int
1335 remove_callback (void *cls,
1336                  void *next_cls,
1337                  const GNUNET_HashCode * key,
1338                  uint32_t size,
1339                  const void *data,
1340                  enum GNUNET_BLOCK_Type type,
1341                  uint32_t priority,
1342                  uint32_t anonymity,
1343                  struct GNUNET_TIME_Absolute
1344                  expiration, uint64_t uid)
1345 {
1346   struct RemoveContext *rc = cls;
1347
1348   fprintf (stderr,
1349            "remove called with key %p\n",
1350            key);
1351   if (key == NULL)
1352     {
1353 #if DEBUG_DATASTORE
1354       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1355                   "No further matches for `%s' request.\n",
1356                   "REMOVE");
1357 #endif  
1358       if (GNUNET_YES == rc->found)
1359         transmit_status (rc->client, GNUNET_OK, NULL);       
1360       else
1361         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1362       GNUNET_SERVER_client_drop (rc->client);
1363       GNUNET_free (rc);
1364       return GNUNET_OK; /* last item */
1365     }
1366   rc->found = GNUNET_YES;
1367 #if DEBUG_DATASTORE
1368   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1369               "Item %llu matches `%s' request for key `%s' and type %u.\n",
1370               (unsigned long long) uid,
1371               "REMOVE",
1372               GNUNET_h2s (key),
1373               type);
1374 #endif  
1375   GNUNET_STATISTICS_update (stats,
1376                             gettext_noop ("# bytes removed (explicit request)"),
1377                             size,
1378                             GNUNET_YES);
1379   GNUNET_CONTAINER_bloomfilter_remove (filter,
1380                                        key);
1381   plugin->api->next_request (next_cls, GNUNET_YES);
1382   return GNUNET_NO;
1383 }
1384
1385
1386 /**
1387  * Handle REMOVE-message.
1388  *
1389  * @param cls closure
1390  * @param client identification of the client
1391  * @param message the actual message
1392  */
1393 static void
1394 handle_remove (void *cls,
1395                struct GNUNET_SERVER_Client *client,
1396                const struct GNUNET_MessageHeader *message)
1397 {
1398   const struct DataMessage *dm = check_data (message);
1399   GNUNET_HashCode vhash;
1400   struct RemoveContext *rc;
1401
1402   if (dm == NULL)
1403     {
1404       GNUNET_break (0);
1405       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1406       return;
1407     }
1408 #if DEBUG_DATASTORE
1409   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410               "Processing `%s' request for `%s' of type %u\n",
1411               "REMOVE",
1412               GNUNET_h2s (&dm->key),
1413               ntohl (dm->type));
1414 #endif
1415   GNUNET_STATISTICS_update (stats,
1416                             gettext_noop ("# REMOVE requests received"),
1417                             1,
1418                             GNUNET_NO);
1419   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1420   GNUNET_SERVER_client_keep (client);
1421   rc->client = client;
1422   GNUNET_CRYPTO_hash (&dm[1],
1423                       ntohl(dm->size),
1424                       &vhash);
1425   fprintf (stderr,
1426            "remove does get for key %s\n",
1427            GNUNET_h2s (&dm->key));
1428   fprintf (stderr,
1429            "remove does get for %u vhash %s\n",
1430            ntohl (dm->type),
1431            GNUNET_h2s (&vhash));
1432   plugin->api->get (plugin->api->cls,
1433                     &dm->key,
1434                     &vhash,
1435                     (enum GNUNET_BLOCK_Type) ntohl(dm->type),
1436                     &remove_callback,
1437                     rc);
1438 }
1439
1440
1441 /**
1442  * Handle DROP-message.
1443  *
1444  * @param cls closure
1445  * @param client identification of the client
1446  * @param message the actual message
1447  */
1448 static void
1449 handle_drop (void *cls,
1450              struct GNUNET_SERVER_Client *client,
1451              const struct GNUNET_MessageHeader *message)
1452 {
1453 #if DEBUG_DATASTORE
1454   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455               "Processing `%s' request\n",
1456               "DROP");
1457 #endif
1458   plugin->api->drop (plugin->api->cls);
1459   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1460 }
1461
1462
1463 /**
1464  * Function called by plugins to notify us about a
1465  * change in their disk utilization.
1466  *
1467  * @param cls closure (NULL)
1468  * @param delta change in disk utilization, 
1469  *        0 for "reset to empty"
1470  */
1471 static void
1472 disk_utilization_change_cb (void *cls,
1473                             int delta)
1474 {
1475   if ( (delta < 0) &&
1476        (payload < -delta) )
1477     {
1478       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1479                   _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
1480                   (long long) payload,
1481                   (long long) -delta);
1482       payload = plugin->api->get_size (plugin->api->cls);
1483       sync_stats ();
1484       return;
1485     }
1486   payload += delta;
1487   lastSync++;
1488   if (lastSync >= MAX_STAT_SYNC_LAG)
1489     sync_stats ();
1490 }
1491
1492
1493 /**
1494  * Callback function to process statistic values.
1495  *
1496  * @param cls closure (struct Plugin*)
1497  * @param subsystem name of subsystem that created the statistic
1498  * @param name the name of the datum
1499  * @param value the current value
1500  * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not
1501  * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration
1502  */
1503 static int
1504 process_stat_in (void *cls,
1505                  const char *subsystem,
1506                  const char *name,
1507                  uint64_t value,
1508                  int is_persistent)
1509 {
1510   GNUNET_assert (stats_worked == GNUNET_NO);
1511   stats_worked = GNUNET_YES;
1512   payload += value;
1513 #if DEBUG_SQLITE
1514   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1515               "Notification from statistics about existing payload (%llu), new payload is %llu\n",
1516               abs_value,
1517               payload);
1518 #endif
1519   return GNUNET_OK;
1520 }
1521
1522
1523 static void
1524 process_stat_done (void *cls,
1525                    int success)
1526 {
1527   struct DatastorePlugin *plugin = cls;
1528
1529   stat_get = NULL;
1530   if (stats_worked == GNUNET_NO) 
1531     payload = plugin->api->get_size (plugin->api->cls);
1532 }
1533
1534
1535 /**
1536  * Load the datastore plugin.
1537  */
1538 static struct DatastorePlugin *
1539 load_plugin () 
1540 {
1541   struct DatastorePlugin *ret;
1542   char *libname;
1543   char *name;
1544
1545   if (GNUNET_OK !=
1546       GNUNET_CONFIGURATION_get_value_string (cfg,
1547                                              "DATASTORE", "DATABASE", &name))
1548     {
1549       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1550                   _("No `%s' specified for `%s' in configuration!\n"),
1551                   "DATABASE",
1552                   "DATASTORE");
1553       return NULL;
1554     }
1555   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1556   ret->env.cfg = cfg;
1557   ret->env.duc = &disk_utilization_change_cb;
1558   ret->env.cls = NULL;
1559   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1560               _("Loading `%s' datastore plugin\n"), name);
1561   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1562   ret->short_name = name;
1563   ret->lib_name = libname;
1564   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1565   if (ret->api == NULL)
1566     {
1567       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1568                   _("Failed to load datastore plugin for `%s'\n"), name);
1569       GNUNET_free (ret->short_name);
1570       GNUNET_free (libname);
1571       GNUNET_free (ret);
1572       return NULL;
1573     }
1574   return ret;
1575 }
1576
1577
1578 /**
1579  * Function called when the service shuts
1580  * down.  Unloads our datastore plugin.
1581  *
1582  * @param plug plugin to unload
1583  */
1584 static void
1585 unload_plugin (struct DatastorePlugin *plug)
1586 {
1587 #if DEBUG_DATASTORE
1588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1589               "Datastore service is unloading plugin...\n");
1590 #endif
1591   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1592   GNUNET_free (plug->lib_name);
1593   GNUNET_free (plug->short_name);
1594   GNUNET_free (plug);
1595 }
1596
1597
1598 /**
1599  * Final task run after shutdown.  Unloads plugins and disconnects us from
1600  * statistics.
1601  */
1602 static void
1603 unload_task (void *cls,
1604              const struct GNUNET_SCHEDULER_TaskContext *tc)
1605 {
1606   unload_plugin (plugin);
1607   plugin = NULL;
1608   if (filter != NULL)
1609     {
1610       GNUNET_CONTAINER_bloomfilter_free (filter);
1611       filter = NULL;
1612     }
1613   if (lastSync > 0)
1614     sync_stats ();
1615   if (stat_get != NULL)
1616     {
1617       GNUNET_STATISTICS_get_cancel (stat_get);
1618       stat_get = NULL;
1619     }
1620   if (stats != NULL)
1621     {
1622       GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1623       stats = NULL;
1624     }
1625 }
1626
1627
1628 /**
1629  * Last task run during shutdown.  Disconnects us from
1630  * the transport and core.
1631  */
1632 static void
1633 cleaning_task (void *cls,
1634                const struct GNUNET_SCHEDULER_TaskContext *tc)
1635 {
1636   struct TransmitCallbackContext *tcc;
1637
1638   cleaning_done = GNUNET_YES;
1639   while (NULL != (tcc = tcc_head))
1640     {
1641       GNUNET_CONTAINER_DLL_remove (tcc_head,
1642                                    tcc_tail,
1643                                    tcc);
1644       if (tcc->th != NULL)
1645         {
1646           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1647           GNUNET_SERVER_client_drop (tcc->client);
1648         }
1649       if (NULL != tcc->tc)
1650         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1651       GNUNET_free (tcc->msg);
1652       GNUNET_free (tcc);
1653     }
1654   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1655     {
1656       GNUNET_SCHEDULER_cancel (expired_kill_task);
1657       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1658     }
1659   GNUNET_SCHEDULER_add_continuation (&unload_task,
1660                                      NULL,
1661                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1662 }
1663
1664
1665 /**
1666  * Function that removes all active reservations made
1667  * by the given client and releases the space for other
1668  * requests.
1669  *
1670  * @param cls closure
1671  * @param client identification of the client
1672  */
1673 static void
1674 cleanup_reservations (void *cls,
1675                       struct GNUNET_SERVER_Client *client)
1676 {
1677   struct ReservationList *pos;
1678   struct ReservationList *prev;
1679   struct ReservationList *next;
1680
1681   if (client == NULL)
1682     return;
1683   prev = NULL;
1684   pos = reservations;
1685   while (NULL != pos)
1686     {
1687       next = pos->next;
1688       if (pos->client == client)
1689         {
1690           if (prev == NULL)
1691             reservations = next;
1692           else
1693             prev->next = next;
1694           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1695           GNUNET_free (pos);
1696         }
1697       else
1698         {
1699           prev = pos;
1700         }
1701       pos = next;
1702     }
1703   GNUNET_STATISTICS_set (stats,
1704                          gettext_noop ("# reserved"),
1705                          reserved,
1706                          GNUNET_NO);
1707 }
1708
1709
1710 /**
1711  * Process datastore requests.
1712  *
1713  * @param cls closure
1714  * @param server the initialized server
1715  * @param c configuration to use
1716  */
1717 static void
1718 run (void *cls,
1719      struct GNUNET_SERVER_Handle *server,
1720      const struct GNUNET_CONFIGURATION_Handle *c)
1721 {
1722   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
1723     {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1724      sizeof(struct ReserveMessage) }, 
1725     {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1726      sizeof(struct ReleaseReserveMessage) }, 
1727     {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1728     {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1729      sizeof (struct UpdateMessage) }, 
1730     {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1731     {&handle_get_replication, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION, 
1732      sizeof(struct GNUNET_MessageHeader) }, 
1733     {&handle_get_zero_anonymity, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY, 
1734      sizeof(struct GetZeroAnonymityMessage) }, 
1735     {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1736     {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1737      sizeof(struct GNUNET_MessageHeader) }, 
1738     {NULL, NULL, 0, 0}
1739   };
1740   char *fn;
1741   unsigned int bf_size;
1742
1743   cfg = c;
1744   if (GNUNET_OK !=
1745       GNUNET_CONFIGURATION_get_value_number (cfg,
1746                                              "DATASTORE", "QUOTA", &quota))
1747     {
1748       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1749                   _("No `%s' specified for `%s' in configuration!\n"),
1750                   "QUOTA",
1751                   "DATASTORE");
1752       return;
1753     }
1754   stats = GNUNET_STATISTICS_create ("datastore", cfg);
1755   GNUNET_STATISTICS_set (stats,
1756                          gettext_noop ("# quota"),
1757                          quota,
1758                          GNUNET_NO);
1759   cache_size = quota / 8; /* Or should we make this an option? */
1760   GNUNET_STATISTICS_set (stats,
1761                          gettext_noop ("# cache size"),
1762                          cache_size,
1763                          GNUNET_NO);
1764   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1765   fn = NULL;
1766   if ( (GNUNET_OK !=
1767         GNUNET_CONFIGURATION_get_value_filename (cfg,
1768                                                  "DATASTORE",
1769                                                  "BLOOMFILTER",
1770                                                  &fn)) ||
1771        (GNUNET_OK !=
1772         GNUNET_DISK_directory_create_for_file (fn)) )
1773     {
1774       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1775                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1776                   fn != NULL ? fn : "");
1777       GNUNET_free_non_null (fn);
1778       fn = NULL;
1779     }
1780   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1781   GNUNET_free_non_null (fn);
1782   if (filter == NULL)
1783     {
1784       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1785                   _("Failed to initialize bloomfilter.\n"));
1786       if (stats != NULL)
1787         {
1788           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1789           stats = NULL;
1790         }
1791       return;
1792     }
1793   plugin = load_plugin ();
1794   if (NULL == plugin)
1795     {
1796       GNUNET_CONTAINER_bloomfilter_free (filter);
1797       filter = NULL;
1798       if (stats != NULL)
1799         {
1800           GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
1801           stats = NULL;
1802         }
1803       return;
1804     }
1805   stat_get = GNUNET_STATISTICS_get (stats,
1806                                     "datastore",
1807                                     QUOTA_STAT_NAME,
1808                                     GNUNET_TIME_UNIT_SECONDS,
1809                                     &process_stat_done,
1810                                     &process_stat_in,
1811                                     plugin);
1812   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1813   GNUNET_SERVER_add_handlers (server, handlers);
1814   expired_kill_task
1815     = GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_IDLE,
1816                                           &delete_expired, NULL);
1817   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
1818                                 &cleaning_task, NULL);
1819 }
1820
1821
1822 /**
1823  * The main function for the datastore service.
1824  *
1825  * @param argc number of arguments from the command line
1826  * @param argv command line arguments
1827  * @return 0 ok, 1 on error
1828  */
1829 int
1830 main (int argc, char *const *argv)
1831 {
1832   int ret;
1833
1834   ret = (GNUNET_OK ==
1835          GNUNET_SERVICE_run (argc,
1836                              argv,
1837                              "datastore",
1838                              GNUNET_SERVICE_OPTION_NONE,
1839                              &run, NULL)) ? 0 : 1;
1840   return ret;
1841 }
1842
1843
1844 /* end of gnunet-service-datastore.c */