use v2
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct GNUNET_DATASTORE_QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct GNUNET_DATASTORE_QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct GNUNET_DATASTORE_QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct GNUNET_DATASTORE_QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct GNUNET_DATASTORE_QueueEntry *qe;
244
245   if (h->client != NULL)
246     {
247       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
248       h->client = NULL;
249     }
250   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
251     {
252       GNUNET_SCHEDULER_cancel (h->sched,
253                                h->reconnect_task);
254       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
255     }
256   while (NULL != (qe = h->queue_head))
257     {
258       GNUNET_assert (NULL != qe->response_proc);
259       qe->response_proc (qe, NULL);
260     }
261   if (GNUNET_YES == drop) 
262     {
263       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
264       if (h->client != NULL)
265         {
266           if (NULL != 
267               GNUNET_CLIENT_notify_transmit_ready (h->client,
268                                                    sizeof(struct GNUNET_MessageHeader),
269                                                    GNUNET_TIME_UNIT_MINUTES,
270                                                    GNUNET_YES,
271                                                    &transmit_drop,
272                                                    h))
273             return;
274           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
275         }
276       GNUNET_break (0);
277     }
278   GNUNET_free (h);
279 }
280
281
282 /**
283  * A request has timed out (before being transmitted to the service).
284  *
285  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
286  * @param tc scheduler context
287  */
288 static void
289 timeout_queue_entry (void *cls,
290                      const struct GNUNET_SCHEDULER_TaskContext *tc)
291 {
292   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
293
294   qe->task = GNUNET_SCHEDULER_NO_TASK;
295   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
296   qe->response_proc (qe, NULL);
297 }
298
299
300 /**
301  * Create a new entry for our priority queue (and possibly discard other entires if
302  * the queue is getting too long).
303  *
304  * @param h handle to the datastore
305  * @param msize size of the message to queue
306  * @param queue_priority priority of the entry
307  * @param max_queue_size at what queue size should this request be dropped
308  *        (if other requests of higher priority are in the queue)
309  * @param timeout timeout for the operation
310  * @param response_proc function to call with replies (can be NULL)
311  * @param client_ctx client context (NOT a closure for response_proc)
312  * @return NULL if the queue is full (and this entry was dropped)
313  */
314 static struct GNUNET_DATASTORE_QueueEntry *
315 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
316                   size_t msize,
317                   unsigned int queue_priority,
318                   unsigned int max_queue_size,
319                   struct GNUNET_TIME_Relative timeout,
320                   GNUNET_CLIENT_MessageHandler response_proc,            
321                   void *client_ctx)
322 {
323   struct GNUNET_DATASTORE_QueueEntry *ret;
324   struct GNUNET_DATASTORE_QueueEntry *pos;
325   unsigned int c;
326
327   c = 0;
328   pos = h->queue_head;
329   while ( (pos != NULL) &&
330           (c < max_queue_size) &&
331           (pos->priority >= queue_priority) )
332     {
333       c++;
334       pos = pos->next;
335     }
336   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
337   ret->h = h;
338   ret->response_proc = response_proc;
339   ret->client_ctx = client_ctx;
340   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
341   ret->priority = queue_priority;
342   ret->max_queue = max_queue_size;
343   ret->message_size = msize;
344   ret->was_transmitted = GNUNET_NO;
345   if (pos == NULL)
346     {
347       /* append at the tail */
348       pos = h->queue_tail;
349     }
350   else
351     {
352       pos = pos->prev; 
353       /* do not insert at HEAD if HEAD query was already
354          transmitted and we are still receiving replies! */
355       if ( (pos == NULL) &&
356            (h->queue_head->was_transmitted) )
357         pos = h->queue_head;
358     }
359   c++;
360   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
361                                      h->queue_tail,
362                                      pos,
363                                      ret);
364   h->queue_size++;
365   if (c > max_queue_size)
366     {
367       response_proc (ret, NULL);
368       GNUNET_free (ret);
369       return NULL;
370     }
371   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
372                                             timeout,
373                                             &timeout_queue_entry,
374                                             ret);
375   pos = ret->next;
376   while (pos != NULL) 
377     {
378       if (pos->max_queue < h->queue_size)
379         {
380           GNUNET_assert (pos->response_proc != NULL);
381           pos->response_proc (pos, NULL);
382           break;
383         }
384       pos = pos->next;
385     }
386   return ret;
387 }
388
389
390 /**
391  * Process entries in the queue (or do nothing if we are already
392  * doing so).
393  * 
394  * @param h handle to the datastore
395  */
396 static void
397 process_queue (struct GNUNET_DATASTORE_Handle *h);
398
399
400 /**
401  * Try reconnecting to the datastore service.
402  *
403  * @param cls the 'struct GNUNET_DATASTORE_Handle'
404  * @param tc scheduler context
405  */
406 static void
407 try_reconnect (void *cls,
408                const struct GNUNET_SCHEDULER_TaskContext *tc)
409 {
410   struct GNUNET_DATASTORE_Handle *h = cls;
411
412   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
413     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
414   else
415     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
416   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
417     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
418   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
419   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
420   if (h->client == NULL)
421     return;
422   process_queue (h);
423 }
424
425
426 /**
427  * Disconnect from the service and then try reconnecting to the datastore service
428  * after some delay.
429  *
430  * @param h handle to datastore to disconnect and reconnect
431  */
432 static void
433 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
434 {
435   if (h->client == NULL)
436     return;
437   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
438   h->client = NULL;
439   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
440                                                     h->retry_time,
441                                                     &try_reconnect,
442                                                     h);      
443 }
444
445
446 /**
447  * Transmit request from queue to datastore service.
448  *
449  * @param cls the 'struct GNUNET_DATASTORE_Handle'
450  * @param size number of bytes that can be copied to buf
451  * @param buf where to copy the drop message
452  * @return number of bytes written to buf
453  */
454 static size_t
455 transmit_request (void *cls,
456                   size_t size, 
457                   void *buf)
458 {
459   struct GNUNET_DATASTORE_Handle *h = cls;
460   struct GNUNET_DATASTORE_QueueEntry *qe;
461   size_t msize;
462
463   h->th = NULL;
464   if (NULL == (qe = h->queue_head))
465     return 0; /* no entry in queue */
466   if (buf == NULL)
467     {
468       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
469                   _("Failed to transmit request to database.\n"));
470       do_disconnect (h);
471       return 0;
472     }
473   if (size < (msize = qe->message_size))
474     {
475       process_queue (h);
476       return 0;
477     }
478   memcpy (buf, &qe[1], msize);
479   qe->was_transmitted = GNUNET_YES;
480   GNUNET_SCHEDULER_cancel (h->sched,
481                            qe->task);
482   qe->task = GNUNET_SCHEDULER_NO_TASK;
483   GNUNET_CLIENT_receive (h->client,
484                          qe->response_proc,
485                          qe,
486                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
487   return msize;
488 }
489
490
491 /**
492  * Process entries in the queue (or do nothing if we are already
493  * doing so).
494  * 
495  * @param h handle to the datastore
496  */
497 static void
498 process_queue (struct GNUNET_DATASTORE_Handle *h)
499 {
500   struct GNUNET_DATASTORE_QueueEntry *qe;
501
502   if (NULL == (qe = h->queue_head))
503     return; /* no entry in queue */
504   if (qe->was_transmitted == GNUNET_YES)
505     return; /* waiting for replies */
506   if (h->th != NULL)
507     return; /* request pending */
508   if (h->client == NULL)
509     return; /* waiting for reconnect */
510   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
511                                                qe->message_size,
512                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
513                                                GNUNET_YES,
514                                                &transmit_request,
515                                                h);
516 }
517
518
519
520
521 /**
522  * Context for processing status messages.
523  */
524 struct StatusContext
525 {
526   /**
527    * Continuation to call with the status.
528    */
529   GNUNET_DATASTORE_ContinuationWithStatus cont;
530
531   /**
532    * Closure for cont.
533    */
534   void *cont_cls;
535
536 };
537
538
539 /**
540  * Dummy continuation used to do nothing (but be non-zero).
541  *
542  * @param cls closure
543  * @param result result 
544  * @param emsg error message
545  */
546 static void
547 drop_status_cont (void *cls, int result, const char *emsg)
548 {
549   /* do nothing */
550 }
551
552
553 static void
554 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
555 {
556   struct GNUNET_DATASTORE_Handle *h = qe->h;
557
558   GNUNET_CONTAINER_DLL_remove (h->queue_head,
559                                h->queue_tail,
560                                qe);
561   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
562     {
563       GNUNET_SCHEDULER_cancel (h->sched,
564                                qe->task);
565       qe->task = GNUNET_SCHEDULER_NO_TASK;
566     }
567   h->queue_size--;
568   GNUNET_free (qe);
569 }
570
571 /**
572  * Type of a function to call when we receive a message
573  * from the service.
574  *
575  * @param cls closure
576  * @param msg message received, NULL on timeout or fatal error
577  */
578 static void 
579 process_status_message (void *cls,
580                         const struct
581                         GNUNET_MessageHeader * msg)
582 {
583   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
584   struct GNUNET_DATASTORE_Handle *h = qe->h;
585   struct StatusContext *rc = qe->client_ctx;
586   const struct StatusMessage *sm;
587   const char *emsg;
588   int32_t status;
589
590   free_queue_entry (qe);
591   if (msg == NULL)
592     {      
593       if (NULL == h->client)
594         return; /* forced disconnect */
595       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
596                   _("Failed to receive response from database.\n"));
597       do_disconnect (h);
598       return;
599     }
600
601   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
602        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
603     {
604       GNUNET_break (0);
605       h->retry_time = GNUNET_TIME_UNIT_ZERO;
606       do_disconnect (h);
607       rc->cont (rc->cont_cls, 
608                 GNUNET_SYSERR,
609                 _("Error reading response from datastore service"));
610       GNUNET_free (rc);
611       return;
612     }
613   sm = (const struct StatusMessage*) msg;
614   status = ntohl(sm->status);
615   emsg = NULL;
616   if (ntohs(msg->size) > sizeof(struct StatusMessage))
617     {
618       emsg = (const char*) &sm[1];
619       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
620         {
621           GNUNET_break (0);
622           emsg = _("Invalid error message received from datastore service");
623         }
624     }  
625   if ( (status == GNUNET_SYSERR) &&
626        (emsg == NULL) )
627     {
628       GNUNET_break (0);
629       emsg = _("Invalid error message received from datastore service");
630     }
631 #if DEBUG_DATASTORE
632   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
633               "Received status %d/%s\n",
634               (int) status,
635               emsg);
636 #endif
637   rc->cont (rc->cont_cls, 
638             status,
639             emsg);
640   GNUNET_free (rc);  
641   process_queue (h);
642 }
643
644
645 /**
646  * Store an item in the datastore.  If the item is already present,
647  * the priorities are summed up and the higher expiration time and
648  * lower anonymity level is used.
649  *
650  * @param h handle to the datastore
651  * @param rid reservation ID to use (from "reserve"); use 0 if no
652  *            prior reservation was made
653  * @param key key for the value
654  * @param size number of bytes in data
655  * @param data content stored
656  * @param type type of the content
657  * @param priority priority of the content
658  * @param anonymity anonymity-level for the content
659  * @param expiration expiration time for the content
660  * @param queue_priority ranking of this request in the priority queue
661  * @param max_queue_size at what queue size should this request be dropped
662  *        (if other requests of higher priority are in the queue)
663  * @param timeout timeout for the operation
664  * @param cont continuation to call when done
665  * @param cont_cls closure for cont
666  * @return NULL if the entry was not queued, otherwise a handle that can be used to
667  *         cancel; note that even if NULL is returned, the callback will be invoked
668  *         (or rather, will already have been invoked)
669  */
670 struct GNUNET_DATASTORE_QueueEntry *
671 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
672                       int rid,
673                       const GNUNET_HashCode * key,
674                       uint32_t size,
675                       const void *data,
676                       enum GNUNET_BLOCK_Type type,
677                       uint32_t priority,
678                       uint32_t anonymity,
679                       struct GNUNET_TIME_Absolute expiration,
680                       unsigned int queue_priority,
681                       unsigned int max_queue_size,
682                       struct GNUNET_TIME_Relative timeout,
683                       GNUNET_DATASTORE_ContinuationWithStatus cont,
684                       void *cont_cls)
685 {
686   struct StatusContext *scont;
687   struct GNUNET_DATASTORE_QueueEntry *qe;
688   struct DataMessage *dm;
689   size_t msize;
690
691 #if DEBUG_DATASTORE
692   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
693               "Asked to put %u bytes of data under key `%s'\n",
694               size,
695               GNUNET_h2s (key));
696 #endif
697   msize = sizeof(struct DataMessage) + size;
698   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
699   scont = GNUNET_malloc (sizeof (struct StatusContext));
700   scont->cont = cont;
701   scont->cont_cls = cont_cls;
702   qe = make_queue_entry (h, msize,
703                          queue_priority, max_queue_size, timeout,
704                          &process_status_message, scont);
705   if (qe == NULL)
706     return NULL;
707   dm = (struct DataMessage* ) &qe[1];
708   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
709   dm->header.size = htons(msize);
710   dm->rid = htonl(rid);
711   dm->size = htonl(size);
712   dm->type = htonl(type);
713   dm->priority = htonl(priority);
714   dm->anonymity = htonl(anonymity);
715   dm->uid = GNUNET_htonll(0);
716   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
717   dm->key = *key;
718   memcpy (&dm[1], data, size);
719   process_queue (h);
720   return qe;
721 }
722
723
724 /**
725  * Reserve space in the datastore.  This function should be used
726  * to avoid "out of space" failures during a longer sequence of "put"
727  * operations (for example, when a file is being inserted).
728  *
729  * @param h handle to the datastore
730  * @param amount how much space (in bytes) should be reserved (for content only)
731  * @param entries how many entries will be created (to calculate per-entry overhead)
732  * @param queue_priority ranking of this request in the priority queue
733  * @param max_queue_size at what queue size should this request be dropped
734  *        (if other requests of higher priority are in the queue)
735  * @param timeout how long to wait at most for a response (or before dying in queue)
736  * @param cont continuation to call when done; "success" will be set to
737  *             a positive reservation value if space could be reserved.
738  * @param cont_cls closure for cont
739  * @return NULL if the entry was not queued, otherwise a handle that can be used to
740  *         cancel; note that even if NULL is returned, the callback will be invoked
741  *         (or rather, will already have been invoked)
742  */
743 struct GNUNET_DATASTORE_QueueEntry *
744 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
745                           uint64_t amount,
746                           uint32_t entries,
747                           unsigned int queue_priority,
748                           unsigned int max_queue_size,
749                           struct GNUNET_TIME_Relative timeout,
750                           GNUNET_DATASTORE_ContinuationWithStatus cont,
751                           void *cont_cls)
752 {
753   struct GNUNET_DATASTORE_QueueEntry *qe;
754   struct ReserveMessage *rm;
755   struct StatusContext *scont;
756
757   if (cont == NULL)
758     cont = &drop_status_cont;
759 #if DEBUG_DATASTORE
760   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
761               "Asked to reserve %llu bytes of data and %u entries'\n",
762               (unsigned long long) amount,
763               (unsigned int) entries);
764 #endif
765   scont = GNUNET_malloc (sizeof (struct StatusContext));
766   scont->cont = cont;
767   scont->cont_cls = cont_cls;
768   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
769                          queue_priority, max_queue_size, timeout,
770                          &process_status_message, scont);
771   if (qe == NULL)
772     return NULL;
773   rm = (struct ReserveMessage*) &qe[1];
774   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
775   rm->header.size = htons(sizeof (struct ReserveMessage));
776   rm->entries = htonl(entries);
777   rm->amount = GNUNET_htonll(amount);
778   process_queue (h);
779   return qe;
780 }
781
782
783 /**
784  * Signal that all of the data for which a reservation was made has
785  * been stored and that whatever excess space might have been reserved
786  * can now be released.
787  *
788  * @param h handle to the datastore
789  * @param rid reservation ID (value of "success" in original continuation
790  *        from the "reserve" function).
791  * @param queue_priority ranking of this request in the priority queue
792  * @param max_queue_size at what queue size should this request be dropped
793  *        (if other requests of higher priority are in the queue)
794  * @param queue_priority ranking of this request in the priority queue
795  * @param max_queue_size at what queue size should this request be dropped
796  *        (if other requests of higher priority are in the queue)
797  * @param timeout how long to wait at most for a response
798  * @param cont continuation to call when done
799  * @param cont_cls closure for cont
800  * @return NULL if the entry was not queued, otherwise a handle that can be used to
801  *         cancel; note that even if NULL is returned, the callback will be invoked
802  *         (or rather, will already have been invoked)
803  */
804 struct GNUNET_DATASTORE_QueueEntry *
805 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
806                                   int rid,
807                                   unsigned int queue_priority,
808                                   unsigned int max_queue_size,
809                                   struct GNUNET_TIME_Relative timeout,
810                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
811                                   void *cont_cls)
812 {
813   struct GNUNET_DATASTORE_QueueEntry *qe;
814   struct ReleaseReserveMessage *rrm;
815   struct StatusContext *scont;
816
817   if (cont == NULL)
818     cont = &drop_status_cont;
819 #if DEBUG_DATASTORE
820   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821               "Asked to release reserve %d\n",
822               rid);
823 #endif
824   scont = GNUNET_malloc (sizeof (struct StatusContext));
825   scont->cont = cont;
826   scont->cont_cls = cont_cls;
827   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
828                          queue_priority, max_queue_size, timeout,
829                          &process_status_message, scont);
830   if (qe == NULL)
831     return NULL;
832   rrm = (struct ReleaseReserveMessage*) &qe[1];
833   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
834   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
835   rrm->rid = htonl(rid);
836   process_queue (h);
837   return qe;
838 }
839
840
841 /**
842  * Update a value in the datastore.
843  *
844  * @param h handle to the datastore
845  * @param uid identifier for the value
846  * @param priority how much to increase the priority of the value
847  * @param expiration new expiration value should be MAX of existing and this argument
848  * @param queue_priority ranking of this request in the priority queue
849  * @param max_queue_size at what queue size should this request be dropped
850  *        (if other requests of higher priority are in the queue)
851  * @param timeout how long to wait at most for a response
852  * @param cont continuation to call when done
853  * @param cont_cls closure for cont
854  * @return NULL if the entry was not queued, otherwise a handle that can be used to
855  *         cancel; note that even if NULL is returned, the callback will be invoked
856  *         (or rather, will already have been invoked)
857  */
858 struct GNUNET_DATASTORE_QueueEntry *
859 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
860                          unsigned long long uid,
861                          uint32_t priority,
862                          struct GNUNET_TIME_Absolute expiration,
863                          unsigned int queue_priority,
864                          unsigned int max_queue_size,
865                          struct GNUNET_TIME_Relative timeout,
866                          GNUNET_DATASTORE_ContinuationWithStatus cont,
867                          void *cont_cls)
868 {
869   struct GNUNET_DATASTORE_QueueEntry *qe;
870   struct UpdateMessage *um;
871   struct StatusContext *scont;
872
873   if (cont == NULL)
874     cont = &drop_status_cont;
875 #if DEBUG_DATASTORE
876   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
877               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
878               uid,
879               (unsigned int) priority,
880               (unsigned long long) expiration.value);
881 #endif
882   scont = GNUNET_malloc (sizeof (struct StatusContext));
883   scont->cont = cont;
884   scont->cont_cls = cont_cls;
885   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
886                          queue_priority, max_queue_size, timeout,
887                          &process_status_message, scont);
888   if (qe == NULL)
889     return NULL;
890   um = (struct UpdateMessage*) &qe[1];
891   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
892   um->header.size = htons(sizeof (struct UpdateMessage));
893   um->priority = htonl(priority);
894   um->expiration = GNUNET_TIME_absolute_hton(expiration);
895   um->uid = GNUNET_htonll(uid);
896   process_queue (h);
897   return qe;
898 }
899
900
901 /**
902  * Explicitly remove some content from the database.
903  * The "cont"inuation will be called with status
904  * "GNUNET_OK" if content was removed, "GNUNET_NO"
905  * if no matching entry was found and "GNUNET_SYSERR"
906  * on all other types of errors.
907  *
908  * @param h handle to the datastore
909  * @param key key for the value
910  * @param size number of bytes in data
911  * @param data content stored
912  * @param queue_priority ranking of this request in the priority queue
913  * @param max_queue_size at what queue size should this request be dropped
914  *        (if other requests of higher priority are in the queue)
915  * @param timeout how long to wait at most for a response
916  * @param cont continuation to call when done
917  * @param cont_cls closure for cont
918  * @return NULL if the entry was not queued, otherwise a handle that can be used to
919  *         cancel; note that even if NULL is returned, the callback will be invoked
920  *         (or rather, will already have been invoked)
921  */
922 struct GNUNET_DATASTORE_QueueEntry *
923 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
924                          const GNUNET_HashCode *key,
925                          uint32_t size, 
926                          const void *data,
927                          unsigned int queue_priority,
928                          unsigned int max_queue_size,
929                          struct GNUNET_TIME_Relative timeout,
930                          GNUNET_DATASTORE_ContinuationWithStatus cont,
931                          void *cont_cls)
932 {
933   struct GNUNET_DATASTORE_QueueEntry *qe;
934   struct DataMessage *dm;
935   size_t msize;
936   struct StatusContext *scont;
937
938   if (cont == NULL)
939     cont = &drop_status_cont;
940 #if DEBUG_DATASTORE
941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
942               "Asked to remove %u bytes under key `%s'\n",
943               size,
944               GNUNET_h2s (key));
945 #endif
946   scont = GNUNET_malloc (sizeof (struct StatusContext));
947   scont->cont = cont;
948   scont->cont_cls = cont_cls;
949   msize = sizeof(struct DataMessage) + size;
950   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
951   qe = make_queue_entry (h, msize,
952                          queue_priority, max_queue_size, timeout,
953                          &process_status_message, scont);
954   if (qe == NULL)
955     return NULL;
956   dm = (struct DataMessage*) &qe[1];
957   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
958   dm->header.size = htons(msize);
959   dm->rid = htonl(0);
960   dm->size = htonl(size);
961   dm->type = htonl(0);
962   dm->priority = htonl(0);
963   dm->anonymity = htonl(0);
964   dm->uid = GNUNET_htonll(0);
965   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
966   dm->key = *key;
967   memcpy (&dm[1], data, size);
968   process_queue (h);
969   return qe;
970 }
971
972
973
974 /**
975  * Context for processing result messages.
976  */
977 struct ResultContext
978 {
979   /**
980    * Iterator to call with the result.
981    */
982   GNUNET_DATASTORE_Iterator iter;
983
984   /**
985    * Closure for iter.
986    */
987   void *iter_cls;
988
989 };
990
991
992 /**
993  * Type of a function to call when we receive a message
994  * from the service.
995  *
996  * @param cls closure
997  * @param msg message received, NULL on timeout or fatal error
998  */
999 static void 
1000 process_result_message (void *cls,
1001                         const struct GNUNET_MessageHeader * msg)
1002 {
1003   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1004   struct GNUNET_DATASTORE_Handle *h = qe->h;
1005   struct ResultContext *rc = qe->client_ctx;
1006   const struct DataMessage *dm;
1007
1008   GNUNET_assert (h->queue_head == qe);
1009   if (msg == NULL)
1010     {
1011 #if DEBUG_DATASTORE
1012       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1013                   _("Failed to receive response from datastore\n"));
1014 #endif
1015       free_queue_entry (qe);
1016       do_disconnect (h);
1017       rc->iter (rc->iter_cls,
1018                 NULL, 0, NULL, 0, 0, 0, 
1019                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1020       GNUNET_free (rc);
1021       return;
1022     }
1023   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1024     {
1025       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1026 #if DEBUG_DATASTORE
1027       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028                   "Received end of result set\n");
1029 #endif
1030       free_queue_entry (qe);
1031       rc->iter (rc->iter_cls,
1032                 NULL, 0, NULL, 0, 0, 0, 
1033                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1034       GNUNET_free (rc);
1035       process_queue (h);
1036       return;
1037     }
1038   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1039        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1040        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1041     {
1042       GNUNET_break (0);
1043       free_queue_entry (qe);
1044       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1045       do_disconnect (h);
1046       rc->iter (rc->iter_cls,
1047                 NULL, 0, NULL, 0, 0, 0, 
1048                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1049       GNUNET_free (rc);
1050       return;
1051     }
1052   dm = (const struct DataMessage*) msg;
1053 #if DEBUG_DATASTORE
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Received result %llu with type %u and size %u with key %s\n",
1056               (unsigned long long) GNUNET_ntohll(dm->uid),
1057               ntohl(dm->type),
1058               ntohl(dm->size),
1059               GNUNET_h2s(&dm->key));
1060 #endif
1061   rc->iter (rc->iter_cls,
1062             &dm->key,
1063             ntohl(dm->size),
1064             &dm[1],
1065             ntohl(dm->type),
1066             ntohl(dm->priority),
1067             ntohl(dm->anonymity),
1068             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1069             GNUNET_ntohll(dm->uid));
1070 }
1071
1072
1073 /**
1074  * Get a random value from the datastore.
1075  *
1076  * @param h handle to the datastore
1077  * @param queue_priority ranking of this request in the priority queue
1078  * @param max_queue_size at what queue size should this request be dropped
1079  *        (if other requests of higher priority are in the queue)
1080  * @param timeout how long to wait at most for a response
1081  * @param iter function to call on a random value; it
1082  *        will be called once with a value (if available)
1083  *        and always once with a value of NULL.
1084  * @param iter_cls closure for iter
1085  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1086  *         cancel; note that even if NULL is returned, the callback will be invoked
1087  *         (or rather, will already have been invoked)
1088  */
1089 struct GNUNET_DATASTORE_QueueEntry *
1090 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1091                              unsigned int queue_priority,
1092                              unsigned int max_queue_size,
1093                              struct GNUNET_TIME_Relative timeout,
1094                              GNUNET_DATASTORE_Iterator iter, 
1095                              void *iter_cls)
1096 {
1097   struct GNUNET_DATASTORE_QueueEntry *qe;
1098   struct GNUNET_MessageHeader *m;
1099   struct ResultContext *rcont;
1100
1101 #if DEBUG_DATASTORE
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Asked to get random entry in %llu ms\n",
1104               (unsigned long long) timeout.value);
1105 #endif
1106   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1107   rcont->iter = iter;
1108   rcont->iter_cls = iter_cls;
1109   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1110                          queue_priority, max_queue_size, timeout,
1111                          &process_result_message, rcont);
1112   if (qe == NULL)
1113     return NULL;    
1114   m = (struct GNUNET_MessageHeader*) &qe[1];
1115   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1116   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1117   process_queue (h);
1118   return qe;
1119 }
1120
1121
1122
1123 /**
1124  * Iterate over the results for a particular key
1125  * in the datastore.  The iterator will only be called
1126  * once initially; if the first call did contain a
1127  * result, further results can be obtained by calling
1128  * "GNUNET_DATASTORE_get_next" with the given argument.
1129  *
1130  * @param h handle to the datastore
1131  * @param key maybe NULL (to match all entries)
1132  * @param type desired type, 0 for any
1133  * @param queue_priority ranking of this request in the priority queue
1134  * @param max_queue_size at what queue size should this request be dropped
1135  *        (if other requests of higher priority are in the queue)
1136  * @param timeout how long to wait at most for a response
1137  * @param iter function to call on each matching value;
1138  *        will be called once with a NULL value at the end
1139  * @param iter_cls closure for iter
1140  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141  *         cancel; note that even if NULL is returned, the callback will be invoked
1142  *         (or rather, will already have been invoked)
1143  */
1144 struct GNUNET_DATASTORE_QueueEntry *
1145 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1146                       const GNUNET_HashCode * key,
1147                       enum GNUNET_BLOCK_Type type,
1148                       unsigned int queue_priority,
1149                       unsigned int max_queue_size,
1150                       struct GNUNET_TIME_Relative timeout,
1151                       GNUNET_DATASTORE_Iterator iter, 
1152                       void *iter_cls)
1153 {
1154   struct GNUNET_DATASTORE_QueueEntry *qe;
1155   struct GetMessage *gm;
1156   struct ResultContext *rcont;
1157
1158 #if DEBUG_DATASTORE
1159   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1160               "Asked to look for data of type %u under key `%s'\n",
1161               (unsigned int) type,
1162               GNUNET_h2s (key));
1163 #endif
1164   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1165   rcont->iter = iter;
1166   rcont->iter_cls = iter_cls;
1167   qe = make_queue_entry (h, sizeof(struct GetMessage),
1168                          queue_priority, max_queue_size, timeout,
1169                          &process_result_message, rcont);
1170   if (qe == NULL)
1171     return NULL;
1172   gm = (struct GetMessage*) &qe[1];
1173   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1174   gm->type = htonl(type);
1175   if (key != NULL)
1176     {
1177       gm->header.size = htons(sizeof (struct GetMessage));
1178       gm->key = *key;
1179     }
1180   else
1181     {
1182       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1183     }
1184   process_queue (h);
1185   return qe;
1186 }
1187
1188
1189 /**
1190  * Function called to trigger obtaining the next result
1191  * from the datastore.
1192  * 
1193  * @param h handle to the datastore
1194  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1195  *        iteration (with a final call to "iter" with key/data == NULL).
1196  */
1197 void 
1198 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1199                            int more)
1200 {
1201   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1202   struct ResultContext *rc = qe->client_ctx;
1203
1204   GNUNET_assert (NULL != qe);
1205   GNUNET_assert (&process_result_message == qe->response_proc);
1206   if (GNUNET_YES == more)
1207     {     
1208       GNUNET_CLIENT_receive (h->client,
1209                              qe->response_proc,
1210                              qe,
1211                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1212       return;
1213     }
1214   free_queue_entry (qe);
1215   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1216   do_disconnect (h);
1217   rc->iter (rc->iter_cls,
1218             NULL, 0, NULL, 0, 0, 0, 
1219             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1220   GNUNET_free (rc);
1221 }
1222
1223
1224 /**
1225  * Cancel a datastore operation.  The final callback from the
1226  * operation must not have been done yet.
1227  * 
1228  * @param qe operation to cancel
1229  */
1230 void
1231 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1232 {
1233   struct GNUNET_DATASTORE_Handle *h;
1234   int reconnect;
1235
1236   h = qe->h;
1237   reconnect = qe->was_transmitted;
1238   free_queue_entry (qe);
1239   h->queue_size--;
1240   if (reconnect)
1241     {
1242       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1243       do_disconnect (h);
1244     }
1245 }
1246
1247
1248 /* end of datastore_api.c */