run indent twice, it alternates between two 'canonical' forms, also run whitespace...
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170
171 };
172
173 /**
174  * Handle to the datastore service.
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
254 {
255   struct GNUNET_CLIENT_Connection *c;
256   struct GNUNET_DATASTORE_Handle *h;
257
258   c = GNUNET_CLIENT_connect ("datastore", cfg);
259   if (c == NULL)
260     return NULL;                /* oops */
261   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
262                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
263   h->client = c;
264   h->cfg = cfg;
265   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
266   return h;
267 }
268
269
270 /**
271  * Transmit DROP message to datastore service.
272  *
273  * @param cls the 'struct GNUNET_DATASTORE_Handle'
274  * @param size number of bytes that can be copied to buf
275  * @param buf where to copy the drop message
276  * @return number of bytes written to buf
277  */
278 static size_t
279 transmit_drop (void *cls, size_t size, void *buf)
280 {
281   struct GNUNET_DATASTORE_Handle *h = cls;
282   struct GNUNET_MessageHeader *hdr;
283
284   if (buf == NULL)
285   {
286     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
287                 _("Failed to transmit request to drop database.\n"));
288     GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
289     return 0;
290   }
291   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
292   hdr = buf;
293   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
294   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
295   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
296   return sizeof (struct GNUNET_MessageHeader);
297 }
298
299
300 /**
301  * Disconnect from the datastore service (and free
302  * associated resources).
303  *
304  * @param h handle to the datastore
305  * @param drop set to GNUNET_YES to delete all data in datastore (!)
306  */
307 void
308 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
309 {
310   struct GNUNET_DATASTORE_QueueEntry *qe;
311
312 #if DEBUG_DATASTORE
313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
314 #endif
315   if (NULL != h->th)
316   {
317     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
318     h->th = NULL;
319   }
320   if (h->client != NULL)
321   {
322     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
323     h->client = NULL;
324   }
325   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
326   {
327     GNUNET_SCHEDULER_cancel (h->reconnect_task);
328     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
329   }
330   while (NULL != (qe = h->queue_head))
331   {
332     GNUNET_assert (NULL != qe->response_proc);
333     qe->response_proc (h, NULL);
334   }
335   if (GNUNET_YES == drop)
336   {
337     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
338     if (h->client != NULL)
339     {
340       if (NULL !=
341           GNUNET_CLIENT_notify_transmit_ready (h->client,
342                                                sizeof (struct
343                                                        GNUNET_MessageHeader),
344                                                GNUNET_TIME_UNIT_MINUTES,
345                                                GNUNET_YES, &transmit_drop, h))
346         return;
347       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
348       h->client = NULL;
349     }
350     GNUNET_break (0);
351   }
352   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
353   h->stats = NULL;
354   GNUNET_free (h);
355 }
356
357
358 /**
359  * A request has timed out (before being transmitted to the service).
360  *
361  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
362  * @param tc scheduler context
363  */
364 static void
365 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
366 {
367   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
368
369   GNUNET_STATISTICS_update (qe->h->stats,
370                             gettext_noop ("# queue entry timeouts"), 1,
371                             GNUNET_NO);
372   qe->task = GNUNET_SCHEDULER_NO_TASK;
373   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374 #if DEBUG_DATASTORE
375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376               "Timeout of request in datastore queue\n");
377 #endif
378   qe->response_proc (qe->h, NULL);
379 }
380
381
382 /**
383  * Create a new entry for our priority queue (and possibly discard other entires if
384  * the queue is getting too long).
385  *
386  * @param h handle to the datastore
387  * @param msize size of the message to queue
388  * @param queue_priority priority of the entry
389  * @param max_queue_size at what queue size should this request be dropped
390  *        (if other requests of higher priority are in the queue)
391  * @param timeout timeout for the operation
392  * @param response_proc function to call with replies (can be NULL)
393  * @param qc client context (NOT a closure for response_proc)
394  * @return NULL if the queue is full
395  */
396 static struct GNUNET_DATASTORE_QueueEntry *
397 make_queue_entry (struct GNUNET_DATASTORE_Handle *h, size_t msize,
398                   unsigned int queue_priority, unsigned int max_queue_size,
399                   struct GNUNET_TIME_Relative timeout,
400                   GNUNET_CLIENT_MessageHandler response_proc,
401                   const union QueueContext *qc)
402 {
403   struct GNUNET_DATASTORE_QueueEntry *ret;
404   struct GNUNET_DATASTORE_QueueEntry *pos;
405   unsigned int c;
406
407   c = 0;
408   pos = h->queue_head;
409   while ((pos != NULL) && (c < max_queue_size) &&
410          (pos->priority >= queue_priority))
411   {
412     c++;
413     pos = pos->next;
414   }
415   if (c >= max_queue_size)
416   {
417     GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue overflows"), 1,
418                               GNUNET_NO);
419     return NULL;
420   }
421   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
422   ret->h = h;
423   ret->response_proc = response_proc;
424   ret->qc = *qc;
425   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
426   ret->priority = queue_priority;
427   ret->max_queue = max_queue_size;
428   ret->message_size = msize;
429   ret->was_transmitted = GNUNET_NO;
430   if (pos == NULL)
431   {
432     /* append at the tail */
433     pos = h->queue_tail;
434   }
435   else
436   {
437     pos = pos->prev;
438     /* do not insert at HEAD if HEAD query was already
439      * transmitted and we are still receiving replies! */
440     if ((pos == NULL) && (h->queue_head->was_transmitted))
441       pos = h->queue_head;
442   }
443   c++;
444   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# queue entries created"),
445                             1, GNUNET_NO);
446   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
447   h->queue_size++;
448   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
449   pos = ret->next;
450   while (pos != NULL)
451   {
452     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
453     {
454       GNUNET_assert (pos->response_proc != NULL);
455       /* move 'pos' element to head so that it will be
456        * killed on 'NULL' call below */
457 #if DEBUG_DATASTORE
458       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
459                   "Dropping request from datastore queue\n");
460 #endif
461       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
462       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
463       GNUNET_STATISTICS_update (h->stats,
464                                 gettext_noop
465                                 ("# Requests dropped from datastore queue"), 1,
466                                 GNUNET_NO);
467       GNUNET_assert (h->queue_head == pos);
468       pos->response_proc (h, NULL);
469       break;
470     }
471     pos = pos->next;
472   }
473   return ret;
474 }
475
476
477 /**
478  * Process entries in the queue (or do nothing if we are already
479  * doing so).
480  *
481  * @param h handle to the datastore
482  */
483 static void
484 process_queue (struct GNUNET_DATASTORE_Handle *h);
485
486
487 /**
488  * Try reconnecting to the datastore service.
489  *
490  * @param cls the 'struct GNUNET_DATASTORE_Handle'
491  * @param tc scheduler context
492  */
493 static void
494 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
495 {
496   struct GNUNET_DATASTORE_Handle *h = cls;
497
498   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
499     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
500   else
501     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
502   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
503     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
504   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
505   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
506   if (h->client == NULL)
507   {
508     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
509                 "DATASTORE reconnect failed (fatally)\n");
510     return;
511   }
512   GNUNET_STATISTICS_update (h->stats,
513                             gettext_noop
514                             ("# datastore connections (re)created"), 1,
515                             GNUNET_NO);
516 #if DEBUG_DATASTORE
517   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
518 #endif
519   process_queue (h);
520 }
521
522
523 /**
524  * Disconnect from the service and then try reconnecting to the datastore service
525  * after some delay.
526  *
527  * @param h handle to datastore to disconnect and reconnect
528  */
529 static void
530 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
531 {
532   if (h->client == NULL)
533   {
534 #if DEBUG_DATASTORE
535     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
536                 "client NULL in disconnect, will not try to reconnect\n");
537 #endif
538     return;
539   }
540 #if 0
541   GNUNET_STATISTICS_update (stats, gettext_noop ("# reconnected to DATASTORE"),
542                             1, GNUNET_NO);
543 #endif
544   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
545   h->skip_next_messages = 0;
546   h->client = NULL;
547   h->reconnect_task =
548       GNUNET_SCHEDULER_add_delayed (h->retry_time, &try_reconnect, h);
549 }
550
551
552 /**
553  * Function called whenever we receive a message from
554  * the service.  Calls the appropriate handler.
555  *
556  * @param cls the 'struct GNUNET_DATASTORE_Handle'
557  * @param msg the received message
558  */
559 static void
560 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
561 {
562   struct GNUNET_DATASTORE_Handle *h = cls;
563   struct GNUNET_DATASTORE_QueueEntry *qe;
564
565   h->in_receive = GNUNET_NO;
566 #if DEBUG_DATASTORE
567   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
568 #endif
569   if (h->skip_next_messages > 0)
570   {
571     h->skip_next_messages--;
572     process_queue (h);
573     return;
574   }
575   if (NULL == (qe = h->queue_head))
576   {
577     GNUNET_break (0);
578     process_queue (h);
579     return;
580   }
581   qe->response_proc (h, msg);
582 }
583
584
585 /**
586  * Transmit request from queue to datastore service.
587  *
588  * @param cls the 'struct GNUNET_DATASTORE_Handle'
589  * @param size number of bytes that can be copied to buf
590  * @param buf where to copy the drop message
591  * @return number of bytes written to buf
592  */
593 static size_t
594 transmit_request (void *cls, size_t size, void *buf)
595 {
596   struct GNUNET_DATASTORE_Handle *h = cls;
597   struct GNUNET_DATASTORE_QueueEntry *qe;
598   size_t msize;
599
600   h->th = NULL;
601   if (NULL == (qe = h->queue_head))
602     return 0;                   /* no entry in queue */
603   if (buf == NULL)
604   {
605     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
606                 _("Failed to transmit request to DATASTORE.\n"));
607     GNUNET_STATISTICS_update (h->stats,
608                               gettext_noop ("# transmission request failures"),
609                               1, GNUNET_NO);
610     do_disconnect (h);
611     return 0;
612   }
613   if (size < (msize = qe->message_size))
614   {
615     process_queue (h);
616     return 0;
617   }
618 #if DEBUG_DATASTORE
619   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
620               "Transmitting %u byte request to DATASTORE\n", msize);
621 #endif
622   memcpy (buf, &qe[1], msize);
623   qe->was_transmitted = GNUNET_YES;
624   GNUNET_SCHEDULER_cancel (qe->task);
625   qe->task = GNUNET_SCHEDULER_NO_TASK;
626   GNUNET_assert (GNUNET_NO == h->in_receive);
627   h->in_receive = GNUNET_YES;
628   GNUNET_CLIENT_receive (h->client, &receive_cb, h,
629                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
630   GNUNET_STATISTICS_update (h->stats,
631                             gettext_noop ("# bytes sent to datastore"), 1,
632                             GNUNET_NO);
633   return msize;
634 }
635
636
637 /**
638  * Process entries in the queue (or do nothing if we are already
639  * doing so).
640  *
641  * @param h handle to the datastore
642  */
643 static void
644 process_queue (struct GNUNET_DATASTORE_Handle *h)
645 {
646   struct GNUNET_DATASTORE_QueueEntry *qe;
647
648   if (NULL == (qe = h->queue_head))
649   {
650 #if DEBUG_DATASTORE > 1
651     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
652 #endif
653     return;                     /* no entry in queue */
654   }
655   if (qe->was_transmitted == GNUNET_YES)
656   {
657 #if DEBUG_DATASTORE > 1
658     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
659 #endif
660     return;                     /* waiting for replies */
661   }
662   if (h->th != NULL)
663   {
664 #if DEBUG_DATASTORE > 1
665     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
666 #endif
667     return;                     /* request pending */
668   }
669   if (h->client == NULL)
670   {
671 #if DEBUG_DATASTORE > 1
672     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
673 #endif
674     return;                     /* waiting for reconnect */
675   }
676   if (GNUNET_YES == h->in_receive)
677   {
678     /* wait for response to previous query */
679     return;
680   }
681 #if DEBUG_DATASTORE
682   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
683               "Queueing %u byte request to DATASTORE\n", qe->message_size);
684 #endif
685   h->th =
686       GNUNET_CLIENT_notify_transmit_ready (h->client, qe->message_size,
687                                            GNUNET_TIME_absolute_get_remaining
688                                            (qe->timeout), GNUNET_YES,
689                                            &transmit_request, h);
690   GNUNET_assert (GNUNET_NO == h->in_receive);
691   GNUNET_break (NULL != h->th);
692 }
693
694
695 /**
696  * Dummy continuation used to do nothing (but be non-zero).
697  *
698  * @param cls closure
699  * @param result result
700  * @param emsg error message
701  */
702 static void
703 drop_status_cont (void *cls, int32_t result, const char *emsg)
704 {
705   /* do nothing */
706 }
707
708
709 /**
710  * Free a queue entry.  Removes the given entry from the
711  * queue and releases associated resources.  Does NOT
712  * call the callback.
713  *
714  * @param qe entry to free.
715  */
716 static void
717 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
718 {
719   struct GNUNET_DATASTORE_Handle *h = qe->h;
720
721   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
722   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
723   {
724     GNUNET_SCHEDULER_cancel (qe->task);
725     qe->task = GNUNET_SCHEDULER_NO_TASK;
726   }
727   h->queue_size--;
728   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
729   GNUNET_free (qe);
730 }
731
732
733 /**
734  * Type of a function to call when we receive a message
735  * from the service.
736  *
737  * @param cls closure
738  * @param msg message received, NULL on timeout or fatal error
739  */
740 static void
741 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
742 {
743   struct GNUNET_DATASTORE_Handle *h = cls;
744   struct GNUNET_DATASTORE_QueueEntry *qe;
745   struct StatusContext rc;
746   const struct StatusMessage *sm;
747   const char *emsg;
748   int32_t status;
749   int was_transmitted;
750
751   if (NULL == (qe = h->queue_head))
752   {
753     GNUNET_break (0);
754     do_disconnect (h);
755     return;
756   }
757   rc = qe->qc.sc;
758   if (msg == NULL)
759   {
760     was_transmitted = qe->was_transmitted;
761     free_queue_entry (qe);
762     if (was_transmitted == GNUNET_YES)
763       do_disconnect (h);
764     else
765       process_queue (h);
766     if (rc.cont != NULL)
767       rc.cont (rc.cont_cls, GNUNET_SYSERR,
768                _("Failed to receive status response from database."));
769     return;
770   }
771   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
772   free_queue_entry (qe);
773   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
774       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
775   {
776     GNUNET_break (0);
777     h->retry_time = GNUNET_TIME_UNIT_ZERO;
778     do_disconnect (h);
779     if (rc.cont != NULL)
780       rc.cont (rc.cont_cls, GNUNET_SYSERR,
781                _("Error reading response from datastore service"));
782     return;
783   }
784   sm = (const struct StatusMessage *) msg;
785   status = ntohl (sm->status);
786   emsg = NULL;
787   if (ntohs (msg->size) > sizeof (struct StatusMessage))
788   {
789     emsg = (const char *) &sm[1];
790     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
791     {
792       GNUNET_break (0);
793       emsg = _("Invalid error message received from datastore service");
794     }
795   }
796   if ((status == GNUNET_SYSERR) && (emsg == NULL))
797   {
798     GNUNET_break (0);
799     emsg = _("Invalid error message received from datastore service");
800   }
801 #if DEBUG_DATASTORE
802   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received status %d/%s\n", (int) status,
803               emsg);
804 #endif
805   GNUNET_STATISTICS_update (h->stats,
806                             gettext_noop ("# status messages received"), 1,
807                             GNUNET_NO);
808   h->retry_time.rel_value = 0;
809   process_queue (h);
810   if (rc.cont != NULL)
811     rc.cont (rc.cont_cls, status, emsg);
812 }
813
814
815 /**
816  * Store an item in the datastore.  If the item is already present,
817  * the priorities are summed up and the higher expiration time and
818  * lower anonymity level is used.
819  *
820  * @param h handle to the datastore
821  * @param rid reservation ID to use (from "reserve"); use 0 if no
822  *            prior reservation was made
823  * @param key key for the value
824  * @param size number of bytes in data
825  * @param data content stored
826  * @param type type of the content
827  * @param priority priority of the content
828  * @param anonymity anonymity-level for the content
829  * @param replication how often should the content be replicated to other peers?
830  * @param expiration expiration time for the content
831  * @param queue_priority ranking of this request in the priority queue
832  * @param max_queue_size at what queue size should this request be dropped
833  *        (if other requests of higher priority are in the queue)
834  * @param timeout timeout for the operation
835  * @param cont continuation to call when done
836  * @param cont_cls closure for cont
837  * @return NULL if the entry was not queued, otherwise a handle that can be used to
838  *         cancel; note that even if NULL is returned, the callback will be invoked
839  *         (or rather, will already have been invoked)
840  */
841 struct GNUNET_DATASTORE_QueueEntry *
842 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h, uint32_t rid,
843                       const GNUNET_HashCode * key, size_t size,
844                       const void *data, enum GNUNET_BLOCK_Type type,
845                       uint32_t priority, uint32_t anonymity,
846                       uint32_t replication,
847                       struct GNUNET_TIME_Absolute expiration,
848                       unsigned int queue_priority, unsigned int max_queue_size,
849                       struct GNUNET_TIME_Relative timeout,
850                       GNUNET_DATASTORE_ContinuationWithStatus cont,
851                       void *cont_cls)
852 {
853   struct GNUNET_DATASTORE_QueueEntry *qe;
854   struct DataMessage *dm;
855   size_t msize;
856   union QueueContext qc;
857
858 #if DEBUG_DATASTORE
859   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
860               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
861               size, GNUNET_h2s (key),
862               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
863 #endif
864   msize = sizeof (struct DataMessage) + size;
865   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
866   qc.sc.cont = cont;
867   qc.sc.cont_cls = cont_cls;
868   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
869                          &process_status_message, &qc);
870   if (qe == NULL)
871   {
872 #if DEBUG_DATASTORE
873     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874                 "Could not create queue entry for PUT\n");
875 #endif
876     return NULL;
877   }
878   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# PUT requests executed"),
879                             1, GNUNET_NO);
880   dm = (struct DataMessage *) &qe[1];
881   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
882   dm->header.size = htons (msize);
883   dm->rid = htonl (rid);
884   dm->size = htonl ((uint32_t) size);
885   dm->type = htonl (type);
886   dm->priority = htonl (priority);
887   dm->anonymity = htonl (anonymity);
888   dm->replication = htonl (replication);
889   dm->reserved = htonl (0);
890   dm->uid = GNUNET_htonll (0);
891   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
892   dm->key = *key;
893   memcpy (&dm[1], data, size);
894   process_queue (h);
895   return qe;
896 }
897
898
899 /**
900  * Reserve space in the datastore.  This function should be used
901  * to avoid "out of space" failures during a longer sequence of "put"
902  * operations (for example, when a file is being inserted).
903  *
904  * @param h handle to the datastore
905  * @param amount how much space (in bytes) should be reserved (for content only)
906  * @param entries how many entries will be created (to calculate per-entry overhead)
907  * @param queue_priority ranking of this request in the priority queue
908  * @param max_queue_size at what queue size should this request be dropped
909  *        (if other requests of higher priority are in the queue)
910  * @param timeout how long to wait at most for a response (or before dying in queue)
911  * @param cont continuation to call when done; "success" will be set to
912  *             a positive reservation value if space could be reserved.
913  * @param cont_cls closure for cont
914  * @return NULL if the entry was not queued, otherwise a handle that can be used to
915  *         cancel; note that even if NULL is returned, the callback will be invoked
916  *         (or rather, will already have been invoked)
917  */
918 struct GNUNET_DATASTORE_QueueEntry *
919 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h, uint64_t amount,
920                           uint32_t entries, unsigned int queue_priority,
921                           unsigned int max_queue_size,
922                           struct GNUNET_TIME_Relative timeout,
923                           GNUNET_DATASTORE_ContinuationWithStatus cont,
924                           void *cont_cls)
925 {
926   struct GNUNET_DATASTORE_QueueEntry *qe;
927   struct ReserveMessage *rm;
928   union QueueContext qc;
929
930   if (cont == NULL)
931     cont = &drop_status_cont;
932 #if DEBUG_DATASTORE
933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934               "Asked to reserve %llu bytes of data and %u entries\n",
935               (unsigned long long) amount, (unsigned int) entries);
936 #endif
937   qc.sc.cont = cont;
938   qc.sc.cont_cls = cont_cls;
939   qe = make_queue_entry (h, sizeof (struct ReserveMessage), queue_priority,
940                          max_queue_size, timeout, &process_status_message, &qc);
941   if (qe == NULL)
942   {
943 #if DEBUG_DATASTORE
944     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945                 "Could not create queue entry to reserve\n");
946 #endif
947     return NULL;
948   }
949   GNUNET_STATISTICS_update (h->stats,
950                             gettext_noop ("# RESERVE requests executed"), 1,
951                             GNUNET_NO);
952   rm = (struct ReserveMessage *) &qe[1];
953   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
954   rm->header.size = htons (sizeof (struct ReserveMessage));
955   rm->entries = htonl (entries);
956   rm->amount = GNUNET_htonll (amount);
957   process_queue (h);
958   return qe;
959 }
960
961
962 /**
963  * Signal that all of the data for which a reservation was made has
964  * been stored and that whatever excess space might have been reserved
965  * can now be released.
966  *
967  * @param h handle to the datastore
968  * @param rid reservation ID (value of "success" in original continuation
969  *        from the "reserve" function).
970  * @param queue_priority ranking of this request in the priority queue
971  * @param max_queue_size at what queue size should this request be dropped
972  *        (if other requests of higher priority are in the queue)
973  * @param queue_priority ranking of this request in the priority queue
974  * @param max_queue_size at what queue size should this request be dropped
975  *        (if other requests of higher priority are in the queue)
976  * @param timeout how long to wait at most for a response
977  * @param cont continuation to call when done
978  * @param cont_cls closure for cont
979  * @return NULL if the entry was not queued, otherwise a handle that can be used to
980  *         cancel; note that even if NULL is returned, the callback will be invoked
981  *         (or rather, will already have been invoked)
982  */
983 struct GNUNET_DATASTORE_QueueEntry *
984 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
985                                   uint32_t rid, unsigned int queue_priority,
986                                   unsigned int max_queue_size,
987                                   struct GNUNET_TIME_Relative timeout,
988                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
989                                   void *cont_cls)
990 {
991   struct GNUNET_DATASTORE_QueueEntry *qe;
992   struct ReleaseReserveMessage *rrm;
993   union QueueContext qc;
994
995   if (cont == NULL)
996     cont = &drop_status_cont;
997 #if DEBUG_DATASTORE
998   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
999 #endif
1000   qc.sc.cont = cont;
1001   qc.sc.cont_cls = cont_cls;
1002   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1003                          queue_priority, max_queue_size, timeout,
1004                          &process_status_message, &qc);
1005   if (qe == NULL)
1006   {
1007 #if DEBUG_DATASTORE
1008     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1009                 "Could not create queue entry to release reserve\n");
1010 #endif
1011     return NULL;
1012   }
1013   GNUNET_STATISTICS_update (h->stats,
1014                             gettext_noop
1015                             ("# RELEASE RESERVE requests executed"), 1,
1016                             GNUNET_NO);
1017   rrm = (struct ReleaseReserveMessage *) &qe[1];
1018   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1019   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1020   rrm->rid = htonl (rid);
1021   process_queue (h);
1022   return qe;
1023 }
1024
1025
1026 /**
1027  * Update a value in the datastore.
1028  *
1029  * @param h handle to the datastore
1030  * @param uid identifier for the value
1031  * @param priority how much to increase the priority of the value
1032  * @param expiration new expiration value should be MAX of existing and this argument
1033  * @param queue_priority ranking of this request in the priority queue
1034  * @param max_queue_size at what queue size should this request be dropped
1035  *        (if other requests of higher priority are in the queue)
1036  * @param timeout how long to wait at most for a response
1037  * @param cont continuation to call when done
1038  * @param cont_cls closure for cont
1039  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1040  *         cancel; note that even if NULL is returned, the callback will be invoked
1041  *         (or rather, will already have been invoked)
1042  */
1043 struct GNUNET_DATASTORE_QueueEntry *
1044 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h, uint64_t uid,
1045                          uint32_t priority,
1046                          struct GNUNET_TIME_Absolute expiration,
1047                          unsigned int queue_priority,
1048                          unsigned int max_queue_size,
1049                          struct GNUNET_TIME_Relative timeout,
1050                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1051                          void *cont_cls)
1052 {
1053   struct GNUNET_DATASTORE_QueueEntry *qe;
1054   struct UpdateMessage *um;
1055   union QueueContext qc;
1056
1057   if (cont == NULL)
1058     cont = &drop_status_cont;
1059 #if DEBUG_DATASTORE
1060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1061               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1062               uid, (unsigned int) priority,
1063               (unsigned long long) expiration.abs_value);
1064 #endif
1065   qc.sc.cont = cont;
1066   qc.sc.cont_cls = cont_cls;
1067   qe = make_queue_entry (h, sizeof (struct UpdateMessage), queue_priority,
1068                          max_queue_size, timeout, &process_status_message, &qc);
1069   if (qe == NULL)
1070   {
1071 #if DEBUG_DATASTORE
1072     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073                 "Could not create queue entry for UPDATE\n");
1074 #endif
1075     return NULL;
1076   }
1077   GNUNET_STATISTICS_update (h->stats,
1078                             gettext_noop ("# UPDATE requests executed"), 1,
1079                             GNUNET_NO);
1080   um = (struct UpdateMessage *) &qe[1];
1081   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1082   um->header.size = htons (sizeof (struct UpdateMessage));
1083   um->priority = htonl (priority);
1084   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1085   um->uid = GNUNET_htonll (uid);
1086   process_queue (h);
1087   return qe;
1088 }
1089
1090
1091 /**
1092  * Explicitly remove some content from the database.
1093  * The "cont"inuation will be called with status
1094  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1095  * if no matching entry was found and "GNUNET_SYSERR"
1096  * on all other types of errors.
1097  *
1098  * @param h handle to the datastore
1099  * @param key key for the value
1100  * @param size number of bytes in data
1101  * @param data content stored
1102  * @param queue_priority ranking of this request in the priority queue
1103  * @param max_queue_size at what queue size should this request be dropped
1104  *        (if other requests of higher priority are in the queue)
1105  * @param timeout how long to wait at most for a response
1106  * @param cont continuation to call when done
1107  * @param cont_cls closure for cont
1108  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1109  *         cancel; note that even if NULL is returned, the callback will be invoked
1110  *         (or rather, will already have been invoked)
1111  */
1112 struct GNUNET_DATASTORE_QueueEntry *
1113 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1114                          const GNUNET_HashCode * key, size_t size,
1115                          const void *data, unsigned int queue_priority,
1116                          unsigned int max_queue_size,
1117                          struct GNUNET_TIME_Relative timeout,
1118                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1119                          void *cont_cls)
1120 {
1121   struct GNUNET_DATASTORE_QueueEntry *qe;
1122   struct DataMessage *dm;
1123   size_t msize;
1124   union QueueContext qc;
1125
1126   if (cont == NULL)
1127     cont = &drop_status_cont;
1128 #if DEBUG_DATASTORE
1129   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1130               "Asked to remove %u bytes under key `%s'\n", size,
1131               GNUNET_h2s (key));
1132 #endif
1133   qc.sc.cont = cont;
1134   qc.sc.cont_cls = cont_cls;
1135   msize = sizeof (struct DataMessage) + size;
1136   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1137   qe = make_queue_entry (h, msize, queue_priority, max_queue_size, timeout,
1138                          &process_status_message, &qc);
1139   if (qe == NULL)
1140   {
1141 #if DEBUG_DATASTORE
1142     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1143                 "Could not create queue entry for REMOVE\n");
1144 #endif
1145     return NULL;
1146   }
1147   GNUNET_STATISTICS_update (h->stats,
1148                             gettext_noop ("# REMOVE requests executed"), 1,
1149                             GNUNET_NO);
1150   dm = (struct DataMessage *) &qe[1];
1151   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1152   dm->header.size = htons (msize);
1153   dm->rid = htonl (0);
1154   dm->size = htonl (size);
1155   dm->type = htonl (0);
1156   dm->priority = htonl (0);
1157   dm->anonymity = htonl (0);
1158   dm->uid = GNUNET_htonll (0);
1159   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1160   dm->key = *key;
1161   memcpy (&dm[1], data, size);
1162   process_queue (h);
1163   return qe;
1164 }
1165
1166
1167 /**
1168  * Type of a function to call when we receive a message
1169  * from the service.
1170  *
1171  * @param cls closure
1172  * @param msg message received, NULL on timeout or fatal error
1173  */
1174 static void
1175 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1176 {
1177   struct GNUNET_DATASTORE_Handle *h = cls;
1178   struct GNUNET_DATASTORE_QueueEntry *qe;
1179   struct ResultContext rc;
1180   const struct DataMessage *dm;
1181   int was_transmitted;
1182
1183   if (msg == NULL)
1184   {
1185     qe = h->queue_head;
1186     GNUNET_assert (NULL != qe);
1187     rc = qe->qc.rc;
1188     was_transmitted = qe->was_transmitted;
1189     free_queue_entry (qe);
1190     if (was_transmitted == GNUNET_YES)
1191     {
1192       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1193                   _("Failed to receive response from database.\n"));
1194       do_disconnect (h);
1195     }
1196     else
1197     {
1198       process_queue (h);
1199     }
1200     if (rc.proc != NULL)
1201       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1202                0);
1203     return;
1204   }
1205   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1206   {
1207     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1208     qe = h->queue_head;
1209     rc = qe->qc.rc;
1210     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1211     free_queue_entry (qe);
1212 #if DEBUG_DATASTORE
1213     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1214                 "Received end of result set, new queue size is %u\n",
1215                 h->queue_size);
1216 #endif
1217     if (rc.proc != NULL)
1218       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1219                0);
1220     h->retry_time.rel_value = 0;
1221     h->result_count = 0;
1222     process_queue (h);
1223     return;
1224   }
1225   qe = h->queue_head;
1226   GNUNET_assert (NULL != qe);
1227   rc = qe->qc.rc;
1228   if (GNUNET_YES != qe->was_transmitted)
1229   {
1230     GNUNET_break (0);
1231     free_queue_entry (qe);
1232     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1233     do_disconnect (h);
1234     if (rc.proc != NULL)
1235       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1236                0);
1237     return;
1238   }
1239   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1240       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1241       (ntohs (msg->size) !=
1242        sizeof (struct DataMessage) +
1243        ntohl (((const struct DataMessage *) msg)->size)))
1244   {
1245     GNUNET_break (0);
1246     free_queue_entry (qe);
1247     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1248     do_disconnect (h);
1249     if (rc.proc != NULL)
1250       rc.proc (rc.proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS,
1251                0);
1252     return;
1253   }
1254   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# Results received"), 1,
1255                             GNUNET_NO);
1256   dm = (const struct DataMessage *) msg;
1257 #if DEBUG_DATASTORE
1258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259               "Received result %llu with type %u and size %u with key %s\n",
1260               (unsigned long long) GNUNET_ntohll (dm->uid), ntohl (dm->type),
1261               ntohl (dm->size), GNUNET_h2s (&dm->key));
1262 #endif
1263   free_queue_entry (qe);
1264   h->retry_time.rel_value = 0;
1265   process_queue (h);
1266   if (rc.proc != NULL)
1267     rc.proc (rc.proc_cls, &dm->key, ntohl (dm->size), &dm[1], ntohl (dm->type),
1268              ntohl (dm->priority), ntohl (dm->anonymity),
1269              GNUNET_TIME_absolute_ntoh (dm->expiration),
1270              GNUNET_ntohll (dm->uid));
1271 }
1272
1273
1274 /**
1275  * Get a random value from the datastore for content replication.
1276  * Returns a single, random value among those with the highest
1277  * replication score, lowering positive replication scores by one for
1278  * the chosen value (if only content with a replication score exists,
1279  * a random value is returned and replication scores are not changed).
1280  *
1281  * @param h handle to the datastore
1282  * @param queue_priority ranking of this request in the priority queue
1283  * @param max_queue_size at what queue size should this request be dropped
1284  *        (if other requests of higher priority are in the queue)
1285  * @param timeout how long to wait at most for a response
1286  * @param proc function to call on a random value; it
1287  *        will be called once with a value (if available)
1288  *        and always once with a value of NULL.
1289  * @param proc_cls closure for proc
1290  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1291  *         cancel
1292  */
1293 struct GNUNET_DATASTORE_QueueEntry *
1294 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1295                                       unsigned int queue_priority,
1296                                       unsigned int max_queue_size,
1297                                       struct GNUNET_TIME_Relative timeout,
1298                                       GNUNET_DATASTORE_DatumProcessor proc,
1299                                       void *proc_cls)
1300 {
1301   struct GNUNET_DATASTORE_QueueEntry *qe;
1302   struct GNUNET_MessageHeader *m;
1303   union QueueContext qc;
1304
1305   GNUNET_assert (NULL != proc);
1306 #if DEBUG_DATASTORE
1307   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1308               "Asked to get replication entry in %llu ms\n",
1309               (unsigned long long) timeout.rel_value);
1310 #endif
1311   qc.rc.proc = proc;
1312   qc.rc.proc_cls = proc_cls;
1313   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1314                          queue_priority, max_queue_size, timeout,
1315                          &process_result_message, &qc);
1316   if (qe == NULL)
1317   {
1318 #if DEBUG_DATASTORE
1319     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320                 "Could not create queue entry for GET REPLICATION\n");
1321 #endif
1322     return NULL;
1323   }
1324   GNUNET_STATISTICS_update (h->stats,
1325                             gettext_noop
1326                             ("# GET REPLICATION requests executed"), 1,
1327                             GNUNET_NO);
1328   m = (struct GNUNET_MessageHeader *) &qe[1];
1329   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1330   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1331   process_queue (h);
1332   return qe;
1333 }
1334
1335
1336 /**
1337  * Get a single zero-anonymity value from the datastore.
1338  *
1339  * @param h handle to the datastore
1340  * @param offset offset of the result (modulo num-results); set to
1341  *               a random 64-bit value initially; then increment by
1342  *               one each time; detect that all results have been found by uid
1343  *               being again the first uid ever returned.
1344  * @param queue_priority ranking of this request in the priority queue
1345  * @param max_queue_size at what queue size should this request be dropped
1346  *        (if other requests of higher priority are in the queue)
1347  * @param timeout how long to wait at most for a response
1348  * @param type allowed type for the operation (never zero)
1349  * @param proc function to call on a random value; it
1350  *        will be called once with a value (if available)
1351  *        or with NULL if none value exists.
1352  * @param proc_cls closure for proc
1353  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1354  *         cancel
1355  */
1356 struct GNUNET_DATASTORE_QueueEntry *
1357 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1358                                      uint64_t offset,
1359                                      unsigned int queue_priority,
1360                                      unsigned int max_queue_size,
1361                                      struct GNUNET_TIME_Relative timeout,
1362                                      enum GNUNET_BLOCK_Type type,
1363                                      GNUNET_DATASTORE_DatumProcessor proc,
1364                                      void *proc_cls)
1365 {
1366   struct GNUNET_DATASTORE_QueueEntry *qe;
1367   struct GetZeroAnonymityMessage *m;
1368   union QueueContext qc;
1369
1370   GNUNET_assert (NULL != proc);
1371   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1372 #if DEBUG_DATASTORE
1373   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1374               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1375               (unsigned long long) offset, type,
1376               (unsigned long long) timeout.rel_value);
1377 #endif
1378   qc.rc.proc = proc;
1379   qc.rc.proc_cls = proc_cls;
1380   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1381                          queue_priority, max_queue_size, timeout,
1382                          &process_result_message, &qc);
1383   if (qe == NULL)
1384   {
1385 #if DEBUG_DATASTORE
1386     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387                 "Could not create queue entry for zero-anonymity procation\n");
1388 #endif
1389     return NULL;
1390   }
1391   GNUNET_STATISTICS_update (h->stats,
1392                             gettext_noop
1393                             ("# GET ZERO ANONYMITY requests executed"), 1,
1394                             GNUNET_NO);
1395   m = (struct GetZeroAnonymityMessage *) &qe[1];
1396   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1397   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1398   m->type = htonl ((uint32_t) type);
1399   m->offset = GNUNET_htonll (offset);
1400   process_queue (h);
1401   return qe;
1402 }
1403
1404
1405 /**
1406  * Get a result for a particular key from the datastore.  The processor
1407  * will only be called once.
1408  *
1409  * @param h handle to the datastore
1410  * @param offset offset of the result (modulo num-results); set to
1411  *               a random 64-bit value initially; then increment by
1412  *               one each time; detect that all results have been found by uid
1413  *               being again the first uid ever returned.
1414  * @param key maybe NULL (to match all entries)
1415  * @param type desired type, 0 for any
1416  * @param queue_priority ranking of this request in the priority queue
1417  * @param max_queue_size at what queue size should this request be dropped
1418  *        (if other requests of higher priority are in the queue)
1419  * @param timeout how long to wait at most for a response
1420  * @param proc function to call on each matching value;
1421  *        will be called once with a NULL value at the end
1422  * @param proc_cls closure for proc
1423  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1424  *         cancel
1425  */
1426 struct GNUNET_DATASTORE_QueueEntry *
1427 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h, uint64_t offset,
1428                           const GNUNET_HashCode * key,
1429                           enum GNUNET_BLOCK_Type type,
1430                           unsigned int queue_priority,
1431                           unsigned int max_queue_size,
1432                           struct GNUNET_TIME_Relative timeout,
1433                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1434 {
1435   struct GNUNET_DATASTORE_QueueEntry *qe;
1436   struct GetMessage *gm;
1437   union QueueContext qc;
1438
1439   GNUNET_assert (NULL != proc);
1440 #if DEBUG_DATASTORE
1441   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1442               "Asked to look for data of type %u under key `%s'\n",
1443               (unsigned int) type, GNUNET_h2s (key));
1444 #endif
1445   qc.rc.proc = proc;
1446   qc.rc.proc_cls = proc_cls;
1447   qe = make_queue_entry (h, sizeof (struct GetMessage), queue_priority,
1448                          max_queue_size, timeout, &process_result_message, &qc);
1449   if (qe == NULL)
1450   {
1451 #if DEBUG_DATASTORE
1452     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Could not queue request for `%s'\n",
1453                 GNUNET_h2s (key));
1454 #endif
1455     return NULL;
1456   }
1457   GNUNET_STATISTICS_update (h->stats, gettext_noop ("# GET requests executed"),
1458                             1, GNUNET_NO);
1459   gm = (struct GetMessage *) &qe[1];
1460   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1461   gm->type = htonl (type);
1462   gm->offset = GNUNET_htonll (offset);
1463   if (key != NULL)
1464   {
1465     gm->header.size = htons (sizeof (struct GetMessage));
1466     gm->key = *key;
1467   }
1468   else
1469   {
1470     gm->header.size =
1471         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1472   }
1473   process_queue (h);
1474   return qe;
1475 }
1476
1477
1478 /**
1479  * Cancel a datastore operation.  The final callback from the
1480  * operation must not have been done yet.
1481  *
1482  * @param qe operation to cancel
1483  */
1484 void
1485 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1486 {
1487   struct GNUNET_DATASTORE_Handle *h;
1488
1489   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1490   h = qe->h;
1491 #if DEBUG_DATASTORE
1492   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1493               "Pending DATASTORE request %p cancelled (%d, %d)\n", qe,
1494               qe->was_transmitted, h->queue_head == qe);
1495 #endif
1496   if (GNUNET_YES == qe->was_transmitted)
1497   {
1498     free_queue_entry (qe);
1499     h->skip_next_messages++;
1500     return;
1501   }
1502   free_queue_entry (qe);
1503   process_queue (h);
1504 }
1505
1506
1507 /* end of datastore_api.c */