Merge branch 'master' of ssh://gnunet.org/gnunet
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for @e cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for @e proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98 /**
99  * Entry in our priority queue.
100  */
101 struct GNUNET_DATASTORE_QueueEntry
102 {
103
104   /**
105    * This is a linked list.
106    */
107   struct GNUNET_DATASTORE_QueueEntry *next;
108
109   /**
110    * This is a linked list.
111    */
112   struct GNUNET_DATASTORE_QueueEntry *prev;
113
114   /**
115    * Handle to the master context.
116    */
117   struct GNUNET_DATASTORE_Handle *h;
118
119   /**
120    * Function to call after transmission of the request.
121    */
122   GNUNET_DATASTORE_ContinuationWithStatus cont;
123
124   /**
125    * Closure for @e cont.
126    */
127   void *cont_cls;
128
129   /**
130    * Context for the operation.
131    */
132   union QueueContext qc;
133
134   /**
135    * Envelope of the request to transmit, NULL after
136    * transmission.
137    */
138   struct GNUNET_MQ_Envelope *env;
139
140   /**
141    * Priority in the queue.
142    */
143   unsigned int priority;
144
145   /**
146    * Maximum allowed length of queue (otherwise
147    * this request should be discarded).
148    */
149   unsigned int max_queue;
150
151   /**
152    * Expected response type.
153    */
154   uint16_t response_type;
155
156 };
157
158
159 /**
160  * Handle to the datastore service.
161  */
162 struct GNUNET_DATASTORE_Handle
163 {
164
165   /**
166    * Our configuration.
167    */
168   const struct GNUNET_CONFIGURATION_Handle *cfg;
169
170   /**
171    * Current connection to the datastore service.
172    */
173   struct GNUNET_MQ_Handle *mq;
174
175   /**
176    * Handle for statistics.
177    */
178   struct GNUNET_STATISTICS_Handle *stats;
179
180   /**
181    * Current head of priority queue.
182    */
183   struct GNUNET_DATASTORE_QueueEntry *queue_head;
184
185   /**
186    * Current tail of priority queue.
187    */
188   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
189
190   /**
191    * Task for trying to reconnect.
192    */
193   struct GNUNET_SCHEDULER_Task *reconnect_task;
194
195   /**
196    * How quickly should we retry?  Used for exponential back-off on
197    * connect-errors.
198    */
199   struct GNUNET_TIME_Relative retry_time;
200
201   /**
202    * Number of entries in the queue.
203    */
204   unsigned int queue_size;
205
206   /**
207    * Number of results we're receiving for the current query
208    * after application stopped to care.  Used to determine when
209    * to reset the connection.
210    */
211   unsigned int result_count;
212
213   /**
214    * We should ignore the next message(s) from the service.
215    */
216   unsigned int skip_next_messages;
217
218 };
219
220
221 /**
222  * Try reconnecting to the datastore service.
223  *
224  * @param cls the `struct GNUNET_DATASTORE_Handle`
225  */
226 static void
227 try_reconnect (void *cls);
228
229
230 /**
231  * Disconnect from the service and then try reconnecting to the datastore service
232  * after some delay.
233  *
234  * @param h handle to datastore to disconnect and reconnect
235  */
236 static void
237 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
238 {
239   if (NULL == h->mq)
240   {
241     GNUNET_break (0);
242     return;
243   }
244   GNUNET_MQ_destroy (h->mq);
245   h->mq = NULL;
246   h->skip_next_messages = 0;
247   h->reconnect_task
248     = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249                                     &try_reconnect,
250                                     h);
251 }
252
253
254 /**
255  * Free a queue entry.  Removes the given entry from the
256  * queue and releases associated resources.  Does NOT
257  * call the callback.
258  *
259  * @param qe entry to free.
260  */
261 static void
262 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
263 {
264   struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266   GNUNET_CONTAINER_DLL_remove (h->queue_head,
267                                h->queue_tail,
268                                qe);
269   h->queue_size--;
270   if (NULL != qe->env)
271     GNUNET_MQ_discard (qe->env);
272   GNUNET_free (qe);
273 }
274
275
276 /**
277  * Handle error in sending drop request to datastore.
278  *
279  * @param cls closure with the datastore handle
280  * @param error error code
281  */
282 static void
283 mq_error_handler (void *cls,
284                   enum GNUNET_MQ_Error error)
285 {
286   struct GNUNET_DATASTORE_Handle *h = cls;
287   struct GNUNET_DATASTORE_QueueEntry *qe;
288
289   LOG (GNUNET_ERROR_TYPE_DEBUG,
290        "MQ error, reconnecting to DATASTORE\n");
291   do_disconnect (h);
292   qe = h->queue_head;
293   if ( (NULL != qe) &&
294        (NULL == qe->env) )
295   {
296     union QueueContext qc = qe->qc;
297     uint16_t rt = qe->response_type;
298
299     LOG (GNUNET_ERROR_TYPE_DEBUG,
300          "Failed to receive response from database.\n");
301     free_queue_entry (qe);
302     switch (rt)
303     {
304     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
305       if (NULL != qc.sc.cont)
306         qc.sc.cont (qc.sc.cont_cls,
307                     GNUNET_SYSERR,
308                     GNUNET_TIME_UNIT_ZERO_ABS,
309                     _("DATASTORE disconnected"));
310       break;
311     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
312       if (NULL != qc.rc.proc)
313         qc.rc.proc (qc.rc.proc_cls,
314                     NULL,
315                     0,
316                     NULL, 0, 0, 0,
317                     GNUNET_TIME_UNIT_ZERO_ABS,
318                     0);
319       break;
320     default:
321       GNUNET_break (0);
322     }
323   }
324 }
325
326
327 /**
328  * Connect to the datastore service.
329  *
330  * @param cfg configuration to use
331  * @return handle to use to access the service
332  */
333 struct GNUNET_DATASTORE_Handle *
334 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
335 {
336   struct GNUNET_DATASTORE_Handle *h;
337
338   LOG (GNUNET_ERROR_TYPE_DEBUG,
339        "Establishing DATASTORE connection!\n");
340   h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
341   h->cfg = cfg;
342   try_reconnect (h);
343   if (NULL == h->mq)
344   {
345     GNUNET_free (h);
346     return NULL;
347   }
348   h->stats = GNUNET_STATISTICS_create ("datastore-api",
349                                        cfg);
350   return h;
351 }
352
353
354 /**
355  * Task used by to disconnect from the datastore after
356  * we send the #GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
357  *
358  * @param cls the datastore handle
359  */
360 static void
361 disconnect_after_drop (void *cls)
362 {
363   struct GNUNET_DATASTORE_Handle *h = cls;
364
365   LOG (GNUNET_ERROR_TYPE_DEBUG,
366        "Drop sent, disconnecting\n");
367   GNUNET_DATASTORE_disconnect (h,
368                                GNUNET_NO);
369 }
370
371
372 /**
373  * Handle error in sending drop request to datastore.
374  *
375  * @param cls closure with the datastore handle
376  * @param error error code
377  */
378 static void
379 disconnect_on_mq_error (void *cls,
380                         enum GNUNET_MQ_Error error)
381 {
382   struct GNUNET_DATASTORE_Handle *h = cls;
383
384   LOG (GNUNET_ERROR_TYPE_ERROR,
385        "Failed to ask datastore to drop tables\n");
386   GNUNET_DATASTORE_disconnect (h,
387                                GNUNET_NO);
388 }
389
390
391 /**
392  * Disconnect from the datastore service (and free
393  * associated resources).
394  *
395  * @param h handle to the datastore
396  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
397  */
398 void
399 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
400                              int drop)
401 {
402   struct GNUNET_DATASTORE_QueueEntry *qe;
403
404   LOG (GNUNET_ERROR_TYPE_DEBUG,
405        "Datastore disconnect\n");
406   if (NULL != h->mq)
407   {
408     GNUNET_MQ_destroy (h->mq);
409     h->mq = NULL;
410   }
411   if (NULL != h->reconnect_task)
412   {
413     GNUNET_SCHEDULER_cancel (h->reconnect_task);
414     h->reconnect_task = NULL;
415   }
416   while (NULL != (qe = h->queue_head))
417   {
418     switch (qe->response_type)
419     {
420     case GNUNET_MESSAGE_TYPE_DATASTORE_STATUS:
421       if (NULL != qe->qc.sc.cont)
422         qe->qc.sc.cont (qe->qc.sc.cont_cls,
423                         GNUNET_SYSERR,
424                         GNUNET_TIME_UNIT_ZERO_ABS,
425                         _("Disconnected from DATASTORE"));
426       break;
427     case GNUNET_MESSAGE_TYPE_DATASTORE_DATA:
428       if (NULL != qe->qc.rc.proc)
429         qe->qc.rc.proc (qe->qc.rc.proc_cls,
430                         NULL,
431                         0,
432                         NULL, 0, 0, 0,
433                         GNUNET_TIME_UNIT_ZERO_ABS,
434                         0);
435       break;
436     default:
437       GNUNET_break (0);
438     }
439     free_queue_entry (qe);
440   }
441   if (GNUNET_YES == drop)
442   {
443     LOG (GNUNET_ERROR_TYPE_DEBUG,
444          "Re-connecting to issue DROP!\n");
445     GNUNET_assert (NULL == h->mq);
446     h->mq = GNUNET_CLIENT_connect (h->cfg,
447                                    "datastore",
448                                    NULL,
449                                    &disconnect_on_mq_error,
450                                    h);
451     if (NULL != h->mq)
452     {
453       struct GNUNET_MessageHeader *hdr;
454       struct GNUNET_MQ_Envelope *env;
455
456       env = GNUNET_MQ_msg (hdr,
457                            GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
458       GNUNET_MQ_notify_sent (env,
459                              &disconnect_after_drop,
460                              h);
461       GNUNET_MQ_send (h->mq,
462                       env);
463       return;
464     }
465     GNUNET_break (0);
466   }
467   GNUNET_STATISTICS_destroy (h->stats,
468                              GNUNET_NO);
469   h->stats = NULL;
470   GNUNET_free (h);
471 }
472
473
474 /**
475  * Create a new entry for our priority queue (and possibly discard other entires if
476  * the queue is getting too long).
477  *
478  * @param h handle to the datastore
479  * @param env envelope with the message to queue
480  * @param queue_priority priority of the entry
481  * @param max_queue_size at what queue size should this request be dropped
482  *        (if other requests of higher priority are in the queue)
483  * @param expected_type which type of response do we expect,
484  *        #GNUNET_MESSAGE_TYPE_DATASTORE_STATUS or
485  *        #GNUNET_MESSAGE_TYPE_DATASTORE_DATA
486  * @param qc client context (NOT a closure for @a response_proc)
487  * @return NULL if the queue is full
488  */
489 static struct GNUNET_DATASTORE_QueueEntry *
490 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
491                   struct GNUNET_MQ_Envelope *env,
492                   unsigned int queue_priority,
493                   unsigned int max_queue_size,
494                   uint16_t expected_type,
495                   const union QueueContext *qc)
496 {
497   struct GNUNET_DATASTORE_QueueEntry *qe;
498   struct GNUNET_DATASTORE_QueueEntry *pos;
499   unsigned int c;
500
501   if ( (NULL != h->queue_tail) &&
502        (h->queue_tail->priority >= queue_priority) )
503   {
504     c = h->queue_size;
505     pos = NULL;
506   }
507   else
508   {
509     c = 0;
510     pos = h->queue_head;
511   }
512   while ( (NULL != pos) &&
513           (c < max_queue_size) &&
514           (pos->priority >= queue_priority) )
515   {
516     c++;
517     pos = pos->next;
518   }
519   if (c >= max_queue_size)
520   {
521     GNUNET_STATISTICS_update (h->stats,
522                               gettext_noop ("# queue overflows"),
523                               1,
524                               GNUNET_NO);
525     GNUNET_MQ_discard (env);
526     return NULL;
527   }
528   qe = GNUNET_new (struct GNUNET_DATASTORE_QueueEntry);
529   qe->h = h;
530   qe->env = env;
531   qe->response_type = expected_type;
532   qe->qc = *qc;
533   qe->priority = queue_priority;
534   qe->max_queue = max_queue_size;
535   if (NULL == pos)
536   {
537     /* append at the tail */
538     pos = h->queue_tail;
539   }
540   else
541   {
542     pos = pos->prev;
543     /* do not insert at HEAD if HEAD query was already
544      * transmitted and we are still receiving replies! */
545     if ( (NULL == pos) &&
546          (NULL == h->queue_head->env) )
547       pos = h->queue_head;
548   }
549   c++;
550 #if INSANE_STATISTICS
551   GNUNET_STATISTICS_update (h->stats,
552                             gettext_noop ("# queue entries created"),
553                             1,
554                             GNUNET_NO);
555 #endif
556   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
557                                      h->queue_tail,
558                                      pos,
559                                      qe);
560   h->queue_size++;
561   return qe;
562 }
563
564
565 /**
566  * Process entries in the queue (or do nothing if we are already
567  * doing so).
568  *
569  * @param h handle to the datastore
570  */
571 static void
572 process_queue (struct GNUNET_DATASTORE_Handle *h)
573 {
574   struct GNUNET_DATASTORE_QueueEntry *qe;
575
576   if (NULL == (qe = h->queue_head))
577   {
578     /* no entry in queue */
579     LOG (GNUNET_ERROR_TYPE_DEBUG,
580          "Queue empty\n");
581     return;
582   }
583   if (NULL == qe->env)
584   {
585     /* waiting for replies */
586     LOG (GNUNET_ERROR_TYPE_DEBUG,
587          "Head request already transmitted\n");
588     return;
589   }
590   if (NULL == h->mq)
591   {
592     /* waiting for reconnect */
593     LOG (GNUNET_ERROR_TYPE_DEBUG,
594          "Not connected\n");
595     return;
596   }
597   GNUNET_MQ_send (h->mq,
598                   qe->env);
599   qe->env = NULL;
600 }
601
602
603
604
605 /**
606  * Function called to check status message from the service.
607  *
608  * @param cls closure
609  * @param sm status message received
610  * @return #GNUNET_OK if the message is well-formed
611  */
612 static int
613 check_status (void *cls,
614               const struct StatusMessage *sm)
615 {
616   uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
617   int32_t status = ntohl (sm->status);
618
619   if (msize > 0)
620   {
621     const char *emsg = (const char *) &sm[1];
622
623     if ('\0' != emsg[msize - 1])
624     {
625       GNUNET_break (0);
626       return GNUNET_SYSERR;
627     }
628   }
629   else if (GNUNET_SYSERR == status)
630   {
631     GNUNET_break (0);
632     return GNUNET_SYSERR;
633   }
634   return GNUNET_OK;
635 }
636
637
638 /**
639  * Function called to handle status message from the service.
640  *
641  * @param cls closure
642  * @param sm status message received
643  */
644 static void
645 handle_status (void *cls,
646                const struct StatusMessage *sm)
647 {
648   struct GNUNET_DATASTORE_Handle *h = cls;
649   struct GNUNET_DATASTORE_QueueEntry *qe;
650   struct StatusContext rc;
651   const char *emsg;
652   int32_t status = ntohl (sm->status);
653
654   if (h->skip_next_messages > 0)
655   {
656     h->skip_next_messages--;
657     process_queue (h);
658     return;
659   }
660   if (NULL == (qe = h->queue_head))
661   {
662     GNUNET_break (0);
663     do_disconnect (h);
664     return;
665   }
666   if (NULL != qe->env)
667   {
668     GNUNET_break (0);
669     do_disconnect (h);
670     return;
671   }
672   if (GNUNET_MESSAGE_TYPE_DATASTORE_STATUS != qe->response_type)
673   {
674     GNUNET_break (0);
675     do_disconnect (h);
676     return;
677   }
678   rc = qe->qc.sc;
679   free_queue_entry (qe);
680   if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
681     emsg = (const char *) &sm[1];
682   else
683     emsg = NULL;
684   LOG (GNUNET_ERROR_TYPE_DEBUG,
685        "Received status %d/%s\n",
686        (int) status,
687        emsg);
688   GNUNET_STATISTICS_update (h->stats,
689                             gettext_noop ("# status messages received"),
690                             1,
691                             GNUNET_NO);
692   h->retry_time = GNUNET_TIME_UNIT_ZERO;
693   process_queue (h);
694   if (NULL != rc.cont)
695     rc.cont (rc.cont_cls,
696              status,
697              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
698              emsg);
699 }
700
701
702 /**
703  * Check data message we received from the service.
704  *
705  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
706  * @param dm message received
707  */
708 static int
709 check_data (void *cls,
710             const struct DataMessage *dm)
711 {
712   uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
713
714   if (msize != ntohl (dm->size))
715   {
716     GNUNET_break (0);
717     return GNUNET_SYSERR;
718   }
719   return GNUNET_OK;
720 }
721
722
723 /**
724  * Handle data message we got from the service.
725  *
726  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
727  * @param dm message received
728  */
729 static void
730 handle_data (void *cls,
731              const struct DataMessage *dm)
732 {
733   struct GNUNET_DATASTORE_Handle *h = cls;
734   struct GNUNET_DATASTORE_QueueEntry *qe;
735   struct ResultContext rc;
736
737   if (h->skip_next_messages > 0)
738   {
739     process_queue (h);
740     return;
741   }
742   qe = h->queue_head;
743   if (NULL == qe)
744   {
745     GNUNET_break (0);
746     do_disconnect (h);
747     return;
748   }
749   if (NULL != qe->env)
750   {
751     GNUNET_break (0);
752     do_disconnect (h);
753     return;
754   }
755   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
756   {
757     GNUNET_break (0);
758     do_disconnect (h);
759     return;
760   }
761 #if INSANE_STATISTICS
762   GNUNET_STATISTICS_update (h->stats,
763                             gettext_noop ("# Results received"),
764                             1,
765                             GNUNET_NO);
766 #endif
767   LOG (GNUNET_ERROR_TYPE_DEBUG,
768        "Received result %llu with type %u and size %u with key %s\n",
769        (unsigned long long) GNUNET_ntohll (dm->uid),
770        ntohl (dm->type),
771        ntohl (dm->size),
772        GNUNET_h2s (&dm->key));
773   rc = qe->qc.rc;
774   free_queue_entry (qe);
775   h->retry_time = GNUNET_TIME_UNIT_ZERO;
776   process_queue (h);
777   if (NULL != rc.proc)
778     rc.proc (rc.proc_cls,
779              &dm->key,
780              ntohl (dm->size),
781              &dm[1],
782              ntohl (dm->type),
783              ntohl (dm->priority),
784              ntohl (dm->anonymity),
785              GNUNET_TIME_absolute_ntoh (dm->expiration),
786              GNUNET_ntohll (dm->uid));
787 }
788
789
790 /**
791  * Type of a function to call when we receive a
792  * #GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the service.
793  *
794  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
795  * @param msg message received
796  */
797 static void
798 handle_data_end (void *cls,
799                  const struct GNUNET_MessageHeader *msg)
800 {
801   struct GNUNET_DATASTORE_Handle *h = cls;
802   struct GNUNET_DATASTORE_QueueEntry *qe;
803   struct ResultContext rc;
804
805   if (h->skip_next_messages > 0)
806   {
807     h->skip_next_messages--;
808     process_queue (h);
809     return;
810   }
811   qe = h->queue_head;
812   if (NULL == qe)
813   {
814     GNUNET_break (0);
815     do_disconnect (h);
816     return;
817   }
818   if (NULL != qe->env)
819   {
820     GNUNET_break (0);
821     do_disconnect (h);
822     return;
823   }
824   if (GNUNET_MESSAGE_TYPE_DATASTORE_DATA != qe->response_type)
825   {
826     GNUNET_break (0);
827     do_disconnect (h);
828     return;
829   }
830   rc = qe->qc.rc;
831   free_queue_entry (qe);
832   LOG (GNUNET_ERROR_TYPE_DEBUG,
833        "Received end of result set, new queue size is %u\n",
834        h->queue_size);
835   h->retry_time = GNUNET_TIME_UNIT_ZERO;
836   h->result_count = 0;
837   process_queue (h);
838   /* signal end of iteration */
839   if (NULL != rc.proc)
840     rc.proc (rc.proc_cls,
841              NULL,
842              0,
843              NULL,
844              0,
845              0,
846              0,
847              GNUNET_TIME_UNIT_ZERO_ABS,
848              0);
849 }
850
851
852 /**
853  * Try reconnecting to the datastore service.
854  *
855  * @param cls the `struct GNUNET_DATASTORE_Handle`
856  */
857 static void
858 try_reconnect (void *cls)
859 {
860   struct GNUNET_DATASTORE_Handle *h = cls;
861   struct GNUNET_MQ_MessageHandler handlers[] = {
862     GNUNET_MQ_hd_var_size (status,
863                            GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
864                            struct StatusMessage,
865                            h),
866     GNUNET_MQ_hd_var_size (data,
867                            GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
868                            struct DataMessage,
869                            h),
870     GNUNET_MQ_hd_fixed_size (data_end,
871                              GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END,
872                              struct GNUNET_MessageHeader,
873                              h),
874     GNUNET_MQ_handler_end ()
875   };
876
877   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
878   h->reconnect_task = NULL;
879   GNUNET_assert (NULL == h->mq);
880   h->mq = GNUNET_CLIENT_connect (h->cfg,
881                                  "datastore",
882                                  handlers,
883                                  &mq_error_handler,
884                                  h);
885   if (NULL == h->mq)
886     return;
887   GNUNET_STATISTICS_update (h->stats,
888                             gettext_noop ("# datastore connections (re)created"),
889                             1,
890                             GNUNET_NO);
891   LOG (GNUNET_ERROR_TYPE_DEBUG,
892        "Reconnected to DATASTORE\n");
893   process_queue (h);
894 }
895
896
897 /**
898  * Dummy continuation used to do nothing (but be non-zero).
899  *
900  * @param cls closure
901  * @param result result
902  * @param min_expiration expiration time
903  * @param emsg error message
904  */
905 static void
906 drop_status_cont (void *cls,
907                   int32_t result,
908                   struct GNUNET_TIME_Absolute min_expiration,
909                   const char *emsg)
910 {
911   /* do nothing */
912 }
913
914
915 /**
916  * Store an item in the datastore.  If the item is already present,
917  * the priorities are summed up and the higher expiration time and
918  * lower anonymity level is used.
919  *
920  * @param h handle to the datastore
921  * @param rid reservation ID to use (from "reserve"); use 0 if no
922  *            prior reservation was made
923  * @param key key for the value
924  * @param size number of bytes in data
925  * @param data content stored
926  * @param type type of the content
927  * @param priority priority of the content
928  * @param anonymity anonymity-level for the content
929  * @param replication how often should the content be replicated to other peers?
930  * @param expiration expiration time for the content
931  * @param queue_priority ranking of this request in the priority queue
932  * @param max_queue_size at what queue size should this request be dropped
933  *        (if other requests of higher priority are in the queue)
934  * @param cont continuation to call when done
935  * @param cont_cls closure for @a cont
936  * @return NULL if the entry was not queued, otherwise a handle that can be used to
937  *         cancel; note that even if NULL is returned, the callback will be invoked
938  *         (or rather, will already have been invoked)
939  */
940 struct GNUNET_DATASTORE_QueueEntry *
941 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
942                       uint32_t rid,
943                       const struct GNUNET_HashCode *key,
944                       size_t size,
945                       const void *data,
946                       enum GNUNET_BLOCK_Type type,
947                       uint32_t priority,
948                       uint32_t anonymity,
949                       uint32_t replication,
950                       struct GNUNET_TIME_Absolute expiration,
951                       unsigned int queue_priority,
952                       unsigned int max_queue_size,
953                       GNUNET_DATASTORE_ContinuationWithStatus cont,
954                       void *cont_cls)
955 {
956   struct GNUNET_DATASTORE_QueueEntry *qe;
957   struct GNUNET_MQ_Envelope *env;
958   struct DataMessage *dm;
959   union QueueContext qc;
960
961   if (size + sizeof (*dm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
962   {
963     GNUNET_break (0);
964     return NULL;
965   }
966
967   LOG (GNUNET_ERROR_TYPE_DEBUG,
968        "Asked to put %u bytes of data under key `%s' for %s\n",
969        size,
970        GNUNET_h2s (key),
971        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
972                                                GNUNET_YES));
973   env = GNUNET_MQ_msg_extra (dm,
974                              size,
975                              GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
976   dm->rid = htonl (rid);
977   dm->size = htonl ((uint32_t) size);
978   dm->type = htonl (type);
979   dm->priority = htonl (priority);
980   dm->anonymity = htonl (anonymity);
981   dm->replication = htonl (replication);
982   dm->reserved = htonl (0);
983   dm->uid = GNUNET_htonll (0);
984   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
985   dm->key = *key;
986   GNUNET_memcpy (&dm[1],
987           data,
988           size);
989   qc.sc.cont = cont;
990   qc.sc.cont_cls = cont_cls;
991   qe = make_queue_entry (h,
992                          env,
993                          queue_priority,
994                          max_queue_size,
995                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
996                          &qc);
997   if (NULL == qe)
998   {
999     LOG (GNUNET_ERROR_TYPE_DEBUG,
1000          "Could not create queue entry for PUT\n");
1001     return NULL;
1002   }
1003   GNUNET_STATISTICS_update (h->stats,
1004                             gettext_noop ("# PUT requests executed"),
1005                             1,
1006                             GNUNET_NO);
1007   process_queue (h);
1008   return qe;
1009 }
1010
1011
1012 /**
1013  * Reserve space in the datastore.  This function should be used
1014  * to avoid "out of space" failures during a longer sequence of "put"
1015  * operations (for example, when a file is being inserted).
1016  *
1017  * @param h handle to the datastore
1018  * @param amount how much space (in bytes) should be reserved (for content only)
1019  * @param entries how many entries will be created (to calculate per-entry overhead)
1020  * @param cont continuation to call when done; "success" will be set to
1021  *             a positive reservation value if space could be reserved.
1022  * @param cont_cls closure for @a cont
1023  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1024  *         cancel; note that even if NULL is returned, the callback will be invoked
1025  *         (or rather, will already have been invoked)
1026  */
1027 struct GNUNET_DATASTORE_QueueEntry *
1028 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
1029                           uint64_t amount,
1030                           uint32_t entries,
1031                           GNUNET_DATASTORE_ContinuationWithStatus cont,
1032                           void *cont_cls)
1033 {
1034   struct GNUNET_DATASTORE_QueueEntry *qe;
1035   struct GNUNET_MQ_Envelope *env;
1036   struct ReserveMessage *rm;
1037   union QueueContext qc;
1038
1039   if (NULL == cont)
1040     cont = &drop_status_cont;
1041   LOG (GNUNET_ERROR_TYPE_DEBUG,
1042        "Asked to reserve %llu bytes of data and %u entries\n",
1043        (unsigned long long) amount,
1044        (unsigned int) entries);
1045   env = GNUNET_MQ_msg (rm,
1046                        GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
1047   rm->entries = htonl (entries);
1048   rm->amount = GNUNET_htonll (amount);
1049
1050   qc.sc.cont = cont;
1051   qc.sc.cont_cls = cont_cls;
1052   qe = make_queue_entry (h,
1053                          env,
1054                          UINT_MAX,
1055                          UINT_MAX,
1056                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1057                          &qc);
1058   if (NULL == qe)
1059   {
1060     LOG (GNUNET_ERROR_TYPE_DEBUG,
1061          "Could not create queue entry to reserve\n");
1062     return NULL;
1063   }
1064   GNUNET_STATISTICS_update (h->stats,
1065                             gettext_noop ("# RESERVE requests executed"),
1066                             1,
1067                             GNUNET_NO);
1068   process_queue (h);
1069   return qe;
1070 }
1071
1072
1073 /**
1074  * Signal that all of the data for which a reservation was made has
1075  * been stored and that whatever excess space might have been reserved
1076  * can now be released.
1077  *
1078  * @param h handle to the datastore
1079  * @param rid reservation ID (value of "success" in original continuation
1080  *        from the "reserve" function).
1081  * @param queue_priority ranking of this request in the priority queue
1082  * @param max_queue_size at what queue size should this request be dropped
1083  *        (if other requests of higher priority are in the queue)
1084  * @param queue_priority ranking of this request in the priority queue
1085  * @param max_queue_size at what queue size should this request be dropped
1086  *        (if other requests of higher priority are in the queue)
1087  * @param cont continuation to call when done
1088  * @param cont_cls closure for @a cont
1089  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1090  *         cancel; note that even if NULL is returned, the callback will be invoked
1091  *         (or rather, will already have been invoked)
1092  */
1093 struct GNUNET_DATASTORE_QueueEntry *
1094 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1095                                   uint32_t rid,
1096                                   unsigned int queue_priority,
1097                                   unsigned int max_queue_size,
1098                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1099                                   void *cont_cls)
1100 {
1101   struct GNUNET_DATASTORE_QueueEntry *qe;
1102   struct GNUNET_MQ_Envelope *env;
1103   struct ReleaseReserveMessage *rrm;
1104   union QueueContext qc;
1105
1106   if (NULL == cont)
1107     cont = &drop_status_cont;
1108   LOG (GNUNET_ERROR_TYPE_DEBUG,
1109        "Asked to release reserve %d\n",
1110        rid);
1111   env = GNUNET_MQ_msg (rrm,
1112                        GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1113   rrm->rid = htonl (rid);
1114   qc.sc.cont = cont;
1115   qc.sc.cont_cls = cont_cls;
1116   qe = make_queue_entry (h,
1117                          env,
1118                          queue_priority,
1119                          max_queue_size,
1120                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1121                          &qc);
1122   if (NULL == qe)
1123   {
1124     LOG (GNUNET_ERROR_TYPE_DEBUG,
1125          "Could not create queue entry to release reserve\n");
1126     return NULL;
1127   }
1128   GNUNET_STATISTICS_update (h->stats,
1129                             gettext_noop
1130                             ("# RELEASE RESERVE requests executed"), 1,
1131                             GNUNET_NO);
1132   process_queue (h);
1133   return qe;
1134 }
1135
1136
1137 /**
1138  * Explicitly remove some content from the database.
1139  * The @a cont continuation will be called with `status`
1140  * #GNUNET_OK" if content was removed, #GNUNET_NO
1141  * if no matching entry was found and #GNUNET_SYSERR
1142  * on all other types of errors.
1143  *
1144  * @param h handle to the datastore
1145  * @param key key for the value
1146  * @param size number of bytes in data
1147  * @param data content stored
1148  * @param queue_priority ranking of this request in the priority queue
1149  * @param max_queue_size at what queue size should this request be dropped
1150  *        (if other requests of higher priority are in the queue)
1151  * @param cont continuation to call when done
1152  * @param cont_cls closure for @a cont
1153  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1154  *         cancel; note that even if NULL is returned, the callback will be invoked
1155  *         (or rather, will already have been invoked)
1156  */
1157 struct GNUNET_DATASTORE_QueueEntry *
1158 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1159                          const struct GNUNET_HashCode *key,
1160                          size_t size,
1161                          const void *data,
1162                          unsigned int queue_priority,
1163                          unsigned int max_queue_size,
1164                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1165                          void *cont_cls)
1166 {
1167   struct GNUNET_DATASTORE_QueueEntry *qe;
1168   struct DataMessage *dm;
1169   struct GNUNET_MQ_Envelope *env;
1170   union QueueContext qc;
1171
1172   if (sizeof (*dm) + size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
1173   {
1174     GNUNET_break (0);
1175     return NULL;
1176   }
1177   if (NULL == cont)
1178     cont = &drop_status_cont;
1179   LOG (GNUNET_ERROR_TYPE_DEBUG,
1180        "Asked to remove %u bytes under key `%s'\n",
1181        size,
1182        GNUNET_h2s (key));
1183   env = GNUNET_MQ_msg_extra (dm,
1184                              size,
1185                              GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1186   dm->rid = htonl (0);
1187   dm->size = htonl (size);
1188   dm->type = htonl (0);
1189   dm->priority = htonl (0);
1190   dm->anonymity = htonl (0);
1191   dm->uid = GNUNET_htonll (0);
1192   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1193   dm->key = *key;
1194   GNUNET_memcpy (&dm[1],
1195           data,
1196           size);
1197
1198   qc.sc.cont = cont;
1199   qc.sc.cont_cls = cont_cls;
1200
1201   qe = make_queue_entry (h,
1202                          env,
1203                          queue_priority,
1204                          max_queue_size,
1205                          GNUNET_MESSAGE_TYPE_DATASTORE_STATUS,
1206                          &qc);
1207   if (NULL == qe)
1208   {
1209     LOG (GNUNET_ERROR_TYPE_DEBUG,
1210          "Could not create queue entry for REMOVE\n");
1211     return NULL;
1212   }
1213   GNUNET_STATISTICS_update (h->stats,
1214                             gettext_noop ("# REMOVE requests executed"),
1215                             1,
1216                             GNUNET_NO);
1217   process_queue (h);
1218   return qe;
1219 }
1220
1221
1222
1223 /**
1224  * Get a random value from the datastore for content replication.
1225  * Returns a single, random value among those with the highest
1226  * replication score, lowering positive replication scores by one for
1227  * the chosen value (if only content with a replication score exists,
1228  * a random value is returned and replication scores are not changed).
1229  *
1230  * @param h handle to the datastore
1231  * @param queue_priority ranking of this request in the priority queue
1232  * @param max_queue_size at what queue size should this request be dropped
1233  *        (if other requests of higher priority are in the queue)
1234  * @param proc function to call on a random value; it
1235  *        will be called once with a value (if available)
1236  *        and always once with a value of NULL.
1237  * @param proc_cls closure for @a proc
1238  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1239  *         cancel
1240  */
1241 struct GNUNET_DATASTORE_QueueEntry *
1242 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1243                                       unsigned int queue_priority,
1244                                       unsigned int max_queue_size,
1245                                       GNUNET_DATASTORE_DatumProcessor proc,
1246                                       void *proc_cls)
1247 {
1248   struct GNUNET_DATASTORE_QueueEntry *qe;
1249   struct GNUNET_MQ_Envelope *env;
1250   struct GNUNET_MessageHeader *m;
1251   union QueueContext qc;
1252
1253   GNUNET_assert (NULL != proc);
1254   LOG (GNUNET_ERROR_TYPE_DEBUG,
1255        "Asked to get replication entry\n");
1256   env = GNUNET_MQ_msg (m,
1257                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1258   qc.rc.proc = proc;
1259   qc.rc.proc_cls = proc_cls;
1260   qe = make_queue_entry (h,
1261                          env,
1262                          queue_priority,
1263                          max_queue_size,
1264                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1265                          &qc);
1266   if (NULL == qe)
1267   {
1268     LOG (GNUNET_ERROR_TYPE_DEBUG,
1269          "Could not create queue entry for GET REPLICATION\n");
1270     return NULL;
1271   }
1272   GNUNET_STATISTICS_update (h->stats,
1273                             gettext_noop
1274                             ("# GET REPLICATION requests executed"), 1,
1275                             GNUNET_NO);
1276   process_queue (h);
1277   return qe;
1278 }
1279
1280
1281 /**
1282  * Get a single zero-anonymity value from the datastore.
1283  *
1284  * @param h handle to the datastore
1285  * @param offset offset of the result (modulo num-results); set to
1286  *               a random 64-bit value initially; then increment by
1287  *               one each time; detect that all results have been found by uid
1288  *               being again the first uid ever returned.
1289  * @param queue_priority ranking of this request in the priority queue
1290  * @param max_queue_size at what queue size should this request be dropped
1291  *        (if other requests of higher priority are in the queue)
1292  * @param type allowed type for the operation (never zero)
1293  * @param proc function to call on a random value; it
1294  *        will be called once with a value (if available)
1295  *        or with NULL if none value exists.
1296  * @param proc_cls closure for @a proc
1297  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1298  *         cancel
1299  */
1300 struct GNUNET_DATASTORE_QueueEntry *
1301 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1302                                      uint64_t offset,
1303                                      unsigned int queue_priority,
1304                                      unsigned int max_queue_size,
1305                                      enum GNUNET_BLOCK_Type type,
1306                                      GNUNET_DATASTORE_DatumProcessor proc,
1307                                      void *proc_cls)
1308 {
1309   struct GNUNET_DATASTORE_QueueEntry *qe;
1310   struct GNUNET_MQ_Envelope *env;
1311   struct GetZeroAnonymityMessage *m;
1312   union QueueContext qc;
1313
1314   GNUNET_assert (NULL != proc);
1315   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1316   LOG (GNUNET_ERROR_TYPE_DEBUG,
1317        "Asked to get %llu-th zero-anonymity entry of type %d\n",
1318        (unsigned long long) offset,
1319        type);
1320   env = GNUNET_MQ_msg (m,
1321                        GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1322   m->type = htonl ((uint32_t) type);
1323   m->offset = GNUNET_htonll (offset);
1324   qc.rc.proc = proc;
1325   qc.rc.proc_cls = proc_cls;
1326   qe = make_queue_entry (h,
1327                          env,
1328                          queue_priority,
1329                          max_queue_size,
1330                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1331                          &qc);
1332   if (NULL == qe)
1333   {
1334     LOG (GNUNET_ERROR_TYPE_DEBUG,
1335          "Could not create queue entry for zero-anonymity procation\n");
1336     return NULL;
1337   }
1338   GNUNET_STATISTICS_update (h->stats,
1339                             gettext_noop
1340                             ("# GET ZERO ANONYMITY requests executed"), 1,
1341                             GNUNET_NO);
1342   process_queue (h);
1343   return qe;
1344 }
1345
1346
1347 /**
1348  * Get a result for a particular key from the datastore.  The processor
1349  * will only be called once.
1350  *
1351  * @param h handle to the datastore
1352  * @param offset offset of the result (modulo num-results); set to
1353  *               a random 64-bit value initially; then increment by
1354  *               one each time; detect that all results have been found by uid
1355  *               being again the first uid ever returned.
1356  * @param key maybe NULL (to match all entries)
1357  * @param type desired type, 0 for any
1358  * @param queue_priority ranking of this request in the priority queue
1359  * @param max_queue_size at what queue size should this request be dropped
1360  *        (if other requests of higher priority are in the queue)
1361  * @param proc function to call on each matching value;
1362  *        will be called once with a NULL value at the end
1363  * @param proc_cls closure for @a proc
1364  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1365  *         cancel
1366  */
1367 struct GNUNET_DATASTORE_QueueEntry *
1368 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1369                           uint64_t offset,
1370                           const struct GNUNET_HashCode *key,
1371                           enum GNUNET_BLOCK_Type type,
1372                           unsigned int queue_priority,
1373                           unsigned int max_queue_size,
1374                           GNUNET_DATASTORE_DatumProcessor proc,
1375                           void *proc_cls)
1376 {
1377   struct GNUNET_DATASTORE_QueueEntry *qe;
1378   struct GNUNET_MQ_Envelope *env;
1379   struct GetKeyMessage *gkm;
1380   struct GetMessage *gm;
1381   union QueueContext qc;
1382
1383   GNUNET_assert (NULL != proc);
1384   LOG (GNUNET_ERROR_TYPE_DEBUG,
1385        "Asked to look for data of type %u under key `%s'\n",
1386        (unsigned int) type,
1387        GNUNET_h2s (key));
1388   if (NULL == key)
1389   {
1390     env = GNUNET_MQ_msg (gm,
1391                          GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1392     gm->type = htonl (type);
1393     gm->offset = GNUNET_htonll (offset);
1394   }
1395   else
1396   {
1397     env = GNUNET_MQ_msg (gkm,
1398                          GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY);
1399     gkm->type = htonl (type);
1400     gkm->offset = GNUNET_htonll (offset);
1401     gkm->key = *key;
1402   }
1403   qc.rc.proc = proc;
1404   qc.rc.proc_cls = proc_cls;
1405   qe = make_queue_entry (h,
1406                          env,
1407                          queue_priority,
1408                          max_queue_size,
1409                          GNUNET_MESSAGE_TYPE_DATASTORE_DATA,
1410                          &qc);
1411   if (NULL == qe)
1412   {
1413     LOG (GNUNET_ERROR_TYPE_DEBUG,
1414          "Could not queue request for `%s'\n",
1415          GNUNET_h2s (key));
1416     return NULL;
1417   }
1418 #if INSANE_STATISTICS
1419   GNUNET_STATISTICS_update (h->stats,
1420                             gettext_noop ("# GET requests executed"),
1421                             1,
1422                             GNUNET_NO);
1423 #endif
1424   process_queue (h);
1425   return qe;
1426 }
1427
1428
1429 /**
1430  * Cancel a datastore operation.  The final callback from the
1431  * operation must not have been done yet.
1432  *
1433  * @param qe operation to cancel
1434  */
1435 void
1436 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1437 {
1438   struct GNUNET_DATASTORE_Handle *h = qe->h;
1439
1440   LOG (GNUNET_ERROR_TYPE_DEBUG,
1441        "Pending DATASTORE request %p cancelled (%d, %d)\n",
1442        qe,
1443        NULL == qe->env,
1444        h->queue_head == qe);
1445   if (NULL == qe->env)
1446   {
1447     free_queue_entry (qe);
1448     h->skip_next_messages++;
1449     return;
1450   }
1451   free_queue_entry (qe);
1452   process_queue (h);
1453 }
1454
1455
1456 /* end of datastore_api.c */