fixing reconnect issues
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217 };
218
219
220
221 /**
222  * Connect to the datastore service.
223  *
224  * @param cfg configuration to use
225  * @param sched scheduler to use
226  * @return handle to use to access the service
227  */
228 struct GNUNET_DATASTORE_Handle *
229 GNUNET_DATASTORE_connect (const struct
230                           GNUNET_CONFIGURATION_Handle
231                           *cfg,
232                           struct
233                           GNUNET_SCHEDULER_Handle
234                           *sched)
235 {
236   struct GNUNET_CLIENT_Connection *c;
237   struct GNUNET_DATASTORE_Handle *h;
238   
239   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
240   if (c == NULL)
241     return NULL; /* oops */
242   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
243                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
244   h->client = c;
245   h->cfg = cfg;
246   h->sched = sched;
247   return h;
248 }
249
250
251 /**
252  * Transmit DROP message to datastore service.
253  *
254  * @param cls the 'struct GNUNET_DATASTORE_Handle'
255  * @param size number of bytes that can be copied to buf
256  * @param buf where to copy the drop message
257  * @return number of bytes written to buf
258  */
259 static size_t
260 transmit_drop (void *cls,
261                size_t size, 
262                void *buf)
263 {
264   struct GNUNET_DATASTORE_Handle *h = cls;
265   struct GNUNET_MessageHeader *hdr;
266   
267   if (buf == NULL)
268     {
269       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
270                   _("Failed to transmit request to drop database.\n"));
271       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
272       return 0;
273     }
274   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
275   hdr = buf;
276   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
277   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
278   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
279   return sizeof(struct GNUNET_MessageHeader);
280 }
281
282
283 /**
284  * Disconnect from the datastore service (and free
285  * associated resources).
286  *
287  * @param h handle to the datastore
288  * @param drop set to GNUNET_YES to delete all data in datastore (!)
289  */
290 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
291                                   int drop)
292 {
293   struct GNUNET_DATASTORE_QueueEntry *qe;
294
295   if (h->client != NULL)
296     {
297       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
298       h->client = NULL;
299     }
300   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
301     {
302       GNUNET_SCHEDULER_cancel (h->sched,
303                                h->reconnect_task);
304       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
305     }
306   while (NULL != (qe = h->queue_head))
307     {
308       GNUNET_assert (NULL != qe->response_proc);
309       qe->response_proc (qe, NULL);
310     }
311   if (GNUNET_YES == drop) 
312     {
313       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
314       if (h->client != NULL)
315         {
316           if (NULL != 
317               GNUNET_CLIENT_notify_transmit_ready (h->client,
318                                                    sizeof(struct GNUNET_MessageHeader),
319                                                    GNUNET_TIME_UNIT_MINUTES,
320                                                    GNUNET_YES,
321                                                    &transmit_drop,
322                                                    h))
323             return;
324           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325         }
326       GNUNET_break (0);
327     }
328   GNUNET_free (h);
329 }
330
331
332 /**
333  * A request has timed out (before being transmitted to the service).
334  *
335  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
336  * @param tc scheduler context
337  */
338 static void
339 timeout_queue_entry (void *cls,
340                      const struct GNUNET_SCHEDULER_TaskContext *tc)
341 {
342   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
343
344   qe->task = GNUNET_SCHEDULER_NO_TASK;
345   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
346   qe->response_proc (qe, NULL);
347 }
348
349
350 /**
351  * Create a new entry for our priority queue (and possibly discard other entires if
352  * the queue is getting too long).
353  *
354  * @param h handle to the datastore
355  * @param msize size of the message to queue
356  * @param queue_priority priority of the entry
357  * @param max_queue_size at what queue size should this request be dropped
358  *        (if other requests of higher priority are in the queue)
359  * @param timeout timeout for the operation
360  * @param response_proc function to call with replies (can be NULL)
361  * @param qc client context (NOT a closure for response_proc)
362  * @return NULL if the queue is full (and this entry was dropped)
363  */
364 static struct GNUNET_DATASTORE_QueueEntry *
365 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
366                   size_t msize,
367                   unsigned int queue_priority,
368                   unsigned int max_queue_size,
369                   struct GNUNET_TIME_Relative timeout,
370                   GNUNET_CLIENT_MessageHandler response_proc,            
371                   const union QueueContext *qc)
372 {
373   struct GNUNET_DATASTORE_QueueEntry *ret;
374   struct GNUNET_DATASTORE_QueueEntry *pos;
375   unsigned int c;
376
377   c = 0;
378   pos = h->queue_head;
379   while ( (pos != NULL) &&
380           (c < max_queue_size) &&
381           (pos->priority >= queue_priority) )
382     {
383       c++;
384       pos = pos->next;
385     }
386   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
387   ret->h = h;
388   ret->response_proc = response_proc;
389   ret->qc = *qc;
390   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
391   ret->priority = queue_priority;
392   ret->max_queue = max_queue_size;
393   ret->message_size = msize;
394   ret->was_transmitted = GNUNET_NO;
395   if (pos == NULL)
396     {
397       /* append at the tail */
398       pos = h->queue_tail;
399     }
400   else
401     {
402       pos = pos->prev; 
403       /* do not insert at HEAD if HEAD query was already
404          transmitted and we are still receiving replies! */
405       if ( (pos == NULL) &&
406            (h->queue_head->was_transmitted) )
407         pos = h->queue_head;
408     }
409   c++;
410   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
411                                      h->queue_tail,
412                                      pos,
413                                      ret);
414   h->queue_size++;
415   if (c > max_queue_size)
416     {
417       response_proc (ret, NULL);
418       return NULL;
419     }
420   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
421                                             timeout,
422                                             &timeout_queue_entry,
423                                             ret);
424   pos = ret->next;
425   while (pos != NULL) 
426     {
427       if (pos->max_queue < h->queue_size)
428         {
429           GNUNET_assert (pos->response_proc != NULL);
430           pos->response_proc (pos, NULL);
431           break;
432         }
433       pos = pos->next;
434     }
435   return ret;
436 }
437
438
439 /**
440  * Process entries in the queue (or do nothing if we are already
441  * doing so).
442  * 
443  * @param h handle to the datastore
444  */
445 static void
446 process_queue (struct GNUNET_DATASTORE_Handle *h);
447
448
449 /**
450  * Try reconnecting to the datastore service.
451  *
452  * @param cls the 'struct GNUNET_DATASTORE_Handle'
453  * @param tc scheduler context
454  */
455 static void
456 try_reconnect (void *cls,
457                const struct GNUNET_SCHEDULER_TaskContext *tc)
458 {
459   struct GNUNET_DATASTORE_Handle *h = cls;
460
461   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
462     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
463   else
464     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
465   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
466     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
467   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
468   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
469   if (h->client == NULL)
470     return;
471   process_queue (h);
472 }
473
474
475 /**
476  * Disconnect from the service and then try reconnecting to the datastore service
477  * after some delay.
478  *
479  * @param h handle to datastore to disconnect and reconnect
480  */
481 static void
482 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
483 {
484   if (h->client == NULL)
485     return;
486 #if 0
487   GNUNET_STATISTICS_update (stats,
488                             gettext_noop ("# reconnected to datastore"),
489                             1,
490                             GNUNET_NO);
491 #endif
492   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
493   h->client = NULL;
494   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
495                                                     h->retry_time,
496                                                     &try_reconnect,
497                                                     h);      
498 }
499
500
501 /**
502  * Transmit request from queue to datastore service.
503  *
504  * @param cls the 'struct GNUNET_DATASTORE_Handle'
505  * @param size number of bytes that can be copied to buf
506  * @param buf where to copy the drop message
507  * @return number of bytes written to buf
508  */
509 static size_t
510 transmit_request (void *cls,
511                   size_t size, 
512                   void *buf)
513 {
514   struct GNUNET_DATASTORE_Handle *h = cls;
515   struct GNUNET_DATASTORE_QueueEntry *qe;
516   size_t msize;
517
518   h->th = NULL;
519   if (NULL == (qe = h->queue_head))
520     return 0; /* no entry in queue */
521   if (buf == NULL)
522     {
523       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
524                   _("Failed to transmit request to database.\n"));
525       do_disconnect (h);
526       return 0;
527     }
528   if (size < (msize = qe->message_size))
529     {
530       process_queue (h);
531       return 0;
532     }
533   memcpy (buf, &qe[1], msize);
534   qe->was_transmitted = GNUNET_YES;
535   GNUNET_SCHEDULER_cancel (h->sched,
536                            qe->task);
537   qe->task = GNUNET_SCHEDULER_NO_TASK;
538   GNUNET_CLIENT_receive (h->client,
539                          qe->response_proc,
540                          qe,
541                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
542   return msize;
543 }
544
545
546 /**
547  * Process entries in the queue (or do nothing if we are already
548  * doing so).
549  * 
550  * @param h handle to the datastore
551  */
552 static void
553 process_queue (struct GNUNET_DATASTORE_Handle *h)
554 {
555   struct GNUNET_DATASTORE_QueueEntry *qe;
556
557   if (NULL == (qe = h->queue_head))
558     return; /* no entry in queue */
559   if (qe->was_transmitted == GNUNET_YES)
560     return; /* waiting for replies */
561   if (h->th != NULL)
562     return; /* request pending */
563   if (h->client == NULL)
564     return; /* waiting for reconnect */
565   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
566                                                qe->message_size,
567                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
568                                                GNUNET_YES,
569                                                &transmit_request,
570                                                h);
571 }
572
573
574 /**
575  * Dummy continuation used to do nothing (but be non-zero).
576  *
577  * @param cls closure
578  * @param result result 
579  * @param emsg error message
580  */
581 static void
582 drop_status_cont (void *cls, int result, const char *emsg)
583 {
584   /* do nothing */
585 }
586
587
588 static void
589 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
590 {
591   struct GNUNET_DATASTORE_Handle *h = qe->h;
592
593   GNUNET_CONTAINER_DLL_remove (h->queue_head,
594                                h->queue_tail,
595                                qe);
596   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
597     {
598       GNUNET_SCHEDULER_cancel (h->sched,
599                                qe->task);
600       qe->task = GNUNET_SCHEDULER_NO_TASK;
601     }
602   h->queue_size--;
603   GNUNET_free (qe);
604 }
605
606 /**
607  * Type of a function to call when we receive a message
608  * from the service.
609  *
610  * @param cls closure
611  * @param msg message received, NULL on timeout or fatal error
612  */
613 static void 
614 process_status_message (void *cls,
615                         const struct
616                         GNUNET_MessageHeader * msg)
617 {
618   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
619   struct GNUNET_DATASTORE_Handle *h = qe->h;
620   struct StatusContext rc = qe->qc.sc;
621   const struct StatusMessage *sm;
622   const char *emsg;
623   int32_t status;
624   int was_transmitted;
625
626   was_transmitted = qe->was_transmitted;
627   if (msg == NULL)
628     {      
629       free_queue_entry (qe);
630       if (NULL == h->client)
631         return; /* forced disconnect */
632       if (was_transmitted == GNUNET_YES)
633         {
634           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
635                       _("Failed to receive response from database.\n"));
636           do_disconnect (h);
637         }
638       return;
639     }
640   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
641   GNUNET_assert (h->queue_head == qe);
642   free_queue_entry (qe);
643   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
644        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
645     {
646       GNUNET_break (0);
647       h->retry_time = GNUNET_TIME_UNIT_ZERO;
648       do_disconnect (h);
649       rc.cont (rc.cont_cls, 
650                GNUNET_SYSERR,
651                _("Error reading response from datastore service"));
652       return;
653     }
654   sm = (const struct StatusMessage*) msg;
655   status = ntohl(sm->status);
656   emsg = NULL;
657   if (ntohs(msg->size) > sizeof(struct StatusMessage))
658     {
659       emsg = (const char*) &sm[1];
660       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
661         {
662           GNUNET_break (0);
663           emsg = _("Invalid error message received from datastore service");
664         }
665     }  
666   if ( (status == GNUNET_SYSERR) &&
667        (emsg == NULL) )
668     {
669       GNUNET_break (0);
670       emsg = _("Invalid error message received from datastore service");
671     }
672 #if DEBUG_DATASTORE
673   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
674               "Received status %d/%s\n",
675               (int) status,
676               emsg);
677 #endif
678   rc.cont (rc.cont_cls, 
679            status,
680            emsg);
681   process_queue (h);
682 }
683
684
685 /**
686  * Store an item in the datastore.  If the item is already present,
687  * the priorities are summed up and the higher expiration time and
688  * lower anonymity level is used.
689  *
690  * @param h handle to the datastore
691  * @param rid reservation ID to use (from "reserve"); use 0 if no
692  *            prior reservation was made
693  * @param key key for the value
694  * @param size number of bytes in data
695  * @param data content stored
696  * @param type type of the content
697  * @param priority priority of the content
698  * @param anonymity anonymity-level for the content
699  * @param expiration expiration time for the content
700  * @param queue_priority ranking of this request in the priority queue
701  * @param max_queue_size at what queue size should this request be dropped
702  *        (if other requests of higher priority are in the queue)
703  * @param timeout timeout for the operation
704  * @param cont continuation to call when done
705  * @param cont_cls closure for cont
706  * @return NULL if the entry was not queued, otherwise a handle that can be used to
707  *         cancel; note that even if NULL is returned, the callback will be invoked
708  *         (or rather, will already have been invoked)
709  */
710 struct GNUNET_DATASTORE_QueueEntry *
711 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
712                       int rid,
713                       const GNUNET_HashCode * key,
714                       uint32_t size,
715                       const void *data,
716                       enum GNUNET_BLOCK_Type type,
717                       uint32_t priority,
718                       uint32_t anonymity,
719                       struct GNUNET_TIME_Absolute expiration,
720                       unsigned int queue_priority,
721                       unsigned int max_queue_size,
722                       struct GNUNET_TIME_Relative timeout,
723                       GNUNET_DATASTORE_ContinuationWithStatus cont,
724                       void *cont_cls)
725 {
726   struct GNUNET_DATASTORE_QueueEntry *qe;
727   struct DataMessage *dm;
728   size_t msize;
729   union QueueContext qc;
730
731 #if DEBUG_DATASTORE
732   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
733               "Asked to put %u bytes of data under key `%s'\n",
734               size,
735               GNUNET_h2s (key));
736 #endif
737   msize = sizeof(struct DataMessage) + size;
738   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
739   qc.sc.cont = cont;
740   qc.sc.cont_cls = cont_cls;
741   qe = make_queue_entry (h, msize,
742                          queue_priority, max_queue_size, timeout,
743                          &process_status_message, &qc);
744   if (qe == NULL)
745     return NULL;
746   dm = (struct DataMessage* ) &qe[1];
747   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
748   dm->header.size = htons(msize);
749   dm->rid = htonl(rid);
750   dm->size = htonl(size);
751   dm->type = htonl(type);
752   dm->priority = htonl(priority);
753   dm->anonymity = htonl(anonymity);
754   dm->uid = GNUNET_htonll(0);
755   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
756   dm->key = *key;
757   memcpy (&dm[1], data, size);
758   process_queue (h);
759   return qe;
760 }
761
762
763 /**
764  * Reserve space in the datastore.  This function should be used
765  * to avoid "out of space" failures during a longer sequence of "put"
766  * operations (for example, when a file is being inserted).
767  *
768  * @param h handle to the datastore
769  * @param amount how much space (in bytes) should be reserved (for content only)
770  * @param entries how many entries will be created (to calculate per-entry overhead)
771  * @param queue_priority ranking of this request in the priority queue
772  * @param max_queue_size at what queue size should this request be dropped
773  *        (if other requests of higher priority are in the queue)
774  * @param timeout how long to wait at most for a response (or before dying in queue)
775  * @param cont continuation to call when done; "success" will be set to
776  *             a positive reservation value if space could be reserved.
777  * @param cont_cls closure for cont
778  * @return NULL if the entry was not queued, otherwise a handle that can be used to
779  *         cancel; note that even if NULL is returned, the callback will be invoked
780  *         (or rather, will already have been invoked)
781  */
782 struct GNUNET_DATASTORE_QueueEntry *
783 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
784                           uint64_t amount,
785                           uint32_t entries,
786                           unsigned int queue_priority,
787                           unsigned int max_queue_size,
788                           struct GNUNET_TIME_Relative timeout,
789                           GNUNET_DATASTORE_ContinuationWithStatus cont,
790                           void *cont_cls)
791 {
792   struct GNUNET_DATASTORE_QueueEntry *qe;
793   struct ReserveMessage *rm;
794   union QueueContext qc;
795
796   if (cont == NULL)
797     cont = &drop_status_cont;
798 #if DEBUG_DATASTORE
799   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800               "Asked to reserve %llu bytes of data and %u entries'\n",
801               (unsigned long long) amount,
802               (unsigned int) entries);
803 #endif
804   qc.sc.cont = cont;
805   qc.sc.cont_cls = cont_cls;
806   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
807                          queue_priority, max_queue_size, timeout,
808                          &process_status_message, &qc);
809   if (qe == NULL)
810     return NULL;
811   rm = (struct ReserveMessage*) &qe[1];
812   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
813   rm->header.size = htons(sizeof (struct ReserveMessage));
814   rm->entries = htonl(entries);
815   rm->amount = GNUNET_htonll(amount);
816   process_queue (h);
817   return qe;
818 }
819
820
821 /**
822  * Signal that all of the data for which a reservation was made has
823  * been stored and that whatever excess space might have been reserved
824  * can now be released.
825  *
826  * @param h handle to the datastore
827  * @param rid reservation ID (value of "success" in original continuation
828  *        from the "reserve" function).
829  * @param queue_priority ranking of this request in the priority queue
830  * @param max_queue_size at what queue size should this request be dropped
831  *        (if other requests of higher priority are in the queue)
832  * @param queue_priority ranking of this request in the priority queue
833  * @param max_queue_size at what queue size should this request be dropped
834  *        (if other requests of higher priority are in the queue)
835  * @param timeout how long to wait at most for a response
836  * @param cont continuation to call when done
837  * @param cont_cls closure for cont
838  * @return NULL if the entry was not queued, otherwise a handle that can be used to
839  *         cancel; note that even if NULL is returned, the callback will be invoked
840  *         (or rather, will already have been invoked)
841  */
842 struct GNUNET_DATASTORE_QueueEntry *
843 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
844                                   int rid,
845                                   unsigned int queue_priority,
846                                   unsigned int max_queue_size,
847                                   struct GNUNET_TIME_Relative timeout,
848                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
849                                   void *cont_cls)
850 {
851   struct GNUNET_DATASTORE_QueueEntry *qe;
852   struct ReleaseReserveMessage *rrm;
853   union QueueContext qc;
854
855   if (cont == NULL)
856     cont = &drop_status_cont;
857 #if DEBUG_DATASTORE
858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859               "Asked to release reserve %d\n",
860               rid);
861 #endif
862   qc.sc.cont = cont;
863   qc.sc.cont_cls = cont_cls;
864   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
865                          queue_priority, max_queue_size, timeout,
866                          &process_status_message, &qc);
867   if (qe == NULL)
868     return NULL;
869   rrm = (struct ReleaseReserveMessage*) &qe[1];
870   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
871   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
872   rrm->rid = htonl(rid);
873   process_queue (h);
874   return qe;
875 }
876
877
878 /**
879  * Update a value in the datastore.
880  *
881  * @param h handle to the datastore
882  * @param uid identifier for the value
883  * @param priority how much to increase the priority of the value
884  * @param expiration new expiration value should be MAX of existing and this argument
885  * @param queue_priority ranking of this request in the priority queue
886  * @param max_queue_size at what queue size should this request be dropped
887  *        (if other requests of higher priority are in the queue)
888  * @param timeout how long to wait at most for a response
889  * @param cont continuation to call when done
890  * @param cont_cls closure for cont
891  * @return NULL if the entry was not queued, otherwise a handle that can be used to
892  *         cancel; note that even if NULL is returned, the callback will be invoked
893  *         (or rather, will already have been invoked)
894  */
895 struct GNUNET_DATASTORE_QueueEntry *
896 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
897                          unsigned long long uid,
898                          uint32_t priority,
899                          struct GNUNET_TIME_Absolute expiration,
900                          unsigned int queue_priority,
901                          unsigned int max_queue_size,
902                          struct GNUNET_TIME_Relative timeout,
903                          GNUNET_DATASTORE_ContinuationWithStatus cont,
904                          void *cont_cls)
905 {
906   struct GNUNET_DATASTORE_QueueEntry *qe;
907   struct UpdateMessage *um;
908   union QueueContext qc;
909
910   if (cont == NULL)
911     cont = &drop_status_cont;
912 #if DEBUG_DATASTORE
913   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
915               uid,
916               (unsigned int) priority,
917               (unsigned long long) expiration.value);
918 #endif
919   qc.sc.cont = cont;
920   qc.sc.cont_cls = cont_cls;
921   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
922                          queue_priority, max_queue_size, timeout,
923                          &process_status_message, &qc);
924   if (qe == NULL)
925     return NULL;
926   um = (struct UpdateMessage*) &qe[1];
927   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
928   um->header.size = htons(sizeof (struct UpdateMessage));
929   um->priority = htonl(priority);
930   um->expiration = GNUNET_TIME_absolute_hton(expiration);
931   um->uid = GNUNET_htonll(uid);
932   process_queue (h);
933   return qe;
934 }
935
936
937 /**
938  * Explicitly remove some content from the database.
939  * The "cont"inuation will be called with status
940  * "GNUNET_OK" if content was removed, "GNUNET_NO"
941  * if no matching entry was found and "GNUNET_SYSERR"
942  * on all other types of errors.
943  *
944  * @param h handle to the datastore
945  * @param key key for the value
946  * @param size number of bytes in data
947  * @param data content stored
948  * @param queue_priority ranking of this request in the priority queue
949  * @param max_queue_size at what queue size should this request be dropped
950  *        (if other requests of higher priority are in the queue)
951  * @param timeout how long to wait at most for a response
952  * @param cont continuation to call when done
953  * @param cont_cls closure for cont
954  * @return NULL if the entry was not queued, otherwise a handle that can be used to
955  *         cancel; note that even if NULL is returned, the callback will be invoked
956  *         (or rather, will already have been invoked)
957  */
958 struct GNUNET_DATASTORE_QueueEntry *
959 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
960                          const GNUNET_HashCode *key,
961                          uint32_t size, 
962                          const void *data,
963                          unsigned int queue_priority,
964                          unsigned int max_queue_size,
965                          struct GNUNET_TIME_Relative timeout,
966                          GNUNET_DATASTORE_ContinuationWithStatus cont,
967                          void *cont_cls)
968 {
969   struct GNUNET_DATASTORE_QueueEntry *qe;
970   struct DataMessage *dm;
971   size_t msize;
972   union QueueContext qc;
973
974   if (cont == NULL)
975     cont = &drop_status_cont;
976 #if DEBUG_DATASTORE
977   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
978               "Asked to remove %u bytes under key `%s'\n",
979               size,
980               GNUNET_h2s (key));
981 #endif
982   qc.sc.cont = cont;
983   qc.sc.cont_cls = cont_cls;
984   msize = sizeof(struct DataMessage) + size;
985   GNUNET_assert (msize <= GNUNET_SERVER_MAX_MESSAGE_SIZE);
986   qe = make_queue_entry (h, msize,
987                          queue_priority, max_queue_size, timeout,
988                          &process_status_message, &qc);
989   if (qe == NULL)
990     return NULL;
991   dm = (struct DataMessage*) &qe[1];
992   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
993   dm->header.size = htons(msize);
994   dm->rid = htonl(0);
995   dm->size = htonl(size);
996   dm->type = htonl(0);
997   dm->priority = htonl(0);
998   dm->anonymity = htonl(0);
999   dm->uid = GNUNET_htonll(0);
1000   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1001   dm->key = *key;
1002   memcpy (&dm[1], data, size);
1003   process_queue (h);
1004   return qe;
1005 }
1006
1007
1008 /**
1009  * Type of a function to call when we receive a message
1010  * from the service.
1011  *
1012  * @param cls closure
1013  * @param msg message received, NULL on timeout or fatal error
1014  */
1015 static void 
1016 process_result_message (void *cls,
1017                         const struct GNUNET_MessageHeader * msg)
1018 {
1019   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1020   struct GNUNET_DATASTORE_Handle *h = qe->h;
1021   struct ResultContext rc = qe->qc.rc;
1022   const struct DataMessage *dm;
1023   int was_transmitted;
1024
1025   if (msg == NULL)
1026     {
1027       was_transmitted = qe->was_transmitted;
1028       free_queue_entry (qe);
1029       if (was_transmitted == GNUNET_YES)
1030         {
1031           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1032                       _("Failed to receive response from database.\n"));
1033           do_disconnect (h);
1034         }
1035       if (rc.iter != NULL)
1036         rc.iter (rc.iter_cls,
1037                  NULL, 0, NULL, 0, 0, 0, 
1038                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1039       return;
1040     }
1041   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1042   GNUNET_assert (h->queue_head == qe);
1043   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1044     {
1045       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1046 #if DEBUG_DATASTORE
1047       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1048                   "Received end of result set\n");
1049 #endif
1050       free_queue_entry (qe);
1051       if (rc.iter != NULL)
1052         rc.iter (rc.iter_cls,
1053                  NULL, 0, NULL, 0, 0, 0, 
1054                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1055       process_queue (h);
1056       return;
1057     }
1058   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1059        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1060        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1061     {
1062       GNUNET_break (0);
1063       free_queue_entry (qe);
1064       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1065       do_disconnect (h);
1066       if (rc.iter != NULL)
1067         rc.iter (rc.iter_cls,
1068                  NULL, 0, NULL, 0, 0, 0, 
1069                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1070       return;
1071     }
1072   if (rc.iter == NULL)
1073     {
1074       /* abort iteration */
1075       do_disconnect (h);
1076       return;
1077     }
1078   dm = (const struct DataMessage*) msg;
1079 #if DEBUG_DATASTORE
1080   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1081               "Received result %llu with type %u and size %u with key %s\n",
1082               (unsigned long long) GNUNET_ntohll(dm->uid),
1083               ntohl(dm->type),
1084               ntohl(dm->size),
1085               GNUNET_h2s(&dm->key));
1086 #endif
1087   rc.iter (rc.iter_cls,
1088            &dm->key,
1089            ntohl(dm->size),
1090            &dm[1],
1091            ntohl(dm->type),
1092            ntohl(dm->priority),
1093            ntohl(dm->anonymity),
1094            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1095            GNUNET_ntohll(dm->uid));
1096 }
1097
1098
1099 /**
1100  * Get a random value from the datastore.
1101  *
1102  * @param h handle to the datastore
1103  * @param queue_priority ranking of this request in the priority queue
1104  * @param max_queue_size at what queue size should this request be dropped
1105  *        (if other requests of higher priority are in the queue)
1106  * @param timeout how long to wait at most for a response
1107  * @param iter function to call on a random value; it
1108  *        will be called once with a value (if available)
1109  *        and always once with a value of NULL.
1110  * @param iter_cls closure for iter
1111  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1112  *         cancel; note that even if NULL is returned, the callback will be invoked
1113  *         (or rather, will already have been invoked)
1114  */
1115 struct GNUNET_DATASTORE_QueueEntry *
1116 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1117                              unsigned int queue_priority,
1118                              unsigned int max_queue_size,
1119                              struct GNUNET_TIME_Relative timeout,
1120                              GNUNET_DATASTORE_Iterator iter, 
1121                              void *iter_cls)
1122 {
1123   struct GNUNET_DATASTORE_QueueEntry *qe;
1124   struct GNUNET_MessageHeader *m;
1125   union QueueContext qc;
1126
1127 #if DEBUG_DATASTORE
1128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129               "Asked to get random entry in %llu ms\n",
1130               (unsigned long long) timeout.value);
1131 #endif
1132   qc.rc.iter = iter;
1133   qc.rc.iter_cls = iter_cls;
1134   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1135                          queue_priority, max_queue_size, timeout,
1136                          &process_result_message, &qc);
1137   if (qe == NULL)
1138     return NULL;    
1139   m = (struct GNUNET_MessageHeader*) &qe[1];
1140   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1141   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1142   process_queue (h);
1143   return qe;
1144 }
1145
1146
1147
1148 /**
1149  * Iterate over the results for a particular key
1150  * in the datastore.  The iterator will only be called
1151  * once initially; if the first call did contain a
1152  * result, further results can be obtained by calling
1153  * "GNUNET_DATASTORE_get_next" with the given argument.
1154  *
1155  * @param h handle to the datastore
1156  * @param key maybe NULL (to match all entries)
1157  * @param type desired type, 0 for any
1158  * @param queue_priority ranking of this request in the priority queue
1159  * @param max_queue_size at what queue size should this request be dropped
1160  *        (if other requests of higher priority are in the queue)
1161  * @param timeout how long to wait at most for a response
1162  * @param iter function to call on each matching value;
1163  *        will be called once with a NULL value at the end
1164  * @param iter_cls closure for iter
1165  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1166  *         cancel; note that even if NULL is returned, the callback will be invoked
1167  *         (or rather, will already have been invoked)
1168  */
1169 struct GNUNET_DATASTORE_QueueEntry *
1170 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1171                       const GNUNET_HashCode * key,
1172                       enum GNUNET_BLOCK_Type type,
1173                       unsigned int queue_priority,
1174                       unsigned int max_queue_size,
1175                       struct GNUNET_TIME_Relative timeout,
1176                       GNUNET_DATASTORE_Iterator iter, 
1177                       void *iter_cls)
1178 {
1179   struct GNUNET_DATASTORE_QueueEntry *qe;
1180   struct GetMessage *gm;
1181   union QueueContext qc;
1182
1183 #if DEBUG_DATASTORE
1184   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1185               "Asked to look for data of type %u under key `%s'\n",
1186               (unsigned int) type,
1187               GNUNET_h2s (key));
1188 #endif
1189   qc.rc.iter = iter;
1190   qc.rc.iter_cls = iter_cls;
1191   qe = make_queue_entry (h, sizeof(struct GetMessage),
1192                          queue_priority, max_queue_size, timeout,
1193                          &process_result_message, &qc);
1194   if (qe == NULL)
1195     return NULL;
1196   gm = (struct GetMessage*) &qe[1];
1197   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1198   gm->type = htonl(type);
1199   if (key != NULL)
1200     {
1201       gm->header.size = htons(sizeof (struct GetMessage));
1202       gm->key = *key;
1203     }
1204   else
1205     {
1206       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1207     }
1208   process_queue (h);
1209   return qe;
1210 }
1211
1212
1213 /**
1214  * Function called to trigger obtaining the next result
1215  * from the datastore.
1216  * 
1217  * @param h handle to the datastore
1218  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1219  *        iteration (with a final call to "iter" with key/data == NULL).
1220  */
1221 void 
1222 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1223                            int more)
1224 {
1225   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1226   struct ResultContext rc = qe->qc.rc;
1227
1228   GNUNET_assert (NULL != qe);
1229   GNUNET_assert (&process_result_message == qe->response_proc);
1230   if (GNUNET_YES == more)
1231     {     
1232       GNUNET_CLIENT_receive (h->client,
1233                              qe->response_proc,
1234                              qe,
1235                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1236       return;
1237     }
1238   free_queue_entry (qe);
1239   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1240   do_disconnect (h);
1241   rc.iter (rc.iter_cls,
1242            NULL, 0, NULL, 0, 0, 0, 
1243            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1244 }
1245
1246
1247 /**
1248  * Cancel a datastore operation.  The final callback from the
1249  * operation must not have been done yet.
1250  * 
1251  * @param qe operation to cancel
1252  */
1253 void
1254 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1255 {
1256   struct GNUNET_DATASTORE_Handle *h;
1257   int reconnect;
1258
1259   h = qe->h;
1260   reconnect = GNUNET_NO;
1261   if (GNUNET_YES == qe->was_transmitted) 
1262     {
1263       if (qe->response_proc == &process_result_message) 
1264         {
1265           qe->qc.rc.iter = NULL;    
1266           return;
1267         }
1268       reconnect = GNUNET_YES;
1269     }
1270   free_queue_entry (qe);
1271   h->queue_size--;
1272   if (reconnect)
1273     {
1274       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1275       do_disconnect (h);
1276     }
1277 }
1278
1279
1280 /* end of datastore_api.c */