keep until server done call
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 /**
178  * Context for transmitting replies to clients.
179  */
180 struct TransmitCallbackContext 
181 {
182   
183   /**
184    * We keep these in a doubly-linked list (for cleanup).
185    */
186   struct TransmitCallbackContext *next;
187   
188   /**
189    * We keep these in a doubly-linked list (for cleanup).
190    */
191   struct TransmitCallbackContext *prev;
192   
193   /**
194    * The message that we're asked to transmit.
195    */
196   struct GNUNET_MessageHeader *msg;
197   
198   /**
199    * Handle for the transmission request.
200    */
201   struct GNUNET_CONNECTION_TransmitHandle *th;
202
203   /**
204    * Client that we are transmitting to.
205    */
206   struct GNUNET_SERVER_Client *client;
207
208   /**
209    * Function to call once msg has been transmitted
210    * (or at least added to the buffer).
211    */
212   TransmitContinuation tc;
213
214   /**
215    * Closure for tc.
216    */
217   void *tc_cls;
218
219   /**
220    * GNUNET_YES if we are supposed to signal the server
221    * completion of the client's request.
222    */
223   int end;
224 };
225
226   
227 /**
228  * Head of the doubly-linked list (for cleanup).
229  */
230 static struct TransmitCallbackContext *tcc_head;
231
232 /**
233  * Tail of the doubly-linked list (for cleanup).
234  */
235 static struct TransmitCallbackContext *tcc_tail;
236
237
238 /**
239  * Task that is used to remove expired entries from
240  * the datastore.  This task will schedule itself
241  * again automatically to always delete all expired
242  * content quickly.
243  *
244  * @param cls not used
245  * @param tc task context
246  */ 
247 static void
248 delete_expired (void *cls,
249                 const struct GNUNET_SCHEDULER_TaskContext *tc);
250
251
252 /**
253  * Iterate over the expired items stored in the datastore.
254  * Delete all expired items; once we have processed all
255  * expired items, re-schedule the "delete_expired" task.
256  *
257  * @param cls not used
258  * @param next_cls closure to pass to the "next" function.
259  * @param key key for the content
260  * @param size number of bytes in data
261  * @param data content stored
262  * @param type type of the content
263  * @param priority priority of the content
264  * @param anonymity anonymity-level for the content
265  * @param expiration expiration time for the content
266  * @param uid unique identifier for the datum;
267  *        maybe 0 if no unique identifier is available
268  *
269  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
270  *         (continue on call to "next", of course),
271  *         GNUNET_NO to delete the item and continue (if supported)
272  */
273 static int 
274 expired_processor (void *cls,
275                    void *next_cls,
276                    const GNUNET_HashCode * key,
277                    uint32_t size,
278                    const void *data,
279                    uint32_t type,
280                    uint32_t priority,
281                    uint32_t anonymity,
282                    struct GNUNET_TIME_Absolute
283                    expiration, 
284                    uint64_t uid)
285 {
286   struct GNUNET_TIME_Absolute now;
287
288   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
289   if (key == NULL) 
290     {
291       expired_kill_task 
292         = GNUNET_SCHEDULER_add_delayed (sched,
293                                         MAX_EXPIRE_DELAY,
294                                         &delete_expired,
295                                         NULL);
296       return GNUNET_SYSERR;
297     }
298   now = GNUNET_TIME_absolute_get ();
299   if (expiration.value > now.value)
300     {
301       /* finished processing */
302       plugin->api->next_request (next_cls, GNUNET_YES);
303       return GNUNET_SYSERR;
304     }
305   plugin->api->next_request (next_cls, GNUNET_NO);
306 #if DEBUG_DATASTORE
307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
308               "Deleting content that expired %llu ms ago\n",
309               (unsigned long long) (now.value - expiration.value));
310 #endif
311   GNUNET_CONTAINER_bloomfilter_remove (filter,
312                                        key);
313   return GNUNET_NO; /* delete */
314 }
315
316
317 /**
318  * Task that is used to remove expired entries from
319  * the datastore.  This task will schedule itself
320  * again automatically to always delete all expired
321  * content quickly.
322  *
323  * @param cls not used
324  * @param tc task context
325  */ 
326 static void
327 delete_expired (void *cls,
328                 const struct GNUNET_SCHEDULER_TaskContext *tc)
329 {
330   plugin->api->iter_ascending_expiration (plugin->api->cls, 
331                                           0,
332                                           &expired_processor,
333                                           NULL);
334 }
335
336
337 /**
338  * An iterator over a set of items stored in the datastore.
339  *
340  * @param cls closure
341  * @param next_cls closure to pass to the "next" function.
342  * @param key key for the content
343  * @param size number of bytes in data
344  * @param data content stored
345  * @param type type of the content
346  * @param priority priority of the content
347  * @param anonymity anonymity-level for the content
348  * @param expiration expiration time for the content
349  * @param uid unique identifier for the datum;
350  *        maybe 0 if no unique identifier is available
351  *
352  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
353  *         (continue on call to "next", of course),
354  *         GNUNET_NO to delete the item and continue (if supported)
355  */
356 static int 
357 manage (void *cls,
358         void *next_cls,
359         const GNUNET_HashCode * key,
360         uint32_t size,
361         const void *data,
362         uint32_t type,
363         uint32_t priority,
364         uint32_t anonymity,
365         struct GNUNET_TIME_Absolute
366         expiration, 
367         uint64_t uid)
368 {
369   unsigned long long *need = cls;
370
371   if (NULL == key)
372     {
373       GNUNET_free (need);
374       return GNUNET_SYSERR;
375     }
376   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
377     *need = 0;
378   else
379     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
380   plugin->api->next_request (next_cls, 
381                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
382 #if DEBUG_DATASTORE
383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
384               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
385               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
386               *need);
387 #endif
388   GNUNET_CONTAINER_bloomfilter_remove (filter,
389                                        key);
390   return GNUNET_NO;
391 }
392
393
394 /**
395  * Manage available disk space by running tasks
396  * that will discard content if necessary.  This
397  * function will be run whenever a request for
398  * "need" bytes of storage could only be satisfied
399  * by eating into the "cache" (and we want our cache
400  * space back).
401  *
402  * @param need number of bytes of content that were
403  *        placed into the "cache" (and hence the
404  *        number of bytes that should be removed).
405  */
406 static void
407 manage_space (unsigned long long need)
408 {
409   unsigned long long *n;
410
411 #if DEBUG_DATASTORE
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Asked to free up %llu bytes of cache space\n",
414               need);
415 #endif
416   n = GNUNET_malloc (sizeof(unsigned long long));
417   *n = need;
418   plugin->api->iter_low_priority (plugin->api->cls,
419                                   0,
420                                   &manage,
421                                   n);
422 }
423
424
425 /**
426  * Function called to notify a client about the socket
427  * begin ready to queue more data.  "buf" will be
428  * NULL and "size" zero if the socket was closed for
429  * writing in the meantime.
430  *
431  * @param cls closure
432  * @param size number of bytes available in buf
433  * @param buf where the callee should write the message
434  * @return number of bytes written to buf
435  */
436 static size_t
437 transmit_callback (void *cls,
438                    size_t size, void *buf)
439 {
440   struct TransmitCallbackContext *tcc = cls;
441   size_t msize;
442   
443   tcc->th = NULL;
444   GNUNET_CONTAINER_DLL_remove (tcc_head,
445                                tcc_tail,
446                                tcc);
447   msize = ntohs(tcc->msg->size);
448   if (size == 0)
449     {
450 #if DEBUG_DATASTORE
451       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
452                   "Transmission failed.\n");
453 #endif
454       if (tcc->tc != NULL)
455         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
456       if (GNUNET_YES == tcc->end)
457         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
458       GNUNET_SERVER_client_drop (tcc->client);
459       GNUNET_free (tcc->msg);
460       GNUNET_free (tcc);
461       return 0;
462     }
463   GNUNET_assert (size >= msize);
464   memcpy (buf, tcc->msg, msize);
465   if (tcc->tc != NULL)
466     tcc->tc (tcc->tc_cls, GNUNET_OK);
467   if (GNUNET_YES == tcc->end)
468     {
469       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
470     }
471   else
472     {
473 #if DEBUG_DATASTORE
474       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475                   "Response transmitted, more pending!\n");
476 #endif
477     }
478   GNUNET_SERVER_client_drop (tcc->client);
479   GNUNET_free (tcc->msg);
480   GNUNET_free (tcc);
481   return msize;
482 }
483
484
485 /**
486  * Transmit the given message to the client.
487  *
488  * @param client target of the message
489  * @param msg message to transmit, will be freed!
490  * @param tc function to call afterwards
491  * @param tc_cls closure for tc
492  * @param end is this the last response (and we should
493  *        signal the server completion accodingly after
494  *        transmitting this message)?
495  */
496 static void
497 transmit (struct GNUNET_SERVER_Client *client,
498           struct GNUNET_MessageHeader *msg,
499           TransmitContinuation tc,
500           void *tc_cls,
501           int end)
502 {
503   struct TransmitCallbackContext *tcc;
504
505   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
506   tcc->msg = msg;
507   tcc->client = client;
508   tcc->tc = tc;
509   tcc->tc_cls = tc_cls;
510   tcc->end = end;
511   if (NULL ==
512       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
513                                                       ntohs(msg->size),
514                                                       GNUNET_TIME_UNIT_FOREVER_REL,
515                                                       &transmit_callback,
516                                                       tcc)))
517     {
518       GNUNET_break (0);
519       if (GNUNET_YES == end)
520         {
521 #if DEBUG_DATASTORE
522           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
523                       "Disconnecting client.\n");
524 #endif    
525           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
526         }
527       if (NULL != tc)
528         tc (tc_cls, GNUNET_SYSERR);
529       GNUNET_free (msg);
530       GNUNET_free (tcc);
531       return;
532     }
533   GNUNET_SERVER_client_keep (client);
534   GNUNET_CONTAINER_DLL_insert (tcc_head,
535                                tcc_tail,
536                                tcc);
537 }
538
539
540 /**
541  * Transmit a status code to the client.
542  *
543  * @param client receiver of the response
544  * @param code status code
545  * @param msg optional error message (can be NULL)
546  */
547 static void
548 transmit_status (struct GNUNET_SERVER_Client *client,
549                  int code,
550                  const char *msg)
551 {
552   struct StatusMessage *sm;
553   size_t slen;
554
555 #if DEBUG_DATASTORE
556   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
557               "Transmitting `%s' message with value %d and message `%s'\n",
558               "STATUS",
559               code,
560               msg != NULL ? msg : "(none)");
561 #endif
562   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
563   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
564   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
565   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
566   sm->status = htonl(code);
567   if (slen > 0)
568     memcpy (&sm[1], msg, slen);  
569   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
570 }
571
572
573 /**
574  * Function called once the transmit operation has
575  * either failed or succeeded.
576  *
577  * @param next_cls closure for calling "next_request" callback
578  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
579  */
580 static void 
581 get_next(void *next_cls,
582          int status)
583 {
584   if (status != GNUNET_OK)
585     {
586       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
587                   _("Failed to transmit an item to the client; aborting iteration.\n"));
588       if (plugin != NULL)
589         plugin->api->next_request (next_cls, GNUNET_YES);
590       return;
591     }
592   plugin->api->next_request (next_cls, GNUNET_NO);
593 }
594
595
596 /**
597  * Function that will transmit the given datastore entry
598  * to the client.
599  *
600  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
601  * @param next_cls closure to use to ask for the next item
602  * @param key key for the content
603  * @param size number of bytes in data
604  * @param data content stored
605  * @param type type of the content
606  * @param priority priority of the content
607  * @param anonymity anonymity-level for the content
608  * @param expiration expiration time for the content
609  * @param uid unique identifier for the datum;
610  *        maybe 0 if no unique identifier is available
611  *
612  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
613  *         GNUNET_NO to delete the item and continue (if supported)
614  */
615 static int
616 transmit_item (void *cls,
617                void *next_cls,
618                const GNUNET_HashCode * key,
619                uint32_t size,
620                const void *data,
621                uint32_t type,
622                uint32_t priority,
623                uint32_t anonymity,
624                struct GNUNET_TIME_Absolute
625                expiration, uint64_t uid)
626 {
627   struct GNUNET_SERVER_Client *client = cls;
628   struct GNUNET_MessageHeader *end;
629   struct DataMessage *dm;
630
631   if (key == NULL)
632     {
633       /* transmit 'DATA_END' */
634 #if DEBUG_DATASTORE
635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
636                   "Transmitting `%s' message\n",
637                   "DATA_END");
638 #endif
639       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
640       end->size = htons(sizeof(struct GNUNET_MessageHeader));
641       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
642       transmit (client, end, NULL, NULL, GNUNET_YES);
643       GNUNET_SERVER_client_drop (client);
644       return GNUNET_OK;
645     }
646   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
647   dm->header.size = htons(sizeof(struct DataMessage) + size);
648   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
649   dm->rid = htonl(0);
650   dm->size = htonl(size);
651   dm->type = htonl(type);
652   dm->priority = htonl(priority);
653   dm->anonymity = htonl(anonymity);
654   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
655   dm->uid = GNUNET_htonll(uid);
656   dm->key = *key;
657   memcpy (&dm[1], data, size);
658 #if DEBUG_DATASTORE
659   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
660               "Transmitting `%s' message\n",
661               "DATA");
662 #endif
663   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
664   return GNUNET_OK;
665 }
666
667
668 /**
669  * Handle RESERVE-message.
670  *
671  * @param cls closure
672  * @param client identification of the client
673  * @param message the actual message
674  */
675 static void
676 handle_reserve (void *cls,
677                 struct GNUNET_SERVER_Client *client,
678                 const struct GNUNET_MessageHeader *message)
679 {
680   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
681   struct ReservationList *e;
682   unsigned long long used;
683   unsigned long long req;
684   uint64_t amount;
685   uint32_t entries;
686
687 #if DEBUG_DATASTORE
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Processing `%s' request\n",
690               "RESERVE");
691 #endif
692   amount = GNUNET_ntohll(msg->amount);
693   entries = ntohl(msg->entries);
694   used = plugin->api->get_size (plugin->api->cls) + reserved;
695   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
696   if (used + req > quota)
697     {
698       if (quota < used)
699         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
700       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
701                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
702                   quota - used,
703                   "RESERVE",
704                   req);
705       if (cache_size < req)
706         {
707           /* TODO: document this in the FAQ; essentially, if this
708              message happens, the insertion request could be blocked
709              by less-important content from migration because it is
710              larger than 1/8th of the overall available space, and
711              we only reserve 1/8th for "fresh" insertions */
712           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
713                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
714                       req,
715                       cache_size);
716           transmit_status (client, 0, 
717                            gettext_noop ("Insufficient space to satisfy request and "
718                                          "requested amount is larger than cache size"));
719         }
720       else
721         {
722           transmit_status (client, 0, 
723                            gettext_noop ("Insufficient space to satisfy request"));
724         }
725       return;      
726     }
727   reserved += req;
728   e = GNUNET_malloc (sizeof(struct ReservationList));
729   e->next = reservations;
730   reservations = e;
731   e->client = client;
732   e->amount = amount;
733   e->entries = entries;
734   e->rid = ++reservation_gen;
735   if (reservation_gen < 0)
736     reservation_gen = 0; /* wrap around */
737   transmit_status (client, e->rid, NULL);
738 }
739
740
741 /**
742  * Handle RELEASE_RESERVE-message.
743  *
744  * @param cls closure
745  * @param client identification of the client
746  * @param message the actual message
747  */
748 static void
749 handle_release_reserve (void *cls,
750                         struct GNUNET_SERVER_Client *client,
751                         const struct GNUNET_MessageHeader *message)
752 {
753   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
754   struct ReservationList *pos;
755   struct ReservationList *prev;
756   struct ReservationList *next;
757   int rid = ntohl(msg->rid);
758   unsigned long long rem;
759
760 #if DEBUG_DATASTORE
761   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
762               "Processing `%s' request\n",
763               "RELEASE_RESERVE");
764 #endif
765   next = reservations;
766   prev = NULL;
767   while (NULL != (pos = next))
768     {
769       next = pos->next;
770       if (rid == pos->rid)
771         {
772           if (prev == NULL)
773             reservations = next;
774           else
775             prev->next = next;
776           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
777           GNUNET_assert (reserved >= rem);
778           reserved -= rem;
779 #if DEBUG_DATASTORE
780           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
781                       "Returning %llu remaining reserved bytes to storage pool\n",
782                       rem);
783 #endif    
784           GNUNET_free (pos);
785           transmit_status (client, GNUNET_OK, NULL);
786           return;
787         }       
788       prev = pos;
789     }
790   GNUNET_break (0);
791   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
792 }
793
794
795 /**
796  * Check that the given message is a valid data message.
797  *
798  * @return NULL if the message is not well-formed, otherwise the message
799  */
800 static const struct DataMessage *
801 check_data (const struct GNUNET_MessageHeader *message)
802 {
803   uint16_t size;
804   uint32_t dsize;
805   const struct DataMessage *dm;
806
807   size = ntohs(message->size);
808   if (size < sizeof(struct DataMessage))
809     { 
810       GNUNET_break (0);
811       return NULL;
812     }
813   dm = (const struct DataMessage *) message;
814   dsize = ntohl(dm->size);
815   if (size != dsize + sizeof(struct DataMessage))
816     {
817       GNUNET_break (0);
818       return NULL;
819     }
820   return dm;
821 }
822
823
824 /**
825  * Handle PUT-message.
826  *
827  * @param cls closure
828  * @param client identification of the client
829  * @param message the actual message
830  */
831 static void
832 handle_put (void *cls,
833             struct GNUNET_SERVER_Client *client,
834             const struct GNUNET_MessageHeader *message)
835 {
836   const struct DataMessage *dm = check_data (message);
837   char *msg;
838   int ret;
839   int rid;
840   struct ReservationList *pos;
841   uint32_t size;
842
843 #if DEBUG_DATASTORE
844   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
845               "Processing `%s' request\n",
846               "PUT");
847 #endif
848   if (ntohl(dm->type) == 0) 
849     {
850       GNUNET_break (0);
851       dm = NULL;
852     }
853   if (dm == NULL)
854     {
855       GNUNET_break (0);
856       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
857       return;
858     }
859   rid = ntohl(dm->rid);
860   size = ntohl(dm->size);
861   if (rid > 0)
862     {
863       pos = reservations;
864       while ( (NULL != pos) &&
865               (rid != pos->rid) )
866         pos = pos->next;
867       GNUNET_break (pos != NULL);
868       if (NULL != pos)
869         {
870           GNUNET_break (pos->entries > 0);
871           GNUNET_break (pos->amount > size);
872           pos->entries--;
873           pos->amount -= size;
874           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
875         }
876     }
877   msg = NULL;
878   ret = plugin->api->put (plugin->api->cls,
879                           &dm->key,
880                           size,
881                           &dm[1],
882                           ntohl(dm->type),
883                           ntohl(dm->priority),
884                           ntohl(dm->anonymity),
885                           GNUNET_TIME_absolute_ntoh(dm->expiration),
886                           &msg);
887   if (GNUNET_OK == ret)
888     {
889       GNUNET_CONTAINER_bloomfilter_add (filter,
890                                         &dm->key);
891 #if DEBUG_DATASTORE
892       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
893                   "Successfully stored %u bytes under key `%s'\n",
894                   size,
895                   GNUNET_h2s (&dm->key));
896 #endif
897     }
898   transmit_status (client, 
899                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
900                    msg);
901   GNUNET_free_non_null (msg);
902   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
903     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
904 }
905
906
907 /**
908  * Handle GET-message.
909  *
910  * @param cls closure
911  * @param client identification of the client
912  * @param message the actual message
913  */
914 static void
915 handle_get (void *cls,
916              struct GNUNET_SERVER_Client *client,
917              const struct GNUNET_MessageHeader *message)
918 {
919   const struct GetMessage *msg;
920   uint16_t size;
921
922 #if DEBUG_DATASTORE
923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
924               "Processing `%s' request\n",
925               "GET");
926 #endif
927   size = ntohs(message->size);
928   if ( (size != sizeof(struct GetMessage)) &&
929        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
930     {
931       GNUNET_break (0);
932       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
933       return;
934     }
935   GNUNET_SERVER_client_keep (client);
936   msg = (const struct GetMessage*) message;
937   if ( (size == sizeof(struct GetMessage)) &&
938        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
939                                                          &msg->key)) )
940     {
941       /* don't bother database... */
942 #if DEBUG_DATASTORE
943       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944                   "Empty result set for `%s' request for `%s'.\n",
945                   "GET",
946                   GNUNET_h2s (&msg->key));
947 #endif  
948       transmit_item (client,
949                      NULL, NULL, 0, NULL, 0, 0, 0, 
950                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
951       return;
952     }
953   plugin->api->get (plugin->api->cls,
954                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
955                     NULL,
956                     ntohl(msg->type),
957                     &transmit_item,
958                     client);    
959 }
960
961
962 /**
963  * Handle UPDATE-message.
964  *
965  * @param cls closure
966  * @param client identification of the client
967  * @param message the actual message
968  */
969 static void
970 handle_update (void *cls,
971                struct GNUNET_SERVER_Client *client,
972                const struct GNUNET_MessageHeader *message)
973 {
974   const struct UpdateMessage *msg;
975   int ret;
976   char *emsg;
977
978 #if DEBUG_DATASTORE
979   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
980               "Processing `%s' request\n",
981               "UPDATE");
982 #endif
983   msg = (const struct UpdateMessage*) message;
984   emsg = NULL;
985   ret = plugin->api->update (plugin->api->cls,
986                              GNUNET_ntohll(msg->uid),
987                              (int32_t) ntohl(msg->priority),
988                              GNUNET_TIME_absolute_ntoh(msg->expiration),
989                              &emsg);
990   transmit_status (client, ret, emsg);
991   GNUNET_free_non_null (emsg);
992 }
993
994
995 /**
996  * Handle GET_RANDOM-message.
997  *
998  * @param cls closure
999  * @param client identification of the client
1000  * @param message the actual message
1001  */
1002 static void
1003 handle_get_random (void *cls,
1004                    struct GNUNET_SERVER_Client *client,
1005                    const struct GNUNET_MessageHeader *message)
1006 {
1007 #if DEBUG_DATASTORE
1008   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1009               "Processing `%s' request\n",
1010               "GET_RANDOM");
1011 #endif
1012   GNUNET_SERVER_client_keep (client);
1013   plugin->api->iter_migration_order (plugin->api->cls,
1014                                      0,
1015                                      &transmit_item,
1016                                      client);  
1017 }
1018
1019
1020 /**
1021  * Context for the 'remove_callback'.
1022  */
1023 struct RemoveContext 
1024 {
1025   /**
1026    * Client for whom we're doing the remvoing.
1027    */
1028   struct GNUNET_SERVER_Client *client;
1029
1030   /**
1031    * GNUNET_YES if we managed to remove something.
1032    */
1033   int found;
1034 };
1035
1036
1037 /**
1038  * Callback function that will cause the item that is passed
1039  * in to be deleted (by returning GNUNET_NO).
1040  */
1041 static int
1042 remove_callback (void *cls,
1043                  void *next_cls,
1044                  const GNUNET_HashCode * key,
1045                  uint32_t size,
1046                  const void *data,
1047                  uint32_t type,
1048                  uint32_t priority,
1049                  uint32_t anonymity,
1050                  struct GNUNET_TIME_Absolute
1051                  expiration, uint64_t uid)
1052 {
1053   struct RemoveContext *rc = cls;
1054
1055   if (key == NULL)
1056     {
1057 #if DEBUG_DATASTORE
1058       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059                   "No further matches for `%s' request.\n",
1060                   "REMOVE");
1061 #endif  
1062       if (GNUNET_YES == rc->found)
1063         transmit_status (rc->client, GNUNET_OK, NULL);       
1064       else
1065         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1066       GNUNET_SERVER_client_drop (rc->client);
1067       GNUNET_free (rc);
1068       return GNUNET_OK; /* last item */
1069     }
1070   rc->found = GNUNET_YES;
1071 #if DEBUG_DATASTORE
1072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073               "Item %llu matches `%s' request.\n",
1074               (unsigned long long) uid,
1075               "REMOVE");
1076 #endif  
1077   GNUNET_CONTAINER_bloomfilter_remove (filter,
1078                                        key);
1079   plugin->api->next_request (next_cls, GNUNET_YES);
1080   return GNUNET_NO;
1081 }
1082
1083
1084 /**
1085  * Handle REMOVE-message.
1086  *
1087  * @param cls closure
1088  * @param client identification of the client
1089  * @param message the actual message
1090  */
1091 static void
1092 handle_remove (void *cls,
1093              struct GNUNET_SERVER_Client *client,
1094              const struct GNUNET_MessageHeader *message)
1095 {
1096   const struct DataMessage *dm = check_data (message);
1097   GNUNET_HashCode vhash;
1098   struct RemoveContext *rc;
1099
1100 #if DEBUG_DATASTORE
1101   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1102               "Processing `%s' request\n",
1103               "REMOVE");
1104 #endif
1105   if (dm == NULL)
1106     {
1107       GNUNET_break (0);
1108       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1109       return;
1110     }
1111   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1112   GNUNET_SERVER_client_keep (client);
1113   rc->client = client;
1114   GNUNET_CRYPTO_hash (&dm[1],
1115                       ntohl(dm->size),
1116                       &vhash);
1117   plugin->api->get (plugin->api->cls,
1118                     &dm->key,
1119                     &vhash,
1120                     ntohl(dm->type),
1121                     &remove_callback,
1122                     rc);
1123 }
1124
1125
1126 /**
1127  * Handle DROP-message.
1128  *
1129  * @param cls closure
1130  * @param client identification of the client
1131  * @param message the actual message
1132  */
1133 static void
1134 handle_drop (void *cls,
1135              struct GNUNET_SERVER_Client *client,
1136              const struct GNUNET_MessageHeader *message)
1137 {
1138 #if DEBUG_DATASTORE
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Processing `%s' request\n",
1141               "DROP");
1142 #endif
1143   plugin->api->drop (plugin->api->cls);
1144   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1145 }
1146
1147
1148 /**
1149  * List of handlers for the messages understood by this
1150  * service.
1151  */
1152 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1153   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1154    sizeof(struct ReserveMessage) }, 
1155   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1156    sizeof(struct ReleaseReserveMessage) }, 
1157   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1158   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1159    sizeof (struct UpdateMessage) }, 
1160   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1161   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1162    sizeof(struct GNUNET_MessageHeader) }, 
1163   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1164   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1165    sizeof(struct GNUNET_MessageHeader) }, 
1166   {NULL, NULL, 0, 0}
1167 };
1168
1169
1170
1171 /**
1172  * Load the datastore plugin.
1173  */
1174 static struct DatastorePlugin *
1175 load_plugin () 
1176 {
1177   struct DatastorePlugin *ret;
1178   char *libname;
1179   char *name;
1180
1181   if (GNUNET_OK !=
1182       GNUNET_CONFIGURATION_get_value_string (cfg,
1183                                              "DATASTORE", "DATABASE", &name))
1184     {
1185       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1186                   _("No `%s' specified for `%s' in configuration!\n"),
1187                   "DATABASE",
1188                   "DATASTORE");
1189       return NULL;
1190     }
1191   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1192   ret->env.cfg = cfg;
1193   ret->env.sched = sched;  
1194   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1195               _("Loading `%s' datastore plugin\n"), name);
1196   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1197   ret->short_name = name;
1198   ret->lib_name = libname;
1199   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1200   if (ret->api == NULL)
1201     {
1202       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1203                   _("Failed to load datastore plugin for `%s'\n"), name);
1204       GNUNET_free (ret->short_name);
1205       GNUNET_free (libname);
1206       GNUNET_free (ret);
1207       return NULL;
1208     }
1209   return ret;
1210 }
1211
1212
1213 /**
1214  * Function called when the service shuts
1215  * down.  Unloads our datastore plugin.
1216  *
1217  * @param plug plugin to unload
1218  */
1219 static void
1220 unload_plugin (struct DatastorePlugin *plug)
1221 {
1222 #if DEBUG_DATASTORE
1223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224               "Datastore service is unloading plugin...\n");
1225 #endif
1226   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1227   GNUNET_free (plug->lib_name);
1228   GNUNET_free (plug->short_name);
1229   GNUNET_free (plug);
1230 }
1231
1232
1233 /**
1234  * Final task run after shutdown.  Unloads plugins and disconnects us from
1235  * statistics.
1236  */
1237 static void
1238 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1239 {
1240   unload_plugin (plugin);
1241   plugin = NULL;
1242   if (filter != NULL)
1243     {
1244       GNUNET_CONTAINER_bloomfilter_free (filter);
1245       filter = NULL;
1246     }
1247   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1248 }
1249
1250
1251 /**
1252  * Last task run during shutdown.  Disconnects us from
1253  * the transport and core.
1254  */
1255 static void
1256 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1257 {
1258   struct TransmitCallbackContext *tcc;
1259
1260   while (NULL != (tcc = tcc_head))
1261     {
1262       GNUNET_CONTAINER_DLL_remove (tcc_head,
1263                                    tcc_tail,
1264                                    tcc);
1265       if (tcc->th != NULL)
1266         GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1267       if (NULL != tcc->tc)
1268         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1269       GNUNET_free (tcc->msg);
1270       GNUNET_free (tcc);
1271     }
1272   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1273     {
1274       GNUNET_SCHEDULER_cancel (sched,
1275                                expired_kill_task);
1276       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1277     }
1278   GNUNET_SCHEDULER_add_continuation (sched,
1279                                      &unload_task,
1280                                      NULL,
1281                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1282 }
1283
1284
1285 /**
1286  * Function that removes all active reservations made
1287  * by the given client and releases the space for other
1288  * requests.
1289  *
1290  * @param cls closure
1291  * @param client identification of the client
1292  */
1293 static void
1294 cleanup_reservations (void *cls,
1295                       struct GNUNET_SERVER_Client
1296                       * client)
1297 {
1298   struct ReservationList *pos;
1299   struct ReservationList *prev;
1300   struct ReservationList *next;
1301
1302   if (client == NULL)
1303     return;
1304   prev = NULL;
1305   pos = reservations;
1306   while (NULL != pos)
1307     {
1308       next = pos->next;
1309       if (pos->client == client)
1310         {
1311           if (prev == NULL)
1312             reservations = next;
1313           else
1314             prev->next = next;
1315           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1316           GNUNET_free (pos);
1317         }
1318       else
1319         {
1320           prev = pos;
1321         }
1322       pos = next;
1323     }
1324 }
1325
1326
1327 /**
1328  * Process datastore requests.
1329  *
1330  * @param cls closure
1331  * @param s scheduler to use
1332  * @param server the initialized server
1333  * @param c configuration to use
1334  */
1335 static void
1336 run (void *cls,
1337      struct GNUNET_SCHEDULER_Handle *s,
1338      struct GNUNET_SERVER_Handle *server,
1339      const struct GNUNET_CONFIGURATION_Handle *c)
1340 {
1341   char *fn;
1342   unsigned int bf_size;
1343
1344   sched = s;
1345   cfg = c;
1346   if (GNUNET_OK !=
1347       GNUNET_CONFIGURATION_get_value_number (cfg,
1348                                              "DATASTORE", "QUOTA", &quota))
1349     {
1350       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1351                   _("No `%s' specified for `%s' in configuration!\n"),
1352                   "QUOTA",
1353                   "DATASTORE");
1354       return;
1355     }
1356   cache_size = quota / 8; /* Or should we make this an option? */
1357   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1358   fn = NULL;
1359   if ( (GNUNET_OK !=
1360         GNUNET_CONFIGURATION_get_value_filename (cfg,
1361                                                  "DATASTORE",
1362                                                  "BLOOMFILTER",
1363                                                  &fn)) ||
1364        (GNUNET_OK !=
1365         GNUNET_DISK_directory_create_for_file (fn)) )
1366     {
1367       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1368                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1369                   fn != NULL ? fn : "");
1370       GNUNET_free_non_null (fn);
1371       fn = NULL;
1372     }
1373   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1374   GNUNET_free_non_null (fn);
1375   if (filter == NULL)
1376     {
1377       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1378                   _("Failed to initialize bloomfilter.\n"));
1379       return;
1380     }
1381   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1382   plugin = load_plugin ();
1383   if (NULL == plugin)
1384     {
1385       GNUNET_CONTAINER_bloomfilter_free (filter);
1386       filter = NULL;
1387       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1388       return;
1389     }
1390   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1391   GNUNET_SERVER_add_handlers (server, handlers);
1392   expired_kill_task
1393     = GNUNET_SCHEDULER_add_with_priority (sched,
1394                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1395                                           &delete_expired, NULL);
1396   GNUNET_SCHEDULER_add_delayed (sched,
1397                                 GNUNET_TIME_UNIT_FOREVER_REL,
1398                                 &cleaning_task, NULL);
1399   
1400 }
1401
1402
1403 /**
1404  * The main function for the datastore service.
1405  *
1406  * @param argc number of arguments from the command line
1407  * @param argv command line arguments
1408  * @return 0 ok, 1 on error
1409  */
1410 int
1411 main (int argc, char *const *argv)
1412 {
1413   int ret;
1414
1415   ret = (GNUNET_OK ==
1416          GNUNET_SERVICE_run (argc,
1417                              argv,
1418                              "datastore",
1419                              GNUNET_SERVICE_OPTION_NONE,
1420                              &run, NULL)) ? 0 : 1;
1421   return ret;
1422 }
1423
1424
1425 /* end of gnunet-service-datastore.c */