d7e933fc64cd4dcb5fd73c450f1787a185ca2eed
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98
99 /**
100  * Entry in our priority queue.
101  */
102 struct GNUNET_DATASTORE_QueueEntry
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *next;
109
110   /**
111    * This is a linked list.
112    */
113   struct GNUNET_DATASTORE_QueueEntry *prev;
114
115   /**
116    * Handle to the master context.
117    */
118   struct GNUNET_DATASTORE_Handle *h;
119
120   /**
121    * Response processor (NULL if we are not waiting for a response).
122    * This struct should be used for the closure, function-specific
123    * arguments can be passed via 'qc'.
124    */
125   GNUNET_CLIENT_MessageHandler response_proc;
126
127   /**
128    * Function to call after transmission of the request.
129    */
130   GNUNET_DATASTORE_ContinuationWithStatus cont;
131
132   /**
133    * Closure for 'cont'.
134    */
135   void *cont_cls;
136
137   /**
138    * Context for the operation.
139    */
140   union QueueContext qc;
141
142   /**
143    * Task for timeout signalling.
144    */
145   struct GNUNET_SCHEDULER_Task * task;
146
147   /**
148    * Timeout for the current operation.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Priority in the queue.
154    */
155   unsigned int priority;
156
157   /**
158    * Maximum allowed length of queue (otherwise
159    * this request should be discarded).
160    */
161   unsigned int max_queue;
162
163   /**
164    * Number of bytes in the request message following
165    * this struct.  32-bit value for nicer memory
166    * access (and overall struct alignment).
167    */
168   uint32_t message_size;
169
170   /**
171    * Has this message been transmitted to the service?
172    * Only ever GNUNET_YES for the head of the queue.
173    * Note that the overall struct should end at a
174    * multiple of 64 bits.
175    */
176   int was_transmitted;
177
178 };
179
180 /**
181  * Handle to the datastore service.
182  */
183 struct GNUNET_DATASTORE_Handle
184 {
185
186   /**
187    * Our configuration.
188    */
189   const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   struct GNUNET_SCHEDULER_Task * reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244   /**
245    * We should ignore the next message(s) from the service.
246    */
247   unsigned int skip_next_messages;
248
249 };
250
251
252
253 /**
254  * Connect to the datastore service.
255  *
256  * @param cfg configuration to use
257  * @return handle to use to access the service
258  */
259 struct GNUNET_DATASTORE_Handle *
260 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
261 {
262   struct GNUNET_CLIENT_Connection *c;
263   struct GNUNET_DATASTORE_Handle *h;
264
265   c = GNUNET_CLIENT_connect ("datastore", cfg);
266   if (c == NULL)
267     return NULL;                /* oops */
268   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
269                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
270   h->client = c;
271   h->cfg = cfg;
272   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
273   return h;
274 }
275
276
277 /**
278  * Task used by 'transmit_drop' to disconnect the datastore.
279  *
280  * @param cls the datastore handle
281  * @param tc scheduler context
282  */
283 static void
284 disconnect_after_drop (void *cls,
285                        const struct GNUNET_SCHEDULER_TaskContext *tc)
286 {
287   struct GNUNET_DATASTORE_Handle *h = cls;
288
289   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290 }
291
292
293 /**
294  * Transmit DROP message to datastore service.
295  *
296  * @param cls the `struct GNUNET_DATASTORE_Handle`
297  * @param size number of bytes that can be copied to @a buf
298  * @param buf where to copy the drop message
299  * @return number of bytes written to @a buf
300  */
301 static size_t
302 transmit_drop (void *cls, size_t size, void *buf)
303 {
304   struct GNUNET_DATASTORE_Handle *h = cls;
305   struct GNUNET_MessageHeader *hdr;
306
307   if (buf == NULL)
308   {
309     LOG (GNUNET_ERROR_TYPE_WARNING,
310          _("Failed to transmit request to drop database.\n"));
311     GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
312     return 0;
313   }
314   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
315   hdr = buf;
316   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
317   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
318   GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
319   return sizeof (struct GNUNET_MessageHeader);
320 }
321
322
323 /**
324  * Disconnect from the datastore service (and free
325  * associated resources).
326  *
327  * @param h handle to the datastore
328  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
329  */
330 void
331 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
332 {
333   struct GNUNET_DATASTORE_QueueEntry *qe;
334
335   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
336   if (NULL != h->th)
337   {
338     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
339     h->th = NULL;
340   }
341   if (h->client != NULL)
342   {
343     GNUNET_CLIENT_disconnect (h->client);
344     h->client = NULL;
345   }
346   if (h->reconnect_task != NULL)
347   {
348     GNUNET_SCHEDULER_cancel (h->reconnect_task);
349     h->reconnect_task = NULL;
350   }
351   while (NULL != (qe = h->queue_head))
352   {
353     GNUNET_assert (NULL != qe->response_proc);
354     qe->response_proc (h, NULL);
355   }
356   if (GNUNET_YES == drop)
357   {
358     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
359     if (h->client != NULL)
360     {
361       if (NULL !=
362           GNUNET_CLIENT_notify_transmit_ready (h->client,
363                                                sizeof (struct
364                                                        GNUNET_MessageHeader),
365                                                GNUNET_TIME_UNIT_MINUTES,
366                                                GNUNET_YES, &transmit_drop, h))
367         return;
368       GNUNET_CLIENT_disconnect (h->client);
369       h->client = NULL;
370     }
371     GNUNET_break (0);
372   }
373   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
374   h->stats = NULL;
375   GNUNET_free (h);
376 }
377
378
379 /**
380  * A request has timed out (before being transmitted to the service).
381  *
382  * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
383  * @param tc scheduler context
384  */
385 static void
386 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
387 {
388   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
389   struct GNUNET_DATASTORE_Handle *h = qe->h;
390
391   GNUNET_STATISTICS_update (h->stats,
392                             gettext_noop ("# queue entry timeouts"), 1,
393                             GNUNET_NO);
394   qe->task = NULL;
395   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
396   LOG (GNUNET_ERROR_TYPE_DEBUG,
397        "Timeout of request in datastore queue\n");
398   /* response_proc's expect request at the head of the queue! */
399   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
400   GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, qe);
401   GNUNET_assert (h->queue_head == qe);
402   qe->response_proc (qe->h, NULL);
403 }
404
405
406 /**
407  * Create a new entry for our priority queue (and possibly discard other entires if
408  * the queue is getting too long).
409  *
410  * @param h handle to the datastore
411  * @param msize size of the message to queue
412  * @param queue_priority priority of the entry
413  * @param max_queue_size at what queue size should this request be dropped
414  *        (if other requests of higher priority are in the queue)
415  * @param timeout timeout for the operation
416  * @param response_proc function to call with replies (can be NULL)
417  * @param qc client context (NOT a closure for @a response_proc)
418  * @return NULL if the queue is full
419  */
420 static struct GNUNET_DATASTORE_QueueEntry *
421 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
422                   unsigned int queue_priority, unsigned int max_queue_size,
423                   struct GNUNET_TIME_Relative timeout,
424                   GNUNET_CLIENT_MessageHandler response_proc,
425                   const union QueueContext *qc)
426 {
427   struct GNUNET_DATASTORE_QueueEntry *ret;
428   struct GNUNET_DATASTORE_QueueEntry *pos;
429   unsigned int c;
430
431   c = 0;
432   pos = h->queue_head;
433   while ((pos != NULL) && (c < max_queue_size) &&
434          (pos->priority >= queue_priority))
435   {
436     c++;
437     pos = pos->next;
438   }
439   if (c >= max_queue_size)
440   {
441     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
442                               GNUNET_NO);
443     return NULL;
444   }
445   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
446   ret->h = h;
447   ret->response_proc = response_proc;
448   ret->qc = *qc;
449   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
450   ret->priority = queue_priority;
451   ret->max_queue = max_queue_size;
452   ret->message_size = msize;
453   ret->was_transmitted = GNUNET_NO;
454   if (pos == NULL)
455   {
456     /* append at the tail */
457     pos = h->queue_tail;
458   }
459   else
460   {
461     pos = pos->prev;
462     /* do not insert at HEAD if HEAD query was already
463      * transmitted and we are still receiving replies! */
464     if ((pos == NULL) && (h->queue_head->was_transmitted))
465       pos = h->queue_head;
466   }
467   c++;
468 #if INSANE_STATISTICS
469   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
470                             1, GNUNET_NO);
471 #endif
472   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
473   h->queue_size++;
474   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
475   for (pos = ret->next; NULL != pos; pos = pos->next)
476   {
477     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
478     {
479       GNUNET_assert (NULL != pos->response_proc);
480       /* move 'pos' element to head so that it will be
481        * killed on 'NULL' call below */
482       LOG (GNUNET_ERROR_TYPE_DEBUG,
483            "Dropping request from datastore queue\n");
484       /* response_proc's expect request at the head of the queue! */
485       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
486       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
487       GNUNET_STATISTICS_update (h->stats,
488                                 gettext_noop
489                                 ("# Requests dropped from datastore queue"), 1,
490                                 GNUNET_NO);
491       GNUNET_assert (h->queue_head == pos);
492       pos->response_proc (h, NULL);
493       break;
494     }
495   }
496   return ret;
497 }
498
499
500 /**
501  * Process entries in the queue (or do nothing if we are already
502  * doing so).
503  *
504  * @param h handle to the datastore
505  */
506 static void
507 process_queue (struct GNUNET_DATASTORE_Handle *h);
508
509
510 /**
511  * Try reconnecting to the datastore service.
512  *
513  * @param cls the `struct GNUNET_DATASTORE_Handle`
514  * @param tc scheduler context
515  */
516 static void
517 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
518 {
519   struct GNUNET_DATASTORE_Handle *h = cls;
520
521   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
522   h->reconnect_task = NULL;
523   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
524   if (h->client == NULL)
525   {
526     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
527     return;
528   }
529   GNUNET_STATISTICS_update (h->stats,
530                             gettext_noop
531                             ("# datastore connections (re)created"), 1,
532                             GNUNET_NO);
533   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
534   process_queue (h);
535 }
536
537
538 /**
539  * Disconnect from the service and then try reconnecting to the datastore service
540  * after some delay.
541  *
542  * @param h handle to datastore to disconnect and reconnect
543  */
544 static void
545 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
546 {
547   if (NULL == h->client)
548   {
549     LOG (GNUNET_ERROR_TYPE_DEBUG,
550          "Client NULL in disconnect, will not try to reconnect\n");
551     return;
552   }
553   GNUNET_CLIENT_disconnect (h->client);
554   h->skip_next_messages = 0;
555   h->client = NULL;
556   h->reconnect_task =
557       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
558 }
559
560
561 /**
562  * Function called whenever we receive a message from
563  * the service.  Calls the appropriate handler.
564  *
565  * @param cls the `struct GNUNET_DATASTORE_Handle`
566  * @param msg the received message
567  */
568 static void
569 receive_cb (void *cls,
570             const struct GNUNET_MessageHeader *msg)
571 {
572   struct GNUNET_DATASTORE_Handle *h = cls;
573   struct GNUNET_DATASTORE_QueueEntry *qe;
574
575   h->in_receive = GNUNET_NO;
576   LOG (GNUNET_ERROR_TYPE_DEBUG,
577        "Receiving reply from datastore\n");
578   if (h->skip_next_messages > 0)
579   {
580     h->skip_next_messages--;
581     process_queue (h);
582     return;
583   }
584   if (NULL == (qe = h->queue_head))
585   {
586     GNUNET_break (0);
587     process_queue (h);
588     return;
589   }
590   qe->response_proc (h, msg);
591 }
592
593
594 /**
595  * Transmit request from queue to datastore service.
596  *
597  * @param cls the `struct GNUNET_DATASTORE_Handle`
598  * @param size number of bytes that can be copied to @a buf
599  * @param buf where to copy the drop message
600  * @return number of bytes written to @a buf
601  */
602 static size_t
603 transmit_request (void *cls,
604                   size_t size,
605                   void *buf)
606 {
607   struct GNUNET_DATASTORE_Handle *h = cls;
608   struct GNUNET_DATASTORE_QueueEntry *qe;
609   size_t msize;
610
611   h->th = NULL;
612   if (NULL == (qe = h->queue_head))
613     return 0;                   /* no entry in queue */
614   if (NULL == buf)
615   {
616     LOG (GNUNET_ERROR_TYPE_DEBUG,
617          "Failed to transmit request to DATASTORE.\n");
618     GNUNET_STATISTICS_update (h->stats,
619                               gettext_noop ("# transmission request failures"),
620                               1, GNUNET_NO);
621     do_disconnect (h);
622     return 0;
623   }
624   if (size < (msize = qe->message_size))
625   {
626     process_queue (h);
627     return 0;
628   }
629   LOG (GNUNET_ERROR_TYPE_DEBUG,
630        "Transmitting %u byte request to DATASTORE\n",
631        msize);
632   memcpy (buf, &qe[1], msize);
633   qe->was_transmitted = GNUNET_YES;
634   GNUNET_SCHEDULER_cancel (qe->task);
635   qe->task = NULL;
636   GNUNET_assert (GNUNET_NO == h->in_receive);
637   h->in_receive = GNUNET_YES;
638   GNUNET_CLIENT_receive (h->client,
639                          &receive_cb, h,
640                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
641 #if INSANE_STATISTICS
642   GNUNET_STATISTICS_update (h->stats,
643                             gettext_noop ("# bytes sent to datastore"), msize,
644                             GNUNET_NO);
645 #endif
646   return msize;
647 }
648
649
650 /**
651  * Process entries in the queue (or do nothing if we are already
652  * doing so).
653  *
654  * @param h handle to the datastore
655  */
656 static void
657 process_queue (struct GNUNET_DATASTORE_Handle *h)
658 {
659   struct GNUNET_DATASTORE_QueueEntry *qe;
660
661   if (NULL == (qe = h->queue_head))
662   {
663     /* no entry in queue */
664     LOG (GNUNET_ERROR_TYPE_DEBUG,
665          "Queue empty\n");
666     return;
667   }
668   if (GNUNET_YES == qe->was_transmitted)
669   {
670     /* waiting for replies */
671     LOG (GNUNET_ERROR_TYPE_DEBUG,
672          "Head request already transmitted\n");
673     return;
674   }
675   if (NULL != h->th)
676   {
677     /* request pending */
678     LOG (GNUNET_ERROR_TYPE_DEBUG,
679          "Pending transmission request\n");
680     return;
681   }
682   if (NULL == h->client)
683   {
684     /* waiting for reconnect */
685     LOG (GNUNET_ERROR_TYPE_DEBUG,
686          "Not connected\n");
687     return;
688   }
689   if (GNUNET_YES == h->in_receive)
690   {
691     /* wait for response to previous query */
692     return;
693   }
694   LOG (GNUNET_ERROR_TYPE_DEBUG,
695        "Queueing %u byte request to DATASTORE\n",
696        qe->message_size);
697   h->th
698     = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
699                                            GNUNET_TIME_absolute_get_remaining (qe->timeout),
700                                            GNUNET_YES,
701                                            &transmit_request, h);
702   GNUNET_assert (GNUNET_NO == h->in_receive);
703   GNUNET_break (NULL != h->th);
704 }
705
706
707 /**
708  * Dummy continuation used to do nothing (but be non-zero).
709  *
710  * @param cls closure
711  * @param result result
712  * @param min_expiration expiration time
713  * @param emsg error message
714  */
715 static void
716 drop_status_cont (void *cls, int32_t result,
717                   struct GNUNET_TIME_Absolute min_expiration,
718                   const char *emsg)
719 {
720   /* do nothing */
721 }
722
723
724 /**
725  * Free a queue entry.  Removes the given entry from the
726  * queue and releases associated resources.  Does NOT
727  * call the callback.
728  *
729  * @param qe entry to free.
730  */
731 static void
732 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
733 {
734   struct GNUNET_DATASTORE_Handle *h = qe->h;
735
736   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
737   if (qe->task != NULL)
738   {
739     GNUNET_SCHEDULER_cancel (qe->task);
740     qe->task = NULL;
741   }
742   h->queue_size--;
743   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
744   GNUNET_free (qe);
745 }
746
747
748 /**
749  * Type of a function to call when we receive a message
750  * from the service.
751  *
752  * @param cls closure
753  * @param msg message received, NULL on timeout or fatal error
754  */
755 static void
756 process_status_message (void *cls,
757                         const struct GNUNET_MessageHeader *msg)
758 {
759   struct GNUNET_DATASTORE_Handle *h = cls;
760   struct GNUNET_DATASTORE_QueueEntry *qe;
761   struct StatusContext rc;
762   const struct StatusMessage *sm;
763   const char *emsg;
764   int32_t status;
765   int was_transmitted;
766
767   if (NULL == (qe = h->queue_head))
768   {
769     GNUNET_break (0);
770     do_disconnect (h);
771     return;
772   }
773   rc = qe->qc.sc;
774   if (NULL == msg)
775   {
776     was_transmitted = qe->was_transmitted;
777     free_queue_entry (qe);
778     if (was_transmitted == GNUNET_YES)
779       do_disconnect (h);
780     else
781       process_queue (h);
782     if (NULL != rc.cont)
783       rc.cont (rc.cont_cls, GNUNET_SYSERR,
784                GNUNET_TIME_UNIT_ZERO_ABS,
785                _("Failed to receive status response from database."));
786     return;
787   }
788   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
789   free_queue_entry (qe);
790   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
791       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
792   {
793     GNUNET_break (0);
794     h->retry_time = GNUNET_TIME_UNIT_ZERO;
795     do_disconnect (h);
796     if (rc.cont != NULL)
797       rc.cont (rc.cont_cls, GNUNET_SYSERR,
798                GNUNET_TIME_UNIT_ZERO_ABS,
799                _("Error reading response from datastore service"));
800     return;
801   }
802   sm = (const struct StatusMessage *) msg;
803   status = ntohl (sm->status);
804   emsg = NULL;
805   if (ntohs (msg->size) > sizeof (struct StatusMessage))
806   {
807     emsg = (const char *) &sm[1];
808     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
809     {
810       GNUNET_break (0);
811       emsg = _("Invalid error message received from datastore service");
812     }
813   }
814   if ((status == GNUNET_SYSERR) && (emsg == NULL))
815   {
816     GNUNET_break (0);
817     emsg = _("Invalid error message received from datastore service");
818   }
819   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
820   GNUNET_STATISTICS_update (h->stats,
821                             gettext_noop ("# status messages received"), 1,
822                             GNUNET_NO);
823   h->retry_time = GNUNET_TIME_UNIT_ZERO;
824   process_queue (h);
825   if (rc.cont != NULL)
826     rc.cont (rc.cont_cls, status,
827              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
828              emsg);
829 }
830
831
832 /**
833  * Store an item in the datastore.  If the item is already present,
834  * the priorities are summed up and the higher expiration time and
835  * lower anonymity level is used.
836  *
837  * @param h handle to the datastore
838  * @param rid reservation ID to use (from "reserve"); use 0 if no
839  *            prior reservation was made
840  * @param key key for the value
841  * @param size number of bytes in data
842  * @param data content stored
843  * @param type type of the content
844  * @param priority priority of the content
845  * @param anonymity anonymity-level for the content
846  * @param replication how often should the content be replicated to other peers?
847  * @param expiration expiration time for the content
848  * @param queue_priority ranking of this request in the priority queue
849  * @param max_queue_size at what queue size should this request be dropped
850  *        (if other requests of higher priority are in the queue)
851  * @param timeout timeout for the operation
852  * @param cont continuation to call when done
853  * @param cont_cls closure for @a cont
854  * @return NULL if the entry was not queued, otherwise a handle that can be used to
855  *         cancel; note that even if NULL is returned, the callback will be invoked
856  *         (or rather, will already have been invoked)
857  */
858 struct GNUNET_DATASTORE_QueueEntry *
859 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
860                       const struct GNUNET_HashCode * key, size_t size,
861                       const void *data, enum GNUNET_BLOCK_Type type,
862                       uint32_t priority, uint32_t anonymity,
863                       uint32_t replication,
864                       struct GNUNET_TIME_Absolute expiration,
865                       unsigned int queue_priority, unsigned int max_queue_size,
866                       struct GNUNET_TIME_Relative timeout,
867                       GNUNET_DATASTORE_ContinuationWithStatus cont,
868                       void *cont_cls)
869 {
870   struct GNUNET_DATASTORE_QueueEntry *qe;
871   struct DataMessage *dm;
872   size_t msize;
873   union QueueContext qc;
874
875   LOG (GNUNET_ERROR_TYPE_DEBUG,
876        "Asked to put %u bytes of data under key `%s' for %s\n", size,
877        GNUNET_h2s (key),
878        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
879                                                GNUNET_YES));
880   msize = sizeof (struct DataMessage) + size;
881   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
882   qc.sc.cont = cont;
883   qc.sc.cont_cls = cont_cls;
884   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
885                          &process_status_message, &qc);
886   if (qe == NULL)
887   {
888     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
889     return NULL;
890   }
891   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
892                             1, GNUNET_NO);
893   dm = (struct DataMessage *) &qe[1];
894   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
895   dm->header.size = htons (msize);
896   dm->rid = htonl (rid);
897   dm->size = htonl ((uint32_t) size);
898   dm->type = htonl (type);
899   dm->priority = htonl (priority);
900   dm->anonymity = htonl (anonymity);
901   dm->replication = htonl (replication);
902   dm->reserved = htonl (0);
903   dm->uid = GNUNET_htonll (0);
904   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
905   dm->key = *key;
906   memcpy (&dm[1], data, size);
907   process_queue (h);
908   return qe;
909 }
910
911
912 /**
913  * Reserve space in the datastore.  This function should be used
914  * to avoid "out of space" failures during a longer sequence of "put"
915  * operations (for example, when a file is being inserted).
916  *
917  * @param h handle to the datastore
918  * @param amount how much space (in bytes) should be reserved (for content only)
919  * @param entries how many entries will be created (to calculate per-entry overhead)
920  * @param cont continuation to call when done; "success" will be set to
921  *             a positive reservation value if space could be reserved.
922  * @param cont_cls closure for @a cont
923  * @return NULL if the entry was not queued, otherwise a handle that can be used to
924  *         cancel; note that even if NULL is returned, the callback will be invoked
925  *         (or rather, will already have been invoked)
926  */
927 struct GNUNET_DATASTORE_QueueEntry *
928 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
929                           uint32_t entries,
930                           GNUNET_DATASTORE_ContinuationWithStatus cont,
931                           void *cont_cls)
932 {
933   struct GNUNET_DATASTORE_QueueEntry *qe;
934   struct ReserveMessage *rm;
935   union QueueContext qc;
936
937   if (NULL == cont)
938     cont = &drop_status_cont;
939   LOG (GNUNET_ERROR_TYPE_DEBUG,
940        "Asked to reserve %llu bytes of data and %u entries\n",
941        (unsigned long long) amount, (unsigned int) entries);
942   qc.sc.cont = cont;
943   qc.sc.cont_cls = cont_cls;
944   qe = make_queue_entry (h,
945                          sizeof (struct ReserveMessage),
946                          UINT_MAX,
947                          UINT_MAX,
948                          GNUNET_TIME_UNIT_FOREVER_REL,
949                          &process_status_message, &qc);
950   if (NULL == qe)
951   {
952     LOG (GNUNET_ERROR_TYPE_DEBUG,
953          "Could not create queue entry to reserve\n");
954     return NULL;
955   }
956   GNUNET_STATISTICS_update (h->stats,
957                             gettext_noop ("# RESERVE requests executed"), 1,
958                             GNUNET_NO);
959   rm = (struct ReserveMessage *) &qe[1];
960   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
961   rm->header.size = htons (sizeof (struct ReserveMessage));
962   rm->entries = htonl (entries);
963   rm->amount = GNUNET_htonll (amount);
964   process_queue (h);
965   return qe;
966 }
967
968
969 /**
970  * Signal that all of the data for which a reservation was made has
971  * been stored and that whatever excess space might have been reserved
972  * can now be released.
973  *
974  * @param h handle to the datastore
975  * @param rid reservation ID (value of "success" in original continuation
976  *        from the "reserve" function).
977  * @param queue_priority ranking of this request in the priority queue
978  * @param max_queue_size at what queue size should this request be dropped
979  *        (if other requests of higher priority are in the queue)
980  * @param queue_priority ranking of this request in the priority queue
981  * @param max_queue_size at what queue size should this request be dropped
982  *        (if other requests of higher priority are in the queue)
983  * @param timeout how long to wait at most for a response
984  * @param cont continuation to call when done
985  * @param cont_cls closure for @a cont
986  * @return NULL if the entry was not queued, otherwise a handle that can be used to
987  *         cancel; note that even if NULL is returned, the callback will be invoked
988  *         (or rather, will already have been invoked)
989  */
990 struct GNUNET_DATASTORE_QueueEntry *
991 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
992                                   uint32_t rid, unsigned int queue_priority,
993                                   unsigned int max_queue_size,
994                                   struct GNUNET_TIME_Relative timeout,
995                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
996                                   void *cont_cls)
997 {
998   struct GNUNET_DATASTORE_QueueEntry *qe;
999   struct ReleaseReserveMessage *rrm;
1000   union QueueContext qc;
1001
1002   if (cont == NULL)
1003     cont = &drop_status_cont;
1004   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1005   qc.sc.cont = cont;
1006   qc.sc.cont_cls = cont_cls;
1007   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1008                          queue_priority, max_queue_size, timeout,
1009                          &process_status_message, &qc);
1010   if (qe == NULL)
1011   {
1012     LOG (GNUNET_ERROR_TYPE_DEBUG,
1013          "Could not create queue entry to release reserve\n");
1014     return NULL;
1015   }
1016   GNUNET_STATISTICS_update (h->stats,
1017                             gettext_noop
1018                             ("# RELEASE RESERVE requests executed"), 1,
1019                             GNUNET_NO);
1020   rrm = (struct ReleaseReserveMessage *) &qe[1];
1021   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1022   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1023   rrm->rid = htonl (rid);
1024   process_queue (h);
1025   return qe;
1026 }
1027
1028
1029 /**
1030  * Update a value in the datastore.
1031  *
1032  * @param h handle to the datastore
1033  * @param uid identifier for the value
1034  * @param priority how much to increase the priority of the value
1035  * @param expiration new expiration value should be MAX of existing and this argument
1036  * @param queue_priority ranking of this request in the priority queue
1037  * @param max_queue_size at what queue size should this request be dropped
1038  *        (if other requests of higher priority are in the queue)
1039  * @param timeout how long to wait at most for a response
1040  * @param cont continuation to call when done
1041  * @param cont_cls closure for @a cont
1042  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1043  *         cancel; note that even if NULL is returned, the callback will be invoked
1044  *         (or rather, will already have been invoked)
1045  */
1046 struct GNUNET_DATASTORE_QueueEntry *
1047 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1048                          uint32_t priority,
1049                          struct GNUNET_TIME_Absolute expiration,
1050                          unsigned int queue_priority,
1051                          unsigned int max_queue_size,
1052                          struct GNUNET_TIME_Relative timeout,
1053                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1054                          void *cont_cls)
1055 {
1056   struct GNUNET_DATASTORE_QueueEntry *qe;
1057   struct UpdateMessage *um;
1058   union QueueContext qc;
1059
1060   if (cont == NULL)
1061     cont = &drop_status_cont;
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1064        uid,
1065        (unsigned int) priority,
1066        GNUNET_STRINGS_absolute_time_to_string (expiration));
1067   qc.sc.cont = cont;
1068   qc.sc.cont_cls = cont_cls;
1069   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1070                          max_queue_size, timeout, &process_status_message, &qc);
1071   if (qe == NULL)
1072   {
1073     LOG (GNUNET_ERROR_TYPE_DEBUG,
1074          "Could not create queue entry for UPDATE\n");
1075     return NULL;
1076   }
1077   GNUNET_STATISTICS_update (h->stats,
1078                             gettext_noop ("# UPDATE requests executed"), 1,
1079                             GNUNET_NO);
1080   um = (struct UpdateMessage *) &qe[1];
1081   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1082   um->header.size = htons (sizeof (struct UpdateMessage));
1083   um->priority = htonl (priority);
1084   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1085   um->uid = GNUNET_htonll (uid);
1086   process_queue (h);
1087   return qe;
1088 }
1089
1090
1091 /**
1092  * Explicitly remove some content from the database.
1093  * The @a cont continuation will be called with `status`
1094  * #GNUNET_OK" if content was removed, #GNUNET_NO
1095  * if no matching entry was found and #GNUNET_SYSERR
1096  * on all other types of errors.
1097  *
1098  * @param h handle to the datastore
1099  * @param key key for the value
1100  * @param size number of bytes in data
1101  * @param data content stored
1102  * @param queue_priority ranking of this request in the priority queue
1103  * @param max_queue_size at what queue size should this request be dropped
1104  *        (if other requests of higher priority are in the queue)
1105  * @param timeout how long to wait at most for a response
1106  * @param cont continuation to call when done
1107  * @param cont_cls closure for @a cont
1108  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1109  *         cancel; note that even if NULL is returned, the callback will be invoked
1110  *         (or rather, will already have been invoked)
1111  */
1112 struct GNUNET_DATASTORE_QueueEntry *
1113 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1114                          const struct GNUNET_HashCode * key, size_t size,
1115                          const void *data, unsigned int queue_priority,
1116                          unsigned int max_queue_size,
1117                          struct GNUNET_TIME_Relative timeout,
1118                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1119                          void *cont_cls)
1120 {
1121   struct GNUNET_DATASTORE_QueueEntry *qe;
1122   struct DataMessage *dm;
1123   size_t msize;
1124   union QueueContext qc;
1125
1126   if (cont == NULL)
1127     cont = &drop_status_cont;
1128   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1129        size, GNUNET_h2s (key));
1130   qc.sc.cont = cont;
1131   qc.sc.cont_cls = cont_cls;
1132   msize = sizeof (struct DataMessage) + size;
1133   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1134   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1135                          &process_status_message, &qc);
1136   if (qe == NULL)
1137   {
1138     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1139     return NULL;
1140   }
1141   GNUNET_STATISTICS_update (h->stats,
1142                             gettext_noop ("# REMOVE requests executed"), 1,
1143                             GNUNET_NO);
1144   dm = (struct DataMessage *) &qe[1];
1145   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1146   dm->header.size = htons (msize);
1147   dm->rid = htonl (0);
1148   dm->size = htonl (size);
1149   dm->type = htonl (0);
1150   dm->priority = htonl (0);
1151   dm->anonymity = htonl (0);
1152   dm->uid = GNUNET_htonll (0);
1153   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1154   dm->key = *key;
1155   memcpy (&dm[1], data, size);
1156   process_queue (h);
1157   return qe;
1158 }
1159
1160
1161 /**
1162  * Type of a function to call when we receive a message
1163  * from the service.
1164  *
1165  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
1166  * @param msg message received, NULL on timeout or fatal error
1167  */
1168 static void
1169 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1170 {
1171   struct GNUNET_DATASTORE_Handle *h = cls;
1172   struct GNUNET_DATASTORE_QueueEntry *qe;
1173   struct ResultContext rc;
1174   const struct DataMessage *dm;
1175   int was_transmitted;
1176
1177   if (NULL == msg)
1178   {
1179     qe = h->queue_head;
1180     GNUNET_assert (NULL != qe);
1181     rc = qe->qc.rc;
1182     was_transmitted = qe->was_transmitted;
1183     free_queue_entry (qe);
1184     if (GNUNET_YES == was_transmitted)
1185     {
1186       LOG (GNUNET_ERROR_TYPE_DEBUG,
1187            "Failed to receive response from database.\n");
1188       do_disconnect (h);
1189     }
1190     else
1191     {
1192       process_queue (h);
1193     }
1194     if (NULL != rc.proc)
1195       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1196                0);
1197     return;
1198   }
1199   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1200   {
1201     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1202     qe = h->queue_head;
1203     rc = qe->qc.rc;
1204     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1205     free_queue_entry (qe);
1206     LOG (GNUNET_ERROR_TYPE_DEBUG,
1207          "Received end of result set, new queue size is %u\n", h->queue_size);
1208     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1209     h->result_count = 0;
1210     process_queue (h);
1211     if (NULL != rc.proc)
1212       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1213                0);
1214     return;
1215   }
1216   qe = h->queue_head;
1217   GNUNET_assert (NULL != qe);
1218   rc = qe->qc.rc;
1219   if (GNUNET_YES != qe->was_transmitted)
1220   {
1221     GNUNET_break (0);
1222     free_queue_entry (qe);
1223     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1224     do_disconnect (h);
1225     if (rc.proc != NULL)
1226       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1227                0);
1228     return;
1229   }
1230   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1231       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1232       (ntohs (msg->size) !=
1233        sizeof (struct DataMessage) +
1234        ntohl (((const struct DataMessage *) msg)->size)))
1235   {
1236     GNUNET_break (0);
1237     free_queue_entry (qe);
1238     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1239     do_disconnect (h);
1240     if (rc.proc != NULL)
1241       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1242                0);
1243     return;
1244   }
1245 #if INSANE_STATISTICS
1246   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1247                             GNUNET_NO);
1248 #endif
1249   dm = (const struct DataMessage *) msg;
1250   LOG (GNUNET_ERROR_TYPE_DEBUG,
1251        "Received result %llu with type %u and size %u with key %s\n",
1252        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1253        ntohl (dm->size), GNUNET_h2s (&dm->key));
1254   free_queue_entry (qe);
1255   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1256   process_queue (h);
1257   if (rc.proc != NULL)
1258     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1259              ntohl (dm->priority), ntohl (dm->anonymity),
1260              GNUNET_TIME_absolute_ntoh (dm->expiration),
1261              GNUNET_ntohll (dm->uid));
1262 }
1263
1264
1265 /**
1266  * Get a random value from the datastore for content replication.
1267  * Returns a single, random value among those with the highest
1268  * replication score, lowering positive replication scores by one for
1269  * the chosen value (if only content with a replication score exists,
1270  * a random value is returned and replication scores are not changed).
1271  *
1272  * @param h handle to the datastore
1273  * @param queue_priority ranking of this request in the priority queue
1274  * @param max_queue_size at what queue size should this request be dropped
1275  *        (if other requests of higher priority are in the queue)
1276  * @param timeout how long to wait at most for a response
1277  * @param proc function to call on a random value; it
1278  *        will be called once with a value (if available)
1279  *        and always once with a value of NULL.
1280  * @param proc_cls closure for @a proc
1281  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1282  *         cancel
1283  */
1284 struct GNUNET_DATASTORE_QueueEntry *
1285 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1286                                       unsigned int queue_priority,
1287                                       unsigned int max_queue_size,
1288                                       struct GNUNET_TIME_Relative timeout,
1289                                       GNUNET_DATASTORE_DatumProcessor proc,
1290                                       void *proc_cls)
1291 {
1292   struct GNUNET_DATASTORE_QueueEntry *qe;
1293   struct GNUNET_MessageHeader *m;
1294   union QueueContext qc;
1295
1296   GNUNET_assert (NULL != proc);
1297   LOG (GNUNET_ERROR_TYPE_DEBUG,
1298        "Asked to get replication entry in %s\n",
1299        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1300   qc.rc.proc = proc;
1301   qc.rc.proc_cls = proc_cls;
1302   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1303                          queue_priority, max_queue_size, timeout,
1304                          &process_result_message, &qc);
1305   if (NULL == qe)
1306   {
1307     LOG (GNUNET_ERROR_TYPE_DEBUG,
1308          "Could not create queue entry for GET REPLICATION\n");
1309     return NULL;
1310   }
1311   GNUNET_STATISTICS_update (h->stats,
1312                             gettext_noop
1313                             ("# GET REPLICATION requests executed"), 1,
1314                             GNUNET_NO);
1315   m = (struct GNUNET_MessageHeader *) &qe[1];
1316   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1317   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1318   process_queue (h);
1319   return qe;
1320 }
1321
1322
1323 /**
1324  * Get a single zero-anonymity value from the datastore.
1325  *
1326  * @param h handle to the datastore
1327  * @param offset offset of the result (modulo num-results); set to
1328  *               a random 64-bit value initially; then increment by
1329  *               one each time; detect that all results have been found by uid
1330  *               being again the first uid ever returned.
1331  * @param queue_priority ranking of this request in the priority queue
1332  * @param max_queue_size at what queue size should this request be dropped
1333  *        (if other requests of higher priority are in the queue)
1334  * @param timeout how long to wait at most for a response
1335  * @param type allowed type for the operation (never zero)
1336  * @param proc function to call on a random value; it
1337  *        will be called once with a value (if available)
1338  *        or with NULL if none value exists.
1339  * @param proc_cls closure for @a proc
1340  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1341  *         cancel
1342  */
1343 struct GNUNET_DATASTORE_QueueEntry *
1344 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1345                                      uint64_t offset,
1346                                      unsigned int queue_priority,
1347                                      unsigned int max_queue_size,
1348                                      struct GNUNET_TIME_Relative timeout,
1349                                      enum GNUNET_BLOCK_Type type,
1350                                      GNUNET_DATASTORE_DatumProcessor proc,
1351                                      void *proc_cls)
1352 {
1353   struct GNUNET_DATASTORE_QueueEntry *qe;
1354   struct GetZeroAnonymityMessage *m;
1355   union QueueContext qc;
1356
1357   GNUNET_assert (NULL != proc);
1358   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1359   LOG (GNUNET_ERROR_TYPE_DEBUG,
1360        "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
1361        (unsigned long long) offset, type,
1362        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1363   qc.rc.proc = proc;
1364   qc.rc.proc_cls = proc_cls;
1365   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1366                          queue_priority, max_queue_size, timeout,
1367                          &process_result_message, &qc);
1368   if (NULL == qe)
1369   {
1370     LOG (GNUNET_ERROR_TYPE_DEBUG,
1371          "Could not create queue entry for zero-anonymity procation\n");
1372     return NULL;
1373   }
1374   GNUNET_STATISTICS_update (h->stats,
1375                             gettext_noop
1376                             ("# GET ZERO ANONYMITY requests executed"), 1,
1377                             GNUNET_NO);
1378   m = (struct GetZeroAnonymityMessage *) &qe[1];
1379   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1380   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1381   m->type = htonl ((uint32_t) type);
1382   m->offset = GNUNET_htonll (offset);
1383   process_queue (h);
1384   return qe;
1385 }
1386
1387
1388 /**
1389  * Get a result for a particular key from the datastore.  The processor
1390  * will only be called once.
1391  *
1392  * @param h handle to the datastore
1393  * @param offset offset of the result (modulo num-results); set to
1394  *               a random 64-bit value initially; then increment by
1395  *               one each time; detect that all results have been found by uid
1396  *               being again the first uid ever returned.
1397  * @param key maybe NULL (to match all entries)
1398  * @param type desired type, 0 for any
1399  * @param queue_priority ranking of this request in the priority queue
1400  * @param max_queue_size at what queue size should this request be dropped
1401  *        (if other requests of higher priority are in the queue)
1402  * @param timeout how long to wait at most for a response
1403  * @param proc function to call on each matching value;
1404  *        will be called once with a NULL value at the end
1405  * @param proc_cls closure for @a proc
1406  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1407  *         cancel
1408  */
1409 struct GNUNET_DATASTORE_QueueEntry *
1410 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1411                           uint64_t offset,
1412                           const struct GNUNET_HashCode * key,
1413                           enum GNUNET_BLOCK_Type type,
1414                           unsigned int queue_priority,
1415                           unsigned int max_queue_size,
1416                           struct GNUNET_TIME_Relative timeout,
1417                           GNUNET_DATASTORE_DatumProcessor proc,
1418                           void *proc_cls)
1419 {
1420   struct GNUNET_DATASTORE_QueueEntry *qe;
1421   struct GetMessage *gm;
1422   union QueueContext qc;
1423
1424   GNUNET_assert (NULL != proc);
1425   LOG (GNUNET_ERROR_TYPE_DEBUG,
1426        "Asked to look for data of type %u under key `%s'\n",
1427        (unsigned int) type, GNUNET_h2s (key));
1428   qc.rc.proc = proc;
1429   qc.rc.proc_cls = proc_cls;
1430   qe = make_queue_entry (h,
1431                          sizeof (struct GetMessage),
1432                          queue_priority,
1433                          max_queue_size,
1434                          timeout,
1435                          &process_result_message,
1436                          &qc);
1437   if (qe == NULL)
1438   {
1439     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1440          GNUNET_h2s (key));
1441     return NULL;
1442   }
1443 #if INSANE_STATISTICS
1444   GNUNET_STATISTICS_update (h->stats,
1445                             gettext_noop ("# GET requests executed"),
1446                             1,
1447                             GNUNET_NO);
1448 #endif
1449   gm = (struct GetMessage *) &qe[1];
1450   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1451   gm->type = htonl (type);
1452   gm->offset = GNUNET_htonll (offset);
1453   if (key != NULL)
1454   {
1455     gm->header.size = htons (sizeof (struct GetMessage));
1456     gm->key = *key;
1457   }
1458   else
1459   {
1460     gm->header.size =
1461         htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1462   }
1463   process_queue (h);
1464   return qe;
1465 }
1466
1467
1468 /**
1469  * Cancel a datastore operation.  The final callback from the
1470  * operation must not have been done yet.
1471  *
1472  * @param qe operation to cancel
1473  */
1474 void
1475 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1476 {
1477   struct GNUNET_DATASTORE_Handle *h;
1478
1479   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1480   h = qe->h;
1481   LOG (GNUNET_ERROR_TYPE_DEBUG,
1482        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1483        qe->was_transmitted, h->queue_head == qe);
1484   if (GNUNET_YES == qe->was_transmitted)
1485   {
1486     free_queue_entry (qe);
1487     h->skip_next_messages++;
1488     return;
1489   }
1490   free_queue_entry (qe);
1491   process_queue (h);
1492 }
1493
1494
1495 /* end of datastore_api.c */