a487689e8d98e3b447eebe84a0ea769c48eef27d
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217 };
218
219
220
221 /**
222  * Connect to the datastore service.
223  *
224  * @param cfg configuration to use
225  * @param sched scheduler to use
226  * @return handle to use to access the service
227  */
228 struct GNUNET_DATASTORE_Handle *
229 GNUNET_DATASTORE_connect (const struct
230                           GNUNET_CONFIGURATION_Handle
231                           *cfg,
232                           struct
233                           GNUNET_SCHEDULER_Handle
234                           *sched)
235 {
236   struct GNUNET_CLIENT_Connection *c;
237   struct GNUNET_DATASTORE_Handle *h;
238   
239   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
240   if (c == NULL)
241     return NULL; /* oops */
242   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
243                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
244   h->client = c;
245   h->cfg = cfg;
246   h->sched = sched;
247   return h;
248 }
249
250
251 /**
252  * Transmit DROP message to datastore service.
253  *
254  * @param cls the 'struct GNUNET_DATASTORE_Handle'
255  * @param size number of bytes that can be copied to buf
256  * @param buf where to copy the drop message
257  * @return number of bytes written to buf
258  */
259 static size_t
260 transmit_drop (void *cls,
261                size_t size, 
262                void *buf)
263 {
264   struct GNUNET_DATASTORE_Handle *h = cls;
265   struct GNUNET_MessageHeader *hdr;
266   
267   if (buf == NULL)
268     {
269       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270                   _("Failed to transmit request to drop database.\n"));
271       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
272       return 0;
273     }
274   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
275   hdr = buf;
276   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
277   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
278   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
279   return sizeof(struct GNUNET_MessageHeader);
280 }
281
282
283 /**
284  * Disconnect from the datastore service (and free
285  * associated resources).
286  *
287  * @param h handle to the datastore
288  * @param drop set to GNUNET_YES to delete all data in datastore (!)
289  */
290 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
291                                   int drop)
292 {
293   struct GNUNET_DATASTORE_QueueEntry *qe;
294
295   if (h->client != NULL)
296     {
297       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
298       h->client = NULL;
299     }
300   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
301     {
302       GNUNET_SCHEDULER_cancel (h->sched,
303                                h->reconnect_task);
304       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
305     }
306   while (NULL != (qe = h->queue_head))
307     {
308       GNUNET_assert (NULL != qe->response_proc);
309       qe->response_proc (qe, NULL);
310     }
311   if (GNUNET_YES == drop) 
312     {
313       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
314       if (h->client != NULL)
315         {
316           if (NULL != 
317               GNUNET_CLIENT_notify_transmit_ready (h->client,
318                                                    sizeof(struct GNUNET_MessageHeader),
319                                                    GNUNET_TIME_UNIT_MINUTES,
320                                                    GNUNET_YES,
321                                                    &transmit_drop,
322                                                    h))
323             return;
324           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325         }
326       GNUNET_break (0);
327     }
328   GNUNET_free (h);
329 }
330
331
332 /**
333  * A request has timed out (before being transmitted to the service).
334  *
335  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
336  * @param tc scheduler context
337  */
338 static void
339 timeout_queue_entry (void *cls,
340                      const struct GNUNET_SCHEDULER_TaskContext *tc)
341 {
342   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
343
344   qe->task = GNUNET_SCHEDULER_NO_TASK;
345   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
346   qe->response_proc (qe, NULL);
347 }
348
349
350 /**
351  * Create a new entry for our priority queue (and possibly discard other entires if
352  * the queue is getting too long).
353  *
354  * @param h handle to the datastore
355  * @param msize size of the message to queue
356  * @param queue_priority priority of the entry
357  * @param max_queue_size at what queue size should this request be dropped
358  *        (if other requests of higher priority are in the queue)
359  * @param timeout timeout for the operation
360  * @param response_proc function to call with replies (can be NULL)
361  * @param qc client context (NOT a closure for response_proc)
362  * @return NULL if the queue is full (and this entry was dropped)
363  */
364 static struct GNUNET_DATASTORE_QueueEntry *
365 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
366                   size_t msize,
367                   unsigned int queue_priority,
368                   unsigned int max_queue_size,
369                   struct GNUNET_TIME_Relative timeout,
370                   GNUNET_CLIENT_MessageHandler response_proc,            
371                   const union QueueContext *qc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *ret;
374   struct GNUNET_DATASTORE_QueueEntry *pos;
375   unsigned int c;
376
377   c = 0;
378   pos = h->queue_head;
379   while ( (pos != NULL) &&
380           (c < max_queue_size) &&
381           (pos->priority >= queue_priority) )
382     {
383       c++;
384       pos = pos->next;
385     }
386   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
387   ret->h = h;
388   ret->response_proc = response_proc;
389   ret->qc = *qc;
390   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
391   ret->priority = queue_priority;
392   ret->max_queue = max_queue_size;
393   ret->message_size = msize;
394   ret->was_transmitted = GNUNET_NO;
395   if (pos == NULL)
396     {
397       /* append at the tail */
398       pos = h->queue_tail;
399     }
400   else
401     {
402       pos = pos->prev; 
403       /* do not insert at HEAD if HEAD query was already
404          transmitted and we are still receiving replies! */
405       if ( (pos == NULL) &&
406            (h->queue_head->was_transmitted) )
407         pos = h->queue_head;
408     }
409   c++;
410   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
411                                      h->queue_tail,
412                                      pos,
413                                      ret);
414   h->queue_size++;
415   if (c > max_queue_size)
416     {
417       response_proc (ret, NULL);
418       return NULL;
419     }
420   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
421                                             timeout,
422                                             &timeout_queue_entry,
423                                             ret);
424   pos = ret->next;
425   while (pos != NULL) 
426     {
427       if (pos->max_queue < h->queue_size)
428         {
429           GNUNET_assert (pos->response_proc != NULL);
430           pos->response_proc (pos, NULL);
431           break;
432         }
433       pos = pos->next;
434     }
435   return ret;
436 }
437
438
439 /**
440  * Process entries in the queue (or do nothing if we are already
441  * doing so).
442  * 
443  * @param h handle to the datastore
444  */
445 static void
446 process_queue (struct GNUNET_DATASTORE_Handle *h);
447
448
449 /**
450  * Try reconnecting to the datastore service.
451  *
452  * @param cls the 'struct GNUNET_DATASTORE_Handle'
453  * @param tc scheduler context
454  */
455 static void
456 try_reconnect (void *cls,
457                const struct GNUNET_SCHEDULER_TaskContext *tc)
458 {
459   struct GNUNET_DATASTORE_Handle *h = cls;
460
461   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
462     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
463   else
464     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
465   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
466     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
467   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
468   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
469   if (h->client == NULL)
470     return;
471   process_queue (h);
472 }
473
474
475 /**
476  * Disconnect from the service and then try reconnecting to the datastore service
477  * after some delay.
478  *
479  * @param h handle to datastore to disconnect and reconnect
480  */
481 static void
482 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
483 {
484   if (h->client == NULL)
485     return;
486   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
487   h->client = NULL;
488   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
489                                                     h->retry_time,
490                                                     &try_reconnect,
491                                                     h);      
492 }
493
494
495 /**
496  * Transmit request from queue to datastore service.
497  *
498  * @param cls the 'struct GNUNET_DATASTORE_Handle'
499  * @param size number of bytes that can be copied to buf
500  * @param buf where to copy the drop message
501  * @return number of bytes written to buf
502  */
503 static size_t
504 transmit_request (void *cls,
505                   size_t size, 
506                   void *buf)
507 {
508   struct GNUNET_DATASTORE_Handle *h = cls;
509   struct GNUNET_DATASTORE_QueueEntry *qe;
510   size_t msize;
511
512   h->th = NULL;
513   if (NULL == (qe = h->queue_head))
514     return 0; /* no entry in queue */
515   if (buf == NULL)
516     {
517       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
518                   _("Failed to transmit request to database.\n"));
519       do_disconnect (h);
520       return 0;
521     }
522   if (size < (msize = qe->message_size))
523     {
524       process_queue (h);
525       return 0;
526     }
527   memcpy (buf, &qe[1], msize);
528   qe->was_transmitted = GNUNET_YES;
529   GNUNET_SCHEDULER_cancel (h->sched,
530                            qe->task);
531   qe->task = GNUNET_SCHEDULER_NO_TASK;
532   GNUNET_CLIENT_receive (h->client,
533                          qe->response_proc,
534                          qe,
535                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
536   return msize;
537 }
538
539
540 /**
541  * Process entries in the queue (or do nothing if we are already
542  * doing so).
543  * 
544  * @param h handle to the datastore
545  */
546 static void
547 process_queue (struct GNUNET_DATASTORE_Handle *h)
548 {
549   struct GNUNET_DATASTORE_QueueEntry *qe;
550
551   if (NULL == (qe = h->queue_head))
552     return; /* no entry in queue */
553   if (qe->was_transmitted == GNUNET_YES)
554     return; /* waiting for replies */
555   if (h->th != NULL)
556     return; /* request pending */
557   if (h->client == NULL)
558     return; /* waiting for reconnect */
559   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
560                                                qe->message_size,
561                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
562                                                GNUNET_YES,
563                                                &transmit_request,
564                                                h);
565 }
566
567
568 /**
569  * Dummy continuation used to do nothing (but be non-zero).
570  *
571  * @param cls closure
572  * @param result result 
573  * @param emsg error message
574  */
575 static void
576 drop_status_cont (void *cls, int result, const char *emsg)
577 {
578   /* do nothing */
579 }
580
581
582 static void
583 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
584 {
585   struct GNUNET_DATASTORE_Handle *h = qe->h;
586
587   GNUNET_CONTAINER_DLL_remove (h->queue_head,
588                                h->queue_tail,
589                                qe);
590   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
591     {
592       GNUNET_SCHEDULER_cancel (h->sched,
593                                qe->task);
594       qe->task = GNUNET_SCHEDULER_NO_TASK;
595     }
596   h->queue_size--;
597   GNUNET_free (qe);
598 }
599
600 /**
601  * Type of a function to call when we receive a message
602  * from the service.
603  *
604  * @param cls closure
605  * @param msg message received, NULL on timeout or fatal error
606  */
607 static void 
608 process_status_message (void *cls,
609                         const struct
610                         GNUNET_MessageHeader * msg)
611 {
612   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
613   struct GNUNET_DATASTORE_Handle *h = qe->h;
614   struct StatusContext rc = qe->qc.sc;
615   const struct StatusMessage *sm;
616   const char *emsg;
617   int32_t status;
618
619   free_queue_entry (qe);
620   if (msg == NULL)
621     {      
622       if (NULL == h->client)
623         return; /* forced disconnect */
624       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
625                   _("Failed to receive response from database.\n"));
626       do_disconnect (h);
627       return;
628     }
629
630   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
631        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
632     {
633       GNUNET_break (0);
634       h->retry_time = GNUNET_TIME_UNIT_ZERO;
635       do_disconnect (h);
636       rc.cont (rc.cont_cls, 
637                GNUNET_SYSERR,
638                _("Error reading response from datastore service"));
639       return;
640     }
641   sm = (const struct StatusMessage*) msg;
642   status = ntohl(sm->status);
643   emsg = NULL;
644   if (ntohs(msg->size) > sizeof(struct StatusMessage))
645     {
646       emsg = (const char*) &sm[1];
647       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
648         {
649           GNUNET_break (0);
650           emsg = _("Invalid error message received from datastore service");
651         }
652     }  
653   if ( (status == GNUNET_SYSERR) &&
654        (emsg == NULL) )
655     {
656       GNUNET_break (0);
657       emsg = _("Invalid error message received from datastore service");
658     }
659 #if DEBUG_DATASTORE
660   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
661               "Received status %d/%s\n",
662               (int) status,
663               emsg);
664 #endif
665   rc.cont (rc.cont_cls, 
666            status,
667            emsg);
668   process_queue (h);
669 }
670
671
672 /**
673  * Store an item in the datastore.  If the item is already present,
674  * the priorities are summed up and the higher expiration time and
675  * lower anonymity level is used.
676  *
677  * @param h handle to the datastore
678  * @param rid reservation ID to use (from "reserve"); use 0 if no
679  *            prior reservation was made
680  * @param key key for the value
681  * @param size number of bytes in data
682  * @param data content stored
683  * @param type type of the content
684  * @param priority priority of the content
685  * @param anonymity anonymity-level for the content
686  * @param expiration expiration time for the content
687  * @param queue_priority ranking of this request in the priority queue
688  * @param max_queue_size at what queue size should this request be dropped
689  *        (if other requests of higher priority are in the queue)
690  * @param timeout timeout for the operation
691  * @param cont continuation to call when done
692  * @param cont_cls closure for cont
693  * @return NULL if the entry was not queued, otherwise a handle that can be used to
694  *         cancel; note that even if NULL is returned, the callback will be invoked
695  *         (or rather, will already have been invoked)
696  */
697 struct GNUNET_DATASTORE_QueueEntry *
698 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
699                       int rid,
700                       const GNUNET_HashCode * key,
701                       uint32_t size,
702                       const void *data,
703                       enum GNUNET_BLOCK_Type type,
704                       uint32_t priority,
705                       uint32_t anonymity,
706                       struct GNUNET_TIME_Absolute expiration,
707                       unsigned int queue_priority,
708                       unsigned int max_queue_size,
709                       struct GNUNET_TIME_Relative timeout,
710                       GNUNET_DATASTORE_ContinuationWithStatus cont,
711                       void *cont_cls)
712 {
713   struct GNUNET_DATASTORE_QueueEntry *qe;
714   struct DataMessage *dm;
715   size_t msize;
716   union QueueContext qc;
717
718 #if DEBUG_DATASTORE
719   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
720               "Asked to put %u bytes of data under key `%s'\n",
721               size,
722               GNUNET_h2s (key));
723 #endif
724   msize = sizeof(struct DataMessage) + size;
725   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
726   qc.sc.cont = cont;
727   qc.sc.cont_cls = cont_cls;
728   qe = make_queue_entry (h, msize,
729                          queue_priority, max_queue_size, timeout,
730                          &process_status_message, &qc);
731   if (qe == NULL)
732     return NULL;
733   dm = (struct DataMessage* ) &qe[1];
734   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
735   dm->header.size = htons(msize);
736   dm->rid = htonl(rid);
737   dm->size = htonl(size);
738   dm->type = htonl(type);
739   dm->priority = htonl(priority);
740   dm->anonymity = htonl(anonymity);
741   dm->uid = GNUNET_htonll(0);
742   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
743   dm->key = *key;
744   memcpy (&dm[1], data, size);
745   process_queue (h);
746   return qe;
747 }
748
749
750 /**
751  * Reserve space in the datastore.  This function should be used
752  * to avoid "out of space" failures during a longer sequence of "put"
753  * operations (for example, when a file is being inserted).
754  *
755  * @param h handle to the datastore
756  * @param amount how much space (in bytes) should be reserved (for content only)
757  * @param entries how many entries will be created (to calculate per-entry overhead)
758  * @param queue_priority ranking of this request in the priority queue
759  * @param max_queue_size at what queue size should this request be dropped
760  *        (if other requests of higher priority are in the queue)
761  * @param timeout how long to wait at most for a response (or before dying in queue)
762  * @param cont continuation to call when done; "success" will be set to
763  *             a positive reservation value if space could be reserved.
764  * @param cont_cls closure for cont
765  * @return NULL if the entry was not queued, otherwise a handle that can be used to
766  *         cancel; note that even if NULL is returned, the callback will be invoked
767  *         (or rather, will already have been invoked)
768  */
769 struct GNUNET_DATASTORE_QueueEntry *
770 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
771                           uint64_t amount,
772                           uint32_t entries,
773                           unsigned int queue_priority,
774                           unsigned int max_queue_size,
775                           struct GNUNET_TIME_Relative timeout,
776                           GNUNET_DATASTORE_ContinuationWithStatus cont,
777                           void *cont_cls)
778 {
779   struct GNUNET_DATASTORE_QueueEntry *qe;
780   struct ReserveMessage *rm;
781   union QueueContext qc;
782
783   if (cont == NULL)
784     cont = &drop_status_cont;
785 #if DEBUG_DATASTORE
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Asked to reserve %llu bytes of data and %u entries'\n",
788               (unsigned long long) amount,
789               (unsigned int) entries);
790 #endif
791   qc.sc.cont = cont;
792   qc.sc.cont_cls = cont_cls;
793   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
794                          queue_priority, max_queue_size, timeout,
795                          &process_status_message, &qc);
796   if (qe == NULL)
797     return NULL;
798   rm = (struct ReserveMessage*) &qe[1];
799   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
800   rm->header.size = htons(sizeof (struct ReserveMessage));
801   rm->entries = htonl(entries);
802   rm->amount = GNUNET_htonll(amount);
803   process_queue (h);
804   return qe;
805 }
806
807
808 /**
809  * Signal that all of the data for which a reservation was made has
810  * been stored and that whatever excess space might have been reserved
811  * can now be released.
812  *
813  * @param h handle to the datastore
814  * @param rid reservation ID (value of "success" in original continuation
815  *        from the "reserve" function).
816  * @param queue_priority ranking of this request in the priority queue
817  * @param max_queue_size at what queue size should this request be dropped
818  *        (if other requests of higher priority are in the queue)
819  * @param queue_priority ranking of this request in the priority queue
820  * @param max_queue_size at what queue size should this request be dropped
821  *        (if other requests of higher priority are in the queue)
822  * @param timeout how long to wait at most for a response
823  * @param cont continuation to call when done
824  * @param cont_cls closure for cont
825  * @return NULL if the entry was not queued, otherwise a handle that can be used to
826  *         cancel; note that even if NULL is returned, the callback will be invoked
827  *         (or rather, will already have been invoked)
828  */
829 struct GNUNET_DATASTORE_QueueEntry *
830 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
831                                   int rid,
832                                   unsigned int queue_priority,
833                                   unsigned int max_queue_size,
834                                   struct GNUNET_TIME_Relative timeout,
835                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
836                                   void *cont_cls)
837 {
838   struct GNUNET_DATASTORE_QueueEntry *qe;
839   struct ReleaseReserveMessage *rrm;
840   union QueueContext qc;
841
842   if (cont == NULL)
843     cont = &drop_status_cont;
844 #if DEBUG_DATASTORE
845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846               "Asked to release reserve %d\n",
847               rid);
848 #endif
849   qc.sc.cont = cont;
850   qc.sc.cont_cls = cont_cls;
851   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
852                          queue_priority, max_queue_size, timeout,
853                          &process_status_message, &qc);
854   if (qe == NULL)
855     return NULL;
856   rrm = (struct ReleaseReserveMessage*) &qe[1];
857   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
858   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
859   rrm->rid = htonl(rid);
860   process_queue (h);
861   return qe;
862 }
863
864
865 /**
866  * Update a value in the datastore.
867  *
868  * @param h handle to the datastore
869  * @param uid identifier for the value
870  * @param priority how much to increase the priority of the value
871  * @param expiration new expiration value should be MAX of existing and this argument
872  * @param queue_priority ranking of this request in the priority queue
873  * @param max_queue_size at what queue size should this request be dropped
874  *        (if other requests of higher priority are in the queue)
875  * @param timeout how long to wait at most for a response
876  * @param cont continuation to call when done
877  * @param cont_cls closure for cont
878  * @return NULL if the entry was not queued, otherwise a handle that can be used to
879  *         cancel; note that even if NULL is returned, the callback will be invoked
880  *         (or rather, will already have been invoked)
881  */
882 struct GNUNET_DATASTORE_QueueEntry *
883 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
884                          unsigned long long uid,
885                          uint32_t priority,
886                          struct GNUNET_TIME_Absolute expiration,
887                          unsigned int queue_priority,
888                          unsigned int max_queue_size,
889                          struct GNUNET_TIME_Relative timeout,
890                          GNUNET_DATASTORE_ContinuationWithStatus cont,
891                          void *cont_cls)
892 {
893   struct GNUNET_DATASTORE_QueueEntry *qe;
894   struct UpdateMessage *um;
895   union QueueContext qc;
896
897   if (cont == NULL)
898     cont = &drop_status_cont;
899 #if DEBUG_DATASTORE
900   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
901               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
902               uid,
903               (unsigned int) priority,
904               (unsigned long long) expiration.value);
905 #endif
906   qc.sc.cont = cont;
907   qc.sc.cont_cls = cont_cls;
908   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
909                          queue_priority, max_queue_size, timeout,
910                          &process_status_message, &qc);
911   if (qe == NULL)
912     return NULL;
913   um = (struct UpdateMessage*) &qe[1];
914   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
915   um->header.size = htons(sizeof (struct UpdateMessage));
916   um->priority = htonl(priority);
917   um->expiration = GNUNET_TIME_absolute_hton(expiration);
918   um->uid = GNUNET_htonll(uid);
919   process_queue (h);
920   return qe;
921 }
922
923
924 /**
925  * Explicitly remove some content from the database.
926  * The "cont"inuation will be called with status
927  * "GNUNET_OK" if content was removed, "GNUNET_NO"
928  * if no matching entry was found and "GNUNET_SYSERR"
929  * on all other types of errors.
930  *
931  * @param h handle to the datastore
932  * @param key key for the value
933  * @param size number of bytes in data
934  * @param data content stored
935  * @param queue_priority ranking of this request in the priority queue
936  * @param max_queue_size at what queue size should this request be dropped
937  *        (if other requests of higher priority are in the queue)
938  * @param timeout how long to wait at most for a response
939  * @param cont continuation to call when done
940  * @param cont_cls closure for cont
941  * @return NULL if the entry was not queued, otherwise a handle that can be used to
942  *         cancel; note that even if NULL is returned, the callback will be invoked
943  *         (or rather, will already have been invoked)
944  */
945 struct GNUNET_DATASTORE_QueueEntry *
946 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
947                          const GNUNET_HashCode *key,
948                          uint32_t size, 
949                          const void *data,
950                          unsigned int queue_priority,
951                          unsigned int max_queue_size,
952                          struct GNUNET_TIME_Relative timeout,
953                          GNUNET_DATASTORE_ContinuationWithStatus cont,
954                          void *cont_cls)
955 {
956   struct GNUNET_DATASTORE_QueueEntry *qe;
957   struct DataMessage *dm;
958   size_t msize;
959   union QueueContext qc;
960
961   if (cont == NULL)
962     cont = &drop_status_cont;
963 #if DEBUG_DATASTORE
964   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
965               "Asked to remove %u bytes under key `%s'\n",
966               size,
967               GNUNET_h2s (key));
968 #endif
969   qc.sc.cont = cont;
970   qc.sc.cont_cls = cont_cls;
971   msize = sizeof(struct DataMessage) + size;
972   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
973   qe = make_queue_entry (h, msize,
974                          queue_priority, max_queue_size, timeout,
975                          &process_status_message, &qc);
976   if (qe == NULL)
977     return NULL;
978   dm = (struct DataMessage*) &qe[1];
979   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
980   dm->header.size = htons(msize);
981   dm->rid = htonl(0);
982   dm->size = htonl(size);
983   dm->type = htonl(0);
984   dm->priority = htonl(0);
985   dm->anonymity = htonl(0);
986   dm->uid = GNUNET_htonll(0);
987   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
988   dm->key = *key;
989   memcpy (&dm[1], data, size);
990   process_queue (h);
991   return qe;
992 }
993
994
995 /**
996  * Type of a function to call when we receive a message
997  * from the service.
998  *
999  * @param cls closure
1000  * @param msg message received, NULL on timeout or fatal error
1001  */
1002 static void 
1003 process_result_message (void *cls,
1004                         const struct GNUNET_MessageHeader * msg)
1005 {
1006   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1007   struct GNUNET_DATASTORE_Handle *h = qe->h;
1008   struct ResultContext rc = qe->qc.rc;
1009   const struct DataMessage *dm;
1010   int was_transmitted;
1011
1012   if (msg == NULL)
1013     {
1014 #if DEBUG_DATASTORE
1015       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1016                   "Failed to receive response from datastore or queue full\n");
1017 #endif
1018       was_transmitted = qe->was_transmitted;
1019       free_queue_entry (qe);
1020       if (GNUNET_YES == was_transmitted)        
1021         do_disconnect (h);
1022       rc.iter (rc.iter_cls,
1023                NULL, 0, NULL, 0, 0, 0, 
1024                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1025       return;
1026     }
1027   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1028   GNUNET_assert (h->queue_head == qe);
1029   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1030     {
1031       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1032 #if DEBUG_DATASTORE
1033       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1034                   "Received end of result set\n");
1035 #endif
1036       free_queue_entry (qe);
1037       rc.iter (rc.iter_cls,
1038                NULL, 0, NULL, 0, 0, 0, 
1039                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1040       process_queue (h);
1041       return;
1042     }
1043   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1044        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1045        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1046     {
1047       GNUNET_break (0);
1048       free_queue_entry (qe);
1049       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1050       do_disconnect (h);
1051       rc.iter (rc.iter_cls,
1052                NULL, 0, NULL, 0, 0, 0, 
1053                GNUNET_TIME_UNIT_ZERO_ABS, 0);   
1054       return;
1055     }
1056   dm = (const struct DataMessage*) msg;
1057 #if DEBUG_DATASTORE
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Received result %llu with type %u and size %u with key %s\n",
1060               (unsigned long long) GNUNET_ntohll(dm->uid),
1061               ntohl(dm->type),
1062               ntohl(dm->size),
1063               GNUNET_h2s(&dm->key));
1064 #endif
1065   rc.iter (rc.iter_cls,
1066            &dm->key,
1067            ntohl(dm->size),
1068            &dm[1],
1069            ntohl(dm->type),
1070            ntohl(dm->priority),
1071            ntohl(dm->anonymity),
1072            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1073            GNUNET_ntohll(dm->uid));
1074 }
1075
1076
1077 /**
1078  * Get a random value from the datastore.
1079  *
1080  * @param h handle to the datastore
1081  * @param queue_priority ranking of this request in the priority queue
1082  * @param max_queue_size at what queue size should this request be dropped
1083  *        (if other requests of higher priority are in the queue)
1084  * @param timeout how long to wait at most for a response
1085  * @param iter function to call on a random value; it
1086  *        will be called once with a value (if available)
1087  *        and always once with a value of NULL.
1088  * @param iter_cls closure for iter
1089  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1090  *         cancel; note that even if NULL is returned, the callback will be invoked
1091  *         (or rather, will already have been invoked)
1092  */
1093 struct GNUNET_DATASTORE_QueueEntry *
1094 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1095                              unsigned int queue_priority,
1096                              unsigned int max_queue_size,
1097                              struct GNUNET_TIME_Relative timeout,
1098                              GNUNET_DATASTORE_Iterator iter, 
1099                              void *iter_cls)
1100 {
1101   struct GNUNET_DATASTORE_QueueEntry *qe;
1102   struct GNUNET_MessageHeader *m;
1103   union QueueContext qc;
1104
1105 #if DEBUG_DATASTORE
1106   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1107               "Asked to get random entry in %llu ms\n",
1108               (unsigned long long) timeout.value);
1109 #endif
1110   qc.rc.iter = iter;
1111   qc.rc.iter_cls = iter_cls;
1112   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1113                          queue_priority, max_queue_size, timeout,
1114                          &process_result_message, &qc);
1115   if (qe == NULL)
1116     return NULL;    
1117   m = (struct GNUNET_MessageHeader*) &qe[1];
1118   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1119   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1120   process_queue (h);
1121   return qe;
1122 }
1123
1124
1125
1126 /**
1127  * Iterate over the results for a particular key
1128  * in the datastore.  The iterator will only be called
1129  * once initially; if the first call did contain a
1130  * result, further results can be obtained by calling
1131  * "GNUNET_DATASTORE_get_next" with the given argument.
1132  *
1133  * @param h handle to the datastore
1134  * @param key maybe NULL (to match all entries)
1135  * @param type desired type, 0 for any
1136  * @param queue_priority ranking of this request in the priority queue
1137  * @param max_queue_size at what queue size should this request be dropped
1138  *        (if other requests of higher priority are in the queue)
1139  * @param timeout how long to wait at most for a response
1140  * @param iter function to call on each matching value;
1141  *        will be called once with a NULL value at the end
1142  * @param iter_cls closure for iter
1143  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1144  *         cancel; note that even if NULL is returned, the callback will be invoked
1145  *         (or rather, will already have been invoked)
1146  */
1147 struct GNUNET_DATASTORE_QueueEntry *
1148 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1149                       const GNUNET_HashCode * key,
1150                       enum GNUNET_BLOCK_Type type,
1151                       unsigned int queue_priority,
1152                       unsigned int max_queue_size,
1153                       struct GNUNET_TIME_Relative timeout,
1154                       GNUNET_DATASTORE_Iterator iter, 
1155                       void *iter_cls)
1156 {
1157   struct GNUNET_DATASTORE_QueueEntry *qe;
1158   struct GetMessage *gm;
1159   union QueueContext qc;
1160
1161 #if DEBUG_DATASTORE
1162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163               "Asked to look for data of type %u under key `%s'\n",
1164               (unsigned int) type,
1165               GNUNET_h2s (key));
1166 #endif
1167   qc.rc.iter = iter;
1168   qc.rc.iter_cls = iter_cls;
1169   qe = make_queue_entry (h, sizeof(struct GetMessage),
1170                          queue_priority, max_queue_size, timeout,
1171                          &process_result_message, &qc);
1172   if (qe == NULL)
1173     return NULL;
1174   gm = (struct GetMessage*) &qe[1];
1175   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1176   gm->type = htonl(type);
1177   if (key != NULL)
1178     {
1179       gm->header.size = htons(sizeof (struct GetMessage));
1180       gm->key = *key;
1181     }
1182   else
1183     {
1184       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1185     }
1186   process_queue (h);
1187   return qe;
1188 }
1189
1190
1191 /**
1192  * Function called to trigger obtaining the next result
1193  * from the datastore.
1194  * 
1195  * @param h handle to the datastore
1196  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1197  *        iteration (with a final call to "iter" with key/data == NULL).
1198  */
1199 void 
1200 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1201                            int more)
1202 {
1203   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1204   struct ResultContext rc = qe->qc.rc;
1205
1206   GNUNET_assert (NULL != qe);
1207   GNUNET_assert (&process_result_message == qe->response_proc);
1208   if (GNUNET_YES == more)
1209     {     
1210       GNUNET_CLIENT_receive (h->client,
1211                              qe->response_proc,
1212                              qe,
1213                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1214       return;
1215     }
1216   free_queue_entry (qe);
1217   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1218   do_disconnect (h);
1219   rc.iter (rc.iter_cls,
1220            NULL, 0, NULL, 0, 0, 0, 
1221            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1222 }
1223
1224
1225 /**
1226  * Cancel a datastore operation.  The final callback from the
1227  * operation must not have been done yet.
1228  * 
1229  * @param qe operation to cancel
1230  */
1231 void
1232 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1233 {
1234   struct GNUNET_DATASTORE_Handle *h;
1235   int reconnect;
1236
1237   h = qe->h;
1238   reconnect = qe->was_transmitted;
1239   free_queue_entry (qe);
1240   h->queue_size--;
1241   if (reconnect)
1242     {
1243       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1244       do_disconnect (h);
1245     }
1246 }
1247
1248
1249 /* end of datastore_api.c */