abc8c7645d4e9580bd96ec48d202553b62ded498
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162   
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     {
476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477                   "DATASTORE reconnect failed (fatally)\n");
478       return;
479     }
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Reconnected to DATASTORE\n");
483 #endif
484   process_queue (h);
485 }
486
487
488 /**
489  * Disconnect from the service and then try reconnecting to the datastore service
490  * after some delay.
491  *
492  * @param h handle to datastore to disconnect and reconnect
493  */
494 static void
495 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
496 {
497   if (h->client == NULL)
498     {
499 #if DEBUG_DATASTORE
500       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                   "client NULL in disconnect, will not try to reconnect\n");
502 #endif
503       return;
504     }
505 #if 0
506   GNUNET_STATISTICS_update (stats,
507                             gettext_noop ("# reconnected to DATASTORE"),
508                             1,
509                             GNUNET_NO);
510 #endif
511   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
512   h->client = NULL;
513   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
514                                                     h->retry_time,
515                                                     &try_reconnect,
516                                                     h);      
517 }
518
519
520 /**
521  * Transmit request from queue to datastore service.
522  *
523  * @param cls the 'struct GNUNET_DATASTORE_Handle'
524  * @param size number of bytes that can be copied to buf
525  * @param buf where to copy the drop message
526  * @return number of bytes written to buf
527  */
528 static size_t
529 transmit_request (void *cls,
530                   size_t size, 
531                   void *buf)
532 {
533   struct GNUNET_DATASTORE_Handle *h = cls;
534   struct GNUNET_DATASTORE_QueueEntry *qe;
535   size_t msize;
536
537   h->th = NULL;
538   if (NULL == (qe = h->queue_head))
539     return 0; /* no entry in queue */
540   if (buf == NULL)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
543                   _("Failed to transmit request to DATASTORE.\n"));
544       do_disconnect (h);
545       return 0;
546     }
547   if (size < (msize = qe->message_size))
548     {
549       process_queue (h);
550       return 0;
551     }
552  #if DEBUG_DATASTORE
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Transmitting %u byte request to DATASTORE\n",
555               msize);
556 #endif
557   memcpy (buf, &qe[1], msize);
558   qe->was_transmitted = GNUNET_YES;
559   GNUNET_SCHEDULER_cancel (h->sched,
560                            qe->task);
561   qe->task = GNUNET_SCHEDULER_NO_TASK;
562   h->in_receive = GNUNET_YES;
563   GNUNET_CLIENT_receive (h->client,
564                          qe->response_proc,
565                          qe,
566                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
567   return msize;
568 }
569
570
571 /**
572  * Process entries in the queue (or do nothing if we are already
573  * doing so).
574  * 
575  * @param h handle to the datastore
576  */
577 static void
578 process_queue (struct GNUNET_DATASTORE_Handle *h)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   if (NULL == (qe = h->queue_head))
583     {
584 #if DEBUG_DATASTORE > 1
585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                   "Queue empty\n");
587 #endif
588       return; /* no entry in queue */
589     }
590   if (qe->was_transmitted == GNUNET_YES)
591     {
592 #if DEBUG_DATASTORE > 1
593       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                   "Head request already transmitted\n");
595 #endif
596       return; /* waiting for replies */
597     }
598   if (h->th != NULL)
599     {
600 #if DEBUG_DATASTORE > 1
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                   "Pending transmission request\n");
603 #endif
604       return; /* request pending */
605     }
606   if (h->client == NULL)
607     {
608 #if DEBUG_DATASTORE > 1
609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610                   "Not connected\n");
611 #endif
612       return; /* waiting for reconnect */
613     }
614 #if DEBUG_DATASTORE
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Queueing %u byte request to DATASTORE\n",
617               qe->message_size);
618 #endif
619   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
620                                                qe->message_size,
621                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
622                                                GNUNET_YES,
623                                                &transmit_request,
624                                                h);
625 }
626
627
628 /**
629  * Dummy continuation used to do nothing (but be non-zero).
630  *
631  * @param cls closure
632  * @param result result 
633  * @param emsg error message
634  */
635 static void
636 drop_status_cont (void *cls, int result, const char *emsg)
637 {
638   /* do nothing */
639 }
640
641
642 static void
643 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
644 {
645   struct GNUNET_DATASTORE_Handle *h = qe->h;
646
647   GNUNET_CONTAINER_DLL_remove (h->queue_head,
648                                h->queue_tail,
649                                qe);
650   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
651     {
652       GNUNET_SCHEDULER_cancel (h->sched,
653                                qe->task);
654       qe->task = GNUNET_SCHEDULER_NO_TASK;
655     }
656   h->queue_size--;
657   GNUNET_free (qe);
658 }
659
660 /**
661  * Type of a function to call when we receive a message
662  * from the service.
663  *
664  * @param cls closure
665  * @param msg message received, NULL on timeout or fatal error
666  */
667 static void 
668 process_status_message (void *cls,
669                         const struct
670                         GNUNET_MessageHeader * msg)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
673   struct GNUNET_DATASTORE_Handle *h = qe->h;
674   struct StatusContext rc = qe->qc.sc;
675   const struct StatusMessage *sm;
676   const char *emsg;
677   int32_t status;
678   int was_transmitted;
679
680   h->in_receive = GNUNET_NO;
681   was_transmitted = qe->was_transmitted;
682   if (msg == NULL)
683     {      
684       free_queue_entry (qe);
685       if (NULL == h->client)
686         return; /* forced disconnect */
687       if (rc.cont != NULL)
688         rc.cont (rc.cont_cls, 
689                  GNUNET_SYSERR,
690                  _("Failed to receive status response from database."));
691       if (was_transmitted == GNUNET_YES)
692         do_disconnect (h);
693       return;
694     }
695   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
696   GNUNET_assert (h->queue_head == qe);
697   free_queue_entry (qe);
698   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
699        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
700     {
701       GNUNET_break (0);
702       h->retry_time = GNUNET_TIME_UNIT_ZERO;
703       do_disconnect (h);
704       if (rc.cont != NULL)
705         rc.cont (rc.cont_cls, 
706                  GNUNET_SYSERR,
707                  _("Error reading response from datastore service"));
708       return;
709     }
710   sm = (const struct StatusMessage*) msg;
711   status = ntohl(sm->status);
712   emsg = NULL;
713   if (ntohs(msg->size) > sizeof(struct StatusMessage))
714     {
715       emsg = (const char*) &sm[1];
716       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
717         {
718           GNUNET_break (0);
719           emsg = _("Invalid error message received from datastore service");
720         }
721     }  
722   if ( (status == GNUNET_SYSERR) &&
723        (emsg == NULL) )
724     {
725       GNUNET_break (0);
726       emsg = _("Invalid error message received from datastore service");
727     }
728 #if DEBUG_DATASTORE
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Received status %d/%s\n",
731               (int) status,
732               emsg);
733 #endif
734   process_queue (h);
735   if (rc.cont != NULL)
736     rc.cont (rc.cont_cls, 
737              status,
738              emsg);
739 }
740
741
742 /**
743  * Store an item in the datastore.  If the item is already present,
744  * the priorities are summed up and the higher expiration time and
745  * lower anonymity level is used.
746  *
747  * @param h handle to the datastore
748  * @param rid reservation ID to use (from "reserve"); use 0 if no
749  *            prior reservation was made
750  * @param key key for the value
751  * @param size number of bytes in data
752  * @param data content stored
753  * @param type type of the content
754  * @param priority priority of the content
755  * @param anonymity anonymity-level for the content
756  * @param expiration expiration time for the content
757  * @param queue_priority ranking of this request in the priority queue
758  * @param max_queue_size at what queue size should this request be dropped
759  *        (if other requests of higher priority are in the queue)
760  * @param timeout timeout for the operation
761  * @param cont continuation to call when done
762  * @param cont_cls closure for cont
763  * @return NULL if the entry was not queued, otherwise a handle that can be used to
764  *         cancel; note that even if NULL is returned, the callback will be invoked
765  *         (or rather, will already have been invoked)
766  */
767 struct GNUNET_DATASTORE_QueueEntry *
768 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
769                       int rid,
770                       const GNUNET_HashCode * key,
771                       size_t size,
772                       const void *data,
773                       enum GNUNET_BLOCK_Type type,
774                       uint32_t priority,
775                       uint32_t anonymity,
776                       struct GNUNET_TIME_Absolute expiration,
777                       unsigned int queue_priority,
778                       unsigned int max_queue_size,
779                       struct GNUNET_TIME_Relative timeout,
780                       GNUNET_DATASTORE_ContinuationWithStatus cont,
781                       void *cont_cls)
782 {
783   struct GNUNET_DATASTORE_QueueEntry *qe;
784   struct DataMessage *dm;
785   size_t msize;
786   union QueueContext qc;
787
788 #if DEBUG_DATASTORE
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Asked to put %u bytes of data under key `%s'\n",
791               size,
792               GNUNET_h2s (key));
793 #endif
794   msize = sizeof(struct DataMessage) + size;
795   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
796   qc.sc.cont = cont;
797   qc.sc.cont_cls = cont_cls;
798   qe = make_queue_entry (h, msize,
799                          queue_priority, max_queue_size, timeout,
800                          &process_status_message, &qc);
801   if (qe == NULL)
802     {
803 #if DEBUG_DATASTORE
804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805                   "Could not create queue entry for PUT\n");
806 #endif
807       return NULL;
808     }
809   dm = (struct DataMessage* ) &qe[1];
810   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
811   dm->header.size = htons(msize);
812   dm->rid = htonl(rid);
813   dm->size = htonl( (uint32_t) size);
814   dm->type = htonl(type);
815   dm->priority = htonl(priority);
816   dm->anonymity = htonl(anonymity);
817   dm->uid = GNUNET_htonll(0);
818   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
819   dm->key = *key;
820   memcpy (&dm[1], data, size);
821   process_queue (h);
822   return qe;
823 }
824
825
826 /**
827  * Reserve space in the datastore.  This function should be used
828  * to avoid "out of space" failures during a longer sequence of "put"
829  * operations (for example, when a file is being inserted).
830  *
831  * @param h handle to the datastore
832  * @param amount how much space (in bytes) should be reserved (for content only)
833  * @param entries how many entries will be created (to calculate per-entry overhead)
834  * @param queue_priority ranking of this request in the priority queue
835  * @param max_queue_size at what queue size should this request be dropped
836  *        (if other requests of higher priority are in the queue)
837  * @param timeout how long to wait at most for a response (or before dying in queue)
838  * @param cont continuation to call when done; "success" will be set to
839  *             a positive reservation value if space could be reserved.
840  * @param cont_cls closure for cont
841  * @return NULL if the entry was not queued, otherwise a handle that can be used to
842  *         cancel; note that even if NULL is returned, the callback will be invoked
843  *         (or rather, will already have been invoked)
844  */
845 struct GNUNET_DATASTORE_QueueEntry *
846 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
847                           uint64_t amount,
848                           uint32_t entries,
849                           unsigned int queue_priority,
850                           unsigned int max_queue_size,
851                           struct GNUNET_TIME_Relative timeout,
852                           GNUNET_DATASTORE_ContinuationWithStatus cont,
853                           void *cont_cls)
854 {
855   struct GNUNET_DATASTORE_QueueEntry *qe;
856   struct ReserveMessage *rm;
857   union QueueContext qc;
858
859   if (cont == NULL)
860     cont = &drop_status_cont;
861 #if DEBUG_DATASTORE
862   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
863               "Asked to reserve %llu bytes of data and %u entries'\n",
864               (unsigned long long) amount,
865               (unsigned int) entries);
866 #endif
867   qc.sc.cont = cont;
868   qc.sc.cont_cls = cont_cls;
869   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
870                          queue_priority, max_queue_size, timeout,
871                          &process_status_message, &qc);
872   if (qe == NULL)
873     {
874 #if DEBUG_DATASTORE
875       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
876                   "Could not create queue entry to reserve\n");
877 #endif
878       return NULL;
879     }
880   rm = (struct ReserveMessage*) &qe[1];
881   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
882   rm->header.size = htons(sizeof (struct ReserveMessage));
883   rm->entries = htonl(entries);
884   rm->amount = GNUNET_htonll(amount);
885   process_queue (h);
886   return qe;
887 }
888
889
890 /**
891  * Signal that all of the data for which a reservation was made has
892  * been stored and that whatever excess space might have been reserved
893  * can now be released.
894  *
895  * @param h handle to the datastore
896  * @param rid reservation ID (value of "success" in original continuation
897  *        from the "reserve" function).
898  * @param queue_priority ranking of this request in the priority queue
899  * @param max_queue_size at what queue size should this request be dropped
900  *        (if other requests of higher priority are in the queue)
901  * @param queue_priority ranking of this request in the priority queue
902  * @param max_queue_size at what queue size should this request be dropped
903  *        (if other requests of higher priority are in the queue)
904  * @param timeout how long to wait at most for a response
905  * @param cont continuation to call when done
906  * @param cont_cls closure for cont
907  * @return NULL if the entry was not queued, otherwise a handle that can be used to
908  *         cancel; note that even if NULL is returned, the callback will be invoked
909  *         (or rather, will already have been invoked)
910  */
911 struct GNUNET_DATASTORE_QueueEntry *
912 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
913                                   int rid,
914                                   unsigned int queue_priority,
915                                   unsigned int max_queue_size,
916                                   struct GNUNET_TIME_Relative timeout,
917                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
918                                   void *cont_cls)
919 {
920   struct GNUNET_DATASTORE_QueueEntry *qe;
921   struct ReleaseReserveMessage *rrm;
922   union QueueContext qc;
923
924   if (cont == NULL)
925     cont = &drop_status_cont;
926 #if DEBUG_DATASTORE
927   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
928               "Asked to release reserve %d\n",
929               rid);
930 #endif
931   qc.sc.cont = cont;
932   qc.sc.cont_cls = cont_cls;
933   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
934                          queue_priority, max_queue_size, timeout,
935                          &process_status_message, &qc);
936   if (qe == NULL)
937     {
938 #if DEBUG_DATASTORE
939       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940                   "Could not create queue entry to release reserve\n");
941 #endif
942       return NULL;
943     }
944   rrm = (struct ReleaseReserveMessage*) &qe[1];
945   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
946   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
947   rrm->rid = htonl(rid);
948   process_queue (h);
949   return qe;
950 }
951
952
953 /**
954  * Update a value in the datastore.
955  *
956  * @param h handle to the datastore
957  * @param uid identifier for the value
958  * @param priority how much to increase the priority of the value
959  * @param expiration new expiration value should be MAX of existing and this argument
960  * @param queue_priority ranking of this request in the priority queue
961  * @param max_queue_size at what queue size should this request be dropped
962  *        (if other requests of higher priority are in the queue)
963  * @param timeout how long to wait at most for a response
964  * @param cont continuation to call when done
965  * @param cont_cls closure for cont
966  * @return NULL if the entry was not queued, otherwise a handle that can be used to
967  *         cancel; note that even if NULL is returned, the callback will be invoked
968  *         (or rather, will already have been invoked)
969  */
970 struct GNUNET_DATASTORE_QueueEntry *
971 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
972                          unsigned long long uid,
973                          uint32_t priority,
974                          struct GNUNET_TIME_Absolute expiration,
975                          unsigned int queue_priority,
976                          unsigned int max_queue_size,
977                          struct GNUNET_TIME_Relative timeout,
978                          GNUNET_DATASTORE_ContinuationWithStatus cont,
979                          void *cont_cls)
980 {
981   struct GNUNET_DATASTORE_QueueEntry *qe;
982   struct UpdateMessage *um;
983   union QueueContext qc;
984
985   if (cont == NULL)
986     cont = &drop_status_cont;
987 #if DEBUG_DATASTORE
988   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
990               uid,
991               (unsigned int) priority,
992               (unsigned long long) expiration.value);
993 #endif
994   qc.sc.cont = cont;
995   qc.sc.cont_cls = cont_cls;
996   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
997                          queue_priority, max_queue_size, timeout,
998                          &process_status_message, &qc);
999   if (qe == NULL)
1000     {
1001 #if DEBUG_DATASTORE
1002       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003                   "Could not create queue entry for UPDATE\n");
1004 #endif
1005       return NULL;
1006     }
1007   um = (struct UpdateMessage*) &qe[1];
1008   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1009   um->header.size = htons(sizeof (struct UpdateMessage));
1010   um->priority = htonl(priority);
1011   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1012   um->uid = GNUNET_htonll(uid);
1013   process_queue (h);
1014   return qe;
1015 }
1016
1017
1018 /**
1019  * Explicitly remove some content from the database.
1020  * The "cont"inuation will be called with status
1021  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1022  * if no matching entry was found and "GNUNET_SYSERR"
1023  * on all other types of errors.
1024  *
1025  * @param h handle to the datastore
1026  * @param key key for the value
1027  * @param size number of bytes in data
1028  * @param data content stored
1029  * @param queue_priority ranking of this request in the priority queue
1030  * @param max_queue_size at what queue size should this request be dropped
1031  *        (if other requests of higher priority are in the queue)
1032  * @param timeout how long to wait at most for a response
1033  * @param cont continuation to call when done
1034  * @param cont_cls closure for cont
1035  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1036  *         cancel; note that even if NULL is returned, the callback will be invoked
1037  *         (or rather, will already have been invoked)
1038  */
1039 struct GNUNET_DATASTORE_QueueEntry *
1040 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1041                          const GNUNET_HashCode *key,
1042                          size_t size, 
1043                          const void *data,
1044                          unsigned int queue_priority,
1045                          unsigned int max_queue_size,
1046                          struct GNUNET_TIME_Relative timeout,
1047                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1048                          void *cont_cls)
1049 {
1050   struct GNUNET_DATASTORE_QueueEntry *qe;
1051   struct DataMessage *dm;
1052   size_t msize;
1053   union QueueContext qc;
1054
1055   if (cont == NULL)
1056     cont = &drop_status_cont;
1057 #if DEBUG_DATASTORE
1058   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1059               "Asked to remove %u bytes under key `%s'\n",
1060               size,
1061               GNUNET_h2s (key));
1062 #endif
1063   qc.sc.cont = cont;
1064   qc.sc.cont_cls = cont_cls;
1065   msize = sizeof(struct DataMessage) + size;
1066   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1067   qe = make_queue_entry (h, msize,
1068                          queue_priority, max_queue_size, timeout,
1069                          &process_status_message, &qc);
1070   if (qe == NULL)
1071     return NULL;
1072   dm = (struct DataMessage*) &qe[1];
1073   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1074   dm->header.size = htons(msize);
1075   dm->rid = htonl(0);
1076   dm->size = htonl(size);
1077   dm->type = htonl(0);
1078   dm->priority = htonl(0);
1079   dm->anonymity = htonl(0);
1080   dm->uid = GNUNET_htonll(0);
1081   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1082   dm->key = *key;
1083   memcpy (&dm[1], data, size);
1084   process_queue (h);
1085   return qe;
1086 }
1087
1088
1089 /**
1090  * Type of a function to call when we receive a message
1091  * from the service.
1092  *
1093  * @param cls closure
1094  * @param msg message received, NULL on timeout or fatal error
1095  */
1096 static void 
1097 process_result_message (void *cls,
1098                         const struct GNUNET_MessageHeader * msg)
1099 {
1100   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1101   struct GNUNET_DATASTORE_Handle *h = qe->h;
1102   struct ResultContext rc = qe->qc.rc;
1103   const struct DataMessage *dm;
1104   int was_transmitted;
1105
1106   h->in_receive = GNUNET_NO;
1107   if (msg == NULL)
1108    {
1109       was_transmitted = qe->was_transmitted;
1110       free_queue_entry (qe);
1111       if (was_transmitted == GNUNET_YES)
1112         {
1113           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1114                       _("Failed to receive response from database.\n"));
1115           do_disconnect (h);
1116         }
1117       else
1118         {
1119 #if DEBUG_DATASTORE
1120           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1121                       "Request dropped due to finite datastore queue length.\n");
1122 #endif
1123         }
1124       if (rc.iter != NULL)
1125         rc.iter (rc.iter_cls,
1126                  NULL, 0, NULL, 0, 0, 0, 
1127                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1128       return;
1129     }
1130   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1131   GNUNET_assert (h->queue_head == qe);
1132   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1133     {
1134       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1135 #if DEBUG_DATASTORE
1136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137                   "Received end of result set\n");
1138 #endif
1139       free_queue_entry (qe);
1140       if (rc.iter != NULL)
1141         rc.iter (rc.iter_cls,
1142                  NULL, 0, NULL, 0, 0, 0, 
1143                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1144       process_queue (h);
1145       return;
1146     }
1147   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1148        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1149        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1150     {
1151       GNUNET_break (0);
1152       free_queue_entry (qe);
1153       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1154       do_disconnect (h);
1155       if (rc.iter != NULL)
1156         rc.iter (rc.iter_cls,
1157                  NULL, 0, NULL, 0, 0, 0, 
1158                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1159       return;
1160     }
1161   if (rc.iter == NULL)
1162     {
1163       /* abort iteration */
1164 #if DEBUG_DATASTORE
1165       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1166                   "Aborting iteration via disconnect (client has cancelled)\n");
1167 #endif
1168       free_queue_entry (qe);
1169       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1170       do_disconnect (h);
1171       return;
1172     }
1173   dm = (const struct DataMessage*) msg;
1174 #if DEBUG_DATASTORE
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Received result %llu with type %u and size %u with key %s\n",
1177               (unsigned long long) GNUNET_ntohll(dm->uid),
1178               ntohl(dm->type),
1179               ntohl(dm->size),
1180               GNUNET_h2s(&dm->key));
1181 #endif
1182   rc.iter (rc.iter_cls,
1183            &dm->key,
1184            ntohl(dm->size),
1185            &dm[1],
1186            ntohl(dm->type),
1187            ntohl(dm->priority),
1188            ntohl(dm->anonymity),
1189            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1190            GNUNET_ntohll(dm->uid));
1191 }
1192
1193
1194 /**
1195  * Get a random value from the datastore.
1196  *
1197  * @param h handle to the datastore
1198  * @param queue_priority ranking of this request in the priority queue
1199  * @param max_queue_size at what queue size should this request be dropped
1200  *        (if other requests of higher priority are in the queue)
1201  * @param timeout how long to wait at most for a response
1202  * @param iter function to call on a random value; it
1203  *        will be called once with a value (if available)
1204  *        and always once with a value of NULL.
1205  * @param iter_cls closure for iter
1206  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1207  *         cancel; note that even if NULL is returned, the callback will be invoked
1208  *         (or rather, will already have been invoked)
1209  */
1210 struct GNUNET_DATASTORE_QueueEntry *
1211 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1212                              unsigned int queue_priority,
1213                              unsigned int max_queue_size,
1214                              struct GNUNET_TIME_Relative timeout,
1215                              GNUNET_DATASTORE_Iterator iter, 
1216                              void *iter_cls)
1217 {
1218   struct GNUNET_DATASTORE_QueueEntry *qe;
1219   struct GNUNET_MessageHeader *m;
1220   union QueueContext qc;
1221
1222 #if DEBUG_DATASTORE
1223   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1224               "Asked to get random entry in %llu ms\n",
1225               (unsigned long long) timeout.value);
1226 #endif
1227   qc.rc.iter = iter;
1228   qc.rc.iter_cls = iter_cls;
1229   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1230                          queue_priority, max_queue_size, timeout,
1231                          &process_result_message, &qc);
1232   if (qe == NULL)
1233     return NULL;    
1234   m = (struct GNUNET_MessageHeader*) &qe[1];
1235   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1236   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1237   process_queue (h);
1238   return qe;
1239 }
1240
1241
1242 /**
1243  * Get a zero-anonymity value from the datastore.
1244  *
1245  * @param h handle to the datastore
1246  * @param queue_priority ranking of this request in the priority queue
1247  * @param max_queue_size at what queue size should this request be dropped
1248  *        (if other requests of higher priority are in the queue)
1249  * @param timeout how long to wait at most for a response
1250  * @param type allowed type for the operation
1251  * @param iter function to call on a random value; it
1252  *        will be called once with a value (if available)
1253  *        and always once with a value of NULL.
1254  * @param iter_cls closure for iter
1255  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1256  *         cancel; note that even if NULL is returned, the callback will be invoked
1257  *         (or rather, will already have been invoked)
1258  */
1259 struct GNUNET_DATASTORE_QueueEntry *
1260 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1261                                      unsigned int queue_priority,
1262                                      unsigned int max_queue_size,
1263                                      struct GNUNET_TIME_Relative timeout,
1264                                      enum GNUNET_BLOCK_Type type,
1265                                      GNUNET_DATASTORE_Iterator iter, 
1266                                      void *iter_cls)
1267 {
1268   struct GNUNET_DATASTORE_QueueEntry *qe;
1269   struct GetZeroAnonymityMessage *m;
1270   union QueueContext qc;
1271
1272 #if DEBUG_DATASTORE
1273   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1274               "Asked to get zero-anonymity entry in %llu ms\n",
1275               (unsigned long long) timeout.value);
1276 #endif
1277   qc.rc.iter = iter;
1278   qc.rc.iter_cls = iter_cls;
1279   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1280                          queue_priority, max_queue_size, timeout,
1281                          &process_result_message, &qc);
1282   if (qe == NULL)
1283     {
1284 #if DEBUG_DATASTORE
1285       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1286                   "Could not create queue entry for zero-anonymity iteration\n");
1287 #endif
1288       return NULL;    
1289     }
1290   m = (struct GetZeroAnonymityMessage*) &qe[1];
1291   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1292   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1293   m->type = htonl ((uint32_t) type);
1294   process_queue (h);
1295   return qe;
1296 }
1297
1298
1299
1300 /**
1301  * Iterate over the results for a particular key
1302  * in the datastore.  The iterator will only be called
1303  * once initially; if the first call did contain a
1304  * result, further results can be obtained by calling
1305  * "GNUNET_DATASTORE_get_next" with the given argument.
1306  *
1307  * @param h handle to the datastore
1308  * @param key maybe NULL (to match all entries)
1309  * @param type desired type, 0 for any
1310  * @param queue_priority ranking of this request in the priority queue
1311  * @param max_queue_size at what queue size should this request be dropped
1312  *        (if other requests of higher priority are in the queue)
1313  * @param timeout how long to wait at most for a response
1314  * @param iter function to call on each matching value;
1315  *        will be called once with a NULL value at the end
1316  * @param iter_cls closure for iter
1317  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1318  *         cancel; note that even if NULL is returned, the callback will be invoked
1319  *         (or rather, will already have been invoked)
1320  */
1321 struct GNUNET_DATASTORE_QueueEntry *
1322 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1323                       const GNUNET_HashCode * key,
1324                       enum GNUNET_BLOCK_Type type,
1325                       unsigned int queue_priority,
1326                       unsigned int max_queue_size,
1327                       struct GNUNET_TIME_Relative timeout,
1328                       GNUNET_DATASTORE_Iterator iter, 
1329                       void *iter_cls)
1330 {
1331   struct GNUNET_DATASTORE_QueueEntry *qe;
1332   struct GetMessage *gm;
1333   union QueueContext qc;
1334
1335 #if DEBUG_DATASTORE
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337               "Asked to look for data of type %u under key `%s'\n",
1338               (unsigned int) type,
1339               GNUNET_h2s (key));
1340 #endif
1341   qc.rc.iter = iter;
1342   qc.rc.iter_cls = iter_cls;
1343   qe = make_queue_entry (h, sizeof(struct GetMessage),
1344                          queue_priority, max_queue_size, timeout,
1345                          &process_result_message, &qc);
1346   if (qe == NULL)
1347     {
1348 #if DEBUG_DATASTORE
1349       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1350                   "Could not queue request for `%s'\n",
1351                   GNUNET_h2s (key));
1352 #endif
1353       return NULL;
1354     }
1355   gm = (struct GetMessage*) &qe[1];
1356   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1357   gm->type = htonl(type);
1358   if (key != NULL)
1359     {
1360       gm->header.size = htons(sizeof (struct GetMessage));
1361       gm->key = *key;
1362     }
1363   else
1364     {
1365       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1366     }
1367   process_queue (h);
1368   return qe;
1369 }
1370
1371
1372 /**
1373  * Function called to trigger obtaining the next result
1374  * from the datastore.
1375  * 
1376  * @param h handle to the datastore
1377  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1378  *        iteration (with a final call to "iter" with key/data == NULL).
1379  */
1380 void 
1381 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1382                            int more)
1383 {
1384   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1385   struct ResultContext rc = qe->qc.rc;
1386
1387   GNUNET_assert (&process_result_message == qe->response_proc);
1388   if (GNUNET_YES == more)
1389     {     
1390       h->in_receive = GNUNET_YES;
1391       GNUNET_CLIENT_receive (h->client,
1392                              qe->response_proc,
1393                              qe,
1394                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1395       return;
1396     }
1397   free_queue_entry (qe);
1398   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1399   do_disconnect (h);
1400   rc.iter (rc.iter_cls,
1401            NULL, 0, NULL, 0, 0, 0, 
1402            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1403 }
1404
1405
1406 /**
1407  * Cancel a datastore operation.  The final callback from the
1408  * operation must not have been done yet.
1409  * 
1410  * @param qe operation to cancel
1411  */
1412 void
1413 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1414 {
1415   struct GNUNET_DATASTORE_Handle *h;
1416   int reconnect;
1417
1418   h = qe->h;
1419 #if DEBUG_DATASTORE
1420   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1421                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1422                qe,
1423                qe->was_transmitted,
1424                h->queue_head == qe);
1425 #endif
1426   reconnect = GNUNET_NO;
1427   if (GNUNET_YES == qe->was_transmitted) 
1428     {
1429       if (qe->response_proc == &process_result_message) 
1430         {
1431           qe->qc.rc.iter = NULL;    
1432           if (GNUNET_YES != h->in_receive)
1433             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1434         }
1435       else
1436         {
1437           qe->qc.sc.cont = NULL;
1438         }
1439       return;
1440     }
1441   free_queue_entry (qe);
1442   process_queue (h);
1443 }
1444
1445
1446 /* end of datastore_api.c */