Merge branch 'master' of git+ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for @e cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for @e proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98 /**
99  * Entry in our priority queue.
100  */
101 struct GNUNET_DATASTORE_QueueEntry
102 {
103
104   /**
105    * This is a linked list.
106    */
107   struct GNUNET_DATASTORE_QueueEntry *next;
108
109   /**
110    * This is a linked list.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *prev;
113
114   /**
115    * Handle to the master context.
116    */
117   struct GNUNET_DATASTORE_Handle *h;
118
119   /**
120    * Function to call after transmission of the request.
121    */
122   GNUNET_DATASTORE_ContinuationWithStatus cont;
123
124   /**
125    * Closure for @e cont.
126    */
127   void *cont_cls;
128
129   /**
130    * Context for the operation.
131    */
132   union QueueContext qc;
133
134   /**
135    * Envelope of the request to transmit, NULL after
136    * transmission.
137    */
138   struct GNUNET_MQ_Envelope *env;
139
140   /**
141    * Priority in the queue.
142    */
143   unsigned int priority;
144
145   /**
146    * Maximum allowed length of queue (otherwise
147    * this request should be discarded).
148    */
149   unsigned int max_queue;
150
151   /**
152    * Expected response type.
153    */
154   uint16_t response_type;
155
156 };
157
158
159 /**
160  * Handle to the datastore service.
161  */
162 struct GNUNET_DATASTORE_Handle
163 {
164
165   /**
166    * Our configuration.
167    */
168   const struct GNUNET_CONFIGURATION_Handle *cfg;
169
170   /**
171    * Current connection to the datastore service.
172    */
173   struct GNUNET_MQ_Handle *mq;
174
175   /**
176    * Handle for statistics.
177    */
178   struct GNUNET_STATISTICS_Handle *stats;
179
180   /**
181    * Current head of priority queue.
182    */
183   struct GNUNET_DATASTORE_QueueEntry *queue_head;
184
185   /**
186    * Current tail of priority queue.
187    */
188   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
189
190   /**
191    * Task for trying to reconnect.
192    */
193   struct GNUNET_SCHEDULER_Task *reconnect_task;
194
195   /**
196    * How quickly should we retry?  Used for exponential back-off on
197    * connect-errors.
198    */
199   struct GNUNET_TIME_Relative retry_time;
200
201   /**
202    * Number of entries in the queue.
203    */
204   unsigned int queue_size;
205
206   /**
207    * Number of results we're receiving for the current query
208    * after application stopped to care.  Used to determine when
209    * to reset the connection.
210    */
211   unsigned int result_count;
212
213   /**
214    * We should ignore the next message(s) from the service.
215    */
216   unsigned int skip_next_messages;
217
218 };
219
220
221 /**
222  * Try reconnecting to the datastore service.
223  *
224  * @param cls the `struct GNUNET_DATASTORE_Handle`
225  */
226 static void
227 try_reconnect (void *cls);
228
229
230 /**
231  * Disconnect from the service and then try reconnecting to the datastore service
232  * after some delay.
233  *
234  * @param h handle to datastore to disconnect and reconnect
235  */
236 static void
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238 {
239   if (NULL == h->mq)
240   {
241     GNUNET_break (0);
242     return;
243   }
244   GNUNET_MQ_destroy (h->mq);
245   h->mq = NULL;
246   h->skip_next_messages = 0;
247   h->reconnect_task
248     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249                                     &try_reconnect,
250                                     h);
251 }
252
253
254 /**
255  * Free a queue entry.  Removes the given entry from the
256  * queue and releases associated resources.  Does NOT
257  * call the callback.
258  *
259  * @param qe entry to free.
260  */
261 static void
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263 {
264   struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266   GNUNET_CONTAINER_DLL_remove (h->queue_head,
267                                h->queue_tail,
268                                qe);
269   h->queue_size--;
270   if (NULL != qe->env)
271     GNUNET_MQ_discard (qe->env);
272   GNUNET_free (qe);
273 }
274
275
276 /**
277  * Handle error in sending drop request to datastore.
278  *
279  * @param cls closure with the datastore handle
280  * @param error error code
281  */
282 static void
283 mq_error_handler (void *cls,
284                   enum GNUNET_MQ_Error error)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_DATASTORE_QueueEntry *qe;
288
289   LOG (GNUNET_ERROR_TYPE_DEBUG,
290        "MQ error, reconnecting to DATASTORE\n");
291   do_disconnect (h);
292   qe = h->queue_head;
293   if ( (NULL != qe) &&
294        (NULL == qe->env) )
295   {
296     union QueueContext qc = qe->qc;
297     uint16_t rt = qe->response_type;
298
299     LOG (GNUNET_ERROR_TYPE_DEBUG,
300          "Failed to receive response from database.\n");
301     free_queue_entry (qe);
302     switch (rt)
303     {
304     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
305       if (NULL != qc.sc.cont)
306         qc.sc.cont (qc.sc.cont_cls,
307                     GNUNET_SYSERR,
308                     GNUNET_TIME_UNIT_ZERO_ABS,
309                     _("DATASTORE disconnected"));
310       break;
311     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
312       if (NULL != qc.rc.proc)
313         qc.rc.proc (qc.rc.proc_cls,
314                     NULL,
315                     0,
316                     NULL, 0, 0, 0,
317                     GNUNET_TIME_UNIT_ZERO_ABS,
318                     0);
319       break;
320     default:
321       GNUNET_break (0);
322     }
323   }
324 }
325
326
327 /**
328  * Connect to the datastore service.
329  *
330  * @param cfg configuration to use
331  * @return handle to use to access the service
332  */
333 struct GNUNET_DATASTORE_Handle *
334 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
335 {
336   struct GNUNET_DATASTORE_Handle *h;
337
338   LOG (GNUNET_ERROR_TYPE_DEBUG,
339        "Establishing DATASTORE connection!\n");
340   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
341   h->cfg = cfg;
342   try_reconnect (h);
343   if (NULL == h->mq)
344   {
345     GNUNET_free (h);
346     return NULL;
347   }
348   h->stats = GNUNET_STATISTICS_create ("datastore-api",
349                                        cfg);
350   return h;
351 }
352
353
354 /**
355  * Task used by to disconnect from the datastore after
356  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
357  *
358  * @param cls the datastore handle
359  */
360 static void
361 disconnect_after_drop (void *cls)
362 {
363   struct GNUNET_DATASTORE_Handle *h = cls;
364
365   LOG (GNUNET_ERROR_TYPE_DEBUG,
366        "Drop sent, disconnecting\n");
367   GNUNET_DATASTORE_disconnect (h,
368                                GNUNET_NO);
369 }
370
371
372 /**
373  * Handle error in sending drop request to datastore.
374  *
375  * @param cls closure with the datastore handle
376  * @param error error code
377  */
378 static void
379 disconnect_on_mq_error (void *cls,
380                         enum GNUNET_MQ_Error error)
381 {
382   struct GNUNET_DATASTORE_Handle *h = cls;
383
384   LOG (GNUNET_ERROR_TYPE_ERROR,
385        "Failed to ask datastore to drop tables\n");
386   GNUNET_DATASTORE_disconnect (h,
387                                GNUNET_NO);
388 }
389
390
391 /**
392  * Disconnect from the datastore service (and free
393  * associated resources).
394  *
395  * @param h handle to the datastore
396  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
397  */
398 void
399 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
400                              int drop)
401 {
402   struct GNUNET_DATASTORE_QueueEntry *qe;
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Datastore disconnect\n");
406   if (NULL != h->mq)
407   {
408     GNUNET_MQ_destroy (h->mq);
409     h->mq = NULL;
410   }
411   if (NULL != h->reconnect_task)
412   {
413     GNUNET_SCHEDULER_cancel (h->reconnect_task);
414     h->reconnect_task = NULL;
415   }
416   while (NULL != (qe = h->queue_head))
417   {
418     switch (qe->response_type)
419     {
420     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
421       if (NULL != qe->qc.sc.cont)
422         qe->qc.sc.cont (qe->qc.sc.cont_cls,
423                         GNUNET_SYSERR,
424                         GNUNET_TIME_UNIT_ZERO_ABS,
425                         _("Disconnected from DATASTORE"));
426       break;
427     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
428       if (NULL != qe->qc.rc.proc)
429         qe->qc.rc.proc (qe->qc.rc.proc_cls,
430                         NULL,
431                         0,
432                         NULL, 0, 0, 0,
433                         GNUNET_TIME_UNIT_ZERO_ABS,
434                         0);
435       break;
436     default:
437       GNUNET_break (0);
438     }
439     free_queue_entry (qe);
440   }
441   if (GNUNET_YES == drop)
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "Re-connecting to issue DROP!\n");
445     GNUNET_assert (NULL == h->mq);
446     h->mq = GNUNET_CLIENT_connecT (h->cfg,
447                                    "datastore",
448                                    NULL,
449                                    &disconnect_on_mq_error,
450                                    h);
451     if (NULL != h->mq)
452     {
453       struct GNUNET_MessageHeader *hdr;
454       struct GNUNET_MQ_Envelope *env;
455
456       env = GNUNET_MQ_msg (hdr,
457                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
458       GNUNET_MQ_notify_sent (env,
459                              &disconnect_after_drop,
460                              h);
461       GNUNET_MQ_send (h->mq,
462                       env);
463       return;
464     }
465     GNUNET_break (0);
466   }
467   GNUNET_STATISTICS_destroy (h->stats,
468                              GNUNET_NO);
469   h->stats = NULL;
470   GNUNET_free (h);
471 }
472
473
474 /**
475  * Create a new entry for our priority queue (and possibly discard other entires if
476  * the queue is getting too long).
477  *
478  * @param h handle to the datastore
479  * @param env envelope with the message to queue
480  * @param queue_priority priority of the entry
481  * @param max_queue_size at what queue size should this request be dropped
482  *        (if other requests of higher priority are in the queue)
483  * @param expected_type which type of response do we expect,
484  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
485  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
486  * @param qc client context (NOT a closure for @a response_proc)
487  * @return NULL if the queue is full
488  */
489 static struct GNUNET_DATASTORE_QueueEntry *
490 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
491                   struct GNUNET_MQ_Envelope *env,
492                   unsigned int queue_priority,
493                   unsigned int max_queue_size,
494                   uint16_t expected_type,
495                   const union QueueContext *qc)
496 {
497   struct GNUNET_DATASTORE_QueueEntry *qe;
498   struct GNUNET_DATASTORE_QueueEntry *pos;
499   unsigned int c;
500
501   c = 0;
502   pos = h->queue_head;
503   while ( (NULL != pos) &&
504           (c < max_queue_size) &&
505           (pos->priority >= queue_priority) )
506   {
507     c++;
508     pos = pos->next;
509   }
510   if (c >= max_queue_size)
511   {
512     GNUNET_STATISTICS_update (h->stats,
513                               gettext_noop ("# queue overflows"),
514                               1,
515                               GNUNET_NO);
516     GNUNET_MQ_discard (env);
517     return NULL;
518   }
519   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
520   qe->h = h;
521   qe->env = env;
522   qe->response_type = expected_type;
523   qe->qc = *qc;
524   qe->priority = queue_priority;
525   qe->max_queue = max_queue_size;
526   if (NULL == pos)
527   {
528     /* append at the tail */
529     pos = h->queue_tail;
530   }
531   else
532   {
533     pos = pos->prev;
534     /* do not insert at HEAD if HEAD query was already
535      * transmitted and we are still receiving replies! */
536     if ( (NULL == pos) &&
537          (NULL == h->queue_head->env) )
538       pos = h->queue_head;
539   }
540   c++;
541 #if INSANE_STATISTICS
542   GNUNET_STATISTICS_update (h->stats,
543                             gettext_noop ("# queue entries created"),
544                             1,
545                             GNUNET_NO);
546 #endif
547   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
548                                      h->queue_tail,
549                                      pos,
550                                      qe);
551   h->queue_size++;
552   return qe;
553 }
554
555
556 /**
557  * Process entries in the queue (or do nothing if we are already
558  * doing so).
559  *
560  * @param h handle to the datastore
561  */
562 static void
563 process_queue (struct GNUNET_DATASTORE_Handle *h)
564 {
565   struct GNUNET_DATASTORE_QueueEntry *qe;
566
567   if (NULL == (qe = h->queue_head))
568   {
569     /* no entry in queue */
570     LOG (GNUNET_ERROR_TYPE_DEBUG,
571          "Queue empty\n");
572     return;
573   }
574   if (NULL == qe->env)
575   {
576     /* waiting for replies */
577     LOG (GNUNET_ERROR_TYPE_DEBUG,
578          "Head request already transmitted\n");
579     return;
580   }
581   if (NULL == h->mq)
582   {
583     /* waiting for reconnect */
584     LOG (GNUNET_ERROR_TYPE_DEBUG,
585          "Not connected\n");
586     return;
587   }
588   GNUNET_MQ_send (h->mq,
589                   qe->env);
590   qe->env = NULL;
591 }
592
593
594
595
596 /**
597  * Function called to check status message from the service.
598  *
599  * @param cls closure
600  * @param sm status message received
601  * @return #GNUNET_OK if the message is well-formed
602  */
603 static int
604 check_status (void *cls,
605               const struct StatusMessage *sm)
606 {
607   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
608   int32_t status = ntohl (sm->status);
609
610   if (msize > 0)
611   {
612     const char *emsg = (const char *) &sm[1];
613
614     if ('\0' != emsg[msize - 1])
615     {
616       GNUNET_break (0);
617       return GNUNET_SYSERR;
618     }
619   }
620   else if (GNUNET_SYSERR == status)
621   {
622     GNUNET_break (0);
623     return GNUNET_SYSERR;
624   }
625   return GNUNET_OK;
626 }
627
628
629 /**
630  * Function called to handle status message from the service.
631  *
632  * @param cls closure
633  * @param sm status message received
634  */
635 static void
636 handle_status (void *cls,
637                const struct StatusMessage *sm)
638 {
639   struct GNUNET_DATASTORE_Handle *h = cls;
640   struct GNUNET_DATASTORE_QueueEntry *qe;
641   struct StatusContext rc;
642   const char *emsg;
643   int32_t status = ntohl (sm->status);
644
645   if (h->skip_next_messages > 0)
646   {
647     h->skip_next_messages--;
648     process_queue (h);
649     return;
650   }
651   if (NULL == (qe = h->queue_head))
652   {
653     GNUNET_break (0);
654     do_disconnect (h);
655     return;
656   }
657   if (NULL != qe->env)
658   {
659     GNUNET_break (0);
660     do_disconnect (h);
661     return;
662   }
663   if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
664   {
665     GNUNET_break (0);
666     do_disconnect (h);
667     return;
668   }
669   rc = qe->qc.sc;
670   free_queue_entry (qe);
671   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
672     emsg = (const char *) &sm[1];
673   else
674     emsg = NULL;
675   LOG (GNUNET_ERROR_TYPE_DEBUG,
676        "Received status %d/%s\n",
677        (int) status,
678        emsg);
679   GNUNET_STATISTICS_update (h->stats,
680                             gettext_noop ("# status messages received"),
681                             1,
682                             GNUNET_NO);
683   h->retry_time = GNUNET_TIME_UNIT_ZERO;
684   process_queue (h);
685   if (NULL != rc.cont)
686     rc.cont (rc.cont_cls,
687              status,
688              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
689              emsg);
690 }
691
692
693 /**
694  * Check data message we received from the service.
695  *
696  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
697  * @param dm message received
698  */
699 static int
700 check_data (void *cls,
701             const struct DataMessage *dm)
702 {
703   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
704
705   if (msize != ntohl (dm->size))
706   {
707     GNUNET_break (0);
708     return GNUNET_SYSERR;
709   }
710   return GNUNET_OK;
711 }
712
713
714 /**
715  * Handle data message we got from the service.
716  *
717  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
718  * @param dm message received
719  */
720 static void
721 handle_data (void *cls,
722              const struct DataMessage *dm)
723 {
724   struct GNUNET_DATASTORE_Handle *h = cls;
725   struct GNUNET_DATASTORE_QueueEntry *qe;
726   struct ResultContext rc;
727
728   if (h->skip_next_messages > 0)
729   {
730     process_queue (h);
731     return;
732   }
733   qe = h->queue_head;
734   if (NULL == qe)
735   {
736     GNUNET_break (0);
737     do_disconnect (h);
738     return;
739   }
740   if (NULL != qe->env)
741   {
742     GNUNET_break (0);
743     do_disconnect (h);
744     return;
745   }
746   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
747   {
748     GNUNET_break (0);
749     do_disconnect (h);
750     return;
751   }
752 #if INSANE_STATISTICS
753   GNUNET_STATISTICS_update (h->stats,
754                             gettext_noop ("# Results received"),
755                             1,
756                             GNUNET_NO);
757 #endif
758   LOG (GNUNET_ERROR_TYPE_DEBUG,
759        "Received result %llu with type %u and size %u with key %s\n",
760        (unsigned long long) GNUNET_ntohll (dm->uid),
761        ntohl (dm->type),
762        ntohl (dm->size),
763        GNUNET_h2s (&dm->key));
764   rc = qe->qc.rc;
765   free_queue_entry (qe);
766   h->retry_time = GNUNET_TIME_UNIT_ZERO;
767   process_queue (h);
768   if (NULL != rc.proc)
769     rc.proc (rc.proc_cls,
770              &dm->key,
771              ntohl (dm->size),
772              &dm[1],
773              ntohl (dm->type),
774              ntohl (dm->priority),
775              ntohl (dm->anonymity),
776              GNUNET_TIME_absolute_ntoh (dm->expiration),
777              GNUNET_ntohll (dm->uid));
778 }
779
780
781 /**
782  * Type of a function to call when we receive a
783  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
784  *
785  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
786  * @param msg message received
787  */
788 static void
789 handle_data_end (void *cls,
790                  const struct GNUNET_MessageHeader *msg)
791 {
792   struct GNUNET_DATASTORE_Handle *h = cls;
793   struct GNUNET_DATASTORE_QueueEntry *qe;
794   struct ResultContext rc;
795
796   if (h->skip_next_messages > 0)
797   {
798     h->skip_next_messages--;
799     process_queue (h);
800     return;
801   }
802   qe = h->queue_head;
803   if (NULL == qe)
804   {
805     GNUNET_break (0);
806     do_disconnect (h);
807     return;
808   }
809   if (NULL != qe->env)
810   {
811     GNUNET_break (0);
812     do_disconnect (h);
813     return;
814   }
815   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
816   {
817     GNUNET_break (0);
818     do_disconnect (h);
819     return;
820   }
821   rc = qe->qc.rc;
822   free_queue_entry (qe);
823   LOG (GNUNET_ERROR_TYPE_DEBUG,
824        "Received end of result set, new queue size is %u\n",
825        h->queue_size);
826   h->retry_time = GNUNET_TIME_UNIT_ZERO;
827   h->result_count = 0;
828   process_queue (h);
829   /* signal end of iteration */
830   if (NULL != rc.proc)
831     rc.proc (rc.proc_cls,
832              NULL,
833              0,
834              NULL,
835              0,
836              0,
837              0,
838              GNUNET_TIME_UNIT_ZERO_ABS,
839              0);
840 }
841
842
843 /**
844  * Try reconnecting to the datastore service.
845  *
846  * @param cls the `struct GNUNET_DATASTORE_Handle`
847  */
848 static void
849 try_reconnect (void *cls)
850 {
851   struct GNUNET_DATASTORE_Handle *h = cls;
852   struct GNUNET_MQ_MessageHandler handlers[] = {
853     GNUNET_MQ_hd_var_size (status,
854                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
855                            struct StatusMessage,
856                            h),
857     GNUNET_MQ_hd_var_size (data,
858                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
859                            struct DataMessage,
860                            h),
861     GNUNET_MQ_hd_fixed_size (data_end,
862                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
863                              struct GNUNET_MessageHeader,
864                              h),
865     GNUNET_MQ_handler_end ()
866   };
867
868   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
869   h->reconnect_task = NULL;
870   GNUNET_assert (NULL == h->mq);
871   h->mq = GNUNET_CLIENT_connecT (h->cfg,
872                                  "datastore",
873                                  handlers,
874                                  &mq_error_handler,
875                                  h);
876   if (NULL == h->mq)
877     return;
878   GNUNET_STATISTICS_update (h->stats,
879                             gettext_noop ("# datastore connections (re)created"),
880                             1,
881                             GNUNET_NO);
882   LOG (GNUNET_ERROR_TYPE_DEBUG,
883        "Reconnected to DATASTORE\n");
884   process_queue (h);
885 }
886
887
888 /**
889  * Dummy continuation used to do nothing (but be non-zero).
890  *
891  * @param cls closure
892  * @param result result
893  * @param min_expiration expiration time
894  * @param emsg error message
895  */
896 static void
897 drop_status_cont (void *cls,
898                   int32_t result,
899                   struct GNUNET_TIME_Absolute min_expiration,
900                   const char *emsg)
901 {
902   /* do nothing */
903 }
904
905
906 /**
907  * Store an item in the datastore.  If the item is already present,
908  * the priorities are summed up and the higher expiration time and
909  * lower anonymity level is used.
910  *
911  * @param h handle to the datastore
912  * @param rid reservation ID to use (from "reserve"); use 0 if no
913  *            prior reservation was made
914  * @param key key for the value
915  * @param size number of bytes in data
916  * @param data content stored
917  * @param type type of the content
918  * @param priority priority of the content
919  * @param anonymity anonymity-level for the content
920  * @param replication how often should the content be replicated to other peers?
921  * @param expiration expiration time for the content
922  * @param queue_priority ranking of this request in the priority queue
923  * @param max_queue_size at what queue size should this request be dropped
924  *        (if other requests of higher priority are in the queue)
925  * @param cont continuation to call when done
926  * @param cont_cls closure for @a cont
927  * @return NULL if the entry was not queued, otherwise a handle that can be used to
928  *         cancel; note that even if NULL is returned, the callback will be invoked
929  *         (or rather, will already have been invoked)
930  */
931 struct GNUNET_DATASTORE_QueueEntry *
932 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
933                       uint32_t rid,
934                       const struct GNUNET_HashCode *key,
935                       size_t size,
936                       const void *data,
937                       enum GNUNET_BLOCK_Type type,
938                       uint32_t priority,
939                       uint32_t anonymity,
940                       uint32_t replication,
941                       struct GNUNET_TIME_Absolute expiration,
942                       unsigned int queue_priority,
943                       unsigned int max_queue_size,
944                       GNUNET_DATASTORE_ContinuationWithStatus cont,
945                       void *cont_cls)
946 {
947   struct GNUNET_DATASTORE_QueueEntry *qe;
948   struct GNUNET_MQ_Envelope *env;
949   struct DataMessage *dm;
950   union QueueContext qc;
951
952   if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
953   {
954     GNUNET_break (0);
955     return NULL;
956   }
957
958   LOG (GNUNET_ERROR_TYPE_DEBUG,
959        "Asked to put %u bytes of data under key `%s' for %s\n",
960        size,
961        GNUNET_h2s (key),
962        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
963                                                GNUNET_YES));
964   env = GNUNET_MQ_msg_extra (dm,
965                              size,
966                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
967   dm->rid = htonl (rid);
968   dm->size = htonl ((uint32_t) size);
969   dm->type = htonl (type);
970   dm->priority = htonl (priority);
971   dm->anonymity = htonl (anonymity);
972   dm->replication = htonl (replication);
973   dm->reserved = htonl (0);
974   dm->uid = GNUNET_htonll (0);
975   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
976   dm->key = *key;
977   GNUNET_memcpy (&dm[1],
978           data,
979           size);
980   qc.sc.cont = cont;
981   qc.sc.cont_cls = cont_cls;
982   qe = make_queue_entry (h,
983                          env,
984                          queue_priority,
985                          max_queue_size,
986                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
987                          &qc);
988   if (NULL == qe)
989   {
990     LOG (GNUNET_ERROR_TYPE_DEBUG,
991          "Could not create queue entry for PUT\n");
992     return NULL;
993   }
994   GNUNET_STATISTICS_update (h->stats,
995                             gettext_noop ("# PUT requests executed"),
996                             1,
997                             GNUNET_NO);
998   process_queue (h);
999   return qe;
1000 }
1001
1002
1003 /**
1004  * Reserve space in the datastore.  This function should be used
1005  * to avoid "out of space" failures during a longer sequence of "put"
1006  * operations (for example, when a file is being inserted).
1007  *
1008  * @param h handle to the datastore
1009  * @param amount how much space (in bytes) should be reserved (for content only)
1010  * @param entries how many entries will be created (to calculate per-entry overhead)
1011  * @param cont continuation to call when done; "success" will be set to
1012  *             a positive reservation value if space could be reserved.
1013  * @param cont_cls closure for @a cont
1014  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1015  *         cancel; note that even if NULL is returned, the callback will be invoked
1016  *         (or rather, will already have been invoked)
1017  */
1018 struct GNUNET_DATASTORE_QueueEntry *
1019 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1020                           uint64_t amount,
1021                           uint32_t entries,
1022                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1023                           void *cont_cls)
1024 {
1025   struct GNUNET_DATASTORE_QueueEntry *qe;
1026   struct GNUNET_MQ_Envelope *env;
1027   struct ReserveMessage *rm;
1028   union QueueContext qc;
1029
1030   if (NULL == cont)
1031     cont = &drop_status_cont;
1032   LOG (GNUNET_ERROR_TYPE_DEBUG,
1033        "Asked to reserve %llu bytes of data and %u entries\n",
1034        (unsigned long long) amount,
1035        (unsigned int) entries);
1036   env = GNUNET_MQ_msg (rm,
1037                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1038   rm->entries = htonl (entries);
1039   rm->amount = GNUNET_htonll (amount);
1040
1041   qc.sc.cont = cont;
1042   qc.sc.cont_cls = cont_cls;
1043   qe = make_queue_entry (h,
1044                          env,
1045                          UINT_MAX,
1046                          UINT_MAX,
1047                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1048                          &qc);
1049   if (NULL == qe)
1050   {
1051     LOG (GNUNET_ERROR_TYPE_DEBUG,
1052          "Could not create queue entry to reserve\n");
1053     return NULL;
1054   }
1055   GNUNET_STATISTICS_update (h->stats,
1056                             gettext_noop ("# RESERVE requests executed"),
1057                             1,
1058                             GNUNET_NO);
1059   process_queue (h);
1060   return qe;
1061 }
1062
1063
1064 /**
1065  * Signal that all of the data for which a reservation was made has
1066  * been stored and that whatever excess space might have been reserved
1067  * can now be released.
1068  *
1069  * @param h handle to the datastore
1070  * @param rid reservation ID (value of "success" in original continuation
1071  *        from the "reserve" function).
1072  * @param queue_priority ranking of this request in the priority queue
1073  * @param max_queue_size at what queue size should this request be dropped
1074  *        (if other requests of higher priority are in the queue)
1075  * @param queue_priority ranking of this request in the priority queue
1076  * @param max_queue_size at what queue size should this request be dropped
1077  *        (if other requests of higher priority are in the queue)
1078  * @param cont continuation to call when done
1079  * @param cont_cls closure for @a cont
1080  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1081  *         cancel; note that even if NULL is returned, the callback will be invoked
1082  *         (or rather, will already have been invoked)
1083  */
1084 struct GNUNET_DATASTORE_QueueEntry *
1085 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1086                                   uint32_t rid,
1087                                   unsigned int queue_priority,
1088                                   unsigned int max_queue_size,
1089                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1090                                   void *cont_cls)
1091 {
1092   struct GNUNET_DATASTORE_QueueEntry *qe;
1093   struct GNUNET_MQ_Envelope *env;
1094   struct ReleaseReserveMessage *rrm;
1095   union QueueContext qc;
1096
1097   if (NULL == cont)
1098     cont = &drop_status_cont;
1099   LOG (GNUNET_ERROR_TYPE_DEBUG,
1100        "Asked to release reserve %d\n",
1101        rid);
1102   env = GNUNET_MQ_msg (rrm,
1103                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1104   rrm->rid = htonl (rid);
1105   qc.sc.cont = cont;
1106   qc.sc.cont_cls = cont_cls;
1107   qe = make_queue_entry (h,
1108                          env,
1109                          queue_priority,
1110                          max_queue_size,
1111                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1112                          &qc);
1113   if (NULL == qe)
1114   {
1115     LOG (GNUNET_ERROR_TYPE_DEBUG,
1116          "Could not create queue entry to release reserve\n");
1117     return NULL;
1118   }
1119   GNUNET_STATISTICS_update (h->stats,
1120                             gettext_noop
1121                             ("# RELEASE RESERVE requests executed"), 1,
1122                             GNUNET_NO);
1123   process_queue (h);
1124   return qe;
1125 }
1126
1127
1128 /**
1129  * Update a value in the datastore.
1130  *
1131  * @param h handle to the datastore
1132  * @param uid identifier for the value
1133  * @param priority how much to increase the priority of the value
1134  * @param expiration new expiration value should be MAX of existing and this argument
1135  * @param queue_priority ranking of this request in the priority queue
1136  * @param max_queue_size at what queue size should this request be dropped
1137  *        (if other requests of higher priority are in the queue)
1138  * @param cont continuation to call when done
1139  * @param cont_cls closure for @a cont
1140  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141  *         cancel; note that even if NULL is returned, the callback will be invoked
1142  *         (or rather, will already have been invoked)
1143  */
1144 struct GNUNET_DATASTORE_QueueEntry *
1145 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1146                          uint64_t uid,
1147                          uint32_t priority,
1148                          struct GNUNET_TIME_Absolute expiration,
1149                          unsigned int queue_priority,
1150                          unsigned int max_queue_size,
1151                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1152                          void *cont_cls)
1153 {
1154   struct GNUNET_DATASTORE_QueueEntry *qe;
1155   struct GNUNET_MQ_Envelope *env;
1156   struct UpdateMessage *um;
1157   union QueueContext qc;
1158
1159   if (NULL == cont)
1160     cont = &drop_status_cont;
1161   LOG (GNUNET_ERROR_TYPE_DEBUG,
1162        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1163        uid,
1164        (unsigned int) priority,
1165        GNUNET_STRINGS_absolute_time_to_string (expiration));
1166   env = GNUNET_MQ_msg (um,
1167                        GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1168   um->priority = htonl (priority);
1169   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1170   um->uid = GNUNET_htonll (uid);
1171
1172   qc.sc.cont = cont;
1173   qc.sc.cont_cls = cont_cls;
1174   qe = make_queue_entry (h,
1175                          env,
1176                          queue_priority,
1177                          max_queue_size,
1178                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1179                          &qc);
1180   if (NULL == qe)
1181   {
1182     LOG (GNUNET_ERROR_TYPE_DEBUG,
1183          "Could not create queue entry for UPDATE\n");
1184     return NULL;
1185   }
1186   GNUNET_STATISTICS_update (h->stats,
1187                             gettext_noop ("# UPDATE requests executed"), 1,
1188                             GNUNET_NO);
1189   process_queue (h);
1190   return qe;
1191 }
1192
1193
1194 /**
1195  * Explicitly remove some content from the database.
1196  * The @a cont continuation will be called with `status`
1197  * #GNUNET_OK" if content was removed, #GNUNET_NO
1198  * if no matching entry was found and #GNUNET_SYSERR
1199  * on all other types of errors.
1200  *
1201  * @param h handle to the datastore
1202  * @param key key for the value
1203  * @param size number of bytes in data
1204  * @param data content stored
1205  * @param queue_priority ranking of this request in the priority queue
1206  * @param max_queue_size at what queue size should this request be dropped
1207  *        (if other requests of higher priority are in the queue)
1208  * @param cont continuation to call when done
1209  * @param cont_cls closure for @a cont
1210  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1211  *         cancel; note that even if NULL is returned, the callback will be invoked
1212  *         (or rather, will already have been invoked)
1213  */
1214 struct GNUNET_DATASTORE_QueueEntry *
1215 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1216                          const struct GNUNET_HashCode *key,
1217                          size_t size,
1218                          const void *data,
1219                          unsigned int queue_priority,
1220                          unsigned int max_queue_size,
1221                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1222                          void *cont_cls)
1223 {
1224   struct GNUNET_DATASTORE_QueueEntry *qe;
1225   struct DataMessage *dm;
1226   struct GNUNET_MQ_Envelope *env;
1227   union QueueContext qc;
1228
1229   if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1230   {
1231     GNUNET_break (0);
1232     return NULL;
1233   }
1234   if (NULL == cont)
1235     cont = &drop_status_cont;
1236   LOG (GNUNET_ERROR_TYPE_DEBUG,
1237        "Asked to remove %u bytes under key `%s'\n",
1238        size,
1239        GNUNET_h2s (key));
1240   env = GNUNET_MQ_msg_extra (dm,
1241                              size,
1242                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1243   dm->rid = htonl (0);
1244   dm->size = htonl (size);
1245   dm->type = htonl (0);
1246   dm->priority = htonl (0);
1247   dm->anonymity = htonl (0);
1248   dm->uid = GNUNET_htonll (0);
1249   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1250   dm->key = *key;
1251   GNUNET_memcpy (&dm[1],
1252           data,
1253           size);
1254
1255   qc.sc.cont = cont;
1256   qc.sc.cont_cls = cont_cls;
1257
1258   qe = make_queue_entry (h,
1259                          env,
1260                          queue_priority,
1261                          max_queue_size,
1262                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1263                          &qc);
1264   if (NULL == qe)
1265   {
1266     LOG (GNUNET_ERROR_TYPE_DEBUG,
1267          "Could not create queue entry for REMOVE\n");
1268     return NULL;
1269   }
1270   GNUNET_STATISTICS_update (h->stats,
1271                             gettext_noop ("# REMOVE requests executed"),
1272                             1,
1273                             GNUNET_NO);
1274   process_queue (h);
1275   return qe;
1276 }
1277
1278
1279
1280 /**
1281  * Get a random value from the datastore for content replication.
1282  * Returns a single, random value among those with the highest
1283  * replication score, lowering positive replication scores by one for
1284  * the chosen value (if only content with a replication score exists,
1285  * a random value is returned and replication scores are not changed).
1286  *
1287  * @param h handle to the datastore
1288  * @param queue_priority ranking of this request in the priority queue
1289  * @param max_queue_size at what queue size should this request be dropped
1290  *        (if other requests of higher priority are in the queue)
1291  * @param proc function to call on a random value; it
1292  *        will be called once with a value (if available)
1293  *        and always once with a value of NULL.
1294  * @param proc_cls closure for @a proc
1295  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1296  *         cancel
1297  */
1298 struct GNUNET_DATASTORE_QueueEntry *
1299 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1300                                       unsigned int queue_priority,
1301                                       unsigned int max_queue_size,
1302                                       GNUNET_DATASTORE_DatumProcessor proc,
1303                                       void *proc_cls)
1304 {
1305   struct GNUNET_DATASTORE_QueueEntry *qe;
1306   struct GNUNET_MQ_Envelope *env;
1307   struct GNUNET_MessageHeader *m;
1308   union QueueContext qc;
1309
1310   GNUNET_assert (NULL != proc);
1311   LOG (GNUNET_ERROR_TYPE_DEBUG,
1312        "Asked to get replication entry\n");
1313   env = GNUNET_MQ_msg (m,
1314                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1315   qc.rc.proc = proc;
1316   qc.rc.proc_cls = proc_cls;
1317   qe = make_queue_entry (h,
1318                          env,
1319                          queue_priority,
1320                          max_queue_size,
1321                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1322                          &qc);
1323   if (NULL == qe)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_DEBUG,
1326          "Could not create queue entry for GET REPLICATION\n");
1327     return NULL;
1328   }
1329   GNUNET_STATISTICS_update (h->stats,
1330                             gettext_noop
1331                             ("# GET REPLICATION requests executed"), 1,
1332                             GNUNET_NO);
1333   process_queue (h);
1334   return qe;
1335 }
1336
1337
1338 /**
1339  * Get a single zero-anonymity value from the datastore.
1340  *
1341  * @param h handle to the datastore
1342  * @param offset offset of the result (modulo num-results); set to
1343  *               a random 64-bit value initially; then increment by
1344  *               one each time; detect that all results have been found by uid
1345  *               being again the first uid ever returned.
1346  * @param queue_priority ranking of this request in the priority queue
1347  * @param max_queue_size at what queue size should this request be dropped
1348  *        (if other requests of higher priority are in the queue)
1349  * @param type allowed type for the operation (never zero)
1350  * @param proc function to call on a random value; it
1351  *        will be called once with a value (if available)
1352  *        or with NULL if none value exists.
1353  * @param proc_cls closure for @a proc
1354  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1355  *         cancel
1356  */
1357 struct GNUNET_DATASTORE_QueueEntry *
1358 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1359                                      uint64_t offset,
1360                                      unsigned int queue_priority,
1361                                      unsigned int max_queue_size,
1362                                      enum GNUNET_BLOCK_Type type,
1363                                      GNUNET_DATASTORE_DatumProcessor proc,
1364                                      void *proc_cls)
1365 {
1366   struct GNUNET_DATASTORE_QueueEntry *qe;
1367   struct GNUNET_MQ_Envelope *env;
1368   struct GetZeroAnonymityMessage *m;
1369   union QueueContext qc;
1370
1371   GNUNET_assert (NULL != proc);
1372   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1373   LOG (GNUNET_ERROR_TYPE_DEBUG,
1374        "Asked to get %llu-th zero-anonymity entry of type %d\n",
1375        (unsigned long long) offset,
1376        type);
1377   env = GNUNET_MQ_msg (m,
1378                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1379   m->type = htonl ((uint32_t) type);
1380   m->offset = GNUNET_htonll (offset);
1381   qc.rc.proc = proc;
1382   qc.rc.proc_cls = proc_cls;
1383   qe = make_queue_entry (h,
1384                          env,
1385                          queue_priority,
1386                          max_queue_size,
1387                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1388                          &qc);
1389   if (NULL == qe)
1390   {
1391     LOG (GNUNET_ERROR_TYPE_DEBUG,
1392          "Could not create queue entry for zero-anonymity procation\n");
1393     return NULL;
1394   }
1395   GNUNET_STATISTICS_update (h->stats,
1396                             gettext_noop
1397                             ("# GET ZERO ANONYMITY requests executed"), 1,
1398                             GNUNET_NO);
1399   process_queue (h);
1400   return qe;
1401 }
1402
1403
1404 /**
1405  * Get a result for a particular key from the datastore.  The processor
1406  * will only be called once.
1407  *
1408  * @param h handle to the datastore
1409  * @param offset offset of the result (modulo num-results); set to
1410  *               a random 64-bit value initially; then increment by
1411  *               one each time; detect that all results have been found by uid
1412  *               being again the first uid ever returned.
1413  * @param key maybe NULL (to match all entries)
1414  * @param type desired type, 0 for any
1415  * @param queue_priority ranking of this request in the priority queue
1416  * @param max_queue_size at what queue size should this request be dropped
1417  *        (if other requests of higher priority are in the queue)
1418  * @param proc function to call on each matching value;
1419  *        will be called once with a NULL value at the end
1420  * @param proc_cls closure for @a proc
1421  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1422  *         cancel
1423  */
1424 struct GNUNET_DATASTORE_QueueEntry *
1425 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1426                           uint64_t offset,
1427                           const struct GNUNET_HashCode *key,
1428                           enum GNUNET_BLOCK_Type type,
1429                           unsigned int queue_priority,
1430                           unsigned int max_queue_size,
1431                           GNUNET_DATASTORE_DatumProcessor proc,
1432                           void *proc_cls)
1433 {
1434   struct GNUNET_DATASTORE_QueueEntry *qe;
1435   struct GNUNET_MQ_Envelope *env;
1436   struct GetKeyMessage *gkm;
1437   struct GetMessage *gm;
1438   union QueueContext qc;
1439
1440   GNUNET_assert (NULL != proc);
1441   LOG (GNUNET_ERROR_TYPE_DEBUG,
1442        "Asked to look for data of type %u under key `%s'\n",
1443        (unsigned int) type,
1444        GNUNET_h2s (key));
1445   if (NULL == key)
1446   {
1447     env = GNUNET_MQ_msg (gm,
1448                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1449     gm->type = htonl (type);
1450     gm->offset = GNUNET_htonll (offset);
1451   }
1452   else
1453   {
1454     env = GNUNET_MQ_msg (gkm,
1455                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1456     gkm->type = htonl (type);
1457     gkm->offset = GNUNET_htonll (offset);
1458     gkm->key = *key;
1459   }
1460   qc.rc.proc = proc;
1461   qc.rc.proc_cls = proc_cls;
1462   qe = make_queue_entry (h,
1463                          env,
1464                          queue_priority,
1465                          max_queue_size,
1466                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1467                          &qc);
1468   if (NULL == qe)
1469   {
1470     LOG (GNUNET_ERROR_TYPE_DEBUG,
1471          "Could not queue request for `%s'\n",
1472          GNUNET_h2s (key));
1473     return NULL;
1474   }
1475 #if INSANE_STATISTICS
1476   GNUNET_STATISTICS_update (h->stats,
1477                             gettext_noop ("# GET requests executed"),
1478                             1,
1479                             GNUNET_NO);
1480 #endif
1481   process_queue (h);
1482   return qe;
1483 }
1484
1485
1486 /**
1487  * Cancel a datastore operation.  The final callback from the
1488  * operation must not have been done yet.
1489  *
1490  * @param qe operation to cancel
1491  */
1492 void
1493 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1494 {
1495   struct GNUNET_DATASTORE_Handle *h = qe->h;
1496
1497   LOG (GNUNET_ERROR_TYPE_DEBUG,
1498        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1499        qe,
1500        NULL == qe->env,
1501        h->queue_head == qe);
1502   if (NULL == qe->env)
1503   {
1504     free_queue_entry (qe);
1505     h->skip_next_messages++;
1506     return;
1507   }
1508   free_queue_entry (qe);
1509   process_queue (h);
1510 }
1511
1512
1513 /* end of datastore_api.c */