(no commit message)
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int32_t was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184   /**
185    * Our scheduler.
186    */
187   struct GNUNET_SCHEDULER_Handle *sched;
188
189   /**
190    * Current connection to the datastore service.
191    */
192   struct GNUNET_CLIENT_Connection *client;
193
194   /**
195    * Handle for statistics.
196    */
197   struct GNUNET_STATISTICS_Handle *stats;
198
199   /**
200    * Current transmit handle.
201    */
202   struct GNUNET_CLIENT_TransmitHandle *th;
203
204   /**
205    * Current head of priority queue.
206    */
207   struct GNUNET_DATASTORE_QueueEntry *queue_head;
208
209   /**
210    * Current tail of priority queue.
211    */
212   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
213
214   /**
215    * Task for trying to reconnect.
216    */
217   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
218
219   /**
220    * How quickly should we retry?  Used for exponential back-off on
221    * connect-errors.
222    */
223   struct GNUNET_TIME_Relative retry_time;
224
225   /**
226    * Number of entries in the queue.
227    */
228   unsigned int queue_size;
229
230   /**
231    * Number of results we're receiving for the current query
232    * after application stopped to care.  Used to determine when
233    * to reset the connection.
234    */
235   unsigned int result_count;
236
237   /**
238    * Are we currently trying to receive from the service?
239    */
240   int in_receive;
241
242 };
243
244
245
246 /**
247  * Connect to the datastore service.
248  *
249  * @param cfg configuration to use
250  * @param sched scheduler to use
251  * @return handle to use to access the service
252  */
253 struct GNUNET_DATASTORE_Handle *
254 GNUNET_DATASTORE_connect (const struct
255                           GNUNET_CONFIGURATION_Handle
256                           *cfg,
257                           struct
258                           GNUNET_SCHEDULER_Handle
259                           *sched)
260 {
261   struct GNUNET_CLIENT_Connection *c;
262   struct GNUNET_DATASTORE_Handle *h;
263   
264   c = GNUNET_CLIENT_connect (sched, "datastore", cfg);
265   if (c == NULL)
266     return NULL; /* oops */
267   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
268                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
269   h->client = c;
270   h->cfg = cfg;
271   h->sched = sched;
272   h->stats = GNUNET_STATISTICS_create (sched,
273                                        "datastore-api", 
274                                        cfg);
275   return h;
276 }
277
278
279 /**
280  * Transmit DROP message to datastore service.
281  *
282  * @param cls the 'struct GNUNET_DATASTORE_Handle'
283  * @param size number of bytes that can be copied to buf
284  * @param buf where to copy the drop message
285  * @return number of bytes written to buf
286  */
287 static size_t
288 transmit_drop (void *cls,
289                size_t size, 
290                void *buf)
291 {
292   struct GNUNET_DATASTORE_Handle *h = cls;
293   struct GNUNET_MessageHeader *hdr;
294   
295   if (buf == NULL)
296     {
297       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
298                   _("Failed to transmit request to drop database.\n"));
299       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
300       return 0;
301     }
302   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
303   hdr = buf;
304   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
305   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
306   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
307   return sizeof(struct GNUNET_MessageHeader);
308 }
309
310
311 /**
312  * Disconnect from the datastore service (and free
313  * associated resources).
314  *
315  * @param h handle to the datastore
316  * @param drop set to GNUNET_YES to delete all data in datastore (!)
317  */
318 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
319                                   int drop)
320 {
321   struct GNUNET_DATASTORE_QueueEntry *qe;
322
323   if (h->client != NULL)
324     {
325       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
326       h->client = NULL;
327     }
328   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
329     {
330       GNUNET_SCHEDULER_cancel (h->sched,
331                                h->reconnect_task);
332       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
333     }
334   while (NULL != (qe = h->queue_head))
335     {
336       GNUNET_assert (NULL != qe->response_proc);
337       qe->response_proc (qe, NULL);
338     }
339   if (GNUNET_YES == drop) 
340     {
341       h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
342       if (h->client != NULL)
343         {
344           if (NULL != 
345               GNUNET_CLIENT_notify_transmit_ready (h->client,
346                                                    sizeof(struct GNUNET_MessageHeader),
347                                                    GNUNET_TIME_UNIT_MINUTES,
348                                                    GNUNET_YES,
349                                                    &transmit_drop,
350                                                    h))
351             return;
352           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
353         }
354       GNUNET_break (0);
355     }
356   GNUNET_STATISTICS_destroy (h->stats,
357                              GNUNET_NO);
358   GNUNET_free (h);
359 }
360
361
362 /**
363  * A request has timed out (before being transmitted to the service).
364  *
365  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
366  * @param tc scheduler context
367  */
368 static void
369 timeout_queue_entry (void *cls,
370                      const struct GNUNET_SCHEDULER_TaskContext *tc)
371 {
372   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
373
374   GNUNET_STATISTICS_update (qe->h->stats,
375                             gettext_noop ("# queue entry timeouts"),
376                             1,
377                             GNUNET_NO);
378   qe->task = GNUNET_SCHEDULER_NO_TASK;
379   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
380   qe->response_proc (qe, NULL);
381 }
382
383
384 /**
385  * Create a new entry for our priority queue (and possibly discard other entires if
386  * the queue is getting too long).
387  *
388  * @param h handle to the datastore
389  * @param msize size of the message to queue
390  * @param queue_priority priority of the entry
391  * @param max_queue_size at what queue size should this request be dropped
392  *        (if other requests of higher priority are in the queue)
393  * @param timeout timeout for the operation
394  * @param response_proc function to call with replies (can be NULL)
395  * @param qc client context (NOT a closure for response_proc)
396  * @return NULL if the queue is full (and this entry was dropped)
397  */
398 static struct GNUNET_DATASTORE_QueueEntry *
399 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
400                   size_t msize,
401                   unsigned int queue_priority,
402                   unsigned int max_queue_size,
403                   struct GNUNET_TIME_Relative timeout,
404                   GNUNET_CLIENT_MessageHandler response_proc,            
405                   const union QueueContext *qc)
406 {
407   struct GNUNET_DATASTORE_QueueEntry *ret;
408   struct GNUNET_DATASTORE_QueueEntry *pos;
409   unsigned int c;
410
411   c = 0;
412   pos = h->queue_head;
413   while ( (pos != NULL) &&
414           (c < max_queue_size) &&
415           (pos->priority >= queue_priority) )
416     {
417       c++;
418       pos = pos->next;
419     }
420   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
421   ret->h = h;
422   ret->response_proc = response_proc;
423   ret->qc = *qc;
424   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
425   ret->priority = queue_priority;
426   ret->max_queue = max_queue_size;
427   ret->message_size = msize;
428   ret->was_transmitted = GNUNET_NO;
429   if (pos == NULL)
430     {
431       /* append at the tail */
432       pos = h->queue_tail;
433     }
434   else
435     {
436       pos = pos->prev; 
437       /* do not insert at HEAD if HEAD query was already
438          transmitted and we are still receiving replies! */
439       if ( (pos == NULL) &&
440            (h->queue_head->was_transmitted) )
441         pos = h->queue_head;
442     }
443   c++;
444   GNUNET_STATISTICS_update (h->stats,
445                             gettext_noop ("# queue entries created"),
446                             1,
447                             GNUNET_NO);
448   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
449                                      h->queue_tail,
450                                      pos,
451                                      ret);
452   h->queue_size++;
453   if (c > max_queue_size)
454     {
455       GNUNET_STATISTICS_update (h->stats,
456                                 gettext_noop ("# queue overflows"),
457                                 1,
458                                 GNUNET_NO);
459       response_proc (ret, NULL);
460       return NULL;
461     }
462   ret->task = GNUNET_SCHEDULER_add_delayed (h->sched,
463                                             timeout,
464                                             &timeout_queue_entry,
465                                             ret);
466   pos = ret->next;
467   while (pos != NULL) 
468     {
469       if (pos->max_queue < h->queue_size)
470         {
471           GNUNET_assert (pos->response_proc != NULL);
472           pos->response_proc (pos, NULL);
473           break;
474         }
475       pos = pos->next;
476     }
477   return ret;
478 }
479
480
481 /**
482  * Process entries in the queue (or do nothing if we are already
483  * doing so).
484  * 
485  * @param h handle to the datastore
486  */
487 static void
488 process_queue (struct GNUNET_DATASTORE_Handle *h);
489
490
491 /**
492  * Try reconnecting to the datastore service.
493  *
494  * @param cls the 'struct GNUNET_DATASTORE_Handle'
495  * @param tc scheduler context
496  */
497 static void
498 try_reconnect (void *cls,
499                const struct GNUNET_SCHEDULER_TaskContext *tc)
500 {
501   struct GNUNET_DATASTORE_Handle *h = cls;
502
503   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
504     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
505   else
506     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
507   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
508     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
509   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
510   h->client = GNUNET_CLIENT_connect (h->sched, "datastore", h->cfg);
511   if (h->client == NULL)
512     {
513       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
514                   "DATASTORE reconnect failed (fatally)\n");
515       return;
516     }
517   GNUNET_STATISTICS_update (h->stats,
518                             gettext_noop ("# datastore connections (re)created"),
519                             1,
520                             GNUNET_NO);
521 #if DEBUG_DATASTORE
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Reconnected to DATASTORE\n");
524 #endif
525   process_queue (h);
526 }
527
528
529 /**
530  * Disconnect from the service and then try reconnecting to the datastore service
531  * after some delay.
532  *
533  * @param h handle to datastore to disconnect and reconnect
534  */
535 static void
536 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
537 {
538   if (h->client == NULL)
539     {
540 #if DEBUG_DATASTORE
541       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
542                   "client NULL in disconnect, will not try to reconnect\n");
543 #endif
544       return;
545     }
546 #if 0
547   GNUNET_STATISTICS_update (stats,
548                             gettext_noop ("# reconnected to DATASTORE"),
549                             1,
550                             GNUNET_NO);
551 #endif
552   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
553   h->client = NULL;
554   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->sched,
555                                                     h->retry_time,
556                                                     &try_reconnect,
557                                                     h);      
558 }
559
560
561 /**
562  * Transmit request from queue to datastore service.
563  *
564  * @param cls the 'struct GNUNET_DATASTORE_Handle'
565  * @param size number of bytes that can be copied to buf
566  * @param buf where to copy the drop message
567  * @return number of bytes written to buf
568  */
569 static size_t
570 transmit_request (void *cls,
571                   size_t size, 
572                   void *buf)
573 {
574   struct GNUNET_DATASTORE_Handle *h = cls;
575   struct GNUNET_DATASTORE_QueueEntry *qe;
576   size_t msize;
577
578   h->th = NULL;
579   if (NULL == (qe = h->queue_head))
580     return 0; /* no entry in queue */
581   if (buf == NULL)
582     {
583       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
584                   _("Failed to transmit request to DATASTORE.\n"));
585       GNUNET_STATISTICS_update (h->stats,
586                                 gettext_noop ("# transmission request failures"),
587                                 1,
588                                 GNUNET_NO);
589       do_disconnect (h);
590       return 0;
591     }
592   if (size < (msize = qe->message_size))
593     {
594       process_queue (h);
595       return 0;
596     }
597  #if DEBUG_DATASTORE
598   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
599               "Transmitting %u byte request to DATASTORE\n",
600               msize);
601 #endif
602   memcpy (buf, &qe[1], msize);
603   qe->was_transmitted = GNUNET_YES;
604   GNUNET_SCHEDULER_cancel (h->sched,
605                            qe->task);
606   qe->task = GNUNET_SCHEDULER_NO_TASK;
607   h->in_receive = GNUNET_YES;
608   GNUNET_CLIENT_receive (h->client,
609                          qe->response_proc,
610                          qe,
611                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
612   GNUNET_STATISTICS_update (h->stats,
613                             gettext_noop ("# bytes sent to datastore"),
614                             1,
615                             GNUNET_NO);
616   return msize;
617 }
618
619
620 /**
621  * Process entries in the queue (or do nothing if we are already
622  * doing so).
623  * 
624  * @param h handle to the datastore
625  */
626 static void
627 process_queue (struct GNUNET_DATASTORE_Handle *h)
628 {
629   struct GNUNET_DATASTORE_QueueEntry *qe;
630
631   if (NULL == (qe = h->queue_head))
632     {
633 #if DEBUG_DATASTORE > 1
634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
635                   "Queue empty\n");
636 #endif
637       return; /* no entry in queue */
638     }
639   if (qe->was_transmitted == GNUNET_YES)
640     {
641 #if DEBUG_DATASTORE > 1
642       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643                   "Head request already transmitted\n");
644 #endif
645       return; /* waiting for replies */
646     }
647   if (h->th != NULL)
648     {
649 #if DEBUG_DATASTORE > 1
650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651                   "Pending transmission request\n");
652 #endif
653       return; /* request pending */
654     }
655   if (h->client == NULL)
656     {
657 #if DEBUG_DATASTORE > 1
658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659                   "Not connected\n");
660 #endif
661       return; /* waiting for reconnect */
662     }
663 #if DEBUG_DATASTORE
664   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
665               "Queueing %u byte request to DATASTORE\n",
666               qe->message_size);
667 #endif
668   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
669                                                qe->message_size,
670                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
671                                                GNUNET_YES,
672                                                &transmit_request,
673                                                h);
674 }
675
676
677 /**
678  * Dummy continuation used to do nothing (but be non-zero).
679  *
680  * @param cls closure
681  * @param result result 
682  * @param emsg error message
683  */
684 static void
685 drop_status_cont (void *cls, int result, const char *emsg)
686 {
687   /* do nothing */
688 }
689
690
691 static void
692 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
693 {
694   struct GNUNET_DATASTORE_Handle *h = qe->h;
695
696   GNUNET_CONTAINER_DLL_remove (h->queue_head,
697                                h->queue_tail,
698                                qe);
699   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
700     {
701       GNUNET_SCHEDULER_cancel (h->sched,
702                                qe->task);
703       qe->task = GNUNET_SCHEDULER_NO_TASK;
704     }
705   h->queue_size--;
706   GNUNET_free (qe);
707 }
708
709 /**
710  * Type of a function to call when we receive a message
711  * from the service.
712  *
713  * @param cls closure
714  * @param msg message received, NULL on timeout or fatal error
715  */
716 static void 
717 process_status_message (void *cls,
718                         const struct
719                         GNUNET_MessageHeader * msg)
720 {
721   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
722   struct GNUNET_DATASTORE_Handle *h = qe->h;
723   struct StatusContext rc = qe->qc.sc;
724   const struct StatusMessage *sm;
725   const char *emsg;
726   int32_t status;
727   int was_transmitted;
728
729   h->in_receive = GNUNET_NO;
730   was_transmitted = qe->was_transmitted;
731   if (msg == NULL)
732     {      
733       free_queue_entry (qe);
734       if (NULL == h->client)
735         return; /* forced disconnect */
736       if (rc.cont != NULL)
737         rc.cont (rc.cont_cls, 
738                  GNUNET_SYSERR,
739                  _("Failed to receive status response from database."));
740       if (was_transmitted == GNUNET_YES)
741         do_disconnect (h);
742       return;
743     }
744   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
745   GNUNET_assert (h->queue_head == qe);
746   free_queue_entry (qe);
747   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
748        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
749     {
750       GNUNET_break (0);
751       h->retry_time = GNUNET_TIME_UNIT_ZERO;
752       do_disconnect (h);
753       if (rc.cont != NULL)
754         rc.cont (rc.cont_cls, 
755                  GNUNET_SYSERR,
756                  _("Error reading response from datastore service"));
757       return;
758     }
759   sm = (const struct StatusMessage*) msg;
760   status = ntohl(sm->status);
761   emsg = NULL;
762   if (ntohs(msg->size) > sizeof(struct StatusMessage))
763     {
764       emsg = (const char*) &sm[1];
765       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
766         {
767           GNUNET_break (0);
768           emsg = _("Invalid error message received from datastore service");
769         }
770     }  
771   if ( (status == GNUNET_SYSERR) &&
772        (emsg == NULL) )
773     {
774       GNUNET_break (0);
775       emsg = _("Invalid error message received from datastore service");
776     }
777 #if DEBUG_DATASTORE
778   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779               "Received status %d/%s\n",
780               (int) status,
781               emsg);
782 #endif
783   GNUNET_STATISTICS_update (h->stats,
784                             gettext_noop ("# status messages received"),
785                             1,
786                             GNUNET_NO);
787   h->retry_time.rel_value = 0;
788   process_queue (h);
789   if (rc.cont != NULL)
790     rc.cont (rc.cont_cls, 
791              status,
792              emsg);
793 }
794
795
796 /**
797  * Store an item in the datastore.  If the item is already present,
798  * the priorities are summed up and the higher expiration time and
799  * lower anonymity level is used.
800  *
801  * @param h handle to the datastore
802  * @param rid reservation ID to use (from "reserve"); use 0 if no
803  *            prior reservation was made
804  * @param key key for the value
805  * @param size number of bytes in data
806  * @param data content stored
807  * @param type type of the content
808  * @param priority priority of the content
809  * @param anonymity anonymity-level for the content
810  * @param expiration expiration time for the content
811  * @param queue_priority ranking of this request in the priority queue
812  * @param max_queue_size at what queue size should this request be dropped
813  *        (if other requests of higher priority are in the queue)
814  * @param timeout timeout for the operation
815  * @param cont continuation to call when done
816  * @param cont_cls closure for cont
817  * @return NULL if the entry was not queued, otherwise a handle that can be used to
818  *         cancel; note that even if NULL is returned, the callback will be invoked
819  *         (or rather, will already have been invoked)
820  */
821 struct GNUNET_DATASTORE_QueueEntry *
822 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
823                       int rid,
824                       const GNUNET_HashCode * key,
825                       size_t size,
826                       const void *data,
827                       enum GNUNET_BLOCK_Type type,
828                       uint32_t priority,
829                       uint32_t anonymity,
830                       struct GNUNET_TIME_Absolute expiration,
831                       unsigned int queue_priority,
832                       unsigned int max_queue_size,
833                       struct GNUNET_TIME_Relative timeout,
834                       GNUNET_DATASTORE_ContinuationWithStatus cont,
835                       void *cont_cls)
836 {
837   struct GNUNET_DATASTORE_QueueEntry *qe;
838   struct DataMessage *dm;
839   size_t msize;
840   union QueueContext qc;
841
842 #if DEBUG_DATASTORE
843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844               "Asked to put %u bytes of data under key `%s'\n",
845               size,
846               GNUNET_h2s (key));
847 #endif
848   msize = sizeof(struct DataMessage) + size;
849   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
850   qc.sc.cont = cont;
851   qc.sc.cont_cls = cont_cls;
852   qe = make_queue_entry (h, msize,
853                          queue_priority, max_queue_size, timeout,
854                          &process_status_message, &qc);
855   if (qe == NULL)
856     {
857 #if DEBUG_DATASTORE
858       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
859                   "Could not create queue entry for PUT\n");
860 #endif
861       return NULL;
862     }
863   GNUNET_STATISTICS_update (h->stats,
864                             gettext_noop ("# PUT requests executed"),
865                             1,
866                             GNUNET_NO);
867   dm = (struct DataMessage* ) &qe[1];
868   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
869   dm->header.size = htons(msize);
870   dm->rid = htonl(rid);
871   dm->size = htonl( (uint32_t) size);
872   dm->type = htonl(type);
873   dm->priority = htonl(priority);
874   dm->anonymity = htonl(anonymity);
875   dm->uid = GNUNET_htonll(0);
876   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
877   dm->key = *key;
878   memcpy (&dm[1], data, size);
879   process_queue (h);
880   return qe;
881 }
882
883
884 /**
885  * Reserve space in the datastore.  This function should be used
886  * to avoid "out of space" failures during a longer sequence of "put"
887  * operations (for example, when a file is being inserted).
888  *
889  * @param h handle to the datastore
890  * @param amount how much space (in bytes) should be reserved (for content only)
891  * @param entries how many entries will be created (to calculate per-entry overhead)
892  * @param queue_priority ranking of this request in the priority queue
893  * @param max_queue_size at what queue size should this request be dropped
894  *        (if other requests of higher priority are in the queue)
895  * @param timeout how long to wait at most for a response (or before dying in queue)
896  * @param cont continuation to call when done; "success" will be set to
897  *             a positive reservation value if space could be reserved.
898  * @param cont_cls closure for cont
899  * @return NULL if the entry was not queued, otherwise a handle that can be used to
900  *         cancel; note that even if NULL is returned, the callback will be invoked
901  *         (or rather, will already have been invoked)
902  */
903 struct GNUNET_DATASTORE_QueueEntry *
904 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
905                           uint64_t amount,
906                           uint32_t entries,
907                           unsigned int queue_priority,
908                           unsigned int max_queue_size,
909                           struct GNUNET_TIME_Relative timeout,
910                           GNUNET_DATASTORE_ContinuationWithStatus cont,
911                           void *cont_cls)
912 {
913   struct GNUNET_DATASTORE_QueueEntry *qe;
914   struct ReserveMessage *rm;
915   union QueueContext qc;
916
917   if (cont == NULL)
918     cont = &drop_status_cont;
919 #if DEBUG_DATASTORE
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Asked to reserve %llu bytes of data and %u entries'\n",
922               (unsigned long long) amount,
923               (unsigned int) entries);
924 #endif
925   qc.sc.cont = cont;
926   qc.sc.cont_cls = cont_cls;
927   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
928                          queue_priority, max_queue_size, timeout,
929                          &process_status_message, &qc);
930   if (qe == NULL)
931     {
932 #if DEBUG_DATASTORE
933       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
934                   "Could not create queue entry to reserve\n");
935 #endif
936       return NULL;
937     }
938   GNUNET_STATISTICS_update (h->stats,
939                             gettext_noop ("# RESERVE requests executed"),
940                             1,
941                             GNUNET_NO);
942   rm = (struct ReserveMessage*) &qe[1];
943   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
944   rm->header.size = htons(sizeof (struct ReserveMessage));
945   rm->entries = htonl(entries);
946   rm->amount = GNUNET_htonll(amount);
947   process_queue (h);
948   return qe;
949 }
950
951
952 /**
953  * Signal that all of the data for which a reservation was made has
954  * been stored and that whatever excess space might have been reserved
955  * can now be released.
956  *
957  * @param h handle to the datastore
958  * @param rid reservation ID (value of "success" in original continuation
959  *        from the "reserve" function).
960  * @param queue_priority ranking of this request in the priority queue
961  * @param max_queue_size at what queue size should this request be dropped
962  *        (if other requests of higher priority are in the queue)
963  * @param queue_priority ranking of this request in the priority queue
964  * @param max_queue_size at what queue size should this request be dropped
965  *        (if other requests of higher priority are in the queue)
966  * @param timeout how long to wait at most for a response
967  * @param cont continuation to call when done
968  * @param cont_cls closure for cont
969  * @return NULL if the entry was not queued, otherwise a handle that can be used to
970  *         cancel; note that even if NULL is returned, the callback will be invoked
971  *         (or rather, will already have been invoked)
972  */
973 struct GNUNET_DATASTORE_QueueEntry *
974 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
975                                   int rid,
976                                   unsigned int queue_priority,
977                                   unsigned int max_queue_size,
978                                   struct GNUNET_TIME_Relative timeout,
979                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
980                                   void *cont_cls)
981 {
982   struct GNUNET_DATASTORE_QueueEntry *qe;
983   struct ReleaseReserveMessage *rrm;
984   union QueueContext qc;
985
986   if (cont == NULL)
987     cont = &drop_status_cont;
988 #if DEBUG_DATASTORE
989   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
990               "Asked to release reserve %d\n",
991               rid);
992 #endif
993   qc.sc.cont = cont;
994   qc.sc.cont_cls = cont_cls;
995   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
996                          queue_priority, max_queue_size, timeout,
997                          &process_status_message, &qc);
998   if (qe == NULL)
999     {
1000 #if DEBUG_DATASTORE
1001       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002                   "Could not create queue entry to release reserve\n");
1003 #endif
1004       return NULL;
1005     }
1006   GNUNET_STATISTICS_update (h->stats,
1007                             gettext_noop ("# RELEASE RESERVE requests executed"),
1008                             1,
1009                             GNUNET_NO);
1010   rrm = (struct ReleaseReserveMessage*) &qe[1];
1011   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
1012   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1013   rrm->rid = htonl(rid);
1014   process_queue (h);
1015   return qe;
1016 }
1017
1018
1019 /**
1020  * Update a value in the datastore.
1021  *
1022  * @param h handle to the datastore
1023  * @param uid identifier for the value
1024  * @param priority how much to increase the priority of the value
1025  * @param expiration new expiration value should be MAX of existing and this argument
1026  * @param queue_priority ranking of this request in the priority queue
1027  * @param max_queue_size at what queue size should this request be dropped
1028  *        (if other requests of higher priority are in the queue)
1029  * @param timeout how long to wait at most for a response
1030  * @param cont continuation to call when done
1031  * @param cont_cls closure for cont
1032  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1033  *         cancel; note that even if NULL is returned, the callback will be invoked
1034  *         (or rather, will already have been invoked)
1035  */
1036 struct GNUNET_DATASTORE_QueueEntry *
1037 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1038                          unsigned long long uid,
1039                          uint32_t priority,
1040                          struct GNUNET_TIME_Absolute expiration,
1041                          unsigned int queue_priority,
1042                          unsigned int max_queue_size,
1043                          struct GNUNET_TIME_Relative timeout,
1044                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1045                          void *cont_cls)
1046 {
1047   struct GNUNET_DATASTORE_QueueEntry *qe;
1048   struct UpdateMessage *um;
1049   union QueueContext qc;
1050
1051   if (cont == NULL)
1052     cont = &drop_status_cont;
1053 #if DEBUG_DATASTORE
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1056               uid,
1057               (unsigned int) priority,
1058               (unsigned long long) expiration.abs_value);
1059 #endif
1060   qc.sc.cont = cont;
1061   qc.sc.cont_cls = cont_cls;
1062   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1063                          queue_priority, max_queue_size, timeout,
1064                          &process_status_message, &qc);
1065   if (qe == NULL)
1066     {
1067 #if DEBUG_DATASTORE
1068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                   "Could not create queue entry for UPDATE\n");
1070 #endif
1071       return NULL;
1072     }
1073   GNUNET_STATISTICS_update (h->stats,
1074                             gettext_noop ("# UPDATE requests executed"),
1075                             1,
1076                             GNUNET_NO);
1077   um = (struct UpdateMessage*) &qe[1];
1078   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1079   um->header.size = htons(sizeof (struct UpdateMessage));
1080   um->priority = htonl(priority);
1081   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1082   um->uid = GNUNET_htonll(uid);
1083   process_queue (h);
1084   return qe;
1085 }
1086
1087
1088 /**
1089  * Explicitly remove some content from the database.
1090  * The "cont"inuation will be called with status
1091  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1092  * if no matching entry was found and "GNUNET_SYSERR"
1093  * on all other types of errors.
1094  *
1095  * @param h handle to the datastore
1096  * @param key key for the value
1097  * @param size number of bytes in data
1098  * @param data content stored
1099  * @param queue_priority ranking of this request in the priority queue
1100  * @param max_queue_size at what queue size should this request be dropped
1101  *        (if other requests of higher priority are in the queue)
1102  * @param timeout how long to wait at most for a response
1103  * @param cont continuation to call when done
1104  * @param cont_cls closure for cont
1105  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1106  *         cancel; note that even if NULL is returned, the callback will be invoked
1107  *         (or rather, will already have been invoked)
1108  */
1109 struct GNUNET_DATASTORE_QueueEntry *
1110 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1111                          const GNUNET_HashCode *key,
1112                          size_t size, 
1113                          const void *data,
1114                          unsigned int queue_priority,
1115                          unsigned int max_queue_size,
1116                          struct GNUNET_TIME_Relative timeout,
1117                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1118                          void *cont_cls)
1119 {
1120   struct GNUNET_DATASTORE_QueueEntry *qe;
1121   struct DataMessage *dm;
1122   size_t msize;
1123   union QueueContext qc;
1124
1125   if (cont == NULL)
1126     cont = &drop_status_cont;
1127 #if DEBUG_DATASTORE
1128   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129               "Asked to remove %u bytes under key `%s'\n",
1130               size,
1131               GNUNET_h2s (key));
1132 #endif
1133   qc.sc.cont = cont;
1134   qc.sc.cont_cls = cont_cls;
1135   msize = sizeof(struct DataMessage) + size;
1136   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1137   qe = make_queue_entry (h, msize,
1138                          queue_priority, max_queue_size, timeout,
1139                          &process_status_message, &qc);
1140   if (qe == NULL)
1141     {
1142 #if DEBUG_DATASTORE
1143       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1144                   "Could not create queue entry for REMOVE\n");
1145 #endif
1146       return NULL;
1147     }
1148   GNUNET_STATISTICS_update (h->stats,
1149                             gettext_noop ("# REMOVE requests executed"),
1150                             1,
1151                             GNUNET_NO);
1152   dm = (struct DataMessage*) &qe[1];
1153   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1154   dm->header.size = htons(msize);
1155   dm->rid = htonl(0);
1156   dm->size = htonl(size);
1157   dm->type = htonl(0);
1158   dm->priority = htonl(0);
1159   dm->anonymity = htonl(0);
1160   dm->uid = GNUNET_htonll(0);
1161   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1162   dm->key = *key;
1163   memcpy (&dm[1], data, size);
1164   process_queue (h);
1165   return qe;
1166 }
1167
1168
1169 /**
1170  * Type of a function to call when we receive a message
1171  * from the service.
1172  *
1173  * @param cls closure
1174  * @param msg message received, NULL on timeout or fatal error
1175  */
1176 static void 
1177 process_result_message (void *cls,
1178                         const struct GNUNET_MessageHeader * msg)
1179 {
1180   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1181   struct GNUNET_DATASTORE_Handle *h = qe->h;
1182   struct ResultContext rc = qe->qc.rc;
1183   const struct DataMessage *dm;
1184   int was_transmitted;
1185
1186   h->in_receive = GNUNET_NO;
1187   if (msg == NULL)
1188    {
1189       was_transmitted = qe->was_transmitted;
1190       free_queue_entry (qe);
1191       if (was_transmitted == GNUNET_YES)
1192         {
1193           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1194                       _("Failed to receive response from database.\n"));
1195           do_disconnect (h);
1196         }
1197       else
1198         {
1199 #if DEBUG_DATASTORE
1200           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1201                       "Request dropped due to finite datastore queue length.\n");
1202 #endif
1203         }
1204       if (rc.iter != NULL)
1205         rc.iter (rc.iter_cls,
1206                  NULL, 0, NULL, 0, 0, 0, 
1207                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1208       return;
1209     }
1210   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1211   GNUNET_assert (h->queue_head == qe);
1212   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1213     {
1214       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1215 #if DEBUG_DATASTORE
1216       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217                   "Received end of result set\n");
1218 #endif
1219       free_queue_entry (qe);
1220       if (rc.iter != NULL)
1221         rc.iter (rc.iter_cls,
1222                  NULL, 0, NULL, 0, 0, 0, 
1223                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1224       h->retry_time.rel_value = 0;
1225       h->result_count = 0;
1226       process_queue (h);
1227       return;
1228     }
1229   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1230        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1231        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1232     {
1233       GNUNET_break (0);
1234       free_queue_entry (qe);
1235       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1236       do_disconnect (h);
1237       if (rc.iter != NULL)
1238         rc.iter (rc.iter_cls,
1239                  NULL, 0, NULL, 0, 0, 0, 
1240                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1241       return;
1242     }
1243   GNUNET_STATISTICS_update (h->stats,
1244                             gettext_noop ("# Results received"),
1245                             1,
1246                             GNUNET_NO);
1247   if (rc.iter == NULL)
1248     {
1249       h->result_count++;
1250       GNUNET_STATISTICS_update (h->stats,
1251                                 gettext_noop ("# Excess results received"),
1252                                 1,
1253                                 GNUNET_NO);
1254       if (h->result_count > MAX_EXCESS_RESULTS)
1255         {
1256           free_queue_entry (qe);
1257           GNUNET_STATISTICS_update (h->stats,
1258                                     gettext_noop ("# Forced database connection resets"),
1259                                     1,
1260                                     GNUNET_NO);
1261           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1262           do_disconnect (h);      
1263           return;
1264         }
1265       GNUNET_DATASTORE_get_next (h, GNUNET_NO);
1266       return;
1267     }
1268   dm = (const struct DataMessage*) msg;
1269 #if DEBUG_DATASTORE
1270   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271               "Received result %llu with type %u and size %u with key %s\n",
1272               (unsigned long long) GNUNET_ntohll(dm->uid),
1273               ntohl(dm->type),
1274               ntohl(dm->size),
1275               GNUNET_h2s(&dm->key));
1276 #endif
1277   h->retry_time.rel_value = 0;
1278   rc.iter (rc.iter_cls,
1279            &dm->key,
1280            ntohl(dm->size),
1281            &dm[1],
1282            ntohl(dm->type),
1283            ntohl(dm->priority),
1284            ntohl(dm->anonymity),
1285            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1286            GNUNET_ntohll(dm->uid));
1287 }
1288
1289
1290 /**
1291  * Get a random value from the datastore.
1292  *
1293  * @param h handle to the datastore
1294  * @param queue_priority ranking of this request in the priority queue
1295  * @param max_queue_size at what queue size should this request be dropped
1296  *        (if other requests of higher priority are in the queue)
1297  * @param timeout how long to wait at most for a response
1298  * @param iter function to call on a random value; it
1299  *        will be called once with a value (if available)
1300  *        and always once with a value of NULL.
1301  * @param iter_cls closure for iter
1302  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1303  *         cancel; note that even if NULL is returned, the callback will be invoked
1304  *         (or rather, will already have been invoked)
1305  */
1306 struct GNUNET_DATASTORE_QueueEntry *
1307 GNUNET_DATASTORE_get_random (struct GNUNET_DATASTORE_Handle *h,
1308                              unsigned int queue_priority,
1309                              unsigned int max_queue_size,
1310                              struct GNUNET_TIME_Relative timeout,
1311                              GNUNET_DATASTORE_Iterator iter, 
1312                              void *iter_cls)
1313 {
1314   struct GNUNET_DATASTORE_QueueEntry *qe;
1315   struct GNUNET_MessageHeader *m;
1316   union QueueContext qc;
1317
1318 #if DEBUG_DATASTORE
1319   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1320               "Asked to get random entry in %llu ms\n",
1321               (unsigned long long) timeout.abs_value);
1322 #endif
1323   qc.rc.iter = iter;
1324   qc.rc.iter_cls = iter_cls;
1325   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1326                          queue_priority, max_queue_size, timeout,
1327                          &process_result_message, &qc);
1328   if (qe == NULL)
1329     {
1330 #if DEBUG_DATASTORE
1331       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1332                   "Could not create queue entry for GET RANDOM\n");
1333 #endif
1334       return NULL;    
1335     }
1336   GNUNET_STATISTICS_update (h->stats,
1337                             gettext_noop ("# GET RANDOM requests executed"),
1338                             1,
1339                             GNUNET_NO);
1340   m = (struct GNUNET_MessageHeader*) &qe[1];
1341   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1342   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1343   process_queue (h);
1344   return qe;
1345 }
1346
1347
1348 /**
1349  * Get a zero-anonymity value from the datastore.
1350  *
1351  * @param h handle to the datastore
1352  * @param queue_priority ranking of this request in the priority queue
1353  * @param max_queue_size at what queue size should this request be dropped
1354  *        (if other requests of higher priority are in the queue)
1355  * @param timeout how long to wait at most for a response
1356  * @param type allowed type for the operation
1357  * @param iter function to call on a random value; it
1358  *        will be called once with a value (if available)
1359  *        and always once with a value of NULL.
1360  * @param iter_cls closure for iter
1361  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1362  *         cancel; note that even if NULL is returned, the callback will be invoked
1363  *         (or rather, will already have been invoked)
1364  */
1365 struct GNUNET_DATASTORE_QueueEntry *
1366 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1367                                      unsigned int queue_priority,
1368                                      unsigned int max_queue_size,
1369                                      struct GNUNET_TIME_Relative timeout,
1370                                      enum GNUNET_BLOCK_Type type,
1371                                      GNUNET_DATASTORE_Iterator iter, 
1372                                      void *iter_cls)
1373 {
1374   struct GNUNET_DATASTORE_QueueEntry *qe;
1375   struct GetZeroAnonymityMessage *m;
1376   union QueueContext qc;
1377
1378 #if DEBUG_DATASTORE
1379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1380               "Asked to get zero-anonymity entry in %llu ms\n",
1381               (unsigned long long) timeout.abs_value);
1382 #endif
1383   qc.rc.iter = iter;
1384   qc.rc.iter_cls = iter_cls;
1385   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1386                          queue_priority, max_queue_size, timeout,
1387                          &process_result_message, &qc);
1388   if (qe == NULL)
1389     {
1390 #if DEBUG_DATASTORE
1391       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392                   "Could not create queue entry for zero-anonymity iteration\n");
1393 #endif
1394       return NULL;    
1395     }
1396   GNUNET_STATISTICS_update (h->stats,
1397                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1398                             1,
1399                             GNUNET_NO);
1400   m = (struct GetZeroAnonymityMessage*) &qe[1];
1401   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1402   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1403   m->type = htonl ((uint32_t) type);
1404   process_queue (h);
1405   return qe;
1406 }
1407
1408
1409
1410 /**
1411  * Iterate over the results for a particular key
1412  * in the datastore.  The iterator will only be called
1413  * once initially; if the first call did contain a
1414  * result, further results can be obtained by calling
1415  * "GNUNET_DATASTORE_get_next" with the given argument.
1416  *
1417  * @param h handle to the datastore
1418  * @param key maybe NULL (to match all entries)
1419  * @param type desired type, 0 for any
1420  * @param queue_priority ranking of this request in the priority queue
1421  * @param max_queue_size at what queue size should this request be dropped
1422  *        (if other requests of higher priority are in the queue)
1423  * @param timeout how long to wait at most for a response
1424  * @param iter function to call on each matching value;
1425  *        will be called once with a NULL value at the end
1426  * @param iter_cls closure for iter
1427  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1428  *         cancel; note that even if NULL is returned, the callback will be invoked
1429  *         (or rather, will already have been invoked)
1430  */
1431 struct GNUNET_DATASTORE_QueueEntry *
1432 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1433                       const GNUNET_HashCode * key,
1434                       enum GNUNET_BLOCK_Type type,
1435                       unsigned int queue_priority,
1436                       unsigned int max_queue_size,
1437                       struct GNUNET_TIME_Relative timeout,
1438                       GNUNET_DATASTORE_Iterator iter, 
1439                       void *iter_cls)
1440 {
1441   struct GNUNET_DATASTORE_QueueEntry *qe;
1442   struct GetMessage *gm;
1443   union QueueContext qc;
1444
1445 #if DEBUG_DATASTORE
1446   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1447               "Asked to look for data of type %u under key `%s'\n",
1448               (unsigned int) type,
1449               GNUNET_h2s (key));
1450 #endif
1451   qc.rc.iter = iter;
1452   qc.rc.iter_cls = iter_cls;
1453   qe = make_queue_entry (h, sizeof(struct GetMessage),
1454                          queue_priority, max_queue_size, timeout,
1455                          &process_result_message, &qc);
1456   if (qe == NULL)
1457     {
1458 #if DEBUG_DATASTORE
1459       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1460                   "Could not queue request for `%s'\n",
1461                   GNUNET_h2s (key));
1462 #endif
1463       return NULL;
1464     }
1465   GNUNET_STATISTICS_update (h->stats,
1466                             gettext_noop ("# GET requests executed"),
1467                             1,
1468                             GNUNET_NO);
1469   gm = (struct GetMessage*) &qe[1];
1470   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1471   gm->type = htonl(type);
1472   if (key != NULL)
1473     {
1474       gm->header.size = htons(sizeof (struct GetMessage));
1475       gm->key = *key;
1476     }
1477   else
1478     {
1479       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1480     }
1481   process_queue (h);
1482   return qe;
1483 }
1484
1485
1486 /**
1487  * Function called to trigger obtaining the next result
1488  * from the datastore.
1489  * 
1490  * @param h handle to the datastore
1491  * @param more GNUNET_YES to get moxre results, GNUNET_NO to abort
1492  *        iteration (with a final call to "iter" with key/data == NULL).
1493  */
1494 void 
1495 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h,
1496                            int more)
1497 {
1498   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1499   struct ResultContext rc = qe->qc.rc;
1500
1501   GNUNET_assert (&process_result_message == qe->response_proc);
1502   if (GNUNET_YES != more)
1503     {
1504       qe->qc.rc.iter = NULL;
1505       qe->qc.rc.iter_cls = NULL;
1506       if (rc.iter != NULL)
1507         rc.iter (rc.iter_cls,
1508                  NULL, 0, NULL, 0, 0, 0, 
1509                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1510     }
1511   h->in_receive = GNUNET_YES;
1512   GNUNET_CLIENT_receive (h->client,
1513                          qe->response_proc,
1514                          qe,
1515                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1516 }
1517
1518
1519 /**
1520  * Cancel a datastore operation.  The final callback from the
1521  * operation must not have been done yet.
1522  * 
1523  * @param qe operation to cancel
1524  */
1525 void
1526 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1527 {
1528   struct GNUNET_DATASTORE_Handle *h;
1529   int reconnect;
1530
1531   h = qe->h;
1532 #if DEBUG_DATASTORE
1533   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1534                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1535                qe,
1536                qe->was_transmitted,
1537                h->queue_head == qe);
1538 #endif
1539   reconnect = GNUNET_NO;
1540   if (GNUNET_YES == qe->was_transmitted) 
1541     {
1542       if (qe->response_proc == &process_result_message) 
1543         {
1544           qe->qc.rc.iter = NULL;    
1545           if (GNUNET_YES != h->in_receive)
1546             GNUNET_DATASTORE_get_next (h, GNUNET_YES);
1547         }
1548       else
1549         {
1550           qe->qc.sc.cont = NULL;
1551         }
1552       return;
1553     }
1554   free_queue_entry (qe);
1555   process_queue (h);
1556 }
1557
1558
1559 /* end of datastore_api.c */