fix #4546
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      Copyright (C) 2004-2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98
99 /**
100  * Entry in our priority queue.
101  */
102 struct GNUNET_DATASTORE_QueueEntry
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *next;
109
110   /**
111    * This is a linked list.
112    */
113   struct GNUNET_DATASTORE_QueueEntry *prev;
114
115   /**
116    * Handle to the master context.
117    */
118   struct GNUNET_DATASTORE_Handle *h;
119
120   /**
121    * Response processor (NULL if we are not waiting for a response).
122    * This struct should be used for the closure, function-specific
123    * arguments can be passed via 'qc'.
124    */
125   GNUNET_CLIENT_MessageHandler response_proc;
126
127   /**
128    * Function to call after transmission of the request.
129    */
130   GNUNET_DATASTORE_ContinuationWithStatus cont;
131
132   /**
133    * Closure for 'cont'.
134    */
135   void *cont_cls;
136
137   /**
138    * Context for the operation.
139    */
140   union QueueContext qc;
141
142   /**
143    * Task for timeout signalling.
144    */
145   struct GNUNET_SCHEDULER_Task * task;
146
147   /**
148    * Timeout for the current operation.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Priority in the queue.
154    */
155   unsigned int priority;
156
157   /**
158    * Maximum allowed length of queue (otherwise
159    * this request should be discarded).
160    */
161   unsigned int max_queue;
162
163   /**
164    * Number of bytes in the request message following
165    * this struct.  32-bit value for nicer memory
166    * access (and overall struct alignment).
167    */
168   uint32_t message_size;
169
170   /**
171    * Has this message been transmitted to the service?
172    * Only ever GNUNET_YES for the head of the queue.
173    * Note that the overall struct should end at a
174    * multiple of 64 bits.
175    */
176   int was_transmitted;
177
178 };
179
180 /**
181  * Handle to the datastore service.
182  */
183 struct GNUNET_DATASTORE_Handle
184 {
185
186   /**
187    * Our configuration.
188    */
189   const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   struct GNUNET_SCHEDULER_Task * reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244   /**
245    * We should ignore the next message(s) from the service.
246    */
247   unsigned int skip_next_messages;
248
249 };
250
251
252
253 /**
254  * Connect to the datastore service.
255  *
256  * @param cfg configuration to use
257  * @return handle to use to access the service
258  */
259 struct GNUNET_DATASTORE_Handle *
260 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
261 {
262   struct GNUNET_CLIENT_Connection *c;
263   struct GNUNET_DATASTORE_Handle *h;
264
265   c = GNUNET_CLIENT_connect ("datastore", cfg);
266   if (c == NULL)
267     return NULL;                /* oops */
268   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
269                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
270   h->client = c;
271   h->cfg = cfg;
272   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
273   return h;
274 }
275
276
277 /**
278  * Task used by 'transmit_drop' to disconnect the datastore.
279  *
280  * @param cls the datastore handle
281  */
282 static void
283 disconnect_after_drop (void *cls)
284 {
285   struct GNUNET_DATASTORE_Handle *h = cls;
286
287   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
288 }
289
290
291 /**
292  * Transmit DROP message to datastore service.
293  *
294  * @param cls the `struct GNUNET_DATASTORE_Handle`
295  * @param size number of bytes that can be copied to @a buf
296  * @param buf where to copy the drop message
297  * @return number of bytes written to @a buf
298  */
299 static size_t
300 transmit_drop (void *cls, size_t size, void *buf)
301 {
302   struct GNUNET_DATASTORE_Handle *h = cls;
303   struct GNUNET_MessageHeader *hdr;
304
305   if (buf == NULL)
306   {
307     LOG (GNUNET_ERROR_TYPE_WARNING,
308          _("Failed to transmit request to drop database.\n"));
309     GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
310     return 0;
311   }
312   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
313   hdr = buf;
314   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
315   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
316   GNUNET_SCHEDULER_add_now (&disconnect_after_drop, h);
317   return sizeof (struct GNUNET_MessageHeader);
318 }
319
320
321 /**
322  * Disconnect from the datastore service (and free
323  * associated resources).
324  *
325  * @param h handle to the datastore
326  * @param drop set to #GNUNET_YES to delete all data in datastore (!)
327  */
328 void
329 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
330                              int drop)
331 {
332   struct GNUNET_DATASTORE_QueueEntry *qe;
333
334   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
335   if (NULL != h->th)
336   {
337     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
338     h->th = NULL;
339   }
340   if (NULL != h->client)
341   {
342     GNUNET_CLIENT_disconnect (h->client);
343     h->client = NULL;
344   }
345   if (NULL != h->reconnect_task)
346   {
347     GNUNET_SCHEDULER_cancel (h->reconnect_task);
348     h->reconnect_task = NULL;
349   }
350   while (NULL != (qe = h->queue_head))
351   {
352     GNUNET_assert (NULL != qe->response_proc);
353     qe->response_proc (h, NULL);
354   }
355   if (GNUNET_YES == drop)
356   {
357     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
358     if (NULL != h->client)
359     {
360       if (NULL !=
361           GNUNET_CLIENT_notify_transmit_ready (h->client,
362                                                sizeof (struct
363                                                        GNUNET_MessageHeader),
364                                                GNUNET_TIME_UNIT_SECONDS,
365                                                GNUNET_YES,
366                                                &transmit_drop, h))
367         return;
368       GNUNET_CLIENT_disconnect (h->client);
369       h->client = NULL;
370     }
371     GNUNET_break (0);
372   }
373   GNUNET_STATISTICS_destroy (h->stats,
374                              GNUNET_NO);
375   h->stats = NULL;
376   GNUNET_free (h);
377 }
378
379
380 /**
381  * A request has timed out (before being transmitted to the service).
382  *
383  * @param cls the `struct GNUNET_DATASTORE_QueueEntry`
384  */
385 static void
386 timeout_queue_entry (void *cls)
387 {
388   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
389   struct GNUNET_DATASTORE_Handle *h = qe->h;
390
391   GNUNET_STATISTICS_update (h->stats,
392                             gettext_noop ("# queue entry timeouts"), 1,
393                             GNUNET_NO);
394   qe->task = NULL;
395   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
396   LOG (GNUNET_ERROR_TYPE_DEBUG,
397        "Timeout of request in datastore queue\n");
398   /* response_proc's expect request at the head of the queue! */
399   GNUNET_CONTAINER_DLL_remove (h->queue_head,
400                                h->queue_tail,
401                                qe);
402   GNUNET_CONTAINER_DLL_insert (h->queue_head,
403                                h->queue_tail,
404                                qe);
405   GNUNET_assert (h->queue_head == qe);
406   qe->response_proc (qe->h, NULL);
407 }
408
409
410 /**
411  * Create a new entry for our priority queue (and possibly discard other entires if
412  * the queue is getting too long).
413  *
414  * @param h handle to the datastore
415  * @param msize size of the message to queue
416  * @param queue_priority priority of the entry
417  * @param max_queue_size at what queue size should this request be dropped
418  *        (if other requests of higher priority are in the queue)
419  * @param timeout timeout for the operation
420  * @param response_proc function to call with replies (can be NULL)
421  * @param qc client context (NOT a closure for @a response_proc)
422  * @return NULL if the queue is full
423  */
424 static struct GNUNET_DATASTORE_QueueEntry *
425 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
426                   size_t msize,
427                   unsigned int queue_priority,
428                   unsigned int max_queue_size,
429                   struct GNUNET_TIME_Relative timeout,
430                   GNUNET_CLIENT_MessageHandler response_proc,
431                   const union QueueContext *qc)
432 {
433   struct GNUNET_DATASTORE_QueueEntry *ret;
434   struct GNUNET_DATASTORE_QueueEntry *pos;
435   unsigned int c;
436
437   c = 0;
438   pos = h->queue_head;
439   while ((pos != NULL) && (c < max_queue_size) &&
440          (pos->priority >= queue_priority))
441   {
442     c++;
443     pos = pos->next;
444   }
445   if (c >= max_queue_size)
446   {
447     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
448                               GNUNET_NO);
449     return NULL;
450   }
451   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
452   ret->h = h;
453   ret->response_proc = response_proc;
454   ret->qc = *qc;
455   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
456   ret->priority = queue_priority;
457   ret->max_queue = max_queue_size;
458   ret->message_size = msize;
459   ret->was_transmitted = GNUNET_NO;
460   if (pos == NULL)
461   {
462     /* append at the tail */
463     pos = h->queue_tail;
464   }
465   else
466   {
467     pos = pos->prev;
468     /* do not insert at HEAD if HEAD query was already
469      * transmitted and we are still receiving replies! */
470     if ((pos == NULL) && (h->queue_head->was_transmitted))
471       pos = h->queue_head;
472   }
473   c++;
474 #if INSANE_STATISTICS
475   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
476                             1, GNUNET_NO);
477 #endif
478   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
479   h->queue_size++;
480   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
481   for (pos = ret->next; NULL != pos; pos = pos->next)
482   {
483     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
484     {
485       GNUNET_assert (NULL != pos->response_proc);
486       /* move 'pos' element to head so that it will be
487        * killed on 'NULL' call below */
488       LOG (GNUNET_ERROR_TYPE_DEBUG,
489            "Dropping request from datastore queue\n");
490       /* response_proc's expect request at the head of the queue! */
491       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
492       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
493       GNUNET_STATISTICS_update (h->stats,
494                                 gettext_noop
495                                 ("# Requests dropped from datastore queue"), 1,
496                                 GNUNET_NO);
497       GNUNET_assert (h->queue_head == pos);
498       pos->response_proc (h, NULL);
499       break;
500     }
501   }
502   return ret;
503 }
504
505
506 /**
507  * Process entries in the queue (or do nothing if we are already
508  * doing so).
509  *
510  * @param h handle to the datastore
511  */
512 static void
513 process_queue (struct GNUNET_DATASTORE_Handle *h);
514
515
516 /**
517  * Try reconnecting to the datastore service.
518  *
519  * @param cls the `struct GNUNET_DATASTORE_Handle`
520  */
521 static void
522 try_reconnect (void *cls)
523 {
524   struct GNUNET_DATASTORE_Handle *h = cls;
525
526   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
527   h->reconnect_task = NULL;
528   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
529   if (h->client == NULL)
530   {
531     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
532     return;
533   }
534   GNUNET_STATISTICS_update (h->stats,
535                             gettext_noop
536                             ("# datastore connections (re)created"), 1,
537                             GNUNET_NO);
538   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
539   process_queue (h);
540 }
541
542
543 /**
544  * Disconnect from the service and then try reconnecting to the datastore service
545  * after some delay.
546  *
547  * @param h handle to datastore to disconnect and reconnect
548  */
549 static void
550 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
551 {
552   if (NULL == h->client)
553   {
554     LOG (GNUNET_ERROR_TYPE_DEBUG,
555          "Client NULL in disconnect, will not try to reconnect\n");
556     return;
557   }
558   GNUNET_CLIENT_disconnect (h->client);
559   h->skip_next_messages = 0;
560   h->client = NULL;
561   h->reconnect_task =
562       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
563 }
564
565
566 /**
567  * Function called whenever we receive a message from
568  * the service.  Calls the appropriate handler.
569  *
570  * @param cls the `struct GNUNET_DATASTORE_Handle`
571  * @param msg the received message
572  */
573 static void
574 receive_cb (void *cls,
575             const struct GNUNET_MessageHeader *msg)
576 {
577   struct GNUNET_DATASTORE_Handle *h = cls;
578   struct GNUNET_DATASTORE_QueueEntry *qe;
579
580   h->in_receive = GNUNET_NO;
581   LOG (GNUNET_ERROR_TYPE_DEBUG,
582        "Receiving reply from datastore\n");
583   if (h->skip_next_messages > 0)
584   {
585     h->skip_next_messages--;
586     process_queue (h);
587     return;
588   }
589   if (NULL == (qe = h->queue_head))
590   {
591     GNUNET_break (0);
592     process_queue (h);
593     return;
594   }
595   qe->response_proc (h, msg);
596 }
597
598
599 /**
600  * Transmit request from queue to datastore service.
601  *
602  * @param cls the `struct GNUNET_DATASTORE_Handle`
603  * @param size number of bytes that can be copied to @a buf
604  * @param buf where to copy the drop message
605  * @return number of bytes written to @a buf
606  */
607 static size_t
608 transmit_request (void *cls,
609                   size_t size,
610                   void *buf)
611 {
612   struct GNUNET_DATASTORE_Handle *h = cls;
613   struct GNUNET_DATASTORE_QueueEntry *qe;
614   size_t msize;
615
616   h->th = NULL;
617   if (NULL == (qe = h->queue_head))
618     return 0;                   /* no entry in queue */
619   if (NULL == buf)
620   {
621     LOG (GNUNET_ERROR_TYPE_DEBUG,
622          "Failed to transmit request to DATASTORE.\n");
623     GNUNET_STATISTICS_update (h->stats,
624                               gettext_noop ("# transmission request failures"),
625                               1, GNUNET_NO);
626     do_disconnect (h);
627     return 0;
628   }
629   if (size < (msize = qe->message_size))
630   {
631     process_queue (h);
632     return 0;
633   }
634   LOG (GNUNET_ERROR_TYPE_DEBUG,
635        "Transmitting %u byte request to DATASTORE\n",
636        msize);
637   memcpy (buf, &qe[1], msize);
638   qe->was_transmitted = GNUNET_YES;
639   GNUNET_SCHEDULER_cancel (qe->task);
640   qe->task = NULL;
641   GNUNET_assert (GNUNET_NO == h->in_receive);
642   h->in_receive = GNUNET_YES;
643   GNUNET_CLIENT_receive (h->client,
644                          &receive_cb, h,
645                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
646 #if INSANE_STATISTICS
647   GNUNET_STATISTICS_update (h->stats,
648                             gettext_noop ("# bytes sent to datastore"), msize,
649                             GNUNET_NO);
650 #endif
651   return msize;
652 }
653
654
655 /**
656  * Process entries in the queue (or do nothing if we are already
657  * doing so).
658  *
659  * @param h handle to the datastore
660  */
661 static void
662 process_queue (struct GNUNET_DATASTORE_Handle *h)
663 {
664   struct GNUNET_DATASTORE_QueueEntry *qe;
665
666   if (NULL == (qe = h->queue_head))
667   {
668     /* no entry in queue */
669     LOG (GNUNET_ERROR_TYPE_DEBUG,
670          "Queue empty\n");
671     return;
672   }
673   if (GNUNET_YES == qe->was_transmitted)
674   {
675     /* waiting for replies */
676     LOG (GNUNET_ERROR_TYPE_DEBUG,
677          "Head request already transmitted\n");
678     return;
679   }
680   if (NULL != h->th)
681   {
682     /* request pending */
683     LOG (GNUNET_ERROR_TYPE_DEBUG,
684          "Pending transmission request\n");
685     return;
686   }
687   if (NULL == h->client)
688   {
689     /* waiting for reconnect */
690     LOG (GNUNET_ERROR_TYPE_DEBUG,
691          "Not connected\n");
692     return;
693   }
694   if (GNUNET_YES == h->in_receive)
695   {
696     /* wait for response to previous query */
697     return;
698   }
699   LOG (GNUNET_ERROR_TYPE_DEBUG,
700        "Queueing %u byte request to DATASTORE\n",
701        qe->message_size);
702   h->th
703     = GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
704                                            GNUNET_TIME_absolute_get_remaining (qe->timeout),
705                                            GNUNET_YES,
706                                            &transmit_request, h);
707   GNUNET_assert (GNUNET_NO == h->in_receive);
708   GNUNET_break (NULL != h->th);
709 }
710
711
712 /**
713  * Dummy continuation used to do nothing (but be non-zero).
714  *
715  * @param cls closure
716  * @param result result
717  * @param min_expiration expiration time
718  * @param emsg error message
719  */
720 static void
721 drop_status_cont (void *cls, int32_t result,
722                   struct GNUNET_TIME_Absolute min_expiration,
723                   const char *emsg)
724 {
725   /* do nothing */
726 }
727
728
729 /**
730  * Free a queue entry.  Removes the given entry from the
731  * queue and releases associated resources.  Does NOT
732  * call the callback.
733  *
734  * @param qe entry to free.
735  */
736 static void
737 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
738 {
739   struct GNUNET_DATASTORE_Handle *h = qe->h;
740
741   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
742   if (qe->task != NULL)
743   {
744     GNUNET_SCHEDULER_cancel (qe->task);
745     qe->task = NULL;
746   }
747   h->queue_size--;
748   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
749   GNUNET_free (qe);
750 }
751
752
753 /**
754  * Type of a function to call when we receive a message
755  * from the service.
756  *
757  * @param cls closure
758  * @param msg message received, NULL on timeout or fatal error
759  */
760 static void
761 process_status_message (void *cls,
762                         const struct GNUNET_MessageHeader *msg)
763 {
764   struct GNUNET_DATASTORE_Handle *h = cls;
765   struct GNUNET_DATASTORE_QueueEntry *qe;
766   struct StatusContext rc;
767   const struct StatusMessage *sm;
768   const char *emsg;
769   int32_t status;
770   int was_transmitted;
771
772   if (NULL == (qe = h->queue_head))
773   {
774     GNUNET_break (0);
775     do_disconnect (h);
776     return;
777   }
778   rc = qe->qc.sc;
779   if (NULL == msg)
780   {
781     was_transmitted = qe->was_transmitted;
782     free_queue_entry (qe);
783     if (was_transmitted == GNUNET_YES)
784       do_disconnect (h);
785     else
786       process_queue (h);
787     if (NULL != rc.cont)
788       rc.cont (rc.cont_cls, GNUNET_SYSERR,
789                GNUNET_TIME_UNIT_ZERO_ABS,
790                _("Failed to receive status response from database."));
791     return;
792   }
793   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
794   free_queue_entry (qe);
795   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
796       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
797   {
798     GNUNET_break (0);
799     h->retry_time = GNUNET_TIME_UNIT_ZERO;
800     do_disconnect (h);
801     if (rc.cont != NULL)
802       rc.cont (rc.cont_cls, GNUNET_SYSERR,
803                GNUNET_TIME_UNIT_ZERO_ABS,
804                _("Error reading response from datastore service"));
805     return;
806   }
807   sm = (const struct StatusMessage *) msg;
808   status = ntohl (sm->status);
809   emsg = NULL;
810   if (ntohs (msg->size) > sizeof (struct StatusMessage))
811   {
812     emsg = (const char *) &sm[1];
813     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
814     {
815       GNUNET_break (0);
816       emsg = _("Invalid error message received from datastore service");
817     }
818   }
819   if ((status == GNUNET_SYSERR) && (emsg == NULL))
820   {
821     GNUNET_break (0);
822     emsg = _("Invalid error message received from datastore service");
823   }
824   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
825   GNUNET_STATISTICS_update (h->stats,
826                             gettext_noop ("# status messages received"), 1,
827                             GNUNET_NO);
828   h->retry_time = GNUNET_TIME_UNIT_ZERO;
829   process_queue (h);
830   if (rc.cont != NULL)
831     rc.cont (rc.cont_cls, status,
832              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
833              emsg);
834 }
835
836
837 /**
838  * Store an item in the datastore.  If the item is already present,
839  * the priorities are summed up and the higher expiration time and
840  * lower anonymity level is used.
841  *
842  * @param h handle to the datastore
843  * @param rid reservation ID to use (from "reserve"); use 0 if no
844  *            prior reservation was made
845  * @param key key for the value
846  * @param size number of bytes in data
847  * @param data content stored
848  * @param type type of the content
849  * @param priority priority of the content
850  * @param anonymity anonymity-level for the content
851  * @param replication how often should the content be replicated to other peers?
852  * @param expiration expiration time for the content
853  * @param queue_priority ranking of this request in the priority queue
854  * @param max_queue_size at what queue size should this request be dropped
855  *        (if other requests of higher priority are in the queue)
856  * @param timeout timeout for the operation
857  * @param cont continuation to call when done
858  * @param cont_cls closure for @a cont
859  * @return NULL if the entry was not queued, otherwise a handle that can be used to
860  *         cancel; note that even if NULL is returned, the callback will be invoked
861  *         (or rather, will already have been invoked)
862  */
863 struct GNUNET_DATASTORE_QueueEntry *
864 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
865                       uint32_t rid,
866                       const struct GNUNET_HashCode *key,
867                       size_t size,
868                       const void *data,
869                       enum GNUNET_BLOCK_Type type,
870                       uint32_t priority,
871                       uint32_t anonymity,
872                       uint32_t replication,
873                       struct GNUNET_TIME_Absolute expiration,
874                       unsigned int queue_priority,
875                       unsigned int max_queue_size,
876                       struct GNUNET_TIME_Relative timeout,
877                       GNUNET_DATASTORE_ContinuationWithStatus cont,
878                       void *cont_cls)
879 {
880   struct GNUNET_DATASTORE_QueueEntry *qe;
881   struct DataMessage *dm;
882   size_t msize;
883   union QueueContext qc;
884
885   LOG (GNUNET_ERROR_TYPE_DEBUG,
886        "Asked to put %u bytes of data under key `%s' for %s\n", size,
887        GNUNET_h2s (key),
888        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
889                                                GNUNET_YES));
890   msize = sizeof (struct DataMessage) + size;
891   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
892   qc.sc.cont = cont;
893   qc.sc.cont_cls = cont_cls;
894   qe = make_queue_entry (h,
895                          msize,
896                          queue_priority,
897                          max_queue_size,
898                          timeout,
899                          &process_status_message, &qc);
900   if (qe == NULL)
901   {
902     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
903     return NULL;
904   }
905   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
906                             1, GNUNET_NO);
907   dm = (struct DataMessage *) &qe[1];
908   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
909   dm->header.size = htons (msize);
910   dm->rid = htonl (rid);
911   dm->size = htonl ((uint32_t) size);
912   dm->type = htonl (type);
913   dm->priority = htonl (priority);
914   dm->anonymity = htonl (anonymity);
915   dm->replication = htonl (replication);
916   dm->reserved = htonl (0);
917   dm->uid = GNUNET_htonll (0);
918   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
919   dm->key = *key;
920   memcpy (&dm[1], data, size);
921   process_queue (h);
922   return qe;
923 }
924
925
926 /**
927  * Reserve space in the datastore.  This function should be used
928  * to avoid "out of space" failures during a longer sequence of "put"
929  * operations (for example, when a file is being inserted).
930  *
931  * @param h handle to the datastore
932  * @param amount how much space (in bytes) should be reserved (for content only)
933  * @param entries how many entries will be created (to calculate per-entry overhead)
934  * @param cont continuation to call when done; "success" will be set to
935  *             a positive reservation value if space could be reserved.
936  * @param cont_cls closure for @a cont
937  * @return NULL if the entry was not queued, otherwise a handle that can be used to
938  *         cancel; note that even if NULL is returned, the callback will be invoked
939  *         (or rather, will already have been invoked)
940  */
941 struct GNUNET_DATASTORE_QueueEntry *
942 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
943                           uint32_t entries,
944                           GNUNET_DATASTORE_ContinuationWithStatus cont,
945                           void *cont_cls)
946 {
947   struct GNUNET_DATASTORE_QueueEntry *qe;
948   struct ReserveMessage *rm;
949   union QueueContext qc;
950
951   if (NULL == cont)
952     cont = &drop_status_cont;
953   LOG (GNUNET_ERROR_TYPE_DEBUG,
954        "Asked to reserve %llu bytes of data and %u entries\n",
955        (unsigned long long) amount, (unsigned int) entries);
956   qc.sc.cont = cont;
957   qc.sc.cont_cls = cont_cls;
958   qe = make_queue_entry (h,
959                          sizeof (struct ReserveMessage),
960                          UINT_MAX,
961                          UINT_MAX,
962                          GNUNET_TIME_UNIT_FOREVER_REL,
963                          &process_status_message, &qc);
964   if (NULL == qe)
965   {
966     LOG (GNUNET_ERROR_TYPE_DEBUG,
967          "Could not create queue entry to reserve\n");
968     return NULL;
969   }
970   GNUNET_STATISTICS_update (h->stats,
971                             gettext_noop ("# RESERVE requests executed"), 1,
972                             GNUNET_NO);
973   rm = (struct ReserveMessage *) &qe[1];
974   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
975   rm->header.size = htons (sizeof (struct ReserveMessage));
976   rm->entries = htonl (entries);
977   rm->amount = GNUNET_htonll (amount);
978   process_queue (h);
979   return qe;
980 }
981
982
983 /**
984  * Signal that all of the data for which a reservation was made has
985  * been stored and that whatever excess space might have been reserved
986  * can now be released.
987  *
988  * @param h handle to the datastore
989  * @param rid reservation ID (value of "success" in original continuation
990  *        from the "reserve" function).
991  * @param queue_priority ranking of this request in the priority queue
992  * @param max_queue_size at what queue size should this request be dropped
993  *        (if other requests of higher priority are in the queue)
994  * @param queue_priority ranking of this request in the priority queue
995  * @param max_queue_size at what queue size should this request be dropped
996  *        (if other requests of higher priority are in the queue)
997  * @param timeout how long to wait at most for a response
998  * @param cont continuation to call when done
999  * @param cont_cls closure for @a cont
1000  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1001  *         cancel; note that even if NULL is returned, the callback will be invoked
1002  *         (or rather, will already have been invoked)
1003  */
1004 struct GNUNET_DATASTORE_QueueEntry *
1005 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1006                                   uint32_t rid, unsigned int queue_priority,
1007                                   unsigned int max_queue_size,
1008                                   struct GNUNET_TIME_Relative timeout,
1009                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1010                                   void *cont_cls)
1011 {
1012   struct GNUNET_DATASTORE_QueueEntry *qe;
1013   struct ReleaseReserveMessage *rrm;
1014   union QueueContext qc;
1015
1016   if (cont == NULL)
1017     cont = &drop_status_cont;
1018   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1019   qc.sc.cont = cont;
1020   qc.sc.cont_cls = cont_cls;
1021   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1022                          queue_priority, max_queue_size, timeout,
1023                          &process_status_message, &qc);
1024   if (qe == NULL)
1025   {
1026     LOG (GNUNET_ERROR_TYPE_DEBUG,
1027          "Could not create queue entry to release reserve\n");
1028     return NULL;
1029   }
1030   GNUNET_STATISTICS_update (h->stats,
1031                             gettext_noop
1032                             ("# RELEASE RESERVE requests executed"), 1,
1033                             GNUNET_NO);
1034   rrm = (struct ReleaseReserveMessage *) &qe[1];
1035   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1036   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1037   rrm->rid = htonl (rid);
1038   process_queue (h);
1039   return qe;
1040 }
1041
1042
1043 /**
1044  * Update a value in the datastore.
1045  *
1046  * @param h handle to the datastore
1047  * @param uid identifier for the value
1048  * @param priority how much to increase the priority of the value
1049  * @param expiration new expiration value should be MAX of existing and this argument
1050  * @param queue_priority ranking of this request in the priority queue
1051  * @param max_queue_size at what queue size should this request be dropped
1052  *        (if other requests of higher priority are in the queue)
1053  * @param timeout how long to wait at most for a response
1054  * @param cont continuation to call when done
1055  * @param cont_cls closure for @a cont
1056  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1057  *         cancel; note that even if NULL is returned, the callback will be invoked
1058  *         (or rather, will already have been invoked)
1059  */
1060 struct GNUNET_DATASTORE_QueueEntry *
1061 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1062                          uint32_t priority,
1063                          struct GNUNET_TIME_Absolute expiration,
1064                          unsigned int queue_priority,
1065                          unsigned int max_queue_size,
1066                          struct GNUNET_TIME_Relative timeout,
1067                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1068                          void *cont_cls)
1069 {
1070   struct GNUNET_DATASTORE_QueueEntry *qe;
1071   struct UpdateMessage *um;
1072   union QueueContext qc;
1073
1074   if (cont == NULL)
1075     cont = &drop_status_cont;
1076   LOG (GNUNET_ERROR_TYPE_DEBUG,
1077        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1078        uid,
1079        (unsigned int) priority,
1080        GNUNET_STRINGS_absolute_time_to_string (expiration));
1081   qc.sc.cont = cont;
1082   qc.sc.cont_cls = cont_cls;
1083   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1084                          max_queue_size, timeout, &process_status_message, &qc);
1085   if (qe == NULL)
1086   {
1087     LOG (GNUNET_ERROR_TYPE_DEBUG,
1088          "Could not create queue entry for UPDATE\n");
1089     return NULL;
1090   }
1091   GNUNET_STATISTICS_update (h->stats,
1092                             gettext_noop ("# UPDATE requests executed"), 1,
1093                             GNUNET_NO);
1094   um = (struct UpdateMessage *) &qe[1];
1095   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1096   um->header.size = htons (sizeof (struct UpdateMessage));
1097   um->priority = htonl (priority);
1098   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1099   um->uid = GNUNET_htonll (uid);
1100   process_queue (h);
1101   return qe;
1102 }
1103
1104
1105 /**
1106  * Explicitly remove some content from the database.
1107  * The @a cont continuation will be called with `status`
1108  * #GNUNET_OK" if content was removed, #GNUNET_NO
1109  * if no matching entry was found and #GNUNET_SYSERR
1110  * on all other types of errors.
1111  *
1112  * @param h handle to the datastore
1113  * @param key key for the value
1114  * @param size number of bytes in data
1115  * @param data content stored
1116  * @param queue_priority ranking of this request in the priority queue
1117  * @param max_queue_size at what queue size should this request be dropped
1118  *        (if other requests of higher priority are in the queue)
1119  * @param timeout how long to wait at most for a response
1120  * @param cont continuation to call when done
1121  * @param cont_cls closure for @a cont
1122  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1123  *         cancel; note that even if NULL is returned, the callback will be invoked
1124  *         (or rather, will already have been invoked)
1125  */
1126 struct GNUNET_DATASTORE_QueueEntry *
1127 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1128                          const struct GNUNET_HashCode * key, size_t size,
1129                          const void *data, unsigned int queue_priority,
1130                          unsigned int max_queue_size,
1131                          struct GNUNET_TIME_Relative timeout,
1132                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1133                          void *cont_cls)
1134 {
1135   struct GNUNET_DATASTORE_QueueEntry *qe;
1136   struct DataMessage *dm;
1137   size_t msize;
1138   union QueueContext qc;
1139
1140   if (cont == NULL)
1141     cont = &drop_status_cont;
1142   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1143        size, GNUNET_h2s (key));
1144   qc.sc.cont = cont;
1145   qc.sc.cont_cls = cont_cls;
1146   msize = sizeof (struct DataMessage) + size;
1147   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1148   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1149                          &process_status_message, &qc);
1150   if (qe == NULL)
1151   {
1152     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1153     return NULL;
1154   }
1155   GNUNET_STATISTICS_update (h->stats,
1156                             gettext_noop ("# REMOVE requests executed"), 1,
1157                             GNUNET_NO);
1158   dm = (struct DataMessage *) &qe[1];
1159   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1160   dm->header.size = htons (msize);
1161   dm->rid = htonl (0);
1162   dm->size = htonl (size);
1163   dm->type = htonl (0);
1164   dm->priority = htonl (0);
1165   dm->anonymity = htonl (0);
1166   dm->uid = GNUNET_htonll (0);
1167   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1168   dm->key = *key;
1169   memcpy (&dm[1], data, size);
1170   process_queue (h);
1171   return qe;
1172 }
1173
1174
1175 /**
1176  * Type of a function to call when we receive a message
1177  * from the service.
1178  *
1179  * @param cls closure with the `struct GNUNET_DATASTORE_Handle *`
1180  * @param msg message received, NULL on timeout or fatal error
1181  */
1182 static void
1183 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1184 {
1185   struct GNUNET_DATASTORE_Handle *h = cls;
1186   struct GNUNET_DATASTORE_QueueEntry *qe;
1187   struct ResultContext rc;
1188   const struct DataMessage *dm;
1189   int was_transmitted;
1190
1191   if (NULL == msg)
1192   {
1193     qe = h->queue_head;
1194     GNUNET_assert (NULL != qe);
1195     rc = qe->qc.rc;
1196     was_transmitted = qe->was_transmitted;
1197     free_queue_entry (qe);
1198     if (GNUNET_YES == was_transmitted)
1199     {
1200       LOG (GNUNET_ERROR_TYPE_DEBUG,
1201            "Failed to receive response from database.\n");
1202       do_disconnect (h);
1203     }
1204     else
1205     {
1206       process_queue (h);
1207     }
1208     if (NULL != rc.proc)
1209       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1210                0);
1211     return;
1212   }
1213   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1214   {
1215     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1216     qe = h->queue_head;
1217     rc = qe->qc.rc;
1218     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1219     free_queue_entry (qe);
1220     LOG (GNUNET_ERROR_TYPE_DEBUG,
1221          "Received end of result set, new queue size is %u\n", h->queue_size);
1222     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1223     h->result_count = 0;
1224     process_queue (h);
1225     if (NULL != rc.proc)
1226       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1227                0);
1228     return;
1229   }
1230   qe = h->queue_head;
1231   GNUNET_assert (NULL != qe);
1232   rc = qe->qc.rc;
1233   if (GNUNET_YES != qe->was_transmitted)
1234   {
1235     GNUNET_break (0);
1236     free_queue_entry (qe);
1237     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1238     do_disconnect (h);
1239     if (rc.proc != NULL)
1240       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1241                0);
1242     return;
1243   }
1244   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1245       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1246       (ntohs (msg->size) !=
1247        sizeof (struct DataMessage) +
1248        ntohl (((const struct DataMessage *) msg)->size)))
1249   {
1250     GNUNET_break (0);
1251     free_queue_entry (qe);
1252     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1253     do_disconnect (h);
1254     if (rc.proc != NULL)
1255       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1256                0);
1257     return;
1258   }
1259 #if INSANE_STATISTICS
1260   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1261                             GNUNET_NO);
1262 #endif
1263   dm = (const struct DataMessage *) msg;
1264   LOG (GNUNET_ERROR_TYPE_DEBUG,
1265        "Received result %llu with type %u and size %u with key %s\n",
1266        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1267        ntohl (dm->size), GNUNET_h2s (&dm->key));
1268   free_queue_entry (qe);
1269   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1270   process_queue (h);
1271   if (rc.proc != NULL)
1272     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1273              ntohl (dm->priority), ntohl (dm->anonymity),
1274              GNUNET_TIME_absolute_ntoh (dm->expiration),
1275              GNUNET_ntohll (dm->uid));
1276 }
1277
1278
1279 /**
1280  * Get a random value from the datastore for content replication.
1281  * Returns a single, random value among those with the highest
1282  * replication score, lowering positive replication scores by one for
1283  * the chosen value (if only content with a replication score exists,
1284  * a random value is returned and replication scores are not changed).
1285  *
1286  * @param h handle to the datastore
1287  * @param queue_priority ranking of this request in the priority queue
1288  * @param max_queue_size at what queue size should this request be dropped
1289  *        (if other requests of higher priority are in the queue)
1290  * @param timeout how long to wait at most for a response
1291  * @param proc function to call on a random value; it
1292  *        will be called once with a value (if available)
1293  *        and always once with a value of NULL.
1294  * @param proc_cls closure for @a proc
1295  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1296  *         cancel
1297  */
1298 struct GNUNET_DATASTORE_QueueEntry *
1299 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1300                                       unsigned int queue_priority,
1301                                       unsigned int max_queue_size,
1302                                       struct GNUNET_TIME_Relative timeout,
1303                                       GNUNET_DATASTORE_DatumProcessor proc,
1304                                       void *proc_cls)
1305 {
1306   struct GNUNET_DATASTORE_QueueEntry *qe;
1307   struct GNUNET_MessageHeader *m;
1308   union QueueContext qc;
1309
1310   GNUNET_assert (NULL != proc);
1311   LOG (GNUNET_ERROR_TYPE_DEBUG,
1312        "Asked to get replication entry in %s\n",
1313        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1314   qc.rc.proc = proc;
1315   qc.rc.proc_cls = proc_cls;
1316   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1317                          queue_priority, max_queue_size, timeout,
1318                          &process_result_message, &qc);
1319   if (NULL == qe)
1320   {
1321     LOG (GNUNET_ERROR_TYPE_DEBUG,
1322          "Could not create queue entry for GET REPLICATION\n");
1323     return NULL;
1324   }
1325   GNUNET_STATISTICS_update (h->stats,
1326                             gettext_noop
1327                             ("# GET REPLICATION requests executed"), 1,
1328                             GNUNET_NO);
1329   m = (struct GNUNET_MessageHeader *) &qe[1];
1330   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1331   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1332   process_queue (h);
1333   return qe;
1334 }
1335
1336
1337 /**
1338  * Get a single zero-anonymity value from the datastore.
1339  *
1340  * @param h handle to the datastore
1341  * @param offset offset of the result (modulo num-results); set to
1342  *               a random 64-bit value initially; then increment by
1343  *               one each time; detect that all results have been found by uid
1344  *               being again the first uid ever returned.
1345  * @param queue_priority ranking of this request in the priority queue
1346  * @param max_queue_size at what queue size should this request be dropped
1347  *        (if other requests of higher priority are in the queue)
1348  * @param timeout how long to wait at most for a response
1349  * @param type allowed type for the operation (never zero)
1350  * @param proc function to call on a random value; it
1351  *        will be called once with a value (if available)
1352  *        or with NULL if none value exists.
1353  * @param proc_cls closure for @a proc
1354  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1355  *         cancel
1356  */
1357 struct GNUNET_DATASTORE_QueueEntry *
1358 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1359                                      uint64_t offset,
1360                                      unsigned int queue_priority,
1361                                      unsigned int max_queue_size,
1362                                      struct GNUNET_TIME_Relative timeout,
1363                                      enum GNUNET_BLOCK_Type type,
1364                                      GNUNET_DATASTORE_DatumProcessor proc,
1365                                      void *proc_cls)
1366 {
1367   struct GNUNET_DATASTORE_QueueEntry *qe;
1368   struct GetZeroAnonymityMessage *m;
1369   union QueueContext qc;
1370
1371   GNUNET_assert (NULL != proc);
1372   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1373   LOG (GNUNET_ERROR_TYPE_DEBUG,
1374        "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
1375        (unsigned long long) offset, type,
1376        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1377   qc.rc.proc = proc;
1378   qc.rc.proc_cls = proc_cls;
1379   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1380                          queue_priority, max_queue_size, timeout,
1381                          &process_result_message, &qc);
1382   if (NULL == qe)
1383   {
1384     LOG (GNUNET_ERROR_TYPE_DEBUG,
1385          "Could not create queue entry for zero-anonymity procation\n");
1386     return NULL;
1387   }
1388   GNUNET_STATISTICS_update (h->stats,
1389                             gettext_noop
1390                             ("# GET ZERO ANONYMITY requests executed"), 1,
1391                             GNUNET_NO);
1392   m = (struct GetZeroAnonymityMessage *) &qe[1];
1393   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1394   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1395   m->type = htonl ((uint32_t) type);
1396   m->offset = GNUNET_htonll (offset);
1397   process_queue (h);
1398   return qe;
1399 }
1400
1401
1402 /**
1403  * Get a result for a particular key from the datastore.  The processor
1404  * will only be called once.
1405  *
1406  * @param h handle to the datastore
1407  * @param offset offset of the result (modulo num-results); set to
1408  *               a random 64-bit value initially; then increment by
1409  *               one each time; detect that all results have been found by uid
1410  *               being again the first uid ever returned.
1411  * @param key maybe NULL (to match all entries)
1412  * @param type desired type, 0 for any
1413  * @param queue_priority ranking of this request in the priority queue
1414  * @param max_queue_size at what queue size should this request be dropped
1415  *        (if other requests of higher priority are in the queue)
1416  * @param timeout how long to wait at most for a response
1417  * @param proc function to call on each matching value;
1418  *        will be called once with a NULL value at the end
1419  * @param proc_cls closure for @a proc
1420  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1421  *         cancel
1422  */
1423 struct GNUNET_DATASTORE_QueueEntry *
1424 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1425                           uint64_t offset,
1426                           const struct GNUNET_HashCode * key,
1427                           enum GNUNET_BLOCK_Type type,
1428                           unsigned int queue_priority,
1429                           unsigned int max_queue_size,
1430                           struct GNUNET_TIME_Relative timeout,
1431                           GNUNET_DATASTORE_DatumProcessor proc,
1432                           void *proc_cls)
1433 {
1434   struct GNUNET_DATASTORE_QueueEntry *qe;
1435   struct GetMessage *gm;
1436   union QueueContext qc;
1437
1438   GNUNET_assert (NULL != proc);
1439   LOG (GNUNET_ERROR_TYPE_DEBUG,
1440        "Asked to look for data of type %u under key `%s'\n",
1441        (unsigned int) type, GNUNET_h2s (key));
1442   qc.rc.proc = proc;
1443   qc.rc.proc_cls = proc_cls;
1444   qe = make_queue_entry (h,
1445                          sizeof (struct GetMessage),
1446                          queue_priority,
1447                          max_queue_size,
1448                          timeout,
1449                          &process_result_message,
1450                          &qc);
1451   if (qe == NULL)
1452   {
1453     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1454          GNUNET_h2s (key));
1455     return NULL;
1456   }
1457 #if INSANE_STATISTICS
1458   GNUNET_STATISTICS_update (h->stats,
1459                             gettext_noop ("# GET requests executed"),
1460                             1,
1461                             GNUNET_NO);
1462 #endif
1463   gm = (struct GetMessage *) &qe[1];
1464   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1465   gm->type = htonl (type);
1466   gm->offset = GNUNET_htonll (offset);
1467   if (key != NULL)
1468   {
1469     gm->header.size = htons (sizeof (struct GetMessage));
1470     gm->key = *key;
1471   }
1472   else
1473   {
1474     gm->header.size =
1475         htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1476   }
1477   process_queue (h);
1478   return qe;
1479 }
1480
1481
1482 /**
1483  * Cancel a datastore operation.  The final callback from the
1484  * operation must not have been done yet.
1485  *
1486  * @param qe operation to cancel
1487  */
1488 void
1489 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1490 {
1491   struct GNUNET_DATASTORE_Handle *h;
1492
1493   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1494   h = qe->h;
1495   LOG (GNUNET_ERROR_TYPE_DEBUG,
1496        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1497        qe->was_transmitted, h->queue_head == qe);
1498   if (GNUNET_YES == qe->was_transmitted)
1499   {
1500     free_queue_entry (qe);
1501     h->skip_next_messages++;
1502     return;
1503   }
1504   free_queue_entry (qe);
1505   process_queue (h);
1506 }
1507
1508
1509 /* end of datastore_api.c */