Add missing include
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98
99 /**
100  * Entry in our priority queue.
101  */
102 struct GNUNET_DATASTORE_QueueEntry
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *next;
109
110   /**
111    * This is a linked list.
112    */
113   struct GNUNET_DATASTORE_QueueEntry *prev;
114
115   /**
116    * Handle to the master context.
117    */
118   struct GNUNET_DATASTORE_Handle *h;
119
120   /**
121    * Response processor (NULL if we are not waiting for a response).
122    * This struct should be used for the closure, function-specific
123    * arguments can be passed via 'qc'.
124    */
125   GNUNET_CLIENT_MessageHandler response_proc;
126
127   /**
128    * Function to call after transmission of the request.
129    */
130   GNUNET_DATASTORE_ContinuationWithStatus cont;
131
132   /**
133    * Closure for 'cont'.
134    */
135   void *cont_cls;
136
137   /**
138    * Context for the operation.
139    */
140   union QueueContext qc;
141
142   /**
143    * Task for timeout signalling.
144    */
145   GNUNET_SCHEDULER_TaskIdentifier task;
146
147   /**
148    * Timeout for the current operation.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Priority in the queue.
154    */
155   unsigned int priority;
156
157   /**
158    * Maximum allowed length of queue (otherwise
159    * this request should be discarded).
160    */
161   unsigned int max_queue;
162
163   /**
164    * Number of bytes in the request message following
165    * this struct.  32-bit value for nicer memory
166    * access (and overall struct alignment).
167    */
168   uint32_t message_size;
169
170   /**
171    * Has this message been transmitted to the service?
172    * Only ever GNUNET_YES for the head of the queue.
173    * Note that the overall struct should end at a
174    * multiple of 64 bits.
175    */
176   int was_transmitted;
177
178 };
179
180 /**
181  * Handle to the datastore service.
182  */
183 struct GNUNET_DATASTORE_Handle
184 {
185
186   /**
187    * Our configuration.
188    */
189   const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244   /**
245    * We should ignore the next message(s) from the service.
246    */
247   unsigned int skip_next_messages;
248
249 };
250
251
252
253 /**
254  * Connect to the datastore service.
255  *
256  * @param cfg configuration to use
257  * @return handle to use to access the service
258  */
259 struct GNUNET_DATASTORE_Handle *
260 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
261 {
262   struct GNUNET_CLIENT_Connection *c;
263   struct GNUNET_DATASTORE_Handle *h;
264
265   c = GNUNET_CLIENT_connect ("datastore", cfg);
266   if (c == NULL)
267     return NULL;                /* oops */
268   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
269                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
270   h->client = c;
271   h->cfg = cfg;
272   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
273   return h;
274 }
275
276
277 /**
278  * Task used by 'transmit_drop' to disconnect the datastore.
279  *
280  * @param cls the datastore handle
281  * @param tc scheduler context
282  */
283 static void
284 disconnect_after_drop (void *cls,
285                        const struct GNUNET_SCHEDULER_TaskContext *tc)
286 {
287   struct GNUNET_DATASTORE_Handle *h = cls;
288
289   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290 }
291
292
293 /**
294  * Transmit DROP message to datastore service.
295  *
296  * @param cls the 'struct GNUNET_DATASTORE_Handle'
297  * @param size number of bytes that can be copied to buf
298  * @param buf where to copy the drop message
299  * @return number of bytes written to buf
300  */
301 static size_t
302 transmit_drop (void *cls, size_t size, void *buf)
303 {
304   struct GNUNET_DATASTORE_Handle *h = cls;
305   struct GNUNET_MessageHeader *hdr;
306
307   if (buf == NULL)
308   {
309     LOG (GNUNET_ERROR_TYPE_WARNING,
310          _("Failed to transmit request to drop database.\n"));
311     GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
312                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
313     return 0;
314   }
315   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
316   hdr = buf;
317   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
318   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
319   GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
320                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
321   return sizeof (struct GNUNET_MessageHeader);
322 }
323
324
325 /**
326  * Disconnect from the datastore service (and free
327  * associated resources).
328  *
329  * @param h handle to the datastore
330  * @param drop set to GNUNET_YES to delete all data in datastore (!)
331  */
332 void
333 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
334 {
335   struct GNUNET_DATASTORE_QueueEntry *qe;
336
337   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
338   if (NULL != h->th)
339   {
340     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
341     h->th = NULL;
342   }
343   if (h->client != NULL)
344   {
345     GNUNET_CLIENT_disconnect (h->client);
346     h->client = NULL;
347   }
348   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
349   {
350     GNUNET_SCHEDULER_cancel (h->reconnect_task);
351     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
352   }
353   while (NULL != (qe = h->queue_head))
354   {
355     GNUNET_assert (NULL != qe->response_proc);
356     qe->response_proc (h, NULL);
357   }
358   if (GNUNET_YES == drop)
359   {
360     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
361     if (h->client != NULL)
362     {
363       if (NULL !=
364           GNUNET_CLIENT_notify_transmit_ready (h->client,
365                                                sizeof (struct
366                                                        GNUNET_MessageHeader),
367                                                GNUNET_TIME_UNIT_MINUTES,
368                                                GNUNET_YES, &transmit_drop, h))
369         return;
370       GNUNET_CLIENT_disconnect (h->client);
371       h->client = NULL;
372     }
373     GNUNET_break (0);
374   }
375   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
376   h->stats = NULL;
377   GNUNET_free (h);
378 }
379
380
381 /**
382  * A request has timed out (before being transmitted to the service).
383  *
384  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
385  * @param tc scheduler context
386  */
387 static void
388 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
391
392   GNUNET_STATISTICS_update (qe->h->stats,
393                             gettext_noop ("# queue entry timeouts"), 1,
394                             GNUNET_NO);
395   qe->task = GNUNET_SCHEDULER_NO_TASK;
396   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
397   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
398   qe->response_proc (qe->h, NULL);
399 }
400
401
402 /**
403  * Create a new entry for our priority queue (and possibly discard other entires if
404  * the queue is getting too long).
405  *
406  * @param h handle to the datastore
407  * @param msize size of the message to queue
408  * @param queue_priority priority of the entry
409  * @param max_queue_size at what queue size should this request be dropped
410  *        (if other requests of higher priority are in the queue)
411  * @param timeout timeout for the operation
412  * @param response_proc function to call with replies (can be NULL)
413  * @param qc client context (NOT a closure for response_proc)
414  * @return NULL if the queue is full
415  */
416 static struct GNUNET_DATASTORE_QueueEntry *
417 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
418                   unsigned int queue_priority, unsigned int max_queue_size,
419                   struct GNUNET_TIME_Relative timeout,
420                   GNUNET_CLIENT_MessageHandler response_proc,
421                   const union QueueContext *qc)
422 {
423   struct GNUNET_DATASTORE_QueueEntry *ret;
424   struct GNUNET_DATASTORE_QueueEntry *pos;
425   unsigned int c;
426
427   c = 0;
428   pos = h->queue_head;
429   while ((pos != NULL) && (c < max_queue_size) &&
430          (pos->priority >= queue_priority))
431   {
432     c++;
433     pos = pos->next;
434   }
435   if (c >= max_queue_size)
436   {
437     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
438                               GNUNET_NO);
439     return NULL;
440   }
441   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
442   ret->h = h;
443   ret->response_proc = response_proc;
444   ret->qc = *qc;
445   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
446   ret->priority = queue_priority;
447   ret->max_queue = max_queue_size;
448   ret->message_size = msize;
449   ret->was_transmitted = GNUNET_NO;
450   if (pos == NULL)
451   {
452     /* append at the tail */
453     pos = h->queue_tail;
454   }
455   else
456   {
457     pos = pos->prev;
458     /* do not insert at HEAD if HEAD query was already
459      * transmitted and we are still receiving replies! */
460     if ((pos == NULL) && (h->queue_head->was_transmitted))
461       pos = h->queue_head;
462   }
463   c++;
464 #if INSANE_STATISTICS
465   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
466                             1, GNUNET_NO);
467 #endif
468   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
469   h->queue_size++;
470   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
471   for (pos = ret->next; NULL != pos; pos = pos->next)
472   {
473     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
474     {
475       GNUNET_assert (pos->response_proc != NULL);
476       /* move 'pos' element to head so that it will be
477        * killed on 'NULL' call below */
478       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");
479       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
480       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
481       GNUNET_STATISTICS_update (h->stats,
482                                 gettext_noop
483                                 ("# Requests dropped from datastore queue"), 1,
484                                 GNUNET_NO);
485       GNUNET_assert (h->queue_head == pos);
486       pos->response_proc (h, NULL);
487       break;
488     }
489   }
490   return ret;
491 }
492
493
494 /**
495  * Process entries in the queue (or do nothing if we are already
496  * doing so).
497  *
498  * @param h handle to the datastore
499  */
500 static void
501 process_queue (struct GNUNET_DATASTORE_Handle *h);
502
503
504 /**
505  * Try reconnecting to the datastore service.
506  *
507  * @param cls the 'struct GNUNET_DATASTORE_Handle'
508  * @param tc scheduler context
509  */
510 static void
511 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
512 {
513   struct GNUNET_DATASTORE_Handle *h = cls;
514
515   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
516   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
517   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
518   if (h->client == NULL)
519   {
520     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
521     return;
522   }
523   GNUNET_STATISTICS_update (h->stats,
524                             gettext_noop
525                             ("# datastore connections (re)created"), 1,
526                             GNUNET_NO);
527   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
528   process_queue (h);
529 }
530
531
532 /**
533  * Disconnect from the service and then try reconnecting to the datastore service
534  * after some delay.
535  *
536  * @param h handle to datastore to disconnect and reconnect
537  */
538 static void
539 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
540 {
541   if (h->client == NULL)
542   {
543     LOG (GNUNET_ERROR_TYPE_DEBUG,
544          "client NULL in disconnect, will not try to reconnect\n");
545     return;
546   }
547   GNUNET_CLIENT_disconnect (h->client);
548   h->skip_next_messages = 0;
549   h->client = NULL;
550   h->reconnect_task =
551       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
552 }
553
554
555 /**
556  * Function called whenever we receive a message from
557  * the service.  Calls the appropriate handler.
558  *
559  * @param cls the 'struct GNUNET_DATASTORE_Handle'
560  * @param msg the received message
561  */
562 static void
563 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
564 {
565   struct GNUNET_DATASTORE_Handle *h = cls;
566   struct GNUNET_DATASTORE_QueueEntry *qe;
567
568   h->in_receive = GNUNET_NO;
569   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
570   if (h->skip_next_messages > 0)
571   {
572     h->skip_next_messages--;
573     process_queue (h);
574     return;
575   }
576   if (NULL == (qe = h->queue_head))
577   {
578     GNUNET_break (0);
579     process_queue (h);
580     return;
581   }
582   qe->response_proc (h, msg);
583 }
584
585
586 /**
587  * Transmit request from queue to datastore service.
588  *
589  * @param cls the 'struct GNUNET_DATASTORE_Handle'
590  * @param size number of bytes that can be copied to buf
591  * @param buf where to copy the drop message
592  * @return number of bytes written to buf
593  */
594 static size_t
595 transmit_request (void *cls, size_t size, void *buf)
596 {
597   struct GNUNET_DATASTORE_Handle *h = cls;
598   struct GNUNET_DATASTORE_QueueEntry *qe;
599   size_t msize;
600
601   h->th = NULL;
602   if (NULL == (qe = h->queue_head))
603     return 0;                   /* no entry in queue */
604   if (buf == NULL)
605   {
606     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
607     GNUNET_STATISTICS_update (h->stats,
608                               gettext_noop ("# transmission request failures"),
609                               1, GNUNET_NO);
610     do_disconnect (h);
611     return 0;
612   }
613   if (size < (msize = qe->message_size))
614   {
615     process_queue (h);
616     return 0;
617   }
618   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
619        msize);
620   memcpy (buf, &qe[1], msize);
621   qe->was_transmitted = GNUNET_YES;
622   GNUNET_SCHEDULER_cancel (qe->task);
623   qe->task = GNUNET_SCHEDULER_NO_TASK;
624   GNUNET_assert (GNUNET_NO == h->in_receive);
625   h->in_receive = GNUNET_YES;
626   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
627                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
628 #if INSANE_STATISTICS
629   GNUNET_STATISTICS_update (h->stats,
630                             gettext_noop ("# bytes sent to datastore"), 1,
631                             GNUNET_NO);
632 #endif
633   return msize;
634 }
635
636
637 /**
638  * Process entries in the queue (or do nothing if we are already
639  * doing so).
640  *
641  * @param h handle to the datastore
642  */
643 static void
644 process_queue (struct GNUNET_DATASTORE_Handle *h)
645 {
646   struct GNUNET_DATASTORE_QueueEntry *qe;
647
648   if (NULL == (qe = h->queue_head))
649   {
650     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
651     return;                     /* no entry in queue */
652   }
653   if (qe->was_transmitted == GNUNET_YES)
654   {
655     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
656     return;                     /* waiting for replies */
657   }
658   if (h->th != NULL)
659   {
660     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
661     return;                     /* request pending */
662   }
663   if (h->client == NULL)
664   {
665     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
666     return;                     /* waiting for reconnect */
667   }
668   if (GNUNET_YES == h->in_receive)
669   {
670     /* wait for response to previous query */
671     return;
672   }
673   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
674        qe->message_size);
675   h->th =
676       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
677                                            GNUNET_TIME_absolute_get_remaining
678                                            (qe->timeout), GNUNET_YES,
679                                            &transmit_request, h);
680   GNUNET_assert (GNUNET_NO == h->in_receive);
681   GNUNET_break (NULL != h->th);
682 }
683
684
685 /**
686  * Dummy continuation used to do nothing (but be non-zero).
687  *
688  * @param cls closure
689  * @param result result
690  * @param min_expiration expiration time
691  * @param emsg error message
692  */
693 static void
694 drop_status_cont (void *cls, int32_t result,
695                   struct GNUNET_TIME_Absolute min_expiration,
696                   const char *emsg)
697 {
698   /* do nothing */
699 }
700
701
702 /**
703  * Free a queue entry.  Removes the given entry from the
704  * queue and releases associated resources.  Does NOT
705  * call the callback.
706  *
707  * @param qe entry to free.
708  */
709 static void
710 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
711 {
712   struct GNUNET_DATASTORE_Handle *h = qe->h;
713
714   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
715   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
716   {
717     GNUNET_SCHEDULER_cancel (qe->task);
718     qe->task = GNUNET_SCHEDULER_NO_TASK;
719   }
720   h->queue_size--;
721   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
722   GNUNET_free (qe);
723 }
724
725
726 /**
727  * Type of a function to call when we receive a message
728  * from the service.
729  *
730  * @param cls closure
731  * @param msg message received, NULL on timeout or fatal error
732  */
733 static void
734 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
735 {
736   struct GNUNET_DATASTORE_Handle *h = cls;
737   struct GNUNET_DATASTORE_QueueEntry *qe;
738   struct StatusContext rc;
739   const struct StatusMessage *sm;
740   const char *emsg;
741   int32_t status;
742   int was_transmitted;
743
744   if (NULL == (qe = h->queue_head))
745   {
746     GNUNET_break (0);
747     do_disconnect (h);
748     return;
749   }
750   rc = qe->qc.sc;
751   if (msg == NULL)
752   {
753     was_transmitted = qe->was_transmitted;
754     free_queue_entry (qe);
755     if (was_transmitted == GNUNET_YES)
756       do_disconnect (h);
757     else
758       process_queue (h);
759     if (rc.cont != NULL)
760       rc.cont (rc.cont_cls, GNUNET_SYSERR,
761                GNUNET_TIME_UNIT_ZERO_ABS,
762                _("Failed to receive status response from database."));
763     return;
764   }
765   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
766   free_queue_entry (qe);
767   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
768       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
769   {
770     GNUNET_break (0);
771     h->retry_time = GNUNET_TIME_UNIT_ZERO;
772     do_disconnect (h);
773     if (rc.cont != NULL)
774       rc.cont (rc.cont_cls, GNUNET_SYSERR,
775                GNUNET_TIME_UNIT_ZERO_ABS,
776                _("Error reading response from datastore service"));
777     return;
778   }
779   sm = (const struct StatusMessage *) msg;
780   status = ntohl (sm->status);
781   emsg = NULL;
782   if (ntohs (msg->size) > sizeof (struct StatusMessage))
783   {
784     emsg = (const char *) &sm[1];
785     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
786     {
787       GNUNET_break (0);
788       emsg = _("Invalid error message received from datastore service");
789     }
790   }
791   if ((status == GNUNET_SYSERR) && (emsg == NULL))
792   {
793     GNUNET_break (0);
794     emsg = _("Invalid error message received from datastore service");
795   }
796   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
797   GNUNET_STATISTICS_update (h->stats,
798                             gettext_noop ("# status messages received"), 1,
799                             GNUNET_NO);
800   h->retry_time = GNUNET_TIME_UNIT_ZERO;
801   process_queue (h);
802   if (rc.cont != NULL)
803     rc.cont (rc.cont_cls, status,
804              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
805              emsg);
806 }
807
808
809 /**
810  * Store an item in the datastore.  If the item is already present,
811  * the priorities are summed up and the higher expiration time and
812  * lower anonymity level is used.
813  *
814  * @param h handle to the datastore
815  * @param rid reservation ID to use (from "reserve"); use 0 if no
816  *            prior reservation was made
817  * @param key key for the value
818  * @param size number of bytes in data
819  * @param data content stored
820  * @param type type of the content
821  * @param priority priority of the content
822  * @param anonymity anonymity-level for the content
823  * @param replication how often should the content be replicated to other peers?
824  * @param expiration expiration time for the content
825  * @param queue_priority ranking of this request in the priority queue
826  * @param max_queue_size at what queue size should this request be dropped
827  *        (if other requests of higher priority are in the queue)
828  * @param timeout timeout for the operation
829  * @param cont continuation to call when done
830  * @param cont_cls closure for cont
831  * @return NULL if the entry was not queued, otherwise a handle that can be used to
832  *         cancel; note that even if NULL is returned, the callback will be invoked
833  *         (or rather, will already have been invoked)
834  */
835 struct GNUNET_DATASTORE_QueueEntry *
836 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
837                       const struct GNUNET_HashCode * key, size_t size,
838                       const void *data, enum GNUNET_BLOCK_Type type,
839                       uint32_t priority, uint32_t anonymity,
840                       uint32_t replication,
841                       struct GNUNET_TIME_Absolute expiration,
842                       unsigned int queue_priority, unsigned int max_queue_size,
843                       struct GNUNET_TIME_Relative timeout,
844                       GNUNET_DATASTORE_ContinuationWithStatus cont,
845                       void *cont_cls)
846 {
847   struct GNUNET_DATASTORE_QueueEntry *qe;
848   struct DataMessage *dm;
849   size_t msize;
850   union QueueContext qc;
851
852   LOG (GNUNET_ERROR_TYPE_DEBUG,
853        "Asked to put %u bytes of data under key `%s' for %s\n", size,
854        GNUNET_h2s (key),
855        GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (expiration),
856                                                GNUNET_YES));
857   msize = sizeof (struct DataMessage) + size;
858   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
859   qc.sc.cont = cont;
860   qc.sc.cont_cls = cont_cls;
861   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
862                          &process_status_message, &qc);
863   if (qe == NULL)
864   {
865     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
866     return NULL;
867   }
868   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
869                             1, GNUNET_NO);
870   dm = (struct DataMessage *) &qe[1];
871   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
872   dm->header.size = htons (msize);
873   dm->rid = htonl (rid);
874   dm->size = htonl ((uint32_t) size);
875   dm->type = htonl (type);
876   dm->priority = htonl (priority);
877   dm->anonymity = htonl (anonymity);
878   dm->replication = htonl (replication);
879   dm->reserved = htonl (0);
880   dm->uid = GNUNET_htonll (0);
881   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
882   dm->key = *key;
883   memcpy (&dm[1], data, size);
884   process_queue (h);
885   return qe;
886 }
887
888
889 /**
890  * Reserve space in the datastore.  This function should be used
891  * to avoid "out of space" failures during a longer sequence of "put"
892  * operations (for example, when a file is being inserted).
893  *
894  * @param h handle to the datastore
895  * @param amount how much space (in bytes) should be reserved (for content only)
896  * @param entries how many entries will be created (to calculate per-entry overhead)
897  * @param queue_priority ranking of this request in the priority queue
898  * @param max_queue_size at what queue size should this request be dropped
899  *        (if other requests of higher priority are in the queue)
900  * @param timeout how long to wait at most for a response (or before dying in queue)
901  * @param cont continuation to call when done; "success" will be set to
902  *             a positive reservation value if space could be reserved.
903  * @param cont_cls closure for cont
904  * @return NULL if the entry was not queued, otherwise a handle that can be used to
905  *         cancel; note that even if NULL is returned, the callback will be invoked
906  *         (or rather, will already have been invoked)
907  */
908 struct GNUNET_DATASTORE_QueueEntry *
909 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
910                           uint32_t entries, unsigned int queue_priority,
911                           unsigned int max_queue_size,
912                           struct GNUNET_TIME_Relative timeout,
913                           GNUNET_DATASTORE_ContinuationWithStatus cont,
914                           void *cont_cls)
915 {
916   struct GNUNET_DATASTORE_QueueEntry *qe;
917   struct ReserveMessage *rm;
918   union QueueContext qc;
919
920   if (cont == NULL)
921     cont = &drop_status_cont;
922   LOG (GNUNET_ERROR_TYPE_DEBUG,
923        "Asked to reserve %llu bytes of data and %u entries\n",
924        (unsigned long long) amount, (unsigned int) entries);
925   qc.sc.cont = cont;
926   qc.sc.cont_cls = cont_cls;
927   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
928                          max_queue_size, timeout, &process_status_message, &qc);
929   if (qe == NULL)
930   {
931     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
932     return NULL;
933   }
934   GNUNET_STATISTICS_update (h->stats,
935                             gettext_noop ("# RESERVE requests executed"), 1,
936                             GNUNET_NO);
937   rm = (struct ReserveMessage *) &qe[1];
938   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
939   rm->header.size = htons (sizeof (struct ReserveMessage));
940   rm->entries = htonl (entries);
941   rm->amount = GNUNET_htonll (amount);
942   process_queue (h);
943   return qe;
944 }
945
946
947 /**
948  * Signal that all of the data for which a reservation was made has
949  * been stored and that whatever excess space might have been reserved
950  * can now be released.
951  *
952  * @param h handle to the datastore
953  * @param rid reservation ID (value of "success" in original continuation
954  *        from the "reserve" function).
955  * @param queue_priority ranking of this request in the priority queue
956  * @param max_queue_size at what queue size should this request be dropped
957  *        (if other requests of higher priority are in the queue)
958  * @param queue_priority ranking of this request in the priority queue
959  * @param max_queue_size at what queue size should this request be dropped
960  *        (if other requests of higher priority are in the queue)
961  * @param timeout how long to wait at most for a response
962  * @param cont continuation to call when done
963  * @param cont_cls closure for cont
964  * @return NULL if the entry was not queued, otherwise a handle that can be used to
965  *         cancel; note that even if NULL is returned, the callback will be invoked
966  *         (or rather, will already have been invoked)
967  */
968 struct GNUNET_DATASTORE_QueueEntry *
969 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
970                                   uint32_t rid, unsigned int queue_priority,
971                                   unsigned int max_queue_size,
972                                   struct GNUNET_TIME_Relative timeout,
973                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
974                                   void *cont_cls)
975 {
976   struct GNUNET_DATASTORE_QueueEntry *qe;
977   struct ReleaseReserveMessage *rrm;
978   union QueueContext qc;
979
980   if (cont == NULL)
981     cont = &drop_status_cont;
982   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
983   qc.sc.cont = cont;
984   qc.sc.cont_cls = cont_cls;
985   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
986                          queue_priority, max_queue_size, timeout,
987                          &process_status_message, &qc);
988   if (qe == NULL)
989   {
990     LOG (GNUNET_ERROR_TYPE_DEBUG,
991          "Could not create queue entry to release reserve\n");
992     return NULL;
993   }
994   GNUNET_STATISTICS_update (h->stats,
995                             gettext_noop
996                             ("# RELEASE RESERVE requests executed"), 1,
997                             GNUNET_NO);
998   rrm = (struct ReleaseReserveMessage *) &qe[1];
999   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1000   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1001   rrm->rid = htonl (rid);
1002   process_queue (h);
1003   return qe;
1004 }
1005
1006
1007 /**
1008  * Update a value in the datastore.
1009  *
1010  * @param h handle to the datastore
1011  * @param uid identifier for the value
1012  * @param priority how much to increase the priority of the value
1013  * @param expiration new expiration value should be MAX of existing and this argument
1014  * @param queue_priority ranking of this request in the priority queue
1015  * @param max_queue_size at what queue size should this request be dropped
1016  *        (if other requests of higher priority are in the queue)
1017  * @param timeout how long to wait at most for a response
1018  * @param cont continuation to call when done
1019  * @param cont_cls closure for cont
1020  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1021  *         cancel; note that even if NULL is returned, the callback will be invoked
1022  *         (or rather, will already have been invoked)
1023  */
1024 struct GNUNET_DATASTORE_QueueEntry *
1025 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1026                          uint32_t priority,
1027                          struct GNUNET_TIME_Absolute expiration,
1028                          unsigned int queue_priority,
1029                          unsigned int max_queue_size,
1030                          struct GNUNET_TIME_Relative timeout,
1031                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1032                          void *cont_cls)
1033 {
1034   struct GNUNET_DATASTORE_QueueEntry *qe;
1035   struct UpdateMessage *um;
1036   union QueueContext qc;
1037
1038   if (cont == NULL)
1039     cont = &drop_status_cont;
1040   LOG (GNUNET_ERROR_TYPE_DEBUG,
1041        "Asked to update entry %llu raising priority by %u and expiration to %s\n",
1042        uid,
1043        (unsigned int) priority,
1044        GNUNET_STRINGS_absolute_time_to_string (expiration));
1045   qc.sc.cont = cont;
1046   qc.sc.cont_cls = cont_cls;
1047   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1048                          max_queue_size, timeout, &process_status_message, &qc);
1049   if (qe == NULL)
1050   {
1051     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1052     return NULL;
1053   }
1054   GNUNET_STATISTICS_update (h->stats,
1055                             gettext_noop ("# UPDATE requests executed"), 1,
1056                             GNUNET_NO);
1057   um = (struct UpdateMessage *) &qe[1];
1058   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1059   um->header.size = htons (sizeof (struct UpdateMessage));
1060   um->priority = htonl (priority);
1061   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1062   um->uid = GNUNET_htonll (uid);
1063   process_queue (h);
1064   return qe;
1065 }
1066
1067
1068 /**
1069  * Explicitly remove some content from the database.
1070  * The "cont"inuation will be called with status
1071  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1072  * if no matching entry was found and "GNUNET_SYSERR"
1073  * on all other types of errors.
1074  *
1075  * @param h handle to the datastore
1076  * @param key key for the value
1077  * @param size number of bytes in data
1078  * @param data content stored
1079  * @param queue_priority ranking of this request in the priority queue
1080  * @param max_queue_size at what queue size should this request be dropped
1081  *        (if other requests of higher priority are in the queue)
1082  * @param timeout how long to wait at most for a response
1083  * @param cont continuation to call when done
1084  * @param cont_cls closure for cont
1085  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1086  *         cancel; note that even if NULL is returned, the callback will be invoked
1087  *         (or rather, will already have been invoked)
1088  */
1089 struct GNUNET_DATASTORE_QueueEntry *
1090 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1091                          const struct GNUNET_HashCode * key, size_t size,
1092                          const void *data, unsigned int queue_priority,
1093                          unsigned int max_queue_size,
1094                          struct GNUNET_TIME_Relative timeout,
1095                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1096                          void *cont_cls)
1097 {
1098   struct GNUNET_DATASTORE_QueueEntry *qe;
1099   struct DataMessage *dm;
1100   size_t msize;
1101   union QueueContext qc;
1102
1103   if (cont == NULL)
1104     cont = &drop_status_cont;
1105   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1106        size, GNUNET_h2s (key));
1107   qc.sc.cont = cont;
1108   qc.sc.cont_cls = cont_cls;
1109   msize = sizeof (struct DataMessage) + size;
1110   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1111   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1112                          &process_status_message, &qc);
1113   if (qe == NULL)
1114   {
1115     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1116     return NULL;
1117   }
1118   GNUNET_STATISTICS_update (h->stats,
1119                             gettext_noop ("# REMOVE requests executed"), 1,
1120                             GNUNET_NO);
1121   dm = (struct DataMessage *) &qe[1];
1122   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1123   dm->header.size = htons (msize);
1124   dm->rid = htonl (0);
1125   dm->size = htonl (size);
1126   dm->type = htonl (0);
1127   dm->priority = htonl (0);
1128   dm->anonymity = htonl (0);
1129   dm->uid = GNUNET_htonll (0);
1130   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1131   dm->key = *key;
1132   memcpy (&dm[1], data, size);
1133   process_queue (h);
1134   return qe;
1135 }
1136
1137
1138 /**
1139  * Type of a function to call when we receive a message
1140  * from the service.
1141  *
1142  * @param cls closure
1143  * @param msg message received, NULL on timeout or fatal error
1144  */
1145 static void
1146 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1147 {
1148   struct GNUNET_DATASTORE_Handle *h = cls;
1149   struct GNUNET_DATASTORE_QueueEntry *qe;
1150   struct ResultContext rc;
1151   const struct DataMessage *dm;
1152   int was_transmitted;
1153
1154   if (NULL == msg)
1155   {
1156     qe = h->queue_head;
1157     GNUNET_assert (NULL != qe);
1158     rc = qe->qc.rc;
1159     was_transmitted = qe->was_transmitted;
1160     free_queue_entry (qe);
1161     if (GNUNET_YES == was_transmitted)
1162     {
1163       LOG (GNUNET_ERROR_TYPE_WARNING,
1164            _("Failed to receive response from database.\n"));
1165       do_disconnect (h);
1166     }
1167     else
1168     {
1169       process_queue (h);
1170     }
1171     if (NULL != rc.proc)
1172       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1173                0);
1174     return;
1175   }
1176   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1177   {
1178     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1179     qe = h->queue_head;
1180     rc = qe->qc.rc;
1181     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1182     free_queue_entry (qe);
1183     LOG (GNUNET_ERROR_TYPE_DEBUG,
1184          "Received end of result set, new queue size is %u\n", h->queue_size);
1185     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1186     h->result_count = 0;
1187     process_queue (h);
1188     if (rc.proc != NULL)
1189       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1190                0);
1191     return;
1192   }
1193   qe = h->queue_head;
1194   GNUNET_assert (NULL != qe);
1195   rc = qe->qc.rc;
1196   if (GNUNET_YES != qe->was_transmitted)
1197   {
1198     GNUNET_break (0);
1199     free_queue_entry (qe);
1200     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1201     do_disconnect (h);
1202     if (rc.proc != NULL)
1203       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1204                0);
1205     return;
1206   }
1207   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1208       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1209       (ntohs (msg->size) !=
1210        sizeof (struct DataMessage) +
1211        ntohl (((const struct DataMessage *) msg)->size)))
1212   {
1213     GNUNET_break (0);
1214     free_queue_entry (qe);
1215     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1216     do_disconnect (h);
1217     if (rc.proc != NULL)
1218       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1219                0);
1220     return;
1221   }
1222 #if INSANE_STATISTICS
1223   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1224                             GNUNET_NO);
1225 #endif
1226   dm = (const struct DataMessage *) msg;
1227   LOG (GNUNET_ERROR_TYPE_DEBUG,
1228        "Received result %llu with type %u and size %u with key %s\n",
1229        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1230        ntohl (dm->size), GNUNET_h2s (&dm->key));
1231   free_queue_entry (qe);
1232   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1233   process_queue (h);
1234   if (rc.proc != NULL)
1235     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1236              ntohl (dm->priority), ntohl (dm->anonymity),
1237              GNUNET_TIME_absolute_ntoh (dm->expiration),
1238              GNUNET_ntohll (dm->uid));
1239 }
1240
1241
1242 /**
1243  * Get a random value from the datastore for content replication.
1244  * Returns a single, random value among those with the highest
1245  * replication score, lowering positive replication scores by one for
1246  * the chosen value (if only content with a replication score exists,
1247  * a random value is returned and replication scores are not changed).
1248  *
1249  * @param h handle to the datastore
1250  * @param queue_priority ranking of this request in the priority queue
1251  * @param max_queue_size at what queue size should this request be dropped
1252  *        (if other requests of higher priority are in the queue)
1253  * @param timeout how long to wait at most for a response
1254  * @param proc function to call on a random value; it
1255  *        will be called once with a value (if available)
1256  *        and always once with a value of NULL.
1257  * @param proc_cls closure for proc
1258  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1259  *         cancel
1260  */
1261 struct GNUNET_DATASTORE_QueueEntry *
1262 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1263                                       unsigned int queue_priority,
1264                                       unsigned int max_queue_size,
1265                                       struct GNUNET_TIME_Relative timeout,
1266                                       GNUNET_DATASTORE_DatumProcessor proc,
1267                                       void *proc_cls)
1268 {
1269   struct GNUNET_DATASTORE_QueueEntry *qe;
1270   struct GNUNET_MessageHeader *m;
1271   union QueueContext qc;
1272
1273   GNUNET_assert (NULL != proc);
1274   LOG (GNUNET_ERROR_TYPE_DEBUG,
1275        "Asked to get replication entry in %s\n",
1276        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1277   qc.rc.proc = proc;
1278   qc.rc.proc_cls = proc_cls;
1279   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1280                          queue_priority, max_queue_size, timeout,
1281                          &process_result_message, &qc);
1282   if (qe == NULL)
1283   {
1284     LOG (GNUNET_ERROR_TYPE_DEBUG,
1285          "Could not create queue entry for GET REPLICATION\n");
1286     return NULL;
1287   }
1288   GNUNET_STATISTICS_update (h->stats,
1289                             gettext_noop
1290                             ("# GET REPLICATION requests executed"), 1,
1291                             GNUNET_NO);
1292   m = (struct GNUNET_MessageHeader *) &qe[1];
1293   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1294   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1295   process_queue (h);
1296   return qe;
1297 }
1298
1299
1300 /**
1301  * Get a single zero-anonymity value from the datastore.
1302  *
1303  * @param h handle to the datastore
1304  * @param offset offset of the result (modulo num-results); set to
1305  *               a random 64-bit value initially; then increment by
1306  *               one each time; detect that all results have been found by uid
1307  *               being again the first uid ever returned.
1308  * @param queue_priority ranking of this request in the priority queue
1309  * @param max_queue_size at what queue size should this request be dropped
1310  *        (if other requests of higher priority are in the queue)
1311  * @param timeout how long to wait at most for a response
1312  * @param type allowed type for the operation (never zero)
1313  * @param proc function to call on a random value; it
1314  *        will be called once with a value (if available)
1315  *        or with NULL if none value exists.
1316  * @param proc_cls closure for proc
1317  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1318  *         cancel
1319  */
1320 struct GNUNET_DATASTORE_QueueEntry *
1321 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1322                                      uint64_t offset,
1323                                      unsigned int queue_priority,
1324                                      unsigned int max_queue_size,
1325                                      struct GNUNET_TIME_Relative timeout,
1326                                      enum GNUNET_BLOCK_Type type,
1327                                      GNUNET_DATASTORE_DatumProcessor proc,
1328                                      void *proc_cls)
1329 {
1330   struct GNUNET_DATASTORE_QueueEntry *qe;
1331   struct GetZeroAnonymityMessage *m;
1332   union QueueContext qc;
1333
1334   GNUNET_assert (NULL != proc);
1335   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1336   LOG (GNUNET_ERROR_TYPE_DEBUG,
1337        "Asked to get %llu-th zero-anonymity entry of type %d in %s\n",
1338        (unsigned long long) offset, type,
1339        GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_YES));
1340   qc.rc.proc = proc;
1341   qc.rc.proc_cls = proc_cls;
1342   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1343                          queue_priority, max_queue_size, timeout,
1344                          &process_result_message, &qc);
1345   if (qe == NULL)
1346   {
1347     LOG (GNUNET_ERROR_TYPE_DEBUG,
1348          "Could not create queue entry for zero-anonymity procation\n");
1349     return NULL;
1350   }
1351   GNUNET_STATISTICS_update (h->stats,
1352                             gettext_noop
1353                             ("# GET ZERO ANONYMITY requests executed"), 1,
1354                             GNUNET_NO);
1355   m = (struct GetZeroAnonymityMessage *) &qe[1];
1356   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1357   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1358   m->type = htonl ((uint32_t) type);
1359   m->offset = GNUNET_htonll (offset);
1360   process_queue (h);
1361   return qe;
1362 }
1363
1364
1365 /**
1366  * Get a result for a particular key from the datastore.  The processor
1367  * will only be called once.
1368  *
1369  * @param h handle to the datastore
1370  * @param offset offset of the result (modulo num-results); set to
1371  *               a random 64-bit value initially; then increment by
1372  *               one each time; detect that all results have been found by uid
1373  *               being again the first uid ever returned.
1374  * @param key maybe NULL (to match all entries)
1375  * @param type desired type, 0 for any
1376  * @param queue_priority ranking of this request in the priority queue
1377  * @param max_queue_size at what queue size should this request be dropped
1378  *        (if other requests of higher priority are in the queue)
1379  * @param timeout how long to wait at most for a response
1380  * @param proc function to call on each matching value;
1381  *        will be called once with a NULL value at the end
1382  * @param proc_cls closure for proc
1383  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1384  *         cancel
1385  */
1386 struct GNUNET_DATASTORE_QueueEntry *
1387 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1388                           const struct GNUNET_HashCode * key,
1389                           enum GNUNET_BLOCK_Type type,
1390                           unsigned int queue_priority,
1391                           unsigned int max_queue_size,
1392                           struct GNUNET_TIME_Relative timeout,
1393                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1394 {
1395   struct GNUNET_DATASTORE_QueueEntry *qe;
1396   struct GetMessage *gm;
1397   union QueueContext qc;
1398
1399   GNUNET_assert (NULL != proc);
1400   LOG (GNUNET_ERROR_TYPE_DEBUG,
1401        "Asked to look for data of type %u under key `%s'\n",
1402        (unsigned int) type, GNUNET_h2s (key));
1403   qc.rc.proc = proc;
1404   qc.rc.proc_cls = proc_cls;
1405   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1406                          max_queue_size, timeout, &process_result_message, &qc);
1407   if (qe == NULL)
1408   {
1409     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1410          GNUNET_h2s (key));
1411     return NULL;
1412   }
1413 #if INSANE_STATISTICS
1414   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1415                             1, GNUNET_NO);
1416 #endif
1417   gm = (struct GetMessage *) &qe[1];
1418   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1419   gm->type = htonl (type);
1420   gm->offset = GNUNET_htonll (offset);
1421   if (key != NULL)
1422   {
1423     gm->header.size = htons (sizeof (struct GetMessage));
1424     gm->key = *key;
1425   }
1426   else
1427   {
1428     gm->header.size =
1429         htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1430   }
1431   process_queue (h);
1432   return qe;
1433 }
1434
1435
1436 /**
1437  * Cancel a datastore operation.  The final callback from the
1438  * operation must not have been done yet.
1439  *
1440  * @param qe operation to cancel
1441  */
1442 void
1443 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1444 {
1445   struct GNUNET_DATASTORE_Handle *h;
1446
1447   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1448   h = qe->h;
1449   LOG (GNUNET_ERROR_TYPE_DEBUG,
1450        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1451        qe->was_transmitted, h->queue_head == qe);
1452   if (GNUNET_YES == qe->was_transmitted)
1453   {
1454     free_queue_entry (qe);
1455     h->skip_next_messages++;
1456     return;
1457   }
1458   free_queue_entry (qe);
1459   process_queue (h);
1460 }
1461
1462
1463 /* end of datastore_api.c */