b2fc189475a5d6851e5c3021bf36a600ff6d3b5f
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for @e cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for @e proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98 /**
99  * Entry in our priority queue.
100  */
101 struct GNUNET_DATASTORE_QueueEntry
102 {
103
104   /**
105    * This is a linked list.
106    */
107   struct GNUNET_DATASTORE_QueueEntry *next;
108
109   /**
110    * This is a linked list.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *prev;
113
114   /**
115    * Handle to the master context.
116    */
117   struct GNUNET_DATASTORE_Handle *h;
118
119   /**
120    * Function to call after transmission of the request.
121    */
122   GNUNET_DATASTORE_ContinuationWithStatus cont;
123
124   /**
125    * Closure for @e cont.
126    */
127   void *cont_cls;
128
129   /**
130    * Context for the operation.
131    */
132   union QueueContext qc;
133
134   /**
135    * Envelope of the request to transmit, NULL after
136    * transmission.
137    */
138   struct GNUNET_MQ_Envelope *env;
139
140   /**
141    * Priority in the queue.
142    */
143   unsigned int priority;
144
145   /**
146    * Maximum allowed length of queue (otherwise
147    * this request should be discarded).
148    */
149   unsigned int max_queue;
150
151   /**
152    * Expected response type.
153    */
154   uint16_t response_type;
155
156 };
157
158
159 /**
160  * Handle to the datastore service.
161  */
162 struct GNUNET_DATASTORE_Handle
163 {
164
165   /**
166    * Our configuration.
167    */
168   const struct GNUNET_CONFIGURATION_Handle *cfg;
169
170   /**
171    * Current connection to the datastore service.
172    */
173   struct GNUNET_MQ_Handle *mq;
174
175   /**
176    * Handle for statistics.
177    */
178   struct GNUNET_STATISTICS_Handle *stats;
179
180   /**
181    * Current head of priority queue.
182    */
183   struct GNUNET_DATASTORE_QueueEntry *queue_head;
184
185   /**
186    * Current tail of priority queue.
187    */
188   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
189
190   /**
191    * Task for trying to reconnect.
192    */
193   struct GNUNET_SCHEDULER_Task *reconnect_task;
194
195   /**
196    * How quickly should we retry?  Used for exponential back-off on
197    * connect-errors.
198    */
199   struct GNUNET_TIME_Relative retry_time;
200
201   /**
202    * Number of entries in the queue.
203    */
204   unsigned int queue_size;
205
206   /**
207    * Number of results we're receiving for the current query
208    * after application stopped to care.  Used to determine when
209    * to reset the connection.
210    */
211   unsigned int result_count;
212
213   /**
214    * We should ignore the next message(s) from the service.
215    */
216   unsigned int skip_next_messages;
217
218 };
219
220
221 /**
222  * Try reconnecting to the datastore service.
223  *
224  * @param cls the `struct GNUNET_DATASTORE_Handle`
225  */
226 static void
227 try_reconnect (void *cls);
228
229
230 /**
231  * Disconnect from the service and then try reconnecting to the datastore service
232  * after some delay.
233  *
234  * @param h handle to datastore to disconnect and reconnect
235  */
236 static void
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238 {
239   if (NULL == h->mq)
240   {
241     GNUNET_break (0);
242     return;
243   }
244   GNUNET_MQ_destroy (h->mq);
245   h->mq = NULL;
246   h->skip_next_messages = 0;
247   h->reconnect_task
248     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249                                     &try_reconnect,
250                                     h);
251 }
252
253
254 /**
255  * Free a queue entry.  Removes the given entry from the
256  * queue and releases associated resources.  Does NOT
257  * call the callback.
258  *
259  * @param qe entry to free.
260  */
261 static void
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263 {
264   struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266   GNUNET_CONTAINER_DLL_remove (h->queue_head,
267                                h->queue_tail,
268                                qe);
269   h->queue_size--;
270   if (NULL != qe->env)
271     GNUNET_MQ_discard (qe->env);
272   GNUNET_free (qe);
273 }
274
275
276 /**
277  * Handle error in sending drop request to datastore.
278  *
279  * @param cls closure with the datastore handle
280  * @param error error code
281  */
282 static void
283 mq_error_handler (void *cls,
284                   enum GNUNET_MQ_Error error)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_DATASTORE_QueueEntry *qe;
288
289   LOG (GNUNET_ERROR_TYPE_DEBUG,
290        "MQ error, reconnecting to DATASTORE\n");
291   do_disconnect (h);
292   qe = h->queue_head;
293   if ( (NULL != qe) &&
294        (NULL == qe->env) )
295   {
296     union QueueContext qc = qe->qc;
297     uint16_t rt = qe->response_type;
298
299     LOG (GNUNET_ERROR_TYPE_DEBUG,
300          "Failed to receive response from database.\n");
301     free_queue_entry (qe);
302     switch (rt)
303     {
304     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
305       if (NULL != qc.sc.cont)
306         qc.sc.cont (qc.sc.cont_cls,
307                     GNUNET_SYSERR,
308                     GNUNET_TIME_UNIT_ZERO_ABS,
309                     _("DATASTORE disconnected"));
310       break;
311     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
312       if (NULL != qc.rc.proc)
313         qc.rc.proc (qc.rc.proc_cls,
314                     NULL,
315                     0,
316                     NULL, 0, 0, 0,
317                     GNUNET_TIME_UNIT_ZERO_ABS,
318                     0);
319       break;
320     default:
321       GNUNET_break (0);
322     }
323   }
324 }
325
326
327 /**
328  * Connect to the datastore service.
329  *
330  * @param cfg configuration to use
331  * @return handle to use to access the service
332  */
333 struct GNUNET_DATASTORE_Handle *
334 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
335 {
336   struct GNUNET_DATASTORE_Handle *h;
337
338   LOG (GNUNET_ERROR_TYPE_DEBUG,
339        "Establishing DATASTORE connection!\n");
340   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
341   h->cfg = cfg;
342   try_reconnect (h);
343   if (NULL == h->mq)
344   {
345     GNUNET_free (h);
346     return NULL;
347   }
348   h->stats = GNUNET_STATISTICS_create ("datastore-api",
349                                        cfg);
350   return h;
351 }
352
353
354 /**
355  * Task used by to disconnect from the datastore after
356  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
357  *
358  * @param cls the datastore handle
359  */
360 static void
361 disconnect_after_drop (void *cls)
362 {
363   struct GNUNET_DATASTORE_Handle *h = cls;
364
365   LOG (GNUNET_ERROR_TYPE_DEBUG,
366        "Drop sent, disconnecting\n");
367   GNUNET_DATASTORE_disconnect (h,
368                                GNUNET_NO);
369 }
370
371
372 /**
373  * Handle error in sending drop request to datastore.
374  *
375  * @param cls closure with the datastore handle
376  * @param error error code
377  */
378 static void
379 disconnect_on_mq_error (void *cls,
380                         enum GNUNET_MQ_Error error)
381 {
382   struct GNUNET_DATASTORE_Handle *h = cls;
383
384   LOG (GNUNET_ERROR_TYPE_ERROR,
385        "Failed to ask datastore to drop tables\n");
386   GNUNET_DATASTORE_disconnect (h,
387                                GNUNET_NO);
388 }
389
390
391 /**
392  * Disconnect from the datastore service (and free
393  * associated resources).
394  *
395  * @param h handle to the datastore
396  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
397  */
398 void
399 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
400                              int drop)
401 {
402   struct GNUNET_DATASTORE_QueueEntry *qe;
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Datastore disconnect\n");
406   if (NULL != h->mq)
407   {
408     GNUNET_MQ_destroy (h->mq);
409     h->mq = NULL;
410   }
411   if (NULL != h->reconnect_task)
412   {
413     GNUNET_SCHEDULER_cancel (h->reconnect_task);
414     h->reconnect_task = NULL;
415   }
416   while (NULL != (qe = h->queue_head))
417   {
418     switch (qe->response_type)
419     {
420     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
421       if (NULL != qe->qc.sc.cont)
422         qe->qc.sc.cont (qe->qc.sc.cont_cls,
423                         GNUNET_SYSERR,
424                         GNUNET_TIME_UNIT_ZERO_ABS,
425                         _("Disconnected from DATASTORE"));
426       break;
427     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
428       if (NULL != qe->qc.rc.proc)
429         qe->qc.rc.proc (qe->qc.rc.proc_cls,
430                         NULL,
431                         0,
432                         NULL, 0, 0, 0,
433                         GNUNET_TIME_UNIT_ZERO_ABS,
434                         0);
435       break;
436     default:
437       GNUNET_break (0);
438     }
439     free_queue_entry (qe);
440   }
441   if (GNUNET_YES == drop)
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "Re-connecting to issue DROP!\n");
445     GNUNET_assert (NULL == h->mq);
446     h->mq = GNUNET_CLIENT_connect (h->cfg,
447                                    "datastore",
448                                    NULL,
449                                    &disconnect_on_mq_error,
450                                    h);
451     if (NULL != h->mq)
452     {
453       struct GNUNET_MessageHeader *hdr;
454       struct GNUNET_MQ_Envelope *env;
455
456       env = GNUNET_MQ_msg (hdr,
457                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
458       GNUNET_MQ_notify_sent (env,
459                              &disconnect_after_drop,
460                              h);
461       GNUNET_MQ_send (h->mq,
462                       env);
463       return;
464     }
465     GNUNET_break (0);
466   }
467   GNUNET_STATISTICS_destroy (h->stats,
468                              GNUNET_NO);
469   h->stats = NULL;
470   GNUNET_free (h);
471 }
472
473
474 /**
475  * Create a new entry for our priority queue (and possibly discard other entires if
476  * the queue is getting too long).
477  *
478  * @param h handle to the datastore
479  * @param env envelope with the message to queue
480  * @param queue_priority priority of the entry
481  * @param max_queue_size at what queue size should this request be dropped
482  *        (if other requests of higher priority are in the queue)
483  * @param expected_type which type of response do we expect,
484  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
485  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
486  * @param qc client context (NOT a closure for @a response_proc)
487  * @return NULL if the queue is full
488  */
489 static struct GNUNET_DATASTORE_QueueEntry *
490 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
491                   struct GNUNET_MQ_Envelope *env,
492                   unsigned int queue_priority,
493                   unsigned int max_queue_size,
494                   uint16_t expected_type,
495                   const union QueueContext *qc)
496 {
497   struct GNUNET_DATASTORE_QueueEntry *qe;
498   struct GNUNET_DATASTORE_QueueEntry *pos;
499   unsigned int c;
500
501   if ( (h->queue_size == max_queue_size) &&
502        (h->queue_tail->priority >= queue_priority) )
503   {
504     GNUNET_STATISTICS_update (h->stats,
505                               gettext_noop ("# queue overflows"),
506                               1,
507                               GNUNET_NO);
508     GNUNET_MQ_discard (env);
509     return NULL;
510   }
511
512   c = 0;
513   pos = h->queue_head;
514   while ( (NULL != pos) &&
515           (c < max_queue_size) &&
516           (pos->priority >= queue_priority) )
517   {
518     c++;
519     pos = pos->next;
520   }
521   if (c >= max_queue_size)
522   {
523     GNUNET_STATISTICS_update (h->stats,
524                               gettext_noop ("# queue overflows"),
525                               1,
526                               GNUNET_NO);
527     GNUNET_MQ_discard (env);
528     return NULL;
529   }
530   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
531   qe->h = h;
532   qe->env = env;
533   qe->response_type = expected_type;
534   qe->qc = *qc;
535   qe->priority = queue_priority;
536   qe->max_queue = max_queue_size;
537   if (NULL == pos)
538   {
539     /* append at the tail */
540     pos = h->queue_tail;
541   }
542   else
543   {
544     pos = pos->prev;
545     /* do not insert at HEAD if HEAD query was already
546      * transmitted and we are still receiving replies! */
547     if ( (NULL == pos) &&
548          (NULL == h->queue_head->env) )
549       pos = h->queue_head;
550   }
551   c++;
552 #if INSANE_STATISTICS
553   GNUNET_STATISTICS_update (h->stats,
554                             gettext_noop ("# queue entries created"),
555                             1,
556                             GNUNET_NO);
557 #endif
558   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
559                                      h->queue_tail,
560                                      pos,
561                                      qe);
562   h->queue_size++;
563   return qe;
564 }
565
566
567 /**
568  * Process entries in the queue (or do nothing if we are already
569  * doing so).
570  *
571  * @param h handle to the datastore
572  */
573 static void
574 process_queue (struct GNUNET_DATASTORE_Handle *h)
575 {
576   struct GNUNET_DATASTORE_QueueEntry *qe;
577
578   if (NULL == (qe = h->queue_head))
579   {
580     /* no entry in queue */
581     LOG (GNUNET_ERROR_TYPE_DEBUG,
582          "Queue empty\n");
583     return;
584   }
585   if (NULL == qe->env)
586   {
587     /* waiting for replies */
588     LOG (GNUNET_ERROR_TYPE_DEBUG,
589          "Head request already transmitted\n");
590     return;
591   }
592   if (NULL == h->mq)
593   {
594     /* waiting for reconnect */
595     LOG (GNUNET_ERROR_TYPE_DEBUG,
596          "Not connected\n");
597     return;
598   }
599   GNUNET_MQ_send (h->mq,
600                   qe->env);
601   qe->env = NULL;
602 }
603
604
605
606
607 /**
608  * Function called to check status message from the service.
609  *
610  * @param cls closure
611  * @param sm status message received
612  * @return #GNUNET_OK if the message is well-formed
613  */
614 static int
615 check_status (void *cls,
616               const struct StatusMessage *sm)
617 {
618   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
619   int32_t status = ntohl (sm->status);
620
621   if (msize > 0)
622   {
623     const char *emsg = (const char *) &sm[1];
624
625     if ('\0' != emsg[msize - 1])
626     {
627       GNUNET_break (0);
628       return GNUNET_SYSERR;
629     }
630   }
631   else if (GNUNET_SYSERR == status)
632   {
633     GNUNET_break (0);
634     return GNUNET_SYSERR;
635   }
636   return GNUNET_OK;
637 }
638
639
640 /**
641  * Function called to handle status message from the service.
642  *
643  * @param cls closure
644  * @param sm status message received
645  */
646 static void
647 handle_status (void *cls,
648                const struct StatusMessage *sm)
649 {
650   struct GNUNET_DATASTORE_Handle *h = cls;
651   struct GNUNET_DATASTORE_QueueEntry *qe;
652   struct StatusContext rc;
653   const char *emsg;
654   int32_t status = ntohl (sm->status);
655
656   if (h->skip_next_messages > 0)
657   {
658     h->skip_next_messages--;
659     process_queue (h);
660     return;
661   }
662   if (NULL == (qe = h->queue_head))
663   {
664     GNUNET_break (0);
665     do_disconnect (h);
666     return;
667   }
668   if (NULL != qe->env)
669   {
670     GNUNET_break (0);
671     do_disconnect (h);
672     return;
673   }
674   if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
675   {
676     GNUNET_break (0);
677     do_disconnect (h);
678     return;
679   }
680   rc = qe->qc.sc;
681   free_queue_entry (qe);
682   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
683     emsg = (const char *) &sm[1];
684   else
685     emsg = NULL;
686   LOG (GNUNET_ERROR_TYPE_DEBUG,
687        "Received status %d/%s\n",
688        (int) status,
689        emsg);
690   GNUNET_STATISTICS_update (h->stats,
691                             gettext_noop ("# status messages received"),
692                             1,
693                             GNUNET_NO);
694   h->retry_time = GNUNET_TIME_UNIT_ZERO;
695   process_queue (h);
696   if (NULL != rc.cont)
697     rc.cont (rc.cont_cls,
698              status,
699              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
700              emsg);
701 }
702
703
704 /**
705  * Check data message we received from the service.
706  *
707  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
708  * @param dm message received
709  */
710 static int
711 check_data (void *cls,
712             const struct DataMessage *dm)
713 {
714   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
715
716   if (msize != ntohl (dm->size))
717   {
718     GNUNET_break (0);
719     return GNUNET_SYSERR;
720   }
721   return GNUNET_OK;
722 }
723
724
725 /**
726  * Handle data message we got from the service.
727  *
728  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
729  * @param dm message received
730  */
731 static void
732 handle_data (void *cls,
733              const struct DataMessage *dm)
734 {
735   struct GNUNET_DATASTORE_Handle *h = cls;
736   struct GNUNET_DATASTORE_QueueEntry *qe;
737   struct ResultContext rc;
738
739   if (h->skip_next_messages > 0)
740   {
741     process_queue (h);
742     return;
743   }
744   qe = h->queue_head;
745   if (NULL == qe)
746   {
747     GNUNET_break (0);
748     do_disconnect (h);
749     return;
750   }
751   if (NULL != qe->env)
752   {
753     GNUNET_break (0);
754     do_disconnect (h);
755     return;
756   }
757   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
758   {
759     GNUNET_break (0);
760     do_disconnect (h);
761     return;
762   }
763 #if INSANE_STATISTICS
764   GNUNET_STATISTICS_update (h->stats,
765                             gettext_noop ("# Results received"),
766                             1,
767                             GNUNET_NO);
768 #endif
769   LOG (GNUNET_ERROR_TYPE_DEBUG,
770        "Received result %llu with type %u and size %u with key %s\n",
771        (unsigned long long) GNUNET_ntohll (dm->uid),
772        ntohl (dm->type),
773        ntohl (dm->size),
774        GNUNET_h2s (&dm->key));
775   rc = qe->qc.rc;
776   free_queue_entry (qe);
777   h->retry_time = GNUNET_TIME_UNIT_ZERO;
778   process_queue (h);
779   if (NULL != rc.proc)
780     rc.proc (rc.proc_cls,
781              &dm->key,
782              ntohl (dm->size),
783              &dm[1],
784              ntohl (dm->type),
785              ntohl (dm->priority),
786              ntohl (dm->anonymity),
787              GNUNET_TIME_absolute_ntoh (dm->expiration),
788              GNUNET_ntohll (dm->uid));
789 }
790
791
792 /**
793  * Type of a function to call when we receive a
794  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
795  *
796  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
797  * @param msg message received
798  */
799 static void
800 handle_data_end (void *cls,
801                  const struct GNUNET_MessageHeader *msg)
802 {
803   struct GNUNET_DATASTORE_Handle *h = cls;
804   struct GNUNET_DATASTORE_QueueEntry *qe;
805   struct ResultContext rc;
806
807   if (h->skip_next_messages > 0)
808   {
809     h->skip_next_messages--;
810     process_queue (h);
811     return;
812   }
813   qe = h->queue_head;
814   if (NULL == qe)
815   {
816     GNUNET_break (0);
817     do_disconnect (h);
818     return;
819   }
820   if (NULL != qe->env)
821   {
822     GNUNET_break (0);
823     do_disconnect (h);
824     return;
825   }
826   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
827   {
828     GNUNET_break (0);
829     do_disconnect (h);
830     return;
831   }
832   rc = qe->qc.rc;
833   free_queue_entry (qe);
834   LOG (GNUNET_ERROR_TYPE_DEBUG,
835        "Received end of result set, new queue size is %u\n",
836        h->queue_size);
837   h->retry_time = GNUNET_TIME_UNIT_ZERO;
838   h->result_count = 0;
839   process_queue (h);
840   /* signal end of iteration */
841   if (NULL != rc.proc)
842     rc.proc (rc.proc_cls,
843              NULL,
844              0,
845              NULL,
846              0,
847              0,
848              0,
849              GNUNET_TIME_UNIT_ZERO_ABS,
850              0);
851 }
852
853
854 /**
855  * Try reconnecting to the datastore service.
856  *
857  * @param cls the `struct GNUNET_DATASTORE_Handle`
858  */
859 static void
860 try_reconnect (void *cls)
861 {
862   struct GNUNET_DATASTORE_Handle *h = cls;
863   struct GNUNET_MQ_MessageHandler handlers[] = {
864     GNUNET_MQ_hd_var_size (status,
865                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
866                            struct StatusMessage,
867                            h),
868     GNUNET_MQ_hd_var_size (data,
869                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
870                            struct DataMessage,
871                            h),
872     GNUNET_MQ_hd_fixed_size (data_end,
873                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
874                              struct GNUNET_MessageHeader,
875                              h),
876     GNUNET_MQ_handler_end ()
877   };
878
879   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
880   h->reconnect_task = NULL;
881   GNUNET_assert (NULL == h->mq);
882   h->mq = GNUNET_CLIENT_connect (h->cfg,
883                                  "datastore",
884                                  handlers,
885                                  &mq_error_handler,
886                                  h);
887   if (NULL == h->mq)
888     return;
889   GNUNET_STATISTICS_update (h->stats,
890                             gettext_noop ("# datastore connections (re)created"),
891                             1,
892                             GNUNET_NO);
893   LOG (GNUNET_ERROR_TYPE_DEBUG,
894        "Reconnected to DATASTORE\n");
895   process_queue (h);
896 }
897
898
899 /**
900  * Dummy continuation used to do nothing (but be non-zero).
901  *
902  * @param cls closure
903  * @param result result
904  * @param min_expiration expiration time
905  * @param emsg error message
906  */
907 static void
908 drop_status_cont (void *cls,
909                   int32_t result,
910                   struct GNUNET_TIME_Absolute min_expiration,
911                   const char *emsg)
912 {
913   /* do nothing */
914 }
915
916
917 /**
918  * Store an item in the datastore.  If the item is already present,
919  * the priorities are summed up and the higher expiration time and
920  * lower anonymity level is used.
921  *
922  * @param h handle to the datastore
923  * @param rid reservation ID to use (from "reserve"); use 0 if no
924  *            prior reservation was made
925  * @param key key for the value
926  * @param size number of bytes in data
927  * @param data content stored
928  * @param type type of the content
929  * @param priority priority of the content
930  * @param anonymity anonymity-level for the content
931  * @param replication how often should the content be replicated to other peers?
932  * @param expiration expiration time for the content
933  * @param queue_priority ranking of this request in the priority queue
934  * @param max_queue_size at what queue size should this request be dropped
935  *        (if other requests of higher priority are in the queue)
936  * @param cont continuation to call when done
937  * @param cont_cls closure for @a cont
938  * @return NULL if the entry was not queued, otherwise a handle that can be used to
939  *         cancel; note that even if NULL is returned, the callback will be invoked
940  *         (or rather, will already have been invoked)
941  */
942 struct GNUNET_DATASTORE_QueueEntry *
943 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
944                       uint32_t rid,
945                       const struct GNUNET_HashCode *key,
946                       size_t size,
947                       const void *data,
948                       enum GNUNET_BLOCK_Type type,
949                       uint32_t priority,
950                       uint32_t anonymity,
951                       uint32_t replication,
952                       struct GNUNET_TIME_Absolute expiration,
953                       unsigned int queue_priority,
954                       unsigned int max_queue_size,
955                       GNUNET_DATASTORE_ContinuationWithStatus cont,
956                       void *cont_cls)
957 {
958   struct GNUNET_DATASTORE_QueueEntry *qe;
959   struct GNUNET_MQ_Envelope *env;
960   struct DataMessage *dm;
961   union QueueContext qc;
962
963   if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
964   {
965     GNUNET_break (0);
966     return NULL;
967   }
968
969   LOG (GNUNET_ERROR_TYPE_DEBUG,
970        "Asked to put %u bytes of data under key `%s' for %s\n",
971        size,
972        GNUNET_h2s (key),
973        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
974                                                GNUNET_YES));
975   env = GNUNET_MQ_msg_extra (dm,
976                              size,
977                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
978   dm->rid = htonl (rid);
979   dm->size = htonl ((uint32_t) size);
980   dm->type = htonl (type);
981   dm->priority = htonl (priority);
982   dm->anonymity = htonl (anonymity);
983   dm->replication = htonl (replication);
984   dm->reserved = htonl (0);
985   dm->uid = GNUNET_htonll (0);
986   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
987   dm->key = *key;
988   GNUNET_memcpy (&dm[1],
989           data,
990           size);
991   qc.sc.cont = cont;
992   qc.sc.cont_cls = cont_cls;
993   qe = make_queue_entry (h,
994                          env,
995                          queue_priority,
996                          max_queue_size,
997                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
998                          &qc);
999   if (NULL == qe)
1000   {
1001     LOG (GNUNET_ERROR_TYPE_DEBUG,
1002          "Could not create queue entry for PUT\n");
1003     return NULL;
1004   }
1005   GNUNET_STATISTICS_update (h->stats,
1006                             gettext_noop ("# PUT requests executed"),
1007                             1,
1008                             GNUNET_NO);
1009   process_queue (h);
1010   return qe;
1011 }
1012
1013
1014 /**
1015  * Reserve space in the datastore.  This function should be used
1016  * to avoid "out of space" failures during a longer sequence of "put"
1017  * operations (for example, when a file is being inserted).
1018  *
1019  * @param h handle to the datastore
1020  * @param amount how much space (in bytes) should be reserved (for content only)
1021  * @param entries how many entries will be created (to calculate per-entry overhead)
1022  * @param cont continuation to call when done; "success" will be set to
1023  *             a positive reservation value if space could be reserved.
1024  * @param cont_cls closure for @a cont
1025  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1026  *         cancel; note that even if NULL is returned, the callback will be invoked
1027  *         (or rather, will already have been invoked)
1028  */
1029 struct GNUNET_DATASTORE_QueueEntry *
1030 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1031                           uint64_t amount,
1032                           uint32_t entries,
1033                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1034                           void *cont_cls)
1035 {
1036   struct GNUNET_DATASTORE_QueueEntry *qe;
1037   struct GNUNET_MQ_Envelope *env;
1038   struct ReserveMessage *rm;
1039   union QueueContext qc;
1040
1041   if (NULL == cont)
1042     cont = &drop_status_cont;
1043   LOG (GNUNET_ERROR_TYPE_DEBUG,
1044        "Asked to reserve %llu bytes of data and %u entries\n",
1045        (unsigned long long) amount,
1046        (unsigned int) entries);
1047   env = GNUNET_MQ_msg (rm,
1048                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1049   rm->entries = htonl (entries);
1050   rm->amount = GNUNET_htonll (amount);
1051
1052   qc.sc.cont = cont;
1053   qc.sc.cont_cls = cont_cls;
1054   qe = make_queue_entry (h,
1055                          env,
1056                          UINT_MAX,
1057                          UINT_MAX,
1058                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1059                          &qc);
1060   if (NULL == qe)
1061   {
1062     LOG (GNUNET_ERROR_TYPE_DEBUG,
1063          "Could not create queue entry to reserve\n");
1064     return NULL;
1065   }
1066   GNUNET_STATISTICS_update (h->stats,
1067                             gettext_noop ("# RESERVE requests executed"),
1068                             1,
1069                             GNUNET_NO);
1070   process_queue (h);
1071   return qe;
1072 }
1073
1074
1075 /**
1076  * Signal that all of the data for which a reservation was made has
1077  * been stored and that whatever excess space might have been reserved
1078  * can now be released.
1079  *
1080  * @param h handle to the datastore
1081  * @param rid reservation ID (value of "success" in original continuation
1082  *        from the "reserve" function).
1083  * @param queue_priority ranking of this request in the priority queue
1084  * @param max_queue_size at what queue size should this request be dropped
1085  *        (if other requests of higher priority are in the queue)
1086  * @param queue_priority ranking of this request in the priority queue
1087  * @param max_queue_size at what queue size should this request be dropped
1088  *        (if other requests of higher priority are in the queue)
1089  * @param cont continuation to call when done
1090  * @param cont_cls closure for @a cont
1091  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1092  *         cancel; note that even if NULL is returned, the callback will be invoked
1093  *         (or rather, will already have been invoked)
1094  */
1095 struct GNUNET_DATASTORE_QueueEntry *
1096 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1097                                   uint32_t rid,
1098                                   unsigned int queue_priority,
1099                                   unsigned int max_queue_size,
1100                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1101                                   void *cont_cls)
1102 {
1103   struct GNUNET_DATASTORE_QueueEntry *qe;
1104   struct GNUNET_MQ_Envelope *env;
1105   struct ReleaseReserveMessage *rrm;
1106   union QueueContext qc;
1107
1108   if (NULL == cont)
1109     cont = &drop_status_cont;
1110   LOG (GNUNET_ERROR_TYPE_DEBUG,
1111        "Asked to release reserve %d\n",
1112        rid);
1113   env = GNUNET_MQ_msg (rrm,
1114                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1115   rrm->rid = htonl (rid);
1116   qc.sc.cont = cont;
1117   qc.sc.cont_cls = cont_cls;
1118   qe = make_queue_entry (h,
1119                          env,
1120                          queue_priority,
1121                          max_queue_size,
1122                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1123                          &qc);
1124   if (NULL == qe)
1125   {
1126     LOG (GNUNET_ERROR_TYPE_DEBUG,
1127          "Could not create queue entry to release reserve\n");
1128     return NULL;
1129   }
1130   GNUNET_STATISTICS_update (h->stats,
1131                             gettext_noop
1132                             ("# RELEASE RESERVE requests executed"), 1,
1133                             GNUNET_NO);
1134   process_queue (h);
1135   return qe;
1136 }
1137
1138
1139 /**
1140  * Explicitly remove some content from the database.
1141  * The @a cont continuation will be called with `status`
1142  * #GNUNET_OK" if content was removed, #GNUNET_NO
1143  * if no matching entry was found and #GNUNET_SYSERR
1144  * on all other types of errors.
1145  *
1146  * @param h handle to the datastore
1147  * @param key key for the value
1148  * @param size number of bytes in data
1149  * @param data content stored
1150  * @param queue_priority ranking of this request in the priority queue
1151  * @param max_queue_size at what queue size should this request be dropped
1152  *        (if other requests of higher priority are in the queue)
1153  * @param cont continuation to call when done
1154  * @param cont_cls closure for @a cont
1155  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1156  *         cancel; note that even if NULL is returned, the callback will be invoked
1157  *         (or rather, will already have been invoked)
1158  */
1159 struct GNUNET_DATASTORE_QueueEntry *
1160 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1161                          const struct GNUNET_HashCode *key,
1162                          size_t size,
1163                          const void *data,
1164                          unsigned int queue_priority,
1165                          unsigned int max_queue_size,
1166                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1167                          void *cont_cls)
1168 {
1169   struct GNUNET_DATASTORE_QueueEntry *qe;
1170   struct DataMessage *dm;
1171   struct GNUNET_MQ_Envelope *env;
1172   union QueueContext qc;
1173
1174   if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1175   {
1176     GNUNET_break (0);
1177     return NULL;
1178   }
1179   if (NULL == cont)
1180     cont = &drop_status_cont;
1181   LOG (GNUNET_ERROR_TYPE_DEBUG,
1182        "Asked to remove %u bytes under key `%s'\n",
1183        size,
1184        GNUNET_h2s (key));
1185   env = GNUNET_MQ_msg_extra (dm,
1186                              size,
1187                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1188   dm->rid = htonl (0);
1189   dm->size = htonl (size);
1190   dm->type = htonl (0);
1191   dm->priority = htonl (0);
1192   dm->anonymity = htonl (0);
1193   dm->uid = GNUNET_htonll (0);
1194   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1195   dm->key = *key;
1196   GNUNET_memcpy (&dm[1],
1197           data,
1198           size);
1199
1200   qc.sc.cont = cont;
1201   qc.sc.cont_cls = cont_cls;
1202
1203   qe = make_queue_entry (h,
1204                          env,
1205                          queue_priority,
1206                          max_queue_size,
1207                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1208                          &qc);
1209   if (NULL == qe)
1210   {
1211     LOG (GNUNET_ERROR_TYPE_DEBUG,
1212          "Could not create queue entry for REMOVE\n");
1213     return NULL;
1214   }
1215   GNUNET_STATISTICS_update (h->stats,
1216                             gettext_noop ("# REMOVE requests executed"),
1217                             1,
1218                             GNUNET_NO);
1219   process_queue (h);
1220   return qe;
1221 }
1222
1223
1224
1225 /**
1226  * Get a random value from the datastore for content replication.
1227  * Returns a single, random value among those with the highest
1228  * replication score, lowering positive replication scores by one for
1229  * the chosen value (if only content with a replication score exists,
1230  * a random value is returned and replication scores are not changed).
1231  *
1232  * @param h handle to the datastore
1233  * @param queue_priority ranking of this request in the priority queue
1234  * @param max_queue_size at what queue size should this request be dropped
1235  *        (if other requests of higher priority are in the queue)
1236  * @param proc function to call on a random value; it
1237  *        will be called once with a value (if available)
1238  *        and always once with a value of NULL.
1239  * @param proc_cls closure for @a proc
1240  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1241  *         cancel
1242  */
1243 struct GNUNET_DATASTORE_QueueEntry *
1244 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1245                                       unsigned int queue_priority,
1246                                       unsigned int max_queue_size,
1247                                       GNUNET_DATASTORE_DatumProcessor proc,
1248                                       void *proc_cls)
1249 {
1250   struct GNUNET_DATASTORE_QueueEntry *qe;
1251   struct GNUNET_MQ_Envelope *env;
1252   struct GNUNET_MessageHeader *m;
1253   union QueueContext qc;
1254
1255   GNUNET_assert (NULL != proc);
1256   LOG (GNUNET_ERROR_TYPE_DEBUG,
1257        "Asked to get replication entry\n");
1258   env = GNUNET_MQ_msg (m,
1259                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1260   qc.rc.proc = proc;
1261   qc.rc.proc_cls = proc_cls;
1262   qe = make_queue_entry (h,
1263                          env,
1264                          queue_priority,
1265                          max_queue_size,
1266                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1267                          &qc);
1268   if (NULL == qe)
1269   {
1270     LOG (GNUNET_ERROR_TYPE_DEBUG,
1271          "Could not create queue entry for GET REPLICATION\n");
1272     return NULL;
1273   }
1274   GNUNET_STATISTICS_update (h->stats,
1275                             gettext_noop
1276                             ("# GET REPLICATION requests executed"), 1,
1277                             GNUNET_NO);
1278   process_queue (h);
1279   return qe;
1280 }
1281
1282
1283 /**
1284  * Get a single zero-anonymity value from the datastore.
1285  *
1286  * @param h handle to the datastore
1287  * @param offset offset of the result (modulo num-results); set to
1288  *               a random 64-bit value initially; then increment by
1289  *               one each time; detect that all results have been found by uid
1290  *               being again the first uid ever returned.
1291  * @param queue_priority ranking of this request in the priority queue
1292  * @param max_queue_size at what queue size should this request be dropped
1293  *        (if other requests of higher priority are in the queue)
1294  * @param type allowed type for the operation (never zero)
1295  * @param proc function to call on a random value; it
1296  *        will be called once with a value (if available)
1297  *        or with NULL if none value exists.
1298  * @param proc_cls closure for @a proc
1299  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1300  *         cancel
1301  */
1302 struct GNUNET_DATASTORE_QueueEntry *
1303 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1304                                      uint64_t offset,
1305                                      unsigned int queue_priority,
1306                                      unsigned int max_queue_size,
1307                                      enum GNUNET_BLOCK_Type type,
1308                                      GNUNET_DATASTORE_DatumProcessor proc,
1309                                      void *proc_cls)
1310 {
1311   struct GNUNET_DATASTORE_QueueEntry *qe;
1312   struct GNUNET_MQ_Envelope *env;
1313   struct GetZeroAnonymityMessage *m;
1314   union QueueContext qc;
1315
1316   GNUNET_assert (NULL != proc);
1317   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1318   LOG (GNUNET_ERROR_TYPE_DEBUG,
1319        "Asked to get %llu-th zero-anonymity entry of type %d\n",
1320        (unsigned long long) offset,
1321        type);
1322   env = GNUNET_MQ_msg (m,
1323                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1324   m->type = htonl ((uint32_t) type);
1325   m->offset = GNUNET_htonll (offset);
1326   qc.rc.proc = proc;
1327   qc.rc.proc_cls = proc_cls;
1328   qe = make_queue_entry (h,
1329                          env,
1330                          queue_priority,
1331                          max_queue_size,
1332                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1333                          &qc);
1334   if (NULL == qe)
1335   {
1336     LOG (GNUNET_ERROR_TYPE_DEBUG,
1337          "Could not create queue entry for zero-anonymity procation\n");
1338     return NULL;
1339   }
1340   GNUNET_STATISTICS_update (h->stats,
1341                             gettext_noop
1342                             ("# GET ZERO ANONYMITY requests executed"), 1,
1343                             GNUNET_NO);
1344   process_queue (h);
1345   return qe;
1346 }
1347
1348
1349 /**
1350  * Get a result for a particular key from the datastore.  The processor
1351  * will only be called once.
1352  *
1353  * @param h handle to the datastore
1354  * @param offset offset of the result (modulo num-results); set to
1355  *               a random 64-bit value initially; then increment by
1356  *               one each time; detect that all results have been found by uid
1357  *               being again the first uid ever returned.
1358  * @param key maybe NULL (to match all entries)
1359  * @param type desired type, 0 for any
1360  * @param queue_priority ranking of this request in the priority queue
1361  * @param max_queue_size at what queue size should this request be dropped
1362  *        (if other requests of higher priority are in the queue)
1363  * @param proc function to call on each matching value;
1364  *        will be called once with a NULL value at the end
1365  * @param proc_cls closure for @a proc
1366  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1367  *         cancel
1368  */
1369 struct GNUNET_DATASTORE_QueueEntry *
1370 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1371                           uint64_t offset,
1372                           const struct GNUNET_HashCode *key,
1373                           enum GNUNET_BLOCK_Type type,
1374                           unsigned int queue_priority,
1375                           unsigned int max_queue_size,
1376                           GNUNET_DATASTORE_DatumProcessor proc,
1377                           void *proc_cls)
1378 {
1379   struct GNUNET_DATASTORE_QueueEntry *qe;
1380   struct GNUNET_MQ_Envelope *env;
1381   struct GetKeyMessage *gkm;
1382   struct GetMessage *gm;
1383   union QueueContext qc;
1384
1385   GNUNET_assert (NULL != proc);
1386   LOG (GNUNET_ERROR_TYPE_DEBUG,
1387        "Asked to look for data of type %u under key `%s'\n",
1388        (unsigned int) type,
1389        GNUNET_h2s (key));
1390   if (NULL == key)
1391   {
1392     env = GNUNET_MQ_msg (gm,
1393                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1394     gm->type = htonl (type);
1395     gm->offset = GNUNET_htonll (offset);
1396   }
1397   else
1398   {
1399     env = GNUNET_MQ_msg (gkm,
1400                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1401     gkm->type = htonl (type);
1402     gkm->offset = GNUNET_htonll (offset);
1403     gkm->key = *key;
1404   }
1405   qc.rc.proc = proc;
1406   qc.rc.proc_cls = proc_cls;
1407   qe = make_queue_entry (h,
1408                          env,
1409                          queue_priority,
1410                          max_queue_size,
1411                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1412                          &qc);
1413   if (NULL == qe)
1414   {
1415     LOG (GNUNET_ERROR_TYPE_DEBUG,
1416          "Could not queue request for `%s'\n",
1417          GNUNET_h2s (key));
1418     return NULL;
1419   }
1420 #if INSANE_STATISTICS
1421   GNUNET_STATISTICS_update (h->stats,
1422                             gettext_noop ("# GET requests executed"),
1423                             1,
1424                             GNUNET_NO);
1425 #endif
1426   process_queue (h);
1427   return qe;
1428 }
1429
1430
1431 /**
1432  * Cancel a datastore operation.  The final callback from the
1433  * operation must not have been done yet.
1434  *
1435  * @param qe operation to cancel
1436  */
1437 void
1438 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1439 {
1440   struct GNUNET_DATASTORE_Handle *h = qe->h;
1441
1442   LOG (GNUNET_ERROR_TYPE_DEBUG,
1443        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1444        qe,
1445        NULL == qe->env,
1446        h->queue_head == qe);
1447   if (NULL == qe->env)
1448   {
1449     free_queue_entry (qe);
1450     h->skip_next_messages++;
1451     return;
1452   }
1453   free_queue_entry (qe);
1454   process_queue (h);
1455 }
1456
1457
1458 /* end of datastore_api.c */