3e47f84d12fa9fba5c201bc713b5654796e7e35e
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus cont;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_cls;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct QueueEntry *qe;
244
245   if (h->client != NULL)
246     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
247   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
248     GNUNET_SCHEDULER_cancel (h->sched,
249                              h->reconnect_task);
250   h->client = NULL;
251   while (NULL != (qe = h->queue_head))
252     {
253       GNUNET_CONTAINER_DLL_remove (h->queue_head,
254                                    h->queue_tail,
255                                    qe);
256       if (NULL != qe->response_proc)
257         qe->response_proc (qe, NULL);
258       GNUNET_free (qe);
259     }
260   if (GNUNET_YES == drop) 
261     {
262       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
263       if (h->client != NULL)
264         {
265           if (NULL != 
266               GNUNET_CLIENT_notify_transmit_ready (h->client,
267                                                    sizeof(struct GNUNET_MessageHeader),
268                                                    GNUNET_TIME_UNIT_MINUTES,
269                                                    GNUNET_YES,
270                                                    &transmit_drop,
271                                                    h))
272             return;
273           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
274         }
275       GNUNET_break (0);
276     }
277   GNUNET_free (h);
278 }
279
280
281 /**
282  * A request has timed out (before being transmitted to the service).
283  *
284  * @param cls the 'struct QueueEntry'
285  * @param tc scheduler context
286  */
287 static void
288 timeout_queue_entry (void *cls,
289                      const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct QueueEntry *qe = cls;
292   struct GNUNET_DATASTORE_Handle *h = qe->h;
293
294   qe->task = GNUNET_SCHEDULER_NO_TASK;
295   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
296   GNUNET_CONTAINER_DLL_remove (h->queue_head,
297                                h->queue_tail,
298                                qe);
299   if (qe->cont != NULL)
300     qe->cont (qe->cont_cls, GNUNET_NO, _("timeout"));
301   if (qe->response_proc != NULL)
302     qe->response_proc (qe, NULL);
303   GNUNET_free (qe);
304 }
305
306
307 /**
308  * Create a new entry for our priority queue (and possibly discard other entires if
309  * the queue is getting too long).
310  *
311  * @param h handle to the datastore
312  * @param msize size of the message to queue
313  * @param queue_priority priority of the entry
314  * @param max_queue_size at what queue size should this request be dropped
315  *        (if other requests of higher priority are in the queue)
316  * @param timeout timeout for the operation
317  * @param cont continuation to call when done with request transmission (can be NULL)
318  * @param cont_cls closure for cont
319  * @param response_proc function to call with replies (can be NULL)
320  * @param client_ctx client context (NOT a closure for response_proc)
321  * @return NULL if the queue is full (and this entry was dropped)
322  */
323 static struct QueueEntry *
324 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
325                   size_t msize,
326                   unsigned int queue_priority,
327                   unsigned int max_queue_size,
328                   struct GNUNET_TIME_Relative timeout,
329                   GNUNET_DATASTORE_ContinuationWithStatus cont,
330                   void *cont_cls,
331                   GNUNET_CLIENT_MessageHandler response_proc,            
332                   void *client_ctx)
333 {
334   struct QueueEntry *ret;
335   struct QueueEntry *pos;
336   unsigned int c;
337
338   c = 0;
339   pos = h->queue_head;
340   while ( (pos != NULL) &&
341           (c < max_queue_size) &&
342           (pos->priority >= queue_priority) )
343     {
344       c++;
345       pos = pos->next;
346     }
347   if (c >= max_queue_size)
348     return NULL;
349   if (pos == NULL)
350     {
351       /* append at the tail */
352       pos = h->queue_tail;
353     }
354   else
355     {
356       pos = pos->prev; 
357       /* do not insert at HEAD if HEAD query was already
358          transmitted and we are still receiving replies! */
359       if ( (pos == NULL) &&
360            (h->queue_head->was_transmitted) )
361         pos = h->queue_head;
362     }
363   ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize);
364   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
365                                      h->queue_tail,
366                                      pos,
367                                      ret);
368   ret->h = h;
369   ret->response_proc = response_proc;
370   ret->client_ctx = client_ctx;
371   ret->cont = cont;
372   ret->cont_cls = cont_cls;  
373   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
374                                             timeout,
375                                             &timeout_queue_entry,
376                                             ret);
377   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
378   ret->priority = queue_priority;
379   ret->max_queue = max_queue_size;
380   ret->message_size = msize;
381   ret->was_transmitted = GNUNET_NO;
382   h->queue_size++;
383   c++;
384   pos = ret->next;
385   while (pos != NULL) 
386     {
387       if (pos->max_queue < h->queue_size)
388         {
389           GNUNET_CONTAINER_DLL_remove (h->queue_head,
390                                        h->queue_tail,
391                                        pos);
392           GNUNET_SCHEDULER_cancel (h->sched,
393                                    pos->task);
394           if (pos->cont != NULL)
395             pos->cont (pos->cont_cls, GNUNET_NO, _("Message queue full"));
396           if (pos->response_proc != NULL)
397             pos->response_proc (pos, NULL);
398           GNUNET_free (pos);
399           h->queue_size--;
400           break;
401         }
402       pos = pos->next;
403     }
404   return ret;
405 }
406
407
408 /**
409  * Process entries in the queue (or do nothing if we are already
410  * doing so).
411  * 
412  * @param h handle to the datastore
413  */
414 static void
415 process_queue (struct GNUNET_DATASTORE_Handle *h);
416
417
418 /**
419  * Try reconnecting to the datastore service.
420  *
421  * @param cls the 'struct GNUNET_DATASTORE_Handle'
422  * @param tc scheduler context
423  */
424 static void
425 try_reconnect (void *cls,
426                const struct GNUNET_SCHEDULER_TaskContext *tc)
427 {
428   struct GNUNET_DATASTORE_Handle *h = cls;
429
430   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
431     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
432   else
433     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
434   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
435     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
436   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
437   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
438   if (h->client == NULL)
439     return;
440   process_queue (h);
441 }
442
443
444 /**
445  * Disconnect from the service and then try reconnecting to the datastore service
446  * after some delay.
447  *
448  * @param cls the 'struct GNUNET_DATASTORE_Handle'
449  * @param tc scheduler context
450  */
451 static void
452 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
453 {
454   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
455   h->client = NULL;
456   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
457                                                     h->retry_time,
458                                                     &try_reconnect,
459                                                     h);      
460 }
461
462
463 /**
464  * Transmit request from queue to datastore service.
465  *
466  * @param cls the 'struct GNUNET_DATASTORE_Handle'
467  * @param size number of bytes that can be copied to buf
468  * @param buf where to copy the drop message
469  * @return number of bytes written to buf
470  */
471 static size_t
472 transmit_request (void *cls,
473                   size_t size, 
474                   void *buf)
475 {
476   struct GNUNET_DATASTORE_Handle *h = cls;
477   struct QueueEntry *qe;
478   size_t msize;
479
480   h->th = NULL;
481   if (NULL == (qe = h->queue_head))
482     return 0; /* no entry in queue */
483   if (buf == NULL)
484     {
485       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
486                   _("Failed to transmit request to database.\n"));
487       do_disconnect (h);
488       return 0;
489     }
490   if (size < (msize = qe->message_size))
491     {
492       process_queue (h);
493       return 0;
494     }
495   memcpy (buf, &qe[1], msize);
496   qe->was_transmitted = GNUNET_YES;
497   if (qe->cont != NULL)
498     qe->cont (qe->cont_cls, GNUNET_OK, NULL);
499   GNUNET_SCHEDULER_cancel (h->sched,
500                            qe->task);
501   qe->task = GNUNET_SCHEDULER_NO_TASK;
502   if (qe->response_proc == NULL)
503     {
504       GNUNET_CONTAINER_DLL_remove (h->queue_head,
505                                    h->queue_tail,
506                                    qe);
507       GNUNET_free (qe);
508       process_queue (h);
509     }
510   else
511     {
512       GNUNET_CLIENT_receive (h->client,
513                              qe->response_proc,
514                              qe,
515                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
516     }  
517   return msize;
518 }
519
520
521 /**
522  * Process entries in the queue (or do nothing if we are already
523  * doing so).
524  * 
525  * @param h handle to the datastore
526  */
527 static void
528 process_queue (struct GNUNET_DATASTORE_Handle *h)
529 {
530   struct QueueEntry *qe;
531
532   if (NULL == (qe = h->queue_head))
533     return; /* no entry in queue */
534   if (qe->was_transmitted == GNUNET_YES)
535     return; /* waiting for replies */
536   if (h->th != NULL)
537     return; /* request pending */
538   if (h->client == NULL)
539     return; /* waiting for reconnect */
540   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
541                                                qe->message_size,
542                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
543                                                GNUNET_YES,
544                                                &transmit_request,
545                                                h);
546 }
547
548
549 /**
550  * Store an item in the datastore.  If the item is already present,
551  * the priorities are summed up and the higher expiration time and
552  * lower anonymity level is used.
553  *
554  * @param h handle to the datastore
555  * @param rid reservation ID to use (from "reserve"); use 0 if no
556  *            prior reservation was made
557  * @param key key for the value
558  * @param size number of bytes in data
559  * @param data content stored
560  * @param type type of the content
561  * @param priority priority of the content
562  * @param anonymity anonymity-level for the content
563  * @param expiration expiration time for the content
564  * @param queue_priority ranking of this request in the priority queue
565  * @param max_queue_size at what queue size should this request be dropped
566  *        (if other requests of higher priority are in the queue)
567  * @param timeout timeout for the operation
568  * @param cont continuation to call when done
569  * @param cont_cls closure for cont
570  */
571 void
572 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
573                       int rid,
574                       const GNUNET_HashCode * key,
575                       uint32_t size,
576                       const void *data,
577                       enum GNUNET_BLOCK_Type type,
578                       uint32_t priority,
579                       uint32_t anonymity,
580                       struct GNUNET_TIME_Absolute expiration,
581                       unsigned int queue_priority,
582                       unsigned int max_queue_size,
583                       struct GNUNET_TIME_Relative timeout,
584                       GNUNET_DATASTORE_ContinuationWithStatus cont,
585                       void *cont_cls)
586 {
587   struct QueueEntry *qe;
588   struct DataMessage *dm;
589   size_t msize;
590
591 #if DEBUG_DATASTORE
592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
593               "Asked to put %u bytes of data under key `%s'\n",
594               size,
595               GNUNET_h2s (key));
596 #endif
597   msize = sizeof(struct DataMessage) + size;
598   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
599   qe = make_queue_entry (h, msize,
600                          queue_priority, max_queue_size, timeout,
601                          cont, cont_cls, NULL, NULL);
602   if (qe == NULL)
603     return;
604   dm = (struct DataMessage* ) &qe[1];
605   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
606   dm->header.size = htons(msize);
607   dm->rid = htonl(rid);
608   dm->size = htonl(size);
609   dm->type = htonl(type);
610   dm->priority = htonl(priority);
611   dm->anonymity = htonl(anonymity);
612   dm->uid = GNUNET_htonll(0);
613   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
614   dm->key = *key;
615   memcpy (&dm[1], data, size);
616   process_queue (h);
617 }
618
619
620 /**
621  * Context for processing status messages.
622  */
623 struct StatusContext
624 {
625   /**
626    * Continuation to call with the status.
627    */
628   GNUNET_DATASTORE_ContinuationWithStatus cont;
629
630   /**
631    * Closure for cont.
632    */
633   void *cont_cls;
634
635 };
636
637
638 /**
639  * Type of a function to call when we receive a message
640  * from the service.
641  *
642  * @param cls closure
643  * @param msg message received, NULL on timeout or fatal error
644  */
645 static void 
646 process_status_message (void *cls,
647                         const struct
648                         GNUNET_MessageHeader * msg)
649 {
650   struct QueueEntry *qe = cls;
651   struct GNUNET_DATASTORE_Handle *h = qe->h;
652   struct StatusContext *rc = qe->client_ctx;
653   const struct StatusMessage *sm;
654   const char *emsg;
655   int32_t status;
656
657   GNUNET_CONTAINER_DLL_remove (h->queue_head,
658                                h->queue_tail,
659                                qe);
660   GNUNET_free (qe);
661   if (msg == NULL)
662     {
663       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
664                   _("Failed to receive response from database.\n"));
665       do_disconnect (h);
666       return;
667     }
668
669   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
670        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
671     {
672       GNUNET_break (0);
673       h->retry_time = GNUNET_TIME_UNIT_ZERO;
674       do_disconnect (h);
675       rc->cont (rc->cont_cls, 
676                 GNUNET_SYSERR,
677                 _("Error reading response from datastore service"));
678       GNUNET_free (rc);
679       return;
680     }
681   sm = (const struct StatusMessage*) msg;
682   status = ntohl(sm->status);
683   emsg = NULL;
684   if (ntohs(msg->size) > sizeof(struct StatusMessage))
685     {
686       emsg = (const char*) &sm[1];
687       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
688         {
689           GNUNET_break (0);
690           emsg = _("Invalid error message received from datastore service");
691         }
692     }  
693   if ( (status == GNUNET_SYSERR) &&
694        (emsg == NULL) )
695     {
696       GNUNET_break (0);
697       emsg = _("Invalid error message received from datastore service");
698     }
699 #if DEBUG_DATASTORE
700   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
701               "Received status %d/%s\n",
702               (int) status,
703               emsg);
704 #endif
705   rc->cont (rc->cont_cls, 
706             status,
707             emsg);
708   GNUNET_free (rc);  
709   process_queue (h);
710 }
711
712
713 /**
714  * Reserve space in the datastore.  This function should be used
715  * to avoid "out of space" failures during a longer sequence of "put"
716  * operations (for example, when a file is being inserted).
717  *
718  * @param h handle to the datastore
719  * @param amount how much space (in bytes) should be reserved (for content only)
720  * @param entries how many entries will be created (to calculate per-entry overhead)
721  * @param queue_priority ranking of this request in the priority queue
722  * @param max_queue_size at what queue size should this request be dropped
723  *        (if other requests of higher priority are in the queue)
724  * @param timeout how long to wait at most for a response (or before dying in queue)
725  * @param cont continuation to call when done; "success" will be set to
726  *             a positive reservation value if space could be reserved.
727  * @param cont_cls closure for cont
728  */
729 void
730 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
731                           uint64_t amount,
732                           uint32_t entries,
733                           unsigned int queue_priority,
734                           unsigned int max_queue_size,
735                           struct GNUNET_TIME_Relative timeout,
736                           GNUNET_DATASTORE_ContinuationWithStatus cont,
737                           void *cont_cls)
738 {
739   struct QueueEntry *qe;
740   struct ReserveMessage *rm;
741   struct StatusContext *scont;
742
743 #if DEBUG_DATASTORE
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Asked to reserve %llu bytes of data and %u entries'\n",
746               (unsigned long long) amount,
747               (unsigned int) entries);
748 #endif
749   scont = GNUNET_malloc (sizeof (struct StatusContext));
750   scont->cont = cont;
751   scont->cont_cls = cont_cls;
752   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
753                          queue_priority, max_queue_size, timeout,
754                          NULL, NULL, &process_status_message, scont);
755   if (qe == NULL)
756     return;
757   rm = (struct ReserveMessage*) &qe[1];
758   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
759   rm->header.size = htons(sizeof (struct ReserveMessage));
760   rm->entries = htonl(entries);
761   rm->amount = GNUNET_htonll(amount);
762   process_queue (h);
763 }
764
765
766 /**
767  * Signal that all of the data for which a reservation was made has
768  * been stored and that whatever excess space might have been reserved
769  * can now be released.
770  *
771  * @param h handle to the datastore
772  * @param rid reservation ID (value of "success" in original continuation
773  *        from the "reserve" function).
774  * @param queue_priority ranking of this request in the priority queue
775  * @param max_queue_size at what queue size should this request be dropped
776  *        (if other requests of higher priority are in the queue)
777  * @param queue_priority ranking of this request in the priority queue
778  * @param max_queue_size at what queue size should this request be dropped
779  *        (if other requests of higher priority are in the queue)
780  * @param timeout how long to wait at most for a response
781  * @param cont continuation to call when done
782  * @param cont_cls closure for cont
783  */
784 void
785 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
786                                   int rid,
787                                   unsigned int queue_priority,
788                                   unsigned int max_queue_size,
789                                   struct GNUNET_TIME_Relative timeout,
790                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
791                                   void *cont_cls)
792 {
793   struct QueueEntry *qe;
794   struct ReleaseReserveMessage *rrm;
795   struct StatusContext *scont;
796
797 #if DEBUG_DATASTORE
798   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
799               "Asked to reserve %llu bytes of data and %u entries'\n",
800               (unsigned long long) amount,
801               (unsigned int) entries);
802 #endif
803   scont = GNUNET_malloc (sizeof (struct StatusContext));
804   scont->cont = cont;
805   scont->cont_cls = cont_cls;
806   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
807                          queue_priority, max_queue_size, timeout,
808                          NULL, NULL, &process_status_message, scont);
809   if (qe == NULL)
810     return;
811   rrm = (struct ReleaseReserveMessage*) &qe[1];
812   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
813   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
814   rrm->rid = htonl(rid);
815   process_queue (h);
816 }
817
818
819 /**
820  * Update a value in the datastore.
821  *
822  * @param h handle to the datastore
823  * @param uid identifier for the value
824  * @param priority how much to increase the priority of the value
825  * @param expiration new expiration value should be MAX of existing and this argument
826  * @param queue_priority ranking of this request in the priority queue
827  * @param max_queue_size at what queue size should this request be dropped
828  *        (if other requests of higher priority are in the queue)
829  * @param timeout how long to wait at most for a response
830  * @param cont continuation to call when done
831  * @param cont_cls closure for cont
832  */
833 void
834 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
835                          unsigned long long uid,
836                          uint32_t priority,
837                          struct GNUNET_TIME_Absolute expiration,
838                          unsigned int queue_priority,
839                          unsigned int max_queue_size,
840                          struct GNUNET_TIME_Relative timeout,
841                          GNUNET_DATASTORE_ContinuationWithStatus cont,
842                          void *cont_cls)
843 {
844   struct QueueEntry *qe;
845   struct UpdateMessage *um;
846   struct StatusContext *scont;
847
848 #if DEBUG_DATASTORE
849   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
850               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
851               uid,
852               (unsigned int) priority,
853               (unsigned long long) expiration.value);
854 #endif
855   scont = GNUNET_malloc (sizeof (struct StatusContext));
856   scont->cont = cont;
857   scont->cont_cls = cont_cls;
858   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
859                          queue_priority, max_queue_size, timeout,
860                          NULL, NULL, &process_status_message, scont);
861   if (qe == NULL)
862     return;
863   um = (struct UpdateMessage*) &qe[1];
864   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
865   um->header.size = htons(sizeof (struct UpdateMessage));
866   um->priority = htonl(priority);
867   um->expiration = GNUNET_TIME_absolute_hton(expiration);
868   um->uid = GNUNET_htonll(uid);
869   process_queue (h);
870 }
871
872
873 /**
874  * Explicitly remove some content from the database.
875  * The "cont"inuation will be called with status
876  * "GNUNET_OK" if content was removed, "GNUNET_NO"
877  * if no matching entry was found and "GNUNET_SYSERR"
878  * on all other types of errors.
879  *
880  * @param h handle to the datastore
881  * @param key key for the value
882  * @param size number of bytes in data
883  * @param data content stored
884  * @param queue_priority ranking of this request in the priority queue
885  * @param max_queue_size at what queue size should this request be dropped
886  *        (if other requests of higher priority are in the queue)
887  * @param timeout how long to wait at most for a response
888  * @param cont continuation to call when done
889  * @param cont_cls closure for cont
890  */
891 void
892 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
893                          const GNUNET_HashCode *key,
894                          uint32_t size, 
895                          const void *data,
896                          unsigned int queue_priority,
897                          unsigned int max_queue_size,
898                          struct GNUNET_TIME_Relative timeout,
899                          GNUNET_DATASTORE_ContinuationWithStatus cont,
900                          void *cont_cls)
901 {
902   struct QueueEntry *qe;
903   struct DataMessage *dm;
904   size_t msize;
905   struct StatusContext *scont;
906
907 #if DEBUG_DATASTORE
908   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
909               "Asked to remove %u bytes under key `%s'\n",
910               size,
911               GNUNET_h2s (key));
912 #endif
913   scont = GNUNET_malloc (sizeof (struct StatusContext));
914   scont->cont = cont;
915   scont->cont_cls = cont_cls;
916   msize = sizeof(struct DataMessage) + size;
917   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
918   qe = make_queue_entry (h, msize,
919                          queue_priority, max_queue_size, timeout,
920                          NULL, NULL, &process_status_message, scont);
921   if (qe == NULL)
922     return;
923   dm = (struct DataMessage*) &qe[1];
924   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
925   dm->header.size = htons(msize);
926   dm->rid = htonl(0);
927   dm->size = htonl(size);
928   dm->type = htonl(0);
929   dm->priority = htonl(0);
930   dm->anonymity = htonl(0);
931   dm->uid = GNUNET_htonll(0);
932   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
933   dm->key = *key;
934   memcpy (&dm[1], data, size);
935   process_queue (h);
936 }
937
938
939
940 /**
941  * Context for processing result messages.
942  */
943 struct ResultContext
944 {
945   /**
946    * Iterator to call with the result.
947    */
948   GNUNET_DATASTORE_Iterator iter;
949
950   /**
951    * Closure for iter.
952    */
953   void *iter_cls;
954
955   /**
956    * Automatically get the next result, or wait for a call to
957    * GNUNET_DATASTORE_get_next?  GNUNET_YES means we automatically
958    * get the next one (if there are more).
959    */
960   int get_next;
961
962 };
963
964
965 /**
966  * Type of a function to call when we receive a message
967  * from the service.
968  *
969  * @param cls closure
970  * @param msg message received, NULL on timeout or fatal error
971  */
972 static void 
973 process_result_message (void *cls,
974                         const struct GNUNET_MessageHeader * msg)
975 {
976   struct QueueEntry *qe = cls;
977   struct GNUNET_DATASTORE_Handle *h = qe->h;
978   struct ResultContext *rc = qe->client_ctx;
979   const struct DataMessage *dm;
980
981   if (msg == NULL)
982     {
983 #if DEBUG_DATASTORE
984       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
985                   _("Failed to receive response from datastore\n"));
986 #endif
987       GNUNET_CONTAINER_DLL_remove (h->queue_head,
988                                    h->queue_tail,
989                                    qe);
990       GNUNET_free (qe);
991       do_disconnect (h);
992       rc->iter (rc->iter_cls,
993                 NULL, 0, NULL, 0, 0, 0, 
994                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
995       GNUNET_free (rc);
996       return;
997     }
998   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
999     {
1000       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1001 #if DEBUG_DATASTORE
1002       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003                   "Received end of result set\n");
1004 #endif
1005       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1006                                    h->queue_tail,
1007                                    qe);
1008       GNUNET_free (qe);
1009       rc->iter (rc->iter_cls,
1010                 NULL, 0, NULL, 0, 0, 0, 
1011                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1012       GNUNET_free (rc);
1013       process_queue (h);
1014       return;
1015     }
1016   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1017        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1018        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1019     {
1020       GNUNET_break (0);
1021       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1022                                    h->queue_tail,
1023                                    qe);
1024       GNUNET_free (qe);
1025       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1026       do_disconnect (h);
1027       rc->iter (rc->iter_cls,
1028                 NULL, 0, NULL, 0, 0, 0, 
1029                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1030       GNUNET_free (rc);
1031       return;
1032     }
1033   dm = (const struct DataMessage*) msg;
1034 #if DEBUG_DATASTORE
1035   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1036               "Received result %llu with type %u and size %u with key %s\n",
1037               (unsigned long long) GNUNET_ntohll(dm->uid),
1038               ntohl(dm->type),
1039               ntohl(dm->size),
1040               GNUNET_h2s(&dm->key));
1041 #endif
1042   rc->iter (rc->iter_cls,
1043             &dm->key,
1044             ntohl(dm->size),
1045             &dm[1],
1046             ntohl(dm->type),
1047             ntohl(dm->priority),
1048             ntohl(dm->anonymity),
1049             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1050             GNUNET_ntohll(dm->uid));
1051   if (rc->get_next == GNUNET_YES)
1052     GNUNET_CLIENT_receive (h->client,
1053                            qe->response_proc,
1054                            qe,
1055                            GNUNET_TIME_absolute_get_remaining (qe->timeout));
1056 }
1057
1058
1059 /**
1060  * Get a random value from the datastore.
1061  *
1062  * @param h handle to the datastore
1063  * @param queue_priority ranking of this request in the priority queue
1064  * @param max_queue_size at what queue size should this request be dropped
1065  *        (if other requests of higher priority are in the queue)
1066  * @param timeout how long to wait at most for a response
1067  * @param iter function to call on a random value; it
1068  *        will be called once with a value (if available)
1069  *        and always once with a value of NULL.
1070  * @param iter_cls closure for iter
1071  */
1072 void
1073 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1074                              unsigned int queue_priority,
1075                              unsigned int max_queue_size,
1076                              struct GNUNET_TIME_Relative timeout,
1077                              GNUNET_DATASTORE_Iterator iter, 
1078                              void *iter_cls)
1079 {
1080   struct QueueEntry *qe;
1081   struct GNUNET_MessageHeader *m;
1082   struct ResultContext *rcont;
1083
1084 #if DEBUG_DATASTORE
1085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086               "Asked to get random entry in %llu ms\n",
1087               (unsigned long long) timeout.value);
1088 #endif
1089   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1090   rcont->iter = iter;
1091   rcont->iter_cls = iter_cls;
1092   rcont->get_next = GNUNET_YES;
1093   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1094                          queue_priority, max_queue_size, timeout,
1095                          NULL, NULL, &process_result_message, rcont);
1096   if (qe == NULL)
1097     return;
1098   m = (struct GNUNET_MessageHeader*) &qe[1];
1099   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1100   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1101   process_queue (h);
1102 }
1103
1104
1105
1106 /**
1107  * Iterate over the results for a particular key
1108  * in the datastore.  The iterator will only be called
1109  * once initially; if the first call did contain a
1110  * result, further results can be obtained by calling
1111  * "GNUNET_DATASTORE_get_next" with the given argument.
1112  *
1113  * @param h handle to the datastore
1114  * @param key maybe NULL (to match all entries)
1115  * @param type desired type, 0 for any
1116  * @param queue_priority ranking of this request in the priority queue
1117  * @param max_queue_size at what queue size should this request be dropped
1118  *        (if other requests of higher priority are in the queue)
1119  * @param timeout how long to wait at most for a response
1120  * @param iter function to call on each matching value;
1121  *        will be called once with a NULL value at the end
1122  * @param iter_cls closure for iter
1123  */
1124 void
1125 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1126                       const GNUNET_HashCode * key,
1127                       enum GNUNET_BLOCK_Type type,
1128                       unsigned int queue_priority,
1129                       unsigned int max_queue_size,
1130                       struct GNUNET_TIME_Relative timeout,
1131                       GNUNET_DATASTORE_Iterator iter, 
1132                       void *iter_cls)
1133 {
1134   struct QueueEntry *qe;
1135   struct GetMessage *gm;
1136   struct ResultContext *rcont;
1137
1138 #if DEBUG_DATASTORE
1139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140               "Asked to look for data of type %u under key `%s'\n",
1141               (unsigned int) type,
1142               GNUNET_h2s (key));
1143 #endif
1144   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1145   rcont->iter = iter;
1146   rcont->iter_cls = iter_cls;
1147   rcont->get_next = GNUNET_NO;
1148   qe = make_queue_entry (h, sizeof(struct GetMessage),
1149                          queue_priority, max_queue_size, timeout,
1150                          NULL, NULL, &process_result_message, rcont);
1151   if (qe == NULL)
1152     return;
1153   gm = (struct GetMessage*) &qe[1];
1154   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1155   gm->type = htonl(type);
1156   if (key != NULL)
1157     {
1158       gm->header.size = htons(sizeof (struct GetMessage));
1159       gm->key = *key;
1160     }
1161   else
1162     {
1163       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1164     }
1165   process_queue (h);
1166 }
1167
1168
1169 /**
1170  * Function called to trigger obtaining the next result
1171  * from the datastore.
1172  * 
1173  * @param h handle to the datastore
1174  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1175  *        iteration (with a final call to "iter" with key/data == NULL).
1176  */
1177 void 
1178 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1179                            int more)
1180 {
1181   struct QueueEntry *qe = h->queue_head;
1182   struct ResultContext *rc = qe->client_ctx;
1183
1184   GNUNET_assert (NULL != qe);
1185   GNUNET_assert (&process_result_message == qe->response_proc);
1186   GNUNET_assert (rc->get_next == GNUNET_NO);
1187   if (GNUNET_YES == more)
1188     {     
1189       GNUNET_CLIENT_receive (h->client,
1190                              qe->response_proc,
1191                              qe,
1192                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1193       return;
1194     }
1195   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1196                                h->queue_tail,
1197                                qe);
1198   GNUNET_free (qe);
1199   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1200   do_disconnect (h);
1201   rc->iter (rc->iter_cls,
1202             NULL, 0, NULL, 0, 0, 0, 
1203             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1204   GNUNET_free (rc);
1205 }
1206
1207
1208 /* end of datastore_api.c */