- simplify parameters
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * Collect an instane number of statistics?  May cause excessive IPC.
38  */
39 #define INSANE_STATISTICS GNUNET_NO
40
41 /**
42  * If a client stopped asking for more results, how many more do
43  * we receive from the DB before killing the connection?  Trade-off
44  * between re-doing TCP handshakes and (needlessly) receiving
45  * useless results.
46  */
47 #define MAX_EXCESS_RESULTS 8
48
49 /**
50  * Context for processing status messages.
51  */
52 struct StatusContext
53 {
54   /**
55    * Continuation to call with the status.
56    */
57   GNUNET_DATASTORE_ContinuationWithStatus cont;
58
59   /**
60    * Closure for cont.
61    */
62   void *cont_cls;
63
64 };
65
66
67 /**
68  * Context for processing result messages.
69  */
70 struct ResultContext
71 {
72   /**
73    * Function to call with the result.
74    */
75   GNUNET_DATASTORE_DatumProcessor proc;
76
77   /**
78    * Closure for proc.
79    */
80   void *proc_cls;
81
82 };
83
84
85 /**
86  *  Context for a queue operation.
87  */
88 union QueueContext
89 {
90
91   struct StatusContext sc;
92
93   struct ResultContext rc;
94
95 };
96
97
98
99 /**
100  * Entry in our priority queue.
101  */
102 struct GNUNET_DATASTORE_QueueEntry
103 {
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *next;
109
110   /**
111    * This is a linked list.
112    */
113   struct GNUNET_DATASTORE_QueueEntry *prev;
114
115   /**
116    * Handle to the master context.
117    */
118   struct GNUNET_DATASTORE_Handle *h;
119
120   /**
121    * Response processor (NULL if we are not waiting for a response).
122    * This struct should be used for the closure, function-specific
123    * arguments can be passed via 'qc'.
124    */
125   GNUNET_CLIENT_MessageHandler response_proc;
126
127   /**
128    * Function to call after transmission of the request.
129    */
130   GNUNET_DATASTORE_ContinuationWithStatus cont;
131
132   /**
133    * Closure for 'cont'.
134    */
135   void *cont_cls;
136
137   /**
138    * Context for the operation.
139    */
140   union QueueContext qc;
141
142   /**
143    * Task for timeout signalling.
144    */
145   GNUNET_SCHEDULER_TaskIdentifier task;
146
147   /**
148    * Timeout for the current operation.
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Priority in the queue.
154    */
155   unsigned int priority;
156
157   /**
158    * Maximum allowed length of queue (otherwise
159    * this request should be discarded).
160    */
161   unsigned int max_queue;
162
163   /**
164    * Number of bytes in the request message following
165    * this struct.  32-bit value for nicer memory
166    * access (and overall struct alignment).
167    */
168   uint32_t message_size;
169
170   /**
171    * Has this message been transmitted to the service?
172    * Only ever GNUNET_YES for the head of the queue.
173    * Note that the overall struct should end at a
174    * multiple of 64 bits.
175    */
176   int was_transmitted;
177
178 };
179
180 /**
181  * Handle to the datastore service.
182  */
183 struct GNUNET_DATASTORE_Handle
184 {
185
186   /**
187    * Our configuration.
188    */
189   const struct GNUNET_CONFIGURATION_Handle *cfg;
190
191   /**
192    * Current connection to the datastore service.
193    */
194   struct GNUNET_CLIENT_Connection *client;
195
196   /**
197    * Handle for statistics.
198    */
199   struct GNUNET_STATISTICS_Handle *stats;
200
201   /**
202    * Current transmit handle.
203    */
204   struct GNUNET_CLIENT_TransmitHandle *th;
205
206   /**
207    * Current head of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_head;
210
211   /**
212    * Current tail of priority queue.
213    */
214   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
215
216   /**
217    * Task for trying to reconnect.
218    */
219   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
220
221   /**
222    * How quickly should we retry?  Used for exponential back-off on
223    * connect-errors.
224    */
225   struct GNUNET_TIME_Relative retry_time;
226
227   /**
228    * Number of entries in the queue.
229    */
230   unsigned int queue_size;
231
232   /**
233    * Number of results we're receiving for the current query
234    * after application stopped to care.  Used to determine when
235    * to reset the connection.
236    */
237   unsigned int result_count;
238
239   /**
240    * Are we currently trying to receive from the service?
241    */
242   int in_receive;
243
244   /**
245    * We should ignore the next message(s) from the service.
246    */
247   unsigned int skip_next_messages;
248
249 };
250
251
252
253 /**
254  * Connect to the datastore service.
255  *
256  * @param cfg configuration to use
257  * @return handle to use to access the service
258  */
259 struct GNUNET_DATASTORE_Handle *
260 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
261 {
262   struct GNUNET_CLIENT_Connection *c;
263   struct GNUNET_DATASTORE_Handle *h;
264
265   c = GNUNET_CLIENT_connect ("datastore", cfg);
266   if (c == NULL)
267     return NULL;                /* oops */
268   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
269                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
270   h->client = c;
271   h->cfg = cfg;
272   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
273   return h;
274 }
275
276
277 /**
278  * Task used by 'transmit_drop' to disconnect the datastore.
279  *
280  * @param cls the datastore handle
281  * @param tc scheduler context
282  */
283 static void
284 disconnect_after_drop (void *cls,
285                        const struct GNUNET_SCHEDULER_TaskContext *tc)
286 {
287   struct GNUNET_DATASTORE_Handle *h = cls;
288
289   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290 }
291
292
293 /**
294  * Transmit DROP message to datastore service.
295  *
296  * @param cls the 'struct GNUNET_DATASTORE_Handle'
297  * @param size number of bytes that can be copied to buf
298  * @param buf where to copy the drop message
299  * @return number of bytes written to buf
300  */
301 static size_t
302 transmit_drop (void *cls, size_t size, void *buf)
303 {
304   struct GNUNET_DATASTORE_Handle *h = cls;
305   struct GNUNET_MessageHeader *hdr;
306
307   if (buf == NULL)
308   {
309     LOG (GNUNET_ERROR_TYPE_WARNING,
310          _("Failed to transmit request to drop database.\n"));
311     GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
312                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
313     return 0;
314   }
315   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
316   hdr = buf;
317   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
318   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
319   GNUNET_SCHEDULER_add_continuation (&disconnect_after_drop, h,
320                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
321   return sizeof (struct GNUNET_MessageHeader);
322 }
323
324
325 /**
326  * Disconnect from the datastore service (and free
327  * associated resources).
328  *
329  * @param h handle to the datastore
330  * @param drop set to GNUNET_YES to delete all data in datastore (!)
331  */
332 void
333 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
334 {
335   struct GNUNET_DATASTORE_QueueEntry *qe;
336
337   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
338   if (NULL != h->th)
339   {
340     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
341     h->th = NULL;
342   }
343   if (h->client != NULL)
344   {
345     GNUNET_CLIENT_disconnect (h->client);
346     h->client = NULL;
347   }
348   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
349   {
350     GNUNET_SCHEDULER_cancel (h->reconnect_task);
351     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
352   }
353   while (NULL != (qe = h->queue_head))
354   {
355     GNUNET_assert (NULL != qe->response_proc);
356     qe->response_proc (h, NULL);
357   }
358   if (GNUNET_YES == drop)
359   {
360     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
361     if (h->client != NULL)
362     {
363       if (NULL !=
364           GNUNET_CLIENT_notify_transmit_ready (h->client,
365                                                sizeof (struct
366                                                        GNUNET_MessageHeader),
367                                                GNUNET_TIME_UNIT_MINUTES,
368                                                GNUNET_YES, &transmit_drop, h))
369         return;
370       GNUNET_CLIENT_disconnect (h->client);
371       h->client = NULL;
372     }
373     GNUNET_break (0);
374   }
375   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
376   h->stats = NULL;
377   GNUNET_free (h);
378 }
379
380
381 /**
382  * A request has timed out (before being transmitted to the service).
383  *
384  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
385  * @param tc scheduler context
386  */
387 static void
388 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
389 {
390   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
391
392   GNUNET_STATISTICS_update (qe->h->stats,
393                             gettext_noop ("# queue entry timeouts"), 1,
394                             GNUNET_NO);
395   qe->task = GNUNET_SCHEDULER_NO_TASK;
396   GNUNET_assert (GNUNET_NO == qe->was_transmitted);
397   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
398   qe->response_proc (qe->h, NULL);
399 }
400
401
402 /**
403  * Create a new entry for our priority queue (and possibly discard other entires if
404  * the queue is getting too long).
405  *
406  * @param h handle to the datastore
407  * @param msize size of the message to queue
408  * @param queue_priority priority of the entry
409  * @param max_queue_size at what queue size should this request be dropped
410  *        (if other requests of higher priority are in the queue)
411  * @param timeout timeout for the operation
412  * @param response_proc function to call with replies (can be NULL)
413  * @param qc client context (NOT a closure for response_proc)
414  * @return NULL if the queue is full
415  */
416 static struct GNUNET_DATASTORE_QueueEntry *
417 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
418                   unsigned int queue_priority, unsigned int max_queue_size,
419                   struct GNUNET_TIME_Relative timeout,
420                   GNUNET_CLIENT_MessageHandler response_proc,
421                   const union QueueContext *qc)
422 {
423   struct GNUNET_DATASTORE_QueueEntry *ret;
424   struct GNUNET_DATASTORE_QueueEntry *pos;
425   unsigned int c;
426
427   c = 0;
428   pos = h->queue_head;
429   while ((pos != NULL) && (c < max_queue_size) &&
430          (pos->priority >= queue_priority))
431   {
432     c++;
433     pos = pos->next;
434   }
435   if (c >= max_queue_size)
436   {
437     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
438                               GNUNET_NO);
439     return NULL;
440   }
441   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
442   ret->h = h;
443   ret->response_proc = response_proc;
444   ret->qc = *qc;
445   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
446   ret->priority = queue_priority;
447   ret->max_queue = max_queue_size;
448   ret->message_size = msize;
449   ret->was_transmitted = GNUNET_NO;
450   if (pos == NULL)
451   {
452     /* append at the tail */
453     pos = h->queue_tail;
454   }
455   else
456   {
457     pos = pos->prev;
458     /* do not insert at HEAD if HEAD query was already
459      * transmitted and we are still receiving replies! */
460     if ((pos == NULL) && (h->queue_head->was_transmitted))
461       pos = h->queue_head;
462   }
463   c++;
464 #if INSANE_STATISTICS
465   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
466                             1, GNUNET_NO);
467 #endif
468   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
469   h->queue_size++;
470   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
471   for (pos = ret->next; NULL != pos; pos = pos->next)
472   {
473     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
474     {
475       GNUNET_assert (pos->response_proc != NULL);
476       /* move 'pos' element to head so that it will be
477        * killed on 'NULL' call below */
478       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");     
479       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
480       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
481       GNUNET_STATISTICS_update (h->stats,
482                                 gettext_noop
483                                 ("# Requests dropped from datastore queue"), 1,
484                                 GNUNET_NO);
485       GNUNET_assert (h->queue_head == pos);
486       pos->response_proc (h, NULL);
487       break;
488     }
489   }
490   return ret;
491 }
492
493
494 /**
495  * Process entries in the queue (or do nothing if we are already
496  * doing so).
497  *
498  * @param h handle to the datastore
499  */
500 static void
501 process_queue (struct GNUNET_DATASTORE_Handle *h);
502
503
504 /**
505  * Try reconnecting to the datastore service.
506  *
507  * @param cls the 'struct GNUNET_DATASTORE_Handle'
508  * @param tc scheduler context
509  */
510 static void
511 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
512 {
513   struct GNUNET_DATASTORE_Handle *h = cls;
514
515   h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
516   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
517   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
518   if (h->client == NULL)
519   {
520     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
521     return;
522   }
523   GNUNET_STATISTICS_update (h->stats,
524                             gettext_noop
525                             ("# datastore connections (re)created"), 1,
526                             GNUNET_NO);
527   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
528   process_queue (h);
529 }
530
531
532 /**
533  * Disconnect from the service and then try reconnecting to the datastore service
534  * after some delay.
535  *
536  * @param h handle to datastore to disconnect and reconnect
537  */
538 static void
539 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
540 {
541   if (h->client == NULL)
542   {
543     LOG (GNUNET_ERROR_TYPE_DEBUG,
544          "client NULL in disconnect, will not try to reconnect\n");
545     return;
546   }
547   GNUNET_CLIENT_disconnect (h->client);
548   h->skip_next_messages = 0;
549   h->client = NULL;
550   h->reconnect_task =
551       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
552 }
553
554
555 /**
556  * Function called whenever we receive a message from
557  * the service.  Calls the appropriate handler.
558  *
559  * @param cls the 'struct GNUNET_DATASTORE_Handle'
560  * @param msg the received message
561  */
562 static void
563 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
564 {
565   struct GNUNET_DATASTORE_Handle *h = cls;
566   struct GNUNET_DATASTORE_QueueEntry *qe;
567
568   h->in_receive = GNUNET_NO;
569   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
570   if (h->skip_next_messages > 0)
571   {
572     h->skip_next_messages--;
573     process_queue (h);
574     return;
575   }
576   if (NULL == (qe = h->queue_head))
577   {
578     GNUNET_break (0);
579     process_queue (h);
580     return;
581   }
582   qe->response_proc (h, msg);
583 }
584
585
586 /**
587  * Transmit request from queue to datastore service.
588  *
589  * @param cls the 'struct GNUNET_DATASTORE_Handle'
590  * @param size number of bytes that can be copied to buf
591  * @param buf where to copy the drop message
592  * @return number of bytes written to buf
593  */
594 static size_t
595 transmit_request (void *cls, size_t size, void *buf)
596 {
597   struct GNUNET_DATASTORE_Handle *h = cls;
598   struct GNUNET_DATASTORE_QueueEntry *qe;
599   size_t msize;
600
601   h->th = NULL;
602   if (NULL == (qe = h->queue_head))
603     return 0;                   /* no entry in queue */
604   if (buf == NULL)
605   {
606     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
607     GNUNET_STATISTICS_update (h->stats,
608                               gettext_noop ("# transmission request failures"),
609                               1, GNUNET_NO);
610     do_disconnect (h);
611     return 0;
612   }
613   if (size < (msize = qe->message_size))
614   {
615     process_queue (h);
616     return 0;
617   }
618   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
619        msize);
620   memcpy (buf, &qe[1], msize);
621   qe->was_transmitted = GNUNET_YES;
622   GNUNET_SCHEDULER_cancel (qe->task);
623   qe->task = GNUNET_SCHEDULER_NO_TASK;
624   GNUNET_assert (GNUNET_NO == h->in_receive);
625   h->in_receive = GNUNET_YES;
626   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
627                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
628 #if INSANE_STATISTICS
629   GNUNET_STATISTICS_update (h->stats,
630                             gettext_noop ("# bytes sent to datastore"), 1,
631                             GNUNET_NO);
632 #endif
633   return msize;
634 }
635
636
637 /**
638  * Process entries in the queue (or do nothing if we are already
639  * doing so).
640  *
641  * @param h handle to the datastore
642  */
643 static void
644 process_queue (struct GNUNET_DATASTORE_Handle *h)
645 {
646   struct GNUNET_DATASTORE_QueueEntry *qe;
647
648   if (NULL == (qe = h->queue_head))
649   {
650     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
651     return;                     /* no entry in queue */
652   }
653   if (qe->was_transmitted == GNUNET_YES)
654   {
655     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
656     return;                     /* waiting for replies */
657   }
658   if (h->th != NULL)
659   {
660     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
661     return;                     /* request pending */
662   }
663   if (h->client == NULL)
664   {
665     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
666     return;                     /* waiting for reconnect */
667   }
668   if (GNUNET_YES == h->in_receive)
669   {
670     /* wait for response to previous query */
671     return;
672   }
673   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
674        qe->message_size);
675   h->th =
676       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
677                                            GNUNET_TIME_absolute_get_remaining
678                                            (qe->timeout), GNUNET_YES,
679                                            &transmit_request, h);
680   GNUNET_assert (GNUNET_NO == h->in_receive);
681   GNUNET_break (NULL != h->th);
682 }
683
684
685 /**
686  * Dummy continuation used to do nothing (but be non-zero).
687  *
688  * @param cls closure
689  * @param result result
690  * @param min_expiration expiration time
691  * @param emsg error message
692  */
693 static void
694 drop_status_cont (void *cls, int32_t result, 
695                   struct GNUNET_TIME_Absolute min_expiration,
696                   const char *emsg)
697 {
698   /* do nothing */
699 }
700
701
702 /**
703  * Free a queue entry.  Removes the given entry from the
704  * queue and releases associated resources.  Does NOT
705  * call the callback.
706  *
707  * @param qe entry to free.
708  */
709 static void
710 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
711 {
712   struct GNUNET_DATASTORE_Handle *h = qe->h;
713
714   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
715   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
716   {
717     GNUNET_SCHEDULER_cancel (qe->task);
718     qe->task = GNUNET_SCHEDULER_NO_TASK;
719   }
720   h->queue_size--;
721   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
722   GNUNET_free (qe);
723 }
724
725
726 /**
727  * Type of a function to call when we receive a message
728  * from the service.
729  *
730  * @param cls closure
731  * @param msg message received, NULL on timeout or fatal error
732  */
733 static void
734 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
735 {
736   struct GNUNET_DATASTORE_Handle *h = cls;
737   struct GNUNET_DATASTORE_QueueEntry *qe;
738   struct StatusContext rc;
739   const struct StatusMessage *sm;
740   const char *emsg;
741   int32_t status;
742   int was_transmitted;
743
744   if (NULL == (qe = h->queue_head))
745   {
746     GNUNET_break (0);
747     do_disconnect (h);
748     return;
749   }
750   rc = qe->qc.sc;
751   if (msg == NULL)
752   {
753     was_transmitted = qe->was_transmitted;
754     free_queue_entry (qe);
755     if (was_transmitted == GNUNET_YES)
756       do_disconnect (h);
757     else
758       process_queue (h);
759     if (rc.cont != NULL)
760       rc.cont (rc.cont_cls, GNUNET_SYSERR,
761                GNUNET_TIME_UNIT_ZERO_ABS,
762                _("Failed to receive status response from database."));
763     return;
764   }
765   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
766   free_queue_entry (qe);
767   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
768       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
769   {
770     GNUNET_break (0);
771     h->retry_time = GNUNET_TIME_UNIT_ZERO;
772     do_disconnect (h);
773     if (rc.cont != NULL)
774       rc.cont (rc.cont_cls, GNUNET_SYSERR,
775                GNUNET_TIME_UNIT_ZERO_ABS,
776                _("Error reading response from datastore service"));
777     return;
778   }
779   sm = (const struct StatusMessage *) msg;
780   status = ntohl (sm->status);
781   emsg = NULL;
782   if (ntohs (msg->size) > sizeof (struct StatusMessage))
783   {
784     emsg = (const char *) &sm[1];
785     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
786     {
787       GNUNET_break (0);
788       emsg = _("Invalid error message received from datastore service");
789     }
790   }
791   if ((status == GNUNET_SYSERR) && (emsg == NULL))
792   {
793     GNUNET_break (0);
794     emsg = _("Invalid error message received from datastore service");
795   }
796   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
797   GNUNET_STATISTICS_update (h->stats,
798                             gettext_noop ("# status messages received"), 1,
799                             GNUNET_NO);
800   h->retry_time = GNUNET_TIME_UNIT_ZERO;
801   process_queue (h);
802   if (rc.cont != NULL)
803     rc.cont (rc.cont_cls, status, 
804              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
805              emsg);
806 }
807
808
809 /**
810  * Store an item in the datastore.  If the item is already present,
811  * the priorities are summed up and the higher expiration time and
812  * lower anonymity level is used.
813  *
814  * @param h handle to the datastore
815  * @param rid reservation ID to use (from "reserve"); use 0 if no
816  *            prior reservation was made
817  * @param key key for the value
818  * @param size number of bytes in data
819  * @param data content stored
820  * @param type type of the content
821  * @param priority priority of the content
822  * @param anonymity anonymity-level for the content
823  * @param replication how often should the content be replicated to other peers?
824  * @param expiration expiration time for the content
825  * @param queue_priority ranking of this request in the priority queue
826  * @param max_queue_size at what queue size should this request be dropped
827  *        (if other requests of higher priority are in the queue)
828  * @param timeout timeout for the operation
829  * @param cont continuation to call when done
830  * @param cont_cls closure for cont
831  * @return NULL if the entry was not queued, otherwise a handle that can be used to
832  *         cancel; note that even if NULL is returned, the callback will be invoked
833  *         (or rather, will already have been invoked)
834  */
835 struct GNUNET_DATASTORE_QueueEntry *
836 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
837                       const struct GNUNET_HashCode * key, size_t size,
838                       const void *data, enum GNUNET_BLOCK_Type type,
839                       uint32_t priority, uint32_t anonymity,
840                       uint32_t replication,
841                       struct GNUNET_TIME_Absolute expiration,
842                       unsigned int queue_priority, unsigned int max_queue_size,
843                       struct GNUNET_TIME_Relative timeout,
844                       GNUNET_DATASTORE_ContinuationWithStatus cont,
845                       void *cont_cls)
846 {
847   struct GNUNET_DATASTORE_QueueEntry *qe;
848   struct DataMessage *dm;
849   size_t msize;
850   union QueueContext qc;
851
852   LOG (GNUNET_ERROR_TYPE_DEBUG,
853        "Asked to put %u bytes of data under key `%s' for %llu ms\n", size,
854        GNUNET_h2s (key),
855        GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
856   msize = sizeof (struct DataMessage) + size;
857   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
858   qc.sc.cont = cont;
859   qc.sc.cont_cls = cont_cls;
860   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
861                          &process_status_message, &qc);
862   if (qe == NULL)
863   {
864     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
865     return NULL;
866   }
867   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
868                             1, GNUNET_NO);
869   dm = (struct DataMessage *) &qe[1];
870   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
871   dm->header.size = htons (msize);
872   dm->rid = htonl (rid);
873   dm->size = htonl ((uint32_t) size);
874   dm->type = htonl (type);
875   dm->priority = htonl (priority);
876   dm->anonymity = htonl (anonymity);
877   dm->replication = htonl (replication);
878   dm->reserved = htonl (0);
879   dm->uid = GNUNET_htonll (0);
880   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
881   dm->key = *key;
882   memcpy (&dm[1], data, size);
883   process_queue (h);
884   return qe;
885 }
886
887
888 /**
889  * Reserve space in the datastore.  This function should be used
890  * to avoid "out of space" failures during a longer sequence of "put"
891  * operations (for example, when a file is being inserted).
892  *
893  * @param h handle to the datastore
894  * @param amount how much space (in bytes) should be reserved (for content only)
895  * @param entries how many entries will be created (to calculate per-entry overhead)
896  * @param queue_priority ranking of this request in the priority queue
897  * @param max_queue_size at what queue size should this request be dropped
898  *        (if other requests of higher priority are in the queue)
899  * @param timeout how long to wait at most for a response (or before dying in queue)
900  * @param cont continuation to call when done; "success" will be set to
901  *             a positive reservation value if space could be reserved.
902  * @param cont_cls closure for cont
903  * @return NULL if the entry was not queued, otherwise a handle that can be used to
904  *         cancel; note that even if NULL is returned, the callback will be invoked
905  *         (or rather, will already have been invoked)
906  */
907 struct GNUNET_DATASTORE_QueueEntry *
908 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
909                           uint32_t entries, unsigned int queue_priority,
910                           unsigned int max_queue_size,
911                           struct GNUNET_TIME_Relative timeout,
912                           GNUNET_DATASTORE_ContinuationWithStatus cont,
913                           void *cont_cls)
914 {
915   struct GNUNET_DATASTORE_QueueEntry *qe;
916   struct ReserveMessage *rm;
917   union QueueContext qc;
918
919   if (cont == NULL)
920     cont = &drop_status_cont;
921   LOG (GNUNET_ERROR_TYPE_DEBUG,
922        "Asked to reserve %llu bytes of data and %u entries\n",
923        (unsigned long long) amount, (unsigned int) entries);
924   qc.sc.cont = cont;
925   qc.sc.cont_cls = cont_cls;
926   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
927                          max_queue_size, timeout, &process_status_message, &qc);
928   if (qe == NULL)
929   {
930     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
931     return NULL;
932   }
933   GNUNET_STATISTICS_update (h->stats,
934                             gettext_noop ("# RESERVE requests executed"), 1,
935                             GNUNET_NO);
936   rm = (struct ReserveMessage *) &qe[1];
937   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
938   rm->header.size = htons (sizeof (struct ReserveMessage));
939   rm->entries = htonl (entries);
940   rm->amount = GNUNET_htonll (amount);
941   process_queue (h);
942   return qe;
943 }
944
945
946 /**
947  * Signal that all of the data for which a reservation was made has
948  * been stored and that whatever excess space might have been reserved
949  * can now be released.
950  *
951  * @param h handle to the datastore
952  * @param rid reservation ID (value of "success" in original continuation
953  *        from the "reserve" function).
954  * @param queue_priority ranking of this request in the priority queue
955  * @param max_queue_size at what queue size should this request be dropped
956  *        (if other requests of higher priority are in the queue)
957  * @param queue_priority ranking of this request in the priority queue
958  * @param max_queue_size at what queue size should this request be dropped
959  *        (if other requests of higher priority are in the queue)
960  * @param timeout how long to wait at most for a response
961  * @param cont continuation to call when done
962  * @param cont_cls closure for cont
963  * @return NULL if the entry was not queued, otherwise a handle that can be used to
964  *         cancel; note that even if NULL is returned, the callback will be invoked
965  *         (or rather, will already have been invoked)
966  */
967 struct GNUNET_DATASTORE_QueueEntry *
968 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
969                                   uint32_t rid, unsigned int queue_priority,
970                                   unsigned int max_queue_size,
971                                   struct GNUNET_TIME_Relative timeout,
972                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
973                                   void *cont_cls)
974 {
975   struct GNUNET_DATASTORE_QueueEntry *qe;
976   struct ReleaseReserveMessage *rrm;
977   union QueueContext qc;
978
979   if (cont == NULL)
980     cont = &drop_status_cont;
981   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
982   qc.sc.cont = cont;
983   qc.sc.cont_cls = cont_cls;
984   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
985                          queue_priority, max_queue_size, timeout,
986                          &process_status_message, &qc);
987   if (qe == NULL)
988   {
989     LOG (GNUNET_ERROR_TYPE_DEBUG,
990          "Could not create queue entry to release reserve\n");
991     return NULL;
992   }
993   GNUNET_STATISTICS_update (h->stats,
994                             gettext_noop
995                             ("# RELEASE RESERVE requests executed"), 1,
996                             GNUNET_NO);
997   rrm = (struct ReleaseReserveMessage *) &qe[1];
998   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
999   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1000   rrm->rid = htonl (rid);
1001   process_queue (h);
1002   return qe;
1003 }
1004
1005
1006 /**
1007  * Update a value in the datastore.
1008  *
1009  * @param h handle to the datastore
1010  * @param uid identifier for the value
1011  * @param priority how much to increase the priority of the value
1012  * @param expiration new expiration value should be MAX of existing and this argument
1013  * @param queue_priority ranking of this request in the priority queue
1014  * @param max_queue_size at what queue size should this request be dropped
1015  *        (if other requests of higher priority are in the queue)
1016  * @param timeout how long to wait at most for a response
1017  * @param cont continuation to call when done
1018  * @param cont_cls closure for cont
1019  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1020  *         cancel; note that even if NULL is returned, the callback will be invoked
1021  *         (or rather, will already have been invoked)
1022  */
1023 struct GNUNET_DATASTORE_QueueEntry *
1024 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1025                          uint32_t priority,
1026                          struct GNUNET_TIME_Absolute expiration,
1027                          unsigned int queue_priority,
1028                          unsigned int max_queue_size,
1029                          struct GNUNET_TIME_Relative timeout,
1030                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1031                          void *cont_cls)
1032 {
1033   struct GNUNET_DATASTORE_QueueEntry *qe;
1034   struct UpdateMessage *um;
1035   union QueueContext qc;
1036
1037   if (cont == NULL)
1038     cont = &drop_status_cont;
1039   LOG (GNUNET_ERROR_TYPE_DEBUG,
1040        "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1041        uid, (unsigned int) priority, (unsigned long long) expiration.abs_value);
1042   qc.sc.cont = cont;
1043   qc.sc.cont_cls = cont_cls;
1044   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1045                          max_queue_size, timeout, &process_status_message, &qc);
1046   if (qe == NULL)
1047   {
1048     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1049     return NULL;
1050   }
1051   GNUNET_STATISTICS_update (h->stats,
1052                             gettext_noop ("# UPDATE requests executed"), 1,
1053                             GNUNET_NO);
1054   um = (struct UpdateMessage *) &qe[1];
1055   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1056   um->header.size = htons (sizeof (struct UpdateMessage));
1057   um->priority = htonl (priority);
1058   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1059   um->uid = GNUNET_htonll (uid);
1060   process_queue (h);
1061   return qe;
1062 }
1063
1064
1065 /**
1066  * Explicitly remove some content from the database.
1067  * The "cont"inuation will be called with status
1068  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1069  * if no matching entry was found and "GNUNET_SYSERR"
1070  * on all other types of errors.
1071  *
1072  * @param h handle to the datastore
1073  * @param key key for the value
1074  * @param size number of bytes in data
1075  * @param data content stored
1076  * @param queue_priority ranking of this request in the priority queue
1077  * @param max_queue_size at what queue size should this request be dropped
1078  *        (if other requests of higher priority are in the queue)
1079  * @param timeout how long to wait at most for a response
1080  * @param cont continuation to call when done
1081  * @param cont_cls closure for cont
1082  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1083  *         cancel; note that even if NULL is returned, the callback will be invoked
1084  *         (or rather, will already have been invoked)
1085  */
1086 struct GNUNET_DATASTORE_QueueEntry *
1087 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1088                          const struct GNUNET_HashCode * key, size_t size,
1089                          const void *data, unsigned int queue_priority,
1090                          unsigned int max_queue_size,
1091                          struct GNUNET_TIME_Relative timeout,
1092                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1093                          void *cont_cls)
1094 {
1095   struct GNUNET_DATASTORE_QueueEntry *qe;
1096   struct DataMessage *dm;
1097   size_t msize;
1098   union QueueContext qc;
1099
1100   if (cont == NULL)
1101     cont = &drop_status_cont;
1102   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1103        size, GNUNET_h2s (key));
1104   qc.sc.cont = cont;
1105   qc.sc.cont_cls = cont_cls;
1106   msize = sizeof (struct DataMessage) + size;
1107   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1108   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1109                          &process_status_message, &qc);
1110   if (qe == NULL)
1111   {
1112     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1113     return NULL;
1114   }
1115   GNUNET_STATISTICS_update (h->stats,
1116                             gettext_noop ("# REMOVE requests executed"), 1,
1117                             GNUNET_NO);
1118   dm = (struct DataMessage *) &qe[1];
1119   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1120   dm->header.size = htons (msize);
1121   dm->rid = htonl (0);
1122   dm->size = htonl (size);
1123   dm->type = htonl (0);
1124   dm->priority = htonl (0);
1125   dm->anonymity = htonl (0);
1126   dm->uid = GNUNET_htonll (0);
1127   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1128   dm->key = *key;
1129   memcpy (&dm[1], data, size);
1130   process_queue (h);
1131   return qe;
1132 }
1133
1134
1135 /**
1136  * Type of a function to call when we receive a message
1137  * from the service.
1138  *
1139  * @param cls closure
1140  * @param msg message received, NULL on timeout or fatal error
1141  */
1142 static void
1143 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1144 {
1145   struct GNUNET_DATASTORE_Handle *h = cls;
1146   struct GNUNET_DATASTORE_QueueEntry *qe;
1147   struct ResultContext rc;
1148   const struct DataMessage *dm;
1149   int was_transmitted;
1150
1151   if (NULL == msg)
1152   {
1153     qe = h->queue_head;
1154     GNUNET_assert (NULL != qe);
1155     rc = qe->qc.rc;
1156     was_transmitted = qe->was_transmitted;
1157     free_queue_entry (qe);
1158     if (GNUNET_YES == was_transmitted)
1159     {
1160       LOG (GNUNET_ERROR_TYPE_WARNING,
1161            _("Failed to receive response from database.\n"));
1162       do_disconnect (h);
1163     }
1164     else
1165     {
1166       process_queue (h);
1167     }
1168     if (NULL != rc.proc)
1169       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1170                0);
1171     return;
1172   }
1173   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1174   {
1175     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1176     qe = h->queue_head;
1177     rc = qe->qc.rc;
1178     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1179     free_queue_entry (qe);
1180     LOG (GNUNET_ERROR_TYPE_DEBUG,
1181          "Received end of result set, new queue size is %u\n", h->queue_size);
1182     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1183     h->result_count = 0;
1184     process_queue (h);
1185     if (rc.proc != NULL)
1186       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1187                0);
1188     return;
1189   }
1190   qe = h->queue_head;
1191   GNUNET_assert (NULL != qe);
1192   rc = qe->qc.rc;
1193   if (GNUNET_YES != qe->was_transmitted)
1194   {
1195     GNUNET_break (0);
1196     free_queue_entry (qe);
1197     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1198     do_disconnect (h);
1199     if (rc.proc != NULL)
1200       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1201                0);
1202     return;
1203   }
1204   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1205       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1206       (ntohs (msg->size) !=
1207        sizeof (struct DataMessage) +
1208        ntohl (((const struct DataMessage *) msg)->size)))
1209   {
1210     GNUNET_break (0);
1211     free_queue_entry (qe);
1212     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1213     do_disconnect (h);
1214     if (rc.proc != NULL)
1215       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1216                0);
1217     return;
1218   }
1219 #if INSANE_STATISTICS
1220   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1221                             GNUNET_NO);
1222 #endif
1223   dm = (const struct DataMessage *) msg;
1224   LOG (GNUNET_ERROR_TYPE_DEBUG,
1225        "Received result %llu with type %u and size %u with key %s\n",
1226        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1227        ntohl (dm->size), GNUNET_h2s (&dm->key));
1228   free_queue_entry (qe);
1229   h->retry_time = GNUNET_TIME_UNIT_ZERO;
1230   process_queue (h);
1231   if (rc.proc != NULL)
1232     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1233              ntohl (dm->priority), ntohl (dm->anonymity),
1234              GNUNET_TIME_absolute_ntoh (dm->expiration),
1235              GNUNET_ntohll (dm->uid));
1236 }
1237
1238
1239 /**
1240  * Get a random value from the datastore for content replication.
1241  * Returns a single, random value among those with the highest
1242  * replication score, lowering positive replication scores by one for
1243  * the chosen value (if only content with a replication score exists,
1244  * a random value is returned and replication scores are not changed).
1245  *
1246  * @param h handle to the datastore
1247  * @param queue_priority ranking of this request in the priority queue
1248  * @param max_queue_size at what queue size should this request be dropped
1249  *        (if other requests of higher priority are in the queue)
1250  * @param timeout how long to wait at most for a response
1251  * @param proc function to call on a random value; it
1252  *        will be called once with a value (if available)
1253  *        and always once with a value of NULL.
1254  * @param proc_cls closure for proc
1255  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1256  *         cancel
1257  */
1258 struct GNUNET_DATASTORE_QueueEntry *
1259 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1260                                       unsigned int queue_priority,
1261                                       unsigned int max_queue_size,
1262                                       struct GNUNET_TIME_Relative timeout,
1263                                       GNUNET_DATASTORE_DatumProcessor proc,
1264                                       void *proc_cls)
1265 {
1266   struct GNUNET_DATASTORE_QueueEntry *qe;
1267   struct GNUNET_MessageHeader *m;
1268   union QueueContext qc;
1269
1270   GNUNET_assert (NULL != proc);
1271   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n",
1272        (unsigned long long) timeout.rel_value);
1273   qc.rc.proc = proc;
1274   qc.rc.proc_cls = proc_cls;
1275   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1276                          queue_priority, max_queue_size, timeout,
1277                          &process_result_message, &qc);
1278   if (qe == NULL)
1279   {
1280     LOG (GNUNET_ERROR_TYPE_DEBUG,
1281          "Could not create queue entry for GET REPLICATION\n");
1282     return NULL;
1283   }
1284   GNUNET_STATISTICS_update (h->stats,
1285                             gettext_noop
1286                             ("# GET REPLICATION requests executed"), 1,
1287                             GNUNET_NO);
1288   m = (struct GNUNET_MessageHeader *) &qe[1];
1289   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1290   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1291   process_queue (h);
1292   return qe;
1293 }
1294
1295
1296 /**
1297  * Get a single zero-anonymity value from the datastore.
1298  *
1299  * @param h handle to the datastore
1300  * @param offset offset of the result (modulo num-results); set to
1301  *               a random 64-bit value initially; then increment by
1302  *               one each time; detect that all results have been found by uid
1303  *               being again the first uid ever returned.
1304  * @param queue_priority ranking of this request in the priority queue
1305  * @param max_queue_size at what queue size should this request be dropped
1306  *        (if other requests of higher priority are in the queue)
1307  * @param timeout how long to wait at most for a response
1308  * @param type allowed type for the operation (never zero)
1309  * @param proc function to call on a random value; it
1310  *        will be called once with a value (if available)
1311  *        or with NULL if none value exists.
1312  * @param proc_cls closure for proc
1313  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1314  *         cancel
1315  */
1316 struct GNUNET_DATASTORE_QueueEntry *
1317 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1318                                      uint64_t offset,
1319                                      unsigned int queue_priority,
1320                                      unsigned int max_queue_size,
1321                                      struct GNUNET_TIME_Relative timeout,
1322                                      enum GNUNET_BLOCK_Type type,
1323                                      GNUNET_DATASTORE_DatumProcessor proc,
1324                                      void *proc_cls)
1325 {
1326   struct GNUNET_DATASTORE_QueueEntry *qe;
1327   struct GetZeroAnonymityMessage *m;
1328   union QueueContext qc;
1329
1330   GNUNET_assert (NULL != proc);
1331   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1332   LOG (GNUNET_ERROR_TYPE_DEBUG,
1333        "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1334        (unsigned long long) offset, type,
1335        (unsigned long long) timeout.rel_value);
1336   qc.rc.proc = proc;
1337   qc.rc.proc_cls = proc_cls;
1338   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1339                          queue_priority, max_queue_size, timeout,
1340                          &process_result_message, &qc);
1341   if (qe == NULL)
1342   {
1343     LOG (GNUNET_ERROR_TYPE_DEBUG,
1344          "Could not create queue entry for zero-anonymity procation\n");
1345     return NULL;
1346   }
1347   GNUNET_STATISTICS_update (h->stats,
1348                             gettext_noop
1349                             ("# GET ZERO ANONYMITY requests executed"), 1,
1350                             GNUNET_NO);
1351   m = (struct GetZeroAnonymityMessage *) &qe[1];
1352   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1353   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1354   m->type = htonl ((uint32_t) type);
1355   m->offset = GNUNET_htonll (offset);
1356   process_queue (h);
1357   return qe;
1358 }
1359
1360
1361 /**
1362  * Get a result for a particular key from the datastore.  The processor
1363  * will only be called once.
1364  *
1365  * @param h handle to the datastore
1366  * @param offset offset of the result (modulo num-results); set to
1367  *               a random 64-bit value initially; then increment by
1368  *               one each time; detect that all results have been found by uid
1369  *               being again the first uid ever returned.
1370  * @param key maybe NULL (to match all entries)
1371  * @param type desired type, 0 for any
1372  * @param queue_priority ranking of this request in the priority queue
1373  * @param max_queue_size at what queue size should this request be dropped
1374  *        (if other requests of higher priority are in the queue)
1375  * @param timeout how long to wait at most for a response
1376  * @param proc function to call on each matching value;
1377  *        will be called once with a NULL value at the end
1378  * @param proc_cls closure for proc
1379  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1380  *         cancel
1381  */
1382 struct GNUNET_DATASTORE_QueueEntry *
1383 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1384                           const struct GNUNET_HashCode * key,
1385                           enum GNUNET_BLOCK_Type type,
1386                           unsigned int queue_priority,
1387                           unsigned int max_queue_size,
1388                           struct GNUNET_TIME_Relative timeout,
1389                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1390 {
1391   struct GNUNET_DATASTORE_QueueEntry *qe;
1392   struct GetMessage *gm;
1393   union QueueContext qc;
1394
1395   GNUNET_assert (NULL != proc);
1396   LOG (GNUNET_ERROR_TYPE_DEBUG,
1397        "Asked to look for data of type %u under key `%s'\n",
1398        (unsigned int) type, GNUNET_h2s (key));
1399   qc.rc.proc = proc;
1400   qc.rc.proc_cls = proc_cls;
1401   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1402                          max_queue_size, timeout, &process_result_message, &qc);
1403   if (qe == NULL)
1404   {
1405     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1406          GNUNET_h2s (key));
1407     return NULL;
1408   }
1409 #if INSANE_STATISTICS
1410   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1411                             1, GNUNET_NO);
1412 #endif
1413   gm = (struct GetMessage *) &qe[1];
1414   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1415   gm->type = htonl (type);
1416   gm->offset = GNUNET_htonll (offset);
1417   if (key != NULL)
1418   {
1419     gm->header.size = htons (sizeof (struct GetMessage));
1420     gm->key = *key;
1421   }
1422   else
1423   {
1424     gm->header.size =
1425         htons (sizeof (struct GetMessage) - sizeof (struct GNUNET_HashCode));
1426   }
1427   process_queue (h);
1428   return qe;
1429 }
1430
1431
1432 /**
1433  * Cancel a datastore operation.  The final callback from the
1434  * operation must not have been done yet.
1435  *
1436  * @param qe operation to cancel
1437  */
1438 void
1439 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1440 {
1441   struct GNUNET_DATASTORE_Handle *h;
1442
1443   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1444   h = qe->h;
1445   LOG (GNUNET_ERROR_TYPE_DEBUG,
1446        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1447        qe->was_transmitted, h->queue_head == qe);
1448   if (GNUNET_YES == qe->was_transmitted)
1449   {
1450     free_queue_entry (qe);
1451     h->skip_next_messages++;
1452     return;
1453   }
1454   free_queue_entry (qe);
1455   process_queue (h);
1456 }
1457
1458
1459 /* end of datastore_api.c */