first batch of license fixes (boring)
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14 */
15
16 /**
17  * @file datastore/datastore_api.c
18  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
19  *        a priority queue for requests
20  * @author Christian Grothoff
21  */
22 #include "platform.h"
23 #include "gnunet_arm_service.h"
24 #include "gnunet_constants.h"
25 #include "gnunet_datastore_service.h"
26 #include "gnunet_statistics_service.h"
27 #include "datastore.h"
28
29 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
30
31 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
32
33 /**
34  * Collect an instane number of statistics?  May cause excessive IPC.
35  */
36 #define INSANE_STATISTICS GNUNET_NO
37
38 /**
39  * If a client stopped asking for more results, how many more do
40  * we receive from the DB before killing the connection?  Trade-off
41  * between re-doing TCP handshakes and (needlessly) receiving
42  * useless results.
43  */
44 #define MAX_EXCESS_RESULTS 8
45
46 /**
47  * Context for processing status messages.
48  */
49 struct StatusContext
50 {
51   /**
52    * Continuation to call with the status.
53    */
54   GNUNET_DATASTORE_ContinuationWithStatus cont;
55
56   /**
57    * Closure for @e cont.
58    */
59   void *cont_cls;
60
61 };
62
63
64 /**
65  * Context for processing result messages.
66  */
67 struct ResultContext
68 {
69   /**
70    * Function to call with the result.
71    */
72   GNUNET_DATASTORE_DatumProcessor proc;
73
74   /**
75    * Closure for @e proc.
76    */
77   void *proc_cls;
78
79 };
80
81
82 /**
83  *  Context for a queue operation.
84  */
85 union QueueContext
86 {
87
88   struct StatusContext sc;
89
90   struct ResultContext rc;
91
92 };
93
94
95 /**
96  * Entry in our priority queue.
97  */
98 struct GNUNET_DATASTORE_QueueEntry
99 {
100
101   /**
102    * This is a linked list.
103    */
104   struct GNUNET_DATASTORE_QueueEntry *next;
105
106   /**
107    * This is a linked list.
108    */
109   struct GNUNET_DATASTORE_QueueEntry *prev;
110
111   /**
112    * Handle to the master context.
113    */
114   struct GNUNET_DATASTORE_Handle *h;
115
116   /**
117    * Function to call after transmission of the request.
118    */
119   GNUNET_DATASTORE_ContinuationWithStatus cont;
120
121   /**
122    * Closure for @e cont.
123    */
124   void *cont_cls;
125
126   /**
127    * Context for the operation.
128    */
129   union QueueContext qc;
130
131   /**
132    * Envelope of the request to transmit, NULL after
133    * transmission.
134    */
135   struct GNUNET_MQ_Envelope *env;
136
137   /**
138    * Task we run if this entry stalls the queue and we
139    * need to warn the user.
140    */
141   struct GNUNET_SCHEDULER_Task *delay_warn_task;
142
143   /**
144    * Priority in the queue.
145    */
146   unsigned int priority;
147
148   /**
149    * Maximum allowed length of queue (otherwise
150    * this request should be discarded).
151    */
152   unsigned int max_queue;
153
154   /**
155    * Expected response type.
156    */
157   uint16_t response_type;
158
159 };
160
161
162 /**
163  * Handle to the datastore service.
164  */
165 struct GNUNET_DATASTORE_Handle
166 {
167
168   /**
169    * Our configuration.
170    */
171   const struct GNUNET_CONFIGURATION_Handle *cfg;
172
173   /**
174    * Current connection to the datastore service.
175    */
176   struct GNUNET_MQ_Handle *mq;
177
178   /**
179    * Handle for statistics.
180    */
181   struct GNUNET_STATISTICS_Handle *stats;
182
183   /**
184    * Current head of priority queue.
185    */
186   struct GNUNET_DATASTORE_QueueEntry *queue_head;
187
188   /**
189    * Current tail of priority queue.
190    */
191   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
192
193   /**
194    * Task for trying to reconnect.
195    */
196   struct GNUNET_SCHEDULER_Task *reconnect_task;
197
198   /**
199    * How quickly should we retry?  Used for exponential back-off on
200    * connect-errors.
201    */
202   struct GNUNET_TIME_Relative retry_time;
203
204   /**
205    * Number of entries in the queue.
206    */
207   unsigned int queue_size;
208
209   /**
210    * Number of results we're receiving for the current query
211    * after application stopped to care.  Used to determine when
212    * to reset the connection.
213    */
214   unsigned int result_count;
215
216   /**
217    * We should ignore the next message(s) from the service.
218    */
219   unsigned int skip_next_messages;
220
221 };
222
223
224 /**
225  * Try reconnecting to the datastore service.
226  *
227  * @param cls the `struct GNUNET_DATASTORE_Handle`
228  */
229 static void
230 try_reconnect (void *cls);
231
232
233 /**
234  * Disconnect from the service and then try reconnecting to the datastore service
235  * after some delay.
236  *
237  * @param h handle to datastore to disconnect and reconnect
238  */
239 static void
240 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
241 {
242   if (NULL == h->mq)
243   {
244     GNUNET_break (0);
245     return;
246   }
247   GNUNET_MQ_destroy (h->mq);
248   h->mq = NULL;
249   h->skip_next_messages = 0;
250   h->reconnect_task
251     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
252                                     &try_reconnect,
253                                     h);
254 }
255
256
257 /**
258  * Free a queue entry.  Removes the given entry from the
259  * queue and releases associated resources.  Does NOT
260  * call the callback.
261  *
262  * @param qe entry to free.
263  */
264 static void
265 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
266 {
267   struct GNUNET_DATASTORE_Handle *h = qe->h;
268
269   GNUNET_CONTAINER_DLL_remove (h->queue_head,
270                                h->queue_tail,
271                                qe);
272   h->queue_size--;
273   if (NULL != qe->env)
274     GNUNET_MQ_discard (qe->env);
275   if (NULL != qe->delay_warn_task)
276     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
277   GNUNET_free (qe);
278 }
279
280
281 /**
282  * Task that logs an error after some time.
283  *
284  * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
285  */
286 static void
287 delay_warning (void *cls)
288 {
289   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
290
291   qe->delay_warn_task = NULL;
292   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
293               "Request %p of type %u at head of datastore queue for more than %s\n",
294               qe,
295               (unsigned int) qe->response_type,
296               GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
297                                                       GNUNET_YES));
298   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
299                                                       &delay_warning,
300                                                       qe);
301 }
302
303
304 /**
305  * Handle error in sending drop request to datastore.
306  *
307  * @param cls closure with the datastore handle
308  * @param error error code
309  */
310 static void
311 mq_error_handler (void *cls,
312                   enum GNUNET_MQ_Error error)
313 {
314   struct GNUNET_DATASTORE_Handle *h = cls;
315   struct GNUNET_DATASTORE_QueueEntry *qe;
316
317   LOG (GNUNET_ERROR_TYPE_DEBUG,
318        "MQ error, reconnecting to DATASTORE\n");
319   do_disconnect (h);
320   qe = h->queue_head;
321   if (NULL == qe)
322     return;
323   if (NULL != qe->delay_warn_task)
324   {
325     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
326     qe->delay_warn_task = NULL;
327   }
328   if (NULL == qe->env)
329   {
330     union QueueContext qc = qe->qc;
331     uint16_t rt = qe->response_type;
332
333     LOG (GNUNET_ERROR_TYPE_DEBUG,
334          "Failed to receive response from database.\n");
335     free_queue_entry (qe);
336     switch (rt)
337     {
338     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
339       if (NULL != qc.sc.cont)
340         qc.sc.cont (qc.sc.cont_cls,
341                     GNUNET_SYSERR,
342                     GNUNET_TIME_UNIT_ZERO_ABS,
343                     _("DATASTORE disconnected"));
344       break;
345     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
346       if (NULL != qc.rc.proc)
347         qc.rc.proc (qc.rc.proc_cls,
348                     NULL,
349                     0,
350                     NULL,
351                     0,
352                     0,
353                     0,
354                     0,
355                     GNUNET_TIME_UNIT_ZERO_ABS,
356                     0);
357       break;
358     default:
359       GNUNET_break (0);
360     }
361   }
362 }
363
364
365 /**
366  * Connect to the datastore service.
367  *
368  * @param cfg configuration to use
369  * @return handle to use to access the service
370  */
371 struct GNUNET_DATASTORE_Handle *
372 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
373 {
374   struct GNUNET_DATASTORE_Handle *h;
375
376   LOG (GNUNET_ERROR_TYPE_DEBUG,
377        "Establishing DATASTORE connection!\n");
378   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
379   h->cfg = cfg;
380   try_reconnect (h);
381   if (NULL == h->mq)
382   {
383     GNUNET_free (h);
384     return NULL;
385   }
386   h->stats = GNUNET_STATISTICS_create ("datastore-api",
387                                        cfg);
388   return h;
389 }
390
391
392 /**
393  * Task used by to disconnect from the datastore after
394  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
395  *
396  * @param cls the datastore handle
397  */
398 static void
399 disconnect_after_drop (void *cls)
400 {
401   struct GNUNET_DATASTORE_Handle *h = cls;
402
403   LOG (GNUNET_ERROR_TYPE_DEBUG,
404        "Drop sent, disconnecting\n");
405   GNUNET_DATASTORE_disconnect (h,
406                                GNUNET_NO);
407 }
408
409
410 /**
411  * Handle error in sending drop request to datastore.
412  *
413  * @param cls closure with the datastore handle
414  * @param error error code
415  */
416 static void
417 disconnect_on_mq_error (void *cls,
418                         enum GNUNET_MQ_Error error)
419 {
420   struct GNUNET_DATASTORE_Handle *h = cls;
421
422   LOG (GNUNET_ERROR_TYPE_ERROR,
423        "Failed to ask datastore to drop tables\n");
424   GNUNET_DATASTORE_disconnect (h,
425                                GNUNET_NO);
426 }
427
428
429 /**
430  * Disconnect from the datastore service (and free
431  * associated resources).
432  *
433  * @param h handle to the datastore
434  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
435  */
436 void
437 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
438                              int drop)
439 {
440   struct GNUNET_DATASTORE_QueueEntry *qe;
441
442   LOG (GNUNET_ERROR_TYPE_DEBUG,
443        "Datastore disconnect\n");
444   if (NULL != h->mq)
445   {
446     GNUNET_MQ_destroy (h->mq);
447     h->mq = NULL;
448   }
449   if (NULL != h->reconnect_task)
450   {
451     GNUNET_SCHEDULER_cancel (h->reconnect_task);
452     h->reconnect_task = NULL;
453   }
454   while (NULL != (qe = h->queue_head))
455   {
456     switch (qe->response_type)
457     {
458     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
459       if (NULL != qe->qc.sc.cont)
460         qe->qc.sc.cont (qe->qc.sc.cont_cls,
461                         GNUNET_SYSERR,
462                         GNUNET_TIME_UNIT_ZERO_ABS,
463                         _("Disconnected from DATASTORE"));
464       break;
465     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
466       if (NULL != qe->qc.rc.proc)
467         qe->qc.rc.proc (qe->qc.rc.proc_cls,
468                         NULL,
469                         0,
470                         NULL,
471                         0,
472                         0,
473                         0,
474                         0,
475                         GNUNET_TIME_UNIT_ZERO_ABS,
476                         0);
477       break;
478     default:
479       GNUNET_break (0);
480     }
481     free_queue_entry (qe);
482   }
483   if (GNUNET_YES == drop)
484   {
485     LOG (GNUNET_ERROR_TYPE_DEBUG,
486          "Re-connecting to issue DROP!\n");
487     GNUNET_assert (NULL == h->mq);
488     h->mq = GNUNET_CLIENT_connect (h->cfg,
489                                    "datastore",
490                                    NULL,
491                                    &disconnect_on_mq_error,
492                                    h);
493     if (NULL != h->mq)
494     {
495       struct GNUNET_MessageHeader *hdr;
496       struct GNUNET_MQ_Envelope *env;
497
498       env = GNUNET_MQ_msg (hdr,
499                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
500       GNUNET_MQ_notify_sent (env,
501                              &disconnect_after_drop,
502                              h);
503       GNUNET_MQ_send (h->mq,
504                       env);
505       return;
506     }
507     GNUNET_break (0);
508   }
509   GNUNET_STATISTICS_destroy (h->stats,
510                              GNUNET_NO);
511   h->stats = NULL;
512   GNUNET_free (h);
513 }
514
515
516 /**
517  * Create a new entry for our priority queue (and possibly discard other entires if
518  * the queue is getting too long).
519  *
520  * @param h handle to the datastore
521  * @param env envelope with the message to queue
522  * @param queue_priority priority of the entry
523  * @param max_queue_size at what queue size should this request be dropped
524  *        (if other requests of higher priority are in the queue)
525  * @param expected_type which type of response do we expect,
526  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
527  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
528  * @param qc client context (NOT a closure for @a response_proc)
529  * @return NULL if the queue is full
530  */
531 static struct GNUNET_DATASTORE_QueueEntry *
532 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
533                   struct GNUNET_MQ_Envelope *env,
534                   unsigned int queue_priority,
535                   unsigned int max_queue_size,
536                   uint16_t expected_type,
537                   const union QueueContext *qc)
538 {
539   struct GNUNET_DATASTORE_QueueEntry *qe;
540   struct GNUNET_DATASTORE_QueueEntry *pos;
541   unsigned int c;
542
543   if ( (NULL != h->queue_tail) &&
544        (h->queue_tail->priority >= queue_priority) )
545   {
546     c = h->queue_size;
547     pos = NULL;
548   }
549   else
550   {
551     c = 0;
552     pos = h->queue_head;
553   }
554   while ( (NULL != pos) &&
555           (c < max_queue_size) &&
556           (pos->priority >= queue_priority) )
557   {
558     c++;
559     pos = pos->next;
560   }
561   if (c >= max_queue_size)
562   {
563     GNUNET_STATISTICS_update (h->stats,
564                               gettext_noop ("# queue overflows"),
565                               1,
566                               GNUNET_NO);
567     GNUNET_MQ_discard (env);
568     return NULL;
569   }
570   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
571   qe->h = h;
572   qe->env = env;
573   qe->response_type = expected_type;
574   qe->qc = *qc;
575   qe->priority = queue_priority;
576   qe->max_queue = max_queue_size;
577   if (NULL == pos)
578   {
579     /* append at the tail */
580     pos = h->queue_tail;
581   }
582   else
583   {
584     pos = pos->prev;
585     /* do not insert at HEAD if HEAD query was already
586      * transmitted and we are still receiving replies! */
587     if ( (NULL == pos) &&
588          (NULL == h->queue_head->env) )
589       pos = h->queue_head;
590   }
591   c++;
592 #if INSANE_STATISTICS
593   GNUNET_STATISTICS_update (h->stats,
594                             gettext_noop ("# queue entries created"),
595                             1,
596                             GNUNET_NO);
597 #endif
598   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
599                                      h->queue_tail,
600                                      pos,
601                                      qe);
602   h->queue_size++;
603   return qe;
604 }
605
606
607 /**
608  * Process entries in the queue (or do nothing if we are already
609  * doing so).
610  *
611  * @param h handle to the datastore
612  */
613 static void
614 process_queue (struct GNUNET_DATASTORE_Handle *h)
615 {
616   struct GNUNET_DATASTORE_QueueEntry *qe;
617
618   if (NULL == (qe = h->queue_head))
619   {
620     /* no entry in queue */
621     LOG (GNUNET_ERROR_TYPE_DEBUG,
622          "Queue empty\n");
623     return;
624   }
625   if (NULL == qe->env)
626   {
627     /* waiting for replies */
628     LOG (GNUNET_ERROR_TYPE_DEBUG,
629          "Head request already transmitted\n");
630     return;
631   }
632   if (NULL == h->mq)
633   {
634     /* waiting for reconnect */
635     LOG (GNUNET_ERROR_TYPE_DEBUG,
636          "Not connected\n");
637     return;
638   }
639   GNUNET_assert (NULL == qe->delay_warn_task);
640   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
641                                                       &delay_warning,
642                                                       qe);
643   GNUNET_MQ_send (h->mq,
644                   qe->env);
645   qe->env = NULL;
646 }
647
648
649 /**
650  * Get the entry at the head of the message queue.
651  *
652  * @param h handle to the datastore
653  * @param response_type the expected response type
654  * @return the queue entry
655  */
656 static struct GNUNET_DATASTORE_QueueEntry *
657 get_queue_head (struct GNUNET_DATASTORE_Handle *h,
658                 uint16_t response_type)
659 {
660   struct GNUNET_DATASTORE_QueueEntry *qe;
661
662   if (h->skip_next_messages > 0)
663   {
664     h->skip_next_messages--;
665     process_queue (h);
666     return NULL;
667   }
668   qe = h->queue_head;
669   if (NULL == qe)
670   {
671     GNUNET_break (0);
672     do_disconnect (h);
673     return NULL;
674   }
675   if (NULL != qe->env)
676   {
677     GNUNET_break (0);
678     do_disconnect (h);
679     return NULL;
680   }
681   if (response_type != qe->response_type)
682   {
683     GNUNET_break (0);
684     do_disconnect (h);
685     return NULL;
686   }
687   return qe;
688 }
689
690
691 /**
692  * Function called to check status message from the service.
693  *
694  * @param cls closure
695  * @param sm status message received
696  * @return #GNUNET_OK if the message is well-formed
697  */
698 static int
699 check_status (void *cls,
700               const struct StatusMessage *sm)
701 {
702   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
703   int32_t status = ntohl (sm->status);
704
705   if (msize > 0)
706   {
707     const char *emsg = (const char *) &sm[1];
708
709     if ('\0' != emsg[msize - 1])
710     {
711       GNUNET_break (0);
712       return GNUNET_SYSERR;
713     }
714   }
715   else if (GNUNET_SYSERR == status)
716   {
717     GNUNET_break (0);
718     return GNUNET_SYSERR;
719   }
720   return GNUNET_OK;
721 }
722
723
724 /**
725  * Function called to handle status message from the service.
726  *
727  * @param cls closure
728  * @param sm status message received
729  */
730 static void
731 handle_status (void *cls,
732                const struct StatusMessage *sm)
733 {
734   struct GNUNET_DATASTORE_Handle *h = cls;
735   struct GNUNET_DATASTORE_QueueEntry *qe;
736   struct StatusContext rc;
737   const char *emsg;
738   int32_t status = ntohl (sm->status);
739
740   qe = get_queue_head (h,
741                        GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
742   if (NULL == qe)
743     return;
744   rc = qe->qc.sc;
745   free_queue_entry (qe);
746   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
747     emsg = (const char *) &sm[1];
748   else
749     emsg = NULL;
750   LOG (GNUNET_ERROR_TYPE_DEBUG,
751        "Received status %d/%s\n",
752        (int) status,
753        emsg);
754   GNUNET_STATISTICS_update (h->stats,
755                             gettext_noop ("# status messages received"),
756                             1,
757                             GNUNET_NO);
758   h->retry_time = GNUNET_TIME_UNIT_ZERO;
759   process_queue (h);
760   if (NULL != rc.cont)
761     rc.cont (rc.cont_cls,
762              status,
763              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
764              emsg);
765 }
766
767
768 /**
769  * Check data message we received from the service.
770  *
771  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
772  * @param dm message received
773  */
774 static int
775 check_data (void *cls,
776             const struct DataMessage *dm)
777 {
778   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
779
780   if (msize != ntohl (dm->size))
781   {
782     GNUNET_break (0);
783     return GNUNET_SYSERR;
784   }
785   return GNUNET_OK;
786 }
787
788
789 /**
790  * Handle data message we got from the service.
791  *
792  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
793  * @param dm message received
794  */
795 static void
796 handle_data (void *cls,
797              const struct DataMessage *dm)
798 {
799   struct GNUNET_DATASTORE_Handle *h = cls;
800   struct GNUNET_DATASTORE_QueueEntry *qe;
801   struct ResultContext rc;
802
803   qe = get_queue_head (h,
804                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
805   if (NULL == qe)
806     return;
807 #if INSANE_STATISTICS
808   GNUNET_STATISTICS_update (h->stats,
809                             gettext_noop ("# Results received"),
810                             1,
811                             GNUNET_NO);
812 #endif
813   LOG (GNUNET_ERROR_TYPE_DEBUG,
814        "Received result %llu with type %u and size %u with key %s\n",
815        (unsigned long long) GNUNET_ntohll (dm->uid),
816        ntohl (dm->type),
817        ntohl (dm->size),
818        GNUNET_h2s (&dm->key));
819   rc = qe->qc.rc;
820   free_queue_entry (qe);
821   h->retry_time = GNUNET_TIME_UNIT_ZERO;
822   process_queue (h);
823   if (NULL != rc.proc)
824     rc.proc (rc.proc_cls,
825              &dm->key,
826              ntohl (dm->size),
827              &dm[1],
828              ntohl (dm->type),
829              ntohl (dm->priority),
830              ntohl (dm->anonymity),
831              ntohl (dm->replication),
832              GNUNET_TIME_absolute_ntoh (dm->expiration),
833              GNUNET_ntohll (dm->uid));
834 }
835
836
837 /**
838  * Type of a function to call when we receive a
839  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
840  *
841  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
842  * @param msg message received
843  */
844 static void
845 handle_data_end (void *cls,
846                  const struct GNUNET_MessageHeader *msg)
847 {
848   struct GNUNET_DATASTORE_Handle *h = cls;
849   struct GNUNET_DATASTORE_QueueEntry *qe;
850   struct ResultContext rc;
851
852   qe = get_queue_head (h,
853                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
854   if (NULL == qe)
855     return;
856   rc = qe->qc.rc;
857   free_queue_entry (qe);
858   LOG (GNUNET_ERROR_TYPE_DEBUG,
859        "Received end of result set, new queue size is %u\n",
860        h->queue_size);
861   h->retry_time = GNUNET_TIME_UNIT_ZERO;
862   h->result_count = 0;
863   process_queue (h);
864   /* signal end of iteration */
865   if (NULL != rc.proc)
866     rc.proc (rc.proc_cls,
867              NULL,
868              0,
869              NULL,
870              0,
871              0,
872              0,
873              0,
874              GNUNET_TIME_UNIT_ZERO_ABS,
875              0);
876 }
877
878
879 /**
880  * Try reconnecting to the datastore service.
881  *
882  * @param cls the `struct GNUNET_DATASTORE_Handle`
883  */
884 static void
885 try_reconnect (void *cls)
886 {
887   struct GNUNET_DATASTORE_Handle *h = cls;
888   struct GNUNET_MQ_MessageHandler handlers[] = {
889     GNUNET_MQ_hd_var_size (status,
890                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
891                            struct StatusMessage,
892                            h),
893     GNUNET_MQ_hd_var_size (data,
894                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
895                            struct DataMessage,
896                            h),
897     GNUNET_MQ_hd_fixed_size (data_end,
898                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
899                              struct GNUNET_MessageHeader,
900                              h),
901     GNUNET_MQ_handler_end ()
902   };
903
904   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
905   h->reconnect_task = NULL;
906   GNUNET_assert (NULL == h->mq);
907   h->mq = GNUNET_CLIENT_connect (h->cfg,
908                                  "datastore",
909                                  handlers,
910                                  &mq_error_handler,
911                                  h);
912   if (NULL == h->mq)
913     return;
914   GNUNET_STATISTICS_update (h->stats,
915                             gettext_noop ("# datastore connections (re)created"),
916                             1,
917                             GNUNET_NO);
918   LOG (GNUNET_ERROR_TYPE_DEBUG,
919        "Reconnected to DATASTORE\n");
920   process_queue (h);
921 }
922
923
924 /**
925  * Dummy continuation used to do nothing (but be non-zero).
926  *
927  * @param cls closure
928  * @param result result
929  * @param min_expiration expiration time
930  * @param emsg error message
931  */
932 static void
933 drop_status_cont (void *cls,
934                   int32_t result,
935                   struct GNUNET_TIME_Absolute min_expiration,
936                   const char *emsg)
937 {
938   /* do nothing */
939 }
940
941
942 /**
943  * Store an item in the datastore.  If the item is already present,
944  * the priorities are summed up and the higher expiration time and
945  * lower anonymity level is used.
946  *
947  * @param h handle to the datastore
948  * @param rid reservation ID to use (from "reserve"); use 0 if no
949  *            prior reservation was made
950  * @param key key for the value
951  * @param size number of bytes in data
952  * @param data content stored
953  * @param type type of the content
954  * @param priority priority of the content
955  * @param anonymity anonymity-level for the content
956  * @param replication how often should the content be replicated to other peers?
957  * @param expiration expiration time for the content
958  * @param queue_priority ranking of this request in the priority queue
959  * @param max_queue_size at what queue size should this request be dropped
960  *        (if other requests of higher priority are in the queue)
961  * @param cont continuation to call when done
962  * @param cont_cls closure for @a cont
963  * @return NULL if the entry was not queued, otherwise a handle that can be used to
964  *         cancel; note that even if NULL is returned, the callback will be invoked
965  *         (or rather, will already have been invoked)
966  */
967 struct GNUNET_DATASTORE_QueueEntry *
968 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
969                       uint32_t rid,
970                       const struct GNUNET_HashCode *key,
971                       size_t size,
972                       const void *data,
973                       enum GNUNET_BLOCK_Type type,
974                       uint32_t priority,
975                       uint32_t anonymity,
976                       uint32_t replication,
977                       struct GNUNET_TIME_Absolute expiration,
978                       unsigned int queue_priority,
979                       unsigned int max_queue_size,
980                       GNUNET_DATASTORE_ContinuationWithStatus cont,
981                       void *cont_cls)
982 {
983   struct GNUNET_DATASTORE_QueueEntry *qe;
984   struct GNUNET_MQ_Envelope *env;
985   struct DataMessage *dm;
986   union QueueContext qc;
987
988   if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
989   {
990     GNUNET_break (0);
991     return NULL;
992   }
993
994   LOG (GNUNET_ERROR_TYPE_DEBUG,
995        "Asked to put %u bytes of data under key `%s' for %s\n",
996        size,
997        GNUNET_h2s (key),
998        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
999                                                GNUNET_YES));
1000   env = GNUNET_MQ_msg_extra (dm,
1001                              size,
1002                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1003   dm->rid = htonl (rid);
1004   dm->size = htonl ((uint32_t) size);
1005   dm->type = htonl (type);
1006   dm->priority = htonl (priority);
1007   dm->anonymity = htonl (anonymity);
1008   dm->replication = htonl (replication);
1009   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1010   dm->key = *key;
1011   GNUNET_memcpy (&dm[1],
1012           data,
1013           size);
1014   qc.sc.cont = cont;
1015   qc.sc.cont_cls = cont_cls;
1016   qe = make_queue_entry (h,
1017                          env,
1018                          queue_priority,
1019                          max_queue_size,
1020                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1021                          &qc);
1022   if (NULL == qe)
1023   {
1024     LOG (GNUNET_ERROR_TYPE_DEBUG,
1025          "Could not create queue entry for PUT\n");
1026     return NULL;
1027   }
1028   GNUNET_STATISTICS_update (h->stats,
1029                             gettext_noop ("# PUT requests executed"),
1030                             1,
1031                             GNUNET_NO);
1032   process_queue (h);
1033   return qe;
1034 }
1035
1036
1037 /**
1038  * Reserve space in the datastore.  This function should be used
1039  * to avoid "out of space" failures during a longer sequence of "put"
1040  * operations (for example, when a file is being inserted).
1041  *
1042  * @param h handle to the datastore
1043  * @param amount how much space (in bytes) should be reserved (for content only)
1044  * @param entries how many entries will be created (to calculate per-entry overhead)
1045  * @param cont continuation to call when done; "success" will be set to
1046  *             a positive reservation value if space could be reserved.
1047  * @param cont_cls closure for @a cont
1048  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1049  *         cancel; note that even if NULL is returned, the callback will be invoked
1050  *         (or rather, will already have been invoked)
1051  */
1052 struct GNUNET_DATASTORE_QueueEntry *
1053 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1054                           uint64_t amount,
1055                           uint32_t entries,
1056                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1057                           void *cont_cls)
1058 {
1059   struct GNUNET_DATASTORE_QueueEntry *qe;
1060   struct GNUNET_MQ_Envelope *env;
1061   struct ReserveMessage *rm;
1062   union QueueContext qc;
1063
1064   if (NULL == cont)
1065     cont = &drop_status_cont;
1066   LOG (GNUNET_ERROR_TYPE_DEBUG,
1067        "Asked to reserve %llu bytes of data and %u entries\n",
1068        (unsigned long long) amount,
1069        (unsigned int) entries);
1070   env = GNUNET_MQ_msg (rm,
1071                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1072   rm->entries = htonl (entries);
1073   rm->amount = GNUNET_htonll (amount);
1074
1075   qc.sc.cont = cont;
1076   qc.sc.cont_cls = cont_cls;
1077   qe = make_queue_entry (h,
1078                          env,
1079                          UINT_MAX,
1080                          UINT_MAX,
1081                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1082                          &qc);
1083   if (NULL == qe)
1084   {
1085     LOG (GNUNET_ERROR_TYPE_DEBUG,
1086          "Could not create queue entry to reserve\n");
1087     return NULL;
1088   }
1089   GNUNET_STATISTICS_update (h->stats,
1090                             gettext_noop ("# RESERVE requests executed"),
1091                             1,
1092                             GNUNET_NO);
1093   process_queue (h);
1094   return qe;
1095 }
1096
1097
1098 /**
1099  * Signal that all of the data for which a reservation was made has
1100  * been stored and that whatever excess space might have been reserved
1101  * can now be released.
1102  *
1103  * @param h handle to the datastore
1104  * @param rid reservation ID (value of "success" in original continuation
1105  *        from the "reserve" function).
1106  * @param queue_priority ranking of this request in the priority queue
1107  * @param max_queue_size at what queue size should this request be dropped
1108  *        (if other requests of higher priority are in the queue)
1109  * @param queue_priority ranking of this request in the priority queue
1110  * @param max_queue_size at what queue size should this request be dropped
1111  *        (if other requests of higher priority are in the queue)
1112  * @param cont continuation to call when done
1113  * @param cont_cls closure for @a cont
1114  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1115  *         cancel; note that even if NULL is returned, the callback will be invoked
1116  *         (or rather, will already have been invoked)
1117  */
1118 struct GNUNET_DATASTORE_QueueEntry *
1119 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1120                                   uint32_t rid,
1121                                   unsigned int queue_priority,
1122                                   unsigned int max_queue_size,
1123                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1124                                   void *cont_cls)
1125 {
1126   struct GNUNET_DATASTORE_QueueEntry *qe;
1127   struct GNUNET_MQ_Envelope *env;
1128   struct ReleaseReserveMessage *rrm;
1129   union QueueContext qc;
1130
1131   if (NULL == cont)
1132     cont = &drop_status_cont;
1133   LOG (GNUNET_ERROR_TYPE_DEBUG,
1134        "Asked to release reserve %d\n",
1135        rid);
1136   env = GNUNET_MQ_msg (rrm,
1137                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1138   rrm->rid = htonl (rid);
1139   qc.sc.cont = cont;
1140   qc.sc.cont_cls = cont_cls;
1141   qe = make_queue_entry (h,
1142                          env,
1143                          queue_priority,
1144                          max_queue_size,
1145                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1146                          &qc);
1147   if (NULL == qe)
1148   {
1149     LOG (GNUNET_ERROR_TYPE_DEBUG,
1150          "Could not create queue entry to release reserve\n");
1151     return NULL;
1152   }
1153   GNUNET_STATISTICS_update (h->stats,
1154                             gettext_noop
1155                             ("# RELEASE RESERVE requests executed"), 1,
1156                             GNUNET_NO);
1157   process_queue (h);
1158   return qe;
1159 }
1160
1161
1162 /**
1163  * Explicitly remove some content from the database.
1164  * The @a cont continuation will be called with `status`
1165  * #GNUNET_OK" if content was removed, #GNUNET_NO
1166  * if no matching entry was found and #GNUNET_SYSERR
1167  * on all other types of errors.
1168  *
1169  * @param h handle to the datastore
1170  * @param key key for the value
1171  * @param size number of bytes in data
1172  * @param data content stored
1173  * @param queue_priority ranking of this request in the priority queue
1174  * @param max_queue_size at what queue size should this request be dropped
1175  *        (if other requests of higher priority are in the queue)
1176  * @param cont continuation to call when done
1177  * @param cont_cls closure for @a cont
1178  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1179  *         cancel; note that even if NULL is returned, the callback will be invoked
1180  *         (or rather, will already have been invoked)
1181  */
1182 struct GNUNET_DATASTORE_QueueEntry *
1183 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1184                          const struct GNUNET_HashCode *key,
1185                          size_t size,
1186                          const void *data,
1187                          unsigned int queue_priority,
1188                          unsigned int max_queue_size,
1189                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1190                          void *cont_cls)
1191 {
1192   struct GNUNET_DATASTORE_QueueEntry *qe;
1193   struct DataMessage *dm;
1194   struct GNUNET_MQ_Envelope *env;
1195   union QueueContext qc;
1196
1197   if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1198   {
1199     GNUNET_break (0);
1200     return NULL;
1201   }
1202   if (NULL == cont)
1203     cont = &drop_status_cont;
1204   LOG (GNUNET_ERROR_TYPE_DEBUG,
1205        "Asked to remove %u bytes under key `%s'\n",
1206        size,
1207        GNUNET_h2s (key));
1208   env = GNUNET_MQ_msg_extra (dm,
1209                              size,
1210                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1211   dm->size = htonl (size);
1212   dm->key = *key;
1213   GNUNET_memcpy (&dm[1],
1214           data,
1215           size);
1216
1217   qc.sc.cont = cont;
1218   qc.sc.cont_cls = cont_cls;
1219
1220   qe = make_queue_entry (h,
1221                          env,
1222                          queue_priority,
1223                          max_queue_size,
1224                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1225                          &qc);
1226   if (NULL == qe)
1227   {
1228     LOG (GNUNET_ERROR_TYPE_DEBUG,
1229          "Could not create queue entry for REMOVE\n");
1230     return NULL;
1231   }
1232   GNUNET_STATISTICS_update (h->stats,
1233                             gettext_noop ("# REMOVE requests executed"),
1234                             1,
1235                             GNUNET_NO);
1236   process_queue (h);
1237   return qe;
1238 }
1239
1240
1241
1242 /**
1243  * Get a random value from the datastore for content replication.
1244  * Returns a single, random value among those with the highest
1245  * replication score, lowering positive replication scores by one for
1246  * the chosen value (if only content with a replication score exists,
1247  * a random value is returned and replication scores are not changed).
1248  *
1249  * @param h handle to the datastore
1250  * @param queue_priority ranking of this request in the priority queue
1251  * @param max_queue_size at what queue size should this request be dropped
1252  *        (if other requests of higher priority are in the queue)
1253  * @param proc function to call on a random value; it
1254  *        will be called once with a value (if available)
1255  *        and always once with a value of NULL.
1256  * @param proc_cls closure for @a proc
1257  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1258  *         cancel
1259  */
1260 struct GNUNET_DATASTORE_QueueEntry *
1261 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1262                                       unsigned int queue_priority,
1263                                       unsigned int max_queue_size,
1264                                       GNUNET_DATASTORE_DatumProcessor proc,
1265                                       void *proc_cls)
1266 {
1267   struct GNUNET_DATASTORE_QueueEntry *qe;
1268   struct GNUNET_MQ_Envelope *env;
1269   struct GNUNET_MessageHeader *m;
1270   union QueueContext qc;
1271
1272   GNUNET_assert (NULL != proc);
1273   LOG (GNUNET_ERROR_TYPE_DEBUG,
1274        "Asked to get replication entry\n");
1275   env = GNUNET_MQ_msg (m,
1276                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1277   qc.rc.proc = proc;
1278   qc.rc.proc_cls = proc_cls;
1279   qe = make_queue_entry (h,
1280                          env,
1281                          queue_priority,
1282                          max_queue_size,
1283                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1284                          &qc);
1285   if (NULL == qe)
1286   {
1287     LOG (GNUNET_ERROR_TYPE_DEBUG,
1288          "Could not create queue entry for GET REPLICATION\n");
1289     return NULL;
1290   }
1291   GNUNET_STATISTICS_update (h->stats,
1292                             gettext_noop
1293                             ("# GET REPLICATION requests executed"), 1,
1294                             GNUNET_NO);
1295   process_queue (h);
1296   return qe;
1297 }
1298
1299
1300 /**
1301  * Get a single zero-anonymity value from the datastore.
1302  *
1303  * @param h handle to the datastore
1304  * @param next_uid return the result with lowest uid >= next_uid
1305  * @param queue_priority ranking of this request in the priority queue
1306  * @param max_queue_size at what queue size should this request be dropped
1307  *        (if other requests of higher priority are in the queue)
1308  * @param type allowed type for the operation (never zero)
1309  * @param proc function to call on a random value; it
1310  *        will be called once with a value (if available)
1311  *        or with NULL if none value exists.
1312  * @param proc_cls closure for @a proc
1313  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1314  *         cancel
1315  */
1316 struct GNUNET_DATASTORE_QueueEntry *
1317 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1318                                      uint64_t next_uid,
1319                                      unsigned int queue_priority,
1320                                      unsigned int max_queue_size,
1321                                      enum GNUNET_BLOCK_Type type,
1322                                      GNUNET_DATASTORE_DatumProcessor proc,
1323                                      void *proc_cls)
1324 {
1325   struct GNUNET_DATASTORE_QueueEntry *qe;
1326   struct GNUNET_MQ_Envelope *env;
1327   struct GetZeroAnonymityMessage *m;
1328   union QueueContext qc;
1329
1330   GNUNET_assert (NULL != proc);
1331   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1332   LOG (GNUNET_ERROR_TYPE_DEBUG,
1333        "Asked to get a zero-anonymity entry of type %d\n",
1334        type);
1335   env = GNUNET_MQ_msg (m,
1336                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1337   m->type = htonl ((uint32_t) type);
1338   m->next_uid = GNUNET_htonll (next_uid);
1339   qc.rc.proc = proc;
1340   qc.rc.proc_cls = proc_cls;
1341   qe = make_queue_entry (h,
1342                          env,
1343                          queue_priority,
1344                          max_queue_size,
1345                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1346                          &qc);
1347   if (NULL == qe)
1348   {
1349     LOG (GNUNET_ERROR_TYPE_DEBUG,
1350          "Could not create queue entry for zero-anonymity procation\n");
1351     return NULL;
1352   }
1353   GNUNET_STATISTICS_update (h->stats,
1354                             gettext_noop
1355                             ("# GET ZERO ANONYMITY requests executed"), 1,
1356                             GNUNET_NO);
1357   process_queue (h);
1358   return qe;
1359 }
1360
1361
1362 /**
1363  * Get a result for a particular key from the datastore.  The processor
1364  * will only be called once.
1365  *
1366  * @param h handle to the datastore
1367  * @param next_uid return the result with lowest uid >= next_uid
1368  * @param random if true, return a random result instead of using next_uid
1369  * @param key maybe NULL (to match all entries)
1370  * @param type desired type, 0 for any
1371  * @param queue_priority ranking of this request in the priority queue
1372  * @param max_queue_size at what queue size should this request be dropped
1373  *        (if other requests of higher priority are in the queue)
1374  * @param proc function to call on each matching value;
1375  *        will be called once with a NULL value at the end
1376  * @param proc_cls closure for @a proc
1377  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1378  *         cancel
1379  */
1380 struct GNUNET_DATASTORE_QueueEntry *
1381 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1382                           uint64_t next_uid,
1383                           bool random,
1384                           const struct GNUNET_HashCode *key,
1385                           enum GNUNET_BLOCK_Type type,
1386                           unsigned int queue_priority,
1387                           unsigned int max_queue_size,
1388                           GNUNET_DATASTORE_DatumProcessor proc,
1389                           void *proc_cls)
1390 {
1391   struct GNUNET_DATASTORE_QueueEntry *qe;
1392   struct GNUNET_MQ_Envelope *env;
1393   struct GetKeyMessage *gkm;
1394   struct GetMessage *gm;
1395   union QueueContext qc;
1396
1397   GNUNET_assert (NULL != proc);
1398   LOG (GNUNET_ERROR_TYPE_DEBUG,
1399        "Asked to look for data of type %u under key `%s'\n",
1400        (unsigned int) type,
1401        GNUNET_h2s (key));
1402   if (NULL == key)
1403   {
1404     env = GNUNET_MQ_msg (gm,
1405                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1406     gm->type = htonl (type);
1407     gm->next_uid = GNUNET_htonll (next_uid);
1408     gm->random = random;
1409   }
1410   else
1411   {
1412     env = GNUNET_MQ_msg (gkm,
1413                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1414     gkm->type = htonl (type);
1415     gkm->next_uid = GNUNET_htonll (next_uid);
1416     gkm->random = random;
1417     gkm->key = *key;
1418   }
1419   qc.rc.proc = proc;
1420   qc.rc.proc_cls = proc_cls;
1421   qe = make_queue_entry (h,
1422                          env,
1423                          queue_priority,
1424                          max_queue_size,
1425                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1426                          &qc);
1427   if (NULL == qe)
1428   {
1429     LOG (GNUNET_ERROR_TYPE_DEBUG,
1430          "Could not queue request for `%s'\n",
1431          GNUNET_h2s (key));
1432     return NULL;
1433   }
1434 #if INSANE_STATISTICS
1435   GNUNET_STATISTICS_update (h->stats,
1436                             gettext_noop ("# GET requests executed"),
1437                             1,
1438                             GNUNET_NO);
1439 #endif
1440   process_queue (h);
1441   return qe;
1442 }
1443
1444
1445 /**
1446  * Cancel a datastore operation.  The final callback from the
1447  * operation must not have been done yet.
1448  *
1449  * @param qe operation to cancel
1450  */
1451 void
1452 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1453 {
1454   struct GNUNET_DATASTORE_Handle *h = qe->h;
1455
1456   LOG (GNUNET_ERROR_TYPE_DEBUG,
1457        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1458        qe,
1459        NULL == qe->env,
1460        h->queue_head == qe);
1461   if (NULL == qe->env)
1462   {
1463     free_queue_entry (qe);
1464     h->skip_next_messages++;
1465     return;
1466   }
1467   free_queue_entry (qe);
1468   process_queue (h);
1469 }
1470
1471
1472 /* end of datastore_api.c */