89c7edba3ea0a08bc3f0efe1c98420e94b6ffbf6
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct GNUNET_DATASTORE_QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct GNUNET_DATASTORE_QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct GNUNET_DATASTORE_QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct GNUNET_DATASTORE_QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct GNUNET_DATASTORE_QueueEntry *qe;
244
245   if (h->client != NULL)
246     {
247       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
248       h->client = NULL;
249     }
250   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
251     {
252       GNUNET_SCHEDULER_cancel (h->sched,
253                                h->reconnect_task);
254       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
255     }
256   while (NULL != (qe = h->queue_head))
257     {
258       if (NULL != qe->response_proc)
259         {
260           qe->response_proc (qe, NULL);
261         }
262       else
263         {
264           GNUNET_CONTAINER_DLL_remove (h->queue_head,
265                                        h->queue_tail,
266                                        qe);
267           if (qe->task != GNUNET_SCHEDULER_NO_TASK)
268             GNUNET_SCHEDULER_cancel (h->sched,
269                                      qe->task);
270           GNUNET_free (qe);
271         }
272     }
273   if (GNUNET_YES == drop) 
274     {
275       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
276       if (h->client != NULL)
277         {
278           if (NULL != 
279               GNUNET_CLIENT_notify_transmit_ready (h->client,
280                                                    sizeof(struct GNUNET_MessageHeader),
281                                                    GNUNET_TIME_UNIT_MINUTES,
282                                                    GNUNET_YES,
283                                                    &transmit_drop,
284                                                    h))
285             return;
286           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
287         }
288       GNUNET_break (0);
289     }
290   GNUNET_free (h);
291 }
292
293
294 /**
295  * A request has timed out (before being transmitted to the service).
296  *
297  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
298  * @param tc scheduler context
299  */
300 static void
301 timeout_queue_entry (void *cls,
302                      const struct GNUNET_SCHEDULER_TaskContext *tc)
303 {
304   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
305
306   qe->task = GNUNET_SCHEDULER_NO_TASK;
307   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
308   qe->response_proc (qe, NULL);
309 }
310
311
312 /**
313  * Create a new entry for our priority queue (and possibly discard other entires if
314  * the queue is getting too long).
315  *
316  * @param h handle to the datastore
317  * @param msize size of the message to queue
318  * @param queue_priority priority of the entry
319  * @param max_queue_size at what queue size should this request be dropped
320  *        (if other requests of higher priority are in the queue)
321  * @param timeout timeout for the operation
322  * @param response_proc function to call with replies (can be NULL)
323  * @param client_ctx client context (NOT a closure for response_proc)
324  * @return NULL if the queue is full (and this entry was dropped)
325  */
326 static struct GNUNET_DATASTORE_QueueEntry *
327 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
328                   size_t msize,
329                   unsigned int queue_priority,
330                   unsigned int max_queue_size,
331                   struct GNUNET_TIME_Relative timeout,
332                   GNUNET_CLIENT_MessageHandler response_proc,            
333                   void *client_ctx)
334 {
335   struct GNUNET_DATASTORE_QueueEntry *ret;
336   struct GNUNET_DATASTORE_QueueEntry *pos;
337   unsigned int c;
338
339   c = 0;
340   pos = h->queue_head;
341   while ( (pos != NULL) &&
342           (c < max_queue_size) &&
343           (pos->priority >= queue_priority) )
344     {
345       c++;
346       pos = pos->next;
347     }
348   if (c >= max_queue_size)
349     return NULL;
350   if (pos == NULL)
351     {
352       /* append at the tail */
353       pos = h->queue_tail;
354     }
355   else
356     {
357       pos = pos->prev; 
358       /* do not insert at HEAD if HEAD query was already
359          transmitted and we are still receiving replies! */
360       if ( (pos == NULL) &&
361            (h->queue_head->was_transmitted) )
362         pos = h->queue_head;
363     }
364   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
365   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
366                                      h->queue_tail,
367                                      pos,
368                                      ret);
369   ret->h = h;
370   ret->response_proc = response_proc;
371   ret->client_ctx = client_ctx;
372   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
373                                             timeout,
374                                             &timeout_queue_entry,
375                                             ret);
376   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
377   ret->priority = queue_priority;
378   ret->max_queue = max_queue_size;
379   ret->message_size = msize;
380   ret->was_transmitted = GNUNET_NO;
381   h->queue_size++;
382   c++;
383   pos = ret->next;
384   while (pos != NULL) 
385     {
386       if (pos->max_queue < h->queue_size)
387         {
388           GNUNET_CONTAINER_DLL_remove (h->queue_head,
389                                        h->queue_tail,
390                                        pos);
391           GNUNET_SCHEDULER_cancel (h->sched,
392                                    pos->task);
393           if (pos->response_proc != NULL)
394             pos->response_proc (pos, NULL);
395           GNUNET_free (pos);
396           h->queue_size--;
397           break;
398         }
399       pos = pos->next;
400     }
401   return ret;
402 }
403
404
405 /**
406  * Process entries in the queue (or do nothing if we are already
407  * doing so).
408  * 
409  * @param h handle to the datastore
410  */
411 static void
412 process_queue (struct GNUNET_DATASTORE_Handle *h);
413
414
415 /**
416  * Try reconnecting to the datastore service.
417  *
418  * @param cls the 'struct GNUNET_DATASTORE_Handle'
419  * @param tc scheduler context
420  */
421 static void
422 try_reconnect (void *cls,
423                const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct GNUNET_DATASTORE_Handle *h = cls;
426
427   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
428     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
429   else
430     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
431   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
432     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
433   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
434   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
435   if (h->client == NULL)
436     return;
437   process_queue (h);
438 }
439
440
441 /**
442  * Disconnect from the service and then try reconnecting to the datastore service
443  * after some delay.
444  *
445  * @param h handle to datastore to disconnect and reconnect
446  */
447 static void
448 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
449 {
450   if (h->client == NULL)
451     return;
452   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
453   h->client = NULL;
454   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
455                                                     h->retry_time,
456                                                     &try_reconnect,
457                                                     h);      
458 }
459
460
461 /**
462  * Transmit request from queue to datastore service.
463  *
464  * @param cls the 'struct GNUNET_DATASTORE_Handle'
465  * @param size number of bytes that can be copied to buf
466  * @param buf where to copy the drop message
467  * @return number of bytes written to buf
468  */
469 static size_t
470 transmit_request (void *cls,
471                   size_t size, 
472                   void *buf)
473 {
474   struct GNUNET_DATASTORE_Handle *h = cls;
475   struct GNUNET_DATASTORE_QueueEntry *qe;
476   size_t msize;
477
478   h->th = NULL;
479   if (NULL == (qe = h->queue_head))
480     return 0; /* no entry in queue */
481   if (buf == NULL)
482     {
483       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
484                   _("Failed to transmit request to database.\n"));
485       do_disconnect (h);
486       return 0;
487     }
488   if (size < (msize = qe->message_size))
489     {
490       process_queue (h);
491       return 0;
492     }
493   memcpy (buf, &qe[1], msize);
494   qe->was_transmitted = GNUNET_YES;
495   GNUNET_SCHEDULER_cancel (h->sched,
496                            qe->task);
497   qe->task = GNUNET_SCHEDULER_NO_TASK;
498   GNUNET_CLIENT_receive (h->client,
499                          qe->response_proc,
500                          qe,
501                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
502   return msize;
503 }
504
505
506 /**
507  * Process entries in the queue (or do nothing if we are already
508  * doing so).
509  * 
510  * @param h handle to the datastore
511  */
512 static void
513 process_queue (struct GNUNET_DATASTORE_Handle *h)
514 {
515   struct GNUNET_DATASTORE_QueueEntry *qe;
516
517   if (NULL == (qe = h->queue_head))
518     return; /* no entry in queue */
519   if (qe->was_transmitted == GNUNET_YES)
520     return; /* waiting for replies */
521   if (h->th != NULL)
522     return; /* request pending */
523   if (h->client == NULL)
524     return; /* waiting for reconnect */
525   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
526                                                qe->message_size,
527                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
528                                                GNUNET_YES,
529                                                &transmit_request,
530                                                h);
531 }
532
533
534
535
536 /**
537  * Context for processing status messages.
538  */
539 struct StatusContext
540 {
541   /**
542    * Continuation to call with the status.
543    */
544   GNUNET_DATASTORE_ContinuationWithStatus cont;
545
546   /**
547    * Closure for cont.
548    */
549   void *cont_cls;
550
551 };
552
553
554 /**
555  * Dummy continuation used to do nothing (but be non-zero).
556  *
557  * @param cls closure
558  * @param result result 
559  * @param emsg error message
560  */
561 static void
562 drop_status_cont (void *cls, int result, const char *emsg)
563 {
564   /* do nothing */
565 }
566
567
568 /**
569  * Type of a function to call when we receive a message
570  * from the service.
571  *
572  * @param cls closure
573  * @param msg message received, NULL on timeout or fatal error
574  */
575 static void 
576 process_status_message (void *cls,
577                         const struct
578                         GNUNET_MessageHeader * msg)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
581   struct GNUNET_DATASTORE_Handle *h = qe->h;
582   struct StatusContext *rc = qe->client_ctx;
583   const struct StatusMessage *sm;
584   const char *emsg;
585   int32_t status;
586
587   GNUNET_CONTAINER_DLL_remove (h->queue_head,
588                                h->queue_tail,
589                                qe);
590   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
591     {
592       GNUNET_SCHEDULER_cancel (h->sched,
593                                qe->task);
594       qe->task = GNUNET_SCHEDULER_NO_TASK;
595     }
596   GNUNET_free (qe);
597   if (msg == NULL)
598     {      
599       if (NULL == h->client)
600         return; /* forced disconnect */
601       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
602                   _("Failed to receive response from database.\n"));
603       do_disconnect (h);
604       return;
605     }
606
607   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
608        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
609     {
610       GNUNET_break (0);
611       h->retry_time = GNUNET_TIME_UNIT_ZERO;
612       do_disconnect (h);
613       rc->cont (rc->cont_cls, 
614                 GNUNET_SYSERR,
615                 _("Error reading response from datastore service"));
616       GNUNET_free (rc);
617       return;
618     }
619   sm = (const struct StatusMessage*) msg;
620   status = ntohl(sm->status);
621   emsg = NULL;
622   if (ntohs(msg->size) > sizeof(struct StatusMessage))
623     {
624       emsg = (const char*) &sm[1];
625       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
626         {
627           GNUNET_break (0);
628           emsg = _("Invalid error message received from datastore service");
629         }
630     }  
631   if ( (status == GNUNET_SYSERR) &&
632        (emsg == NULL) )
633     {
634       GNUNET_break (0);
635       emsg = _("Invalid error message received from datastore service");
636     }
637 #if DEBUG_DATASTORE
638   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
639               "Received status %d/%s\n",
640               (int) status,
641               emsg);
642 #endif
643   rc->cont (rc->cont_cls, 
644             status,
645             emsg);
646   GNUNET_free (rc);  
647   process_queue (h);
648 }
649
650
651 /**
652  * Store an item in the datastore.  If the item is already present,
653  * the priorities are summed up and the higher expiration time and
654  * lower anonymity level is used.
655  *
656  * @param h handle to the datastore
657  * @param rid reservation ID to use (from "reserve"); use 0 if no
658  *            prior reservation was made
659  * @param key key for the value
660  * @param size number of bytes in data
661  * @param data content stored
662  * @param type type of the content
663  * @param priority priority of the content
664  * @param anonymity anonymity-level for the content
665  * @param expiration expiration time for the content
666  * @param queue_priority ranking of this request in the priority queue
667  * @param max_queue_size at what queue size should this request be dropped
668  *        (if other requests of higher priority are in the queue)
669  * @param timeout timeout for the operation
670  * @param cont continuation to call when done
671  * @param cont_cls closure for cont
672  * @return NULL if the entry was not queued, otherwise a handle that can be used to
673  *         cancel; note that even if NULL is returned, the callback will be invoked
674  *         (or rather, will already have been invoked)
675  */
676 struct GNUNET_DATASTORE_QueueEntry *
677 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
678                       int rid,
679                       const GNUNET_HashCode * key,
680                       uint32_t size,
681                       const void *data,
682                       enum GNUNET_BLOCK_Type type,
683                       uint32_t priority,
684                       uint32_t anonymity,
685                       struct GNUNET_TIME_Absolute expiration,
686                       unsigned int queue_priority,
687                       unsigned int max_queue_size,
688                       struct GNUNET_TIME_Relative timeout,
689                       GNUNET_DATASTORE_ContinuationWithStatus cont,
690                       void *cont_cls)
691 {
692   struct StatusContext *scont;
693   struct GNUNET_DATASTORE_QueueEntry *qe;
694   struct DataMessage *dm;
695   size_t msize;
696
697 #if DEBUG_DATASTORE
698   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
699               "Asked to put %u bytes of data under key `%s'\n",
700               size,
701               GNUNET_h2s (key));
702 #endif
703   msize = sizeof(struct DataMessage) + size;
704   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
705   scont = GNUNET_malloc (sizeof (struct StatusContext));
706   scont->cont = cont;
707   scont->cont_cls = cont_cls;
708   qe = make_queue_entry (h, msize,
709                          queue_priority, max_queue_size, timeout,
710                          &process_status_message, scont);
711   if (qe == NULL)
712     return NULL;
713   dm = (struct DataMessage* ) &qe[1];
714   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
715   dm->header.size = htons(msize);
716   dm->rid = htonl(rid);
717   dm->size = htonl(size);
718   dm->type = htonl(type);
719   dm->priority = htonl(priority);
720   dm->anonymity = htonl(anonymity);
721   dm->uid = GNUNET_htonll(0);
722   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
723   dm->key = *key;
724   memcpy (&dm[1], data, size);
725   process_queue (h);
726   return qe;
727 }
728
729
730 /**
731  * Reserve space in the datastore.  This function should be used
732  * to avoid "out of space" failures during a longer sequence of "put"
733  * operations (for example, when a file is being inserted).
734  *
735  * @param h handle to the datastore
736  * @param amount how much space (in bytes) should be reserved (for content only)
737  * @param entries how many entries will be created (to calculate per-entry overhead)
738  * @param queue_priority ranking of this request in the priority queue
739  * @param max_queue_size at what queue size should this request be dropped
740  *        (if other requests of higher priority are in the queue)
741  * @param timeout how long to wait at most for a response (or before dying in queue)
742  * @param cont continuation to call when done; "success" will be set to
743  *             a positive reservation value if space could be reserved.
744  * @param cont_cls closure for cont
745  * @return NULL if the entry was not queued, otherwise a handle that can be used to
746  *         cancel; note that even if NULL is returned, the callback will be invoked
747  *         (or rather, will already have been invoked)
748  */
749 struct GNUNET_DATASTORE_QueueEntry *
750 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
751                           uint64_t amount,
752                           uint32_t entries,
753                           unsigned int queue_priority,
754                           unsigned int max_queue_size,
755                           struct GNUNET_TIME_Relative timeout,
756                           GNUNET_DATASTORE_ContinuationWithStatus cont,
757                           void *cont_cls)
758 {
759   struct GNUNET_DATASTORE_QueueEntry *qe;
760   struct ReserveMessage *rm;
761   struct StatusContext *scont;
762
763   if (cont == NULL)
764     cont = &drop_status_cont;
765 #if DEBUG_DATASTORE
766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767               "Asked to reserve %llu bytes of data and %u entries'\n",
768               (unsigned long long) amount,
769               (unsigned int) entries);
770 #endif
771   scont = GNUNET_malloc (sizeof (struct StatusContext));
772   scont->cont = cont;
773   scont->cont_cls = cont_cls;
774   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
775                          queue_priority, max_queue_size, timeout,
776                          &process_status_message, scont);
777   if (qe == NULL)
778     return NULL;
779   rm = (struct ReserveMessage*) &qe[1];
780   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
781   rm->header.size = htons(sizeof (struct ReserveMessage));
782   rm->entries = htonl(entries);
783   rm->amount = GNUNET_htonll(amount);
784   process_queue (h);
785   return qe;
786 }
787
788
789 /**
790  * Signal that all of the data for which a reservation was made has
791  * been stored and that whatever excess space might have been reserved
792  * can now be released.
793  *
794  * @param h handle to the datastore
795  * @param rid reservation ID (value of "success" in original continuation
796  *        from the "reserve" function).
797  * @param queue_priority ranking of this request in the priority queue
798  * @param max_queue_size at what queue size should this request be dropped
799  *        (if other requests of higher priority are in the queue)
800  * @param queue_priority ranking of this request in the priority queue
801  * @param max_queue_size at what queue size should this request be dropped
802  *        (if other requests of higher priority are in the queue)
803  * @param timeout how long to wait at most for a response
804  * @param cont continuation to call when done
805  * @param cont_cls closure for cont
806  * @return NULL if the entry was not queued, otherwise a handle that can be used to
807  *         cancel; note that even if NULL is returned, the callback will be invoked
808  *         (or rather, will already have been invoked)
809  */
810 struct GNUNET_DATASTORE_QueueEntry *
811 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
812                                   int rid,
813                                   unsigned int queue_priority,
814                                   unsigned int max_queue_size,
815                                   struct GNUNET_TIME_Relative timeout,
816                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
817                                   void *cont_cls)
818 {
819   struct GNUNET_DATASTORE_QueueEntry *qe;
820   struct ReleaseReserveMessage *rrm;
821   struct StatusContext *scont;
822
823   if (cont == NULL)
824     cont = &drop_status_cont;
825 #if DEBUG_DATASTORE
826   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827               "Asked to release reserve %d\n",
828               rid);
829 #endif
830   scont = GNUNET_malloc (sizeof (struct StatusContext));
831   scont->cont = cont;
832   scont->cont_cls = cont_cls;
833   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
834                          queue_priority, max_queue_size, timeout,
835                          &process_status_message, scont);
836   if (qe == NULL)
837     return NULL;
838   rrm = (struct ReleaseReserveMessage*) &qe[1];
839   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
840   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
841   rrm->rid = htonl(rid);
842   process_queue (h);
843   return qe;
844 }
845
846
847 /**
848  * Update a value in the datastore.
849  *
850  * @param h handle to the datastore
851  * @param uid identifier for the value
852  * @param priority how much to increase the priority of the value
853  * @param expiration new expiration value should be MAX of existing and this argument
854  * @param queue_priority ranking of this request in the priority queue
855  * @param max_queue_size at what queue size should this request be dropped
856  *        (if other requests of higher priority are in the queue)
857  * @param timeout how long to wait at most for a response
858  * @param cont continuation to call when done
859  * @param cont_cls closure for cont
860  * @return NULL if the entry was not queued, otherwise a handle that can be used to
861  *         cancel; note that even if NULL is returned, the callback will be invoked
862  *         (or rather, will already have been invoked)
863  */
864 struct GNUNET_DATASTORE_QueueEntry *
865 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
866                          unsigned long long uid,
867                          uint32_t priority,
868                          struct GNUNET_TIME_Absolute expiration,
869                          unsigned int queue_priority,
870                          unsigned int max_queue_size,
871                          struct GNUNET_TIME_Relative timeout,
872                          GNUNET_DATASTORE_ContinuationWithStatus cont,
873                          void *cont_cls)
874 {
875   struct GNUNET_DATASTORE_QueueEntry *qe;
876   struct UpdateMessage *um;
877   struct StatusContext *scont;
878
879   if (cont == NULL)
880     cont = &drop_status_cont;
881 #if DEBUG_DATASTORE
882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
883               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
884               uid,
885               (unsigned int) priority,
886               (unsigned long long) expiration.value);
887 #endif
888   scont = GNUNET_malloc (sizeof (struct StatusContext));
889   scont->cont = cont;
890   scont->cont_cls = cont_cls;
891   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
892                          queue_priority, max_queue_size, timeout,
893                          &process_status_message, scont);
894   if (qe == NULL)
895     return NULL;
896   um = (struct UpdateMessage*) &qe[1];
897   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
898   um->header.size = htons(sizeof (struct UpdateMessage));
899   um->priority = htonl(priority);
900   um->expiration = GNUNET_TIME_absolute_hton(expiration);
901   um->uid = GNUNET_htonll(uid);
902   process_queue (h);
903   return qe;
904 }
905
906
907 /**
908  * Explicitly remove some content from the database.
909  * The "cont"inuation will be called with status
910  * "GNUNET_OK" if content was removed, "GNUNET_NO"
911  * if no matching entry was found and "GNUNET_SYSERR"
912  * on all other types of errors.
913  *
914  * @param h handle to the datastore
915  * @param key key for the value
916  * @param size number of bytes in data
917  * @param data content stored
918  * @param queue_priority ranking of this request in the priority queue
919  * @param max_queue_size at what queue size should this request be dropped
920  *        (if other requests of higher priority are in the queue)
921  * @param timeout how long to wait at most for a response
922  * @param cont continuation to call when done
923  * @param cont_cls closure for cont
924  * @return NULL if the entry was not queued, otherwise a handle that can be used to
925  *         cancel; note that even if NULL is returned, the callback will be invoked
926  *         (or rather, will already have been invoked)
927  */
928 struct GNUNET_DATASTORE_QueueEntry *
929 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
930                          const GNUNET_HashCode *key,
931                          uint32_t size, 
932                          const void *data,
933                          unsigned int queue_priority,
934                          unsigned int max_queue_size,
935                          struct GNUNET_TIME_Relative timeout,
936                          GNUNET_DATASTORE_ContinuationWithStatus cont,
937                          void *cont_cls)
938 {
939   struct GNUNET_DATASTORE_QueueEntry *qe;
940   struct DataMessage *dm;
941   size_t msize;
942   struct StatusContext *scont;
943
944   if (cont == NULL)
945     cont = &drop_status_cont;
946 #if DEBUG_DATASTORE
947   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
948               "Asked to remove %u bytes under key `%s'\n",
949               size,
950               GNUNET_h2s (key));
951 #endif
952   scont = GNUNET_malloc (sizeof (struct StatusContext));
953   scont->cont = cont;
954   scont->cont_cls = cont_cls;
955   msize = sizeof(struct DataMessage) + size;
956   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
957   qe = make_queue_entry (h, msize,
958                          queue_priority, max_queue_size, timeout,
959                          &process_status_message, scont);
960   if (qe == NULL)
961     return NULL;
962   dm = (struct DataMessage*) &qe[1];
963   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
964   dm->header.size = htons(msize);
965   dm->rid = htonl(0);
966   dm->size = htonl(size);
967   dm->type = htonl(0);
968   dm->priority = htonl(0);
969   dm->anonymity = htonl(0);
970   dm->uid = GNUNET_htonll(0);
971   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
972   dm->key = *key;
973   memcpy (&dm[1], data, size);
974   process_queue (h);
975   return qe;
976 }
977
978
979
980 /**
981  * Context for processing result messages.
982  */
983 struct ResultContext
984 {
985   /**
986    * Iterator to call with the result.
987    */
988   GNUNET_DATASTORE_Iterator iter;
989
990   /**
991    * Closure for iter.
992    */
993   void *iter_cls;
994
995 };
996
997
998 /**
999  * Type of a function to call when we receive a message
1000  * from the service.
1001  *
1002  * @param cls closure
1003  * @param msg message received, NULL on timeout or fatal error
1004  */
1005 static void 
1006 process_result_message (void *cls,
1007                         const struct GNUNET_MessageHeader * msg)
1008 {
1009   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1010   struct GNUNET_DATASTORE_Handle *h = qe->h;
1011   struct ResultContext *rc = qe->client_ctx;
1012   const struct DataMessage *dm;
1013
1014   GNUNET_assert (h->queue_head == qe);
1015   if (msg == NULL)
1016     {
1017 #if DEBUG_DATASTORE
1018       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1019                   _("Failed to receive response from datastore\n"));
1020 #endif
1021       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1022                                    h->queue_tail,
1023                                    qe);
1024       GNUNET_free (qe);
1025       do_disconnect (h);
1026       rc->iter (rc->iter_cls,
1027                 NULL, 0, NULL, 0, 0, 0, 
1028                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1029       GNUNET_free (rc);
1030       return;
1031     }
1032   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1033     {
1034       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1035 #if DEBUG_DATASTORE
1036       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                   "Received end of result set\n");
1038 #endif
1039       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1040                                    h->queue_tail,
1041                                    qe);
1042       GNUNET_free (qe);
1043       rc->iter (rc->iter_cls,
1044                 NULL, 0, NULL, 0, 0, 0, 
1045                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1046       GNUNET_free (rc);
1047       process_queue (h);
1048       return;
1049     }
1050   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1051        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1052        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1053     {
1054       GNUNET_break (0);
1055       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1056                                    h->queue_tail,
1057                                    qe);
1058       GNUNET_free (qe);
1059       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1060       do_disconnect (h);
1061       rc->iter (rc->iter_cls,
1062                 NULL, 0, NULL, 0, 0, 0, 
1063                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1064       GNUNET_free (rc);
1065       return;
1066     }
1067   dm = (const struct DataMessage*) msg;
1068 #if DEBUG_DATASTORE
1069   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070               "Received result %llu with type %u and size %u with key %s\n",
1071               (unsigned long long) GNUNET_ntohll(dm->uid),
1072               ntohl(dm->type),
1073               ntohl(dm->size),
1074               GNUNET_h2s(&dm->key));
1075 #endif
1076   rc->iter (rc->iter_cls,
1077             &dm->key,
1078             ntohl(dm->size),
1079             &dm[1],
1080             ntohl(dm->type),
1081             ntohl(dm->priority),
1082             ntohl(dm->anonymity),
1083             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1084             GNUNET_ntohll(dm->uid));
1085 }
1086
1087
1088 /**
1089  * Get a random value from the datastore.
1090  *
1091  * @param h handle to the datastore
1092  * @param queue_priority ranking of this request in the priority queue
1093  * @param max_queue_size at what queue size should this request be dropped
1094  *        (if other requests of higher priority are in the queue)
1095  * @param timeout how long to wait at most for a response
1096  * @param iter function to call on a random value; it
1097  *        will be called once with a value (if available)
1098  *        and always once with a value of NULL.
1099  * @param iter_cls closure for iter
1100  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1101  *         cancel; note that even if NULL is returned, the callback will be invoked
1102  *         (or rather, will already have been invoked)
1103  */
1104 struct GNUNET_DATASTORE_QueueEntry *
1105 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1106                              unsigned int queue_priority,
1107                              unsigned int max_queue_size,
1108                              struct GNUNET_TIME_Relative timeout,
1109                              GNUNET_DATASTORE_Iterator iter, 
1110                              void *iter_cls)
1111 {
1112   struct GNUNET_DATASTORE_QueueEntry *qe;
1113   struct GNUNET_MessageHeader *m;
1114   struct ResultContext *rcont;
1115
1116 #if DEBUG_DATASTORE
1117   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1118               "Asked to get random entry in %llu ms\n",
1119               (unsigned long long) timeout.value);
1120 #endif
1121   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1122   rcont->iter = iter;
1123   rcont->iter_cls = iter_cls;
1124   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1125                          queue_priority, max_queue_size, timeout,
1126                          &process_result_message, rcont);
1127   if (qe == NULL)
1128     return NULL;
1129   m = (struct GNUNET_MessageHeader*) &qe[1];
1130   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1131   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1132   process_queue (h);
1133   return qe;
1134 }
1135
1136
1137
1138 /**
1139  * Iterate over the results for a particular key
1140  * in the datastore.  The iterator will only be called
1141  * once initially; if the first call did contain a
1142  * result, further results can be obtained by calling
1143  * "GNUNET_DATASTORE_get_next" with the given argument.
1144  *
1145  * @param h handle to the datastore
1146  * @param key maybe NULL (to match all entries)
1147  * @param type desired type, 0 for any
1148  * @param queue_priority ranking of this request in the priority queue
1149  * @param max_queue_size at what queue size should this request be dropped
1150  *        (if other requests of higher priority are in the queue)
1151  * @param timeout how long to wait at most for a response
1152  * @param iter function to call on each matching value;
1153  *        will be called once with a NULL value at the end
1154  * @param iter_cls closure for iter
1155  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1156  *         cancel; note that even if NULL is returned, the callback will be invoked
1157  *         (or rather, will already have been invoked)
1158  */
1159 struct GNUNET_DATASTORE_QueueEntry *
1160 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1161                       const GNUNET_HashCode * key,
1162                       enum GNUNET_BLOCK_Type type,
1163                       unsigned int queue_priority,
1164                       unsigned int max_queue_size,
1165                       struct GNUNET_TIME_Relative timeout,
1166                       GNUNET_DATASTORE_Iterator iter, 
1167                       void *iter_cls)
1168 {
1169   struct GNUNET_DATASTORE_QueueEntry *qe;
1170   struct GetMessage *gm;
1171   struct ResultContext *rcont;
1172
1173 #if DEBUG_DATASTORE
1174   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1175               "Asked to look for data of type %u under key `%s'\n",
1176               (unsigned int) type,
1177               GNUNET_h2s (key));
1178 #endif
1179   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1180   rcont->iter = iter;
1181   rcont->iter_cls = iter_cls;
1182   qe = make_queue_entry (h, sizeof(struct GetMessage),
1183                          queue_priority, max_queue_size, timeout,
1184                          &process_result_message, rcont);
1185   if (qe == NULL)
1186     return NULL;
1187   gm = (struct GetMessage*) &qe[1];
1188   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1189   gm->type = htonl(type);
1190   if (key != NULL)
1191     {
1192       gm->header.size = htons(sizeof (struct GetMessage));
1193       gm->key = *key;
1194     }
1195   else
1196     {
1197       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1198     }
1199   process_queue (h);
1200   return qe;
1201 }
1202
1203
1204 /**
1205  * Function called to trigger obtaining the next result
1206  * from the datastore.
1207  * 
1208  * @param h handle to the datastore
1209  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1210  *        iteration (with a final call to "iter" with key/data == NULL).
1211  */
1212 void 
1213 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1214                            int more)
1215 {
1216   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1217   struct ResultContext *rc = qe->client_ctx;
1218
1219   GNUNET_assert (NULL != qe);
1220   GNUNET_assert (&process_result_message == qe->response_proc);
1221   if (GNUNET_YES == more)
1222     {     
1223       GNUNET_CLIENT_receive (h->client,
1224                              qe->response_proc,
1225                              qe,
1226                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1227       return;
1228     }
1229   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1230                                h->queue_tail,
1231                                qe);
1232   GNUNET_free (qe);
1233   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1234   do_disconnect (h);
1235   rc->iter (rc->iter_cls,
1236             NULL, 0, NULL, 0, 0, 0, 
1237             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1238   GNUNET_free (rc);
1239 }
1240
1241
1242 /**
1243  * Cancel a datastore operation.  The final callback from the
1244  * operation must not have been done yet.
1245  * 
1246  * @param qe operation to cancel
1247  */
1248 void
1249 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1250 {
1251   struct GNUNET_DATASTORE_Handle *h;
1252   int reconnect;
1253
1254   h = qe->h;
1255   reconnect = qe->was_transmitted;
1256   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1257                                h->queue_tail,
1258                                qe);
1259   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
1260     GNUNET_SCHEDULER_cancel (h->sched,
1261                              qe->task);
1262   GNUNET_free (qe);
1263   if (reconnect)
1264     {
1265       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1266       do_disconnect (h);
1267     }
1268 }
1269
1270
1271 /* end of datastore_api.c */