tighten formatting rules
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
38 /**
39  * Collect an instane number of statistics?  May cause excessive IPC.
40  */
41 #define INSANE_STATISTICS GNUNET_NO
42
43 /**
44  * If a client stopped asking for more results, how many more do
45  * we receive from the DB before killing the connection?  Trade-off
46  * between re-doing TCP handshakes and (needlessly) receiving
47  * useless results.
48  */
49 #define MAX_EXCESS_RESULTS 8
50
51 /**
52  * Context for processing status messages.
53  */
54 struct StatusContext
55 {
56   /**
57    * Continuation to call with the status.
58    */
59   GNUNET_DATASTORE_ContinuationWithStatus cont;
60
61   /**
62    * Closure for @e cont.
63    */
64   void *cont_cls;
65 };
66
67
68 /**
69  * Context for processing result messages.
70  */
71 struct ResultContext
72 {
73   /**
74    * Function to call with the result.
75    */
76   GNUNET_DATASTORE_DatumProcessor proc;
77
78   /**
79    * Closure for @e proc.
80    */
81   void *proc_cls;
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90   struct StatusContext sc;
91
92   struct ResultContext rc;
93 };
94
95
96 /**
97  * Entry in our priority queue.
98  */
99 struct GNUNET_DATASTORE_QueueEntry
100 {
101   /**
102    * This is a linked list.
103    */
104   struct GNUNET_DATASTORE_QueueEntry *next;
105
106   /**
107    * This is a linked list.
108    */
109   struct GNUNET_DATASTORE_QueueEntry *prev;
110
111   /**
112    * Handle to the master context.
113    */
114   struct GNUNET_DATASTORE_Handle *h;
115
116   /**
117    * Function to call after transmission of the request.
118    */
119   GNUNET_DATASTORE_ContinuationWithStatus cont;
120
121   /**
122    * Closure for @e cont.
123    */
124   void *cont_cls;
125
126   /**
127    * Context for the operation.
128    */
129   union QueueContext qc;
130
131   /**
132    * Envelope of the request to transmit, NULL after
133    * transmission.
134    */
135   struct GNUNET_MQ_Envelope *env;
136
137   /**
138    * Task we run if this entry stalls the queue and we
139    * need to warn the user.
140    */
141   struct GNUNET_SCHEDULER_Task *delay_warn_task;
142
143   /**
144    * Priority in the queue.
145    */
146   unsigned int priority;
147
148   /**
149    * Maximum allowed length of queue (otherwise
150    * this request should be discarded).
151    */
152   unsigned int max_queue;
153
154   /**
155    * Expected response type.
156    */
157   uint16_t response_type;
158 };
159
160
161 /**
162  * Handle to the datastore service.
163  */
164 struct GNUNET_DATASTORE_Handle
165 {
166   /**
167    * Our configuration.
168    */
169   const struct GNUNET_CONFIGURATION_Handle *cfg;
170
171   /**
172    * Current connection to the datastore service.
173    */
174   struct GNUNET_MQ_Handle *mq;
175
176   /**
177    * Handle for statistics.
178    */
179   struct GNUNET_STATISTICS_Handle *stats;
180
181   /**
182    * Current head of priority queue.
183    */
184   struct GNUNET_DATASTORE_QueueEntry *queue_head;
185
186   /**
187    * Current tail of priority queue.
188    */
189   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
190
191   /**
192    * Task for trying to reconnect.
193    */
194   struct GNUNET_SCHEDULER_Task *reconnect_task;
195
196   /**
197    * How quickly should we retry?  Used for exponential back-off on
198    * connect-errors.
199    */
200   struct GNUNET_TIME_Relative retry_time;
201
202   /**
203    * Number of entries in the queue.
204    */
205   unsigned int queue_size;
206
207   /**
208    * Number of results we're receiving for the current query
209    * after application stopped to care.  Used to determine when
210    * to reset the connection.
211    */
212   unsigned int result_count;
213
214   /**
215    * We should ignore the next message(s) from the service.
216    */
217   unsigned int skip_next_messages;
218 };
219
220
221 /**
222  * Try reconnecting to the datastore service.
223  *
224  * @param cls the `struct GNUNET_DATASTORE_Handle`
225  */
226 static void
227 try_reconnect (void *cls);
228
229
230 /**
231  * Disconnect from the service and then try reconnecting to the datastore service
232  * after some delay.
233  *
234  * @param h handle to datastore to disconnect and reconnect
235  */
236 static void
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238 {
239   if (NULL == h->mq)
240   {
241     GNUNET_break (0);
242     return;
243   }
244   GNUNET_MQ_destroy (h->mq);
245   h->mq = NULL;
246   h->skip_next_messages = 0;
247   h->reconnect_task
248     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249                                     &try_reconnect,
250                                     h);
251 }
252
253
254 /**
255  * Free a queue entry.  Removes the given entry from the
256  * queue and releases associated resources.  Does NOT
257  * call the callback.
258  *
259  * @param qe entry to free.
260  */
261 static void
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263 {
264   struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266   GNUNET_CONTAINER_DLL_remove (h->queue_head,
267                                h->queue_tail,
268                                qe);
269   h->queue_size--;
270   if (NULL != qe->env)
271     GNUNET_MQ_discard (qe->env);
272   if (NULL != qe->delay_warn_task)
273     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
274   GNUNET_free (qe);
275 }
276
277
278 /**
279  * Task that logs an error after some time.
280  *
281  * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
282  */
283 static void
284 delay_warning (void *cls)
285 {
286   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
287
288   qe->delay_warn_task = NULL;
289   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
290               "Request %p of type %u at head of datastore queue for more than %s\n",
291               qe,
292               (unsigned int) qe->response_type,
293               GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
294                                                       GNUNET_YES));
295   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
296                                                       &delay_warning,
297                                                       qe);
298 }
299
300
301 /**
302  * Handle error in sending drop request to datastore.
303  *
304  * @param cls closure with the datastore handle
305  * @param error error code
306  */
307 static void
308 mq_error_handler (void *cls,
309                   enum GNUNET_MQ_Error error)
310 {
311   struct GNUNET_DATASTORE_Handle *h = cls;
312   struct GNUNET_DATASTORE_QueueEntry *qe;
313
314   LOG (GNUNET_ERROR_TYPE_DEBUG,
315        "MQ error, reconnecting to DATASTORE\n");
316   do_disconnect (h);
317   qe = h->queue_head;
318   if (NULL == qe)
319     return;
320   if (NULL != qe->delay_warn_task)
321   {
322     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
323     qe->delay_warn_task = NULL;
324   }
325   if (NULL == qe->env)
326   {
327     union QueueContext qc = qe->qc;
328     uint16_t rt = qe->response_type;
329
330     LOG (GNUNET_ERROR_TYPE_DEBUG,
331          "Failed to receive response from database.\n");
332     free_queue_entry (qe);
333     switch (rt)
334     {
335     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
336       if (NULL != qc.sc.cont)
337         qc.sc.cont (qc.sc.cont_cls,
338                     GNUNET_SYSERR,
339                     GNUNET_TIME_UNIT_ZERO_ABS,
340                     _ ("DATASTORE disconnected"));
341       break;
342
343     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
344       if (NULL != qc.rc.proc)
345         qc.rc.proc (qc.rc.proc_cls,
346                     NULL,
347                     0,
348                     NULL,
349                     0,
350                     0,
351                     0,
352                     0,
353                     GNUNET_TIME_UNIT_ZERO_ABS,
354                     0);
355       break;
356
357     default:
358       GNUNET_break (0);
359     }
360   }
361 }
362
363
364 /**
365  * Connect to the datastore service.
366  *
367  * @param cfg configuration to use
368  * @return handle to use to access the service
369  */
370 struct GNUNET_DATASTORE_Handle *
371 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
372 {
373   struct GNUNET_DATASTORE_Handle *h;
374
375   LOG (GNUNET_ERROR_TYPE_DEBUG,
376        "Establishing DATASTORE connection!\n");
377   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
378   h->cfg = cfg;
379   try_reconnect (h);
380   if (NULL == h->mq)
381   {
382     GNUNET_free (h);
383     return NULL;
384   }
385   h->stats = GNUNET_STATISTICS_create ("datastore-api",
386                                        cfg);
387   return h;
388 }
389
390
391 /**
392  * Task used by to disconnect from the datastore after
393  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
394  *
395  * @param cls the datastore handle
396  */
397 static void
398 disconnect_after_drop (void *cls)
399 {
400   struct GNUNET_DATASTORE_Handle *h = cls;
401
402   LOG (GNUNET_ERROR_TYPE_DEBUG,
403        "Drop sent, disconnecting\n");
404   GNUNET_DATASTORE_disconnect (h,
405                                GNUNET_NO);
406 }
407
408
409 /**
410  * Handle error in sending drop request to datastore.
411  *
412  * @param cls closure with the datastore handle
413  * @param error error code
414  */
415 static void
416 disconnect_on_mq_error (void *cls,
417                         enum GNUNET_MQ_Error error)
418 {
419   struct GNUNET_DATASTORE_Handle *h = cls;
420
421   LOG (GNUNET_ERROR_TYPE_ERROR,
422        "Failed to ask datastore to drop tables\n");
423   GNUNET_DATASTORE_disconnect (h,
424                                GNUNET_NO);
425 }
426
427
428 /**
429  * Disconnect from the datastore service (and free
430  * associated resources).
431  *
432  * @param h handle to the datastore
433  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
434  */
435 void
436 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
437                              int drop)
438 {
439   struct GNUNET_DATASTORE_QueueEntry *qe;
440
441   LOG (GNUNET_ERROR_TYPE_DEBUG,
442        "Datastore disconnect\n");
443   if (NULL != h->mq)
444   {
445     GNUNET_MQ_destroy (h->mq);
446     h->mq = NULL;
447   }
448   if (NULL != h->reconnect_task)
449   {
450     GNUNET_SCHEDULER_cancel (h->reconnect_task);
451     h->reconnect_task = NULL;
452   }
453   while (NULL != (qe = h->queue_head))
454   {
455     switch (qe->response_type)
456     {
457     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
458       if (NULL != qe->qc.sc.cont)
459         qe->qc.sc.cont (qe->qc.sc.cont_cls,
460                         GNUNET_SYSERR,
461                         GNUNET_TIME_UNIT_ZERO_ABS,
462                         _ ("Disconnected from DATASTORE"));
463       break;
464
465     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
466       if (NULL != qe->qc.rc.proc)
467         qe->qc.rc.proc (qe->qc.rc.proc_cls,
468                         NULL,
469                         0,
470                         NULL,
471                         0,
472                         0,
473                         0,
474                         0,
475                         GNUNET_TIME_UNIT_ZERO_ABS,
476                         0);
477       break;
478
479     default:
480       GNUNET_break (0);
481     }
482     free_queue_entry (qe);
483   }
484   if (GNUNET_YES == drop)
485   {
486     LOG (GNUNET_ERROR_TYPE_DEBUG,
487          "Re-connecting to issue DROP!\n");
488     GNUNET_assert (NULL == h->mq);
489     h->mq = GNUNET_CLIENT_connect (h->cfg,
490                                    "datastore",
491                                    NULL,
492                                    &disconnect_on_mq_error,
493                                    h);
494     if (NULL != h->mq)
495     {
496       struct GNUNET_MessageHeader *hdr;
497       struct GNUNET_MQ_Envelope *env;
498
499       env = GNUNET_MQ_msg (hdr,
500                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
501       GNUNET_MQ_notify_sent (env,
502                              &disconnect_after_drop,
503                              h);
504       GNUNET_MQ_send (h->mq,
505                       env);
506       return;
507     }
508     GNUNET_break (0);
509   }
510   GNUNET_STATISTICS_destroy (h->stats,
511                              GNUNET_NO);
512   h->stats = NULL;
513   GNUNET_free (h);
514 }
515
516
517 /**
518  * Create a new entry for our priority queue (and possibly discard other entires if
519  * the queue is getting too long).
520  *
521  * @param h handle to the datastore
522  * @param env envelope with the message to queue
523  * @param queue_priority priority of the entry
524  * @param max_queue_size at what queue size should this request be dropped
525  *        (if other requests of higher priority are in the queue)
526  * @param expected_type which type of response do we expect,
527  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
528  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
529  * @param qc client context (NOT a closure for @a response_proc)
530  * @return NULL if the queue is full
531  */
532 static struct GNUNET_DATASTORE_QueueEntry *
533 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
534                   struct GNUNET_MQ_Envelope *env,
535                   unsigned int queue_priority,
536                   unsigned int max_queue_size,
537                   uint16_t expected_type,
538                   const union QueueContext *qc)
539 {
540   struct GNUNET_DATASTORE_QueueEntry *qe;
541   struct GNUNET_DATASTORE_QueueEntry *pos;
542   unsigned int c;
543
544   if ((NULL != h->queue_tail) &&
545       (h->queue_tail->priority >= queue_priority))
546   {
547     c = h->queue_size;
548     pos = NULL;
549   }
550   else
551   {
552     c = 0;
553     pos = h->queue_head;
554   }
555   while ((NULL != pos) &&
556          (c < max_queue_size) &&
557          (pos->priority >= queue_priority))
558   {
559     c++;
560     pos = pos->next;
561   }
562   if (c >= max_queue_size)
563   {
564     GNUNET_STATISTICS_update (h->stats,
565                               gettext_noop ("# queue overflows"),
566                               1,
567                               GNUNET_NO);
568     GNUNET_MQ_discard (env);
569     return NULL;
570   }
571   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
572   qe->h = h;
573   qe->env = env;
574   qe->response_type = expected_type;
575   qe->qc = *qc;
576   qe->priority = queue_priority;
577   qe->max_queue = max_queue_size;
578   if (NULL == pos)
579   {
580     /* append at the tail */
581     pos = h->queue_tail;
582   }
583   else
584   {
585     pos = pos->prev;
586     /* do not insert at HEAD if HEAD query was already
587      * transmitted and we are still receiving replies! */
588     if ((NULL == pos) &&
589         (NULL == h->queue_head->env))
590       pos = h->queue_head;
591   }
592   c++;
593 #if INSANE_STATISTICS
594   GNUNET_STATISTICS_update (h->stats,
595                             gettext_noop ("# queue entries created"),
596                             1,
597                             GNUNET_NO);
598 #endif
599   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
600                                      h->queue_tail,
601                                      pos,
602                                      qe);
603   h->queue_size++;
604   return qe;
605 }
606
607
608 /**
609  * Process entries in the queue (or do nothing if we are already
610  * doing so).
611  *
612  * @param h handle to the datastore
613  */
614 static void
615 process_queue (struct GNUNET_DATASTORE_Handle *h)
616 {
617   struct GNUNET_DATASTORE_QueueEntry *qe;
618
619   if (NULL == (qe = h->queue_head))
620   {
621     /* no entry in queue */
622     LOG (GNUNET_ERROR_TYPE_DEBUG,
623          "Queue empty\n");
624     return;
625   }
626   if (NULL == qe->env)
627   {
628     /* waiting for replies */
629     LOG (GNUNET_ERROR_TYPE_DEBUG,
630          "Head request already transmitted\n");
631     return;
632   }
633   if (NULL == h->mq)
634   {
635     /* waiting for reconnect */
636     LOG (GNUNET_ERROR_TYPE_DEBUG,
637          "Not connected\n");
638     return;
639   }
640   GNUNET_assert (NULL == qe->delay_warn_task);
641   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
642                                                       &delay_warning,
643                                                       qe);
644   GNUNET_MQ_send (h->mq,
645                   qe->env);
646   qe->env = NULL;
647 }
648
649
650 /**
651  * Get the entry at the head of the message queue.
652  *
653  * @param h handle to the datastore
654  * @param response_type the expected response type
655  * @return the queue entry
656  */
657 static struct GNUNET_DATASTORE_QueueEntry *
658 get_queue_head (struct GNUNET_DATASTORE_Handle *h,
659                 uint16_t response_type)
660 {
661   struct GNUNET_DATASTORE_QueueEntry *qe;
662
663   if (h->skip_next_messages > 0)
664   {
665     h->skip_next_messages--;
666     process_queue (h);
667     return NULL;
668   }
669   qe = h->queue_head;
670   if (NULL == qe)
671   {
672     GNUNET_break (0);
673     do_disconnect (h);
674     return NULL;
675   }
676   if (NULL != qe->env)
677   {
678     GNUNET_break (0);
679     do_disconnect (h);
680     return NULL;
681   }
682   if (response_type != qe->response_type)
683   {
684     GNUNET_break (0);
685     do_disconnect (h);
686     return NULL;
687   }
688   return qe;
689 }
690
691
692 /**
693  * Function called to check status message from the service.
694  *
695  * @param cls closure
696  * @param sm status message received
697  * @return #GNUNET_OK if the message is well-formed
698  */
699 static int
700 check_status (void *cls,
701               const struct StatusMessage *sm)
702 {
703   uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
704   int32_t status = ntohl (sm->status);
705
706   if (msize > 0)
707   {
708     const char *emsg = (const char *) &sm[1];
709
710     if ('\0' != emsg[msize - 1])
711     {
712       GNUNET_break (0);
713       return GNUNET_SYSERR;
714     }
715   }
716   else if (GNUNET_SYSERR == status)
717   {
718     GNUNET_break (0);
719     return GNUNET_SYSERR;
720   }
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Function called to handle status message from the service.
727  *
728  * @param cls closure
729  * @param sm status message received
730  */
731 static void
732 handle_status (void *cls,
733                const struct StatusMessage *sm)
734 {
735   struct GNUNET_DATASTORE_Handle *h = cls;
736   struct GNUNET_DATASTORE_QueueEntry *qe;
737   struct StatusContext rc;
738   const char *emsg;
739   int32_t status = ntohl (sm->status);
740
741   qe = get_queue_head (h,
742                        GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
743   if (NULL == qe)
744     return;
745   rc = qe->qc.sc;
746   free_queue_entry (qe);
747   if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
748     emsg = (const char *) &sm[1];
749   else
750     emsg = NULL;
751   LOG (GNUNET_ERROR_TYPE_DEBUG,
752        "Received status %d/%s\n",
753        (int) status,
754        emsg);
755   GNUNET_STATISTICS_update (h->stats,
756                             gettext_noop ("# status messages received"),
757                             1,
758                             GNUNET_NO);
759   h->retry_time = GNUNET_TIME_UNIT_ZERO;
760   process_queue (h);
761   if (NULL != rc.cont)
762     rc.cont (rc.cont_cls,
763              status,
764              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
765              emsg);
766 }
767
768
769 /**
770  * Check data message we received from the service.
771  *
772  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
773  * @param dm message received
774  */
775 static int
776 check_data (void *cls,
777             const struct DataMessage *dm)
778 {
779   uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
780
781   if (msize != ntohl (dm->size))
782   {
783     GNUNET_break (0);
784     return GNUNET_SYSERR;
785   }
786   return GNUNET_OK;
787 }
788
789
790 /**
791  * Handle data message we got from the service.
792  *
793  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
794  * @param dm message received
795  */
796 static void
797 handle_data (void *cls,
798              const struct DataMessage *dm)
799 {
800   struct GNUNET_DATASTORE_Handle *h = cls;
801   struct GNUNET_DATASTORE_QueueEntry *qe;
802   struct ResultContext rc;
803
804   qe = get_queue_head (h,
805                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
806   if (NULL == qe)
807     return;
808 #if INSANE_STATISTICS
809   GNUNET_STATISTICS_update (h->stats,
810                             gettext_noop ("# Results received"),
811                             1,
812                             GNUNET_NO);
813 #endif
814   LOG (GNUNET_ERROR_TYPE_DEBUG,
815        "Received result %llu with type %u and size %u with key %s\n",
816        (unsigned long long) GNUNET_ntohll (dm->uid),
817        ntohl (dm->type),
818        ntohl (dm->size),
819        GNUNET_h2s (&dm->key));
820   rc = qe->qc.rc;
821   free_queue_entry (qe);
822   h->retry_time = GNUNET_TIME_UNIT_ZERO;
823   process_queue (h);
824   if (NULL != rc.proc)
825     rc.proc (rc.proc_cls,
826              &dm->key,
827              ntohl (dm->size),
828              &dm[1],
829              ntohl (dm->type),
830              ntohl (dm->priority),
831              ntohl (dm->anonymity),
832              ntohl (dm->replication),
833              GNUNET_TIME_absolute_ntoh (dm->expiration),
834              GNUNET_ntohll (dm->uid));
835 }
836
837
838 /**
839  * Type of a function to call when we receive a
840  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
841  *
842  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
843  * @param msg message received
844  */
845 static void
846 handle_data_end (void *cls,
847                  const struct GNUNET_MessageHeader *msg)
848 {
849   struct GNUNET_DATASTORE_Handle *h = cls;
850   struct GNUNET_DATASTORE_QueueEntry *qe;
851   struct ResultContext rc;
852
853   qe = get_queue_head (h,
854                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
855   if (NULL == qe)
856     return;
857   rc = qe->qc.rc;
858   free_queue_entry (qe);
859   LOG (GNUNET_ERROR_TYPE_DEBUG,
860        "Received end of result set, new queue size is %u\n",
861        h->queue_size);
862   h->retry_time = GNUNET_TIME_UNIT_ZERO;
863   h->result_count = 0;
864   process_queue (h);
865   /* signal end of iteration */
866   if (NULL != rc.proc)
867     rc.proc (rc.proc_cls,
868              NULL,
869              0,
870              NULL,
871              0,
872              0,
873              0,
874              0,
875              GNUNET_TIME_UNIT_ZERO_ABS,
876              0);
877 }
878
879
880 /**
881  * Try reconnecting to the datastore service.
882  *
883  * @param cls the `struct GNUNET_DATASTORE_Handle`
884  */
885 static void
886 try_reconnect (void *cls)
887 {
888   struct GNUNET_DATASTORE_Handle *h = cls;
889   struct GNUNET_MQ_MessageHandler handlers[] = {
890     GNUNET_MQ_hd_var_size (status,
891                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
892                            struct StatusMessage,
893                            h),
894     GNUNET_MQ_hd_var_size (data,
895                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
896                            struct DataMessage,
897                            h),
898     GNUNET_MQ_hd_fixed_size (data_end,
899                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
900                              struct GNUNET_MessageHeader,
901                              h),
902     GNUNET_MQ_handler_end ()
903   };
904
905   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
906   h->reconnect_task = NULL;
907   GNUNET_assert (NULL == h->mq);
908   h->mq = GNUNET_CLIENT_connect (h->cfg,
909                                  "datastore",
910                                  handlers,
911                                  &mq_error_handler,
912                                  h);
913   if (NULL == h->mq)
914     return;
915   GNUNET_STATISTICS_update (h->stats,
916                             gettext_noop (
917                               "# datastore connections (re)created"),
918                             1,
919                             GNUNET_NO);
920   LOG (GNUNET_ERROR_TYPE_DEBUG,
921        "Reconnected to DATASTORE\n");
922   process_queue (h);
923 }
924
925
926 /**
927  * Dummy continuation used to do nothing (but be non-zero).
928  *
929  * @param cls closure
930  * @param result result
931  * @param min_expiration expiration time
932  * @param emsg error message
933  */
934 static void
935 drop_status_cont (void *cls,
936                   int32_t result,
937                   struct GNUNET_TIME_Absolute min_expiration,
938                   const char *emsg)
939 {
940   /* do nothing */
941 }
942
943
944 /**
945  * Store an item in the datastore.  If the item is already present,
946  * the priorities are summed up and the higher expiration time and
947  * lower anonymity level is used.
948  *
949  * @param h handle to the datastore
950  * @param rid reservation ID to use (from "reserve"); use 0 if no
951  *            prior reservation was made
952  * @param key key for the value
953  * @param size number of bytes in data
954  * @param data content stored
955  * @param type type of the content
956  * @param priority priority of the content
957  * @param anonymity anonymity-level for the content
958  * @param replication how often should the content be replicated to other peers?
959  * @param expiration expiration time for the content
960  * @param queue_priority ranking of this request in the priority queue
961  * @param max_queue_size at what queue size should this request be dropped
962  *        (if other requests of higher priority are in the queue)
963  * @param cont continuation to call when done
964  * @param cont_cls closure for @a cont
965  * @return NULL if the entry was not queued, otherwise a handle that can be used to
966  *         cancel; note that even if NULL is returned, the callback will be invoked
967  *         (or rather, will already have been invoked)
968  */
969 struct GNUNET_DATASTORE_QueueEntry *
970 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
971                       uint32_t rid,
972                       const struct GNUNET_HashCode *key,
973                       size_t size,
974                       const void *data,
975                       enum GNUNET_BLOCK_Type type,
976                       uint32_t priority,
977                       uint32_t anonymity,
978                       uint32_t replication,
979                       struct GNUNET_TIME_Absolute expiration,
980                       unsigned int queue_priority,
981                       unsigned int max_queue_size,
982                       GNUNET_DATASTORE_ContinuationWithStatus cont,
983                       void *cont_cls)
984 {
985   struct GNUNET_DATASTORE_QueueEntry *qe;
986   struct GNUNET_MQ_Envelope *env;
987   struct DataMessage *dm;
988   union QueueContext qc;
989
990   if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
991   {
992     GNUNET_break (0);
993     return NULL;
994   }
995
996   LOG (GNUNET_ERROR_TYPE_DEBUG,
997        "Asked to put %u bytes of data under key `%s' for %s\n",
998        size,
999        GNUNET_h2s (key),
1000        GNUNET_STRINGS_relative_time_to_string (
1001          GNUNET_TIME_absolute_get_remaining (expiration),
1002          GNUNET_YES));
1003   env = GNUNET_MQ_msg_extra (dm,
1004                              size,
1005                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1006   dm->rid = htonl (rid);
1007   dm->size = htonl ((uint32_t) size);
1008   dm->type = htonl (type);
1009   dm->priority = htonl (priority);
1010   dm->anonymity = htonl (anonymity);
1011   dm->replication = htonl (replication);
1012   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1013   dm->key = *key;
1014   GNUNET_memcpy (&dm[1],
1015                  data,
1016                  size);
1017   qc.sc.cont = cont;
1018   qc.sc.cont_cls = cont_cls;
1019   qe = make_queue_entry (h,
1020                          env,
1021                          queue_priority,
1022                          max_queue_size,
1023                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1024                          &qc);
1025   if (NULL == qe)
1026   {
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Could not create queue entry for PUT\n");
1029     return NULL;
1030   }
1031   GNUNET_STATISTICS_update (h->stats,
1032                             gettext_noop ("# PUT requests executed"),
1033                             1,
1034                             GNUNET_NO);
1035   process_queue (h);
1036   return qe;
1037 }
1038
1039
1040 /**
1041  * Reserve space in the datastore.  This function should be used
1042  * to avoid "out of space" failures during a longer sequence of "put"
1043  * operations (for example, when a file is being inserted).
1044  *
1045  * @param h handle to the datastore
1046  * @param amount how much space (in bytes) should be reserved (for content only)
1047  * @param entries how many entries will be created (to calculate per-entry overhead)
1048  * @param cont continuation to call when done; "success" will be set to
1049  *             a positive reservation value if space could be reserved.
1050  * @param cont_cls closure for @a cont
1051  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1052  *         cancel; note that even if NULL is returned, the callback will be invoked
1053  *         (or rather, will already have been invoked)
1054  */
1055 struct GNUNET_DATASTORE_QueueEntry *
1056 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1057                           uint64_t amount,
1058                           uint32_t entries,
1059                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1060                           void *cont_cls)
1061 {
1062   struct GNUNET_DATASTORE_QueueEntry *qe;
1063   struct GNUNET_MQ_Envelope *env;
1064   struct ReserveMessage *rm;
1065   union QueueContext qc;
1066
1067   if (NULL == cont)
1068     cont = &drop_status_cont;
1069   LOG (GNUNET_ERROR_TYPE_DEBUG,
1070        "Asked to reserve %llu bytes of data and %u entries\n",
1071        (unsigned long long) amount,
1072        (unsigned int) entries);
1073   env = GNUNET_MQ_msg (rm,
1074                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1075   rm->entries = htonl (entries);
1076   rm->amount = GNUNET_htonll (amount);
1077
1078   qc.sc.cont = cont;
1079   qc.sc.cont_cls = cont_cls;
1080   qe = make_queue_entry (h,
1081                          env,
1082                          UINT_MAX,
1083                          UINT_MAX,
1084                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1085                          &qc);
1086   if (NULL == qe)
1087   {
1088     LOG (GNUNET_ERROR_TYPE_DEBUG,
1089          "Could not create queue entry to reserve\n");
1090     return NULL;
1091   }
1092   GNUNET_STATISTICS_update (h->stats,
1093                             gettext_noop ("# RESERVE requests executed"),
1094                             1,
1095                             GNUNET_NO);
1096   process_queue (h);
1097   return qe;
1098 }
1099
1100
1101 /**
1102  * Signal that all of the data for which a reservation was made has
1103  * been stored and that whatever excess space might have been reserved
1104  * can now be released.
1105  *
1106  * @param h handle to the datastore
1107  * @param rid reservation ID (value of "success" in original continuation
1108  *        from the "reserve" function).
1109  * @param queue_priority ranking of this request in the priority queue
1110  * @param max_queue_size at what queue size should this request be dropped
1111  *        (if other requests of higher priority are in the queue)
1112  * @param queue_priority ranking of this request in the priority queue
1113  * @param max_queue_size at what queue size should this request be dropped
1114  *        (if other requests of higher priority are in the queue)
1115  * @param cont continuation to call when done
1116  * @param cont_cls closure for @a cont
1117  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1118  *         cancel; note that even if NULL is returned, the callback will be invoked
1119  *         (or rather, will already have been invoked)
1120  */
1121 struct GNUNET_DATASTORE_QueueEntry *
1122 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1123                                   uint32_t rid,
1124                                   unsigned int queue_priority,
1125                                   unsigned int max_queue_size,
1126                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1127                                   void *cont_cls)
1128 {
1129   struct GNUNET_DATASTORE_QueueEntry *qe;
1130   struct GNUNET_MQ_Envelope *env;
1131   struct ReleaseReserveMessage *rrm;
1132   union QueueContext qc;
1133
1134   if (NULL == cont)
1135     cont = &drop_status_cont;
1136   LOG (GNUNET_ERROR_TYPE_DEBUG,
1137        "Asked to release reserve %d\n",
1138        rid);
1139   env = GNUNET_MQ_msg (rrm,
1140                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1141   rrm->rid = htonl (rid);
1142   qc.sc.cont = cont;
1143   qc.sc.cont_cls = cont_cls;
1144   qe = make_queue_entry (h,
1145                          env,
1146                          queue_priority,
1147                          max_queue_size,
1148                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1149                          &qc);
1150   if (NULL == qe)
1151   {
1152     LOG (GNUNET_ERROR_TYPE_DEBUG,
1153          "Could not create queue entry to release reserve\n");
1154     return NULL;
1155   }
1156   GNUNET_STATISTICS_update (h->stats,
1157                             gettext_noop
1158                               ("# RELEASE RESERVE requests executed"), 1,
1159                             GNUNET_NO);
1160   process_queue (h);
1161   return qe;
1162 }
1163
1164
1165 /**
1166  * Explicitly remove some content from the database.
1167  * The @a cont continuation will be called with `status`
1168  * #GNUNET_OK" if content was removed, #GNUNET_NO
1169  * if no matching entry was found and #GNUNET_SYSERR
1170  * on all other types of errors.
1171  *
1172  * @param h handle to the datastore
1173  * @param key key for the value
1174  * @param size number of bytes in data
1175  * @param data content stored
1176  * @param queue_priority ranking of this request in the priority queue
1177  * @param max_queue_size at what queue size should this request be dropped
1178  *        (if other requests of higher priority are in the queue)
1179  * @param cont continuation to call when done
1180  * @param cont_cls closure for @a cont
1181  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1182  *         cancel; note that even if NULL is returned, the callback will be invoked
1183  *         (or rather, will already have been invoked)
1184  */
1185 struct GNUNET_DATASTORE_QueueEntry *
1186 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1187                          const struct GNUNET_HashCode *key,
1188                          size_t size,
1189                          const void *data,
1190                          unsigned int queue_priority,
1191                          unsigned int max_queue_size,
1192                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1193                          void *cont_cls)
1194 {
1195   struct GNUNET_DATASTORE_QueueEntry *qe;
1196   struct DataMessage *dm;
1197   struct GNUNET_MQ_Envelope *env;
1198   union QueueContext qc;
1199
1200   if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1201   {
1202     GNUNET_break (0);
1203     return NULL;
1204   }
1205   if (NULL == cont)
1206     cont = &drop_status_cont;
1207   LOG (GNUNET_ERROR_TYPE_DEBUG,
1208        "Asked to remove %u bytes under key `%s'\n",
1209        size,
1210        GNUNET_h2s (key));
1211   env = GNUNET_MQ_msg_extra (dm,
1212                              size,
1213                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1214   dm->size = htonl (size);
1215   dm->key = *key;
1216   GNUNET_memcpy (&dm[1],
1217                  data,
1218                  size);
1219
1220   qc.sc.cont = cont;
1221   qc.sc.cont_cls = cont_cls;
1222
1223   qe = make_queue_entry (h,
1224                          env,
1225                          queue_priority,
1226                          max_queue_size,
1227                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1228                          &qc);
1229   if (NULL == qe)
1230   {
1231     LOG (GNUNET_ERROR_TYPE_DEBUG,
1232          "Could not create queue entry for REMOVE\n");
1233     return NULL;
1234   }
1235   GNUNET_STATISTICS_update (h->stats,
1236                             gettext_noop ("# REMOVE requests executed"),
1237                             1,
1238                             GNUNET_NO);
1239   process_queue (h);
1240   return qe;
1241 }
1242
1243
1244 /**
1245  * Get a random value from the datastore for content replication.
1246  * Returns a single, random value among those with the highest
1247  * replication score, lowering positive replication scores by one for
1248  * the chosen value (if only content with a replication score exists,
1249  * a random value is returned and replication scores are not changed).
1250  *
1251  * @param h handle to the datastore
1252  * @param queue_priority ranking of this request in the priority queue
1253  * @param max_queue_size at what queue size should this request be dropped
1254  *        (if other requests of higher priority are in the queue)
1255  * @param proc function to call on a random value; it
1256  *        will be called once with a value (if available)
1257  *        and always once with a value of NULL.
1258  * @param proc_cls closure for @a proc
1259  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1260  *         cancel
1261  */
1262 struct GNUNET_DATASTORE_QueueEntry *
1263 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1264                                       unsigned int queue_priority,
1265                                       unsigned int max_queue_size,
1266                                       GNUNET_DATASTORE_DatumProcessor proc,
1267                                       void *proc_cls)
1268 {
1269   struct GNUNET_DATASTORE_QueueEntry *qe;
1270   struct GNUNET_MQ_Envelope *env;
1271   struct GNUNET_MessageHeader *m;
1272   union QueueContext qc;
1273
1274   GNUNET_assert (NULL != proc);
1275   LOG (GNUNET_ERROR_TYPE_DEBUG,
1276        "Asked to get replication entry\n");
1277   env = GNUNET_MQ_msg (m,
1278                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1279   qc.rc.proc = proc;
1280   qc.rc.proc_cls = proc_cls;
1281   qe = make_queue_entry (h,
1282                          env,
1283                          queue_priority,
1284                          max_queue_size,
1285                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1286                          &qc);
1287   if (NULL == qe)
1288   {
1289     LOG (GNUNET_ERROR_TYPE_DEBUG,
1290          "Could not create queue entry for GET REPLICATION\n");
1291     return NULL;
1292   }
1293   GNUNET_STATISTICS_update (h->stats,
1294                             gettext_noop
1295                               ("# GET REPLICATION requests executed"), 1,
1296                             GNUNET_NO);
1297   process_queue (h);
1298   return qe;
1299 }
1300
1301
1302 /**
1303  * Get a single zero-anonymity value from the datastore.
1304  *
1305  * @param h handle to the datastore
1306  * @param next_uid return the result with lowest uid >= next_uid
1307  * @param queue_priority ranking of this request in the priority queue
1308  * @param max_queue_size at what queue size should this request be dropped
1309  *        (if other requests of higher priority are in the queue)
1310  * @param type allowed type for the operation (never zero)
1311  * @param proc function to call on a random value; it
1312  *        will be called once with a value (if available)
1313  *        or with NULL if none value exists.
1314  * @param proc_cls closure for @a proc
1315  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1316  *         cancel
1317  */
1318 struct GNUNET_DATASTORE_QueueEntry *
1319 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1320                                      uint64_t next_uid,
1321                                      unsigned int queue_priority,
1322                                      unsigned int max_queue_size,
1323                                      enum GNUNET_BLOCK_Type type,
1324                                      GNUNET_DATASTORE_DatumProcessor proc,
1325                                      void *proc_cls)
1326 {
1327   struct GNUNET_DATASTORE_QueueEntry *qe;
1328   struct GNUNET_MQ_Envelope *env;
1329   struct GetZeroAnonymityMessage *m;
1330   union QueueContext qc;
1331
1332   GNUNET_assert (NULL != proc);
1333   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1334   LOG (GNUNET_ERROR_TYPE_DEBUG,
1335        "Asked to get a zero-anonymity entry of type %d\n",
1336        type);
1337   env = GNUNET_MQ_msg (m,
1338                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1339   m->type = htonl ((uint32_t) type);
1340   m->next_uid = GNUNET_htonll (next_uid);
1341   qc.rc.proc = proc;
1342   qc.rc.proc_cls = proc_cls;
1343   qe = make_queue_entry (h,
1344                          env,
1345                          queue_priority,
1346                          max_queue_size,
1347                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1348                          &qc);
1349   if (NULL == qe)
1350   {
1351     LOG (GNUNET_ERROR_TYPE_DEBUG,
1352          "Could not create queue entry for zero-anonymity procation\n");
1353     return NULL;
1354   }
1355   GNUNET_STATISTICS_update (h->stats,
1356                             gettext_noop
1357                               ("# GET ZERO ANONYMITY requests executed"), 1,
1358                             GNUNET_NO);
1359   process_queue (h);
1360   return qe;
1361 }
1362
1363
1364 /**
1365  * Get a result for a particular key from the datastore.  The processor
1366  * will only be called once.
1367  *
1368  * @param h handle to the datastore
1369  * @param next_uid return the result with lowest uid >= next_uid
1370  * @param random if true, return a random result instead of using next_uid
1371  * @param key maybe NULL (to match all entries)
1372  * @param type desired type, 0 for any
1373  * @param queue_priority ranking of this request in the priority queue
1374  * @param max_queue_size at what queue size should this request be dropped
1375  *        (if other requests of higher priority are in the queue)
1376  * @param proc function to call on each matching value;
1377  *        will be called once with a NULL value at the end
1378  * @param proc_cls closure for @a proc
1379  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1380  *         cancel
1381  */
1382 struct GNUNET_DATASTORE_QueueEntry *
1383 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1384                           uint64_t next_uid,
1385                           bool random,
1386                           const struct GNUNET_HashCode *key,
1387                           enum GNUNET_BLOCK_Type type,
1388                           unsigned int queue_priority,
1389                           unsigned int max_queue_size,
1390                           GNUNET_DATASTORE_DatumProcessor proc,
1391                           void *proc_cls)
1392 {
1393   struct GNUNET_DATASTORE_QueueEntry *qe;
1394   struct GNUNET_MQ_Envelope *env;
1395   struct GetKeyMessage *gkm;
1396   struct GetMessage *gm;
1397   union QueueContext qc;
1398
1399   GNUNET_assert (NULL != proc);
1400   LOG (GNUNET_ERROR_TYPE_DEBUG,
1401        "Asked to look for data of type %u under key `%s'\n",
1402        (unsigned int) type,
1403        GNUNET_h2s (key));
1404   if (NULL == key)
1405   {
1406     env = GNUNET_MQ_msg (gm,
1407                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1408     gm->type = htonl (type);
1409     gm->next_uid = GNUNET_htonll (next_uid);
1410     gm->random = random;
1411   }
1412   else
1413   {
1414     env = GNUNET_MQ_msg (gkm,
1415                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1416     gkm->type = htonl (type);
1417     gkm->next_uid = GNUNET_htonll (next_uid);
1418     gkm->random = random;
1419     gkm->key = *key;
1420   }
1421   qc.rc.proc = proc;
1422   qc.rc.proc_cls = proc_cls;
1423   qe = make_queue_entry (h,
1424                          env,
1425                          queue_priority,
1426                          max_queue_size,
1427                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1428                          &qc);
1429   if (NULL == qe)
1430   {
1431     LOG (GNUNET_ERROR_TYPE_DEBUG,
1432          "Could not queue request for `%s'\n",
1433          GNUNET_h2s (key));
1434     return NULL;
1435   }
1436 #if INSANE_STATISTICS
1437   GNUNET_STATISTICS_update (h->stats,
1438                             gettext_noop ("# GET requests executed"),
1439                             1,
1440                             GNUNET_NO);
1441 #endif
1442   process_queue (h);
1443   return qe;
1444 }
1445
1446
1447 /**
1448  * Cancel a datastore operation.  The final callback from the
1449  * operation must not have been done yet.
1450  *
1451  * @param qe operation to cancel
1452  */
1453 void
1454 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1455 {
1456   struct GNUNET_DATASTORE_Handle *h = qe->h;
1457
1458   LOG (GNUNET_ERROR_TYPE_DEBUG,
1459        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1460        qe,
1461        NULL == qe->env,
1462        h->queue_head == qe);
1463   if (NULL == qe->env)
1464   {
1465     free_queue_entry (qe);
1466     h->skip_next_messages++;
1467     return;
1468   }
1469   free_queue_entry (qe);
1470   process_queue (h);
1471 }
1472
1473
1474 /* end of datastore_api.c */