85e402a4d9468a30bee381c88e56295bc756a2f1
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Function to call with the result.
67    */
68   GNUNET_DATASTORE_DatumProcessor proc;
69
70   /**
71    * Closure for proc.
72    */
73   void *proc_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int was_transmitted;
170
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Current connection to the datastore service.
186    */
187   struct GNUNET_CLIENT_Connection *client;
188
189   /**
190    * Handle for statistics.
191    */
192   struct GNUNET_STATISTICS_Handle *stats;
193
194   /**
195    * Current transmit handle.
196    */
197   struct GNUNET_CLIENT_TransmitHandle *th;
198
199   /**
200    * Current head of priority queue.
201    */
202   struct GNUNET_DATASTORE_QueueEntry *queue_head;
203
204   /**
205    * Current tail of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
208
209   /**
210    * Task for trying to reconnect.
211    */
212   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
213
214   /**
215    * How quickly should we retry?  Used for exponential back-off on
216    * connect-errors.
217    */
218   struct GNUNET_TIME_Relative retry_time;
219
220   /**
221    * Number of entries in the queue.
222    */
223   unsigned int queue_size;
224
225   /**
226    * Number of results we're receiving for the current query
227    * after application stopped to care.  Used to determine when
228    * to reset the connection.
229    */
230   unsigned int result_count;
231
232   /**
233    * Are we currently trying to receive from the service?
234    */
235   int in_receive;
236
237   /**
238    * We should ignore the next message(s) from the service.
239    */
240   unsigned int skip_next_messages;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @return handle to use to access the service
251  */
252 struct GNUNET_DATASTORE_Handle *
253 GNUNET_DATASTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
254 {
255   struct GNUNET_CLIENT_Connection *c;
256   struct GNUNET_DATASTORE_Handle *h;
257
258   c = GNUNET_CLIENT_connect ("datastore", cfg);
259   if (c == NULL)
260     return NULL;                /* oops */
261   h = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_Handle) +
262                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
263   h->client = c;
264   h->cfg = cfg;
265   h->stats = GNUNET_STATISTICS_create ("datastore-api", cfg);
266   return h;
267 }
268
269
270 /**
271  * Transmit DROP message to datastore service.
272  *
273  * @param cls the 'struct GNUNET_DATASTORE_Handle'
274  * @param size number of bytes that can be copied to buf
275  * @param buf where to copy the drop message
276  * @return number of bytes written to buf
277  */
278 static size_t
279 transmit_drop (void *cls, size_t size, void *buf)
280 {
281   struct GNUNET_DATASTORE_Handle *h = cls;
282   struct GNUNET_MessageHeader *hdr;
283
284   if (buf == NULL)
285   {
286     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
287                 _("Failed to transmit request to drop database.\n"));
288     GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
289     return 0;
290   }
291   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
292   hdr = buf;
293   hdr->size = htons (sizeof (struct GNUNET_MessageHeader));
294   hdr->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
295   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
296   return sizeof (struct GNUNET_MessageHeader);
297 }
298
299
300 /**
301  * Disconnect from the datastore service (and free
302  * associated resources).
303  *
304  * @param h handle to the datastore
305  * @param drop set to GNUNET_YES to delete all data in datastore (!)
306  */
307 void
308 GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h, int drop)
309 {
310   struct GNUNET_DATASTORE_QueueEntry *qe;
311
312 #if DEBUG_DATASTORE
313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Datastore disconnect\n");
314 #endif
315   if (NULL != h->th)
316   {
317     GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
318     h->th = NULL;
319   }
320   if (h->client != NULL)
321   {
322     GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
323     h->client = NULL;
324   }
325   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
326   {
327     GNUNET_SCHEDULER_cancel (h->reconnect_task);
328     h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
329   }
330   while (NULL != (qe = h->queue_head))
331   {
332     GNUNET_assert (NULL != qe->response_proc);
333     qe->response_proc (h, NULL);
334   }
335   if (GNUNET_YES == drop)
336   {
337     h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
338     if (h->client != NULL)
339     {
340       if (NULL !=
341           GNUNET_CLIENT_notify_transmit_ready (h->client,
342                                                sizeof (struct
343                                                        GNUNET_MessageHeader),
344                                                GNUNET_TIME_UNIT_MINUTES,
345                                                GNUNET_YES, &transmit_drop, h))
346         return;
347       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
348       h->client = NULL;
349     }
350     GNUNET_break (0);
351   }
352   GNUNET_STATISTICS_destroy (h->stats, GNUNET_NO);
353   h->stats = NULL;
354   GNUNET_free (h);
355 }
356
357
358 /**
359  * A request has timed out (before being transmitted to the service).
360  *
361  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
362  * @param tc scheduler context
363  */
364 static void
365 timeout_queue_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
366 {
367   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
368
369   GNUNET_STATISTICS_update (qe->h->stats,
370                             gettext_noop ("# queue entry timeouts"),
371                             1, GNUNET_NO);
372   qe->task = GNUNET_SCHEDULER_NO_TASK;
373   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
374 #if DEBUG_DATASTORE
375   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
376               "Timeout of request in datastore queue\n");
377 #endif
378   qe->response_proc (qe->h, NULL);
379 }
380
381
382 /**
383  * Create a new entry for our priority queue (and possibly discard other entires if
384  * the queue is getting too long).
385  *
386  * @param h handle to the datastore
387  * @param msize size of the message to queue
388  * @param queue_priority priority of the entry
389  * @param max_queue_size at what queue size should this request be dropped
390  *        (if other requests of higher priority are in the queue)
391  * @param timeout timeout for the operation
392  * @param response_proc function to call with replies (can be NULL)
393  * @param qc client context (NOT a closure for response_proc)
394  * @return NULL if the queue is full 
395  */
396 static struct GNUNET_DATASTORE_QueueEntry *
397 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
398                   size_t msize,
399                   unsigned int queue_priority,
400                   unsigned int max_queue_size,
401                   struct GNUNET_TIME_Relative timeout,
402                   GNUNET_CLIENT_MessageHandler response_proc,
403                   const union QueueContext *qc)
404 {
405   struct GNUNET_DATASTORE_QueueEntry *ret;
406   struct GNUNET_DATASTORE_QueueEntry *pos;
407   unsigned int c;
408
409   c = 0;
410   pos = h->queue_head;
411   while ((pos != NULL) &&
412          (c < max_queue_size) && (pos->priority >= queue_priority))
413   {
414     c++;
415     pos = pos->next;
416   }
417   if (c >= max_queue_size)
418   {
419     GNUNET_STATISTICS_update (h->stats,
420                               gettext_noop ("# queue overflows"), 1, GNUNET_NO);
421     return NULL;
422   }
423   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
424   ret->h = h;
425   ret->response_proc = response_proc;
426   ret->qc = *qc;
427   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
428   ret->priority = queue_priority;
429   ret->max_queue = max_queue_size;
430   ret->message_size = msize;
431   ret->was_transmitted = GNUNET_NO;
432   if (pos == NULL)
433   {
434     /* append at the tail */
435     pos = h->queue_tail;
436   }
437   else
438   {
439     pos = pos->prev;
440     /* do not insert at HEAD if HEAD query was already
441      * transmitted and we are still receiving replies! */
442     if ((pos == NULL) && (h->queue_head->was_transmitted))
443       pos = h->queue_head;
444   }
445   c++;
446   GNUNET_STATISTICS_update (h->stats,
447                             gettext_noop ("# queue entries created"),
448                             1, GNUNET_NO);
449   GNUNET_CONTAINER_DLL_insert_after (h->queue_head, h->queue_tail, pos, ret);
450   h->queue_size++;
451   ret->task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_queue_entry, ret);
452   pos = ret->next;
453   while (pos != NULL)
454   {
455     if ((pos->max_queue < h->queue_size) && (pos->was_transmitted == GNUNET_NO))
456     {
457       GNUNET_assert (pos->response_proc != NULL);
458       /* move 'pos' element to head so that it will be 
459        * killed on 'NULL' call below */
460 #if DEBUG_DATASTORE
461       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
462                   "Dropping request from datastore queue\n");
463 #endif
464       GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, pos);
465       GNUNET_CONTAINER_DLL_insert (h->queue_head, h->queue_tail, pos);
466       GNUNET_STATISTICS_update (h->stats,
467                                 gettext_noop
468                                 ("# Requests dropped from datastore queue"), 1,
469                                 GNUNET_NO);
470       GNUNET_assert (h->queue_head == pos);
471       pos->response_proc (h, NULL);
472       break;
473     }
474     pos = pos->next;
475   }
476   return ret;
477 }
478
479
480 /**
481  * Process entries in the queue (or do nothing if we are already
482  * doing so).
483  * 
484  * @param h handle to the datastore
485  */
486 static void process_queue (struct GNUNET_DATASTORE_Handle *h);
487
488
489 /**
490  * Try reconnecting to the datastore service.
491  *
492  * @param cls the 'struct GNUNET_DATASTORE_Handle'
493  * @param tc scheduler context
494  */
495 static void
496 try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
497 {
498   struct GNUNET_DATASTORE_Handle *h = cls;
499
500   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
501     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
502   else
503     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
504   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
505     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
506   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
507   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
508   if (h->client == NULL)
509   {
510     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
511                 "DATASTORE reconnect failed (fatally)\n");
512     return;
513   }
514   GNUNET_STATISTICS_update (h->stats,
515                             gettext_noop
516                             ("# datastore connections (re)created"), 1,
517                             GNUNET_NO);
518 #if DEBUG_DATASTORE
519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnected to DATASTORE\n");
520 #endif
521   process_queue (h);
522 }
523
524
525 /**
526  * Disconnect from the service and then try reconnecting to the datastore service
527  * after some delay.
528  *
529  * @param h handle to datastore to disconnect and reconnect
530  */
531 static void
532 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
533 {
534   if (h->client == NULL)
535   {
536 #if DEBUG_DATASTORE
537     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538                 "client NULL in disconnect, will not try to reconnect\n");
539 #endif
540     return;
541   }
542 #if 0
543   GNUNET_STATISTICS_update (stats,
544                             gettext_noop ("# reconnected to DATASTORE"),
545                             1, GNUNET_NO);
546 #endif
547   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
548   h->skip_next_messages = 0;
549   h->client = NULL;
550   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
551                                                     &try_reconnect, h);
552 }
553
554
555 /**
556  * Function called whenever we receive a message from
557  * the service.  Calls the appropriate handler.
558  *
559  * @param cls the 'struct GNUNET_DATASTORE_Handle'
560  * @param msg the received message
561  */
562 static void
563 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
564 {
565   struct GNUNET_DATASTORE_Handle *h = cls;
566   struct GNUNET_DATASTORE_QueueEntry *qe;
567
568   h->in_receive = GNUNET_NO;
569 #if DEBUG_DATASTORE
570   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Receiving reply from datastore\n");
571 #endif
572   if (h->skip_next_messages > 0)
573   {
574     h->skip_next_messages--;
575     process_queue (h);
576     return;
577   }
578   if (NULL == (qe = h->queue_head))
579   {
580     GNUNET_break (0);
581     process_queue (h);
582     return;
583   }
584   qe->response_proc (h, msg);
585 }
586
587
588 /**
589  * Transmit request from queue to datastore service.
590  *
591  * @param cls the 'struct GNUNET_DATASTORE_Handle'
592  * @param size number of bytes that can be copied to buf
593  * @param buf where to copy the drop message
594  * @return number of bytes written to buf
595  */
596 static size_t
597 transmit_request (void *cls, size_t size, void *buf)
598 {
599   struct GNUNET_DATASTORE_Handle *h = cls;
600   struct GNUNET_DATASTORE_QueueEntry *qe;
601   size_t msize;
602
603   h->th = NULL;
604   if (NULL == (qe = h->queue_head))
605     return 0;                   /* no entry in queue */
606   if (buf == NULL)
607   {
608     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
609                 _("Failed to transmit request to DATASTORE.\n"));
610     GNUNET_STATISTICS_update (h->stats,
611                               gettext_noop ("# transmission request failures"),
612                               1, GNUNET_NO);
613     do_disconnect (h);
614     return 0;
615   }
616   if (size < (msize = qe->message_size))
617   {
618     process_queue (h);
619     return 0;
620   }
621 #if DEBUG_DATASTORE
622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
623               "Transmitting %u byte request to DATASTORE\n", msize);
624 #endif
625   memcpy (buf, &qe[1], msize);
626   qe->was_transmitted = GNUNET_YES;
627   GNUNET_SCHEDULER_cancel (qe->task);
628   qe->task = GNUNET_SCHEDULER_NO_TASK;
629   GNUNET_assert (GNUNET_NO == h->in_receive);
630   h->in_receive = GNUNET_YES;
631   GNUNET_CLIENT_receive (h->client,
632                          &receive_cb,
633                          h, GNUNET_TIME_absolute_get_remaining (qe->timeout));
634   GNUNET_STATISTICS_update (h->stats,
635                             gettext_noop ("# bytes sent to datastore"),
636                             1, GNUNET_NO);
637   return msize;
638 }
639
640
641 /**
642  * Process entries in the queue (or do nothing if we are already
643  * doing so).
644  * 
645  * @param h handle to the datastore
646  */
647 static void
648 process_queue (struct GNUNET_DATASTORE_Handle *h)
649 {
650   struct GNUNET_DATASTORE_QueueEntry *qe;
651
652   if (NULL == (qe = h->queue_head))
653   {
654 #if DEBUG_DATASTORE > 1
655     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queue empty\n");
656 #endif
657     return;                     /* no entry in queue */
658   }
659   if (qe->was_transmitted == GNUNET_YES)
660   {
661 #if DEBUG_DATASTORE > 1
662     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Head request already transmitted\n");
663 #endif
664     return;                     /* waiting for replies */
665   }
666   if (h->th != NULL)
667   {
668 #if DEBUG_DATASTORE > 1
669     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Pending transmission request\n");
670 #endif
671     return;                     /* request pending */
672   }
673   if (h->client == NULL)
674   {
675 #if DEBUG_DATASTORE > 1
676     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not connected\n");
677 #endif
678     return;                     /* waiting for reconnect */
679   }
680   if (GNUNET_YES == h->in_receive)
681   {
682     /* wait for response to previous query */
683     return;
684   }
685 #if DEBUG_DATASTORE
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "Queueing %u byte request to DATASTORE\n", qe->message_size);
688 #endif
689   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
690                                                qe->message_size,
691                                                GNUNET_TIME_absolute_get_remaining
692                                                (qe->timeout), GNUNET_YES,
693                                                &transmit_request, h);
694   GNUNET_assert (GNUNET_NO == h->in_receive);
695   GNUNET_break (NULL != h->th);
696 }
697
698
699 /**
700  * Dummy continuation used to do nothing (but be non-zero).
701  *
702  * @param cls closure
703  * @param result result 
704  * @param emsg error message
705  */
706 static void
707 drop_status_cont (void *cls, int32_t result, const char *emsg)
708 {
709   /* do nothing */
710 }
711
712
713 /**
714  * Free a queue entry.  Removes the given entry from the
715  * queue and releases associated resources.  Does NOT
716  * call the callback.
717  * 
718  * @param qe entry to free.
719  */
720 static void
721 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
722 {
723   struct GNUNET_DATASTORE_Handle *h = qe->h;
724
725   GNUNET_CONTAINER_DLL_remove (h->queue_head, h->queue_tail, qe);
726   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
727   {
728     GNUNET_SCHEDULER_cancel (qe->task);
729     qe->task = GNUNET_SCHEDULER_NO_TASK;
730   }
731   h->queue_size--;
732   qe->was_transmitted = GNUNET_SYSERR;  /* use-after-free warning */
733   GNUNET_free (qe);
734 }
735
736
737 /**
738  * Type of a function to call when we receive a message
739  * from the service.
740  *
741  * @param cls closure
742  * @param msg message received, NULL on timeout or fatal error
743  */
744 static void
745 process_status_message (void *cls, const struct GNUNET_MessageHeader *msg)
746 {
747   struct GNUNET_DATASTORE_Handle *h = cls;
748   struct GNUNET_DATASTORE_QueueEntry *qe;
749   struct StatusContext rc;
750   const struct StatusMessage *sm;
751   const char *emsg;
752   int32_t status;
753   int was_transmitted;
754
755   if (NULL == (qe = h->queue_head))
756   {
757     GNUNET_break (0);
758     do_disconnect (h);
759     return;
760   }
761   rc = qe->qc.sc;
762   if (msg == NULL)
763   {
764     was_transmitted = qe->was_transmitted;
765     free_queue_entry (qe);
766     if (was_transmitted == GNUNET_YES)
767       do_disconnect (h);
768     else
769       process_queue (h);
770     if (rc.cont != NULL)
771       rc.cont (rc.cont_cls,
772                GNUNET_SYSERR,
773                _("Failed to receive status response from database."));
774     return;
775   }
776   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
777   free_queue_entry (qe);
778   if ((ntohs (msg->size) < sizeof (struct StatusMessage)) ||
779       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS))
780   {
781     GNUNET_break (0);
782     h->retry_time = GNUNET_TIME_UNIT_ZERO;
783     do_disconnect (h);
784     if (rc.cont != NULL)
785       rc.cont (rc.cont_cls,
786                GNUNET_SYSERR,
787                _("Error reading response from datastore service"));
788     return;
789   }
790   sm = (const struct StatusMessage *) msg;
791   status = ntohl (sm->status);
792   emsg = NULL;
793   if (ntohs (msg->size) > sizeof (struct StatusMessage))
794   {
795     emsg = (const char *) &sm[1];
796     if (emsg[ntohs (msg->size) - sizeof (struct StatusMessage) - 1] != '\0')
797     {
798       GNUNET_break (0);
799       emsg = _("Invalid error message received from datastore service");
800     }
801   }
802   if ((status == GNUNET_SYSERR) && (emsg == NULL))
803   {
804     GNUNET_break (0);
805     emsg = _("Invalid error message received from datastore service");
806   }
807 #if DEBUG_DATASTORE
808   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
809               "Received status %d/%s\n", (int) status, emsg);
810 #endif
811   GNUNET_STATISTICS_update (h->stats,
812                             gettext_noop ("# status messages received"),
813                             1, GNUNET_NO);
814   h->retry_time.rel_value = 0;
815   process_queue (h);
816   if (rc.cont != NULL)
817     rc.cont (rc.cont_cls, status, emsg);
818 }
819
820
821 /**
822  * Store an item in the datastore.  If the item is already present,
823  * the priorities are summed up and the higher expiration time and
824  * lower anonymity level is used.
825  *
826  * @param h handle to the datastore
827  * @param rid reservation ID to use (from "reserve"); use 0 if no
828  *            prior reservation was made
829  * @param key key for the value
830  * @param size number of bytes in data
831  * @param data content stored
832  * @param type type of the content
833  * @param priority priority of the content
834  * @param anonymity anonymity-level for the content
835  * @param replication how often should the content be replicated to other peers?
836  * @param expiration expiration time for the content
837  * @param queue_priority ranking of this request in the priority queue
838  * @param max_queue_size at what queue size should this request be dropped
839  *        (if other requests of higher priority are in the queue)
840  * @param timeout timeout for the operation
841  * @param cont continuation to call when done
842  * @param cont_cls closure for cont
843  * @return NULL if the entry was not queued, otherwise a handle that can be used to
844  *         cancel; note that even if NULL is returned, the callback will be invoked
845  *         (or rather, will already have been invoked)
846  */
847 struct GNUNET_DATASTORE_QueueEntry *
848 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
849                       uint32_t rid,
850                       const GNUNET_HashCode * key,
851                       size_t size,
852                       const void *data,
853                       enum GNUNET_BLOCK_Type type,
854                       uint32_t priority,
855                       uint32_t anonymity,
856                       uint32_t replication,
857                       struct GNUNET_TIME_Absolute expiration,
858                       unsigned int queue_priority,
859                       unsigned int max_queue_size,
860                       struct GNUNET_TIME_Relative timeout,
861                       GNUNET_DATASTORE_ContinuationWithStatus cont,
862                       void *cont_cls)
863 {
864   struct GNUNET_DATASTORE_QueueEntry *qe;
865   struct DataMessage *dm;
866   size_t msize;
867   union QueueContext qc;
868
869 #if DEBUG_DATASTORE
870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
871               "Asked to put %u bytes of data under key `%s' for %llu ms\n",
872               size,
873               GNUNET_h2s (key),
874               GNUNET_TIME_absolute_get_remaining (expiration).rel_value);
875 #endif
876   msize = sizeof (struct DataMessage) + size;
877   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
878   qc.sc.cont = cont;
879   qc.sc.cont_cls = cont_cls;
880   qe = make_queue_entry (h, msize,
881                          queue_priority, max_queue_size, timeout,
882                          &process_status_message, &qc);
883   if (qe == NULL)
884   {
885 #if DEBUG_DATASTORE
886     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
887                 "Could not create queue entry for PUT\n");
888 #endif
889     return NULL;
890   }
891   GNUNET_STATISTICS_update (h->stats,
892                             gettext_noop ("# PUT requests executed"),
893                             1, GNUNET_NO);
894   dm = (struct DataMessage *) &qe[1];
895   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
896   dm->header.size = htons (msize);
897   dm->rid = htonl (rid);
898   dm->size = htonl ((uint32_t) size);
899   dm->type = htonl (type);
900   dm->priority = htonl (priority);
901   dm->anonymity = htonl (anonymity);
902   dm->replication = htonl (replication);
903   dm->reserved = htonl (0);
904   dm->uid = GNUNET_htonll (0);
905   dm->expiration = GNUNET_TIME_absolute_hton (expiration);
906   dm->key = *key;
907   memcpy (&dm[1], data, size);
908   process_queue (h);
909   return qe;
910 }
911
912
913 /**
914  * Reserve space in the datastore.  This function should be used
915  * to avoid "out of space" failures during a longer sequence of "put"
916  * operations (for example, when a file is being inserted).
917  *
918  * @param h handle to the datastore
919  * @param amount how much space (in bytes) should be reserved (for content only)
920  * @param entries how many entries will be created (to calculate per-entry overhead)
921  * @param queue_priority ranking of this request in the priority queue
922  * @param max_queue_size at what queue size should this request be dropped
923  *        (if other requests of higher priority are in the queue)
924  * @param timeout how long to wait at most for a response (or before dying in queue)
925  * @param cont continuation to call when done; "success" will be set to
926  *             a positive reservation value if space could be reserved.
927  * @param cont_cls closure for cont
928  * @return NULL if the entry was not queued, otherwise a handle that can be used to
929  *         cancel; note that even if NULL is returned, the callback will be invoked
930  *         (or rather, will already have been invoked)
931  */
932 struct GNUNET_DATASTORE_QueueEntry *
933 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
934                           uint64_t amount,
935                           uint32_t entries,
936                           unsigned int queue_priority,
937                           unsigned int max_queue_size,
938                           struct GNUNET_TIME_Relative timeout,
939                           GNUNET_DATASTORE_ContinuationWithStatus cont,
940                           void *cont_cls)
941 {
942   struct GNUNET_DATASTORE_QueueEntry *qe;
943   struct ReserveMessage *rm;
944   union QueueContext qc;
945
946   if (cont == NULL)
947     cont = &drop_status_cont;
948 #if DEBUG_DATASTORE
949   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
950               "Asked to reserve %llu bytes of data and %u entries\n",
951               (unsigned long long) amount, (unsigned int) entries);
952 #endif
953   qc.sc.cont = cont;
954   qc.sc.cont_cls = cont_cls;
955   qe = make_queue_entry (h, sizeof (struct ReserveMessage),
956                          queue_priority, max_queue_size, timeout,
957                          &process_status_message, &qc);
958   if (qe == NULL)
959   {
960 #if DEBUG_DATASTORE
961     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
962                 "Could not create queue entry to reserve\n");
963 #endif
964     return NULL;
965   }
966   GNUNET_STATISTICS_update (h->stats,
967                             gettext_noop ("# RESERVE requests executed"),
968                             1, GNUNET_NO);
969   rm = (struct ReserveMessage *) &qe[1];
970   rm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
971   rm->header.size = htons (sizeof (struct ReserveMessage));
972   rm->entries = htonl (entries);
973   rm->amount = GNUNET_htonll (amount);
974   process_queue (h);
975   return qe;
976 }
977
978
979 /**
980  * Signal that all of the data for which a reservation was made has
981  * been stored and that whatever excess space might have been reserved
982  * can now be released.
983  *
984  * @param h handle to the datastore
985  * @param rid reservation ID (value of "success" in original continuation
986  *        from the "reserve" function).
987  * @param queue_priority ranking of this request in the priority queue
988  * @param max_queue_size at what queue size should this request be dropped
989  *        (if other requests of higher priority are in the queue)
990  * @param queue_priority ranking of this request in the priority queue
991  * @param max_queue_size at what queue size should this request be dropped
992  *        (if other requests of higher priority are in the queue)
993  * @param timeout how long to wait at most for a response
994  * @param cont continuation to call when done
995  * @param cont_cls closure for cont
996  * @return NULL if the entry was not queued, otherwise a handle that can be used to
997  *         cancel; note that even if NULL is returned, the callback will be invoked
998  *         (or rather, will already have been invoked)
999  */
1000 struct GNUNET_DATASTORE_QueueEntry *
1001 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
1002                                   uint32_t rid,
1003                                   unsigned int queue_priority,
1004                                   unsigned int max_queue_size,
1005                                   struct GNUNET_TIME_Relative timeout,
1006                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
1007                                   void *cont_cls)
1008 {
1009   struct GNUNET_DATASTORE_QueueEntry *qe;
1010   struct ReleaseReserveMessage *rrm;
1011   union QueueContext qc;
1012
1013   if (cont == NULL)
1014     cont = &drop_status_cont;
1015 #if DEBUG_DATASTORE
1016   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asked to release reserve %d\n", rid);
1017 #endif
1018   qc.sc.cont = cont;
1019   qc.sc.cont_cls = cont_cls;
1020   qe = make_queue_entry (h, sizeof (struct ReleaseReserveMessage),
1021                          queue_priority, max_queue_size, timeout,
1022                          &process_status_message, &qc);
1023   if (qe == NULL)
1024   {
1025 #if DEBUG_DATASTORE
1026     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                 "Could not create queue entry to release reserve\n");
1028 #endif
1029     return NULL;
1030   }
1031   GNUNET_STATISTICS_update (h->stats,
1032                             gettext_noop
1033                             ("# RELEASE RESERVE requests executed"), 1,
1034                             GNUNET_NO);
1035   rrm = (struct ReleaseReserveMessage *) &qe[1];
1036   rrm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1037   rrm->header.size = htons (sizeof (struct ReleaseReserveMessage));
1038   rrm->rid = htonl (rid);
1039   process_queue (h);
1040   return qe;
1041 }
1042
1043
1044 /**
1045  * Update a value in the datastore.
1046  *
1047  * @param h handle to the datastore
1048  * @param uid identifier for the value
1049  * @param priority how much to increase the priority of the value
1050  * @param expiration new expiration value should be MAX of existing and this argument
1051  * @param queue_priority ranking of this request in the priority queue
1052  * @param max_queue_size at what queue size should this request be dropped
1053  *        (if other requests of higher priority are in the queue)
1054  * @param timeout how long to wait at most for a response
1055  * @param cont continuation to call when done
1056  * @param cont_cls closure for cont
1057  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1058  *         cancel; note that even if NULL is returned, the callback will be invoked
1059  *         (or rather, will already have been invoked)
1060  */
1061 struct GNUNET_DATASTORE_QueueEntry *
1062 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1063                          uint64_t uid,
1064                          uint32_t priority,
1065                          struct GNUNET_TIME_Absolute expiration,
1066                          unsigned int queue_priority,
1067                          unsigned int max_queue_size,
1068                          struct GNUNET_TIME_Relative timeout,
1069                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1070                          void *cont_cls)
1071 {
1072   struct GNUNET_DATASTORE_QueueEntry *qe;
1073   struct UpdateMessage *um;
1074   union QueueContext qc;
1075
1076   if (cont == NULL)
1077     cont = &drop_status_cont;
1078 #if DEBUG_DATASTORE
1079   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1081               uid,
1082               (unsigned int) priority,
1083               (unsigned long long) expiration.abs_value);
1084 #endif
1085   qc.sc.cont = cont;
1086   qc.sc.cont_cls = cont_cls;
1087   qe = make_queue_entry (h, sizeof (struct UpdateMessage),
1088                          queue_priority, max_queue_size, timeout,
1089                          &process_status_message, &qc);
1090   if (qe == NULL)
1091   {
1092 #if DEBUG_DATASTORE
1093     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1094                 "Could not create queue entry for UPDATE\n");
1095 #endif
1096     return NULL;
1097   }
1098   GNUNET_STATISTICS_update (h->stats,
1099                             gettext_noop ("# UPDATE requests executed"),
1100                             1, GNUNET_NO);
1101   um = (struct UpdateMessage *) &qe[1];
1102   um->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1103   um->header.size = htons (sizeof (struct UpdateMessage));
1104   um->priority = htonl (priority);
1105   um->expiration = GNUNET_TIME_absolute_hton (expiration);
1106   um->uid = GNUNET_htonll (uid);
1107   process_queue (h);
1108   return qe;
1109 }
1110
1111
1112 /**
1113  * Explicitly remove some content from the database.
1114  * The "cont"inuation will be called with status
1115  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1116  * if no matching entry was found and "GNUNET_SYSERR"
1117  * on all other types of errors.
1118  *
1119  * @param h handle to the datastore
1120  * @param key key for the value
1121  * @param size number of bytes in data
1122  * @param data content stored
1123  * @param queue_priority ranking of this request in the priority queue
1124  * @param max_queue_size at what queue size should this request be dropped
1125  *        (if other requests of higher priority are in the queue)
1126  * @param timeout how long to wait at most for a response
1127  * @param cont continuation to call when done
1128  * @param cont_cls closure for cont
1129  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1130  *         cancel; note that even if NULL is returned, the callback will be invoked
1131  *         (or rather, will already have been invoked)
1132  */
1133 struct GNUNET_DATASTORE_QueueEntry *
1134 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1135                          const GNUNET_HashCode * key,
1136                          size_t size,
1137                          const void *data,
1138                          unsigned int queue_priority,
1139                          unsigned int max_queue_size,
1140                          struct GNUNET_TIME_Relative timeout,
1141                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1142                          void *cont_cls)
1143 {
1144   struct GNUNET_DATASTORE_QueueEntry *qe;
1145   struct DataMessage *dm;
1146   size_t msize;
1147   union QueueContext qc;
1148
1149   if (cont == NULL)
1150     cont = &drop_status_cont;
1151 #if DEBUG_DATASTORE
1152   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1153               "Asked to remove %u bytes under key `%s'\n",
1154               size, GNUNET_h2s (key));
1155 #endif
1156   qc.sc.cont = cont;
1157   qc.sc.cont_cls = cont_cls;
1158   msize = sizeof (struct DataMessage) + size;
1159   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1160   qe = make_queue_entry (h, msize,
1161                          queue_priority, max_queue_size, timeout,
1162                          &process_status_message, &qc);
1163   if (qe == NULL)
1164   {
1165 #if DEBUG_DATASTORE
1166     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1167                 "Could not create queue entry for REMOVE\n");
1168 #endif
1169     return NULL;
1170   }
1171   GNUNET_STATISTICS_update (h->stats,
1172                             gettext_noop ("# REMOVE requests executed"),
1173                             1, GNUNET_NO);
1174   dm = (struct DataMessage *) &qe[1];
1175   dm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1176   dm->header.size = htons (msize);
1177   dm->rid = htonl (0);
1178   dm->size = htonl (size);
1179   dm->type = htonl (0);
1180   dm->priority = htonl (0);
1181   dm->anonymity = htonl (0);
1182   dm->uid = GNUNET_htonll (0);
1183   dm->expiration = GNUNET_TIME_absolute_hton (GNUNET_TIME_UNIT_ZERO_ABS);
1184   dm->key = *key;
1185   memcpy (&dm[1], data, size);
1186   process_queue (h);
1187   return qe;
1188 }
1189
1190
1191 /**
1192  * Type of a function to call when we receive a message
1193  * from the service.
1194  *
1195  * @param cls closure
1196  * @param msg message received, NULL on timeout or fatal error
1197  */
1198 static void
1199 process_result_message (void *cls, const struct GNUNET_MessageHeader *msg)
1200 {
1201   struct GNUNET_DATASTORE_Handle *h = cls;
1202   struct GNUNET_DATASTORE_QueueEntry *qe;
1203   struct ResultContext rc;
1204   const struct DataMessage *dm;
1205   int was_transmitted;
1206
1207   if (msg == NULL)
1208   {
1209     qe = h->queue_head;
1210     GNUNET_assert (NULL != qe);
1211     rc = qe->qc.rc;
1212     was_transmitted = qe->was_transmitted;
1213     free_queue_entry (qe);
1214     if (was_transmitted == GNUNET_YES)
1215     {
1216       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1217                   _("Failed to receive response from database.\n"));
1218       do_disconnect (h);
1219     }
1220     else
1221     {
1222       process_queue (h);
1223     }
1224     if (rc.proc != NULL)
1225       rc.proc (rc.proc_cls,
1226                NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1227     return;
1228   }
1229   if (ntohs (msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END)
1230   {
1231     GNUNET_break (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader));
1232     qe = h->queue_head;
1233     rc = qe->qc.rc;
1234     GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1235     free_queue_entry (qe);
1236 #if DEBUG_DATASTORE
1237     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1238                 "Received end of result set, new queue size is %u\n",
1239                 h->queue_size);
1240 #endif
1241     if (rc.proc != NULL)
1242       rc.proc (rc.proc_cls,
1243                NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1244     h->retry_time.rel_value = 0;
1245     h->result_count = 0;
1246     process_queue (h);
1247     return;
1248   }
1249   qe = h->queue_head;
1250   GNUNET_assert (NULL != qe);
1251   rc = qe->qc.rc;
1252   if (GNUNET_YES != qe->was_transmitted)
1253   {
1254     GNUNET_break (0);
1255     free_queue_entry (qe);
1256     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1257     do_disconnect (h);
1258     if (rc.proc != NULL)
1259       rc.proc (rc.proc_cls,
1260                NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1261     return;
1262   }
1263   if ((ntohs (msg->size) < sizeof (struct DataMessage)) ||
1264       (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1265       (ntohs (msg->size) !=
1266        sizeof (struct DataMessage) +
1267        ntohl (((const struct DataMessage *) msg)->size)))
1268   {
1269     GNUNET_break (0);
1270     free_queue_entry (qe);
1271     h->retry_time = GNUNET_TIME_UNIT_ZERO;
1272     do_disconnect (h);
1273     if (rc.proc != NULL)
1274       rc.proc (rc.proc_cls,
1275                NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
1276     return;
1277   }
1278   GNUNET_STATISTICS_update (h->stats,
1279                             gettext_noop ("# Results received"), 1, GNUNET_NO);
1280   dm = (const struct DataMessage *) msg;
1281 #if DEBUG_DATASTORE
1282   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283               "Received result %llu with type %u and size %u with key %s\n",
1284               (unsigned long long) GNUNET_ntohll (dm->uid),
1285               ntohl (dm->type), ntohl (dm->size), GNUNET_h2s (&dm->key));
1286 #endif
1287   free_queue_entry (qe);
1288   h->retry_time.rel_value = 0;
1289   process_queue (h);
1290   if (rc.proc != NULL)
1291     rc.proc (rc.proc_cls,
1292              &dm->key,
1293              ntohl (dm->size),
1294              &dm[1],
1295              ntohl (dm->type),
1296              ntohl (dm->priority),
1297              ntohl (dm->anonymity),
1298              GNUNET_TIME_absolute_ntoh (dm->expiration),
1299              GNUNET_ntohll (dm->uid));
1300 }
1301
1302
1303 /**
1304  * Get a random value from the datastore for content replication.
1305  * Returns a single, random value among those with the highest
1306  * replication score, lowering positive replication scores by one for
1307  * the chosen value (if only content with a replication score exists,
1308  * a random value is returned and replication scores are not changed).
1309  *
1310  * @param h handle to the datastore
1311  * @param queue_priority ranking of this request in the priority queue
1312  * @param max_queue_size at what queue size should this request be dropped
1313  *        (if other requests of higher priority are in the queue)
1314  * @param timeout how long to wait at most for a response
1315  * @param proc function to call on a random value; it
1316  *        will be called once with a value (if available)
1317  *        and always once with a value of NULL.
1318  * @param proc_cls closure for proc
1319  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1320  *         cancel
1321  */
1322 struct GNUNET_DATASTORE_QueueEntry *
1323 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1324                                       unsigned int queue_priority,
1325                                       unsigned int max_queue_size,
1326                                       struct GNUNET_TIME_Relative timeout,
1327                                       GNUNET_DATASTORE_DatumProcessor proc,
1328                                       void *proc_cls)
1329 {
1330   struct GNUNET_DATASTORE_QueueEntry *qe;
1331   struct GNUNET_MessageHeader *m;
1332   union QueueContext qc;
1333
1334   GNUNET_assert (NULL != proc);
1335 #if DEBUG_DATASTORE
1336   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1337               "Asked to get replication entry in %llu ms\n",
1338               (unsigned long long) timeout.rel_value);
1339 #endif
1340   qc.rc.proc = proc;
1341   qc.rc.proc_cls = proc_cls;
1342   qe = make_queue_entry (h, sizeof (struct GNUNET_MessageHeader),
1343                          queue_priority, max_queue_size, timeout,
1344                          &process_result_message, &qc);
1345   if (qe == NULL)
1346   {
1347 #if DEBUG_DATASTORE
1348     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349                 "Could not create queue entry for GET REPLICATION\n");
1350 #endif
1351     return NULL;
1352   }
1353   GNUNET_STATISTICS_update (h->stats,
1354                             gettext_noop
1355                             ("# GET REPLICATION requests executed"), 1,
1356                             GNUNET_NO);
1357   m = (struct GNUNET_MessageHeader *) &qe[1];
1358   m->type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION);
1359   m->size = htons (sizeof (struct GNUNET_MessageHeader));
1360   process_queue (h);
1361   return qe;
1362 }
1363
1364
1365 /**
1366  * Get a single zero-anonymity value from the datastore.
1367  *
1368  * @param h handle to the datastore
1369  * @param offset offset of the result (modulo num-results); set to
1370  *               a random 64-bit value initially; then increment by
1371  *               one each time; detect that all results have been found by uid
1372  *               being again the first uid ever returned.
1373  * @param queue_priority ranking of this request in the priority queue
1374  * @param max_queue_size at what queue size should this request be dropped
1375  *        (if other requests of higher priority are in the queue)
1376  * @param timeout how long to wait at most for a response
1377  * @param type allowed type for the operation (never zero)
1378  * @param proc function to call on a random value; it
1379  *        will be called once with a value (if available)
1380  *        or with NULL if none value exists.
1381  * @param proc_cls closure for proc
1382  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1383  *         cancel
1384  */
1385 struct GNUNET_DATASTORE_QueueEntry *
1386 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1387                                      uint64_t offset,
1388                                      unsigned int queue_priority,
1389                                      unsigned int max_queue_size,
1390                                      struct GNUNET_TIME_Relative timeout,
1391                                      enum GNUNET_BLOCK_Type type,
1392                                      GNUNET_DATASTORE_DatumProcessor proc,
1393                                      void *proc_cls)
1394 {
1395   struct GNUNET_DATASTORE_QueueEntry *qe;
1396   struct GetZeroAnonymityMessage *m;
1397   union QueueContext qc;
1398
1399   GNUNET_assert (NULL != proc);
1400   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
1401 #if DEBUG_DATASTORE
1402   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403               "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
1404               (unsigned long long) offset,
1405               type, (unsigned long long) timeout.rel_value);
1406 #endif
1407   qc.rc.proc = proc;
1408   qc.rc.proc_cls = proc_cls;
1409   qe = make_queue_entry (h, sizeof (struct GetZeroAnonymityMessage),
1410                          queue_priority, max_queue_size, timeout,
1411                          &process_result_message, &qc);
1412   if (qe == NULL)
1413   {
1414 #if DEBUG_DATASTORE
1415     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1416                 "Could not create queue entry for zero-anonymity procation\n");
1417 #endif
1418     return NULL;
1419   }
1420   GNUNET_STATISTICS_update (h->stats,
1421                             gettext_noop
1422                             ("# GET ZERO ANONYMITY requests executed"), 1,
1423                             GNUNET_NO);
1424   m = (struct GetZeroAnonymityMessage *) &qe[1];
1425   m->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1426   m->header.size = htons (sizeof (struct GetZeroAnonymityMessage));
1427   m->type = htonl ((uint32_t) type);
1428   m->offset = GNUNET_htonll (offset);
1429   process_queue (h);
1430   return qe;
1431 }
1432
1433
1434 /**
1435  * Get a result for a particular key from the datastore.  The processor
1436  * will only be called once.
1437  *
1438  * @param h handle to the datastore
1439  * @param offset offset of the result (modulo num-results); set to
1440  *               a random 64-bit value initially; then increment by
1441  *               one each time; detect that all results have been found by uid
1442  *               being again the first uid ever returned.
1443  * @param key maybe NULL (to match all entries)
1444  * @param type desired type, 0 for any
1445  * @param queue_priority ranking of this request in the priority queue
1446  * @param max_queue_size at what queue size should this request be dropped
1447  *        (if other requests of higher priority are in the queue)
1448  * @param timeout how long to wait at most for a response
1449  * @param proc function to call on each matching value;
1450  *        will be called once with a NULL value at the end
1451  * @param proc_cls closure for proc
1452  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1453  *         cancel
1454  */
1455 struct GNUNET_DATASTORE_QueueEntry *
1456 GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
1457                           uint64_t offset,
1458                           const GNUNET_HashCode * key,
1459                           enum GNUNET_BLOCK_Type type,
1460                           unsigned int queue_priority,
1461                           unsigned int max_queue_size,
1462                           struct GNUNET_TIME_Relative timeout,
1463                           GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
1464 {
1465   struct GNUNET_DATASTORE_QueueEntry *qe;
1466   struct GetMessage *gm;
1467   union QueueContext qc;
1468
1469   GNUNET_assert (NULL != proc);
1470 #if DEBUG_DATASTORE
1471   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472               "Asked to look for data of type %u under key `%s'\n",
1473               (unsigned int) type, GNUNET_h2s (key));
1474 #endif
1475   qc.rc.proc = proc;
1476   qc.rc.proc_cls = proc_cls;
1477   qe = make_queue_entry (h, sizeof (struct GetMessage),
1478                          queue_priority, max_queue_size, timeout,
1479                          &process_result_message, &qc);
1480   if (qe == NULL)
1481   {
1482 #if DEBUG_DATASTORE
1483     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1484                 "Could not queue request for `%s'\n", GNUNET_h2s (key));
1485 #endif
1486     return NULL;
1487   }
1488   GNUNET_STATISTICS_update (h->stats,
1489                             gettext_noop ("# GET requests executed"),
1490                             1, GNUNET_NO);
1491   gm = (struct GetMessage *) &qe[1];
1492   gm->header.type = htons (GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1493   gm->type = htonl (type);
1494   gm->offset = GNUNET_htonll (offset);
1495   if (key != NULL)
1496   {
1497     gm->header.size = htons (sizeof (struct GetMessage));
1498     gm->key = *key;
1499   }
1500   else
1501   {
1502     gm->header.size =
1503         htons (sizeof (struct GetMessage) - sizeof (GNUNET_HashCode));
1504   }
1505   process_queue (h);
1506   return qe;
1507 }
1508
1509
1510 /**
1511  * Cancel a datastore operation.  The final callback from the
1512  * operation must not have been done yet.
1513  * 
1514  * @param qe operation to cancel
1515  */
1516 void
1517 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1518 {
1519   struct GNUNET_DATASTORE_Handle *h;
1520
1521   GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
1522   h = qe->h;
1523 #if DEBUG_DATASTORE
1524   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525               "Pending DATASTORE request %p cancelled (%d, %d)\n",
1526               qe, qe->was_transmitted, h->queue_head == qe);
1527 #endif
1528   if (GNUNET_YES == qe->was_transmitted)
1529   {
1530     free_queue_entry (qe);
1531     h->skip_next_messages++;
1532     return;
1533   }
1534   free_queue_entry (qe);
1535   process_queue (h);
1536 }
1537
1538
1539 /* end of datastore_api.c */