avoid disconnect on cancel
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "datastore.h"
32
33
34 /**
35  * Context for processing status messages.
36  */
37 struct StatusContext
38 {
39   /**
40    * Continuation to call with the status.
41    */
42   GNUNET_DATASTORE_ContinuationWithStatus cont;
43
44   /**
45    * Closure for cont.
46    */
47   void *cont_cls;
48
49 };
50
51
52 /**
53  * Context for processing result messages.
54  */
55 struct ResultContext
56 {
57   /**
58    * Iterator to call with the result.
59    */
60   GNUNET_DATASTORE_Iterator iter;
61
62   /**
63    * Closure for iter.
64    */
65   void *iter_cls;
66
67 };
68
69
70 /**
71  *  Context for a queue operation.
72  */
73 union QueueContext
74 {
75
76   struct StatusContext sc;
77   
78   struct ResultContext rc;
79
80 };
81
82
83
84 /**
85  * Entry in our priority queue.
86  */
87 struct GNUNET_DATASTORE_QueueEntry
88 {
89
90   /**
91    * This is a linked list.
92    */
93   struct GNUNET_DATASTORE_QueueEntry *next;
94
95   /**
96    * This is a linked list.
97    */
98   struct GNUNET_DATASTORE_QueueEntry *prev;
99
100   /**
101    * Handle to the master context.
102    */
103   struct GNUNET_DATASTORE_Handle *h;
104
105   /**
106    * Response processor (NULL if we are not waiting for a response).
107    * This struct should be used for the closure, function-specific
108    * arguments can be passed via 'qc'.
109    */
110   GNUNET_CLIENT_MessageHandler response_proc;
111
112   /**
113    * Function to call after transmission of the request.
114    */
115   GNUNET_DATASTORE_ContinuationWithStatus cont;
116    
117   /**
118    * Closure for 'cont'.
119    */
120   void *cont_cls;
121
122   /**
123    * Context for the operation.
124    */
125   union QueueContext qc;
126
127   /**
128    * Task for timeout signalling.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132   /**
133    * Timeout for the current operation.
134    */
135   struct GNUNET_TIME_Absolute timeout;
136
137   /**
138    * Priority in the queue.
139    */
140   unsigned int priority;
141
142   /**
143    * Maximum allowed length of queue (otherwise
144    * this request should be discarded).
145    */
146   unsigned int max_queue;
147
148   /**
149    * Number of bytes in the request message following
150    * this struct.  32-bit value for nicer memory
151    * access (and overall struct alignment).
152    */
153   uint32_t message_size;
154
155   /**
156    * Has this message been transmitted to the service?
157    * Only ever GNUNET_YES for the head of the queue.
158    * Note that the overall struct should end at a 
159    * multiple of 64 bits.
160    */
161   int32_t was_transmitted;
162   
163 };
164
165 /**
166  * Handle to the datastore service. 
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Our scheduler.
178    */
179   struct GNUNET_SCHEDULER_Handle *sched;
180
181   /**
182    * Current connection to the datastore service.
183    */
184   struct GNUNET_CLIENT_Connection *client;
185
186   /**
187    * Current transmit handle.
188    */
189   struct GNUNET_CLIENT_TransmitHandle *th;
190
191   /**
192    * Current head of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_head;
195
196   /**
197    * Current tail of priority queue.
198    */
199   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
200
201   /**
202    * Task for trying to reconnect.
203    */
204   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
205
206   /**
207    * How quickly should we retry?  Used for exponential back-off on
208    * connect-errors.
209    */
210   struct GNUNET_TIME_Relative retry_time;
211
212   /**
213    * Number of entries in the queue.
214    */
215   unsigned int queue_size;
216
217   /**
218    * Are we currently trying to receive from the service?
219    */
220   int in_receive;
221
222 };
223
224
225
226 /**
227  * Connect to the datastore service.
228  *
229  * @param cfg configuration to use
230  * @param sched scheduler to use
231  * @return handle to use to access the service
232  */
233 struct GNUNET_DATASTORE_Handle *
234 GNUNET_DATASTORE_connect (const struct
235                           GNUNET_CONFIGURATION_Handle
236                           *cfg,
237                           struct
238                           GNUNET_SCHEDULER_Handle
239                           *sched)
240 {
241   struct GNUNET_CLIENT_Connection *c;
242   struct GNUNET_DATASTORE_Handle *h;
243   
244   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
245   if (c == NULL)
246     return NULL; /* oops */
247   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
248                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
249   h->client = c;
250   h->cfg = cfg;
251   h->sched = sched;
252   return h;
253 }
254
255
256 /**
257  * Transmit DROP message to datastore service.
258  *
259  * @param cls the 'struct GNUNET_DATASTORE_Handle'
260  * @param size number of bytes that can be copied to buf
261  * @param buf where to copy the drop message
262  * @return number of bytes written to buf
263  */
264 static size_t
265 transmit_drop (void *cls,
266                size_t size, 
267                void *buf)
268 {
269   struct GNUNET_DATASTORE_Handle *h = cls;
270   struct GNUNET_MessageHeader *hdr;
271   
272   if (buf == NULL)
273     {
274       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
275                   _("Failed to transmit request to drop database.\n"));
276       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
277       return 0;
278     }
279   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
280   hdr = buf;
281   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
282   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
283   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
284   return sizeof(struct GNUNET_MessageHeader);
285 }
286
287
288 /**
289  * Disconnect from the datastore service (and free
290  * associated resources).
291  *
292  * @param h handle to the datastore
293  * @param drop set to GNUNET_YES to delete all data in datastore (!)
294  */
295 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
296                                   int drop)
297 {
298   struct GNUNET_DATASTORE_QueueEntry *qe;
299
300   if (h->client != NULL)
301     {
302       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
303       h->client = NULL;
304     }
305   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
306     {
307       GNUNET_SCHEDULER_cancel (h->sched,
308                                h->reconnect_task);
309       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
310     }
311   while (NULL != (qe = h->queue_head))
312     {
313       GNUNET_assert (NULL != qe->response_proc);
314       qe->response_proc (qe, NULL);
315     }
316   if (GNUNET_YES == drop) 
317     {
318       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
319       if (h->client != NULL)
320         {
321           if (NULL != 
322               GNUNET_CLIENT_notify_transmit_ready (h->client,
323                                                    sizeof(struct GNUNET_MessageHeader),
324                                                    GNUNET_TIME_UNIT_MINUTES,
325                                                    GNUNET_YES,
326                                                    &transmit_drop,
327                                                    h))
328             return;
329           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
330         }
331       GNUNET_break (0);
332     }
333   GNUNET_free (h);
334 }
335
336
337 /**
338  * A request has timed out (before being transmitted to the service).
339  *
340  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
341  * @param tc scheduler context
342  */
343 static void
344 timeout_queue_entry (void *cls,
345                      const struct GNUNET_SCHEDULER_TaskContext *tc)
346 {
347   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
348
349   qe->task = GNUNET_SCHEDULER_NO_TASK;
350   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
351   qe->response_proc (qe, NULL);
352 }
353
354
355 /**
356  * Create a new entry for our priority queue (and possibly discard other entires if
357  * the queue is getting too long).
358  *
359  * @param h handle to the datastore
360  * @param msize size of the message to queue
361  * @param queue_priority priority of the entry
362  * @param max_queue_size at what queue size should this request be dropped
363  *        (if other requests of higher priority are in the queue)
364  * @param timeout timeout for the operation
365  * @param response_proc function to call with replies (can be NULL)
366  * @param qc client context (NOT a closure for response_proc)
367  * @return NULL if the queue is full (and this entry was dropped)
368  */
369 static struct GNUNET_DATASTORE_QueueEntry *
370 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
371                   size_t msize,
372                   unsigned int queue_priority,
373                   unsigned int max_queue_size,
374                   struct GNUNET_TIME_Relative timeout,
375                   GNUNET_CLIENT_MessageHandler response_proc,            
376                   const union QueueContext *qc)
377 {
378   struct GNUNET_DATASTORE_QueueEntry *ret;
379   struct GNUNET_DATASTORE_QueueEntry *pos;
380   unsigned int c;
381
382   c = 0;
383   pos = h->queue_head;
384   while ( (pos != NULL) &&
385           (c < max_queue_size) &&
386           (pos->priority >= queue_priority) )
387     {
388       c++;
389       pos = pos->next;
390     }
391   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
392   ret->h = h;
393   ret->response_proc = response_proc;
394   ret->qc = *qc;
395   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
396   ret->priority = queue_priority;
397   ret->max_queue = max_queue_size;
398   ret->message_size = msize;
399   ret->was_transmitted = GNUNET_NO;
400   if (pos == NULL)
401     {
402       /* append at the tail */
403       pos = h->queue_tail;
404     }
405   else
406     {
407       pos = pos->prev; 
408       /* do not insert at HEAD if HEAD query was already
409          transmitted and we are still receiving replies! */
410       if ( (pos == NULL) &&
411            (h->queue_head->was_transmitted) )
412         pos = h->queue_head;
413     }
414   c++;
415   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
416                                      h->queue_tail,
417                                      pos,
418                                      ret);
419   h->queue_size++;
420   if (c > max_queue_size)
421     {
422       response_proc (ret, NULL);
423       return NULL;
424     }
425   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
426                                             timeout,
427                                             &timeout_queue_entry,
428                                             ret);
429   pos = ret->next;
430   while (pos != NULL) 
431     {
432       if (pos->max_queue < h->queue_size)
433         {
434           GNUNET_assert (pos->response_proc != NULL);
435           pos->response_proc (pos, NULL);
436           break;
437         }
438       pos = pos->next;
439     }
440   return ret;
441 }
442
443
444 /**
445  * Process entries in the queue (or do nothing if we are already
446  * doing so).
447  * 
448  * @param h handle to the datastore
449  */
450 static void
451 process_queue (struct GNUNET_DATASTORE_Handle *h);
452
453
454 /**
455  * Try reconnecting to the datastore service.
456  *
457  * @param cls the 'struct GNUNET_DATASTORE_Handle'
458  * @param tc scheduler context
459  */
460 static void
461 try_reconnect (void *cls,
462                const struct GNUNET_SCHEDULER_TaskContext *tc)
463 {
464   struct GNUNET_DATASTORE_Handle *h = cls;
465
466   if (h->retry_time.value < GNUNET_CONSTANTS_SERVICE_RETRY.value)
467     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
468   else
469     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
470   if (h->retry_time.value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.value)
471     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
472   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
473   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
474   if (h->client == NULL)
475     {
476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
477                   "DATASTORE reconnect failed (fatally)\n");
478       return;
479     }
480 #if DEBUG_DATASTORE
481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
482               "Reconnected to DATASTORE\n");
483 #endif
484   process_queue (h);
485 }
486
487
488 /**
489  * Disconnect from the service and then try reconnecting to the datastore service
490  * after some delay.
491  *
492  * @param h handle to datastore to disconnect and reconnect
493  */
494 static void
495 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
496 {
497   if (h->client == NULL)
498     {
499 #if DEBUG_DATASTORE
500       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
501                   "client NULL in disconnect, will not try to reconnect\n");
502 #endif
503       return;
504     }
505 #if 0
506   GNUNET_STATISTICS_update (stats,
507                             gettext_noop ("# reconnected to DATASTORE"),
508                             1,
509                             GNUNET_NO);
510 #endif
511   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
512   h->client = NULL;
513   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
514                                                     h->retry_time,
515                                                     &try_reconnect,
516                                                     h);      
517 }
518
519
520 /**
521  * Transmit request from queue to datastore service.
522  *
523  * @param cls the 'struct GNUNET_DATASTORE_Handle'
524  * @param size number of bytes that can be copied to buf
525  * @param buf where to copy the drop message
526  * @return number of bytes written to buf
527  */
528 static size_t
529 transmit_request (void *cls,
530                   size_t size, 
531                   void *buf)
532 {
533   struct GNUNET_DATASTORE_Handle *h = cls;
534   struct GNUNET_DATASTORE_QueueEntry *qe;
535   size_t msize;
536
537   h->th = NULL;
538   if (NULL == (qe = h->queue_head))
539     return 0; /* no entry in queue */
540   if (buf == NULL)
541     {
542       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
543                   _("Failed to transmit request to DATASTORE.\n"));
544       do_disconnect (h);
545       return 0;
546     }
547   if (size < (msize = qe->message_size))
548     {
549       process_queue (h);
550       return 0;
551     }
552  #if DEBUG_DATASTORE
553   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
554               "Transmitting %u byte request to DATASTORE\n",
555               msize);
556 #endif
557   memcpy (buf, &qe[1], msize);
558   qe->was_transmitted = GNUNET_YES;
559   GNUNET_SCHEDULER_cancel (h->sched,
560                            qe->task);
561   qe->task = GNUNET_SCHEDULER_NO_TASK;
562   h->in_receive = GNUNET_YES;
563   GNUNET_CLIENT_receive (h->client,
564                          qe->response_proc,
565                          qe,
566                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
567   return msize;
568 }
569
570
571 /**
572  * Process entries in the queue (or do nothing if we are already
573  * doing so).
574  * 
575  * @param h handle to the datastore
576  */
577 static void
578 process_queue (struct GNUNET_DATASTORE_Handle *h)
579 {
580   struct GNUNET_DATASTORE_QueueEntry *qe;
581
582   if (NULL == (qe = h->queue_head))
583     {
584 #if DEBUG_DATASTORE > 1
585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586                   "Queue empty\n");
587 #endif
588       return; /* no entry in queue */
589     }
590   if (qe->was_transmitted == GNUNET_YES)
591     {
592 #if DEBUG_DATASTORE > 1
593       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
594                   "Head request already transmitted\n");
595 #endif
596       return; /* waiting for replies */
597     }
598   if (h->th != NULL)
599     {
600 #if DEBUG_DATASTORE > 1
601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
602                   "Pending transmission request\n");
603 #endif
604       return; /* request pending */
605     }
606   if (h->client == NULL)
607     {
608 #if DEBUG_DATASTORE > 1
609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
610                   "Not connected\n");
611 #endif
612       return; /* waiting for reconnect */
613     }
614 #if DEBUG_DATASTORE
615   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
616               "Queueing %u byte request to DATASTORE\n",
617               qe->message_size);
618 #endif
619   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
620                                                qe->message_size,
621                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
622                                                GNUNET_YES,
623                                                &transmit_request,
624                                                h);
625 }
626
627
628 /**
629  * Dummy continuation used to do nothing (but be non-zero).
630  *
631  * @param cls closure
632  * @param result result 
633  * @param emsg error message
634  */
635 static void
636 drop_status_cont (void *cls, int result, const char *emsg)
637 {
638   /* do nothing */
639 }
640
641
642 static void
643 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
644 {
645   struct GNUNET_DATASTORE_Handle *h = qe->h;
646
647   GNUNET_CONTAINER_DLL_remove (h->queue_head,
648                                h->queue_tail,
649                                qe);
650   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
651     {
652       GNUNET_SCHEDULER_cancel (h->sched,
653                                qe->task);
654       qe->task = GNUNET_SCHEDULER_NO_TASK;
655     }
656   h->queue_size--;
657   GNUNET_free (qe);
658 }
659
660 /**
661  * Type of a function to call when we receive a message
662  * from the service.
663  *
664  * @param cls closure
665  * @param msg message received, NULL on timeout or fatal error
666  */
667 static void 
668 process_status_message (void *cls,
669                         const struct
670                         GNUNET_MessageHeader * msg)
671 {
672   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
673   struct GNUNET_DATASTORE_Handle *h = qe->h;
674   struct StatusContext rc = qe->qc.sc;
675   const struct StatusMessage *sm;
676   const char *emsg;
677   int32_t status;
678   int was_transmitted;
679
680   h->in_receive = GNUNET_NO;
681   was_transmitted = qe->was_transmitted;
682   if (msg == NULL)
683     {      
684       free_queue_entry (qe);
685       if (NULL == h->client)
686         return; /* forced disconnect */
687       if (rc.cont != NULL)
688         rc.cont (rc.cont_cls, 
689                  GNUNET_SYSERR,
690                  _("Failed to receive status response from database."));
691       if (was_transmitted == GNUNET_YES)
692         do_disconnect (h);
693       return;
694     }
695   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
696   GNUNET_assert (h->queue_head == qe);
697   free_queue_entry (qe);
698   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
699        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
700     {
701       GNUNET_break (0);
702       h->retry_time = GNUNET_TIME_UNIT_ZERO;
703       do_disconnect (h);
704       if (rc.cont != NULL)
705         rc.cont (rc.cont_cls, 
706                  GNUNET_SYSERR,
707                  _("Error reading response from datastore service"));
708       return;
709     }
710   sm = (const struct StatusMessage*) msg;
711   status = ntohl(sm->status);
712   emsg = NULL;
713   if (ntohs(msg->size) > sizeof(struct StatusMessage))
714     {
715       emsg = (const char*) &sm[1];
716       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
717         {
718           GNUNET_break (0);
719           emsg = _("Invalid error message received from datastore service");
720         }
721     }  
722   if ( (status == GNUNET_SYSERR) &&
723        (emsg == NULL) )
724     {
725       GNUNET_break (0);
726       emsg = _("Invalid error message received from datastore service");
727     }
728 #if DEBUG_DATASTORE
729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
730               "Received status %d/%s\n",
731               (int) status,
732               emsg);
733 #endif
734   process_queue (h);
735   if (rc.cont != NULL)
736     rc.cont (rc.cont_cls, 
737              status,
738              emsg);
739 }
740
741
742 /**
743  * Store an item in the datastore.  If the item is already present,
744  * the priorities are summed up and the higher expiration time and
745  * lower anonymity level is used.
746  *
747  * @param h handle to the datastore
748  * @param rid reservation ID to use (from "reserve"); use 0 if no
749  *            prior reservation was made
750  * @param key key for the value
751  * @param size number of bytes in data
752  * @param data content stored
753  * @param type type of the content
754  * @param priority priority of the content
755  * @param anonymity anonymity-level for the content
756  * @param expiration expiration time for the content
757  * @param queue_priority ranking of this request in the priority queue
758  * @param max_queue_size at what queue size should this request be dropped
759  *        (if other requests of higher priority are in the queue)
760  * @param timeout timeout for the operation
761  * @param cont continuation to call when done
762  * @param cont_cls closure for cont
763  * @return NULL if the entry was not queued, otherwise a handle that can be used to
764  *         cancel; note that even if NULL is returned, the callback will be invoked
765  *         (or rather, will already have been invoked)
766  */
767 struct GNUNET_DATASTORE_QueueEntry *
768 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
769                       int rid,
770                       const GNUNET_HashCode * key,
771                       size_t size,
772                       const void *data,
773                       enum GNUNET_BLOCK_Type type,
774                       uint32_t priority,
775                       uint32_t anonymity,
776                       struct GNUNET_TIME_Absolute expiration,
777                       unsigned int queue_priority,
778                       unsigned int max_queue_size,
779                       struct GNUNET_TIME_Relative timeout,
780                       GNUNET_DATASTORE_ContinuationWithStatus cont,
781                       void *cont_cls)
782 {
783   struct GNUNET_DATASTORE_QueueEntry *qe;
784   struct DataMessage *dm;
785   size_t msize;
786   union QueueContext qc;
787
788 #if DEBUG_DATASTORE
789   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
790               "Asked to put %u bytes of data under key `%s'\n",
791               size,
792               GNUNET_h2s (key));
793 #endif
794   msize = sizeof(struct DataMessage) + size;
795   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
796   qc.sc.cont = cont;
797   qc.sc.cont_cls = cont_cls;
798   qe = make_queue_entry (h, msize,
799                          queue_priority, max_queue_size, timeout,
800                          &process_status_message, &qc);
801   if (qe == NULL)
802     return NULL;
803   dm = (struct DataMessage* ) &qe[1];
804   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
805   dm->header.size = htons(msize);
806   dm->rid = htonl(rid);
807   dm->size = htonl( (uint32_t) size);
808   dm->type = htonl(type);
809   dm->priority = htonl(priority);
810   dm->anonymity = htonl(anonymity);
811   dm->uid = GNUNET_htonll(0);
812   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
813   dm->key = *key;
814   memcpy (&dm[1], data, size);
815   process_queue (h);
816   return qe;
817 }
818
819
820 /**
821  * Reserve space in the datastore.  This function should be used
822  * to avoid "out of space" failures during a longer sequence of "put"
823  * operations (for example, when a file is being inserted).
824  *
825  * @param h handle to the datastore
826  * @param amount how much space (in bytes) should be reserved (for content only)
827  * @param entries how many entries will be created (to calculate per-entry overhead)
828  * @param queue_priority ranking of this request in the priority queue
829  * @param max_queue_size at what queue size should this request be dropped
830  *        (if other requests of higher priority are in the queue)
831  * @param timeout how long to wait at most for a response (or before dying in queue)
832  * @param cont continuation to call when done; "success" will be set to
833  *             a positive reservation value if space could be reserved.
834  * @param cont_cls closure for cont
835  * @return NULL if the entry was not queued, otherwise a handle that can be used to
836  *         cancel; note that even if NULL is returned, the callback will be invoked
837  *         (or rather, will already have been invoked)
838  */
839 struct GNUNET_DATASTORE_QueueEntry *
840 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
841                           uint64_t amount,
842                           uint32_t entries,
843                           unsigned int queue_priority,
844                           unsigned int max_queue_size,
845                           struct GNUNET_TIME_Relative timeout,
846                           GNUNET_DATASTORE_ContinuationWithStatus cont,
847                           void *cont_cls)
848 {
849   struct GNUNET_DATASTORE_QueueEntry *qe;
850   struct ReserveMessage *rm;
851   union QueueContext qc;
852
853   if (cont == NULL)
854     cont = &drop_status_cont;
855 #if DEBUG_DATASTORE
856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
857               "Asked to reserve %llu bytes of data and %u entries'\n",
858               (unsigned long long) amount,
859               (unsigned int) entries);
860 #endif
861   qc.sc.cont = cont;
862   qc.sc.cont_cls = cont_cls;
863   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
864                          queue_priority, max_queue_size, timeout,
865                          &process_status_message, &qc);
866   if (qe == NULL)
867     return NULL;
868   rm = (struct ReserveMessage*) &qe[1];
869   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
870   rm->header.size = htons(sizeof (struct ReserveMessage));
871   rm->entries = htonl(entries);
872   rm->amount = GNUNET_htonll(amount);
873   process_queue (h);
874   return qe;
875 }
876
877
878 /**
879  * Signal that all of the data for which a reservation was made has
880  * been stored and that whatever excess space might have been reserved
881  * can now be released.
882  *
883  * @param h handle to the datastore
884  * @param rid reservation ID (value of "success" in original continuation
885  *        from the "reserve" function).
886  * @param queue_priority ranking of this request in the priority queue
887  * @param max_queue_size at what queue size should this request be dropped
888  *        (if other requests of higher priority are in the queue)
889  * @param queue_priority ranking of this request in the priority queue
890  * @param max_queue_size at what queue size should this request be dropped
891  *        (if other requests of higher priority are in the queue)
892  * @param timeout how long to wait at most for a response
893  * @param cont continuation to call when done
894  * @param cont_cls closure for cont
895  * @return NULL if the entry was not queued, otherwise a handle that can be used to
896  *         cancel; note that even if NULL is returned, the callback will be invoked
897  *         (or rather, will already have been invoked)
898  */
899 struct GNUNET_DATASTORE_QueueEntry *
900 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
901                                   int rid,
902                                   unsigned int queue_priority,
903                                   unsigned int max_queue_size,
904                                   struct GNUNET_TIME_Relative timeout,
905                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
906                                   void *cont_cls)
907 {
908   struct GNUNET_DATASTORE_QueueEntry *qe;
909   struct ReleaseReserveMessage *rrm;
910   union QueueContext qc;
911
912   if (cont == NULL)
913     cont = &drop_status_cont;
914 #if DEBUG_DATASTORE
915   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
916               "Asked to release reserve %d\n",
917               rid);
918 #endif
919   qc.sc.cont = cont;
920   qc.sc.cont_cls = cont_cls;
921   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
922                          queue_priority, max_queue_size, timeout,
923                          &process_status_message, &qc);
924   if (qe == NULL)
925     return NULL;
926   rrm = (struct ReleaseReserveMessage*) &qe[1];
927   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
928   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
929   rrm->rid = htonl(rid);
930   process_queue (h);
931   return qe;
932 }
933
934
935 /**
936  * Update a value in the datastore.
937  *
938  * @param h handle to the datastore
939  * @param uid identifier for the value
940  * @param priority how much to increase the priority of the value
941  * @param expiration new expiration value should be MAX of existing and this argument
942  * @param queue_priority ranking of this request in the priority queue
943  * @param max_queue_size at what queue size should this request be dropped
944  *        (if other requests of higher priority are in the queue)
945  * @param timeout how long to wait at most for a response
946  * @param cont continuation to call when done
947  * @param cont_cls closure for cont
948  * @return NULL if the entry was not queued, otherwise a handle that can be used to
949  *         cancel; note that even if NULL is returned, the callback will be invoked
950  *         (or rather, will already have been invoked)
951  */
952 struct GNUNET_DATASTORE_QueueEntry *
953 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
954                          unsigned long long uid,
955                          uint32_t priority,
956                          struct GNUNET_TIME_Absolute expiration,
957                          unsigned int queue_priority,
958                          unsigned int max_queue_size,
959                          struct GNUNET_TIME_Relative timeout,
960                          GNUNET_DATASTORE_ContinuationWithStatus cont,
961                          void *cont_cls)
962 {
963   struct GNUNET_DATASTORE_QueueEntry *qe;
964   struct UpdateMessage *um;
965   union QueueContext qc;
966
967   if (cont == NULL)
968     cont = &drop_status_cont;
969 #if DEBUG_DATASTORE
970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
971               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
972               uid,
973               (unsigned int) priority,
974               (unsigned long long) expiration.value);
975 #endif
976   qc.sc.cont = cont;
977   qc.sc.cont_cls = cont_cls;
978   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
979                          queue_priority, max_queue_size, timeout,
980                          &process_status_message, &qc);
981   if (qe == NULL)
982     return NULL;
983   um = (struct UpdateMessage*) &qe[1];
984   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
985   um->header.size = htons(sizeof (struct UpdateMessage));
986   um->priority = htonl(priority);
987   um->expiration = GNUNET_TIME_absolute_hton(expiration);
988   um->uid = GNUNET_htonll(uid);
989   process_queue (h);
990   return qe;
991 }
992
993
994 /**
995  * Explicitly remove some content from the database.
996  * The "cont"inuation will be called with status
997  * "GNUNET_OK" if content was removed, "GNUNET_NO"
998  * if no matching entry was found and "GNUNET_SYSERR"
999  * on all other types of errors.
1000  *
1001  * @param h handle to the datastore
1002  * @param key key for the value
1003  * @param size number of bytes in data
1004  * @param data content stored
1005  * @param queue_priority ranking of this request in the priority queue
1006  * @param max_queue_size at what queue size should this request be dropped
1007  *        (if other requests of higher priority are in the queue)
1008  * @param timeout how long to wait at most for a response
1009  * @param cont continuation to call when done
1010  * @param cont_cls closure for cont
1011  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1012  *         cancel; note that even if NULL is returned, the callback will be invoked
1013  *         (or rather, will already have been invoked)
1014  */
1015 struct GNUNET_DATASTORE_QueueEntry *
1016 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1017                          const GNUNET_HashCode *key,
1018                          size_t size, 
1019                          const void *data,
1020                          unsigned int queue_priority,
1021                          unsigned int max_queue_size,
1022                          struct GNUNET_TIME_Relative timeout,
1023                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1024                          void *cont_cls)
1025 {
1026   struct GNUNET_DATASTORE_QueueEntry *qe;
1027   struct DataMessage *dm;
1028   size_t msize;
1029   union QueueContext qc;
1030
1031   if (cont == NULL)
1032     cont = &drop_status_cont;
1033 #if DEBUG_DATASTORE
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035               "Asked to remove %u bytes under key `%s'\n",
1036               size,
1037               GNUNET_h2s (key));
1038 #endif
1039   qc.sc.cont = cont;
1040   qc.sc.cont_cls = cont_cls;
1041   msize = sizeof(struct DataMessage) + size;
1042   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1043   qe = make_queue_entry (h, msize,
1044                          queue_priority, max_queue_size, timeout,
1045                          &process_status_message, &qc);
1046   if (qe == NULL)
1047     return NULL;
1048   dm = (struct DataMessage*) &qe[1];
1049   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1050   dm->header.size = htons(msize);
1051   dm->rid = htonl(0);
1052   dm->size = htonl(size);
1053   dm->type = htonl(0);
1054   dm->priority = htonl(0);
1055   dm->anonymity = htonl(0);
1056   dm->uid = GNUNET_htonll(0);
1057   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1058   dm->key = *key;
1059   memcpy (&dm[1], data, size);
1060   process_queue (h);
1061   return qe;
1062 }
1063
1064
1065 /**
1066  * Type of a function to call when we receive a message
1067  * from the service.
1068  *
1069  * @param cls closure
1070  * @param msg message received, NULL on timeout or fatal error
1071  */
1072 static void 
1073 process_result_message (void *cls,
1074                         const struct GNUNET_MessageHeader * msg)
1075 {
1076   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1077   struct GNUNET_DATASTORE_Handle *h = qe->h;
1078   struct ResultContext rc = qe->qc.rc;
1079   const struct DataMessage *dm;
1080   int was_transmitted;
1081
1082   h->in_receive = GNUNET_NO;
1083   if (msg == NULL)
1084    {
1085       was_transmitted = qe->was_transmitted;
1086       free_queue_entry (qe);
1087       if (was_transmitted == GNUNET_YES)
1088         {
1089           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1090                       _("Failed to receive response from database.\n"));
1091           do_disconnect (h);
1092         }
1093       if (rc.iter != NULL)
1094         rc.iter (rc.iter_cls,
1095                  NULL, 0, NULL, 0, 0, 0, 
1096                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1097       return;
1098     }
1099   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1100   GNUNET_assert (h->queue_head == qe);
1101   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1102     {
1103       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1104 #if DEBUG_DATASTORE
1105       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106                   "Received end of result set\n");
1107 #endif
1108       free_queue_entry (qe);
1109       if (rc.iter != NULL)
1110         rc.iter (rc.iter_cls,
1111                  NULL, 0, NULL, 0, 0, 0, 
1112                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1113       process_queue (h);
1114       return;
1115     }
1116   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1117        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1118        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1119     {
1120       GNUNET_break (0);
1121       free_queue_entry (qe);
1122       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1123       do_disconnect (h);
1124       if (rc.iter != NULL)
1125         rc.iter (rc.iter_cls,
1126                  NULL, 0, NULL, 0, 0, 0, 
1127                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1128       return;
1129     }
1130   if (rc.iter == NULL)
1131     {
1132       /* abort iteration */
1133 #if DEBUG_DATASTORE
1134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1135                   "Aborting iteration via disconnect (client has cancelled)\n");
1136 #endif
1137       free_queue_entry (qe);
1138       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1139       do_disconnect (h);
1140       return;
1141     }
1142   dm = (const struct DataMessage*) msg;
1143 #if DEBUG_DATASTORE
1144   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1145               "Received result %llu with type %u and size %u with key %s\n",
1146               (unsigned long long) GNUNET_ntohll(dm->uid),
1147               ntohl(dm->type),
1148               ntohl(dm->size),
1149               GNUNET_h2s(&dm->key));
1150 #endif
1151   rc.iter (rc.iter_cls,
1152            &dm->key,
1153            ntohl(dm->size),
1154            &dm[1],
1155            ntohl(dm->type),
1156            ntohl(dm->priority),
1157            ntohl(dm->anonymity),
1158            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1159            GNUNET_ntohll(dm->uid));
1160 }
1161
1162
1163 /**
1164  * Get a random value from the datastore.
1165  *
1166  * @param h handle to the datastore
1167  * @param queue_priority ranking of this request in the priority queue
1168  * @param max_queue_size at what queue size should this request be dropped
1169  *        (if other requests of higher priority are in the queue)
1170  * @param timeout how long to wait at most for a response
1171  * @param iter function to call on a random value; it
1172  *        will be called once with a value (if available)
1173  *        and always once with a value of NULL.
1174  * @param iter_cls closure for iter
1175  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1176  *         cancel; note that even if NULL is returned, the callback will be invoked
1177  *         (or rather, will already have been invoked)
1178  */
1179 struct GNUNET_DATASTORE_QueueEntry *
1180 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1181                              unsigned int queue_priority,
1182                              unsigned int max_queue_size,
1183                              struct GNUNET_TIME_Relative timeout,
1184                              GNUNET_DATASTORE_Iterator iter, 
1185                              void *iter_cls)
1186 {
1187   struct GNUNET_DATASTORE_QueueEntry *qe;
1188   struct GNUNET_MessageHeader *m;
1189   union QueueContext qc;
1190
1191 #if DEBUG_DATASTORE
1192   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193               "Asked to get random entry in %llu ms\n",
1194               (unsigned long long) timeout.value);
1195 #endif
1196   qc.rc.iter = iter;
1197   qc.rc.iter_cls = iter_cls;
1198   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1199                          queue_priority, max_queue_size, timeout,
1200                          &process_result_message, &qc);
1201   if (qe == NULL)
1202     return NULL;    
1203   m = (struct GNUNET_MessageHeader*) &qe[1];
1204   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1205   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1206   process_queue (h);
1207   return qe;
1208 }
1209
1210
1211 /**
1212  * Get a zero-anonymity value from the datastore.
1213  *
1214  * @param h handle to the datastore
1215  * @param queue_priority ranking of this request in the priority queue
1216  * @param max_queue_size at what queue size should this request be dropped
1217  *        (if other requests of higher priority are in the queue)
1218  * @param timeout how long to wait at most for a response
1219  * @param type allowed type for the operation
1220  * @param iter function to call on a random value; it
1221  *        will be called once with a value (if available)
1222  *        and always once with a value of NULL.
1223  * @param iter_cls closure for iter
1224  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1225  *         cancel; note that even if NULL is returned, the callback will be invoked
1226  *         (or rather, will already have been invoked)
1227  */
1228 struct GNUNET_DATASTORE_QueueEntry *
1229 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1230                                      unsigned int queue_priority,
1231                                      unsigned int max_queue_size,
1232                                      struct GNUNET_TIME_Relative timeout,
1233                                      enum GNUNET_BLOCK_Type type,
1234                                      GNUNET_DATASTORE_Iterator iter, 
1235                                      void *iter_cls)
1236 {
1237   struct GNUNET_DATASTORE_QueueEntry *qe;
1238   struct GetZeroAnonymityMessage *m;
1239   union QueueContext qc;
1240
1241 #if DEBUG_DATASTORE
1242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1243               "Asked to get zero-anonymity entry in %llu ms\n",
1244               (unsigned long long) timeout.value);
1245 #endif
1246   qc.rc.iter = iter;
1247   qc.rc.iter_cls = iter_cls;
1248   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1249                          queue_priority, max_queue_size, timeout,
1250                          &process_result_message, &qc);
1251   if (qe == NULL)
1252     return NULL;    
1253   m = (struct GetZeroAnonymityMessage*) &qe[1];
1254   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1255   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1256   m->type = htonl ((uint32_t) type);
1257   process_queue (h);
1258   return qe;
1259 }
1260
1261
1262
1263 /**
1264  * Iterate over the results for a particular key
1265  * in the datastore.  The iterator will only be called
1266  * once initially; if the first call did contain a
1267  * result, further results can be obtained by calling
1268  * "GNUNET_DATASTORE_get_next" with the given argument.
1269  *
1270  * @param h handle to the datastore
1271  * @param key maybe NULL (to match all entries)
1272  * @param type desired type, 0 for any
1273  * @param queue_priority ranking of this request in the priority queue
1274  * @param max_queue_size at what queue size should this request be dropped
1275  *        (if other requests of higher priority are in the queue)
1276  * @param timeout how long to wait at most for a response
1277  * @param iter function to call on each matching value;
1278  *        will be called once with a NULL value at the end
1279  * @param iter_cls closure for iter
1280  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1281  *         cancel; note that even if NULL is returned, the callback will be invoked
1282  *         (or rather, will already have been invoked)
1283  */
1284 struct GNUNET_DATASTORE_QueueEntry *
1285 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1286                       const GNUNET_HashCode * key,
1287                       enum GNUNET_BLOCK_Type type,
1288                       unsigned int queue_priority,
1289                       unsigned int max_queue_size,
1290                       struct GNUNET_TIME_Relative timeout,
1291                       GNUNET_DATASTORE_Iterator iter, 
1292                       void *iter_cls)
1293 {
1294   struct GNUNET_DATASTORE_QueueEntry *qe;
1295   struct GetMessage *gm;
1296   union QueueContext qc;
1297
1298 #if DEBUG_DATASTORE
1299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1300               "Asked to look for data of type %u under key `%s'\n",
1301               (unsigned int) type,
1302               GNUNET_h2s (key));
1303 #endif
1304   qc.rc.iter = iter;
1305   qc.rc.iter_cls = iter_cls;
1306   qe = make_queue_entry (h, sizeof(struct GetMessage),
1307                          queue_priority, max_queue_size, timeout,
1308                          &process_result_message, &qc);
1309   if (qe == NULL)
1310     return NULL;
1311   gm = (struct GetMessage*) &qe[1];
1312   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1313   gm->type = htonl(type);
1314   if (key != NULL)
1315     {
1316       gm->header.size = htons(sizeof (struct GetMessage));
1317       gm->key = *key;
1318     }
1319   else
1320     {
1321       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1322     }
1323   process_queue (h);
1324   return qe;
1325 }
1326
1327
1328 /**
1329  * Function called to trigger obtaining the next result
1330  * from the datastore.
1331  * 
1332  * @param h handle to the datastore
1333  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1334  *        iteration (with a final call to "iter" with key/data == NULL).
1335  */
1336 void 
1337 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1338                            int more)
1339 {
1340   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1341   struct ResultContext rc = qe->qc.rc;
1342
1343   GNUNET_assert (&process_result_message == qe->response_proc);
1344   if (GNUNET_YES == more)
1345     {     
1346       h->in_receive = GNUNET_YES;
1347       GNUNET_CLIENT_receive (h->client,
1348                              qe->response_proc,
1349                              qe,
1350                              GNUNET_TIME_absolute_get_remaining (qe->timeout));
1351       return;
1352     }
1353   free_queue_entry (qe);
1354   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1355   do_disconnect (h);
1356   rc.iter (rc.iter_cls,
1357            NULL, 0, NULL, 0, 0, 0, 
1358            GNUNET_TIME_UNIT_ZERO_ABS, 0);       
1359 }
1360
1361
1362 /**
1363  * Cancel a datastore operation.  The final callback from the
1364  * operation must not have been done yet.
1365  * 
1366  * @param qe operation to cancel
1367  */
1368 void
1369 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1370 {
1371   struct GNUNET_DATASTORE_Handle *h;
1372   int reconnect;
1373
1374   h = qe->h;
1375 #if DEBUG_DATASTORE
1376   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1377                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1378                qe,
1379                qe->was_transmitted,
1380                h->queue_head == qe);
1381 #endif
1382   reconnect = GNUNET_NO;
1383   if (GNUNET_YES == qe->was_transmitted) 
1384     {
1385       if (qe->response_proc == &process_result_message) 
1386         {
1387           qe->qc.rc.iter = NULL;    
1388           if (GNUNET_YES != h->in_receive)
1389             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1390         }
1391       else
1392         {
1393           qe->qc.sc.cont = NULL;
1394         }
1395       return;
1396     }
1397   free_queue_entry (qe);
1398   process_queue (h);
1399 }
1400
1401
1402 /* end of datastore_api.c */