cf6a2cc8e0a44bf9ced5eed25f2c8dd2d4bd64ff
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int32_t was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184
185   /**
186    * Current connection to the datastore service.
187    */
188   struct GNUNET_CLIENT_Connection *client;
189
190   /**
191    * Handle for statistics.
192    */
193   struct GNUNET_STATISTICS_Handle *stats;
194
195   /**
196    * Current transmit handle.
197    */
198   struct GNUNET_CLIENT_TransmitHandle *th;
199
200   /**
201    * Current head of priority queue.
202    */
203   struct GNUNET_DATASTORE_QueueEntry *queue_head;
204
205   /**
206    * Current tail of priority queue.
207    */
208   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
209
210   /**
211    * Task for trying to reconnect.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
214
215   /**
216    * How quickly should we retry?  Used for exponential back-off on
217    * connect-errors.
218    */
219   struct GNUNET_TIME_Relative retry_time;
220
221   /**
222    * Number of entries in the queue.
223    */
224   unsigned int queue_size;
225
226   /**
227    * Number of results we're receiving for the current query
228    * after application stopped to care.  Used to determine when
229    * to reset the connection.
230    */
231   unsigned int result_count;
232
233   /**
234    * Are we currently trying to receive from the service?
235    */
236   int in_receive;
237
238 };
239
240
241
242 /**
243  * Connect to the datastore service.
244  *
245  * @param cfg configuration to use
246  * @return handle to use to access the service
247  */
248 struct GNUNET_DATASTORE_Handle *
249 GNUNET_DATASTORE_connect (const struct
250                           GNUNET_CONFIGURATION_Handle
251                           *cfg)
252 {
253   struct GNUNET_CLIENT_Connection *c;
254   struct GNUNET_DATASTORE_Handle *h;
255   
256   c = GNUNET_CLIENT_connect ("datastore", cfg);
257   if (c == NULL)
258     return NULL; /* oops */
259   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
260                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
261   h->client = c;
262   h->cfg = cfg;
263   h->stats = GNUNET_STATISTICS_create ("datastore-api",
264                                        cfg);
265   return h;
266 }
267
268
269 /**
270  * Transmit DROP message to datastore service.
271  *
272  * @param cls the 'struct GNUNET_DATASTORE_Handle'
273  * @param size number of bytes that can be copied to buf
274  * @param buf where to copy the drop message
275  * @return number of bytes written to buf
276  */
277 static size_t
278 transmit_drop (void *cls,
279                size_t size, 
280                void *buf)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283   struct GNUNET_MessageHeader *hdr;
284   
285   if (buf == NULL)
286     {
287       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
288                   _("Failed to transmit request to drop database.\n"));
289       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290       return 0;
291     }
292   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
293   hdr = buf;
294   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
295   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
296   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
297   return sizeof(struct GNUNET_MessageHeader);
298 }
299
300
301 /**
302  * Disconnect from the datastore service (and free
303  * associated resources).
304  *
305  * @param h handle to the datastore
306  * @param drop set to GNUNET_YES to delete all data in datastore (!)
307  */
308 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
309                                   int drop)
310 {
311   struct GNUNET_DATASTORE_QueueEntry *qe;
312
313   if (h->client != NULL)
314     {
315       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
316       h->client = NULL;
317     }
318   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
319     {
320       GNUNET_SCHEDULER_cancel (h->reconnect_task);
321       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
322     }
323   while (NULL != (qe = h->queue_head))
324     {
325       GNUNET_assert (NULL != qe->response_proc);
326       qe->response_proc (qe, NULL);
327     }
328   if (GNUNET_YES == drop) 
329     {
330       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
331       if (h->client != NULL)
332         {
333           if (NULL != 
334               GNUNET_CLIENT_notify_transmit_ready (h->client,
335                                                    sizeof(struct GNUNET_MessageHeader),
336                                                    GNUNET_TIME_UNIT_MINUTES,
337                                                    GNUNET_YES,
338                                                    &transmit_drop,
339                                                    h))
340             return;
341           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
342         }
343       GNUNET_break (0);
344     }
345   GNUNET_STATISTICS_destroy (h->stats,
346                              GNUNET_NO);
347   GNUNET_free (h);
348 }
349
350
351 /**
352  * A request has timed out (before being transmitted to the service).
353  *
354  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
355  * @param tc scheduler context
356  */
357 static void
358 timeout_queue_entry (void *cls,
359                      const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
362
363   GNUNET_STATISTICS_update (qe->h->stats,
364                             gettext_noop ("# queue entry timeouts"),
365                             1,
366                             GNUNET_NO);
367   qe->task = GNUNET_SCHEDULER_NO_TASK;
368   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
369   qe->response_proc (qe, NULL);
370 }
371
372
373 /**
374  * Create a new entry for our priority queue (and possibly discard other entires if
375  * the queue is getting too long).
376  *
377  * @param h handle to the datastore
378  * @param msize size of the message to queue
379  * @param queue_priority priority of the entry
380  * @param max_queue_size at what queue size should this request be dropped
381  *        (if other requests of higher priority are in the queue)
382  * @param timeout timeout for the operation
383  * @param response_proc function to call with replies (can be NULL)
384  * @param qc client context (NOT a closure for response_proc)
385  * @return NULL if the queue is full (and this entry was dropped)
386  */
387 static struct GNUNET_DATASTORE_QueueEntry *
388 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
389                   size_t msize,
390                   unsigned int queue_priority,
391                   unsigned int max_queue_size,
392                   struct GNUNET_TIME_Relative timeout,
393                   GNUNET_CLIENT_MessageHandler response_proc,            
394                   const union QueueContext *qc)
395 {
396   struct GNUNET_DATASTORE_QueueEntry *ret;
397   struct GNUNET_DATASTORE_QueueEntry *pos;
398   unsigned int c;
399
400   c = 0;
401   pos = h->queue_head;
402   while ( (pos != NULL) &&
403           (c < max_queue_size) &&
404           (pos->priority >= queue_priority) )
405     {
406       c++;
407       pos = pos->next;
408     }
409   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
410   ret->h = h;
411   ret->response_proc = response_proc;
412   ret->qc = *qc;
413   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
414   ret->priority = queue_priority;
415   ret->max_queue = max_queue_size;
416   ret->message_size = msize;
417   ret->was_transmitted = GNUNET_NO;
418   if (pos == NULL)
419     {
420       /* append at the tail */
421       pos = h->queue_tail;
422     }
423   else
424     {
425       pos = pos->prev; 
426       /* do not insert at HEAD if HEAD query was already
427          transmitted and we are still receiving replies! */
428       if ( (pos == NULL) &&
429            (h->queue_head->was_transmitted) )
430         pos = h->queue_head;
431     }
432   c++;
433   GNUNET_STATISTICS_update (h->stats,
434                             gettext_noop ("# queue entries created"),
435                             1,
436                             GNUNET_NO);
437   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
438                                      h->queue_tail,
439                                      pos,
440                                      ret);
441   h->queue_size++;
442   if (c > max_queue_size)
443     {
444       GNUNET_STATISTICS_update (h->stats,
445                                 gettext_noop ("# queue overflows"),
446                                 1,
447                                 GNUNET_NO);
448       response_proc (ret, NULL);
449       return NULL;
450     }
451   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
452                                             &timeout_queue_entry,
453                                             ret);
454   pos = ret->next;
455   while (pos != NULL) 
456     {
457       if (pos->max_queue < h->queue_size)
458         {
459           GNUNET_assert (pos->response_proc != NULL);
460           pos->response_proc (pos, NULL);
461           break;
462         }
463       pos = pos->next;
464     }
465   return ret;
466 }
467
468
469 /**
470  * Process entries in the queue (or do nothing if we are already
471  * doing so).
472  * 
473  * @param h handle to the datastore
474  */
475 static void
476 process_queue (struct GNUNET_DATASTORE_Handle *h);
477
478
479 /**
480  * Try reconnecting to the datastore service.
481  *
482  * @param cls the 'struct GNUNET_DATASTORE_Handle'
483  * @param tc scheduler context
484  */
485 static void
486 try_reconnect (void *cls,
487                const struct GNUNET_SCHEDULER_TaskContext *tc)
488 {
489   struct GNUNET_DATASTORE_Handle *h = cls;
490
491   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
492     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
493   else
494     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
495   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
496     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
497   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
498   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
499   if (h->client == NULL)
500     {
501       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
502                   "DATASTORE reconnect failed (fatally)\n");
503       return;
504     }
505   GNUNET_STATISTICS_update (h->stats,
506                             gettext_noop ("# datastore connections (re)created"),
507                             1,
508                             GNUNET_NO);
509 #if DEBUG_DATASTORE
510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511               "Reconnected to DATASTORE\n");
512 #endif
513   process_queue (h);
514 }
515
516
517 /**
518  * Disconnect from the service and then try reconnecting to the datastore service
519  * after some delay.
520  *
521  * @param h handle to datastore to disconnect and reconnect
522  */
523 static void
524 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
525 {
526   if (h->client == NULL)
527     {
528 #if DEBUG_DATASTORE
529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                   "client NULL in disconnect, will not try to reconnect\n");
531 #endif
532       return;
533     }
534 #if 0
535   GNUNET_STATISTICS_update (stats,
536                             gettext_noop ("# reconnected to DATASTORE"),
537                             1,
538                             GNUNET_NO);
539 #endif
540   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
541   h->client = NULL;
542   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
543                                                     &try_reconnect,
544                                                     h);      
545 }
546
547
548 /**
549  * Transmit request from queue to datastore service.
550  *
551  * @param cls the 'struct GNUNET_DATASTORE_Handle'
552  * @param size number of bytes that can be copied to buf
553  * @param buf where to copy the drop message
554  * @return number of bytes written to buf
555  */
556 static size_t
557 transmit_request (void *cls,
558                   size_t size, 
559                   void *buf)
560 {
561   struct GNUNET_DATASTORE_Handle *h = cls;
562   struct GNUNET_DATASTORE_QueueEntry *qe;
563   size_t msize;
564
565   h->th = NULL;
566   if (NULL == (qe = h->queue_head))
567     return 0; /* no entry in queue */
568   if (buf == NULL)
569     {
570       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
571                   _("Failed to transmit request to DATASTORE.\n"));
572       GNUNET_STATISTICS_update (h->stats,
573                                 gettext_noop ("# transmission request failures"),
574                                 1,
575                                 GNUNET_NO);
576       do_disconnect (h);
577       return 0;
578     }
579   if (size < (msize = qe->message_size))
580     {
581       process_queue (h);
582       return 0;
583     }
584  #if DEBUG_DATASTORE
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "Transmitting %u byte request to DATASTORE\n",
587               msize);
588 #endif
589   memcpy (buf, &qe[1], msize);
590   qe->was_transmitted = GNUNET_YES;
591   GNUNET_SCHEDULER_cancel (qe->task);
592   qe->task = GNUNET_SCHEDULER_NO_TASK;
593   h->in_receive = GNUNET_YES;
594   GNUNET_CLIENT_receive (h->client,
595                          qe->response_proc,
596                          qe,
597                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
598   GNUNET_STATISTICS_update (h->stats,
599                             gettext_noop ("# bytes sent to datastore"),
600                             1,
601                             GNUNET_NO);
602   return msize;
603 }
604
605
606 /**
607  * Process entries in the queue (or do nothing if we are already
608  * doing so).
609  * 
610  * @param h handle to the datastore
611  */
612 static void
613 process_queue (struct GNUNET_DATASTORE_Handle *h)
614 {
615   struct GNUNET_DATASTORE_QueueEntry *qe;
616
617   if (NULL == (qe = h->queue_head))
618     {
619 #if DEBUG_DATASTORE > 1
620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621                   "Queue empty\n");
622 #endif
623       return; /* no entry in queue */
624     }
625   if (qe->was_transmitted == GNUNET_YES)
626     {
627 #if DEBUG_DATASTORE > 1
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                   "Head request already transmitted\n");
630 #endif
631       return; /* waiting for replies */
632     }
633   if (h->th != NULL)
634     {
635 #if DEBUG_DATASTORE > 1
636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637                   "Pending transmission request\n");
638 #endif
639       return; /* request pending */
640     }
641   if (h->client == NULL)
642     {
643 #if DEBUG_DATASTORE > 1
644       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645                   "Not connected\n");
646 #endif
647       return; /* waiting for reconnect */
648     }
649 #if DEBUG_DATASTORE
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Queueing %u byte request to DATASTORE\n",
652               qe->message_size);
653 #endif
654   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
655                                                qe->message_size,
656                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
657                                                GNUNET_YES,
658                                                &transmit_request,
659                                                h);
660 }
661
662
663 /**
664  * Dummy continuation used to do nothing (but be non-zero).
665  *
666  * @param cls closure
667  * @param result result 
668  * @param emsg error message
669  */
670 static void
671 drop_status_cont (void *cls, int result, const char *emsg)
672 {
673   /* do nothing */
674 }
675
676
677 static void
678 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
679 {
680   struct GNUNET_DATASTORE_Handle *h = qe->h;
681
682   GNUNET_CONTAINER_DLL_remove (h->queue_head,
683                                h->queue_tail,
684                                qe);
685   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
686     {
687       GNUNET_SCHEDULER_cancel (qe->task);
688       qe->task = GNUNET_SCHEDULER_NO_TASK;
689     }
690   h->queue_size--;
691   GNUNET_free (qe);
692 }
693
694 /**
695  * Type of a function to call when we receive a message
696  * from the service.
697  *
698  * @param cls closure
699  * @param msg message received, NULL on timeout or fatal error
700  */
701 static void 
702 process_status_message (void *cls,
703                         const struct
704                         GNUNET_MessageHeader * msg)
705 {
706   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
707   struct GNUNET_DATASTORE_Handle *h = qe->h;
708   struct StatusContext rc = qe->qc.sc;
709   const struct StatusMessage *sm;
710   const char *emsg;
711   int32_t status;
712   int was_transmitted;
713
714   h->in_receive = GNUNET_NO;
715   was_transmitted = qe->was_transmitted;
716   if (msg == NULL)
717     {      
718       free_queue_entry (qe);
719       if (NULL == h->client)
720         return; /* forced disconnect */
721       if (rc.cont != NULL)
722         rc.cont (rc.cont_cls, 
723                  GNUNET_SYSERR,
724                  _("Failed to receive status response from database."));
725       if (was_transmitted == GNUNET_YES)
726         do_disconnect (h);
727       return;
728     }
729   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
730   GNUNET_assert (h->queue_head == qe);
731   free_queue_entry (qe);
732   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
733        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
734     {
735       GNUNET_break (0);
736       h->retry_time = GNUNET_TIME_UNIT_ZERO;
737       do_disconnect (h);
738       if (rc.cont != NULL)
739         rc.cont (rc.cont_cls, 
740                  GNUNET_SYSERR,
741                  _("Error reading response from datastore service"));
742       return;
743     }
744   sm = (const struct StatusMessage*) msg;
745   status = ntohl(sm->status);
746   emsg = NULL;
747   if (ntohs(msg->size) > sizeof(struct StatusMessage))
748     {
749       emsg = (const char*) &sm[1];
750       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
751         {
752           GNUNET_break (0);
753           emsg = _("Invalid error message received from datastore service");
754         }
755     }  
756   if ( (status == GNUNET_SYSERR) &&
757        (emsg == NULL) )
758     {
759       GNUNET_break (0);
760       emsg = _("Invalid error message received from datastore service");
761     }
762 #if DEBUG_DATASTORE
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "Received status %d/%s\n",
765               (int) status,
766               emsg);
767 #endif
768   GNUNET_STATISTICS_update (h->stats,
769                             gettext_noop ("# status messages received"),
770                             1,
771                             GNUNET_NO);
772   h->retry_time.rel_value = 0;
773   process_queue (h);
774   if (rc.cont != NULL)
775     rc.cont (rc.cont_cls, 
776              status,
777              emsg);
778 }
779
780
781 /**
782  * Store an item in the datastore.  If the item is already present,
783  * the priorities are summed up and the higher expiration time and
784  * lower anonymity level is used.
785  *
786  * @param h handle to the datastore
787  * @param rid reservation ID to use (from "reserve"); use 0 if no
788  *            prior reservation was made
789  * @param key key for the value
790  * @param size number of bytes in data
791  * @param data content stored
792  * @param type type of the content
793  * @param priority priority of the content
794  * @param anonymity anonymity-level for the content
795  * @param expiration expiration time for the content
796  * @param queue_priority ranking of this request in the priority queue
797  * @param max_queue_size at what queue size should this request be dropped
798  *        (if other requests of higher priority are in the queue)
799  * @param timeout timeout for the operation
800  * @param cont continuation to call when done
801  * @param cont_cls closure for cont
802  * @return NULL if the entry was not queued, otherwise a handle that can be used to
803  *         cancel; note that even if NULL is returned, the callback will be invoked
804  *         (or rather, will already have been invoked)
805  */
806 struct GNUNET_DATASTORE_QueueEntry *
807 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
808                       int rid,
809                       const GNUNET_HashCode * key,
810                       size_t size,
811                       const void *data,
812                       enum GNUNET_BLOCK_Type type,
813                       uint32_t priority,
814                       uint32_t anonymity,
815                       struct GNUNET_TIME_Absolute expiration,
816                       unsigned int queue_priority,
817                       unsigned int max_queue_size,
818                       struct GNUNET_TIME_Relative timeout,
819                       GNUNET_DATASTORE_ContinuationWithStatus cont,
820                       void *cont_cls)
821 {
822   struct GNUNET_DATASTORE_QueueEntry *qe;
823   struct DataMessage *dm;
824   size_t msize;
825   union QueueContext qc;
826
827 #if DEBUG_DATASTORE
828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
829               "Asked to put %u bytes of data under key `%s'\n",
830               size,
831               GNUNET_h2s (key));
832 #endif
833   msize = sizeof(struct DataMessage) + size;
834   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
835   qc.sc.cont = cont;
836   qc.sc.cont_cls = cont_cls;
837   qe = make_queue_entry (h, msize,
838                          queue_priority, max_queue_size, timeout,
839                          &process_status_message, &qc);
840   if (qe == NULL)
841     {
842 #if DEBUG_DATASTORE
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                   "Could not create queue entry for PUT\n");
845 #endif
846       return NULL;
847     }
848   GNUNET_STATISTICS_update (h->stats,
849                             gettext_noop ("# PUT requests executed"),
850                             1,
851                             GNUNET_NO);
852   dm = (struct DataMessage* ) &qe[1];
853   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
854   dm->header.size = htons(msize);
855   dm->rid = htonl(rid);
856   dm->size = htonl( (uint32_t) size);
857   dm->type = htonl(type);
858   dm->priority = htonl(priority);
859   dm->anonymity = htonl(anonymity);
860   dm->uid = GNUNET_htonll(0);
861   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
862   dm->key = *key;
863   memcpy (&dm[1], data, size);
864   process_queue (h);
865   return qe;
866 }
867
868
869 /**
870  * Reserve space in the datastore.  This function should be used
871  * to avoid "out of space" failures during a longer sequence of "put"
872  * operations (for example, when a file is being inserted).
873  *
874  * @param h handle to the datastore
875  * @param amount how much space (in bytes) should be reserved (for content only)
876  * @param entries how many entries will be created (to calculate per-entry overhead)
877  * @param queue_priority ranking of this request in the priority queue
878  * @param max_queue_size at what queue size should this request be dropped
879  *        (if other requests of higher priority are in the queue)
880  * @param timeout how long to wait at most for a response (or before dying in queue)
881  * @param cont continuation to call when done; "success" will be set to
882  *             a positive reservation value if space could be reserved.
883  * @param cont_cls closure for cont
884  * @return NULL if the entry was not queued, otherwise a handle that can be used to
885  *         cancel; note that even if NULL is returned, the callback will be invoked
886  *         (or rather, will already have been invoked)
887  */
888 struct GNUNET_DATASTORE_QueueEntry *
889 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
890                           uint64_t amount,
891                           uint32_t entries,
892                           unsigned int queue_priority,
893                           unsigned int max_queue_size,
894                           struct GNUNET_TIME_Relative timeout,
895                           GNUNET_DATASTORE_ContinuationWithStatus cont,
896                           void *cont_cls)
897 {
898   struct GNUNET_DATASTORE_QueueEntry *qe;
899   struct ReserveMessage *rm;
900   union QueueContext qc;
901
902   if (cont == NULL)
903     cont = &drop_status_cont;
904 #if DEBUG_DATASTORE
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "Asked to reserve %llu bytes of data and %u entries'\n",
907               (unsigned long long) amount,
908               (unsigned int) entries);
909 #endif
910   qc.sc.cont = cont;
911   qc.sc.cont_cls = cont_cls;
912   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
913                          queue_priority, max_queue_size, timeout,
914                          &process_status_message, &qc);
915   if (qe == NULL)
916     {
917 #if DEBUG_DATASTORE
918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
919                   "Could not create queue entry to reserve\n");
920 #endif
921       return NULL;
922     }
923   GNUNET_STATISTICS_update (h->stats,
924                             gettext_noop ("# RESERVE requests executed"),
925                             1,
926                             GNUNET_NO);
927   rm = (struct ReserveMessage*) &qe[1];
928   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
929   rm->header.size = htons(sizeof (struct ReserveMessage));
930   rm->entries = htonl(entries);
931   rm->amount = GNUNET_htonll(amount);
932   process_queue (h);
933   return qe;
934 }
935
936
937 /**
938  * Signal that all of the data for which a reservation was made has
939  * been stored and that whatever excess space might have been reserved
940  * can now be released.
941  *
942  * @param h handle to the datastore
943  * @param rid reservation ID (value of "success" in original continuation
944  *        from the "reserve" function).
945  * @param queue_priority ranking of this request in the priority queue
946  * @param max_queue_size at what queue size should this request be dropped
947  *        (if other requests of higher priority are in the queue)
948  * @param queue_priority ranking of this request in the priority queue
949  * @param max_queue_size at what queue size should this request be dropped
950  *        (if other requests of higher priority are in the queue)
951  * @param timeout how long to wait at most for a response
952  * @param cont continuation to call when done
953  * @param cont_cls closure for cont
954  * @return NULL if the entry was not queued, otherwise a handle that can be used to
955  *         cancel; note that even if NULL is returned, the callback will be invoked
956  *         (or rather, will already have been invoked)
957  */
958 struct GNUNET_DATASTORE_QueueEntry *
959 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
960                                   int rid,
961                                   unsigned int queue_priority,
962                                   unsigned int max_queue_size,
963                                   struct GNUNET_TIME_Relative timeout,
964                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
965                                   void *cont_cls)
966 {
967   struct GNUNET_DATASTORE_QueueEntry *qe;
968   struct ReleaseReserveMessage *rrm;
969   union QueueContext qc;
970
971   if (cont == NULL)
972     cont = &drop_status_cont;
973 #if DEBUG_DATASTORE
974   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
975               "Asked to release reserve %d\n",
976               rid);
977 #endif
978   qc.sc.cont = cont;
979   qc.sc.cont_cls = cont_cls;
980   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
981                          queue_priority, max_queue_size, timeout,
982                          &process_status_message, &qc);
983   if (qe == NULL)
984     {
985 #if DEBUG_DATASTORE
986       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
987                   "Could not create queue entry to release reserve\n");
988 #endif
989       return NULL;
990     }
991   GNUNET_STATISTICS_update (h->stats,
992                             gettext_noop ("# RELEASE RESERVE requests executed"),
993                             1,
994                             GNUNET_NO);
995   rrm = (struct ReleaseReserveMessage*) &qe[1];
996   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
997   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
998   rrm->rid = htonl(rid);
999   process_queue (h);
1000   return qe;
1001 }
1002
1003
1004 /**
1005  * Update a value in the datastore.
1006  *
1007  * @param h handle to the datastore
1008  * @param uid identifier for the value
1009  * @param priority how much to increase the priority of the value
1010  * @param expiration new expiration value should be MAX of existing and this argument
1011  * @param queue_priority ranking of this request in the priority queue
1012  * @param max_queue_size at what queue size should this request be dropped
1013  *        (if other requests of higher priority are in the queue)
1014  * @param timeout how long to wait at most for a response
1015  * @param cont continuation to call when done
1016  * @param cont_cls closure for cont
1017  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1018  *         cancel; note that even if NULL is returned, the callback will be invoked
1019  *         (or rather, will already have been invoked)
1020  */
1021 struct GNUNET_DATASTORE_QueueEntry *
1022 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1023                          unsigned long long uid,
1024                          uint32_t priority,
1025                          struct GNUNET_TIME_Absolute expiration,
1026                          unsigned int queue_priority,
1027                          unsigned int max_queue_size,
1028                          struct GNUNET_TIME_Relative timeout,
1029                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1030                          void *cont_cls)
1031 {
1032   struct GNUNET_DATASTORE_QueueEntry *qe;
1033   struct UpdateMessage *um;
1034   union QueueContext qc;
1035
1036   if (cont == NULL)
1037     cont = &drop_status_cont;
1038 #if DEBUG_DATASTORE
1039   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1040               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1041               uid,
1042               (unsigned int) priority,
1043               (unsigned long long) expiration.abs_value);
1044 #endif
1045   qc.sc.cont = cont;
1046   qc.sc.cont_cls = cont_cls;
1047   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1048                          queue_priority, max_queue_size, timeout,
1049                          &process_status_message, &qc);
1050   if (qe == NULL)
1051     {
1052 #if DEBUG_DATASTORE
1053       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1054                   "Could not create queue entry for UPDATE\n");
1055 #endif
1056       return NULL;
1057     }
1058   GNUNET_STATISTICS_update (h->stats,
1059                             gettext_noop ("# UPDATE requests executed"),
1060                             1,
1061                             GNUNET_NO);
1062   um = (struct UpdateMessage*) &qe[1];
1063   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1064   um->header.size = htons(sizeof (struct UpdateMessage));
1065   um->priority = htonl(priority);
1066   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1067   um->uid = GNUNET_htonll(uid);
1068   process_queue (h);
1069   return qe;
1070 }
1071
1072
1073 /**
1074  * Explicitly remove some content from the database.
1075  * The "cont"inuation will be called with status
1076  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1077  * if no matching entry was found and "GNUNET_SYSERR"
1078  * on all other types of errors.
1079  *
1080  * @param h handle to the datastore
1081  * @param key key for the value
1082  * @param size number of bytes in data
1083  * @param data content stored
1084  * @param queue_priority ranking of this request in the priority queue
1085  * @param max_queue_size at what queue size should this request be dropped
1086  *        (if other requests of higher priority are in the queue)
1087  * @param timeout how long to wait at most for a response
1088  * @param cont continuation to call when done
1089  * @param cont_cls closure for cont
1090  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1091  *         cancel; note that even if NULL is returned, the callback will be invoked
1092  *         (or rather, will already have been invoked)
1093  */
1094 struct GNUNET_DATASTORE_QueueEntry *
1095 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1096                          const GNUNET_HashCode *key,
1097                          size_t size, 
1098                          const void *data,
1099                          unsigned int queue_priority,
1100                          unsigned int max_queue_size,
1101                          struct GNUNET_TIME_Relative timeout,
1102                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1103                          void *cont_cls)
1104 {
1105   struct GNUNET_DATASTORE_QueueEntry *qe;
1106   struct DataMessage *dm;
1107   size_t msize;
1108   union QueueContext qc;
1109
1110   if (cont == NULL)
1111     cont = &drop_status_cont;
1112 #if DEBUG_DATASTORE
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Asked to remove %u bytes under key `%s'\n",
1115               size,
1116               GNUNET_h2s (key));
1117 #endif
1118   qc.sc.cont = cont;
1119   qc.sc.cont_cls = cont_cls;
1120   msize = sizeof(struct DataMessage) + size;
1121   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1122   qe = make_queue_entry (h, msize,
1123                          queue_priority, max_queue_size, timeout,
1124                          &process_status_message, &qc);
1125   if (qe == NULL)
1126     {
1127 #if DEBUG_DATASTORE
1128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                   "Could not create queue entry for REMOVE\n");
1130 #endif
1131       return NULL;
1132     }
1133   GNUNET_STATISTICS_update (h->stats,
1134                             gettext_noop ("# REMOVE requests executed"),
1135                             1,
1136                             GNUNET_NO);
1137   dm = (struct DataMessage*) &qe[1];
1138   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1139   dm->header.size = htons(msize);
1140   dm->rid = htonl(0);
1141   dm->size = htonl(size);
1142   dm->type = htonl(0);
1143   dm->priority = htonl(0);
1144   dm->anonymity = htonl(0);
1145   dm->uid = GNUNET_htonll(0);
1146   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1147   dm->key = *key;
1148   memcpy (&dm[1], data, size);
1149   process_queue (h);
1150   return qe;
1151 }
1152
1153
1154 /**
1155  * Type of a function to call when we receive a message
1156  * from the service.
1157  *
1158  * @param cls closure
1159  * @param msg message received, NULL on timeout or fatal error
1160  */
1161 static void 
1162 process_result_message (void *cls,
1163                         const struct GNUNET_MessageHeader * msg)
1164 {
1165   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1166   struct GNUNET_DATASTORE_Handle *h = qe->h;
1167   struct ResultContext rc = qe->qc.rc;
1168   const struct DataMessage *dm;
1169   int was_transmitted;
1170
1171   h->in_receive = GNUNET_NO;
1172   if (msg == NULL)
1173    {
1174       was_transmitted = qe->was_transmitted;
1175       free_queue_entry (qe);
1176       if (was_transmitted == GNUNET_YES)
1177         {
1178           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1179                       _("Failed to receive response from database.\n"));
1180           do_disconnect (h);
1181         }
1182       else
1183         {
1184 #if DEBUG_DATASTORE
1185           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                       "Request dropped due to finite datastore queue length.\n");
1187 #endif
1188         }
1189       if (rc.iter != NULL)
1190         rc.iter (rc.iter_cls,
1191                  NULL, 0, NULL, 0, 0, 0, 
1192                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1193       return;
1194     }
1195   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1196   GNUNET_assert (h->queue_head == qe);
1197   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1198     {
1199       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1200       free_queue_entry (qe);
1201 #if DEBUG_DATASTORE
1202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1203                   "Received end of result set, new queue size is %u\n",
1204                   h->queue_size);
1205 #endif
1206       if (rc.iter != NULL)
1207         rc.iter (rc.iter_cls,
1208                  NULL, 0, NULL, 0, 0, 0, 
1209                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1210       h->retry_time.rel_value = 0;
1211       h->result_count = 0;
1212       process_queue (h);
1213       return;
1214     }
1215   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1216        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1217        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1218     {
1219       GNUNET_break (0);
1220       free_queue_entry (qe);
1221       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1222       do_disconnect (h);
1223       if (rc.iter != NULL)
1224         rc.iter (rc.iter_cls,
1225                  NULL, 0, NULL, 0, 0, 0, 
1226                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1227       return;
1228     }
1229   GNUNET_STATISTICS_update (h->stats,
1230                             gettext_noop ("# Results received"),
1231                             1,
1232                             GNUNET_NO);
1233   if (rc.iter == NULL)
1234     {
1235       h->result_count++;
1236       GNUNET_STATISTICS_update (h->stats,
1237                                 gettext_noop ("# Excess results received"),
1238                                 1,
1239                                 GNUNET_NO);
1240       if (h->result_count > MAX_EXCESS_RESULTS)
1241         {
1242           free_queue_entry (qe);
1243           GNUNET_STATISTICS_update (h->stats,
1244                                     gettext_noop ("# Forced database connection resets"),
1245                                     1,
1246                                     GNUNET_NO);
1247           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1248           do_disconnect (h);      
1249           return;
1250         }
1251       GNUNET_DATASTORE_get_next (h);
1252       return;
1253     }
1254   dm = (const struct DataMessage*) msg;
1255 #if DEBUG_DATASTORE
1256   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1257               "Received result %llu with type %u and size %u with key %s\n",
1258               (unsigned long long) GNUNET_ntohll(dm->uid),
1259               ntohl(dm->type),
1260               ntohl(dm->size),
1261               GNUNET_h2s(&dm->key));
1262 #endif
1263   h->retry_time.rel_value = 0;
1264   rc.iter (rc.iter_cls,
1265            &dm->key,
1266            ntohl(dm->size),
1267            &dm[1],
1268            ntohl(dm->type),
1269            ntohl(dm->priority),
1270            ntohl(dm->anonymity),
1271            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1272            GNUNET_ntohll(dm->uid));
1273 }
1274
1275
1276 /**
1277  * Get a random value from the datastore.
1278  *
1279  * @param h handle to the datastore
1280  * @param queue_priority ranking of this request in the priority queue
1281  * @param max_queue_size at what queue size should this request be dropped
1282  *        (if other requests of higher priority are in the queue)
1283  * @param timeout how long to wait at most for a response
1284  * @param iter function to call on a random value; it
1285  *        will be called once with a value (if available)
1286  *        and always once with a value of NULL.
1287  * @param iter_cls closure for iter
1288  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1289  *         cancel; note that even if NULL is returned, the callback will be invoked
1290  *         (or rather, will already have been invoked)
1291  */
1292 struct GNUNET_DATASTORE_QueueEntry *
1293 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1294                              unsigned int queue_priority,
1295                              unsigned int max_queue_size,
1296                              struct GNUNET_TIME_Relative timeout,
1297                              GNUNET_DATASTORE_Iterator iter, 
1298                              void *iter_cls)
1299 {
1300   struct GNUNET_DATASTORE_QueueEntry *qe;
1301   struct GNUNET_MessageHeader *m;
1302   union QueueContext qc;
1303
1304 #if DEBUG_DATASTORE
1305   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1306               "Asked to get random entry in %llu ms\n",
1307               (unsigned long long) timeout.rel_value);
1308 #endif
1309   qc.rc.iter = iter;
1310   qc.rc.iter_cls = iter_cls;
1311   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1312                          queue_priority, max_queue_size, timeout,
1313                          &process_result_message, &qc);
1314   if (qe == NULL)
1315     {
1316 #if DEBUG_DATASTORE
1317       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1318                   "Could not create queue entry for GET RANDOM\n");
1319 #endif
1320       return NULL;    
1321     }
1322   GNUNET_STATISTICS_update (h->stats,
1323                             gettext_noop ("# GET RANDOM requests executed"),
1324                             1,
1325                             GNUNET_NO);
1326   m = (struct GNUNET_MessageHeader*) &qe[1];
1327   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1328   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1329   process_queue (h);
1330   return qe;
1331 }
1332
1333
1334 /**
1335  * Get a zero-anonymity value from the datastore.
1336  *
1337  * @param h handle to the datastore
1338  * @param queue_priority ranking of this request in the priority queue
1339  * @param max_queue_size at what queue size should this request be dropped
1340  *        (if other requests of higher priority are in the queue)
1341  * @param timeout how long to wait at most for a response
1342  * @param type allowed type for the operation
1343  * @param iter function to call on a random value; it
1344  *        will be called once with a value (if available)
1345  *        and always once with a value of NULL.
1346  * @param iter_cls closure for iter
1347  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1348  *         cancel; note that even if NULL is returned, the callback will be invoked
1349  *         (or rather, will already have been invoked)
1350  */
1351 struct GNUNET_DATASTORE_QueueEntry *
1352 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1353                                      unsigned int queue_priority,
1354                                      unsigned int max_queue_size,
1355                                      struct GNUNET_TIME_Relative timeout,
1356                                      enum GNUNET_BLOCK_Type type,
1357                                      GNUNET_DATASTORE_Iterator iter, 
1358                                      void *iter_cls)
1359 {
1360   struct GNUNET_DATASTORE_QueueEntry *qe;
1361   struct GetZeroAnonymityMessage *m;
1362   union QueueContext qc;
1363
1364 #if DEBUG_DATASTORE
1365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1366               "Asked to get zero-anonymity entry in %llu ms\n",
1367               (unsigned long long) timeout.rel_value);
1368 #endif
1369   qc.rc.iter = iter;
1370   qc.rc.iter_cls = iter_cls;
1371   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1372                          queue_priority, max_queue_size, timeout,
1373                          &process_result_message, &qc);
1374   if (qe == NULL)
1375     {
1376 #if DEBUG_DATASTORE
1377       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378                   "Could not create queue entry for zero-anonymity iteration\n");
1379 #endif
1380       return NULL;    
1381     }
1382   GNUNET_STATISTICS_update (h->stats,
1383                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1384                             1,
1385                             GNUNET_NO);
1386   m = (struct GetZeroAnonymityMessage*) &qe[1];
1387   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1388   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1389   m->type = htonl ((uint32_t) type);
1390   process_queue (h);
1391   return qe;
1392 }
1393
1394
1395
1396 /**
1397  * Iterate over the results for a particular key
1398  * in the datastore.  The iterator will only be called
1399  * once initially; if the first call did contain a
1400  * result, further results can be obtained by calling
1401  * "GNUNET_DATASTORE_get_next" with the given argument.
1402  *
1403  * @param h handle to the datastore
1404  * @param key maybe NULL (to match all entries)
1405  * @param type desired type, 0 for any
1406  * @param queue_priority ranking of this request in the priority queue
1407  * @param max_queue_size at what queue size should this request be dropped
1408  *        (if other requests of higher priority are in the queue)
1409  * @param timeout how long to wait at most for a response
1410  * @param iter function to call on each matching value;
1411  *        will be called once with a NULL value at the end
1412  * @param iter_cls closure for iter
1413  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1414  *         cancel; note that even if NULL is returned, the callback will be invoked
1415  *         (or rather, will already have been invoked)
1416  */
1417 struct GNUNET_DATASTORE_QueueEntry *
1418 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1419                       const GNUNET_HashCode * key,
1420                       enum GNUNET_BLOCK_Type type,
1421                       unsigned int queue_priority,
1422                       unsigned int max_queue_size,
1423                       struct GNUNET_TIME_Relative timeout,
1424                       GNUNET_DATASTORE_Iterator iter, 
1425                       void *iter_cls)
1426 {
1427   struct GNUNET_DATASTORE_QueueEntry *qe;
1428   struct GetMessage *gm;
1429   union QueueContext qc;
1430
1431 #if DEBUG_DATASTORE
1432   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1433               "Asked to look for data of type %u under key `%s'\n",
1434               (unsigned int) type,
1435               GNUNET_h2s (key));
1436 #endif
1437   qc.rc.iter = iter;
1438   qc.rc.iter_cls = iter_cls;
1439   qe = make_queue_entry (h, sizeof(struct GetMessage),
1440                          queue_priority, max_queue_size, timeout,
1441                          &process_result_message, &qc);
1442   if (qe == NULL)
1443     {
1444 #if DEBUG_DATASTORE
1445       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1446                   "Could not queue request for `%s'\n",
1447                   GNUNET_h2s (key));
1448 #endif
1449       return NULL;
1450     }
1451   GNUNET_STATISTICS_update (h->stats,
1452                             gettext_noop ("# GET requests executed"),
1453                             1,
1454                             GNUNET_NO);
1455   gm = (struct GetMessage*) &qe[1];
1456   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1457   gm->type = htonl(type);
1458   if (key != NULL)
1459     {
1460       gm->header.size = htons(sizeof (struct GetMessage));
1461       gm->key = *key;
1462     }
1463   else
1464     {
1465       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1466     }
1467   process_queue (h);
1468   return qe;
1469 }
1470
1471
1472 /**
1473  * Function called to trigger obtaining the next result
1474  * from the datastore.
1475  * 
1476  * @param h handle to the datastore
1477  */
1478 void 
1479 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h)
1480 {
1481   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1482
1483   GNUNET_assert (&process_result_message == qe->response_proc);
1484   h->in_receive = GNUNET_YES;
1485   GNUNET_CLIENT_receive (h->client,
1486                          qe->response_proc,
1487                          qe,
1488                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1489 }
1490
1491
1492 /**
1493  * Cancel a datastore operation.  The final callback from the
1494  * operation must not have been done yet.
1495  * 
1496  * @param qe operation to cancel
1497  */
1498 void
1499 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1500 {
1501   struct GNUNET_DATASTORE_Handle *h;
1502
1503   h = qe->h;
1504 #if DEBUG_DATASTORE
1505   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1506                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1507                qe,
1508                qe->was_transmitted,
1509                h->queue_head == qe);
1510 #endif
1511   if (GNUNET_YES == qe->was_transmitted) 
1512     {
1513       free_queue_entry (qe);
1514       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1515       do_disconnect (h);
1516       return;
1517     }
1518   free_queue_entry (qe);
1519   process_queue (h);
1520 }
1521
1522
1523 /* end of datastore_api.c */