c91884aa0d57f5b0c091387faa375efded085ae7
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217 };
218
219
220
221 /**
222  * Connect to the datastore service.
223  *
224  * @param cfg configuration to use
225  * @param sched scheduler to use
226  * @return handle to use to access the service
227  */
228 struct GNUNET_DATASTORE_Handle *
229 GNUNET_DATASTORE_connect (const struct
230                           GNUNET_CONFIGURATION_Handle
231                           *cfg,
232                           struct
233                           GNUNET_SCHEDULER_Handle
234                           *sched)
235 {
236   struct GNUNET_CLIENT_Connection *c;
237   struct GNUNET_DATASTORE_Handle *h;
238   
239   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
240   if (c == NULL)
241     return NULL; /* oops */
242   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
243                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
244   h->client = c;
245   h->cfg = cfg;
246   h->sched = sched;
247   return h;
248 }
249
250
251 /**
252  * Transmit DROP message to datastore service.
253  *
254  * @param cls the 'struct GNUNET_DATASTORE_Handle'
255  * @param size number of bytes that can be copied to buf
256  * @param buf where to copy the drop message
257  * @return number of bytes written to buf
258  */
259 static size_t
260 transmit_drop (void *cls,
261                size_t size, 
262                void *buf)
263 {
264   struct GNUNET_DATASTORE_Handle *h = cls;
265   struct GNUNET_MessageHeader *hdr;
266   
267   if (buf == NULL)
268     {
269       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270                   _("Failed to transmit request to drop database.\n"));
271       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
272       return 0;
273     }
274   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
275   hdr = buf;
276   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
277   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
278   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
279   return sizeof(struct GNUNET_MessageHeader);
280 }
281
282
283 /**
284  * Disconnect from the datastore service (and free
285  * associated resources).
286  *
287  * @param h handle to the datastore
288  * @param drop set to GNUNET_YES to delete all data in datastore (!)
289  */
290 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
291                                   int drop)
292 {
293   struct GNUNET_DATASTORE_QueueEntry *qe;
294
295   if (h->client != NULL)
296     {
297       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
298       h->client = NULL;
299     }
300   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
301     {
302       GNUNET_SCHEDULER_cancel (h->sched,
303                                h->reconnect_task);
304       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
305     }
306   while (NULL != (qe = h->queue_head))
307     {
308       GNUNET_assert (NULL != qe->response_proc);
309       qe->response_proc (qe, NULL);
310     }
311   if (GNUNET_YES == drop) 
312     {
313       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
314       if (h->client != NULL)
315         {
316           if (NULL != 
317               GNUNET_CLIENT_notify_transmit_ready (h->client,
318                                                    sizeof(struct GNUNET_MessageHeader),
319                                                    GNUNET_TIME_UNIT_MINUTES,
320                                                    GNUNET_YES,
321                                                    &transmit_drop,
322                                                    h))
323             return;
324           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325         }
326       GNUNET_break (0);
327     }
328   GNUNET_free (h);
329 }
330
331
332 /**
333  * A request has timed out (before being transmitted to the service).
334  *
335  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
336  * @param tc scheduler context
337  */
338 static void
339 timeout_queue_entry (void *cls,
340                      const struct GNUNET_SCHEDULER_TaskContext *tc)
341 {
342   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
343
344   qe->task = GNUNET_SCHEDULER_NO_TASK;
345   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
346   qe->response_proc (qe, NULL);
347 }
348
349
350 /**
351  * Create a new entry for our priority queue (and possibly discard other entires if
352  * the queue is getting too long).
353  *
354  * @param h handle to the datastore
355  * @param msize size of the message to queue
356  * @param queue_priority priority of the entry
357  * @param max_queue_size at what queue size should this request be dropped
358  *        (if other requests of higher priority are in the queue)
359  * @param timeout timeout for the operation
360  * @param response_proc function to call with replies (can be NULL)
361  * @param qc client context (NOT a closure for response_proc)
362  * @return NULL if the queue is full (and this entry was dropped)
363  */
364 static struct GNUNET_DATASTORE_QueueEntry *
365 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
366                   size_t msize,
367                   unsigned int queue_priority,
368                   unsigned int max_queue_size,
369                   struct GNUNET_TIME_Relative timeout,
370                   GNUNET_CLIENT_MessageHandler response_proc,            
371                   const union QueueContext *qc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *ret;
374   struct GNUNET_DATASTORE_QueueEntry *pos;
375   unsigned int c;
376
377   c = 0;
378   pos = h->queue_head;
379   while ( (pos != NULL) &&
380           (c < max_queue_size) &&
381           (pos->priority >= queue_priority) )
382     {
383       c++;
384       pos = pos->next;
385     }
386   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
387   ret->h = h;
388   ret->response_proc = response_proc;
389   ret->qc = *qc;
390   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
391   ret->priority = queue_priority;
392   ret->max_queue = max_queue_size;
393   ret->message_size = msize;
394   ret->was_transmitted = GNUNET_NO;
395   if (pos == NULL)
396     {
397       /* append at the tail */
398       pos = h->queue_tail;
399     }
400   else
401     {
402       pos = pos->prev; 
403       /* do not insert at HEAD if HEAD query was already
404          transmitted and we are still receiving replies! */
405       if ( (pos == NULL) &&
406            (h->queue_head->was_transmitted) )
407         pos = h->queue_head;
408     }
409   c++;
410   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
411                                      h->queue_tail,
412                                      pos,
413                                      ret);
414   h->queue_size++;
415   if (c > max_queue_size)
416     {
417       response_proc (ret, NULL);
418       GNUNET_free (ret);
419       return NULL;
420     }
421   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
422                                             timeout,
423                                             &timeout_queue_entry,
424                                             ret);
425   pos = ret->next;
426   while (pos != NULL) 
427     {
428       if (pos->max_queue < h->queue_size)
429         {
430           GNUNET_assert (pos->response_proc != NULL);
431           pos->response_proc (pos, NULL);
432           break;
433         }
434       pos = pos->next;
435     }
436   return ret;
437 }
438
439
440 /**
441  * Process entries in the queue (or do nothing if we are already
442  * doing so).
443  * 
444  * @param h handle to the datastore
445  */
446 static void
447 process_queue (struct GNUNET_DATASTORE_Handle *h);
448
449
450 /**
451  * Try reconnecting to the datastore service.
452  *
453  * @param cls the 'struct GNUNET_DATASTORE_Handle'
454  * @param tc scheduler context
455  */
456 static void
457 try_reconnect (void *cls,
458                const struct GNUNET_SCHEDULER_TaskContext *tc)
459 {
460   struct GNUNET_DATASTORE_Handle *h = cls;
461
462   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
463     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
464   else
465     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
466   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
468   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
469   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
470   if (h->client == NULL)
471     return;
472   process_queue (h);
473 }
474
475
476 /**
477  * Disconnect from the service and then try reconnecting to the datastore service
478  * after some delay.
479  *
480  * @param h handle to datastore to disconnect and reconnect
481  */
482 static void
483 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
484 {
485   if (h->client == NULL)
486     return;
487   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
488   h->client = NULL;
489   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
490                                                     h->retry_time,
491                                                     &try_reconnect,
492                                                     h);      
493 }
494
495
496 /**
497  * Transmit request from queue to datastore service.
498  *
499  * @param cls the 'struct GNUNET_DATASTORE_Handle'
500  * @param size number of bytes that can be copied to buf
501  * @param buf where to copy the drop message
502  * @return number of bytes written to buf
503  */
504 static size_t
505 transmit_request (void *cls,
506                   size_t size, 
507                   void *buf)
508 {
509   struct GNUNET_DATASTORE_Handle *h = cls;
510   struct GNUNET_DATASTORE_QueueEntry *qe;
511   size_t msize;
512
513   h->th = NULL;
514   if (NULL == (qe = h->queue_head))
515     return 0; /* no entry in queue */
516   if (buf == NULL)
517     {
518       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
519                   _("Failed to transmit request to database.\n"));
520       do_disconnect (h);
521       return 0;
522     }
523   if (size < (msize = qe->message_size))
524     {
525       process_queue (h);
526       return 0;
527     }
528   memcpy (buf, &qe[1], msize);
529   qe->was_transmitted = GNUNET_YES;
530   GNUNET_SCHEDULER_cancel (h->sched,
531                            qe->task);
532   qe->task = GNUNET_SCHEDULER_NO_TASK;
533   GNUNET_CLIENT_receive (h->client,
534                          qe->response_proc,
535                          qe,
536                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
537   return msize;
538 }
539
540
541 /**
542  * Process entries in the queue (or do nothing if we are already
543  * doing so).
544  * 
545  * @param h handle to the datastore
546  */
547 static void
548 process_queue (struct GNUNET_DATASTORE_Handle *h)
549 {
550   struct GNUNET_DATASTORE_QueueEntry *qe;
551
552   if (NULL == (qe = h->queue_head))
553     return; /* no entry in queue */
554   if (qe->was_transmitted == GNUNET_YES)
555     return; /* waiting for replies */
556   if (h->th != NULL)
557     return; /* request pending */
558   if (h->client == NULL)
559     return; /* waiting for reconnect */
560   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
561                                                qe->message_size,
562                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
563                                                GNUNET_YES,
564                                                &transmit_request,
565                                                h);
566 }
567
568
569 /**
570  * Dummy continuation used to do nothing (but be non-zero).
571  *
572  * @param cls closure
573  * @param result result 
574  * @param emsg error message
575  */
576 static void
577 drop_status_cont (void *cls, int result, const char *emsg)
578 {
579   /* do nothing */
580 }
581
582
583 static void
584 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
585 {
586   struct GNUNET_DATASTORE_Handle *h = qe->h;
587
588   GNUNET_CONTAINER_DLL_remove (h->queue_head,
589                                h->queue_tail,
590                                qe);
591   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
592     {
593       GNUNET_SCHEDULER_cancel (h->sched,
594                                qe->task);
595       qe->task = GNUNET_SCHEDULER_NO_TASK;
596     }
597   h->queue_size--;
598   GNUNET_free (qe);
599 }
600
601 /**
602  * Type of a function to call when we receive a message
603  * from the service.
604  *
605  * @param cls closure
606  * @param msg message received, NULL on timeout or fatal error
607  */
608 static void 
609 process_status_message (void *cls,
610                         const struct
611                         GNUNET_MessageHeader * msg)
612 {
613   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
614   struct GNUNET_DATASTORE_Handle *h = qe->h;
615   struct StatusContext rc = qe->qc.sc;
616   const struct StatusMessage *sm;
617   const char *emsg;
618   int32_t status;
619
620   free_queue_entry (qe);
621   if (msg == NULL)
622     {      
623       if (NULL == h->client)
624         return; /* forced disconnect */
625       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
626                   _("Failed to receive response from database.\n"));
627       do_disconnect (h);
628       return;
629     }
630
631   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
632        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
633     {
634       GNUNET_break (0);
635       h->retry_time = GNUNET_TIME_UNIT_ZERO;
636       do_disconnect (h);
637       rc.cont (rc.cont_cls, 
638                GNUNET_SYSERR,
639                _("Error reading response from datastore service"));
640       return;
641     }
642   sm = (const struct StatusMessage*) msg;
643   status = ntohl(sm->status);
644   emsg = NULL;
645   if (ntohs(msg->size) > sizeof(struct StatusMessage))
646     {
647       emsg = (const char*) &sm[1];
648       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
649         {
650           GNUNET_break (0);
651           emsg = _("Invalid error message received from datastore service");
652         }
653     }  
654   if ( (status == GNUNET_SYSERR) &&
655        (emsg == NULL) )
656     {
657       GNUNET_break (0);
658       emsg = _("Invalid error message received from datastore service");
659     }
660 #if DEBUG_DATASTORE
661   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
662               "Received status %d/%s\n",
663               (int) status,
664               emsg);
665 #endif
666   rc.cont (rc.cont_cls, 
667            status,
668            emsg);
669   process_queue (h);
670 }
671
672
673 /**
674  * Store an item in the datastore.  If the item is already present,
675  * the priorities are summed up and the higher expiration time and
676  * lower anonymity level is used.
677  *
678  * @param h handle to the datastore
679  * @param rid reservation ID to use (from "reserve"); use 0 if no
680  *            prior reservation was made
681  * @param key key for the value
682  * @param size number of bytes in data
683  * @param data content stored
684  * @param type type of the content
685  * @param priority priority of the content
686  * @param anonymity anonymity-level for the content
687  * @param expiration expiration time for the content
688  * @param queue_priority ranking of this request in the priority queue
689  * @param max_queue_size at what queue size should this request be dropped
690  *        (if other requests of higher priority are in the queue)
691  * @param timeout timeout for the operation
692  * @param cont continuation to call when done
693  * @param cont_cls closure for cont
694  * @return NULL if the entry was not queued, otherwise a handle that can be used to
695  *         cancel; note that even if NULL is returned, the callback will be invoked
696  *         (or rather, will already have been invoked)
697  */
698 struct GNUNET_DATASTORE_QueueEntry *
699 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
700                       int rid,
701                       const GNUNET_HashCode * key,
702                       uint32_t size,
703                       const void *data,
704                       enum GNUNET_BLOCK_Type type,
705                       uint32_t priority,
706                       uint32_t anonymity,
707                       struct GNUNET_TIME_Absolute expiration,
708                       unsigned int queue_priority,
709                       unsigned int max_queue_size,
710                       struct GNUNET_TIME_Relative timeout,
711                       GNUNET_DATASTORE_ContinuationWithStatus cont,
712                       void *cont_cls)
713 {
714   struct GNUNET_DATASTORE_QueueEntry *qe;
715   struct DataMessage *dm;
716   size_t msize;
717   union QueueContext qc;
718
719 #if DEBUG_DATASTORE
720   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
721               "Asked to put %u bytes of data under key `%s'\n",
722               size,
723               GNUNET_h2s (key));
724 #endif
725   msize = sizeof(struct DataMessage) + size;
726   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
727   qc.sc.cont = cont;
728   qc.sc.cont_cls = cont_cls;
729   qe = make_queue_entry (h, msize,
730                          queue_priority, max_queue_size, timeout,
731                          &process_status_message, &qc);
732   if (qe == NULL)
733     return NULL;
734   dm = (struct DataMessage* ) &qe[1];
735   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
736   dm->header.size = htons(msize);
737   dm->rid = htonl(rid);
738   dm->size = htonl(size);
739   dm->type = htonl(type);
740   dm->priority = htonl(priority);
741   dm->anonymity = htonl(anonymity);
742   dm->uid = GNUNET_htonll(0);
743   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
744   dm->key = *key;
745   memcpy (&dm[1], data, size);
746   process_queue (h);
747   return qe;
748 }
749
750
751 /**
752  * Reserve space in the datastore.  This function should be used
753  * to avoid "out of space" failures during a longer sequence of "put"
754  * operations (for example, when a file is being inserted).
755  *
756  * @param h handle to the datastore
757  * @param amount how much space (in bytes) should be reserved (for content only)
758  * @param entries how many entries will be created (to calculate per-entry overhead)
759  * @param queue_priority ranking of this request in the priority queue
760  * @param max_queue_size at what queue size should this request be dropped
761  *        (if other requests of higher priority are in the queue)
762  * @param timeout how long to wait at most for a response (or before dying in queue)
763  * @param cont continuation to call when done; "success" will be set to
764  *             a positive reservation value if space could be reserved.
765  * @param cont_cls closure for cont
766  * @return NULL if the entry was not queued, otherwise a handle that can be used to
767  *         cancel; note that even if NULL is returned, the callback will be invoked
768  *         (or rather, will already have been invoked)
769  */
770 struct GNUNET_DATASTORE_QueueEntry *
771 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
772                           uint64_t amount,
773                           uint32_t entries,
774                           unsigned int queue_priority,
775                           unsigned int max_queue_size,
776                           struct GNUNET_TIME_Relative timeout,
777                           GNUNET_DATASTORE_ContinuationWithStatus cont,
778                           void *cont_cls)
779 {
780   struct GNUNET_DATASTORE_QueueEntry *qe;
781   struct ReserveMessage *rm;
782   union QueueContext qc;
783
784   if (cont == NULL)
785     cont = &drop_status_cont;
786 #if DEBUG_DATASTORE
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "Asked to reserve %llu bytes of data and %u entries'\n",
789               (unsigned long long) amount,
790               (unsigned int) entries);
791 #endif
792   qc.sc.cont = cont;
793   qc.sc.cont_cls = cont_cls;
794   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
795                          queue_priority, max_queue_size, timeout,
796                          &process_status_message, &qc);
797   if (qe == NULL)
798     return NULL;
799   rm = (struct ReserveMessage*) &qe[1];
800   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
801   rm->header.size = htons(sizeof (struct ReserveMessage));
802   rm->entries = htonl(entries);
803   rm->amount = GNUNET_htonll(amount);
804   process_queue (h);
805   return qe;
806 }
807
808
809 /**
810  * Signal that all of the data for which a reservation was made has
811  * been stored and that whatever excess space might have been reserved
812  * can now be released.
813  *
814  * @param h handle to the datastore
815  * @param rid reservation ID (value of "success" in original continuation
816  *        from the "reserve" function).
817  * @param queue_priority ranking of this request in the priority queue
818  * @param max_queue_size at what queue size should this request be dropped
819  *        (if other requests of higher priority are in the queue)
820  * @param queue_priority ranking of this request in the priority queue
821  * @param max_queue_size at what queue size should this request be dropped
822  *        (if other requests of higher priority are in the queue)
823  * @param timeout how long to wait at most for a response
824  * @param cont continuation to call when done
825  * @param cont_cls closure for cont
826  * @return NULL if the entry was not queued, otherwise a handle that can be used to
827  *         cancel; note that even if NULL is returned, the callback will be invoked
828  *         (or rather, will already have been invoked)
829  */
830 struct GNUNET_DATASTORE_QueueEntry *
831 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
832                                   int rid,
833                                   unsigned int queue_priority,
834                                   unsigned int max_queue_size,
835                                   struct GNUNET_TIME_Relative timeout,
836                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
837                                   void *cont_cls)
838 {
839   struct GNUNET_DATASTORE_QueueEntry *qe;
840   struct ReleaseReserveMessage *rrm;
841   union QueueContext qc;
842
843   if (cont == NULL)
844     cont = &drop_status_cont;
845 #if DEBUG_DATASTORE
846   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
847               "Asked to release reserve %d\n",
848               rid);
849 #endif
850   qc.sc.cont = cont;
851   qc.sc.cont_cls = cont_cls;
852   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
853                          queue_priority, max_queue_size, timeout,
854                          &process_status_message, &qc);
855   if (qe == NULL)
856     return NULL;
857   rrm = (struct ReleaseReserveMessage*) &qe[1];
858   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
859   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
860   rrm->rid = htonl(rid);
861   process_queue (h);
862   return qe;
863 }
864
865
866 /**
867  * Update a value in the datastore.
868  *
869  * @param h handle to the datastore
870  * @param uid identifier for the value
871  * @param priority how much to increase the priority of the value
872  * @param expiration new expiration value should be MAX of existing and this argument
873  * @param queue_priority ranking of this request in the priority queue
874  * @param max_queue_size at what queue size should this request be dropped
875  *        (if other requests of higher priority are in the queue)
876  * @param timeout how long to wait at most for a response
877  * @param cont continuation to call when done
878  * @param cont_cls closure for cont
879  * @return NULL if the entry was not queued, otherwise a handle that can be used to
880  *         cancel; note that even if NULL is returned, the callback will be invoked
881  *         (or rather, will already have been invoked)
882  */
883 struct GNUNET_DATASTORE_QueueEntry *
884 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
885                          unsigned long long uid,
886                          uint32_t priority,
887                          struct GNUNET_TIME_Absolute expiration,
888                          unsigned int queue_priority,
889                          unsigned int max_queue_size,
890                          struct GNUNET_TIME_Relative timeout,
891                          GNUNET_DATASTORE_ContinuationWithStatus cont,
892                          void *cont_cls)
893 {
894   struct GNUNET_DATASTORE_QueueEntry *qe;
895   struct UpdateMessage *um;
896   union QueueContext qc;
897
898   if (cont == NULL)
899     cont = &drop_status_cont;
900 #if DEBUG_DATASTORE
901   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
902               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
903               uid,
904               (unsigned int) priority,
905               (unsigned long long) expiration.value);
906 #endif
907   qc.sc.cont = cont;
908   qc.sc.cont_cls = cont_cls;
909   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
910                          queue_priority, max_queue_size, timeout,
911                          &process_status_message, &qc);
912   if (qe == NULL)
913     return NULL;
914   um = (struct UpdateMessage*) &qe[1];
915   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
916   um->header.size = htons(sizeof (struct UpdateMessage));
917   um->priority = htonl(priority);
918   um->expiration = GNUNET_TIME_absolute_hton(expiration);
919   um->uid = GNUNET_htonll(uid);
920   process_queue (h);
921   return qe;
922 }
923
924
925 /**
926  * Explicitly remove some content from the database.
927  * The "cont"inuation will be called with status
928  * "GNUNET_OK" if content was removed, "GNUNET_NO"
929  * if no matching entry was found and "GNUNET_SYSERR"
930  * on all other types of errors.
931  *
932  * @param h handle to the datastore
933  * @param key key for the value
934  * @param size number of bytes in data
935  * @param data content stored
936  * @param queue_priority ranking of this request in the priority queue
937  * @param max_queue_size at what queue size should this request be dropped
938  *        (if other requests of higher priority are in the queue)
939  * @param timeout how long to wait at most for a response
940  * @param cont continuation to call when done
941  * @param cont_cls closure for cont
942  * @return NULL if the entry was not queued, otherwise a handle that can be used to
943  *         cancel; note that even if NULL is returned, the callback will be invoked
944  *         (or rather, will already have been invoked)
945  */
946 struct GNUNET_DATASTORE_QueueEntry *
947 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
948                          const GNUNET_HashCode *key,
949                          uint32_t size, 
950                          const void *data,
951                          unsigned int queue_priority,
952                          unsigned int max_queue_size,
953                          struct GNUNET_TIME_Relative timeout,
954                          GNUNET_DATASTORE_ContinuationWithStatus cont,
955                          void *cont_cls)
956 {
957   struct GNUNET_DATASTORE_QueueEntry *qe;
958   struct DataMessage *dm;
959   size_t msize;
960   union QueueContext qc;
961
962   if (cont == NULL)
963     cont = &drop_status_cont;
964 #if DEBUG_DATASTORE
965   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
966               "Asked to remove %u bytes under key `%s'\n",
967               size,
968               GNUNET_h2s (key));
969 #endif
970   qc.sc.cont = cont;
971   qc.sc.cont_cls = cont_cls;
972   msize = sizeof(struct DataMessage) + size;
973   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
974   qe = make_queue_entry (h, msize,
975                          queue_priority, max_queue_size, timeout,
976                          &process_status_message, &qc);
977   if (qe == NULL)
978     return NULL;
979   dm = (struct DataMessage*) &qe[1];
980   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
981   dm->header.size = htons(msize);
982   dm->rid = htonl(0);
983   dm->size = htonl(size);
984   dm->type = htonl(0);
985   dm->priority = htonl(0);
986   dm->anonymity = htonl(0);
987   dm->uid = GNUNET_htonll(0);
988   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
989   dm->key = *key;
990   memcpy (&dm[1], data, size);
991   process_queue (h);
992   return qe;
993 }
994
995
996 /**
997  * Type of a function to call when we receive a message
998  * from the service.
999  *
1000  * @param cls closure
1001  * @param msg message received, NULL on timeout or fatal error
1002  */
1003 static void 
1004 process_result_message (void *cls,
1005                         const struct GNUNET_MessageHeader * msg)
1006 {
1007   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1008   struct GNUNET_DATASTORE_Handle *h = qe->h;
1009   struct ResultContext rc = qe->qc.rc;
1010   const struct DataMessage *dm;
1011
1012   GNUNET_assert (h->queue_head == qe);
1013   if (msg == NULL)
1014     {
1015 #if DEBUG_DATASTORE
1016       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1017                   _("Failed to receive response from datastore\n"));
1018 #endif
1019       free_queue_entry (qe);
1020       do_disconnect (h);
1021       rc.iter (rc.iter_cls,
1022                NULL, 0, NULL, 0, 0, 0, 
1023                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1024       return;
1025     }
1026   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1027     {
1028       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1029 #if DEBUG_DATASTORE
1030       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1031                   "Received end of result set\n");
1032 #endif
1033       free_queue_entry (qe);
1034       rc.iter (rc.iter_cls,
1035                NULL, 0, NULL, 0, 0, 0, 
1036                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1037       process_queue (h);
1038       return;
1039     }
1040   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1041        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1042        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1043     {
1044       GNUNET_break (0);
1045       free_queue_entry (qe);
1046       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1047       do_disconnect (h);
1048       rc.iter (rc.iter_cls,
1049                NULL, 0, NULL, 0, 0, 0, 
1050                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1051       return;
1052     }
1053   dm = (const struct DataMessage*) msg;
1054 #if DEBUG_DATASTORE
1055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056               "Received result %llu with type %u and size %u with key %s\n",
1057               (unsigned long long) GNUNET_ntohll(dm->uid),
1058               ntohl(dm->type),
1059               ntohl(dm->size),
1060               GNUNET_h2s(&dm->key));
1061 #endif
1062   rc.iter (rc.iter_cls,
1063            &dm->key,
1064            ntohl(dm->size),
1065            &dm[1],
1066            ntohl(dm->type),
1067            ntohl(dm->priority),
1068            ntohl(dm->anonymity),
1069            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1070            GNUNET_ntohll(dm->uid));
1071 }
1072
1073
1074 /**
1075  * Get a random value from the datastore.
1076  *
1077  * @param h handle to the datastore
1078  * @param queue_priority ranking of this request in the priority queue
1079  * @param max_queue_size at what queue size should this request be dropped
1080  *        (if other requests of higher priority are in the queue)
1081  * @param timeout how long to wait at most for a response
1082  * @param iter function to call on a random value; it
1083  *        will be called once with a value (if available)
1084  *        and always once with a value of NULL.
1085  * @param iter_cls closure for iter
1086  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1087  *         cancel; note that even if NULL is returned, the callback will be invoked
1088  *         (or rather, will already have been invoked)
1089  */
1090 struct GNUNET_DATASTORE_QueueEntry *
1091 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1092                              unsigned int queue_priority,
1093                              unsigned int max_queue_size,
1094                              struct GNUNET_TIME_Relative timeout,
1095                              GNUNET_DATASTORE_Iterator iter, 
1096                              void *iter_cls)
1097 {
1098   struct GNUNET_DATASTORE_QueueEntry *qe;
1099   struct GNUNET_MessageHeader *m;
1100   union QueueContext qc;
1101
1102 #if DEBUG_DATASTORE
1103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104               "Asked to get random entry in %llu ms\n",
1105               (unsigned long long) timeout.value);
1106 #endif
1107   qc.rc.iter = iter;
1108   qc.rc.iter_cls = iter_cls;
1109   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1110                          queue_priority, max_queue_size, timeout,
1111                          &process_result_message, &qc);
1112   if (qe == NULL)
1113     return NULL;    
1114   m = (struct GNUNET_MessageHeader*) &qe[1];
1115   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1116   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1117   process_queue (h);
1118   return qe;
1119 }
1120
1121
1122
1123 /**
1124  * Iterate over the results for a particular key
1125  * in the datastore.  The iterator will only be called
1126  * once initially; if the first call did contain a
1127  * result, further results can be obtained by calling
1128  * "GNUNET_DATASTORE_get_next" with the given argument.
1129  *
1130  * @param h handle to the datastore
1131  * @param key maybe NULL (to match all entries)
1132  * @param type desired type, 0 for any
1133  * @param queue_priority ranking of this request in the priority queue
1134  * @param max_queue_size at what queue size should this request be dropped
1135  *        (if other requests of higher priority are in the queue)
1136  * @param timeout how long to wait at most for a response
1137  * @param iter function to call on each matching value;
1138  *        will be called once with a NULL value at the end
1139  * @param iter_cls closure for iter
1140  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141  *         cancel; note that even if NULL is returned, the callback will be invoked
1142  *         (or rather, will already have been invoked)
1143  */
1144 struct GNUNET_DATASTORE_QueueEntry *
1145 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1146                       const GNUNET_HashCode * key,
1147                       enum GNUNET_BLOCK_Type type,
1148                       unsigned int queue_priority,
1149                       unsigned int max_queue_size,
1150                       struct GNUNET_TIME_Relative timeout,
1151                       GNUNET_DATASTORE_Iterator iter, 
1152                       void *iter_cls)
1153 {
1154   struct GNUNET_DATASTORE_QueueEntry *qe;
1155   struct GetMessage *gm;
1156   union QueueContext qc;
1157
1158 #if DEBUG_DATASTORE
1159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160               "Asked to look for data of type %u under key `%s'\n",
1161               (unsigned int) type,
1162               GNUNET_h2s (key));
1163 #endif
1164   qc.rc.iter = iter;
1165   qc.rc.iter_cls = iter_cls;
1166   qe = make_queue_entry (h, sizeof(struct GetMessage),
1167                          queue_priority, max_queue_size, timeout,
1168                          &process_result_message, &qc);
1169   if (qe == NULL)
1170     return NULL;
1171   gm = (struct GetMessage*) &qe[1];
1172   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1173   gm->type = htonl(type);
1174   if (key != NULL)
1175     {
1176       gm->header.size = htons(sizeof (struct GetMessage));
1177       gm->key = *key;
1178     }
1179   else
1180     {
1181       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1182     }
1183   process_queue (h);
1184   return qe;
1185 }
1186
1187
1188 /**
1189  * Function called to trigger obtaining the next result
1190  * from the datastore.
1191  * 
1192  * @param h handle to the datastore
1193  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1194  *        iteration (with a final call to "iter" with key/data == NULL).
1195  */
1196 void 
1197 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1198                            int more)
1199 {
1200   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1201   struct ResultContext rc = qe->qc.rc;
1202
1203   GNUNET_assert (NULL != qe);
1204   GNUNET_assert (&process_result_message == qe->response_proc);
1205   if (GNUNET_YES == more)
1206     {     
1207       GNUNET_CLIENT_receive (h->client,
1208                              qe->response_proc,
1209                              qe,
1210                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1211       return;
1212     }
1213   free_queue_entry (qe);
1214   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1215   do_disconnect (h);
1216   rc.iter (rc.iter_cls,
1217            NULL, 0, NULL, 0, 0, 0, 
1218            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1219 }
1220
1221
1222 /**
1223  * Cancel a datastore operation.  The final callback from the
1224  * operation must not have been done yet.
1225  * 
1226  * @param qe operation to cancel
1227  */
1228 void
1229 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1230 {
1231   struct GNUNET_DATASTORE_Handle *h;
1232   int reconnect;
1233
1234   h = qe->h;
1235   reconnect = qe->was_transmitted;
1236   free_queue_entry (qe);
1237   h->queue_size--;
1238   if (reconnect)
1239     {
1240       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1241       do_disconnect (h);
1242     }
1243 }
1244
1245
1246 /* end of datastore_api.c */