guix-env: some update.
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38 /**
39  * Collect an instane number of statistics?  May cause excessive IPC.
40  */
41 #define INSANE_STATISTICS GNUNET_NO
42
43 /**
44  * If a client stopped asking for more results, how many more do
45  * we receive from the DB before killing the connection?  Trade-off
46  * between re-doing TCP handshakes and (needlessly) receiving
47  * useless results.
48  */
49 #define MAX_EXCESS_RESULTS 8
50
51 /**
52  * Context for processing status messages.
53  */
54 struct StatusContext
55 {
56   /**
57    * Continuation to call with the status.
58    */
59   GNUNET_DATASTORE_ContinuationWithStatus cont;
60
61   /**
62    * Closure for @e cont.
63    */
64   void *cont_cls;
65
66 };
67
68
69 /**
70  * Context for processing result messages.
71  */
72 struct ResultContext
73 {
74   /**
75    * Function to call with the result.
76    */
77   GNUNET_DATASTORE_DatumProcessor proc;
78
79   /**
80    * Closure for @e proc.
81    */
82   void *proc_cls;
83
84 };
85
86
87 /**
88  *  Context for a queue operation.
89  */
90 union QueueContext
91 {
92
93   struct StatusContext sc;
94
95   struct ResultContext rc;
96
97 };
98
99
100 /**
101  * Entry in our priority queue.
102  */
103 struct GNUNET_DATASTORE_QueueEntry
104 {
105
106   /**
107    * This is a linked list.
108    */
109   struct GNUNET_DATASTORE_QueueEntry *next;
110
111   /**
112    * This is a linked list.
113    */
114   struct GNUNET_DATASTORE_QueueEntry *prev;
115
116   /**
117    * Handle to the master context.
118    */
119   struct GNUNET_DATASTORE_Handle *h;
120
121   /**
122    * Function to call after transmission of the request.
123    */
124   GNUNET_DATASTORE_ContinuationWithStatus cont;
125
126   /**
127    * Closure for @e cont.
128    */
129   void *cont_cls;
130
131   /**
132    * Context for the operation.
133    */
134   union QueueContext qc;
135
136   /**
137    * Envelope of the request to transmit, NULL after
138    * transmission.
139    */
140   struct GNUNET_MQ_Envelope *env;
141
142   /**
143    * Task we run if this entry stalls the queue and we
144    * need to warn the user.
145    */
146   struct GNUNET_SCHEDULER_Task *delay_warn_task;
147
148   /**
149    * Priority in the queue.
150    */
151   unsigned int priority;
152
153   /**
154    * Maximum allowed length of queue (otherwise
155    * this request should be discarded).
156    */
157   unsigned int max_queue;
158
159   /**
160    * Expected response type.
161    */
162   uint16_t response_type;
163
164 };
165
166
167 /**
168  * Handle to the datastore service.
169  */
170 struct GNUNET_DATASTORE_Handle
171 {
172
173   /**
174    * Our configuration.
175    */
176   const struct GNUNET_CONFIGURATION_Handle *cfg;
177
178   /**
179    * Current connection to the datastore service.
180    */
181   struct GNUNET_MQ_Handle *mq;
182
183   /**
184    * Handle for statistics.
185    */
186   struct GNUNET_STATISTICS_Handle *stats;
187
188   /**
189    * Current head of priority queue.
190    */
191   struct GNUNET_DATASTORE_QueueEntry *queue_head;
192
193   /**
194    * Current tail of priority queue.
195    */
196   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
197
198   /**
199    * Task for trying to reconnect.
200    */
201   struct GNUNET_SCHEDULER_Task *reconnect_task;
202
203   /**
204    * How quickly should we retry?  Used for exponential back-off on
205    * connect-errors.
206    */
207   struct GNUNET_TIME_Relative retry_time;
208
209   /**
210    * Number of entries in the queue.
211    */
212   unsigned int queue_size;
213
214   /**
215    * Number of results we're receiving for the current query
216    * after application stopped to care.  Used to determine when
217    * to reset the connection.
218    */
219   unsigned int result_count;
220
221   /**
222    * We should ignore the next message(s) from the service.
223    */
224   unsigned int skip_next_messages;
225
226 };
227
228
229 /**
230  * Try reconnecting to the datastore service.
231  *
232  * @param cls the `struct GNUNET_DATASTORE_Handle`
233  */
234 static void
235 try_reconnect (void *cls);
236
237
238 /**
239  * Disconnect from the service and then try reconnecting to the datastore service
240  * after some delay.
241  *
242  * @param h handle to datastore to disconnect and reconnect
243  */
244 static void
245 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
246 {
247   if (NULL == h->mq)
248   {
249     GNUNET_break (0);
250     return;
251   }
252   GNUNET_MQ_destroy (h->mq);
253   h->mq = NULL;
254   h->skip_next_messages = 0;
255   h->reconnect_task
256     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
257                                     &try_reconnect,
258                                     h);
259 }
260
261
262 /**
263  * Free a queue entry.  Removes the given entry from the
264  * queue and releases associated resources.  Does NOT
265  * call the callback.
266  *
267  * @param qe entry to free.
268  */
269 static void
270 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
271 {
272   struct GNUNET_DATASTORE_Handle *h = qe->h;
273
274   GNUNET_CONTAINER_DLL_remove (h->queue_head,
275                                h->queue_tail,
276                                qe);
277   h->queue_size--;
278   if (NULL != qe->env)
279     GNUNET_MQ_discard (qe->env);
280   if (NULL != qe->delay_warn_task)
281     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
282   GNUNET_free (qe);
283 }
284
285
286 /**
287  * Task that logs an error after some time.
288  *
289  * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
290  */
291 static void
292 delay_warning (void *cls)
293 {
294   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
295
296   qe->delay_warn_task = NULL;
297   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
298               "Request %p of type %u at head of datastore queue for more than %s\n",
299               qe,
300               (unsigned int) qe->response_type,
301               GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
302                                                       GNUNET_YES));
303   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
304                                                       &delay_warning,
305                                                       qe);
306 }
307
308
309 /**
310  * Handle error in sending drop request to datastore.
311  *
312  * @param cls closure with the datastore handle
313  * @param error error code
314  */
315 static void
316 mq_error_handler (void *cls,
317                   enum GNUNET_MQ_Error error)
318 {
319   struct GNUNET_DATASTORE_Handle *h = cls;
320   struct GNUNET_DATASTORE_QueueEntry *qe;
321
322   LOG (GNUNET_ERROR_TYPE_DEBUG,
323        "MQ error, reconnecting to DATASTORE\n");
324   do_disconnect (h);
325   qe = h->queue_head;
326   if (NULL == qe)
327     return;
328   if (NULL != qe->delay_warn_task)
329   {
330     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
331     qe->delay_warn_task = NULL;
332   }
333   if (NULL == qe->env)
334   {
335     union QueueContext qc = qe->qc;
336     uint16_t rt = qe->response_type;
337
338     LOG (GNUNET_ERROR_TYPE_DEBUG,
339          "Failed to receive response from database.\n");
340     free_queue_entry (qe);
341     switch (rt)
342     {
343     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
344       if (NULL != qc.sc.cont)
345         qc.sc.cont (qc.sc.cont_cls,
346                     GNUNET_SYSERR,
347                     GNUNET_TIME_UNIT_ZERO_ABS,
348                     _("DATASTORE disconnected"));
349       break;
350     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
351       if (NULL != qc.rc.proc)
352         qc.rc.proc (qc.rc.proc_cls,
353                     NULL,
354                     0,
355                     NULL,
356                     0,
357                     0,
358                     0,
359                     0,
360                     GNUNET_TIME_UNIT_ZERO_ABS,
361                     0);
362       break;
363     default:
364       GNUNET_break (0);
365     }
366   }
367 }
368
369
370 /**
371  * Connect to the datastore service.
372  *
373  * @param cfg configuration to use
374  * @return handle to use to access the service
375  */
376 struct GNUNET_DATASTORE_Handle *
377 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
378 {
379   struct GNUNET_DATASTORE_Handle *h;
380
381   LOG (GNUNET_ERROR_TYPE_DEBUG,
382        "Establishing DATASTORE connection!\n");
383   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
384   h->cfg = cfg;
385   try_reconnect (h);
386   if (NULL == h->mq)
387   {
388     GNUNET_free (h);
389     return NULL;
390   }
391   h->stats = GNUNET_STATISTICS_create ("datastore-api",
392                                        cfg);
393   return h;
394 }
395
396
397 /**
398  * Task used by to disconnect from the datastore after
399  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
400  *
401  * @param cls the datastore handle
402  */
403 static void
404 disconnect_after_drop (void *cls)
405 {
406   struct GNUNET_DATASTORE_Handle *h = cls;
407
408   LOG (GNUNET_ERROR_TYPE_DEBUG,
409        "Drop sent, disconnecting\n");
410   GNUNET_DATASTORE_disconnect (h,
411                                GNUNET_NO);
412 }
413
414
415 /**
416  * Handle error in sending drop request to datastore.
417  *
418  * @param cls closure with the datastore handle
419  * @param error error code
420  */
421 static void
422 disconnect_on_mq_error (void *cls,
423                         enum GNUNET_MQ_Error error)
424 {
425   struct GNUNET_DATASTORE_Handle *h = cls;
426
427   LOG (GNUNET_ERROR_TYPE_ERROR,
428        "Failed to ask datastore to drop tables\n");
429   GNUNET_DATASTORE_disconnect (h,
430                                GNUNET_NO);
431 }
432
433
434 /**
435  * Disconnect from the datastore service (and free
436  * associated resources).
437  *
438  * @param h handle to the datastore
439  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
440  */
441 void
442 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
443                              int drop)
444 {
445   struct GNUNET_DATASTORE_QueueEntry *qe;
446
447   LOG (GNUNET_ERROR_TYPE_DEBUG,
448        "Datastore disconnect\n");
449   if (NULL != h->mq)
450   {
451     GNUNET_MQ_destroy (h->mq);
452     h->mq = NULL;
453   }
454   if (NULL != h->reconnect_task)
455   {
456     GNUNET_SCHEDULER_cancel (h->reconnect_task);
457     h->reconnect_task = NULL;
458   }
459   while (NULL != (qe = h->queue_head))
460   {
461     switch (qe->response_type)
462     {
463     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
464       if (NULL != qe->qc.sc.cont)
465         qe->qc.sc.cont (qe->qc.sc.cont_cls,
466                         GNUNET_SYSERR,
467                         GNUNET_TIME_UNIT_ZERO_ABS,
468                         _("Disconnected from DATASTORE"));
469       break;
470     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
471       if (NULL != qe->qc.rc.proc)
472         qe->qc.rc.proc (qe->qc.rc.proc_cls,
473                         NULL,
474                         0,
475                         NULL,
476                         0,
477                         0,
478                         0,
479                         0,
480                         GNUNET_TIME_UNIT_ZERO_ABS,
481                         0);
482       break;
483     default:
484       GNUNET_break (0);
485     }
486     free_queue_entry (qe);
487   }
488   if (GNUNET_YES == drop)
489   {
490     LOG (GNUNET_ERROR_TYPE_DEBUG,
491          "Re-connecting to issue DROP!\n");
492     GNUNET_assert (NULL == h->mq);
493     h->mq = GNUNET_CLIENT_connect (h->cfg,
494                                    "datastore",
495                                    NULL,
496                                    &disconnect_on_mq_error,
497                                    h);
498     if (NULL != h->mq)
499     {
500       struct GNUNET_MessageHeader *hdr;
501       struct GNUNET_MQ_Envelope *env;
502
503       env = GNUNET_MQ_msg (hdr,
504                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
505       GNUNET_MQ_notify_sent (env,
506                              &disconnect_after_drop,
507                              h);
508       GNUNET_MQ_send (h->mq,
509                       env);
510       return;
511     }
512     GNUNET_break (0);
513   }
514   GNUNET_STATISTICS_destroy (h->stats,
515                              GNUNET_NO);
516   h->stats = NULL;
517   GNUNET_free (h);
518 }
519
520
521 /**
522  * Create a new entry for our priority queue (and possibly discard other entires if
523  * the queue is getting too long).
524  *
525  * @param h handle to the datastore
526  * @param env envelope with the message to queue
527  * @param queue_priority priority of the entry
528  * @param max_queue_size at what queue size should this request be dropped
529  *        (if other requests of higher priority are in the queue)
530  * @param expected_type which type of response do we expect,
531  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
532  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
533  * @param qc client context (NOT a closure for @a response_proc)
534  * @return NULL if the queue is full
535  */
536 static struct GNUNET_DATASTORE_QueueEntry *
537 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
538                   struct GNUNET_MQ_Envelope *env,
539                   unsigned int queue_priority,
540                   unsigned int max_queue_size,
541                   uint16_t expected_type,
542                   const union QueueContext *qc)
543 {
544   struct GNUNET_DATASTORE_QueueEntry *qe;
545   struct GNUNET_DATASTORE_QueueEntry *pos;
546   unsigned int c;
547
548   if ( (NULL != h->queue_tail) &&
549        (h->queue_tail->priority >= queue_priority) )
550   {
551     c = h->queue_size;
552     pos = NULL;
553   }
554   else
555   {
556     c = 0;
557     pos = h->queue_head;
558   }
559   while ( (NULL != pos) &&
560           (c < max_queue_size) &&
561           (pos->priority >= queue_priority) )
562   {
563     c++;
564     pos = pos->next;
565   }
566   if (c >= max_queue_size)
567   {
568     GNUNET_STATISTICS_update (h->stats,
569                               gettext_noop ("# queue overflows"),
570                               1,
571                               GNUNET_NO);
572     GNUNET_MQ_discard (env);
573     return NULL;
574   }
575   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
576   qe->h = h;
577   qe->env = env;
578   qe->response_type = expected_type;
579   qe->qc = *qc;
580   qe->priority = queue_priority;
581   qe->max_queue = max_queue_size;
582   if (NULL == pos)
583   {
584     /* append at the tail */
585     pos = h->queue_tail;
586   }
587   else
588   {
589     pos = pos->prev;
590     /* do not insert at HEAD if HEAD query was already
591      * transmitted and we are still receiving replies! */
592     if ( (NULL == pos) &&
593          (NULL == h->queue_head->env) )
594       pos = h->queue_head;
595   }
596   c++;
597 #if INSANE_STATISTICS
598   GNUNET_STATISTICS_update (h->stats,
599                             gettext_noop ("# queue entries created"),
600                             1,
601                             GNUNET_NO);
602 #endif
603   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
604                                      h->queue_tail,
605                                      pos,
606                                      qe);
607   h->queue_size++;
608   return qe;
609 }
610
611
612 /**
613  * Process entries in the queue (or do nothing if we are already
614  * doing so).
615  *
616  * @param h handle to the datastore
617  */
618 static void
619 process_queue (struct GNUNET_DATASTORE_Handle *h)
620 {
621   struct GNUNET_DATASTORE_QueueEntry *qe;
622
623   if (NULL == (qe = h->queue_head))
624   {
625     /* no entry in queue */
626     LOG (GNUNET_ERROR_TYPE_DEBUG,
627          "Queue empty\n");
628     return;
629   }
630   if (NULL == qe->env)
631   {
632     /* waiting for replies */
633     LOG (GNUNET_ERROR_TYPE_DEBUG,
634          "Head request already transmitted\n");
635     return;
636   }
637   if (NULL == h->mq)
638   {
639     /* waiting for reconnect */
640     LOG (GNUNET_ERROR_TYPE_DEBUG,
641          "Not connected\n");
642     return;
643   }
644   GNUNET_assert (NULL == qe->delay_warn_task);
645   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
646                                                       &delay_warning,
647                                                       qe);
648   GNUNET_MQ_send (h->mq,
649                   qe->env);
650   qe->env = NULL;
651 }
652
653
654
655
656 /**
657  * Function called to check status message from the service.
658  *
659  * @param cls closure
660  * @param sm status message received
661  * @return #GNUNET_OK if the message is well-formed
662  */
663 static int
664 check_status (void *cls,
665               const struct StatusMessage *sm)
666 {
667   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
668   int32_t status = ntohl (sm->status);
669
670   if (msize > 0)
671   {
672     const char *emsg = (const char *) &sm[1];
673
674     if ('\0' != emsg[msize - 1])
675     {
676       GNUNET_break (0);
677       return GNUNET_SYSERR;
678     }
679   }
680   else if (GNUNET_SYSERR == status)
681   {
682     GNUNET_break (0);
683     return GNUNET_SYSERR;
684   }
685   return GNUNET_OK;
686 }
687
688
689 /**
690  * Function called to handle status message from the service.
691  *
692  * @param cls closure
693  * @param sm status message received
694  */
695 static void
696 handle_status (void *cls,
697                const struct StatusMessage *sm)
698 {
699   struct GNUNET_DATASTORE_Handle *h = cls;
700   struct GNUNET_DATASTORE_QueueEntry *qe;
701   struct StatusContext rc;
702   const char *emsg;
703   int32_t status = ntohl (sm->status);
704
705   if (h->skip_next_messages > 0)
706   {
707     h->skip_next_messages--;
708     process_queue (h);
709     return;
710   }
711   if (NULL == (qe = h->queue_head))
712   {
713     GNUNET_break (0);
714     do_disconnect (h);
715     return;
716   }
717   if (NULL != qe->env)
718   {
719     GNUNET_break (0);
720     do_disconnect (h);
721     return;
722   }
723   if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
724   {
725     GNUNET_break (0);
726     do_disconnect (h);
727     return;
728   }
729   rc = qe->qc.sc;
730   free_queue_entry (qe);
731   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
732     emsg = (const char *) &sm[1];
733   else
734     emsg = NULL;
735   LOG (GNUNET_ERROR_TYPE_DEBUG,
736        "Received status %d/%s\n",
737        (int) status,
738        emsg);
739   GNUNET_STATISTICS_update (h->stats,
740                             gettext_noop ("# status messages received"),
741                             1,
742                             GNUNET_NO);
743   h->retry_time = GNUNET_TIME_UNIT_ZERO;
744   process_queue (h);
745   if (NULL != rc.cont)
746     rc.cont (rc.cont_cls,
747              status,
748              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
749              emsg);
750 }
751
752
753 /**
754  * Check data message we received from the service.
755  *
756  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
757  * @param dm message received
758  */
759 static int
760 check_data (void *cls,
761             const struct DataMessage *dm)
762 {
763   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
764
765   if (msize != ntohl (dm->size))
766   {
767     GNUNET_break (0);
768     return GNUNET_SYSERR;
769   }
770   return GNUNET_OK;
771 }
772
773
774 /**
775  * Handle data message we got from the service.
776  *
777  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
778  * @param dm message received
779  */
780 static void
781 handle_data (void *cls,
782              const struct DataMessage *dm)
783 {
784   struct GNUNET_DATASTORE_Handle *h = cls;
785   struct GNUNET_DATASTORE_QueueEntry *qe;
786   struct ResultContext rc;
787
788   if (h->skip_next_messages > 0)
789   {
790     process_queue (h);
791     return;
792   }
793   qe = h->queue_head;
794   if (NULL == qe)
795   {
796     GNUNET_break (0);
797     do_disconnect (h);
798     return;
799   }
800   if (NULL != qe->env)
801   {
802     GNUNET_break (0);
803     do_disconnect (h);
804     return;
805   }
806   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
807   {
808     GNUNET_break (0);
809     do_disconnect (h);
810     return;
811   }
812 #if INSANE_STATISTICS
813   GNUNET_STATISTICS_update (h->stats,
814                             gettext_noop ("# Results received"),
815                             1,
816                             GNUNET_NO);
817 #endif
818   LOG (GNUNET_ERROR_TYPE_DEBUG,
819        "Received result %llu with type %u and size %u with key %s\n",
820        (unsigned long long) GNUNET_ntohll (dm->uid),
821        ntohl (dm->type),
822        ntohl (dm->size),
823        GNUNET_h2s (&dm->key));
824   rc = qe->qc.rc;
825   free_queue_entry (qe);
826   h->retry_time = GNUNET_TIME_UNIT_ZERO;
827   process_queue (h);
828   if (NULL != rc.proc)
829     rc.proc (rc.proc_cls,
830              &dm->key,
831              ntohl (dm->size),
832              &dm[1],
833              ntohl (dm->type),
834              ntohl (dm->priority),
835              ntohl (dm->anonymity),
836              ntohl (dm->replication),
837              GNUNET_TIME_absolute_ntoh (dm->expiration),
838              GNUNET_ntohll (dm->uid));
839 }
840
841
842 /**
843  * Type of a function to call when we receive a
844  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
845  *
846  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
847  * @param msg message received
848  */
849 static void
850 handle_data_end (void *cls,
851                  const struct GNUNET_MessageHeader *msg)
852 {
853   struct GNUNET_DATASTORE_Handle *h = cls;
854   struct GNUNET_DATASTORE_QueueEntry *qe;
855   struct ResultContext rc;
856
857   if (h->skip_next_messages > 0)
858   {
859     h->skip_next_messages--;
860     process_queue (h);
861     return;
862   }
863   qe = h->queue_head;
864   if (NULL == qe)
865   {
866     GNUNET_break (0);
867     do_disconnect (h);
868     return;
869   }
870   if (NULL != qe->env)
871   {
872     GNUNET_break (0);
873     do_disconnect (h);
874     return;
875   }
876   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
877   {
878     GNUNET_break (0);
879     do_disconnect (h);
880     return;
881   }
882   rc = qe->qc.rc;
883   free_queue_entry (qe);
884   LOG (GNUNET_ERROR_TYPE_DEBUG,
885        "Received end of result set, new queue size is %u\n",
886        h->queue_size);
887   h->retry_time = GNUNET_TIME_UNIT_ZERO;
888   h->result_count = 0;
889   process_queue (h);
890   /* signal end of iteration */
891   if (NULL != rc.proc)
892     rc.proc (rc.proc_cls,
893              NULL,
894              0,
895              NULL,
896              0,
897              0,
898              0,
899              0,
900              GNUNET_TIME_UNIT_ZERO_ABS,
901              0);
902 }
903
904
905 /**
906  * Try reconnecting to the datastore service.
907  *
908  * @param cls the `struct GNUNET_DATASTORE_Handle`
909  */
910 static void
911 try_reconnect (void *cls)
912 {
913   struct GNUNET_DATASTORE_Handle *h = cls;
914   struct GNUNET_MQ_MessageHandler handlers[] = {
915     GNUNET_MQ_hd_var_size (status,
916                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
917                            struct StatusMessage,
918                            h),
919     GNUNET_MQ_hd_var_size (data,
920                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
921                            struct DataMessage,
922                            h),
923     GNUNET_MQ_hd_fixed_size (data_end,
924                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
925                              struct GNUNET_MessageHeader,
926                              h),
927     GNUNET_MQ_handler_end ()
928   };
929
930   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
931   h->reconnect_task = NULL;
932   GNUNET_assert (NULL == h->mq);
933   h->mq = GNUNET_CLIENT_connect (h->cfg,
934                                  "datastore",
935                                  handlers,
936                                  &mq_error_handler,
937                                  h);
938   if (NULL == h->mq)
939     return;
940   GNUNET_STATISTICS_update (h->stats,
941                             gettext_noop ("# datastore connections (re)created"),
942                             1,
943                             GNUNET_NO);
944   LOG (GNUNET_ERROR_TYPE_DEBUG,
945        "Reconnected to DATASTORE\n");
946   process_queue (h);
947 }
948
949
950 /**
951  * Dummy continuation used to do nothing (but be non-zero).
952  *
953  * @param cls closure
954  * @param result result
955  * @param min_expiration expiration time
956  * @param emsg error message
957  */
958 static void
959 drop_status_cont (void *cls,
960                   int32_t result,
961                   struct GNUNET_TIME_Absolute min_expiration,
962                   const char *emsg)
963 {
964   /* do nothing */
965 }
966
967
968 /**
969  * Store an item in the datastore.  If the item is already present,
970  * the priorities are summed up and the higher expiration time and
971  * lower anonymity level is used.
972  *
973  * @param h handle to the datastore
974  * @param rid reservation ID to use (from "reserve"); use 0 if no
975  *            prior reservation was made
976  * @param key key for the value
977  * @param size number of bytes in data
978  * @param data content stored
979  * @param type type of the content
980  * @param priority priority of the content
981  * @param anonymity anonymity-level for the content
982  * @param replication how often should the content be replicated to other peers?
983  * @param expiration expiration time for the content
984  * @param queue_priority ranking of this request in the priority queue
985  * @param max_queue_size at what queue size should this request be dropped
986  *        (if other requests of higher priority are in the queue)
987  * @param cont continuation to call when done
988  * @param cont_cls closure for @a cont
989  * @return NULL if the entry was not queued, otherwise a handle that can be used to
990  *         cancel; note that even if NULL is returned, the callback will be invoked
991  *         (or rather, will already have been invoked)
992  */
993 struct GNUNET_DATASTORE_QueueEntry *
994 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
995                       uint32_t rid,
996                       const struct GNUNET_HashCode *key,
997                       size_t size,
998                       const void *data,
999                       enum GNUNET_BLOCK_Type type,
1000                       uint32_t priority,
1001                       uint32_t anonymity,
1002                       uint32_t replication,
1003                       struct GNUNET_TIME_Absolute expiration,
1004                       unsigned int queue_priority,
1005                       unsigned int max_queue_size,
1006                       GNUNET_DATASTORE_ContinuationWithStatus cont,
1007                       void *cont_cls)
1008 {
1009   struct GNUNET_DATASTORE_QueueEntry *qe;
1010   struct GNUNET_MQ_Envelope *env;
1011   struct DataMessage *dm;
1012   union QueueContext qc;
1013
1014   if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
1015   {
1016     GNUNET_break (0);
1017     return NULL;
1018   }
1019
1020   LOG (GNUNET_ERROR_TYPE_DEBUG,
1021        "Asked to put %u bytes of data under key `%s' for %s\n",
1022        size,
1023        GNUNET_h2s (key),
1024        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
1025                                                GNUNET_YES));
1026   env = GNUNET_MQ_msg_extra (dm,
1027                              size,
1028                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1029   dm->rid = htonl (rid);
1030   dm->size = htonl ((uint32_t) size);
1031   dm->type = htonl (type);
1032   dm->priority = htonl (priority);
1033   dm->anonymity = htonl (anonymity);
1034   dm->replication = htonl (replication);
1035   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1036   dm->key = *key;
1037   GNUNET_memcpy (&dm[1],
1038           data,
1039           size);
1040   qc.sc.cont = cont;
1041   qc.sc.cont_cls = cont_cls;
1042   qe = make_queue_entry (h,
1043                          env,
1044                          queue_priority,
1045                          max_queue_size,
1046                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1047                          &qc);
1048   if (NULL == qe)
1049   {
1050     LOG (GNUNET_ERROR_TYPE_DEBUG,
1051          "Could not create queue entry for PUT\n");
1052     return NULL;
1053   }
1054   GNUNET_STATISTICS_update (h->stats,
1055                             gettext_noop ("# PUT requests executed"),
1056                             1,
1057                             GNUNET_NO);
1058   process_queue (h);
1059   return qe;
1060 }
1061
1062
1063 /**
1064  * Reserve space in the datastore.  This function should be used
1065  * to avoid "out of space" failures during a longer sequence of "put"
1066  * operations (for example, when a file is being inserted).
1067  *
1068  * @param h handle to the datastore
1069  * @param amount how much space (in bytes) should be reserved (for content only)
1070  * @param entries how many entries will be created (to calculate per-entry overhead)
1071  * @param cont continuation to call when done; "success" will be set to
1072  *             a positive reservation value if space could be reserved.
1073  * @param cont_cls closure for @a cont
1074  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1075  *         cancel; note that even if NULL is returned, the callback will be invoked
1076  *         (or rather, will already have been invoked)
1077  */
1078 struct GNUNET_DATASTORE_QueueEntry *
1079 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1080                           uint64_t amount,
1081                           uint32_t entries,
1082                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1083                           void *cont_cls)
1084 {
1085   struct GNUNET_DATASTORE_QueueEntry *qe;
1086   struct GNUNET_MQ_Envelope *env;
1087   struct ReserveMessage *rm;
1088   union QueueContext qc;
1089
1090   if (NULL == cont)
1091     cont = &drop_status_cont;
1092   LOG (GNUNET_ERROR_TYPE_DEBUG,
1093        "Asked to reserve %llu bytes of data and %u entries\n",
1094        (unsigned long long) amount,
1095        (unsigned int) entries);
1096   env = GNUNET_MQ_msg (rm,
1097                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1098   rm->entries = htonl (entries);
1099   rm->amount = GNUNET_htonll (amount);
1100
1101   qc.sc.cont = cont;
1102   qc.sc.cont_cls = cont_cls;
1103   qe = make_queue_entry (h,
1104                          env,
1105                          UINT_MAX,
1106                          UINT_MAX,
1107                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1108                          &qc);
1109   if (NULL == qe)
1110   {
1111     LOG (GNUNET_ERROR_TYPE_DEBUG,
1112          "Could not create queue entry to reserve\n");
1113     return NULL;
1114   }
1115   GNUNET_STATISTICS_update (h->stats,
1116                             gettext_noop ("# RESERVE requests executed"),
1117                             1,
1118                             GNUNET_NO);
1119   process_queue (h);
1120   return qe;
1121 }
1122
1123
1124 /**
1125  * Signal that all of the data for which a reservation was made has
1126  * been stored and that whatever excess space might have been reserved
1127  * can now be released.
1128  *
1129  * @param h handle to the datastore
1130  * @param rid reservation ID (value of "success" in original continuation
1131  *        from the "reserve" function).
1132  * @param queue_priority ranking of this request in the priority queue
1133  * @param max_queue_size at what queue size should this request be dropped
1134  *        (if other requests of higher priority are in the queue)
1135  * @param queue_priority ranking of this request in the priority queue
1136  * @param max_queue_size at what queue size should this request be dropped
1137  *        (if other requests of higher priority are in the queue)
1138  * @param cont continuation to call when done
1139  * @param cont_cls closure for @a cont
1140  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1141  *         cancel; note that even if NULL is returned, the callback will be invoked
1142  *         (or rather, will already have been invoked)
1143  */
1144 struct GNUNET_DATASTORE_QueueEntry *
1145 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1146                                   uint32_t rid,
1147                                   unsigned int queue_priority,
1148                                   unsigned int max_queue_size,
1149                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1150                                   void *cont_cls)
1151 {
1152   struct GNUNET_DATASTORE_QueueEntry *qe;
1153   struct GNUNET_MQ_Envelope *env;
1154   struct ReleaseReserveMessage *rrm;
1155   union QueueContext qc;
1156
1157   if (NULL == cont)
1158     cont = &drop_status_cont;
1159   LOG (GNUNET_ERROR_TYPE_DEBUG,
1160        "Asked to release reserve %d\n",
1161        rid);
1162   env = GNUNET_MQ_msg (rrm,
1163                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1164   rrm->rid = htonl (rid);
1165   qc.sc.cont = cont;
1166   qc.sc.cont_cls = cont_cls;
1167   qe = make_queue_entry (h,
1168                          env,
1169                          queue_priority,
1170                          max_queue_size,
1171                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1172                          &qc);
1173   if (NULL == qe)
1174   {
1175     LOG (GNUNET_ERROR_TYPE_DEBUG,
1176          "Could not create queue entry to release reserve\n");
1177     return NULL;
1178   }
1179   GNUNET_STATISTICS_update (h->stats,
1180                             gettext_noop
1181                             ("# RELEASE RESERVE requests executed"), 1,
1182                             GNUNET_NO);
1183   process_queue (h);
1184   return qe;
1185 }
1186
1187
1188 /**
1189  * Explicitly remove some content from the database.
1190  * The @a cont continuation will be called with `status`
1191  * #GNUNET_OK" if content was removed, #GNUNET_NO
1192  * if no matching entry was found and #GNUNET_SYSERR
1193  * on all other types of errors.
1194  *
1195  * @param h handle to the datastore
1196  * @param key key for the value
1197  * @param size number of bytes in data
1198  * @param data content stored
1199  * @param queue_priority ranking of this request in the priority queue
1200  * @param max_queue_size at what queue size should this request be dropped
1201  *        (if other requests of higher priority are in the queue)
1202  * @param cont continuation to call when done
1203  * @param cont_cls closure for @a cont
1204  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1205  *         cancel; note that even if NULL is returned, the callback will be invoked
1206  *         (or rather, will already have been invoked)
1207  */
1208 struct GNUNET_DATASTORE_QueueEntry *
1209 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1210                          const struct GNUNET_HashCode *key,
1211                          size_t size,
1212                          const void *data,
1213                          unsigned int queue_priority,
1214                          unsigned int max_queue_size,
1215                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1216                          void *cont_cls)
1217 {
1218   struct GNUNET_DATASTORE_QueueEntry *qe;
1219   struct DataMessage *dm;
1220   struct GNUNET_MQ_Envelope *env;
1221   union QueueContext qc;
1222
1223   if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1224   {
1225     GNUNET_break (0);
1226     return NULL;
1227   }
1228   if (NULL == cont)
1229     cont = &drop_status_cont;
1230   LOG (GNUNET_ERROR_TYPE_DEBUG,
1231        "Asked to remove %u bytes under key `%s'\n",
1232        size,
1233        GNUNET_h2s (key));
1234   env = GNUNET_MQ_msg_extra (dm,
1235                              size,
1236                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1237   dm->size = htonl (size);
1238   dm->key = *key;
1239   GNUNET_memcpy (&dm[1],
1240           data,
1241           size);
1242
1243   qc.sc.cont = cont;
1244   qc.sc.cont_cls = cont_cls;
1245
1246   qe = make_queue_entry (h,
1247                          env,
1248                          queue_priority,
1249                          max_queue_size,
1250                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1251                          &qc);
1252   if (NULL == qe)
1253   {
1254     LOG (GNUNET_ERROR_TYPE_DEBUG,
1255          "Could not create queue entry for REMOVE\n");
1256     return NULL;
1257   }
1258   GNUNET_STATISTICS_update (h->stats,
1259                             gettext_noop ("# REMOVE requests executed"),
1260                             1,
1261                             GNUNET_NO);
1262   process_queue (h);
1263   return qe;
1264 }
1265
1266
1267
1268 /**
1269  * Get a random value from the datastore for content replication.
1270  * Returns a single, random value among those with the highest
1271  * replication score, lowering positive replication scores by one for
1272  * the chosen value (if only content with a replication score exists,
1273  * a random value is returned and replication scores are not changed).
1274  *
1275  * @param h handle to the datastore
1276  * @param queue_priority ranking of this request in the priority queue
1277  * @param max_queue_size at what queue size should this request be dropped
1278  *        (if other requests of higher priority are in the queue)
1279  * @param proc function to call on a random value; it
1280  *        will be called once with a value (if available)
1281  *        and always once with a value of NULL.
1282  * @param proc_cls closure for @a proc
1283  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1284  *         cancel
1285  */
1286 struct GNUNET_DATASTORE_QueueEntry *
1287 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1288                                       unsigned int queue_priority,
1289                                       unsigned int max_queue_size,
1290                                       GNUNET_DATASTORE_DatumProcessor proc,
1291                                       void *proc_cls)
1292 {
1293   struct GNUNET_DATASTORE_QueueEntry *qe;
1294   struct GNUNET_MQ_Envelope *env;
1295   struct GNUNET_MessageHeader *m;
1296   union QueueContext qc;
1297
1298   GNUNET_assert (NULL != proc);
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "Asked to get replication entry\n");
1301   env = GNUNET_MQ_msg (m,
1302                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1303   qc.rc.proc = proc;
1304   qc.rc.proc_cls = proc_cls;
1305   qe = make_queue_entry (h,
1306                          env,
1307                          queue_priority,
1308                          max_queue_size,
1309                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1310                          &qc);
1311   if (NULL == qe)
1312   {
1313     LOG (GNUNET_ERROR_TYPE_DEBUG,
1314          "Could not create queue entry for GET REPLICATION\n");
1315     return NULL;
1316   }
1317   GNUNET_STATISTICS_update (h->stats,
1318                             gettext_noop
1319                             ("# GET REPLICATION requests executed"), 1,
1320                             GNUNET_NO);
1321   process_queue (h);
1322   return qe;
1323 }
1324
1325
1326 /**
1327  * Get a single zero-anonymity value from the datastore.
1328  *
1329  * @param h handle to the datastore
1330  * @param next_uid return the result with lowest uid >= next_uid
1331  * @param queue_priority ranking of this request in the priority queue
1332  * @param max_queue_size at what queue size should this request be dropped
1333  *        (if other requests of higher priority are in the queue)
1334  * @param type allowed type for the operation (never zero)
1335  * @param proc function to call on a random value; it
1336  *        will be called once with a value (if available)
1337  *        or with NULL if none value exists.
1338  * @param proc_cls closure for @a proc
1339  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1340  *         cancel
1341  */
1342 struct GNUNET_DATASTORE_QueueEntry *
1343 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1344                                      uint64_t next_uid,
1345                                      unsigned int queue_priority,
1346                                      unsigned int max_queue_size,
1347                                      enum GNUNET_BLOCK_Type type,
1348                                      GNUNET_DATASTORE_DatumProcessor proc,
1349                                      void *proc_cls)
1350 {
1351   struct GNUNET_DATASTORE_QueueEntry *qe;
1352   struct GNUNET_MQ_Envelope *env;
1353   struct GetZeroAnonymityMessage *m;
1354   union QueueContext qc;
1355
1356   GNUNET_assert (NULL != proc);
1357   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1358   LOG (GNUNET_ERROR_TYPE_DEBUG,
1359        "Asked to get a zero-anonymity entry of type %d\n",
1360        type);
1361   env = GNUNET_MQ_msg (m,
1362                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1363   m->type = htonl ((uint32_t) type);
1364   m->next_uid = GNUNET_htonll (next_uid);
1365   qc.rc.proc = proc;
1366   qc.rc.proc_cls = proc_cls;
1367   qe = make_queue_entry (h,
1368                          env,
1369                          queue_priority,
1370                          max_queue_size,
1371                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1372                          &qc);
1373   if (NULL == qe)
1374   {
1375     LOG (GNUNET_ERROR_TYPE_DEBUG,
1376          "Could not create queue entry for zero-anonymity procation\n");
1377     return NULL;
1378   }
1379   GNUNET_STATISTICS_update (h->stats,
1380                             gettext_noop
1381                             ("# GET ZERO ANONYMITY requests executed"), 1,
1382                             GNUNET_NO);
1383   process_queue (h);
1384   return qe;
1385 }
1386
1387
1388 /**
1389  * Get a result for a particular key from the datastore.  The processor
1390  * will only be called once.
1391  *
1392  * @param h handle to the datastore
1393  * @param next_uid return the result with lowest uid >= next_uid
1394  * @param random if true, return a random result instead of using next_uid
1395  * @param key maybe NULL (to match all entries)
1396  * @param type desired type, 0 for any
1397  * @param queue_priority ranking of this request in the priority queue
1398  * @param max_queue_size at what queue size should this request be dropped
1399  *        (if other requests of higher priority are in the queue)
1400  * @param proc function to call on each matching value;
1401  *        will be called once with a NULL value at the end
1402  * @param proc_cls closure for @a proc
1403  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1404  *         cancel
1405  */
1406 struct GNUNET_DATASTORE_QueueEntry *
1407 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1408                           uint64_t next_uid,
1409                           bool random,
1410                           const struct GNUNET_HashCode *key,
1411                           enum GNUNET_BLOCK_Type type,
1412                           unsigned int queue_priority,
1413                           unsigned int max_queue_size,
1414                           GNUNET_DATASTORE_DatumProcessor proc,
1415                           void *proc_cls)
1416 {
1417   struct GNUNET_DATASTORE_QueueEntry *qe;
1418   struct GNUNET_MQ_Envelope *env;
1419   struct GetKeyMessage *gkm;
1420   struct GetMessage *gm;
1421   union QueueContext qc;
1422
1423   GNUNET_assert (NULL != proc);
1424   LOG (GNUNET_ERROR_TYPE_DEBUG,
1425        "Asked to look for data of type %u under key `%s'\n",
1426        (unsigned int) type,
1427        GNUNET_h2s (key));
1428   if (NULL == key)
1429   {
1430     env = GNUNET_MQ_msg (gm,
1431                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1432     gm->type = htonl (type);
1433     gm->next_uid = GNUNET_htonll (next_uid);
1434     gm->random = random;
1435   }
1436   else
1437   {
1438     env = GNUNET_MQ_msg (gkm,
1439                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1440     gkm->type = htonl (type);
1441     gkm->next_uid = GNUNET_htonll (next_uid);
1442     gkm->random = random;
1443     gkm->key = *key;
1444   }
1445   qc.rc.proc = proc;
1446   qc.rc.proc_cls = proc_cls;
1447   qe = make_queue_entry (h,
1448                          env,
1449                          queue_priority,
1450                          max_queue_size,
1451                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1452                          &qc);
1453   if (NULL == qe)
1454   {
1455     LOG (GNUNET_ERROR_TYPE_DEBUG,
1456          "Could not queue request for `%s'\n",
1457          GNUNET_h2s (key));
1458     return NULL;
1459   }
1460 #if INSANE_STATISTICS
1461   GNUNET_STATISTICS_update (h->stats,
1462                             gettext_noop ("# GET requests executed"),
1463                             1,
1464                             GNUNET_NO);
1465 #endif
1466   process_queue (h);
1467   return qe;
1468 }
1469
1470
1471 /**
1472  * Cancel a datastore operation.  The final callback from the
1473  * operation must not have been done yet.
1474  *
1475  * @param qe operation to cancel
1476  */
1477 void
1478 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1479 {
1480   struct GNUNET_DATASTORE_Handle *h = qe->h;
1481
1482   LOG (GNUNET_ERROR_TYPE_DEBUG,
1483        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1484        qe,
1485        NULL == qe->env,
1486        h->queue_head == qe);
1487   if (NULL == qe->env)
1488   {
1489     free_queue_entry (qe);
1490     h->skip_next_messages++;
1491     return;
1492   }
1493   free_queue_entry (qe);
1494   process_queue (h);
1495 }
1496
1497
1498 /* end of datastore_api.c */