ef3dccbf670fff587eede1f5c9bf9a514dad3257
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus contX;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_clsX;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct QueueEntry *qe;
244
245   if (h->client != NULL)
246     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
247   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
248     GNUNET_SCHEDULER_cancel (h->sched,
249                              h->reconnect_task);
250   h->client = NULL;
251   while (NULL != (qe = h->queue_head))
252     {
253       GNUNET_CONTAINER_DLL_remove (h->queue_head,
254                                    h->queue_tail,
255                                    qe);
256       if (NULL != qe->response_proc)
257         qe->response_proc (qe, NULL);
258       GNUNET_free (qe);
259     }
260   if (GNUNET_YES == drop) 
261     {
262       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
263       if (h->client != NULL)
264         {
265           if (NULL != 
266               GNUNET_CLIENT_notify_transmit_ready (h->client,
267                                                    sizeof(struct GNUNET_MessageHeader),
268                                                    GNUNET_TIME_UNIT_MINUTES,
269                                                    GNUNET_YES,
270                                                    &transmit_drop,
271                                                    h))
272             return;
273           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
274         }
275       GNUNET_break (0);
276     }
277   GNUNET_free (h);
278 }
279
280
281 /**
282  * A request has timed out (before being transmitted to the service).
283  *
284  * @param cls the 'struct QueueEntry'
285  * @param tc scheduler context
286  */
287 static void
288 timeout_queue_entry (void *cls,
289                      const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct QueueEntry *qe = cls;
292
293   qe->task = GNUNET_SCHEDULER_NO_TASK;
294   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
295   qe->response_proc (qe, NULL);
296 }
297
298
299 /**
300  * Create a new entry for our priority queue (and possibly discard other entires if
301  * the queue is getting too long).
302  *
303  * @param h handle to the datastore
304  * @param msize size of the message to queue
305  * @param queue_priority priority of the entry
306  * @param max_queue_size at what queue size should this request be dropped
307  *        (if other requests of higher priority are in the queue)
308  * @param timeout timeout for the operation
309  * @param response_proc function to call with replies (can be NULL)
310  * @param client_ctx client context (NOT a closure for response_proc)
311  * @return NULL if the queue is full (and this entry was dropped)
312  */
313 static struct QueueEntry *
314 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
315                   size_t msize,
316                   unsigned int queue_priority,
317                   unsigned int max_queue_size,
318                   struct GNUNET_TIME_Relative timeout,
319                   GNUNET_CLIENT_MessageHandler response_proc,            
320                   void *client_ctx)
321 {
322   struct QueueEntry *ret;
323   struct QueueEntry *pos;
324   unsigned int c;
325
326   c = 0;
327   pos = h->queue_head;
328   while ( (pos != NULL) &&
329           (c < max_queue_size) &&
330           (pos->priority >= queue_priority) )
331     {
332       c++;
333       pos = pos->next;
334     }
335   if (c >= max_queue_size)
336     return NULL;
337   if (pos == NULL)
338     {
339       /* append at the tail */
340       pos = h->queue_tail;
341     }
342   else
343     {
344       pos = pos->prev; 
345       /* do not insert at HEAD if HEAD query was already
346          transmitted and we are still receiving replies! */
347       if ( (pos == NULL) &&
348            (h->queue_head->was_transmitted) )
349         pos = h->queue_head;
350     }
351   ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize);
352   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
353                                      h->queue_tail,
354                                      pos,
355                                      ret);
356   ret->h = h;
357   ret->response_proc = response_proc;
358   ret->client_ctx = client_ctx;
359   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
360                                             timeout,
361                                             &timeout_queue_entry,
362                                             ret);
363   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
364   ret->priority = queue_priority;
365   ret->max_queue = max_queue_size;
366   ret->message_size = msize;
367   ret->was_transmitted = GNUNET_NO;
368   h->queue_size++;
369   c++;
370   pos = ret->next;
371   while (pos != NULL) 
372     {
373       if (pos->max_queue < h->queue_size)
374         {
375           GNUNET_CONTAINER_DLL_remove (h->queue_head,
376                                        h->queue_tail,
377                                        pos);
378           GNUNET_SCHEDULER_cancel (h->sched,
379                                    pos->task);
380           if (pos->response_proc != NULL)
381             pos->response_proc (pos, NULL);
382           GNUNET_free (pos);
383           h->queue_size--;
384           break;
385         }
386       pos = pos->next;
387     }
388   return ret;
389 }
390
391
392 /**
393  * Process entries in the queue (or do nothing if we are already
394  * doing so).
395  * 
396  * @param h handle to the datastore
397  */
398 static void
399 process_queue (struct GNUNET_DATASTORE_Handle *h);
400
401
402 /**
403  * Try reconnecting to the datastore service.
404  *
405  * @param cls the 'struct GNUNET_DATASTORE_Handle'
406  * @param tc scheduler context
407  */
408 static void
409 try_reconnect (void *cls,
410                const struct GNUNET_SCHEDULER_TaskContext *tc)
411 {
412   struct GNUNET_DATASTORE_Handle *h = cls;
413
414   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
415     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
416   else
417     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
418   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
419     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
420   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
421   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
422   if (h->client == NULL)
423     return;
424   process_queue (h);
425 }
426
427
428 /**
429  * Disconnect from the service and then try reconnecting to the datastore service
430  * after some delay.
431  *
432  * @param cls the 'struct GNUNET_DATASTORE_Handle'
433  * @param tc scheduler context
434  */
435 static void
436 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
437 {
438   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
439   h->client = NULL;
440   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
441                                                     h->retry_time,
442                                                     &try_reconnect,
443                                                     h);      
444 }
445
446
447 /**
448  * Transmit request from queue to datastore service.
449  *
450  * @param cls the 'struct GNUNET_DATASTORE_Handle'
451  * @param size number of bytes that can be copied to buf
452  * @param buf where to copy the drop message
453  * @return number of bytes written to buf
454  */
455 static size_t
456 transmit_request (void *cls,
457                   size_t size, 
458                   void *buf)
459 {
460   struct GNUNET_DATASTORE_Handle *h = cls;
461   struct QueueEntry *qe;
462   size_t msize;
463
464   h->th = NULL;
465   if (NULL == (qe = h->queue_head))
466     return 0; /* no entry in queue */
467   if (buf == NULL)
468     {
469       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
470                   _("Failed to transmit request to database.\n"));
471       do_disconnect (h);
472       return 0;
473     }
474   if (size < (msize = qe->message_size))
475     {
476       process_queue (h);
477       return 0;
478     }
479   memcpy (buf, &qe[1], msize);
480   qe->was_transmitted = GNUNET_YES;
481   GNUNET_SCHEDULER_cancel (h->sched,
482                            qe->task);
483   qe->task = GNUNET_SCHEDULER_NO_TASK;
484   GNUNET_CLIENT_receive (h->client,
485                          qe->response_proc,
486                          qe,
487                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
488   return msize;
489 }
490
491
492 /**
493  * Process entries in the queue (or do nothing if we are already
494  * doing so).
495  * 
496  * @param h handle to the datastore
497  */
498 static void
499 process_queue (struct GNUNET_DATASTORE_Handle *h)
500 {
501   struct QueueEntry *qe;
502
503   if (NULL == (qe = h->queue_head))
504     return; /* no entry in queue */
505   if (qe->was_transmitted == GNUNET_YES)
506     return; /* waiting for replies */
507   if (h->th != NULL)
508     return; /* request pending */
509   if (h->client == NULL)
510     return; /* waiting for reconnect */
511   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
512                                                qe->message_size,
513                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
514                                                GNUNET_YES,
515                                                &transmit_request,
516                                                h);
517 }
518
519
520
521
522 /**
523  * Context for processing status messages.
524  */
525 struct StatusContext
526 {
527   /**
528    * Continuation to call with the status.
529    */
530   GNUNET_DATASTORE_ContinuationWithStatus cont;
531
532   /**
533    * Closure for cont.
534    */
535   void *cont_cls;
536
537 };
538
539
540 /**
541  * Dummy continuation used to do nothing (but be non-zero).
542  *
543  * @param cls closure
544  * @param result result 
545  * @param emsg error message
546  */
547 static void
548 drop_status_cont (void *cls, int result, const char *emsg)
549 {
550   /* do nothing */
551 }
552
553
554 /**
555  * Type of a function to call when we receive a message
556  * from the service.
557  *
558  * @param cls closure
559  * @param msg message received, NULL on timeout or fatal error
560  */
561 static void 
562 process_status_message (void *cls,
563                         const struct
564                         GNUNET_MessageHeader * msg)
565 {
566   struct QueueEntry *qe = cls;
567   struct GNUNET_DATASTORE_Handle *h = qe->h;
568   struct StatusContext *rc = qe->client_ctx;
569   const struct StatusMessage *sm;
570   const char *emsg;
571   int32_t status;
572
573   GNUNET_CONTAINER_DLL_remove (h->queue_head,
574                                h->queue_tail,
575                                qe);
576   GNUNET_free (qe);
577   if (msg == NULL)
578     {
579       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
580                   _("Failed to receive response from database.\n"));
581       do_disconnect (h);
582       return;
583     }
584
585   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
586        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
587     {
588       GNUNET_break (0);
589       h->retry_time = GNUNET_TIME_UNIT_ZERO;
590       do_disconnect (h);
591       rc->cont (rc->cont_cls, 
592                 GNUNET_SYSERR,
593                 _("Error reading response from datastore service"));
594       GNUNET_free (rc);
595       return;
596     }
597   sm = (const struct StatusMessage*) msg;
598   status = ntohl(sm->status);
599   emsg = NULL;
600   if (ntohs(msg->size) > sizeof(struct StatusMessage))
601     {
602       emsg = (const char*) &sm[1];
603       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
604         {
605           GNUNET_break (0);
606           emsg = _("Invalid error message received from datastore service");
607         }
608     }  
609   if ( (status == GNUNET_SYSERR) &&
610        (emsg == NULL) )
611     {
612       GNUNET_break (0);
613       emsg = _("Invalid error message received from datastore service");
614     }
615 #if DEBUG_DATASTORE
616   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
617               "Received status %d/%s\n",
618               (int) status,
619               emsg);
620 #endif
621   rc->cont (rc->cont_cls, 
622             status,
623             emsg);
624   GNUNET_free (rc);  
625   process_queue (h);
626 }
627
628
629 /**
630  * Store an item in the datastore.  If the item is already present,
631  * the priorities are summed up and the higher expiration time and
632  * lower anonymity level is used.
633  *
634  * @param h handle to the datastore
635  * @param rid reservation ID to use (from "reserve"); use 0 if no
636  *            prior reservation was made
637  * @param key key for the value
638  * @param size number of bytes in data
639  * @param data content stored
640  * @param type type of the content
641  * @param priority priority of the content
642  * @param anonymity anonymity-level for the content
643  * @param expiration expiration time for the content
644  * @param queue_priority ranking of this request in the priority queue
645  * @param max_queue_size at what queue size should this request be dropped
646  *        (if other requests of higher priority are in the queue)
647  * @param timeout timeout for the operation
648  * @param cont continuation to call when done
649  * @param cont_cls closure for cont
650  */
651 void
652 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
653                       int rid,
654                       const GNUNET_HashCode * key,
655                       uint32_t size,
656                       const void *data,
657                       enum GNUNET_BLOCK_Type type,
658                       uint32_t priority,
659                       uint32_t anonymity,
660                       struct GNUNET_TIME_Absolute expiration,
661                       unsigned int queue_priority,
662                       unsigned int max_queue_size,
663                       struct GNUNET_TIME_Relative timeout,
664                       GNUNET_DATASTORE_ContinuationWithStatus cont,
665                       void *cont_cls)
666 {
667   struct StatusContext *scont;
668   struct QueueEntry *qe;
669   struct DataMessage *dm;
670   size_t msize;
671
672 #if DEBUG_DATASTORE
673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674               "Asked to put %u bytes of data under key `%s'\n",
675               size,
676               GNUNET_h2s (key));
677 #endif
678   msize = sizeof(struct DataMessage) + size;
679   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
680   scont = GNUNET_malloc (sizeof (struct StatusContext));
681   scont->cont = cont;
682   scont->cont_cls = cont_cls;
683   qe = make_queue_entry (h, msize,
684                          queue_priority, max_queue_size, timeout,
685                          &process_status_message, scont);
686   if (qe == NULL)
687     return;
688   dm = (struct DataMessage* ) &qe[1];
689   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
690   dm->header.size = htons(msize);
691   dm->rid = htonl(rid);
692   dm->size = htonl(size);
693   dm->type = htonl(type);
694   dm->priority = htonl(priority);
695   dm->anonymity = htonl(anonymity);
696   dm->uid = GNUNET_htonll(0);
697   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
698   dm->key = *key;
699   memcpy (&dm[1], data, size);
700   process_queue (h);
701 }
702
703
704 /**
705  * Reserve space in the datastore.  This function should be used
706  * to avoid "out of space" failures during a longer sequence of "put"
707  * operations (for example, when a file is being inserted).
708  *
709  * @param h handle to the datastore
710  * @param amount how much space (in bytes) should be reserved (for content only)
711  * @param entries how many entries will be created (to calculate per-entry overhead)
712  * @param queue_priority ranking of this request in the priority queue
713  * @param max_queue_size at what queue size should this request be dropped
714  *        (if other requests of higher priority are in the queue)
715  * @param timeout how long to wait at most for a response (or before dying in queue)
716  * @param cont continuation to call when done; "success" will be set to
717  *             a positive reservation value if space could be reserved.
718  * @param cont_cls closure for cont
719  */
720 void
721 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
722                           uint64_t amount,
723                           uint32_t entries,
724                           unsigned int queue_priority,
725                           unsigned int max_queue_size,
726                           struct GNUNET_TIME_Relative timeout,
727                           GNUNET_DATASTORE_ContinuationWithStatus cont,
728                           void *cont_cls)
729 {
730   struct QueueEntry *qe;
731   struct ReserveMessage *rm;
732   struct StatusContext *scont;
733
734   if (cont == NULL)
735     cont = &drop_status_cont;
736 #if DEBUG_DATASTORE
737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
738               "Asked to reserve %llu bytes of data and %u entries'\n",
739               (unsigned long long) amount,
740               (unsigned int) entries);
741 #endif
742   scont = GNUNET_malloc (sizeof (struct StatusContext));
743   scont->cont = cont;
744   scont->cont_cls = cont_cls;
745   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
746                          queue_priority, max_queue_size, timeout,
747                          &process_status_message, scont);
748   if (qe == NULL)
749     return;
750   rm = (struct ReserveMessage*) &qe[1];
751   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
752   rm->header.size = htons(sizeof (struct ReserveMessage));
753   rm->entries = htonl(entries);
754   rm->amount = GNUNET_htonll(amount);
755   process_queue (h);
756 }
757
758
759 /**
760  * Signal that all of the data for which a reservation was made has
761  * been stored and that whatever excess space might have been reserved
762  * can now be released.
763  *
764  * @param h handle to the datastore
765  * @param rid reservation ID (value of "success" in original continuation
766  *        from the "reserve" function).
767  * @param queue_priority ranking of this request in the priority queue
768  * @param max_queue_size at what queue size should this request be dropped
769  *        (if other requests of higher priority are in the queue)
770  * @param queue_priority ranking of this request in the priority queue
771  * @param max_queue_size at what queue size should this request be dropped
772  *        (if other requests of higher priority are in the queue)
773  * @param timeout how long to wait at most for a response
774  * @param cont continuation to call when done
775  * @param cont_cls closure for cont
776  */
777 void
778 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
779                                   int rid,
780                                   unsigned int queue_priority,
781                                   unsigned int max_queue_size,
782                                   struct GNUNET_TIME_Relative timeout,
783                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
784                                   void *cont_cls)
785 {
786   struct QueueEntry *qe;
787   struct ReleaseReserveMessage *rrm;
788   struct StatusContext *scont;
789
790   if (cont == NULL)
791     cont = &drop_status_cont;
792 #if DEBUG_DATASTORE
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Asked to release reserve %d\n",
795               rid);
796 #endif
797   scont = GNUNET_malloc (sizeof (struct StatusContext));
798   scont->cont = cont;
799   scont->cont_cls = cont_cls;
800   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
801                          queue_priority, max_queue_size, timeout,
802                          &process_status_message, scont);
803   if (qe == NULL)
804     return;
805   rrm = (struct ReleaseReserveMessage*) &qe[1];
806   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
807   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
808   rrm->rid = htonl(rid);
809   process_queue (h);
810 }
811
812
813 /**
814  * Update a value in the datastore.
815  *
816  * @param h handle to the datastore
817  * @param uid identifier for the value
818  * @param priority how much to increase the priority of the value
819  * @param expiration new expiration value should be MAX of existing and this argument
820  * @param queue_priority ranking of this request in the priority queue
821  * @param max_queue_size at what queue size should this request be dropped
822  *        (if other requests of higher priority are in the queue)
823  * @param timeout how long to wait at most for a response
824  * @param cont continuation to call when done
825  * @param cont_cls closure for cont
826  */
827 void
828 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
829                          unsigned long long uid,
830                          uint32_t priority,
831                          struct GNUNET_TIME_Absolute expiration,
832                          unsigned int queue_priority,
833                          unsigned int max_queue_size,
834                          struct GNUNET_TIME_Relative timeout,
835                          GNUNET_DATASTORE_ContinuationWithStatus cont,
836                          void *cont_cls)
837 {
838   struct QueueEntry *qe;
839   struct UpdateMessage *um;
840   struct StatusContext *scont;
841
842   if (cont == NULL)
843     cont = &drop_status_cont;
844 #if DEBUG_DATASTORE
845   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
847               uid,
848               (unsigned int) priority,
849               (unsigned long long) expiration.value);
850 #endif
851   scont = GNUNET_malloc (sizeof (struct StatusContext));
852   scont->cont = cont;
853   scont->cont_cls = cont_cls;
854   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
855                          queue_priority, max_queue_size, timeout,
856                          &process_status_message, scont);
857   if (qe == NULL)
858     return;
859   um = (struct UpdateMessage*) &qe[1];
860   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
861   um->header.size = htons(sizeof (struct UpdateMessage));
862   um->priority = htonl(priority);
863   um->expiration = GNUNET_TIME_absolute_hton(expiration);
864   um->uid = GNUNET_htonll(uid);
865   process_queue (h);
866 }
867
868
869 /**
870  * Explicitly remove some content from the database.
871  * The "cont"inuation will be called with status
872  * "GNUNET_OK" if content was removed, "GNUNET_NO"
873  * if no matching entry was found and "GNUNET_SYSERR"
874  * on all other types of errors.
875  *
876  * @param h handle to the datastore
877  * @param key key for the value
878  * @param size number of bytes in data
879  * @param data content stored
880  * @param queue_priority ranking of this request in the priority queue
881  * @param max_queue_size at what queue size should this request be dropped
882  *        (if other requests of higher priority are in the queue)
883  * @param timeout how long to wait at most for a response
884  * @param cont continuation to call when done
885  * @param cont_cls closure for cont
886  */
887 void
888 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
889                          const GNUNET_HashCode *key,
890                          uint32_t size, 
891                          const void *data,
892                          unsigned int queue_priority,
893                          unsigned int max_queue_size,
894                          struct GNUNET_TIME_Relative timeout,
895                          GNUNET_DATASTORE_ContinuationWithStatus cont,
896                          void *cont_cls)
897 {
898   struct QueueEntry *qe;
899   struct DataMessage *dm;
900   size_t msize;
901   struct StatusContext *scont;
902
903   if (cont == NULL)
904     cont = &drop_status_cont;
905 #if DEBUG_DATASTORE
906   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
907               "Asked to remove %u bytes under key `%s'\n",
908               size,
909               GNUNET_h2s (key));
910 #endif
911   scont = GNUNET_malloc (sizeof (struct StatusContext));
912   scont->cont = cont;
913   scont->cont_cls = cont_cls;
914   msize = sizeof(struct DataMessage) + size;
915   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
916   qe = make_queue_entry (h, msize,
917                          queue_priority, max_queue_size, timeout,
918                          &process_status_message, scont);
919   if (qe == NULL)
920     return;
921   dm = (struct DataMessage*) &qe[1];
922   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
923   dm->header.size = htons(msize);
924   dm->rid = htonl(0);
925   dm->size = htonl(size);
926   dm->type = htonl(0);
927   dm->priority = htonl(0);
928   dm->anonymity = htonl(0);
929   dm->uid = GNUNET_htonll(0);
930   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
931   dm->key = *key;
932   memcpy (&dm[1], data, size);
933   process_queue (h);
934 }
935
936
937
938 /**
939  * Context for processing result messages.
940  */
941 struct ResultContext
942 {
943   /**
944    * Iterator to call with the result.
945    */
946   GNUNET_DATASTORE_Iterator iter;
947
948   /**
949    * Closure for iter.
950    */
951   void *iter_cls;
952
953 };
954
955
956 /**
957  * Type of a function to call when we receive a message
958  * from the service.
959  *
960  * @param cls closure
961  * @param msg message received, NULL on timeout or fatal error
962  */
963 static void 
964 process_result_message (void *cls,
965                         const struct GNUNET_MessageHeader * msg)
966 {
967   struct QueueEntry *qe = cls;
968   struct GNUNET_DATASTORE_Handle *h = qe->h;
969   struct ResultContext *rc = qe->client_ctx;
970   const struct DataMessage *dm;
971
972   if (msg == NULL)
973     {
974 #if DEBUG_DATASTORE
975       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
976                   _("Failed to receive response from datastore\n"));
977 #endif
978       GNUNET_CONTAINER_DLL_remove (h->queue_head,
979                                    h->queue_tail,
980                                    qe);
981       GNUNET_free (qe);
982       do_disconnect (h);
983       rc->iter (rc->iter_cls,
984                 NULL, 0, NULL, 0, 0, 0, 
985                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
986       GNUNET_free (rc);
987       return;
988     }
989   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
990     {
991       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
992 #if DEBUG_DATASTORE
993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
994                   "Received end of result set\n");
995 #endif
996       GNUNET_CONTAINER_DLL_remove (h->queue_head,
997                                    h->queue_tail,
998                                    qe);
999       GNUNET_free (qe);
1000       rc->iter (rc->iter_cls,
1001                 NULL, 0, NULL, 0, 0, 0, 
1002                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1003       GNUNET_free (rc);
1004       process_queue (h);
1005       return;
1006     }
1007   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1008        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1009        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1010     {
1011       GNUNET_break (0);
1012       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1013                                    h->queue_tail,
1014                                    qe);
1015       GNUNET_free (qe);
1016       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1017       do_disconnect (h);
1018       rc->iter (rc->iter_cls,
1019                 NULL, 0, NULL, 0, 0, 0, 
1020                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1021       GNUNET_free (rc);
1022       return;
1023     }
1024   dm = (const struct DataMessage*) msg;
1025 #if DEBUG_DATASTORE
1026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027               "Received result %llu with type %u and size %u with key %s\n",
1028               (unsigned long long) GNUNET_ntohll(dm->uid),
1029               ntohl(dm->type),
1030               ntohl(dm->size),
1031               GNUNET_h2s(&dm->key));
1032 #endif
1033   rc->iter (rc->iter_cls,
1034             &dm->key,
1035             ntohl(dm->size),
1036             &dm[1],
1037             ntohl(dm->type),
1038             ntohl(dm->priority),
1039             ntohl(dm->anonymity),
1040             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1041             GNUNET_ntohll(dm->uid));
1042 }
1043
1044
1045 /**
1046  * Get a random value from the datastore.
1047  *
1048  * @param h handle to the datastore
1049  * @param queue_priority ranking of this request in the priority queue
1050  * @param max_queue_size at what queue size should this request be dropped
1051  *        (if other requests of higher priority are in the queue)
1052  * @param timeout how long to wait at most for a response
1053  * @param iter function to call on a random value; it
1054  *        will be called once with a value (if available)
1055  *        and always once with a value of NULL.
1056  * @param iter_cls closure for iter
1057  */
1058 void
1059 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1060                              unsigned int queue_priority,
1061                              unsigned int max_queue_size,
1062                              struct GNUNET_TIME_Relative timeout,
1063                              GNUNET_DATASTORE_Iterator iter, 
1064                              void *iter_cls)
1065 {
1066   struct QueueEntry *qe;
1067   struct GNUNET_MessageHeader *m;
1068   struct ResultContext *rcont;
1069
1070 #if DEBUG_DATASTORE
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072               "Asked to get random entry in %llu ms\n",
1073               (unsigned long long) timeout.value);
1074 #endif
1075   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1076   rcont->iter = iter;
1077   rcont->iter_cls = iter_cls;
1078   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1079                          queue_priority, max_queue_size, timeout,
1080                          &process_result_message, rcont);
1081   if (qe == NULL)
1082     return;
1083   m = (struct GNUNET_MessageHeader*) &qe[1];
1084   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1085   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1086   process_queue (h);
1087 }
1088
1089
1090
1091 /**
1092  * Iterate over the results for a particular key
1093  * in the datastore.  The iterator will only be called
1094  * once initially; if the first call did contain a
1095  * result, further results can be obtained by calling
1096  * "GNUNET_DATASTORE_get_next" with the given argument.
1097  *
1098  * @param h handle to the datastore
1099  * @param key maybe NULL (to match all entries)
1100  * @param type desired type, 0 for any
1101  * @param queue_priority ranking of this request in the priority queue
1102  * @param max_queue_size at what queue size should this request be dropped
1103  *        (if other requests of higher priority are in the queue)
1104  * @param timeout how long to wait at most for a response
1105  * @param iter function to call on each matching value;
1106  *        will be called once with a NULL value at the end
1107  * @param iter_cls closure for iter
1108  */
1109 void
1110 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1111                       const GNUNET_HashCode * key,
1112                       enum GNUNET_BLOCK_Type type,
1113                       unsigned int queue_priority,
1114                       unsigned int max_queue_size,
1115                       struct GNUNET_TIME_Relative timeout,
1116                       GNUNET_DATASTORE_Iterator iter, 
1117                       void *iter_cls)
1118 {
1119   struct QueueEntry *qe;
1120   struct GetMessage *gm;
1121   struct ResultContext *rcont;
1122
1123 #if DEBUG_DATASTORE
1124   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1125               "Asked to look for data of type %u under key `%s'\n",
1126               (unsigned int) type,
1127               GNUNET_h2s (key));
1128 #endif
1129   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1130   rcont->iter = iter;
1131   rcont->iter_cls = iter_cls;
1132   qe = make_queue_entry (h, sizeof(struct GetMessage),
1133                          queue_priority, max_queue_size, timeout,
1134                          &process_result_message, rcont);
1135   if (qe == NULL)
1136     return;
1137   gm = (struct GetMessage*) &qe[1];
1138   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1139   gm->type = htonl(type);
1140   if (key != NULL)
1141     {
1142       gm->header.size = htons(sizeof (struct GetMessage));
1143       gm->key = *key;
1144     }
1145   else
1146     {
1147       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1148     }
1149   process_queue (h);
1150 }
1151
1152
1153 /**
1154  * Function called to trigger obtaining the next result
1155  * from the datastore.
1156  * 
1157  * @param h handle to the datastore
1158  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1159  *        iteration (with a final call to "iter" with key/data == NULL).
1160  */
1161 void 
1162 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1163                            int more)
1164 {
1165   struct QueueEntry *qe = h->queue_head;
1166   struct ResultContext *rc = qe->client_ctx;
1167
1168   GNUNET_assert (NULL != qe);
1169   GNUNET_assert (&process_result_message == qe->response_proc);
1170   if (GNUNET_YES == more)
1171     {     
1172       GNUNET_CLIENT_receive (h->client,
1173                              qe->response_proc,
1174                              qe,
1175                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1176       return;
1177     }
1178   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1179                                h->queue_tail,
1180                                qe);
1181   GNUNET_free (qe);
1182   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1183   do_disconnect (h);
1184   rc->iter (rc->iter_cls,
1185             NULL, 0, NULL, 0, 0, 0, 
1186             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1187   GNUNET_free (rc);
1188 }
1189
1190
1191 /* end of datastore_api.c */