16db19f943f7d6b7643e266b60b63b4ae9f14dfe
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct GNUNET_DATASTORE_QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct GNUNET_DATASTORE_QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct GNUNET_DATASTORE_QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct GNUNET_DATASTORE_QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct GNUNET_DATASTORE_QueueEntry *qe;
244
245   if (h->client != NULL)
246     {
247       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
248       h->client = NULL;
249     }
250   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
251     {
252       GNUNET_SCHEDULER_cancel (h->sched,
253                                h->reconnect_task);
254       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
255     }
256   while (NULL != (qe = h->queue_head))
257     {
258       GNUNET_assert (NULL != qe->response_proc);
259       qe->response_proc (qe, NULL);
260     }
261   if (GNUNET_YES == drop) 
262     {
263       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
264       if (h->client != NULL)
265         {
266           if (NULL != 
267               GNUNET_CLIENT_notify_transmit_ready (h->client,
268                                                    sizeof(struct GNUNET_MessageHeader),
269                                                    GNUNET_TIME_UNIT_MINUTES,
270                                                    GNUNET_YES,
271                                                    &transmit_drop,
272                                                    h))
273             return;
274           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
275         }
276       GNUNET_break (0);
277     }
278   GNUNET_free (h);
279 }
280
281
282 /**
283  * A request has timed out (before being transmitted to the service).
284  *
285  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
286  * @param tc scheduler context
287  */
288 static void
289 timeout_queue_entry (void *cls,
290                      const struct GNUNET_SCHEDULER_TaskContext *tc)
291 {
292   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
293
294   qe->task = GNUNET_SCHEDULER_NO_TASK;
295   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
296   qe->response_proc (qe, NULL);
297 }
298
299
300 /**
301  * Create a new entry for our priority queue (and possibly discard other entires if
302  * the queue is getting too long).
303  *
304  * @param h handle to the datastore
305  * @param msize size of the message to queue
306  * @param queue_priority priority of the entry
307  * @param max_queue_size at what queue size should this request be dropped
308  *        (if other requests of higher priority are in the queue)
309  * @param timeout timeout for the operation
310  * @param response_proc function to call with replies (can be NULL)
311  * @param client_ctx client context (NOT a closure for response_proc)
312  * @return NULL if the queue is full (and this entry was dropped)
313  */
314 static struct GNUNET_DATASTORE_QueueEntry *
315 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
316                   size_t msize,
317                   unsigned int queue_priority,
318                   unsigned int max_queue_size,
319                   struct GNUNET_TIME_Relative timeout,
320                   GNUNET_CLIENT_MessageHandler response_proc,            
321                   void *client_ctx)
322 {
323   struct GNUNET_DATASTORE_QueueEntry *ret;
324   struct GNUNET_DATASTORE_QueueEntry *pos;
325   unsigned int c;
326
327   c = 0;
328   pos = h->queue_head;
329   while ( (pos != NULL) &&
330           (c < max_queue_size) &&
331           (pos->priority >= queue_priority) )
332     {
333       c++;
334       pos = pos->next;
335     }
336   if (c >= max_queue_size)
337     return NULL;
338   if (pos == NULL)
339     {
340       /* append at the tail */
341       pos = h->queue_tail;
342     }
343   else
344     {
345       pos = pos->prev; 
346       /* do not insert at HEAD if HEAD query was already
347          transmitted and we are still receiving replies! */
348       if ( (pos == NULL) &&
349            (h->queue_head->was_transmitted) )
350         pos = h->queue_head;
351     }
352   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
353   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
354                                      h->queue_tail,
355                                      pos,
356                                      ret);
357   ret->h = h;
358   ret->response_proc = response_proc;
359   ret->client_ctx = client_ctx;
360   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
361                                             timeout,
362                                             &timeout_queue_entry,
363                                             ret);
364   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
365   ret->priority = queue_priority;
366   ret->max_queue = max_queue_size;
367   ret->message_size = msize;
368   ret->was_transmitted = GNUNET_NO;
369   h->queue_size++;
370   c++;
371   pos = ret->next;
372   while (pos != NULL) 
373     {
374       if (pos->max_queue < h->queue_size)
375         {
376           GNUNET_assert (pos->response_proc != NULL);
377           pos->response_proc (pos, NULL);
378           break;
379         }
380       pos = pos->next;
381     }
382   return ret;
383 }
384
385
386 /**
387  * Process entries in the queue (or do nothing if we are already
388  * doing so).
389  * 
390  * @param h handle to the datastore
391  */
392 static void
393 process_queue (struct GNUNET_DATASTORE_Handle *h);
394
395
396 /**
397  * Try reconnecting to the datastore service.
398  *
399  * @param cls the 'struct GNUNET_DATASTORE_Handle'
400  * @param tc scheduler context
401  */
402 static void
403 try_reconnect (void *cls,
404                const struct GNUNET_SCHEDULER_TaskContext *tc)
405 {
406   struct GNUNET_DATASTORE_Handle *h = cls;
407
408   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
409     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
410   else
411     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
412   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
413     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
414   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
415   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
416   if (h->client == NULL)
417     return;
418   process_queue (h);
419 }
420
421
422 /**
423  * Disconnect from the service and then try reconnecting to the datastore service
424  * after some delay.
425  *
426  * @param h handle to datastore to disconnect and reconnect
427  */
428 static void
429 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
430 {
431   if (h->client == NULL)
432     return;
433   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
434   h->client = NULL;
435   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
436                                                     h->retry_time,
437                                                     &try_reconnect,
438                                                     h);      
439 }
440
441
442 /**
443  * Transmit request from queue to datastore service.
444  *
445  * @param cls the 'struct GNUNET_DATASTORE_Handle'
446  * @param size number of bytes that can be copied to buf
447  * @param buf where to copy the drop message
448  * @return number of bytes written to buf
449  */
450 static size_t
451 transmit_request (void *cls,
452                   size_t size, 
453                   void *buf)
454 {
455   struct GNUNET_DATASTORE_Handle *h = cls;
456   struct GNUNET_DATASTORE_QueueEntry *qe;
457   size_t msize;
458
459   h->th = NULL;
460   if (NULL == (qe = h->queue_head))
461     return 0; /* no entry in queue */
462   if (buf == NULL)
463     {
464       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
465                   _("Failed to transmit request to database.\n"));
466       do_disconnect (h);
467       return 0;
468     }
469   if (size < (msize = qe->message_size))
470     {
471       process_queue (h);
472       return 0;
473     }
474   memcpy (buf, &qe[1], msize);
475   qe->was_transmitted = GNUNET_YES;
476   GNUNET_SCHEDULER_cancel (h->sched,
477                            qe->task);
478   qe->task = GNUNET_SCHEDULER_NO_TASK;
479   GNUNET_CLIENT_receive (h->client,
480                          qe->response_proc,
481                          qe,
482                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
483   return msize;
484 }
485
486
487 /**
488  * Process entries in the queue (or do nothing if we are already
489  * doing so).
490  * 
491  * @param h handle to the datastore
492  */
493 static void
494 process_queue (struct GNUNET_DATASTORE_Handle *h)
495 {
496   struct GNUNET_DATASTORE_QueueEntry *qe;
497
498   if (NULL == (qe = h->queue_head))
499     return; /* no entry in queue */
500   if (qe->was_transmitted == GNUNET_YES)
501     return; /* waiting for replies */
502   if (h->th != NULL)
503     return; /* request pending */
504   if (h->client == NULL)
505     return; /* waiting for reconnect */
506   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
507                                                qe->message_size,
508                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
509                                                GNUNET_YES,
510                                                &transmit_request,
511                                                h);
512 }
513
514
515
516
517 /**
518  * Context for processing status messages.
519  */
520 struct StatusContext
521 {
522   /**
523    * Continuation to call with the status.
524    */
525   GNUNET_DATASTORE_ContinuationWithStatus cont;
526
527   /**
528    * Closure for cont.
529    */
530   void *cont_cls;
531
532 };
533
534
535 /**
536  * Dummy continuation used to do nothing (but be non-zero).
537  *
538  * @param cls closure
539  * @param result result 
540  * @param emsg error message
541  */
542 static void
543 drop_status_cont (void *cls, int result, const char *emsg)
544 {
545   /* do nothing */
546 }
547
548
549 static void
550 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
551 {
552   struct GNUNET_DATASTORE_Handle *h = qe->h;
553
554   GNUNET_CONTAINER_DLL_remove (h->queue_head,
555                                h->queue_tail,
556                                qe);
557   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
558     {
559       GNUNET_SCHEDULER_cancel (h->sched,
560                                qe->task);
561       qe->task = GNUNET_SCHEDULER_NO_TASK;
562     }
563   h->queue_size--;
564   GNUNET_free (qe);
565 }
566
567 /**
568  * Type of a function to call when we receive a message
569  * from the service.
570  *
571  * @param cls closure
572  * @param msg message received, NULL on timeout or fatal error
573  */
574 static void 
575 process_status_message (void *cls,
576                         const struct
577                         GNUNET_MessageHeader * msg)
578 {
579   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
580   struct GNUNET_DATASTORE_Handle *h = qe->h;
581   struct StatusContext *rc = qe->client_ctx;
582   const struct StatusMessage *sm;
583   const char *emsg;
584   int32_t status;
585
586   free_queue_entry (qe);
587   if (msg == NULL)
588     {      
589       if (NULL == h->client)
590         return; /* forced disconnect */
591       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
592                   _("Failed to receive response from database.\n"));
593       do_disconnect (h);
594       return;
595     }
596
597   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
598        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
599     {
600       GNUNET_break (0);
601       h->retry_time = GNUNET_TIME_UNIT_ZERO;
602       do_disconnect (h);
603       rc->cont (rc->cont_cls, 
604                 GNUNET_SYSERR,
605                 _("Error reading response from datastore service"));
606       GNUNET_free (rc);
607       return;
608     }
609   sm = (const struct StatusMessage*) msg;
610   status = ntohl(sm->status);
611   emsg = NULL;
612   if (ntohs(msg->size) > sizeof(struct StatusMessage))
613     {
614       emsg = (const char*) &sm[1];
615       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
616         {
617           GNUNET_break (0);
618           emsg = _("Invalid error message received from datastore service");
619         }
620     }  
621   if ( (status == GNUNET_SYSERR) &&
622        (emsg == NULL) )
623     {
624       GNUNET_break (0);
625       emsg = _("Invalid error message received from datastore service");
626     }
627 #if DEBUG_DATASTORE
628   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629               "Received status %d/%s\n",
630               (int) status,
631               emsg);
632 #endif
633   rc->cont (rc->cont_cls, 
634             status,
635             emsg);
636   GNUNET_free (rc);  
637   process_queue (h);
638 }
639
640
641 /**
642  * Store an item in the datastore.  If the item is already present,
643  * the priorities are summed up and the higher expiration time and
644  * lower anonymity level is used.
645  *
646  * @param h handle to the datastore
647  * @param rid reservation ID to use (from "reserve"); use 0 if no
648  *            prior reservation was made
649  * @param key key for the value
650  * @param size number of bytes in data
651  * @param data content stored
652  * @param type type of the content
653  * @param priority priority of the content
654  * @param anonymity anonymity-level for the content
655  * @param expiration expiration time for the content
656  * @param queue_priority ranking of this request in the priority queue
657  * @param max_queue_size at what queue size should this request be dropped
658  *        (if other requests of higher priority are in the queue)
659  * @param timeout timeout for the operation
660  * @param cont continuation to call when done
661  * @param cont_cls closure for cont
662  * @return NULL if the entry was not queued, otherwise a handle that can be used to
663  *         cancel; note that even if NULL is returned, the callback will be invoked
664  *         (or rather, will already have been invoked)
665  */
666 struct GNUNET_DATASTORE_QueueEntry *
667 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
668                       int rid,
669                       const GNUNET_HashCode * key,
670                       uint32_t size,
671                       const void *data,
672                       enum GNUNET_BLOCK_Type type,
673                       uint32_t priority,
674                       uint32_t anonymity,
675                       struct GNUNET_TIME_Absolute expiration,
676                       unsigned int queue_priority,
677                       unsigned int max_queue_size,
678                       struct GNUNET_TIME_Relative timeout,
679                       GNUNET_DATASTORE_ContinuationWithStatus cont,
680                       void *cont_cls)
681 {
682   struct StatusContext *scont;
683   struct GNUNET_DATASTORE_QueueEntry *qe;
684   struct DataMessage *dm;
685   size_t msize;
686
687 #if DEBUG_DATASTORE
688   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
689               "Asked to put %u bytes of data under key `%s'\n",
690               size,
691               GNUNET_h2s (key));
692 #endif
693   msize = sizeof(struct DataMessage) + size;
694   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
695   scont = GNUNET_malloc (sizeof (struct StatusContext));
696   scont->cont = cont;
697   scont->cont_cls = cont_cls;
698   qe = make_queue_entry (h, msize,
699                          queue_priority, max_queue_size, timeout,
700                          &process_status_message, scont);
701   if (qe == NULL)
702     return NULL;
703   dm = (struct DataMessage* ) &qe[1];
704   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
705   dm->header.size = htons(msize);
706   dm->rid = htonl(rid);
707   dm->size = htonl(size);
708   dm->type = htonl(type);
709   dm->priority = htonl(priority);
710   dm->anonymity = htonl(anonymity);
711   dm->uid = GNUNET_htonll(0);
712   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
713   dm->key = *key;
714   memcpy (&dm[1], data, size);
715   process_queue (h);
716   return qe;
717 }
718
719
720 /**
721  * Reserve space in the datastore.  This function should be used
722  * to avoid "out of space" failures during a longer sequence of "put"
723  * operations (for example, when a file is being inserted).
724  *
725  * @param h handle to the datastore
726  * @param amount how much space (in bytes) should be reserved (for content only)
727  * @param entries how many entries will be created (to calculate per-entry overhead)
728  * @param queue_priority ranking of this request in the priority queue
729  * @param max_queue_size at what queue size should this request be dropped
730  *        (if other requests of higher priority are in the queue)
731  * @param timeout how long to wait at most for a response (or before dying in queue)
732  * @param cont continuation to call when done; "success" will be set to
733  *             a positive reservation value if space could be reserved.
734  * @param cont_cls closure for cont
735  * @return NULL if the entry was not queued, otherwise a handle that can be used to
736  *         cancel; note that even if NULL is returned, the callback will be invoked
737  *         (or rather, will already have been invoked)
738  */
739 struct GNUNET_DATASTORE_QueueEntry *
740 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
741                           uint64_t amount,
742                           uint32_t entries,
743                           unsigned int queue_priority,
744                           unsigned int max_queue_size,
745                           struct GNUNET_TIME_Relative timeout,
746                           GNUNET_DATASTORE_ContinuationWithStatus cont,
747                           void *cont_cls)
748 {
749   struct GNUNET_DATASTORE_QueueEntry *qe;
750   struct ReserveMessage *rm;
751   struct StatusContext *scont;
752
753   if (cont == NULL)
754     cont = &drop_status_cont;
755 #if DEBUG_DATASTORE
756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
757               "Asked to reserve %llu bytes of data and %u entries'\n",
758               (unsigned long long) amount,
759               (unsigned int) entries);
760 #endif
761   scont = GNUNET_malloc (sizeof (struct StatusContext));
762   scont->cont = cont;
763   scont->cont_cls = cont_cls;
764   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
765                          queue_priority, max_queue_size, timeout,
766                          &process_status_message, scont);
767   if (qe == NULL)
768     return NULL;
769   rm = (struct ReserveMessage*) &qe[1];
770   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
771   rm->header.size = htons(sizeof (struct ReserveMessage));
772   rm->entries = htonl(entries);
773   rm->amount = GNUNET_htonll(amount);
774   process_queue (h);
775   return qe;
776 }
777
778
779 /**
780  * Signal that all of the data for which a reservation was made has
781  * been stored and that whatever excess space might have been reserved
782  * can now be released.
783  *
784  * @param h handle to the datastore
785  * @param rid reservation ID (value of "success" in original continuation
786  *        from the "reserve" function).
787  * @param queue_priority ranking of this request in the priority queue
788  * @param max_queue_size at what queue size should this request be dropped
789  *        (if other requests of higher priority are in the queue)
790  * @param queue_priority ranking of this request in the priority queue
791  * @param max_queue_size at what queue size should this request be dropped
792  *        (if other requests of higher priority are in the queue)
793  * @param timeout how long to wait at most for a response
794  * @param cont continuation to call when done
795  * @param cont_cls closure for cont
796  * @return NULL if the entry was not queued, otherwise a handle that can be used to
797  *         cancel; note that even if NULL is returned, the callback will be invoked
798  *         (or rather, will already have been invoked)
799  */
800 struct GNUNET_DATASTORE_QueueEntry *
801 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
802                                   int rid,
803                                   unsigned int queue_priority,
804                                   unsigned int max_queue_size,
805                                   struct GNUNET_TIME_Relative timeout,
806                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
807                                   void *cont_cls)
808 {
809   struct GNUNET_DATASTORE_QueueEntry *qe;
810   struct ReleaseReserveMessage *rrm;
811   struct StatusContext *scont;
812
813   if (cont == NULL)
814     cont = &drop_status_cont;
815 #if DEBUG_DATASTORE
816   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817               "Asked to release reserve %d\n",
818               rid);
819 #endif
820   scont = GNUNET_malloc (sizeof (struct StatusContext));
821   scont->cont = cont;
822   scont->cont_cls = cont_cls;
823   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
824                          queue_priority, max_queue_size, timeout,
825                          &process_status_message, scont);
826   if (qe == NULL)
827     return NULL;
828   rrm = (struct ReleaseReserveMessage*) &qe[1];
829   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
830   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
831   rrm->rid = htonl(rid);
832   process_queue (h);
833   return qe;
834 }
835
836
837 /**
838  * Update a value in the datastore.
839  *
840  * @param h handle to the datastore
841  * @param uid identifier for the value
842  * @param priority how much to increase the priority of the value
843  * @param expiration new expiration value should be MAX of existing and this argument
844  * @param queue_priority ranking of this request in the priority queue
845  * @param max_queue_size at what queue size should this request be dropped
846  *        (if other requests of higher priority are in the queue)
847  * @param timeout how long to wait at most for a response
848  * @param cont continuation to call when done
849  * @param cont_cls closure for cont
850  * @return NULL if the entry was not queued, otherwise a handle that can be used to
851  *         cancel; note that even if NULL is returned, the callback will be invoked
852  *         (or rather, will already have been invoked)
853  */
854 struct GNUNET_DATASTORE_QueueEntry *
855 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
856                          unsigned long long uid,
857                          uint32_t priority,
858                          struct GNUNET_TIME_Absolute expiration,
859                          unsigned int queue_priority,
860                          unsigned int max_queue_size,
861                          struct GNUNET_TIME_Relative timeout,
862                          GNUNET_DATASTORE_ContinuationWithStatus cont,
863                          void *cont_cls)
864 {
865   struct GNUNET_DATASTORE_QueueEntry *qe;
866   struct UpdateMessage *um;
867   struct StatusContext *scont;
868
869   if (cont == NULL)
870     cont = &drop_status_cont;
871 #if DEBUG_DATASTORE
872   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
873               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
874               uid,
875               (unsigned int) priority,
876               (unsigned long long) expiration.value);
877 #endif
878   scont = GNUNET_malloc (sizeof (struct StatusContext));
879   scont->cont = cont;
880   scont->cont_cls = cont_cls;
881   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
882                          queue_priority, max_queue_size, timeout,
883                          &process_status_message, scont);
884   if (qe == NULL)
885     return NULL;
886   um = (struct UpdateMessage*) &qe[1];
887   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
888   um->header.size = htons(sizeof (struct UpdateMessage));
889   um->priority = htonl(priority);
890   um->expiration = GNUNET_TIME_absolute_hton(expiration);
891   um->uid = GNUNET_htonll(uid);
892   process_queue (h);
893   return qe;
894 }
895
896
897 /**
898  * Explicitly remove some content from the database.
899  * The "cont"inuation will be called with status
900  * "GNUNET_OK" if content was removed, "GNUNET_NO"
901  * if no matching entry was found and "GNUNET_SYSERR"
902  * on all other types of errors.
903  *
904  * @param h handle to the datastore
905  * @param key key for the value
906  * @param size number of bytes in data
907  * @param data content stored
908  * @param queue_priority ranking of this request in the priority queue
909  * @param max_queue_size at what queue size should this request be dropped
910  *        (if other requests of higher priority are in the queue)
911  * @param timeout how long to wait at most for a response
912  * @param cont continuation to call when done
913  * @param cont_cls closure for cont
914  * @return NULL if the entry was not queued, otherwise a handle that can be used to
915  *         cancel; note that even if NULL is returned, the callback will be invoked
916  *         (or rather, will already have been invoked)
917  */
918 struct GNUNET_DATASTORE_QueueEntry *
919 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
920                          const GNUNET_HashCode *key,
921                          uint32_t size, 
922                          const void *data,
923                          unsigned int queue_priority,
924                          unsigned int max_queue_size,
925                          struct GNUNET_TIME_Relative timeout,
926                          GNUNET_DATASTORE_ContinuationWithStatus cont,
927                          void *cont_cls)
928 {
929   struct GNUNET_DATASTORE_QueueEntry *qe;
930   struct DataMessage *dm;
931   size_t msize;
932   struct StatusContext *scont;
933
934   if (cont == NULL)
935     cont = &drop_status_cont;
936 #if DEBUG_DATASTORE
937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
938               "Asked to remove %u bytes under key `%s'\n",
939               size,
940               GNUNET_h2s (key));
941 #endif
942   scont = GNUNET_malloc (sizeof (struct StatusContext));
943   scont->cont = cont;
944   scont->cont_cls = cont_cls;
945   msize = sizeof(struct DataMessage) + size;
946   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
947   qe = make_queue_entry (h, msize,
948                          queue_priority, max_queue_size, timeout,
949                          &process_status_message, scont);
950   if (qe == NULL)
951     return NULL;
952   dm = (struct DataMessage*) &qe[1];
953   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
954   dm->header.size = htons(msize);
955   dm->rid = htonl(0);
956   dm->size = htonl(size);
957   dm->type = htonl(0);
958   dm->priority = htonl(0);
959   dm->anonymity = htonl(0);
960   dm->uid = GNUNET_htonll(0);
961   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
962   dm->key = *key;
963   memcpy (&dm[1], data, size);
964   process_queue (h);
965   return qe;
966 }
967
968
969
970 /**
971  * Context for processing result messages.
972  */
973 struct ResultContext
974 {
975   /**
976    * Iterator to call with the result.
977    */
978   GNUNET_DATASTORE_Iterator iter;
979
980   /**
981    * Closure for iter.
982    */
983   void *iter_cls;
984
985 };
986
987
988 /**
989  * Type of a function to call when we receive a message
990  * from the service.
991  *
992  * @param cls closure
993  * @param msg message received, NULL on timeout or fatal error
994  */
995 static void 
996 process_result_message (void *cls,
997                         const struct GNUNET_MessageHeader * msg)
998 {
999   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1000   struct GNUNET_DATASTORE_Handle *h = qe->h;
1001   struct ResultContext *rc = qe->client_ctx;
1002   const struct DataMessage *dm;
1003
1004   GNUNET_assert (h->queue_head == qe);
1005   if (msg == NULL)
1006     {
1007 #if DEBUG_DATASTORE
1008       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1009                   _("Failed to receive response from datastore\n"));
1010 #endif
1011       free_queue_entry (qe);
1012       do_disconnect (h);
1013       rc->iter (rc->iter_cls,
1014                 NULL, 0, NULL, 0, 0, 0, 
1015                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1016       GNUNET_free (rc);
1017       return;
1018     }
1019   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1020     {
1021       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1022 #if DEBUG_DATASTORE
1023       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1024                   "Received end of result set\n");
1025 #endif
1026       free_queue_entry (qe);
1027       rc->iter (rc->iter_cls,
1028                 NULL, 0, NULL, 0, 0, 0, 
1029                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1030       GNUNET_free (rc);
1031       process_queue (h);
1032       return;
1033     }
1034   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1035        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1036        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1037     {
1038       GNUNET_break (0);
1039       free_queue_entry (qe);
1040       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1041       do_disconnect (h);
1042       rc->iter (rc->iter_cls,
1043                 NULL, 0, NULL, 0, 0, 0, 
1044                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1045       GNUNET_free (rc);
1046       return;
1047     }
1048   dm = (const struct DataMessage*) msg;
1049 #if DEBUG_DATASTORE
1050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1051               "Received result %llu with type %u and size %u with key %s\n",
1052               (unsigned long long) GNUNET_ntohll(dm->uid),
1053               ntohl(dm->type),
1054               ntohl(dm->size),
1055               GNUNET_h2s(&dm->key));
1056 #endif
1057   rc->iter (rc->iter_cls,
1058             &dm->key,
1059             ntohl(dm->size),
1060             &dm[1],
1061             ntohl(dm->type),
1062             ntohl(dm->priority),
1063             ntohl(dm->anonymity),
1064             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1065             GNUNET_ntohll(dm->uid));
1066 }
1067
1068
1069 /**
1070  * Get a random value from the datastore.
1071  *
1072  * @param h handle to the datastore
1073  * @param queue_priority ranking of this request in the priority queue
1074  * @param max_queue_size at what queue size should this request be dropped
1075  *        (if other requests of higher priority are in the queue)
1076  * @param timeout how long to wait at most for a response
1077  * @param iter function to call on a random value; it
1078  *        will be called once with a value (if available)
1079  *        and always once with a value of NULL.
1080  * @param iter_cls closure for iter
1081  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1082  *         cancel; note that even if NULL is returned, the callback will be invoked
1083  *         (or rather, will already have been invoked)
1084  */
1085 struct GNUNET_DATASTORE_QueueEntry *
1086 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1087                              unsigned int queue_priority,
1088                              unsigned int max_queue_size,
1089                              struct GNUNET_TIME_Relative timeout,
1090                              GNUNET_DATASTORE_Iterator iter, 
1091                              void *iter_cls)
1092 {
1093   struct GNUNET_DATASTORE_QueueEntry *qe;
1094   struct GNUNET_MessageHeader *m;
1095   struct ResultContext *rcont;
1096
1097 #if DEBUG_DATASTORE
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099               "Asked to get random entry in %llu ms\n",
1100               (unsigned long long) timeout.value);
1101 #endif
1102   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1103   rcont->iter = iter;
1104   rcont->iter_cls = iter_cls;
1105   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1106                          queue_priority, max_queue_size, timeout,
1107                          &process_result_message, rcont);
1108   if (qe == NULL)
1109     return NULL;
1110   m = (struct GNUNET_MessageHeader*) &qe[1];
1111   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1112   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1113   process_queue (h);
1114   return qe;
1115 }
1116
1117
1118
1119 /**
1120  * Iterate over the results for a particular key
1121  * in the datastore.  The iterator will only be called
1122  * once initially; if the first call did contain a
1123  * result, further results can be obtained by calling
1124  * "GNUNET_DATASTORE_get_next" with the given argument.
1125  *
1126  * @param h handle to the datastore
1127  * @param key maybe NULL (to match all entries)
1128  * @param type desired type, 0 for any
1129  * @param queue_priority ranking of this request in the priority queue
1130  * @param max_queue_size at what queue size should this request be dropped
1131  *        (if other requests of higher priority are in the queue)
1132  * @param timeout how long to wait at most for a response
1133  * @param iter function to call on each matching value;
1134  *        will be called once with a NULL value at the end
1135  * @param iter_cls closure for iter
1136  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1137  *         cancel; note that even if NULL is returned, the callback will be invoked
1138  *         (or rather, will already have been invoked)
1139  */
1140 struct GNUNET_DATASTORE_QueueEntry *
1141 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1142                       const GNUNET_HashCode * key,
1143                       enum GNUNET_BLOCK_Type type,
1144                       unsigned int queue_priority,
1145                       unsigned int max_queue_size,
1146                       struct GNUNET_TIME_Relative timeout,
1147                       GNUNET_DATASTORE_Iterator iter, 
1148                       void *iter_cls)
1149 {
1150   struct GNUNET_DATASTORE_QueueEntry *qe;
1151   struct GetMessage *gm;
1152   struct ResultContext *rcont;
1153
1154 #if DEBUG_DATASTORE
1155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1156               "Asked to look for data of type %u under key `%s'\n",
1157               (unsigned int) type,
1158               GNUNET_h2s (key));
1159 #endif
1160   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1161   rcont->iter = iter;
1162   rcont->iter_cls = iter_cls;
1163   qe = make_queue_entry (h, sizeof(struct GetMessage),
1164                          queue_priority, max_queue_size, timeout,
1165                          &process_result_message, rcont);
1166   if (qe == NULL)
1167     return NULL;
1168   gm = (struct GetMessage*) &qe[1];
1169   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1170   gm->type = htonl(type);
1171   if (key != NULL)
1172     {
1173       gm->header.size = htons(sizeof (struct GetMessage));
1174       gm->key = *key;
1175     }
1176   else
1177     {
1178       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1179     }
1180   process_queue (h);
1181   return qe;
1182 }
1183
1184
1185 /**
1186  * Function called to trigger obtaining the next result
1187  * from the datastore.
1188  * 
1189  * @param h handle to the datastore
1190  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1191  *        iteration (with a final call to "iter" with key/data == NULL).
1192  */
1193 void 
1194 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1195                            int more)
1196 {
1197   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1198   struct ResultContext *rc = qe->client_ctx;
1199
1200   GNUNET_assert (NULL != qe);
1201   GNUNET_assert (&process_result_message == qe->response_proc);
1202   if (GNUNET_YES == more)
1203     {     
1204       GNUNET_CLIENT_receive (h->client,
1205                              qe->response_proc,
1206                              qe,
1207                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1208       return;
1209     }
1210   free_queue_entry (qe);
1211   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1212   do_disconnect (h);
1213   rc->iter (rc->iter_cls,
1214             NULL, 0, NULL, 0, 0, 0, 
1215             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1216   GNUNET_free (rc);
1217 }
1218
1219
1220 /**
1221  * Cancel a datastore operation.  The final callback from the
1222  * operation must not have been done yet.
1223  * 
1224  * @param qe operation to cancel
1225  */
1226 void
1227 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1228 {
1229   struct GNUNET_DATASTORE_Handle *h;
1230   int reconnect;
1231
1232   h = qe->h;
1233   reconnect = qe->was_transmitted;
1234   free_queue_entry (qe);
1235   h->queue_size--;
1236   if (reconnect)
1237     {
1238       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1239       do_disconnect (h);
1240     }
1241 }
1242
1243
1244 /* end of datastore_api.c */