7bdd3b38bccdbc1d1d905d0eb96fb9e028ce720c
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct GNUNET_DATASTORE_QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct GNUNET_DATASTORE_QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct GNUNET_DATASTORE_QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct GNUNET_DATASTORE_QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct GNUNET_DATASTORE_QueueEntry *qe;
244
245   if (h->client != NULL)
246     {
247       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
248       h->client = NULL;
249     }
250   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
251     {
252       GNUNET_SCHEDULER_cancel (h->sched,
253                                h->reconnect_task);
254       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
255     }
256   while (NULL != (qe = h->queue_head))
257     {
258       if (NULL != qe->response_proc)
259         {
260           qe->response_proc (qe, NULL);
261         }
262       else
263         {
264           GNUNET_CONTAINER_DLL_remove (h->queue_head,
265                                        h->queue_tail,
266                                        qe);
267           if (qe->task != GNUNET_SCHEDULER_NO_TASK)
268             GNUNET_SCHEDULER_cancel (h->sched,
269                                      qe->task);
270           GNUNET_free (qe);
271         }
272     }
273   if (GNUNET_YES == drop) 
274     {
275       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
276       if (h->client != NULL)
277         {
278           if (NULL != 
279               GNUNET_CLIENT_notify_transmit_ready (h->client,
280                                                    sizeof(struct GNUNET_MessageHeader),
281                                                    GNUNET_TIME_UNIT_MINUTES,
282                                                    GNUNET_YES,
283                                                    &transmit_drop,
284                                                    h))
285             return;
286           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
287         }
288       GNUNET_break (0);
289     }
290   GNUNET_free (h);
291 }
292
293
294 /**
295  * A request has timed out (before being transmitted to the service).
296  *
297  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
298  * @param tc scheduler context
299  */
300 static void
301 timeout_queue_entry (void *cls,
302                      const struct GNUNET_SCHEDULER_TaskContext *tc)
303 {
304   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
305
306   qe->task = GNUNET_SCHEDULER_NO_TASK;
307   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
308   qe->response_proc (qe, NULL);
309 }
310
311
312 /**
313  * Create a new entry for our priority queue (and possibly discard other entires if
314  * the queue is getting too long).
315  *
316  * @param h handle to the datastore
317  * @param msize size of the message to queue
318  * @param queue_priority priority of the entry
319  * @param max_queue_size at what queue size should this request be dropped
320  *        (if other requests of higher priority are in the queue)
321  * @param timeout timeout for the operation
322  * @param response_proc function to call with replies (can be NULL)
323  * @param client_ctx client context (NOT a closure for response_proc)
324  * @return NULL if the queue is full (and this entry was dropped)
325  */
326 static struct GNUNET_DATASTORE_QueueEntry *
327 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
328                   size_t msize,
329                   unsigned int queue_priority,
330                   unsigned int max_queue_size,
331                   struct GNUNET_TIME_Relative timeout,
332                   GNUNET_CLIENT_MessageHandler response_proc,            
333                   void *client_ctx)
334 {
335   struct GNUNET_DATASTORE_QueueEntry *ret;
336   struct GNUNET_DATASTORE_QueueEntry *pos;
337   unsigned int c;
338
339   c = 0;
340   pos = h->queue_head;
341   while ( (pos != NULL) &&
342           (c < max_queue_size) &&
343           (pos->priority >= queue_priority) )
344     {
345       c++;
346       pos = pos->next;
347     }
348   if (c >= max_queue_size)
349     return NULL;
350   if (pos == NULL)
351     {
352       /* append at the tail */
353       pos = h->queue_tail;
354     }
355   else
356     {
357       pos = pos->prev; 
358       /* do not insert at HEAD if HEAD query was already
359          transmitted and we are still receiving replies! */
360       if ( (pos == NULL) &&
361            (h->queue_head->was_transmitted) )
362         pos = h->queue_head;
363     }
364   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
365   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
366                                      h->queue_tail,
367                                      pos,
368                                      ret);
369   ret->h = h;
370   ret->response_proc = response_proc;
371   ret->client_ctx = client_ctx;
372   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
373                                             timeout,
374                                             &timeout_queue_entry,
375                                             ret);
376   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
377   ret->priority = queue_priority;
378   ret->max_queue = max_queue_size;
379   ret->message_size = msize;
380   ret->was_transmitted = GNUNET_NO;
381   h->queue_size++;
382   c++;
383   pos = ret->next;
384   while (pos != NULL) 
385     {
386       if (pos->max_queue < h->queue_size)
387         {
388           GNUNET_CONTAINER_DLL_remove (h->queue_head,
389                                        h->queue_tail,
390                                        pos);
391           GNUNET_SCHEDULER_cancel (h->sched,
392                                    pos->task);
393           if (pos->response_proc != NULL)
394             pos->response_proc (pos, NULL);
395           GNUNET_free (pos);
396           h->queue_size--;
397           break;
398         }
399       pos = pos->next;
400     }
401   return ret;
402 }
403
404
405 /**
406  * Process entries in the queue (or do nothing if we are already
407  * doing so).
408  * 
409  * @param h handle to the datastore
410  */
411 static void
412 process_queue (struct GNUNET_DATASTORE_Handle *h);
413
414
415 /**
416  * Try reconnecting to the datastore service.
417  *
418  * @param cls the 'struct GNUNET_DATASTORE_Handle'
419  * @param tc scheduler context
420  */
421 static void
422 try_reconnect (void *cls,
423                const struct GNUNET_SCHEDULER_TaskContext *tc)
424 {
425   struct GNUNET_DATASTORE_Handle *h = cls;
426
427   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
428     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
429   else
430     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
431   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
432     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
433   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
434   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
435   if (h->client == NULL)
436     return;
437   process_queue (h);
438 }
439
440
441 /**
442  * Disconnect from the service and then try reconnecting to the datastore service
443  * after some delay.
444  *
445  * @param cls the 'struct GNUNET_DATASTORE_Handle'
446  * @param tc scheduler context
447  */
448 static void
449 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
450 {
451   if (h->client == NULL)
452     return;
453   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
454   h->client = NULL;
455   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
456                                                     h->retry_time,
457                                                     &try_reconnect,
458                                                     h);      
459 }
460
461
462 /**
463  * Transmit request from queue to datastore service.
464  *
465  * @param cls the 'struct GNUNET_DATASTORE_Handle'
466  * @param size number of bytes that can be copied to buf
467  * @param buf where to copy the drop message
468  * @return number of bytes written to buf
469  */
470 static size_t
471 transmit_request (void *cls,
472                   size_t size, 
473                   void *buf)
474 {
475   struct GNUNET_DATASTORE_Handle *h = cls;
476   struct GNUNET_DATASTORE_QueueEntry *qe;
477   size_t msize;
478
479   h->th = NULL;
480   if (NULL == (qe = h->queue_head))
481     return 0; /* no entry in queue */
482   if (buf == NULL)
483     {
484       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
485                   _("Failed to transmit request to database.\n"));
486       do_disconnect (h);
487       return 0;
488     }
489   if (size < (msize = qe->message_size))
490     {
491       process_queue (h);
492       return 0;
493     }
494   memcpy (buf, &qe[1], msize);
495   qe->was_transmitted = GNUNET_YES;
496   GNUNET_SCHEDULER_cancel (h->sched,
497                            qe->task);
498   qe->task = GNUNET_SCHEDULER_NO_TASK;
499   GNUNET_CLIENT_receive (h->client,
500                          qe->response_proc,
501                          qe,
502                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
503   return msize;
504 }
505
506
507 /**
508  * Process entries in the queue (or do nothing if we are already
509  * doing so).
510  * 
511  * @param h handle to the datastore
512  */
513 static void
514 process_queue (struct GNUNET_DATASTORE_Handle *h)
515 {
516   struct GNUNET_DATASTORE_QueueEntry *qe;
517
518   if (NULL == (qe = h->queue_head))
519     return; /* no entry in queue */
520   if (qe->was_transmitted == GNUNET_YES)
521     return; /* waiting for replies */
522   if (h->th != NULL)
523     return; /* request pending */
524   if (h->client == NULL)
525     return; /* waiting for reconnect */
526   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
527                                                qe->message_size,
528                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
529                                                GNUNET_YES,
530                                                &transmit_request,
531                                                h);
532 }
533
534
535
536
537 /**
538  * Context for processing status messages.
539  */
540 struct StatusContext
541 {
542   /**
543    * Continuation to call with the status.
544    */
545   GNUNET_DATASTORE_ContinuationWithStatus cont;
546
547   /**
548    * Closure for cont.
549    */
550   void *cont_cls;
551
552 };
553
554
555 /**
556  * Dummy continuation used to do nothing (but be non-zero).
557  *
558  * @param cls closure
559  * @param result result 
560  * @param emsg error message
561  */
562 static void
563 drop_status_cont (void *cls, int result, const char *emsg)
564 {
565   /* do nothing */
566 }
567
568
569 /**
570  * Type of a function to call when we receive a message
571  * from the service.
572  *
573  * @param cls closure
574  * @param msg message received, NULL on timeout or fatal error
575  */
576 static void 
577 process_status_message (void *cls,
578                         const struct
579                         GNUNET_MessageHeader * msg)
580 {
581   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
582   struct GNUNET_DATASTORE_Handle *h = qe->h;
583   struct StatusContext *rc = qe->client_ctx;
584   const struct StatusMessage *sm;
585   const char *emsg;
586   int32_t status;
587
588   GNUNET_CONTAINER_DLL_remove (h->queue_head,
589                                h->queue_tail,
590                                qe);
591   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
592     {
593       GNUNET_SCHEDULER_cancel (h->sched,
594                                qe->task);
595       qe->task = GNUNET_SCHEDULER_NO_TASK;
596     }
597   GNUNET_free (qe);
598   if (msg == NULL)
599     {      
600       if (NULL == h->client)
601         return; /* forced disconnect */
602       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
603                   _("Failed to receive response from database.\n"));
604       do_disconnect (h);
605       return;
606     }
607
608   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
609        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
610     {
611       GNUNET_break (0);
612       h->retry_time = GNUNET_TIME_UNIT_ZERO;
613       do_disconnect (h);
614       rc->cont (rc->cont_cls, 
615                 GNUNET_SYSERR,
616                 _("Error reading response from datastore service"));
617       GNUNET_free (rc);
618       return;
619     }
620   sm = (const struct StatusMessage*) msg;
621   status = ntohl(sm->status);
622   emsg = NULL;
623   if (ntohs(msg->size) > sizeof(struct StatusMessage))
624     {
625       emsg = (const char*) &sm[1];
626       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
627         {
628           GNUNET_break (0);
629           emsg = _("Invalid error message received from datastore service");
630         }
631     }  
632   if ( (status == GNUNET_SYSERR) &&
633        (emsg == NULL) )
634     {
635       GNUNET_break (0);
636       emsg = _("Invalid error message received from datastore service");
637     }
638 #if DEBUG_DATASTORE
639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
640               "Received status %d/%s\n",
641               (int) status,
642               emsg);
643 #endif
644   rc->cont (rc->cont_cls, 
645             status,
646             emsg);
647   GNUNET_free (rc);  
648   process_queue (h);
649 }
650
651
652 /**
653  * Store an item in the datastore.  If the item is already present,
654  * the priorities are summed up and the higher expiration time and
655  * lower anonymity level is used.
656  *
657  * @param h handle to the datastore
658  * @param rid reservation ID to use (from "reserve"); use 0 if no
659  *            prior reservation was made
660  * @param key key for the value
661  * @param size number of bytes in data
662  * @param data content stored
663  * @param type type of the content
664  * @param priority priority of the content
665  * @param anonymity anonymity-level for the content
666  * @param expiration expiration time for the content
667  * @param queue_priority ranking of this request in the priority queue
668  * @param max_queue_size at what queue size should this request be dropped
669  *        (if other requests of higher priority are in the queue)
670  * @param timeout timeout for the operation
671  * @param cont continuation to call when done
672  * @param cont_cls closure for cont
673  * @return NULL if the entry was not queued, otherwise a handle that can be used to
674  *         cancel; note that even if NULL is returned, the callback will be invoked
675  *         (or rather, will already have been invoked)
676  */
677 struct GNUNET_DATASTORE_QueueEntry *
678 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
679                       int rid,
680                       const GNUNET_HashCode * key,
681                       uint32_t size,
682                       const void *data,
683                       enum GNUNET_BLOCK_Type type,
684                       uint32_t priority,
685                       uint32_t anonymity,
686                       struct GNUNET_TIME_Absolute expiration,
687                       unsigned int queue_priority,
688                       unsigned int max_queue_size,
689                       struct GNUNET_TIME_Relative timeout,
690                       GNUNET_DATASTORE_ContinuationWithStatus cont,
691                       void *cont_cls)
692 {
693   struct StatusContext *scont;
694   struct GNUNET_DATASTORE_QueueEntry *qe;
695   struct DataMessage *dm;
696   size_t msize;
697
698 #if DEBUG_DATASTORE
699   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
700               "Asked to put %u bytes of data under key `%s'\n",
701               size,
702               GNUNET_h2s (key));
703 #endif
704   msize = sizeof(struct DataMessage) + size;
705   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
706   scont = GNUNET_malloc (sizeof (struct StatusContext));
707   scont->cont = cont;
708   scont->cont_cls = cont_cls;
709   qe = make_queue_entry (h, msize,
710                          queue_priority, max_queue_size, timeout,
711                          &process_status_message, scont);
712   if (qe == NULL)
713     return NULL;
714   dm = (struct DataMessage* ) &qe[1];
715   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
716   dm->header.size = htons(msize);
717   dm->rid = htonl(rid);
718   dm->size = htonl(size);
719   dm->type = htonl(type);
720   dm->priority = htonl(priority);
721   dm->anonymity = htonl(anonymity);
722   dm->uid = GNUNET_htonll(0);
723   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
724   dm->key = *key;
725   memcpy (&dm[1], data, size);
726   process_queue (h);
727   return qe;
728 }
729
730
731 /**
732  * Reserve space in the datastore.  This function should be used
733  * to avoid "out of space" failures during a longer sequence of "put"
734  * operations (for example, when a file is being inserted).
735  *
736  * @param h handle to the datastore
737  * @param amount how much space (in bytes) should be reserved (for content only)
738  * @param entries how many entries will be created (to calculate per-entry overhead)
739  * @param queue_priority ranking of this request in the priority queue
740  * @param max_queue_size at what queue size should this request be dropped
741  *        (if other requests of higher priority are in the queue)
742  * @param timeout how long to wait at most for a response (or before dying in queue)
743  * @param cont continuation to call when done; "success" will be set to
744  *             a positive reservation value if space could be reserved.
745  * @param cont_cls closure for cont
746  * @return NULL if the entry was not queued, otherwise a handle that can be used to
747  *         cancel; note that even if NULL is returned, the callback will be invoked
748  *         (or rather, will already have been invoked)
749  */
750 struct GNUNET_DATASTORE_QueueEntry *
751 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
752                           uint64_t amount,
753                           uint32_t entries,
754                           unsigned int queue_priority,
755                           unsigned int max_queue_size,
756                           struct GNUNET_TIME_Relative timeout,
757                           GNUNET_DATASTORE_ContinuationWithStatus cont,
758                           void *cont_cls)
759 {
760   struct GNUNET_DATASTORE_QueueEntry *qe;
761   struct ReserveMessage *rm;
762   struct StatusContext *scont;
763
764   if (cont == NULL)
765     cont = &drop_status_cont;
766 #if DEBUG_DATASTORE
767   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
768               "Asked to reserve %llu bytes of data and %u entries'\n",
769               (unsigned long long) amount,
770               (unsigned int) entries);
771 #endif
772   scont = GNUNET_malloc (sizeof (struct StatusContext));
773   scont->cont = cont;
774   scont->cont_cls = cont_cls;
775   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
776                          queue_priority, max_queue_size, timeout,
777                          &process_status_message, scont);
778   if (qe == NULL)
779     return NULL;
780   rm = (struct ReserveMessage*) &qe[1];
781   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
782   rm->header.size = htons(sizeof (struct ReserveMessage));
783   rm->entries = htonl(entries);
784   rm->amount = GNUNET_htonll(amount);
785   process_queue (h);
786   return qe;
787 }
788
789
790 /**
791  * Signal that all of the data for which a reservation was made has
792  * been stored and that whatever excess space might have been reserved
793  * can now be released.
794  *
795  * @param h handle to the datastore
796  * @param rid reservation ID (value of "success" in original continuation
797  *        from the "reserve" function).
798  * @param queue_priority ranking of this request in the priority queue
799  * @param max_queue_size at what queue size should this request be dropped
800  *        (if other requests of higher priority are in the queue)
801  * @param queue_priority ranking of this request in the priority queue
802  * @param max_queue_size at what queue size should this request be dropped
803  *        (if other requests of higher priority are in the queue)
804  * @param timeout how long to wait at most for a response
805  * @param cont continuation to call when done
806  * @param cont_cls closure for cont
807  * @return NULL if the entry was not queued, otherwise a handle that can be used to
808  *         cancel; note that even if NULL is returned, the callback will be invoked
809  *         (or rather, will already have been invoked)
810  */
811 struct GNUNET_DATASTORE_QueueEntry *
812 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
813                                   int rid,
814                                   unsigned int queue_priority,
815                                   unsigned int max_queue_size,
816                                   struct GNUNET_TIME_Relative timeout,
817                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
818                                   void *cont_cls)
819 {
820   struct GNUNET_DATASTORE_QueueEntry *qe;
821   struct ReleaseReserveMessage *rrm;
822   struct StatusContext *scont;
823
824   if (cont == NULL)
825     cont = &drop_status_cont;
826 #if DEBUG_DATASTORE
827   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828               "Asked to release reserve %d\n",
829               rid);
830 #endif
831   scont = GNUNET_malloc (sizeof (struct StatusContext));
832   scont->cont = cont;
833   scont->cont_cls = cont_cls;
834   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
835                          queue_priority, max_queue_size, timeout,
836                          &process_status_message, scont);
837   if (qe == NULL)
838     return NULL;
839   rrm = (struct ReleaseReserveMessage*) &qe[1];
840   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
841   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
842   rrm->rid = htonl(rid);
843   process_queue (h);
844   return qe;
845 }
846
847
848 /**
849  * Update a value in the datastore.
850  *
851  * @param h handle to the datastore
852  * @param uid identifier for the value
853  * @param priority how much to increase the priority of the value
854  * @param expiration new expiration value should be MAX of existing and this argument
855  * @param queue_priority ranking of this request in the priority queue
856  * @param max_queue_size at what queue size should this request be dropped
857  *        (if other requests of higher priority are in the queue)
858  * @param timeout how long to wait at most for a response
859  * @param cont continuation to call when done
860  * @param cont_cls closure for cont
861  * @return NULL if the entry was not queued, otherwise a handle that can be used to
862  *         cancel; note that even if NULL is returned, the callback will be invoked
863  *         (or rather, will already have been invoked)
864  */
865 struct GNUNET_DATASTORE_QueueEntry *
866 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
867                          unsigned long long uid,
868                          uint32_t priority,
869                          struct GNUNET_TIME_Absolute expiration,
870                          unsigned int queue_priority,
871                          unsigned int max_queue_size,
872                          struct GNUNET_TIME_Relative timeout,
873                          GNUNET_DATASTORE_ContinuationWithStatus cont,
874                          void *cont_cls)
875 {
876   struct GNUNET_DATASTORE_QueueEntry *qe;
877   struct UpdateMessage *um;
878   struct StatusContext *scont;
879
880   if (cont == NULL)
881     cont = &drop_status_cont;
882 #if DEBUG_DATASTORE
883   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
884               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
885               uid,
886               (unsigned int) priority,
887               (unsigned long long) expiration.value);
888 #endif
889   scont = GNUNET_malloc (sizeof (struct StatusContext));
890   scont->cont = cont;
891   scont->cont_cls = cont_cls;
892   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
893                          queue_priority, max_queue_size, timeout,
894                          &process_status_message, scont);
895   if (qe == NULL)
896     return NULL;
897   um = (struct UpdateMessage*) &qe[1];
898   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
899   um->header.size = htons(sizeof (struct UpdateMessage));
900   um->priority = htonl(priority);
901   um->expiration = GNUNET_TIME_absolute_hton(expiration);
902   um->uid = GNUNET_htonll(uid);
903   process_queue (h);
904   return qe;
905 }
906
907
908 /**
909  * Explicitly remove some content from the database.
910  * The "cont"inuation will be called with status
911  * "GNUNET_OK" if content was removed, "GNUNET_NO"
912  * if no matching entry was found and "GNUNET_SYSERR"
913  * on all other types of errors.
914  *
915  * @param h handle to the datastore
916  * @param key key for the value
917  * @param size number of bytes in data
918  * @param data content stored
919  * @param queue_priority ranking of this request in the priority queue
920  * @param max_queue_size at what queue size should this request be dropped
921  *        (if other requests of higher priority are in the queue)
922  * @param timeout how long to wait at most for a response
923  * @param cont continuation to call when done
924  * @param cont_cls closure for cont
925  * @return NULL if the entry was not queued, otherwise a handle that can be used to
926  *         cancel; note that even if NULL is returned, the callback will be invoked
927  *         (or rather, will already have been invoked)
928  */
929 struct GNUNET_DATASTORE_QueueEntry *
930 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
931                          const GNUNET_HashCode *key,
932                          uint32_t size, 
933                          const void *data,
934                          unsigned int queue_priority,
935                          unsigned int max_queue_size,
936                          struct GNUNET_TIME_Relative timeout,
937                          GNUNET_DATASTORE_ContinuationWithStatus cont,
938                          void *cont_cls)
939 {
940   struct GNUNET_DATASTORE_QueueEntry *qe;
941   struct DataMessage *dm;
942   size_t msize;
943   struct StatusContext *scont;
944
945   if (cont == NULL)
946     cont = &drop_status_cont;
947 #if DEBUG_DATASTORE
948   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949               "Asked to remove %u bytes under key `%s'\n",
950               size,
951               GNUNET_h2s (key));
952 #endif
953   scont = GNUNET_malloc (sizeof (struct StatusContext));
954   scont->cont = cont;
955   scont->cont_cls = cont_cls;
956   msize = sizeof(struct DataMessage) + size;
957   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
958   qe = make_queue_entry (h, msize,
959                          queue_priority, max_queue_size, timeout,
960                          &process_status_message, scont);
961   if (qe == NULL)
962     return NULL;
963   dm = (struct DataMessage*) &qe[1];
964   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
965   dm->header.size = htons(msize);
966   dm->rid = htonl(0);
967   dm->size = htonl(size);
968   dm->type = htonl(0);
969   dm->priority = htonl(0);
970   dm->anonymity = htonl(0);
971   dm->uid = GNUNET_htonll(0);
972   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
973   dm->key = *key;
974   memcpy (&dm[1], data, size);
975   process_queue (h);
976   return qe;
977 }
978
979
980
981 /**
982  * Context for processing result messages.
983  */
984 struct ResultContext
985 {
986   /**
987    * Iterator to call with the result.
988    */
989   GNUNET_DATASTORE_Iterator iter;
990
991   /**
992    * Closure for iter.
993    */
994   void *iter_cls;
995
996 };
997
998
999 /**
1000  * Type of a function to call when we receive a message
1001  * from the service.
1002  *
1003  * @param cls closure
1004  * @param msg message received, NULL on timeout or fatal error
1005  */
1006 static void 
1007 process_result_message (void *cls,
1008                         const struct GNUNET_MessageHeader * msg)
1009 {
1010   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1011   struct GNUNET_DATASTORE_Handle *h = qe->h;
1012   struct ResultContext *rc = qe->client_ctx;
1013   const struct DataMessage *dm;
1014
1015   GNUNET_assert (h->queue_head == qe);
1016   if (msg == NULL)
1017     {
1018 #if DEBUG_DATASTORE
1019       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1020                   _("Failed to receive response from datastore\n"));
1021 #endif
1022       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1023                                    h->queue_tail,
1024                                    qe);
1025       GNUNET_free (qe);
1026       do_disconnect (h);
1027       rc->iter (rc->iter_cls,
1028                 NULL, 0, NULL, 0, 0, 0, 
1029                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1030       GNUNET_free (rc);
1031       return;
1032     }
1033   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1034     {
1035       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1036 #if DEBUG_DATASTORE
1037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1038                   "Received end of result set\n");
1039 #endif
1040       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1041                                    h->queue_tail,
1042                                    qe);
1043       GNUNET_free (qe);
1044       rc->iter (rc->iter_cls,
1045                 NULL, 0, NULL, 0, 0, 0, 
1046                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1047       GNUNET_free (rc);
1048       process_queue (h);
1049       return;
1050     }
1051   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1052        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1053        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1054     {
1055       GNUNET_break (0);
1056       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1057                                    h->queue_tail,
1058                                    qe);
1059       GNUNET_free (qe);
1060       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1061       do_disconnect (h);
1062       rc->iter (rc->iter_cls,
1063                 NULL, 0, NULL, 0, 0, 0, 
1064                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1065       GNUNET_free (rc);
1066       return;
1067     }
1068   dm = (const struct DataMessage*) msg;
1069 #if DEBUG_DATASTORE
1070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1071               "Received result %llu with type %u and size %u with key %s\n",
1072               (unsigned long long) GNUNET_ntohll(dm->uid),
1073               ntohl(dm->type),
1074               ntohl(dm->size),
1075               GNUNET_h2s(&dm->key));
1076 #endif
1077   rc->iter (rc->iter_cls,
1078             &dm->key,
1079             ntohl(dm->size),
1080             &dm[1],
1081             ntohl(dm->type),
1082             ntohl(dm->priority),
1083             ntohl(dm->anonymity),
1084             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1085             GNUNET_ntohll(dm->uid));
1086 }
1087
1088
1089 /**
1090  * Get a random value from the datastore.
1091  *
1092  * @param h handle to the datastore
1093  * @param queue_priority ranking of this request in the priority queue
1094  * @param max_queue_size at what queue size should this request be dropped
1095  *        (if other requests of higher priority are in the queue)
1096  * @param timeout how long to wait at most for a response
1097  * @param iter function to call on a random value; it
1098  *        will be called once with a value (if available)
1099  *        and always once with a value of NULL.
1100  * @param iter_cls closure for iter
1101  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1102  *         cancel; note that even if NULL is returned, the callback will be invoked
1103  *         (or rather, will already have been invoked)
1104  */
1105 struct GNUNET_DATASTORE_QueueEntry *
1106 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1107                              unsigned int queue_priority,
1108                              unsigned int max_queue_size,
1109                              struct GNUNET_TIME_Relative timeout,
1110                              GNUNET_DATASTORE_Iterator iter, 
1111                              void *iter_cls)
1112 {
1113   struct GNUNET_DATASTORE_QueueEntry *qe;
1114   struct GNUNET_MessageHeader *m;
1115   struct ResultContext *rcont;
1116
1117 #if DEBUG_DATASTORE
1118   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1119               "Asked to get random entry in %llu ms\n",
1120               (unsigned long long) timeout.value);
1121 #endif
1122   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1123   rcont->iter = iter;
1124   rcont->iter_cls = iter_cls;
1125   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1126                          queue_priority, max_queue_size, timeout,
1127                          &process_result_message, rcont);
1128   if (qe == NULL)
1129     return NULL;
1130   m = (struct GNUNET_MessageHeader*) &qe[1];
1131   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1132   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1133   process_queue (h);
1134   return qe;
1135 }
1136
1137
1138
1139 /**
1140  * Iterate over the results for a particular key
1141  * in the datastore.  The iterator will only be called
1142  * once initially; if the first call did contain a
1143  * result, further results can be obtained by calling
1144  * "GNUNET_DATASTORE_get_next" with the given argument.
1145  *
1146  * @param h handle to the datastore
1147  * @param key maybe NULL (to match all entries)
1148  * @param type desired type, 0 for any
1149  * @param queue_priority ranking of this request in the priority queue
1150  * @param max_queue_size at what queue size should this request be dropped
1151  *        (if other requests of higher priority are in the queue)
1152  * @param timeout how long to wait at most for a response
1153  * @param iter function to call on each matching value;
1154  *        will be called once with a NULL value at the end
1155  * @param iter_cls closure for iter
1156  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1157  *         cancel; note that even if NULL is returned, the callback will be invoked
1158  *         (or rather, will already have been invoked)
1159  */
1160 struct GNUNET_DATASTORE_QueueEntry *
1161 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1162                       const GNUNET_HashCode * key,
1163                       enum GNUNET_BLOCK_Type type,
1164                       unsigned int queue_priority,
1165                       unsigned int max_queue_size,
1166                       struct GNUNET_TIME_Relative timeout,
1167                       GNUNET_DATASTORE_Iterator iter, 
1168                       void *iter_cls)
1169 {
1170   struct GNUNET_DATASTORE_QueueEntry *qe;
1171   struct GetMessage *gm;
1172   struct ResultContext *rcont;
1173
1174 #if DEBUG_DATASTORE
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Asked to look for data of type %u under key `%s'\n",
1177               (unsigned int) type,
1178               GNUNET_h2s (key));
1179 #endif
1180   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1181   rcont->iter = iter;
1182   rcont->iter_cls = iter_cls;
1183   qe = make_queue_entry (h, sizeof(struct GetMessage),
1184                          queue_priority, max_queue_size, timeout,
1185                          &process_result_message, rcont);
1186   if (qe == NULL)
1187     return NULL;
1188   gm = (struct GetMessage*) &qe[1];
1189   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1190   gm->type = htonl(type);
1191   if (key != NULL)
1192     {
1193       gm->header.size = htons(sizeof (struct GetMessage));
1194       gm->key = *key;
1195     }
1196   else
1197     {
1198       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1199     }
1200   process_queue (h);
1201   return qe;
1202 }
1203
1204
1205 /**
1206  * Function called to trigger obtaining the next result
1207  * from the datastore.
1208  * 
1209  * @param h handle to the datastore
1210  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1211  *        iteration (with a final call to "iter" with key/data == NULL).
1212  */
1213 void 
1214 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1215                            int more)
1216 {
1217   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1218   struct ResultContext *rc = qe->client_ctx;
1219
1220   GNUNET_assert (NULL != qe);
1221   GNUNET_assert (&process_result_message == qe->response_proc);
1222   if (GNUNET_YES == more)
1223     {     
1224       GNUNET_CLIENT_receive (h->client,
1225                              qe->response_proc,
1226                              qe,
1227                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1228       return;
1229     }
1230   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1231                                h->queue_tail,
1232                                qe);
1233   GNUNET_free (qe);
1234   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1235   do_disconnect (h);
1236   rc->iter (rc->iter_cls,
1237             NULL, 0, NULL, 0, 0, 0, 
1238             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1239   GNUNET_free (rc);
1240 }
1241
1242
1243 /**
1244  * Cancel a datastore operation.  The final callback from the
1245  * operation must not have been done yet.
1246  * 
1247  * @param qe operation to cancel
1248  */
1249 void
1250 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1251 {
1252   struct GNUNET_DATASTORE_Handle *h;
1253   int reconnect;
1254
1255   h = qe->h;
1256   reconnect = qe->was_transmitted;
1257   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1258                                h->queue_tail,
1259                                qe);
1260   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
1261     GNUNET_SCHEDULER_cancel (h->sched,
1262                              qe->task);
1263   GNUNET_free (qe);
1264   if (reconnect)
1265     {
1266       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1267       do_disconnect (h);
1268     }
1269 }
1270
1271
1272 /* end of datastore_api.c */