handling replies continuously from server
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * If a client stopped asking for more results, how many more do
38  * we receive from the DB before killing the connection?  Trade-off
39  * between re-doing TCP handshakes and (needlessly) receiving
40  * useless results.
41  */
42 #define MAX_EXCESS_RESULTS 8
43
44 /**
45  * Context for processing status messages.
46  */
47 struct StatusContext
48 {
49   /**
50    * Continuation to call with the status.
51    */
52   GNUNET_DATASTORE_ContinuationWithStatus cont;
53
54   /**
55    * Closure for cont.
56    */
57   void *cont_cls;
58
59 };
60
61
62 /**
63  * Context for processing result messages.
64  */
65 struct ResultContext
66 {
67   /**
68    * Function to call with the result.
69    */
70   GNUNET_DATASTORE_DatumProcessor proc;
71
72   /**
73    * Closure for proc.
74    */
75   void *proc_cls;
76
77 };
78
79
80 /**
81  *  Context for a queue operation.
82  */
83 union QueueContext
84 {
85
86   struct StatusContext sc;
87
88   struct ResultContext rc;
89
90 };
91
92
93
94 /**
95  * Entry in our priority queue.
96  */
97 struct GNUNET_DATASTORE_QueueEntry
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct GNUNET_DATASTORE_QueueEntry *next;
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *prev;
109
110   /**
111    * Handle to the master context.
112    */
113   struct GNUNET_DATASTORE_Handle *h;
114
115   /**
116    * Response processor (NULL if we are not waiting for a response).
117    * This struct should be used for the closure, function-specific
118    * arguments can be passed via 'qc'.
119    */
120   GNUNET_CLIENT_MessageHandler response_proc;
121
122   /**
123    * Function to call after transmission of the request.
124    */
125   GNUNET_DATASTORE_ContinuationWithStatus cont;
126
127   /**
128    * Closure for 'cont'.
129    */
130   void *cont_cls;
131
132   /**
133    * Context for the operation.
134    */
135   union QueueContext qc;
136
137   /**
138    * Task for timeout signalling.
139    */
140   GNUNET_SCHEDULER_TaskIdentifier task;
141
142   /**
143    * Timeout for the current operation.
144    */
145   struct GNUNET_TIME_Absolute timeout;
146
147   /**
148    * Priority in the queue.
149    */
150   unsigned int priority;
151
152   /**
153    * Maximum allowed length of queue (otherwise
154    * this request should be discarded).
155    */
156   unsigned int max_queue;
157
158   /**
159    * Number of bytes in the request message following
160    * this struct.  32-bit value for nicer memory
161    * access (and overall struct alignment).
162    */
163   uint32_t message_size;
164
165   /**
166    * Has this message been transmitted to the service?
167    * Only ever GNUNET_YES for the head of the queue.
168    * Note that the overall struct should end at a
169    * multiple of 64 bits.
170    */
171   int was_transmitted;
172
173 };
174
175 /**
176  * Handle to the datastore service.
177  */
178 struct GNUNET_DATASTORE_Handle
179 {
180
181   /**
182    * Our configuration.
183    */
184   const struct GNUNET_CONFIGURATION_Handle *cfg;
185
186   /**
187    * Current connection to the datastore service.
188    */
189   struct GNUNET_CLIENT_Connection *client;
190
191   /**
192    * Handle for statistics.
193    */
194   struct GNUNET_STATISTICS_Handle *stats;
195
196   /**
197    * Current transmit handle.
198    */
199   struct GNUNET_CLIENT_TransmitHandle *th;
200
201   /**
202    * Current head of priority queue.
203    */
204   struct GNUNET_DATASTORE_QueueEntry *queue_head;
205
206   /**
207    * Current tail of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
210
211   /**
212    * Task for trying to reconnect.
213    */
214   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
215
216   /**
217    * How quickly should we retry?  Used for exponential back-off on
218    * connect-errors.
219    */
220   struct GNUNET_TIME_Relative retry_time;
221
222   /**
223    * Number of entries in the queue.
224    */
225   unsigned int queue_size;
226
227   /**
228    * Number of results we're receiving for the current query
229    * after application stopped to care.  Used to determine when
230    * to reset the connection.
231    */
232   unsigned int result_count;
233
234   /**
235    * Are we currently trying to receive from the service?
236    */
237   int in_receive;
238
239   /**
240    * We should ignore the next message(s) from the service.
241    */
242   unsigned int skip_next_messages;
243
244 };
245
246
247
248 /**
249  * Connect to the datastore service.
250  *
251  * @param cfg configuration to use
252  * @return handle to use to access the service
253  */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL;                /* oops */
263   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
268   return h;
269 }
270
271
272 /**
273  * Task used by 'transmit_drop' to disconnect the datastore.
274  *
275  * @param cls the datastore handle
276  * @param tc scheduler context
277  */
278 static void
279 disconnect_after_drop (void *cls,
280                        const struct GNUNET_SCHEDULER_TaskContext *tc)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283
284   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
285 }
286
287
288 /**
289  * Transmit DROP message to datastore service.
290  *
291  * @param cls the 'struct GNUNET_DATASTORE_Handle'
292  * @param size number of bytes that can be copied to buf
293  * @param buf where to copy the drop message
294  * @return number of bytes written to buf
295  */
296 static size_t
297 transmit_drop (void *cls, size_t size, void *buf)
298 {
299   struct GNUNET_DATASTORE_Handle *h = cls;
300   struct GNUNET_MessageHeader *hdr;
301
302   if (buf == NULL)
303   {
304     LOG (GNUNET_ERROR_TYPE_WARNING,
305          _("Failed to transmit request to drop database.\n"));
306     GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
307                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
308     return 0;
309   }
310   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
311   hdr = buf;
312   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
313   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
314   GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
315                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
316   return sizeof (struct GNUNET_MessageHeader);
317 }
318
319
320 /**
321  * Disconnect from the datastore service (and free
322  * associated resources).
323  *
324  * @param h handle to the datastore
325  * @param drop set to GNUNET_YES to delete all data in datastore (!)
326  */
327 void
328 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
329 {
330   struct GNUNET_DATASTORE_QueueEntry *qe;
331
332   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
333   if (NULL != h->th)
334   {
335     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
336     h->th = NULL;
337   }
338   if (h->client != NULL)
339   {
340     GNUNET_CLIENT_disconnect (h->client);
341     h->client = NULL;
342   }
343   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
344   {
345     GNUNET_SCHEDULER_cancel (h->reconnect_task);
346     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
347   }
348   while (NULL != (qe = h->queue_head))
349   {
350     GNUNET_assert (NULL != qe->response_proc);
351     qe->response_proc (h, NULL);
352   }
353   if (GNUNET_YES == drop)
354   {
355     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
356     if (h->client != NULL)
357     {
358       if (NULL !=
359           GNUNET_CLIENT_notify_transmit_ready (h->client,
360                                                sizeof (struct
361                                                        GNUNET_MessageHeader),
362                                                GNUNET_TIME_UNIT_MINUTES,
363                                                GNUNET_YES, &transmit_drop, h))
364         return;
365       GNUNET_CLIENT_disconnect (h->client);
366       h->client = NULL;
367     }
368     GNUNET_break (0);
369   }
370   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
371   h->stats = NULL;
372   GNUNET_free (h);
373 }
374
375
376 /**
377  * A request has timed out (before being transmitted to the service).
378  *
379  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
380  * @param tc scheduler context
381  */
382 static void
383 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
384 {
385   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
386
387   GNUNET_STATISTICS_update (qe->h->stats,
388                             gettext_noop ("# queue entry timeouts"), 1,
389                             GNUNET_NO);
390   qe->task = GNUNET_SCHEDULER_NO_TASK;
391   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
392   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
393   qe->response_proc (qe->h, NULL);
394 }
395
396
397 /**
398  * Create a new entry for our priority queue (and possibly discard other entires if
399  * the queue is getting too long).
400  *
401  * @param h handle to the datastore
402  * @param msize size of the message to queue
403  * @param queue_priority priority of the entry
404  * @param max_queue_size at what queue size should this request be dropped
405  *        (if other requests of higher priority are in the queue)
406  * @param timeout timeout for the operation
407  * @param response_proc function to call with replies (can be NULL)
408  * @param qc client context (NOT a closure for response_proc)
409  * @return NULL if the queue is full
410  */
411 static struct GNUNET_DATASTORE_QueueEntry *
412 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
413                   unsigned int queue_priority, unsigned int max_queue_size,
414                   struct GNUNET_TIME_Relative timeout,
415                   GNUNET_CLIENT_MessageHandler response_proc,
416                   const union QueueContext *qc)
417 {
418   struct GNUNET_DATASTORE_QueueEntry *ret;
419   struct GNUNET_DATASTORE_QueueEntry *pos;
420   unsigned int c;
421
422   c = 0;
423   pos = h->queue_head;
424   while ((pos != NULL) && (c < max_queue_size) &&
425          (pos->priority >= queue_priority))
426   {
427     c++;
428     pos = pos->next;
429   }
430   if (c >= max_queue_size)
431   {
432     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
433                               GNUNET_NO);
434     return NULL;
435   }
436   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
437   ret->h = h;
438   ret->response_proc = response_proc;
439   ret->qc = *qc;
440   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
441   ret->priority = queue_priority;
442   ret->max_queue = max_queue_size;
443   ret->message_size = msize;
444   ret->was_transmitted = GNUNET_NO;
445   if (pos == NULL)
446   {
447     /* append at the tail */
448     pos = h->queue_tail;
449   }
450   else
451   {
452     pos = pos->prev;
453     /* do not insert at HEAD if HEAD query was already
454      * transmitted and we are still receiving replies! */
455     if ((pos == NULL) && (h->queue_head->was_transmitted))
456       pos = h->queue_head;
457   }
458   c++;
459   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
460                             1, GNUNET_NO);
461   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
462   h->queue_size++;
463   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
464   pos = ret->next;
465   while (pos != NULL)
466   {
467     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
468     {
469       GNUNET_assert (pos->response_proc != NULL);
470       /* move 'pos' element to head so that it will be
471        * killed on 'NULL' call below */
472       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");
473       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
474       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
475       GNUNET_STATISTICS_update (h->stats,
476                                 gettext_noop
477                                 ("# Requests dropped from datastore queue"), 1,
478                                 GNUNET_NO);
479       GNUNET_assert (h->queue_head == pos);
480       pos->response_proc (h, NULL);
481       break;
482     }
483     pos = pos->next;
484   }
485   return ret;
486 }
487
488
489 /**
490  * Process entries in the queue (or do nothing if we are already
491  * doing so).
492  *
493  * @param h handle to the datastore
494  */
495 static void
496 process_queue (struct GNUNET_DATASTORE_Handle *h);
497
498
499 /**
500  * Try reconnecting to the datastore service.
501  *
502  * @param cls the 'struct GNUNET_DATASTORE_Handle'
503  * @param tc scheduler context
504  */
505 static void
506 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
507 {
508   struct GNUNET_DATASTORE_Handle *h = cls;
509
510   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
511     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
512   else
513     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
514   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
515     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
516   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
517   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
518   if (h->client == NULL)
519   {
520     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
521     return;
522   }
523   GNUNET_STATISTICS_update (h->stats,
524                             gettext_noop
525                             ("# datastore connections (re)created"), 1,
526                             GNUNET_NO);
527   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
528   process_queue (h);
529 }
530
531
532 /**
533  * Disconnect from the service and then try reconnecting to the datastore service
534  * after some delay.
535  *
536  * @param h handle to datastore to disconnect and reconnect
537  */
538 static void
539 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
540 {
541   if (h->client == NULL)
542   {
543     LOG (GNUNET_ERROR_TYPE_DEBUG,
544          "client NULL in disconnect, will not try to reconnect\n");
545     return;
546   }
547 #if 0
548   GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
549                             1, GNUNET_NO);
550 #endif
551   GNUNET_CLIENT_disconnect (h->client);
552   h->skip_next_messages = 0;
553   h->client = NULL;
554   h->reconnect_task =
555       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
556 }
557
558
559 /**
560  * Function called whenever we receive a message from
561  * the service.  Calls the appropriate handler.
562  *
563  * @param cls the 'struct GNUNET_DATASTORE_Handle'
564  * @param msg the received message
565  */
566 static void
567 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
568 {
569   struct GNUNET_DATASTORE_Handle *h = cls;
570   struct GNUNET_DATASTORE_QueueEntry *qe;
571
572   h->in_receive = GNUNET_NO;
573   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
574   if (h->skip_next_messages > 0)
575   {
576     h->skip_next_messages--;
577     process_queue (h);
578     return;
579   }
580   if (NULL == (qe = h->queue_head))
581   {
582     GNUNET_break (0);
583     process_queue (h);
584     return;
585   }
586   qe->response_proc (h, msg);
587 }
588
589
590 /**
591  * Transmit request from queue to datastore service.
592  *
593  * @param cls the 'struct GNUNET_DATASTORE_Handle'
594  * @param size number of bytes that can be copied to buf
595  * @param buf where to copy the drop message
596  * @return number of bytes written to buf
597  */
598 static size_t
599 transmit_request (void *cls, size_t size, void *buf)
600 {
601   struct GNUNET_DATASTORE_Handle *h = cls;
602   struct GNUNET_DATASTORE_QueueEntry *qe;
603   size_t msize;
604
605   h->th = NULL;
606   if (NULL == (qe = h->queue_head))
607     return 0;                   /* no entry in queue */
608   if (buf == NULL)
609   {
610     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
611     GNUNET_STATISTICS_update (h->stats,
612                               gettext_noop ("# transmission request failures"),
613                               1, GNUNET_NO);
614     do_disconnect (h);
615     return 0;
616   }
617   if (size < (msize = qe->message_size))
618   {
619     process_queue (h);
620     return 0;
621   }
622   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
623        msize);
624   memcpy (buf, &qe[1], msize);
625   qe->was_transmitted = GNUNET_YES;
626   GNUNET_SCHEDULER_cancel (qe->task);
627   qe->task = GNUNET_SCHEDULER_NO_TASK;
628   GNUNET_assert (GNUNET_NO == h->in_receive);
629   h->in_receive = GNUNET_YES;
630   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
631                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
632   GNUNET_STATISTICS_update (h->stats,
633                             gettext_noop ("# bytes sent to datastore"), 1,
634                             GNUNET_NO);
635   return msize;
636 }
637
638
639 /**
640  * Process entries in the queue (or do nothing if we are already
641  * doing so).
642  *
643  * @param h handle to the datastore
644  */
645 static void
646 process_queue (struct GNUNET_DATASTORE_Handle *h)
647 {
648   struct GNUNET_DATASTORE_QueueEntry *qe;
649
650   if (NULL == (qe = h->queue_head))
651   {
652     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
653     return;                     /* no entry in queue */
654   }
655   if (qe->was_transmitted == GNUNET_YES)
656   {
657     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
658     return;                     /* waiting for replies */
659   }
660   if (h->th != NULL)
661   {
662     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
663     return;                     /* request pending */
664   }
665   if (h->client == NULL)
666   {
667     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
668     return;                     /* waiting for reconnect */
669   }
670   if (GNUNET_YES == h->in_receive)
671   {
672     /* wait for response to previous query */
673     return;
674   }
675   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
676        qe->message_size);
677   h->th =
678       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
679                                            GNUNET_TIME_absolute_get_remaining
680                                            (qe->timeout), GNUNET_YES,
681                                            &transmit_request, h);
682   GNUNET_assert (GNUNET_NO == h->in_receive);
683   GNUNET_break (NULL != h->th);
684 }
685
686
687 /**
688  * Dummy continuation used to do nothing (but be non-zero).
689  *
690  * @param cls closure
691  * @param result result
692  * @param min_expiration expiration time
693  * @param emsg error message
694  */
695 static void
696 drop_status_cont (void *cls, int32_t result, 
697                   struct GNUNET_TIME_Absolute min_expiration,
698                   const char *emsg)
699 {
700   /* do nothing */
701 }
702
703
704 /**
705  * Free a queue entry.  Removes the given entry from the
706  * queue and releases associated resources.  Does NOT
707  * call the callback.
708  *
709  * @param qe entry to free.
710  */
711 static void
712 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
713 {
714   struct GNUNET_DATASTORE_Handle *h = qe->h;
715
716   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
717   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
718   {
719     GNUNET_SCHEDULER_cancel (qe->task);
720     qe->task = GNUNET_SCHEDULER_NO_TASK;
721   }
722   h->queue_size--;
723   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
724   GNUNET_free (qe);
725 }
726
727
728 /**
729  * Type of a function to call when we receive a message
730  * from the service.
731  *
732  * @param cls closure
733  * @param msg message received, NULL on timeout or fatal error
734  */
735 static void
736 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
737 {
738   struct GNUNET_DATASTORE_Handle *h = cls;
739   struct GNUNET_DATASTORE_QueueEntry *qe;
740   struct StatusContext rc;
741   const struct StatusMessage *sm;
742   const char *emsg;
743   int32_t status;
744   int was_transmitted;
745
746   if (NULL == (qe = h->queue_head))
747   {
748     GNUNET_break (0);
749     do_disconnect (h);
750     return;
751   }
752   rc = qe->qc.sc;
753   if (msg == NULL)
754   {
755     was_transmitted = qe->was_transmitted;
756     free_queue_entry (qe);
757     if (was_transmitted == GNUNET_YES)
758       do_disconnect (h);
759     else
760       process_queue (h);
761     if (rc.cont != NULL)
762       rc.cont (rc.cont_cls, GNUNET_SYSERR,
763                GNUNET_TIME_UNIT_ZERO_ABS,
764                _("Failed to receive status response from database."));
765     return;
766   }
767   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
768   free_queue_entry (qe);
769   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
770       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
771   {
772     GNUNET_break (0);
773     h->retry_time = GNUNET_TIME_UNIT_ZERO;
774     do_disconnect (h);
775     if (rc.cont != NULL)
776       rc.cont (rc.cont_cls, GNUNET_SYSERR,
777                GNUNET_TIME_UNIT_ZERO_ABS,
778                _("Error reading response from datastore service"));
779     return;
780   }
781   sm = (const struct StatusMessage *) msg;
782   status = ntohl (sm->status);
783   emsg = NULL;
784   if (ntohs (msg->size) > sizeof (struct StatusMessage))
785   {
786     emsg = (const char *) &sm[1];
787     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
788     {
789       GNUNET_break (0);
790       emsg = _("Invalid error message received from datastore service");
791     }
792   }
793   if ((status == GNUNET_SYSERR) && (emsg == NULL))
794   {
795     GNUNET_break (0);
796     emsg = _("Invalid error message received from datastore service");
797   }
798   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
799   GNUNET_STATISTICS_update (h->stats,
800                             gettext_noop ("# status messages received"), 1,
801                             GNUNET_NO);
802   h->retry_time.rel_value = 0;
803   process_queue (h);
804   if (rc.cont != NULL)
805     rc.cont (rc.cont_cls, status, 
806              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
807              emsg);
808 }
809
810
811 /**
812  * Store an item in the datastore.  If the item is already present,
813  * the priorities are summed up and the higher expiration time and
814  * lower anonymity level is used.
815  *
816  * @param h handle to the datastore
817  * @param rid reservation ID to use (from "reserve"); use 0 if no
818  *            prior reservation was made
819  * @param key key for the value
820  * @param size number of bytes in data
821  * @param data content stored
822  * @param type type of the content
823  * @param priority priority of the content
824  * @param anonymity anonymity-level for the content
825  * @param replication how often should the content be replicated to other peers?
826  * @param expiration expiration time for the content
827  * @param queue_priority ranking of this request in the priority queue
828  * @param max_queue_size at what queue size should this request be dropped
829  *        (if other requests of higher priority are in the queue)
830  * @param timeout timeout for the operation
831  * @param cont continuation to call when done
832  * @param cont_cls closure for cont
833  * @return NULL if the entry was not queued, otherwise a handle that can be used to
834  *         cancel; note that even if NULL is returned, the callback will be invoked
835  *         (or rather, will already have been invoked)
836  */
837 struct GNUNET_DATASTORE_QueueEntry *
838 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
839                       const GNUNET_HashCode * key, size_t size,
840                       const void *data, enum GNUNET_BLOCK_Type type,
841                       uint32_t priority, uint32_t anonymity,
842                       uint32_t replication,
843                       struct GNUNET_TIME_Absolute expiration,
844                       unsigned int queue_priority, unsigned int max_queue_size,
845                       struct GNUNET_TIME_Relative timeout,
846                       GNUNET_DATASTORE_ContinuationWithStatus cont,
847                       void *cont_cls)
848 {
849   struct GNUNET_DATASTORE_QueueEntry *qe;
850   struct DataMessage *dm;
851   size_t msize;
852   union QueueContext qc;
853
854   LOG (GNUNET_ERROR_TYPE_DEBUG,
855        "Asked to put %u bytes of data under key `%s' for %llu ms\n", size,
856        GNUNET_h2s (key),
857        GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
858   msize = sizeof (struct DataMessage) + size;
859   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
860   qc.sc.cont = cont;
861   qc.sc.cont_cls = cont_cls;
862   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
863                          &process_status_message, &qc);
864   if (qe == NULL)
865   {
866     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
867     return NULL;
868   }
869   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
870                             1, GNUNET_NO);
871   dm = (struct DataMessage *) &qe[1];
872   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
873   dm->header.size = htons (msize);
874   dm->rid = htonl (rid);
875   dm->size = htonl ((uint32_t) size);
876   dm->type = htonl (type);
877   dm->priority = htonl (priority);
878   dm->anonymity = htonl (anonymity);
879   dm->replication = htonl (replication);
880   dm->reserved = htonl (0);
881   dm->uid = GNUNET_htonll (0);
882   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
883   dm->key = *key;
884   memcpy (&dm[1], data, size);
885   process_queue (h);
886   return qe;
887 }
888
889
890 /**
891  * Reserve space in the datastore.  This function should be used
892  * to avoid "out of space" failures during a longer sequence of "put"
893  * operations (for example, when a file is being inserted).
894  *
895  * @param h handle to the datastore
896  * @param amount how much space (in bytes) should be reserved (for content only)
897  * @param entries how many entries will be created (to calculate per-entry overhead)
898  * @param queue_priority ranking of this request in the priority queue
899  * @param max_queue_size at what queue size should this request be dropped
900  *        (if other requests of higher priority are in the queue)
901  * @param timeout how long to wait at most for a response (or before dying in queue)
902  * @param cont continuation to call when done; "success" will be set to
903  *             a positive reservation value if space could be reserved.
904  * @param cont_cls closure for cont
905  * @return NULL if the entry was not queued, otherwise a handle that can be used to
906  *         cancel; note that even if NULL is returned, the callback will be invoked
907  *         (or rather, will already have been invoked)
908  */
909 struct GNUNET_DATASTORE_QueueEntry *
910 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
911                           uint32_t entries, unsigned int queue_priority,
912                           unsigned int max_queue_size,
913                           struct GNUNET_TIME_Relative timeout,
914                           GNUNET_DATASTORE_ContinuationWithStatus cont,
915                           void *cont_cls)
916 {
917   struct GNUNET_DATASTORE_QueueEntry *qe;
918   struct ReserveMessage *rm;
919   union QueueContext qc;
920
921   if (cont == NULL)
922     cont = &drop_status_cont;
923   LOG (GNUNET_ERROR_TYPE_DEBUG,
924        "Asked to reserve %llu bytes of data and %u entries\n",
925        (unsigned long long) amount, (unsigned int) entries);
926   qc.sc.cont = cont;
927   qc.sc.cont_cls = cont_cls;
928   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
929                          max_queue_size, timeout, &process_status_message, &qc);
930   if (qe == NULL)
931   {
932     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
933     return NULL;
934   }
935   GNUNET_STATISTICS_update (h->stats,
936                             gettext_noop ("# RESERVE requests executed"), 1,
937                             GNUNET_NO);
938   rm = (struct ReserveMessage *) &qe[1];
939   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
940   rm->header.size = htons (sizeof (struct ReserveMessage));
941   rm->entries = htonl (entries);
942   rm->amount = GNUNET_htonll (amount);
943   process_queue (h);
944   return qe;
945 }
946
947
948 /**
949  * Signal that all of the data for which a reservation was made has
950  * been stored and that whatever excess space might have been reserved
951  * can now be released.
952  *
953  * @param h handle to the datastore
954  * @param rid reservation ID (value of "success" in original continuation
955  *        from the "reserve" function).
956  * @param queue_priority ranking of this request in the priority queue
957  * @param max_queue_size at what queue size should this request be dropped
958  *        (if other requests of higher priority are in the queue)
959  * @param queue_priority ranking of this request in the priority queue
960  * @param max_queue_size at what queue size should this request be dropped
961  *        (if other requests of higher priority are in the queue)
962  * @param timeout how long to wait at most for a response
963  * @param cont continuation to call when done
964  * @param cont_cls closure for cont
965  * @return NULL if the entry was not queued, otherwise a handle that can be used to
966  *         cancel; note that even if NULL is returned, the callback will be invoked
967  *         (or rather, will already have been invoked)
968  */
969 struct GNUNET_DATASTORE_QueueEntry *
970 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
971                                   uint32_t rid, unsigned int queue_priority,
972                                   unsigned int max_queue_size,
973                                   struct GNUNET_TIME_Relative timeout,
974                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
975                                   void *cont_cls)
976 {
977   struct GNUNET_DATASTORE_QueueEntry *qe;
978   struct ReleaseReserveMessage *rrm;
979   union QueueContext qc;
980
981   if (cont == NULL)
982     cont = &drop_status_cont;
983   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
984   qc.sc.cont = cont;
985   qc.sc.cont_cls = cont_cls;
986   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
987                          queue_priority, max_queue_size, timeout,
988                          &process_status_message, &qc);
989   if (qe == NULL)
990   {
991     LOG (GNUNET_ERROR_TYPE_DEBUG,
992          "Could not create queue entry to release reserve\n");
993     return NULL;
994   }
995   GNUNET_STATISTICS_update (h->stats,
996                             gettext_noop
997                             ("# RELEASE RESERVE requests executed"), 1,
998                             GNUNET_NO);
999   rrm = (struct ReleaseReserveMessage *) &qe[1];
1000   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1001   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1002   rrm->rid = htonl (rid);
1003   process_queue (h);
1004   return qe;
1005 }
1006
1007
1008 /**
1009  * Update a value in the datastore.
1010  *
1011  * @param h handle to the datastore
1012  * @param uid identifier for the value
1013  * @param priority how much to increase the priority of the value
1014  * @param expiration new expiration value should be MAX of existing and this argument
1015  * @param queue_priority ranking of this request in the priority queue
1016  * @param max_queue_size at what queue size should this request be dropped
1017  *        (if other requests of higher priority are in the queue)
1018  * @param timeout how long to wait at most for a response
1019  * @param cont continuation to call when done
1020  * @param cont_cls closure for cont
1021  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1022  *         cancel; note that even if NULL is returned, the callback will be invoked
1023  *         (or rather, will already have been invoked)
1024  */
1025 struct GNUNET_DATASTORE_QueueEntry *
1026 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1027                          uint32_t priority,
1028                          struct GNUNET_TIME_Absolute expiration,
1029                          unsigned int queue_priority,
1030                          unsigned int max_queue_size,
1031                          struct GNUNET_TIME_Relative timeout,
1032                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1033                          void *cont_cls)
1034 {
1035   struct GNUNET_DATASTORE_QueueEntry *qe;
1036   struct UpdateMessage *um;
1037   union QueueContext qc;
1038
1039   if (cont == NULL)
1040     cont = &drop_status_cont;
1041   LOG (GNUNET_ERROR_TYPE_DEBUG,
1042        "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1043        uid, (unsigned int) priority, (unsigned long long) expiration.abs_value);
1044   qc.sc.cont = cont;
1045   qc.sc.cont_cls = cont_cls;
1046   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1047                          max_queue_size, timeout, &process_status_message, &qc);
1048   if (qe == NULL)
1049   {
1050     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1051     return NULL;
1052   }
1053   GNUNET_STATISTICS_update (h->stats,
1054                             gettext_noop ("# UPDATE requests executed"), 1,
1055                             GNUNET_NO);
1056   um = (struct UpdateMessage *) &qe[1];
1057   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1058   um->header.size = htons (sizeof (struct UpdateMessage));
1059   um->priority = htonl (priority);
1060   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1061   um->uid = GNUNET_htonll (uid);
1062   process_queue (h);
1063   return qe;
1064 }
1065
1066
1067 /**
1068  * Explicitly remove some content from the database.
1069  * The "cont"inuation will be called with status
1070  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1071  * if no matching entry was found and "GNUNET_SYSERR"
1072  * on all other types of errors.
1073  *
1074  * @param h handle to the datastore
1075  * @param key key for the value
1076  * @param size number of bytes in data
1077  * @param data content stored
1078  * @param queue_priority ranking of this request in the priority queue
1079  * @param max_queue_size at what queue size should this request be dropped
1080  *        (if other requests of higher priority are in the queue)
1081  * @param timeout how long to wait at most for a response
1082  * @param cont continuation to call when done
1083  * @param cont_cls closure for cont
1084  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1085  *         cancel; note that even if NULL is returned, the callback will be invoked
1086  *         (or rather, will already have been invoked)
1087  */
1088 struct GNUNET_DATASTORE_QueueEntry *
1089 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1090                          const GNUNET_HashCode * key, size_t size,
1091                          const void *data, unsigned int queue_priority,
1092                          unsigned int max_queue_size,
1093                          struct GNUNET_TIME_Relative timeout,
1094                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1095                          void *cont_cls)
1096 {
1097   struct GNUNET_DATASTORE_QueueEntry *qe;
1098   struct DataMessage *dm;
1099   size_t msize;
1100   union QueueContext qc;
1101
1102   if (cont == NULL)
1103     cont = &drop_status_cont;
1104   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1105        size, GNUNET_h2s (key));
1106   qc.sc.cont = cont;
1107   qc.sc.cont_cls = cont_cls;
1108   msize = sizeof (struct DataMessage) + size;
1109   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1110   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1111                          &process_status_message, &qc);
1112   if (qe == NULL)
1113   {
1114     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1115     return NULL;
1116   }
1117   GNUNET_STATISTICS_update (h->stats,
1118                             gettext_noop ("# REMOVE requests executed"), 1,
1119                             GNUNET_NO);
1120   dm = (struct DataMessage *) &qe[1];
1121   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1122   dm->header.size = htons (msize);
1123   dm->rid = htonl (0);
1124   dm->size = htonl (size);
1125   dm->type = htonl (0);
1126   dm->priority = htonl (0);
1127   dm->anonymity = htonl (0);
1128   dm->uid = GNUNET_htonll (0);
1129   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1130   dm->key = *key;
1131   memcpy (&dm[1], data, size);
1132   process_queue (h);
1133   return qe;
1134 }
1135
1136
1137 /**
1138  * Type of a function to call when we receive a message
1139  * from the service.
1140  *
1141  * @param cls closure
1142  * @param msg message received, NULL on timeout or fatal error
1143  */
1144 static void
1145 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1146 {
1147   struct GNUNET_DATASTORE_Handle *h = cls;
1148   struct GNUNET_DATASTORE_QueueEntry *qe;
1149   struct ResultContext rc;
1150   const struct DataMessage *dm;
1151   int was_transmitted;
1152
1153   if (msg == NULL)
1154   {
1155     qe = h->queue_head;
1156     GNUNET_assert (NULL != qe);
1157     rc = qe->qc.rc;
1158     was_transmitted = qe->was_transmitted;
1159     free_queue_entry (qe);
1160     if (was_transmitted == GNUNET_YES)
1161     {
1162       LOG (GNUNET_ERROR_TYPE_WARNING,
1163            _("Failed to receive response from database.\n"));
1164       do_disconnect (h);
1165     }
1166     else
1167     {
1168       process_queue (h);
1169     }
1170     if (rc.proc != NULL)
1171       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1172                0);
1173     return;
1174   }
1175   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1176   {
1177     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1178     qe = h->queue_head;
1179     rc = qe->qc.rc;
1180     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1181     free_queue_entry (qe);
1182     LOG (GNUNET_ERROR_TYPE_DEBUG,
1183          "Received end of result set, new queue size is %u\n", h->queue_size);
1184     h->retry_time.rel_value = 0;
1185     h->result_count = 0;
1186     process_queue (h);
1187     if (rc.proc != NULL)
1188       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1189                0);
1190     return;
1191   }
1192   qe = h->queue_head;
1193   GNUNET_assert (NULL != qe);
1194   rc = qe->qc.rc;
1195   if (GNUNET_YES != qe->was_transmitted)
1196   {
1197     GNUNET_break (0);
1198     free_queue_entry (qe);
1199     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1200     do_disconnect (h);
1201     if (rc.proc != NULL)
1202       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1203                0);
1204     return;
1205   }
1206   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1207       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1208       (ntohs (msg->size) !=
1209        sizeof (struct DataMessage) +
1210        ntohl (((const struct DataMessage *) msg)->size)))
1211   {
1212     GNUNET_break (0);
1213     free_queue_entry (qe);
1214     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1215     do_disconnect (h);
1216     if (rc.proc != NULL)
1217       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1218                0);
1219     return;
1220   }
1221   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1222                             GNUNET_NO);
1223   dm = (const struct DataMessage *) msg;
1224   LOG (GNUNET_ERROR_TYPE_DEBUG,
1225        "Received result %llu with type %u and size %u with key %s\n",
1226        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1227        ntohl (dm->size), GNUNET_h2s (&dm->key));
1228   free_queue_entry (qe);
1229   h->retry_time.rel_value = 0;
1230   process_queue (h);
1231   if (rc.proc != NULL)
1232     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1233              ntohl (dm->priority), ntohl (dm->anonymity),
1234              GNUNET_TIME_absolute_ntoh (dm->expiration),
1235              GNUNET_ntohll (dm->uid));
1236 }
1237
1238
1239 /**
1240  * Get a random value from the datastore for content replication.
1241  * Returns a single, random value among those with the highest
1242  * replication score, lowering positive replication scores by one for
1243  * the chosen value (if only content with a replication score exists,
1244  * a random value is returned and replication scores are not changed).
1245  *
1246  * @param h handle to the datastore
1247  * @param queue_priority ranking of this request in the priority queue
1248  * @param max_queue_size at what queue size should this request be dropped
1249  *        (if other requests of higher priority are in the queue)
1250  * @param timeout how long to wait at most for a response
1251  * @param proc function to call on a random value; it
1252  *        will be called once with a value (if available)
1253  *        and always once with a value of NULL.
1254  * @param proc_cls closure for proc
1255  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1256  *         cancel
1257  */
1258 struct GNUNET_DATASTORE_QueueEntry *
1259 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1260                                       unsigned int queue_priority,
1261                                       unsigned int max_queue_size,
1262                                       struct GNUNET_TIME_Relative timeout,
1263                                       GNUNET_DATASTORE_DatumProcessor proc,
1264                                       void *proc_cls)
1265 {
1266   struct GNUNET_DATASTORE_QueueEntry *qe;
1267   struct GNUNET_MessageHeader *m;
1268   union QueueContext qc;
1269
1270   GNUNET_assert (NULL != proc);
1271   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n",
1272        (unsigned long long) timeout.rel_value);
1273   qc.rc.proc = proc;
1274   qc.rc.proc_cls = proc_cls;
1275   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1276                          queue_priority, max_queue_size, timeout,
1277                          &process_result_message, &qc);
1278   if (qe == NULL)
1279   {
1280     LOG (GNUNET_ERROR_TYPE_DEBUG,
1281          "Could not create queue entry for GET REPLICATION\n");
1282     return NULL;
1283   }
1284   GNUNET_STATISTICS_update (h->stats,
1285                             gettext_noop
1286                             ("# GET REPLICATION requests executed"), 1,
1287                             GNUNET_NO);
1288   m = (struct GNUNET_MessageHeader *) &qe[1];
1289   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1290   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1291   process_queue (h);
1292   return qe;
1293 }
1294
1295
1296 /**
1297  * Get a single zero-anonymity value from the datastore.
1298  *
1299  * @param h handle to the datastore
1300  * @param offset offset of the result (modulo num-results); set to
1301  *               a random 64-bit value initially; then increment by
1302  *               one each time; detect that all results have been found by uid
1303  *               being again the first uid ever returned.
1304  * @param queue_priority ranking of this request in the priority queue
1305  * @param max_queue_size at what queue size should this request be dropped
1306  *        (if other requests of higher priority are in the queue)
1307  * @param timeout how long to wait at most for a response
1308  * @param type allowed type for the operation (never zero)
1309  * @param proc function to call on a random value; it
1310  *        will be called once with a value (if available)
1311  *        or with NULL if none value exists.
1312  * @param proc_cls closure for proc
1313  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1314  *         cancel
1315  */
1316 struct GNUNET_DATASTORE_QueueEntry *
1317 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1318                                      uint64_t offset,
1319                                      unsigned int queue_priority,
1320                                      unsigned int max_queue_size,
1321                                      struct GNUNET_TIME_Relative timeout,
1322                                      enum GNUNET_BLOCK_Type type,
1323                                      GNUNET_DATASTORE_DatumProcessor proc,
1324                                      void *proc_cls)
1325 {
1326   struct GNUNET_DATASTORE_QueueEntry *qe;
1327   struct GetZeroAnonymityMessage *m;
1328   union QueueContext qc;
1329
1330   GNUNET_assert (NULL != proc);
1331   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1332   LOG (GNUNET_ERROR_TYPE_DEBUG,
1333        "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1334        (unsigned long long) offset, type,
1335        (unsigned long long) timeout.rel_value);
1336   qc.rc.proc = proc;
1337   qc.rc.proc_cls = proc_cls;
1338   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1339                          queue_priority, max_queue_size, timeout,
1340                          &process_result_message, &qc);
1341   if (qe == NULL)
1342   {
1343     LOG (GNUNET_ERROR_TYPE_DEBUG,
1344          "Could not create queue entry for zero-anonymity procation\n");
1345     return NULL;
1346   }
1347   GNUNET_STATISTICS_update (h->stats,
1348                             gettext_noop
1349                             ("# GET ZERO ANONYMITY requests executed"), 1,
1350                             GNUNET_NO);
1351   m = (struct GetZeroAnonymityMessage *) &qe[1];
1352   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1353   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1354   m->type = htonl ((uint32_t) type);
1355   m->offset = GNUNET_htonll (offset);
1356   process_queue (h);
1357   return qe;
1358 }
1359
1360
1361 /**
1362  * Get a result for a particular key from the datastore.  The processor
1363  * will only be called once.
1364  *
1365  * @param h handle to the datastore
1366  * @param offset offset of the result (modulo num-results); set to
1367  *               a random 64-bit value initially; then increment by
1368  *               one each time; detect that all results have been found by uid
1369  *               being again the first uid ever returned.
1370  * @param key maybe NULL (to match all entries)
1371  * @param type desired type, 0 for any
1372  * @param queue_priority ranking of this request in the priority queue
1373  * @param max_queue_size at what queue size should this request be dropped
1374  *        (if other requests of higher priority are in the queue)
1375  * @param timeout how long to wait at most for a response
1376  * @param proc function to call on each matching value;
1377  *        will be called once with a NULL value at the end
1378  * @param proc_cls closure for proc
1379  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1380  *         cancel
1381  */
1382 struct GNUNET_DATASTORE_QueueEntry *
1383 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1384                           const GNUNET_HashCode * key,
1385                           enum GNUNET_BLOCK_Type type,
1386                           unsigned int queue_priority,
1387                           unsigned int max_queue_size,
1388                           struct GNUNET_TIME_Relative timeout,
1389                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1390 {
1391   struct GNUNET_DATASTORE_QueueEntry *qe;
1392   struct GetMessage *gm;
1393   union QueueContext qc;
1394
1395   GNUNET_assert (NULL != proc);
1396   LOG (GNUNET_ERROR_TYPE_DEBUG,
1397        "Asked to look for data of type %u under key `%s'\n",
1398        (unsigned int) type, GNUNET_h2s (key));
1399   qc.rc.proc = proc;
1400   qc.rc.proc_cls = proc_cls;
1401   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1402                          max_queue_size, timeout, &process_result_message, &qc);
1403   if (qe == NULL)
1404   {
1405     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1406          GNUNET_h2s (key));
1407     return NULL;
1408   }
1409   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1410                             1, GNUNET_NO);
1411   gm = (struct GetMessage *) &qe[1];
1412   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1413   gm->type = htonl (type);
1414   gm->offset = GNUNET_htonll (offset);
1415   if (key != NULL)
1416   {
1417     gm->header.size = htons (sizeof (struct GetMessage));
1418     gm->key = *key;
1419   }
1420   else
1421   {
1422     gm->header.size =
1423         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1424   }
1425   process_queue (h);
1426   return qe;
1427 }
1428
1429
1430 /**
1431  * Cancel a datastore operation.  The final callback from the
1432  * operation must not have been done yet.
1433  *
1434  * @param qe operation to cancel
1435  */
1436 void
1437 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1438 {
1439   struct GNUNET_DATASTORE_Handle *h;
1440
1441   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1442   h = qe->h;
1443   LOG (GNUNET_ERROR_TYPE_DEBUG,
1444        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1445        qe->was_transmitted, h->queue_head == qe);
1446   if (GNUNET_YES == qe->was_transmitted)
1447   {
1448     free_queue_entry (qe);
1449     h->skip_next_messages++;
1450     return;
1451   }
1452   free_queue_entry (qe);
1453   process_queue (h);
1454 }
1455
1456
1457 /* end of datastore_api.c */