fix
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     {
476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477                   "DATASTORE reconnect failed (fatally)\n");
478       return;
479     }
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Reconnected to DATASTORE\n");
483 #endif
484   process_queue (h);
485 }
486
487
488 /**
489  * Disconnect from the service and then try reconnecting to the datastore service
490  * after some delay.
491  *
492  * @param h handle to datastore to disconnect and reconnect
493  */
494 static void
495 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
496 {
497   if (h->client == NULL)
498     {
499 #if DEBUG_DATASTORE
500       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                   "client NULL in disconnect, will not try to reconnect\n");
502 #endif
503       return;
504     }
505 #if 0
506   GNUNET_STATISTICS_update (stats,
507                             gettext_noop ("# reconnected to DATASTORE"),
508                             1,
509                             GNUNET_NO);
510 #endif
511   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
512   h->client = NULL;
513   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
514                                                     h->retry_time,
515                                                     &try_reconnect,
516                                                     h);      
517 }
518
519
520 /**
521  * Transmit request from queue to datastore service.
522  *
523  * @param cls the 'struct GNUNET_DATASTORE_Handle'
524  * @param size number of bytes that can be copied to buf
525  * @param buf where to copy the drop message
526  * @return number of bytes written to buf
527  */
528 static size_t
529 transmit_request (void *cls,
530                   size_t size, 
531                   void *buf)
532 {
533   struct GNUNET_DATASTORE_Handle *h = cls;
534   struct GNUNET_DATASTORE_QueueEntry *qe;
535   size_t msize;
536
537   h->th = NULL;
538   if (NULL == (qe = h->queue_head))
539     return 0; /* no entry in queue */
540   if (buf == NULL)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
543                   _("Failed to transmit request to DATASTORE.\n"));
544       do_disconnect (h);
545       return 0;
546     }
547   if (size < (msize = qe->message_size))
548     {
549       process_queue (h);
550       return 0;
551     }
552  #if DEBUG_DATASTORE
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Transmitting %u byte request to DATASTORE\n",
555               msize);
556 #endif
557   memcpy (buf, &qe[1], msize);
558   qe->was_transmitted = GNUNET_YES;
559   GNUNET_SCHEDULER_cancel (h->sched,
560                            qe->task);
561   qe->task = GNUNET_SCHEDULER_NO_TASK;
562   h->in_receive = GNUNET_YES;
563   GNUNET_CLIENT_receive (h->client,
564                          qe->response_proc,
565                          qe,
566                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
567   return msize;
568 }
569
570
571 /**
572  * Process entries in the queue (or do nothing if we are already
573  * doing so).
574  * 
575  * @param h handle to the datastore
576  */
577 static void
578 process_queue (struct GNUNET_DATASTORE_Handle *h)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   if (NULL == (qe = h->queue_head))
583     {
584 #if DEBUG_DATASTORE > 1
585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                   "Queue empty\n");
587 #endif
588       return; /* no entry in queue */
589     }
590   if (qe->was_transmitted == GNUNET_YES)
591     {
592 #if DEBUG_DATASTORE > 1
593       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                   "Head request already transmitted\n");
595 #endif
596       return; /* waiting for replies */
597     }
598   if (h->th != NULL)
599     {
600 #if DEBUG_DATASTORE > 1
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                   "Pending transmission request\n");
603 #endif
604       return; /* request pending */
605     }
606   if (h->client == NULL)
607     {
608 #if DEBUG_DATASTORE > 1
609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610                   "Not connected\n");
611 #endif
612       return; /* waiting for reconnect */
613     }
614 #if DEBUG_DATASTORE
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Queueing %u byte request to DATASTORE\n",
617               qe->message_size);
618 #endif
619   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
620                                                qe->message_size,
621                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
622                                                GNUNET_YES,
623                                                &transmit_request,
624                                                h);
625 }
626
627
628 /**
629  * Dummy continuation used to do nothing (but be non-zero).
630  *
631  * @param cls closure
632  * @param result result 
633  * @param emsg error message
634  */
635 static void
636 drop_status_cont (void *cls, int result, const char *emsg)
637 {
638   /* do nothing */
639 }
640
641
642 static void
643 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
644 {
645   struct GNUNET_DATASTORE_Handle *h = qe->h;
646
647   GNUNET_CONTAINER_DLL_remove (h->queue_head,
648                                h->queue_tail,
649                                qe);
650   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
651     {
652       GNUNET_SCHEDULER_cancel (h->sched,
653                                qe->task);
654       qe->task = GNUNET_SCHEDULER_NO_TASK;
655     }
656   h->queue_size--;
657   GNUNET_free (qe);
658 }
659
660 /**
661  * Type of a function to call when we receive a message
662  * from the service.
663  *
664  * @param cls closure
665  * @param msg message received, NULL on timeout or fatal error
666  */
667 static void 
668 process_status_message (void *cls,
669                         const struct
670                         GNUNET_MessageHeader * msg)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
673   struct GNUNET_DATASTORE_Handle *h = qe->h;
674   struct StatusContext rc = qe->qc.sc;
675   const struct StatusMessage *sm;
676   const char *emsg;
677   int32_t status;
678   int was_transmitted;
679
680   h->in_receive = GNUNET_NO;
681   was_transmitted = qe->was_transmitted;
682   if (msg == NULL)
683     {      
684       free_queue_entry (qe);
685       if (NULL == h->client)
686         return; /* forced disconnect */
687       if (was_transmitted == GNUNET_YES)
688         {
689           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
690                       _("Failed to receive response from database.\n"));
691           do_disconnect (h);
692         }
693       return;
694     }
695   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
696   GNUNET_assert (h->queue_head == qe);
697   free_queue_entry (qe);
698   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
699        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
700     {
701       GNUNET_break (0);
702       h->retry_time = GNUNET_TIME_UNIT_ZERO;
703       do_disconnect (h);
704       rc.cont (rc.cont_cls, 
705                GNUNET_SYSERR,
706                _("Error reading response from datastore service"));
707       return;
708     }
709   sm = (const struct StatusMessage*) msg;
710   status = ntohl(sm->status);
711   emsg = NULL;
712   if (ntohs(msg->size) > sizeof(struct StatusMessage))
713     {
714       emsg = (const char*) &sm[1];
715       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
716         {
717           GNUNET_break (0);
718           emsg = _("Invalid error message received from datastore service");
719         }
720     }  
721   if ( (status == GNUNET_SYSERR) &&
722        (emsg == NULL) )
723     {
724       GNUNET_break (0);
725       emsg = _("Invalid error message received from datastore service");
726     }
727 #if DEBUG_DATASTORE
728   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
729               "Received status %d/%s\n",
730               (int) status,
731               emsg);
732 #endif
733   rc.cont (rc.cont_cls, 
734            status,
735            emsg);
736   process_queue (h);
737 }
738
739
740 /**
741  * Store an item in the datastore.  If the item is already present,
742  * the priorities are summed up and the higher expiration time and
743  * lower anonymity level is used.
744  *
745  * @param h handle to the datastore
746  * @param rid reservation ID to use (from "reserve"); use 0 if no
747  *            prior reservation was made
748  * @param key key for the value
749  * @param size number of bytes in data
750  * @param data content stored
751  * @param type type of the content
752  * @param priority priority of the content
753  * @param anonymity anonymity-level for the content
754  * @param expiration expiration time for the content
755  * @param queue_priority ranking of this request in the priority queue
756  * @param max_queue_size at what queue size should this request be dropped
757  *        (if other requests of higher priority are in the queue)
758  * @param timeout timeout for the operation
759  * @param cont continuation to call when done
760  * @param cont_cls closure for cont
761  * @return NULL if the entry was not queued, otherwise a handle that can be used to
762  *         cancel; note that even if NULL is returned, the callback will be invoked
763  *         (or rather, will already have been invoked)
764  */
765 struct GNUNET_DATASTORE_QueueEntry *
766 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
767                       int rid,
768                       const GNUNET_HashCode * key,
769                       uint32_t size,
770                       const void *data,
771                       enum GNUNET_BLOCK_Type type,
772                       uint32_t priority,
773                       uint32_t anonymity,
774                       struct GNUNET_TIME_Absolute expiration,
775                       unsigned int queue_priority,
776                       unsigned int max_queue_size,
777                       struct GNUNET_TIME_Relative timeout,
778                       GNUNET_DATASTORE_ContinuationWithStatus cont,
779                       void *cont_cls)
780 {
781   struct GNUNET_DATASTORE_QueueEntry *qe;
782   struct DataMessage *dm;
783   size_t msize;
784   union QueueContext qc;
785
786 #if DEBUG_DATASTORE
787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
788               "Asked to put %u bytes of data under key `%s'\n",
789               size,
790               GNUNET_h2s (key));
791 #endif
792   msize = sizeof(struct DataMessage) + size;
793   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
794   qc.sc.cont = cont;
795   qc.sc.cont_cls = cont_cls;
796   qe = make_queue_entry (h, msize,
797                          queue_priority, max_queue_size, timeout,
798                          &process_status_message, &qc);
799   if (qe == NULL)
800     return NULL;
801   dm = (struct DataMessage* ) &qe[1];
802   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
803   dm->header.size = htons(msize);
804   dm->rid = htonl(rid);
805   dm->size = htonl(size);
806   dm->type = htonl(type);
807   dm->priority = htonl(priority);
808   dm->anonymity = htonl(anonymity);
809   dm->uid = GNUNET_htonll(0);
810   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
811   dm->key = *key;
812   memcpy (&dm[1], data, size);
813   process_queue (h);
814   return qe;
815 }
816
817
818 /**
819  * Reserve space in the datastore.  This function should be used
820  * to avoid "out of space" failures during a longer sequence of "put"
821  * operations (for example, when a file is being inserted).
822  *
823  * @param h handle to the datastore
824  * @param amount how much space (in bytes) should be reserved (for content only)
825  * @param entries how many entries will be created (to calculate per-entry overhead)
826  * @param queue_priority ranking of this request in the priority queue
827  * @param max_queue_size at what queue size should this request be dropped
828  *        (if other requests of higher priority are in the queue)
829  * @param timeout how long to wait at most for a response (or before dying in queue)
830  * @param cont continuation to call when done; "success" will be set to
831  *             a positive reservation value if space could be reserved.
832  * @param cont_cls closure for cont
833  * @return NULL if the entry was not queued, otherwise a handle that can be used to
834  *         cancel; note that even if NULL is returned, the callback will be invoked
835  *         (or rather, will already have been invoked)
836  */
837 struct GNUNET_DATASTORE_QueueEntry *
838 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
839                           uint64_t amount,
840                           uint32_t entries,
841                           unsigned int queue_priority,
842                           unsigned int max_queue_size,
843                           struct GNUNET_TIME_Relative timeout,
844                           GNUNET_DATASTORE_ContinuationWithStatus cont,
845                           void *cont_cls)
846 {
847   struct GNUNET_DATASTORE_QueueEntry *qe;
848   struct ReserveMessage *rm;
849   union QueueContext qc;
850
851   if (cont == NULL)
852     cont = &drop_status_cont;
853 #if DEBUG_DATASTORE
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Asked to reserve %llu bytes of data and %u entries'\n",
856               (unsigned long long) amount,
857               (unsigned int) entries);
858 #endif
859   qc.sc.cont = cont;
860   qc.sc.cont_cls = cont_cls;
861   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
862                          queue_priority, max_queue_size, timeout,
863                          &process_status_message, &qc);
864   if (qe == NULL)
865     return NULL;
866   rm = (struct ReserveMessage*) &qe[1];
867   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
868   rm->header.size = htons(sizeof (struct ReserveMessage));
869   rm->entries = htonl(entries);
870   rm->amount = GNUNET_htonll(amount);
871   process_queue (h);
872   return qe;
873 }
874
875
876 /**
877  * Signal that all of the data for which a reservation was made has
878  * been stored and that whatever excess space might have been reserved
879  * can now be released.
880  *
881  * @param h handle to the datastore
882  * @param rid reservation ID (value of "success" in original continuation
883  *        from the "reserve" function).
884  * @param queue_priority ranking of this request in the priority queue
885  * @param max_queue_size at what queue size should this request be dropped
886  *        (if other requests of higher priority are in the queue)
887  * @param queue_priority ranking of this request in the priority queue
888  * @param max_queue_size at what queue size should this request be dropped
889  *        (if other requests of higher priority are in the queue)
890  * @param timeout how long to wait at most for a response
891  * @param cont continuation to call when done
892  * @param cont_cls closure for cont
893  * @return NULL if the entry was not queued, otherwise a handle that can be used to
894  *         cancel; note that even if NULL is returned, the callback will be invoked
895  *         (or rather, will already have been invoked)
896  */
897 struct GNUNET_DATASTORE_QueueEntry *
898 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
899                                   int rid,
900                                   unsigned int queue_priority,
901                                   unsigned int max_queue_size,
902                                   struct GNUNET_TIME_Relative timeout,
903                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
904                                   void *cont_cls)
905 {
906   struct GNUNET_DATASTORE_QueueEntry *qe;
907   struct ReleaseReserveMessage *rrm;
908   union QueueContext qc;
909
910   if (cont == NULL)
911     cont = &drop_status_cont;
912 #if DEBUG_DATASTORE
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914               "Asked to release reserve %d\n",
915               rid);
916 #endif
917   qc.sc.cont = cont;
918   qc.sc.cont_cls = cont_cls;
919   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
920                          queue_priority, max_queue_size, timeout,
921                          &process_status_message, &qc);
922   if (qe == NULL)
923     return NULL;
924   rrm = (struct ReleaseReserveMessage*) &qe[1];
925   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
926   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
927   rrm->rid = htonl(rid);
928   process_queue (h);
929   return qe;
930 }
931
932
933 /**
934  * Update a value in the datastore.
935  *
936  * @param h handle to the datastore
937  * @param uid identifier for the value
938  * @param priority how much to increase the priority of the value
939  * @param expiration new expiration value should be MAX of existing and this argument
940  * @param queue_priority ranking of this request in the priority queue
941  * @param max_queue_size at what queue size should this request be dropped
942  *        (if other requests of higher priority are in the queue)
943  * @param timeout how long to wait at most for a response
944  * @param cont continuation to call when done
945  * @param cont_cls closure for cont
946  * @return NULL if the entry was not queued, otherwise a handle that can be used to
947  *         cancel; note that even if NULL is returned, the callback will be invoked
948  *         (or rather, will already have been invoked)
949  */
950 struct GNUNET_DATASTORE_QueueEntry *
951 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
952                          unsigned long long uid,
953                          uint32_t priority,
954                          struct GNUNET_TIME_Absolute expiration,
955                          unsigned int queue_priority,
956                          unsigned int max_queue_size,
957                          struct GNUNET_TIME_Relative timeout,
958                          GNUNET_DATASTORE_ContinuationWithStatus cont,
959                          void *cont_cls)
960 {
961   struct GNUNET_DATASTORE_QueueEntry *qe;
962   struct UpdateMessage *um;
963   union QueueContext qc;
964
965   if (cont == NULL)
966     cont = &drop_status_cont;
967 #if DEBUG_DATASTORE
968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
970               uid,
971               (unsigned int) priority,
972               (unsigned long long) expiration.value);
973 #endif
974   qc.sc.cont = cont;
975   qc.sc.cont_cls = cont_cls;
976   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
977                          queue_priority, max_queue_size, timeout,
978                          &process_status_message, &qc);
979   if (qe == NULL)
980     return NULL;
981   um = (struct UpdateMessage*) &qe[1];
982   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
983   um->header.size = htons(sizeof (struct UpdateMessage));
984   um->priority = htonl(priority);
985   um->expiration = GNUNET_TIME_absolute_hton(expiration);
986   um->uid = GNUNET_htonll(uid);
987   process_queue (h);
988   return qe;
989 }
990
991
992 /**
993  * Explicitly remove some content from the database.
994  * The "cont"inuation will be called with status
995  * "GNUNET_OK" if content was removed, "GNUNET_NO"
996  * if no matching entry was found and "GNUNET_SYSERR"
997  * on all other types of errors.
998  *
999  * @param h handle to the datastore
1000  * @param key key for the value
1001  * @param size number of bytes in data
1002  * @param data content stored
1003  * @param queue_priority ranking of this request in the priority queue
1004  * @param max_queue_size at what queue size should this request be dropped
1005  *        (if other requests of higher priority are in the queue)
1006  * @param timeout how long to wait at most for a response
1007  * @param cont continuation to call when done
1008  * @param cont_cls closure for cont
1009  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1010  *         cancel; note that even if NULL is returned, the callback will be invoked
1011  *         (or rather, will already have been invoked)
1012  */
1013 struct GNUNET_DATASTORE_QueueEntry *
1014 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1015                          const GNUNET_HashCode *key,
1016                          uint32_t size, 
1017                          const void *data,
1018                          unsigned int queue_priority,
1019                          unsigned int max_queue_size,
1020                          struct GNUNET_TIME_Relative timeout,
1021                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1022                          void *cont_cls)
1023 {
1024   struct GNUNET_DATASTORE_QueueEntry *qe;
1025   struct DataMessage *dm;
1026   size_t msize;
1027   union QueueContext qc;
1028
1029   if (cont == NULL)
1030     cont = &drop_status_cont;
1031 #if DEBUG_DATASTORE
1032   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1033               "Asked to remove %u bytes under key `%s'\n",
1034               size,
1035               GNUNET_h2s (key));
1036 #endif
1037   qc.sc.cont = cont;
1038   qc.sc.cont_cls = cont_cls;
1039   msize = sizeof(struct DataMessage) + size;
1040   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1041   qe = make_queue_entry (h, msize,
1042                          queue_priority, max_queue_size, timeout,
1043                          &process_status_message, &qc);
1044   if (qe == NULL)
1045     return NULL;
1046   dm = (struct DataMessage*) &qe[1];
1047   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1048   dm->header.size = htons(msize);
1049   dm->rid = htonl(0);
1050   dm->size = htonl(size);
1051   dm->type = htonl(0);
1052   dm->priority = htonl(0);
1053   dm->anonymity = htonl(0);
1054   dm->uid = GNUNET_htonll(0);
1055   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1056   dm->key = *key;
1057   memcpy (&dm[1], data, size);
1058   process_queue (h);
1059   return qe;
1060 }
1061
1062
1063 /**
1064  * Type of a function to call when we receive a message
1065  * from the service.
1066  *
1067  * @param cls closure
1068  * @param msg message received, NULL on timeout or fatal error
1069  */
1070 static void 
1071 process_result_message (void *cls,
1072                         const struct GNUNET_MessageHeader * msg)
1073 {
1074   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1075   struct GNUNET_DATASTORE_Handle *h = qe->h;
1076   struct ResultContext rc = qe->qc.rc;
1077   const struct DataMessage *dm;
1078   int was_transmitted;
1079
1080   h->in_receive = GNUNET_NO;
1081   if (msg == NULL)
1082     {
1083       was_transmitted = qe->was_transmitted;
1084       free_queue_entry (qe);
1085       if (was_transmitted == GNUNET_YES)
1086         {
1087           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1088                       _("Failed to receive response from database.\n"));
1089           do_disconnect (h);
1090         }
1091       if (rc.iter != NULL)
1092         rc.iter (rc.iter_cls,
1093                  NULL, 0, NULL, 0, 0, 0, 
1094                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1095       return;
1096     }
1097   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1098   GNUNET_assert (h->queue_head == qe);
1099   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1100     {
1101       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1102 #if DEBUG_DATASTORE
1103       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1104                   "Received end of result set\n");
1105 #endif
1106       free_queue_entry (qe);
1107       if (rc.iter != NULL)
1108         rc.iter (rc.iter_cls,
1109                  NULL, 0, NULL, 0, 0, 0, 
1110                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1111       process_queue (h);
1112       return;
1113     }
1114   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1115        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1116        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1117     {
1118       GNUNET_break (0);
1119       free_queue_entry (qe);
1120       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1121       do_disconnect (h);
1122       if (rc.iter != NULL)
1123         rc.iter (rc.iter_cls,
1124                  NULL, 0, NULL, 0, 0, 0, 
1125                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1126       return;
1127     }
1128   if (rc.iter == NULL)
1129     {
1130       /* abort iteration */
1131 #if DEBUG_DATASTORE
1132       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1133                   "Aborting iteration via disconnect (client has cancelled)\n");
1134 #endif
1135       free_queue_entry (qe);
1136       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1137       do_disconnect (h);
1138       return;
1139     }
1140   dm = (const struct DataMessage*) msg;
1141 #if DEBUG_DATASTORE
1142   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143               "Received result %llu with type %u and size %u with key %s\n",
1144               (unsigned long long) GNUNET_ntohll(dm->uid),
1145               ntohl(dm->type),
1146               ntohl(dm->size),
1147               GNUNET_h2s(&dm->key));
1148 #endif
1149   rc.iter (rc.iter_cls,
1150            &dm->key,
1151            ntohl(dm->size),
1152            &dm[1],
1153            ntohl(dm->type),
1154            ntohl(dm->priority),
1155            ntohl(dm->anonymity),
1156            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1157            GNUNET_ntohll(dm->uid));
1158 }
1159
1160
1161 /**
1162  * Get a random value from the datastore.
1163  *
1164  * @param h handle to the datastore
1165  * @param queue_priority ranking of this request in the priority queue
1166  * @param max_queue_size at what queue size should this request be dropped
1167  *        (if other requests of higher priority are in the queue)
1168  * @param timeout how long to wait at most for a response
1169  * @param iter function to call on a random value; it
1170  *        will be called once with a value (if available)
1171  *        and always once with a value of NULL.
1172  * @param iter_cls closure for iter
1173  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1174  *         cancel; note that even if NULL is returned, the callback will be invoked
1175  *         (or rather, will already have been invoked)
1176  */
1177 struct GNUNET_DATASTORE_QueueEntry *
1178 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1179                              unsigned int queue_priority,
1180                              unsigned int max_queue_size,
1181                              struct GNUNET_TIME_Relative timeout,
1182                              GNUNET_DATASTORE_Iterator iter, 
1183                              void *iter_cls)
1184 {
1185   struct GNUNET_DATASTORE_QueueEntry *qe;
1186   struct GNUNET_MessageHeader *m;
1187   union QueueContext qc;
1188
1189 #if DEBUG_DATASTORE
1190   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1191               "Asked to get random entry in %llu ms\n",
1192               (unsigned long long) timeout.value);
1193 #endif
1194   qc.rc.iter = iter;
1195   qc.rc.iter_cls = iter_cls;
1196   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1197                          queue_priority, max_queue_size, timeout,
1198                          &process_result_message, &qc);
1199   if (qe == NULL)
1200     return NULL;    
1201   m = (struct GNUNET_MessageHeader*) &qe[1];
1202   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1203   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1204   process_queue (h);
1205   return qe;
1206 }
1207
1208
1209
1210 /**
1211  * Iterate over the results for a particular key
1212  * in the datastore.  The iterator will only be called
1213  * once initially; if the first call did contain a
1214  * result, further results can be obtained by calling
1215  * "GNUNET_DATASTORE_get_next" with the given argument.
1216  *
1217  * @param h handle to the datastore
1218  * @param key maybe NULL (to match all entries)
1219  * @param type desired type, 0 for any
1220  * @param queue_priority ranking of this request in the priority queue
1221  * @param max_queue_size at what queue size should this request be dropped
1222  *        (if other requests of higher priority are in the queue)
1223  * @param timeout how long to wait at most for a response
1224  * @param iter function to call on each matching value;
1225  *        will be called once with a NULL value at the end
1226  * @param iter_cls closure for iter
1227  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1228  *         cancel; note that even if NULL is returned, the callback will be invoked
1229  *         (or rather, will already have been invoked)
1230  */
1231 struct GNUNET_DATASTORE_QueueEntry *
1232 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1233                       const GNUNET_HashCode * key,
1234                       enum GNUNET_BLOCK_Type type,
1235                       unsigned int queue_priority,
1236                       unsigned int max_queue_size,
1237                       struct GNUNET_TIME_Relative timeout,
1238                       GNUNET_DATASTORE_Iterator iter, 
1239                       void *iter_cls)
1240 {
1241   struct GNUNET_DATASTORE_QueueEntry *qe;
1242   struct GetMessage *gm;
1243   union QueueContext qc;
1244
1245 #if DEBUG_DATASTORE
1246   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1247               "Asked to look for data of type %u under key `%s'\n",
1248               (unsigned int) type,
1249               GNUNET_h2s (key));
1250 #endif
1251   qc.rc.iter = iter;
1252   qc.rc.iter_cls = iter_cls;
1253   qe = make_queue_entry (h, sizeof(struct GetMessage),
1254                          queue_priority, max_queue_size, timeout,
1255                          &process_result_message, &qc);
1256   if (qe == NULL)
1257     return NULL;
1258   gm = (struct GetMessage*) &qe[1];
1259   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1260   gm->type = htonl(type);
1261   if (key != NULL)
1262     {
1263       gm->header.size = htons(sizeof (struct GetMessage));
1264       gm->key = *key;
1265     }
1266   else
1267     {
1268       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1269     }
1270   process_queue (h);
1271   return qe;
1272 }
1273
1274
1275 /**
1276  * Function called to trigger obtaining the next result
1277  * from the datastore.
1278  * 
1279  * @param h handle to the datastore
1280  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1281  *        iteration (with a final call to "iter" with key/data == NULL).
1282  */
1283 void 
1284 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1285                            int more)
1286 {
1287   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1288   struct ResultContext rc = qe->qc.rc;
1289
1290   GNUNET_assert (&process_result_message == qe->response_proc);
1291   if (GNUNET_YES == more)
1292     {     
1293       h->in_receive = GNUNET_YES;
1294       GNUNET_CLIENT_receive (h->client,
1295                              qe->response_proc,
1296                              qe,
1297                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1298       return;
1299     }
1300   free_queue_entry (qe);
1301   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1302   do_disconnect (h);
1303   rc.iter (rc.iter_cls,
1304            NULL, 0, NULL, 0, 0, 0, 
1305            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1306 }
1307
1308
1309 /**
1310  * Cancel a datastore operation.  The final callback from the
1311  * operation must not have been done yet.
1312  * 
1313  * @param qe operation to cancel
1314  */
1315 void
1316 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1317 {
1318   struct GNUNET_DATASTORE_Handle *h;
1319   int reconnect;
1320
1321   h = qe->h;
1322 #if DEBUG_DATASTORE
1323   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1324                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1325                qe,
1326                qe->was_transmitted,
1327                h->queue_head == qe);
1328 #endif
1329   reconnect = GNUNET_NO;
1330   if (GNUNET_YES == qe->was_transmitted) 
1331     {
1332       if (qe->response_proc == &process_result_message) 
1333         {
1334           qe->qc.rc.iter = NULL;    
1335           if (GNUNET_YES != h->in_receive)
1336             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1337           return;
1338         }
1339       reconnect = GNUNET_YES;
1340     }
1341   free_queue_entry (qe);
1342   if (reconnect)
1343     {
1344       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1345       do_disconnect (h);
1346     }
1347   else
1348     {
1349       process_queue (h);
1350     }
1351 }
1352
1353
1354 /* end of datastore_api.c */