fix
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     {
476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477                   "DATASTORE reconnect failed (fatally)\n");
478       return;
479     }
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Reconnected to DATASTORE\n");
483 #endif
484   process_queue (h);
485 }
486
487
488 /**
489  * Disconnect from the service and then try reconnecting to the datastore service
490  * after some delay.
491  *
492  * @param h handle to datastore to disconnect and reconnect
493  */
494 static void
495 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
496 {
497   if (h->client == NULL)
498     {
499 #if DEBUG_DATASTORE
500       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                   "client NULL in disconnect, will not try to reconnect\n");
502 #endif
503       return;
504     }
505 #if 0
506   GNUNET_STATISTICS_update (stats,
507                             gettext_noop ("# reconnected to DATASTORE"),
508                             1,
509                             GNUNET_NO);
510 #endif
511   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
512   h->client = NULL;
513   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
514                                                     h->retry_time,
515                                                     &try_reconnect,
516                                                     h);      
517 }
518
519
520 /**
521  * Transmit request from queue to datastore service.
522  *
523  * @param cls the 'struct GNUNET_DATASTORE_Handle'
524  * @param size number of bytes that can be copied to buf
525  * @param buf where to copy the drop message
526  * @return number of bytes written to buf
527  */
528 static size_t
529 transmit_request (void *cls,
530                   size_t size, 
531                   void *buf)
532 {
533   struct GNUNET_DATASTORE_Handle *h = cls;
534   struct GNUNET_DATASTORE_QueueEntry *qe;
535   size_t msize;
536
537   h->th = NULL;
538   if (NULL == (qe = h->queue_head))
539     return 0; /* no entry in queue */
540   if (buf == NULL)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
543                   _("Failed to transmit request to DATASTORE.\n"));
544       do_disconnect (h);
545       return 0;
546     }
547   if (size < (msize = qe->message_size))
548     {
549       process_queue (h);
550       return 0;
551     }
552  #if DEBUG_DATASTORE
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Transmitting %u byte request to DATASTORE\n",
555               msize);
556 #endif
557   memcpy (buf, &qe[1], msize);
558   qe->was_transmitted = GNUNET_YES;
559   GNUNET_SCHEDULER_cancel (h->sched,
560                            qe->task);
561   qe->task = GNUNET_SCHEDULER_NO_TASK;
562   h->in_receive = GNUNET_YES;
563   GNUNET_CLIENT_receive (h->client,
564                          qe->response_proc,
565                          qe,
566                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
567   return msize;
568 }
569
570
571 /**
572  * Process entries in the queue (or do nothing if we are already
573  * doing so).
574  * 
575  * @param h handle to the datastore
576  */
577 static void
578 process_queue (struct GNUNET_DATASTORE_Handle *h)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   if (NULL == (qe = h->queue_head))
583     {
584 #if DEBUG_DATASTORE > 1
585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                   "Queue empty\n");
587 #endif
588       return; /* no entry in queue */
589     }
590   if (qe->was_transmitted == GNUNET_YES)
591     {
592 #if DEBUG_DATASTORE > 1
593       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                   "Head request already transmitted\n");
595 #endif
596       return; /* waiting for replies */
597     }
598   if (h->th != NULL)
599     {
600 #if DEBUG_DATASTORE > 1
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                   "Pending transmission request\n");
603 #endif
604       return; /* request pending */
605     }
606   if (h->client == NULL)
607     {
608 #if DEBUG_DATASTORE > 1
609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610                   "Not connected\n");
611 #endif
612       return; /* waiting for reconnect */
613     }
614 #if DEBUG_DATASTORE
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Queueing %u byte request to DATASTORE\n",
617               qe->message_size);
618 #endif
619   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
620                                                qe->message_size,
621                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
622                                                GNUNET_YES,
623                                                &transmit_request,
624                                                h);
625 }
626
627
628 /**
629  * Dummy continuation used to do nothing (but be non-zero).
630  *
631  * @param cls closure
632  * @param result result 
633  * @param emsg error message
634  */
635 static void
636 drop_status_cont (void *cls, int result, const char *emsg)
637 {
638   /* do nothing */
639 }
640
641
642 static void
643 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
644 {
645   struct GNUNET_DATASTORE_Handle *h = qe->h;
646
647   GNUNET_CONTAINER_DLL_remove (h->queue_head,
648                                h->queue_tail,
649                                qe);
650   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
651     {
652       GNUNET_SCHEDULER_cancel (h->sched,
653                                qe->task);
654       qe->task = GNUNET_SCHEDULER_NO_TASK;
655     }
656   h->queue_size--;
657   GNUNET_free (qe);
658 }
659
660 /**
661  * Type of a function to call when we receive a message
662  * from the service.
663  *
664  * @param cls closure
665  * @param msg message received, NULL on timeout or fatal error
666  */
667 static void 
668 process_status_message (void *cls,
669                         const struct
670                         GNUNET_MessageHeader * msg)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
673   struct GNUNET_DATASTORE_Handle *h = qe->h;
674   struct StatusContext rc = qe->qc.sc;
675   const struct StatusMessage *sm;
676   const char *emsg;
677   int32_t status;
678   int was_transmitted;
679
680   h->in_receive = GNUNET_NO;
681   was_transmitted = qe->was_transmitted;
682   if (msg == NULL)
683     {      
684       free_queue_entry (qe);
685       if (NULL == h->client)
686         return; /* forced disconnect */
687       rc.cont (rc.cont_cls, 
688                GNUNET_SYSERR,
689                _("Failed to receive response from database."));
690       if (was_transmitted == GNUNET_YES)
691         do_disconnect (h);
692       return;
693     }
694   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
695   GNUNET_assert (h->queue_head == qe);
696   free_queue_entry (qe);
697   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
698        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
699     {
700       GNUNET_break (0);
701       h->retry_time = GNUNET_TIME_UNIT_ZERO;
702       do_disconnect (h);
703       rc.cont (rc.cont_cls, 
704                GNUNET_SYSERR,
705                _("Error reading response from datastore service"));
706       return;
707     }
708   sm = (const struct StatusMessage*) msg;
709   status = ntohl(sm->status);
710   emsg = NULL;
711   if (ntohs(msg->size) > sizeof(struct StatusMessage))
712     {
713       emsg = (const char*) &sm[1];
714       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
715         {
716           GNUNET_break (0);
717           emsg = _("Invalid error message received from datastore service");
718         }
719     }  
720   if ( (status == GNUNET_SYSERR) &&
721        (emsg == NULL) )
722     {
723       GNUNET_break (0);
724       emsg = _("Invalid error message received from datastore service");
725     }
726 #if DEBUG_DATASTORE
727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
728               "Received status %d/%s\n",
729               (int) status,
730               emsg);
731 #endif
732   rc.cont (rc.cont_cls, 
733            status,
734            emsg);
735   process_queue (h);
736 }
737
738
739 /**
740  * Store an item in the datastore.  If the item is already present,
741  * the priorities are summed up and the higher expiration time and
742  * lower anonymity level is used.
743  *
744  * @param h handle to the datastore
745  * @param rid reservation ID to use (from "reserve"); use 0 if no
746  *            prior reservation was made
747  * @param key key for the value
748  * @param size number of bytes in data
749  * @param data content stored
750  * @param type type of the content
751  * @param priority priority of the content
752  * @param anonymity anonymity-level for the content
753  * @param expiration expiration time for the content
754  * @param queue_priority ranking of this request in the priority queue
755  * @param max_queue_size at what queue size should this request be dropped
756  *        (if other requests of higher priority are in the queue)
757  * @param timeout timeout for the operation
758  * @param cont continuation to call when done
759  * @param cont_cls closure for cont
760  * @return NULL if the entry was not queued, otherwise a handle that can be used to
761  *         cancel; note that even if NULL is returned, the callback will be invoked
762  *         (or rather, will already have been invoked)
763  */
764 struct GNUNET_DATASTORE_QueueEntry *
765 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
766                       int rid,
767                       const GNUNET_HashCode * key,
768                       uint32_t size,
769                       const void *data,
770                       enum GNUNET_BLOCK_Type type,
771                       uint32_t priority,
772                       uint32_t anonymity,
773                       struct GNUNET_TIME_Absolute expiration,
774                       unsigned int queue_priority,
775                       unsigned int max_queue_size,
776                       struct GNUNET_TIME_Relative timeout,
777                       GNUNET_DATASTORE_ContinuationWithStatus cont,
778                       void *cont_cls)
779 {
780   struct GNUNET_DATASTORE_QueueEntry *qe;
781   struct DataMessage *dm;
782   size_t msize;
783   union QueueContext qc;
784
785 #if DEBUG_DATASTORE
786   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
787               "Asked to put %u bytes of data under key `%s'\n",
788               size,
789               GNUNET_h2s (key));
790 #endif
791   msize = sizeof(struct DataMessage) + size;
792   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
793   qc.sc.cont = cont;
794   qc.sc.cont_cls = cont_cls;
795   qe = make_queue_entry (h, msize,
796                          queue_priority, max_queue_size, timeout,
797                          &process_status_message, &qc);
798   if (qe == NULL)
799     return NULL;
800   dm = (struct DataMessage* ) &qe[1];
801   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
802   dm->header.size = htons(msize);
803   dm->rid = htonl(rid);
804   dm->size = htonl(size);
805   dm->type = htonl(type);
806   dm->priority = htonl(priority);
807   dm->anonymity = htonl(anonymity);
808   dm->uid = GNUNET_htonll(0);
809   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
810   dm->key = *key;
811   memcpy (&dm[1], data, size);
812   process_queue (h);
813   return qe;
814 }
815
816
817 /**
818  * Reserve space in the datastore.  This function should be used
819  * to avoid "out of space" failures during a longer sequence of "put"
820  * operations (for example, when a file is being inserted).
821  *
822  * @param h handle to the datastore
823  * @param amount how much space (in bytes) should be reserved (for content only)
824  * @param entries how many entries will be created (to calculate per-entry overhead)
825  * @param queue_priority ranking of this request in the priority queue
826  * @param max_queue_size at what queue size should this request be dropped
827  *        (if other requests of higher priority are in the queue)
828  * @param timeout how long to wait at most for a response (or before dying in queue)
829  * @param cont continuation to call when done; "success" will be set to
830  *             a positive reservation value if space could be reserved.
831  * @param cont_cls closure for cont
832  * @return NULL if the entry was not queued, otherwise a handle that can be used to
833  *         cancel; note that even if NULL is returned, the callback will be invoked
834  *         (or rather, will already have been invoked)
835  */
836 struct GNUNET_DATASTORE_QueueEntry *
837 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
838                           uint64_t amount,
839                           uint32_t entries,
840                           unsigned int queue_priority,
841                           unsigned int max_queue_size,
842                           struct GNUNET_TIME_Relative timeout,
843                           GNUNET_DATASTORE_ContinuationWithStatus cont,
844                           void *cont_cls)
845 {
846   struct GNUNET_DATASTORE_QueueEntry *qe;
847   struct ReserveMessage *rm;
848   union QueueContext qc;
849
850   if (cont == NULL)
851     cont = &drop_status_cont;
852 #if DEBUG_DATASTORE
853   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
854               "Asked to reserve %llu bytes of data and %u entries'\n",
855               (unsigned long long) amount,
856               (unsigned int) entries);
857 #endif
858   qc.sc.cont = cont;
859   qc.sc.cont_cls = cont_cls;
860   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
861                          queue_priority, max_queue_size, timeout,
862                          &process_status_message, &qc);
863   if (qe == NULL)
864     return NULL;
865   rm = (struct ReserveMessage*) &qe[1];
866   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
867   rm->header.size = htons(sizeof (struct ReserveMessage));
868   rm->entries = htonl(entries);
869   rm->amount = GNUNET_htonll(amount);
870   process_queue (h);
871   return qe;
872 }
873
874
875 /**
876  * Signal that all of the data for which a reservation was made has
877  * been stored and that whatever excess space might have been reserved
878  * can now be released.
879  *
880  * @param h handle to the datastore
881  * @param rid reservation ID (value of "success" in original continuation
882  *        from the "reserve" function).
883  * @param queue_priority ranking of this request in the priority queue
884  * @param max_queue_size at what queue size should this request be dropped
885  *        (if other requests of higher priority are in the queue)
886  * @param queue_priority ranking of this request in the priority queue
887  * @param max_queue_size at what queue size should this request be dropped
888  *        (if other requests of higher priority are in the queue)
889  * @param timeout how long to wait at most for a response
890  * @param cont continuation to call when done
891  * @param cont_cls closure for cont
892  * @return NULL if the entry was not queued, otherwise a handle that can be used to
893  *         cancel; note that even if NULL is returned, the callback will be invoked
894  *         (or rather, will already have been invoked)
895  */
896 struct GNUNET_DATASTORE_QueueEntry *
897 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
898                                   int rid,
899                                   unsigned int queue_priority,
900                                   unsigned int max_queue_size,
901                                   struct GNUNET_TIME_Relative timeout,
902                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
903                                   void *cont_cls)
904 {
905   struct GNUNET_DATASTORE_QueueEntry *qe;
906   struct ReleaseReserveMessage *rrm;
907   union QueueContext qc;
908
909   if (cont == NULL)
910     cont = &drop_status_cont;
911 #if DEBUG_DATASTORE
912   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
913               "Asked to release reserve %d\n",
914               rid);
915 #endif
916   qc.sc.cont = cont;
917   qc.sc.cont_cls = cont_cls;
918   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
919                          queue_priority, max_queue_size, timeout,
920                          &process_status_message, &qc);
921   if (qe == NULL)
922     return NULL;
923   rrm = (struct ReleaseReserveMessage*) &qe[1];
924   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
925   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
926   rrm->rid = htonl(rid);
927   process_queue (h);
928   return qe;
929 }
930
931
932 /**
933  * Update a value in the datastore.
934  *
935  * @param h handle to the datastore
936  * @param uid identifier for the value
937  * @param priority how much to increase the priority of the value
938  * @param expiration new expiration value should be MAX of existing and this argument
939  * @param queue_priority ranking of this request in the priority queue
940  * @param max_queue_size at what queue size should this request be dropped
941  *        (if other requests of higher priority are in the queue)
942  * @param timeout how long to wait at most for a response
943  * @param cont continuation to call when done
944  * @param cont_cls closure for cont
945  * @return NULL if the entry was not queued, otherwise a handle that can be used to
946  *         cancel; note that even if NULL is returned, the callback will be invoked
947  *         (or rather, will already have been invoked)
948  */
949 struct GNUNET_DATASTORE_QueueEntry *
950 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
951                          unsigned long long uid,
952                          uint32_t priority,
953                          struct GNUNET_TIME_Absolute expiration,
954                          unsigned int queue_priority,
955                          unsigned int max_queue_size,
956                          struct GNUNET_TIME_Relative timeout,
957                          GNUNET_DATASTORE_ContinuationWithStatus cont,
958                          void *cont_cls)
959 {
960   struct GNUNET_DATASTORE_QueueEntry *qe;
961   struct UpdateMessage *um;
962   union QueueContext qc;
963
964   if (cont == NULL)
965     cont = &drop_status_cont;
966 #if DEBUG_DATASTORE
967   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
968               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
969               uid,
970               (unsigned int) priority,
971               (unsigned long long) expiration.value);
972 #endif
973   qc.sc.cont = cont;
974   qc.sc.cont_cls = cont_cls;
975   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
976                          queue_priority, max_queue_size, timeout,
977                          &process_status_message, &qc);
978   if (qe == NULL)
979     return NULL;
980   um = (struct UpdateMessage*) &qe[1];
981   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
982   um->header.size = htons(sizeof (struct UpdateMessage));
983   um->priority = htonl(priority);
984   um->expiration = GNUNET_TIME_absolute_hton(expiration);
985   um->uid = GNUNET_htonll(uid);
986   process_queue (h);
987   return qe;
988 }
989
990
991 /**
992  * Explicitly remove some content from the database.
993  * The "cont"inuation will be called with status
994  * "GNUNET_OK" if content was removed, "GNUNET_NO"
995  * if no matching entry was found and "GNUNET_SYSERR"
996  * on all other types of errors.
997  *
998  * @param h handle to the datastore
999  * @param key key for the value
1000  * @param size number of bytes in data
1001  * @param data content stored
1002  * @param queue_priority ranking of this request in the priority queue
1003  * @param max_queue_size at what queue size should this request be dropped
1004  *        (if other requests of higher priority are in the queue)
1005  * @param timeout how long to wait at most for a response
1006  * @param cont continuation to call when done
1007  * @param cont_cls closure for cont
1008  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1009  *         cancel; note that even if NULL is returned, the callback will be invoked
1010  *         (or rather, will already have been invoked)
1011  */
1012 struct GNUNET_DATASTORE_QueueEntry *
1013 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1014                          const GNUNET_HashCode *key,
1015                          uint32_t size, 
1016                          const void *data,
1017                          unsigned int queue_priority,
1018                          unsigned int max_queue_size,
1019                          struct GNUNET_TIME_Relative timeout,
1020                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1021                          void *cont_cls)
1022 {
1023   struct GNUNET_DATASTORE_QueueEntry *qe;
1024   struct DataMessage *dm;
1025   size_t msize;
1026   union QueueContext qc;
1027
1028   if (cont == NULL)
1029     cont = &drop_status_cont;
1030 #if DEBUG_DATASTORE
1031   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1032               "Asked to remove %u bytes under key `%s'\n",
1033               size,
1034               GNUNET_h2s (key));
1035 #endif
1036   qc.sc.cont = cont;
1037   qc.sc.cont_cls = cont_cls;
1038   msize = sizeof(struct DataMessage) + size;
1039   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1040   qe = make_queue_entry (h, msize,
1041                          queue_priority, max_queue_size, timeout,
1042                          &process_status_message, &qc);
1043   if (qe == NULL)
1044     return NULL;
1045   dm = (struct DataMessage*) &qe[1];
1046   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1047   dm->header.size = htons(msize);
1048   dm->rid = htonl(0);
1049   dm->size = htonl(size);
1050   dm->type = htonl(0);
1051   dm->priority = htonl(0);
1052   dm->anonymity = htonl(0);
1053   dm->uid = GNUNET_htonll(0);
1054   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1055   dm->key = *key;
1056   memcpy (&dm[1], data, size);
1057   process_queue (h);
1058   return qe;
1059 }
1060
1061
1062 /**
1063  * Type of a function to call when we receive a message
1064  * from the service.
1065  *
1066  * @param cls closure
1067  * @param msg message received, NULL on timeout or fatal error
1068  */
1069 static void 
1070 process_result_message (void *cls,
1071                         const struct GNUNET_MessageHeader * msg)
1072 {
1073   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1074   struct GNUNET_DATASTORE_Handle *h = qe->h;
1075   struct ResultContext rc = qe->qc.rc;
1076   const struct DataMessage *dm;
1077   int was_transmitted;
1078
1079   h->in_receive = GNUNET_NO;
1080   if (msg == NULL)
1081    {
1082       was_transmitted = qe->was_transmitted;
1083       free_queue_entry (qe);
1084       if (was_transmitted == GNUNET_YES)
1085         {
1086           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1087                       _("Failed to receive response from database.\n"));
1088           do_disconnect (h);
1089         }
1090       if (rc.iter != NULL)
1091         rc.iter (rc.iter_cls,
1092                  NULL, 0, NULL, 0, 0, 0, 
1093                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1094       return;
1095     }
1096   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1097   GNUNET_assert (h->queue_head == qe);
1098   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1099     {
1100       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1101 #if DEBUG_DATASTORE
1102       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                   "Received end of result set\n");
1104 #endif
1105       free_queue_entry (qe);
1106       if (rc.iter != NULL)
1107         rc.iter (rc.iter_cls,
1108                  NULL, 0, NULL, 0, 0, 0, 
1109                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1110       process_queue (h);
1111       return;
1112     }
1113   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1114        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1115        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1116     {
1117       GNUNET_break (0);
1118       free_queue_entry (qe);
1119       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1120       do_disconnect (h);
1121       if (rc.iter != NULL)
1122         rc.iter (rc.iter_cls,
1123                  NULL, 0, NULL, 0, 0, 0, 
1124                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1125       return;
1126     }
1127   if (rc.iter == NULL)
1128     {
1129       /* abort iteration */
1130 #if DEBUG_DATASTORE
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Aborting iteration via disconnect (client has cancelled)\n");
1133 #endif
1134       free_queue_entry (qe);
1135       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1136       do_disconnect (h);
1137       return;
1138     }
1139   dm = (const struct DataMessage*) msg;
1140 #if DEBUG_DATASTORE
1141   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142               "Received result %llu with type %u and size %u with key %s\n",
1143               (unsigned long long) GNUNET_ntohll(dm->uid),
1144               ntohl(dm->type),
1145               ntohl(dm->size),
1146               GNUNET_h2s(&dm->key));
1147 #endif
1148   rc.iter (rc.iter_cls,
1149            &dm->key,
1150            ntohl(dm->size),
1151            &dm[1],
1152            ntohl(dm->type),
1153            ntohl(dm->priority),
1154            ntohl(dm->anonymity),
1155            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1156            GNUNET_ntohll(dm->uid));
1157 }
1158
1159
1160 /**
1161  * Get a random value from the datastore.
1162  *
1163  * @param h handle to the datastore
1164  * @param queue_priority ranking of this request in the priority queue
1165  * @param max_queue_size at what queue size should this request be dropped
1166  *        (if other requests of higher priority are in the queue)
1167  * @param timeout how long to wait at most for a response
1168  * @param iter function to call on a random value; it
1169  *        will be called once with a value (if available)
1170  *        and always once with a value of NULL.
1171  * @param iter_cls closure for iter
1172  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1173  *         cancel; note that even if NULL is returned, the callback will be invoked
1174  *         (or rather, will already have been invoked)
1175  */
1176 struct GNUNET_DATASTORE_QueueEntry *
1177 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1178                              unsigned int queue_priority,
1179                              unsigned int max_queue_size,
1180                              struct GNUNET_TIME_Relative timeout,
1181                              GNUNET_DATASTORE_Iterator iter, 
1182                              void *iter_cls)
1183 {
1184   struct GNUNET_DATASTORE_QueueEntry *qe;
1185   struct GNUNET_MessageHeader *m;
1186   union QueueContext qc;
1187
1188 #if DEBUG_DATASTORE
1189   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1190               "Asked to get random entry in %llu ms\n",
1191               (unsigned long long) timeout.value);
1192 #endif
1193   qc.rc.iter = iter;
1194   qc.rc.iter_cls = iter_cls;
1195   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1196                          queue_priority, max_queue_size, timeout,
1197                          &process_result_message, &qc);
1198   if (qe == NULL)
1199     return NULL;    
1200   m = (struct GNUNET_MessageHeader*) &qe[1];
1201   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1202   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1203   process_queue (h);
1204   return qe;
1205 }
1206
1207
1208
1209 /**
1210  * Iterate over the results for a particular key
1211  * in the datastore.  The iterator will only be called
1212  * once initially; if the first call did contain a
1213  * result, further results can be obtained by calling
1214  * "GNUNET_DATASTORE_get_next" with the given argument.
1215  *
1216  * @param h handle to the datastore
1217  * @param key maybe NULL (to match all entries)
1218  * @param type desired type, 0 for any
1219  * @param queue_priority ranking of this request in the priority queue
1220  * @param max_queue_size at what queue size should this request be dropped
1221  *        (if other requests of higher priority are in the queue)
1222  * @param timeout how long to wait at most for a response
1223  * @param iter function to call on each matching value;
1224  *        will be called once with a NULL value at the end
1225  * @param iter_cls closure for iter
1226  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1227  *         cancel; note that even if NULL is returned, the callback will be invoked
1228  *         (or rather, will already have been invoked)
1229  */
1230 struct GNUNET_DATASTORE_QueueEntry *
1231 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1232                       const GNUNET_HashCode * key,
1233                       enum GNUNET_BLOCK_Type type,
1234                       unsigned int queue_priority,
1235                       unsigned int max_queue_size,
1236                       struct GNUNET_TIME_Relative timeout,
1237                       GNUNET_DATASTORE_Iterator iter, 
1238                       void *iter_cls)
1239 {
1240   struct GNUNET_DATASTORE_QueueEntry *qe;
1241   struct GetMessage *gm;
1242   union QueueContext qc;
1243
1244 #if DEBUG_DATASTORE
1245   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1246               "Asked to look for data of type %u under key `%s'\n",
1247               (unsigned int) type,
1248               GNUNET_h2s (key));
1249 #endif
1250   qc.rc.iter = iter;
1251   qc.rc.iter_cls = iter_cls;
1252   qe = make_queue_entry (h, sizeof(struct GetMessage),
1253                          queue_priority, max_queue_size, timeout,
1254                          &process_result_message, &qc);
1255   if (qe == NULL)
1256     return NULL;
1257   gm = (struct GetMessage*) &qe[1];
1258   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1259   gm->type = htonl(type);
1260   if (key != NULL)
1261     {
1262       gm->header.size = htons(sizeof (struct GetMessage));
1263       gm->key = *key;
1264     }
1265   else
1266     {
1267       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1268     }
1269   process_queue (h);
1270   return qe;
1271 }
1272
1273
1274 /**
1275  * Function called to trigger obtaining the next result
1276  * from the datastore.
1277  * 
1278  * @param h handle to the datastore
1279  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1280  *        iteration (with a final call to "iter" with key/data == NULL).
1281  */
1282 void 
1283 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1284                            int more)
1285 {
1286   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1287   struct ResultContext rc = qe->qc.rc;
1288
1289   GNUNET_assert (&process_result_message == qe->response_proc);
1290   if (GNUNET_YES == more)
1291     {     
1292       h->in_receive = GNUNET_YES;
1293       GNUNET_CLIENT_receive (h->client,
1294                              qe->response_proc,
1295                              qe,
1296                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1297       return;
1298     }
1299   free_queue_entry (qe);
1300   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1301   do_disconnect (h);
1302   rc.iter (rc.iter_cls,
1303            NULL, 0, NULL, 0, 0, 0, 
1304            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1305 }
1306
1307
1308 /**
1309  * Cancel a datastore operation.  The final callback from the
1310  * operation must not have been done yet.
1311  * 
1312  * @param qe operation to cancel
1313  */
1314 void
1315 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1316 {
1317   struct GNUNET_DATASTORE_Handle *h;
1318   int reconnect;
1319
1320   h = qe->h;
1321 #if DEBUG_DATASTORE
1322   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1323                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1324                qe,
1325                qe->was_transmitted,
1326                h->queue_head == qe);
1327 #endif
1328   reconnect = GNUNET_NO;
1329   if (GNUNET_YES == qe->was_transmitted) 
1330     {
1331       if (qe->response_proc == &process_result_message) 
1332         {
1333           qe->qc.rc.iter = NULL;    
1334           if (GNUNET_YES != h->in_receive)
1335             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1336           return;
1337         }
1338       reconnect = GNUNET_YES;
1339     }
1340   free_queue_entry (qe);
1341   if (reconnect)
1342     {
1343       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1344       do_disconnect (h);
1345     }
1346   else
1347     {
1348       process_queue (h);
1349     }
1350 }
1351
1352
1353 /* end of datastore_api.c */