paragraph for gnunet devs that don't know how to use the web
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14     
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 /**
20  * @file datastore/datastore_api.c
21  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
22  *        a priority queue for requests
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_arm_service.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_datastore_service.h"
29 #include "gnunet_statistics_service.h"
30 #include "datastore.h"
31
32 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
33
34 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for @e cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for @e proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98 /**
99  * Entry in our priority queue.
100  */
101 struct GNUNET_DATASTORE_QueueEntry
102 {
103
104   /**
105    * This is a linked list.
106    */
107   struct GNUNET_DATASTORE_QueueEntry *next;
108
109   /**
110    * This is a linked list.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *prev;
113
114   /**
115    * Handle to the master context.
116    */
117   struct GNUNET_DATASTORE_Handle *h;
118
119   /**
120    * Function to call after transmission of the request.
121    */
122   GNUNET_DATASTORE_ContinuationWithStatus cont;
123
124   /**
125    * Closure for @e cont.
126    */
127   void *cont_cls;
128
129   /**
130    * Context for the operation.
131    */
132   union QueueContext qc;
133
134   /**
135    * Envelope of the request to transmit, NULL after
136    * transmission.
137    */
138   struct GNUNET_MQ_Envelope *env;
139
140   /**
141    * Task we run if this entry stalls the queue and we
142    * need to warn the user.
143    */
144   struct GNUNET_SCHEDULER_Task *delay_warn_task;
145
146   /**
147    * Priority in the queue.
148    */
149   unsigned int priority;
150
151   /**
152    * Maximum allowed length of queue (otherwise
153    * this request should be discarded).
154    */
155   unsigned int max_queue;
156
157   /**
158    * Expected response type.
159    */
160   uint16_t response_type;
161
162 };
163
164
165 /**
166  * Handle to the datastore service.
167  */
168 struct GNUNET_DATASTORE_Handle
169 {
170
171   /**
172    * Our configuration.
173    */
174   const struct GNUNET_CONFIGURATION_Handle *cfg;
175
176   /**
177    * Current connection to the datastore service.
178    */
179   struct GNUNET_MQ_Handle *mq;
180
181   /**
182    * Handle for statistics.
183    */
184   struct GNUNET_STATISTICS_Handle *stats;
185
186   /**
187    * Current head of priority queue.
188    */
189   struct GNUNET_DATASTORE_QueueEntry *queue_head;
190
191   /**
192    * Current tail of priority queue.
193    */
194   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
195
196   /**
197    * Task for trying to reconnect.
198    */
199   struct GNUNET_SCHEDULER_Task *reconnect_task;
200
201   /**
202    * How quickly should we retry?  Used for exponential back-off on
203    * connect-errors.
204    */
205   struct GNUNET_TIME_Relative retry_time;
206
207   /**
208    * Number of entries in the queue.
209    */
210   unsigned int queue_size;
211
212   /**
213    * Number of results we're receiving for the current query
214    * after application stopped to care.  Used to determine when
215    * to reset the connection.
216    */
217   unsigned int result_count;
218
219   /**
220    * We should ignore the next message(s) from the service.
221    */
222   unsigned int skip_next_messages;
223
224 };
225
226
227 /**
228  * Try reconnecting to the datastore service.
229  *
230  * @param cls the `struct GNUNET_DATASTORE_Handle`
231  */
232 static void
233 try_reconnect (void *cls);
234
235
236 /**
237  * Disconnect from the service and then try reconnecting to the datastore service
238  * after some delay.
239  *
240  * @param h handle to datastore to disconnect and reconnect
241  */
242 static void
243 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
244 {
245   if (NULL == h->mq)
246   {
247     GNUNET_break (0);
248     return;
249   }
250   GNUNET_MQ_destroy (h->mq);
251   h->mq = NULL;
252   h->skip_next_messages = 0;
253   h->reconnect_task
254     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
255                                     &try_reconnect,
256                                     h);
257 }
258
259
260 /**
261  * Free a queue entry.  Removes the given entry from the
262  * queue and releases associated resources.  Does NOT
263  * call the callback.
264  *
265  * @param qe entry to free.
266  */
267 static void
268 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
269 {
270   struct GNUNET_DATASTORE_Handle *h = qe->h;
271
272   GNUNET_CONTAINER_DLL_remove (h->queue_head,
273                                h->queue_tail,
274                                qe);
275   h->queue_size--;
276   if (NULL != qe->env)
277     GNUNET_MQ_discard (qe->env);
278   if (NULL != qe->delay_warn_task)
279     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
280   GNUNET_free (qe);
281 }
282
283
284 /**
285  * Task that logs an error after some time.
286  *
287  * @param qe `struct GNUNET_DATASTORE_QueueEntry` about which the error is
288  */
289 static void
290 delay_warning (void *cls)
291 {
292   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
293
294   qe->delay_warn_task = NULL;
295   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
296               "Request %p of type %u at head of datastore queue for more than %s\n",
297               qe,
298               (unsigned int) qe->response_type,
299               GNUNET_STRINGS_relative_time_to_string (DELAY_WARN_TIMEOUT,
300                                                       GNUNET_YES));
301   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
302                                                       &delay_warning,
303                                                       qe);
304 }
305
306
307 /**
308  * Handle error in sending drop request to datastore.
309  *
310  * @param cls closure with the datastore handle
311  * @param error error code
312  */
313 static void
314 mq_error_handler (void *cls,
315                   enum GNUNET_MQ_Error error)
316 {
317   struct GNUNET_DATASTORE_Handle *h = cls;
318   struct GNUNET_DATASTORE_QueueEntry *qe;
319
320   LOG (GNUNET_ERROR_TYPE_DEBUG,
321        "MQ error, reconnecting to DATASTORE\n");
322   do_disconnect (h);
323   qe = h->queue_head;
324   if (NULL == qe)
325     return;
326   if (NULL != qe->delay_warn_task)
327   {
328     GNUNET_SCHEDULER_cancel (qe->delay_warn_task);
329     qe->delay_warn_task = NULL;
330   }
331   if (NULL == qe->env)
332   {
333     union QueueContext qc = qe->qc;
334     uint16_t rt = qe->response_type;
335
336     LOG (GNUNET_ERROR_TYPE_DEBUG,
337          "Failed to receive response from database.\n");
338     free_queue_entry (qe);
339     switch (rt)
340     {
341     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
342       if (NULL != qc.sc.cont)
343         qc.sc.cont (qc.sc.cont_cls,
344                     GNUNET_SYSERR,
345                     GNUNET_TIME_UNIT_ZERO_ABS,
346                     _("DATASTORE disconnected"));
347       break;
348     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
349       if (NULL != qc.rc.proc)
350         qc.rc.proc (qc.rc.proc_cls,
351                     NULL,
352                     0,
353                     NULL,
354                     0,
355                     0,
356                     0,
357                     0,
358                     GNUNET_TIME_UNIT_ZERO_ABS,
359                     0);
360       break;
361     default:
362       GNUNET_break (0);
363     }
364   }
365 }
366
367
368 /**
369  * Connect to the datastore service.
370  *
371  * @param cfg configuration to use
372  * @return handle to use to access the service
373  */
374 struct GNUNET_DATASTORE_Handle *
375 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
376 {
377   struct GNUNET_DATASTORE_Handle *h;
378
379   LOG (GNUNET_ERROR_TYPE_DEBUG,
380        "Establishing DATASTORE connection!\n");
381   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
382   h->cfg = cfg;
383   try_reconnect (h);
384   if (NULL == h->mq)
385   {
386     GNUNET_free (h);
387     return NULL;
388   }
389   h->stats = GNUNET_STATISTICS_create ("datastore-api",
390                                        cfg);
391   return h;
392 }
393
394
395 /**
396  * Task used by to disconnect from the datastore after
397  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
398  *
399  * @param cls the datastore handle
400  */
401 static void
402 disconnect_after_drop (void *cls)
403 {
404   struct GNUNET_DATASTORE_Handle *h = cls;
405
406   LOG (GNUNET_ERROR_TYPE_DEBUG,
407        "Drop sent, disconnecting\n");
408   GNUNET_DATASTORE_disconnect (h,
409                                GNUNET_NO);
410 }
411
412
413 /**
414  * Handle error in sending drop request to datastore.
415  *
416  * @param cls closure with the datastore handle
417  * @param error error code
418  */
419 static void
420 disconnect_on_mq_error (void *cls,
421                         enum GNUNET_MQ_Error error)
422 {
423   struct GNUNET_DATASTORE_Handle *h = cls;
424
425   LOG (GNUNET_ERROR_TYPE_ERROR,
426        "Failed to ask datastore to drop tables\n");
427   GNUNET_DATASTORE_disconnect (h,
428                                GNUNET_NO);
429 }
430
431
432 /**
433  * Disconnect from the datastore service (and free
434  * associated resources).
435  *
436  * @param h handle to the datastore
437  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
438  */
439 void
440 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
441                              int drop)
442 {
443   struct GNUNET_DATASTORE_QueueEntry *qe;
444
445   LOG (GNUNET_ERROR_TYPE_DEBUG,
446        "Datastore disconnect\n");
447   if (NULL != h->mq)
448   {
449     GNUNET_MQ_destroy (h->mq);
450     h->mq = NULL;
451   }
452   if (NULL != h->reconnect_task)
453   {
454     GNUNET_SCHEDULER_cancel (h->reconnect_task);
455     h->reconnect_task = NULL;
456   }
457   while (NULL != (qe = h->queue_head))
458   {
459     switch (qe->response_type)
460     {
461     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
462       if (NULL != qe->qc.sc.cont)
463         qe->qc.sc.cont (qe->qc.sc.cont_cls,
464                         GNUNET_SYSERR,
465                         GNUNET_TIME_UNIT_ZERO_ABS,
466                         _("Disconnected from DATASTORE"));
467       break;
468     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
469       if (NULL != qe->qc.rc.proc)
470         qe->qc.rc.proc (qe->qc.rc.proc_cls,
471                         NULL,
472                         0,
473                         NULL,
474                         0,
475                         0,
476                         0,
477                         0,
478                         GNUNET_TIME_UNIT_ZERO_ABS,
479                         0);
480       break;
481     default:
482       GNUNET_break (0);
483     }
484     free_queue_entry (qe);
485   }
486   if (GNUNET_YES == drop)
487   {
488     LOG (GNUNET_ERROR_TYPE_DEBUG,
489          "Re-connecting to issue DROP!\n");
490     GNUNET_assert (NULL == h->mq);
491     h->mq = GNUNET_CLIENT_connect (h->cfg,
492                                    "datastore",
493                                    NULL,
494                                    &disconnect_on_mq_error,
495                                    h);
496     if (NULL != h->mq)
497     {
498       struct GNUNET_MessageHeader *hdr;
499       struct GNUNET_MQ_Envelope *env;
500
501       env = GNUNET_MQ_msg (hdr,
502                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
503       GNUNET_MQ_notify_sent (env,
504                              &disconnect_after_drop,
505                              h);
506       GNUNET_MQ_send (h->mq,
507                       env);
508       return;
509     }
510     GNUNET_break (0);
511   }
512   GNUNET_STATISTICS_destroy (h->stats,
513                              GNUNET_NO);
514   h->stats = NULL;
515   GNUNET_free (h);
516 }
517
518
519 /**
520  * Create a new entry for our priority queue (and possibly discard other entires if
521  * the queue is getting too long).
522  *
523  * @param h handle to the datastore
524  * @param env envelope with the message to queue
525  * @param queue_priority priority of the entry
526  * @param max_queue_size at what queue size should this request be dropped
527  *        (if other requests of higher priority are in the queue)
528  * @param expected_type which type of response do we expect,
529  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
530  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
531  * @param qc client context (NOT a closure for @a response_proc)
532  * @return NULL if the queue is full
533  */
534 static struct GNUNET_DATASTORE_QueueEntry *
535 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
536                   struct GNUNET_MQ_Envelope *env,
537                   unsigned int queue_priority,
538                   unsigned int max_queue_size,
539                   uint16_t expected_type,
540                   const union QueueContext *qc)
541 {
542   struct GNUNET_DATASTORE_QueueEntry *qe;
543   struct GNUNET_DATASTORE_QueueEntry *pos;
544   unsigned int c;
545
546   if ( (NULL != h->queue_tail) &&
547        (h->queue_tail->priority >= queue_priority) )
548   {
549     c = h->queue_size;
550     pos = NULL;
551   }
552   else
553   {
554     c = 0;
555     pos = h->queue_head;
556   }
557   while ( (NULL != pos) &&
558           (c < max_queue_size) &&
559           (pos->priority >= queue_priority) )
560   {
561     c++;
562     pos = pos->next;
563   }
564   if (c >= max_queue_size)
565   {
566     GNUNET_STATISTICS_update (h->stats,
567                               gettext_noop ("# queue overflows"),
568                               1,
569                               GNUNET_NO);
570     GNUNET_MQ_discard (env);
571     return NULL;
572   }
573   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
574   qe->h = h;
575   qe->env = env;
576   qe->response_type = expected_type;
577   qe->qc = *qc;
578   qe->priority = queue_priority;
579   qe->max_queue = max_queue_size;
580   if (NULL == pos)
581   {
582     /* append at the tail */
583     pos = h->queue_tail;
584   }
585   else
586   {
587     pos = pos->prev;
588     /* do not insert at HEAD if HEAD query was already
589      * transmitted and we are still receiving replies! */
590     if ( (NULL == pos) &&
591          (NULL == h->queue_head->env) )
592       pos = h->queue_head;
593   }
594   c++;
595 #if INSANE_STATISTICS
596   GNUNET_STATISTICS_update (h->stats,
597                             gettext_noop ("# queue entries created"),
598                             1,
599                             GNUNET_NO);
600 #endif
601   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
602                                      h->queue_tail,
603                                      pos,
604                                      qe);
605   h->queue_size++;
606   return qe;
607 }
608
609
610 /**
611  * Process entries in the queue (or do nothing if we are already
612  * doing so).
613  *
614  * @param h handle to the datastore
615  */
616 static void
617 process_queue (struct GNUNET_DATASTORE_Handle *h)
618 {
619   struct GNUNET_DATASTORE_QueueEntry *qe;
620
621   if (NULL == (qe = h->queue_head))
622   {
623     /* no entry in queue */
624     LOG (GNUNET_ERROR_TYPE_DEBUG,
625          "Queue empty\n");
626     return;
627   }
628   if (NULL == qe->env)
629   {
630     /* waiting for replies */
631     LOG (GNUNET_ERROR_TYPE_DEBUG,
632          "Head request already transmitted\n");
633     return;
634   }
635   if (NULL == h->mq)
636   {
637     /* waiting for reconnect */
638     LOG (GNUNET_ERROR_TYPE_DEBUG,
639          "Not connected\n");
640     return;
641   }
642   GNUNET_assert (NULL == qe->delay_warn_task);
643   qe->delay_warn_task = GNUNET_SCHEDULER_add_delayed (DELAY_WARN_TIMEOUT,
644                                                       &delay_warning,
645                                                       qe);
646   GNUNET_MQ_send (h->mq,
647                   qe->env);
648   qe->env = NULL;
649 }
650
651
652 /**
653  * Get the entry at the head of the message queue.
654  *
655  * @param h handle to the datastore
656  * @param response_type the expected response type
657  * @return the queue entry
658  */
659 static struct GNUNET_DATASTORE_QueueEntry *
660 get_queue_head (struct GNUNET_DATASTORE_Handle *h,
661                 uint16_t response_type)
662 {
663   struct GNUNET_DATASTORE_QueueEntry *qe;
664
665   if (h->skip_next_messages > 0)
666   {
667     h->skip_next_messages--;
668     process_queue (h);
669     return NULL;
670   }
671   qe = h->queue_head;
672   if (NULL == qe)
673   {
674     GNUNET_break (0);
675     do_disconnect (h);
676     return NULL;
677   }
678   if (NULL != qe->env)
679   {
680     GNUNET_break (0);
681     do_disconnect (h);
682     return NULL;
683   }
684   if (response_type != qe->response_type)
685   {
686     GNUNET_break (0);
687     do_disconnect (h);
688     return NULL;
689   }
690   return qe;
691 }
692
693
694 /**
695  * Function called to check status message from the service.
696  *
697  * @param cls closure
698  * @param sm status message received
699  * @return #GNUNET_OK if the message is well-formed
700  */
701 static int
702 check_status (void *cls,
703               const struct StatusMessage *sm)
704 {
705   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
706   int32_t status = ntohl (sm->status);
707
708   if (msize > 0)
709   {
710     const char *emsg = (const char *) &sm[1];
711
712     if ('\0' != emsg[msize - 1])
713     {
714       GNUNET_break (0);
715       return GNUNET_SYSERR;
716     }
717   }
718   else if (GNUNET_SYSERR == status)
719   {
720     GNUNET_break (0);
721     return GNUNET_SYSERR;
722   }
723   return GNUNET_OK;
724 }
725
726
727 /**
728  * Function called to handle status message from the service.
729  *
730  * @param cls closure
731  * @param sm status message received
732  */
733 static void
734 handle_status (void *cls,
735                const struct StatusMessage *sm)
736 {
737   struct GNUNET_DATASTORE_Handle *h = cls;
738   struct GNUNET_DATASTORE_QueueEntry *qe;
739   struct StatusContext rc;
740   const char *emsg;
741   int32_t status = ntohl (sm->status);
742
743   qe = get_queue_head (h,
744                        GNUNET_MESSAGE_TYPE_DATASTORE_STATUS);
745   if (NULL == qe)
746     return;
747   rc = qe->qc.sc;
748   free_queue_entry (qe);
749   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
750     emsg = (const char *) &sm[1];
751   else
752     emsg = NULL;
753   LOG (GNUNET_ERROR_TYPE_DEBUG,
754        "Received status %d/%s\n",
755        (int) status,
756        emsg);
757   GNUNET_STATISTICS_update (h->stats,
758                             gettext_noop ("# status messages received"),
759                             1,
760                             GNUNET_NO);
761   h->retry_time = GNUNET_TIME_UNIT_ZERO;
762   process_queue (h);
763   if (NULL != rc.cont)
764     rc.cont (rc.cont_cls,
765              status,
766              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
767              emsg);
768 }
769
770
771 /**
772  * Check data message we received from the service.
773  *
774  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
775  * @param dm message received
776  */
777 static int
778 check_data (void *cls,
779             const struct DataMessage *dm)
780 {
781   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
782
783   if (msize != ntohl (dm->size))
784   {
785     GNUNET_break (0);
786     return GNUNET_SYSERR;
787   }
788   return GNUNET_OK;
789 }
790
791
792 /**
793  * Handle data message we got from the service.
794  *
795  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
796  * @param dm message received
797  */
798 static void
799 handle_data (void *cls,
800              const struct DataMessage *dm)
801 {
802   struct GNUNET_DATASTORE_Handle *h = cls;
803   struct GNUNET_DATASTORE_QueueEntry *qe;
804   struct ResultContext rc;
805
806   qe = get_queue_head (h,
807                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
808   if (NULL == qe)
809     return;
810 #if INSANE_STATISTICS
811   GNUNET_STATISTICS_update (h->stats,
812                             gettext_noop ("# Results received"),
813                             1,
814                             GNUNET_NO);
815 #endif
816   LOG (GNUNET_ERROR_TYPE_DEBUG,
817        "Received result %llu with type %u and size %u with key %s\n",
818        (unsigned long long) GNUNET_ntohll (dm->uid),
819        ntohl (dm->type),
820        ntohl (dm->size),
821        GNUNET_h2s (&dm->key));
822   rc = qe->qc.rc;
823   free_queue_entry (qe);
824   h->retry_time = GNUNET_TIME_UNIT_ZERO;
825   process_queue (h);
826   if (NULL != rc.proc)
827     rc.proc (rc.proc_cls,
828              &dm->key,
829              ntohl (dm->size),
830              &dm[1],
831              ntohl (dm->type),
832              ntohl (dm->priority),
833              ntohl (dm->anonymity),
834              ntohl (dm->replication),
835              GNUNET_TIME_absolute_ntoh (dm->expiration),
836              GNUNET_ntohll (dm->uid));
837 }
838
839
840 /**
841  * Type of a function to call when we receive a
842  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
843  *
844  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
845  * @param msg message received
846  */
847 static void
848 handle_data_end (void *cls,
849                  const struct GNUNET_MessageHeader *msg)
850 {
851   struct GNUNET_DATASTORE_Handle *h = cls;
852   struct GNUNET_DATASTORE_QueueEntry *qe;
853   struct ResultContext rc;
854
855   qe = get_queue_head (h,
856                        GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
857   if (NULL == qe)
858     return;
859   rc = qe->qc.rc;
860   free_queue_entry (qe);
861   LOG (GNUNET_ERROR_TYPE_DEBUG,
862        "Received end of result set, new queue size is %u\n",
863        h->queue_size);
864   h->retry_time = GNUNET_TIME_UNIT_ZERO;
865   h->result_count = 0;
866   process_queue (h);
867   /* signal end of iteration */
868   if (NULL != rc.proc)
869     rc.proc (rc.proc_cls,
870              NULL,
871              0,
872              NULL,
873              0,
874              0,
875              0,
876              0,
877              GNUNET_TIME_UNIT_ZERO_ABS,
878              0);
879 }
880
881
882 /**
883  * Try reconnecting to the datastore service.
884  *
885  * @param cls the `struct GNUNET_DATASTORE_Handle`
886  */
887 static void
888 try_reconnect (void *cls)
889 {
890   struct GNUNET_DATASTORE_Handle *h = cls;
891   struct GNUNET_MQ_MessageHandler handlers[] = {
892     GNUNET_MQ_hd_var_size (status,
893                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
894                            struct StatusMessage,
895                            h),
896     GNUNET_MQ_hd_var_size (data,
897                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
898                            struct DataMessage,
899                            h),
900     GNUNET_MQ_hd_fixed_size (data_end,
901                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
902                              struct GNUNET_MessageHeader,
903                              h),
904     GNUNET_MQ_handler_end ()
905   };
906
907   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
908   h->reconnect_task = NULL;
909   GNUNET_assert (NULL == h->mq);
910   h->mq = GNUNET_CLIENT_connect (h->cfg,
911                                  "datastore",
912                                  handlers,
913                                  &mq_error_handler,
914                                  h);
915   if (NULL == h->mq)
916     return;
917   GNUNET_STATISTICS_update (h->stats,
918                             gettext_noop ("# datastore connections (re)created"),
919                             1,
920                             GNUNET_NO);
921   LOG (GNUNET_ERROR_TYPE_DEBUG,
922        "Reconnected to DATASTORE\n");
923   process_queue (h);
924 }
925
926
927 /**
928  * Dummy continuation used to do nothing (but be non-zero).
929  *
930  * @param cls closure
931  * @param result result
932  * @param min_expiration expiration time
933  * @param emsg error message
934  */
935 static void
936 drop_status_cont (void *cls,
937                   int32_t result,
938                   struct GNUNET_TIME_Absolute min_expiration,
939                   const char *emsg)
940 {
941   /* do nothing */
942 }
943
944
945 /**
946  * Store an item in the datastore.  If the item is already present,
947  * the priorities are summed up and the higher expiration time and
948  * lower anonymity level is used.
949  *
950  * @param h handle to the datastore
951  * @param rid reservation ID to use (from "reserve"); use 0 if no
952  *            prior reservation was made
953  * @param key key for the value
954  * @param size number of bytes in data
955  * @param data content stored
956  * @param type type of the content
957  * @param priority priority of the content
958  * @param anonymity anonymity-level for the content
959  * @param replication how often should the content be replicated to other peers?
960  * @param expiration expiration time for the content
961  * @param queue_priority ranking of this request in the priority queue
962  * @param max_queue_size at what queue size should this request be dropped
963  *        (if other requests of higher priority are in the queue)
964  * @param cont continuation to call when done
965  * @param cont_cls closure for @a cont
966  * @return NULL if the entry was not queued, otherwise a handle that can be used to
967  *         cancel; note that even if NULL is returned, the callback will be invoked
968  *         (or rather, will already have been invoked)
969  */
970 struct GNUNET_DATASTORE_QueueEntry *
971 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
972                       uint32_t rid,
973                       const struct GNUNET_HashCode *key,
974                       size_t size,
975                       const void *data,
976                       enum GNUNET_BLOCK_Type type,
977                       uint32_t priority,
978                       uint32_t anonymity,
979                       uint32_t replication,
980                       struct GNUNET_TIME_Absolute expiration,
981                       unsigned int queue_priority,
982                       unsigned int max_queue_size,
983                       GNUNET_DATASTORE_ContinuationWithStatus cont,
984                       void *cont_cls)
985 {
986   struct GNUNET_DATASTORE_QueueEntry *qe;
987   struct GNUNET_MQ_Envelope *env;
988   struct DataMessage *dm;
989   union QueueContext qc;
990
991   if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
992   {
993     GNUNET_break (0);
994     return NULL;
995   }
996
997   LOG (GNUNET_ERROR_TYPE_DEBUG,
998        "Asked to put %u bytes of data under key `%s' for %s\n",
999        size,
1000        GNUNET_h2s (key),
1001        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
1002                                                GNUNET_YES));
1003   env = GNUNET_MQ_msg_extra (dm,
1004                              size,
1005                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
1006   dm->rid = htonl (rid);
1007   dm->size = htonl ((uint32_t) size);
1008   dm->type = htonl (type);
1009   dm->priority = htonl (priority);
1010   dm->anonymity = htonl (anonymity);
1011   dm->replication = htonl (replication);
1012   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1013   dm->key = *key;
1014   GNUNET_memcpy (&dm[1],
1015           data,
1016           size);
1017   qc.sc.cont = cont;
1018   qc.sc.cont_cls = cont_cls;
1019   qe = make_queue_entry (h,
1020                          env,
1021                          queue_priority,
1022                          max_queue_size,
1023                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1024                          &qc);
1025   if (NULL == qe)
1026   {
1027     LOG (GNUNET_ERROR_TYPE_DEBUG,
1028          "Could not create queue entry for PUT\n");
1029     return NULL;
1030   }
1031   GNUNET_STATISTICS_update (h->stats,
1032                             gettext_noop ("# PUT requests executed"),
1033                             1,
1034                             GNUNET_NO);
1035   process_queue (h);
1036   return qe;
1037 }
1038
1039
1040 /**
1041  * Reserve space in the datastore.  This function should be used
1042  * to avoid "out of space" failures during a longer sequence of "put"
1043  * operations (for example, when a file is being inserted).
1044  *
1045  * @param h handle to the datastore
1046  * @param amount how much space (in bytes) should be reserved (for content only)
1047  * @param entries how many entries will be created (to calculate per-entry overhead)
1048  * @param cont continuation to call when done; "success" will be set to
1049  *             a positive reservation value if space could be reserved.
1050  * @param cont_cls closure for @a cont
1051  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1052  *         cancel; note that even if NULL is returned, the callback will be invoked
1053  *         (or rather, will already have been invoked)
1054  */
1055 struct GNUNET_DATASTORE_QueueEntry *
1056 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1057                           uint64_t amount,
1058                           uint32_t entries,
1059                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1060                           void *cont_cls)
1061 {
1062   struct GNUNET_DATASTORE_QueueEntry *qe;
1063   struct GNUNET_MQ_Envelope *env;
1064   struct ReserveMessage *rm;
1065   union QueueContext qc;
1066
1067   if (NULL == cont)
1068     cont = &drop_status_cont;
1069   LOG (GNUNET_ERROR_TYPE_DEBUG,
1070        "Asked to reserve %llu bytes of data and %u entries\n",
1071        (unsigned long long) amount,
1072        (unsigned int) entries);
1073   env = GNUNET_MQ_msg (rm,
1074                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1075   rm->entries = htonl (entries);
1076   rm->amount = GNUNET_htonll (amount);
1077
1078   qc.sc.cont = cont;
1079   qc.sc.cont_cls = cont_cls;
1080   qe = make_queue_entry (h,
1081                          env,
1082                          UINT_MAX,
1083                          UINT_MAX,
1084                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1085                          &qc);
1086   if (NULL == qe)
1087   {
1088     LOG (GNUNET_ERROR_TYPE_DEBUG,
1089          "Could not create queue entry to reserve\n");
1090     return NULL;
1091   }
1092   GNUNET_STATISTICS_update (h->stats,
1093                             gettext_noop ("# RESERVE requests executed"),
1094                             1,
1095                             GNUNET_NO);
1096   process_queue (h);
1097   return qe;
1098 }
1099
1100
1101 /**
1102  * Signal that all of the data for which a reservation was made has
1103  * been stored and that whatever excess space might have been reserved
1104  * can now be released.
1105  *
1106  * @param h handle to the datastore
1107  * @param rid reservation ID (value of "success" in original continuation
1108  *        from the "reserve" function).
1109  * @param queue_priority ranking of this request in the priority queue
1110  * @param max_queue_size at what queue size should this request be dropped
1111  *        (if other requests of higher priority are in the queue)
1112  * @param queue_priority ranking of this request in the priority queue
1113  * @param max_queue_size at what queue size should this request be dropped
1114  *        (if other requests of higher priority are in the queue)
1115  * @param cont continuation to call when done
1116  * @param cont_cls closure for @a cont
1117  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1118  *         cancel; note that even if NULL is returned, the callback will be invoked
1119  *         (or rather, will already have been invoked)
1120  */
1121 struct GNUNET_DATASTORE_QueueEntry *
1122 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1123                                   uint32_t rid,
1124                                   unsigned int queue_priority,
1125                                   unsigned int max_queue_size,
1126                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1127                                   void *cont_cls)
1128 {
1129   struct GNUNET_DATASTORE_QueueEntry *qe;
1130   struct GNUNET_MQ_Envelope *env;
1131   struct ReleaseReserveMessage *rrm;
1132   union QueueContext qc;
1133
1134   if (NULL == cont)
1135     cont = &drop_status_cont;
1136   LOG (GNUNET_ERROR_TYPE_DEBUG,
1137        "Asked to release reserve %d\n",
1138        rid);
1139   env = GNUNET_MQ_msg (rrm,
1140                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1141   rrm->rid = htonl (rid);
1142   qc.sc.cont = cont;
1143   qc.sc.cont_cls = cont_cls;
1144   qe = make_queue_entry (h,
1145                          env,
1146                          queue_priority,
1147                          max_queue_size,
1148                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1149                          &qc);
1150   if (NULL == qe)
1151   {
1152     LOG (GNUNET_ERROR_TYPE_DEBUG,
1153          "Could not create queue entry to release reserve\n");
1154     return NULL;
1155   }
1156   GNUNET_STATISTICS_update (h->stats,
1157                             gettext_noop
1158                             ("# RELEASE RESERVE requests executed"), 1,
1159                             GNUNET_NO);
1160   process_queue (h);
1161   return qe;
1162 }
1163
1164
1165 /**
1166  * Explicitly remove some content from the database.
1167  * The @a cont continuation will be called with `status`
1168  * #GNUNET_OK" if content was removed, #GNUNET_NO
1169  * if no matching entry was found and #GNUNET_SYSERR
1170  * on all other types of errors.
1171  *
1172  * @param h handle to the datastore
1173  * @param key key for the value
1174  * @param size number of bytes in data
1175  * @param data content stored
1176  * @param queue_priority ranking of this request in the priority queue
1177  * @param max_queue_size at what queue size should this request be dropped
1178  *        (if other requests of higher priority are in the queue)
1179  * @param cont continuation to call when done
1180  * @param cont_cls closure for @a cont
1181  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1182  *         cancel; note that even if NULL is returned, the callback will be invoked
1183  *         (or rather, will already have been invoked)
1184  */
1185 struct GNUNET_DATASTORE_QueueEntry *
1186 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1187                          const struct GNUNET_HashCode *key,
1188                          size_t size,
1189                          const void *data,
1190                          unsigned int queue_priority,
1191                          unsigned int max_queue_size,
1192                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1193                          void *cont_cls)
1194 {
1195   struct GNUNET_DATASTORE_QueueEntry *qe;
1196   struct DataMessage *dm;
1197   struct GNUNET_MQ_Envelope *env;
1198   union QueueContext qc;
1199
1200   if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1201   {
1202     GNUNET_break (0);
1203     return NULL;
1204   }
1205   if (NULL == cont)
1206     cont = &drop_status_cont;
1207   LOG (GNUNET_ERROR_TYPE_DEBUG,
1208        "Asked to remove %u bytes under key `%s'\n",
1209        size,
1210        GNUNET_h2s (key));
1211   env = GNUNET_MQ_msg_extra (dm,
1212                              size,
1213                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1214   dm->size = htonl (size);
1215   dm->key = *key;
1216   GNUNET_memcpy (&dm[1],
1217           data,
1218           size);
1219
1220   qc.sc.cont = cont;
1221   qc.sc.cont_cls = cont_cls;
1222
1223   qe = make_queue_entry (h,
1224                          env,
1225                          queue_priority,
1226                          max_queue_size,
1227                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1228                          &qc);
1229   if (NULL == qe)
1230   {
1231     LOG (GNUNET_ERROR_TYPE_DEBUG,
1232          "Could not create queue entry for REMOVE\n");
1233     return NULL;
1234   }
1235   GNUNET_STATISTICS_update (h->stats,
1236                             gettext_noop ("# REMOVE requests executed"),
1237                             1,
1238                             GNUNET_NO);
1239   process_queue (h);
1240   return qe;
1241 }
1242
1243
1244
1245 /**
1246  * Get a random value from the datastore for content replication.
1247  * Returns a single, random value among those with the highest
1248  * replication score, lowering positive replication scores by one for
1249  * the chosen value (if only content with a replication score exists,
1250  * a random value is returned and replication scores are not changed).
1251  *
1252  * @param h handle to the datastore
1253  * @param queue_priority ranking of this request in the priority queue
1254  * @param max_queue_size at what queue size should this request be dropped
1255  *        (if other requests of higher priority are in the queue)
1256  * @param proc function to call on a random value; it
1257  *        will be called once with a value (if available)
1258  *        and always once with a value of NULL.
1259  * @param proc_cls closure for @a proc
1260  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1261  *         cancel
1262  */
1263 struct GNUNET_DATASTORE_QueueEntry *
1264 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1265                                       unsigned int queue_priority,
1266                                       unsigned int max_queue_size,
1267                                       GNUNET_DATASTORE_DatumProcessor proc,
1268                                       void *proc_cls)
1269 {
1270   struct GNUNET_DATASTORE_QueueEntry *qe;
1271   struct GNUNET_MQ_Envelope *env;
1272   struct GNUNET_MessageHeader *m;
1273   union QueueContext qc;
1274
1275   GNUNET_assert (NULL != proc);
1276   LOG (GNUNET_ERROR_TYPE_DEBUG,
1277        "Asked to get replication entry\n");
1278   env = GNUNET_MQ_msg (m,
1279                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1280   qc.rc.proc = proc;
1281   qc.rc.proc_cls = proc_cls;
1282   qe = make_queue_entry (h,
1283                          env,
1284                          queue_priority,
1285                          max_queue_size,
1286                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1287                          &qc);
1288   if (NULL == qe)
1289   {
1290     LOG (GNUNET_ERROR_TYPE_DEBUG,
1291          "Could not create queue entry for GET REPLICATION\n");
1292     return NULL;
1293   }
1294   GNUNET_STATISTICS_update (h->stats,
1295                             gettext_noop
1296                             ("# GET REPLICATION requests executed"), 1,
1297                             GNUNET_NO);
1298   process_queue (h);
1299   return qe;
1300 }
1301
1302
1303 /**
1304  * Get a single zero-anonymity value from the datastore.
1305  *
1306  * @param h handle to the datastore
1307  * @param next_uid return the result with lowest uid >= next_uid
1308  * @param queue_priority ranking of this request in the priority queue
1309  * @param max_queue_size at what queue size should this request be dropped
1310  *        (if other requests of higher priority are in the queue)
1311  * @param type allowed type for the operation (never zero)
1312  * @param proc function to call on a random value; it
1313  *        will be called once with a value (if available)
1314  *        or with NULL if none value exists.
1315  * @param proc_cls closure for @a proc
1316  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1317  *         cancel
1318  */
1319 struct GNUNET_DATASTORE_QueueEntry *
1320 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1321                                      uint64_t next_uid,
1322                                      unsigned int queue_priority,
1323                                      unsigned int max_queue_size,
1324                                      enum GNUNET_BLOCK_Type type,
1325                                      GNUNET_DATASTORE_DatumProcessor proc,
1326                                      void *proc_cls)
1327 {
1328   struct GNUNET_DATASTORE_QueueEntry *qe;
1329   struct GNUNET_MQ_Envelope *env;
1330   struct GetZeroAnonymityMessage *m;
1331   union QueueContext qc;
1332
1333   GNUNET_assert (NULL != proc);
1334   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1335   LOG (GNUNET_ERROR_TYPE_DEBUG,
1336        "Asked to get a zero-anonymity entry of type %d\n",
1337        type);
1338   env = GNUNET_MQ_msg (m,
1339                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1340   m->type = htonl ((uint32_t) type);
1341   m->next_uid = GNUNET_htonll (next_uid);
1342   qc.rc.proc = proc;
1343   qc.rc.proc_cls = proc_cls;
1344   qe = make_queue_entry (h,
1345                          env,
1346                          queue_priority,
1347                          max_queue_size,
1348                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1349                          &qc);
1350   if (NULL == qe)
1351   {
1352     LOG (GNUNET_ERROR_TYPE_DEBUG,
1353          "Could not create queue entry for zero-anonymity procation\n");
1354     return NULL;
1355   }
1356   GNUNET_STATISTICS_update (h->stats,
1357                             gettext_noop
1358                             ("# GET ZERO ANONYMITY requests executed"), 1,
1359                             GNUNET_NO);
1360   process_queue (h);
1361   return qe;
1362 }
1363
1364
1365 /**
1366  * Get a result for a particular key from the datastore.  The processor
1367  * will only be called once.
1368  *
1369  * @param h handle to the datastore
1370  * @param next_uid return the result with lowest uid >= next_uid
1371  * @param random if true, return a random result instead of using next_uid
1372  * @param key maybe NULL (to match all entries)
1373  * @param type desired type, 0 for any
1374  * @param queue_priority ranking of this request in the priority queue
1375  * @param max_queue_size at what queue size should this request be dropped
1376  *        (if other requests of higher priority are in the queue)
1377  * @param proc function to call on each matching value;
1378  *        will be called once with a NULL value at the end
1379  * @param proc_cls closure for @a proc
1380  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1381  *         cancel
1382  */
1383 struct GNUNET_DATASTORE_QueueEntry *
1384 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1385                           uint64_t next_uid,
1386                           bool random,
1387                           const struct GNUNET_HashCode *key,
1388                           enum GNUNET_BLOCK_Type type,
1389                           unsigned int queue_priority,
1390                           unsigned int max_queue_size,
1391                           GNUNET_DATASTORE_DatumProcessor proc,
1392                           void *proc_cls)
1393 {
1394   struct GNUNET_DATASTORE_QueueEntry *qe;
1395   struct GNUNET_MQ_Envelope *env;
1396   struct GetKeyMessage *gkm;
1397   struct GetMessage *gm;
1398   union QueueContext qc;
1399
1400   GNUNET_assert (NULL != proc);
1401   LOG (GNUNET_ERROR_TYPE_DEBUG,
1402        "Asked to look for data of type %u under key `%s'\n",
1403        (unsigned int) type,
1404        GNUNET_h2s (key));
1405   if (NULL == key)
1406   {
1407     env = GNUNET_MQ_msg (gm,
1408                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1409     gm->type = htonl (type);
1410     gm->next_uid = GNUNET_htonll (next_uid);
1411     gm->random = random;
1412   }
1413   else
1414   {
1415     env = GNUNET_MQ_msg (gkm,
1416                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1417     gkm->type = htonl (type);
1418     gkm->next_uid = GNUNET_htonll (next_uid);
1419     gkm->random = random;
1420     gkm->key = *key;
1421   }
1422   qc.rc.proc = proc;
1423   qc.rc.proc_cls = proc_cls;
1424   qe = make_queue_entry (h,
1425                          env,
1426                          queue_priority,
1427                          max_queue_size,
1428                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1429                          &qc);
1430   if (NULL == qe)
1431   {
1432     LOG (GNUNET_ERROR_TYPE_DEBUG,
1433          "Could not queue request for `%s'\n",
1434          GNUNET_h2s (key));
1435     return NULL;
1436   }
1437 #if INSANE_STATISTICS
1438   GNUNET_STATISTICS_update (h->stats,
1439                             gettext_noop ("# GET requests executed"),
1440                             1,
1441                             GNUNET_NO);
1442 #endif
1443   process_queue (h);
1444   return qe;
1445 }
1446
1447
1448 /**
1449  * Cancel a datastore operation.  The final callback from the
1450  * operation must not have been done yet.
1451  *
1452  * @param qe operation to cancel
1453  */
1454 void
1455 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1456 {
1457   struct GNUNET_DATASTORE_Handle *h = qe->h;
1458
1459   LOG (GNUNET_ERROR_TYPE_DEBUG,
1460        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1461        qe,
1462        NULL == qe->env,
1463        h->queue_head == qe);
1464   if (NULL == qe->env)
1465   {
1466     free_queue_entry (qe);
1467     h->skip_next_messages++;
1468     return;
1469   }
1470   free_queue_entry (qe);
1471   process_queue (h);
1472 }
1473
1474
1475 /* end of datastore_api.c */