58340ed6cfb13a436ff764fc619d30849baa5b30
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162   
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     {
476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477                   "DATASTORE reconnect failed (fatally)\n");
478       return;
479     }
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Reconnected to DATASTORE\n");
483 #endif
484   process_queue (h);
485 }
486
487
488 /**
489  * Disconnect from the service and then try reconnecting to the datastore service
490  * after some delay.
491  *
492  * @param h handle to datastore to disconnect and reconnect
493  */
494 static void
495 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
496 {
497   if (h->client == NULL)
498     {
499 #if DEBUG_DATASTORE
500       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                   "client NULL in disconnect, will not try to reconnect\n");
502 #endif
503       return;
504     }
505 #if 0
506   GNUNET_STATISTICS_update (stats,
507                             gettext_noop ("# reconnected to DATASTORE"),
508                             1,
509                             GNUNET_NO);
510 #endif
511   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
512   h->client = NULL;
513   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
514                                                     h->retry_time,
515                                                     &try_reconnect,
516                                                     h);      
517 }
518
519
520 /**
521  * Transmit request from queue to datastore service.
522  *
523  * @param cls the 'struct GNUNET_DATASTORE_Handle'
524  * @param size number of bytes that can be copied to buf
525  * @param buf where to copy the drop message
526  * @return number of bytes written to buf
527  */
528 static size_t
529 transmit_request (void *cls,
530                   size_t size, 
531                   void *buf)
532 {
533   struct GNUNET_DATASTORE_Handle *h = cls;
534   struct GNUNET_DATASTORE_QueueEntry *qe;
535   size_t msize;
536
537   h->th = NULL;
538   if (NULL == (qe = h->queue_head))
539     return 0; /* no entry in queue */
540   if (buf == NULL)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
543                   _("Failed to transmit request to DATASTORE.\n"));
544       do_disconnect (h);
545       return 0;
546     }
547   if (size < (msize = qe->message_size))
548     {
549       process_queue (h);
550       return 0;
551     }
552  #if DEBUG_DATASTORE
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Transmitting %u byte request to DATASTORE\n",
555               msize);
556 #endif
557   memcpy (buf, &qe[1], msize);
558   qe->was_transmitted = GNUNET_YES;
559   GNUNET_SCHEDULER_cancel (h->sched,
560                            qe->task);
561   qe->task = GNUNET_SCHEDULER_NO_TASK;
562   h->in_receive = GNUNET_YES;
563   GNUNET_CLIENT_receive (h->client,
564                          qe->response_proc,
565                          qe,
566                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
567   return msize;
568 }
569
570
571 /**
572  * Process entries in the queue (or do nothing if we are already
573  * doing so).
574  * 
575  * @param h handle to the datastore
576  */
577 static void
578 process_queue (struct GNUNET_DATASTORE_Handle *h)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   if (NULL == (qe = h->queue_head))
583     {
584 #if DEBUG_DATASTORE > 1
585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                   "Queue empty\n");
587 #endif
588       return; /* no entry in queue */
589     }
590   if (qe->was_transmitted == GNUNET_YES)
591     {
592 #if DEBUG_DATASTORE > 1
593       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                   "Head request already transmitted\n");
595 #endif
596       return; /* waiting for replies */
597     }
598   if (h->th != NULL)
599     {
600 #if DEBUG_DATASTORE > 1
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                   "Pending transmission request\n");
603 #endif
604       return; /* request pending */
605     }
606   if (h->client == NULL)
607     {
608 #if DEBUG_DATASTORE > 1
609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610                   "Not connected\n");
611 #endif
612       return; /* waiting for reconnect */
613     }
614 #if DEBUG_DATASTORE
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Queueing %u byte request to DATASTORE\n",
617               qe->message_size);
618 #endif
619   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
620                                                qe->message_size,
621                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
622                                                GNUNET_YES,
623                                                &transmit_request,
624                                                h);
625 }
626
627
628 /**
629  * Dummy continuation used to do nothing (but be non-zero).
630  *
631  * @param cls closure
632  * @param result result 
633  * @param emsg error message
634  */
635 static void
636 drop_status_cont (void *cls, int result, const char *emsg)
637 {
638   /* do nothing */
639 }
640
641
642 static void
643 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
644 {
645   struct GNUNET_DATASTORE_Handle *h = qe->h;
646
647   GNUNET_CONTAINER_DLL_remove (h->queue_head,
648                                h->queue_tail,
649                                qe);
650   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
651     {
652       GNUNET_SCHEDULER_cancel (h->sched,
653                                qe->task);
654       qe->task = GNUNET_SCHEDULER_NO_TASK;
655     }
656   h->queue_size--;
657   GNUNET_free (qe);
658 }
659
660 /**
661  * Type of a function to call when we receive a message
662  * from the service.
663  *
664  * @param cls closure
665  * @param msg message received, NULL on timeout or fatal error
666  */
667 static void 
668 process_status_message (void *cls,
669                         const struct
670                         GNUNET_MessageHeader * msg)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
673   struct GNUNET_DATASTORE_Handle *h = qe->h;
674   struct StatusContext rc = qe->qc.sc;
675   const struct StatusMessage *sm;
676   const char *emsg;
677   int32_t status;
678   int was_transmitted;
679
680   h->in_receive = GNUNET_NO;
681   was_transmitted = qe->was_transmitted;
682   if (msg == NULL)
683     {      
684       free_queue_entry (qe);
685       if (NULL == h->client)
686         return; /* forced disconnect */
687       if (rc.cont != NULL)
688         rc.cont (rc.cont_cls, 
689                  GNUNET_SYSERR,
690                  _("Failed to receive status response from database."));
691       if (was_transmitted == GNUNET_YES)
692         do_disconnect (h);
693       return;
694     }
695   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
696   GNUNET_assert (h->queue_head == qe);
697   free_queue_entry (qe);
698   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
699        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
700     {
701       GNUNET_break (0);
702       h->retry_time = GNUNET_TIME_UNIT_ZERO;
703       do_disconnect (h);
704       if (rc.cont != NULL)
705         rc.cont (rc.cont_cls, 
706                  GNUNET_SYSERR,
707                  _("Error reading response from datastore service"));
708       return;
709     }
710   sm = (const struct StatusMessage*) msg;
711   status = ntohl(sm->status);
712   emsg = NULL;
713   if (ntohs(msg->size) > sizeof(struct StatusMessage))
714     {
715       emsg = (const char*) &sm[1];
716       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
717         {
718           GNUNET_break (0);
719           emsg = _("Invalid error message received from datastore service");
720         }
721     }  
722   if ( (status == GNUNET_SYSERR) &&
723        (emsg == NULL) )
724     {
725       GNUNET_break (0);
726       emsg = _("Invalid error message received from datastore service");
727     }
728 #if DEBUG_DATASTORE
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Received status %d/%s\n",
731               (int) status,
732               emsg);
733 #endif
734   process_queue (h);
735   if (rc.cont != NULL)
736     rc.cont (rc.cont_cls, 
737              status,
738              emsg);
739 }
740
741
742 /**
743  * Store an item in the datastore.  If the item is already present,
744  * the priorities are summed up and the higher expiration time and
745  * lower anonymity level is used.
746  *
747  * @param h handle to the datastore
748  * @param rid reservation ID to use (from "reserve"); use 0 if no
749  *            prior reservation was made
750  * @param key key for the value
751  * @param size number of bytes in data
752  * @param data content stored
753  * @param type type of the content
754  * @param priority priority of the content
755  * @param anonymity anonymity-level for the content
756  * @param expiration expiration time for the content
757  * @param queue_priority ranking of this request in the priority queue
758  * @param max_queue_size at what queue size should this request be dropped
759  *        (if other requests of higher priority are in the queue)
760  * @param timeout timeout for the operation
761  * @param cont continuation to call when done
762  * @param cont_cls closure for cont
763  * @return NULL if the entry was not queued, otherwise a handle that can be used to
764  *         cancel; note that even if NULL is returned, the callback will be invoked
765  *         (or rather, will already have been invoked)
766  */
767 struct GNUNET_DATASTORE_QueueEntry *
768 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
769                       int rid,
770                       const GNUNET_HashCode * key,
771                       size_t size,
772                       const void *data,
773                       enum GNUNET_BLOCK_Type type,
774                       uint32_t priority,
775                       uint32_t anonymity,
776                       struct GNUNET_TIME_Absolute expiration,
777                       unsigned int queue_priority,
778                       unsigned int max_queue_size,
779                       struct GNUNET_TIME_Relative timeout,
780                       GNUNET_DATASTORE_ContinuationWithStatus cont,
781                       void *cont_cls)
782 {
783   struct GNUNET_DATASTORE_QueueEntry *qe;
784   struct DataMessage *dm;
785   size_t msize;
786   union QueueContext qc;
787
788 #if DEBUG_DATASTORE
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Asked to put %u bytes of data under key `%s'\n",
791               size,
792               GNUNET_h2s (key));
793 #endif
794   msize = sizeof(struct DataMessage) + size;
795   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
796   qc.sc.cont = cont;
797   qc.sc.cont_cls = cont_cls;
798   qe = make_queue_entry (h, msize,
799                          queue_priority, max_queue_size, timeout,
800                          &process_status_message, &qc);
801   if (qe == NULL)
802     return NULL;
803   dm = (struct DataMessage* ) &qe[1];
804   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
805   dm->header.size = htons(msize);
806   dm->rid = htonl(rid);
807   dm->size = htonl( (uint32_t) size);
808   dm->type = htonl(type);
809   dm->priority = htonl(priority);
810   dm->anonymity = htonl(anonymity);
811   dm->uid = GNUNET_htonll(0);
812   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
813   dm->key = *key;
814   memcpy (&dm[1], data, size);
815   process_queue (h);
816   return qe;
817 }
818
819
820 /**
821  * Reserve space in the datastore.  This function should be used
822  * to avoid "out of space" failures during a longer sequence of "put"
823  * operations (for example, when a file is being inserted).
824  *
825  * @param h handle to the datastore
826  * @param amount how much space (in bytes) should be reserved (for content only)
827  * @param entries how many entries will be created (to calculate per-entry overhead)
828  * @param queue_priority ranking of this request in the priority queue
829  * @param max_queue_size at what queue size should this request be dropped
830  *        (if other requests of higher priority are in the queue)
831  * @param timeout how long to wait at most for a response (or before dying in queue)
832  * @param cont continuation to call when done; "success" will be set to
833  *             a positive reservation value if space could be reserved.
834  * @param cont_cls closure for cont
835  * @return NULL if the entry was not queued, otherwise a handle that can be used to
836  *         cancel; note that even if NULL is returned, the callback will be invoked
837  *         (or rather, will already have been invoked)
838  */
839 struct GNUNET_DATASTORE_QueueEntry *
840 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
841                           uint64_t amount,
842                           uint32_t entries,
843                           unsigned int queue_priority,
844                           unsigned int max_queue_size,
845                           struct GNUNET_TIME_Relative timeout,
846                           GNUNET_DATASTORE_ContinuationWithStatus cont,
847                           void *cont_cls)
848 {
849   struct GNUNET_DATASTORE_QueueEntry *qe;
850   struct ReserveMessage *rm;
851   union QueueContext qc;
852
853   if (cont == NULL)
854     cont = &drop_status_cont;
855 #if DEBUG_DATASTORE
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Asked to reserve %llu bytes of data and %u entries'\n",
858               (unsigned long long) amount,
859               (unsigned int) entries);
860 #endif
861   qc.sc.cont = cont;
862   qc.sc.cont_cls = cont_cls;
863   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
864                          queue_priority, max_queue_size, timeout,
865                          &process_status_message, &qc);
866   if (qe == NULL)
867     return NULL;
868   rm = (struct ReserveMessage*) &qe[1];
869   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
870   rm->header.size = htons(sizeof (struct ReserveMessage));
871   rm->entries = htonl(entries);
872   rm->amount = GNUNET_htonll(amount);
873   process_queue (h);
874   return qe;
875 }
876
877
878 /**
879  * Signal that all of the data for which a reservation was made has
880  * been stored and that whatever excess space might have been reserved
881  * can now be released.
882  *
883  * @param h handle to the datastore
884  * @param rid reservation ID (value of "success" in original continuation
885  *        from the "reserve" function).
886  * @param queue_priority ranking of this request in the priority queue
887  * @param max_queue_size at what queue size should this request be dropped
888  *        (if other requests of higher priority are in the queue)
889  * @param queue_priority ranking of this request in the priority queue
890  * @param max_queue_size at what queue size should this request be dropped
891  *        (if other requests of higher priority are in the queue)
892  * @param timeout how long to wait at most for a response
893  * @param cont continuation to call when done
894  * @param cont_cls closure for cont
895  * @return NULL if the entry was not queued, otherwise a handle that can be used to
896  *         cancel; note that even if NULL is returned, the callback will be invoked
897  *         (or rather, will already have been invoked)
898  */
899 struct GNUNET_DATASTORE_QueueEntry *
900 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
901                                   int rid,
902                                   unsigned int queue_priority,
903                                   unsigned int max_queue_size,
904                                   struct GNUNET_TIME_Relative timeout,
905                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
906                                   void *cont_cls)
907 {
908   struct GNUNET_DATASTORE_QueueEntry *qe;
909   struct ReleaseReserveMessage *rrm;
910   union QueueContext qc;
911
912   if (cont == NULL)
913     cont = &drop_status_cont;
914 #if DEBUG_DATASTORE
915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
916               "Asked to release reserve %d\n",
917               rid);
918 #endif
919   qc.sc.cont = cont;
920   qc.sc.cont_cls = cont_cls;
921   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
922                          queue_priority, max_queue_size, timeout,
923                          &process_status_message, &qc);
924   if (qe == NULL)
925     return NULL;
926   rrm = (struct ReleaseReserveMessage*) &qe[1];
927   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
928   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
929   rrm->rid = htonl(rid);
930   process_queue (h);
931   return qe;
932 }
933
934
935 /**
936  * Update a value in the datastore.
937  *
938  * @param h handle to the datastore
939  * @param uid identifier for the value
940  * @param priority how much to increase the priority of the value
941  * @param expiration new expiration value should be MAX of existing and this argument
942  * @param queue_priority ranking of this request in the priority queue
943  * @param max_queue_size at what queue size should this request be dropped
944  *        (if other requests of higher priority are in the queue)
945  * @param timeout how long to wait at most for a response
946  * @param cont continuation to call when done
947  * @param cont_cls closure for cont
948  * @return NULL if the entry was not queued, otherwise a handle that can be used to
949  *         cancel; note that even if NULL is returned, the callback will be invoked
950  *         (or rather, will already have been invoked)
951  */
952 struct GNUNET_DATASTORE_QueueEntry *
953 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
954                          unsigned long long uid,
955                          uint32_t priority,
956                          struct GNUNET_TIME_Absolute expiration,
957                          unsigned int queue_priority,
958                          unsigned int max_queue_size,
959                          struct GNUNET_TIME_Relative timeout,
960                          GNUNET_DATASTORE_ContinuationWithStatus cont,
961                          void *cont_cls)
962 {
963   struct GNUNET_DATASTORE_QueueEntry *qe;
964   struct UpdateMessage *um;
965   union QueueContext qc;
966
967   if (cont == NULL)
968     cont = &drop_status_cont;
969 #if DEBUG_DATASTORE
970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
972               uid,
973               (unsigned int) priority,
974               (unsigned long long) expiration.value);
975 #endif
976   qc.sc.cont = cont;
977   qc.sc.cont_cls = cont_cls;
978   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
979                          queue_priority, max_queue_size, timeout,
980                          &process_status_message, &qc);
981   if (qe == NULL)
982     return NULL;
983   um = (struct UpdateMessage*) &qe[1];
984   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
985   um->header.size = htons(sizeof (struct UpdateMessage));
986   um->priority = htonl(priority);
987   um->expiration = GNUNET_TIME_absolute_hton(expiration);
988   um->uid = GNUNET_htonll(uid);
989   process_queue (h);
990   return qe;
991 }
992
993
994 /**
995  * Explicitly remove some content from the database.
996  * The "cont"inuation will be called with status
997  * "GNUNET_OK" if content was removed, "GNUNET_NO"
998  * if no matching entry was found and "GNUNET_SYSERR"
999  * on all other types of errors.
1000  *
1001  * @param h handle to the datastore
1002  * @param key key for the value
1003  * @param size number of bytes in data
1004  * @param data content stored
1005  * @param queue_priority ranking of this request in the priority queue
1006  * @param max_queue_size at what queue size should this request be dropped
1007  *        (if other requests of higher priority are in the queue)
1008  * @param timeout how long to wait at most for a response
1009  * @param cont continuation to call when done
1010  * @param cont_cls closure for cont
1011  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1012  *         cancel; note that even if NULL is returned, the callback will be invoked
1013  *         (or rather, will already have been invoked)
1014  */
1015 struct GNUNET_DATASTORE_QueueEntry *
1016 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1017                          const GNUNET_HashCode *key,
1018                          size_t size, 
1019                          const void *data,
1020                          unsigned int queue_priority,
1021                          unsigned int max_queue_size,
1022                          struct GNUNET_TIME_Relative timeout,
1023                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1024                          void *cont_cls)
1025 {
1026   struct GNUNET_DATASTORE_QueueEntry *qe;
1027   struct DataMessage *dm;
1028   size_t msize;
1029   union QueueContext qc;
1030
1031   if (cont == NULL)
1032     cont = &drop_status_cont;
1033 #if DEBUG_DATASTORE
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035               "Asked to remove %u bytes under key `%s'\n",
1036               size,
1037               GNUNET_h2s (key));
1038 #endif
1039   qc.sc.cont = cont;
1040   qc.sc.cont_cls = cont_cls;
1041   msize = sizeof(struct DataMessage) + size;
1042   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1043   qe = make_queue_entry (h, msize,
1044                          queue_priority, max_queue_size, timeout,
1045                          &process_status_message, &qc);
1046   if (qe == NULL)
1047     return NULL;
1048   dm = (struct DataMessage*) &qe[1];
1049   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1050   dm->header.size = htons(msize);
1051   dm->rid = htonl(0);
1052   dm->size = htonl(size);
1053   dm->type = htonl(0);
1054   dm->priority = htonl(0);
1055   dm->anonymity = htonl(0);
1056   dm->uid = GNUNET_htonll(0);
1057   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1058   dm->key = *key;
1059   memcpy (&dm[1], data, size);
1060   process_queue (h);
1061   return qe;
1062 }
1063
1064
1065 /**
1066  * Type of a function to call when we receive a message
1067  * from the service.
1068  *
1069  * @param cls closure
1070  * @param msg message received, NULL on timeout or fatal error
1071  */
1072 static void 
1073 process_result_message (void *cls,
1074                         const struct GNUNET_MessageHeader * msg)
1075 {
1076   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1077   struct GNUNET_DATASTORE_Handle *h = qe->h;
1078   struct ResultContext rc = qe->qc.rc;
1079   const struct DataMessage *dm;
1080   int was_transmitted;
1081
1082   h->in_receive = GNUNET_NO;
1083   if (msg == NULL)
1084    {
1085       was_transmitted = qe->was_transmitted;
1086       free_queue_entry (qe);
1087       if (was_transmitted == GNUNET_YES)
1088         {
1089           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1090                       _("Failed to receive response from database.\n"));
1091           do_disconnect (h);
1092         }
1093       else
1094         {
1095 #if DEBUG_DATASTORE
1096           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1097                       "Request dropped due to finite datastore queue length.\n");
1098 #endif
1099         }
1100       if (rc.iter != NULL)
1101         rc.iter (rc.iter_cls,
1102                  NULL, 0, NULL, 0, 0, 0, 
1103                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1104       return;
1105     }
1106   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1107   GNUNET_assert (h->queue_head == qe);
1108   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1109     {
1110       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1111 #if DEBUG_DATASTORE
1112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1113                   "Received end of result set\n");
1114 #endif
1115       free_queue_entry (qe);
1116       if (rc.iter != NULL)
1117         rc.iter (rc.iter_cls,
1118                  NULL, 0, NULL, 0, 0, 0, 
1119                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1120       process_queue (h);
1121       return;
1122     }
1123   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1124        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1125        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1126     {
1127       GNUNET_break (0);
1128       free_queue_entry (qe);
1129       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1130       do_disconnect (h);
1131       if (rc.iter != NULL)
1132         rc.iter (rc.iter_cls,
1133                  NULL, 0, NULL, 0, 0, 0, 
1134                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1135       return;
1136     }
1137   if (rc.iter == NULL)
1138     {
1139       /* abort iteration */
1140 #if DEBUG_DATASTORE
1141       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1142                   "Aborting iteration via disconnect (client has cancelled)\n");
1143 #endif
1144       free_queue_entry (qe);
1145       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1146       do_disconnect (h);
1147       return;
1148     }
1149   dm = (const struct DataMessage*) msg;
1150 #if DEBUG_DATASTORE
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Received result %llu with type %u and size %u with key %s\n",
1153               (unsigned long long) GNUNET_ntohll(dm->uid),
1154               ntohl(dm->type),
1155               ntohl(dm->size),
1156               GNUNET_h2s(&dm->key));
1157 #endif
1158   rc.iter (rc.iter_cls,
1159            &dm->key,
1160            ntohl(dm->size),
1161            &dm[1],
1162            ntohl(dm->type),
1163            ntohl(dm->priority),
1164            ntohl(dm->anonymity),
1165            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1166            GNUNET_ntohll(dm->uid));
1167 }
1168
1169
1170 /**
1171  * Get a random value from the datastore.
1172  *
1173  * @param h handle to the datastore
1174  * @param queue_priority ranking of this request in the priority queue
1175  * @param max_queue_size at what queue size should this request be dropped
1176  *        (if other requests of higher priority are in the queue)
1177  * @param timeout how long to wait at most for a response
1178  * @param iter function to call on a random value; it
1179  *        will be called once with a value (if available)
1180  *        and always once with a value of NULL.
1181  * @param iter_cls closure for iter
1182  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1183  *         cancel; note that even if NULL is returned, the callback will be invoked
1184  *         (or rather, will already have been invoked)
1185  */
1186 struct GNUNET_DATASTORE_QueueEntry *
1187 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1188                              unsigned int queue_priority,
1189                              unsigned int max_queue_size,
1190                              struct GNUNET_TIME_Relative timeout,
1191                              GNUNET_DATASTORE_Iterator iter, 
1192                              void *iter_cls)
1193 {
1194   struct GNUNET_DATASTORE_QueueEntry *qe;
1195   struct GNUNET_MessageHeader *m;
1196   union QueueContext qc;
1197
1198 #if DEBUG_DATASTORE
1199   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1200               "Asked to get random entry in %llu ms\n",
1201               (unsigned long long) timeout.value);
1202 #endif
1203   qc.rc.iter = iter;
1204   qc.rc.iter_cls = iter_cls;
1205   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1206                          queue_priority, max_queue_size, timeout,
1207                          &process_result_message, &qc);
1208   if (qe == NULL)
1209     return NULL;    
1210   m = (struct GNUNET_MessageHeader*) &qe[1];
1211   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1212   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1213   process_queue (h);
1214   return qe;
1215 }
1216
1217
1218 /**
1219  * Get a zero-anonymity value from the datastore.
1220  *
1221  * @param h handle to the datastore
1222  * @param queue_priority ranking of this request in the priority queue
1223  * @param max_queue_size at what queue size should this request be dropped
1224  *        (if other requests of higher priority are in the queue)
1225  * @param timeout how long to wait at most for a response
1226  * @param type allowed type for the operation
1227  * @param iter function to call on a random value; it
1228  *        will be called once with a value (if available)
1229  *        and always once with a value of NULL.
1230  * @param iter_cls closure for iter
1231  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1232  *         cancel; note that even if NULL is returned, the callback will be invoked
1233  *         (or rather, will already have been invoked)
1234  */
1235 struct GNUNET_DATASTORE_QueueEntry *
1236 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1237                                      unsigned int queue_priority,
1238                                      unsigned int max_queue_size,
1239                                      struct GNUNET_TIME_Relative timeout,
1240                                      enum GNUNET_BLOCK_Type type,
1241                                      GNUNET_DATASTORE_Iterator iter, 
1242                                      void *iter_cls)
1243 {
1244   struct GNUNET_DATASTORE_QueueEntry *qe;
1245   struct GetZeroAnonymityMessage *m;
1246   union QueueContext qc;
1247
1248 #if DEBUG_DATASTORE
1249   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1250               "Asked to get zero-anonymity entry in %llu ms\n",
1251               (unsigned long long) timeout.value);
1252 #endif
1253   qc.rc.iter = iter;
1254   qc.rc.iter_cls = iter_cls;
1255   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1256                          queue_priority, max_queue_size, timeout,
1257                          &process_result_message, &qc);
1258   if (qe == NULL)
1259     return NULL;    
1260   m = (struct GetZeroAnonymityMessage*) &qe[1];
1261   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1262   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1263   m->type = htonl ((uint32_t) type);
1264   process_queue (h);
1265   return qe;
1266 }
1267
1268
1269
1270 /**
1271  * Iterate over the results for a particular key
1272  * in the datastore.  The iterator will only be called
1273  * once initially; if the first call did contain a
1274  * result, further results can be obtained by calling
1275  * "GNUNET_DATASTORE_get_next" with the given argument.
1276  *
1277  * @param h handle to the datastore
1278  * @param key maybe NULL (to match all entries)
1279  * @param type desired type, 0 for any
1280  * @param queue_priority ranking of this request in the priority queue
1281  * @param max_queue_size at what queue size should this request be dropped
1282  *        (if other requests of higher priority are in the queue)
1283  * @param timeout how long to wait at most for a response
1284  * @param iter function to call on each matching value;
1285  *        will be called once with a NULL value at the end
1286  * @param iter_cls closure for iter
1287  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1288  *         cancel; note that even if NULL is returned, the callback will be invoked
1289  *         (or rather, will already have been invoked)
1290  */
1291 struct GNUNET_DATASTORE_QueueEntry *
1292 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1293                       const GNUNET_HashCode * key,
1294                       enum GNUNET_BLOCK_Type type,
1295                       unsigned int queue_priority,
1296                       unsigned int max_queue_size,
1297                       struct GNUNET_TIME_Relative timeout,
1298                       GNUNET_DATASTORE_Iterator iter, 
1299                       void *iter_cls)
1300 {
1301   struct GNUNET_DATASTORE_QueueEntry *qe;
1302   struct GetMessage *gm;
1303   union QueueContext qc;
1304
1305 #if DEBUG_DATASTORE
1306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307               "Asked to look for data of type %u under key `%s'\n",
1308               (unsigned int) type,
1309               GNUNET_h2s (key));
1310 #endif
1311   qc.rc.iter = iter;
1312   qc.rc.iter_cls = iter_cls;
1313   qe = make_queue_entry (h, sizeof(struct GetMessage),
1314                          queue_priority, max_queue_size, timeout,
1315                          &process_result_message, &qc);
1316   if (qe == NULL)
1317     {
1318 #if DEBUG_DATASTORE
1319       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320                   "Could not queue request for `%s'\n",
1321                   GNUNET_h2s (key));
1322 #endif
1323       return NULL;
1324     }
1325   gm = (struct GetMessage*) &qe[1];
1326   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1327   gm->type = htonl(type);
1328   if (key != NULL)
1329     {
1330       gm->header.size = htons(sizeof (struct GetMessage));
1331       gm->key = *key;
1332     }
1333   else
1334     {
1335       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1336     }
1337   process_queue (h);
1338   return qe;
1339 }
1340
1341
1342 /**
1343  * Function called to trigger obtaining the next result
1344  * from the datastore.
1345  * 
1346  * @param h handle to the datastore
1347  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1348  *        iteration (with a final call to "iter" with key/data == NULL).
1349  */
1350 void 
1351 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1352                            int more)
1353 {
1354   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1355   struct ResultContext rc = qe->qc.rc;
1356
1357   GNUNET_assert (&process_result_message == qe->response_proc);
1358   if (GNUNET_YES == more)
1359     {     
1360       h->in_receive = GNUNET_YES;
1361       GNUNET_CLIENT_receive (h->client,
1362                              qe->response_proc,
1363                              qe,
1364                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1365       return;
1366     }
1367   free_queue_entry (qe);
1368   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1369   do_disconnect (h);
1370   rc.iter (rc.iter_cls,
1371            NULL, 0, NULL, 0, 0, 0, 
1372            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1373 }
1374
1375
1376 /**
1377  * Cancel a datastore operation.  The final callback from the
1378  * operation must not have been done yet.
1379  * 
1380  * @param qe operation to cancel
1381  */
1382 void
1383 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1384 {
1385   struct GNUNET_DATASTORE_Handle *h;
1386   int reconnect;
1387
1388   h = qe->h;
1389 #if DEBUG_DATASTORE
1390   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1391                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1392                qe,
1393                qe->was_transmitted,
1394                h->queue_head == qe);
1395 #endif
1396   reconnect = GNUNET_NO;
1397   if (GNUNET_YES == qe->was_transmitted) 
1398     {
1399       if (qe->response_proc == &process_result_message) 
1400         {
1401           qe->qc.rc.iter = NULL;    
1402           if (GNUNET_YES != h->in_receive)
1403             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1404         }
1405       else
1406         {
1407           qe->qc.sc.cont = NULL;
1408         }
1409       return;
1410     }
1411   free_queue_entry (qe);
1412   process_queue (h);
1413 }
1414
1415
1416 /* end of datastore_api.c */