fix
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     return;
476   process_queue (h);
477 }
478
479
480 /**
481  * Disconnect from the service and then try reconnecting to the datastore service
482  * after some delay.
483  *
484  * @param h handle to datastore to disconnect and reconnect
485  */
486 static void
487 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
488 {
489   if (h->client == NULL)
490     return;
491 #if 0
492   GNUNET_STATISTICS_update (stats,
493                             gettext_noop ("# reconnected to datastore"),
494                             1,
495                             GNUNET_NO);
496 #endif
497   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
498   h->client = NULL;
499   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
500                                                     h->retry_time,
501                                                     &try_reconnect,
502                                                     h);      
503 }
504
505
506 /**
507  * Transmit request from queue to datastore service.
508  *
509  * @param cls the 'struct GNUNET_DATASTORE_Handle'
510  * @param size number of bytes that can be copied to buf
511  * @param buf where to copy the drop message
512  * @return number of bytes written to buf
513  */
514 static size_t
515 transmit_request (void *cls,
516                   size_t size, 
517                   void *buf)
518 {
519   struct GNUNET_DATASTORE_Handle *h = cls;
520   struct GNUNET_DATASTORE_QueueEntry *qe;
521   size_t msize;
522
523   h->th = NULL;
524   if (NULL == (qe = h->queue_head))
525     return 0; /* no entry in queue */
526   if (buf == NULL)
527     {
528       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
529                   _("Failed to transmit request to database.\n"));
530       do_disconnect (h);
531       return 0;
532     }
533   if (size < (msize = qe->message_size))
534     {
535       process_queue (h);
536       return 0;
537     }
538   memcpy (buf, &qe[1], msize);
539   qe->was_transmitted = GNUNET_YES;
540   GNUNET_SCHEDULER_cancel (h->sched,
541                            qe->task);
542   qe->task = GNUNET_SCHEDULER_NO_TASK;
543   h->in_receive = GNUNET_YES;
544   GNUNET_CLIENT_receive (h->client,
545                          qe->response_proc,
546                          qe,
547                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
548   return msize;
549 }
550
551
552 /**
553  * Process entries in the queue (or do nothing if we are already
554  * doing so).
555  * 
556  * @param h handle to the datastore
557  */
558 static void
559 process_queue (struct GNUNET_DATASTORE_Handle *h)
560 {
561   struct GNUNET_DATASTORE_QueueEntry *qe;
562
563   if (NULL == (qe = h->queue_head))
564     return; /* no entry in queue */
565   if (qe->was_transmitted == GNUNET_YES)
566     return; /* waiting for replies */
567   if (h->th != NULL)
568     return; /* request pending */
569   if (h->client == NULL)
570     return; /* waiting for reconnect */
571 #if DEBUG_DATASTORE
572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573               "Transmitting %u bytes request to datastore\n",
574               qe->message_size);
575 #endif
576   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
577                                                qe->message_size,
578                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
579                                                GNUNET_YES,
580                                                &transmit_request,
581                                                h);
582 }
583
584
585 /**
586  * Dummy continuation used to do nothing (but be non-zero).
587  *
588  * @param cls closure
589  * @param result result 
590  * @param emsg error message
591  */
592 static void
593 drop_status_cont (void *cls, int result, const char *emsg)
594 {
595   /* do nothing */
596 }
597
598
599 static void
600 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
601 {
602   struct GNUNET_DATASTORE_Handle *h = qe->h;
603
604   GNUNET_CONTAINER_DLL_remove (h->queue_head,
605                                h->queue_tail,
606                                qe);
607   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
608     {
609       GNUNET_SCHEDULER_cancel (h->sched,
610                                qe->task);
611       qe->task = GNUNET_SCHEDULER_NO_TASK;
612     }
613   h->queue_size--;
614   GNUNET_free (qe);
615 }
616
617 /**
618  * Type of a function to call when we receive a message
619  * from the service.
620  *
621  * @param cls closure
622  * @param msg message received, NULL on timeout or fatal error
623  */
624 static void 
625 process_status_message (void *cls,
626                         const struct
627                         GNUNET_MessageHeader * msg)
628 {
629   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
630   struct GNUNET_DATASTORE_Handle *h = qe->h;
631   struct StatusContext rc = qe->qc.sc;
632   const struct StatusMessage *sm;
633   const char *emsg;
634   int32_t status;
635   int was_transmitted;
636
637   h->in_receive = GNUNET_NO;
638   was_transmitted = qe->was_transmitted;
639   if (msg == NULL)
640     {      
641       free_queue_entry (qe);
642       if (NULL == h->client)
643         return; /* forced disconnect */
644       if (was_transmitted == GNUNET_YES)
645         {
646           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
647                       _("Failed to receive response from database.\n"));
648           do_disconnect (h);
649         }
650       return;
651     }
652   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
653   GNUNET_assert (h->queue_head == qe);
654   free_queue_entry (qe);
655   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
656        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
657     {
658       GNUNET_break (0);
659       h->retry_time = GNUNET_TIME_UNIT_ZERO;
660       do_disconnect (h);
661       rc.cont (rc.cont_cls, 
662                GNUNET_SYSERR,
663                _("Error reading response from datastore service"));
664       return;
665     }
666   sm = (const struct StatusMessage*) msg;
667   status = ntohl(sm->status);
668   emsg = NULL;
669   if (ntohs(msg->size) > sizeof(struct StatusMessage))
670     {
671       emsg = (const char*) &sm[1];
672       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
673         {
674           GNUNET_break (0);
675           emsg = _("Invalid error message received from datastore service");
676         }
677     }  
678   if ( (status == GNUNET_SYSERR) &&
679        (emsg == NULL) )
680     {
681       GNUNET_break (0);
682       emsg = _("Invalid error message received from datastore service");
683     }
684 #if DEBUG_DATASTORE
685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686               "Received status %d/%s\n",
687               (int) status,
688               emsg);
689 #endif
690   rc.cont (rc.cont_cls, 
691            status,
692            emsg);
693   process_queue (h);
694 }
695
696
697 /**
698  * Store an item in the datastore.  If the item is already present,
699  * the priorities are summed up and the higher expiration time and
700  * lower anonymity level is used.
701  *
702  * @param h handle to the datastore
703  * @param rid reservation ID to use (from "reserve"); use 0 if no
704  *            prior reservation was made
705  * @param key key for the value
706  * @param size number of bytes in data
707  * @param data content stored
708  * @param type type of the content
709  * @param priority priority of the content
710  * @param anonymity anonymity-level for the content
711  * @param expiration expiration time for the content
712  * @param queue_priority ranking of this request in the priority queue
713  * @param max_queue_size at what queue size should this request be dropped
714  *        (if other requests of higher priority are in the queue)
715  * @param timeout timeout for the operation
716  * @param cont continuation to call when done
717  * @param cont_cls closure for cont
718  * @return NULL if the entry was not queued, otherwise a handle that can be used to
719  *         cancel; note that even if NULL is returned, the callback will be invoked
720  *         (or rather, will already have been invoked)
721  */
722 struct GNUNET_DATASTORE_QueueEntry *
723 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
724                       int rid,
725                       const GNUNET_HashCode * key,
726                       uint32_t size,
727                       const void *data,
728                       enum GNUNET_BLOCK_Type type,
729                       uint32_t priority,
730                       uint32_t anonymity,
731                       struct GNUNET_TIME_Absolute expiration,
732                       unsigned int queue_priority,
733                       unsigned int max_queue_size,
734                       struct GNUNET_TIME_Relative timeout,
735                       GNUNET_DATASTORE_ContinuationWithStatus cont,
736                       void *cont_cls)
737 {
738   struct GNUNET_DATASTORE_QueueEntry *qe;
739   struct DataMessage *dm;
740   size_t msize;
741   union QueueContext qc;
742
743 #if DEBUG_DATASTORE
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Asked to put %u bytes of data under key `%s'\n",
746               size,
747               GNUNET_h2s (key));
748 #endif
749   msize = sizeof(struct DataMessage) + size;
750   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
751   qc.sc.cont = cont;
752   qc.sc.cont_cls = cont_cls;
753   qe = make_queue_entry (h, msize,
754                          queue_priority, max_queue_size, timeout,
755                          &process_status_message, &qc);
756   if (qe == NULL)
757     return NULL;
758   dm = (struct DataMessage* ) &qe[1];
759   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
760   dm->header.size = htons(msize);
761   dm->rid = htonl(rid);
762   dm->size = htonl(size);
763   dm->type = htonl(type);
764   dm->priority = htonl(priority);
765   dm->anonymity = htonl(anonymity);
766   dm->uid = GNUNET_htonll(0);
767   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
768   dm->key = *key;
769   memcpy (&dm[1], data, size);
770   process_queue (h);
771   return qe;
772 }
773
774
775 /**
776  * Reserve space in the datastore.  This function should be used
777  * to avoid "out of space" failures during a longer sequence of "put"
778  * operations (for example, when a file is being inserted).
779  *
780  * @param h handle to the datastore
781  * @param amount how much space (in bytes) should be reserved (for content only)
782  * @param entries how many entries will be created (to calculate per-entry overhead)
783  * @param queue_priority ranking of this request in the priority queue
784  * @param max_queue_size at what queue size should this request be dropped
785  *        (if other requests of higher priority are in the queue)
786  * @param timeout how long to wait at most for a response (or before dying in queue)
787  * @param cont continuation to call when done; "success" will be set to
788  *             a positive reservation value if space could be reserved.
789  * @param cont_cls closure for cont
790  * @return NULL if the entry was not queued, otherwise a handle that can be used to
791  *         cancel; note that even if NULL is returned, the callback will be invoked
792  *         (or rather, will already have been invoked)
793  */
794 struct GNUNET_DATASTORE_QueueEntry *
795 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
796                           uint64_t amount,
797                           uint32_t entries,
798                           unsigned int queue_priority,
799                           unsigned int max_queue_size,
800                           struct GNUNET_TIME_Relative timeout,
801                           GNUNET_DATASTORE_ContinuationWithStatus cont,
802                           void *cont_cls)
803 {
804   struct GNUNET_DATASTORE_QueueEntry *qe;
805   struct ReserveMessage *rm;
806   union QueueContext qc;
807
808   if (cont == NULL)
809     cont = &drop_status_cont;
810 #if DEBUG_DATASTORE
811   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
812               "Asked to reserve %llu bytes of data and %u entries'\n",
813               (unsigned long long) amount,
814               (unsigned int) entries);
815 #endif
816   qc.sc.cont = cont;
817   qc.sc.cont_cls = cont_cls;
818   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
819                          queue_priority, max_queue_size, timeout,
820                          &process_status_message, &qc);
821   if (qe == NULL)
822     return NULL;
823   rm = (struct ReserveMessage*) &qe[1];
824   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
825   rm->header.size = htons(sizeof (struct ReserveMessage));
826   rm->entries = htonl(entries);
827   rm->amount = GNUNET_htonll(amount);
828   process_queue (h);
829   return qe;
830 }
831
832
833 /**
834  * Signal that all of the data for which a reservation was made has
835  * been stored and that whatever excess space might have been reserved
836  * can now be released.
837  *
838  * @param h handle to the datastore
839  * @param rid reservation ID (value of "success" in original continuation
840  *        from the "reserve" function).
841  * @param queue_priority ranking of this request in the priority queue
842  * @param max_queue_size at what queue size should this request be dropped
843  *        (if other requests of higher priority are in the queue)
844  * @param queue_priority ranking of this request in the priority queue
845  * @param max_queue_size at what queue size should this request be dropped
846  *        (if other requests of higher priority are in the queue)
847  * @param timeout how long to wait at most for a response
848  * @param cont continuation to call when done
849  * @param cont_cls closure for cont
850  * @return NULL if the entry was not queued, otherwise a handle that can be used to
851  *         cancel; note that even if NULL is returned, the callback will be invoked
852  *         (or rather, will already have been invoked)
853  */
854 struct GNUNET_DATASTORE_QueueEntry *
855 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
856                                   int rid,
857                                   unsigned int queue_priority,
858                                   unsigned int max_queue_size,
859                                   struct GNUNET_TIME_Relative timeout,
860                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
861                                   void *cont_cls)
862 {
863   struct GNUNET_DATASTORE_QueueEntry *qe;
864   struct ReleaseReserveMessage *rrm;
865   union QueueContext qc;
866
867   if (cont == NULL)
868     cont = &drop_status_cont;
869 #if DEBUG_DATASTORE
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Asked to release reserve %d\n",
872               rid);
873 #endif
874   qc.sc.cont = cont;
875   qc.sc.cont_cls = cont_cls;
876   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
877                          queue_priority, max_queue_size, timeout,
878                          &process_status_message, &qc);
879   if (qe == NULL)
880     return NULL;
881   rrm = (struct ReleaseReserveMessage*) &qe[1];
882   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
883   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
884   rrm->rid = htonl(rid);
885   process_queue (h);
886   return qe;
887 }
888
889
890 /**
891  * Update a value in the datastore.
892  *
893  * @param h handle to the datastore
894  * @param uid identifier for the value
895  * @param priority how much to increase the priority of the value
896  * @param expiration new expiration value should be MAX of existing and this argument
897  * @param queue_priority ranking of this request in the priority queue
898  * @param max_queue_size at what queue size should this request be dropped
899  *        (if other requests of higher priority are in the queue)
900  * @param timeout how long to wait at most for a response
901  * @param cont continuation to call when done
902  * @param cont_cls closure for cont
903  * @return NULL if the entry was not queued, otherwise a handle that can be used to
904  *         cancel; note that even if NULL is returned, the callback will be invoked
905  *         (or rather, will already have been invoked)
906  */
907 struct GNUNET_DATASTORE_QueueEntry *
908 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
909                          unsigned long long uid,
910                          uint32_t priority,
911                          struct GNUNET_TIME_Absolute expiration,
912                          unsigned int queue_priority,
913                          unsigned int max_queue_size,
914                          struct GNUNET_TIME_Relative timeout,
915                          GNUNET_DATASTORE_ContinuationWithStatus cont,
916                          void *cont_cls)
917 {
918   struct GNUNET_DATASTORE_QueueEntry *qe;
919   struct UpdateMessage *um;
920   union QueueContext qc;
921
922   if (cont == NULL)
923     cont = &drop_status_cont;
924 #if DEBUG_DATASTORE
925   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
927               uid,
928               (unsigned int) priority,
929               (unsigned long long) expiration.value);
930 #endif
931   qc.sc.cont = cont;
932   qc.sc.cont_cls = cont_cls;
933   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
934                          queue_priority, max_queue_size, timeout,
935                          &process_status_message, &qc);
936   if (qe == NULL)
937     return NULL;
938   um = (struct UpdateMessage*) &qe[1];
939   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
940   um->header.size = htons(sizeof (struct UpdateMessage));
941   um->priority = htonl(priority);
942   um->expiration = GNUNET_TIME_absolute_hton(expiration);
943   um->uid = GNUNET_htonll(uid);
944   process_queue (h);
945   return qe;
946 }
947
948
949 /**
950  * Explicitly remove some content from the database.
951  * The "cont"inuation will be called with status
952  * "GNUNET_OK" if content was removed, "GNUNET_NO"
953  * if no matching entry was found and "GNUNET_SYSERR"
954  * on all other types of errors.
955  *
956  * @param h handle to the datastore
957  * @param key key for the value
958  * @param size number of bytes in data
959  * @param data content stored
960  * @param queue_priority ranking of this request in the priority queue
961  * @param max_queue_size at what queue size should this request be dropped
962  *        (if other requests of higher priority are in the queue)
963  * @param timeout how long to wait at most for a response
964  * @param cont continuation to call when done
965  * @param cont_cls closure for cont
966  * @return NULL if the entry was not queued, otherwise a handle that can be used to
967  *         cancel; note that even if NULL is returned, the callback will be invoked
968  *         (or rather, will already have been invoked)
969  */
970 struct GNUNET_DATASTORE_QueueEntry *
971 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
972                          const GNUNET_HashCode *key,
973                          uint32_t size, 
974                          const void *data,
975                          unsigned int queue_priority,
976                          unsigned int max_queue_size,
977                          struct GNUNET_TIME_Relative timeout,
978                          GNUNET_DATASTORE_ContinuationWithStatus cont,
979                          void *cont_cls)
980 {
981   struct GNUNET_DATASTORE_QueueEntry *qe;
982   struct DataMessage *dm;
983   size_t msize;
984   union QueueContext qc;
985
986   if (cont == NULL)
987     cont = &drop_status_cont;
988 #if DEBUG_DATASTORE
989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990               "Asked to remove %u bytes under key `%s'\n",
991               size,
992               GNUNET_h2s (key));
993 #endif
994   qc.sc.cont = cont;
995   qc.sc.cont_cls = cont_cls;
996   msize = sizeof(struct DataMessage) + size;
997   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
998   qe = make_queue_entry (h, msize,
999                          queue_priority, max_queue_size, timeout,
1000                          &process_status_message, &qc);
1001   if (qe == NULL)
1002     return NULL;
1003   dm = (struct DataMessage*) &qe[1];
1004   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1005   dm->header.size = htons(msize);
1006   dm->rid = htonl(0);
1007   dm->size = htonl(size);
1008   dm->type = htonl(0);
1009   dm->priority = htonl(0);
1010   dm->anonymity = htonl(0);
1011   dm->uid = GNUNET_htonll(0);
1012   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1013   dm->key = *key;
1014   memcpy (&dm[1], data, size);
1015   process_queue (h);
1016   return qe;
1017 }
1018
1019
1020 /**
1021  * Type of a function to call when we receive a message
1022  * from the service.
1023  *
1024  * @param cls closure
1025  * @param msg message received, NULL on timeout or fatal error
1026  */
1027 static void 
1028 process_result_message (void *cls,
1029                         const struct GNUNET_MessageHeader * msg)
1030 {
1031   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1032   struct GNUNET_DATASTORE_Handle *h = qe->h;
1033   struct ResultContext rc = qe->qc.rc;
1034   const struct DataMessage *dm;
1035   int was_transmitted;
1036
1037   h->in_receive = GNUNET_NO;
1038   if (msg == NULL)
1039     {
1040       was_transmitted = qe->was_transmitted;
1041       free_queue_entry (qe);
1042       if (was_transmitted == GNUNET_YES)
1043         {
1044           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1045                       _("Failed to receive response from database.\n"));
1046           do_disconnect (h);
1047         }
1048       if (rc.iter != NULL)
1049         rc.iter (rc.iter_cls,
1050                  NULL, 0, NULL, 0, 0, 0, 
1051                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1052       return;
1053     }
1054   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1055   GNUNET_assert (h->queue_head == qe);
1056   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1057     {
1058       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1059 #if DEBUG_DATASTORE
1060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061                   "Received end of result set\n");
1062 #endif
1063       free_queue_entry (qe);
1064       if (rc.iter != NULL)
1065         rc.iter (rc.iter_cls,
1066                  NULL, 0, NULL, 0, 0, 0, 
1067                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1068       process_queue (h);
1069       return;
1070     }
1071   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1072        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1073        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1074     {
1075       GNUNET_break (0);
1076       free_queue_entry (qe);
1077       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1078       do_disconnect (h);
1079       if (rc.iter != NULL)
1080         rc.iter (rc.iter_cls,
1081                  NULL, 0, NULL, 0, 0, 0, 
1082                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1083       return;
1084     }
1085   if (rc.iter == NULL)
1086     {
1087       /* abort iteration */
1088       do_disconnect (h);
1089       return;
1090     }
1091   dm = (const struct DataMessage*) msg;
1092 #if DEBUG_DATASTORE
1093   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094               "Received result %llu with type %u and size %u with key %s\n",
1095               (unsigned long long) GNUNET_ntohll(dm->uid),
1096               ntohl(dm->type),
1097               ntohl(dm->size),
1098               GNUNET_h2s(&dm->key));
1099 #endif
1100   rc.iter (rc.iter_cls,
1101            &dm->key,
1102            ntohl(dm->size),
1103            &dm[1],
1104            ntohl(dm->type),
1105            ntohl(dm->priority),
1106            ntohl(dm->anonymity),
1107            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1108            GNUNET_ntohll(dm->uid));
1109 }
1110
1111
1112 /**
1113  * Get a random value from the datastore.
1114  *
1115  * @param h handle to the datastore
1116  * @param queue_priority ranking of this request in the priority queue
1117  * @param max_queue_size at what queue size should this request be dropped
1118  *        (if other requests of higher priority are in the queue)
1119  * @param timeout how long to wait at most for a response
1120  * @param iter function to call on a random value; it
1121  *        will be called once with a value (if available)
1122  *        and always once with a value of NULL.
1123  * @param iter_cls closure for iter
1124  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1125  *         cancel; note that even if NULL is returned, the callback will be invoked
1126  *         (or rather, will already have been invoked)
1127  */
1128 struct GNUNET_DATASTORE_QueueEntry *
1129 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1130                              unsigned int queue_priority,
1131                              unsigned int max_queue_size,
1132                              struct GNUNET_TIME_Relative timeout,
1133                              GNUNET_DATASTORE_Iterator iter, 
1134                              void *iter_cls)
1135 {
1136   struct GNUNET_DATASTORE_QueueEntry *qe;
1137   struct GNUNET_MessageHeader *m;
1138   union QueueContext qc;
1139
1140 #if DEBUG_DATASTORE
1141   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142               "Asked to get random entry in %llu ms\n",
1143               (unsigned long long) timeout.value);
1144 #endif
1145   qc.rc.iter = iter;
1146   qc.rc.iter_cls = iter_cls;
1147   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1148                          queue_priority, max_queue_size, timeout,
1149                          &process_result_message, &qc);
1150   if (qe == NULL)
1151     return NULL;    
1152   m = (struct GNUNET_MessageHeader*) &qe[1];
1153   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1154   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1155   process_queue (h);
1156   return qe;
1157 }
1158
1159
1160
1161 /**
1162  * Iterate over the results for a particular key
1163  * in the datastore.  The iterator will only be called
1164  * once initially; if the first call did contain a
1165  * result, further results can be obtained by calling
1166  * "GNUNET_DATASTORE_get_next" with the given argument.
1167  *
1168  * @param h handle to the datastore
1169  * @param key maybe NULL (to match all entries)
1170  * @param type desired type, 0 for any
1171  * @param queue_priority ranking of this request in the priority queue
1172  * @param max_queue_size at what queue size should this request be dropped
1173  *        (if other requests of higher priority are in the queue)
1174  * @param timeout how long to wait at most for a response
1175  * @param iter function to call on each matching value;
1176  *        will be called once with a NULL value at the end
1177  * @param iter_cls closure for iter
1178  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1179  *         cancel; note that even if NULL is returned, the callback will be invoked
1180  *         (or rather, will already have been invoked)
1181  */
1182 struct GNUNET_DATASTORE_QueueEntry *
1183 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1184                       const GNUNET_HashCode * key,
1185                       enum GNUNET_BLOCK_Type type,
1186                       unsigned int queue_priority,
1187                       unsigned int max_queue_size,
1188                       struct GNUNET_TIME_Relative timeout,
1189                       GNUNET_DATASTORE_Iterator iter, 
1190                       void *iter_cls)
1191 {
1192   struct GNUNET_DATASTORE_QueueEntry *qe;
1193   struct GetMessage *gm;
1194   union QueueContext qc;
1195
1196 #if DEBUG_DATASTORE
1197   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1198               "Asked to look for data of type %u under key `%s'\n",
1199               (unsigned int) type,
1200               GNUNET_h2s (key));
1201 #endif
1202   qc.rc.iter = iter;
1203   qc.rc.iter_cls = iter_cls;
1204   qe = make_queue_entry (h, sizeof(struct GetMessage),
1205                          queue_priority, max_queue_size, timeout,
1206                          &process_result_message, &qc);
1207   if (qe == NULL)
1208     return NULL;
1209   gm = (struct GetMessage*) &qe[1];
1210   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1211   gm->type = htonl(type);
1212   if (key != NULL)
1213     {
1214       gm->header.size = htons(sizeof (struct GetMessage));
1215       gm->key = *key;
1216     }
1217   else
1218     {
1219       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1220     }
1221   process_queue (h);
1222   return qe;
1223 }
1224
1225
1226 /**
1227  * Function called to trigger obtaining the next result
1228  * from the datastore.
1229  * 
1230  * @param h handle to the datastore
1231  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1232  *        iteration (with a final call to "iter" with key/data == NULL).
1233  */
1234 void 
1235 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1236                            int more)
1237 {
1238   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1239   struct ResultContext rc = qe->qc.rc;
1240
1241   GNUNET_assert (NULL != qe);
1242   GNUNET_assert (&process_result_message == qe->response_proc);
1243   if (GNUNET_YES == more)
1244     {     
1245       h->in_receive = GNUNET_YES;
1246       GNUNET_CLIENT_receive (h->client,
1247                              qe->response_proc,
1248                              qe,
1249                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1250       return;
1251     }
1252   free_queue_entry (qe);
1253   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1254   do_disconnect (h);
1255   rc.iter (rc.iter_cls,
1256            NULL, 0, NULL, 0, 0, 0, 
1257            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1258 }
1259
1260
1261 /**
1262  * Cancel a datastore operation.  The final callback from the
1263  * operation must not have been done yet.
1264  * 
1265  * @param qe operation to cancel
1266  */
1267 void
1268 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1269 {
1270   struct GNUNET_DATASTORE_Handle *h;
1271   int reconnect;
1272
1273   h = qe->h;
1274   reconnect = GNUNET_NO;
1275   if (GNUNET_YES == qe->was_transmitted) 
1276     {
1277       if (qe->response_proc == &process_result_message) 
1278         {
1279           qe->qc.rc.iter = NULL;    
1280           if (GNUNET_YES != h->in_receive)
1281             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1282           return;
1283         }
1284       reconnect = GNUNET_YES;
1285     }
1286   free_queue_entry (qe);
1287   h->queue_size--;
1288   if (reconnect)
1289     {
1290       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1291       do_disconnect (h);
1292     }
1293 }
1294
1295
1296 /* end of datastore_api.c */