f3db9446940ae8f20bf8838dd9b75bffd4bffae1
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct GNUNET_DATASTORE_QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct GNUNET_DATASTORE_QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct GNUNET_DATASTORE_QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct GNUNET_DATASTORE_QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct GNUNET_DATASTORE_QueueEntry *qe;
244
245   if (h->client != NULL)
246     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
247   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
248     GNUNET_SCHEDULER_cancel (h->sched,
249                              h->reconnect_task);
250   h->client = NULL;
251   while (NULL != (qe = h->queue_head))
252     {
253       GNUNET_CONTAINER_DLL_remove (h->queue_head,
254                                    h->queue_tail,
255                                    qe);
256       if (NULL != qe->response_proc)
257         qe->response_proc (qe, NULL);
258       GNUNET_free (qe);
259     }
260   if (GNUNET_YES == drop) 
261     {
262       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
263       if (h->client != NULL)
264         {
265           if (NULL != 
266               GNUNET_CLIENT_notify_transmit_ready (h->client,
267                                                    sizeof(struct GNUNET_MessageHeader),
268                                                    GNUNET_TIME_UNIT_MINUTES,
269                                                    GNUNET_YES,
270                                                    &transmit_drop,
271                                                    h))
272             return;
273           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
274         }
275       GNUNET_break (0);
276     }
277   GNUNET_free (h);
278 }
279
280
281 /**
282  * A request has timed out (before being transmitted to the service).
283  *
284  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
285  * @param tc scheduler context
286  */
287 static void
288 timeout_queue_entry (void *cls,
289                      const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
292
293   qe->task = GNUNET_SCHEDULER_NO_TASK;
294   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
295   qe->response_proc (qe, NULL);
296 }
297
298
299 /**
300  * Create a new entry for our priority queue (and possibly discard other entires if
301  * the queue is getting too long).
302  *
303  * @param h handle to the datastore
304  * @param msize size of the message to queue
305  * @param queue_priority priority of the entry
306  * @param max_queue_size at what queue size should this request be dropped
307  *        (if other requests of higher priority are in the queue)
308  * @param timeout timeout for the operation
309  * @param response_proc function to call with replies (can be NULL)
310  * @param client_ctx client context (NOT a closure for response_proc)
311  * @return NULL if the queue is full (and this entry was dropped)
312  */
313 static struct GNUNET_DATASTORE_QueueEntry *
314 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
315                   size_t msize,
316                   unsigned int queue_priority,
317                   unsigned int max_queue_size,
318                   struct GNUNET_TIME_Relative timeout,
319                   GNUNET_CLIENT_MessageHandler response_proc,            
320                   void *client_ctx)
321 {
322   struct GNUNET_DATASTORE_QueueEntry *ret;
323   struct GNUNET_DATASTORE_QueueEntry *pos;
324   unsigned int c;
325
326   c = 0;
327   pos = h->queue_head;
328   while ( (pos != NULL) &&
329           (c < max_queue_size) &&
330           (pos->priority >= queue_priority) )
331     {
332       c++;
333       pos = pos->next;
334     }
335   if (c >= max_queue_size)
336     return NULL;
337   if (pos == NULL)
338     {
339       /* append at the tail */
340       pos = h->queue_tail;
341     }
342   else
343     {
344       pos = pos->prev; 
345       /* do not insert at HEAD if HEAD query was already
346          transmitted and we are still receiving replies! */
347       if ( (pos == NULL) &&
348            (h->queue_head->was_transmitted) )
349         pos = h->queue_head;
350     }
351   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
352   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
353                                      h->queue_tail,
354                                      pos,
355                                      ret);
356   ret->h = h;
357   ret->response_proc = response_proc;
358   ret->client_ctx = client_ctx;
359   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
360                                             timeout,
361                                             &timeout_queue_entry,
362                                             ret);
363   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
364   ret->priority = queue_priority;
365   ret->max_queue = max_queue_size;
366   ret->message_size = msize;
367   ret->was_transmitted = GNUNET_NO;
368   h->queue_size++;
369   c++;
370   pos = ret->next;
371   while (pos != NULL) 
372     {
373       if (pos->max_queue < h->queue_size)
374         {
375           GNUNET_CONTAINER_DLL_remove (h->queue_head,
376                                        h->queue_tail,
377                                        pos);
378           GNUNET_SCHEDULER_cancel (h->sched,
379                                    pos->task);
380           if (pos->response_proc != NULL)
381             pos->response_proc (pos, NULL);
382           GNUNET_free (pos);
383           h->queue_size--;
384           break;
385         }
386       pos = pos->next;
387     }
388   return ret;
389 }
390
391
392 /**
393  * Process entries in the queue (or do nothing if we are already
394  * doing so).
395  * 
396  * @param h handle to the datastore
397  */
398 static void
399 process_queue (struct GNUNET_DATASTORE_Handle *h);
400
401
402 /**
403  * Try reconnecting to the datastore service.
404  *
405  * @param cls the 'struct GNUNET_DATASTORE_Handle'
406  * @param tc scheduler context
407  */
408 static void
409 try_reconnect (void *cls,
410                const struct GNUNET_SCHEDULER_TaskContext *tc)
411 {
412   struct GNUNET_DATASTORE_Handle *h = cls;
413
414   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
415     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
416   else
417     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
418   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
419     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
420   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
421   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
422   if (h->client == NULL)
423     return;
424   process_queue (h);
425 }
426
427
428 /**
429  * Disconnect from the service and then try reconnecting to the datastore service
430  * after some delay.
431  *
432  * @param cls the 'struct GNUNET_DATASTORE_Handle'
433  * @param tc scheduler context
434  */
435 static void
436 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
437 {
438   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
439   h->client = NULL;
440   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
441                                                     h->retry_time,
442                                                     &try_reconnect,
443                                                     h);      
444 }
445
446
447 /**
448  * Transmit request from queue to datastore service.
449  *
450  * @param cls the 'struct GNUNET_DATASTORE_Handle'
451  * @param size number of bytes that can be copied to buf
452  * @param buf where to copy the drop message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 transmit_request (void *cls,
457                   size_t size, 
458                   void *buf)
459 {
460   struct GNUNET_DATASTORE_Handle *h = cls;
461   struct GNUNET_DATASTORE_QueueEntry *qe;
462   size_t msize;
463
464   h->th = NULL;
465   if (NULL == (qe = h->queue_head))
466     return 0; /* no entry in queue */
467   if (buf == NULL)
468     {
469       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
470                   _("Failed to transmit request to database.\n"));
471       do_disconnect (h);
472       return 0;
473     }
474   if (size < (msize = qe->message_size))
475     {
476       process_queue (h);
477       return 0;
478     }
479   memcpy (buf, &qe[1], msize);
480   qe->was_transmitted = GNUNET_YES;
481   GNUNET_SCHEDULER_cancel (h->sched,
482                            qe->task);
483   qe->task = GNUNET_SCHEDULER_NO_TASK;
484   GNUNET_CLIENT_receive (h->client,
485                          qe->response_proc,
486                          qe,
487                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
488   return msize;
489 }
490
491
492 /**
493  * Process entries in the queue (or do nothing if we are already
494  * doing so).
495  * 
496  * @param h handle to the datastore
497  */
498 static void
499 process_queue (struct GNUNET_DATASTORE_Handle *h)
500 {
501   struct GNUNET_DATASTORE_QueueEntry *qe;
502
503   if (NULL == (qe = h->queue_head))
504     return; /* no entry in queue */
505   if (qe->was_transmitted == GNUNET_YES)
506     return; /* waiting for replies */
507   if (h->th != NULL)
508     return; /* request pending */
509   if (h->client == NULL)
510     return; /* waiting for reconnect */
511   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
512                                                qe->message_size,
513                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
514                                                GNUNET_YES,
515                                                &transmit_request,
516                                                h);
517 }
518
519
520
521
522 /**
523  * Context for processing status messages.
524  */
525 struct StatusContext
526 {
527   /**
528    * Continuation to call with the status.
529    */
530   GNUNET_DATASTORE_ContinuationWithStatus cont;
531
532   /**
533    * Closure for cont.
534    */
535   void *cont_cls;
536
537 };
538
539
540 /**
541  * Dummy continuation used to do nothing (but be non-zero).
542  *
543  * @param cls closure
544  * @param result result 
545  * @param emsg error message
546  */
547 static void
548 drop_status_cont (void *cls, int result, const char *emsg)
549 {
550   /* do nothing */
551 }
552
553
554 /**
555  * Type of a function to call when we receive a message
556  * from the service.
557  *
558  * @param cls closure
559  * @param msg message received, NULL on timeout or fatal error
560  */
561 static void 
562 process_status_message (void *cls,
563                         const struct
564                         GNUNET_MessageHeader * msg)
565 {
566   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
567   struct GNUNET_DATASTORE_Handle *h = qe->h;
568   struct StatusContext *rc = qe->client_ctx;
569   const struct StatusMessage *sm;
570   const char *emsg;
571   int32_t status;
572
573   GNUNET_CONTAINER_DLL_remove (h->queue_head,
574                                h->queue_tail,
575                                qe);
576   GNUNET_free (qe);
577   if (msg == NULL)
578     {
579       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
580                   _("Failed to receive response from database.\n"));
581       do_disconnect (h);
582       return;
583     }
584
585   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
586        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
587     {
588       GNUNET_break (0);
589       h->retry_time = GNUNET_TIME_UNIT_ZERO;
590       do_disconnect (h);
591       rc->cont (rc->cont_cls, 
592                 GNUNET_SYSERR,
593                 _("Error reading response from datastore service"));
594       GNUNET_free (rc);
595       return;
596     }
597   sm = (const struct StatusMessage*) msg;
598   status = ntohl(sm->status);
599   emsg = NULL;
600   if (ntohs(msg->size) > sizeof(struct StatusMessage))
601     {
602       emsg = (const char*) &sm[1];
603       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
604         {
605           GNUNET_break (0);
606           emsg = _("Invalid error message received from datastore service");
607         }
608     }  
609   if ( (status == GNUNET_SYSERR) &&
610        (emsg == NULL) )
611     {
612       GNUNET_break (0);
613       emsg = _("Invalid error message received from datastore service");
614     }
615 #if DEBUG_DATASTORE
616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617               "Received status %d/%s\n",
618               (int) status,
619               emsg);
620 #endif
621   rc->cont (rc->cont_cls, 
622             status,
623             emsg);
624   GNUNET_free (rc);  
625   process_queue (h);
626 }
627
628
629 /**
630  * Store an item in the datastore.  If the item is already present,
631  * the priorities are summed up and the higher expiration time and
632  * lower anonymity level is used.
633  *
634  * @param h handle to the datastore
635  * @param rid reservation ID to use (from "reserve"); use 0 if no
636  *            prior reservation was made
637  * @param key key for the value
638  * @param size number of bytes in data
639  * @param data content stored
640  * @param type type of the content
641  * @param priority priority of the content
642  * @param anonymity anonymity-level for the content
643  * @param expiration expiration time for the content
644  * @param queue_priority ranking of this request in the priority queue
645  * @param max_queue_size at what queue size should this request be dropped
646  *        (if other requests of higher priority are in the queue)
647  * @param timeout timeout for the operation
648  * @param cont continuation to call when done
649  * @param cont_cls closure for cont
650  * @return NULL if the entry was not queued, otherwise a handle that can be used to
651  *         cancel; note that even if NULL is returned, the callback will be invoked
652  *         (or rather, will already have been invoked)
653  */
654 struct GNUNET_DATASTORE_QueueEntry *
655 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
656                       int rid,
657                       const GNUNET_HashCode * key,
658                       uint32_t size,
659                       const void *data,
660                       enum GNUNET_BLOCK_Type type,
661                       uint32_t priority,
662                       uint32_t anonymity,
663                       struct GNUNET_TIME_Absolute expiration,
664                       unsigned int queue_priority,
665                       unsigned int max_queue_size,
666                       struct GNUNET_TIME_Relative timeout,
667                       GNUNET_DATASTORE_ContinuationWithStatus cont,
668                       void *cont_cls)
669 {
670   struct StatusContext *scont;
671   struct GNUNET_DATASTORE_QueueEntry *qe;
672   struct DataMessage *dm;
673   size_t msize;
674
675 #if DEBUG_DATASTORE
676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
677               "Asked to put %u bytes of data under key `%s'\n",
678               size,
679               GNUNET_h2s (key));
680 #endif
681   msize = sizeof(struct DataMessage) + size;
682   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
683   scont = GNUNET_malloc (sizeof (struct StatusContext));
684   scont->cont = cont;
685   scont->cont_cls = cont_cls;
686   qe = make_queue_entry (h, msize,
687                          queue_priority, max_queue_size, timeout,
688                          &process_status_message, scont);
689   if (qe == NULL)
690     return NULL;
691   dm = (struct DataMessage* ) &qe[1];
692   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
693   dm->header.size = htons(msize);
694   dm->rid = htonl(rid);
695   dm->size = htonl(size);
696   dm->type = htonl(type);
697   dm->priority = htonl(priority);
698   dm->anonymity = htonl(anonymity);
699   dm->uid = GNUNET_htonll(0);
700   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
701   dm->key = *key;
702   memcpy (&dm[1], data, size);
703   process_queue (h);
704   return qe;
705 }
706
707
708 /**
709  * Reserve space in the datastore.  This function should be used
710  * to avoid "out of space" failures during a longer sequence of "put"
711  * operations (for example, when a file is being inserted).
712  *
713  * @param h handle to the datastore
714  * @param amount how much space (in bytes) should be reserved (for content only)
715  * @param entries how many entries will be created (to calculate per-entry overhead)
716  * @param queue_priority ranking of this request in the priority queue
717  * @param max_queue_size at what queue size should this request be dropped
718  *        (if other requests of higher priority are in the queue)
719  * @param timeout how long to wait at most for a response (or before dying in queue)
720  * @param cont continuation to call when done; "success" will be set to
721  *             a positive reservation value if space could be reserved.
722  * @param cont_cls closure for cont
723  * @return NULL if the entry was not queued, otherwise a handle that can be used to
724  *         cancel; note that even if NULL is returned, the callback will be invoked
725  *         (or rather, will already have been invoked)
726  */
727 struct GNUNET_DATASTORE_QueueEntry *
728 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
729                           uint64_t amount,
730                           uint32_t entries,
731                           unsigned int queue_priority,
732                           unsigned int max_queue_size,
733                           struct GNUNET_TIME_Relative timeout,
734                           GNUNET_DATASTORE_ContinuationWithStatus cont,
735                           void *cont_cls)
736 {
737   struct GNUNET_DATASTORE_QueueEntry *qe;
738   struct ReserveMessage *rm;
739   struct StatusContext *scont;
740
741   if (cont == NULL)
742     cont = &drop_status_cont;
743 #if DEBUG_DATASTORE
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Asked to reserve %llu bytes of data and %u entries'\n",
746               (unsigned long long) amount,
747               (unsigned int) entries);
748 #endif
749   scont = GNUNET_malloc (sizeof (struct StatusContext));
750   scont->cont = cont;
751   scont->cont_cls = cont_cls;
752   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
753                          queue_priority, max_queue_size, timeout,
754                          &process_status_message, scont);
755   if (qe == NULL)
756     return NULL;
757   rm = (struct ReserveMessage*) &qe[1];
758   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
759   rm->header.size = htons(sizeof (struct ReserveMessage));
760   rm->entries = htonl(entries);
761   rm->amount = GNUNET_htonll(amount);
762   process_queue (h);
763   return qe;
764 }
765
766
767 /**
768  * Signal that all of the data for which a reservation was made has
769  * been stored and that whatever excess space might have been reserved
770  * can now be released.
771  *
772  * @param h handle to the datastore
773  * @param rid reservation ID (value of "success" in original continuation
774  *        from the "reserve" function).
775  * @param queue_priority ranking of this request in the priority queue
776  * @param max_queue_size at what queue size should this request be dropped
777  *        (if other requests of higher priority are in the queue)
778  * @param queue_priority ranking of this request in the priority queue
779  * @param max_queue_size at what queue size should this request be dropped
780  *        (if other requests of higher priority are in the queue)
781  * @param timeout how long to wait at most for a response
782  * @param cont continuation to call when done
783  * @param cont_cls closure for cont
784  * @return NULL if the entry was not queued, otherwise a handle that can be used to
785  *         cancel; note that even if NULL is returned, the callback will be invoked
786  *         (or rather, will already have been invoked)
787  */
788 struct GNUNET_DATASTORE_QueueEntry *
789 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
790                                   int rid,
791                                   unsigned int queue_priority,
792                                   unsigned int max_queue_size,
793                                   struct GNUNET_TIME_Relative timeout,
794                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
795                                   void *cont_cls)
796 {
797   struct GNUNET_DATASTORE_QueueEntry *qe;
798   struct ReleaseReserveMessage *rrm;
799   struct StatusContext *scont;
800
801   if (cont == NULL)
802     cont = &drop_status_cont;
803 #if DEBUG_DATASTORE
804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805               "Asked to release reserve %d\n",
806               rid);
807 #endif
808   scont = GNUNET_malloc (sizeof (struct StatusContext));
809   scont->cont = cont;
810   scont->cont_cls = cont_cls;
811   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
812                          queue_priority, max_queue_size, timeout,
813                          &process_status_message, scont);
814   if (qe == NULL)
815     return NULL;
816   rrm = (struct ReleaseReserveMessage*) &qe[1];
817   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
818   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
819   rrm->rid = htonl(rid);
820   process_queue (h);
821   return qe;
822 }
823
824
825 /**
826  * Update a value in the datastore.
827  *
828  * @param h handle to the datastore
829  * @param uid identifier for the value
830  * @param priority how much to increase the priority of the value
831  * @param expiration new expiration value should be MAX of existing and this argument
832  * @param queue_priority ranking of this request in the priority queue
833  * @param max_queue_size at what queue size should this request be dropped
834  *        (if other requests of higher priority are in the queue)
835  * @param timeout how long to wait at most for a response
836  * @param cont continuation to call when done
837  * @param cont_cls closure for cont
838  * @return NULL if the entry was not queued, otherwise a handle that can be used to
839  *         cancel; note that even if NULL is returned, the callback will be invoked
840  *         (or rather, will already have been invoked)
841  */
842 struct GNUNET_DATASTORE_QueueEntry *
843 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
844                          unsigned long long uid,
845                          uint32_t priority,
846                          struct GNUNET_TIME_Absolute expiration,
847                          unsigned int queue_priority,
848                          unsigned int max_queue_size,
849                          struct GNUNET_TIME_Relative timeout,
850                          GNUNET_DATASTORE_ContinuationWithStatus cont,
851                          void *cont_cls)
852 {
853   struct GNUNET_DATASTORE_QueueEntry *qe;
854   struct UpdateMessage *um;
855   struct StatusContext *scont;
856
857   if (cont == NULL)
858     cont = &drop_status_cont;
859 #if DEBUG_DATASTORE
860   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
861               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
862               uid,
863               (unsigned int) priority,
864               (unsigned long long) expiration.value);
865 #endif
866   scont = GNUNET_malloc (sizeof (struct StatusContext));
867   scont->cont = cont;
868   scont->cont_cls = cont_cls;
869   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
870                          queue_priority, max_queue_size, timeout,
871                          &process_status_message, scont);
872   if (qe == NULL)
873     return NULL;
874   um = (struct UpdateMessage*) &qe[1];
875   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
876   um->header.size = htons(sizeof (struct UpdateMessage));
877   um->priority = htonl(priority);
878   um->expiration = GNUNET_TIME_absolute_hton(expiration);
879   um->uid = GNUNET_htonll(uid);
880   process_queue (h);
881   return qe;
882 }
883
884
885 /**
886  * Explicitly remove some content from the database.
887  * The "cont"inuation will be called with status
888  * "GNUNET_OK" if content was removed, "GNUNET_NO"
889  * if no matching entry was found and "GNUNET_SYSERR"
890  * on all other types of errors.
891  *
892  * @param h handle to the datastore
893  * @param key key for the value
894  * @param size number of bytes in data
895  * @param data content stored
896  * @param queue_priority ranking of this request in the priority queue
897  * @param max_queue_size at what queue size should this request be dropped
898  *        (if other requests of higher priority are in the queue)
899  * @param timeout how long to wait at most for a response
900  * @param cont continuation to call when done
901  * @param cont_cls closure for cont
902  * @return NULL if the entry was not queued, otherwise a handle that can be used to
903  *         cancel; note that even if NULL is returned, the callback will be invoked
904  *         (or rather, will already have been invoked)
905  */
906 struct GNUNET_DATASTORE_QueueEntry *
907 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
908                          const GNUNET_HashCode *key,
909                          uint32_t size, 
910                          const void *data,
911                          unsigned int queue_priority,
912                          unsigned int max_queue_size,
913                          struct GNUNET_TIME_Relative timeout,
914                          GNUNET_DATASTORE_ContinuationWithStatus cont,
915                          void *cont_cls)
916 {
917   struct GNUNET_DATASTORE_QueueEntry *qe;
918   struct DataMessage *dm;
919   size_t msize;
920   struct StatusContext *scont;
921
922   if (cont == NULL)
923     cont = &drop_status_cont;
924 #if DEBUG_DATASTORE
925   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
926               "Asked to remove %u bytes under key `%s'\n",
927               size,
928               GNUNET_h2s (key));
929 #endif
930   scont = GNUNET_malloc (sizeof (struct StatusContext));
931   scont->cont = cont;
932   scont->cont_cls = cont_cls;
933   msize = sizeof(struct DataMessage) + size;
934   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
935   qe = make_queue_entry (h, msize,
936                          queue_priority, max_queue_size, timeout,
937                          &process_status_message, scont);
938   if (qe == NULL)
939     return NULL;
940   dm = (struct DataMessage*) &qe[1];
941   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
942   dm->header.size = htons(msize);
943   dm->rid = htonl(0);
944   dm->size = htonl(size);
945   dm->type = htonl(0);
946   dm->priority = htonl(0);
947   dm->anonymity = htonl(0);
948   dm->uid = GNUNET_htonll(0);
949   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
950   dm->key = *key;
951   memcpy (&dm[1], data, size);
952   process_queue (h);
953   return qe;
954 }
955
956
957
958 /**
959  * Context for processing result messages.
960  */
961 struct ResultContext
962 {
963   /**
964    * Iterator to call with the result.
965    */
966   GNUNET_DATASTORE_Iterator iter;
967
968   /**
969    * Closure for iter.
970    */
971   void *iter_cls;
972
973 };
974
975
976 /**
977  * Type of a function to call when we receive a message
978  * from the service.
979  *
980  * @param cls closure
981  * @param msg message received, NULL on timeout or fatal error
982  */
983 static void 
984 process_result_message (void *cls,
985                         const struct GNUNET_MessageHeader * msg)
986 {
987   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
988   struct GNUNET_DATASTORE_Handle *h = qe->h;
989   struct ResultContext *rc = qe->client_ctx;
990   const struct DataMessage *dm;
991
992   if (msg == NULL)
993     {
994 #if DEBUG_DATASTORE
995       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
996                   _("Failed to receive response from datastore\n"));
997 #endif
998       GNUNET_CONTAINER_DLL_remove (h->queue_head,
999                                    h->queue_tail,
1000                                    qe);
1001       GNUNET_free (qe);
1002       do_disconnect (h);
1003       rc->iter (rc->iter_cls,
1004                 NULL, 0, NULL, 0, 0, 0, 
1005                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1006       GNUNET_free (rc);
1007       return;
1008     }
1009   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1010     {
1011       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1012 #if DEBUG_DATASTORE
1013       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1014                   "Received end of result set\n");
1015 #endif
1016       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1017                                    h->queue_tail,
1018                                    qe);
1019       GNUNET_free (qe);
1020       rc->iter (rc->iter_cls,
1021                 NULL, 0, NULL, 0, 0, 0, 
1022                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1023       GNUNET_free (rc);
1024       process_queue (h);
1025       return;
1026     }
1027   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1028        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1029        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1030     {
1031       GNUNET_break (0);
1032       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1033                                    h->queue_tail,
1034                                    qe);
1035       GNUNET_free (qe);
1036       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1037       do_disconnect (h);
1038       rc->iter (rc->iter_cls,
1039                 NULL, 0, NULL, 0, 0, 0, 
1040                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1041       GNUNET_free (rc);
1042       return;
1043     }
1044   dm = (const struct DataMessage*) msg;
1045 #if DEBUG_DATASTORE
1046   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047               "Received result %llu with type %u and size %u with key %s\n",
1048               (unsigned long long) GNUNET_ntohll(dm->uid),
1049               ntohl(dm->type),
1050               ntohl(dm->size),
1051               GNUNET_h2s(&dm->key));
1052 #endif
1053   rc->iter (rc->iter_cls,
1054             &dm->key,
1055             ntohl(dm->size),
1056             &dm[1],
1057             ntohl(dm->type),
1058             ntohl(dm->priority),
1059             ntohl(dm->anonymity),
1060             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1061             GNUNET_ntohll(dm->uid));
1062 }
1063
1064
1065 /**
1066  * Get a random value from the datastore.
1067  *
1068  * @param h handle to the datastore
1069  * @param queue_priority ranking of this request in the priority queue
1070  * @param max_queue_size at what queue size should this request be dropped
1071  *        (if other requests of higher priority are in the queue)
1072  * @param timeout how long to wait at most for a response
1073  * @param iter function to call on a random value; it
1074  *        will be called once with a value (if available)
1075  *        and always once with a value of NULL.
1076  * @param iter_cls closure for iter
1077  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1078  *         cancel; note that even if NULL is returned, the callback will be invoked
1079  *         (or rather, will already have been invoked)
1080  */
1081 struct GNUNET_DATASTORE_QueueEntry *
1082 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1083                              unsigned int queue_priority,
1084                              unsigned int max_queue_size,
1085                              struct GNUNET_TIME_Relative timeout,
1086                              GNUNET_DATASTORE_Iterator iter, 
1087                              void *iter_cls)
1088 {
1089   struct GNUNET_DATASTORE_QueueEntry *qe;
1090   struct GNUNET_MessageHeader *m;
1091   struct ResultContext *rcont;
1092
1093 #if DEBUG_DATASTORE
1094   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1095               "Asked to get random entry in %llu ms\n",
1096               (unsigned long long) timeout.value);
1097 #endif
1098   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1099   rcont->iter = iter;
1100   rcont->iter_cls = iter_cls;
1101   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1102                          queue_priority, max_queue_size, timeout,
1103                          &process_result_message, rcont);
1104   if (qe == NULL)
1105     return NULL;
1106   m = (struct GNUNET_MessageHeader*) &qe[1];
1107   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1108   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1109   process_queue (h);
1110   return qe;
1111 }
1112
1113
1114
1115 /**
1116  * Iterate over the results for a particular key
1117  * in the datastore.  The iterator will only be called
1118  * once initially; if the first call did contain a
1119  * result, further results can be obtained by calling
1120  * "GNUNET_DATASTORE_get_next" with the given argument.
1121  *
1122  * @param h handle to the datastore
1123  * @param key maybe NULL (to match all entries)
1124  * @param type desired type, 0 for any
1125  * @param queue_priority ranking of this request in the priority queue
1126  * @param max_queue_size at what queue size should this request be dropped
1127  *        (if other requests of higher priority are in the queue)
1128  * @param timeout how long to wait at most for a response
1129  * @param iter function to call on each matching value;
1130  *        will be called once with a NULL value at the end
1131  * @param iter_cls closure for iter
1132  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1133  *         cancel; note that even if NULL is returned, the callback will be invoked
1134  *         (or rather, will already have been invoked)
1135  */
1136 struct GNUNET_DATASTORE_QueueEntry *
1137 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1138                       const GNUNET_HashCode * key,
1139                       enum GNUNET_BLOCK_Type type,
1140                       unsigned int queue_priority,
1141                       unsigned int max_queue_size,
1142                       struct GNUNET_TIME_Relative timeout,
1143                       GNUNET_DATASTORE_Iterator iter, 
1144                       void *iter_cls)
1145 {
1146   struct GNUNET_DATASTORE_QueueEntry *qe;
1147   struct GetMessage *gm;
1148   struct ResultContext *rcont;
1149
1150 #if DEBUG_DATASTORE
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Asked to look for data of type %u under key `%s'\n",
1153               (unsigned int) type,
1154               GNUNET_h2s (key));
1155 #endif
1156   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1157   rcont->iter = iter;
1158   rcont->iter_cls = iter_cls;
1159   qe = make_queue_entry (h, sizeof(struct GetMessage),
1160                          queue_priority, max_queue_size, timeout,
1161                          &process_result_message, rcont);
1162   if (qe == NULL)
1163     return NULL;
1164   gm = (struct GetMessage*) &qe[1];
1165   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1166   gm->type = htonl(type);
1167   if (key != NULL)
1168     {
1169       gm->header.size = htons(sizeof (struct GetMessage));
1170       gm->key = *key;
1171     }
1172   else
1173     {
1174       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1175     }
1176   process_queue (h);
1177   return qe;
1178 }
1179
1180
1181 /**
1182  * Function called to trigger obtaining the next result
1183  * from the datastore.
1184  * 
1185  * @param h handle to the datastore
1186  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1187  *        iteration (with a final call to "iter" with key/data == NULL).
1188  */
1189 void 
1190 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1191                            int more)
1192 {
1193   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1194   struct ResultContext *rc = qe->client_ctx;
1195
1196   GNUNET_assert (NULL != qe);
1197   GNUNET_assert (&process_result_message == qe->response_proc);
1198   if (GNUNET_YES == more)
1199     {     
1200       GNUNET_CLIENT_receive (h->client,
1201                              qe->response_proc,
1202                              qe,
1203                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1204       return;
1205     }
1206   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1207                                h->queue_tail,
1208                                qe);
1209   GNUNET_free (qe);
1210   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1211   do_disconnect (h);
1212   rc->iter (rc->iter_cls,
1213             NULL, 0, NULL, 0, 0, 0, 
1214             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1215   GNUNET_free (rc);
1216 }
1217
1218
1219 /**
1220  * Cancel a datastore operation.  The final callback from the
1221  * operation must not have been done yet.
1222  * 
1223  * @param qe operation to cancel
1224  */
1225 void
1226 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1227 {
1228   struct GNUNET_DATASTORE_Handle *h;
1229   int reconnect;
1230
1231   h = qe->h;
1232   reconnect = qe->was_transmitted;
1233   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1234                                h->queue_tail,
1235                                qe);
1236   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
1237     GNUNET_SCHEDULER_cancel (h->sched,
1238                              qe->task);
1239   GNUNET_free (qe);
1240   if (reconnect)
1241     {
1242       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1243       do_disconnect (h);
1244     }
1245 }
1246
1247
1248 /* end of datastore_api.c */