fixes
[oweals/gnunet.git] / src / datastore / gnunet-service-datastore.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/gnunet-service-datastore.c
23  * @brief Management for the datastore for files stored on a GNUnet node
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_arm_service.h"
30 #include "gnunet_protocols.h"
31 #include "plugin_datastore.h"
32 #include "datastore.h"
33
34 /**
35  * How many messages do we queue at most per client?
36  */
37 #define MAX_PENDING 1024
38
39 /**
40  * How long are we at most keeping "expired" content
41  * past the expiration date in the database?
42  */
43 #define MAX_EXPIRE_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 15)
44
45
46
47 /**
48  * Our datastore plugin.
49  */
50 struct DatastorePlugin
51 {
52
53   /**
54    * API of the transport as returned by the plugin's
55    * initialization function.
56    */
57   struct GNUNET_DATASTORE_PluginFunctions *api;
58
59   /**
60    * Short name for the plugin (i.e. "sqlite").
61    */
62   char *short_name;
63
64   /**
65    * Name of the library (i.e. "gnunet_plugin_datastore_sqlite").
66    */
67   char *lib_name;
68
69   /**
70    * Environment this transport service is using
71    * for this plugin.
72    */
73   struct GNUNET_DATASTORE_PluginEnvironment env;
74
75 };
76
77
78 /**
79  * Linked list of active reservations.
80  */
81 struct ReservationList 
82 {
83
84   /**
85    * This is a linked list.
86    */
87   struct ReservationList *next;
88
89   /**
90    * Client that made the reservation.
91    */
92   struct GNUNET_SERVER_Client *client;
93
94   /**
95    * Number of bytes (still) reserved.
96    */
97   uint64_t amount;
98
99   /**
100    * Number of items (still) reserved.
101    */
102   uint64_t entries;
103
104   /**
105    * Reservation identifier.
106    */
107   int32_t rid;
108
109 };
110
111
112 /**
113  * Our datastore plugin (NULL if not available).
114  */
115 static struct DatastorePlugin *plugin;
116
117 /**
118  * Linked list of space reservations made by clients.
119  */
120 static struct ReservationList *reservations;
121
122 /**
123  * Bloomfilter to quickly tell if we don't have the content.
124  */
125 static struct GNUNET_CONTAINER_BloomFilter *filter;
126
127 /**
128  * Static counter to produce reservation identifiers.
129  */
130 static int reservation_gen;
131
132 /**
133  * How much space are we allowed to use?
134  */
135 static unsigned long long quota;
136
137 /**
138  * How much space are we using for the cache?  (space available for
139  * insertions that will be instantly reclaimed by discarding less
140  * important content --- or possibly whatever we just inserted into
141  * the "cache").
142  */
143 static unsigned long long cache_size;
144
145 /**
146  * How much space have we currently reserved?
147  */
148 static unsigned long long reserved;
149
150 /**
151  * Identity of the task that is used to delete
152  * expired content.
153  */
154 static GNUNET_SCHEDULER_TaskIdentifier expired_kill_task;
155
156 /**
157  * Our configuration.
158  */
159 const struct GNUNET_CONFIGURATION_Handle *cfg;
160
161 /**
162  * Our scheduler.
163  */
164 struct GNUNET_SCHEDULER_Handle *sched; 
165
166 /**
167  * Function called once the transmit operation has
168  * either failed or succeeded.
169  *
170  * @param cls closure
171  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
172  */
173 typedef void (*TransmitContinuation)(void *cls,
174                                      int status);
175
176
177 /**
178  * Context for transmitting replies to clients.
179  */
180 struct TransmitCallbackContext 
181 {
182   
183   /**
184    * We keep these in a doubly-linked list (for cleanup).
185    */
186   struct TransmitCallbackContext *next;
187   
188   /**
189    * We keep these in a doubly-linked list (for cleanup).
190    */
191   struct TransmitCallbackContext *prev;
192   
193   /**
194    * The message that we're asked to transmit.
195    */
196   struct GNUNET_MessageHeader *msg;
197   
198   /**
199    * Handle for the transmission request.
200    */
201   struct GNUNET_CONNECTION_TransmitHandle *th;
202
203   /**
204    * Client that we are transmitting to.
205    */
206   struct GNUNET_SERVER_Client *client;
207
208   /**
209    * Function to call once msg has been transmitted
210    * (or at least added to the buffer).
211    */
212   TransmitContinuation tc;
213
214   /**
215    * Closure for tc.
216    */
217   void *tc_cls;
218
219   /**
220    * GNUNET_YES if we are supposed to signal the server
221    * completion of the client's request.
222    */
223   int end;
224 };
225
226   
227 /**
228  * Head of the doubly-linked list (for cleanup).
229  */
230 static struct TransmitCallbackContext *tcc_head;
231
232 /**
233  * Tail of the doubly-linked list (for cleanup).
234  */
235 static struct TransmitCallbackContext *tcc_tail;
236
237 /**
238  * Have we already clean ed up the TCCs and are hence no longer
239  * willing (or able) to transmit anything to anyone?
240  */
241 static int cleaning_done;
242
243 /**
244  * Task that is used to remove expired entries from
245  * the datastore.  This task will schedule itself
246  * again automatically to always delete all expired
247  * content quickly.
248  *
249  * @param cls not used
250  * @param tc task context
251  */ 
252 static void
253 delete_expired (void *cls,
254                 const struct GNUNET_SCHEDULER_TaskContext *tc);
255
256
257 /**
258  * Iterate over the expired items stored in the datastore.
259  * Delete all expired items; once we have processed all
260  * expired items, re-schedule the "delete_expired" task.
261  *
262  * @param cls not used
263  * @param next_cls closure to pass to the "next" function.
264  * @param key key for the content
265  * @param size number of bytes in data
266  * @param data content stored
267  * @param type type of the content
268  * @param priority priority of the content
269  * @param anonymity anonymity-level for the content
270  * @param expiration expiration time for the content
271  * @param uid unique identifier for the datum;
272  *        maybe 0 if no unique identifier is available
273  *
274  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
275  *         (continue on call to "next", of course),
276  *         GNUNET_NO to delete the item and continue (if supported)
277  */
278 static int 
279 expired_processor (void *cls,
280                    void *next_cls,
281                    const GNUNET_HashCode * key,
282                    uint32_t size,
283                    const void *data,
284                    uint32_t type,
285                    uint32_t priority,
286                    uint32_t anonymity,
287                    struct GNUNET_TIME_Absolute
288                    expiration, 
289                    uint64_t uid)
290 {
291   struct GNUNET_TIME_Absolute now;
292
293   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
294   if (key == NULL) 
295     {
296       expired_kill_task 
297         = GNUNET_SCHEDULER_add_delayed (sched,
298                                         MAX_EXPIRE_DELAY,
299                                         &delete_expired,
300                                         NULL);
301       return GNUNET_SYSERR;
302     }
303   now = GNUNET_TIME_absolute_get ();
304   if (expiration.value > now.value)
305     {
306       /* finished processing */
307       plugin->api->next_request (next_cls, GNUNET_YES);
308       return GNUNET_SYSERR;
309     }
310   plugin->api->next_request (next_cls, GNUNET_NO);
311 #if DEBUG_DATASTORE
312   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
313               "Deleting content that expired %llu ms ago\n",
314               (unsigned long long) (now.value - expiration.value));
315 #endif
316   GNUNET_CONTAINER_bloomfilter_remove (filter,
317                                        key);
318   return GNUNET_NO; /* delete */
319 }
320
321
322 /**
323  * Task that is used to remove expired entries from
324  * the datastore.  This task will schedule itself
325  * again automatically to always delete all expired
326  * content quickly.
327  *
328  * @param cls not used
329  * @param tc task context
330  */ 
331 static void
332 delete_expired (void *cls,
333                 const struct GNUNET_SCHEDULER_TaskContext *tc)
334 {
335   plugin->api->iter_ascending_expiration (plugin->api->cls, 
336                                           0,
337                                           &expired_processor,
338                                           NULL);
339 }
340
341
342 /**
343  * An iterator over a set of items stored in the datastore.
344  *
345  * @param cls closure
346  * @param next_cls closure to pass to the "next" function.
347  * @param key key for the content
348  * @param size number of bytes in data
349  * @param data content stored
350  * @param type type of the content
351  * @param priority priority of the content
352  * @param anonymity anonymity-level for the content
353  * @param expiration expiration time for the content
354  * @param uid unique identifier for the datum;
355  *        maybe 0 if no unique identifier is available
356  *
357  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
358  *         (continue on call to "next", of course),
359  *         GNUNET_NO to delete the item and continue (if supported)
360  */
361 static int 
362 manage (void *cls,
363         void *next_cls,
364         const GNUNET_HashCode * key,
365         uint32_t size,
366         const void *data,
367         uint32_t type,
368         uint32_t priority,
369         uint32_t anonymity,
370         struct GNUNET_TIME_Absolute
371         expiration, 
372         uint64_t uid)
373 {
374   unsigned long long *need = cls;
375
376   if (NULL == key)
377     {
378       GNUNET_free (need);
379       return GNUNET_SYSERR;
380     }
381   if (size + GNUNET_DATASTORE_ENTRY_OVERHEAD > *need)
382     *need = 0;
383   else
384     *need -= size + GNUNET_DATASTORE_ENTRY_OVERHEAD;
385   plugin->api->next_request (next_cls, 
386                              (0 == *need) ? GNUNET_YES : GNUNET_NO);
387 #if DEBUG_DATASTORE
388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
389               "Deleting %llu bytes of low-priority content (still trying to free another %llu bytes)\n",
390               size + GNUNET_DATASTORE_ENTRY_OVERHEAD,
391               *need);
392 #endif
393   GNUNET_CONTAINER_bloomfilter_remove (filter,
394                                        key);
395   return GNUNET_NO;
396 }
397
398
399 /**
400  * Manage available disk space by running tasks
401  * that will discard content if necessary.  This
402  * function will be run whenever a request for
403  * "need" bytes of storage could only be satisfied
404  * by eating into the "cache" (and we want our cache
405  * space back).
406  *
407  * @param need number of bytes of content that were
408  *        placed into the "cache" (and hence the
409  *        number of bytes that should be removed).
410  */
411 static void
412 manage_space (unsigned long long need)
413 {
414   unsigned long long *n;
415
416 #if DEBUG_DATASTORE
417   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
418               "Asked to free up %llu bytes of cache space\n",
419               need);
420 #endif
421   n = GNUNET_malloc (sizeof(unsigned long long));
422   *n = need;
423   plugin->api->iter_low_priority (plugin->api->cls,
424                                   0,
425                                   &manage,
426                                   n);
427 }
428
429
430 /**
431  * Function called to notify a client about the socket
432  * begin ready to queue more data.  "buf" will be
433  * NULL and "size" zero if the socket was closed for
434  * writing in the meantime.
435  *
436  * @param cls closure
437  * @param size number of bytes available in buf
438  * @param buf where the callee should write the message
439  * @return number of bytes written to buf
440  */
441 static size_t
442 transmit_callback (void *cls,
443                    size_t size, void *buf)
444 {
445   struct TransmitCallbackContext *tcc = cls;
446   size_t msize;
447   
448   tcc->th = NULL;
449   GNUNET_CONTAINER_DLL_remove (tcc_head,
450                                tcc_tail,
451                                tcc);
452   msize = ntohs(tcc->msg->size);
453   if (size == 0)
454     {
455 #if DEBUG_DATASTORE
456       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
457                   "Transmission failed.\n");
458 #endif
459       if (tcc->tc != NULL)
460         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
461       if (GNUNET_YES == tcc->end)
462         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
463       GNUNET_SERVER_client_drop (tcc->client);
464       GNUNET_free (tcc->msg);
465       GNUNET_free (tcc);
466       return 0;
467     }
468   GNUNET_assert (size >= msize);
469   memcpy (buf, tcc->msg, msize);
470   if (tcc->tc != NULL)
471     tcc->tc (tcc->tc_cls, GNUNET_OK);
472   if (GNUNET_YES == tcc->end)
473     {
474       GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
475     }
476   else
477     {
478 #if DEBUG_DATASTORE
479       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
480                   "Response transmitted, more pending!\n");
481 #endif
482     }
483   GNUNET_SERVER_client_drop (tcc->client);
484   GNUNET_free (tcc->msg);
485   GNUNET_free (tcc);
486   return msize;
487 }
488
489
490 /**
491  * Transmit the given message to the client.
492  *
493  * @param client target of the message
494  * @param msg message to transmit, will be freed!
495  * @param tc function to call afterwards
496  * @param tc_cls closure for tc
497  * @param end is this the last response (and we should
498  *        signal the server completion accodingly after
499  *        transmitting this message)?
500  */
501 static void
502 transmit (struct GNUNET_SERVER_Client *client,
503           struct GNUNET_MessageHeader *msg,
504           TransmitContinuation tc,
505           void *tc_cls,
506           int end)
507 {
508   struct TransmitCallbackContext *tcc;
509
510   if (GNUNET_YES == cleaning_done)
511     {
512       if (NULL != tc)
513         tc (tc_cls, GNUNET_SYSERR);
514       return;
515     }
516   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
517   tcc->msg = msg;
518   tcc->client = client;
519   tcc->tc = tc;
520   tcc->tc_cls = tc_cls;
521   tcc->end = end;
522   if (NULL ==
523       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
524                                                       ntohs(msg->size),
525                                                       GNUNET_TIME_UNIT_FOREVER_REL,
526                                                       &transmit_callback,
527                                                       tcc)))
528     {
529       GNUNET_break (0);
530       if (GNUNET_YES == end)
531         {
532 #if DEBUG_DATASTORE
533           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
534                       "Disconnecting client.\n");
535 #endif    
536           GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
537         }
538       if (NULL != tc)
539         tc (tc_cls, GNUNET_SYSERR);
540       GNUNET_free (msg);
541       GNUNET_free (tcc);
542       return;
543     }
544   GNUNET_SERVER_client_keep (client);
545   GNUNET_CONTAINER_DLL_insert (tcc_head,
546                                tcc_tail,
547                                tcc);
548 }
549
550
551 /**
552  * Transmit a status code to the client.
553  *
554  * @param client receiver of the response
555  * @param code status code
556  * @param msg optional error message (can be NULL)
557  */
558 static void
559 transmit_status (struct GNUNET_SERVER_Client *client,
560                  int code,
561                  const char *msg)
562 {
563   struct StatusMessage *sm;
564   size_t slen;
565
566 #if DEBUG_DATASTORE
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
568               "Transmitting `%s' message with value %d and message `%s'\n",
569               "STATUS",
570               code,
571               msg != NULL ? msg : "(none)");
572 #endif
573   slen = (msg == NULL) ? 0 : strlen(msg) + 1;  
574   sm = GNUNET_malloc (sizeof(struct StatusMessage) + slen);
575   sm->header.size = htons(sizeof(struct StatusMessage) + slen);
576   sm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
577   sm->status = htonl(code);
578   if (slen > 0)
579     memcpy (&sm[1], msg, slen);  
580   transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
581 }
582
583
584 /**
585  * Function called once the transmit operation has
586  * either failed or succeeded.
587  *
588  * @param next_cls closure for calling "next_request" callback
589  * @param status GNUNET_OK on success, GNUNET_SYSERR on error
590  */
591 static void 
592 get_next(void *next_cls,
593          int status)
594 {
595   if (status != GNUNET_OK)
596     {
597       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
598                   _("Failed to transmit an item to the client; aborting iteration.\n"));
599       if (plugin != NULL)
600         plugin->api->next_request (next_cls, GNUNET_YES);
601       return;
602     }
603   plugin->api->next_request (next_cls, GNUNET_NO);
604 }
605
606
607 /**
608  * Function that will transmit the given datastore entry
609  * to the client.
610  *
611  * @param cls closure, pointer to the client (of type GNUNET_SERVER_Client).
612  * @param next_cls closure to use to ask for the next item
613  * @param key key for the content
614  * @param size number of bytes in data
615  * @param data content stored
616  * @param type type of the content
617  * @param priority priority of the content
618  * @param anonymity anonymity-level for the content
619  * @param expiration expiration time for the content
620  * @param uid unique identifier for the datum;
621  *        maybe 0 if no unique identifier is available
622  *
623  * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
624  *         GNUNET_NO to delete the item and continue (if supported)
625  */
626 static int
627 transmit_item (void *cls,
628                void *next_cls,
629                const GNUNET_HashCode * key,
630                uint32_t size,
631                const void *data,
632                uint32_t type,
633                uint32_t priority,
634                uint32_t anonymity,
635                struct GNUNET_TIME_Absolute
636                expiration, uint64_t uid)
637 {
638   struct GNUNET_SERVER_Client *client = cls;
639   struct GNUNET_MessageHeader *end;
640   struct DataMessage *dm;
641
642   if (key == NULL)
643     {
644       /* transmit 'DATA_END' */
645 #if DEBUG_DATASTORE
646       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
647                   "Transmitting `%s' message\n",
648                   "DATA_END");
649 #endif
650       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
651       end->size = htons(sizeof(struct GNUNET_MessageHeader));
652       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
653       transmit (client, end, NULL, NULL, GNUNET_YES);
654       GNUNET_SERVER_client_drop (client);
655       return GNUNET_OK;
656     }
657   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
658   dm->header.size = htons(sizeof(struct DataMessage) + size);
659   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
660   dm->rid = htonl(0);
661   dm->size = htonl(size);
662   dm->type = htonl(type);
663   dm->priority = htonl(priority);
664   dm->anonymity = htonl(anonymity);
665   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
666   dm->uid = GNUNET_htonll(uid);
667   dm->key = *key;
668   memcpy (&dm[1], data, size);
669 #if DEBUG_DATASTORE
670   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671               "Transmitting `%s' message\n",
672               "DATA");
673 #endif
674   transmit (client, &dm->header, &get_next, next_cls, GNUNET_NO);
675   return GNUNET_OK;
676 }
677
678
679 /**
680  * Handle RESERVE-message.
681  *
682  * @param cls closure
683  * @param client identification of the client
684  * @param message the actual message
685  */
686 static void
687 handle_reserve (void *cls,
688                 struct GNUNET_SERVER_Client *client,
689                 const struct GNUNET_MessageHeader *message)
690 {
691   const struct ReserveMessage *msg = (const struct ReserveMessage*) message;
692   struct ReservationList *e;
693   unsigned long long used;
694   unsigned long long req;
695   uint64_t amount;
696   uint32_t entries;
697
698 #if DEBUG_DATASTORE
699   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700               "Processing `%s' request\n",
701               "RESERVE");
702 #endif
703   amount = GNUNET_ntohll(msg->amount);
704   entries = ntohl(msg->entries);
705   used = plugin->api->get_size (plugin->api->cls) + reserved;
706   req = amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * entries;
707   if (used + req > quota)
708     {
709       if (quota < used)
710         used = quota; /* cheat a bit for error message (to avoid negative numbers) */
711       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
712                   _("Insufficient space (%llu bytes are available) to satisfy `%s' request for %llu bytes\n"),
713                   quota - used,
714                   "RESERVE",
715                   req);
716       if (cache_size < req)
717         {
718           /* TODO: document this in the FAQ; essentially, if this
719              message happens, the insertion request could be blocked
720              by less-important content from migration because it is
721              larger than 1/8th of the overall available space, and
722              we only reserve 1/8th for "fresh" insertions */
723           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
724                       _("The requested amount (%llu bytes) is larger than the cache size (%llu bytes)\n"),
725                       req,
726                       cache_size);
727           transmit_status (client, 0, 
728                            gettext_noop ("Insufficient space to satisfy request and "
729                                          "requested amount is larger than cache size"));
730         }
731       else
732         {
733           transmit_status (client, 0, 
734                            gettext_noop ("Insufficient space to satisfy request"));
735         }
736       return;      
737     }
738   reserved += req;
739   e = GNUNET_malloc (sizeof(struct ReservationList));
740   e->next = reservations;
741   reservations = e;
742   e->client = client;
743   e->amount = amount;
744   e->entries = entries;
745   e->rid = ++reservation_gen;
746   if (reservation_gen < 0)
747     reservation_gen = 0; /* wrap around */
748   transmit_status (client, e->rid, NULL);
749 }
750
751
752 /**
753  * Handle RELEASE_RESERVE-message.
754  *
755  * @param cls closure
756  * @param client identification of the client
757  * @param message the actual message
758  */
759 static void
760 handle_release_reserve (void *cls,
761                         struct GNUNET_SERVER_Client *client,
762                         const struct GNUNET_MessageHeader *message)
763 {
764   const struct ReleaseReserveMessage *msg = (const struct ReleaseReserveMessage*) message;
765   struct ReservationList *pos;
766   struct ReservationList *prev;
767   struct ReservationList *next;
768   int rid = ntohl(msg->rid);
769   unsigned long long rem;
770
771 #if DEBUG_DATASTORE
772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
773               "Processing `%s' request\n",
774               "RELEASE_RESERVE");
775 #endif
776   next = reservations;
777   prev = NULL;
778   while (NULL != (pos = next))
779     {
780       next = pos->next;
781       if (rid == pos->rid)
782         {
783           if (prev == NULL)
784             reservations = next;
785           else
786             prev->next = next;
787           rem = pos->amount + ((unsigned long long) GNUNET_DATASTORE_ENTRY_OVERHEAD) * pos->entries;
788           GNUNET_assert (reserved >= rem);
789           reserved -= rem;
790 #if DEBUG_DATASTORE
791           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
792                       "Returning %llu remaining reserved bytes to storage pool\n",
793                       rem);
794 #endif    
795           GNUNET_free (pos);
796           transmit_status (client, GNUNET_OK, NULL);
797           return;
798         }       
799       prev = pos;
800     }
801   GNUNET_break (0);
802   transmit_status (client, GNUNET_SYSERR, gettext_noop ("Could not find matching reservation"));
803 }
804
805
806 /**
807  * Check that the given message is a valid data message.
808  *
809  * @return NULL if the message is not well-formed, otherwise the message
810  */
811 static const struct DataMessage *
812 check_data (const struct GNUNET_MessageHeader *message)
813 {
814   uint16_t size;
815   uint32_t dsize;
816   const struct DataMessage *dm;
817
818   size = ntohs(message->size);
819   if (size < sizeof(struct DataMessage))
820     { 
821       GNUNET_break (0);
822       return NULL;
823     }
824   dm = (const struct DataMessage *) message;
825   dsize = ntohl(dm->size);
826   if (size != dsize + sizeof(struct DataMessage))
827     {
828       GNUNET_break (0);
829       return NULL;
830     }
831   return dm;
832 }
833
834
835 /**
836  * Handle PUT-message.
837  *
838  * @param cls closure
839  * @param client identification of the client
840  * @param message the actual message
841  */
842 static void
843 handle_put (void *cls,
844             struct GNUNET_SERVER_Client *client,
845             const struct GNUNET_MessageHeader *message)
846 {
847   const struct DataMessage *dm = check_data (message);
848   char *msg;
849   int ret;
850   int rid;
851   struct ReservationList *pos;
852   uint32_t size;
853
854 #if DEBUG_DATASTORE
855   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
856               "Processing `%s' request\n",
857               "PUT");
858 #endif
859   if (ntohl(dm->type) == 0) 
860     {
861       GNUNET_break (0);
862       dm = NULL;
863     }
864   if (dm == NULL)
865     {
866       GNUNET_break (0);
867       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
868       return;
869     }
870   rid = ntohl(dm->rid);
871   size = ntohl(dm->size);
872   if (rid > 0)
873     {
874       pos = reservations;
875       while ( (NULL != pos) &&
876               (rid != pos->rid) )
877         pos = pos->next;
878       GNUNET_break (pos != NULL);
879       if (NULL != pos)
880         {
881           GNUNET_break (pos->entries > 0);
882           GNUNET_break (pos->amount > size);
883           pos->entries--;
884           pos->amount -= size;
885           reserved -= (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
886         }
887     }
888   msg = NULL;
889   ret = plugin->api->put (plugin->api->cls,
890                           &dm->key,
891                           size,
892                           &dm[1],
893                           ntohl(dm->type),
894                           ntohl(dm->priority),
895                           ntohl(dm->anonymity),
896                           GNUNET_TIME_absolute_ntoh(dm->expiration),
897                           &msg);
898   if (GNUNET_OK == ret)
899     {
900       GNUNET_CONTAINER_bloomfilter_add (filter,
901                                         &dm->key);
902 #if DEBUG_DATASTORE
903       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
904                   "Successfully stored %u bytes under key `%s'\n",
905                   size,
906                   GNUNET_h2s (&dm->key));
907 #endif
908     }
909   transmit_status (client, 
910                    (GNUNET_SYSERR == ret) ? GNUNET_SYSERR : GNUNET_OK, 
911                    msg);
912   GNUNET_free_non_null (msg);
913   if (quota - reserved - cache_size < plugin->api->get_size (plugin->api->cls))
914     manage_space (size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
915 }
916
917
918 /**
919  * Handle GET-message.
920  *
921  * @param cls closure
922  * @param client identification of the client
923  * @param message the actual message
924  */
925 static void
926 handle_get (void *cls,
927              struct GNUNET_SERVER_Client *client,
928              const struct GNUNET_MessageHeader *message)
929 {
930   const struct GetMessage *msg;
931   uint16_t size;
932
933 #if DEBUG_DATASTORE
934   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
935               "Processing `%s' request\n",
936               "GET");
937 #endif
938   size = ntohs(message->size);
939   if ( (size != sizeof(struct GetMessage)) &&
940        (size != sizeof(struct GetMessage) - sizeof(GNUNET_HashCode)) )
941     {
942       GNUNET_break (0);
943       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
944       return;
945     }
946   GNUNET_SERVER_client_keep (client);
947   msg = (const struct GetMessage*) message;
948   if ( (size == sizeof(struct GetMessage)) &&
949        (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (filter,
950                                                          &msg->key)) )
951     {
952       /* don't bother database... */
953 #if DEBUG_DATASTORE
954       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
955                   "Empty result set for `%s' request for `%s'.\n",
956                   "GET",
957                   GNUNET_h2s (&msg->key));
958 #endif  
959       transmit_item (client,
960                      NULL, NULL, 0, NULL, 0, 0, 0, 
961                      GNUNET_TIME_UNIT_ZERO_ABS, 0);
962       return;
963     }
964   plugin->api->get (plugin->api->cls,
965                     ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
966                     NULL,
967                     ntohl(msg->type),
968                     &transmit_item,
969                     client);    
970 }
971
972
973 /**
974  * Handle UPDATE-message.
975  *
976  * @param cls closure
977  * @param client identification of the client
978  * @param message the actual message
979  */
980 static void
981 handle_update (void *cls,
982                struct GNUNET_SERVER_Client *client,
983                const struct GNUNET_MessageHeader *message)
984 {
985   const struct UpdateMessage *msg;
986   int ret;
987   char *emsg;
988
989 #if DEBUG_DATASTORE
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "Processing `%s' request\n",
992               "UPDATE");
993 #endif
994   msg = (const struct UpdateMessage*) message;
995   emsg = NULL;
996   ret = plugin->api->update (plugin->api->cls,
997                              GNUNET_ntohll(msg->uid),
998                              (int32_t) ntohl(msg->priority),
999                              GNUNET_TIME_absolute_ntoh(msg->expiration),
1000                              &emsg);
1001   transmit_status (client, ret, emsg);
1002   GNUNET_free_non_null (emsg);
1003 }
1004
1005
1006 /**
1007  * Handle GET_RANDOM-message.
1008  *
1009  * @param cls closure
1010  * @param client identification of the client
1011  * @param message the actual message
1012  */
1013 static void
1014 handle_get_random (void *cls,
1015                    struct GNUNET_SERVER_Client *client,
1016                    const struct GNUNET_MessageHeader *message)
1017 {
1018 #if DEBUG_DATASTORE
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "Processing `%s' request\n",
1021               "GET_RANDOM");
1022 #endif
1023   GNUNET_SERVER_client_keep (client);
1024   plugin->api->iter_migration_order (plugin->api->cls,
1025                                      0,
1026                                      &transmit_item,
1027                                      client);  
1028 }
1029
1030
1031 /**
1032  * Context for the 'remove_callback'.
1033  */
1034 struct RemoveContext 
1035 {
1036   /**
1037    * Client for whom we're doing the remvoing.
1038    */
1039   struct GNUNET_SERVER_Client *client;
1040
1041   /**
1042    * GNUNET_YES if we managed to remove something.
1043    */
1044   int found;
1045 };
1046
1047
1048 /**
1049  * Callback function that will cause the item that is passed
1050  * in to be deleted (by returning GNUNET_NO).
1051  */
1052 static int
1053 remove_callback (void *cls,
1054                  void *next_cls,
1055                  const GNUNET_HashCode * key,
1056                  uint32_t size,
1057                  const void *data,
1058                  uint32_t type,
1059                  uint32_t priority,
1060                  uint32_t anonymity,
1061                  struct GNUNET_TIME_Absolute
1062                  expiration, uint64_t uid)
1063 {
1064   struct RemoveContext *rc = cls;
1065
1066   if (key == NULL)
1067     {
1068 #if DEBUG_DATASTORE
1069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                   "No further matches for `%s' request.\n",
1071                   "REMOVE");
1072 #endif  
1073       if (GNUNET_YES == rc->found)
1074         transmit_status (rc->client, GNUNET_OK, NULL);       
1075       else
1076         transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
1077       GNUNET_SERVER_client_drop (rc->client);
1078       GNUNET_free (rc);
1079       return GNUNET_OK; /* last item */
1080     }
1081   rc->found = GNUNET_YES;
1082 #if DEBUG_DATASTORE
1083   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1084               "Item %llu matches `%s' request.\n",
1085               (unsigned long long) uid,
1086               "REMOVE");
1087 #endif  
1088   GNUNET_CONTAINER_bloomfilter_remove (filter,
1089                                        key);
1090   plugin->api->next_request (next_cls, GNUNET_YES);
1091   return GNUNET_NO;
1092 }
1093
1094
1095 /**
1096  * Handle REMOVE-message.
1097  *
1098  * @param cls closure
1099  * @param client identification of the client
1100  * @param message the actual message
1101  */
1102 static void
1103 handle_remove (void *cls,
1104              struct GNUNET_SERVER_Client *client,
1105              const struct GNUNET_MessageHeader *message)
1106 {
1107   const struct DataMessage *dm = check_data (message);
1108   GNUNET_HashCode vhash;
1109   struct RemoveContext *rc;
1110
1111 #if DEBUG_DATASTORE
1112   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113               "Processing `%s' request\n",
1114               "REMOVE");
1115 #endif
1116   if (dm == NULL)
1117     {
1118       GNUNET_break (0);
1119       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1120       return;
1121     }
1122   rc = GNUNET_malloc (sizeof(struct RemoveContext));
1123   GNUNET_SERVER_client_keep (client);
1124   rc->client = client;
1125   GNUNET_CRYPTO_hash (&dm[1],
1126                       ntohl(dm->size),
1127                       &vhash);
1128   plugin->api->get (plugin->api->cls,
1129                     &dm->key,
1130                     &vhash,
1131                     ntohl(dm->type),
1132                     &remove_callback,
1133                     rc);
1134 }
1135
1136
1137 /**
1138  * Handle DROP-message.
1139  *
1140  * @param cls closure
1141  * @param client identification of the client
1142  * @param message the actual message
1143  */
1144 static void
1145 handle_drop (void *cls,
1146              struct GNUNET_SERVER_Client *client,
1147              const struct GNUNET_MessageHeader *message)
1148 {
1149 #if DEBUG_DATASTORE
1150   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1151               "Processing `%s' request\n",
1152               "DROP");
1153 #endif
1154   plugin->api->drop (plugin->api->cls);
1155   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1156 }
1157
1158
1159 /**
1160  * List of handlers for the messages understood by this
1161  * service.
1162  */
1163 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1164   {&handle_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE, 
1165    sizeof(struct ReserveMessage) }, 
1166   {&handle_release_reserve, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE, 
1167    sizeof(struct ReleaseReserveMessage) }, 
1168   {&handle_put, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_PUT, 0 }, 
1169   {&handle_update, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE, 
1170    sizeof (struct UpdateMessage) }, 
1171   {&handle_get, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET, 0 }, 
1172   {&handle_get_random, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM, 
1173    sizeof(struct GNUNET_MessageHeader) }, 
1174   {&handle_remove, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE, 0 }, 
1175   {&handle_drop, NULL, GNUNET_MESSAGE_TYPE_DATASTORE_DROP, 
1176    sizeof(struct GNUNET_MessageHeader) }, 
1177   {NULL, NULL, 0, 0}
1178 };
1179
1180
1181
1182 /**
1183  * Load the datastore plugin.
1184  */
1185 static struct DatastorePlugin *
1186 load_plugin () 
1187 {
1188   struct DatastorePlugin *ret;
1189   char *libname;
1190   char *name;
1191
1192   if (GNUNET_OK !=
1193       GNUNET_CONFIGURATION_get_value_string (cfg,
1194                                              "DATASTORE", "DATABASE", &name))
1195     {
1196       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1197                   _("No `%s' specified for `%s' in configuration!\n"),
1198                   "DATABASE",
1199                   "DATASTORE");
1200       return NULL;
1201     }
1202   ret = GNUNET_malloc (sizeof(struct DatastorePlugin));
1203   ret->env.cfg = cfg;
1204   ret->env.sched = sched;  
1205   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1206               _("Loading `%s' datastore plugin\n"), name);
1207   GNUNET_asprintf (&libname, "libgnunet_plugin_datastore_%s", name);
1208   ret->short_name = name;
1209   ret->lib_name = libname;
1210   ret->api = GNUNET_PLUGIN_load (libname, &ret->env);
1211   if (ret->api == NULL)
1212     {
1213       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1214                   _("Failed to load datastore plugin for `%s'\n"), name);
1215       GNUNET_free (ret->short_name);
1216       GNUNET_free (libname);
1217       GNUNET_free (ret);
1218       return NULL;
1219     }
1220   return ret;
1221 }
1222
1223
1224 /**
1225  * Function called when the service shuts
1226  * down.  Unloads our datastore plugin.
1227  *
1228  * @param plug plugin to unload
1229  */
1230 static void
1231 unload_plugin (struct DatastorePlugin *plug)
1232 {
1233 #if DEBUG_DATASTORE
1234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1235               "Datastore service is unloading plugin...\n");
1236 #endif
1237   GNUNET_break (NULL == GNUNET_PLUGIN_unload (plug->lib_name, plug->api));
1238   GNUNET_free (plug->lib_name);
1239   GNUNET_free (plug->short_name);
1240   GNUNET_free (plug);
1241 }
1242
1243
1244 /**
1245  * Final task run after shutdown.  Unloads plugins and disconnects us from
1246  * statistics.
1247  */
1248 static void
1249 unload_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1250 {
1251   unload_plugin (plugin);
1252   plugin = NULL;
1253   if (filter != NULL)
1254     {
1255       GNUNET_CONTAINER_bloomfilter_free (filter);
1256       filter = NULL;
1257     }
1258   GNUNET_ARM_stop_services (cfg, tc->sched, "statistics", NULL);
1259 }
1260
1261
1262 /**
1263  * Last task run during shutdown.  Disconnects us from
1264  * the transport and core.
1265  */
1266 static void
1267 cleaning_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1268 {
1269   struct TransmitCallbackContext *tcc;
1270
1271   cleaning_done = GNUNET_YES;
1272   while (NULL != (tcc = tcc_head))
1273     {
1274       GNUNET_CONTAINER_DLL_remove (tcc_head,
1275                                    tcc_tail,
1276                                    tcc);
1277       if (tcc->th != NULL)
1278         {
1279           GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
1280           GNUNET_SERVER_client_drop (tcc->client);
1281         }
1282    if (NULL != tcc->tc)
1283         tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
1284       GNUNET_free (tcc->msg);
1285       GNUNET_free (tcc);
1286     }
1287   if (expired_kill_task != GNUNET_SCHEDULER_NO_TASK)
1288     {
1289       GNUNET_SCHEDULER_cancel (sched,
1290                                expired_kill_task);
1291       expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
1292     }
1293   GNUNET_SCHEDULER_add_continuation (sched,
1294                                      &unload_task,
1295                                      NULL,
1296                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1297 }
1298
1299
1300 /**
1301  * Function that removes all active reservations made
1302  * by the given client and releases the space for other
1303  * requests.
1304  *
1305  * @param cls closure
1306  * @param client identification of the client
1307  */
1308 static void
1309 cleanup_reservations (void *cls,
1310                       struct GNUNET_SERVER_Client
1311                       * client)
1312 {
1313   struct ReservationList *pos;
1314   struct ReservationList *prev;
1315   struct ReservationList *next;
1316
1317   if (client == NULL)
1318     return;
1319   prev = NULL;
1320   pos = reservations;
1321   while (NULL != pos)
1322     {
1323       next = pos->next;
1324       if (pos->client == client)
1325         {
1326           if (prev == NULL)
1327             reservations = next;
1328           else
1329             prev->next = next;
1330           reserved -= pos->amount + pos->entries * GNUNET_DATASTORE_ENTRY_OVERHEAD;
1331           GNUNET_free (pos);
1332         }
1333       else
1334         {
1335           prev = pos;
1336         }
1337       pos = next;
1338     }
1339 }
1340
1341
1342 /**
1343  * Process datastore requests.
1344  *
1345  * @param cls closure
1346  * @param s scheduler to use
1347  * @param server the initialized server
1348  * @param c configuration to use
1349  */
1350 static void
1351 run (void *cls,
1352      struct GNUNET_SCHEDULER_Handle *s,
1353      struct GNUNET_SERVER_Handle *server,
1354      const struct GNUNET_CONFIGURATION_Handle *c)
1355 {
1356   char *fn;
1357   unsigned int bf_size;
1358
1359   sched = s;
1360   cfg = c;
1361   if (GNUNET_OK !=
1362       GNUNET_CONFIGURATION_get_value_number (cfg,
1363                                              "DATASTORE", "QUOTA", &quota))
1364     {
1365       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1366                   _("No `%s' specified for `%s' in configuration!\n"),
1367                   "QUOTA",
1368                   "DATASTORE");
1369       return;
1370     }
1371   cache_size = quota / 8; /* Or should we make this an option? */
1372   bf_size = quota / 32; /* 8 bit per entry, 1 bit per 32 kb in DB */
1373   fn = NULL;
1374   if ( (GNUNET_OK !=
1375         GNUNET_CONFIGURATION_get_value_filename (cfg,
1376                                                  "DATASTORE",
1377                                                  "BLOOMFILTER",
1378                                                  &fn)) ||
1379        (GNUNET_OK !=
1380         GNUNET_DISK_directory_create_for_file (fn)) )
1381     {
1382       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1383                   _("Could not use specified filename `%s' for bloomfilter.\n"),
1384                   fn != NULL ? fn : "");
1385       GNUNET_free_non_null (fn);
1386       fn = NULL;
1387     }
1388   filter = GNUNET_CONTAINER_bloomfilter_load (fn, bf_size, 5);  /* approx. 3% false positives at max use */  
1389   GNUNET_free_non_null (fn);
1390   if (filter == NULL)
1391     {
1392       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1393                   _("Failed to initialize bloomfilter.\n"));
1394       return;
1395     }
1396   GNUNET_ARM_start_services (cfg, sched, "statistics", NULL);
1397   plugin = load_plugin ();
1398   if (NULL == plugin)
1399     {
1400       GNUNET_CONTAINER_bloomfilter_free (filter);
1401       filter = NULL;
1402       GNUNET_ARM_stop_services (cfg, sched, "statistics", NULL);
1403       return;
1404     }
1405   GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL);
1406   GNUNET_SERVER_add_handlers (server, handlers);
1407   expired_kill_task
1408     = GNUNET_SCHEDULER_add_with_priority (sched,
1409                                           GNUNET_SCHEDULER_PRIORITY_IDLE,
1410                                           &delete_expired, NULL);
1411   GNUNET_SCHEDULER_add_delayed (sched,
1412                                 GNUNET_TIME_UNIT_FOREVER_REL,
1413                                 &cleaning_task, NULL);
1414   
1415 }
1416
1417
1418 /**
1419  * The main function for the datastore service.
1420  *
1421  * @param argc number of arguments from the command line
1422  * @param argv command line arguments
1423  * @return 0 ok, 1 on error
1424  */
1425 int
1426 main (int argc, char *const *argv)
1427 {
1428   int ret;
1429
1430   ret = (GNUNET_OK ==
1431          GNUNET_SERVICE_run (argc,
1432                              argv,
1433                              "datastore",
1434                              GNUNET_SERVICE_OPTION_NONE,
1435                              &run, NULL)) ? 0 : 1;
1436   return ret;
1437 }
1438
1439
1440 /* end of gnunet-service-datastore.c */