2856347595e3c9aa2442499478db4f302834f916
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for @e cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for @e proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98 /**
99  * Entry in our priority queue.
100  */
101 struct GNUNET_DATASTORE_QueueEntry
102 {
103
104   /**
105    * This is a linked list.
106    */
107   struct GNUNET_DATASTORE_QueueEntry *next;
108
109   /**
110    * This is a linked list.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *prev;
113
114   /**
115    * Handle to the master context.
116    */
117   struct GNUNET_DATASTORE_Handle *h;
118
119   /**
120    * Function to call after transmission of the request.
121    */
122   GNUNET_DATASTORE_ContinuationWithStatus cont;
123
124   /**
125    * Closure for @e cont.
126    */
127   void *cont_cls;
128
129   /**
130    * Context for the operation.
131    */
132   union QueueContext qc;
133
134   /**
135    * Envelope of the request to transmit, NULL after
136    * transmission.
137    */
138   struct GNUNET_MQ_Envelope *env;
139
140   /**
141    * Priority in the queue.
142    */
143   unsigned int priority;
144
145   /**
146    * Maximum allowed length of queue (otherwise
147    * this request should be discarded).
148    */
149   unsigned int max_queue;
150
151   /**
152    * Expected response type.
153    */
154   uint16_t response_type;
155
156 };
157
158
159 /**
160  * Handle to the datastore service.
161  */
162 struct GNUNET_DATASTORE_Handle
163 {
164
165   /**
166    * Our configuration.
167    */
168   const struct GNUNET_CONFIGURATION_Handle *cfg;
169
170   /**
171    * Current connection to the datastore service.
172    */
173   struct GNUNET_MQ_Handle *mq;
174
175   /**
176    * Handle for statistics.
177    */
178   struct GNUNET_STATISTICS_Handle *stats;
179
180   /**
181    * Current head of priority queue.
182    */
183   struct GNUNET_DATASTORE_QueueEntry *queue_head;
184
185   /**
186    * Current tail of priority queue.
187    */
188   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
189
190   /**
191    * Task for trying to reconnect.
192    */
193   struct GNUNET_SCHEDULER_Task *reconnect_task;
194
195   /**
196    * How quickly should we retry?  Used for exponential back-off on
197    * connect-errors.
198    */
199   struct GNUNET_TIME_Relative retry_time;
200
201   /**
202    * Number of entries in the queue.
203    */
204   unsigned int queue_size;
205
206   /**
207    * Number of results we're receiving for the current query
208    * after application stopped to care.  Used to determine when
209    * to reset the connection.
210    */
211   unsigned int result_count;
212
213   /**
214    * We should ignore the next message(s) from the service.
215    */
216   unsigned int skip_next_messages;
217
218 };
219
220
221 /**
222  * Try reconnecting to the datastore service.
223  *
224  * @param cls the `struct GNUNET_DATASTORE_Handle`
225  */
226 static void
227 try_reconnect (void *cls);
228
229
230 /**
231  * Disconnect from the service and then try reconnecting to the datastore service
232  * after some delay.
233  *
234  * @param h handle to datastore to disconnect and reconnect
235  */
236 static void
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238 {
239   if (NULL == h->mq)
240   {
241     GNUNET_break (0);
242     return;
243   }
244   GNUNET_MQ_destroy (h->mq);
245   h->mq = NULL;
246   h->skip_next_messages = 0;
247   h->reconnect_task
248     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249                                     &try_reconnect,
250                                     h);
251 }
252
253
254 /**
255  * Free a queue entry.  Removes the given entry from the
256  * queue and releases associated resources.  Does NOT
257  * call the callback.
258  *
259  * @param qe entry to free.
260  */
261 static void
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263 {
264   struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266   GNUNET_CONTAINER_DLL_remove (h->queue_head,
267                                h->queue_tail,
268                                qe);
269   h->queue_size--;
270   GNUNET_free (qe);
271 }
272
273
274 /**
275  * Handle error in sending drop request to datastore.
276  *
277  * @param cls closure with the datastore handle
278  * @param error error code
279  */
280 static void
281 mq_error_handler (void *cls,
282                   enum GNUNET_MQ_Error error)
283 {
284   struct GNUNET_DATASTORE_Handle *h = cls;
285   struct GNUNET_DATASTORE_QueueEntry *qe;
286
287   LOG (GNUNET_ERROR_TYPE_DEBUG,
288        "MQ error, reconnecting to DATASTORE\n");
289   do_disconnect (h);
290   qe = h->queue_head;
291   if ( (NULL != qe) &&
292        (NULL == qe->env) )
293   {
294     union QueueContext qc = qe->qc;
295     uint16_t rt = qe->response_type;
296
297     LOG (GNUNET_ERROR_TYPE_DEBUG,
298          "Failed to receive response from database.\n");
299     free_queue_entry (qe);
300     switch (rt)
301     {
302     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
303       if (NULL != qc.sc.cont)
304         qc.sc.cont (qc.sc.cont_cls,
305                     GNUNET_SYSERR,
306                     GNUNET_TIME_UNIT_ZERO_ABS,
307                     _("DATASTORE disconnected"));
308       break;
309     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
310       if (NULL != qc.rc.proc)
311         qc.rc.proc (qc.rc.proc_cls,
312                     NULL,
313                     0,
314                     NULL, 0, 0, 0,
315                     GNUNET_TIME_UNIT_ZERO_ABS,
316                     0);
317       break;
318     default:
319       GNUNET_break (0);
320     }
321   }
322 }
323
324
325 /**
326  * Connect to the datastore service.
327  *
328  * @param cfg configuration to use
329  * @return handle to use to access the service
330  */
331 struct GNUNET_DATASTORE_Handle *
332 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
333 {
334   struct GNUNET_DATASTORE_Handle *h;
335
336   LOG (GNUNET_ERROR_TYPE_DEBUG,
337        "Establishing DATASTORE connection!\n");
338   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
339   h->cfg = cfg;
340   try_reconnect (h);
341   if (NULL == h->mq)
342   {
343     GNUNET_free (h);
344     return NULL;
345   }
346   h->stats = GNUNET_STATISTICS_create ("datastore-api",
347                                        cfg);
348   return h;
349 }
350
351
352 /**
353  * Task used by to disconnect from the datastore after
354  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
355  *
356  * @param cls the datastore handle
357  */
358 static void
359 disconnect_after_drop (void *cls)
360 {
361   struct GNUNET_DATASTORE_Handle *h = cls;
362
363   LOG (GNUNET_ERROR_TYPE_DEBUG,
364        "Drop sent, disconnecting\n");
365   GNUNET_DATASTORE_disconnect (h,
366                                GNUNET_NO);
367 }
368
369
370 /**
371  * Handle error in sending drop request to datastore.
372  *
373  * @param cls closure with the datastore handle
374  * @param error error code
375  */
376 static void
377 disconnect_on_mq_error (void *cls,
378                         enum GNUNET_MQ_Error error)
379 {
380   struct GNUNET_DATASTORE_Handle *h = cls;
381
382   LOG (GNUNET_ERROR_TYPE_ERROR,
383        "Failed to ask datastore to drop tables\n");
384   GNUNET_DATASTORE_disconnect (h,
385                                GNUNET_NO);
386 }
387
388
389 /**
390  * Disconnect from the datastore service (and free
391  * associated resources).
392  *
393  * @param h handle to the datastore
394  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
395  */
396 void
397 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
398                              int drop)
399 {
400   struct GNUNET_DATASTORE_QueueEntry *qe;
401
402   LOG (GNUNET_ERROR_TYPE_DEBUG,
403        "Datastore disconnect\n");
404   if (NULL != h->mq)
405   {
406     GNUNET_MQ_destroy (h->mq);
407     h->mq = NULL;
408   }
409   if (NULL != h->reconnect_task)
410   {
411     GNUNET_SCHEDULER_cancel (h->reconnect_task);
412     h->reconnect_task = NULL;
413   }
414   while (NULL != (qe = h->queue_head))
415   {
416     switch (qe->response_type)
417     {
418     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
419       if (NULL != qe->qc.sc.cont)
420         qe->qc.sc.cont (qe->qc.sc.cont_cls,
421                         GNUNET_SYSERR,
422                         GNUNET_TIME_UNIT_ZERO_ABS,
423                         _("Disconnected from DATASTORE"));
424       break;
425     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
426       if (NULL != qe->qc.rc.proc)
427         qe->qc.rc.proc (qe->qc.rc.proc_cls,
428                         NULL,
429                         0,
430                         NULL, 0, 0, 0,
431                         GNUNET_TIME_UNIT_ZERO_ABS,
432                         0);
433       break;
434     default:
435       GNUNET_break (0);
436     }
437     free_queue_entry (qe);
438   }
439   if (GNUNET_YES == drop)
440   {
441     LOG (GNUNET_ERROR_TYPE_DEBUG,
442          "Re-connecting to issue DROP!\n");
443     GNUNET_assert (NULL == h->mq);
444     h->mq = GNUNET_CLIENT_connecT (h->cfg,
445                                    "datastore",
446                                    NULL,
447                                    &disconnect_on_mq_error,
448                                    h);
449     if (NULL != h->mq)
450     {
451       struct GNUNET_MessageHeader *hdr;
452       struct GNUNET_MQ_Envelope *env;
453
454       env = GNUNET_MQ_msg (hdr,
455                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
456       GNUNET_MQ_notify_sent (env,
457                              &disconnect_after_drop,
458                              h);
459       GNUNET_MQ_send (h->mq,
460                       env);
461       return;
462     }
463     GNUNET_break (0);
464   }
465   GNUNET_STATISTICS_destroy (h->stats,
466                              GNUNET_NO);
467   h->stats = NULL;
468   GNUNET_free (h);
469 }
470
471
472 /**
473  * Create a new entry for our priority queue (and possibly discard other entires if
474  * the queue is getting too long).
475  *
476  * @param h handle to the datastore
477  * @param env envelope with the message to queue
478  * @param queue_priority priority of the entry
479  * @param max_queue_size at what queue size should this request be dropped
480  *        (if other requests of higher priority are in the queue)
481  * @param expected_type which type of response do we expect,
482  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
483  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
484  * @param qc client context (NOT a closure for @a response_proc)
485  * @return NULL if the queue is full
486  */
487 static struct GNUNET_DATASTORE_QueueEntry *
488 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
489                   struct GNUNET_MQ_Envelope *env,
490                   unsigned int queue_priority,
491                   unsigned int max_queue_size,
492                   uint16_t expected_type,
493                   const union QueueContext *qc)
494 {
495   struct GNUNET_DATASTORE_QueueEntry *qe;
496   struct GNUNET_DATASTORE_QueueEntry *pos;
497   unsigned int c;
498
499   c = 0;
500   pos = h->queue_head;
501   while ( (NULL != pos) &&
502           (c < max_queue_size) &&
503           (pos->priority >= queue_priority) )
504   {
505     c++;
506     pos = pos->next;
507   }
508   if (c >= max_queue_size)
509   {
510     GNUNET_STATISTICS_update (h->stats,
511                               gettext_noop ("# queue overflows"),
512                               1,
513                               GNUNET_NO);
514     GNUNET_MQ_discard (env);
515     return NULL;
516   }
517   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
518   qe->h = h;
519   qe->env = env;
520   qe->response_type = expected_type;
521   qe->qc = *qc;
522   qe->priority = queue_priority;
523   qe->max_queue = max_queue_size;
524   if (NULL == pos)
525   {
526     /* append at the tail */
527     pos = h->queue_tail;
528   }
529   else
530   {
531     pos = pos->prev;
532     /* do not insert at HEAD if HEAD query was already
533      * transmitted and we are still receiving replies! */
534     if ( (NULL == pos) &&
535          (NULL == h->queue_head->env) )
536       pos = h->queue_head;
537   }
538   c++;
539 #if INSANE_STATISTICS
540   GNUNET_STATISTICS_update (h->stats,
541                             gettext_noop ("# queue entries created"),
542                             1,
543                             GNUNET_NO);
544 #endif
545   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
546                                      h->queue_tail,
547                                      pos,
548                                      qe);
549   h->queue_size++;
550   return qe;
551 }
552
553
554 /**
555  * Process entries in the queue (or do nothing if we are already
556  * doing so).
557  *
558  * @param h handle to the datastore
559  */
560 static void
561 process_queue (struct GNUNET_DATASTORE_Handle *h)
562 {
563   struct GNUNET_DATASTORE_QueueEntry *qe;
564
565   if (NULL == (qe = h->queue_head))
566   {
567     /* no entry in queue */
568     LOG (GNUNET_ERROR_TYPE_DEBUG,
569          "Queue empty\n");
570     return;
571   }
572   if (NULL == qe->env)
573   {
574     /* waiting for replies */
575     LOG (GNUNET_ERROR_TYPE_DEBUG,
576          "Head request already transmitted\n");
577     return;
578   }
579   if (NULL == h->mq)
580   {
581     /* waiting for reconnect */
582     LOG (GNUNET_ERROR_TYPE_DEBUG,
583          "Not connected\n");
584     return;
585   }
586   GNUNET_MQ_send (h->mq,
587                   qe->env);
588   qe->env = NULL;
589 }
590
591
592
593
594 /**
595  * Function called to check status message from the service.
596  *
597  * @param cls closure
598  * @param sm status message received
599  * @return #GNUNET_OK if the message is well-formed
600  */
601 static int
602 check_status (void *cls,
603               const struct StatusMessage *sm)
604 {
605   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
606   int32_t status = ntohl (sm->status);
607
608   if (msize > 0)
609   {
610     const char *emsg = (const char *) &sm[1];
611
612     if ('\0' != emsg[msize - 1])
613     {
614       GNUNET_break (0);
615       return GNUNET_SYSERR;
616     }
617   }
618   else if (GNUNET_SYSERR == status)
619   {
620     GNUNET_break (0);
621     return GNUNET_SYSERR;
622   }
623   return GNUNET_OK;
624 }
625
626
627 /**
628  * Function called to handle status message from the service.
629  *
630  * @param cls closure
631  * @param sm status message received
632  */
633 static void
634 handle_status (void *cls,
635                const struct StatusMessage *sm)
636 {
637   struct GNUNET_DATASTORE_Handle *h = cls;
638   struct GNUNET_DATASTORE_QueueEntry *qe;
639   struct StatusContext rc;
640   const char *emsg;
641   int32_t status = ntohl (sm->status);
642
643   if (h->skip_next_messages > 0)
644   {
645     h->skip_next_messages--;
646     process_queue (h);
647     return;
648   }
649   if (NULL == (qe = h->queue_head))
650   {
651     GNUNET_break (0);
652     do_disconnect (h);
653     return;
654   }
655   if (NULL != qe->env)
656   {
657     GNUNET_break (0);
658     do_disconnect (h);
659     return;
660   }
661   if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
662   {
663     GNUNET_break (0);
664     do_disconnect (h);
665     return;
666   }
667   rc = qe->qc.sc;
668   free_queue_entry (qe);
669   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
670     emsg = (const char *) &sm[1];
671   else
672     emsg = NULL;
673   LOG (GNUNET_ERROR_TYPE_DEBUG,
674        "Received status %d/%s\n",
675        (int) status,
676        emsg);
677   GNUNET_STATISTICS_update (h->stats,
678                             gettext_noop ("# status messages received"),
679                             1,
680                             GNUNET_NO);
681   h->retry_time = GNUNET_TIME_UNIT_ZERO;
682   process_queue (h);
683   if (NULL != rc.cont)
684     rc.cont (rc.cont_cls,
685              status,
686              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
687              emsg);
688 }
689
690
691 /**
692  * Check data message we received from the service.
693  *
694  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
695  * @param dm message received
696  */
697 static int
698 check_data (void *cls,
699             const struct DataMessage *dm)
700 {
701   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
702
703   if (msize != ntohl (dm->size))
704   {
705     GNUNET_break (0);
706     return GNUNET_SYSERR;
707   }
708   return GNUNET_OK;
709 }
710
711
712 /**
713  * Handle data message we got from the service.
714  *
715  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
716  * @param dm message received
717  */
718 static void
719 handle_data (void *cls,
720              const struct DataMessage *dm)
721 {
722   struct GNUNET_DATASTORE_Handle *h = cls;
723   struct GNUNET_DATASTORE_QueueEntry *qe;
724   struct ResultContext rc;
725
726   if (h->skip_next_messages > 0)
727   {
728     process_queue (h);
729     return;
730   }
731   qe = h->queue_head;
732   if (NULL == qe)
733   {
734     GNUNET_break (0);
735     do_disconnect (h);
736     return;
737   }
738   if (NULL != qe->env)
739   {
740     GNUNET_break (0);
741     do_disconnect (h);
742     return;
743   }
744   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
745   {
746     GNUNET_break (0);
747     do_disconnect (h);
748     return;
749   }
750 #if INSANE_STATISTICS
751   GNUNET_STATISTICS_update (h->stats,
752                             gettext_noop ("# Results received"),
753                             1,
754                             GNUNET_NO);
755 #endif
756   LOG (GNUNET_ERROR_TYPE_DEBUG,
757        "Received result %llu with type %u and size %u with key %s\n",
758        (unsigned long long) GNUNET_ntohll (dm->uid),
759        ntohl (dm->type),
760        ntohl (dm->size),
761        GNUNET_h2s (&dm->key));
762   rc = qe->qc.rc;
763   free_queue_entry (qe);
764   h->retry_time = GNUNET_TIME_UNIT_ZERO;
765   process_queue (h);
766   if (NULL != rc.proc)
767     rc.proc (rc.proc_cls,
768              &dm->key,
769              ntohl (dm->size),
770              &dm[1],
771              ntohl (dm->type),
772              ntohl (dm->priority),
773              ntohl (dm->anonymity),
774              GNUNET_TIME_absolute_ntoh (dm->expiration),
775              GNUNET_ntohll (dm->uid));
776 }
777
778
779 /**
780  * Type of a function to call when we receive a
781  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
782  *
783  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
784  * @param msg message received
785  */
786 static void
787 handle_data_end (void *cls,
788                  const struct GNUNET_MessageHeader *msg)
789 {
790   struct GNUNET_DATASTORE_Handle *h = cls;
791   struct GNUNET_DATASTORE_QueueEntry *qe;
792   struct ResultContext rc;
793
794   if (h->skip_next_messages > 0)
795   {
796     h->skip_next_messages--;
797     process_queue (h);
798     return;
799   }
800   qe = h->queue_head;
801   if (NULL == qe)
802   {
803     GNUNET_break (0);
804     do_disconnect (h);
805     return;
806   }
807   if (NULL != qe->env)
808   {
809     GNUNET_break (0);
810     do_disconnect (h);
811     return;
812   }
813   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
814   {
815     GNUNET_break (0);
816     do_disconnect (h);
817     return;
818   }
819   rc = qe->qc.rc;
820   free_queue_entry (qe);
821   LOG (GNUNET_ERROR_TYPE_DEBUG,
822        "Received end of result set, new queue size is %u\n",
823        h->queue_size);
824   h->retry_time = GNUNET_TIME_UNIT_ZERO;
825   h->result_count = 0;
826   process_queue (h);
827   /* signal end of iteration */
828   if (NULL != rc.proc)
829     rc.proc (rc.proc_cls,
830              NULL,
831              0,
832              NULL,
833              0,
834              0,
835              0,
836              GNUNET_TIME_UNIT_ZERO_ABS,
837              0);
838 }
839
840
841 /**
842  * Try reconnecting to the datastore service.
843  *
844  * @param cls the `struct GNUNET_DATASTORE_Handle`
845  */
846 static void
847 try_reconnect (void *cls)
848 {
849   GNUNET_MQ_hd_var_size (status,
850                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
851                          struct StatusMessage);
852   GNUNET_MQ_hd_var_size (data,
853                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
854                          struct DataMessage);
855   GNUNET_MQ_hd_fixed_size (data_end,
856                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
857                            struct GNUNET_MessageHeader);
858   struct GNUNET_DATASTORE_Handle *h = cls;
859   struct GNUNET_MQ_MessageHandler handlers[] = {
860     make_status_handler (h),
861     make_data_handler (h),
862     make_data_end_handler (h),
863     GNUNET_MQ_handler_end ()
864   };
865
866   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
867   h->reconnect_task = NULL;
868   GNUNET_assert (NULL == h->mq);
869   h->mq = GNUNET_CLIENT_connecT (h->cfg,
870                                  "datastore",
871                                  handlers,
872                                  &mq_error_handler,
873                                  h);
874   if (NULL == h->mq)
875     return;
876   GNUNET_STATISTICS_update (h->stats,
877                             gettext_noop ("# datastore connections (re)created"),
878                             1,
879                             GNUNET_NO);
880   LOG (GNUNET_ERROR_TYPE_DEBUG,
881        "Reconnected to DATASTORE\n");
882   process_queue (h);
883 }
884
885
886 /**
887  * Dummy continuation used to do nothing (but be non-zero).
888  *
889  * @param cls closure
890  * @param result result
891  * @param min_expiration expiration time
892  * @param emsg error message
893  */
894 static void
895 drop_status_cont (void *cls,
896                   int32_t result,
897                   struct GNUNET_TIME_Absolute min_expiration,
898                   const char *emsg)
899 {
900   /* do nothing */
901 }
902
903
904 /**
905  * Store an item in the datastore.  If the item is already present,
906  * the priorities are summed up and the higher expiration time and
907  * lower anonymity level is used.
908  *
909  * @param h handle to the datastore
910  * @param rid reservation ID to use (from "reserve"); use 0 if no
911  *            prior reservation was made
912  * @param key key for the value
913  * @param size number of bytes in data
914  * @param data content stored
915  * @param type type of the content
916  * @param priority priority of the content
917  * @param anonymity anonymity-level for the content
918  * @param replication how often should the content be replicated to other peers?
919  * @param expiration expiration time for the content
920  * @param queue_priority ranking of this request in the priority queue
921  * @param max_queue_size at what queue size should this request be dropped
922  *        (if other requests of higher priority are in the queue)
923  * @param cont continuation to call when done
924  * @param cont_cls closure for @a cont
925  * @return NULL if the entry was not queued, otherwise a handle that can be used to
926  *         cancel; note that even if NULL is returned, the callback will be invoked
927  *         (or rather, will already have been invoked)
928  */
929 struct GNUNET_DATASTORE_QueueEntry *
930 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
931                       uint32_t rid,
932                       const struct GNUNET_HashCode *key,
933                       size_t size,
934                       const void *data,
935                       enum GNUNET_BLOCK_Type type,
936                       uint32_t priority,
937                       uint32_t anonymity,
938                       uint32_t replication,
939                       struct GNUNET_TIME_Absolute expiration,
940                       unsigned int queue_priority,
941                       unsigned int max_queue_size,
942                       GNUNET_DATASTORE_ContinuationWithStatus cont,
943                       void *cont_cls)
944 {
945   struct GNUNET_DATASTORE_QueueEntry *qe;
946   struct GNUNET_MQ_Envelope *env;
947   struct DataMessage *dm;
948   union QueueContext qc;
949
950   if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
951   {
952     GNUNET_break (0);
953     return NULL;
954   }
955
956   LOG (GNUNET_ERROR_TYPE_DEBUG,
957        "Asked to put %u bytes of data under key `%s' for %s\n",
958        size,
959        GNUNET_h2s (key),
960        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
961                                                GNUNET_YES));
962   env = GNUNET_MQ_msg_extra (dm,
963                              size,
964                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
965   dm->rid = htonl (rid);
966   dm->size = htonl ((uint32_t) size);
967   dm->type = htonl (type);
968   dm->priority = htonl (priority);
969   dm->anonymity = htonl (anonymity);
970   dm->replication = htonl (replication);
971   dm->reserved = htonl (0);
972   dm->uid = GNUNET_htonll (0);
973   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
974   dm->key = *key;
975   memcpy (&dm[1],
976           data,
977           size);
978   qc.sc.cont = cont;
979   qc.sc.cont_cls = cont_cls;
980   qe = make_queue_entry (h,
981                          env,
982                          queue_priority,
983                          max_queue_size,
984                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
985                          &qc);
986   if (NULL == qe)
987   {
988     LOG (GNUNET_ERROR_TYPE_DEBUG,
989          "Could not create queue entry for PUT\n");
990     return NULL;
991   }
992   GNUNET_STATISTICS_update (h->stats,
993                             gettext_noop ("# PUT requests executed"),
994                             1,
995                             GNUNET_NO);
996   process_queue (h);
997   return qe;
998 }
999
1000
1001 /**
1002  * Reserve space in the datastore.  This function should be used
1003  * to avoid "out of space" failures during a longer sequence of "put"
1004  * operations (for example, when a file is being inserted).
1005  *
1006  * @param h handle to the datastore
1007  * @param amount how much space (in bytes) should be reserved (for content only)
1008  * @param entries how many entries will be created (to calculate per-entry overhead)
1009  * @param cont continuation to call when done; "success" will be set to
1010  *             a positive reservation value if space could be reserved.
1011  * @param cont_cls closure for @a cont
1012  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1013  *         cancel; note that even if NULL is returned, the callback will be invoked
1014  *         (or rather, will already have been invoked)
1015  */
1016 struct GNUNET_DATASTORE_QueueEntry *
1017 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1018                           uint64_t amount,
1019                           uint32_t entries,
1020                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1021                           void *cont_cls)
1022 {
1023   struct GNUNET_DATASTORE_QueueEntry *qe;
1024   struct GNUNET_MQ_Envelope *env;
1025   struct ReserveMessage *rm;
1026   union QueueContext qc;
1027
1028   if (NULL == cont)
1029     cont = &drop_status_cont;
1030   LOG (GNUNET_ERROR_TYPE_DEBUG,
1031        "Asked to reserve %llu bytes of data and %u entries\n",
1032        (unsigned long long) amount,
1033        (unsigned int) entries);
1034   env = GNUNET_MQ_msg (rm,
1035                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1036   rm->entries = htonl (entries);
1037   rm->amount = GNUNET_htonll (amount);
1038
1039   qc.sc.cont = cont;
1040   qc.sc.cont_cls = cont_cls;
1041   qe = make_queue_entry (h,
1042                          env,
1043                          UINT_MAX,
1044                          UINT_MAX,
1045                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1046                          &qc);
1047   if (NULL == qe)
1048   {
1049     LOG (GNUNET_ERROR_TYPE_DEBUG,
1050          "Could not create queue entry to reserve\n");
1051     return NULL;
1052   }
1053   GNUNET_STATISTICS_update (h->stats,
1054                             gettext_noop ("# RESERVE requests executed"),
1055                             1,
1056                             GNUNET_NO);
1057   process_queue (h);
1058   return qe;
1059 }
1060
1061
1062 /**
1063  * Signal that all of the data for which a reservation was made has
1064  * been stored and that whatever excess space might have been reserved
1065  * can now be released.
1066  *
1067  * @param h handle to the datastore
1068  * @param rid reservation ID (value of "success" in original continuation
1069  *        from the "reserve" function).
1070  * @param queue_priority ranking of this request in the priority queue
1071  * @param max_queue_size at what queue size should this request be dropped
1072  *        (if other requests of higher priority are in the queue)
1073  * @param queue_priority ranking of this request in the priority queue
1074  * @param max_queue_size at what queue size should this request be dropped
1075  *        (if other requests of higher priority are in the queue)
1076  * @param cont continuation to call when done
1077  * @param cont_cls closure for @a cont
1078  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1079  *         cancel; note that even if NULL is returned, the callback will be invoked
1080  *         (or rather, will already have been invoked)
1081  */
1082 struct GNUNET_DATASTORE_QueueEntry *
1083 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1084                                   uint32_t rid,
1085                                   unsigned int queue_priority,
1086                                   unsigned int max_queue_size,
1087                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1088                                   void *cont_cls)
1089 {
1090   struct GNUNET_DATASTORE_QueueEntry *qe;
1091   struct GNUNET_MQ_Envelope *env;
1092   struct ReleaseReserveMessage *rrm;
1093   union QueueContext qc;
1094
1095   if (NULL == cont)
1096     cont = &drop_status_cont;
1097   LOG (GNUNET_ERROR_TYPE_DEBUG,
1098        "Asked to release reserve %d\n",
1099        rid);
1100   env = GNUNET_MQ_msg (rrm,
1101                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1102   rrm->rid = htonl (rid);
1103   qc.sc.cont = cont;
1104   qc.sc.cont_cls = cont_cls;
1105   qe = make_queue_entry (h,
1106                          env,
1107                          queue_priority,
1108                          max_queue_size,
1109                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1110                          &qc);
1111   if (NULL == qe)
1112   {
1113     LOG (GNUNET_ERROR_TYPE_DEBUG,
1114          "Could not create queue entry to release reserve\n");
1115     return NULL;
1116   }
1117   GNUNET_STATISTICS_update (h->stats,
1118                             gettext_noop
1119                             ("# RELEASE RESERVE requests executed"), 1,
1120                             GNUNET_NO);
1121   process_queue (h);
1122   return qe;
1123 }
1124
1125
1126 /**
1127  * Update a value in the datastore.
1128  *
1129  * @param h handle to the datastore
1130  * @param uid identifier for the value
1131  * @param priority how much to increase the priority of the value
1132  * @param expiration new expiration value should be MAX of existing and this argument
1133  * @param queue_priority ranking of this request in the priority queue
1134  * @param max_queue_size at what queue size should this request be dropped
1135  *        (if other requests of higher priority are in the queue)
1136  * @param cont continuation to call when done
1137  * @param cont_cls closure for @a cont
1138  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1139  *         cancel; note that even if NULL is returned, the callback will be invoked
1140  *         (or rather, will already have been invoked)
1141  */
1142 struct GNUNET_DATASTORE_QueueEntry *
1143 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1144                          uint64_t uid,
1145                          uint32_t priority,
1146                          struct GNUNET_TIME_Absolute expiration,
1147                          unsigned int queue_priority,
1148                          unsigned int max_queue_size,
1149                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1150                          void *cont_cls)
1151 {
1152   struct GNUNET_DATASTORE_QueueEntry *qe;
1153   struct GNUNET_MQ_Envelope *env;
1154   struct UpdateMessage *um;
1155   union QueueContext qc;
1156
1157   if (NULL == cont)
1158     cont = &drop_status_cont;
1159   LOG (GNUNET_ERROR_TYPE_DEBUG,
1160        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1161        uid,
1162        (unsigned int) priority,
1163        GNUNET_STRINGS_absolute_time_to_string (expiration));
1164   env = GNUNET_MQ_msg (um,
1165                        GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1166   um->priority = htonl (priority);
1167   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1168   um->uid = GNUNET_htonll (uid);
1169
1170   qc.sc.cont = cont;
1171   qc.sc.cont_cls = cont_cls;
1172   qe = make_queue_entry (h,
1173                          env,
1174                          queue_priority,
1175                          max_queue_size,
1176                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1177                          &qc);
1178   if (NULL == qe)
1179   {
1180     LOG (GNUNET_ERROR_TYPE_DEBUG,
1181          "Could not create queue entry for UPDATE\n");
1182     return NULL;
1183   }
1184   GNUNET_STATISTICS_update (h->stats,
1185                             gettext_noop ("# UPDATE requests executed"), 1,
1186                             GNUNET_NO);
1187   process_queue (h);
1188   return qe;
1189 }
1190
1191
1192 /**
1193  * Explicitly remove some content from the database.
1194  * The @a cont continuation will be called with `status`
1195  * #GNUNET_OK" if content was removed, #GNUNET_NO
1196  * if no matching entry was found and #GNUNET_SYSERR
1197  * on all other types of errors.
1198  *
1199  * @param h handle to the datastore
1200  * @param key key for the value
1201  * @param size number of bytes in data
1202  * @param data content stored
1203  * @param queue_priority ranking of this request in the priority queue
1204  * @param max_queue_size at what queue size should this request be dropped
1205  *        (if other requests of higher priority are in the queue)
1206  * @param cont continuation to call when done
1207  * @param cont_cls closure for @a cont
1208  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1209  *         cancel; note that even if NULL is returned, the callback will be invoked
1210  *         (or rather, will already have been invoked)
1211  */
1212 struct GNUNET_DATASTORE_QueueEntry *
1213 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1214                          const struct GNUNET_HashCode *key,
1215                          size_t size,
1216                          const void *data,
1217                          unsigned int queue_priority,
1218                          unsigned int max_queue_size,
1219                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1220                          void *cont_cls)
1221 {
1222   struct GNUNET_DATASTORE_QueueEntry *qe;
1223   struct DataMessage *dm;
1224   struct GNUNET_MQ_Envelope *env;
1225   union QueueContext qc;
1226
1227   if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1228   {
1229     GNUNET_break (0);
1230     return NULL;
1231   }
1232   if (NULL == cont)
1233     cont = &drop_status_cont;
1234   LOG (GNUNET_ERROR_TYPE_DEBUG,
1235        "Asked to remove %u bytes under key `%s'\n",
1236        size,
1237        GNUNET_h2s (key));
1238   env = GNUNET_MQ_msg_extra (dm,
1239                              size,
1240                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1241   dm->rid = htonl (0);
1242   dm->size = htonl (size);
1243   dm->type = htonl (0);
1244   dm->priority = htonl (0);
1245   dm->anonymity = htonl (0);
1246   dm->uid = GNUNET_htonll (0);
1247   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1248   dm->key = *key;
1249   memcpy (&dm[1],
1250           data,
1251           size);
1252
1253   qc.sc.cont = cont;
1254   qc.sc.cont_cls = cont_cls;
1255
1256   qe = make_queue_entry (h,
1257                          env,
1258                          queue_priority,
1259                          max_queue_size,
1260                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1261                          &qc);
1262   if (NULL == qe)
1263   {
1264     LOG (GNUNET_ERROR_TYPE_DEBUG,
1265          "Could not create queue entry for REMOVE\n");
1266     return NULL;
1267   }
1268   GNUNET_STATISTICS_update (h->stats,
1269                             gettext_noop ("# REMOVE requests executed"),
1270                             1,
1271                             GNUNET_NO);
1272   process_queue (h);
1273   return qe;
1274 }
1275
1276
1277
1278 /**
1279  * Get a random value from the datastore for content replication.
1280  * Returns a single, random value among those with the highest
1281  * replication score, lowering positive replication scores by one for
1282  * the chosen value (if only content with a replication score exists,
1283  * a random value is returned and replication scores are not changed).
1284  *
1285  * @param h handle to the datastore
1286  * @param queue_priority ranking of this request in the priority queue
1287  * @param max_queue_size at what queue size should this request be dropped
1288  *        (if other requests of higher priority are in the queue)
1289  * @param proc function to call on a random value; it
1290  *        will be called once with a value (if available)
1291  *        and always once with a value of NULL.
1292  * @param proc_cls closure for @a proc
1293  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1294  *         cancel
1295  */
1296 struct GNUNET_DATASTORE_QueueEntry *
1297 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1298                                       unsigned int queue_priority,
1299                                       unsigned int max_queue_size,
1300                                       GNUNET_DATASTORE_DatumProcessor proc,
1301                                       void *proc_cls)
1302 {
1303   struct GNUNET_DATASTORE_QueueEntry *qe;
1304   struct GNUNET_MQ_Envelope *env;
1305   struct GNUNET_MessageHeader *m;
1306   union QueueContext qc;
1307
1308   GNUNET_assert (NULL != proc);
1309   LOG (GNUNET_ERROR_TYPE_DEBUG,
1310        "Asked to get replication entry\n");
1311   env = GNUNET_MQ_msg (m,
1312                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1313   qc.rc.proc = proc;
1314   qc.rc.proc_cls = proc_cls;
1315   qe = make_queue_entry (h,
1316                          env,
1317                          queue_priority,
1318                          max_queue_size,
1319                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1320                          &qc);
1321   if (NULL == qe)
1322   {
1323     LOG (GNUNET_ERROR_TYPE_DEBUG,
1324          "Could not create queue entry for GET REPLICATION\n");
1325     return NULL;
1326   }
1327   GNUNET_STATISTICS_update (h->stats,
1328                             gettext_noop
1329                             ("# GET REPLICATION requests executed"), 1,
1330                             GNUNET_NO);
1331   process_queue (h);
1332   return qe;
1333 }
1334
1335
1336 /**
1337  * Get a single zero-anonymity value from the datastore.
1338  *
1339  * @param h handle to the datastore
1340  * @param offset offset of the result (modulo num-results); set to
1341  *               a random 64-bit value initially; then increment by
1342  *               one each time; detect that all results have been found by uid
1343  *               being again the first uid ever returned.
1344  * @param queue_priority ranking of this request in the priority queue
1345  * @param max_queue_size at what queue size should this request be dropped
1346  *        (if other requests of higher priority are in the queue)
1347  * @param type allowed type for the operation (never zero)
1348  * @param proc function to call on a random value; it
1349  *        will be called once with a value (if available)
1350  *        or with NULL if none value exists.
1351  * @param proc_cls closure for @a proc
1352  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1353  *         cancel
1354  */
1355 struct GNUNET_DATASTORE_QueueEntry *
1356 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1357                                      uint64_t offset,
1358                                      unsigned int queue_priority,
1359                                      unsigned int max_queue_size,
1360                                      enum GNUNET_BLOCK_Type type,
1361                                      GNUNET_DATASTORE_DatumProcessor proc,
1362                                      void *proc_cls)
1363 {
1364   struct GNUNET_DATASTORE_QueueEntry *qe;
1365   struct GNUNET_MQ_Envelope *env;
1366   struct GetZeroAnonymityMessage *m;
1367   union QueueContext qc;
1368
1369   GNUNET_assert (NULL != proc);
1370   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1371   LOG (GNUNET_ERROR_TYPE_DEBUG,
1372        "Asked to get %llu-th zero-anonymity entry of type %d\n",
1373        (unsigned long long) offset,
1374        type);
1375   env = GNUNET_MQ_msg (m,
1376                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1377   m->type = htonl ((uint32_t) type);
1378   m->offset = GNUNET_htonll (offset);
1379   qc.rc.proc = proc;
1380   qc.rc.proc_cls = proc_cls;
1381   qe = make_queue_entry (h,
1382                          env,
1383                          queue_priority,
1384                          max_queue_size,
1385                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1386                          &qc);
1387   if (NULL == qe)
1388   {
1389     LOG (GNUNET_ERROR_TYPE_DEBUG,
1390          "Could not create queue entry for zero-anonymity procation\n");
1391     return NULL;
1392   }
1393   GNUNET_STATISTICS_update (h->stats,
1394                             gettext_noop
1395                             ("# GET ZERO ANONYMITY requests executed"), 1,
1396                             GNUNET_NO);
1397   process_queue (h);
1398   return qe;
1399 }
1400
1401
1402 /**
1403  * Get a result for a particular key from the datastore.  The processor
1404  * will only be called once.
1405  *
1406  * @param h handle to the datastore
1407  * @param offset offset of the result (modulo num-results); set to
1408  *               a random 64-bit value initially; then increment by
1409  *               one each time; detect that all results have been found by uid
1410  *               being again the first uid ever returned.
1411  * @param key maybe NULL (to match all entries)
1412  * @param type desired type, 0 for any
1413  * @param queue_priority ranking of this request in the priority queue
1414  * @param max_queue_size at what queue size should this request be dropped
1415  *        (if other requests of higher priority are in the queue)
1416  * @param proc function to call on each matching value;
1417  *        will be called once with a NULL value at the end
1418  * @param proc_cls closure for @a proc
1419  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1420  *         cancel
1421  */
1422 struct GNUNET_DATASTORE_QueueEntry *
1423 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1424                           uint64_t offset,
1425                           const struct GNUNET_HashCode *key,
1426                           enum GNUNET_BLOCK_Type type,
1427                           unsigned int queue_priority,
1428                           unsigned int max_queue_size,
1429                           GNUNET_DATASTORE_DatumProcessor proc,
1430                           void *proc_cls)
1431 {
1432   struct GNUNET_DATASTORE_QueueEntry *qe;
1433   struct GNUNET_MQ_Envelope *env;
1434   struct GetKeyMessage *gkm;
1435   struct GetMessage *gm;
1436   union QueueContext qc;
1437
1438   GNUNET_assert (NULL != proc);
1439   LOG (GNUNET_ERROR_TYPE_DEBUG,
1440        "Asked to look for data of type %u under key `%s'\n",
1441        (unsigned int) type,
1442        GNUNET_h2s (key));
1443   if (NULL == key)
1444   {
1445     env = GNUNET_MQ_msg (gm,
1446                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1447     gm->type = htonl (type);
1448     gm->offset = GNUNET_htonll (offset);
1449   }
1450   else
1451   {
1452     env = GNUNET_MQ_msg (gkm,
1453                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1454     gkm->type = htonl (type);
1455     gkm->offset = GNUNET_htonll (offset);
1456     gkm->key = *key;
1457   }
1458   qc.rc.proc = proc;
1459   qc.rc.proc_cls = proc_cls;
1460   qe = make_queue_entry (h,
1461                          env,
1462                          queue_priority,
1463                          max_queue_size,
1464                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1465                          &qc);
1466   if (NULL == qe)
1467   {
1468     LOG (GNUNET_ERROR_TYPE_DEBUG,
1469          "Could not queue request for `%s'\n",
1470          GNUNET_h2s (key));
1471     return NULL;
1472   }
1473 #if INSANE_STATISTICS
1474   GNUNET_STATISTICS_update (h->stats,
1475                             gettext_noop ("# GET requests executed"),
1476                             1,
1477                             GNUNET_NO);
1478 #endif
1479   process_queue (h);
1480   return qe;
1481 }
1482
1483
1484 /**
1485  * Cancel a datastore operation.  The final callback from the
1486  * operation must not have been done yet.
1487  *
1488  * @param qe operation to cancel
1489  */
1490 void
1491 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1492 {
1493   struct GNUNET_DATASTORE_Handle *h = qe->h;
1494
1495   LOG (GNUNET_ERROR_TYPE_DEBUG,
1496        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1497        qe,
1498        NULL == qe->env,
1499        h->queue_head == qe);
1500   if (NULL == qe->env)
1501   {
1502     free_queue_entry (qe);
1503     h->skip_next_messages++;
1504     return;
1505   }
1506   free_queue_entry (qe);
1507   process_queue (h);
1508 }
1509
1510
1511 /* end of datastore_api.c */