first hack at implementing new replication select code
[oweals/gnunet.git] / src / datastore / datastore_api.c
1 /*
2      This file is part of GNUnet
3      (C) 2004, 2005, 2006, 2007, 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file datastore/datastore_api.c
23  * @brief Management for the datastore for files stored on a GNUnet node.  Implements
24  *        a priority queue for requests (with timeouts).
25  * @author Christian Grothoff
26  */
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_datastore_service.h"
31 #include "gnunet_statistics_service.h"
32 #include "datastore.h"
33
34 /**
35  * If a client stopped asking for more results, how many more do
36  * we receive from the DB before killing the connection?  Trade-off
37  * between re-doing TCP handshakes and (needlessly) receiving 
38  * useless results.
39  */
40 #define MAX_EXCESS_RESULTS 8
41
42 /**
43  * Context for processing status messages.
44  */
45 struct StatusContext
46 {
47   /**
48    * Continuation to call with the status.
49    */
50   GNUNET_DATASTORE_ContinuationWithStatus cont;
51
52   /**
53    * Closure for cont.
54    */
55   void *cont_cls;
56
57 };
58
59
60 /**
61  * Context for processing result messages.
62  */
63 struct ResultContext
64 {
65   /**
66    * Iterator to call with the result.
67    */
68   GNUNET_DATASTORE_Iterator iter;
69
70   /**
71    * Closure for iter.
72    */
73   void *iter_cls;
74
75 };
76
77
78 /**
79  *  Context for a queue operation.
80  */
81 union QueueContext
82 {
83
84   struct StatusContext sc;
85   
86   struct ResultContext rc;
87
88 };
89
90
91
92 /**
93  * Entry in our priority queue.
94  */
95 struct GNUNET_DATASTORE_QueueEntry
96 {
97
98   /**
99    * This is a linked list.
100    */
101   struct GNUNET_DATASTORE_QueueEntry *next;
102
103   /**
104    * This is a linked list.
105    */
106   struct GNUNET_DATASTORE_QueueEntry *prev;
107
108   /**
109    * Handle to the master context.
110    */
111   struct GNUNET_DATASTORE_Handle *h;
112
113   /**
114    * Response processor (NULL if we are not waiting for a response).
115    * This struct should be used for the closure, function-specific
116    * arguments can be passed via 'qc'.
117    */
118   GNUNET_CLIENT_MessageHandler response_proc;
119
120   /**
121    * Function to call after transmission of the request.
122    */
123   GNUNET_DATASTORE_ContinuationWithStatus cont;
124    
125   /**
126    * Closure for 'cont'.
127    */
128   void *cont_cls;
129
130   /**
131    * Context for the operation.
132    */
133   union QueueContext qc;
134
135   /**
136    * Task for timeout signalling.
137    */
138   GNUNET_SCHEDULER_TaskIdentifier task;
139
140   /**
141    * Timeout for the current operation.
142    */
143   struct GNUNET_TIME_Absolute timeout;
144
145   /**
146    * Priority in the queue.
147    */
148   unsigned int priority;
149
150   /**
151    * Maximum allowed length of queue (otherwise
152    * this request should be discarded).
153    */
154   unsigned int max_queue;
155
156   /**
157    * Number of bytes in the request message following
158    * this struct.  32-bit value for nicer memory
159    * access (and overall struct alignment).
160    */
161   uint32_t message_size;
162
163   /**
164    * Has this message been transmitted to the service?
165    * Only ever GNUNET_YES for the head of the queue.
166    * Note that the overall struct should end at a 
167    * multiple of 64 bits.
168    */
169   int32_t was_transmitted;
170   
171 };
172
173 /**
174  * Handle to the datastore service. 
175  */
176 struct GNUNET_DATASTORE_Handle
177 {
178
179   /**
180    * Our configuration.
181    */
182   const struct GNUNET_CONFIGURATION_Handle *cfg;
183
184
185   /**
186    * Current connection to the datastore service.
187    */
188   struct GNUNET_CLIENT_Connection *client;
189
190   /**
191    * Handle for statistics.
192    */
193   struct GNUNET_STATISTICS_Handle *stats;
194
195   /**
196    * Current transmit handle.
197    */
198   struct GNUNET_CLIENT_TransmitHandle *th;
199
200   /**
201    * Current head of priority queue.
202    */
203   struct GNUNET_DATASTORE_QueueEntry *queue_head;
204
205   /**
206    * Current tail of priority queue.
207    */
208   struct GNUNET_DATASTORE_QueueEntry *queue_tail;
209
210   /**
211    * Task for trying to reconnect.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
214
215   /**
216    * How quickly should we retry?  Used for exponential back-off on
217    * connect-errors.
218    */
219   struct GNUNET_TIME_Relative retry_time;
220
221   /**
222    * Number of entries in the queue.
223    */
224   unsigned int queue_size;
225
226   /**
227    * Number of results we're receiving for the current query
228    * after application stopped to care.  Used to determine when
229    * to reset the connection.
230    */
231   unsigned int result_count;
232
233   /**
234    * Are we currently trying to receive from the service?
235    */
236   int in_receive;
237
238 };
239
240
241
242 /**
243  * Connect to the datastore service.
244  *
245  * @param cfg configuration to use
246  * @return handle to use to access the service
247  */
248 struct GNUNET_DATASTORE_Handle *
249 GNUNET_DATASTORE_connect (const struct
250                           GNUNET_CONFIGURATION_Handle
251                           *cfg)
252 {
253   struct GNUNET_CLIENT_Connection *c;
254   struct GNUNET_DATASTORE_Handle *h;
255   
256   c = GNUNET_CLIENT_connect ("datastore", cfg);
257   if (c == NULL)
258     return NULL; /* oops */
259   h = GNUNET_malloc (sizeof(struct GNUNET_DATASTORE_Handle) + 
260                      GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
261   h->client = c;
262   h->cfg = cfg;
263   h->stats = GNUNET_STATISTICS_create ("datastore-api",
264                                        cfg);
265   return h;
266 }
267
268
269 /**
270  * Transmit DROP message to datastore service.
271  *
272  * @param cls the 'struct GNUNET_DATASTORE_Handle'
273  * @param size number of bytes that can be copied to buf
274  * @param buf where to copy the drop message
275  * @return number of bytes written to buf
276  */
277 static size_t
278 transmit_drop (void *cls,
279                size_t size, 
280                void *buf)
281 {
282   struct GNUNET_DATASTORE_Handle *h = cls;
283   struct GNUNET_MessageHeader *hdr;
284   
285   if (buf == NULL)
286     {
287       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
288                   _("Failed to transmit request to drop database.\n"));
289       GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
290       return 0;
291     }
292   GNUNET_assert (size >= sizeof(struct GNUNET_MessageHeader));
293   hdr = buf;
294   hdr->size = htons(sizeof(struct GNUNET_MessageHeader));
295   hdr->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DROP);
296   GNUNET_DATASTORE_disconnect (h, GNUNET_NO);
297   return sizeof(struct GNUNET_MessageHeader);
298 }
299
300
301 /**
302  * Disconnect from the datastore service (and free
303  * associated resources).
304  *
305  * @param h handle to the datastore
306  * @param drop set to GNUNET_YES to delete all data in datastore (!)
307  */
308 void GNUNET_DATASTORE_disconnect (struct GNUNET_DATASTORE_Handle *h,
309                                   int drop)
310 {
311   struct GNUNET_DATASTORE_QueueEntry *qe;
312
313   if (h->client != NULL)
314     {
315       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
316       h->client = NULL;
317     }
318   if (h->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
319     {
320       GNUNET_SCHEDULER_cancel (h->reconnect_task);
321       h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
322     }
323   while (NULL != (qe = h->queue_head))
324     {
325       GNUNET_assert (NULL != qe->response_proc);
326       qe->response_proc (qe, NULL);
327     }
328   if (GNUNET_YES == drop) 
329     {
330       h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
331       if (h->client != NULL)
332         {
333           if (NULL != 
334               GNUNET_CLIENT_notify_transmit_ready (h->client,
335                                                    sizeof(struct GNUNET_MessageHeader),
336                                                    GNUNET_TIME_UNIT_MINUTES,
337                                                    GNUNET_YES,
338                                                    &transmit_drop,
339                                                    h))
340             return;
341           GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
342         }
343       GNUNET_break (0);
344     }
345   GNUNET_STATISTICS_destroy (h->stats,
346                              GNUNET_NO);
347   GNUNET_free (h);
348 }
349
350
351 /**
352  * A request has timed out (before being transmitted to the service).
353  *
354  * @param cls the 'struct GNUNET_DATASTORE_QueueEntry'
355  * @param tc scheduler context
356  */
357 static void
358 timeout_queue_entry (void *cls,
359                      const struct GNUNET_SCHEDULER_TaskContext *tc)
360 {
361   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
362
363   GNUNET_STATISTICS_update (qe->h->stats,
364                             gettext_noop ("# queue entry timeouts"),
365                             1,
366                             GNUNET_NO);
367   qe->task = GNUNET_SCHEDULER_NO_TASK;
368   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
369   qe->response_proc (qe, NULL);
370 }
371
372
373 /**
374  * Create a new entry for our priority queue (and possibly discard other entires if
375  * the queue is getting too long).
376  *
377  * @param h handle to the datastore
378  * @param msize size of the message to queue
379  * @param queue_priority priority of the entry
380  * @param max_queue_size at what queue size should this request be dropped
381  *        (if other requests of higher priority are in the queue)
382  * @param timeout timeout for the operation
383  * @param response_proc function to call with replies (can be NULL)
384  * @param qc client context (NOT a closure for response_proc)
385  * @return NULL if the queue is full (and this entry was dropped)
386  */
387 static struct GNUNET_DATASTORE_QueueEntry *
388 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
389                   size_t msize,
390                   unsigned int queue_priority,
391                   unsigned int max_queue_size,
392                   struct GNUNET_TIME_Relative timeout,
393                   GNUNET_CLIENT_MessageHandler response_proc,            
394                   const union QueueContext *qc)
395 {
396   struct GNUNET_DATASTORE_QueueEntry *ret;
397   struct GNUNET_DATASTORE_QueueEntry *pos;
398   unsigned int c;
399
400   c = 0;
401   pos = h->queue_head;
402   while ( (pos != NULL) &&
403           (c < max_queue_size) &&
404           (pos->priority >= queue_priority) )
405     {
406       c++;
407       pos = pos->next;
408     }
409   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
410   ret->h = h;
411   ret->response_proc = response_proc;
412   ret->qc = *qc;
413   ret->timeout = GNUNET_TIME_relative_to_absolute (timeout);
414   ret->priority = queue_priority;
415   ret->max_queue = max_queue_size;
416   ret->message_size = msize;
417   ret->was_transmitted = GNUNET_NO;
418   if (pos == NULL)
419     {
420       /* append at the tail */
421       pos = h->queue_tail;
422     }
423   else
424     {
425       pos = pos->prev; 
426       /* do not insert at HEAD if HEAD query was already
427          transmitted and we are still receiving replies! */
428       if ( (pos == NULL) &&
429            (h->queue_head->was_transmitted) )
430         pos = h->queue_head;
431     }
432   c++;
433   GNUNET_STATISTICS_update (h->stats,
434                             gettext_noop ("# queue entries created"),
435                             1,
436                             GNUNET_NO);
437   GNUNET_CONTAINER_DLL_insert_after (h->queue_head,
438                                      h->queue_tail,
439                                      pos,
440                                      ret);
441   h->queue_size++;
442   if (c > max_queue_size)
443     {
444       GNUNET_STATISTICS_update (h->stats,
445                                 gettext_noop ("# queue overflows"),
446                                 1,
447                                 GNUNET_NO);
448       response_proc (ret, NULL);
449       return NULL;
450     }
451   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
452                                             &timeout_queue_entry,
453                                             ret);
454   pos = ret->next;
455   while (pos != NULL) 
456     {
457       if (pos->max_queue < h->queue_size)
458         {
459           GNUNET_assert (pos->response_proc != NULL);
460           pos->response_proc (pos, NULL);
461           break;
462         }
463       pos = pos->next;
464     }
465   return ret;
466 }
467
468
469 /**
470  * Process entries in the queue (or do nothing if we are already
471  * doing so).
472  * 
473  * @param h handle to the datastore
474  */
475 static void
476 process_queue (struct GNUNET_DATASTORE_Handle *h);
477
478
479 /**
480  * Try reconnecting to the datastore service.
481  *
482  * @param cls the 'struct GNUNET_DATASTORE_Handle'
483  * @param tc scheduler context
484  */
485 static void
486 try_reconnect (void *cls,
487                const struct GNUNET_SCHEDULER_TaskContext *tc)
488 {
489   struct GNUNET_DATASTORE_Handle *h = cls;
490
491   if (h->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
492     h->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
493   else
494     h->retry_time = GNUNET_TIME_relative_multiply (h->retry_time, 2);
495   if (h->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
496     h->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
497   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
498   h->client = GNUNET_CLIENT_connect ("datastore", h->cfg);
499   if (h->client == NULL)
500     {
501       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
502                   "DATASTORE reconnect failed (fatally)\n");
503       return;
504     }
505   GNUNET_STATISTICS_update (h->stats,
506                             gettext_noop ("# datastore connections (re)created"),
507                             1,
508                             GNUNET_NO);
509 #if DEBUG_DATASTORE
510   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
511               "Reconnected to DATASTORE\n");
512 #endif
513   process_queue (h);
514 }
515
516
517 /**
518  * Disconnect from the service and then try reconnecting to the datastore service
519  * after some delay.
520  *
521  * @param h handle to datastore to disconnect and reconnect
522  */
523 static void
524 do_disconnect (struct GNUNET_DATASTORE_Handle *h)
525 {
526   if (h->client == NULL)
527     {
528 #if DEBUG_DATASTORE
529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
530                   "client NULL in disconnect, will not try to reconnect\n");
531 #endif
532       return;
533     }
534 #if 0
535   GNUNET_STATISTICS_update (stats,
536                             gettext_noop ("# reconnected to DATASTORE"),
537                             1,
538                             GNUNET_NO);
539 #endif
540   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
541   h->client = NULL;
542   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
543                                                     &try_reconnect,
544                                                     h);      
545 }
546
547
548 /**
549  * Transmit request from queue to datastore service.
550  *
551  * @param cls the 'struct GNUNET_DATASTORE_Handle'
552  * @param size number of bytes that can be copied to buf
553  * @param buf where to copy the drop message
554  * @return number of bytes written to buf
555  */
556 static size_t
557 transmit_request (void *cls,
558                   size_t size, 
559                   void *buf)
560 {
561   struct GNUNET_DATASTORE_Handle *h = cls;
562   struct GNUNET_DATASTORE_QueueEntry *qe;
563   size_t msize;
564
565   h->th = NULL;
566   if (NULL == (qe = h->queue_head))
567     return 0; /* no entry in queue */
568   if (buf == NULL)
569     {
570       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
571                   _("Failed to transmit request to DATASTORE.\n"));
572       GNUNET_STATISTICS_update (h->stats,
573                                 gettext_noop ("# transmission request failures"),
574                                 1,
575                                 GNUNET_NO);
576       do_disconnect (h);
577       return 0;
578     }
579   if (size < (msize = qe->message_size))
580     {
581       process_queue (h);
582       return 0;
583     }
584  #if DEBUG_DATASTORE
585   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
586               "Transmitting %u byte request to DATASTORE\n",
587               msize);
588 #endif
589   memcpy (buf, &qe[1], msize);
590   qe->was_transmitted = GNUNET_YES;
591   GNUNET_SCHEDULER_cancel (qe->task);
592   qe->task = GNUNET_SCHEDULER_NO_TASK;
593   h->in_receive = GNUNET_YES;
594   GNUNET_CLIENT_receive (h->client,
595                          qe->response_proc,
596                          qe,
597                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
598   GNUNET_STATISTICS_update (h->stats,
599                             gettext_noop ("# bytes sent to datastore"),
600                             1,
601                             GNUNET_NO);
602   return msize;
603 }
604
605
606 /**
607  * Process entries in the queue (or do nothing if we are already
608  * doing so).
609  * 
610  * @param h handle to the datastore
611  */
612 static void
613 process_queue (struct GNUNET_DATASTORE_Handle *h)
614 {
615   struct GNUNET_DATASTORE_QueueEntry *qe;
616
617   if (NULL == (qe = h->queue_head))
618     {
619 #if DEBUG_DATASTORE > 1
620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621                   "Queue empty\n");
622 #endif
623       return; /* no entry in queue */
624     }
625   if (qe->was_transmitted == GNUNET_YES)
626     {
627 #if DEBUG_DATASTORE > 1
628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
629                   "Head request already transmitted\n");
630 #endif
631       return; /* waiting for replies */
632     }
633   if (h->th != NULL)
634     {
635 #if DEBUG_DATASTORE > 1
636       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
637                   "Pending transmission request\n");
638 #endif
639       return; /* request pending */
640     }
641   if (h->client == NULL)
642     {
643 #if DEBUG_DATASTORE > 1
644       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
645                   "Not connected\n");
646 #endif
647       return; /* waiting for reconnect */
648     }
649 #if DEBUG_DATASTORE
650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
651               "Queueing %u byte request to DATASTORE\n",
652               qe->message_size);
653 #endif
654   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
655                                                qe->message_size,
656                                                GNUNET_TIME_absolute_get_remaining (qe->timeout),
657                                                GNUNET_YES,
658                                                &transmit_request,
659                                                h);
660 }
661
662
663 /**
664  * Dummy continuation used to do nothing (but be non-zero).
665  *
666  * @param cls closure
667  * @param result result 
668  * @param emsg error message
669  */
670 static void
671 drop_status_cont (void *cls, int result, const char *emsg)
672 {
673   /* do nothing */
674 }
675
676
677 static void
678 free_queue_entry (struct GNUNET_DATASTORE_QueueEntry *qe)
679 {
680   struct GNUNET_DATASTORE_Handle *h = qe->h;
681
682   GNUNET_CONTAINER_DLL_remove (h->queue_head,
683                                h->queue_tail,
684                                qe);
685   if (qe->task != GNUNET_SCHEDULER_NO_TASK)
686     {
687       GNUNET_SCHEDULER_cancel (qe->task);
688       qe->task = GNUNET_SCHEDULER_NO_TASK;
689     }
690   h->queue_size--;
691   GNUNET_free (qe);
692 }
693
694 /**
695  * Type of a function to call when we receive a message
696  * from the service.
697  *
698  * @param cls closure
699  * @param msg message received, NULL on timeout or fatal error
700  */
701 static void 
702 process_status_message (void *cls,
703                         const struct
704                         GNUNET_MessageHeader * msg)
705 {
706   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
707   struct GNUNET_DATASTORE_Handle *h = qe->h;
708   struct StatusContext rc = qe->qc.sc;
709   const struct StatusMessage *sm;
710   const char *emsg;
711   int32_t status;
712   int was_transmitted;
713
714   h->in_receive = GNUNET_NO;
715   was_transmitted = qe->was_transmitted;
716   if (msg == NULL)
717     {      
718       free_queue_entry (qe);
719       if (NULL == h->client)
720         return; /* forced disconnect */
721       if (rc.cont != NULL)
722         rc.cont (rc.cont_cls, 
723                  GNUNET_SYSERR,
724                  _("Failed to receive status response from database."));
725       if (was_transmitted == GNUNET_YES)
726         do_disconnect (h);
727       return;
728     }
729   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
730   GNUNET_assert (h->queue_head == qe);
731   free_queue_entry (qe);
732   if ( (ntohs(msg->size) < sizeof(struct StatusMessage)) ||
733        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_STATUS) ) 
734     {
735       GNUNET_break (0);
736       h->retry_time = GNUNET_TIME_UNIT_ZERO;
737       do_disconnect (h);
738       if (rc.cont != NULL)
739         rc.cont (rc.cont_cls, 
740                  GNUNET_SYSERR,
741                  _("Error reading response from datastore service"));
742       return;
743     }
744   sm = (const struct StatusMessage*) msg;
745   status = ntohl(sm->status);
746   emsg = NULL;
747   if (ntohs(msg->size) > sizeof(struct StatusMessage))
748     {
749       emsg = (const char*) &sm[1];
750       if (emsg[ntohs(msg->size) - sizeof(struct StatusMessage) - 1] != '\0')
751         {
752           GNUNET_break (0);
753           emsg = _("Invalid error message received from datastore service");
754         }
755     }  
756   if ( (status == GNUNET_SYSERR) &&
757        (emsg == NULL) )
758     {
759       GNUNET_break (0);
760       emsg = _("Invalid error message received from datastore service");
761     }
762 #if DEBUG_DATASTORE
763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
764               "Received status %d/%s\n",
765               (int) status,
766               emsg);
767 #endif
768   GNUNET_STATISTICS_update (h->stats,
769                             gettext_noop ("# status messages received"),
770                             1,
771                             GNUNET_NO);
772   h->retry_time.rel_value = 0;
773   process_queue (h);
774   if (rc.cont != NULL)
775     rc.cont (rc.cont_cls, 
776              status,
777              emsg);
778 }
779
780
781 /**
782  * Store an item in the datastore.  If the item is already present,
783  * the priorities are summed up and the higher expiration time and
784  * lower anonymity level is used.
785  *
786  * @param h handle to the datastore
787  * @param rid reservation ID to use (from "reserve"); use 0 if no
788  *            prior reservation was made
789  * @param key key for the value
790  * @param size number of bytes in data
791  * @param data content stored
792  * @param type type of the content
793  * @param priority priority of the content
794  * @param anonymity anonymity-level for the content
795  * @param replication how often should the content be replicated to other peers?
796  * @param expiration expiration time for the content
797  * @param queue_priority ranking of this request in the priority queue
798  * @param max_queue_size at what queue size should this request be dropped
799  *        (if other requests of higher priority are in the queue)
800  * @param timeout timeout for the operation
801  * @param cont continuation to call when done
802  * @param cont_cls closure for cont
803  * @return NULL if the entry was not queued, otherwise a handle that can be used to
804  *         cancel; note that even if NULL is returned, the callback will be invoked
805  *         (or rather, will already have been invoked)
806  */
807 struct GNUNET_DATASTORE_QueueEntry *
808 GNUNET_DATASTORE_put (struct GNUNET_DATASTORE_Handle *h,
809                       int rid,
810                       const GNUNET_HashCode * key,
811                       size_t size,
812                       const void *data,
813                       enum GNUNET_BLOCK_Type type,
814                       uint32_t priority,
815                       uint32_t anonymity,
816                       uint32_t replication,
817                       struct GNUNET_TIME_Absolute expiration,
818                       unsigned int queue_priority,
819                       unsigned int max_queue_size,
820                       struct GNUNET_TIME_Relative timeout,
821                       GNUNET_DATASTORE_ContinuationWithStatus cont,
822                       void *cont_cls)
823 {
824   struct GNUNET_DATASTORE_QueueEntry *qe;
825   struct DataMessage *dm;
826   size_t msize;
827   union QueueContext qc;
828
829 #if DEBUG_DATASTORE
830   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
831               "Asked to put %u bytes of data under key `%s'\n",
832               size,
833               GNUNET_h2s (key));
834 #endif
835   msize = sizeof(struct DataMessage) + size;
836   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
837   qc.sc.cont = cont;
838   qc.sc.cont_cls = cont_cls;
839   qe = make_queue_entry (h, msize,
840                          queue_priority, max_queue_size, timeout,
841                          &process_status_message, &qc);
842   if (qe == NULL)
843     {
844 #if DEBUG_DATASTORE
845       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
846                   "Could not create queue entry for PUT\n");
847 #endif
848       return NULL;
849     }
850   GNUNET_STATISTICS_update (h->stats,
851                             gettext_noop ("# PUT requests executed"),
852                             1,
853                             GNUNET_NO);
854   dm = (struct DataMessage* ) &qe[1];
855   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_PUT);
856   dm->header.size = htons(msize);
857   dm->rid = htonl(rid);
858   dm->size = htonl( (uint32_t) size);
859   dm->type = htonl(type);
860   dm->priority = htonl(priority);
861   dm->anonymity = htonl(anonymity);
862   dm->uid = GNUNET_htonll(0);
863   dm->expiration = GNUNET_TIME_absolute_hton(expiration);
864   dm->key = *key;
865   memcpy (&dm[1], data, size);
866   process_queue (h);
867   return qe;
868 }
869
870
871 /**
872  * Reserve space in the datastore.  This function should be used
873  * to avoid "out of space" failures during a longer sequence of "put"
874  * operations (for example, when a file is being inserted).
875  *
876  * @param h handle to the datastore
877  * @param amount how much space (in bytes) should be reserved (for content only)
878  * @param entries how many entries will be created (to calculate per-entry overhead)
879  * @param queue_priority ranking of this request in the priority queue
880  * @param max_queue_size at what queue size should this request be dropped
881  *        (if other requests of higher priority are in the queue)
882  * @param timeout how long to wait at most for a response (or before dying in queue)
883  * @param cont continuation to call when done; "success" will be set to
884  *             a positive reservation value if space could be reserved.
885  * @param cont_cls closure for cont
886  * @return NULL if the entry was not queued, otherwise a handle that can be used to
887  *         cancel; note that even if NULL is returned, the callback will be invoked
888  *         (or rather, will already have been invoked)
889  */
890 struct GNUNET_DATASTORE_QueueEntry *
891 GNUNET_DATASTORE_reserve (struct GNUNET_DATASTORE_Handle *h,
892                           uint64_t amount,
893                           uint32_t entries,
894                           unsigned int queue_priority,
895                           unsigned int max_queue_size,
896                           struct GNUNET_TIME_Relative timeout,
897                           GNUNET_DATASTORE_ContinuationWithStatus cont,
898                           void *cont_cls)
899 {
900   struct GNUNET_DATASTORE_QueueEntry *qe;
901   struct ReserveMessage *rm;
902   union QueueContext qc;
903
904   if (cont == NULL)
905     cont = &drop_status_cont;
906 #if DEBUG_DATASTORE
907   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
908               "Asked to reserve %llu bytes of data and %u entries'\n",
909               (unsigned long long) amount,
910               (unsigned int) entries);
911 #endif
912   qc.sc.cont = cont;
913   qc.sc.cont_cls = cont_cls;
914   qe = make_queue_entry (h, sizeof(struct ReserveMessage),
915                          queue_priority, max_queue_size, timeout,
916                          &process_status_message, &qc);
917   if (qe == NULL)
918     {
919 #if DEBUG_DATASTORE
920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921                   "Could not create queue entry to reserve\n");
922 #endif
923       return NULL;
924     }
925   GNUNET_STATISTICS_update (h->stats,
926                             gettext_noop ("# RESERVE requests executed"),
927                             1,
928                             GNUNET_NO);
929   rm = (struct ReserveMessage*) &qe[1];
930   rm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE);
931   rm->header.size = htons(sizeof (struct ReserveMessage));
932   rm->entries = htonl(entries);
933   rm->amount = GNUNET_htonll(amount);
934   process_queue (h);
935   return qe;
936 }
937
938
939 /**
940  * Signal that all of the data for which a reservation was made has
941  * been stored and that whatever excess space might have been reserved
942  * can now be released.
943  *
944  * @param h handle to the datastore
945  * @param rid reservation ID (value of "success" in original continuation
946  *        from the "reserve" function).
947  * @param queue_priority ranking of this request in the priority queue
948  * @param max_queue_size at what queue size should this request be dropped
949  *        (if other requests of higher priority are in the queue)
950  * @param queue_priority ranking of this request in the priority queue
951  * @param max_queue_size at what queue size should this request be dropped
952  *        (if other requests of higher priority are in the queue)
953  * @param timeout how long to wait at most for a response
954  * @param cont continuation to call when done
955  * @param cont_cls closure for cont
956  * @return NULL if the entry was not queued, otherwise a handle that can be used to
957  *         cancel; note that even if NULL is returned, the callback will be invoked
958  *         (or rather, will already have been invoked)
959  */
960 struct GNUNET_DATASTORE_QueueEntry *
961 GNUNET_DATASTORE_release_reserve (struct GNUNET_DATASTORE_Handle *h,
962                                   int rid,
963                                   unsigned int queue_priority,
964                                   unsigned int max_queue_size,
965                                   struct GNUNET_TIME_Relative timeout,
966                                   GNUNET_DATASTORE_ContinuationWithStatus cont,
967                                   void *cont_cls)
968 {
969   struct GNUNET_DATASTORE_QueueEntry *qe;
970   struct ReleaseReserveMessage *rrm;
971   union QueueContext qc;
972
973   if (cont == NULL)
974     cont = &drop_status_cont;
975 #if DEBUG_DATASTORE
976   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977               "Asked to release reserve %d\n",
978               rid);
979 #endif
980   qc.sc.cont = cont;
981   qc.sc.cont_cls = cont_cls;
982   qe = make_queue_entry (h, sizeof(struct ReleaseReserveMessage),
983                          queue_priority, max_queue_size, timeout,
984                          &process_status_message, &qc);
985   if (qe == NULL)
986     {
987 #if DEBUG_DATASTORE
988       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
989                   "Could not create queue entry to release reserve\n");
990 #endif
991       return NULL;
992     }
993   GNUNET_STATISTICS_update (h->stats,
994                             gettext_noop ("# RELEASE RESERVE requests executed"),
995                             1,
996                             GNUNET_NO);
997   rrm = (struct ReleaseReserveMessage*) &qe[1];
998   rrm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE);
999   rrm->header.size = htons(sizeof (struct ReleaseReserveMessage));
1000   rrm->rid = htonl(rid);
1001   process_queue (h);
1002   return qe;
1003 }
1004
1005
1006 /**
1007  * Update a value in the datastore.
1008  *
1009  * @param h handle to the datastore
1010  * @param uid identifier for the value
1011  * @param priority how much to increase the priority of the value
1012  * @param expiration new expiration value should be MAX of existing and this argument
1013  * @param queue_priority ranking of this request in the priority queue
1014  * @param max_queue_size at what queue size should this request be dropped
1015  *        (if other requests of higher priority are in the queue)
1016  * @param timeout how long to wait at most for a response
1017  * @param cont continuation to call when done
1018  * @param cont_cls closure for cont
1019  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1020  *         cancel; note that even if NULL is returned, the callback will be invoked
1021  *         (or rather, will already have been invoked)
1022  */
1023 struct GNUNET_DATASTORE_QueueEntry *
1024 GNUNET_DATASTORE_update (struct GNUNET_DATASTORE_Handle *h,
1025                          unsigned long long uid,
1026                          uint32_t priority,
1027                          struct GNUNET_TIME_Absolute expiration,
1028                          unsigned int queue_priority,
1029                          unsigned int max_queue_size,
1030                          struct GNUNET_TIME_Relative timeout,
1031                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1032                          void *cont_cls)
1033 {
1034   struct GNUNET_DATASTORE_QueueEntry *qe;
1035   struct UpdateMessage *um;
1036   union QueueContext qc;
1037
1038   if (cont == NULL)
1039     cont = &drop_status_cont;
1040 #if DEBUG_DATASTORE
1041   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1042               "Asked to update entry %llu raising priority by %u and expiration to %llu\n",
1043               uid,
1044               (unsigned int) priority,
1045               (unsigned long long) expiration.abs_value);
1046 #endif
1047   qc.sc.cont = cont;
1048   qc.sc.cont_cls = cont_cls;
1049   qe = make_queue_entry (h, sizeof(struct UpdateMessage),
1050                          queue_priority, max_queue_size, timeout,
1051                          &process_status_message, &qc);
1052   if (qe == NULL)
1053     {
1054 #if DEBUG_DATASTORE
1055       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1056                   "Could not create queue entry for UPDATE\n");
1057 #endif
1058       return NULL;
1059     }
1060   GNUNET_STATISTICS_update (h->stats,
1061                             gettext_noop ("# UPDATE requests executed"),
1062                             1,
1063                             GNUNET_NO);
1064   um = (struct UpdateMessage*) &qe[1];
1065   um->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_UPDATE);
1066   um->header.size = htons(sizeof (struct UpdateMessage));
1067   um->priority = htonl(priority);
1068   um->expiration = GNUNET_TIME_absolute_hton(expiration);
1069   um->uid = GNUNET_htonll(uid);
1070   process_queue (h);
1071   return qe;
1072 }
1073
1074
1075 /**
1076  * Explicitly remove some content from the database.
1077  * The "cont"inuation will be called with status
1078  * "GNUNET_OK" if content was removed, "GNUNET_NO"
1079  * if no matching entry was found and "GNUNET_SYSERR"
1080  * on all other types of errors.
1081  *
1082  * @param h handle to the datastore
1083  * @param key key for the value
1084  * @param size number of bytes in data
1085  * @param data content stored
1086  * @param queue_priority ranking of this request in the priority queue
1087  * @param max_queue_size at what queue size should this request be dropped
1088  *        (if other requests of higher priority are in the queue)
1089  * @param timeout how long to wait at most for a response
1090  * @param cont continuation to call when done
1091  * @param cont_cls closure for cont
1092  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1093  *         cancel; note that even if NULL is returned, the callback will be invoked
1094  *         (or rather, will already have been invoked)
1095  */
1096 struct GNUNET_DATASTORE_QueueEntry *
1097 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
1098                          const GNUNET_HashCode *key,
1099                          size_t size, 
1100                          const void *data,
1101                          unsigned int queue_priority,
1102                          unsigned int max_queue_size,
1103                          struct GNUNET_TIME_Relative timeout,
1104                          GNUNET_DATASTORE_ContinuationWithStatus cont,
1105                          void *cont_cls)
1106 {
1107   struct GNUNET_DATASTORE_QueueEntry *qe;
1108   struct DataMessage *dm;
1109   size_t msize;
1110   union QueueContext qc;
1111
1112   if (cont == NULL)
1113     cont = &drop_status_cont;
1114 #if DEBUG_DATASTORE
1115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116               "Asked to remove %u bytes under key `%s'\n",
1117               size,
1118               GNUNET_h2s (key));
1119 #endif
1120   qc.sc.cont = cont;
1121   qc.sc.cont_cls = cont_cls;
1122   msize = sizeof(struct DataMessage) + size;
1123   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1124   qe = make_queue_entry (h, msize,
1125                          queue_priority, max_queue_size, timeout,
1126                          &process_status_message, &qc);
1127   if (qe == NULL)
1128     {
1129 #if DEBUG_DATASTORE
1130       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1131                   "Could not create queue entry for REMOVE\n");
1132 #endif
1133       return NULL;
1134     }
1135   GNUNET_STATISTICS_update (h->stats,
1136                             gettext_noop ("# REMOVE requests executed"),
1137                             1,
1138                             GNUNET_NO);
1139   dm = (struct DataMessage*) &qe[1];
1140   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE);
1141   dm->header.size = htons(msize);
1142   dm->rid = htonl(0);
1143   dm->size = htonl(size);
1144   dm->type = htonl(0);
1145   dm->priority = htonl(0);
1146   dm->anonymity = htonl(0);
1147   dm->uid = GNUNET_htonll(0);
1148   dm->expiration = GNUNET_TIME_absolute_hton(GNUNET_TIME_UNIT_ZERO_ABS);
1149   dm->key = *key;
1150   memcpy (&dm[1], data, size);
1151   process_queue (h);
1152   return qe;
1153 }
1154
1155
1156 /**
1157  * Type of a function to call when we receive a message
1158  * from the service.
1159  *
1160  * @param cls closure
1161  * @param msg message received, NULL on timeout or fatal error
1162  */
1163 static void 
1164 process_result_message (void *cls,
1165                         const struct GNUNET_MessageHeader * msg)
1166 {
1167   struct GNUNET_DATASTORE_QueueEntry *qe = cls;
1168   struct GNUNET_DATASTORE_Handle *h = qe->h;
1169   struct ResultContext rc = qe->qc.rc;
1170   const struct DataMessage *dm;
1171   int was_transmitted;
1172
1173   h->in_receive = GNUNET_NO;
1174   if (msg == NULL)
1175    {
1176       was_transmitted = qe->was_transmitted;
1177       free_queue_entry (qe);
1178       if (was_transmitted == GNUNET_YES)
1179         {
1180           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1181                       _("Failed to receive response from database.\n"));
1182           do_disconnect (h);
1183         }
1184       else
1185         {
1186 #if DEBUG_DATASTORE
1187           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1188                       "Request dropped due to finite datastore queue length.\n");
1189 #endif
1190         }
1191       if (rc.iter != NULL)
1192         rc.iter (rc.iter_cls,
1193                  NULL, 0, NULL, 0, 0, 0, 
1194                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1195       return;
1196     }
1197   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
1198   GNUNET_assert (h->queue_head == qe);
1199   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
1200     {
1201       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
1202       free_queue_entry (qe);
1203 #if DEBUG_DATASTORE
1204       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1205                   "Received end of result set, new queue size is %u\n",
1206                   h->queue_size);
1207 #endif
1208       if (rc.iter != NULL)
1209         rc.iter (rc.iter_cls,
1210                  NULL, 0, NULL, 0, 0, 0, 
1211                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1212       h->retry_time.rel_value = 0;
1213       h->result_count = 0;
1214       process_queue (h);
1215       return;
1216     }
1217   if ( (ntohs(msg->size) < sizeof(struct DataMessage)) ||
1218        (ntohs(msg->type) != GNUNET_MESSAGE_TYPE_DATASTORE_DATA) ||
1219        (ntohs(msg->size) != sizeof(struct DataMessage) + ntohl (((const struct DataMessage*)msg)->size)) )
1220     {
1221       GNUNET_break (0);
1222       free_queue_entry (qe);
1223       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1224       do_disconnect (h);
1225       if (rc.iter != NULL)
1226         rc.iter (rc.iter_cls,
1227                  NULL, 0, NULL, 0, 0, 0, 
1228                  GNUNET_TIME_UNIT_ZERO_ABS, 0); 
1229       return;
1230     }
1231   GNUNET_STATISTICS_update (h->stats,
1232                             gettext_noop ("# Results received"),
1233                             1,
1234                             GNUNET_NO);
1235   if (rc.iter == NULL)
1236     {
1237       h->result_count++;
1238       GNUNET_STATISTICS_update (h->stats,
1239                                 gettext_noop ("# Excess results received"),
1240                                 1,
1241                                 GNUNET_NO);
1242       if (h->result_count > MAX_EXCESS_RESULTS)
1243         {
1244           free_queue_entry (qe);
1245           GNUNET_STATISTICS_update (h->stats,
1246                                     gettext_noop ("# Forced database connection resets"),
1247                                     1,
1248                                     GNUNET_NO);
1249           h->retry_time = GNUNET_TIME_UNIT_ZERO;
1250           do_disconnect (h);      
1251           return;
1252         }
1253       GNUNET_DATASTORE_get_next (h);
1254       return;
1255     }
1256   dm = (const struct DataMessage*) msg;
1257 #if DEBUG_DATASTORE
1258   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259               "Received result %llu with type %u and size %u with key %s\n",
1260               (unsigned long long) GNUNET_ntohll(dm->uid),
1261               ntohl(dm->type),
1262               ntohl(dm->size),
1263               GNUNET_h2s(&dm->key));
1264 #endif
1265   h->retry_time.rel_value = 0;
1266   rc.iter (rc.iter_cls,
1267            &dm->key,
1268            ntohl(dm->size),
1269            &dm[1],
1270            ntohl(dm->type),
1271            ntohl(dm->priority),
1272            ntohl(dm->anonymity),
1273            GNUNET_TIME_absolute_ntoh(dm->expiration),   
1274            GNUNET_ntohll(dm->uid));
1275 }
1276
1277
1278 /**
1279  * Get a random value from the datastore for content replication.
1280  * Returns a single, random value among those with the highest
1281  * replication score, lowering positive replication scores by one for
1282  * the chosen value (if only content with a replication score exists,
1283  * a random value is returned and replication scores are not changed).
1284  *
1285  * @param h handle to the datastore
1286  * @param queue_priority ranking of this request in the priority queue
1287  * @param max_queue_size at what queue size should this request be dropped
1288  *        (if other requests of higher priority are in the queue)
1289  * @param timeout how long to wait at most for a response
1290  * @param iter function to call on a random value; it
1291  *        will be called once with a value (if available)
1292  *        and always once with a value of NULL.
1293  * @param iter_cls closure for iter
1294  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1295  *         cancel; note that even if NULL is returned, the callback will be invoked
1296  *         (or rather, will already have been invoked)
1297  */
1298 struct GNUNET_DATASTORE_QueueEntry *
1299 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
1300                                       unsigned int queue_priority,
1301                                       unsigned int max_queue_size,
1302                                       struct GNUNET_TIME_Relative timeout,
1303                                       GNUNET_DATASTORE_Iterator iter, 
1304                                       void *iter_cls)
1305 {
1306   struct GNUNET_DATASTORE_QueueEntry *qe;
1307   struct GNUNET_MessageHeader *m;
1308   union QueueContext qc;
1309
1310 #if DEBUG_DATASTORE
1311   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1312               "Asked to get random entry in %llu ms\n",
1313               (unsigned long long) timeout.rel_value);
1314 #endif
1315   qc.rc.iter = iter;
1316   qc.rc.iter_cls = iter_cls;
1317   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
1318                          queue_priority, max_queue_size, timeout,
1319                          &process_result_message, &qc);
1320   if (qe == NULL)
1321     {
1322 #if DEBUG_DATASTORE
1323       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1324                   "Could not create queue entry for GET RANDOM\n");
1325 #endif
1326       return NULL;    
1327     }
1328   GNUNET_STATISTICS_update (h->stats,
1329                             gettext_noop ("# GET RANDOM requests executed"),
1330                             1,
1331                             GNUNET_NO);
1332   m = (struct GNUNET_MessageHeader*) &qe[1];
1333   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
1334   m->size = htons(sizeof (struct GNUNET_MessageHeader));
1335   process_queue (h);
1336   return qe;
1337 }
1338
1339
1340 /**
1341  * Get a zero-anonymity value from the datastore.
1342  *
1343  * @param h handle to the datastore
1344  * @param queue_priority ranking of this request in the priority queue
1345  * @param max_queue_size at what queue size should this request be dropped
1346  *        (if other requests of higher priority are in the queue)
1347  * @param timeout how long to wait at most for a response
1348  * @param type allowed type for the operation
1349  * @param iter function to call on a random value; it
1350  *        will be called once with a value (if available)
1351  *        and always once with a value of NULL.
1352  * @param iter_cls closure for iter
1353  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1354  *         cancel; note that even if NULL is returned, the callback will be invoked
1355  *         (or rather, will already have been invoked)
1356  */
1357 struct GNUNET_DATASTORE_QueueEntry *
1358 GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
1359                                      unsigned int queue_priority,
1360                                      unsigned int max_queue_size,
1361                                      struct GNUNET_TIME_Relative timeout,
1362                                      enum GNUNET_BLOCK_Type type,
1363                                      GNUNET_DATASTORE_Iterator iter, 
1364                                      void *iter_cls)
1365 {
1366   struct GNUNET_DATASTORE_QueueEntry *qe;
1367   struct GetZeroAnonymityMessage *m;
1368   union QueueContext qc;
1369
1370 #if DEBUG_DATASTORE
1371   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372               "Asked to get zero-anonymity entry in %llu ms\n",
1373               (unsigned long long) timeout.rel_value);
1374 #endif
1375   qc.rc.iter = iter;
1376   qc.rc.iter_cls = iter_cls;
1377   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
1378                          queue_priority, max_queue_size, timeout,
1379                          &process_result_message, &qc);
1380   if (qe == NULL)
1381     {
1382 #if DEBUG_DATASTORE
1383       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1384                   "Could not create queue entry for zero-anonymity iteration\n");
1385 #endif
1386       return NULL;    
1387     }
1388   GNUNET_STATISTICS_update (h->stats,
1389                             gettext_noop ("# GET ZERO ANONYMITY requests executed"),
1390                             1,
1391                             GNUNET_NO);
1392   m = (struct GetZeroAnonymityMessage*) &qe[1];
1393   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
1394   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
1395   m->type = htonl ((uint32_t) type);
1396   process_queue (h);
1397   return qe;
1398 }
1399
1400
1401
1402 /**
1403  * Iterate over the results for a particular key
1404  * in the datastore.  The iterator will only be called
1405  * once initially; if the first call did contain a
1406  * result, further results can be obtained by calling
1407  * "GNUNET_DATASTORE_get_next" with the given argument.
1408  *
1409  * @param h handle to the datastore
1410  * @param key maybe NULL (to match all entries)
1411  * @param type desired type, 0 for any
1412  * @param queue_priority ranking of this request in the priority queue
1413  * @param max_queue_size at what queue size should this request be dropped
1414  *        (if other requests of higher priority are in the queue)
1415  * @param timeout how long to wait at most for a response
1416  * @param iter function to call on each matching value;
1417  *        will be called once with a NULL value at the end
1418  * @param iter_cls closure for iter
1419  * @return NULL if the entry was not queued, otherwise a handle that can be used to
1420  *         cancel; note that even if NULL is returned, the callback will be invoked
1421  *         (or rather, will already have been invoked)
1422  */
1423 struct GNUNET_DATASTORE_QueueEntry *
1424 GNUNET_DATASTORE_get (struct GNUNET_DATASTORE_Handle *h,
1425                       const GNUNET_HashCode * key,
1426                       enum GNUNET_BLOCK_Type type,
1427                       unsigned int queue_priority,
1428                       unsigned int max_queue_size,
1429                       struct GNUNET_TIME_Relative timeout,
1430                       GNUNET_DATASTORE_Iterator iter, 
1431                       void *iter_cls)
1432 {
1433   struct GNUNET_DATASTORE_QueueEntry *qe;
1434   struct GetMessage *gm;
1435   union QueueContext qc;
1436
1437 #if DEBUG_DATASTORE
1438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1439               "Asked to look for data of type %u under key `%s'\n",
1440               (unsigned int) type,
1441               GNUNET_h2s (key));
1442 #endif
1443   qc.rc.iter = iter;
1444   qc.rc.iter_cls = iter_cls;
1445   qe = make_queue_entry (h, sizeof(struct GetMessage),
1446                          queue_priority, max_queue_size, timeout,
1447                          &process_result_message, &qc);
1448   if (qe == NULL)
1449     {
1450 #if DEBUG_DATASTORE
1451       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1452                   "Could not queue request for `%s'\n",
1453                   GNUNET_h2s (key));
1454 #endif
1455       return NULL;
1456     }
1457   GNUNET_STATISTICS_update (h->stats,
1458                             gettext_noop ("# GET requests executed"),
1459                             1,
1460                             GNUNET_NO);
1461   gm = (struct GetMessage*) &qe[1];
1462   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
1463   gm->type = htonl(type);
1464   if (key != NULL)
1465     {
1466       gm->header.size = htons(sizeof (struct GetMessage));
1467       gm->key = *key;
1468     }
1469   else
1470     {
1471       gm->header.size = htons(sizeof (struct GetMessage) - sizeof(GNUNET_HashCode));
1472     }
1473   process_queue (h);
1474   return qe;
1475 }
1476
1477
1478 /**
1479  * Function called to trigger obtaining the next result
1480  * from the datastore.
1481  * 
1482  * @param h handle to the datastore
1483  */
1484 void 
1485 GNUNET_DATASTORE_get_next (struct GNUNET_DATASTORE_Handle *h)
1486 {
1487   struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
1488
1489   GNUNET_assert (&process_result_message == qe->response_proc);
1490   h->in_receive = GNUNET_YES;
1491   GNUNET_CLIENT_receive (h->client,
1492                          qe->response_proc,
1493                          qe,
1494                          GNUNET_TIME_absolute_get_remaining (qe->timeout));
1495 }
1496
1497
1498 /**
1499  * Cancel a datastore operation.  The final callback from the
1500  * operation must not have been done yet.
1501  * 
1502  * @param qe operation to cancel
1503  */
1504 void
1505 GNUNET_DATASTORE_cancel (struct GNUNET_DATASTORE_QueueEntry *qe)
1506 {
1507   struct GNUNET_DATASTORE_Handle *h;
1508
1509   h = qe->h;
1510 #if DEBUG_DATASTORE
1511   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
1512                "Pending DATASTORE request %p cancelled (%d, %d)\n",
1513                qe,
1514                qe->was_transmitted,
1515                h->queue_head == qe);
1516 #endif
1517   if (GNUNET_YES == qe->was_transmitted) 
1518     {
1519       free_queue_entry (qe);
1520       h->retry_time = GNUNET_TIME_UNIT_ZERO;
1521       do_disconnect (h);
1522       return;
1523     }
1524   free_queue_entry (qe);
1525   process_queue (h);
1526 }
1527
1528
1529 /* end of datastore_api.c */