dfbdd439248315840fb9f00ccebfdb6db9d9b0c8
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217 };
218
219
220
221 /**
222  * Connect to the datastore service.
223  *
224  * @param cfg configuration to use
225  * @param sched scheduler to use
226  * @return handle to use to access the service
227  */
228 struct GNUNET_DATASTORE_Handle *
229 GNUNET_DATASTORE_connect (const struct
230                           GNUNET_CONFIGURATION_Handle
231                           *cfg,
232                           struct
233                           GNUNET_SCHEDULER_Handle
234                           *sched)
235 {
236   struct GNUNET_CLIENT_Connection *c;
237   struct GNUNET_DATASTORE_Handle *h;
238   
239   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
240   if (c == NULL)
241     return NULL; /* oops */
242   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
243                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
244   h->client = c;
245   h->cfg = cfg;
246   h->sched = sched;
247   return h;
248 }
249
250
251 /**
252  * Transmit DROP message to datastore service.
253  *
254  * @param cls the 'struct GNUNET_DATASTORE_Handle'
255  * @param size number of bytes that can be copied to buf
256  * @param buf where to copy the drop message
257  * @return number of bytes written to buf
258  */
259 static size_t
260 transmit_drop (void *cls,
261                size_t size, 
262                void *buf)
263 {
264   struct GNUNET_DATASTORE_Handle *h = cls;
265   struct GNUNET_MessageHeader *hdr;
266   
267   if (buf == NULL)
268     {
269       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270                   _("Failed to transmit request to drop database.\n"));
271       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
272       return 0;
273     }
274   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
275   hdr = buf;
276   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
277   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
278   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
279   return sizeof(struct GNUNET_MessageHeader);
280 }
281
282
283 /**
284  * Disconnect from the datastore service (and free
285  * associated resources).
286  *
287  * @param h handle to the datastore
288  * @param drop set to GNUNET_YES to delete all data in datastore (!)
289  */
290 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
291                                   int drop)
292 {
293   struct GNUNET_DATASTORE_QueueEntry *qe;
294
295   if (h->client != NULL)
296     {
297       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
298       h->client = NULL;
299     }
300   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
301     {
302       GNUNET_SCHEDULER_cancel (h->sched,
303                                h->reconnect_task);
304       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
305     }
306   while (NULL != (qe = h->queue_head))
307     {
308       GNUNET_assert (NULL != qe->response_proc);
309       qe->response_proc (qe, NULL);
310     }
311   if (GNUNET_YES == drop) 
312     {
313       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
314       if (h->client != NULL)
315         {
316           if (NULL != 
317               GNUNET_CLIENT_notify_transmit_ready (h->client,
318                                                    sizeof(struct GNUNET_MessageHeader),
319                                                    GNUNET_TIME_UNIT_MINUTES,
320                                                    GNUNET_YES,
321                                                    &transmit_drop,
322                                                    h))
323             return;
324           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325         }
326       GNUNET_break (0);
327     }
328   GNUNET_free (h);
329 }
330
331
332 /**
333  * A request has timed out (before being transmitted to the service).
334  *
335  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
336  * @param tc scheduler context
337  */
338 static void
339 timeout_queue_entry (void *cls,
340                      const struct GNUNET_SCHEDULER_TaskContext *tc)
341 {
342   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
343
344   qe->task = GNUNET_SCHEDULER_NO_TASK;
345   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
346   qe->response_proc (qe, NULL);
347 }
348
349
350 /**
351  * Create a new entry for our priority queue (and possibly discard other entires if
352  * the queue is getting too long).
353  *
354  * @param h handle to the datastore
355  * @param msize size of the message to queue
356  * @param queue_priority priority of the entry
357  * @param max_queue_size at what queue size should this request be dropped
358  *        (if other requests of higher priority are in the queue)
359  * @param timeout timeout for the operation
360  * @param response_proc function to call with replies (can be NULL)
361  * @param qc client context (NOT a closure for response_proc)
362  * @return NULL if the queue is full (and this entry was dropped)
363  */
364 static struct GNUNET_DATASTORE_QueueEntry *
365 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
366                   size_t msize,
367                   unsigned int queue_priority,
368                   unsigned int max_queue_size,
369                   struct GNUNET_TIME_Relative timeout,
370                   GNUNET_CLIENT_MessageHandler response_proc,            
371                   const union QueueContext *qc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *ret;
374   struct GNUNET_DATASTORE_QueueEntry *pos;
375   unsigned int c;
376
377   c = 0;
378   pos = h->queue_head;
379   while ( (pos != NULL) &&
380           (c < max_queue_size) &&
381           (pos->priority >= queue_priority) )
382     {
383       c++;
384       pos = pos->next;
385     }
386   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
387   ret->h = h;
388   ret->response_proc = response_proc;
389   ret->qc = *qc;
390   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
391   ret->priority = queue_priority;
392   ret->max_queue = max_queue_size;
393   ret->message_size = msize;
394   ret->was_transmitted = GNUNET_NO;
395   if (pos == NULL)
396     {
397       /* append at the tail */
398       pos = h->queue_tail;
399     }
400   else
401     {
402       pos = pos->prev; 
403       /* do not insert at HEAD if HEAD query was already
404          transmitted and we are still receiving replies! */
405       if ( (pos == NULL) &&
406            (h->queue_head->was_transmitted) )
407         pos = h->queue_head;
408     }
409   c++;
410   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
411                                      h->queue_tail,
412                                      pos,
413                                      ret);
414   h->queue_size++;
415   if (c > max_queue_size)
416     {
417       response_proc (ret, NULL);
418       return NULL;
419     }
420   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
421                                             timeout,
422                                             &timeout_queue_entry,
423                                             ret);
424   pos = ret->next;
425   while (pos != NULL) 
426     {
427       if (pos->max_queue < h->queue_size)
428         {
429           GNUNET_assert (pos->response_proc != NULL);
430           pos->response_proc (pos, NULL);
431           break;
432         }
433       pos = pos->next;
434     }
435   return ret;
436 }
437
438
439 /**
440  * Process entries in the queue (or do nothing if we are already
441  * doing so).
442  * 
443  * @param h handle to the datastore
444  */
445 static void
446 process_queue (struct GNUNET_DATASTORE_Handle *h);
447
448
449 /**
450  * Try reconnecting to the datastore service.
451  *
452  * @param cls the 'struct GNUNET_DATASTORE_Handle'
453  * @param tc scheduler context
454  */
455 static void
456 try_reconnect (void *cls,
457                const struct GNUNET_SCHEDULER_TaskContext *tc)
458 {
459   struct GNUNET_DATASTORE_Handle *h = cls;
460
461   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
462     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
463   else
464     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
465   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
466     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
467   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
468   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
469   if (h->client == NULL)
470     return;
471   process_queue (h);
472 }
473
474
475 /**
476  * Disconnect from the service and then try reconnecting to the datastore service
477  * after some delay.
478  *
479  * @param h handle to datastore to disconnect and reconnect
480  */
481 static void
482 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
483 {
484   if (h->client == NULL)
485     return;
486 #if 0
487   GNUNET_STATISTICS_update (stats,
488                             gettext_noop ("# reconnected to datastore"),
489                             1,
490                             GNUNET_NO);
491 #endif
492   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
493   h->client = NULL;
494   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
495                                                     h->retry_time,
496                                                     &try_reconnect,
497                                                     h);      
498 }
499
500
501 /**
502  * Transmit request from queue to datastore service.
503  *
504  * @param cls the 'struct GNUNET_DATASTORE_Handle'
505  * @param size number of bytes that can be copied to buf
506  * @param buf where to copy the drop message
507  * @return number of bytes written to buf
508  */
509 static size_t
510 transmit_request (void *cls,
511                   size_t size, 
512                   void *buf)
513 {
514   struct GNUNET_DATASTORE_Handle *h = cls;
515   struct GNUNET_DATASTORE_QueueEntry *qe;
516   size_t msize;
517
518   h->th = NULL;
519   if (NULL == (qe = h->queue_head))
520     return 0; /* no entry in queue */
521   if (buf == NULL)
522     {
523       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
524                   _("Failed to transmit request to database.\n"));
525       do_disconnect (h);
526       return 0;
527     }
528   if (size < (msize = qe->message_size))
529     {
530       process_queue (h);
531       return 0;
532     }
533   memcpy (buf, &qe[1], msize);
534   qe->was_transmitted = GNUNET_YES;
535   GNUNET_SCHEDULER_cancel (h->sched,
536                            qe->task);
537   qe->task = GNUNET_SCHEDULER_NO_TASK;
538   GNUNET_CLIENT_receive (h->client,
539                          qe->response_proc,
540                          qe,
541                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
542   return msize;
543 }
544
545
546 /**
547  * Process entries in the queue (or do nothing if we are already
548  * doing so).
549  * 
550  * @param h handle to the datastore
551  */
552 static void
553 process_queue (struct GNUNET_DATASTORE_Handle *h)
554 {
555   struct GNUNET_DATASTORE_QueueEntry *qe;
556
557   if (NULL == (qe = h->queue_head))
558     return; /* no entry in queue */
559   if (qe->was_transmitted == GNUNET_YES)
560     return; /* waiting for replies */
561   if (h->th != NULL)
562     return; /* request pending */
563   if (h->client == NULL)
564     return; /* waiting for reconnect */
565 #if DEBUG_DATASTORE
566   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
567               "Transmitting %u bytes request to datastore\n",
568               qe->message_size);
569 #endif
570   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
571                                                qe->message_size,
572                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
573                                                GNUNET_YES,
574                                                &transmit_request,
575                                                h);
576 }
577
578
579 /**
580  * Dummy continuation used to do nothing (but be non-zero).
581  *
582  * @param cls closure
583  * @param result result 
584  * @param emsg error message
585  */
586 static void
587 drop_status_cont (void *cls, int result, const char *emsg)
588 {
589   /* do nothing */
590 }
591
592
593 static void
594 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
595 {
596   struct GNUNET_DATASTORE_Handle *h = qe->h;
597
598   GNUNET_CONTAINER_DLL_remove (h->queue_head,
599                                h->queue_tail,
600                                qe);
601   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
602     {
603       GNUNET_SCHEDULER_cancel (h->sched,
604                                qe->task);
605       qe->task = GNUNET_SCHEDULER_NO_TASK;
606     }
607   h->queue_size--;
608   GNUNET_free (qe);
609 }
610
611 /**
612  * Type of a function to call when we receive a message
613  * from the service.
614  *
615  * @param cls closure
616  * @param msg message received, NULL on timeout or fatal error
617  */
618 static void 
619 process_status_message (void *cls,
620                         const struct
621                         GNUNET_MessageHeader * msg)
622 {
623   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
624   struct GNUNET_DATASTORE_Handle *h = qe->h;
625   struct StatusContext rc = qe->qc.sc;
626   const struct StatusMessage *sm;
627   const char *emsg;
628   int32_t status;
629   int was_transmitted;
630
631   was_transmitted = qe->was_transmitted;
632   if (msg == NULL)
633     {      
634       free_queue_entry (qe);
635       if (NULL == h->client)
636         return; /* forced disconnect */
637       if (was_transmitted == GNUNET_YES)
638         {
639           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
640                       _("Failed to receive response from database.\n"));
641           do_disconnect (h);
642         }
643       return;
644     }
645   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
646   GNUNET_assert (h->queue_head == qe);
647   free_queue_entry (qe);
648   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
649        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
650     {
651       GNUNET_break (0);
652       h->retry_time = GNUNET_TIME_UNIT_ZERO;
653       do_disconnect (h);
654       rc.cont (rc.cont_cls, 
655                GNUNET_SYSERR,
656                _("Error reading response from datastore service"));
657       return;
658     }
659   sm = (const struct StatusMessage*) msg;
660   status = ntohl(sm->status);
661   emsg = NULL;
662   if (ntohs(msg->size) > sizeof(struct StatusMessage))
663     {
664       emsg = (const char*) &sm[1];
665       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
666         {
667           GNUNET_break (0);
668           emsg = _("Invalid error message received from datastore service");
669         }
670     }  
671   if ( (status == GNUNET_SYSERR) &&
672        (emsg == NULL) )
673     {
674       GNUNET_break (0);
675       emsg = _("Invalid error message received from datastore service");
676     }
677 #if DEBUG_DATASTORE
678   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
679               "Received status %d/%s\n",
680               (int) status,
681               emsg);
682 #endif
683   rc.cont (rc.cont_cls, 
684            status,
685            emsg);
686   process_queue (h);
687 }
688
689
690 /**
691  * Store an item in the datastore.  If the item is already present,
692  * the priorities are summed up and the higher expiration time and
693  * lower anonymity level is used.
694  *
695  * @param h handle to the datastore
696  * @param rid reservation ID to use (from "reserve"); use 0 if no
697  *            prior reservation was made
698  * @param key key for the value
699  * @param size number of bytes in data
700  * @param data content stored
701  * @param type type of the content
702  * @param priority priority of the content
703  * @param anonymity anonymity-level for the content
704  * @param expiration expiration time for the content
705  * @param queue_priority ranking of this request in the priority queue
706  * @param max_queue_size at what queue size should this request be dropped
707  *        (if other requests of higher priority are in the queue)
708  * @param timeout timeout for the operation
709  * @param cont continuation to call when done
710  * @param cont_cls closure for cont
711  * @return NULL if the entry was not queued, otherwise a handle that can be used to
712  *         cancel; note that even if NULL is returned, the callback will be invoked
713  *         (or rather, will already have been invoked)
714  */
715 struct GNUNET_DATASTORE_QueueEntry *
716 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
717                       int rid,
718                       const GNUNET_HashCode * key,
719                       uint32_t size,
720                       const void *data,
721                       enum GNUNET_BLOCK_Type type,
722                       uint32_t priority,
723                       uint32_t anonymity,
724                       struct GNUNET_TIME_Absolute expiration,
725                       unsigned int queue_priority,
726                       unsigned int max_queue_size,
727                       struct GNUNET_TIME_Relative timeout,
728                       GNUNET_DATASTORE_ContinuationWithStatus cont,
729                       void *cont_cls)
730 {
731   struct GNUNET_DATASTORE_QueueEntry *qe;
732   struct DataMessage *dm;
733   size_t msize;
734   union QueueContext qc;
735
736 #if DEBUG_DATASTORE
737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
738               "Asked to put %u bytes of data under key `%s'\n",
739               size,
740               GNUNET_h2s (key));
741 #endif
742   msize = sizeof(struct DataMessage) + size;
743   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
744   qc.sc.cont = cont;
745   qc.sc.cont_cls = cont_cls;
746   qe = make_queue_entry (h, msize,
747                          queue_priority, max_queue_size, timeout,
748                          &process_status_message, &qc);
749   if (qe == NULL)
750     return NULL;
751   dm = (struct DataMessage* ) &qe[1];
752   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
753   dm->header.size = htons(msize);
754   dm->rid = htonl(rid);
755   dm->size = htonl(size);
756   dm->type = htonl(type);
757   dm->priority = htonl(priority);
758   dm->anonymity = htonl(anonymity);
759   dm->uid = GNUNET_htonll(0);
760   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
761   dm->key = *key;
762   memcpy (&dm[1], data, size);
763   process_queue (h);
764   return qe;
765 }
766
767
768 /**
769  * Reserve space in the datastore.  This function should be used
770  * to avoid "out of space" failures during a longer sequence of "put"
771  * operations (for example, when a file is being inserted).
772  *
773  * @param h handle to the datastore
774  * @param amount how much space (in bytes) should be reserved (for content only)
775  * @param entries how many entries will be created (to calculate per-entry overhead)
776  * @param queue_priority ranking of this request in the priority queue
777  * @param max_queue_size at what queue size should this request be dropped
778  *        (if other requests of higher priority are in the queue)
779  * @param timeout how long to wait at most for a response (or before dying in queue)
780  * @param cont continuation to call when done; "success" will be set to
781  *             a positive reservation value if space could be reserved.
782  * @param cont_cls closure for cont
783  * @return NULL if the entry was not queued, otherwise a handle that can be used to
784  *         cancel; note that even if NULL is returned, the callback will be invoked
785  *         (or rather, will already have been invoked)
786  */
787 struct GNUNET_DATASTORE_QueueEntry *
788 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
789                           uint64_t amount,
790                           uint32_t entries,
791                           unsigned int queue_priority,
792                           unsigned int max_queue_size,
793                           struct GNUNET_TIME_Relative timeout,
794                           GNUNET_DATASTORE_ContinuationWithStatus cont,
795                           void *cont_cls)
796 {
797   struct GNUNET_DATASTORE_QueueEntry *qe;
798   struct ReserveMessage *rm;
799   union QueueContext qc;
800
801   if (cont == NULL)
802     cont = &drop_status_cont;
803 #if DEBUG_DATASTORE
804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805               "Asked to reserve %llu bytes of data and %u entries'\n",
806               (unsigned long long) amount,
807               (unsigned int) entries);
808 #endif
809   qc.sc.cont = cont;
810   qc.sc.cont_cls = cont_cls;
811   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
812                          queue_priority, max_queue_size, timeout,
813                          &process_status_message, &qc);
814   if (qe == NULL)
815     return NULL;
816   rm = (struct ReserveMessage*) &qe[1];
817   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
818   rm->header.size = htons(sizeof (struct ReserveMessage));
819   rm->entries = htonl(entries);
820   rm->amount = GNUNET_htonll(amount);
821   process_queue (h);
822   return qe;
823 }
824
825
826 /**
827  * Signal that all of the data for which a reservation was made has
828  * been stored and that whatever excess space might have been reserved
829  * can now be released.
830  *
831  * @param h handle to the datastore
832  * @param rid reservation ID (value of "success" in original continuation
833  *        from the "reserve" function).
834  * @param queue_priority ranking of this request in the priority queue
835  * @param max_queue_size at what queue size should this request be dropped
836  *        (if other requests of higher priority are in the queue)
837  * @param queue_priority ranking of this request in the priority queue
838  * @param max_queue_size at what queue size should this request be dropped
839  *        (if other requests of higher priority are in the queue)
840  * @param timeout how long to wait at most for a response
841  * @param cont continuation to call when done
842  * @param cont_cls closure for cont
843  * @return NULL if the entry was not queued, otherwise a handle that can be used to
844  *         cancel; note that even if NULL is returned, the callback will be invoked
845  *         (or rather, will already have been invoked)
846  */
847 struct GNUNET_DATASTORE_QueueEntry *
848 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
849                                   int rid,
850                                   unsigned int queue_priority,
851                                   unsigned int max_queue_size,
852                                   struct GNUNET_TIME_Relative timeout,
853                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
854                                   void *cont_cls)
855 {
856   struct GNUNET_DATASTORE_QueueEntry *qe;
857   struct ReleaseReserveMessage *rrm;
858   union QueueContext qc;
859
860   if (cont == NULL)
861     cont = &drop_status_cont;
862 #if DEBUG_DATASTORE
863   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
864               "Asked to release reserve %d\n",
865               rid);
866 #endif
867   qc.sc.cont = cont;
868   qc.sc.cont_cls = cont_cls;
869   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
870                          queue_priority, max_queue_size, timeout,
871                          &process_status_message, &qc);
872   if (qe == NULL)
873     return NULL;
874   rrm = (struct ReleaseReserveMessage*) &qe[1];
875   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
876   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
877   rrm->rid = htonl(rid);
878   process_queue (h);
879   return qe;
880 }
881
882
883 /**
884  * Update a value in the datastore.
885  *
886  * @param h handle to the datastore
887  * @param uid identifier for the value
888  * @param priority how much to increase the priority of the value
889  * @param expiration new expiration value should be MAX of existing and this argument
890  * @param queue_priority ranking of this request in the priority queue
891  * @param max_queue_size at what queue size should this request be dropped
892  *        (if other requests of higher priority are in the queue)
893  * @param timeout how long to wait at most for a response
894  * @param cont continuation to call when done
895  * @param cont_cls closure for cont
896  * @return NULL if the entry was not queued, otherwise a handle that can be used to
897  *         cancel; note that even if NULL is returned, the callback will be invoked
898  *         (or rather, will already have been invoked)
899  */
900 struct GNUNET_DATASTORE_QueueEntry *
901 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
902                          unsigned long long uid,
903                          uint32_t priority,
904                          struct GNUNET_TIME_Absolute expiration,
905                          unsigned int queue_priority,
906                          unsigned int max_queue_size,
907                          struct GNUNET_TIME_Relative timeout,
908                          GNUNET_DATASTORE_ContinuationWithStatus cont,
909                          void *cont_cls)
910 {
911   struct GNUNET_DATASTORE_QueueEntry *qe;
912   struct UpdateMessage *um;
913   union QueueContext qc;
914
915   if (cont == NULL)
916     cont = &drop_status_cont;
917 #if DEBUG_DATASTORE
918   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
920               uid,
921               (unsigned int) priority,
922               (unsigned long long) expiration.value);
923 #endif
924   qc.sc.cont = cont;
925   qc.sc.cont_cls = cont_cls;
926   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
927                          queue_priority, max_queue_size, timeout,
928                          &process_status_message, &qc);
929   if (qe == NULL)
930     return NULL;
931   um = (struct UpdateMessage*) &qe[1];
932   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
933   um->header.size = htons(sizeof (struct UpdateMessage));
934   um->priority = htonl(priority);
935   um->expiration = GNUNET_TIME_absolute_hton(expiration);
936   um->uid = GNUNET_htonll(uid);
937   process_queue (h);
938   return qe;
939 }
940
941
942 /**
943  * Explicitly remove some content from the database.
944  * The "cont"inuation will be called with status
945  * "GNUNET_OK" if content was removed, "GNUNET_NO"
946  * if no matching entry was found and "GNUNET_SYSERR"
947  * on all other types of errors.
948  *
949  * @param h handle to the datastore
950  * @param key key for the value
951  * @param size number of bytes in data
952  * @param data content stored
953  * @param queue_priority ranking of this request in the priority queue
954  * @param max_queue_size at what queue size should this request be dropped
955  *        (if other requests of higher priority are in the queue)
956  * @param timeout how long to wait at most for a response
957  * @param cont continuation to call when done
958  * @param cont_cls closure for cont
959  * @return NULL if the entry was not queued, otherwise a handle that can be used to
960  *         cancel; note that even if NULL is returned, the callback will be invoked
961  *         (or rather, will already have been invoked)
962  */
963 struct GNUNET_DATASTORE_QueueEntry *
964 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
965                          const GNUNET_HashCode *key,
966                          uint32_t size, 
967                          const void *data,
968                          unsigned int queue_priority,
969                          unsigned int max_queue_size,
970                          struct GNUNET_TIME_Relative timeout,
971                          GNUNET_DATASTORE_ContinuationWithStatus cont,
972                          void *cont_cls)
973 {
974   struct GNUNET_DATASTORE_QueueEntry *qe;
975   struct DataMessage *dm;
976   size_t msize;
977   union QueueContext qc;
978
979   if (cont == NULL)
980     cont = &drop_status_cont;
981 #if DEBUG_DATASTORE
982   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
983               "Asked to remove %u bytes under key `%s'\n",
984               size,
985               GNUNET_h2s (key));
986 #endif
987   qc.sc.cont = cont;
988   qc.sc.cont_cls = cont_cls;
989   msize = sizeof(struct DataMessage) + size;
990   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
991   qe = make_queue_entry (h, msize,
992                          queue_priority, max_queue_size, timeout,
993                          &process_status_message, &qc);
994   if (qe == NULL)
995     return NULL;
996   dm = (struct DataMessage*) &qe[1];
997   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
998   dm->header.size = htons(msize);
999   dm->rid = htonl(0);
1000   dm->size = htonl(size);
1001   dm->type = htonl(0);
1002   dm->priority = htonl(0);
1003   dm->anonymity = htonl(0);
1004   dm->uid = GNUNET_htonll(0);
1005   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1006   dm->key = *key;
1007   memcpy (&dm[1], data, size);
1008   process_queue (h);
1009   return qe;
1010 }
1011
1012
1013 /**
1014  * Type of a function to call when we receive a message
1015  * from the service.
1016  *
1017  * @param cls closure
1018  * @param msg message received, NULL on timeout or fatal error
1019  */
1020 static void 
1021 process_result_message (void *cls,
1022                         const struct GNUNET_MessageHeader * msg)
1023 {
1024   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1025   struct GNUNET_DATASTORE_Handle *h = qe->h;
1026   struct ResultContext rc = qe->qc.rc;
1027   const struct DataMessage *dm;
1028   int was_transmitted;
1029
1030   if (msg == NULL)
1031     {
1032       was_transmitted = qe->was_transmitted;
1033       free_queue_entry (qe);
1034       if (was_transmitted == GNUNET_YES)
1035         {
1036           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1037                       _("Failed to receive response from database.\n"));
1038           do_disconnect (h);
1039         }
1040       if (rc.iter != NULL)
1041         rc.iter (rc.iter_cls,
1042                  NULL, 0, NULL, 0, 0, 0, 
1043                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1044       return;
1045     }
1046   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1047   GNUNET_assert (h->queue_head == qe);
1048   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1049     {
1050       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1051 #if DEBUG_DATASTORE
1052       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1053                   "Received end of result set\n");
1054 #endif
1055       free_queue_entry (qe);
1056       if (rc.iter != NULL)
1057         rc.iter (rc.iter_cls,
1058                  NULL, 0, NULL, 0, 0, 0, 
1059                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1060       process_queue (h);
1061       return;
1062     }
1063   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1064        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1065        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1066     {
1067       GNUNET_break (0);
1068       free_queue_entry (qe);
1069       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1070       do_disconnect (h);
1071       if (rc.iter != NULL)
1072         rc.iter (rc.iter_cls,
1073                  NULL, 0, NULL, 0, 0, 0, 
1074                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1075       return;
1076     }
1077   if (rc.iter == NULL)
1078     {
1079       /* abort iteration */
1080       do_disconnect (h);
1081       return;
1082     }
1083   dm = (const struct DataMessage*) msg;
1084 #if DEBUG_DATASTORE
1085   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1086               "Received result %llu with type %u and size %u with key %s\n",
1087               (unsigned long long) GNUNET_ntohll(dm->uid),
1088               ntohl(dm->type),
1089               ntohl(dm->size),
1090               GNUNET_h2s(&dm->key));
1091 #endif
1092   rc.iter (rc.iter_cls,
1093            &dm->key,
1094            ntohl(dm->size),
1095            &dm[1],
1096            ntohl(dm->type),
1097            ntohl(dm->priority),
1098            ntohl(dm->anonymity),
1099            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1100            GNUNET_ntohll(dm->uid));
1101 }
1102
1103
1104 /**
1105  * Get a random value from the datastore.
1106  *
1107  * @param h handle to the datastore
1108  * @param queue_priority ranking of this request in the priority queue
1109  * @param max_queue_size at what queue size should this request be dropped
1110  *        (if other requests of higher priority are in the queue)
1111  * @param timeout how long to wait at most for a response
1112  * @param iter function to call on a random value; it
1113  *        will be called once with a value (if available)
1114  *        and always once with a value of NULL.
1115  * @param iter_cls closure for iter
1116  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1117  *         cancel; note that even if NULL is returned, the callback will be invoked
1118  *         (or rather, will already have been invoked)
1119  */
1120 struct GNUNET_DATASTORE_QueueEntry *
1121 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1122                              unsigned int queue_priority,
1123                              unsigned int max_queue_size,
1124                              struct GNUNET_TIME_Relative timeout,
1125                              GNUNET_DATASTORE_Iterator iter, 
1126                              void *iter_cls)
1127 {
1128   struct GNUNET_DATASTORE_QueueEntry *qe;
1129   struct GNUNET_MessageHeader *m;
1130   union QueueContext qc;
1131
1132 #if DEBUG_DATASTORE
1133   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1134               "Asked to get random entry in %llu ms\n",
1135               (unsigned long long) timeout.value);
1136 #endif
1137   qc.rc.iter = iter;
1138   qc.rc.iter_cls = iter_cls;
1139   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1140                          queue_priority, max_queue_size, timeout,
1141                          &process_result_message, &qc);
1142   if (qe == NULL)
1143     return NULL;    
1144   m = (struct GNUNET_MessageHeader*) &qe[1];
1145   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1146   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1147   process_queue (h);
1148   return qe;
1149 }
1150
1151
1152
1153 /**
1154  * Iterate over the results for a particular key
1155  * in the datastore.  The iterator will only be called
1156  * once initially; if the first call did contain a
1157  * result, further results can be obtained by calling
1158  * "GNUNET_DATASTORE_get_next" with the given argument.
1159  *
1160  * @param h handle to the datastore
1161  * @param key maybe NULL (to match all entries)
1162  * @param type desired type, 0 for any
1163  * @param queue_priority ranking of this request in the priority queue
1164  * @param max_queue_size at what queue size should this request be dropped
1165  *        (if other requests of higher priority are in the queue)
1166  * @param timeout how long to wait at most for a response
1167  * @param iter function to call on each matching value;
1168  *        will be called once with a NULL value at the end
1169  * @param iter_cls closure for iter
1170  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1171  *         cancel; note that even if NULL is returned, the callback will be invoked
1172  *         (or rather, will already have been invoked)
1173  */
1174 struct GNUNET_DATASTORE_QueueEntry *
1175 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1176                       const GNUNET_HashCode * key,
1177                       enum GNUNET_BLOCK_Type type,
1178                       unsigned int queue_priority,
1179                       unsigned int max_queue_size,
1180                       struct GNUNET_TIME_Relative timeout,
1181                       GNUNET_DATASTORE_Iterator iter, 
1182                       void *iter_cls)
1183 {
1184   struct GNUNET_DATASTORE_QueueEntry *qe;
1185   struct GetMessage *gm;
1186   union QueueContext qc;
1187
1188 #if DEBUG_DATASTORE
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190               "Asked to look for data of type %u under key `%s'\n",
1191               (unsigned int) type,
1192               GNUNET_h2s (key));
1193 #endif
1194   qc.rc.iter = iter;
1195   qc.rc.iter_cls = iter_cls;
1196   qe = make_queue_entry (h, sizeof(struct GetMessage),
1197                          queue_priority, max_queue_size, timeout,
1198                          &process_result_message, &qc);
1199   if (qe == NULL)
1200     return NULL;
1201   gm = (struct GetMessage*) &qe[1];
1202   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1203   gm->type = htonl(type);
1204   if (key != NULL)
1205     {
1206       gm->header.size = htons(sizeof (struct GetMessage));
1207       gm->key = *key;
1208     }
1209   else
1210     {
1211       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1212     }
1213   process_queue (h);
1214   return qe;
1215 }
1216
1217
1218 /**
1219  * Function called to trigger obtaining the next result
1220  * from the datastore.
1221  * 
1222  * @param h handle to the datastore
1223  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1224  *        iteration (with a final call to "iter" with key/data == NULL).
1225  */
1226 void 
1227 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1228                            int more)
1229 {
1230   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1231   struct ResultContext rc = qe->qc.rc;
1232
1233   GNUNET_assert (NULL != qe);
1234   GNUNET_assert (&process_result_message == qe->response_proc);
1235   if (GNUNET_YES == more)
1236     {     
1237       GNUNET_CLIENT_receive (h->client,
1238                              qe->response_proc,
1239                              qe,
1240                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1241       return;
1242     }
1243   free_queue_entry (qe);
1244   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1245   do_disconnect (h);
1246   rc.iter (rc.iter_cls,
1247            NULL, 0, NULL, 0, 0, 0, 
1248            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1249 }
1250
1251
1252 /**
1253  * Cancel a datastore operation.  The final callback from the
1254  * operation must not have been done yet.
1255  * 
1256  * @param qe operation to cancel
1257  */
1258 void
1259 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1260 {
1261   struct GNUNET_DATASTORE_Handle *h;
1262   int reconnect;
1263
1264   h = qe->h;
1265   reconnect = GNUNET_NO;
1266   if (GNUNET_YES == qe->was_transmitted) 
1267     {
1268       if (qe->response_proc == &process_result_message) 
1269         {
1270           qe->qc.rc.iter = NULL;    
1271           GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1272           return;
1273         }
1274       reconnect = GNUNET_YES;
1275     }
1276   free_queue_entry (qe);
1277   h->queue_size--;
1278   if (reconnect)
1279     {
1280       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1281       do_disconnect (h);
1282     }
1283 }
1284
1285
1286 /* end of datastore_api.c */