-LRN: use correct character counting, instead of byte counting
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35
36 /**
37  * If a client stopped asking for more results, how many more do
38  * we receive from the DB before killing the connection?  Trade-off
39  * between re-doing TCP handshakes and (needlessly) receiving
40  * useless results.
41  */
42 #define MAX_EXCESS_RESULTS 8
43
44 /**
45  * Context for processing status messages.
46  */
47 struct StatusContext
48 {
49   /**
50    * Continuation to call with the status.
51    */
52   GNUNET_DATASTORE_ContinuationWithStatus cont;
53
54   /**
55    * Closure for cont.
56    */
57   void *cont_cls;
58
59 };
60
61
62 /**
63  * Context for processing result messages.
64  */
65 struct ResultContext
66 {
67   /**
68    * Function to call with the result.
69    */
70   GNUNET_DATASTORE_DatumProcessor proc;
71
72   /**
73    * Closure for proc.
74    */
75   void *proc_cls;
76
77 };
78
79
80 /**
81  *  Context for a queue operation.
82  */
83 union QueueContext
84 {
85
86   struct StatusContext sc;
87
88   struct ResultContext rc;
89
90 };
91
92
93
94 /**
95  * Entry in our priority queue.
96  */
97 struct GNUNET_DATASTORE_QueueEntry
98 {
99
100   /**
101    * This is a linked list.
102    */
103   struct GNUNET_DATASTORE_QueueEntry *next;
104
105   /**
106    * This is a linked list.
107    */
108   struct GNUNET_DATASTORE_QueueEntry *prev;
109
110   /**
111    * Handle to the master context.
112    */
113   struct GNUNET_DATASTORE_Handle *h;
114
115   /**
116    * Response processor (NULL if we are not waiting for a response).
117    * This struct should be used for the closure, function-specific
118    * arguments can be passed via 'qc'.
119    */
120   GNUNET_CLIENT_MessageHandler response_proc;
121
122   /**
123    * Function to call after transmission of the request.
124    */
125   GNUNET_DATASTORE_ContinuationWithStatus cont;
126
127   /**
128    * Closure for 'cont'.
129    */
130   void *cont_cls;
131
132   /**
133    * Context for the operation.
134    */
135   union QueueContext qc;
136
137   /**
138    * Task for timeout signalling.
139    */
140   GNUNET_SCHEDULER_TaskIdentifier task;
141
142   /**
143    * Timeout for the current operation.
144    */
145   struct GNUNET_TIME_Absolute timeout;
146
147   /**
148    * Priority in the queue.
149    */
150   unsigned int priority;
151
152   /**
153    * Maximum allowed length of queue (otherwise
154    * this request should be discarded).
155    */
156   unsigned int max_queue;
157
158   /**
159    * Number of bytes in the request message following
160    * this struct.  32-bit value for nicer memory
161    * access (and overall struct alignment).
162    */
163   uint32_t message_size;
164
165   /**
166    * Has this message been transmitted to the service?
167    * Only ever GNUNET_YES for the head of the queue.
168    * Note that the overall struct should end at a
169    * multiple of 64 bits.
170    */
171   int was_transmitted;
172
173 };
174
175 /**
176  * Handle to the datastore service.
177  */
178 struct GNUNET_DATASTORE_Handle
179 {
180
181   /**
182    * Our configuration.
183    */
184   const struct GNUNET_CONFIGURATION_Handle *cfg;
185
186   /**
187    * Current connection to the datastore service.
188    */
189   struct GNUNET_CLIENT_Connection *client;
190
191   /**
192    * Handle for statistics.
193    */
194   struct GNUNET_STATISTICS_Handle *stats;
195
196   /**
197    * Current transmit handle.
198    */
199   struct GNUNET_CLIENT_TransmitHandle *th;
200
201   /**
202    * Current head of priority queue.
203    */
204   struct GNUNET_DATASTORE_QueueEntry *queue_head;
205
206   /**
207    * Current tail of priority queue.
208    */
209   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
210
211   /**
212    * Task for trying to reconnect.
213    */
214   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
215
216   /**
217    * How quickly should we retry?  Used for exponential back-off on
218    * connect-errors.
219    */
220   struct GNUNET_TIME_Relative retry_time;
221
222   /**
223    * Number of entries in the queue.
224    */
225   unsigned int queue_size;
226
227   /**
228    * Number of results we're receiving for the current query
229    * after application stopped to care.  Used to determine when
230    * to reset the connection.
231    */
232   unsigned int result_count;
233
234   /**
235    * Are we currently trying to receive from the service?
236    */
237   int in_receive;
238
239   /**
240    * We should ignore the next message(s) from the service.
241    */
242   unsigned int skip_next_messages;
243
244 };
245
246
247
248 /**
249  * Connect to the datastore service.
250  *
251  * @param cfg configuration to use
252  * @return handle to use to access the service
253  */
254 struct GNUNET_DATASTORE_Handle *
255 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
256 {
257   struct GNUNET_CLIENT_Connection *c;
258   struct GNUNET_DATASTORE_Handle *h;
259
260   c = GNUNET_CLIENT_connect ("datastore", cfg);
261   if (c == NULL)
262     return NULL;                /* oops */
263   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
264                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
265   h->client = c;
266   h->cfg = cfg;
267   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
268   return h;
269 }
270
271
272 /**
273  * Transmit DROP message to datastore service.
274  *
275  * @param cls the 'struct GNUNET_DATASTORE_Handle'
276  * @param size number of bytes that can be copied to buf
277  * @param buf where to copy the drop message
278  * @return number of bytes written to buf
279  */
280 static size_t
281 transmit_drop (void *cls, size_t size, void *buf)
282 {
283   struct GNUNET_DATASTORE_Handle *h = cls;
284   struct GNUNET_MessageHeader *hdr;
285
286   if (buf == NULL)
287   {
288     LOG (GNUNET_ERROR_TYPE_WARNING,
289          _("Failed to transmit request to drop database.\n"));
290     GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
291     return 0;
292   }
293   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
294   hdr = buf;
295   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
296   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
297   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
298   return sizeof (struct GNUNET_MessageHeader);
299 }
300
301
302 /**
303  * Disconnect from the datastore service (and free
304  * associated resources).
305  *
306  * @param h handle to the datastore
307  * @param drop set to GNUNET_YES to delete all data in datastore (!)
308  */
309 void
310 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
311 {
312   struct GNUNET_DATASTORE_QueueEntry *qe;
313
314 #if DEBUG_DATASTORE
315   LOG (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
316 #endif
317   if (NULL != h->th)
318   {
319     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
320     h->th = NULL;
321   }
322   if (h->client != NULL)
323   {
324     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
325     h->client = NULL;
326   }
327   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
328   {
329     GNUNET_SCHEDULER_cancel (h->reconnect_task);
330     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
331   }
332   while (NULL != (qe = h->queue_head))
333   {
334     GNUNET_assert (NULL != qe->response_proc);
335     qe->response_proc (h, NULL);
336   }
337   if (GNUNET_YES == drop)
338   {
339     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
340     if (h->client != NULL)
341     {
342       if (NULL !=
343           GNUNET_CLIENT_notify_transmit_ready (h->client,
344                                                sizeof (struct
345                                                        GNUNET_MessageHeader),
346                                                GNUNET_TIME_UNIT_MINUTES,
347                                                GNUNET_YES, &transmit_drop, h))
348         return;
349       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
350       h->client = NULL;
351     }
352     GNUNET_break (0);
353   }
354   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
355   h->stats = NULL;
356   GNUNET_free (h);
357 }
358
359
360 /**
361  * A request has timed out (before being transmitted to the service).
362  *
363  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
364  * @param tc scheduler context
365  */
366 static void
367 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
368 {
369   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
370
371   GNUNET_STATISTICS_update (qe->h->stats,
372                             gettext_noop ("# queue entry timeouts"), 1,
373                             GNUNET_NO);
374   qe->task = GNUNET_SCHEDULER_NO_TASK;
375   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
376 #if DEBUG_DATASTORE
377   LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout of request in datastore queue\n");
378 #endif
379   qe->response_proc (qe->h, NULL);
380 }
381
382
383 /**
384  * Create a new entry for our priority queue (and possibly discard other entires if
385  * the queue is getting too long).
386  *
387  * @param h handle to the datastore
388  * @param msize size of the message to queue
389  * @param queue_priority priority of the entry
390  * @param max_queue_size at what queue size should this request be dropped
391  *        (if other requests of higher priority are in the queue)
392  * @param timeout timeout for the operation
393  * @param response_proc function to call with replies (can be NULL)
394  * @param qc client context (NOT a closure for response_proc)
395  * @return NULL if the queue is full
396  */
397 static struct GNUNET_DATASTORE_QueueEntry *
398 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
399                   unsigned int queue_priority, unsigned int max_queue_size,
400                   struct GNUNET_TIME_Relative timeout,
401                   GNUNET_CLIENT_MessageHandler response_proc,
402                   const union QueueContext *qc)
403 {
404   struct GNUNET_DATASTORE_QueueEntry *ret;
405   struct GNUNET_DATASTORE_QueueEntry *pos;
406   unsigned int c;
407
408   c = 0;
409   pos = h->queue_head;
410   while ((pos != NULL) && (c < max_queue_size) &&
411          (pos->priority >= queue_priority))
412   {
413     c++;
414     pos = pos->next;
415   }
416   if (c >= max_queue_size)
417   {
418     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
419                               GNUNET_NO);
420     return NULL;
421   }
422   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
423   ret->h = h;
424   ret->response_proc = response_proc;
425   ret->qc = *qc;
426   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
427   ret->priority = queue_priority;
428   ret->max_queue = max_queue_size;
429   ret->message_size = msize;
430   ret->was_transmitted = GNUNET_NO;
431   if (pos == NULL)
432   {
433     /* append at the tail */
434     pos = h->queue_tail;
435   }
436   else
437   {
438     pos = pos->prev;
439     /* do not insert at HEAD if HEAD query was already
440      * transmitted and we are still receiving replies! */
441     if ((pos == NULL) && (h->queue_head->was_transmitted))
442       pos = h->queue_head;
443   }
444   c++;
445   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
446                             1, GNUNET_NO);
447   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
448   h->queue_size++;
449   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
450   pos = ret->next;
451   while (pos != NULL)
452   {
453     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
454     {
455       GNUNET_assert (pos->response_proc != NULL);
456       /* move 'pos' element to head so that it will be
457        * killed on 'NULL' call below */
458 #if DEBUG_DATASTORE
459       LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping request from datastore queue\n");
460 #endif
461       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
462       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
463       GNUNET_STATISTICS_update (h->stats,
464                                 gettext_noop
465                                 ("# Requests dropped from datastore queue"), 1,
466                                 GNUNET_NO);
467       GNUNET_assert (h->queue_head == pos);
468       pos->response_proc (h, NULL);
469       break;
470     }
471     pos = pos->next;
472   }
473   return ret;
474 }
475
476
477 /**
478  * Process entries in the queue (or do nothing if we are already
479  * doing so).
480  *
481  * @param h handle to the datastore
482  */
483 static void
484 process_queue (struct GNUNET_DATASTORE_Handle *h);
485
486
487 /**
488  * Try reconnecting to the datastore service.
489  *
490  * @param cls the 'struct GNUNET_DATASTORE_Handle'
491  * @param tc scheduler context
492  */
493 static void
494 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
495 {
496   struct GNUNET_DATASTORE_Handle *h = cls;
497
498   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
499     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
500   else
501     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
502   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
503     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
504   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
505   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
506   if (h->client == NULL)
507   {
508     LOG (GNUNET_ERROR_TYPE_ERROR, "DATASTORE reconnect failed (fatally)\n");
509     return;
510   }
511   GNUNET_STATISTICS_update (h->stats,
512                             gettext_noop
513                             ("# datastore connections (re)created"), 1,
514                             GNUNET_NO);
515 #if DEBUG_DATASTORE
516   LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
517 #endif
518   process_queue (h);
519 }
520
521
522 /**
523  * Disconnect from the service and then try reconnecting to the datastore service
524  * after some delay.
525  *
526  * @param h handle to datastore to disconnect and reconnect
527  */
528 static void
529 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
530 {
531   if (h->client == NULL)
532   {
533 #if DEBUG_DATASTORE
534     LOG (GNUNET_ERROR_TYPE_DEBUG,
535          "client NULL in disconnect, will not try to reconnect\n");
536 #endif
537     return;
538   }
539 #if 0
540   GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
541                             1, GNUNET_NO);
542 #endif
543   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
544   h->skip_next_messages = 0;
545   h->client = NULL;
546   h->reconnect_task =
547       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
548 }
549
550
551 /**
552  * Function called whenever we receive a message from
553  * the service.  Calls the appropriate handler.
554  *
555  * @param cls the 'struct GNUNET_DATASTORE_Handle'
556  * @param msg the received message
557  */
558 static void
559 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
560 {
561   struct GNUNET_DATASTORE_Handle *h = cls;
562   struct GNUNET_DATASTORE_QueueEntry *qe;
563
564   h->in_receive = GNUNET_NO;
565 #if DEBUG_DATASTORE
566   LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
567 #endif
568   if (h->skip_next_messages > 0)
569   {
570     h->skip_next_messages--;
571     process_queue (h);
572     return;
573   }
574   if (NULL == (qe = h->queue_head))
575   {
576     GNUNET_break (0);
577     process_queue (h);
578     return;
579   }
580   qe->response_proc (h, msg);
581 }
582
583
584 /**
585  * Transmit request from queue to datastore service.
586  *
587  * @param cls the 'struct GNUNET_DATASTORE_Handle'
588  * @param size number of bytes that can be copied to buf
589  * @param buf where to copy the drop message
590  * @return number of bytes written to buf
591  */
592 static size_t
593 transmit_request (void *cls, size_t size, void *buf)
594 {
595   struct GNUNET_DATASTORE_Handle *h = cls;
596   struct GNUNET_DATASTORE_QueueEntry *qe;
597   size_t msize;
598
599   h->th = NULL;
600   if (NULL == (qe = h->queue_head))
601     return 0;                   /* no entry in queue */
602   if (buf == NULL)
603   {
604 #if DEBUG_DATASTORE
605     LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to DATASTORE.\n");
606 #endif
607     GNUNET_STATISTICS_update (h->stats,
608                               gettext_noop ("# transmission request failures"),
609                               1, GNUNET_NO);
610     do_disconnect (h);
611     return 0;
612   }
613   if (size < (msize = qe->message_size))
614   {
615     process_queue (h);
616     return 0;
617   }
618 #if DEBUG_DATASTORE
619   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to DATASTORE\n",
620        msize);
621 #endif
622   memcpy (buf, &qe[1], msize);
623   qe->was_transmitted = GNUNET_YES;
624   GNUNET_SCHEDULER_cancel (qe->task);
625   qe->task = GNUNET_SCHEDULER_NO_TASK;
626   GNUNET_assert (GNUNET_NO == h->in_receive);
627   h->in_receive = GNUNET_YES;
628   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
629                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
630   GNUNET_STATISTICS_update (h->stats,
631                             gettext_noop ("# bytes sent to datastore"), 1,
632                             GNUNET_NO);
633   return msize;
634 }
635
636
637 /**
638  * Process entries in the queue (or do nothing if we are already
639  * doing so).
640  *
641  * @param h handle to the datastore
642  */
643 static void
644 process_queue (struct GNUNET_DATASTORE_Handle *h)
645 {
646   struct GNUNET_DATASTORE_QueueEntry *qe;
647
648   if (NULL == (qe = h->queue_head))
649   {
650 #if DEBUG_DATASTORE > 1
651     LOG (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
652 #endif
653     return;                     /* no entry in queue */
654   }
655   if (qe->was_transmitted == GNUNET_YES)
656   {
657 #if DEBUG_DATASTORE > 1
658     LOG (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
659 #endif
660     return;                     /* waiting for replies */
661   }
662   if (h->th != NULL)
663   {
664 #if DEBUG_DATASTORE > 1
665     LOG (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
666 #endif
667     return;                     /* request pending */
668   }
669   if (h->client == NULL)
670   {
671 #if DEBUG_DATASTORE > 1
672     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
673 #endif
674     return;                     /* waiting for reconnect */
675   }
676   if (GNUNET_YES == h->in_receive)
677   {
678     /* wait for response to previous query */
679     return;
680   }
681 #if DEBUG_DATASTORE
682   LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing %u byte request to DATASTORE\n",
683        qe->message_size);
684 #endif
685   h->th =
686       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
687                                            GNUNET_TIME_absolute_get_remaining
688                                            (qe->timeout), GNUNET_YES,
689                                            &transmit_request, h);
690   GNUNET_assert (GNUNET_NO == h->in_receive);
691   GNUNET_break (NULL != h->th);
692 }
693
694
695 /**
696  * Dummy continuation used to do nothing (but be non-zero).
697  *
698  * @param cls closure
699  * @param result result
700  * @param min_expiration expiration time
701  * @param emsg error message
702  */
703 static void
704 drop_status_cont (void *cls, int32_t result, 
705                   struct GNUNET_TIME_Absolute min_expiration,
706                   const char *emsg)
707 {
708   /* do nothing */
709 }
710
711
712 /**
713  * Free a queue entry.  Removes the given entry from the
714  * queue and releases associated resources.  Does NOT
715  * call the callback.
716  *
717  * @param qe entry to free.
718  */
719 static void
720 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
721 {
722   struct GNUNET_DATASTORE_Handle *h = qe->h;
723
724   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
725   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
726   {
727     GNUNET_SCHEDULER_cancel (qe->task);
728     qe->task = GNUNET_SCHEDULER_NO_TASK;
729   }
730   h->queue_size--;
731   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
732   GNUNET_free (qe);
733 }
734
735
736 /**
737  * Type of a function to call when we receive a message
738  * from the service.
739  *
740  * @param cls closure
741  * @param msg message received, NULL on timeout or fatal error
742  */
743 static void
744 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
745 {
746   struct GNUNET_DATASTORE_Handle *h = cls;
747   struct GNUNET_DATASTORE_QueueEntry *qe;
748   struct StatusContext rc;
749   const struct StatusMessage *sm;
750   const char *emsg;
751   int32_t status;
752   int was_transmitted;
753
754   if (NULL == (qe = h->queue_head))
755   {
756     GNUNET_break (0);
757     do_disconnect (h);
758     return;
759   }
760   rc = qe->qc.sc;
761   if (msg == NULL)
762   {
763     was_transmitted = qe->was_transmitted;
764     free_queue_entry (qe);
765     if (was_transmitted == GNUNET_YES)
766       do_disconnect (h);
767     else
768       process_queue (h);
769     if (rc.cont != NULL)
770       rc.cont (rc.cont_cls, GNUNET_SYSERR,
771                GNUNET_TIME_UNIT_ZERO_ABS,
772                _("Failed to receive status response from database."));
773     return;
774   }
775   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
776   free_queue_entry (qe);
777   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
778       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
779   {
780     GNUNET_break (0);
781     h->retry_time = GNUNET_TIME_UNIT_ZERO;
782     do_disconnect (h);
783     if (rc.cont != NULL)
784       rc.cont (rc.cont_cls, GNUNET_SYSERR,
785                GNUNET_TIME_UNIT_ZERO_ABS,
786                _("Error reading response from datastore service"));
787     return;
788   }
789   sm = (const struct StatusMessage *) msg;
790   status = ntohl (sm->status);
791   emsg = NULL;
792   if (ntohs (msg->size) > sizeof (struct StatusMessage))
793   {
794     emsg = (const char *) &sm[1];
795     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
796     {
797       GNUNET_break (0);
798       emsg = _("Invalid error message received from datastore service");
799     }
800   }
801   if ((status == GNUNET_SYSERR) && (emsg == NULL))
802   {
803     GNUNET_break (0);
804     emsg = _("Invalid error message received from datastore service");
805   }
806 #if DEBUG_DATASTORE
807   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status, emsg);
808 #endif
809   GNUNET_STATISTICS_update (h->stats,
810                             gettext_noop ("# status messages received"), 1,
811                             GNUNET_NO);
812   h->retry_time.rel_value = 0;
813   process_queue (h);
814   if (rc.cont != NULL)
815     rc.cont (rc.cont_cls, status, 
816              GNUNET_TIME_absolute_ntoh (sm->min_expiration),
817              emsg);
818 }
819
820
821 /**
822  * Store an item in the datastore.  If the item is already present,
823  * the priorities are summed up and the higher expiration time and
824  * lower anonymity level is used.
825  *
826  * @param h handle to the datastore
827  * @param rid reservation ID to use (from "reserve"); use 0 if no
828  *            prior reservation was made
829  * @param key key for the value
830  * @param size number of bytes in data
831  * @param data content stored
832  * @param type type of the content
833  * @param priority priority of the content
834  * @param anonymity anonymity-level for the content
835  * @param replication how often should the content be replicated to other peers?
836  * @param expiration expiration time for the content
837  * @param queue_priority ranking of this request in the priority queue
838  * @param max_queue_size at what queue size should this request be dropped
839  *        (if other requests of higher priority are in the queue)
840  * @param timeout timeout for the operation
841  * @param cont continuation to call when done
842  * @param cont_cls closure for cont
843  * @return NULL if the entry was not queued, otherwise a handle that can be used to
844  *         cancel; note that even if NULL is returned, the callback will be invoked
845  *         (or rather, will already have been invoked)
846  */
847 struct GNUNET_DATASTORE_QueueEntry *
848 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
849                       const GNUNET_HashCode * key, size_t size,
850                       const void *data, enum GNUNET_BLOCK_Type type,
851                       uint32_t priority, uint32_t anonymity,
852                       uint32_t replication,
853                       struct GNUNET_TIME_Absolute expiration,
854                       unsigned int queue_priority, unsigned int max_queue_size,
855                       struct GNUNET_TIME_Relative timeout,
856                       GNUNET_DATASTORE_ContinuationWithStatus cont,
857                       void *cont_cls)
858 {
859   struct GNUNET_DATASTORE_QueueEntry *qe;
860   struct DataMessage *dm;
861   size_t msize;
862   union QueueContext qc;
863
864 #if DEBUG_DATASTORE
865   LOG (GNUNET_ERROR_TYPE_DEBUG,
866        "Asked to put %u bytes of data under key `%s' for %llu ms\n", size,
867        GNUNET_h2s (key),
868        GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
869 #endif
870   msize = sizeof (struct DataMessage) + size;
871   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
872   qc.sc.cont = cont;
873   qc.sc.cont_cls = cont_cls;
874   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
875                          &process_status_message, &qc);
876   if (qe == NULL)
877   {
878 #if DEBUG_DATASTORE
879     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for PUT\n");
880 #endif
881     return NULL;
882   }
883   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
884                             1, GNUNET_NO);
885   dm = (struct DataMessage *) &qe[1];
886   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
887   dm->header.size = htons (msize);
888   dm->rid = htonl (rid);
889   dm->size = htonl ((uint32_t) size);
890   dm->type = htonl (type);
891   dm->priority = htonl (priority);
892   dm->anonymity = htonl (anonymity);
893   dm->replication = htonl (replication);
894   dm->reserved = htonl (0);
895   dm->uid = GNUNET_htonll (0);
896   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
897   dm->key = *key;
898   memcpy (&dm[1], data, size);
899   process_queue (h);
900   return qe;
901 }
902
903
904 /**
905  * Reserve space in the datastore.  This function should be used
906  * to avoid "out of space" failures during a longer sequence of "put"
907  * operations (for example, when a file is being inserted).
908  *
909  * @param h handle to the datastore
910  * @param amount how much space (in bytes) should be reserved (for content only)
911  * @param entries how many entries will be created (to calculate per-entry overhead)
912  * @param queue_priority ranking of this request in the priority queue
913  * @param max_queue_size at what queue size should this request be dropped
914  *        (if other requests of higher priority are in the queue)
915  * @param timeout how long to wait at most for a response (or before dying in queue)
916  * @param cont continuation to call when done; "success" will be set to
917  *             a positive reservation value if space could be reserved.
918  * @param cont_cls closure for cont
919  * @return NULL if the entry was not queued, otherwise a handle that can be used to
920  *         cancel; note that even if NULL is returned, the callback will be invoked
921  *         (or rather, will already have been invoked)
922  */
923 struct GNUNET_DATASTORE_QueueEntry *
924 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
925                           uint32_t entries, unsigned int queue_priority,
926                           unsigned int max_queue_size,
927                           struct GNUNET_TIME_Relative timeout,
928                           GNUNET_DATASTORE_ContinuationWithStatus cont,
929                           void *cont_cls)
930 {
931   struct GNUNET_DATASTORE_QueueEntry *qe;
932   struct ReserveMessage *rm;
933   union QueueContext qc;
934
935   if (cont == NULL)
936     cont = &drop_status_cont;
937 #if DEBUG_DATASTORE
938   LOG (GNUNET_ERROR_TYPE_DEBUG,
939        "Asked to reserve %llu bytes of data and %u entries\n",
940        (unsigned long long) amount, (unsigned int) entries);
941 #endif
942   qc.sc.cont = cont;
943   qc.sc.cont_cls = cont_cls;
944   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
945                          max_queue_size, timeout, &process_status_message, &qc);
946   if (qe == NULL)
947   {
948 #if DEBUG_DATASTORE
949     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry to reserve\n");
950 #endif
951     return NULL;
952   }
953   GNUNET_STATISTICS_update (h->stats,
954                             gettext_noop ("# RESERVE requests executed"), 1,
955                             GNUNET_NO);
956   rm = (struct ReserveMessage *) &qe[1];
957   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
958   rm->header.size = htons (sizeof (struct ReserveMessage));
959   rm->entries = htonl (entries);
960   rm->amount = GNUNET_htonll (amount);
961   process_queue (h);
962   return qe;
963 }
964
965
966 /**
967  * Signal that all of the data for which a reservation was made has
968  * been stored and that whatever excess space might have been reserved
969  * can now be released.
970  *
971  * @param h handle to the datastore
972  * @param rid reservation ID (value of "success" in original continuation
973  *        from the "reserve" function).
974  * @param queue_priority ranking of this request in the priority queue
975  * @param max_queue_size at what queue size should this request be dropped
976  *        (if other requests of higher priority are in the queue)
977  * @param queue_priority ranking of this request in the priority queue
978  * @param max_queue_size at what queue size should this request be dropped
979  *        (if other requests of higher priority are in the queue)
980  * @param timeout how long to wait at most for a response
981  * @param cont continuation to call when done
982  * @param cont_cls closure for cont
983  * @return NULL if the entry was not queued, otherwise a handle that can be used to
984  *         cancel; note that even if NULL is returned, the callback will be invoked
985  *         (or rather, will already have been invoked)
986  */
987 struct GNUNET_DATASTORE_QueueEntry *
988 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
989                                   uint32_t rid, unsigned int queue_priority,
990                                   unsigned int max_queue_size,
991                                   struct GNUNET_TIME_Relative timeout,
992                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
993                                   void *cont_cls)
994 {
995   struct GNUNET_DATASTORE_QueueEntry *qe;
996   struct ReleaseReserveMessage *rrm;
997   union QueueContext qc;
998
999   if (cont == NULL)
1000     cont = &drop_status_cont;
1001 #if DEBUG_DATASTORE
1002   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1003 #endif
1004   qc.sc.cont = cont;
1005   qc.sc.cont_cls = cont_cls;
1006   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1007                          queue_priority, max_queue_size, timeout,
1008                          &process_status_message, &qc);
1009   if (qe == NULL)
1010   {
1011 #if DEBUG_DATASTORE
1012     LOG (GNUNET_ERROR_TYPE_DEBUG,
1013          "Could not create queue entry to release reserve\n");
1014 #endif
1015     return NULL;
1016   }
1017   GNUNET_STATISTICS_update (h->stats,
1018                             gettext_noop
1019                             ("# RELEASE RESERVE requests executed"), 1,
1020                             GNUNET_NO);
1021   rrm = (struct ReleaseReserveMessage *) &qe[1];
1022   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1023   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1024   rrm->rid = htonl (rid);
1025   process_queue (h);
1026   return qe;
1027 }
1028
1029
1030 /**
1031  * Update a value in the datastore.
1032  *
1033  * @param h handle to the datastore
1034  * @param uid identifier for the value
1035  * @param priority how much to increase the priority of the value
1036  * @param expiration new expiration value should be MAX of existing and this argument
1037  * @param queue_priority ranking of this request in the priority queue
1038  * @param max_queue_size at what queue size should this request be dropped
1039  *        (if other requests of higher priority are in the queue)
1040  * @param timeout how long to wait at most for a response
1041  * @param cont continuation to call when done
1042  * @param cont_cls closure for cont
1043  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1044  *         cancel; note that even if NULL is returned, the callback will be invoked
1045  *         (or rather, will already have been invoked)
1046  */
1047 struct GNUNET_DATASTORE_QueueEntry *
1048 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1049                          uint32_t priority,
1050                          struct GNUNET_TIME_Absolute expiration,
1051                          unsigned int queue_priority,
1052                          unsigned int max_queue_size,
1053                          struct GNUNET_TIME_Relative timeout,
1054                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1055                          void *cont_cls)
1056 {
1057   struct GNUNET_DATASTORE_QueueEntry *qe;
1058   struct UpdateMessage *um;
1059   union QueueContext qc;
1060
1061   if (cont == NULL)
1062     cont = &drop_status_cont;
1063 #if DEBUG_DATASTORE
1064   LOG (GNUNET_ERROR_TYPE_DEBUG,
1065        "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1066        uid, (unsigned int) priority, (unsigned long long) expiration.abs_value);
1067 #endif
1068   qc.sc.cont = cont;
1069   qc.sc.cont_cls = cont_cls;
1070   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1071                          max_queue_size, timeout, &process_status_message, &qc);
1072   if (qe == NULL)
1073   {
1074 #if DEBUG_DATASTORE
1075     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for UPDATE\n");
1076 #endif
1077     return NULL;
1078   }
1079   GNUNET_STATISTICS_update (h->stats,
1080                             gettext_noop ("# UPDATE requests executed"), 1,
1081                             GNUNET_NO);
1082   um = (struct UpdateMessage *) &qe[1];
1083   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1084   um->header.size = htons (sizeof (struct UpdateMessage));
1085   um->priority = htonl (priority);
1086   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1087   um->uid = GNUNET_htonll (uid);
1088   process_queue (h);
1089   return qe;
1090 }
1091
1092
1093 /**
1094  * Explicitly remove some content from the database.
1095  * The "cont"inuation will be called with status
1096  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1097  * if no matching entry was found and "GNUNET_SYSERR"
1098  * on all other types of errors.
1099  *
1100  * @param h handle to the datastore
1101  * @param key key for the value
1102  * @param size number of bytes in data
1103  * @param data content stored
1104  * @param queue_priority ranking of this request in the priority queue
1105  * @param max_queue_size at what queue size should this request be dropped
1106  *        (if other requests of higher priority are in the queue)
1107  * @param timeout how long to wait at most for a response
1108  * @param cont continuation to call when done
1109  * @param cont_cls closure for cont
1110  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1111  *         cancel; note that even if NULL is returned, the callback will be invoked
1112  *         (or rather, will already have been invoked)
1113  */
1114 struct GNUNET_DATASTORE_QueueEntry *
1115 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1116                          const GNUNET_HashCode * key, size_t size,
1117                          const void *data, unsigned int queue_priority,
1118                          unsigned int max_queue_size,
1119                          struct GNUNET_TIME_Relative timeout,
1120                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1121                          void *cont_cls)
1122 {
1123   struct GNUNET_DATASTORE_QueueEntry *qe;
1124   struct DataMessage *dm;
1125   size_t msize;
1126   union QueueContext qc;
1127
1128   if (cont == NULL)
1129     cont = &drop_status_cont;
1130 #if DEBUG_DATASTORE
1131   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to remove %u bytes under key `%s'\n",
1132        size, GNUNET_h2s (key));
1133 #endif
1134   qc.sc.cont = cont;
1135   qc.sc.cont_cls = cont_cls;
1136   msize = sizeof (struct DataMessage) + size;
1137   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1138   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1139                          &process_status_message, &qc);
1140   if (qe == NULL)
1141   {
1142 #if DEBUG_DATASTORE
1143     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not create queue entry for REMOVE\n");
1144 #endif
1145     return NULL;
1146   }
1147   GNUNET_STATISTICS_update (h->stats,
1148                             gettext_noop ("# REMOVE requests executed"), 1,
1149                             GNUNET_NO);
1150   dm = (struct DataMessage *) &qe[1];
1151   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1152   dm->header.size = htons (msize);
1153   dm->rid = htonl (0);
1154   dm->size = htonl (size);
1155   dm->type = htonl (0);
1156   dm->priority = htonl (0);
1157   dm->anonymity = htonl (0);
1158   dm->uid = GNUNET_htonll (0);
1159   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1160   dm->key = *key;
1161   memcpy (&dm[1], data, size);
1162   process_queue (h);
1163   return qe;
1164 }
1165
1166
1167 /**
1168  * Type of a function to call when we receive a message
1169  * from the service.
1170  *
1171  * @param cls closure
1172  * @param msg message received, NULL on timeout or fatal error
1173  */
1174 static void
1175 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1176 {
1177   struct GNUNET_DATASTORE_Handle *h = cls;
1178   struct GNUNET_DATASTORE_QueueEntry *qe;
1179   struct ResultContext rc;
1180   const struct DataMessage *dm;
1181   int was_transmitted;
1182
1183   if (msg == NULL)
1184   {
1185     qe = h->queue_head;
1186     GNUNET_assert (NULL != qe);
1187     rc = qe->qc.rc;
1188     was_transmitted = qe->was_transmitted;
1189     free_queue_entry (qe);
1190     if (was_transmitted == GNUNET_YES)
1191     {
1192       LOG (GNUNET_ERROR_TYPE_WARNING,
1193            _("Failed to receive response from database.\n"));
1194       do_disconnect (h);
1195     }
1196     else
1197     {
1198       process_queue (h);
1199     }
1200     if (rc.proc != NULL)
1201       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1202                0);
1203     return;
1204   }
1205   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1206   {
1207     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1208     qe = h->queue_head;
1209     rc = qe->qc.rc;
1210     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1211     free_queue_entry (qe);
1212 #if DEBUG_DATASTORE
1213     LOG (GNUNET_ERROR_TYPE_DEBUG,
1214          "Received end of result set, new queue size is %u\n", h->queue_size);
1215 #endif
1216     if (rc.proc != NULL)
1217       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1218                0);
1219     h->retry_time.rel_value = 0;
1220     h->result_count = 0;
1221     process_queue (h);
1222     return;
1223   }
1224   qe = h->queue_head;
1225   GNUNET_assert (NULL != qe);
1226   rc = qe->qc.rc;
1227   if (GNUNET_YES != qe->was_transmitted)
1228   {
1229     GNUNET_break (0);
1230     free_queue_entry (qe);
1231     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1232     do_disconnect (h);
1233     if (rc.proc != NULL)
1234       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1235                0);
1236     return;
1237   }
1238   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1239       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1240       (ntohs (msg->size) !=
1241        sizeof (struct DataMessage) +
1242        ntohl (((const struct DataMessage *) msg)->size)))
1243   {
1244     GNUNET_break (0);
1245     free_queue_entry (qe);
1246     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1247     do_disconnect (h);
1248     if (rc.proc != NULL)
1249       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1250                0);
1251     return;
1252   }
1253   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1254                             GNUNET_NO);
1255   dm = (const struct DataMessage *) msg;
1256 #if DEBUG_DATASTORE
1257   LOG (GNUNET_ERROR_TYPE_DEBUG,
1258        "Received result %llu with type %u and size %u with key %s\n",
1259        (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1260        ntohl (dm->size), GNUNET_h2s (&dm->key));
1261 #endif
1262   free_queue_entry (qe);
1263   h->retry_time.rel_value = 0;
1264   process_queue (h);
1265   if (rc.proc != NULL)
1266     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1267              ntohl (dm->priority), ntohl (dm->anonymity),
1268              GNUNET_TIME_absolute_ntoh (dm->expiration),
1269              GNUNET_ntohll (dm->uid));
1270 }
1271
1272
1273 /**
1274  * Get a random value from the datastore for content replication.
1275  * Returns a single, random value among those with the highest
1276  * replication score, lowering positive replication scores by one for
1277  * the chosen value (if only content with a replication score exists,
1278  * a random value is returned and replication scores are not changed).
1279  *
1280  * @param h handle to the datastore
1281  * @param queue_priority ranking of this request in the priority queue
1282  * @param max_queue_size at what queue size should this request be dropped
1283  *        (if other requests of higher priority are in the queue)
1284  * @param timeout how long to wait at most for a response
1285  * @param proc function to call on a random value; it
1286  *        will be called once with a value (if available)
1287  *        and always once with a value of NULL.
1288  * @param proc_cls closure for proc
1289  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1290  *         cancel
1291  */
1292 struct GNUNET_DATASTORE_QueueEntry *
1293 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1294                                       unsigned int queue_priority,
1295                                       unsigned int max_queue_size,
1296                                       struct GNUNET_TIME_Relative timeout,
1297                                       GNUNET_DATASTORE_DatumProcessor proc,
1298                                       void *proc_cls)
1299 {
1300   struct GNUNET_DATASTORE_QueueEntry *qe;
1301   struct GNUNET_MessageHeader *m;
1302   union QueueContext qc;
1303
1304   GNUNET_assert (NULL != proc);
1305 #if DEBUG_DATASTORE
1306   LOG (GNUNET_ERROR_TYPE_DEBUG, "Asked to get replication entry in %llu ms\n",
1307        (unsigned long long) timeout.rel_value);
1308 #endif
1309   qc.rc.proc = proc;
1310   qc.rc.proc_cls = proc_cls;
1311   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1312                          queue_priority, max_queue_size, timeout,
1313                          &process_result_message, &qc);
1314   if (qe == NULL)
1315   {
1316 #if DEBUG_DATASTORE
1317     LOG (GNUNET_ERROR_TYPE_DEBUG,
1318          "Could not create queue entry for GET REPLICATION\n");
1319 #endif
1320     return NULL;
1321   }
1322   GNUNET_STATISTICS_update (h->stats,
1323                             gettext_noop
1324                             ("# GET REPLICATION requests executed"), 1,
1325                             GNUNET_NO);
1326   m = (struct GNUNET_MessageHeader *) &qe[1];
1327   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1328   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1329   process_queue (h);
1330   return qe;
1331 }
1332
1333
1334 /**
1335  * Get a single zero-anonymity value from the datastore.
1336  *
1337  * @param h handle to the datastore
1338  * @param offset offset of the result (modulo num-results); set to
1339  *               a random 64-bit value initially; then increment by
1340  *               one each time; detect that all results have been found by uid
1341  *               being again the first uid ever returned.
1342  * @param queue_priority ranking of this request in the priority queue
1343  * @param max_queue_size at what queue size should this request be dropped
1344  *        (if other requests of higher priority are in the queue)
1345  * @param timeout how long to wait at most for a response
1346  * @param type allowed type for the operation (never zero)
1347  * @param proc function to call on a random value; it
1348  *        will be called once with a value (if available)
1349  *        or with NULL if none value exists.
1350  * @param proc_cls closure for proc
1351  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1352  *         cancel
1353  */
1354 struct GNUNET_DATASTORE_QueueEntry *
1355 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1356                                      uint64_t offset,
1357                                      unsigned int queue_priority,
1358                                      unsigned int max_queue_size,
1359                                      struct GNUNET_TIME_Relative timeout,
1360                                      enum GNUNET_BLOCK_Type type,
1361                                      GNUNET_DATASTORE_DatumProcessor proc,
1362                                      void *proc_cls)
1363 {
1364   struct GNUNET_DATASTORE_QueueEntry *qe;
1365   struct GetZeroAnonymityMessage *m;
1366   union QueueContext qc;
1367
1368   GNUNET_assert (NULL != proc);
1369   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1370 #if DEBUG_DATASTORE
1371   LOG (GNUNET_ERROR_TYPE_DEBUG,
1372        "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1373        (unsigned long long) offset, type,
1374        (unsigned long long) timeout.rel_value);
1375 #endif
1376   qc.rc.proc = proc;
1377   qc.rc.proc_cls = proc_cls;
1378   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1379                          queue_priority, max_queue_size, timeout,
1380                          &process_result_message, &qc);
1381   if (qe == NULL)
1382   {
1383 #if DEBUG_DATASTORE
1384     LOG (GNUNET_ERROR_TYPE_DEBUG,
1385          "Could not create queue entry for zero-anonymity procation\n");
1386 #endif
1387     return NULL;
1388   }
1389   GNUNET_STATISTICS_update (h->stats,
1390                             gettext_noop
1391                             ("# GET ZERO ANONYMITY requests executed"), 1,
1392                             GNUNET_NO);
1393   m = (struct GetZeroAnonymityMessage *) &qe[1];
1394   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1395   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1396   m->type = htonl ((uint32_t) type);
1397   m->offset = GNUNET_htonll (offset);
1398   process_queue (h);
1399   return qe;
1400 }
1401
1402
1403 /**
1404  * Get a result for a particular key from the datastore.  The processor
1405  * will only be called once.
1406  *
1407  * @param h handle to the datastore
1408  * @param offset offset of the result (modulo num-results); set to
1409  *               a random 64-bit value initially; then increment by
1410  *               one each time; detect that all results have been found by uid
1411  *               being again the first uid ever returned.
1412  * @param key maybe NULL (to match all entries)
1413  * @param type desired type, 0 for any
1414  * @param queue_priority ranking of this request in the priority queue
1415  * @param max_queue_size at what queue size should this request be dropped
1416  *        (if other requests of higher priority are in the queue)
1417  * @param timeout how long to wait at most for a response
1418  * @param proc function to call on each matching value;
1419  *        will be called once with a NULL value at the end
1420  * @param proc_cls closure for proc
1421  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1422  *         cancel
1423  */
1424 struct GNUNET_DATASTORE_QueueEntry *
1425 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1426                           const GNUNET_HashCode * key,
1427                           enum GNUNET_BLOCK_Type type,
1428                           unsigned int queue_priority,
1429                           unsigned int max_queue_size,
1430                           struct GNUNET_TIME_Relative timeout,
1431                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1432 {
1433   struct GNUNET_DATASTORE_QueueEntry *qe;
1434   struct GetMessage *gm;
1435   union QueueContext qc;
1436
1437   GNUNET_assert (NULL != proc);
1438 #if DEBUG_DATASTORE
1439   LOG (GNUNET_ERROR_TYPE_DEBUG,
1440        "Asked to look for data of type %u under key `%s'\n",
1441        (unsigned int) type, GNUNET_h2s (key));
1442 #endif
1443   qc.rc.proc = proc;
1444   qc.rc.proc_cls = proc_cls;
1445   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1446                          max_queue_size, timeout, &process_result_message, &qc);
1447   if (qe == NULL)
1448   {
1449 #if DEBUG_DATASTORE
1450     LOG (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1451          GNUNET_h2s (key));
1452 #endif
1453     return NULL;
1454   }
1455   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1456                             1, GNUNET_NO);
1457   gm = (struct GetMessage *) &qe[1];
1458   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1459   gm->type = htonl (type);
1460   gm->offset = GNUNET_htonll (offset);
1461   if (key != NULL)
1462   {
1463     gm->header.size = htons (sizeof (struct GetMessage));
1464     gm->key = *key;
1465   }
1466   else
1467   {
1468     gm->header.size =
1469         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1470   }
1471   process_queue (h);
1472   return qe;
1473 }
1474
1475
1476 /**
1477  * Cancel a datastore operation.  The final callback from the
1478  * operation must not have been done yet.
1479  *
1480  * @param qe operation to cancel
1481  */
1482 void
1483 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1484 {
1485   struct GNUNET_DATASTORE_Handle *h;
1486
1487   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1488   h = qe->h;
1489 #if DEBUG_DATASTORE
1490   LOG (GNUNET_ERROR_TYPE_DEBUG,
1491        "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1492        qe->was_transmitted, h->queue_head == qe);
1493 #endif
1494   if (GNUNET_YES == qe->was_transmitted)
1495   {
1496     free_queue_entry (qe);
1497     h->skip_next_messages++;
1498     return;
1499   }
1500   free_queue_entry (qe);
1501   process_queue (h);
1502 }
1503
1504
1505 /* end of datastore_api.c */