do not use default services for things that ARM will start automatically
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33 /**
34  * Entry in our priority queue.
35  */
36 struct QueueEntry
37 {
38
39   /**
40    * This is a linked list.
41    */
42   struct QueueEntry *next;
43
44   /**
45    * This is a linked list.
46    */
47   struct QueueEntry *prev;
48
49   /**
50    * Handle to the master context.
51    */
52   struct GNUNET_DATASTORE_Handle *h;
53
54   /**
55    * Response processor (NULL if we are not waiting for a response).
56    * This struct should be used for the closure, function-specific
57    * arguments can be passed via 'client_ctx'.
58    */
59   GNUNET_CLIENT_MessageHandler response_proc;
60   
61   /**
62    * Specific context (variable argument that
63    * can be used by the response processor).
64    */
65   void *client_ctx;
66
67   /**
68    * Function to call after transmission of the request.
69    */
70   GNUNET_DATASTORE_ContinuationWithStatus contX;
71    
72   /**
73    * Closure for 'cont'.
74    */
75   void *cont_clsX;
76
77   /**
78    * Task for timeout signalling.
79    */
80   GNUNET_SCHEDULER_TaskIdentifier task;
81
82   /**
83    * Timeout for the current operation.
84    */
85   struct GNUNET_TIME_Absolute timeout;
86
87   /**
88    * Priority in the queue.
89    */
90   unsigned int priority;
91
92   /**
93    * Maximum allowed length of queue (otherwise
94    * this request should be discarded).
95    */
96   unsigned int max_queue;
97
98   /**
99    * Number of bytes in the request message following
100    * this struct.  32-bit value for nicer memory
101    * access (and overall struct alignment).
102    */
103   uint32_t message_size;
104
105   /**
106    * Has this message been transmitted to the service?
107    * Only ever GNUNET_YES for the head of the queue.
108    * Note that the overall struct should end at a 
109    * multiple of 64 bits.
110    */
111   int32_t was_transmitted;
112
113 };
114
115 /**
116  * Handle to the datastore service. 
117  */
118 struct GNUNET_DATASTORE_Handle
119 {
120
121   /**
122    * Our configuration.
123    */
124   const struct GNUNET_CONFIGURATION_Handle *cfg;
125
126   /**
127    * Our scheduler.
128    */
129   struct GNUNET_SCHEDULER_Handle *sched;
130
131   /**
132    * Current connection to the datastore service.
133    */
134   struct GNUNET_CLIENT_Connection *client;
135
136   /**
137    * Current transmit handle.
138    */
139   struct GNUNET_CLIENT_TransmitHandle *th;
140
141   /**
142    * Current head of priority queue.
143    */
144   struct QueueEntry *queue_head;
145
146   /**
147    * Current tail of priority queue.
148    */
149   struct QueueEntry *queue_tail;
150
151   /**
152    * Task for trying to reconnect.
153    */
154   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
155
156   /**
157    * How quickly should we retry?  Used for exponential back-off on
158    * connect-errors.
159    */
160   struct GNUNET_TIME_Relative retry_time;
161
162   /**
163    * Number of entries in the queue.
164    */
165   unsigned int queue_size;
166
167 };
168
169
170
171 /**
172  * Connect to the datastore service.
173  *
174  * @param cfg configuration to use
175  * @param sched scheduler to use
176  * @return handle to use to access the service
177  */
178 struct GNUNET_DATASTORE_Handle *
179 GNUNET_DATASTORE_connect (const struct
180                           GNUNET_CONFIGURATION_Handle
181                           *cfg,
182                           struct
183                           GNUNET_SCHEDULER_Handle
184                           *sched)
185 {
186   struct GNUNET_CLIENT_Connection *c;
187   struct GNUNET_DATASTORE_Handle *h;
188   
189   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
190   if (c == NULL)
191     return NULL; /* oops */
192   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
193                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
194   h->client = c;
195   h->cfg = cfg;
196   h->sched = sched;
197   return h;
198 }
199
200
201 /**
202  * Transmit DROP message to datastore service.
203  *
204  * @param cls the 'struct GNUNET_DATASTORE_Handle'
205  * @param size number of bytes that can be copied to buf
206  * @param buf where to copy the drop message
207  * @return number of bytes written to buf
208  */
209 static size_t
210 transmit_drop (void *cls,
211                size_t size, 
212                void *buf)
213 {
214   struct GNUNET_DATASTORE_Handle *h = cls;
215   struct GNUNET_MessageHeader *hdr;
216   
217   if (buf == NULL)
218     {
219       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220                   _("Failed to transmit request to drop database.\n"));
221       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
222       return 0;
223     }
224   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
225   hdr = buf;
226   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
227   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
228   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
229   return sizeof(struct GNUNET_MessageHeader);
230 }
231
232
233 /**
234  * Disconnect from the datastore service (and free
235  * associated resources).
236  *
237  * @param h handle to the datastore
238  * @param drop set to GNUNET_YES to delete all data in datastore (!)
239  */
240 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
241                                   int drop)
242 {
243   struct QueueEntry *qe;
244
245   if (h->client != NULL)
246     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
247   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
248     GNUNET_SCHEDULER_cancel (h->sched,
249                              h->reconnect_task);
250   h->client = NULL;
251   while (NULL != (qe = h->queue_head))
252     {
253       GNUNET_CONTAINER_DLL_remove (h->queue_head,
254                                    h->queue_tail,
255                                    qe);
256       if (NULL != qe->response_proc)
257         qe->response_proc (qe, NULL);
258       GNUNET_free (qe);
259     }
260   if (GNUNET_YES == drop) 
261     {
262       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
263       if (h->client != NULL)
264         {
265           if (NULL != 
266               GNUNET_CLIENT_notify_transmit_ready (h->client,
267                                                    sizeof(struct GNUNET_MessageHeader),
268                                                    GNUNET_TIME_UNIT_MINUTES,
269                                                    GNUNET_YES,
270                                                    &transmit_drop,
271                                                    h))
272             return;
273           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
274         }
275       GNUNET_break (0);
276     }
277   GNUNET_free (h);
278 }
279
280
281 /**
282  * A request has timed out (before being transmitted to the service).
283  *
284  * @param cls the 'struct QueueEntry'
285  * @param tc scheduler context
286  */
287 static void
288 timeout_queue_entry (void *cls,
289                      const struct GNUNET_SCHEDULER_TaskContext *tc)
290 {
291   struct QueueEntry *qe = cls;
292   struct GNUNET_DATASTORE_Handle *h = qe->h;
293
294   qe->task = GNUNET_SCHEDULER_NO_TASK;
295   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
296   GNUNET_CONTAINER_DLL_remove (h->queue_head,
297                                h->queue_tail,
298                                qe);
299   if (qe->response_proc != NULL)
300     qe->response_proc (qe, NULL);
301   GNUNET_free (qe);
302 }
303
304
305 /**
306  * Create a new entry for our priority queue (and possibly discard other entires if
307  * the queue is getting too long).
308  *
309  * @param h handle to the datastore
310  * @param msize size of the message to queue
311  * @param queue_priority priority of the entry
312  * @param max_queue_size at what queue size should this request be dropped
313  *        (if other requests of higher priority are in the queue)
314  * @param timeout timeout for the operation
315  * @param response_proc function to call with replies (can be NULL)
316  * @param client_ctx client context (NOT a closure for response_proc)
317  * @return NULL if the queue is full (and this entry was dropped)
318  */
319 static struct QueueEntry *
320 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
321                   size_t msize,
322                   unsigned int queue_priority,
323                   unsigned int max_queue_size,
324                   struct GNUNET_TIME_Relative timeout,
325                   GNUNET_CLIENT_MessageHandler response_proc,            
326                   void *client_ctx)
327 {
328   struct QueueEntry *ret;
329   struct QueueEntry *pos;
330   unsigned int c;
331
332   c = 0;
333   pos = h->queue_head;
334   while ( (pos != NULL) &&
335           (c < max_queue_size) &&
336           (pos->priority >= queue_priority) )
337     {
338       c++;
339       pos = pos->next;
340     }
341   if (c >= max_queue_size)
342     return NULL;
343   if (pos == NULL)
344     {
345       /* append at the tail */
346       pos = h->queue_tail;
347     }
348   else
349     {
350       pos = pos->prev; 
351       /* do not insert at HEAD if HEAD query was already
352          transmitted and we are still receiving replies! */
353       if ( (pos == NULL) &&
354            (h->queue_head->was_transmitted) )
355         pos = h->queue_head;
356     }
357   ret = GNUNET_malloc (sizeof (struct QueueEntry) + msize);
358   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
359                                      h->queue_tail,
360                                      pos,
361                                      ret);
362   ret->h = h;
363   ret->response_proc = response_proc;
364   ret->client_ctx = client_ctx;
365   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
366                                             timeout,
367                                             &timeout_queue_entry,
368                                             ret);
369   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
370   ret->priority = queue_priority;
371   ret->max_queue = max_queue_size;
372   ret->message_size = msize;
373   ret->was_transmitted = GNUNET_NO;
374   h->queue_size++;
375   c++;
376   pos = ret->next;
377   while (pos != NULL) 
378     {
379       if (pos->max_queue < h->queue_size)
380         {
381           GNUNET_CONTAINER_DLL_remove (h->queue_head,
382                                        h->queue_tail,
383                                        pos);
384           GNUNET_SCHEDULER_cancel (h->sched,
385                                    pos->task);
386           if (pos->response_proc != NULL)
387             pos->response_proc (pos, NULL);
388           GNUNET_free (pos);
389           h->queue_size--;
390           break;
391         }
392       pos = pos->next;
393     }
394   return ret;
395 }
396
397
398 /**
399  * Process entries in the queue (or do nothing if we are already
400  * doing so).
401  * 
402  * @param h handle to the datastore
403  */
404 static void
405 process_queue (struct GNUNET_DATASTORE_Handle *h);
406
407
408 /**
409  * Try reconnecting to the datastore service.
410  *
411  * @param cls the 'struct GNUNET_DATASTORE_Handle'
412  * @param tc scheduler context
413  */
414 static void
415 try_reconnect (void *cls,
416                const struct GNUNET_SCHEDULER_TaskContext *tc)
417 {
418   struct GNUNET_DATASTORE_Handle *h = cls;
419
420   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
421     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
422   else
423     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
424   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
425     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
426   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
427   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
428   if (h->client == NULL)
429     return;
430   process_queue (h);
431 }
432
433
434 /**
435  * Disconnect from the service and then try reconnecting to the datastore service
436  * after some delay.
437  *
438  * @param cls the 'struct GNUNET_DATASTORE_Handle'
439  * @param tc scheduler context
440  */
441 static void
442 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
443 {
444   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
445   h->client = NULL;
446   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
447                                                     h->retry_time,
448                                                     &try_reconnect,
449                                                     h);      
450 }
451
452
453 /**
454  * Transmit request from queue to datastore service.
455  *
456  * @param cls the 'struct GNUNET_DATASTORE_Handle'
457  * @param size number of bytes that can be copied to buf
458  * @param buf where to copy the drop message
459  * @return number of bytes written to buf
460  */
461 static size_t
462 transmit_request (void *cls,
463                   size_t size, 
464                   void *buf)
465 {
466   struct GNUNET_DATASTORE_Handle *h = cls;
467   struct QueueEntry *qe;
468   size_t msize;
469
470   h->th = NULL;
471   if (NULL == (qe = h->queue_head))
472     return 0; /* no entry in queue */
473   if (buf == NULL)
474     {
475       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
476                   _("Failed to transmit request to database.\n"));
477       do_disconnect (h);
478       return 0;
479     }
480   if (size < (msize = qe->message_size))
481     {
482       process_queue (h);
483       return 0;
484     }
485   memcpy (buf, &qe[1], msize);
486   qe->was_transmitted = GNUNET_YES;
487   GNUNET_SCHEDULER_cancel (h->sched,
488                            qe->task);
489   qe->task = GNUNET_SCHEDULER_NO_TASK;
490   GNUNET_CLIENT_receive (h->client,
491                          qe->response_proc,
492                          qe,
493                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
494   return msize;
495 }
496
497
498 /**
499  * Process entries in the queue (or do nothing if we are already
500  * doing so).
501  * 
502  * @param h handle to the datastore
503  */
504 static void
505 process_queue (struct GNUNET_DATASTORE_Handle *h)
506 {
507   struct QueueEntry *qe;
508
509   if (NULL == (qe = h->queue_head))
510     return; /* no entry in queue */
511   if (qe->was_transmitted == GNUNET_YES)
512     return; /* waiting for replies */
513   if (h->th != NULL)
514     return; /* request pending */
515   if (h->client == NULL)
516     return; /* waiting for reconnect */
517   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
518                                                qe->message_size,
519                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
520                                                GNUNET_YES,
521                                                &transmit_request,
522                                                h);
523 }
524
525
526
527
528 /**
529  * Context for processing status messages.
530  */
531 struct StatusContext
532 {
533   /**
534    * Continuation to call with the status.
535    */
536   GNUNET_DATASTORE_ContinuationWithStatus cont;
537
538   /**
539    * Closure for cont.
540    */
541   void *cont_cls;
542
543 };
544
545
546 /**
547  * Dummy continuation used to do nothing (but be non-zero).
548  *
549  * @param cls closure
550  * @param result result 
551  * @param emsg error message
552  */
553 static void
554 drop_status_cont (void *cls, int result, const char *emsg)
555 {
556   /* do nothing */
557 }
558
559
560 /**
561  * Type of a function to call when we receive a message
562  * from the service.
563  *
564  * @param cls closure
565  * @param msg message received, NULL on timeout or fatal error
566  */
567 static void 
568 process_status_message (void *cls,
569                         const struct
570                         GNUNET_MessageHeader * msg)
571 {
572   struct QueueEntry *qe = cls;
573   struct GNUNET_DATASTORE_Handle *h = qe->h;
574   struct StatusContext *rc = qe->client_ctx;
575   const struct StatusMessage *sm;
576   const char *emsg;
577   int32_t status;
578
579   GNUNET_CONTAINER_DLL_remove (h->queue_head,
580                                h->queue_tail,
581                                qe);
582   GNUNET_free (qe);
583   if (msg == NULL)
584     {
585       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
586                   _("Failed to receive response from database.\n"));
587       do_disconnect (h);
588       return;
589     }
590
591   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
592        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
593     {
594       GNUNET_break (0);
595       h->retry_time = GNUNET_TIME_UNIT_ZERO;
596       do_disconnect (h);
597       rc->cont (rc->cont_cls, 
598                 GNUNET_SYSERR,
599                 _("Error reading response from datastore service"));
600       GNUNET_free (rc);
601       return;
602     }
603   sm = (const struct StatusMessage*) msg;
604   status = ntohl(sm->status);
605   emsg = NULL;
606   if (ntohs(msg->size) > sizeof(struct StatusMessage))
607     {
608       emsg = (const char*) &sm[1];
609       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
610         {
611           GNUNET_break (0);
612           emsg = _("Invalid error message received from datastore service");
613         }
614     }  
615   if ( (status == GNUNET_SYSERR) &&
616        (emsg == NULL) )
617     {
618       GNUNET_break (0);
619       emsg = _("Invalid error message received from datastore service");
620     }
621 #if DEBUG_DATASTORE
622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623               "Received status %d/%s\n",
624               (int) status,
625               emsg);
626 #endif
627   rc->cont (rc->cont_cls, 
628             status,
629             emsg);
630   GNUNET_free (rc);  
631   process_queue (h);
632 }
633
634
635 /**
636  * Store an item in the datastore.  If the item is already present,
637  * the priorities are summed up and the higher expiration time and
638  * lower anonymity level is used.
639  *
640  * @param h handle to the datastore
641  * @param rid reservation ID to use (from "reserve"); use 0 if no
642  *            prior reservation was made
643  * @param key key for the value
644  * @param size number of bytes in data
645  * @param data content stored
646  * @param type type of the content
647  * @param priority priority of the content
648  * @param anonymity anonymity-level for the content
649  * @param expiration expiration time for the content
650  * @param queue_priority ranking of this request in the priority queue
651  * @param max_queue_size at what queue size should this request be dropped
652  *        (if other requests of higher priority are in the queue)
653  * @param timeout timeout for the operation
654  * @param cont continuation to call when done
655  * @param cont_cls closure for cont
656  */
657 void
658 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
659                       int rid,
660                       const GNUNET_HashCode * key,
661                       uint32_t size,
662                       const void *data,
663                       enum GNUNET_BLOCK_Type type,
664                       uint32_t priority,
665                       uint32_t anonymity,
666                       struct GNUNET_TIME_Absolute expiration,
667                       unsigned int queue_priority,
668                       unsigned int max_queue_size,
669                       struct GNUNET_TIME_Relative timeout,
670                       GNUNET_DATASTORE_ContinuationWithStatus cont,
671                       void *cont_cls)
672 {
673   struct StatusContext *scont;
674   struct QueueEntry *qe;
675   struct DataMessage *dm;
676   size_t msize;
677
678 #if DEBUG_DATASTORE
679   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
680               "Asked to put %u bytes of data under key `%s'\n",
681               size,
682               GNUNET_h2s (key));
683 #endif
684   msize = sizeof(struct DataMessage) + size;
685   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
686   scont = GNUNET_malloc (sizeof (struct StatusContext));
687   scont->cont = cont;
688   scont->cont_cls = cont_cls;
689   qe = make_queue_entry (h, msize,
690                          queue_priority, max_queue_size, timeout,
691                          &process_status_message, scont);
692   if (qe == NULL)
693     return;
694   dm = (struct DataMessage* ) &qe[1];
695   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
696   dm->header.size = htons(msize);
697   dm->rid = htonl(rid);
698   dm->size = htonl(size);
699   dm->type = htonl(type);
700   dm->priority = htonl(priority);
701   dm->anonymity = htonl(anonymity);
702   dm->uid = GNUNET_htonll(0);
703   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
704   dm->key = *key;
705   memcpy (&dm[1], data, size);
706   process_queue (h);
707 }
708
709
710 /**
711  * Reserve space in the datastore.  This function should be used
712  * to avoid "out of space" failures during a longer sequence of "put"
713  * operations (for example, when a file is being inserted).
714  *
715  * @param h handle to the datastore
716  * @param amount how much space (in bytes) should be reserved (for content only)
717  * @param entries how many entries will be created (to calculate per-entry overhead)
718  * @param queue_priority ranking of this request in the priority queue
719  * @param max_queue_size at what queue size should this request be dropped
720  *        (if other requests of higher priority are in the queue)
721  * @param timeout how long to wait at most for a response (or before dying in queue)
722  * @param cont continuation to call when done; "success" will be set to
723  *             a positive reservation value if space could be reserved.
724  * @param cont_cls closure for cont
725  */
726 void
727 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
728                           uint64_t amount,
729                           uint32_t entries,
730                           unsigned int queue_priority,
731                           unsigned int max_queue_size,
732                           struct GNUNET_TIME_Relative timeout,
733                           GNUNET_DATASTORE_ContinuationWithStatus cont,
734                           void *cont_cls)
735 {
736   struct QueueEntry *qe;
737   struct ReserveMessage *rm;
738   struct StatusContext *scont;
739
740   if (cont == NULL)
741     cont = &drop_status_cont;
742 #if DEBUG_DATASTORE
743   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
744               "Asked to reserve %llu bytes of data and %u entries'\n",
745               (unsigned long long) amount,
746               (unsigned int) entries);
747 #endif
748   scont = GNUNET_malloc (sizeof (struct StatusContext));
749   scont->cont = cont;
750   scont->cont_cls = cont_cls;
751   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
752                          queue_priority, max_queue_size, timeout,
753                          &process_status_message, scont);
754   if (qe == NULL)
755     return;
756   rm = (struct ReserveMessage*) &qe[1];
757   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
758   rm->header.size = htons(sizeof (struct ReserveMessage));
759   rm->entries = htonl(entries);
760   rm->amount = GNUNET_htonll(amount);
761   process_queue (h);
762 }
763
764
765 /**
766  * Signal that all of the data for which a reservation was made has
767  * been stored and that whatever excess space might have been reserved
768  * can now be released.
769  *
770  * @param h handle to the datastore
771  * @param rid reservation ID (value of "success" in original continuation
772  *        from the "reserve" function).
773  * @param queue_priority ranking of this request in the priority queue
774  * @param max_queue_size at what queue size should this request be dropped
775  *        (if other requests of higher priority are in the queue)
776  * @param queue_priority ranking of this request in the priority queue
777  * @param max_queue_size at what queue size should this request be dropped
778  *        (if other requests of higher priority are in the queue)
779  * @param timeout how long to wait at most for a response
780  * @param cont continuation to call when done
781  * @param cont_cls closure for cont
782  */
783 void
784 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
785                                   int rid,
786                                   unsigned int queue_priority,
787                                   unsigned int max_queue_size,
788                                   struct GNUNET_TIME_Relative timeout,
789                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
790                                   void *cont_cls)
791 {
792   struct QueueEntry *qe;
793   struct ReleaseReserveMessage *rrm;
794   struct StatusContext *scont;
795
796   if (cont == NULL)
797     cont = &drop_status_cont;
798 #if DEBUG_DATASTORE
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Asked to release reserve %d\n",
801               rid);
802 #endif
803   scont = GNUNET_malloc (sizeof (struct StatusContext));
804   scont->cont = cont;
805   scont->cont_cls = cont_cls;
806   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
807                          queue_priority, max_queue_size, timeout,
808                          &process_status_message, scont);
809   if (qe == NULL)
810     return;
811   rrm = (struct ReleaseReserveMessage*) &qe[1];
812   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
813   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
814   rrm->rid = htonl(rid);
815   process_queue (h);
816 }
817
818
819 /**
820  * Update a value in the datastore.
821  *
822  * @param h handle to the datastore
823  * @param uid identifier for the value
824  * @param priority how much to increase the priority of the value
825  * @param expiration new expiration value should be MAX of existing and this argument
826  * @param queue_priority ranking of this request in the priority queue
827  * @param max_queue_size at what queue size should this request be dropped
828  *        (if other requests of higher priority are in the queue)
829  * @param timeout how long to wait at most for a response
830  * @param cont continuation to call when done
831  * @param cont_cls closure for cont
832  */
833 void
834 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
835                          unsigned long long uid,
836                          uint32_t priority,
837                          struct GNUNET_TIME_Absolute expiration,
838                          unsigned int queue_priority,
839                          unsigned int max_queue_size,
840                          struct GNUNET_TIME_Relative timeout,
841                          GNUNET_DATASTORE_ContinuationWithStatus cont,
842                          void *cont_cls)
843 {
844   struct QueueEntry *qe;
845   struct UpdateMessage *um;
846   struct StatusContext *scont;
847
848   if (cont == NULL)
849     cont = &drop_status_cont;
850 #if DEBUG_DATASTORE
851   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
852               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
853               uid,
854               (unsigned int) priority,
855               (unsigned long long) expiration.value);
856 #endif
857   scont = GNUNET_malloc (sizeof (struct StatusContext));
858   scont->cont = cont;
859   scont->cont_cls = cont_cls;
860   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
861                          queue_priority, max_queue_size, timeout,
862                          &process_status_message, scont);
863   if (qe == NULL)
864     return;
865   um = (struct UpdateMessage*) &qe[1];
866   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
867   um->header.size = htons(sizeof (struct UpdateMessage));
868   um->priority = htonl(priority);
869   um->expiration = GNUNET_TIME_absolute_hton(expiration);
870   um->uid = GNUNET_htonll(uid);
871   process_queue (h);
872 }
873
874
875 /**
876  * Explicitly remove some content from the database.
877  * The "cont"inuation will be called with status
878  * "GNUNET_OK" if content was removed, "GNUNET_NO"
879  * if no matching entry was found and "GNUNET_SYSERR"
880  * on all other types of errors.
881  *
882  * @param h handle to the datastore
883  * @param key key for the value
884  * @param size number of bytes in data
885  * @param data content stored
886  * @param queue_priority ranking of this request in the priority queue
887  * @param max_queue_size at what queue size should this request be dropped
888  *        (if other requests of higher priority are in the queue)
889  * @param timeout how long to wait at most for a response
890  * @param cont continuation to call when done
891  * @param cont_cls closure for cont
892  */
893 void
894 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
895                          const GNUNET_HashCode *key,
896                          uint32_t size, 
897                          const void *data,
898                          unsigned int queue_priority,
899                          unsigned int max_queue_size,
900                          struct GNUNET_TIME_Relative timeout,
901                          GNUNET_DATASTORE_ContinuationWithStatus cont,
902                          void *cont_cls)
903 {
904   struct QueueEntry *qe;
905   struct DataMessage *dm;
906   size_t msize;
907   struct StatusContext *scont;
908
909   if (cont == NULL)
910     cont = &drop_status_cont;
911 #if DEBUG_DATASTORE
912   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913               "Asked to remove %u bytes under key `%s'\n",
914               size,
915               GNUNET_h2s (key));
916 #endif
917   scont = GNUNET_malloc (sizeof (struct StatusContext));
918   scont->cont = cont;
919   scont->cont_cls = cont_cls;
920   msize = sizeof(struct DataMessage) + size;
921   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
922   qe = make_queue_entry (h, msize,
923                          queue_priority, max_queue_size, timeout,
924                          &process_status_message, scont);
925   if (qe == NULL)
926     return;
927   dm = (struct DataMessage*) &qe[1];
928   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
929   dm->header.size = htons(msize);
930   dm->rid = htonl(0);
931   dm->size = htonl(size);
932   dm->type = htonl(0);
933   dm->priority = htonl(0);
934   dm->anonymity = htonl(0);
935   dm->uid = GNUNET_htonll(0);
936   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
937   dm->key = *key;
938   memcpy (&dm[1], data, size);
939   process_queue (h);
940 }
941
942
943
944 /**
945  * Context for processing result messages.
946  */
947 struct ResultContext
948 {
949   /**
950    * Iterator to call with the result.
951    */
952   GNUNET_DATASTORE_Iterator iter;
953
954   /**
955    * Closure for iter.
956    */
957   void *iter_cls;
958
959 };
960
961
962 /**
963  * Type of a function to call when we receive a message
964  * from the service.
965  *
966  * @param cls closure
967  * @param msg message received, NULL on timeout or fatal error
968  */
969 static void 
970 process_result_message (void *cls,
971                         const struct GNUNET_MessageHeader * msg)
972 {
973   struct QueueEntry *qe = cls;
974   struct GNUNET_DATASTORE_Handle *h = qe->h;
975   struct ResultContext *rc = qe->client_ctx;
976   const struct DataMessage *dm;
977
978   if (msg == NULL)
979     {
980 #if DEBUG_DATASTORE
981       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
982                   _("Failed to receive response from datastore\n"));
983 #endif
984       GNUNET_CONTAINER_DLL_remove (h->queue_head,
985                                    h->queue_tail,
986                                    qe);
987       GNUNET_free (qe);
988       do_disconnect (h);
989       rc->iter (rc->iter_cls,
990                 NULL, 0, NULL, 0, 0, 0, 
991                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
992       GNUNET_free (rc);
993       return;
994     }
995   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
996     {
997       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
998 #if DEBUG_DATASTORE
999       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1000                   "Received end of result set\n");
1001 #endif
1002       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1003                                    h->queue_tail,
1004                                    qe);
1005       GNUNET_free (qe);
1006       rc->iter (rc->iter_cls,
1007                 NULL, 0, NULL, 0, 0, 0, 
1008                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1009       GNUNET_free (rc);
1010       process_queue (h);
1011       return;
1012     }
1013   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1014        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1015        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1016     {
1017       GNUNET_break (0);
1018       GNUNET_CONTAINER_DLL_remove (h->queue_head,
1019                                    h->queue_tail,
1020                                    qe);
1021       GNUNET_free (qe);
1022       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1023       do_disconnect (h);
1024       rc->iter (rc->iter_cls,
1025                 NULL, 0, NULL, 0, 0, 0, 
1026                 GNUNET_TIME_UNIT_ZERO_ABS, 0);  
1027       GNUNET_free (rc);
1028       return;
1029     }
1030   dm = (const struct DataMessage*) msg;
1031 #if DEBUG_DATASTORE
1032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033               "Received result %llu with type %u and size %u with key %s\n",
1034               (unsigned long long) GNUNET_ntohll(dm->uid),
1035               ntohl(dm->type),
1036               ntohl(dm->size),
1037               GNUNET_h2s(&dm->key));
1038 #endif
1039   rc->iter (rc->iter_cls,
1040             &dm->key,
1041             ntohl(dm->size),
1042             &dm[1],
1043             ntohl(dm->type),
1044             ntohl(dm->priority),
1045             ntohl(dm->anonymity),
1046             GNUNET_TIME_absolute_ntoh(dm->expiration),  
1047             GNUNET_ntohll(dm->uid));
1048 }
1049
1050
1051 /**
1052  * Get a random value from the datastore.
1053  *
1054  * @param h handle to the datastore
1055  * @param queue_priority ranking of this request in the priority queue
1056  * @param max_queue_size at what queue size should this request be dropped
1057  *        (if other requests of higher priority are in the queue)
1058  * @param timeout how long to wait at most for a response
1059  * @param iter function to call on a random value; it
1060  *        will be called once with a value (if available)
1061  *        and always once with a value of NULL.
1062  * @param iter_cls closure for iter
1063  */
1064 void
1065 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1066                              unsigned int queue_priority,
1067                              unsigned int max_queue_size,
1068                              struct GNUNET_TIME_Relative timeout,
1069                              GNUNET_DATASTORE_Iterator iter, 
1070                              void *iter_cls)
1071 {
1072   struct QueueEntry *qe;
1073   struct GNUNET_MessageHeader *m;
1074   struct ResultContext *rcont;
1075
1076 #if DEBUG_DATASTORE
1077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1078               "Asked to get random entry in %llu ms\n",
1079               (unsigned long long) timeout.value);
1080 #endif
1081   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1082   rcont->iter = iter;
1083   rcont->iter_cls = iter_cls;
1084   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1085                          queue_priority, max_queue_size, timeout,
1086                          &process_result_message, rcont);
1087   if (qe == NULL)
1088     return;
1089   m = (struct GNUNET_MessageHeader*) &qe[1];
1090   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1091   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1092   process_queue (h);
1093 }
1094
1095
1096
1097 /**
1098  * Iterate over the results for a particular key
1099  * in the datastore.  The iterator will only be called
1100  * once initially; if the first call did contain a
1101  * result, further results can be obtained by calling
1102  * "GNUNET_DATASTORE_get_next" with the given argument.
1103  *
1104  * @param h handle to the datastore
1105  * @param key maybe NULL (to match all entries)
1106  * @param type desired type, 0 for any
1107  * @param queue_priority ranking of this request in the priority queue
1108  * @param max_queue_size at what queue size should this request be dropped
1109  *        (if other requests of higher priority are in the queue)
1110  * @param timeout how long to wait at most for a response
1111  * @param iter function to call on each matching value;
1112  *        will be called once with a NULL value at the end
1113  * @param iter_cls closure for iter
1114  */
1115 void
1116 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1117                       const GNUNET_HashCode * key,
1118                       enum GNUNET_BLOCK_Type type,
1119                       unsigned int queue_priority,
1120                       unsigned int max_queue_size,
1121                       struct GNUNET_TIME_Relative timeout,
1122                       GNUNET_DATASTORE_Iterator iter, 
1123                       void *iter_cls)
1124 {
1125   struct QueueEntry *qe;
1126   struct GetMessage *gm;
1127   struct ResultContext *rcont;
1128
1129 #if DEBUG_DATASTORE
1130   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131               "Asked to look for data of type %u under key `%s'\n",
1132               (unsigned int) type,
1133               GNUNET_h2s (key));
1134 #endif
1135   rcont = GNUNET_malloc (sizeof (struct ResultContext));
1136   rcont->iter = iter;
1137   rcont->iter_cls = iter_cls;
1138   qe = make_queue_entry (h, sizeof(struct GetMessage),
1139                          queue_priority, max_queue_size, timeout,
1140                          &process_result_message, rcont);
1141   if (qe == NULL)
1142     return;
1143   gm = (struct GetMessage*) &qe[1];
1144   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1145   gm->type = htonl(type);
1146   if (key != NULL)
1147     {
1148       gm->header.size = htons(sizeof (struct GetMessage));
1149       gm->key = *key;
1150     }
1151   else
1152     {
1153       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1154     }
1155   process_queue (h);
1156 }
1157
1158
1159 /**
1160  * Function called to trigger obtaining the next result
1161  * from the datastore.
1162  * 
1163  * @param h handle to the datastore
1164  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1165  *        iteration (with a final call to "iter" with key/data == NULL).
1166  */
1167 void 
1168 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1169                            int more)
1170 {
1171   struct QueueEntry *qe = h->queue_head;
1172   struct ResultContext *rc = qe->client_ctx;
1173
1174   GNUNET_assert (NULL != qe);
1175   GNUNET_assert (&process_result_message == qe->response_proc);
1176   if (GNUNET_YES == more)
1177     {     
1178       GNUNET_CLIENT_receive (h->client,
1179                              qe->response_proc,
1180                              qe,
1181                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1182       return;
1183     }
1184   GNUNET_CONTAINER_DLL_remove (h->queue_head,
1185                                h->queue_tail,
1186                                qe);
1187   GNUNET_free (qe);
1188   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1189   do_disconnect (h);
1190   rc->iter (rc->iter_cls,
1191             NULL, 0, NULL, 0, 0, 0, 
1192             GNUNET_TIME_UNIT_ZERO_ABS, 0);      
1193   GNUNET_free (rc);
1194 }
1195
1196
1197 /* end of datastore_api.c */